IKEAwin + cookie

va1is 2025-08-25 19:58:58 +03:00
parent 8f32eaa3e1
commit 9568a9c28d
698 changed files with 371 additions and 372 deletions

View File

@@ -433,6 +433,15 @@ def extract_data(url: str) -> dict:
     try:
         resp = requests.get(url, headers=HEADERS, timeout=15)
         resp.raise_for_status()
+        # 🔎 DEBUG: print basic info about the response to the console
+        print("\n=== FETCH DEBUG ===")
+        print("URL:        ", url)
+        print("Final URL:  ", resp.url)
+        print("Status:     ", resp.status_code)
+        print("ContentType:", resp.headers.get("Content-Type"))
+        print("Length:     ", len(resp.text))
+        print("Snippet ↓↓↓")
+        print(resp.text[:1000])  # shows the first 1000 characters of the HTML
         soup = BeautifulSoup(resp.text, "html.parser")
         target = soup.select_one(CSS_SELECTOR)
@@ -510,6 +519,7 @@ def extract_data(url: str) -> dict:
         return filtered
     except Exception as e:
+        print(e)
         return {"url": url, "error": str(e)}

 # ───────────────────── BUILD VARIANT / POST ─────────────────
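
For context, CSS_SELECTOR targets the product container whose data-hydration-props attribute carries the product data as HTML-escaped JSON. A minimal, self-contained sketch of that decoding step (the helper name is illustrative; the script below does the same thing inline):

import html
import json
from typing import Optional

from bs4 import BeautifulSoup

CSS_SELECTOR = ".pip-product__subgrid.product-pip.js-product-pip"

def decode_hydration_props(page_html: str) -> Optional[dict]:
    # Locate the product container and read its HTML-escaped JSON attribute.
    soup = BeautifulSoup(page_html, "html.parser")
    target = soup.select_one(CSS_SELECTOR)
    if not target:
        return None
    raw = target.get("data-hydration-props")
    if not raw:
        return None
    # Unescape HTML entities first, then parse the JSON payload.
    return json.loads(html.unescape(raw))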

View File

@@ -1,15 +1,21 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
-import os, json, re, math, time, html, requests, datetime, http.cookiejar as cookiejar
-from collections import Counter
-from typing import List, Optional
+"""
+IKEA Parser, robust variant: Playwright only (Chrome, persistent profile),
+no requests, no parallelism or speed-ups. One tab at a time, maximum logging.
+"""
+import os, sys, re, json, math, time, html as html_mod, datetime, traceback
+from typing import Optional
+import logging
+from logging.handlers import RotatingFileHandler
+from urllib import request as urlrequest
+from urllib.error import URLError, HTTPError
 from bs4 import BeautifulSoup
 from openpyxl import Workbook
-from requests.adapters import HTTPAdapter
-from urllib3.util.retry import Retry
-import logging
-import socket
+from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout

 # ───────────────────────── PATHS / FILES ───────────────────────────
 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -21,47 +27,19 @@ OUTPUT_FILE = os.path.join(RECORDS_DIR, "records.xlsx")
 DICT_FILE = os.path.join(BASE_DIR, "dictionary_main.txt")
 EXCL_FILE = os.path.join(BASE_DIR, "exclusion_materials.txt")
 POST_LOG = os.path.join(RECORDS_DIR, "post_log.txt")
-FETCH_LOG = os.path.join(RECORDS_DIR, "fetch_debug.log")
-COOKIES_TXT = os.path.join(BASE_DIR, "cookies.txt")
-
-# ───────────────────────── LOGGING ────────────────────────────
-logger = logging.getLogger("ikea_parser")
-logger.setLevel(logging.DEBUG)
-# file handler: maximum detail
-fh = logging.FileHandler(FETCH_LOG, encoding="utf-8")
-fh.setLevel(logging.DEBUG)
-fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
-# console: INFO
-ch = logging.StreamHandler()
-ch.setLevel(logging.INFO)
-ch.setFormatter(logging.Formatter("%(message)s"))
-logger.addHandler(fh)
-logger.addHandler(ch)
-
-logger.info("=== IKEA parser started ===")
-logger.info(f"BASE_DIR={BASE_DIR}")
-logger.info(f"Python={os.sys.version}")
-try:
-    logger.info(f"Hostname={socket.gethostname()} IP={socket.gethostbyname(socket.gethostname())}")
-except Exception as _e:
-    logger.info("Hostname/IP: unavailable")
-
-# ───────────────────────── POST SETTINGS ─────────────────────────
-POST_URL = os.getenv("IKEA_POST_URL", "http://localhost:3005/parser/data")
-POST_API_KEY = os.getenv("IKEA_POST_API_KEY", "")
-POST_TIMEOUT = 20
-BATCH_SIZE = 50
-
-# ───────────────────────── SITE SETTINGS ────────────────────────
-HEADERS = {
-    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
-                  "AppleWebKit/537.36 (KHTML, like Gecko) "
-                  "Chrome/124.0.0.0 Safari/537.36",
-    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
-    "Accept-Language": "pl-PL,pl;q=0.9,en;q=0.8,ru;q=0.7",
-    "Cache-Control": "no-cache",
-    "Pragma": "no-cache",
-}
+LOGS_DIR = os.path.join(RECORDS_DIR, "logs")
+HTML_DIR = os.path.join(RECORDS_DIR, "html_debug")
+JSON_DIR = os.path.join(RECORDS_DIR, "json_debug")
+PROFILE_DIR = os.path.join(BASE_DIR, "playwright_profile")
+os.makedirs(LOGS_DIR, exist_ok=True)
+os.makedirs(HTML_DIR, exist_ok=True)
+os.makedirs(JSON_DIR, exist_ok=True)
+os.makedirs(PROFILE_DIR, exist_ok=True)
+
+APP_LOG_FILE = os.path.join(LOGS_DIR, "app.log")
+
+# ───────────────────────── SETTINGS ──────────────────────────────

 CSS_SELECTOR = ".pip-product__subgrid.product-pip.js-product-pip"
 BLOCKS = [
@@ -106,45 +84,40 @@ KEEP_COLUMNS = [
     "url",
 ]

-# ───────────────────────── HTTP SESSION ────────────────────────────
-def make_session() -> requests.Session:
-    s = requests.Session()
-    s.headers.update(HEADERS)
-    # ignore system proxy/mitm environment variables on Windows
-    s.trust_env = False
-    retries = Retry(
-        total=5,
-        backoff_factor=0.5,
-        status_forcelist=(403, 429, 500, 502, 503, 504),
-        allowed_methods=frozenset(["GET", "POST"])
-    )
-    s.mount("https://", HTTPAdapter(max_retries=retries))
-    s.mount("http://", HTTPAdapter(max_retries=retries))
-    return s
-
-SESSION = make_session()
-
-def load_netscape_cookies(session: requests.Session, path: str):
-    if os.path.isfile(path):
-        cj = cookiejar.MozillaCookieJar()
-        try:
-            cj.load(path, ignore_discard=True, ignore_expires=True)
-            session.cookies.update(cj)
-            logger.info(f"🍪 Cookies loaded: {path} ({len(cj)} pcs)")
-        except Exception as e:
-            logger.warning(f"⚠️ Failed to load cookies.txt: {e}")
-    else:
-        logger.info("cookies.txt not found — proceeding without external cookies")
-
-load_netscape_cookies(SESSION, COOKIES_TXT)
-
-# ───────────────────────── I/O UTILITIES ────────────────────────────
-def ask_bool(prompt: str, default: str = "1") -> bool:
-    try:
-        val = input(f"{prompt} (1=yes, 0=no) [{default}]: ").strip() or default
-    except EOFError:
-        val = default
-    return val == "1"
+UA = os.getenv(
+    "IKEA_UA",
+    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
+    "(KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
+)
+ACCEPT_LANG = os.getenv("IKEA_ACCEPT_LANGUAGE", "pl-PL,pl;q=0.9,en;q=0.8,ru;q=0.7")
+
+# Playwright: launch Chrome with a persistent profile; not headless by default,
+# so Cloudflare blocks it less often.
+HEADLESS = os.getenv("IKEA_HEADLESS", "0") not in {"0", "false", "False", ""}
+
+# POST/API without requests
+POST_URL = os.getenv("IKEA_POST_URL", "http://localhost:3005/parser/data")
+POST_API_KEY = os.getenv("IKEA_POST_API_KEY", "")
+POST_TIMEOUT = int(os.getenv("IKEA_POST_TIMEOUT", "20"))
+BATCH_SIZE = int(os.getenv("IKEA_BATCH_SIZE", "50"))
+
+# ───────────────────────── LOGGING ────────────────────────────
+logger = logging.getLogger("ikea_pw_simple")
+logger.setLevel(logging.DEBUG)
+_fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
+
+fh = RotatingFileHandler(APP_LOG_FILE, maxBytes=2_000_000, backupCount=3, encoding="utf-8")
+fh.setFormatter(_fmt)
+fh.setLevel(logging.DEBUG)
+logger.addHandler(fh)
+
+ch = logging.StreamHandler(sys.stdout)
+ch.setFormatter(_fmt)
+ch.setLevel(logging.INFO)
+logger.addHandler(ch)
+
+def _now_tag():
+    return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

 def _post_log(msg: str):
     try:
@@ -153,50 +126,30 @@ def _post_log(msg: str):
     except Exception:
         pass

-def _now_tag():
-    return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-
 def _save_json_batch(payload: dict, batch_index: int):
     fname = f"ikea_batch_{_now_tag()}_{batch_index:04d}.json"
-    fpath = os.path.join(RECORDS_DIR, fname)
+    fpath = os.path.join(JSON_DIR, fname)
     with open(fpath, "w", encoding="utf-8") as fh:
         json.dump(payload, fh, ensure_ascii=False, indent=2)
-    logger.info(f"💾 JSON saved: {fname}")
+    logger.debug(f"💾 JSON saved: {fpath}")
     return fpath

-def _safe_name_from_url(url: str) -> str:
-    return re.sub(r"[^a-zA-Z0-9]+", "_", url)[:80]
-
-def _dump_meta(prefix: str, url: str, status: int, elapsed: float, text_len: int, final_url: str, headers: dict, note: str = ""):
-    try:
-        base = f"{prefix}_{_now_tag()}_{_safe_name_from_url(url)}"
-        meta = os.path.join(RECORDS_DIR, base + ".meta.txt")
-        with open(meta, "w", encoding="utf-8") as fh:
-            fh.write(f"URL: {url}\n")
-            fh.write(f"FINAL_URL: {final_url}\n")
-            fh.write(f"STATUS: {status}\n")
-            fh.write(f"ELAPSED_SEC: {elapsed:.3f}\n")
-            fh.write(f"RESP_LEN: {text_len}\n")
-            fh.write(f"NOTE: {note}\n")
-            fh.write("HEADERS:\n")
-            for k, v in headers.items():
-                hv = v if isinstance(v, str) else str(v)
-                fh.write(f"  {k}: {hv}\n")
-    except Exception as e:
-        logger.debug(f"Meta dump failed: {e}")
-
-def _save_debug_html(url: str, text: str, prefix: str = "debug", note: str = "", status: Optional[int] = None, elapsed: Optional[float] = None, headers: Optional[dict] = None, final_url: Optional[str] = None):
-    try:
-        base = f"{prefix}_{_now_tag()}_{_safe_name_from_url(url)}"
-        fpath = os.path.join(RECORDS_DIR, base + ".html")
-        with open(fpath, "w", encoding="utf-8") as fh:
-            fh.write(text)
-        logger.info(f"🧪 Saved HTML snapshot: {os.path.basename(fpath)}")
-        # meta file next to it
-        if status is not None and headers is not None and final_url is not None and elapsed is not None:
-            _dump_meta(prefix, url, status, elapsed, len(text or ""), final_url, headers, note=note)
-    except Exception as e:
-        logger.debug(f"HTML dump failed: {e}")
+def _save_html_snapshot(prefix: str, idx: int, content: str):
+    fname = f"{idx:04d}_{prefix}_{_now_tag()}.html"
+    fpath = os.path.join(HTML_DIR, fname)
+    try:
+        with open(fpath, "w", encoding="utf-8") as fh:
+            fh.write(content)
+        logger.debug("🧪 HTML snapshot: %s", fpath)
+    except Exception:
+        logger.exception("Failed to save HTML snapshot")
+
+def ask_bool(prompt: str, default: str = "1") -> bool:
+    try:
+        val = input(f"{prompt} (1=yes, 0=no) [{default}]: ").strip() or default
+    except EOFError:
+        val = default
+    return val == "1"

 # ───────────────────── DICTIONARIES / FILTERS ──────────────────────
 def load_dictionary(path: str) -> dict:
@@ -207,7 +160,6 @@ def load_dictionary(path: str) -> dict:
     return {k: v for k, v in pairs}

 DICT = load_dictionary(DICT_FILE)
-

 def translate_token(token: str) -> str:
     return DICT.get(token, token)
@@ -221,8 +173,8 @@ def load_exclusions(path: str) -> set:
 EXCLUSIONS = load_exclusions(EXCL_FILE)

-def materials_from_details_json(details: dict) -> List[str]:
-    out: List[str] = []
+def materials_from_details_json(details: dict) -> list[str]:
+    out: list[str] = []
     def walk(node):
         if isinstance(node, dict):
             for k, v in node.items():
@@ -473,62 +425,89 @@ def build_variant_color_measure(desc: str, type_name: str, measurement: str) ->
     if t:
         pattern = r"^\s*" + re.escape(t) + r"[\s,;:\-–—/]*"
         s = re.sub(pattern, "", s, flags=re.IGNORECASE)
     if not re.search(r"[0-9A-Za-zА-Яа-яЁёÀ-ž]", s or ""):
         s = ""
     s = s.strip()
     meas = (measurement or "").strip()
     if not s:
         return meas if meas else ""
     s = s[:1].upper() + s[1:]
     return f"{s}, {meas}" if meas else s

-# ───────────────────── PRODUCT-CARD SCRAPING ──────────────────────────
-def extract_data(url: str, force_dump: bool = False) -> dict:
-    try:
-        logger.debug(f"GET {url}")
-        t0 = time.time()
-        resp = SESSION.get(url, timeout=25, allow_redirects=True)
-        elapsed = time.time() - t0
-        status = resp.status_code
-        final_url = str(getattr(resp, "url", url))
-        text_len = len(resp.text or "")
-        logger.info(f"HTTP {status} {final_url} ({elapsed:.2f}s, {text_len} bytes)")
-
-        # Always save the first pages (force_dump=True) and any questionable page
-        need_dump = force_dump or status != 200 or ("data-hydration-props" not in resp.text)
-        if need_dump:
-            note = "force_dump" if force_dump else ("no_hydration" if "data-hydration-props" not in resp.text else f"status_{status}")
-            _save_debug_html(url, resp.text, prefix=f"resp{status}", note=note, status=status, elapsed=elapsed, headers=resp.headers, final_url=final_url)
-
-        resp.raise_for_status()
-        soup = BeautifulSoup(resp.text, "html.parser")
-        target = soup.select_one(CSS_SELECTOR)
-        if not target:
-            logger.warning("CSS selector NOT FOUND")
-            _save_debug_html(url, resp.text, prefix="no_selector", note="css_selector_missing", status=status, elapsed=elapsed, headers=resp.headers, final_url=final_url)
-            return {"url": url, "error": "CSS selector not found", "http_status": status}
-
-        raw = target.get("data-hydration-props")
-        if not raw:
-            logger.warning("data-hydration-props NOT FOUND")
-            _save_debug_html(url, resp.text, prefix="no_hydration", note="attribute_missing", status=status, elapsed=elapsed, headers=resp.headers, final_url=final_url)
-            return {"url": url, "error": "data-hydration-props not found", "http_status": status}
-
-        decoded = html.unescape(raw)
-        try:
-            full_json = json.loads(decoded)
-        except Exception as je:
-            logger.error(f"JSON decode error: {je}")
-            # save a chunk for analysis
-            sample_name = f"bad_json_{_now_tag()}_{_safe_name_from_url(url)}.txt"
-            with open(os.path.join(RECORDS_DIR, sample_name), "w", encoding="utf-8") as fh:
-                fh.write(decoded[:20000])
-            return {"url": url, "error": f"json decode error: {je}", "http_status": status}
+# ───────────────────── Playwright, single tab ──────────────────
+def open_browser():
+    pw = sync_playwright().start()
+    # persistent Chrome: Cloudflare is more lenient towards it
+    ctx = pw.chromium.launch_persistent_context(
+        PROFILE_DIR,
+        headless=HEADLESS,
+        channel="chrome",  # the important difference
+        user_agent=UA,
+        locale="pl-PL",
+        java_script_enabled=True,
+        accept_downloads=False,
+        viewport={"width": 1366, "height": 864},
+        # proxy={"server": "..."} can be added here if needed
+    )
+    page = ctx.new_page()
+    # base headers (match a real browser)
+    page.set_extra_http_headers({"Accept-Language": ACCEPT_LANG})
+    return pw, ctx, page
+
+def close_browser(pw, ctx):
+    try:
+        ctx.close()
+        pw.stop()
+    except Exception:
+        pass
+
+def fetch_page(page, url: str, idx: int) -> tuple[str, Optional[str]]:
+    """
+    Returns (full_html, hydration_raw_json_or_None).
+    Saves a snapshot if data-hydration-props was not found.
+    """
+    t0 = time.time()
+    resp = page.goto(url, wait_until="domcontentloaded", timeout=60_000)
+    status = resp.status if resp else 0
+    # wait for the selector, but not for too long
+    try:
+        page.wait_for_selector(CSS_SELECTOR, timeout=25_000, state="attached")
+    except PWTimeout:
+        pass
+    full_html = page.content()
+    # read the attribute directly
+    raw = None
+    try:
+        el = page.locator(CSS_SELECTOR).first
+        raw = el.get_attribute("data-hydration-props")
+    except Exception:
+        raw = None
+    elapsed = time.time() - t0
+    logger.info("PW %s status=%s %.2fs len=%s", url, status, elapsed, len(full_html or ""))
+    # on Cloudflare/403, save a snapshot for diagnostics
+    if not raw:
+        _save_html_snapshot("no_hydration", idx, full_html or "")
+    return full_html or "", raw
+
+# ───────────────────── Page parsing ───────────────────────────
+def parse_page(url: str, full_html: str, raw_json: Optional[str]) -> dict:
+    if not full_html:
+        return {"url": url, "error": "no html"}
+    soup = BeautifulSoup(full_html, "html.parser")
+
+    # Fallback: if raw was not passed in, try the DOM
+    if not raw_json:
+        target = soup.select_one(CSS_SELECTOR)
+        if target:
+            raw_json = target.get("data-hydration-props")
+    if not raw_json:
+        return {"url": url, "error": "data-hydration-props not found"}
+
+    try:
+        decoded = html_mod.unescape(raw_json)
+        full_json = json.loads(decoded)
+    except Exception as e:
+        return {"url": url, "error": f"json decode error: {e}"}

     result = {"url": url}
     for block in BLOCKS:
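
For orientation, a minimal sketch of how the helpers added above chain together for a single URL; the URL is a placeholder, and main() further down drives the same flow over product_links.txt:

pw, ctx, page = open_browser()
try:
    url = "https://www.ikea.com/pl/pl/p/example-product-00000000/"  # hypothetical URL
    full_html, raw = fetch_page(page, url, 1)  # navigate, wait for the selector, grab HTML + attribute
    row = parse_page(url, full_html, raw)      # decode data-hydration-props into the flat row dict
    print(row.get("error") or row.get("buyModule.productName"))
finally:
    close_browser(pw, ctx)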
@@ -574,71 +553,148 @@ def extract_data(url: str, force_dump: bool = False) -> dict:
     if breadcrumb:
         result["categoryBreadcrumb"] = breadcrumb

-    # apply the whitelist
     filtered = {k: result.get(k) for k in KEEP_COLUMNS if k != "originalName"}
-    # originalName = productName + " " + typeName
     pn = (result.get("buyModule.productName") or "").strip()
     tn = (result.get("stockcheckSection.typeName") or "").strip()
-    if pn and tn:
-        orig_name = f"{pn} {tn}"
-    else:
-        orig_name = pn or tn
-    filtered["originalName"] = orig_name
+    filtered["originalName"] = (f"{pn} {tn}".strip() or pn or tn)

     return filtered
-    except Exception as e:
-        logger.error(f"Request error for {url}: {e}")
-        return {"url": url, "error": str(e), "http_status": None}
-
-# ───────────────────── BUILD VARIANT / POST ─────────────────
-def _split_color_size(text: str):
-    if not text:
-        return "", ""
-    parts = [p.strip() for p in text.split(",", 1)]
-    if len(parts) == 2:
-        return parts[0], parts[1]
-    return "", parts[0]
-
-def _ceil_price(v):
-    try:
-        return int(math.ceil(float(v)))
-    except Exception:
-        return None
-
-def _ceil_int(v):
-    try:
-        return int(math.ceil(float(v)))
-    except Exception:
-        return None
-
-def build_variant(row: dict) -> dict:
+
+# ───────────────────── POST (urllib) ──────────────────────────────
+def post_payload(payload: dict) -> dict:
+    headers = {"Content-Type": "application/json; charset=utf-8"}
+    if POST_API_KEY:
+        headers["Authorization"] = f"Bearer {POST_API_KEY}"
+    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
+    req = urlrequest.Request(POST_URL, data=body, headers=headers, method="POST")
+    _post_log(f"→ POST {POST_URL}\nHeaders: {headers}\nBody: {body.decode('utf-8')}")
+    try:
+        with urlrequest.urlopen(req, timeout=POST_TIMEOUT) as resp:
+            txt = resp.read().decode("utf-8", errors="replace")
+            code = resp.getcode()
+            _post_log(f"{code}\n{txt}\n{'-'*60}")
+            return {"ok": 200 <= code < 300, "status": code, "response": txt}
+    except HTTPError as e:
+        txt = e.read().decode("utf-8", errors="replace")
+        _post_log(f"{e.code}\n{txt}\n{'-'*60}")
+        return {"ok": False, "status": e.code, "response": txt}
+    except URLError as e:
+        _post_log(f"× ERROR: {e}\n{'-'*60}")
+        return {"ok": False, "status": None, "error": str(e)}
+
+# ───────────────────────── HEART OF THE SCRIPT ─────────────────────────
+def safe_cell(val):
+    if isinstance(val, (dict, list)):
+        return json.dumps(val, ensure_ascii=False)
+    return "" if val is None else val
+
+def main():
+    logger.info("Playwright-only (simple). BASE_DIR=%s", BASE_DIR)
+    logger.info("Python=%s", sys.version.replace("\n", " "))
+    logger.info("POST_URL=%s OUTPUT_FILE=%s", POST_URL, OUTPUT_FILE)
+    logger.info("HEADLESS=%s UA=%s Accept-Language=%s", HEADLESS, UA, ACCEPT_LANG)
+
+    SAVE_JSON = ask_bool("SAVE_JSON (save JSON to disk?)", "1")
+    SEND_JSON = ask_bool("SEND_JSON (send to the API?)", "1")
+
+    with open(INPUT_FILE, "r", encoding="utf-8") as f:
+        links = [line.strip() for line in f if line.strip()]
+    print(f"Total links: {len(links)}")
+
+    wb = Workbook()
+    ws = wb.active
+    ws.title = "IKEA Products"
+    ws.append(KEEP_COLUMNS)
+
+    batch_items = []
+    batch_index = 1
+
+    def flush_batch():
+        nonlocal batch_items, batch_index
+        if not batch_items:
+            return
+        payload = {"parserName": "ikea", "items": batch_items}
+        if SAVE_JSON:
+            _save_json_batch(payload, batch_index)
+        if SEND_JSON:
+            res = post_payload(payload)
+            ok = res.get("ok")
+            print(f"POST batch {batch_index}: {'OK' if ok else 'FAIL'} (status={res.get('status')})")
+            logger.info("POST batch %d: %s (status=%s)", batch_index, "OK" if ok else "FAIL", res.get("status"))
+        batch_index += 1
+        batch_items = []
+
+    pw, ctx, page = open_browser()
+    try:
+        for idx, url in enumerate(links, 1):
+            print(f"[{idx}/{len(links)}] {url}")
+            try:
+                full_html, raw = fetch_page(page, url, idx)
+            except Exception:
+                logger.exception("Fetch error for %s", url)
+                continue
+
+            row = parse_page(url, full_html, raw)
+            if row.get("error"):
+                logger.warning("Extract error [%d] %s: %s", idx, url, row["error"])
+
+            # Excel
+            ws.append([safe_cell(row.get(col, "")) for col in KEEP_COLUMNS])
+
+            # Filters for JSON/API
+            try:
+                price = float(row.get("buyModule.productPrice") or 0)
+            except Exception:
+                price = 0.0
+            try:
+                total_kg = float(row.get("total brutto") or 0)
+            except Exception:
+                total_kg = 0.0
+            details_json = row.get("productInformationSection.productDetailsProps") or {}
+
+            if not (20 <= price <= 1500):
+                pass
+            elif total_kg > 30:
+                pass
+            elif materials_match_exclusions(details_json, EXCLUSIONS):
+                pass
+            else:
+                # build variant (minimal, as before)
+                def _ceil_price(v):
+                    try: return int(math.ceil(float(v)))
+                    except: return None
+                def _ceil_int(v):
+                    try: return int(math.ceil(float(v)))
+                    except: return None
+
                 visible = row.get("productSummary.visibleItemNo") or ""
                 sku = visible.replace(" ", "")
                 csm = (row.get("prductVariantColorMeasure") or "").strip()
-    color, size = _split_color_size(csm)
+                color, size = "", ""
+                if csm:
+                    parts = [p.strip() for p in csm.split(",", 1)]
+                    if len(parts) == 2:
+                        color, size = parts[0], parts[1]
+                    else:
+                        size = parts[0]
                 if not color and not size:
                     size = (row.get("pipPricePackage.measurementText") or "").strip()
                 cost = _ceil_price(row.get("buyModule.productPrice"))
-    url = row.get("url") or ""
                 name = row.get("originalName") or row.get("buyModule.productName") or ""
                 desc_html = row.get("productInformationSection.productDetailsProps_formatted_html") or ""
                 composition_html = row.get("productInformationSection.dimensionProps_formatted_html_translated") or ""
                 imgs = []
                 raw_imgs = row.get("productGallery.urls") or ""
                 if isinstance(raw_imgs, str):
                     imgs = [x for x in raw_imgs.split("\n") if x.strip()]
                 in_stock = bool(row.get("availabilityGroup.serverOnlineSellable")) or bool(row.get("buyModule.onlineSellable"))
                 weight_kg = _ceil_int(row.get("total brutto"))
-    variant = {
+                item = {
+                    "category": {"name": "TEST/IKEA"},
+                    "brand": {"name": "ikea"},
+                    "variant": {
                         "status_id": 1,
                         "color": color.capitalize() if color else "none",
                         "sku": sku,
@@ -651,143 +707,31 @@ def build_variant(row: dict) -> dict:
                         "images": imgs,
                         "inStock": in_stock,
                         "weight": weight_kg if weight_kg is not None else 0,
+                    },
                 }
-    return {
-        "category": {"name": "TEST/IKEA"},  # temporary, per your spec
-        "brand": {"name": "ikea"},
-        "variant": variant,
-    }
-
-def post_payload(payload: dict) -> dict:
-    headers = {"Content-Type": "application/json"}
-    if POST_API_KEY:
-        headers["Authorization"] = f"Bearer {POST_API_KEY}"
-    body = json.dumps(payload, ensure_ascii=False)
-    _post_log(f"→ POST {POST_URL}\nHeaders: {headers}\nBody: {body}")
-    try:
-        r = SESSION.post(POST_URL, headers=headers, data=body.encode("utf-8"), timeout=POST_TIMEOUT)
-        text = r.text
-        _post_log(f"{r.status_code}\n{text}\n{'-'*60}")
-        ok = 200 <= r.status_code < 300
-        return {"ok": ok, "status": r.status_code, "response": text}
-    except Exception as e:
-        _post_log(f"× ERROR: {e}\n{'-'*60}")
-        return {"ok": False, "status": None, "error": str(e)}
-
-# ───────────────────────── HEART OF THE SCRIPT ─────────────────────────
-def safe_cell(val):
-    if isinstance(val, (dict, list)):
-        return json.dumps(val, ensure_ascii=False)
-    return "" if val is None else val
-
-def _clean_url(u: str) -> str:
-    if not isinstance(u, str):
-        return ""
-    u = u.strip().replace("\t", " ")
-    u = u.replace("\ufeff", "").replace("\xa0", "")
-    u = u.strip("\r\n ")
-    return u
-
-def main():
-    logger.info(f"POST_URL={POST_URL} OUTPUT_FILE={OUTPUT_FILE}")
-
-    SAVE_JSON = ask_bool("SAVE_JSON (save JSON to disk?)", "1")
-    SEND_JSON = ask_bool("SEND_JSON (send to the API?)", "1")
-
-    # read links (utf-8-sig because of BOM)
-    with open(INPUT_FILE, "r", encoding="utf-8-sig", newline="") as f:
-        raw_lines = f.readlines()
-    links = [_clean_url(x) for x in raw_lines if _clean_url(x)]
-    logger.info(f"Total links: {len(links)}")
-    if not links:
-        logger.warning("Link list is empty, check product_links.txt")
-
-    # prepare Excel
-    wb = Workbook()
-    ws = wb.active
-    ws.title = "IKEA Products"
-    ws.append(KEEP_COLUMNS)
-
-    # batch for JSON/API
-    batch_items = []
-    batch_index = 1
-    STATUS_COUNTER = Counter()
-
-    def flush_batch():
-        nonlocal batch_items, batch_index
-        if not batch_items:
-            return
-        payload = {"parserName": "ikea", "items": batch_items}
-        if SAVE_JSON:
-            _save_json_batch(payload, batch_index)
-        if SEND_JSON:
-            res = post_payload(payload)
-            ok = res.get("ok")
-            logger.info(f"POST batch {batch_index}: {'OK' if ok else 'FAIL'} (status={res.get('status')})")
-        batch_index += 1
-        batch_items = []
-
-    for idx, link in enumerate(links, 1):
-        logger.info(f"[{idx}/{len(links)}] {link}")
-        force_dump = idx <= 3  # force-save the HTML for the first 3 links
-        row = extract_data(link, force_dump=force_dump)
-        st = row.get("http_status")
-        if st is None and "error" in row:
-            STATUS_COUNTER["err"] += 1
-        else:
-            STATUS_COUNTER[str(st or 200)] += 1
-
-        # write to Excel
-        ws.append([safe_cell(row.get(col, "")) for col in KEEP_COLUMNS])
-
-        # FILTERS for JSON/API
-        try:
-            price = float(row.get("buyModule.productPrice") or 0)
-        except Exception:
-            price = 0.0
-        try:
-            total_kg = float(row.get("total brutto") or 0)
-        except Exception:
-            total_kg = 0.0
-        details_json = row.get("productInformationSection.productDetailsProps") or {}
-
-        if not (20 <= price <= 1500):
-            logger.debug(f"Skip by price: {price}")
-        elif total_kg > 30:
-            logger.debug(f"Skip by weight: {total_kg} kg")
-        elif materials_match_exclusions(details_json, EXCLUSIONS):
-            logger.debug("Skip by exclusions (materials)")
-        else:
-            try:
-                item = build_variant(row)
                 batch_items.append(item)
-            except Exception as e:
-                logger.error(f"build_variant error for {link}: {e}")
-                _post_log(f"× build_variant error for {link}: {e}")
-
-        # auto-save Excel every 50 rows
             if idx % 50 == 0:
                 wb.save(OUTPUT_FILE)
-            logger.info(f"💾 autosave: {OUTPUT_FILE}")
+                print(f"💾 autosave: {OUTPUT_FILE}")
-
-        # flush the batch when the limit is reached
             if len(batch_items) >= BATCH_SIZE:
                 flush_batch()
-
-    # final
         wb.save(OUTPUT_FILE)
-    logger.info(f"\n✅ Excel ready: {OUTPUT_FILE}")
+        print(f"\n✅ Excel ready: {OUTPUT_FILE}")
         flush_batch()
-    logger.info(f"HTTP stats: {dict(STATUS_COUNTER)}")
-    logger.info("🎯 Done.")
+    finally:
+        close_browser(pw, ctx)
+        logger.info("Playwright closed. Bye.")

 if __name__ == "__main__":
+    try:
         main()
+    except KeyboardInterrupt:
+        print("\nInterrupted by user.")
+        logger.warning("Interrupted by user")
+    except Exception:
+        logger.exception("Fatal error")
+        raise
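
For reference, a sketch of the batch payload that flush_batch() assembles and post_payload() sends to POST_URL. The item mirrors only the fields visible in this diff; keys that fall between the hunks above are omitted and all values are illustrative:

example_payload = {
    "parserName": "ikea",
    "items": [
        {
            "category": {"name": "TEST/IKEA"},
            "brand": {"name": "ikea"},
            "variant": {
                "status_id": 1,
                "color": "none",
                "sku": "00000000",  # illustrative SKU
                # ...variant fields not shown in this diff are omitted...
                "images": [],
                "inStock": True,
                "weight": 0,
            },
        }
    ],
}
# post_payload(example_payload) POSTs this JSON to POST_URL, adding a Bearer
# token when POST_API_KEY is set.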

View File

@@ -0,0 +1 @@
{"disable-features":"AcceptCHFrame,AutoExpandDetailsElement,AvoidUnnecessaryBeforeUnloadCheckSync,CertificateTransparencyComponentUpdater,DestroyProfileOnBrowserClose,DialMediaRouteProvider,GlobalMediaControls,HttpsUpgrades,ImprovedCookieControls,LazyFrameLoading,MediaRouter,PaintHolding,PlzDedicatedWorker,Translate","enable-features":"UkmSamplingRate\u003CUkmSamplingRate","force-fieldtrial-params":"UkmSamplingRate.Sampled_NoSeed_Stable:_default_sampling/1000000","force-fieldtrials":"*SeedFileTrial/Default/UkmSamplingRate/Sampled_NoSeed_Stable"}

Some files were not shown because too many files have changed in this diff.