MacOS_Parsers/Парсер_IKEA/main_win.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
IKEA Parser — надежный вариант: только Playwright (Chrome, persistent profile),
без requests, без параллельности и ускорителей. По одной вкладке, максимум логов.
"""
import os, sys, re, json, math, time, html as html_mod, datetime, traceback
from typing import Optional
import logging
from logging.handlers import RotatingFileHandler
from urllib import request as urlrequest
from urllib.error import URLError, HTTPError
from bs4 import BeautifulSoup
from openpyxl import Workbook
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
# ───────────────────────── PATHS / FILES ───────────────────────────
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
RECORDS_DIR = os.path.join(BASE_DIR, "records_folder")
os.makedirs(RECORDS_DIR, exist_ok=True)
INPUT_FILE = os.path.join(BASE_DIR, "product_links.txt")
OUTPUT_FILE = os.path.join(RECORDS_DIR, "records.xlsx")
DICT_FILE = os.path.join(BASE_DIR, "dictionary_main.txt")
EXCL_FILE = os.path.join(BASE_DIR, "exclusion_materials.txt")
POST_LOG = os.path.join(RECORDS_DIR, "post_log.txt")
LOGS_DIR = os.path.join(RECORDS_DIR, "logs")
HTML_DIR = os.path.join(RECORDS_DIR, "html_debug")
JSON_DIR = os.path.join(RECORDS_DIR, "json_debug")
PROFILE_DIR = os.path.join(BASE_DIR, "playwright_profile")
os.makedirs(LOGS_DIR, exist_ok=True)
os.makedirs(HTML_DIR, exist_ok=True)
os.makedirs(JSON_DIR, exist_ok=True)
os.makedirs(PROFILE_DIR, exist_ok=True)
APP_LOG_FILE = os.path.join(LOGS_DIR, "app.log")
# ───────────────────────── SETTINGS ──────────────────────────────
CSS_SELECTOR = ".pip-product__subgrid.product-pip.js-product-pip"
BLOCKS = [
    "buyModule",
    "productSummary",
    "pipPricePackage",
    "productInformationSection",
    "keyFacts",
    "stockcheckSection",
    "availabilityGroup",
    "productGallery",
]
KEEP_COLUMNS = [
    "availabilityGroup.serverOnlineSellable",
    "availabilityGroup.storeHeader",
    "buyModule.onlineSellable",
    "buyModule.productName",
    "buyModule.productPrice",
    "buyModule.productType",
    "keyFacts.ariaLabels",
    "keyFacts.gaLabel",
    "keyFacts.keyFacts",
    "keyFacts.keyFacts_formatted",
    "pipPricePackage.measurementText",
    "pipPricePackage.productDescription",
    "productGallery.urls",
    "productInformationSection.dimensionProps",
    "productInformationSection.dimensionProps_formatted",
    "productInformationSection.dimensionProps_formatted_html_translated",
    "productInformationSection.productDetailsProps",
    "productInformationSection.productDetailsProps_formatted",
    "productInformationSection.productDetailsProps_formatted_html",
    "productSummary.description",
    "productSummary.visibleItemNo",
    "stockcheckSection.packagingProps",
    "stockcheckSection.typeName",
    "total brutto",
    "prductVariantColorMeasure",
    "categoryBreadcrumb",
    "originalName",
    "url",
]
UA = os.getenv(
    "IKEA_UA",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
)
ACCEPT_LANG = os.getenv("IKEA_ACCEPT_LANGUAGE", "pl-PL,pl;q=0.9,en;q=0.8,ru;q=0.7")
# Playwright launches Chrome with a persistent profile; not headless by default,
# since Cloudflare is less likely to block a headed browser.
HEADLESS = os.getenv("IKEA_HEADLESS", "0") not in {"0", "false", "False", ""}
# POST/API via stdlib urllib (no requests dependency)
POST_URL = os.getenv("IKEA_POST_URL", "http://localhost:3005/parser/data")
POST_API_KEY = os.getenv("IKEA_POST_API_KEY", "")
POST_TIMEOUT = int(os.getenv("IKEA_POST_TIMEOUT", "20"))
BATCH_SIZE = int(os.getenv("IKEA_BATCH_SIZE", "50"))
# ───────────────────────── LOGGING ────────────────────────────
logger = logging.getLogger("ikea_pw_simple")
logger.setLevel(logging.DEBUG)
_fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
fh = RotatingFileHandler(APP_LOG_FILE, maxBytes=2_000_000, backupCount=3, encoding="utf-8")
fh.setFormatter(_fmt)
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(_fmt)
ch.setLevel(logging.INFO)
logger.addHandler(ch)
def _now_tag():
    return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")


def _post_log(msg: str):
    try:
        with open(POST_LOG, "a", encoding="utf-8") as f:
            f.write(msg.rstrip() + "\n")
    except Exception:
        pass


def _save_json_batch(payload: dict, batch_index: int):
    fname = f"ikea_batch_{_now_tag()}_{batch_index:04d}.json"
    fpath = os.path.join(JSON_DIR, fname)
    with open(fpath, "w", encoding="utf-8") as f:  # local handle, avoids shadowing the log handler `fh`
        json.dump(payload, f, ensure_ascii=False, indent=2)
    logger.debug(f"💾 JSON saved: {fpath}")
    return fpath


def _save_html_snapshot(prefix: str, idx: int, content: str):
    fname = f"{idx:04d}_{prefix}_{_now_tag()}.html"
    fpath = os.path.join(HTML_DIR, fname)
    try:
        with open(fpath, "w", encoding="utf-8") as f:
            f.write(content)
        logger.debug("🧪 HTML snapshot: %s", fpath)
    except Exception:
        logger.exception("Failed to save HTML snapshot")


def ask_bool(prompt: str, default: str = "1") -> bool:
    try:
        val = input(f"{prompt} (1=yes, 0=no) [{default}]: ").strip() or default
    except EOFError:
        val = default
    return val == "1"
# ───────────────────────── DICTIONARIES / FILTERS ──────────────────────
def load_dictionary(path: str) -> dict:
    if not os.path.isfile(path):
        return {}
    txt = open(path, "r", encoding="utf-8").read()
    pairs = re.findall(r'"([^"]+)"\s*:\s*"([^"]+)"', txt)
    return {k: v for k, v in pairs}


DICT = load_dictionary(DICT_FILE)


def translate_token(token: str) -> str:
    return DICT.get(token, token)


def load_exclusions(path: str) -> set:
    if not os.path.isfile(path):
        return set()
    txt = open(path, "r", encoding="utf-8").read()
    quoted = re.findall(r'"([^"]+)"', txt, flags=re.S)
    tokens = quoted if quoted else re.split(r"[,;\n\r]+", txt)
    return {t.strip().lower() for t in tokens if t.strip()}


EXCLUSIONS = load_exclusions(EXCL_FILE)
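# Expected input file formats (implied by the regexes above; values shown are
# illustrative, not taken from the real files):
#
#   dictionary_main.txt  - quoted "source": "translation" pairs, e.g.
#       "Wymiary": "Dimensions"
#       "Opakowanie": "Packaging"
#
#   exclusion_materials.txt - either quoted tokens ("szkło", "marmur") or plain
#   tokens separated by commas/semicolons/newlines. Tokens are lower-cased and
#   later matched as substrings of the product's material strings.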
def materials_from_details_json(details: dict) -> list[str]:
    out: list[str] = []

    def walk(node):
        if isinstance(node, dict):
            for k, v in node.items():
                if k == "material" and isinstance(v, str):
                    out.append(v)
                else:
                    walk(v)
        elif isinstance(node, list):
            for x in node:
                walk(x)

    walk(details or {})
    return out


def materials_match_exclusions(details: dict, exclusion_tokens: set) -> bool:
    if not exclusion_tokens:
        return False
    mats = materials_from_details_json(details)
    joined = "\n".join(mats).lower()
    return any(tok in joined for tok in exclusion_tokens)
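# Illustration (hypothetical payload): a nested details object such as
#     {"accordionObject": {"materialsAndCare": {"contentProps": {
#         "materials": [{"materials": [{"material": "Stal, Szkło"}]}]}}}}
# yields materials_from_details_json(...) == ["Stal, Szkło"], and
# materials_match_exclusions(...) is True when e.g. "szkło" is in EXCLUSIONS;
# the check is a case-insensitive substring match over all collected materials.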
# ───────────────────────── FORMATTERS ─────────────────────────────
def _parse_json_value(val):
    if isinstance(val, (dict, list)) or val is None:
        return val
    if isinstance(val, str):
        s = val.strip()
        if not s:
            return val
        try:
            return json.loads(s)
        except Exception:
            return val
    return val


def flatten_block(block_name, data):
    if not isinstance(data, dict):
        return {}
    flat = {}
    for k, v in data.items():
        if block_name == "productGallery" and k == "mediaList":
            if isinstance(v, list):
                urls = []
                for item in v:
                    content = item.get("content", {})
                    if isinstance(content, dict) and "url" in content:
                        urls.append(content["url"])
                flat["productGallery.urls"] = "\n".join(urls)
            return flat
        key = f"{block_name}.{k}"
        flat[key] = v
    return flat
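# Illustration of flatten_block's output (hypothetical input):
#     flatten_block("buyModule", {"productName": "BILLY", "productPrice": 249})
#         -> {"buyModule.productName": "BILLY", "buyModule.productPrice": 249}
# For "productGallery", the mediaList entries are collapsed into a single
# newline-joined "productGallery.urls" string instead of being copied verbatim,
# and iteration stops at that key.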
def format_keyfacts(raw_keyfacts):
    if not isinstance(raw_keyfacts, list):
        return ""
    out = []
    header_added = False
    for el in raw_keyfacts:
        lbl = (el or {}).get("label")
        name = (el or {}).get("name", "Właściwości")
        if not header_added:
            out.append(name)
            header_added = True
        if lbl:
            out.append(lbl)
    return "\n".join(out)


def _fmt_float(x):
    try:
        return f"{float(x):.2f}".rstrip("0").rstrip(".")
    except Exception:
        return ""


def _collect_packaging_total_kg(packaging):
    total = 0.0
    if not isinstance(packaging, dict):
        return total
    content = (packaging.get("contentProps") or {}).get("packages") or []
    for pkg in content:
        qty = ((pkg.get("quantity") or {}).get("value")) or 1
        ms = pkg.get("measurements") or []
        for block in ms:
            if not isinstance(block, list):
                continue
            weight_lbl = next((m for m in block if (m.get("type") == "weight" or m.get("label") == "Waga")), None)
            if weight_lbl and isinstance(weight_lbl.get("value"), (int, float)):
                total += float(weight_lbl["value"]) * (qty or 1)
    return total
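# Assumed shape of the "packaging" payload, inferred from the access pattern
# above (hypothetical values for illustration):
#     {"contentProps": {"packages": [
#         {"quantity": {"value": 2},
#          "measurements": [[{"type": "weight", "label": "Waga",
#                             "value": 7.5, "text": "7.5 kg"}]]}
#     ]}}
# -> _collect_packaging_total_kg(...) == 15.0 (weight per package × quantity).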
def format_dimensions(raw_dim_props, with_html=False, translated=False):
    if not isinstance(raw_dim_props, dict):
        return ""
    lines = []
    br = "<br/>" if with_html else "\n"
    title = translate_token("Wymiary") if translated else "Wymiary"
    lines.append(f"<strong>{title}</strong>" if with_html else title)
    for d in raw_dim_props.get("dimensions", []):
        name = d.get("name", "")
        meas = d.get("measure", "")
        if not name and not meas:
            continue
        if translated:
            name_t = translate_token(name)
            line = f"{name_t}: {meas}".strip()
        else:
            line = f"{name}: {meas}".strip()
        lines.append(line)
    pack = (raw_dim_props.get("packaging") or {})
    pack_title = translate_token("Opakowanie") if translated else "Opakowanie"
    lines.append(br if with_html else "")
    lines.append(f"<strong>{pack_title}</strong>" if with_html else pack_title)
    content = (pack.get("contentProps") or {}).get("packages") or []
    for pkg in content:
        name = pkg.get("name") or ""
        if name:
            lines.append(name)
        art = (pkg.get("articleNumber") or {}).get("value")
        if art:
            art_lbl = "Numer artykułu"
            if translated:
                art_lbl = translate_token(art_lbl)
            lines.append(art_lbl)
            lines.append(f"{art}")
        ms = pkg.get("measurements") or []
        for block in ms:
            if not isinstance(block, list):
                continue
            for m in block:
                lbl = m.get("label", "")
                txt = m.get("text", "")
                if translated:
                    lbl = translate_token(lbl) if lbl else lbl
                if lbl or txt:
                    lines.append(f"{lbl}: {txt}".strip(": "))
        q_val = ((pkg.get("quantity") or {}).get("value"))
        if q_val:
            q_lbl = "Paczka(i)"
            if translated:
                q_lbl = translate_token(q_lbl)
            lines.append(f"{q_lbl}: {q_val}")
    if with_html:
        s = br.join([x for x in lines if x is not None])
        s = re.sub(r"(" + re.escape(br) + r"){2,}", br * 2, s)
        s = s.strip(br)
        if s.startswith("strong>"):
            s = "<" + s
        return s
    return "\n".join([x for x in lines if x is not None]).strip()
def format_product_details(raw_details, add_summary_desc="", with_html=False, skip_assembly=True):
    if not isinstance(raw_details, dict):
        return add_summary_desc
    br = "<br/>" if with_html else "\n"
    out = []
    if add_summary_desc:
        out.append(add_summary_desc)
        out.append(br if with_html else "")
    t1 = "Informacje o produkcie"
    out.append(f"<strong>{t1}</strong>" if with_html else t1)
    pd = (raw_details.get("productDescriptionProps") or {})
    paragraphs = pd.get("paragraphs") or []
    for p in paragraphs:
        out.append(p)
    dlabel = pd.get("designerLabel")
    dname = pd.get("designerName")
    if dlabel and dname:
        out.append(dlabel)
        out.append(dname)
    if raw_details.get("productId"):
        out.append("Numer artykułu")
        out.append(raw_details["productId"])
    acc = (raw_details.get("accordionObject") or {})
    gk = ((acc.get("goodToKnow") or {}).get("contentProps") or {}).get("goodToKnow") or []
    if gk:
        out.append(br if with_html else "")
        t2 = "Dobrze wiedzieć"
        out.append(f"<strong>{t2}</strong>" if with_html else t2)
        for item in gk:
            txt = item.get("text")
            if txt:
                out.append(txt)
    mac = (acc.get("materialsAndCare") or {}).get("contentProps") or {}
    mats = mac.get("materials") or []
    care = mac.get("careInstructions") or []
    t3 = "Materiały i pielęgnacja"
    if mats or care:
        out.append(br if with_html else "")
        out.append(f"<strong>{t3}</strong>" if with_html else t3)
    if mats:
        out.append("Materiały")
        for m in mats:
            ptype = m.get("productType", "")
            for mat in (m.get("materials") or []):
                material = mat.get("material", "")
                if ptype:
                    out.append(ptype)
                if material:
                    out.append(material)
    if care:
        detailsCareText = mac.get("detailsCareText", "Pielęgnacja")
        out.append(detailsCareText)
        for c in care:
            ptype = c.get("productType", "")
            texts = c.get("texts") or []
            if ptype:
                out.append(ptype)
            for t in texts:
                out.append(t)
    safety = (raw_details.get("safetyAndCompliance") or {}).get("contentProps") or {}
    sc = safety.get("safetyAndCompliance") or []
    if sc:
        out.append(br if with_html else "")
        t4 = "Bezpieczeństwo i zgodność z przepisami"
        out.append(f"<strong>{t4}</strong>" if with_html else t4)
        for s in sc:
            txt = s.get("text")
            if txt:
                out.append(txt)
    if with_html:
        s = br.join([x for x in out if x is not None])
        s = re.sub(r"(" + re.escape(br) + r"){2,}", br * 2, s)
        return s.strip(br)
    return "\n".join([x for x in out if x is not None]).strip()
def build_variant_color_measure(desc: str, type_name: str, measurement: str) -> str:
    s = (desc or "")
    t = (type_name or "").strip()
    if t:
        pattern = r"^\s*" + re.escape(t) + r"[\s,;:\-–—/]*"
        s = re.sub(pattern, "", s, flags=re.IGNORECASE)
    if not re.search(r"[0-9A-Za-zА-Яа-яЁёÀ-ž]", s or ""):
        s = ""
    s = s.strip()
    meas = (measurement or "").strip()
    if not s:
        return meas if meas else ""
    s = s[:1].upper() + s[1:]
    return f"{s}, {meas}" if meas else s
# ───────────────────── Playwright: single tab ──────────────────
def open_browser():
    pw = sync_playwright().start()
    # Persistent Chrome profile: Cloudflare treats it more leniently
    ctx = pw.chromium.launch_persistent_context(
        PROFILE_DIR,
        headless=HEADLESS,
        channel="chrome",  # key difference: real Chrome, not bundled Chromium
        user_agent=UA,
        locale="pl-PL",
        java_script_enabled=True,
        accept_downloads=False,
        viewport={"width": 1366, "height": 864},
        # proxy={"server": "..."} can be added here if needed
    )
    page = ctx.new_page()
    # Base headers (matching a real browser)
    page.set_extra_http_headers({"Accept-Language": ACCEPT_LANG})
    return pw, ctx, page
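# Note: channel="chrome" drives a system-installed Google Chrome rather than the
# Chromium bundled with Playwright; Chrome must be present on the machine (or
# installed via `playwright install chrome`, where supported) for open_browser()
# to succeed.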
def close_browser(pw, ctx):
    try:
        ctx.close()
        pw.stop()
    except Exception:
        pass
def fetch_page(page, url: str, idx: int) -> tuple[str, Optional[str]]:
    """
    Returns (full_html, hydration_raw_json_or_None).
    Saves an HTML snapshot if data-hydration-props was not found.
    """
    t0 = time.time()
    resp = page.goto(url, wait_until="domcontentloaded", timeout=60_000)
    status = resp.status if resp else 0
    # Wait for the selector, but not for too long
    try:
        page.wait_for_selector(CSS_SELECTOR, timeout=25_000, state="attached")
    except PWTimeout:
        pass
    full_html = page.content()
    # Read the attribute directly from the live DOM
    raw = None
    try:
        el = page.locator(CSS_SELECTOR).first
        raw = el.get_attribute("data-hydration-props")
    except Exception:
        raw = None
    elapsed = time.time() - t0
    logger.info("PW %s status=%s %.2fs len=%s", url, status, elapsed, len(full_html or ""))
    # If Cloudflare/403 blocked us, save a snapshot for diagnostics
    if not raw:
        _save_html_snapshot("no_hydration", idx, full_html or "")
    return full_html or "", raw
# ───────────────────── Page parsing ───────────────────────────
def parse_page(url: str, full_html: str, raw_json: Optional[str]) -> dict:
    if not full_html:
        return {"url": url, "error": "no html"}
    soup = BeautifulSoup(full_html, "html.parser")
    # Fallback: if raw was not returned, try to read it from the DOM
    if not raw_json:
        target = soup.select_one(CSS_SELECTOR)
        if target:
            raw_json = target.get("data-hydration-props")
    if not raw_json:
        return {"url": url, "error": "data-hydration-props not found"}
    try:
        decoded = html_mod.unescape(raw_json)
        full_json = json.loads(decoded)
    except Exception as e:
        return {"url": url, "error": f"json decode error: {e}"}
    result = {"url": url}
    for block in BLOCKS:
        result.update(flatten_block(block, full_json.get(block, {})))
    kf_json = _parse_json_value(result.get("keyFacts.keyFacts"))
    dim_json = _parse_json_value(result.get("productInformationSection.dimensionProps"))
    det_json = _parse_json_value(result.get("productInformationSection.productDetailsProps"))
    result["keyFacts.keyFacts_formatted"] = format_keyfacts(kf_json)
    result["productInformationSection.dimensionProps_formatted"] = format_dimensions(dim_json, with_html=False, translated=False)
    html_trans = format_dimensions(dim_json, with_html=True, translated=True)
    if isinstance(html_trans, str) and html_trans.startswith("strong>"):
        html_trans = "<" + html_trans
    result["productInformationSection.dimensionProps_formatted_html_translated"] = html_trans
    total_kg = _collect_packaging_total_kg((dim_json or {}).get("packaging") or {})
    result["total brutto"] = _fmt_float(total_kg)
    summary_desc = result.get("productSummary.description", "") or ""
    result["productInformationSection.productDetailsProps_formatted"] = format_product_details(det_json, add_summary_desc=summary_desc, with_html=False, skip_assembly=True)
    result["productInformationSection.productDetailsProps_formatted_html"] = format_product_details(det_json, add_summary_desc=summary_desc, with_html=True, skip_assembly=True)
    desc = result.get("pipPricePackage.productDescription", "") or ""
    tname = result.get("stockcheckSection.typeName", "") or ""
    meas = result.get("pipPricePackage.measurementText", "") or ""
    result["prductVariantColorMeasure"] = build_variant_color_measure(desc, tname, meas)
    # Category breadcrumb from the ld+json structured data
    breadcrumb = None
    for tag in soup.find_all("script", attrs={"type": lambda t: t and "ld+json" in t}):
        try:
            data = json.loads(tag.string)
        except Exception:
            continue
        if isinstance(data, list):
            data = next((d for d in data if isinstance(d, dict) and d.get("@type") == "BreadcrumbList"), None)
        if isinstance(data, dict) and data.get("@type") == "BreadcrumbList":
            items = data.get("itemListElement", [])
            names = [it.get("name", "") for it in items]
            breadcrumb = "/".join(names)
            break
    if breadcrumb:
        result["categoryBreadcrumb"] = breadcrumb
    filtered = {k: result.get(k) for k in KEEP_COLUMNS if k != "originalName"}
    pn = (result.get("buyModule.productName") or "").strip()
    tn = (result.get("stockcheckSection.typeName") or "").strip()
    filtered["originalName"] = (f"{pn} {tn}".strip() or pn or tn)
    return filtered
# ───────────────────── POST (urllib) ──────────────────────────────
def post_payload(payload: dict) -> dict:
    headers = {"Content-Type": "application/json; charset=utf-8"}
    if POST_API_KEY:
        headers["Authorization"] = f"Bearer {POST_API_KEY}"
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urlrequest.Request(POST_URL, data=body, headers=headers, method="POST")
    _post_log(f"→ POST {POST_URL}\nHeaders: {headers}\nBody: {body.decode('utf-8')}")
    try:
        with urlrequest.urlopen(req, timeout=POST_TIMEOUT) as resp:
            txt = resp.read().decode("utf-8", errors="replace")
            code = resp.getcode()
            _post_log(f"{code}\n{txt}\n{'-'*60}")
            return {"ok": 200 <= code < 300, "status": code, "response": txt}
    except HTTPError as e:
        txt = e.read().decode("utf-8", errors="replace")
        _post_log(f"{e.code}\n{txt}\n{'-'*60}")
        return {"ok": False, "status": e.code, "response": txt}
    except URLError as e:
        _post_log(f"× ERROR: {e}\n{'-'*60}")
        return {"ok": False, "status": None, "error": str(e)}
# ───────────────────────── MAIN ─────────────────────────
def safe_cell(val):
    if isinstance(val, (dict, list)):
        return json.dumps(val, ensure_ascii=False)
    return "" if val is None else val
def main():
    logger.info("Playwright-only (simple). BASE_DIR=%s", BASE_DIR)
    logger.info("Python=%s", sys.version.replace("\n", " "))
    logger.info("POST_URL=%s OUTPUT_FILE=%s", POST_URL, OUTPUT_FILE)
    logger.info("HEADLESS=%s UA=%s Accept-Language=%s", HEADLESS, UA, ACCEPT_LANG)
    SAVE_JSON = ask_bool("SAVE_JSON (save JSON batches to disk?)", "1")
    SEND_JSON = ask_bool("SEND_JSON (send batches to the API?)", "1")
    with open(INPUT_FILE, "r", encoding="utf-8") as f:
        links = [line.strip() for line in f if line.strip()]
    print(f"Total links: {len(links)}")
    wb = Workbook()
    ws = wb.active
    ws.title = "IKEA Products"
    ws.append(KEEP_COLUMNS)
    batch_items = []
    batch_index = 1

    def flush_batch():
        nonlocal batch_items, batch_index
        if not batch_items:
            return
        payload = {"parserName": "ikea", "items": batch_items}
        if SAVE_JSON:
            _save_json_batch(payload, batch_index)
        if SEND_JSON:
            res = post_payload(payload)
            ok = res.get("ok")
            print(f"POST batch {batch_index}: {'OK' if ok else 'FAIL'} (status={res.get('status')})")
            logger.info("POST batch %d: %s (status=%s)", batch_index, "OK" if ok else "FAIL", res.get("status"))
        batch_index += 1
        batch_items = []

    pw, ctx, page = open_browser()
    try:
        for idx, url in enumerate(links, 1):
            print(f"[{idx}/{len(links)}] {url}")
            try:
                full_html, raw = fetch_page(page, url, idx)
            except Exception:
                logger.exception("Fetch error for %s", url)
                continue
            row = parse_page(url, full_html, raw)
            if row.get("error"):
                logger.warning("Extract error [%d] %s: %s", idx, url, row["error"])
            # Excel
            ws.append([safe_cell(row.get(col, "")) for col in KEEP_COLUMNS])
            # Filters deciding whether the row goes into the JSON/API batch
            try:
                price = float(row.get("buyModule.productPrice") or 0)
            except Exception:
                price = 0.0
            try:
                total_kg = float(row.get("total brutto") or 0)
            except Exception:
                total_kg = 0.0
            # parse in case the hydration payload stores this block as a JSON string
            details_json = _parse_json_value(row.get("productInformationSection.productDetailsProps")) or {}
            if not (20 <= price <= 1500):
                pass
            elif total_kg > 30:
                pass
            elif materials_match_exclusions(details_json, EXCLUSIONS):
                pass
            else:
                # build the variant (minimal, as before)
                def _ceil_price(v):
                    try:
                        return int(math.ceil(float(v)))
                    except Exception:
                        return None

                def _ceil_int(v):
                    try:
                        return int(math.ceil(float(v)))
                    except Exception:
                        return None

                visible = row.get("productSummary.visibleItemNo") or ""
                sku = visible.replace(" ", "")
                csm = (row.get("prductVariantColorMeasure") or "").strip()
                color, size = "", ""
                if csm:
                    parts = [p.strip() for p in csm.split(",", 1)]
                    if len(parts) == 2:
                        color, size = parts[0], parts[1]
                    else:
                        size = parts[0]
                if not color and not size:
                    size = (row.get("pipPricePackage.measurementText") or "").strip()
                cost = _ceil_price(row.get("buyModule.productPrice"))
                name = row.get("originalName") or row.get("buyModule.productName") or ""
                desc_html = row.get("productInformationSection.productDetailsProps_formatted_html") or ""
                composition_html = row.get("productInformationSection.dimensionProps_formatted_html_translated") or ""
                imgs = []
                raw_imgs = row.get("productGallery.urls") or ""
                if isinstance(raw_imgs, str):
                    imgs = [x for x in raw_imgs.split("\n") if x.strip()]
                in_stock = bool(row.get("availabilityGroup.serverOnlineSellable")) or bool(row.get("buyModule.onlineSellable"))
                weight_kg = _ceil_int(row.get("total brutto"))
                item = {
                    "category": {"name": "TEST/IKEA"},
                    "brand": {"name": "ikea"},
                    "variant": {
                        "status_id": 1,
                        "color": color.capitalize() if color else "none",
                        "sku": sku,
                        "size": size,
                        "cost": cost,
                        "originalUrl": url,
                        "originalName": name,
                        "originalDescription": desc_html,
                        "originalComposition": composition_html,
                        "images": imgs,
                        "inStock": in_stock,
                        "weight": weight_kg if weight_kg is not None else 0,
                    },
                }
                batch_items.append(item)
            if idx % 50 == 0:
                wb.save(OUTPUT_FILE)
                print(f"💾 autosave: {OUTPUT_FILE}")
            if len(batch_items) >= BATCH_SIZE:
                flush_batch()
        wb.save(OUTPUT_FILE)
        print(f"\n✅ Excel ready: {OUTPUT_FILE}")
        flush_batch()
    finally:
        close_browser(pw, ctx)
        logger.info("Playwright closed. Bye.")
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted by user.")
        logger.warning("Interrupted by user")
    except Exception:
        logger.exception("Fatal error")
        raise