#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""IKEA product-page scraper.

Reads product URLs from ``product_links.txt``, downloads each page, extracts
the JSON stored in the ``data-hydration-props`` attribute, flattens selected
blocks into columns, writes them to an Excel workbook and (optionally) saves
and/or POSTs batched JSON payloads to a configurable API endpoint.
"""

import datetime
import html
import http.cookiejar as cookiejar
import json
import logging
import math
import os
import re
import socket
import sys
import time
from collections import Counter
from typing import List, Optional

import requests
from bs4 import BeautifulSoup
from openpyxl import Workbook
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ───────────────────────── PATHS / FILES ──────────────────────────
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
RECORDS_DIR = os.path.join(BASE_DIR, "records_folder")
os.makedirs(RECORDS_DIR, exist_ok=True)

INPUT_FILE = os.path.join(BASE_DIR, "product_links.txt")
OUTPUT_FILE = os.path.join(RECORDS_DIR, "records.xlsx")
DICT_FILE = os.path.join(BASE_DIR, "dictionary_main.txt")
EXCL_FILE = os.path.join(BASE_DIR, "exclusion_materials.txt")
POST_LOG = os.path.join(RECORDS_DIR, "post_log.txt")
FETCH_LOG = os.path.join(RECORDS_DIR, "fetch_debug.log")
COOKIES_TXT = os.path.join(BASE_DIR, "cookies.txt")

# ───────────────────────── LOGGING ────────────────────────────────
logger = logging.getLogger("ikea_parser")
logger.setLevel(logging.DEBUG)

# File handler: maximum detail.
fh = logging.FileHandler(FETCH_LOG, encoding="utf-8")
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))

# Console handler: INFO and above.
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(logging.Formatter("%(message)s"))

logger.addHandler(fh)
logger.addHandler(ch)

logger.info("=== IKEA parser started ===")
logger.info(f"BASE_DIR={BASE_DIR}")
logger.info(f"Python={sys.version}")
try:
    logger.info(f"Hostname={socket.gethostname()} IP={socket.gethostbyname(socket.gethostname())}")
except Exception:
    # Name resolution is purely informational; never fail start-up on it.
    logger.info("Hostname/IP: unavailable")

# ───────────────────────── POST SETTINGS ──────────────────────────
POST_URL = os.getenv("IKEA_POST_URL", "http://localhost:3005/parser/data")
POST_API_KEY = os.getenv("IKEA_POST_API_KEY", "")
POST_TIMEOUT = 20
BATCH_SIZE = 50

# ───────────────────────── SITE SETTINGS ──────────────────────────
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/124.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "pl-PL,pl;q=0.9,en;q=0.8,ru;q=0.7",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
}

CSS_SELECTOR = ".pip-product__subgrid.product-pip.js-product-pip"

# Top-level keys of the hydration JSON that get flattened into columns.
BLOCKS = [
    "buyModule",
    "productSummary",
    "pipPricePackage",
    "productInformationSection",
    "keyFacts",
    "stockcheckSection",
    "availabilityGroup",
    "productGallery",
]

# Whitelist (and order) of the columns written to Excel.
KEEP_COLUMNS = [
    "availabilityGroup.serverOnlineSellable",
    "availabilityGroup.storeHeader",
    "buyModule.onlineSellable",
    "buyModule.productName",
    "buyModule.productPrice",
    "buyModule.productType",
    "keyFacts.ariaLabels",
    "keyFacts.gaLabel",
    "keyFacts.keyFacts",
    "keyFacts.keyFacts_formatted",
    "pipPricePackage.measurementText",
    "pipPricePackage.productDescription",
    "productGallery.urls",
    "productInformationSection.dimensionProps",
    "productInformationSection.dimensionProps_formatted",
    "productInformationSection.dimensionProps_formatted_html_translated",
    "productInformationSection.productDetailsProps",
    "productInformationSection.productDetailsProps_formatted",
    "productInformationSection.productDetailsProps_formatted_html",
    "productSummary.description",
    "productSummary.visibleItemNo",
    "stockcheckSection.packagingProps",
    "stockcheckSection.typeName",
    "total brutto",
    "prductVariantColorMeasure",
    "categoryBreadcrumb",
    "originalName",
    "url",
]

# NOTE(review): the HTML markup below was reconstructed. In the reviewed copy
# of this file the tags had been stripped (the line-break literal was split by
# a raw newline and the title wrappers were empty f-strings). The surviving
# `startswith("strong>")` repair code implies "<br>" and "<strong>" — confirm
# against the canonical source.
HTML_BR = "<br>"


# ───────────────────────── HTTP SESSION ───────────────────────────
def make_session() -> requests.Session:
    """Build a ``requests`` session with browser-like headers and retries.

    Environment proxy settings are ignored (``trust_env = False``). Both GET
    and POST are retried up to five times on the listed status codes.
    """
    s = requests.Session()
    s.headers.update(HEADERS)
    # Ignore system proxy / MITM environment variables (Windows).
    s.trust_env = False
    retries = Retry(
        total=5,
        backoff_factor=0.5,
        status_forcelist=(403, 429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET", "POST"]),
    )
    s.mount("https://", HTTPAdapter(max_retries=retries))
    s.mount("http://", HTTPAdapter(max_retries=retries))
    return s


SESSION = make_session()


def load_netscape_cookies(session: requests.Session, path: str):
    """Load a Netscape-format ``cookies.txt`` into *session*, if it exists.

    Failures are logged as warnings; the session is left usable either way.
    """
    if os.path.isfile(path):
        cj = cookiejar.MozillaCookieJar()
        try:
            cj.load(path, ignore_discard=True, ignore_expires=True)
            session.cookies.update(cj)
            logger.info(f"🍪 Cookies loaded: {path} ({len(cj)} pcs)")
        except Exception as e:
            logger.warning(f"⚠️ Failed to load cookies.txt: {e}")
    else:
        logger.info("cookies.txt not found — proceeding without external cookies")


load_netscape_cookies(SESSION, COOKIES_TXT)


# ───────────────────────── I/O UTILITIES ──────────────────────────
def ask_bool(prompt: str, default: str = "1") -> bool:
    """Ask a yes/no question on stdin; return True only for the answer "1".

    An empty answer or EOF (non-interactive run) falls back to *default*.
    """
    try:
        val = input(f"{prompt} (1=yes, 0=no) [{default}]: ").strip() or default
    except EOFError:
        val = default
    return val == "1"


def _post_log(msg: str):
    """Append *msg* to the POST log file (best effort, never raises on I/O)."""
    try:
        with open(POST_LOG, "a", encoding="utf-8") as f:
            f.write(msg.rstrip() + "\n")
    except OSError as e:
        # Logging must not break the run, but do not hide the failure entirely.
        logger.debug(f"post_log write failed: {e}")


def _now_tag():
    """Return the current local time as ``YYYYmmdd_HHMMSS`` for file names."""
    return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")


def _save_json_batch(payload: dict, batch_index: int):
    """Write *payload* as pretty-printed JSON into RECORDS_DIR; return its path."""
    fname = f"ikea_batch_{_now_tag()}_{batch_index:04d}.json"
    fpath = os.path.join(RECORDS_DIR, fname)
    with open(fpath, "w", encoding="utf-8") as out_file:
        json.dump(payload, out_file, ensure_ascii=False, indent=2)
    logger.info(f"💾 JSON saved: {fname}")
    return fpath


def _safe_name_from_url(url: str) -> str:
    """Turn *url* into a file-name-safe slug of at most 80 characters."""
    return re.sub(r"[^a-zA-Z0-9]+", "_", url)[:80]


def _dump_meta(prefix: str, url: str, status: int, elapsed: float, text_len: int,
               final_url: str, headers: dict, note: str = ""):
    """Write request/response metadata to a ``.meta.txt`` file in RECORDS_DIR.

    Any failure is logged at DEBUG level and swallowed (diagnostics only).
    """
    try:
        base = f"{prefix}_{_now_tag()}_{_safe_name_from_url(url)}"
        meta = os.path.join(RECORDS_DIR, base + ".meta.txt")
        with open(meta, "w", encoding="utf-8") as out_file:
            out_file.write(f"URL: {url}\n")
            out_file.write(f"FINAL_URL: {final_url}\n")
            out_file.write(f"STATUS: {status}\n")
            out_file.write(f"ELAPSED_SEC: {elapsed:.3f}\n")
            out_file.write(f"RESP_LEN: {text_len}\n")
            out_file.write(f"NOTE: {note}\n")
            out_file.write("HEADERS:\n")
            for k, v in headers.items():
                hv = v if isinstance(v, str) else str(v)
                out_file.write(f" {k}: {hv}\n")
    except Exception as e:
        logger.debug(f"Meta dump failed: {e}")


def _save_debug_html(url: str, text: str, prefix: str = "debug", note: str = "",
                     status: Optional[int] = None, elapsed: Optional[float] = None,
                     headers: Optional[dict] = None, final_url: Optional[str] = None):
    """Save a response body as an ``.html`` snapshot for later inspection.

    When status, elapsed, headers and final_url are all supplied, a metadata
    file is written next to the snapshot. Failures are logged and swallowed.
    """
    try:
        base = f"{prefix}_{_now_tag()}_{_safe_name_from_url(url)}"
        fpath = os.path.join(RECORDS_DIR, base + ".html")
        with open(fpath, "w", encoding="utf-8") as out_file:
            out_file.write(text)
        logger.info(f"🧪 Saved HTML snapshot: {os.path.basename(fpath)}")
        # Metadata alongside the snapshot.
        if status is not None and headers is not None and final_url is not None and elapsed is not None:
            _dump_meta(prefix, url, status, elapsed, len(text or ""), final_url, headers, note=note)
    except Exception as e:
        logger.debug(f"HTML dump failed: {e}")


# ───────────────────────── DICTIONARIES / FILTERS ─────────────────
def load_dictionary(path: str) -> dict:
    """Load ``"source": "translation"`` pairs from *path*.

    The file is scanned with a regex rather than parsed as JSON, so any text
    containing quoted ``"key": "value"`` pairs works. Missing file -> ``{}``.
    """
    if not os.path.isfile(path):
        return {}
    with open(path, "r", encoding="utf-8") as src:
        txt = src.read()
    pairs = re.findall(r'"([^"]+)"\s*:\s*"([^"]+)"', txt)
    return {k: v for k, v in pairs}


DICT = load_dictionary(DICT_FILE)


def translate_token(token: str) -> str:
    """Return the dictionary translation of *token*, or *token* itself."""
    return DICT.get(token, token)


def load_exclusions(path: str) -> set:
    """Load lower-cased exclusion tokens from *path*.

    If the file contains double-quoted strings, those are the tokens;
    otherwise the text is split on commas, semicolons and line breaks.
    Missing file -> empty set.
    """
    if not os.path.isfile(path):
        return set()
    with open(path, "r", encoding="utf-8") as src:
        txt = src.read()
    quoted = re.findall(r'"([^"]+)"', txt, flags=re.S)
    tokens = quoted if quoted else re.split(r"[,;\n\r]+", txt)
    return {t.strip().lower() for t in tokens if t.strip()}


EXCLUSIONS = load_exclusions(EXCL_FILE)


def materials_from_details_json(details: dict) -> List[str]:
    """Collect every string stored under a ``"material"`` key, at any depth."""
    out: List[str] = []

    def walk(node):
        if isinstance(node, dict):
            for k, v in node.items():
                if k == "material" and isinstance(v, str):
                    out.append(v)
                else:
                    walk(v)
        elif isinstance(node, list):
            for x in node:
                walk(x)

    walk(details or {})
    return out


def materials_match_exclusions(details: dict, exclusion_tokens: set) -> bool:
    """Return True if any exclusion token occurs in the product's materials.

    Matching is a case-insensitive substring test over all material strings.
    """
    if not exclusion_tokens:
        return False
    mats = materials_from_details_json(details)
    joined = "\n".join(mats).lower()
    return any(tok in joined for tok in exclusion_tokens)


# ───────────────────────── FORMATTERS ─────────────────────────────
def _parse_json_value(val):
    """Return *val* decoded from JSON when it is a JSON string, else unchanged."""
    if isinstance(val, (dict, list)) or val is None:
        return val
    if isinstance(val, str):
        s = val.strip()
        if not s:
            return val
        try:
            return json.loads(s)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return val
    return val


def flatten_block(block_name, data):
    """Flatten one hydration block into ``{"<block>.<key>": value}``.

    For ``productGallery`` the ``mediaList`` entry is reduced to a
    newline-joined list of image URLs under ``productGallery.urls``, and
    flattening of that block stops there (later keys are not emitted).
    """
    if not isinstance(data, dict):
        return {}
    flat = {}
    for k, v in data.items():
        if block_name == "productGallery" and k == "mediaList":
            if isinstance(v, list):
                urls = []
                for item in v:
                    # Skip malformed entries instead of failing the whole page.
                    content = item.get("content", {}) if isinstance(item, dict) else None
                    if isinstance(content, dict) and "url" in content:
                        urls.append(content["url"])
                flat["productGallery.urls"] = "\n".join(urls)
                return flat
        key = f"{block_name}.{k}"
        flat[key] = v
    return flat


def format_keyfacts(raw_keyfacts):
    """Render key facts as text: one header line followed by each label.

    The header is the ``name`` of the first element ("Właściwości" if absent).
    """
    if not isinstance(raw_keyfacts, list):
        return ""
    out = []
    header_added = False
    for el in raw_keyfacts:
        lbl = (el or {}).get("label")
        name = (el or {}).get("name", "Właściwości")
        if not header_added:
            out.append(name)
            header_added = True
        if lbl:
            out.append(lbl)
    return "\n".join(out)


def _fmt_float(x):
    """Format *x* with up to two decimals, trimming trailing zeros; "" on error."""
    try:
        return f"{float(x):.2f}".rstrip("0").rstrip(".")
    except (TypeError, ValueError):
        return ""


def _collect_packaging_total_kg(packaging):
    """Sum package weights (value × quantity) from a ``packaging`` dict.

    A measurement counts as weight when its ``type`` is "weight" or its
    ``label`` is "Waga". Units are taken as-is from the page data; the caller
    treats the total as kilograms.
    """
    total = 0.0
    if not isinstance(packaging, dict):
        return total
    content = (packaging.get("contentProps") or {}).get("packages") or []
    for pkg in content:
        raw_qty = (pkg.get("quantity") or {}).get("value")
        try:
            qty = float(raw_qty) if raw_qty else 1.0
        except (TypeError, ValueError):
            qty = 1.0  # non-numeric quantity: count the package once
        ms = pkg.get("measurements") or []
        for block in ms:
            if not isinstance(block, list):
                continue
            weight_lbl = next(
                (m for m in block
                 if isinstance(m, dict) and (m.get("type") == "weight" or m.get("label") == "Waga")),
                None,
            )
            if weight_lbl and isinstance(weight_lbl.get("value"), (int, float)):
                total += float(weight_lbl["value"]) * (qty or 1)
    return total


def _strip_token(text: str, token: str) -> str:
    """Remove whole leading/trailing occurrences of *token* from *text*.

    Unlike ``str.strip(token)``, which treats its argument as a set of
    characters, this never removes partial matches such as the "<" of a
    leading "<strong>" or a trailing letter "r".
    """
    if not token:
        return text
    while text.startswith(token):
        text = text[len(token):]
    while text.endswith(token):
        text = text[:-len(token)]
    return text


def _join_html(parts, br: str) -> str:
    """Join *parts* with *br*, collapse 3+ consecutive breaks to 2, trim the ends."""
    s = br.join([x for x in parts if x is not None])
    s = re.sub(r"(" + re.escape(br) + r"){2,}", br * 2, s)
    return _strip_token(s, br)


def format_dimensions(raw_dim_props, with_html=False, translated=False):
    """Render the dimensions and packaging sections as text or HTML.

    Args:
        raw_dim_props: the ``dimensionProps`` dict from the hydration JSON.
        with_html: join lines with the HTML break and bold the section titles.
        translated: pass labels through ``translate_token``.

    Returns:
        The formatted string, or "" when *raw_dim_props* is not a dict.
    """
    if not isinstance(raw_dim_props, dict):
        return ""
    lines = []
    br = HTML_BR if with_html else "\n"

    title = translate_token("Wymiary") if translated else "Wymiary"
    lines.append(f"<strong>{title}</strong>" if with_html else title)
    for d in raw_dim_props.get("dimensions") or []:
        name = d.get("name", "")
        meas = d.get("measure", "")
        if not name and not meas:
            continue
        if translated:
            name_t = translate_token(name)
            line = f"{name_t}: {meas}".strip()
        else:
            line = f"{name}: {meas}".strip()
        lines.append(line)

    pack = raw_dim_props.get("packaging") or {}
    pack_title = translate_token("Opakowanie") if translated else "Opakowanie"
    lines.append(br if with_html else "")  # blank separator before the section
    lines.append(f"<strong>{pack_title}</strong>" if with_html else pack_title)
    content = (pack.get("contentProps") or {}).get("packages") or []
    for pkg in content:
        name = pkg.get("name") or ""
        if name:
            lines.append(name)
        art = (pkg.get("articleNumber") or {}).get("value")
        if art:
            art_lbl = "Numer artykułu"
            if translated:
                art_lbl = translate_token(art_lbl)
            lines.append(art_lbl)
            lines.append(f"{art}")
        ms = pkg.get("measurements") or []
        for block in ms:
            if not isinstance(block, list):
                continue
            for m in block:
                lbl = m.get("label", "")
                txt = m.get("text", "")
                if translated:
                    lbl = translate_token(lbl) if lbl else lbl
                if lbl or txt:
                    lines.append(f"{lbl}: {txt}".strip(": "))
        q_val = (pkg.get("quantity") or {}).get("value")
        if q_val:
            q_lbl = "Paczka(i)"
            if translated:
                q_lbl = translate_token(q_lbl)
            lines.append(f"{q_lbl}: {q_val}")

    if with_html:
        return _join_html(lines, br)
    return "\n".join([x for x in lines if x is not None]).strip()


def format_product_details(raw_details, add_summary_desc="", with_html=False, skip_assembly=True):
    """Render product details (description, good-to-know, materials, safety).

    Args:
        raw_details: the ``productDetailsProps`` dict from the hydration JSON.
        add_summary_desc: optional summary text placed before everything else.
        with_html: join lines with the HTML break and bold the section titles.
        skip_assembly: accepted for compatibility; not used by this function.

    Returns:
        The formatted string; *add_summary_desc* alone when *raw_details* is
        not a dict.
    """
    if not isinstance(raw_details, dict):
        return add_summary_desc
    br = HTML_BR if with_html else "\n"

    def heading(text):
        return f"<strong>{text}</strong>" if with_html else text

    def separator():
        return br if with_html else ""

    out = []
    if add_summary_desc:
        out.append(add_summary_desc)
        out.append(separator())

    out.append(heading("Informacje o produkcie"))
    pd = raw_details.get("productDescriptionProps") or {}
    paragraphs = pd.get("paragraphs") or []
    for p in paragraphs:
        out.append(p)
    dlabel = pd.get("designerLabel")
    dname = pd.get("designerName")
    if dlabel and dname:
        out.append(dlabel)
        out.append(dname)
    if raw_details.get("productId"):
        out.append("Numer artykułu")
        out.append(raw_details["productId"])

    acc = raw_details.get("accordionObject") or {}
    gk = ((acc.get("goodToKnow") or {}).get("contentProps") or {}).get("goodToKnow") or []
    if gk:
        out.append(separator())
        out.append(heading("Dobrze wiedzieć"))
        for item in gk:
            txt = item.get("text")
            if txt:
                out.append(txt)

    mac = (acc.get("materialsAndCare") or {}).get("contentProps") or {}
    mats = mac.get("materials") or []
    care = mac.get("careInstructions") or []
    if mats or care:
        out.append(separator())
        out.append(heading("Materiały i pielęgnacja"))
    if mats:
        out.append("Materiały")
        for m in mats:
            ptype = m.get("productType", "")
            for mat in (m.get("materials") or []):
                material = mat.get("material", "")
                if ptype:
                    out.append(ptype)
                if material:
                    out.append(material)
    if care:
        details_care_text = mac.get("detailsCareText", "Pielęgnacja")
        out.append(details_care_text)
        for c in care:
            ptype = c.get("productType", "")
            texts = c.get("texts") or []
            if ptype:
                out.append(ptype)
            for t in texts:
                out.append(t)

    safety = (raw_details.get("safetyAndCompliance") or {}).get("contentProps") or {}
    sc = safety.get("safetyAndCompliance") or []
    if sc:
        out.append(separator())
        out.append(heading("Bezpieczeństwo i zgodność z przepisami"))
        for entry in sc:
            txt = entry.get("text")
            if txt:
                out.append(txt)

    if with_html:
        return _join_html(out, br)
    return "\n".join([x for x in out if x is not None]).strip()


def build_variant_color_measure(desc: str, type_name: str, measurement: str) -> str:
    """Build the "<variant/colour>, <measurement>" string for a product.

    A leading *type_name* (case-insensitive) and following separators are
    removed from *desc*. If nothing alphanumeric remains, only *measurement*
    is returned. Otherwise the first letter is upper-cased and *measurement*
    is appended after a comma when present.
    """
    s = desc or ""
    t = (type_name or "").strip()
    if t:
        pattern = r"^\s*" + re.escape(t) + r"[\s,;:\-–—/]*"
        s = re.sub(pattern, "", s, flags=re.IGNORECASE)
    if not re.search(r"[0-9A-Za-zА-Яа-яЁёÀ-ž]", s or ""):
        s = ""
    s = s.strip()
    meas = (measurement or "").strip()
    if not s:
        return meas if meas else ""
    s = s[:1].upper() + s[1:]
    return f"{s}, {meas}" if meas else s


# ───────────────────── PRODUCT PAGE SCRAPING ──────────────────────
def extract_data(url: str, force_dump: bool = False) -> dict:
    """Download one product page and return its whitelisted columns.

    Args:
        url: product page URL.
        force_dump: always save an HTML snapshot of the response.

    Returns:
        On success, a dict keyed by ``KEEP_COLUMNS``. On failure, a dict with
        ``url``, ``error`` and ``http_status`` (``None`` when no response was
        received). This function never raises.
    """
    status: Optional[int] = None
    try:
        logger.debug(f"GET {url}")
        t0 = time.time()
        resp = SESSION.get(url, timeout=25, allow_redirects=True)
        elapsed = time.time() - t0
        status = resp.status_code
        final_url = str(getattr(resp, "url", url))
        text_len = len(resp.text or "")
        logger.info(f"HTTP {status} {final_url} ({elapsed:.2f}s, {text_len} bytes)")

        # Always snapshot forced pages and any "suspicious" response.
        need_dump = force_dump or status != 200 or ("data-hydration-props" not in resp.text)
        if need_dump:
            note = "force_dump" if force_dump else (
                "no_hydration" if "data-hydration-props" not in resp.text else f"status_{status}"
            )
            _save_debug_html(url, resp.text, prefix=f"resp{status}", note=note, status=status,
                             elapsed=elapsed, headers=resp.headers, final_url=final_url)

        resp.raise_for_status()

        soup = BeautifulSoup(resp.text, "html.parser")
        target = soup.select_one(CSS_SELECTOR)
        if not target:
            logger.warning("CSS selector NOT FOUND")
            _save_debug_html(url, resp.text, prefix="no_selector", note="css_selector_missing",
                             status=status, elapsed=elapsed, headers=resp.headers, final_url=final_url)
            return {"url": url, "error": "CSS selector not found", "http_status": status}

        raw = target.get("data-hydration-props")
        if not raw:
            logger.warning("data-hydration-props NOT FOUND")
            _save_debug_html(url, resp.text, prefix="no_hydration", note="attribute_missing",
                             status=status, elapsed=elapsed, headers=resp.headers, final_url=final_url)
            return {"url": url, "error": "data-hydration-props not found", "http_status": status}

        decoded = html.unescape(raw)
        try:
            full_json = json.loads(decoded)
        except Exception as je:
            logger.error(f"JSON decode error: {je}")
            # Keep a sample of the payload for analysis.
            sample_name = f"bad_json_{_now_tag()}_{_safe_name_from_url(url)}.txt"
            with open(os.path.join(RECORDS_DIR, sample_name), "w", encoding="utf-8") as sample:
                sample.write(decoded[:20000])
            return {"url": url, "error": f"json decode error: {je}", "http_status": status}

        result = {"url": url}
        for block in BLOCKS:
            result.update(flatten_block(block, full_json.get(block, {})))

        kf_json = _parse_json_value(result.get("keyFacts.keyFacts"))
        dim_json = _parse_json_value(result.get("productInformationSection.dimensionProps"))
        det_json = _parse_json_value(result.get("productInformationSection.productDetailsProps"))

        result["keyFacts.keyFacts_formatted"] = format_keyfacts(kf_json)
        result["productInformationSection.dimensionProps_formatted"] = format_dimensions(
            dim_json, with_html=False, translated=False)
        result["productInformationSection.dimensionProps_formatted_html_translated"] = format_dimensions(
            dim_json, with_html=True, translated=True)

        # dim_json may be an unparsed string; only a dict carries packaging.
        packaging = dim_json.get("packaging") if isinstance(dim_json, dict) else None
        total_kg = _collect_packaging_total_kg(packaging or {})
        result["total brutto"] = _fmt_float(total_kg)

        summary_desc = result.get("productSummary.description", "") or ""
        result["productInformationSection.productDetailsProps_formatted"] = format_product_details(
            det_json, add_summary_desc=summary_desc, with_html=False, skip_assembly=True)
        result["productInformationSection.productDetailsProps_formatted_html"] = format_product_details(
            det_json, add_summary_desc=summary_desc, with_html=True, skip_assembly=True)

        desc = result.get("pipPricePackage.productDescription", "") or ""
        tname = result.get("stockcheckSection.typeName", "") or ""
        meas = result.get("pipPricePackage.measurementText", "") or ""
        result["prductVariantColorMeasure"] = build_variant_color_measure(desc, tname, meas)

        # Breadcrumb from the first JSON-LD BreadcrumbList on the page.
        breadcrumb = None
        for tag in soup.find_all("script", attrs={"type": lambda t: t and "ld+json" in t}):
            try:
                data = json.loads(tag.string)
            except (TypeError, ValueError):
                # tag.string is None for empty scripts; invalid JSON is skipped.
                continue
            if isinstance(data, list):
                data = next((d for d in data
                             if isinstance(d, dict) and d.get("@type") == "BreadcrumbList"), None)
            if isinstance(data, dict) and data.get("@type") == "BreadcrumbList":
                items = data.get("itemListElement") or []
                # A missing/None name must not break the join.
                names = [str(it.get("name") or "") if isinstance(it, dict) else "" for it in items]
                breadcrumb = "/".join(names)
                break
        if breadcrumb:
            result["categoryBreadcrumb"] = breadcrumb

        # Apply the column whitelist.
        filtered = {k: result.get(k) for k in KEEP_COLUMNS if k != "originalName"}

        # originalName = productName + " " + typeName
        pn = (result.get("buyModule.productName") or "").strip()
        tn = (result.get("stockcheckSection.typeName") or "").strip()
        if pn and tn:
            orig_name = f"{pn} {tn}"
        else:
            orig_name = pn or tn
        filtered["originalName"] = orig_name

        return filtered

    except Exception as e:
        # Per-URL boundary: report the failure and keep the batch going.
        # `status` is still None when the request itself never completed.
        logger.error(f"Request error for {url}: {e}")
        return {"url": url, "error": str(e), "http_status": status}


# ───────────────────── VARIANT BUILDING / POST ────────────────────
def _split_color_size(text: str):
    """Split "<colour>, <size>" on the first comma into ``(colour, size)``.

    Text without a comma is treated as size only; empty text gives ("", "").
    """
    if not text:
        return "", ""
    parts = [p.strip() for p in text.split(",", 1)]
    if len(parts) == 2:
        return parts[0], parts[1]
    return "", parts[0]


def _ceil_int(v):
    """Return ``ceil(float(v))`` as int, or None if *v* is not a finite number."""
    try:
        return int(math.ceil(float(v)))
    except (TypeError, ValueError, OverflowError):
        return None


def _ceil_price(v):
    """Round a price up to a whole number; None when it cannot be parsed."""
    return _ceil_int(v)


def build_variant(row: dict) -> dict:
    """Convert one scraped row into the API item structure.

    Returns a dict with fixed ``category`` and ``brand`` entries and a
    ``variant`` built from the row's SKU, colour/size, price, texts, images,
    stock flag and rounded-up weight.
    """
    visible = row.get("productSummary.visibleItemNo") or ""
    sku = visible.replace(" ", "")

    csm = (row.get("prductVariantColorMeasure") or "").strip()
    color, size = _split_color_size(csm)
    if not color and not size:
        size = (row.get("pipPricePackage.measurementText") or "").strip()

    cost = _ceil_price(row.get("buyModule.productPrice"))
    url = row.get("url") or ""
    name = row.get("originalName") or row.get("buyModule.productName") or ""
    desc_html = row.get("productInformationSection.productDetailsProps_formatted_html") or ""
    composition_html = row.get("productInformationSection.dimensionProps_formatted_html_translated") or ""

    imgs = []
    raw_imgs = row.get("productGallery.urls") or ""
    if isinstance(raw_imgs, str):
        imgs = [x for x in raw_imgs.split("\n") if x.strip()]

    in_stock = bool(row.get("availabilityGroup.serverOnlineSellable")) or bool(row.get("buyModule.onlineSellable"))
    weight_kg = _ceil_int(row.get("total brutto"))

    variant = {
        "status_id": 1,
        "color": color.capitalize() if color else "none",
        "sku": sku,
        "size": size,
        "cost": cost,
        "originalUrl": url,
        "originalName": name,
        "originalDescription": desc_html,
        "originalComposition": composition_html,
        "images": imgs,
        "inStock": in_stock,
        "weight": weight_kg if weight_kg is not None else 0,
    }
    return {
        "category": {"name": "TEST/IKEA"},  # temporary value, per the requirements spec
        "brand": {"name": "ikea"},
        "variant": variant,
    }


def post_payload(payload: dict) -> dict:
    """POST *payload* as JSON to ``POST_URL`` and log the exchange.

    Returns:
        ``{"ok", "status", "response"}`` on an HTTP response, or
        ``{"ok": False, "status": None, "error"}`` when the request failed.
    """
    headers = {"Content-Type": "application/json"}
    if POST_API_KEY:
        headers["Authorization"] = f"Bearer {POST_API_KEY}"
    body = json.dumps(payload, ensure_ascii=False)

    # Never write the API key to the log file.
    log_headers = dict(headers)
    if "Authorization" in log_headers:
        log_headers["Authorization"] = "Bearer ***"
    _post_log(f"→ POST {POST_URL}\nHeaders: {log_headers}\nBody: {body}")

    try:
        # NOTE(review): SESSION retries POST on 5xx (see make_session); the
        # receiving API should tolerate duplicate deliveries — confirm.
        r = SESSION.post(POST_URL, headers=headers, data=body.encode("utf-8"), timeout=POST_TIMEOUT)
        text = r.text
        _post_log(f"← {r.status_code}\n{text}\n{'-'*60}")
        ok = 200 <= r.status_code < 300
        return {"ok": ok, "status": r.status_code, "response": text}
    except Exception as e:
        _post_log(f"× ERROR: {e}\n{'-'*60}")
        return {"ok": False, "status": None, "error": str(e)}


# ───────────────────────── MAIN FLOW ──────────────────────────────
def safe_cell(val):
    """Make *val* storable in an Excel cell (JSON for dict/list, "" for None)."""
    if isinstance(val, (dict, list)):
        return json.dumps(val, ensure_ascii=False)
    return "" if val is None else val


def _clean_url(u: str) -> str:
    """Strip whitespace, tabs, BOM and non-breaking spaces from a URL line."""
    if not isinstance(u, str):
        return ""
    u = u.strip().replace("\t", " ")
    u = u.replace("\ufeff", "").replace("\xa0", "")
    u = u.strip("\r\n ")
    return u


def main():
    """Scrape every URL from the input file into Excel and batched JSON/API."""
    logger.info(f"POST_URL={POST_URL} OUTPUT_FILE={OUTPUT_FILE}")

    SAVE_JSON = ask_bool("SAVE_JSON (сохранять JSON на диск?)", "1")
    SEND_JSON = ask_bool("SEND_JSON (отправлять на API?)", "1")

    # Read the links (utf-8-sig drops a leading BOM).
    with open(INPUT_FILE, "r", encoding="utf-8-sig", newline="") as f:
        raw_lines = f.readlines()
    links = [u for u in map(_clean_url, raw_lines) if u]
    logger.info(f"Всего ссылок: {len(links)}")
    if not links:
        logger.warning("Список ссылок пуст — проверьте product_links.txt")

    # Prepare the workbook.
    wb = Workbook()
    ws = wb.active
    ws.title = "IKEA Products"
    ws.append(KEEP_COLUMNS)

    # Batch accumulator for JSON/API output.
    batch_items = []
    batch_index = 1
    STATUS_COUNTER = Counter()

    def flush_batch():
        """Save and/or send the accumulated items, then start a new batch."""
        nonlocal batch_items, batch_index
        if not batch_items:
            return
        payload = {"parserName": "ikea", "items": batch_items}
        if SAVE_JSON:
            _save_json_batch(payload, batch_index)
        if SEND_JSON:
            res = post_payload(payload)
            ok = res.get("ok")
            logger.info(f"POST batch {batch_index}: {'OK' if ok else 'FAIL'} (status={res.get('status')})")
        batch_index += 1
        batch_items = []

    for idx, link in enumerate(links, 1):
        logger.info(f"[{idx}/{len(links)}] {link}")
        force_dump = idx <= 3  # always snapshot the HTML of the first 3 links
        row = extract_data(link, force_dump=force_dump)

        st = row.get("http_status")
        if st is None and "error" in row:
            STATUS_COUNTER["err"] += 1
        else:
            # Successful rows carry no http_status; count them as 200.
            STATUS_COUNTER[str(st or 200)] += 1

        # Every row goes to Excel, including error rows.
        ws.append([safe_cell(row.get(col, "")) for col in KEEP_COLUMNS])

        # Filters apply only to the JSON/API output.
        try:
            price = float(row.get("buyModule.productPrice") or 0)
        except Exception:
            price = 0.0
        try:
            total_kg = float(row.get("total brutto") or 0)
        except Exception:
            total_kg = 0.0
        details_json = row.get("productInformationSection.productDetailsProps") or {}

        if not (20 <= price <= 1500):
            logger.debug(f"Skip by price: {price}")
        elif total_kg > 30:
            logger.debug(f"Skip by weight: {total_kg} kg")
        elif materials_match_exclusions(details_json, EXCLUSIONS):
            logger.debug("Skip by exclusions (materials)")
        else:
            try:
                item = build_variant(row)
                batch_items.append(item)
            except Exception as e:
                logger.error(f"build_variant error for {link}: {e}")
                _post_log(f"× build_variant error for {link}: {e}")

        # Autosave the workbook every 50 rows.
        if idx % 50 == 0:
            wb.save(OUTPUT_FILE)
            logger.info(f"💾 autosave: {OUTPUT_FILE}")

        # Flush the batch once it reaches the size limit.
        if len(batch_items) >= BATCH_SIZE:
            flush_batch()

    # Final save and flush.
    wb.save(OUTPUT_FILE)
    logger.info(f"\n✅ Excel готов: {OUTPUT_FILE}")
    flush_batch()

    logger.info(f"HTTP stats: {dict(STATUS_COUNTER)}")
    logger.info("🎯 Готово.")


if __name__ == "__main__":
    main()