MacOS_Parsers/Parser_NEXT/fetcher.py
2025-08-25 14:46:19 +03:00

669 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import os
import json
import re
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
from playwright.async_api import async_playwright
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class FetchError(Exception):
    """Raised when loading or scraping a page fails; retried by tenacity."""
# ---- Price parsing helpers ----
_PLN_PRICE_RE = re.compile(
r'(?<!\d)(\d{1,3}(?:[ \u00A0]?\d{3})*(?:[.,]\d{2})?)(?:\s*(?:zł|PLN))',
re.IGNORECASE,
)
def parse_pln_price_to_float(price_text: str | None) -> float | None:
if not price_text:
return None
t = (
price_text.replace("\u00a0", " ")
.replace("\u2009", " ")
.strip()
)
m = _PLN_PRICE_RE.search(t)
if not m:
return None
num = m.group(1)
num = num.replace(" ", "").replace("\u00a0", "").replace("\u2009", "")
num = num.replace(",", ".")
try:
return float(num)
except Exception:
return None
class Fetcher:
    """
    Playwright layer plus scraping tools:
      - Lightweight rendering (image/font/media requests are aborted, CSS kept).
      - PLP (listing page): scroll until the advertised item total is loaded,
        then collect products from both SSR payloads and the live DOM.
      - PDP (detail page): enrich items with color/description.
      - HTML/PNG debug dumps into out/raw_html.

    Use as an async context manager: ``async with Fetcher(cfg) as f: ...``.
    """

    def __init__(self, cfg: Dict[str, Any]):
        # Raw config dict; keys read by the methods below: headless, locale,
        # timezoneId, nav_timeout_ms, wait_timeout_ms, scroll.*, pdp.*,
        # xhr_patterns, base_url.
        self.cfg = cfg
        self.base_url = cfg.get("base_url")
        # Compiled URL patterns deciding which XHR responses to capture.
        self.xhr_patterns = [re.compile(p) for p in cfg.get("xhr_patterns", [])]
        # Raw Playwright Response objects captured by _on_response; their JSON
        # bodies are decoded lazily in extract_xhr_json().
        self.collected_xhr: List[Dict[str, Any]] = []

    async def __aenter__(self) -> "Fetcher":
        """Start Playwright, launch Chromium, and prepare one routed page."""
        self.playwright = await async_playwright().start()
        args = ["--disable-dev-shm-usage", "--no-sandbox"]
        self.browser = await self.playwright.chromium.launch(
            headless=self.cfg.get("headless", True),
            args=args,
            # Auto-open devtools only when running headed (debugging aid).
            devtools=not self.cfg.get("headless", True),
        )
        self.context = await self.browser.new_context(
            locale=self.cfg.get("locale", "en-GB"),
            timezone_id=self.cfg.get("timezoneId", "Europe/Warsaw"),
            # Fixed desktop Chrome UA to look like a regular browser session.
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1366, "height": 900},
        )
        self.page = await self.context.new_page()
        # Context-wide routing: every request (this page and any new pages)
        # goes through _route, which blocks heavy resource types.
        await self.context.route("**/*", self._route)
        self.page.on("response", self._on_response)
        self.page.on("console", lambda msg: logging.debug(f"[page.console] {msg.type} {msg.text}"))
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Tear down context, browser and the Playwright driver in order."""
        await self.context.close()
        await self.browser.close()
        await self.playwright.stop()

    async def _route(self, route, request):
        # Abort heavy resources to speed up rendering; CSS/JS/XHR continue.
        if request.resource_type in ["font", "media", "image"]:
            return await route.abort()
        return await route.continue_()

    def _on_response(self, response) -> None:
        """Capture JSON XHR responses whose URL matches a configured pattern."""
        try:
            url = response.url
            if any(p.search(url) for p in self.xhr_patterns):
                if "application/json" in (response.headers.get("content-type", "")):
                    # NOTE(review): the Response object is stored and its body
                    # read later in extract_xhr_json(); confirm bodies are still
                    # retrievable after subsequent navigations.
                    self.collected_xhr.append({"url": url, "response": response})
        except Exception:
            # Best-effort listener — never let a capture error break the page.
            pass

    async def _dump_debug(self, tag: str) -> None:
        """Write the current HTML and a full-page screenshot to out/raw_html.

        Purely diagnostic; every failure is logged and swallowed.
        """
        try:
            raw_dir = Path("out/raw_html").resolve()
            raw_dir.mkdir(parents=True, exist_ok=True)
            # Microsecond-resolution UTC timestamp keeps dump names unique.
            # NOTE(review): datetime.utcnow() is deprecated in Python 3.12+;
            # consider datetime.now(timezone.utc) when touching this.
            ts = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
            html_path = raw_dir / f"{ts}_{tag}.html"
            png_path = raw_dir / f"{ts}_{tag}.png"
            logging.info(f"[dump_debug] CWD={Path(os.getcwd()).resolve()} → html={html_path} png={png_path}")
            try:
                html = await self.page.content()
                html_path.write_text(html, encoding="utf-8")
            except Exception as e:
                logging.warning(f"[dump_debug] writing HTML failed: {e}")
            try:
                await self.page.screenshot(path=str(png_path), full_page=True)
            except Exception as e:
                logging.warning(f"[dump_debug] screenshot failed: {e}")
            logging.info(f"[dump_debug] saved OK: {html_path.name}, {png_path.name}")
        except Exception as e:
            logging.warning(f"[dump_debug] general fail: {e}")

    async def _accept_cookies_if_any(self) -> None:
        """Click the first visible cookie-consent button, if any appears."""
        selectors = [
            "#onetrust-accept-btn-handler",
            "button#onetrust-accept-btn-handler",
            'button:has-text("Accept all")',
            'button:has-text("Accept All")',
            'button[aria-label*="Accept"]',
        ]
        for sel in selectors:
            try:
                el = self.page.locator(sel)
                if await el.count() > 0:
                    await el.first.click(timeout=2000)
                    logging.info("Cookie banner accepted.")
                    break
            except Exception:
                # Selector not present / not clickable — try the next one.
                pass

    async def _log_plp_state(self, stage: str) -> None:
        """Log PLP readiness: SSR script tag count and window-state presence."""
        try:
            scripts_count = await self.page.locator('script[id^="next-product-summary-script-"]').count()
        except Exception:
            # -1 marks "count unavailable" in the log line below.
            scripts_count = -1
        try:
            has_window = await self.page.evaluate("""() => {
                const ps = globalThis?.ssrClientSettings?.productSummary;
                return !!(ps && Array.isArray(ps.itemNumbers) && ps.itemNumbers.length > 0);
            }""")
        except Exception:
            has_window = False
        logging.info(f"[{stage}] scripts: {scripts_count}, window.ps: {has_window}")

    @retry(stop=stop_after_attempt(3),
           wait=wait_exponential(multiplier=1, min=1, max=8),
           retry=retry_if_exception_type(FetchError))
    async def load_category(self, url: str) -> bool:
        """Navigate to a category (PLP) URL and wait for SSR product scripts.

        Retries up to 3 times with exponential backoff on FetchError.
        Returns True on success; raises FetchError after the retries give up.
        """
        try:
            await self.page.goto(url, timeout=self.cfg.get("nav_timeout_ms", 60000), wait_until="domcontentloaded")
            await self._dump_debug("after_goto")
            await self._accept_cookies_if_any()
            await self._dump_debug("after_cookies")
            await self._log_plp_state("after_accept")
            # A few wheel scrolls to trigger lazy-loaded content ("warmup").
            for _ in range(3):
                await self.page.mouse.wheel(0, 1600)
                await self.page.wait_for_timeout(300)
            await self._dump_debug("after_warmup")
            await self._log_plp_state("after_warmup")
            # Hard requirement: at least one SSR product-summary script tag.
            await self.page.wait_for_selector('script[id^="next-product-summary-script-"]',
                                              state="attached",
                                              timeout=self.cfg.get("wait_timeout_ms", 30000))
            await self._dump_debug("after_scripts_present")
            # Soft requirement: hydrated window state; absence is tolerated
            # because read_ssr_product_summaries() falls back to script tags.
            try:
                await self.page.wait_for_function(
                    """() => {
                        const ps = globalThis?.ssrClientSettings?.productSummary;
                        return !!(ps && Array.isArray(ps.itemNumbers) && ps.itemNumbers.length > 0);
                    }""",
                    timeout=5000,
                )
            except Exception:
                logging.info("window.ssrClientSettings not ready (non-fatal).")
            await self._dump_debug("after_window_check")
            return True
        except Exception as e:
            logging.error(f"load_category failed: {e}")
            await self._dump_debug("fail_load_category")
            # Re-raise as FetchError so the tenacity decorator retries.
            raise FetchError(str(e))

    async def read_total_from_header(self) -> Optional[int]:
        """Read the total product count from the PLP header badge, if present."""
        sels = ["#plp-seo-heading .esi-count", ".esi-count"]
        for sel in sels:
            try:
                el = self.page.locator(sel)
                if await el.count() > 0:
                    txt = await el.first.inner_text(timeout=1500)
                    # Keep digits only, e.g. "(123 items)" -> 123.
                    digits = "".join(ch for ch in txt if ch.isdigit())
                    if digits:
                        total = int(digits)
                        logging.info(f"Total from header: {total}")
                        return total
            except Exception:
                continue
        logging.info("Total from header: not found")
        return None

    async def auto_scroll_until_total(self, hard_max_scrolls: Optional[int] = None) -> None:
        """Scroll the PLP until the header total is reached or growth stalls.

        Stops on: target count reached, `stop_if_no_new_items_after` ticks with
        no new tiles, or the hard scroll cap.
        """
        hard_cap = hard_max_scrolls or self.cfg.get("scroll", {}).get("hard_max_scrolls", 2000)
        netidle_ms = self.cfg.get("scroll", {}).get("wait_networkidle_timeout_ms", 8000)
        # Union of tile selectors seen across site layouts.
        sel_tiles = '[data-testid="plp-product-grid-item"], [data-testid="product-tile"], .ProductCard, [data-qa="plp-product"]'
        target = await self.read_total_from_header()
        last = 0
        same_ticks = 0
        same_limit = self.cfg.get("scroll", {}).get("stop_if_no_new_items_after", 8)
        for i in range(hard_cap):
            try:
                await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            except Exception:
                pass
            try:
                await self.page.wait_for_load_state("networkidle", timeout=netidle_ms)
            except Exception:
                # networkidle may never fire on chatty pages; short fixed wait.
                await asyncio.sleep(0.25)
            # Small up-then-down jiggle to re-trigger lazy-load observers.
            try:
                await self.page.mouse.wheel(0, -200); await asyncio.sleep(0.1)
                await self.page.mouse.wheel(0, 1200)
            except Exception:
                pass
            try:
                seen = await self.page.locator(sel_tiles).count()
            except Exception:
                seen = last
            if target and seen >= target:
                logging.info(f"Reached target: seen {seen}/{target} (i={i})")
                break
            if seen <= last:
                same_ticks += 1
                if same_ticks >= same_limit:
                    logging.info(f"No growth for a while: seen={seen}, i={i}")
                    break
            else:
                same_ticks = 0
                last = seen
        logging.info(f"Final seen items: {last} (target={target}, cap={hard_cap})")

    async def current_html(self) -> str:
        """Return the page's current full HTML content."""
        return await self.page.content()

    async def extract_xhr_json(self) -> List[Dict[str, Any]]:
        """Decode the JSON bodies of all captured XHR responses."""
        results = []
        for entry in self.collected_xhr:
            try:
                body = await entry["response"].json()
                results.append({"url": entry["url"], "json": body})
            except Exception:
                # Body unavailable or not valid JSON — skip the entry.
                pass
        return results

    # ---------- PLP: SSR + DOM ----------
    async def read_ssr_product_summaries(self) -> List[Dict[str, Any]]:
        """Collect SSR product summaries, preferring hydrated window state.

        Strategy 1 reads ``globalThis.ssrClientSettings.productSummary`` in the
        page; strategy 2 falls back to parsing the inline
        ``next-product-summary-script-*`` <script> tags with a brace matcher.
        Returns a list of normalized summary dicts.
        """
        # 1) window.*
        js_window = """
        () => {
            const out = [];
            const ps = globalThis?.ssrClientSettings?.productSummary;
            if (!ps) return out;
            const ids = Array.isArray(ps.itemNumbers) ? ps.itemNumbers : [];
            for (const id of ids) {
                const obj = ps[id];
                if (!obj) continue;
                const sd = obj?._STATE_?.productSummary?.summaryData;
                if (!sd) continue;
                const cw = Array.isArray(sd.colourways) && sd.colourways.length ? sd.colourways[0] : null;
                out.push({
                    id: sd.id || null,
                    title: sd.title || null,
                    baseUrl: sd.baseUrl || null,
                    brand: sd.brand || null,
                    category: sd.category || null,
                    currencyCode: sd.currencyCode || null,
                    colourway: cw ? {
                        id: cw.id ?? null,
                        url: cw.url ?? null,
                        color: cw.c ?? null,
                        title: cw.t ?? null,
                        price: cw.p ?? null,
                        priceMarket: cw.mp ?? null,
                        selected: !!cw.s
                    } : null,
                    imageCdnUrl: sd.imageCdnUrl || null,
                    productImageUrlPart: sd.productImageUrlPart || null,
                    lgImagePath: sd.lgImagePath || null
                });
            }
            return out;
        }
        """
        try:
            w = await self.page.evaluate(js_window)
            if isinstance(w, list) and w:
                logging.info(f"SSR(window) summaries: {len(w)}")
                return w
        except Exception:
            pass
        # 2) inline <script>
        js_scripts = """
        () => {
            const list = Array.from(document.querySelectorAll('script[id^="next-product-summary-script-"]'));
            return list.map(s => s.textContent || "");
        }
        """
        try:
            texts = await self.page.evaluate(js_scripts)
        except Exception:
            return []
        out: List[Dict[str, Any]] = []
        # Finds assignments of the form productSummary['ID'] = { ... }.
        assign_re = re.compile(r'productSummary\s*\[\s*([\'"])(.*?)\1\s*\]\s*=\s*\{')
        for t in texts or []:
            for m in assign_re.finditer(t):
                # Walk forward from the opening brace, tracking nesting depth,
                # to find the matching closing brace of the object literal.
                # NOTE(review): this ignores braces inside JSON string values;
                # assumes summary payloads contain no '{'/'}' in strings —
                # confirm against real script content.
                start = m.end() - 1
                depth = 0
                end = None
                for i in range(start, len(t)):
                    ch = t[i]
                    if ch == "{":
                        depth += 1
                    elif ch == "}":
                        depth -= 1
                        if depth == 0:
                            end = i + 1
                            break
                if end is None:
                    continue
                block = t[start:end]
                try:
                    data = json.loads(block)
                    sd = data.get("_STATE_", {}).get("productSummary", {}).get("summaryData", {})
                    cws = sd.get("colourways") or []
                    cw = cws[0] if cws else None
                    out.append({
                        "id": sd.get("id"),
                        "title": sd.get("title"),
                        "baseUrl": sd.get("baseUrl"),
                        "brand": sd.get("brand"),
                        "category": sd.get("category"),
                        "currencyCode": sd.get("currencyCode"),
                        "colourway": ({
                            "id": cw.get("id"),
                            "url": cw.get("url"),
                            "color": cw.get("c"),
                            "title": cw.get("t"),
                            "price": cw.get("p"),
                            "priceMarket": cw.get("mp"),
                            "selected": bool(cw.get("s")),
                        } if cw else None),
                        "imageCdnUrl": sd.get("imageCdnUrl"),
                        "productImageUrlPart": sd.get("productImageUrlPart"),
                        "lgImagePath": sd.get("lgImagePath"),
                    })
                except Exception:
                    # Not valid JSON (e.g. JS expression) — skip this block.
                    continue
        logging.info(f"SSR(scripts) summaries: {len(out)}")
        return out

    async def read_dom_products(self) -> List[Dict[str, Any]]:
        """Scrape product cards straight from the rendered DOM.

        Returns deduplicated dicts with id/title/url/price_text/currency/color.
        NOTE(review): the ``t.includes('')`` checks in the JS below look like a
        'zł' literal lost to encoding (cf. the zł|PLN price regex at module
        level); ``includes('')`` is always true, so any digit-bearing text
        passes — confirm against the original file.
        """
        js = r"""
        () => {
            const out = [];
            const gridItems = document.querySelectorAll('[data-testid="plp-product-grid-item"], .ProductCard, [data-qa="plp-product"]');
            const getPid = (container) => {
                const entry = container.querySelector('[id^="plp-product-summary-entrypoint-"]');
                if (entry && entry.getAttribute('data-pid')) return entry.getAttribute('data-pid');
                const tile = container.closest('[id^="plp-product-summary-tile-"]') || container.querySelector('[id^="plp-product-summary-tile-"]');
                if (tile) {
                    const m = (tile.id || '').match(/plp-product-summary-tile-([A-Za-z0-9]+)/);
                    if (m) return m[1];
                }
                const a = container.querySelector('a[href*="/style/"], a[data-testid^="product_summary_tile_"], a[href*="/p/"]');
                if (a) {
                    const href = a.getAttribute('href') || '';
                    const m2 = href.match(/([A-Z]\d{4,})/i);
                    if (m2) return m2[1].toUpperCase();
                }
                return null;
            };
            const getAbsUrl = (href) => {
                try {
                    if (!href) return null;
                    if (/^https?:\/\//i.test(href)) return href;
                    const a = document.createElement('a');
                    a.href = href;
                    return a.href;
                } catch { return href || null; }
            };
            const getTitle = (container) => {
                const t1 = container.querySelector('[data-testid="product_summary_title"]');
                if (t1) return (t1.getAttribute('data-label') || t1.textContent || '').trim();
                const t2 = container.querySelector('[data-testid="product-name"], .productName, [itemprop="name"]');
                if (t2) return (t2.textContent || '').trim();
                return null;
            };
            const getPriceText = (container) => {
                const roots = [
                    container.querySelector('[data-testid="price"]'),
                    container.querySelector('[data-testid="ProductCard-Price"]'),
                    container.querySelector('[itemprop="price"]'),
                    container.querySelector('[data-testid^="product_summary_price"]'),
                    container.querySelector('[aria-label*="price" i]'),
                    container
                ].filter(Boolean);
                for (const root of roots) {
                    const spans = root.querySelectorAll('span, div');
                    for (const el of spans) {
                        const t = (el.textContent || '').trim();
                        if (!t) continue;
                        if (/\d/.test(t) && (t.includes('') || /PLN/i.test(t))) return t;
                    }
                }
                return null;
            };
            const getColor = (container) => {
                const s1 = container.querySelector('[data-testid="product_summary_colour"], [data-testid="product_summary_color"]');
                if (s1) return (s1.textContent || '').trim() || null;
                // иногда цвет в title: "Natural Rib Soap Dispenser" — берём первое слово, если оно выглядит как цвет (эвристика)
                const t = getTitle(container);
                if (t && t.split(' ').length > 1) {
                    const first = t.split(' ')[0];
                    if (first.length > 2 && /^[A-Za-z-]+$/.test(first)) return first; // простая эвристика
                }
                return null;
            };
            gridItems.forEach(container => {
                const link = container.querySelector('a[href*="/style/"], a[data-testid^="product_summary_tile_"], a[href*="/p/"]');
                const href = link ? link.getAttribute('href') : null;
                const rec = {
                    id: getPid(container),
                    title: getTitle(container),
                    url: getAbsUrl(href),
                    price_text: getPriceText(container),
                    currency: null,
                    color: getColor(container)
                };
                if (rec.price_text && (rec.price_text.includes('') || /PLN/i.test(rec.price_text))) rec.currency = 'PLN';
                if (rec.url || rec.title) out.push(rec);
            });
            const seen = new Set(); const uniq = [];
            for (const d of out) {
                const key = `${d.id || ''}|${d.url || ''}`;
                if (seen.has(key)) continue;
                seen.add(key);
                uniq.push(d);
            }
            return uniq;
        }
        """
        try:
            data = await self.page.evaluate(js)
            logging.info(f"DOM cards parsed: {len(data)}")
            return data
        except Exception as e:
            logging.warning(f"read_dom_products failed: {e}")
            return []

    async def collect_products(self) -> List[Dict[str, Any]]:
        """Merge DOM-scraped cards with SSR summaries into normalized records.

        DOM records form the base, keyed by ``id|url``; SSR data fills gaps
        (title, price, currency, color) or adds missing products.  Description
        is left None — it is filled later by enrich_with_pdp_details().
        """
        ssr = await self.read_ssr_product_summaries() or []
        dom = await self.read_dom_products() or []
        bykey: Dict[str, Dict[str, Any]] = {}
        # Merge key: product id + absolute URL (either may be empty).
        def key(d: Dict[str, Any]) -> str: return f"{(d.get('id') or '')}|{(d.get('url') or '')}"
        # DOM as the base
        for d in dom:
            bykey[key(d)] = {
                "id": d.get("id"),
                "title": d.get("title"),
                "url": d.get("url"),
                "price_text": d.get("price_text"),
                "currency": d.get("currency"),
                "color": d.get("color"),
            }
        # Enrich with SSR
        for s in ssr:
            cw = (s.get("colourway") or {})
            url = None
            try:
                # Rebuild the absolute colourway URL from base + relative part.
                base = (s.get("baseUrl") or "").rstrip("/")
                rel = (cw.get("url") or "").lstrip("/")
                url = f"{base}/{rel}" if (base and rel) else None
            except Exception:
                pass
            cand = {"id": s.get("id"), "url": url}
            k = key(cand)
            rec = bykey.get(k)
            if rec is None:
                # SSR-only product not seen in the DOM scrape.
                # NOTE(review): cw['p'] may be numeric, while
                # parse_pln_price_to_float below expects a string — confirm.
                bykey[k] = {
                    "id": s.get("id"),
                    "title": s.get("title"),
                    "url": url,
                    "price_text": cw.get("price"),
                    "currency": s.get("currencyCode"),
                    "color": cw.get("color"),  # color from SSR
                }
            else:
                # Fill only the fields the DOM scrape left empty.
                if not rec.get("title") and s.get("title"):
                    rec["title"] = s["title"]
                if not rec.get("price_text") and cw.get("price"):
                    rec["price_text"] = cw["price"]
                if not rec.get("currency") and s.get("currencyCode"):
                    rec["currency"] = s["currencyCode"]
                if not rec.get("color") and cw.get("color"):
                    rec["color"] = cw["color"]
        # Normalization (no description yet — it is added on the PDP pass)
        out: List[Dict[str, Any]] = []
        for v in bykey.values():
            price_val = parse_pln_price_to_float(v.get("price_text"))
            currency = v.get("currency")
            # NOTE(review): find("") always returns 0, so this condition is
            # effectively `not currency`; the "" literal looks like a lost
            # 'zł' character — confirm against the original file.
            if not currency and (v.get("price_text") or "").lower().find("") != -1:
                currency = "PLN"
            out.append({
                "id": v.get("id"),
                "title": v.get("title"),
                "url": v.get("url"),
                "price": price_val,
                "currency": (currency or "PLN").upper(),
                "color": v.get("color"),
                "description": None,
            })
        logging.info(f"Total collected (SSR+DOM): {len(out)}")
        return out

    # ---------- PDP enrichment ----------
    async def _parse_pdp_page(self, page, url: str) -> Dict[str, Optional[str]]:
        """Open one PDP on the given page and extract description and color.

        Never raises: any failure yields ``{"description": None, "color": None}``.
        """
        try:
            await page.goto(url, timeout=self.cfg.get("pdp", {}).get("nav_timeout_ms", 45000),
                            wait_until="domcontentloaded")
            # The cookie banner rarely reappears here, but try anyway.
            try:
                el = page.locator('#onetrust-accept-btn-handler')
                if await el.count() > 0:
                    await el.first.click(timeout=1500)
            except Exception:
                pass
            # Wait so client-side hydration has time to finish.
            try:
                await page.wait_for_load_state("networkidle", timeout=self.cfg.get("pdp", {}).get("wait_timeout_ms", 15000))
            except Exception:
                pass
            js = """
            () => {
                const pickText = (sels) => {
                    for (const s of sels) {
                        const el = document.querySelector(s);
                        if (el) {
                            const t = (el.innerText || el.textContent || "").trim();
                            if (t) return t;
                        }
                    }
                    return null;
                };
                let desc = pickText([
                    '[data-testid="product-description"]',
                    '[data-testid="pdp-description"]',
                    '[data-testid="ProductDetail-Description"]',
                    '#product-description',
                    '[itemprop="description"]',
                    '.productDescription'
                ]);
                let color = pickText([
                    '[data-testid="selectedColourName"]',
                    '[data-testid="selected-colour-name"]',
                    '[data-testid="colour-name"]',
                    '.selectedColourName',
                    '.colourName',
                    'span[data-testid*="colour"]'
                ]);
                try {
                    const g = globalThis;
                    const pd = g?.ssrClientSettings?.productDetails || g?.ssrClientSettings?.productDetail || null;
                    const st = pd?._STATE_?.productDetails || pd?._STATE_?.productDetail || {};
                    if (!desc) desc = st?.details?.description || st?.description || null;
                    if (!color) color = st?.selectedColourway?.name || st?.selectedColourway?.colour || st?.colourway?.name || null;
                } catch (e) {}
                return { desc, color };
            }
            """
            data = await page.evaluate(js)
            return {
                "description": (data or {}).get("desc"),
                "color": (data or {}).get("color"),
            }
        except Exception:
            return {"description": None, "color": None}

    async def enrich_with_pdp_details(self, items: List[Dict[str, Any]], max_concurrency: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        For every item with a URL, visit its PDP and pull the description and
        (if missing) the color.  Mutates and returns ``items``.

        Concurrency is bounded by ``max_concurrency`` or cfg pdp.max_concurrency
        (default 3); each fetch runs in its own page of the shared context.
        """
        # Deduplicate URLs while preserving first-seen order.
        urls = []
        for it in items:
            u = it.get("url")
            if u and u not in urls:
                urls.append(u)
        sem = asyncio.Semaphore(max_concurrency or self.cfg.get("pdp", {}).get("max_concurrency", 3))
        async def worker(u: str) -> tuple[str, Dict[str, Optional[str]]]:
            async with sem:
                page = await self.context.new_page()
                # Same resource blocking as on the PLP.
                # NOTE(review): _route is already registered context-wide in
                # __aenter__; re-registering per worker looks redundant — confirm.
                await self.context.route("**/*", self._route)
                try:
                    res = await self._parse_pdp_page(page, u)
                finally:
                    try:
                        await page.close()
                    except Exception:
                        pass
                return u, res
        tasks = [worker(u) for u in urls]
        results = {}
        for fut in asyncio.as_completed(tasks):
            u, res = await fut
            results[u] = res
        # Update the items in place, filling only the empty fields.
        for it in items:
            u = it.get("url")
            if not u:
                continue
            det = results.get(u) or {}
            if not it.get("description") and det.get("description"):
                it["description"] = det["description"]
            if not it.get("color") and det.get("color"):
                it["color"] = det["color"]
        return items