MacOS_Parsers/Парсер_IKEA/main_win proxy_all-1.0.py
2025-10-07 14:17:12 +03:00

912 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ikea_pipeline.py — Фаза 1 (API → flattened) + Фаза 2 (PIP → records)
# v1.0
import os, json, re, math, time, html, requests, datetime, pathlib
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from openpyxl import Workbook, load_workbook
# ───────────────────────── ПУТИ / ПАПКИ ───────────────────────────
BASE_DIR = pathlib.Path(__file__).resolve().parent
RECORDS_DIR = BASE_DIR / "records_folder"
JSON_DIR = BASE_DIR / "json_raw"
RECORDS_DIR.mkdir(exist_ok=True)
JSON_DIR.mkdir(exist_ok=True)
# Файлы ввода/вывода
CAT_FILE = BASE_DIR / "leaf_categories.txt" # вход: список URL категорий
OUT_JSON = JSON_DIR / "flattened_products.json" # выход фазы 1 (json)
OUT_XLSX = JSON_DIR / "flattened_products.xlsx" # выход фазы 1 (xlsx)
OUTPUT_FILE = RECORDS_DIR / "records.xlsx" # выход фазы 2 (xlsx)
POST_LOG = RECORDS_DIR / "post_log.txt" # лог POST пакетов
DICT_FILE = BASE_DIR / "dictionary_main.txt"
EXCL_FILE = BASE_DIR / "exclusion_materials.txt"
# ───────────────────────── ПРОКСИ (общий) ────────────────────────
# Используется и для Фазы 1 (API POST), и для Фазы 2 (GET карточек).
PROXY_SCHEME = "http"
PROXY_USER = "vdE9MRLB"
PROXY_PASS = "YW9ZvHLU"
PROXY_HOST = "146.19.76.243"
PROXY_PORT = 63276
_AUTH = f"{PROXY_USER}:{PROXY_PASS}@" if PROXY_USER and PROXY_PASS else ""
PROXY_URL = f"{PROXY_SCHEME}://{_AUTH}{PROXY_HOST}:{PROXY_PORT}"
PROXIES_WEB = {"http": PROXY_URL, "https": PROXY_URL}
# ───────────────────────── НАСТРОЙКИ POST (Фаза 2) ───────────────
POST_URL = os.getenv("IKEA_POST_URL", "http://172.25.4.101:3005/parser/data")
POST_API_KEY = os.getenv("IKEA_POST_API_KEY", "")
POST_TIMEOUT = 20
BATCH_SIZE = 50
# ───────────────────────── НАСТРОЙКИ IKEA API (Фаза 1) ───────────
SEARCH_URL = "https://sik.search.blue.cdtapps.com/pl/pl/search?c=listaf&v=20250507"
API_HEADERS = {
"User-Agent": "Mozilla/5.0",
"Content-Type": "application/json",
}
REQUEST_TIMEOUT = 30
# ───────────────────────── НАСТРОЙКИ PIP (Фаза 2) ────────────────
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126.0.0.0 Safari/537.36",
"Accept-Language": "pl-PL,pl;q=0.9,en;q=0.8,ru;q=0.7",
}
CSS_SELECTOR = ".pip-product__subgrid.product-pip.js-product-pip"
REQUEST_TIMEOUT_GET = 20
BLOCKS = [
"buyModule",
"productSummary",
"pipPricePackage",
"productInformationSection",
"keyFacts",
"stockcheckSection",
"availabilityGroup",
"productGallery",
]
# Столбцы для Excel (Фаза 2, карточка) + мы добавим flat.* (Фаза 1)
KEEP_COLUMNS = [
"availabilityGroup.serverOnlineSellable",
"availabilityGroup.storeHeader",
"buyModule.onlineSellable",
"buyModule.productName",
"buyModule.productPrice",
"buyModule.productType",
"keyFacts.ariaLabels",
"keyFacts.gaLabel",
"keyFacts.keyFacts",
"keyFacts.keyFacts_formatted",
"pipPricePackage.measurementText",
"pipPricePackage.productDescription",
"productGallery.urls",
"productInformationSection.dimensionProps",
"productInformationSection.dimensionProps_formatted",
"productInformationSection.dimensionProps_formatted_html_translated",
"productInformationSection.productDetailsProps",
"productInformationSection.productDetailsProps_formatted",
"productInformationSection.productDetailsProps_formatted_html",
"productInformationSection.dimensionsOnly_formatted_html_translated",
"productSummary.description",
"productSummary.visibleItemNo",
"stockcheckSection.packagingProps",
"stockcheckSection.typeName",
"total brutto",
"prductVariantColorMeasure",
"categoryBreadcrumb",
"originalName",
"url",
]
# Доп. столбцы из фазы 1, которые вливаем в итоговый records.xlsx
FLAT_EXTRA_COLS = [
"flat.id",
"flat.price",
"flat.availability_0_status",
"flat.availability_1_status",
"flat.availability_1_store",
"flat.category_path",
]
# ───────────────────────── УТИЛИТЫ ───────────────────────────────
def _now_tag():
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def ask_bool(prompt: str, default: str = "1") -> bool:
try:
val = input(f"{prompt} (1=yes, 0=no) [{default}]: ").strip() or default
except EOFError:
val = default
return val == "1"
def _post_log(msg: str):
try:
with open(POST_LOG, "a", encoding="utf-8") as f:
f.write(msg.rstrip() + "\n")
except Exception:
pass
def log(msg: str):
ts = datetime.datetime.now().strftime("[%Y-%m-%d %H:%M:%S] ")
print(ts + msg)
# ───────────────────────── СЛОВАРИ / ФИЛЬТРЫ (Фаза 2) ────────────
def load_dictionary(path: pathlib.Path) -> dict:
if not path.exists():
return {}
txt = path.read_text(encoding="utf-8")
pairs = re.findall(r'"([^"]+)"\s*:\s*"([^"]+)"', txt)
return {k: v for k, v in pairs}
DICT = load_dictionary(DICT_FILE)
def translate_token(token: str) -> str:
return DICT.get(token, token)
def load_exclusions(path: pathlib.Path) -> set:
if not path.exists():
return set()
txt = path.read_text(encoding="utf-8")
quoted = re.findall(r'"([^"]+)"', txt, flags=re.S)
tokens = quoted if quoted else re.split(r"[,;\n\r]+", txt)
return {t.strip().lower() for t in tokens if t.strip()}
EXCLUSIONS = load_exclusions(EXCL_FILE)
def materials_from_details_json(details: dict) -> list[str]:
out = []
def walk(node):
if isinstance(node, dict):
for k, v in node.items():
if k == "material" and isinstance(v, str):
out.append(v)
else:
walk(v)
elif isinstance(node, list):
for x in node:
walk(x)
walk(details or {})
return out
def materials_match_exclusions(details: dict, exclusion_tokens: set) -> bool:
if not exclusion_tokens:
return False
mats = materials_from_details_json(details)
joined = "\n".join(mats).lower()
return any(tok in joined for tok in exclusion_tokens)
# ───────────────────────── ФОРМАТТЕРЫ (Фаза 2) ───────────────────
def _parse_json_value(val):
if isinstance(val, (dict, list)) or val is None:
return val
if isinstance(val, str):
s = val.strip()
if not s:
return val
try:
return json.loads(s)
except Exception:
return val
return val
def flatten_block(block_name, data):
if not isinstance(data, dict):
return {}
flat = {}
for k, v in data.items():
if block_name == "productGallery" and k == "mediaList":
if isinstance(v, list):
urls = []
for item in v:
content = item.get("content", {})
if isinstance(content, dict) and "url" in content:
urls.append(content["url"])
flat["productGallery.urls"] = "\n".join(urls)
return flat
key = f"{block_name}.{k}"
flat[key] = v
return flat
def format_keyfacts(raw_keyfacts):
if not isinstance(raw_keyfacts, list):
return ""
out = []
header_added = False
for el in raw_keyfacts:
lbl = (el or {}).get("label")
name = (el or {}).get("name", "Właściwości")
if not header_added:
out.append(name)
header_added = True
if lbl:
out.append(lbl)
return "\n".join(out)
def _fmt_float(x):
try:
return f"{float(x):.2f}".rstrip("0").rstrip(".")
except Exception:
return ""
def _collect_packaging_total_kg(packaging):
total = 0.0
if not isinstance(packaging, dict):
return total
content = (packaging.get("contentProps") or {}).get("packages") or []
for pkg in content:
qty = ((pkg.get("quantity") or {}).get("value")) or 1
ms = pkg.get("measurements") or []
for block in ms:
if not isinstance(block, list):
continue
weight_lbl = next((m for m in block if (m.get("type") == "weight" or m.get("label") == "Waga")), None)
if weight_lbl and isinstance(weight_lbl.get("value"), (int, float)):
total += float(weight_lbl["value"]) * (qty or 1)
return total
def format_dimensions(raw_dim_props, with_html=False, translated=False):
if not isinstance(raw_dim_props, dict):
return ""
lines = []
br = "<br/>" if with_html else "\n"
title = translate_token("Wymiary") if translated else "Wymiary"
lines.append(f"<b>{title}</b>" if with_html else title)
for d in raw_dim_props.get("dimensions", []):
name = d.get("name", "")
meas = d.get("measure", "")
if not name and not meas:
continue
name_t = translate_token(name) if translated else name
line = f"{name_t}: {meas}".strip()
lines.append(line)
pack = (raw_dim_props.get("packaging") or {})
pack_title = translate_token("Opakowanie") if translated else "Opakowanie"
lines.append(br if with_html else "")
lines.append(f"<b>{pack_title}</b>" if with_html else pack_title)
content = (pack.get("contentProps") or {}).get("packages") or []
for pkg in content:
name = pkg.get("name") or ""
if name:
lines.append(name)
art = (pkg.get("articleNumber") or {}).get("value")
if art:
art_lbl = "Numer artykułu"
if translated:
art_lbl = translate_token(art_lbl)
lines.append(art_lbl)
lines.append(f"{art}")
ms = pkg.get("measurements") or []
for block in ms:
if not isinstance(block, list):
continue
for m in block:
lbl = m.get("label", "")
txt = m.get("text", "")
if translated and lbl:
lbl = translate_token(lbl)
if lbl or txt:
lines.append(f"{lbl}: {txt}".strip(": "))
q_val = ((pkg.get("quantity") or {}).get("value"))
if q_val:
q_lbl = "Paczka(i)"
if translated:
q_lbl = translate_token(q_lbl)
lines.append(f"{q_lbl}: {q_val}")
if with_html:
s = br.join([x for x in lines if x is not None])
s = re.sub(r"(" + re.escape(br) + r"){2,}", br*2, s).strip(br)
if s.startswith("b>"): # защита для Excel-превью
s = "<" + s
return s
return "\n".join([x for x in lines if x is not None]).strip()
def format_product_details(raw_details, add_summary_desc="", with_html=False, skip_assembly=True):
if not isinstance(raw_details, dict):
return add_summary_desc if with_html else add_summary_desc
br = "<br/>" if with_html else "\n"
out = []
if add_summary_desc:
out.append(add_summary_desc)
out.append(br if with_html else "")
t1 = "Informacje o produkcie"
out.append(f"<b>{t1}</b>" if with_html else t1)
pd = (raw_details.get("productDescriptionProps") or {})
for p in (pd.get("paragraphs") or []):
out.append(p)
dlabel = pd.get("designerLabel")
dname = pd.get("designerName")
if dlabel and dname:
out.append(dlabel)
out.append(dname)
if raw_details.get("productId"):
out.append("Numer artykułu")
out.append(raw_details["productId"])
acc = (raw_details.get("accordionObject") or {})
gk = ((acc.get("goodToKnow") or {}).get("contentProps") or {}).get("goodToKnow") or []
if gk:
out.append(br if with_html else "")
t2 = "Dobrze wiedzieć"
out.append(f"<b>{t2}</b>" if with_html else t2)
for item in gk:
txt = item.get("text")
if txt:
out.append(txt)
mac = (acc.get("materialsAndCare") or {}).get("contentProps") or {}
mats = mac.get("materials") or []
care = mac.get("careInstructions") or []
t3 = "Materiały i pielęgnacja"
if mats or care:
out.append(br if with_html else "")
out.append(f"<b>{t3}</b>" if with_html else t3)
if mats:
out.append("Materiały")
for m in mats:
ptype = m.get("productType", "")
for mat in (m.get("materials") or []):
material = mat.get("material", "")
if ptype:
out.append(ptype)
if material:
out.append(material)
if care:
detailsCareText = mac.get("detailsCareText", "Pielęgnacja")
out.append(detailsCareText)
for c in care:
ptype = c.get("productType", "")
for t in (c.get("texts") or []):
if ptype:
out.append(ptype)
out.append(t)
safety = (raw_details.get("safetyAndCompliance") or {}).get("contentProps") or {}
sc = safety.get("safetyAndCompliance") or []
if sc:
out.append(br if with_html else "")
t4 = "Bezpieczeństwo i zgodność с przepisami"
out.append(f"<b>{t4}</b>" if with_html else t4)
for s in sc:
txt = s.get("text")
if txt:
out.append(txt)
if with_html:
s = br.join([x for x in out if x is not None])
s = re.sub(r"(" + re.escape(br) + r"){2,}", br*2, s).strip(br)
return s
return "\n".join([x for x in out if x is not None]).strip()
def build_variant_color_measure(desc: str, type_name: str, measurement: str) -> str:
s = (desc or "")
t = (type_name or "").strip()
if t:
pattern = r"^\s*" + re.escape(t) + r"[\s,;:\-–—/]*"
s = re.sub(pattern, "", s, flags=re.IGNORECASE)
if not re.search(r"[0-9A-Za-zА-Яа-яЁёÀ-ž]", s or ""):
s = ""
s = s.strip()
meas = (measurement or "").strip()
if not s:
return meas if meas else ""
s = s[:1].upper() + s[1:]
return f"{s}, {meas}" if meas else s
def format_dimensions_only(raw_dim_props, with_html=False, translated=False):
"""Только секция размеров (Wymiary) без упаковки (для originalComposition)."""
if not isinstance(raw_dim_props, dict):
return ""
lines = []
br = "<br/>" if with_html else "\n"
title = translate_token("Wymiary") if translated else "Wymiary"
lines.append(f"<b>{title}</b>" if with_html else title)
for d in raw_dim_props.get("dimensions", []):
name = d.get("name", "")
meas = d.get("measure", "")
if not name and not meas:
continue
name_t = translate_token(name) if translated else name
lines.append(f"{name_t}: {meas}".strip())
if with_html:
s = br.join([x for x in lines if x is not None])
s = re.sub(r"(" + re.escape(br) + r"){2,}", br*2, s).strip(br)
if s.startswith("b>"):
s = "<" + s
return s
return "\n".join([x for x in lines if x is not None]).strip()
# ───────────────────────── ФАЗА 1: IKEA API → FLATTENED ──────────
def fetch_category_json(category_id: str) -> dict:
"""POST к IKEA API, возврат JSON (через прокси)."""
payload = {
"searchParameters": {"input": category_id, "type": "CATEGORY"},
"zip": "05-090",
"store": "188",
"isUserLoggedIn": False,
"optimizely": {
"listing_3547_filter_hnf_sticky": None,
"listing_3332_collapsed_filter_bar": None,
"discount_percentage": None,
"listing_3790_simplify_rating_stars": None
},
"optimizelyAttributes": {
"market": "pl",
"device": "desktop",
"deviceVendor": "Apple",
"deviceType": "desktop",
"isLoggedIn": False,
"environment": "prod",
"browser": "Chrome",
"os": "Mac OS",
"language": "pl",
"feedMarket": "pl-PL",
"locale": "pl-PL",
"customerType": "guest",
"isEntranceVisit": False,
"pip_to_pip_src": ""
},
"components": [{
"component": "PRIMARY_AREA",
"columns": 4,
"types": {"main": "PRODUCT", "breakouts": ["PLANNER", "LOGIN_REMINDER", "MATTRESS_WARRANTY"]},
"filterConfig": {"max-num-filters": 6},
"window": {"size": 1000, "offset": 0},
"forceFilterCalculation": True
}]
}
log(f"POST {SEARCH_URL} category_id={category_id}")
r = requests.post(SEARCH_URL, headers=API_HEADERS, json=payload, timeout=REQUEST_TIMEOUT, proxies=PROXIES_WEB)
log(f"→ Status: {r.status_code}")
r.raise_for_status()
return r.json()
def extract_products_from_api(data: dict) -> list[dict]:
"""Извлекает список товаров и вариантов: id/pipUrl/availability/price/category_path."""
products = []
for result in data.get("results", []):
for item in result.get("items", []):
product = item.get("product")
if not product:
continue
category_path = " / ".join(c.get("name", "") for c in product.get("categoryPath", []))
def extract_one(prod):
av = prod.get("availability", [])
av0_status = av[0].get("status") if len(av) > 0 else ""
av1_status = av[1].get("status") if len(av) > 1 else ""
av1_store = av[1].get("store") if len(av) > 1 else ""
price = (prod.get("salesPrice", {}).get("current", {}).get("wholeNumber", "")) or ""
return {
"id": prod.get("id") or prod.get("itemNoGlobal") or prod.get("itemNo"),
"pipUrl": prod.get("pipUrl", ""),
"availability_0_status": av0_status,
"availability_1_status": av1_status,
"availability_1_store": av1_store,
"price": price,
"category_path": category_path,
}
# Основной продукт
products.append(extract_one(product))
# Варианты
for v in (product.get("gprDescription", {}).get("variants", []) or []):
products.append(extract_one(v))
return products
def phase1_collect_flattened():
"""Читает leaf_categories.txt, дергает API, пишет flattened_products.json/xlsx. Возвращает список dict."""
if not CAT_FILE.exists():
log("✖ leaf_categories.txt не найден.")
return []
categories = [line.strip() for line in CAT_FILE.read_text(encoding="utf-8").splitlines() if line.strip()]
if not categories:
log("✖ Нет категорий для обработки.")
return []
all_products = []
for idx, url in enumerate(categories, 1):
log(f"[{idx}/{len(categories)}] {url}")
m = re.search(r"-([0-9]+)/?$", url.rstrip("/"))
if not m:
log("⚠️ Не найден ID категории в URL")
continue
cat_id = m.group(1)
try:
data = fetch_category_json(cat_id)
items = extract_products_from_api(data)
all_products.extend(items)
log(f"{len(items)} товаров добавлено из категории {cat_id}")
except Exception as e:
log(f"❌ Ошибка при категории {cat_id}: {e}")
if not all_products:
log("⚠️ Нет товаров для сохранения.")
return []
# JSON
OUT_JSON.write_text(json.dumps(all_products, ensure_ascii=False, indent=2), encoding="utf-8")
log(f"💾 JSON сохранён → {OUT_JSON.name} ({len(all_products)} записей)")
# Excel
wb = Workbook()
ws = wb.active
ws.title = "IKEA_flat"
headers = list(all_products[0].keys())
ws.append(headers)
for row in all_products:
ws.append([row.get(h, "") for h in headers])
wb.save(OUT_XLSX)
log(f"📊 Excel сохранён → {OUT_XLSX.name}")
return all_products
# ───────────────────────── ФАЗА 2: PIP карточки → records ────────
def _ceil_price(v):
try:
return int(math.ceil(float(v)))
except Exception:
return None
def _ceil_int(v):
try:
return int(math.ceil(float(v)))
except Exception:
return None
def build_variant(row: dict) -> dict:
visible = row.get("productSummary.visibleItemNo") or ""
sku = visible.replace(" ", "")
category_name = row.get("categoryBreadcrumb") or ""
cdesc = row.get("pipPricePackage.productDescription") or ""
tname = row.get("stockcheckSection.typeName") or ""
meas = row.get("pipPricePackage.measurementText") or ""
csm = build_variant_color_measure(cdesc, tname, meas)
color, size = ("", "")
if csm:
parts = [p.strip() for p in csm.split(",", 1)]
if len(parts) == 2:
color, size = parts[0], parts[1]
else:
color, size = parts[0], ""
if not color and not size:
size = (row.get("pipPricePackage.measurementText") or "").strip()
cost = _ceil_price(row.get("buyModule.productPrice"))
url = row.get("url") or ""
name = row.get("originalName") or row.get("buyModule.productName") or ""
desc_html = row.get("productInformationSection.productDetailsProps_formatted_html") or ""
composition_html = row.get("productInformationSection.dimensionsOnly_formatted_html_translated") or ""
imgs = []
raw_imgs = row.get("productGallery.urls") or ""
if isinstance(raw_imgs, str):
imgs = [x for x in raw_imgs.split("\n") if x.strip()]
in_stock = bool(row.get("availabilityGroup.serverOnlineSellable")) or bool(row.get("buyModule.onlineSellable"))
weight_kg = _ceil_int(row.get("total brutto"))
variant = {
"status_id": 1,
"color": color,
"sku": sku,
"size": size,
"cost": cost,
"originalUrl": url,
"originalName": name,
"originalDescription": desc_html,
"originalComposition": composition_html,
"images": imgs,
"inStock": in_stock,
"weight": weight_kg if weight_kg is not None else 0,
}
return {
"category": {"name": category_name},
"brand": {"name": "ikea"},
"variant": variant,
}
def post_payload(payload: dict) -> dict:
headers = {"Content-Type": "application/json"}
if POST_API_KEY:
headers["Authorization"] = f"Bearer {POST_API_KEY}"
body = json.dumps(payload, ensure_ascii=False)
_post_log(f"→ POST {POST_URL}\nHeaders: {headers}\nBody: {body}")
try:
r = requests.post(POST_URL, headers=headers, data=body.encode("utf-8"), timeout=POST_TIMEOUT)
text = r.text
_post_log(f"{r.status_code}\n{text}\n{'-'*60}")
ok = 200 <= r.status_code < 300
return {"ok": ok, "status": r.status_code, "response": text}
except Exception as e:
_post_log(f"× ERROR: {e}\n{'-'*60}")
return {"ok": False, "status": None, "error": str(e)}
def safe_cell(val):
if isinstance(val, (dict, list)):
return json.dumps(val, ensure_ascii=False)
return "" if val is None else val
def extract_data(url: str) -> dict:
"""Парсинг карточки: вытягиваем data-hydration-props, раскладываем по KEEP_COLUMNS."""
try:
resp = requests.get(
url, headers=HEADERS, timeout=REQUEST_TIMEOUT_GET,
proxies=PROXIES_WEB, allow_redirects=True
)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
target = soup.select_one(CSS_SELECTOR)
if not target:
return {"url": url, "error": "CSS selector not found"}
raw = target.get("data-hydration-props")
if not raw:
return {"url": url, "error": "data-hydration-props not found"}
decoded = html.unescape(raw)
full_json = json.loads(decoded)
result = {"url": url}
for block in BLOCKS:
result.update(flatten_block(block, full_json.get(block, {})))
kf_json = _parse_json_value(result.get("keyFacts.keyFacts"))
dim_json = _parse_json_value(result.get("productInformationSection.dimensionProps"))
det_json = _parse_json_value(result.get("productInformationSection.productDetailsProps"))
result["keyFacts.keyFacts_formatted"] = format_keyfacts(kf_json)
# Полные размеры (с упаковкой) в HTML
html_trans = format_dimensions(dim_json, with_html=True, translated=True)
if isinstance(html_trans, str) and html_trans.startswith("b>"):
html_trans = "<" + html_trans
result["productInformationSection.dimensionProps_formatted_html_translated"] = html_trans
# Только "Wymiary" (без упаковки) в HTML → для originalComposition
dims_only_html = format_dimensions_only(dim_json, with_html=True, translated=True)
result["productInformationSection.dimensionsOnly_formatted_html_translated"] = dims_only_html
# Текстовая версия размеров
result["productInformationSection.dimensionProps_formatted"] = format_dimensions(dim_json, with_html=False, translated=False)
total_kg = _collect_packaging_total_kg((dim_json or {}).get("packaging") or {})
result["total brutto"] = _fmt_float(total_kg)
summary_desc = result.get("productSummary.description", "") or ""
result["productInformationSection.productDetailsProps_formatted"] = format_product_details(det_json, add_summary_desc=summary_desc, with_html=False, skip_assembly=True)
result["productInformationSection.productDetailsProps_formatted_html"] = format_product_details(det_json, add_summary_desc=summary_desc, with_html=True, skip_assembly=True)
desc = result.get("pipPricePackage.productDescription", "") or ""
tname = result.get("stockcheckSection.typeName", "") or ""
meas = result.get("pipPricePackage.measurementText", "") or ""
result["prductVariantColorMeasure"] = build_variant_color_measure(desc, tname, meas)
# breadcrumb из ld+json
breadcrumb = None
for tag in soup.find_all("script", attrs={"type": lambda t: t and "ld+json" in t}):
try:
data = json.loads(tag.string)
except Exception:
continue
if isinstance(data, list):
data = next((d for d in data if isinstance(d, dict) and d.get("@type") == "BreadcrumbList"), None)
if isinstance(data, dict) and data.get("@type") == "BreadcrumbList":
items = data.get("itemListElement", [])
names = [it.get("name", "") for it in items]
breadcrumb = "/".join(names)
break
if breadcrumb:
result["categoryBreadcrumb"] = breadcrumb
# whitelist + originalName
filtered = {k: result.get(k) for k in KEEP_COLUMNS if k != "originalName"}
pn = (result.get("buyModule.productName") or "").strip()
tn = (result.get("stockcheckSection.typeName") or "").strip()
filtered["originalName"] = (f"{pn} {tn}".strip() or pn or tn)
return filtered
except Exception as e:
return {"url": url, "error": str(e)}
# ───────────────────────── ОБЩИЙ PIPELINE ────────────────────────
def run_pipeline():
log(f"Запуск {datetime.datetime.now()} pid={os.getpid()}")
# ФАЗА 1: собрать flattened (API)
flat_items = phase1_collect_flattened()
# Карта → для быстрого присоединения полей в Фазе 2
# ключ = pipUrl, значение = dict(flat.*)
flat_by_url = {}
links_in_order = [] # порядок обхода (все pipUrl, уникальные, по порядку)
for row in flat_items:
url = row.get("pipUrl") or ""
if not url:
continue
flat_by_url[url] = {
"flat.id": row.get("id", ""),
"flat.price": row.get("price", ""),
"flat.availability_0_status": row.get("availability_0_status", ""),
"flat.availability_1_status": row.get("availability_1_status", ""),
"flat.availability_1_store": row.get("availability_1_store", ""),
"flat.category_path": row.get("category_path", ""),
}
if url not in links_in_order:
links_in_order.append(url)
if not links_in_order:
log("⚠️ Нет ссылок для Фазы 2.")
return
# ФАЗА 2: карточки по ссылкам → records.xlsx (+ POST/JSON батчи)
SAVE_JSON = ask_bool("SAVE_JSON (сохранять JSON батчи?)", "1")
SEND_JSON = ask_bool("SEND_JSON (отправлять на API?)", "1")
# === Загружаем данные из flattened_products.xlsx начало===
FLAT_FILE = os.path.join(BASE_DIR, "json_raw", "flattened_products.xlsx")
AVAIL_MAP = {}
if os.path.exists(FLAT_FILE):
wb_flat = load_workbook(FLAT_FILE, read_only=True)
ws_flat = wb_flat.active
# определяем индексы нужных колонок
headers = [c.value for c in next(ws_flat.iter_rows(min_row=1, max_row=1))]
url_idx = headers.index("pipUrl") + 1 if "pipUrl" in headers else None
a0_idx = headers.index("availability_0_status") + 1 if "availability_0_status" in headers else None
a1_idx = headers.index("availability_1_status") + 1 if "availability_1_status" in headers else None
if url_idx and (a0_idx or a1_idx):
for row in ws_flat.iter_rows(min_row=2, values_only=True):
url = row[url_idx - 1]
if not url:
continue
a0 = (row[a0_idx - 1] if a0_idx else "") or ""
a1 = (row[a1_idx - 1] if a1_idx else "") or ""
AVAIL_MAP[url] = {
"availability_0_status": str(a0).strip(),
"availability_1_status": str(a1).strip(),
}
print(f"📦 Загружено {len(AVAIL_MAP)} записей из flattened_products.xlsx")
else:
print("⚠️ Файл flattened_products.xlsx не найден, фильтр по HIGH_IN_STOCK не будет применён.")
# === Загружаем данные из flattened_products.xlsx конец===
wb = Workbook()
ws = wb.active
ws.title = "IKEA Products"
# Заголовки = карточка (KEEP_COLUMNS) + flat.*
headers = KEEP_COLUMNS + FLAT_EXTRA_COLS
ws.append(headers)
batch_items = []
batch_index = 1
def _save_json_batch(payload: dict, batch_index: int):
fname = f"ikea_batch_{_now_tag()}_{batch_index:04d}.json"
fpath = RECORDS_DIR / fname
fpath.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"💾 JSON saved: {fname}")
return fpath
def flush_batch():
nonlocal batch_items, batch_index
if not batch_items:
return
payload = {"parserName": "ikea", "items": batch_items}
if SAVE_JSON:
_save_json_batch(payload, batch_index)
if SEND_JSON:
res = post_payload(payload)
ok = res.get("ok")
print(f"POST batch {batch_index}: {'OK' if ok else 'FAIL'} (status={res.get('status')})")
batch_index += 1
batch_items = []
log(f"Всего ссылок к обходу: {len(links_in_order)}")
for idx, link in enumerate(links_in_order, 1):
print(f"[{idx}/{len(links_in_order)}] {link}")
row = extract_data(link)
# Вставляем URL (страховка) + добавим flat.* в Excel
row["url"] = link
# Excel: карточка
excel_row = [safe_cell(row.get(col, "")) for col in KEEP_COLUMNS]
# Excel: flat.*
flat_extra = flat_by_url.get(link, {})
excel_row.extend([flat_extra.get(col, "") for col in FLAT_EXTRA_COLS])
ws.append(excel_row)
try:
price = float(row.get("buyModule.productPrice") or 0)
except Exception:
price = 0.0
try:
total_kg = float(row.get("total brutto") or 0)
except Exception:
total_kg = 0.0
details_json = row.get("productInformationSection.productDetailsProps") or {}
# --- Проверяем наличие HIGH_IN_STOCK на основании flattened_products ---
avail_0 = ""
avail_1 = ""
if link in AVAIL_MAP:
avail_0 = AVAIL_MAP[link]["availability_0_status"].upper()
avail_1 = AVAIL_MAP[link]["availability_1_status"].upper()
avail_ok = (avail_0 == "HIGH_IN_STOCK") or (avail_1 == "HIGH_IN_STOCK")
# --- Фильтры ---
if not (20 <= price <= 2000):
pass
elif total_kg > 30:
pass
elif materials_match_exclusions(details_json, EXCLUSIONS):
pass
elif not avail_ok:
pass
else:
try:
item = build_variant(row)
batch_items.append(item)
except Exception as e:
_post_log(f"× build_variant error for {link}: {e}")
# autosave Excel каждые 50 строк
if idx % 50 == 0:
wb.save(OUTPUT_FILE)
print(f"💾 autosave: {OUTPUT_FILE}")
# флаш батча при достижении лимита
if len(batch_items) >= BATCH_SIZE:
flush_batch()
# финал
wb.save(OUTPUT_FILE)
print(f"\n✅ Excel готов: {OUTPUT_FILE}")
flush_batch()
print("🎯 Готово.")
# ───────────────────────── Точка входа ───────────────────────────
if __name__ == "__main__":
run_pipeline()