IKEAmain for WIN

This commit is contained in:
va1is 2025-08-25 14:46:19 +03:00
parent b3c1ee2b69
commit 30456b2541
6 changed files with 978 additions and 268 deletions

View File

@ -48,6 +48,11 @@ output:
csv_also: true
jsonl_also: true
pdp:
max_concurrency: 3 # PDP pages open at the same time
nav_timeout_ms: 45000
wait_timeout_ms: 15000
debug:
dump_always: false # set to true to write dumps at every step
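# A sketch of how these new keys are consumed on the Python side (mirrors the
# cfg.get(...) fallbacks in fetcher.py below; illustrative, not part of the config):
#   pdp_cfg = cfg.get("pdp", {})
#   max_concurrency = pdp_cfg.get("max_concurrency", 3)
#   nav_timeout_ms = pdp_cfg.get("nav_timeout_ms", 45000)
#   wait_timeout_ms = pdp_cfg.get("wait_timeout_ms", 15000)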

View File

@ -1,16 +1,20 @@
import asyncio
import logging
import re
import json
import os
import json
import re
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
import re
from playwright.async_api import async_playwright
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class FetchError(Exception):
pass
# ---- Price parsing helpers ----
_PLN_PRICE_RE = re.compile(
r'(?<!\d)(\d{1,3}(?:[ \u00A0]?\d{3})*(?:[.,]\d{2})?)(?:\s*(?:zł|PLN))',
@ -18,17 +22,12 @@ _PLN_PRICE_RE = re.compile(
)
def parse_pln_price_to_float(price_text: str | None) -> float | None:
"""
'1 299,00 zł' / '1299 zł' / '1 299 zł' -> 1299.00
Returns None if parsing fails.
"""
if not price_text:
return None
t = (
price_text
.replace("\u00a0", " ") # NBSP
.replace("\u2009", " ") # thin space
.strip()
price_text.replace("\u00a0", " ")
.replace("\u2009", " ")
.strip()
)
m = _PLN_PRICE_RE.search(t)
if not m:
@ -42,20 +41,13 @@ def parse_pln_price_to_float(price_text: str | None) -> float | None:
return None
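# A quick sanity check of parse_pln_price_to_float (sketch, not part of this file;
# the expected results follow from the regex above):
assert parse_pln_price_to_float("1 299,00 zł") == 1299.0
assert parse_pln_price_to_float("1299 zł") == 1299.0
assert parse_pln_price_to_float("49,99 PLN") == 49.99
assert parse_pln_price_to_float("brak ceny") is None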
class FetchError(Exception):
pass
class Fetcher:
"""
Browser layer: Playwright Chromium with anti-bot hygiene and robust debug dumps.
- Blocks heavy resources (fonts/media/images), keeps stylesheets.
- Waits for either SSR summary scripts or window.ssrClientSettings.
- Two ways to read product summaries:
1) window.ssrClientSettings.productSummary
2) inline <script id="next-product-summary-script-..."> content (fallback)
- Captures XHR JSON responses by patterns.
- Dumps HTML/PNG with timestamps at key checkpoints and on failure.
Playwright layer + tooling:
- Lightweight rendering (block image/font/media, keep CSS).
- PLP: scroll to the full item count, collect SSR + DOM.
- PDP: enrich with color/description.
- HTML/PNG dumps in out/raw_html for debugging.
"""
def __init__(self, cfg: Dict[str, Any]):
@ -83,11 +75,7 @@ class Fetcher:
viewport={"width": 1366, "height": 900},
)
self.page = await self.context.new_page()
# Block heavy resources; keep stylesheets.
await self.context.route("**/*", self._route)
# Listen to JSON XHRs for optional parsing.
self.page.on("response", self._on_response)
self.page.on("console", lambda msg: logging.debug(f"[page.console] {msg.type} {msg.text}"))
return self
@ -98,10 +86,6 @@ class Fetcher:
await self.playwright.stop()
async def _route(self, route, request):
"""
Block some heavy resources.
For debugging with images, remove 'image' from the list.
"""
if request.resource_type in ["font", "media", "image"]:
return await route.abort()
return await route.continue_()
@ -116,28 +100,22 @@ class Fetcher:
pass
async def _dump_debug(self, tag: str):
"""Save HTML and screenshot with timestamp; log absolute paths and CWD."""
try:
raw_dir = Path("out/raw_html").resolve()
raw_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
html_path = raw_dir / f"{ts}_{tag}.html"
png_path = raw_dir / f"{ts}_{tag}.png"
cwd = Path(os.getcwd()).resolve()
logging.info(f"[dump_debug] CWD={cwd} → html={html_path} png={png_path}")
logging.info(f"[dump_debug] CWD={Path(os.getcwd()).resolve()} → html={html_path} png={png_path}")
try:
html = await self.page.content()
html_path.write_text(html, encoding="utf-8")
except Exception as e:
logging.warning(f"[dump_debug] writing HTML failed: {e}")
try:
await self.page.screenshot(path=str(png_path), full_page=True)
except Exception as e:
logging.warning(f"[dump_debug] screenshot failed: {e}")
logging.info(f"[dump_debug] saved OK: {html_path.name}, {png_path.name}")
except Exception as e:
logging.warning(f"[dump_debug] general fail: {e}")
@ -161,7 +139,6 @@ class Fetcher:
pass
async def _log_plp_state(self, stage: str):
"""Log counts of SSR scripts and presence of window.ssrClientSettings."""
try:
scripts_count = await self.page.locator('script[id^="next-product-summary-script-"]').count()
except Exception:
@ -175,57 +152,35 @@ class Fetcher:
has_window = False
logging.info(f"[{stage}] scripts: {scripts_count}, window.ps: {has_window}")
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=8),
retry=retry_if_exception_type(FetchError),
)
@retry(stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=8),
retry=retry_if_exception_type(FetchError))
async def load_category(self, url: str):
"""
Navigation + robust readiness:
1) domcontentloaded
2) accept cookies
3) warm-up scroll
4) wait for <script id^="next-product-summary-script-"> (attached)
5) attempt window.ssrClientSettings (non-fatal)
Dumps at key checkpoints and on failure.
"""
try:
await self.page.goto(
url,
timeout=self.cfg.get("nav_timeout_ms", 60000),
wait_until="domcontentloaded",
)
await self.page.goto(url, timeout=self.cfg.get("nav_timeout_ms", 60000), wait_until="domcontentloaded")
await self._dump_debug("after_goto")
await self._accept_cookies_if_any()
await self._dump_debug("after_cookies")
await self._log_plp_state("after_accept")
# warm-up scroll to trigger scripts/lazy
for _ in range(3):
await self.page.mouse.wheel(0, 1600)
await self.page.wait_for_timeout(300)
await self._dump_debug("after_warmup")
await self._log_plp_state("after_warmup")
# wait for SSR script tags
await self.page.wait_for_selector(
'script[id^="next-product-summary-script-"]',
state="attached",
timeout=self.cfg.get("wait_timeout_ms", 30000),
)
await self.page.wait_for_selector('script[id^="next-product-summary-script-"]',
state="attached",
timeout=self.cfg.get("wait_timeout_ms", 30000))
await self._dump_debug("after_scripts_present")
# optional window readiness
try:
await self.page.wait_for_function(
"""
() => {
"""() => {
const ps = globalThis?.ssrClientSettings?.productSummary;
return !!(ps && Array.isArray(ps.itemNumbers) && ps.itemNumbers.length > 0);
}
""",
}""",
timeout=5000,
)
except Exception:
@ -233,19 +188,12 @@ class Fetcher:
await self._dump_debug("after_window_check")
return True
except Exception as e:
logging.error(f"load_category failed: {e}")
await self._dump_debug("fail_load_category")
raise FetchError(str(e))
# ---------- NEW: read total count and scroll until target ----------
async def read_total_from_header(self) -> Optional[int]:
"""
Tries to read category total from the header count like '(434)'.
Looks in '#plp-seo-heading .esi-count' or any '.esi-count' fallback.
"""
sels = ["#plp-seo-heading .esi-count", ".esi-count"]
for sel in sels:
try:
@ -263,42 +211,28 @@ class Fetcher:
return None
async def auto_scroll_until_total(self, hard_max_scrolls: Optional[int] = None):
"""
Scrolls until we reach target total (from header), with a hard cap.
Uses networkidle + a small jiggle to retrigger lazy loading.
"""
hard_cap = hard_max_scrolls or self.cfg.get("scroll", {}).get("hard_max_scrolls", 2000)
netidle_ms = self.cfg.get("scroll", {}).get("wait_networkidle_timeout_ms", 8000)
# Combined product tile selector
sel_tiles = '[data-testid="plp-product-grid-item"], [data-testid="product-tile"], .ProductCard, [data-qa="plp-product"]'
target = await self.read_total_from_header()
last = 0
same_ticks = 0
same_limit = self.cfg.get("scroll", {}).get("stop_if_no_new_items_after", 8)
for i in range(hard_cap):
# Scroll to bottom
try:
await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
except Exception:
pass
# Wait for network idle
try:
await self.page.wait_for_load_state("networkidle", timeout=netidle_ms)
except Exception:
# not fatal
await asyncio.sleep(0.25)
# Jiggle to retrigger observers
try:
await self.page.mouse.wheel(0, -200)
await asyncio.sleep(0.1)
await self.page.mouse.wheel(0, -200); await asyncio.sleep(0.1)
await self.page.mouse.wheel(0, 1200)
except Exception:
pass
try:
seen = await self.page.locator(sel_tiles).count()
except Exception:
@ -307,7 +241,6 @@ class Fetcher:
if target and seen >= target:
logging.info(f"Reached target: seen {seen}/{target} (i={i})")
break
if seen <= last:
same_ticks += 1
if same_ticks >= same_limit:
@ -316,11 +249,8 @@ class Fetcher:
else:
same_ticks = 0
last = seen
logging.info(f"Final seen items: {last} (target={target}, cap={hard_cap})")
# ---------- existing helpers ----------
async def current_html(self) -> str:
return await self.page.content()
@ -334,13 +264,10 @@ class Fetcher:
pass
return results
# ---------- PLP: SSR + DOM ----------
async def read_ssr_product_summaries(self) -> List[Dict[str, Any]]:
"""
Returns simplified product summaries.
Path 1: window.ssrClientSettings.productSummary
Path 2: parse inline <script id="next-product-summary-script-..."> blocks
"""
# Path 1 — from window
# 1) window.*
js_window = """
() => {
const out = [];
@ -385,7 +312,7 @@ class Fetcher:
except Exception:
pass
# Path 2 — parse inline scripts
# 2) inline <script>
js_scripts = """
() => {
const list = Array.from(document.querySelectorAll('script[id^="next-product-summary-script-"]'));
@ -398,11 +325,10 @@ class Fetcher:
return []
out: List[Dict[str, Any]] = []
# productSummary["ID"] = { ... } OR productSummary['ID'] = { ... }
assign_re = re.compile(r'productSummary\s*\[\s*([\'"])(.*?)\1\s*\]\s*=\s*\{')
for t in texts or []:
for m in assign_re.finditer(t):
start = m.end() - 1 # at '{'
start = m.end() - 1
depth = 0
end = None
for i in range(start, len(t)):
@ -419,58 +345,49 @@ class Fetcher:
block = t[start:end]
try:
data = json.loads(block)
sd = (
data.get("_STATE_", {})
.get("productSummary", {})
.get("summaryData", {})
)
sd = data.get("_STATE_", {}).get("productSummary", {}).get("summaryData", {})
cws = sd.get("colourways") or []
cw = cws[0] if cws else None
out.append(
{
"id": sd.get("id"),
"title": sd.get("title"),
"baseUrl": sd.get("baseUrl"),
"brand": sd.get("brand"),
"category": sd.get("category"),
"currencyCode": sd.get("currencyCode"),
"colourway": {
"id": cw.get("id"),
"url": cw.get("url"),
"color": cw.get("c"),
"title": cw.get("t"),
"price": cw.get("p"),
"priceMarket": cw.get("mp"),
"selected": bool(cw.get("s")),
} if cw else None,
"imageCdnUrl": sd.get("imageCdnUrl"),
"productImageUrlPart": sd.get("productImageUrlPart"),
"lgImagePath": sd.get("lgImagePath"),
}
)
out.append({
"id": sd.get("id"),
"title": sd.get("title"),
"baseUrl": sd.get("baseUrl"),
"brand": sd.get("brand"),
"category": sd.get("category"),
"currencyCode": sd.get("currencyCode"),
"colourway": ({
"id": cw.get("id"),
"url": cw.get("url"),
"color": cw.get("c"),
"title": cw.get("t"),
"price": cw.get("p"),
"priceMarket": cw.get("mp"),
"selected": bool(cw.get("s")),
} if cw else None),
"imageCdnUrl": sd.get("imageCdnUrl"),
"productImageUrlPart": sd.get("productImageUrlPart"),
"lgImagePath": sd.get("lgImagePath"),
})
except Exception:
continue
logging.info(f"SSR(scripts) summaries: {len(out)}")
return out
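# For reference, each summary returned above is a plain dict shaped roughly like this
# (sketch; values are illustrative, "T43162" is the sample id used in comments below):
example_summary = {
    "id": "T43162", "title": "...", "baseUrl": "...", "brand": "...",
    "category": "...", "currencyCode": "PLN",
    "colourway": {"id": "...", "url": "...", "color": "...", "title": "...",
                  "price": "1 299,00 zł", "priceMarket": "...", "selected": True},
    "imageCdnUrl": "...", "productImageUrlPart": "...", "lgImagePath": "...",
}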
async def read_dom_products(self) -> List[Dict[str, Any]]:
"""
Parses product cards from the DOM after scrolling.
Covers several variants of the Next PLP markup.
"""
js = r"""
() => {
const out = [];
const gridItems = document.querySelectorAll('[data-testid="plp-product-grid-item"], .ProductCard, [data-qa="plp-product"]');
const getPid = (container) => {
// Option 1: data-pid on the entrypoint element
const entry = container.querySelector('[id^="plp-product-summary-entrypoint-"]');
if (entry && entry.getAttribute('data-pid')) return entry.getAttribute('data-pid');
// Option 2: id="plp-product-summary-tile-<ID>"
const tile = container.closest('[id^="plp-product-summary-tile-"]') || container.querySelector('[id^="plp-product-summary-tile-"]');
if (tile) {
const m = (tile.id || '').match(/plp-product-summary-tile-([A-Za-z0-9]+)/);
if (m) return m[1];
}
// Option 3: extract it from an href like .../<ID>#<ID> or .../T43162
const a = container.querySelector('a[href*="/style/"], a[data-testid^="product_summary_tile_"], a[href*="/p/"]');
if (a) {
const href = a.getAttribute('href') || '';
@ -499,30 +416,38 @@ class Fetcher:
};
const getPriceText = (container) => {
// cover several markup variants
const priceRoots = [
const roots = [
container.querySelector('[data-testid="price"]'),
container.querySelector('[data-testid="ProductCard-Price"]'),
container.querySelector('[itemprop="price"]'),
container.querySelector('[data-testid^="product_summary_price"]'),
container.querySelector('[aria-label*="price" i]'),
container
].filter(Boolean);
for (const root of priceRoots) {
for (const root of roots) {
const spans = root.querySelectorAll('span, div');
for (const el of spans) {
const t = (el.textContent || '').trim();
if (!t) continue;
if (/\d/.test(t) && (t.includes('zł') || /PLN/i.test(t))) {
return t;
}
if (/\d/.test(t) && (t.includes('zł') || /PLN/i.test(t))) return t;
}
}
return null;
};
const getColor = (container) => {
const s1 = container.querySelector('[data-testid="product_summary_colour"], [data-testid="product_summary_color"]');
if (s1) return (s1.textContent || '').trim() || null;
// sometimes the color is in the title ("Natural Rib Soap Dispenser"): take the first word if it looks like a color (heuristic)
const t = getTitle(container);
if (t && t.split(' ').length > 1) {
const first = t.split(' ')[0];
if (first.length > 2 && /^[A-Za-z-]+$/.test(first)) return first; // simple heuristic
}
return null;
};
gridItems.forEach(container => {
// Main link
const link = container.querySelector('a[href*="/style/"], a[data-testid^="product_summary_tile_"], a[href*="/p/"]');
const href = link ? link.getAttribute('href') : null;
@ -531,20 +456,15 @@ class Fetcher:
title: getTitle(container),
url: getAbsUrl(href),
price_text: getPriceText(container),
currency: null
currency: null,
color: getColor(container)
};
if (rec.price_text) {
if (rec.price_text.includes('zł') || /PLN/i.test(rec.price_text)) rec.currency = 'PLN';
}
// filter out empty cards that have neither a link nor a title
if (rec.price_text && (rec.price_text.includes('zł') || /PLN/i.test(rec.price_text))) rec.currency = 'PLN';
if (rec.url || rec.title) out.push(rec);
});
// Deduplicate by id|url
const seen = new Set();
const uniq = [];
const seen = new Set(); const uniq = [];
for (const d of out) {
const key = `${d.id || ''}|${d.url || ''}`;
if (seen.has(key)) continue;
@ -563,19 +483,13 @@ class Fetcher:
return []
async def collect_products(self) -> List[Dict[str, Any]]:
"""
Unified collection: SSR (if present) + DOM.
Normalize to: id, title, url, price (float|None), currency ('PLN'|...).
"""
ssr = await self.read_ssr_product_summaries() or []
dom = await self.read_dom_products() or []
bykey: Dict[str, Dict[str, Any]] = {}
def key(d: Dict[str, Any]) -> str: return f"{(d.get('id') or '')}|{(d.get('url') or '')}"
def key(d: Dict[str, Any]) -> str:
return f"{(d.get('id') or '')}|{(d.get('url') or '')}"
# 1) Skeleton from DOM
# DOM as the base
for d in dom:
bykey[key(d)] = {
"id": d.get("id"),
@ -583,12 +497,12 @@ class Fetcher:
"url": d.get("url"),
"price_text": d.get("price_text"),
"currency": d.get("currency"),
"color": d.get("color"),
}
# 2) Enrich from SSR (if present)
# Enrich from SSR
for s in ssr:
cw = (s.get("colourway") or {})
# build the absolute URL
url = None
try:
base = (s.get("baseUrl") or "").rstrip("/")
@ -607,6 +521,7 @@ class Fetcher:
"url": url,
"price_text": cw.get("price"),
"currency": s.get("currencyCode"),
"color": cw.get("color"), # ← цвет из SSR
}
else:
if not rec.get("title") and s.get("title"):
@ -615,8 +530,10 @@ class Fetcher:
rec["price_text"] = cw["price"]
if not rec.get("currency") and s.get("currencyCode"):
rec["currency"] = s["currencyCode"]
if not rec.get("color") and cw.get("color"):
rec["color"] = cw["color"]
# 3) Final price normalization
# Normalization (no description here; it is filled in on the PDP)
out: List[Dict[str, Any]] = []
for v in bykey.values():
price_val = parse_pln_price_to_float(v.get("price_text"))
@ -627,10 +544,125 @@ class Fetcher:
"id": v.get("id"),
"title": v.get("title"),
"url": v.get("url"),
"price": price_val, # float или None
"currency": currency or "PLN"
"price": price_val,
"currency": (currency or "PLN").upper(),
"color": v.get("color"),
"description": None,
})
logging.info(f"Total collected (SSR+DOM): {len(out)}")
return out
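# After normalization every collected record has the same shape (sketch; values illustrative):
example_record = {
    "id": "T43162",        # from data-pid / tile id / href
    "title": "...",
    "url": "https://...",
    "price": 1299.0,       # float or None, parsed from price_text
    "currency": "PLN",
    "color": "...",        # from DOM or SSR colourway, may be None
    "description": None,   # filled in later by enrich_with_pdp_details
}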
# ---------- PDP enrichment ----------
async def _parse_pdp_page(self, page, url: str) -> Dict[str, Optional[str]]:
try:
await page.goto(url, timeout=self.cfg.get("pdp", {}).get("nav_timeout_ms", 45000),
wait_until="domcontentloaded")
# the cookie banner rarely shows up again here, but try anyway
try:
el = page.locator('#onetrust-accept-btn-handler')
if await el.count() > 0:
await el.first.click(timeout=1500)
except Exception:
pass
# wait for hydration to complete
try:
await page.wait_for_load_state("networkidle", timeout=self.cfg.get("pdp", {}).get("wait_timeout_ms", 15000))
except Exception:
pass
js = """
() => {
const pickText = (sels) => {
for (const s of sels) {
const el = document.querySelector(s);
if (el) {
const t = (el.innerText || el.textContent || "").trim();
if (t) return t;
}
}
return null;
};
let desc = pickText([
'[data-testid="product-description"]',
'[data-testid="pdp-description"]',
'[data-testid="ProductDetail-Description"]',
'#product-description',
'[itemprop="description"]',
'.productDescription'
]);
let color = pickText([
'[data-testid="selectedColourName"]',
'[data-testid="selected-colour-name"]',
'[data-testid="colour-name"]',
'.selectedColourName',
'.colourName',
'span[data-testid*="colour"]'
]);
try {
const g = globalThis;
const pd = g?.ssrClientSettings?.productDetails || g?.ssrClientSettings?.productDetail || null;
const st = pd?._STATE_?.productDetails || pd?._STATE_?.productDetail || {};
if (!desc) desc = st?.details?.description || st?.description || null;
if (!color) color = st?.selectedColourway?.name || st?.selectedColourway?.colour || st?.colourway?.name || null;
} catch (e) {}
return { desc, color };
}
"""
data = await page.evaluate(js)
return {
"description": (data or {}).get("desc"),
"color": (data or {}).get("color"),
}
except Exception:
return {"description": None, "color": None}
async def enrich_with_pdp_details(self, items: List[Dict[str, Any]], max_concurrency: Optional[int] = None) -> List[Dict[str, Any]]:
"""
For every item that has a URL, open its PDP and pull the description and (if missing) the color.
"""
urls = []
for it in items:
u = it.get("url")
if u and u not in urls:
urls.append(u)
sem = asyncio.Semaphore(max_concurrency or self.cfg.get("pdp", {}).get("max_concurrency", 3))
async def worker(u: str) -> tuple[str, Dict[str, Optional[str]]]:
async with sem:
page = await self.context.new_page()
# same resource blocking as on the PLP
await self.context.route("**/*", self._route)
try:
res = await self._parse_pdp_page(page, u)
finally:
try:
await page.close()
except Exception:
pass
return u, res
tasks = [worker(u) for u in urls]
results = {}
for fut in asyncio.as_completed(tasks):
u, res = await fut
results[u] = res
# update the items
for it in items:
u = it.get("url")
if not u:
continue
det = results.get(u) or {}
if not it.get("description") and det.get("description"):
it["description"] = det["description"]
if not it.get("color") and det.get("color"):
it["color"] = det["color"]
return items
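# A minimal usage sketch of the whole PLP -> PDP flow (mirrors run_category in main.py;
# cfg/url are assumed to come from config.yaml and categories.xlsx):
async def scrape_one(cfg: dict, url: str) -> List[Dict[str, Any]]:
    async with Fetcher(cfg) as fetcher:
        await fetcher.load_category(url)           # navigate + wait for SSR scripts
        await fetcher.auto_scroll_until_total()    # scroll up to the "(N)" header count
        items = await fetcher.collect_products()   # SSR + DOM, normalized records
        return await fetcher.enrich_with_pdp_details(items)  # adds description / missing color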

View File

@ -2,12 +2,13 @@ import asyncio
import logging
from pathlib import Path
from typing import List, Tuple
from datetime import timedelta
from datetime import datetime, timedelta
import pandas as pd
import yaml
from fetcher import Fetcher, FetchError
from sink import write_outputs
from sink import write_outputs, write_master_excel
from models import Product
@ -24,6 +25,7 @@ def setup_logging():
],
)
def load_config() -> dict:
with open("config.yaml", "r", encoding="utf-8") as f:
return yaml.safe_load(f)
@ -38,6 +40,7 @@ def load_categories() -> List[Tuple[str, str]]:
The category name is derived from the last URL segment.
"""
from urllib.parse import urlparse
xlsx = Path("categories.xlsx")
if not xlsx.exists():
# demo data if the file has not been created
@ -72,33 +75,29 @@ def load_categories() -> List[Tuple[str, str]]:
# ---------- adapter: dict -> Product ----------
def normalize_to_models(collected: List[dict]) -> List[Product]:
"""
Input: items from fetcher.collect_products():
{ id, title, url, price (float|None), currency ('PLN'|...), color, description }
Output: a list of Product (only the minimally required fields)
"""
out: List[Product] = []
for d in collected:
pid = d.get("id")
url = d.get("url")
title = d.get("title")
price_val = d.get("price") # float | None
currency = (d.get("currency") or "PLN").upper()
price_str = None
if price_val is not None:
try:
price_str = f"{float(price_val):.2f}"
except Exception:
price_str = None
price_val = d.get("price")
price_str = f"{float(price_val):.2f}" if isinstance(price_val, (int, float)) else None
out.append(Product(
product_id=str(pid) if pid is not None else None,
url=str(url) if url else None,
name=title,
product_id=str(d.get("id")) if d.get("id") is not None else None,
url=str(d.get("url")) if d.get("url") else None,
name=d.get("title"),
price=price_str,
currency=currency,
currency=(d.get("currency") or "PLN").upper(),
color=d.get("color"),
description=d.get("description"),
image_urls=[],
color=None,
size_variants=[]
))
return out
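# Illustration of the adapter above on one collected dict (sketch; values are made up):
sample = [{"id": "T43162", "title": "Soap Dispenser", "url": "https://...",
           "price": 49.99, "currency": "PLN", "color": "Natural", "description": None}]
# normalize_to_models(sample) -> [Product(product_id="T43162", name="Soap Dispenser",
#                                          price="49.99", currency="PLN", color="Natural", ...)]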
# ---------- main flow ----------
async def run_category(fetcher: Fetcher, cfg: dict, name: str, url: str):
@ -108,12 +107,17 @@ async def run_category(fetcher: Fetcher, cfg: dict, name: str, url: str):
# scroll until the full count is reached (read from the "(N)" header)
await fetcher.auto_scroll_until_total()
# collect products (SSR + DOM)
# collect products (SSR + DOM) and enrich from the PDP
collected = await fetcher.collect_products()
collected = await fetcher.enrich_with_pdp_details(
collected,
max_concurrency=cfg.get("pdp", {}).get("max_concurrency", 3)
)
products = normalize_to_models(collected)
# save to xlsx/csv/jsonl
path, n = write_outputs(
path, n, rows = write_outputs(
category_name=name,
category_url=url,
products=products,
@ -123,11 +127,14 @@ async def run_category(fetcher: Fetcher, cfg: dict, name: str, url: str):
jsonl_also=cfg["output"].get("jsonl_also", True),
)
logging.info(f"{name}: {n} товаров → {path}")
return rows
except FetchError as e:
logging.error(f"Category failed: {name}{e}")
return []
except Exception as e:
logging.exception(f"Category crashed: {name}{e}")
return []
async def main_async():
@ -142,49 +149,20 @@ async def main_async():
master_rows: List[dict] = []
# Master file name: all_YYYYMMDD_HHMMSS_UTC+3.xlsx
now_utc = pd.Timestamp.utcnow().to_pydatetime()
ts_utc_plus3 = (now_utc + timedelta(hours=3)).strftime("%Y%m%d_%H%M%S")
ts_utc_plus3 = (datetime.utcnow() + timedelta(hours=3)).strftime("%Y%m%d_%H%M%S")
all_filename = f"all_{ts_utc_plus3}_UTC+3.xlsx"
all_path = str(Path(cfg["output"]["folder"]) / all_filename)
async with Fetcher(cfg) as fetcher:
for name, url in categories:
# regular pass over the category
try:
logging.info(f"Category start: {name} → {url}")
await fetcher.load_category(url)
await fetcher.auto_scroll_until_total()
collected = await fetcher.collect_products()
products = normalize_to_models(collected)
# per-category write
path, n, rows = write_outputs(
category_name=name,
category_url=url,
products=products,
out_folder=cfg["output"]["folder"],
excel_prefix=cfg["output"]["excel_prefix"],
csv_also=cfg["output"].get("csv_also", True),
jsonl_also=cfg["output"].get("jsonl_also", True),
)
logging.info(f"{name}: {n} товаров → {path}")
# накапливаем в общий список
master_rows.extend(rows)
except FetchError as e:
logging.error(f"Category failed: {name}{e}")
except Exception as e:
logging.exception(f"Category crashed: {name}{e}")
rows = await run_category(fetcher, cfg, name, url)
master_rows.extend(rows)
# After all categories finish, write the master XLSX
from sink import write_master_excel
all_written_path, total = write_master_excel(all_path, master_rows)
logging.info(f"◎ ALL: {total} товаров → {all_written_path}")
def main():
asyncio.run(main_async())

View File

@ -1,23 +1,27 @@
from pydantic import BaseModel, Field, HttpUrl
from typing import Optional, List
from pydantic import BaseModel, Field
class Product(BaseModel):
product_id: Optional[str] = Field(default=None)
url: Optional[HttpUrl] = None
product_id: Optional[str] = None
url: Optional[str] = None
name: Optional[str] = None
price: Optional[str] = None
currency: Optional[str] = None
image_urls: List[str] = []
color: Optional[str] = None
size_variants: List[str] = []
description: Optional[str] = None
image_urls: List[str] = Field(default_factory=list)
size_variants: List[str] = Field(default_factory=list)
class RowOut(BaseModel):
category_name: str
category_url: str
product_id: Optional[str]
url: Optional[str]
name: Optional[str]
price: Optional[str]
currency: Optional[str]
color: Optional[str]
images_joined: Optional[str]
product_id: Optional[str] = None
url: Optional[str] = None
name: Optional[str] = None
price: Optional[str] = None
currency: Optional[str] = None
color: Optional[str] = None
description: Optional[str] = None
images_joined: Optional[str] = None
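# A small construction sketch (all fields are optional, so partial data is fine;
# the values below are illustrative):
p = Product(product_id="T43162", url="https://...", name="Soap Dispenser",
            price="49.99", currency="PLN", color="Natural")
row = RowOut(category_name="bathroom", category_url="https://...",
             product_id=p.product_id, url=p.url, name=p.name, price=p.price,
             currency=p.currency, color=p.color, description=p.description,
             images_joined=None)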

View File

@ -3,47 +3,21 @@ from pathlib import Path
from typing import List, Dict, Any
from models import Product, RowOut
import hashlib, json, datetime
import re
# ---- Price parsing helpers ----
_PLN_PRICE_RE = re.compile(
r'(?<!\d)(\d{1,3}(?:[ \u00A0]?\d{3})*(?:[.,]\d{2})?)(?:\s*(?:zł|PLN))',
re.IGNORECASE,
)
def parse_pln_price_to_float(price_text: str | None) -> float | None:
"""
Extracts a float like 1299.00 from strings such as '1 299,00 zł' / '1299 zł' / '1 299 zł'.
Returns None if parsing fails.
"""
if not price_text:
return None
t = (
price_text.replace("\u00a0", " ") # NBSP
.replace("\u2009", " ") # thin space
.strip()
)
m = _PLN_PRICE_RE.search(t)
if not m:
return None
num = m.group(1)
num = num.replace(" ", "").replace("\u00a0", "").replace("\u2009", "")
num = num.replace(",", ".")
try:
return float(num)
except Exception:
return None
def _as_str(v):
return str(v) if v is not None else ""
def _key_from_fields(product_id: str | None, url: str | None) -> str:
base = f"{_as_str(product_id)}|{_as_str(url)}"
return hashlib.md5(base.encode("utf-8")).hexdigest()
def _key(p: Product) -> str:
return _key_from_fields(p.product_id, _as_str(p.url))
def build_rows(category_name: str, category_url: str, products: List[Product]) -> List[Dict[str, Any]]:
"""Построить список строк RowOut (dict) из продуктов."""
rows: List[Dict[str, Any]] = []
@ -62,12 +36,15 @@ def build_rows(category_name: str, category_url: str, products: List[Product]) -
price=p.price,
currency=p.currency,
color=p.color,
description=p.description,
images_joined="\n".join(p.image_urls) if p.image_urls else None
).model_dump())
return rows
def write_outputs(category_name: str, category_url: str, products: List[Product], out_folder: str, excel_prefix: str, csv_also: bool, jsonl_also: bool):
"""Запись percategory файлов (xlsx + опционально csv/jsonl). Возвращает (excel_path, nrows, rows)."""
def write_outputs(category_name: str, category_url: str, products: List[Product],
out_folder: str, excel_prefix: str, csv_also: bool, jsonl_also: bool):
"""Запись per-category файлов (xlsx + опционально csv/jsonl). Возвращает (excel_path, nrows, rows)."""
Path(out_folder).mkdir(parents=True, exist_ok=True)
rows = build_rows(category_name, category_url, products)
@ -87,11 +64,11 @@ def write_outputs(category_name: str, category_url: str, products: List[Product]
return str(excel_path), len(rows), rows
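# Usage sketch matching how main.py calls these (folder, prefix and file name are illustrative;
# `products` is a List[Product] built elsewhere):
#   path, n, rows = write_outputs(category_name="bathroom", category_url="https://...",
#                                 products=products, out_folder="out", excel_prefix="cat_",
#                                 csv_also=True, jsonl_also=True)
#   all_path, total = write_master_excel("out/all_20250825_144600_UTC+3.xlsx", rows)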
def write_master_excel(all_path: str, rows: List[Dict[str, Any]]):
"""Записать общий XLSX (один лист AllProducts). Перезаписывает файл целиком один раз в конце."""
Path(all_path).parent.mkdir(parents=True, exist_ok=True)
if not rows:
# nothing to write
return str(all_path), 0
# dedupe just in case (by product_id|url)
seen: set[str] = set()

View File

@ -0,0 +1,714 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os, json, re, math, time, html, requests, datetime
from collections import Counter
from typing import List
from bs4 import BeautifulSoup
from openpyxl import Workbook
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# ───────────────────────── PATHS / FILES ───────────────────────────
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
RECORDS_DIR = os.path.join(BASE_DIR, "records_folder")
os.makedirs(RECORDS_DIR, exist_ok=True)
INPUT_FILE = os.path.join(BASE_DIR, "product_links.txt")
OUTPUT_FILE = os.path.join(RECORDS_DIR, "records.xlsx")
DICT_FILE = os.path.join(BASE_DIR, "dictionary_main.txt")
EXCL_FILE = os.path.join(BASE_DIR, "exclusion_materials.txt")
POST_LOG = os.path.join(RECORDS_DIR, "post_log.txt")
# ───────────────────────── POST SETTINGS ─────────────────────────
POST_URL = os.getenv("IKEA_POST_URL", "http://localhost:3005/parser/data")
POST_API_KEY = os.getenv("IKEA_POST_API_KEY", "")
POST_TIMEOUT = 20
BATCH_SIZE = 50
# ───────────────────────── SITE SETTINGS ────────────────────────
HEADERS = {
# Close to a Windows Chrome browser
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "pl-PL,pl;q=0.9,en;q=0.8,ru;q=0.7",
"Cache-Control": "no-cache",
"Pragma": "no-cache",
}
CSS_SELECTOR = ".pip-product__subgrid.product-pip.js-product-pip"
BLOCKS = [
"buyModule",
"productSummary",
"pipPricePackage",
"productInformationSection",
"keyFacts",
"stockcheckSection",
"availabilityGroup",
"productGallery",
]
KEEP_COLUMNS = [
"availabilityGroup.serverOnlineSellable",
"availabilityGroup.storeHeader",
"buyModule.onlineSellable",
"buyModule.productName",
"buyModule.productPrice",
"buyModule.productType",
"keyFacts.ariaLabels",
"keyFacts.gaLabel",
"keyFacts.keyFacts",
"keyFacts.keyFacts_formatted",
"pipPricePackage.measurementText",
"pipPricePackage.productDescription",
"productGallery.urls",
"productInformationSection.dimensionProps",
"productInformationSection.dimensionProps_formatted",
"productInformationSection.dimensionProps_formatted_html_translated",
"productInformationSection.productDetailsProps",
"productInformationSection.productDetailsProps_formatted",
"productInformationSection.productDetailsProps_formatted_html",
"productSummary.description",
"productSummary.visibleItemNo",
"stockcheckSection.packagingProps",
"stockcheckSection.typeName",
"total brutto",
"prductVariantColorMeasure",
"categoryBreadcrumb",
"originalName",
"url",
]
# ───────────────────────── HTTP SESSION ────────────────────────────
def make_session() -> requests.Session:
s = requests.Session()
s.headers.update(HEADERS)
retries = Retry(
total=5,
backoff_factor=0.5,
status_forcelist=(403, 429, 500, 502, 503, 504),
allowed_methods=frozenset(["GET", "POST"])
)
s.mount("https://", HTTPAdapter(max_retries=retries))
s.mount("http://", HTTPAdapter(max_retries=retries))
# If needed, set market cookies (example for the PL market):
# s.cookies.set("ikeaMarket", "PL")
# s.cookies.set("ikeaCurrency", "PLN")
return s
SESSION = make_session()
# ───────────────────────── I/O UTILITIES ────────────────────────────
def ask_bool(prompt: str, default: str = "1") -> bool:
try:
val = input(f"{prompt} (1=yes, 0=no) [{default}]: ").strip() or default
except EOFError:
val = default
return val == "1"
def _post_log(msg: str):
try:
with open(POST_LOG, "a", encoding="utf-8") as f:
f.write(msg.rstrip() + "\n")
except Exception:
pass
def _now_tag():
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def _save_json_batch(payload: dict, batch_index: int):
fname = f"ikea_batch_{_now_tag()}_{batch_index:04d}.json"
fpath = os.path.join(RECORDS_DIR, fname)
with open(fpath, "w", encoding="utf-8") as fh:
json.dump(payload, fh, ensure_ascii=False, indent=2)
print(f"💾 JSON saved: {fname}")
return fpath
def _save_debug_html(url: str, text: str, prefix: str = "debug"):
try:
safe = re.sub(r"[^a-zA-Z0-9]+", "_", url)[:80]
fname = f"{prefix}_{_now_tag()}_{safe}.html"
fpath = os.path.join(RECORDS_DIR, fname)
with open(fpath, "w", encoding="utf-8") as fh:
fh.write(text)
print(f"🧪 Saved HTML snapshot: {fname}")
except Exception:
pass
# ───────────────────────── DICTIONARIES / FILTERS ──────────────────
def load_dictionary(path: str) -> dict:
if not os.path.isfile(path):
return {}
txt = open(path, "r", encoding="utf-8").read()
pairs = re.findall(r'"([^"]+)"\s*:\s*"([^"]+)"', txt)
return {k: v for k, v in pairs}
DICT = load_dictionary(DICT_FILE)
def translate_token(token: str) -> str:
return DICT.get(token, token)
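# dictionary_main.txt is expected to hold quoted "source": "translation" pairs that the
# regex above picks up; a hypothetical fragment and its effect:
#   "Wymiary": "Dimensions"
#   "Opakowanie": "Packaging"
# translate_token("Wymiary")  -> "Dimensions" if the pair is present
# translate_token("Nieznane") -> "Nieznane" (falls back to the token itself)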
def load_exclusions(path: str) -> set:
if not os.path.isfile(path):
return set()
txt = open(path, "r", encoding="utf-8").read()
quoted = re.findall(r'"([^"]+)"', txt, flags=re.S)
tokens = quoted if quoted else re.split(r"[,;\n\r]+", txt)
return {t.strip().lower() for t in tokens if t.strip()}
EXCLUSIONS = load_exclusions(EXCL_FILE)
def materials_from_details_json(details: dict) -> List[str]:
out: List[str] = []
def walk(node):
if isinstance(node, dict):
for k, v in node.items():
if k == "material" and isinstance(v, str):
out.append(v)
else:
walk(v)
elif isinstance(node, list):
for x in node:
walk(x)
walk(details or {})
return out
def materials_match_exclusions(details: dict, exclusion_tokens: set) -> bool:
if not exclusion_tokens:
return False
mats = materials_from_details_json(details)
joined = "\n".join(mats).lower()
return any(tok in joined for tok in exclusion_tokens)
# ───────────────────────── FORMATTERS ─────────────────────────────
def _parse_json_value(val):
if isinstance(val, (dict, list)) or val is None:
return val
if isinstance(val, str):
s = val.strip()
if not s:
return val
try:
return json.loads(s)
except Exception:
return val
return val
def flatten_block(block_name, data):
if not isinstance(data, dict):
return {}
flat = {}
for k, v in data.items():
if block_name == "productGallery" and k == "mediaList":
if isinstance(v, list):
urls = []
for item in v:
content = item.get("content", {})
if isinstance(content, dict) and "url" in content:
urls.append(content["url"])
flat["productGallery.urls"] = "\n".join(urls)
return flat
key = f"{block_name}.{k}"
flat[key] = v
return flat
def format_keyfacts(raw_keyfacts):
if not isinstance(raw_keyfacts, list):
return ""
out = []
header_added = False
for el in raw_keyfacts:
lbl = (el or {}).get("label")
name = (el or {}).get("name", "Właściwości")
if not header_added:
out.append(name)
header_added = True
if lbl:
out.append(lbl)
return "\n".join(out)
def _fmt_float(x):
try:
return f"{float(x):.2f}".rstrip("0").rstrip(".")
except Exception:
return ""
def _collect_packaging_total_kg(packaging):
total = 0.0
if not isinstance(packaging, dict):
return total
content = (packaging.get("contentProps") or {}).get("packages") or []
for pkg in content:
qty = ((pkg.get("quantity") or {}).get("value")) or 1
ms = pkg.get("measurements") or []
for block in ms:
if not isinstance(block, list):
continue
weight_lbl = next((m for m in block if (m.get("type") == "weight" or m.get("label") == "Waga")), None)
if weight_lbl and isinstance(weight_lbl.get("value"), (int, float)):
total += float(weight_lbl["value"]) * (qty or 1)
return total
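# Minimal example of the packaging structure this helper walks (field names are inferred
# from the code above; the numbers are made up):
example_packaging = {
    "contentProps": {"packages": [
        {"quantity": {"value": 2},
         "measurements": [[{"type": "weight", "label": "Waga", "value": 3.5}]]},
    ]}
}
# _collect_packaging_total_kg(example_packaging) -> 7.0  (3.5 kg per package * 2 packages)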
def format_dimensions(raw_dim_props, with_html=False, translated=False):
if not isinstance(raw_dim_props, dict):
return ""
lines = []
br = "<br/>" if with_html else "\n"
title = translate_token("Wymiary") if translated else "Wymiary"
lines.append(f"<strong>{title}</strong>" if with_html else title)
for d in raw_dim_props.get("dimensions", []):
name = d.get("name", "")
meas = d.get("measure", "")
if not name and not meas:
continue
if translated:
name_t = translate_token(name)
line = f"{name_t}: {meas}".strip()
else:
line = f"{name}: {meas}".strip()
lines.append(line)
pack = (raw_dim_props.get("packaging") or {})
pack_title = translate_token("Opakowanie") if translated else "Opakowanie"
lines.append(br if with_html else "")
lines.append(f"<strong>{pack_title}</strong>" if with_html else pack_title)
content = (pack.get("contentProps") or {}).get("packages") or []
for pkg in content:
name = pkg.get("name") or ""
if name:
lines.append(name)
art = (pkg.get("articleNumber") or {}).get("value")
if art:
art_lbl = "Numer artykułu"
if translated:
art_lbl = translate_token(art_lbl)
lines.append(art_lbl)
lines.append(f"{art}")
ms = pkg.get("measurements") or []
for block in ms:
if not isinstance(block, list):
continue
for m in block:
lbl = m.get("label", "")
txt = m.get("text", "")
if translated:
lbl = translate_token(lbl) if lbl else lbl
if lbl or txt:
lines.append(f"{lbl}: {txt}".strip(": "))
q_val = ((pkg.get("quantity") or {}).get("value"))
if q_val:
q_lbl = "Paczka(i)"
if translated:
q_lbl = translate_token(q_lbl)
lines.append(f"{q_lbl}: {q_val}")
if with_html:
s = br.join([x for x in lines if x is not None])
s = re.sub(r"(" + re.escape(br) + r"){2,}", br*2, s)
s = s.strip(br)
if s.startswith("strong>"):
s = "<" + s
return s
return "\n".join([x for x in lines if x is not None]).strip()
def format_product_details(raw_details, add_summary_desc="", with_html=False, skip_assembly=True):
if not isinstance(raw_details, dict):
return add_summary_desc if with_html else add_summary_desc
br = "<br/>" if with_html else "\n"
out = []
if add_summary_desc:
out.append(add_summary_desc)
out.append(br if with_html else "")
t1 = "Informacje o produkcie"
out.append(f"<strong>{t1}</strong>" if with_html else t1)
pd = (raw_details.get("productDescriptionProps") or {})
paragraphs = pd.get("paragraphs") or []
for p in paragraphs:
out.append(p)
dlabel = pd.get("designerLabel")
dname = pd.get("designerName")
if dlabel and dname:
out.append(dlabel)
out.append(dname)
if raw_details.get("productId"):
out.append("Numer artykułu")
out.append(raw_details["productId"])
acc = (raw_details.get("accordionObject") or {})
gk = ((acc.get("goodToKnow") or {}).get("contentProps") or {}).get("goodToKnow") or []
if gk:
out.append(br if with_html else "")
t2 = "Dobrze wiedzieć"
out.append(f"<strong>{t2}</strong>" if with_html else t2)
for item in gk:
txt = item.get("text")
if txt:
out.append(txt)
mac = (acc.get("materialsAndCare") or {}).get("contentProps") or {}
mats = mac.get("materials") or []
care = mac.get("careInstructions") or []
t3 = "Materiały i pielęgnacja"
if mats or care:
out.append(br if with_html else "")
out.append(f"<strong>{t3}</strong>" if with_html else t3)
if mats:
out.append("Materiały")
for m in mats:
ptype = m.get("productType", "")
for mat in (m.get("materials") or []):
material = mat.get("material", "")
if ptype:
out.append(ptype)
if material:
out.append(material)
if care:
detailsCareText = mac.get("detailsCareText", "Pielęgnacja")
out.append(detailsCareText)
for c in care:
ptype = c.get("productType", "")
texts = c.get("texts") or []
if ptype:
out.append(ptype)
for t in texts:
out.append(t)
safety = (raw_details.get("safetyAndCompliance") or {}).get("contentProps") or {}
sc = safety.get("safetyAndCompliance") or []
if sc:
out.append(br if with_html else "")
t4 = "Bezpieczeństwo i zgodność z przepisami"
out.append(f"<strong>{t4}</strong>" if with_html else t4)
for s in sc:
txt = s.get("text")
if txt:
out.append(txt)
if with_html:
s = br.join([x for x in out if x is not None])
s = re.sub(r"(" + re.escape(br) + r"){2,}", br*2, s)
return s.strip(br)
return "\n".join([x for x in out if x is not None]).strip()
def build_variant_color_measure(desc: str, type_name: str, measurement: str) -> str:
s = (desc or "")
t = (type_name or "").strip()
if t:
pattern = r"^\s*" + re.escape(t) + r"[\s,;:\-–—/]*"
s = re.sub(pattern, "", s, flags=re.IGNORECASE)
if not re.search(r"[0-9A-Za-zА-Яа-яЁёÀ-ž]", s or ""):
s = ""
s = s.strip()
meas = (measurement or "").strip()
if not s:
return meas if meas else ""
s = s[:1].upper() + s[1:]
return f"{s}, {meas}" if meas else s
# ───────────────────── PRODUCT PAGE SCRAPING ──────────────────────
def extract_data(url: str) -> dict:
try:
resp = SESSION.get(url, timeout=20, allow_redirects=True)
status = resp.status_code
if status != 200 or not resp.text or "data-hydration-props" not in resp.text:
_save_debug_html(url, resp.text, prefix=f"resp{status}")
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
target = soup.select_one(CSS_SELECTOR)
if not target:
_save_debug_html(url, resp.text, prefix="no_selector")
return {"url": url, "error": "CSS selector not found", "http_status": status}
raw = target.get("data-hydration-props")
if not raw:
_save_debug_html(url, resp.text, prefix="no_hydration")
return {"url": url, "error": "data-hydration-props not found", "http_status": status}
decoded = html.unescape(raw)
full_json = json.loads(decoded)
result = {"url": url}
for block in BLOCKS:
result.update(flatten_block(block, full_json.get(block, {})))
kf_json = _parse_json_value(result.get("keyFacts.keyFacts"))
dim_json = _parse_json_value(result.get("productInformationSection.dimensionProps"))
det_json = _parse_json_value(result.get("productInformationSection.productDetailsProps"))
result["keyFacts.keyFacts_formatted"] = format_keyfacts(kf_json)
result["productInformationSection.dimensionProps_formatted"] = format_dimensions(dim_json, with_html=False, translated=False)
html_trans = format_dimensions(dim_json, with_html=True, translated=True)
if isinstance(html_trans, str) and html_trans.startswith("strong>"):
html_trans = "<" + html_trans
result["productInformationSection.dimensionProps_formatted_html_translated"] = html_trans
total_kg = _collect_packaging_total_kg((dim_json or {}).get("packaging") or {})
result["total brutto"] = _fmt_float(total_kg)
summary_desc = result.get("productSummary.description", "") or ""
result["productInformationSection.productDetailsProps_formatted"] = format_product_details(det_json, add_summary_desc=summary_desc, with_html=False, skip_assembly=True)
result["productInformationSection.productDetailsProps_formatted_html"] = format_product_details(det_json, add_summary_desc=summary_desc, with_html=True, skip_assembly=True)
desc = result.get("pipPricePackage.productDescription", "") or ""
tname = result.get("stockcheckSection.typeName", "") or ""
meas = result.get("pipPricePackage.measurementText", "") or ""
result["prductVariantColorMeasure"] = build_variant_color_measure(desc, tname, meas)
# breadcrumb
breadcrumb = None
for tag in soup.find_all("script", attrs={"type": lambda t: t and "ld+json" in t}):
try:
data = json.loads(tag.string)
except Exception:
continue
if isinstance(data, list):
data = next((d for d in data if isinstance(d, dict) and d.get("@type") == "BreadcrumbList"), None)
if isinstance(data, dict) and data.get("@type") == "BreadcrumbList":
items = data.get("itemListElement", [])
names = [it.get("name", "") for it in items]
breadcrumb = "/".join(names)
break
if breadcrumb:
result["categoryBreadcrumb"] = breadcrumb
# apply the whitelist
filtered = {k: result.get(k) for k in KEEP_COLUMNS if k != "originalName"}
# originalName = productName + " " + typeName (no double spaces)
pn = (result.get("buyModule.productName") or "").strip()
tn = (result.get("stockcheckSection.typeName") or "").strip()
if pn and tn:
orig_name = f"{pn} {tn}"
else:
orig_name = pn or tn
filtered["originalName"] = orig_name
return filtered
except Exception as e:
return {"url": url, "error": str(e), "http_status": None}
# ───────────────────── VARIANT BUILDING / POST ─────────────────
def _split_color_size(text: str):
if not text:
return "", ""
parts = [p.strip() for p in text.split(",", 1)]
if len(parts) == 2:
return parts[0], parts[1]
return "", parts[0]
def _ceil_price(v):
try:
return int(math.ceil(float(v)))
except Exception:
return None
def _ceil_int(v):
try:
return int(math.ceil(float(v)))
except Exception:
return None
def build_variant(row: dict) -> dict:
category_name = row.get("categoryBreadcrumb") or ""
brand_name = "ikea"
visible = row.get("productSummary.visibleItemNo") or ""
sku = visible.replace(" ", "")
csm = (row.get("prductVariantColorMeasure") or "").strip()
color, size = _split_color_size(csm)
if not color and not size:
size = (row.get("pipPricePackage.measurementText") or "").strip()
cost = _ceil_price(row.get("buyModule.productPrice"))
url = row.get("url") or ""
name = row.get("originalName") or row.get("buyModule.productName") or ""
desc_html = row.get("productInformationSection.productDetailsProps_formatted_html") or ""
composition_html = row.get("productInformationSection.dimensionProps_formatted_html_translated") or ""
imgs = []
raw_imgs = row.get("productGallery.urls") or ""
if isinstance(raw_imgs, str):
imgs = [x for x in raw_imgs.split("\n") if x.strip()]
in_stock = bool(row.get("availabilityGroup.serverOnlineSellable"))
if not in_stock:
in_stock = bool(row.get("buyModule.onlineSellable"))
weight_kg = _ceil_int(row.get("total brutto"))
variant = {
"status_id": 1,
"color": color.capitalize() if color else "none",
"sku": sku,
"size": size,
"cost": cost,
"originalUrl": url,
"originalName": name,
"originalDescription": desc_html,
"originalComposition": composition_html,
"images": imgs,
"inStock": in_stock,
"weight": weight_kg if weight_kg is not None else 0,
}
return {
# Temporary, as requested:
"category": {"name": "TEST/IKEA"},
"brand": {"name": "ikea"},
"variant": variant,
}
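# flush_batch() below wraps these variants into the payload that post_payload() sends,
# roughly (rows_passing_filters is a hypothetical name for rows that survive the
# price/weight/material filters in main()):
#   payload = {"parserName": "ikea", "items": [build_variant(r) for r in rows_passing_filters]}
#   post_payload(payload)  # JSON body, optional Bearer token from IKEA_POST_API_KEY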
def post_payload(payload: dict) -> dict:
headers = {"Content-Type": "application/json"}
if POST_API_KEY:
headers["Authorization"] = f"Bearer {POST_API_KEY}"
body = json.dumps(payload, ensure_ascii=False)
_post_log(f"→ POST {POST_URL}\nHeaders: {headers}\nBody: {body}")
try:
r = SESSION.post(POST_URL, headers=headers, data=body.encode("utf-8"), timeout=POST_TIMEOUT)
text = r.text
_post_log(f"{r.status_code}\n{text}\n{'-'*60}")
ok = 200 <= r.status_code < 300
return {"ok": ok, "status": r.status_code, "response": text}
except Exception as e:
_post_log(f"× ERROR: {e}\n{'-'*60}")
return {"ok": False, "status": None, "error": str(e)}
# ───────────────────────── SCRIPT CORE ─────────────────────────
def safe_cell(val):
if isinstance(val, (dict, list)):
return json.dumps(val, ensure_ascii=False)
return "" if val is None else val
def _clean_url(u: str) -> str:
if not isinstance(u, str):
return ""
u = u.strip().replace("\t", " ")
u = u.replace("\ufeff", "").replace("\xa0", "")
u = u.strip("\r\n ")
return u
def main():
SAVE_JSON = ask_bool("SAVE_JSON (save JSON to disk?)", "1")
SEND_JSON = ask_bool("SEND_JSON (send to the API?)", "1")
# read links (utf-8-sig handles a possible BOM)
with open(INPUT_FILE, "r", encoding="utf-8-sig", newline="") as f:
raw_lines = f.readlines()
links = [_clean_url(x) for x in raw_lines if _clean_url(x)]
print(f"Всего ссылок: {len(links)}")
# готовим Excel
wb = Workbook()
ws = wb.active
ws.title = "IKEA Products"
ws.append(KEEP_COLUMNS)
# batch for JSON/API
batch_items = []
batch_index = 1
STATUS_COUNTER = Counter()
def flush_batch():
nonlocal batch_items, batch_index
if not batch_items:
return
payload = {"parserName": "ikea", "items": batch_items}
if SAVE_JSON:
_save_json_batch(payload, batch_index)
if SEND_JSON:
res = post_payload(payload)
ok = res.get("ok")
print(f"POST batch {batch_index}: {'OK' if ok else 'FAIL'} (status={res.get('status')})")
batch_index += 1
batch_items = []
for idx, link in enumerate(links, 1):
print(f"[{idx}/{len(links)}] {link}")
row = extract_data(link)
# track HTTP statuses
st = row.get("http_status")
if st is None and "error" in row:
STATUS_COUNTER["err"] += 1
else:
STATUS_COUNTER[str(st or 200)] += 1
# write to Excel
ws.append([safe_cell(row.get(col, "")) for col in KEEP_COLUMNS])
# FILTERS for JSON/API
try:
price = float(row.get("buyModule.productPrice") or 0)
except Exception:
price = 0.0
try:
total_kg = float(row.get("total brutto") or 0)
except Exception:
total_kg = 0.0
details_json = row.get("productInformationSection.productDetailsProps") or {}
if not (20 <= price <= 1500):
pass
elif total_kg > 30:
pass
elif materials_match_exclusions(details_json, EXCLUSIONS):
pass
else:
try:
item = build_variant(row)
batch_items.append(item)
except Exception as e:
_post_log(f"× build_variant error for {link}: {e}")
# auto-save the Excel file every 50 rows
if idx % 50 == 0:
wb.save(OUTPUT_FILE)
print(f"💾 autosave: {OUTPUT_FILE}")
# flush the batch when the limit is reached
if len(batch_items) >= BATCH_SIZE:
flush_batch()
# finish up
wb.save(OUTPUT_FILE)
print(f"\n✅ Excel ready: {OUTPUT_FILE}")
flush_batch()
# HTTP summary
print("HTTP stats:", dict(STATUS_COUNTER))
print("🎯 Done.")
if __name__ == "__main__":
main()