MacOS_Parsers/Parsing ZARAHOME/src/extractor.py
# extractor.py · v 2.0 · 2025-07-24
from json import load, loads
from os.path import abspath
from bs4 import BeautifulSoup
from lxml import etree
import logging, os, sys
# ────────────────────────── configuration ──────────────────────────
DEL_SAME = "YES"  # "YES" → deduplicate rows, "NO" → keep everything
_log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    level=_log_level,
    stream=sys.stdout,
    format="%(asctime)s %(levelname)-5s %(message)s",
    datefmt="%H:%M:%S"
)
log = logging.getLogger("extractor")
# ──────────────────── Zara Home helper functions ───────────────────
def extract_components_zarahome(parts):
    comp = []
    for part in parts:
        if part.get("areas") and part.get("description"):
            if len(parts) != 1:
                comp.append(part["description"])
            for area in part["areas"]:
                comp.append(f"{area['description']} ({area['percentageArea']})")
                for c in area["components"]:
                    comp.append(f"{c['percentage']} {c['material']}")
        elif part.get("components") and part.get("description"):
            if len(parts) != 1:
                comp.append(part["description"])
            for c in part["components"]:
                comp.append(f"{c['percentage']} {c['material']}")
    return comp
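# Illustrative (hypothetical) input for extract_components_zarahome, based only on the
# keys accessed above — not taken from a real API response:
#   parts = [{
#       "description": "OUTER SHELL",
#       "areas": [{
#           "description": "MAIN FABRIC", "percentageArea": "80%",
#           "components": [{"percentage": "100%", "material": "cotton"}]
#       }]
#   }, {
#       "description": "LINING",
#       "components": [{"percentage": "100%", "material": "polyester"}]
#   }]
# would yield the lines:
#   OUTER SHELL
#   MAIN FABRIC (80%)
#   100% cotton
#   LINING
#   100% polyester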
# ──────────────────── on-the-fly duplicate filter ──────────────────
def push_row_dedup(row, idx, seen, out):
    """Append a row to out, following the DEL_SAME rules."""
    art, name, size, price, clr = (row[idx["Артикул"]],
                                   row[idx["Название товара или услуги"]],
                                   row[idx["Свойство: Размер"]],
                                   row[idx["Цена закупки"]],
                                   row[idx["Свойство: Цвет"]])
    base = (art, name, size, price, clr)
    if base not in seen:
        seen[base] = row
        out.append(row)
        return
    old = seen[base]
    pn_old, pn_new = old[idx["PartNumber"]], row[idx["PartNumber"]]
    vis_old, vis_new = old[idx["Наличие на сайте"]], row[idx["Наличие на сайте"]]
    # 2) identical PartNumber -> ignore the new row
    if pn_old == pn_new:
        return
    # 3) same visibility?
    if vis_old == vis_new:
        art4 = art[:4]
        pn4_old = pn_old[1:5] if len(pn_old) >= 5 else ""
        pn4_new = pn_new[1:5] if len(pn_new) >= 5 else ""
        # 4) keep only the row whose 4-character code matches the article
        if art4 == pn4_new and art4 != pn4_old:
            # the new row matches better -> replace the old one
            seen[base] = row
            out[out.index(old)] = row
        # if the old row matches and the new one does not -> ignore
        return
    # 5) different visibility -> keep the SHOW row
    if vis_new == "SHOW" and vis_old != "SHOW":
        seen[base] = row
        out[out.index(old)] = row
    # otherwise keep the old row (SHOW is already stored) or, if both are non-SHOW, skip the new one.
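# Worked example of the dedup precedence (hypothetical rows, assuming the zarahome
# header layout): for two rows sharing the same (article, name, size, price, color) key
#   - identical PartNumber  -> the new row is dropped (rule 2);
#   - same visibility, and the new PartNumber's chars [1:5] match the first four
#     characters of the article while the old one's do not -> the new row replaces
#     the old one in-place (rules 3-4);
#   - different visibility  -> whichever row has "SHOW" wins (rule 5).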
class Extractor:
    # ----------------------------------------------------------------
    def __init__(self, json_data):
        self.methods = {
            "": (self.default_extract_method, []),
            "zarahome": (self.zarahome_extract_method, [
                "Краткое описание",
                "Артикул",
                "SKU",
                "PartNumber",
                "Название товара или услуги",
                "Полное описание",
                "Образец цвета",
                "Свойство: Цвет",
                "Свойство: Размер",
                "Цена закупки",
                "Свойство: Вес(г)",
                "Наличие на сайте",
                "Изображения",
                "Изображения варианта",
                "Параметр: Состав",
                "Параметр: Уход",
                "Параметр: Происхождение",
                "Размещение на сайте",
                "Свойство: Бренд"
            ]),
            "zara": (self.zara_extract_method, []),
            "eobuwie": (self.eobuwie_extract_method, []),
            "decathlon": (self.decathlon_extract_method, []),
            "chanel": (self.chanel_extract_method, []),
        }
        self.method = json_data["method"]
        self.tags = json_data["tags"]
        self.headers = self.methods[self.method][1].copy()
        for tag in self.tags:
            self.headers.insert(tag["column_number"], tag["column_name"])
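    # Note on tags: "column_number" is an insertion index into the base header list
    # above, so with several tags each later insert shifts the columns after it.
    # Hypothetical example: {"column_number": 2, "column_name": "Extra", "xpath": "//h1"}
    # would place "Extra" between "Артикул" and "SKU" for the zarahome header set.
    # Also note that zarahome_extract_method below builds product rows against the base
    # column set and never calls tags_extract, so configured tags widen only the header row.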
    # ─────────────────────────── utilities ─────────────────────────
    def extract(self, parser, recorder, categories):
        self.methods[self.method][0](parser, recorder, categories)

    def default_extract_method(self, *a, **kw):
        log.info("Default extractor → nothing to do.")

    def tags_extract(self, soup, row):
        dom = etree.HTML(str(soup))
        for tag in self.tags:
            res = dom.xpath(tag["xpath"])
            col = ""
            if res:
                for el in res:
                    col += ''.join(el.itertext()).strip() + "\n"
            row.insert(tag["column_number"], col)

    # ───── stubs ─────
    def zara_extract_method(self, *_, **__): log.info("ZARA extractor disabled.")
    def eobuwie_extract_method(self, *_, **__): log.info("Eobuwie extractor disabled.")
    def decathlon_extract_method(self, *_, **__): log.info("Decathlon extractor disabled.")
    def chanel_extract_method(self, *_, **__): log.info("Chanel extractor disabled.")
    # ─────────────────────── ZARA HOME ─────────────────────────────
    def zarahome_extract_method(self, parser, recorder, categories):
        BASE_API = "https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000"
        USER_BRAND = "ZARAHOME"

        def fetch_json(url):
            try:
                return parser.parse(url, return_type="json")
            except Exception as err:
                log.warning("Request Error: %s - %s", err, url)
                alt = url.replace("ieec2cihslb3-zarahome.central.inditex.grp",
                                  "www.zarahome.com")
                if alt != url:
                    log.info("→ retry via public host")
                    return parser.parse(alt, return_type="json")
                return None
        for c_idx, category in enumerate(categories, 1):
            log.info("Categories: %s / %s %s", c_idx, len(categories), category)
            # prepare the dedup-filter structures
            clean_rows = [self.headers]
            if DEL_SAME == "YES":
                idx_map = {h: i for i, h in enumerate(self.headers)}
                seen = {}
            # ── category HTML
            html = parser.parse(category)
            if html is None:
                log.warning("Extractor Error: empty page")
                continue
            soup = BeautifulSoup(html, "lxml")
            script = soup.select_one("#serverApp-state")
            if not script:
                log.warning("Extractor Error: script not found")
                continue
            state = loads(script.string)
            cat_key = next(k for k in state if "/category?" in k)
            cat_info = state[cat_key]
            ids = [str(p["id"]) for p in cat_info.get("products", [])]
            summaries = []
            # (A) via productIds
            if ids:
                CHUNK = 60
                for p in range(0, len(ids), CHUNK):
                    api = (f"{BASE_API}/productsArray?languageId=-1&"
                           f"productIds={','.join(ids[p:p+CHUNK])}&appId=1")
                    data = fetch_json(api)
                    if data and "products" in data:
                        summaries += data["products"]
            else:
                prod_key = next((k for k in state if "/product?" in k), None)
                if prod_key and "products" in state[prod_key]:
                    for grp in state[prod_key]["products"]:
                        summaries += grp.get("bundleProductSummaries", [])
                elif prod_key and "productIds" in state[prod_key]:
                    ids = state[prod_key]["productIds"]
                    CHUNK = 60
                    for p in range(0, len(ids), CHUNK):
                        api = (f"{BASE_API}/productsArray?languageId=-1&"
                               f"productIds={','.join(map(str, ids[p:p+CHUNK]))}&appId=1")
                        data = fetch_json(api)
                        if data and "products" in data:
                            summaries += data["products"]
                else:
                    subcats = cat_info.get("subcategories") or []
                    for sub in subcats:
                        sub_url = "https://www.zarahome.com/pl/en/" + sub["url"]
                        sub_html = parser.parse(sub_url)
                        if not sub_html:
                            continue
                        sub_state = loads(BeautifulSoup(sub_html, "lxml")
                                          .select_one("#serverApp-state").string)
                        sub_prod_key = next((k for k in sub_state if "/product?" in k), None)
                        if sub_prod_key and "products" in sub_state[sub_prod_key]:
                            for grp in sub_state[sub_prod_key]["products"]:
                                summaries += grp.get("bundleProductSummaries", [])
            seen_ids = set()
            for prod in summaries:
                prod_id = prod.get("id")
                short_url = prod.get("productUrl") or (
                    f"{prod['seo']['keyword']}-p{prod['seo']['seoProductId']}.html"
                    if prod.get("seo") else "")
                if not short_url or prod_id in seen_ids:
                    continue
                seen_ids.add(prod_id)
                variants = prod.get("bundleProductSummaries") or [prod]
                for vprod in variants:
                    det = vprod["detail"]
                    sec, fam, subfam = (vprod.get("sectionNameEN") or "",
                                        vprod.get("familyName") or "",
                                        vprod.get("subFamilyName") or "")
                    cat_path = "Каталог/ZaraHome/" + "/".join(p for p in (sec, fam, subfam) if p)
                    url_full = f"https://www.zarahome.com/pl/en/{vprod.get('productUrl','')}"
                    name = vprod.get("name", "")
                    article = det["displayReference"]
                    root_price = int(vprod.get("price", 0)) / 100
                    root_wt = vprod.get("weight", "")
                    raw_xmedia = det.get("xmedia") or vprod.get("xmedia") or []
                    default_idx = det.get("xmediaDefaultSet")
                    if isinstance(raw_xmedia, list) and raw_xmedia:
                        media_sets = [raw_xmedia[default_idx]] if isinstance(default_idx, int) else raw_xmedia
                    elif isinstance(raw_xmedia, dict):
                        media_sets = [raw_xmedia]
                    else:
                        media_sets = []
                    all_imgs = [f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
                                for loc in media_sets for m in loc["xmediaItems"][0]["medias"]]
                    all_imgs_s = "\n".join(all_imgs)
                    comp_txt = ""
                    if det.get("compositionDetail") and det["compositionDetail"].get("parts"):
                        comp_txt = "\n".join(
                            extract_components_zarahome(det["compositionDetail"]["parts"])
                        )
                    care = "\n".join(c["description"] for c in det.get("care", []))
                    trace = ""
                    colors = det.get("colors") or [{
                        "id": 0, "name": "DEFAULT", "image": {"url": ""},
                        "sizes": [{
                            "visibilityValue": "SHOW",
                            "name": "", "description": "",
                            "weight": root_wt, "price": vprod.get("price", 0)
                        }]
                    }]
                    for clr in colors:
                        clr_code = clr.get("id")
                        clr_name = clr.get("name", "")
                        clr_image = ""
                        if clr.get("image") and clr["image"].get("url"):
                            clr_image = f"https://static.zarahome.net/8/photos4{clr['image']['url']}_3_1_5.jpg"
                        clr_sets = [loc for loc in media_sets if loc.get("colorCode") == clr_code] or media_sets
                        clr_imgs = [f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
                                    for loc in clr_sets for m in loc["xmediaItems"][0]["medias"]]
                        clr_imgs_s = "\n".join(clr_imgs)
                        for size in clr["sizes"]:
                            vis = size.get("visibilityValue", "UNKNOWN")
                            price = int(size.get("price") or vprod.get("price", 0)) / 100
                            weight = size.get("weight") or root_wt
                            size_name = size.get("name", "")
                            size_descr = size.get("description", "")
                            size_full = f"{size_descr} ({size_name})" if size_descr else size_name
                            sku_val = size.get("sku", "")
                            partnumber_val = size.get("partnumber", "")
                            country = size.get("country") or ""
                            trace_local = f"Made in {country}" if country else trace
                            row = [
                                url_full,
                                article,
                                sku_val,
                                partnumber_val,
                                name,
                                det.get("longDescription", ""),
                                clr_image,
                                clr_name,
                                size_full,
                                price,
                                weight,
                                vis,
                                all_imgs_s,
                                clr_imgs_s,
                                comp_txt,
                                care,
                                trace_local,
                                cat_path,
                                USER_BRAND
                            ]
                            if DEL_SAME == "YES":
                                push_row_dedup(row, idx_map, seen, clean_rows)
                            else:
                                clean_rows.append(row)
            csv_name = category.split("/")[-1]
            recorder.record(csv_name, clean_rows)
# ───────────────────────────────────────────────────────────────────
def get_extractor():
    with open(abspath("parse_settings.json"), "r", encoding="utf-8") as fh:
        return Extractor(load(fh))
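# A minimal usage sketch (assumption: the real entry point lives elsewhere in the
# project and wires up concrete parser/recorder objects; the names below are
# hypothetical stand-ins, not part of this module).
#
# Expected shape of parse_settings.json, inferred from Extractor.__init__:
# {
#     "method": "zarahome",
#     "tags": [
#         {"column_number": 5, "column_name": "Extra column", "xpath": "//h1"}
#     ]
# }
#
# if __name__ == "__main__":
#     extractor = get_extractor()
#     # `parser` must expose parse(url, return_type=...) and `recorder` must expose
#     # record(name, rows) — see how they are called in zarahome_extract_method.
#     # extractor.extract(parser, recorder, ["https://www.zarahome.com/pl/en/<category-url>"])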