va1is 2025-07-28 16:20:11 +03:00
parent fb6b1048d3
commit 16a31f92c5
340 changed files with 22776 additions and 1 deletion

.gitignore

@@ -28,4 +28,4 @@ Temporary Items
 .apdisk
 __pycache__/*
-records_folder/**
+records_folder/*


@@ -0,0 +1,36 @@
import pandas as pd
def add_depend_stock_column(file_path: str, output_path: str):
    # Load the Excel file
df = pd.read_excel(file_path)
    # Check that the required columns are present
required_columns = ['Артикул', 'PartNumber', 'Наличие на сайте']
for col in required_columns:
if col not in df.columns:
            raise ValueError(f"Column '{col}' was not found in the file.")
    # Build a lookup dict keyed by (Артикул, last 11 characters of PartNumber)
lookup = {
(row['Артикул'], str(row['PartNumber'])[-11:]): row['Наличие на сайте']
for _, row in df.iterrows()
}
    # Function that looks up the matching value
def get_depend_stock(row):
part = str(row['PartNumber'])[-11:]
key = (row['Артикул'], part)
return lookup.get(key, 'Нет данных')
    # Add the new column
df['DependStock'] = df.apply(get_depend_stock, axis=1)
    # Save to a new file
df.to_excel(output_path, index=False)
    print(f"Done! Result saved to: {output_path}")
# Example usage
if __name__ == "__main__":
    input_file = "/Users/valis/Yandex.Disk.localized/Python3/Parsing ZARAHOME/src_2024-09-05/records_folder/lighting-ceiling-lamps-n4884.xlsx"  # <-- your input file
    output_file = "/Users/valis/Yandex.Disk.localized/Python3/Parsing ZARAHOME/src_2024-09-05/records_folder/result_with_depend_stock.xlsx"  # <-- output file name
add_depend_stock_column(input_file, output_file)
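A minimal sketch of the lookup key the script builds; the article and PartNumber values below are invented purely for illustration:

article = "ART-1"                        # hypothetical Артикул value
part_number = "XX12345678901"            # hypothetical PartNumber value
key = (article, str(part_number)[-11:])
print(key)                               # ('ART-1', '12345678901')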

Binary file not shown.


@@ -0,0 +1,17 @@
from openpyxl import load_workbook
from os.path import abspath
# read all category links from categories.xlsx
def get_categories():
    workbook = load_workbook(abspath("categories.xlsx"))
    worksheet = workbook.active
categories = []
for i in worksheet["A"]:
value = i.value
        if value is not None:
categories.append(value)
return categories
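A hedged usage sketch, assuming a categories.xlsx with one URL per row in column A sits next to the script:

if __name__ == "__main__":
    links = get_categories()
    print(f"{len(links)} categories loaded")
    for url in links[:3]:
        print(url)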

Binary file not shown.


@@ -0,0 +1,344 @@
# extractor.py · updated 2025-07-23
from json import load, loads
from os.path import abspath
from bs4 import BeautifulSoup
from lxml import etree
from re import split, search, sub
# ─────────────────────────────────────────────────────────────────────────────
def extract_components_zarahome(parts):
composition = []
for part in parts:
if part.get("areas") and part.get("description"):
if len(parts) != 1:
composition.append(part["description"])
for area in part["areas"]:
area_name = area["description"]
percentage_area = area["percentageArea"]
composition.append(f"{area_name} ({percentage_area})")
for component in area["components"]:
material = component["material"]
percentage = component["percentage"]
composition.append(f"{percentage} {material}")
elif part.get("components") and part.get("description"):
if len(parts) != 1:
composition.append(part["description"])
for component in part["components"]:
material = component["material"]
percentage = component["percentage"]
composition.append(f"{percentage} {material}")
return composition
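A hedged sketch of the composition payload this helper walks; the part and area values below are invented, only the key names come from the code above:

# Hypothetical input, shaped like detail["compositionDetail"]["parts"]:
sample_parts = [{
    "description": "LINING",
    "areas": [{
        "description": "MAIN FABRIC",
        "percentageArea": "100%",
        "components": [{"material": "cotton", "percentage": "80%"},
                       {"material": "linen", "percentage": "20%"}],
    }],
}]
print(extract_components_zarahome(sample_parts))
# -> ['MAIN FABRIC (100%)', '80% cotton', '20% linen']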
# ─────────────────────────────────────────────────────────────────────────────
class Extractor:
# ----------------------------------------------------------------
def __init__(self, json_data):
self.methods = {
"": (self.default_extract_method, []),
"zarahome": (self.zarahome_extract_method, [
"Краткое описание",
"Артикул",
"SKU",
"PartNumber",
"Название товара или услуги",
"Полное описание",
"Образец цвета",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Свойство: Вес(г)",
"Наличие на сайте",
"Изображения",
"Изображения варианта",
"Параметр: Состав",
"Параметр: Уход",
"Параметр: Происхождение",
"Размещение на сайте",
"Свойство: Бренд"
]),
            # stubs for the older shops
"zara": (self.zara_extract_method, []),
"eobuwie": (self.eobuwie_extract_method, []),
"decathlon": (self.decathlon_extract_method, []),
"chanel": (self.chanel_extract_method, []),
}
self.method = json_data["method"]
self.tags = json_data["tags"]
self.headers = self.methods[self.method][1].copy()
for tag in self.tags:
self.headers.insert(tag["column_number"], tag["column_name"])
# ----------------------------------------------------------------
    # shared utilities
def extract(self, parser, recorder, categories):
self.methods[self.method][0](parser, recorder, categories)
def default_extract_method(self, *a, **kw):
print("Default extractor → nothing to do.")
def tags_extract(self, soup, row):
dom = etree.HTML(str(soup))
for tag in self.tags:
res = dom.xpath(tag["xpath"])
col = ""
if res:
for el in res:
col += ''.join(el.itertext()).strip() + "\n"
row.insert(tag["column_number"], col)
# ----------------------------------------------------------------
    # stubs for the other methods
def zara_extract_method(self, *_, **__): print("ZARA extractor disabled.")
def eobuwie_extract_method(self, *_, **__): print("Eobuwie extractor disabled.")
def decathlon_extract_method(self, *_, **__): print("Decathlon extractor disabled.")
def chanel_extract_method(self, *_, **__): print("Chanel extractor disabled.")
# ----------------------------------------------------------------
# Z A R A H O M E
# ----------------------------------------------------------------
def zarahome_extract_method(self, parser, recorder, categories):
BASE_API = "https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000"
USER_BRAND = "ZARAHOME"
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)} {category}")
            # ── category HTML ───────────────────────────────────────
html = parser.parse(category)
if html is None:
print("Extractor Error: empty page"); continue
soup = BeautifulSoup(html, "html.parser")
script = soup.select_one("#serverApp-state")
            ####### Debug dump of the state (start)
# dump_name = f"state_dump_{int(time.time())}.json"
# pathlib.Path(dump_name).write_text(script.string, encoding="utf-8")
# print(f"🛈 serverApp-state saved → {dump_name}")
#
# state = loads(script.string)
# print("TOP-LEVEL KEYS:", list(state.keys())[:20])
# print("inditex-data KEYS:", list(state.get("inditex-data", {}).keys()))
            ####### Debug dump of the state (end)
if not script:
print("Extractor Error: script#serverApp-state not found"); continue
try:
state = loads(script.string)
except Exception as e:
print(f"Extractor Error: bad JSON ({e})"); continue
# ── category_id ──────────────────────────────────────────
cdata = state.get("inditex-data", {})
cat_id = (cdata.get("iCategoryId") or
cdata.get("categoryId") or
cdata.get("iCategoryJSON", {}).get("id"))
if not cat_id:
for k in state:
m = search(r"/category/(\d+)/product", k)
if m: cat_id = m.group(1); break
if not cat_id:
print("Extractor Error: cannot detect category_id"); continue
            # ── block with the products or their IDs ─────────────────
key = next((k for k in state if f"/category/{cat_id}/product" in k), None)
if not key:
print("Extractor Error: products block not found"); continue
prod_block = state[key]
summaries = []
            # ★ Old layout: products are already inlined
if "products" in prod_block:
for grp in prod_block["products"]:
summaries += grp["bundleProductSummaries"]
            # ★ New layout: pull the products by productIds
else:
ids = (prod_block.get("productIds") or
prod_block.get("sortedProductIds") or
prod_block.get("sortedProductIdsByPricesAsc") or [])
print(f"→ pulling {len(ids)} products via API")
CHUNK = 20
for p in range(0, len(ids), CHUNK):
ids_chunk = ",".join(map(str, ids[p:p+CHUNK]))
api = (f"{BASE_API}/productsArray"
f"?languageId=-1&productIds={ids_chunk}&appId=1")
data = parser.parse(api, return_type="json")
summaries += data.get("products", [])
print("DEBUG summaries count:", len(summaries))
for p in summaries:
print("", p.get("id"), p.get("productUrl"))
            # ── category path for the CSV ───────────────────────────
# cat_json = cdata.get("iCategoryJSON", {})
# cat_title = "/".join(cat_json.get("parentNames", []) +
# [cat_json.get("name", "")])
# cat_path = f"Каталог/ZaraHome/{cat_title}"
seen = set()
for n, prod in enumerate(summaries, 1):
short_url = prod.get("productUrl")
if not short_url or short_url in seen:
continue
seen.add(short_url)
print(f"Products: {n} / {len(summaries)} "
f"https://www.zarahome.com/pl/{short_url}")
                # ── some products have no variants → look one level deeper ──
variant_products = []
if prod.get("detail", {}).get("colors"):
variant_products.append(prod)
elif prod.get("bundleProductSummaries"):
variant_products += prod["bundleProductSummaries"]
else:
                    variant_products.append(prod)  # single product without variants
                # ── process each vprod (a variant or the product itself) ─
for vprod in variant_products:
det = vprod["detail"]
url_full = f"https://www.zarahome.com/pl/en/{vprod.get('productUrl','')}"
name = vprod.get("name", "")
article = det["displayReference"]
root_price = int(vprod.get("price", 0)) / 100
root_wt = vprod.get("weight", "")
                    # ── all images ────────────────────────────
raw_xmedia = (det.get("xmedia") or vprod.get("xmedia") or [])
default_idx = det.get("xmediaDefaultSet")
if isinstance(raw_xmedia, list) and raw_xmedia:
media_sets = [raw_xmedia[default_idx]] if isinstance(default_idx, int) else raw_xmedia
elif isinstance(raw_xmedia, dict):
media_sets = [raw_xmedia]
else:
media_sets = []
all_imgs = [
f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in media_sets
for m in loc["xmediaItems"][0]["medias"]
]
all_imgs_s = "\n".join(all_imgs)
                    # ── composition / care / origin ───────────────
comp_block = det.get("compositionDetail")
comp_txt = ""
if comp_block and comp_block.get("parts"):
comp_txt = "\n".join(
extract_components_zarahome(comp_block["parts"])
)
care = "\n".join(c["description"] for c in det.get("care", []))
trace = ""
if det.get("traceability"):
trace = "\n".join(
f"{v['name']}\n" + "\n".join(v["country"])
for v in det["traceability"].values()
if isinstance(v, dict) and v.get("country") and v.get("name")
)
                    # ── colors and sizes ─────────────────────────────
                    colors_list = det.get("colors") or []
                    if not colors_list:  # single product without colors
colors_list = [{
"id": 0,
"name": "DEFAULT",
"image": {"url": ""},
"sizes": [{
# "visibilityValue": "SHOW",
"name": "",
"description": "",
"weight": root_wt,
"price": vprod.get("price", 0)
}]
}]
serial = 0
for clr in colors_list:
if clr.get("image") is None and clr["name"] != "DEFAULT":
continue
clr_code = clr.get("id")
clr_name = clr.get("name", "")
clr_image = ""
if clr.get("image") and clr["image"].get("url"):
clr_image = (f"https://static.zarahome.net/8/photos4"
f"{clr['image']['url']}_3_1_5.jpg")
                        # images for this specific color
                        clr_sets = [loc for loc in media_sets
                                    if loc.get("colorCode") == clr_code] or media_sets
                        clr_imgs = [
                            f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
                            for loc in clr_sets
                            for m in loc["xmediaItems"][0]["medias"]
                        ]
clr_imgs_s = "\n".join(clr_imgs)
for size in clr["sizes"]:
# if size["visibilityValue"] != "SHOW":
# continue
                            # suffix = "" if serial == 0 else f"-{serial}"   # uncomment to append "-1,2,3" to the article when it repeats
serial += 1
visibility = size.get("visibilityValue", "UNKNOWN")
size_name = size.get("name", "")
size_descr = size.get("description", "")
size_full = f"{size_descr} ({size_name})" if size_descr else size_name
size_weight = size.get("weight") or root_wt
size_price = int(size.get("price") or vprod.get("price", 0)) / 100
                            # ── category path from sectionNameEN / familyName / subFamilyName
                            sec = vprod.get("sectionNameEN") or ""   # top level
                            fam = vprod.get("familyName") or ""      # family
                            sub = vprod.get("subFamilyName") or ""   # subfamily
                            cat_parts = [p for p in (sec, fam, sub) if p]   # drop empty parts
cat_path = "Каталог/ZaraHome/" + "/".join(cat_parts)
sku_val = size.get("sku", "")
partnumber_val = size.get("partnumber", "")
table.append([
url_full,
                                f"{article}",  # use f"{article}{suffix}" here if the "-1,2,3" repeat suffix is re-enabled
name,
sku_val, # ← SKU
partnumber_val, # ← PartNumber
det.get("longDescription", ""),
clr_image,
clr_name,
size_full,
size_price,
size_weight,
visibility,
all_imgs_s,
clr_imgs_s,
comp_txt,
care,
trace,
cat_path,
USER_BRAND
])
            # ── CSV output ──────────────────────────────────────────
csv_name = category.split("/")[-1]
recorder.record(csv_name, table)
def get_extractor():
with open(abspath("parse_settings.json"), "r", encoding="utf-8") as file:
return Extractor(load(file))
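A hedged sketch of the parse_settings.json shape that get_extractor() assumes; only the keys ("method", "tags", "column_number", "column_name", "xpath") come from the code above, the values are made up:

# The dict Extractor receives after json.load(); the extra column and XPath are hypothetical:
sample_settings = {
    "method": "zarahome",
    "tags": [
        {"column_number": 5,
         "column_name": "Material",                      # hypothetical extra column
         "xpath": "//div[@class='product-material']"}    # hypothetical XPath
    ],
}
extractor = Extractor(sample_settings)   # same object get_extractor() builds from the file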


@@ -0,0 +1,317 @@
# extractor.py · v 2.0 · 2025-07-23
from json import load, loads
from os.path import abspath
from bs4 import BeautifulSoup
from lxml import etree
import logging, os, sys
# ────────────────────── logging setup ─────────────────────
_log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=_log_level,
stream=sys.stdout,
    format="%(asctime)s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S"
)
log = logging.getLogger("extractor")
# ────────────────────── helper functions ───────────────────
def extract_components_zarahome(parts):
comp = []
for part in parts:
if part.get("areas") and part.get("description"):
if len(parts) != 1:
comp.append(part["description"])
for area in part["areas"]:
comp.append(f"{area['description']} ({area['percentageArea']})")
for c in area["components"]:
comp.append(f"{c['percentage']} {c['material']}")
elif part.get("components") and part.get("description"):
if len(parts) != 1:
comp.append(part["description"])
for c in part["components"]:
comp.append(f"{c['percentage']} {c['material']}")
return comp
# ────────────────────────────────────────────────────────────────────
class Extractor:
def __init__(self, json_data):
self.methods = {
"": (self.default_extract_method, []),
"zarahome": (self.zarahome_extract_method, [
"Краткое описание",
"Артикул",
"SKU",
"PartNumber",
"Название товара или услуги",
"Полное описание",
"Образец цвета",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Свойство: Вес(г)",
"Наличие на сайте",
"Изображения",
"Изображения варианта",
"Параметр: Состав",
"Параметр: Уход",
"Параметр: Происхождение",
"Размещение на сайте",
"Свойство: Бренд"
]),
"zara": (self.zara_extract_method, []),
"eobuwie": (self.eobuwie_extract_method, []),
"decathlon": (self.decathlon_extract_method, []),
"chanel": (self.chanel_extract_method, []),
}
self.method = json_data["method"]
self.tags = json_data["tags"]
self.headers = self.methods[self.method][1].copy()
for tag in self.tags:
self.headers.insert(tag["column_number"], tag["column_name"])
    # ────────────────────────── shared utilities ─────────────────
def extract(self, parser, recorder, categories):
self.methods[self.method][0](parser, recorder, categories)
def default_extract_method(self, *a, **kw):
log.info("Default extractor → nothing to do.")
def tags_extract(self, soup, row):
dom = etree.HTML(str(soup))
for tag in self.tags:
res = dom.xpath(tag["xpath"])
col = ""
if res:
for el in res:
col += ''.join(el.itertext()).strip() + "\n"
row.insert(tag["column_number"], col)
    # ─────────── stubs for the unused shops ────────────
def zara_extract_method(self, *_, **__): log.info("ZARA extractor disabled.")
def eobuwie_extract_method(self, *_, **__): log.info("Eobuwie extractor disabled.")
def decathlon_extract_method(self, *_, **__): log.info("Decathlon extractor disabled.")
def chanel_extract_method(self, *_, **__): log.info("Chanel extractor disabled.")
# ───────────────────── Z A R A H O M E ───────────────────────
def zarahome_extract_method(self, parser, recorder, categories):
BASE_API = "https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000"
USER_BRAND = "ZARAHOME"
def fetch_json(url):
try:
return parser.parse(url, return_type="json")
except Exception as err:
log.warning("Request Error: %s - %s", err, url)
alt = url.replace(
"ieec2cihslb3-zarahome.central.inditex.grp",
"www.zarahome.com"
)
if alt != url:
log.info("→ retry via public host")
return parser.parse(alt, return_type="json")
return None
for c_idx, category in enumerate(categories, 1):
table = [self.headers]
log.info("Categories: %s / %s %s", c_idx, len(categories), category)
html = parser.parse(category)
if html is None:
log.warning("Extractor Error: empty page"); continue
soup = BeautifulSoup(html, "html.parser")
script = soup.select_one("#serverApp-state")
if not script:
log.warning("Extractor Error: script not found for %s", category)
continue
state = loads(script.string)
cat_key = next(k for k in state if "/category?" in k)
cat_info = state[cat_key]
ids = [str(p["id"]) for p in cat_info.get("products", [])]
summaries = []
# (A) productIds
if ids:
log.debug("→ pulling %s productIds via API", len(ids))
CHUNK = 20
for p in range(0, len(ids), CHUNK):
api = (f"{BASE_API}/productsArray?languageId=-1&"
f"productIds={','.join(ids[p:p+CHUNK])}&appId=1")
data = fetch_json(api)
if not data or "products" not in data:
log.debug("Skip chunk (no data)")
continue
summaries += data["products"]
            # (B) products in the state blob, or a recursive walk of subcategories
else:
prod_key = next((k for k in state if "/product?" in k), None)
if prod_key and "products" in state[prod_key]:
log.debug("→ products array found in state")
for grp in state[prod_key]["products"]:
summaries += grp.get("bundleProductSummaries", [])
                # ★ no products array, but productIds are present → use the API
elif prod_key and "productIds" in state[prod_key]:
ids = state[prod_key]["productIds"]
log.debug("→ pulling %s productIds via API (from prod_block)", len(ids))
CHUNK = 20
for p in range(0, len(ids), CHUNK):
api = (f"{BASE_API}/productsArray?languageId=-1&"
f"productIds={','.join(map(str, ids[p:p+CHUNK]))}&appId=1")
data = fetch_json(api)
if not data or "products" not in data:
log.debug("Skip chunk (no data)")
continue
summaries += data["products"]
else:
subcats = cat_info.get("subcategories") or []
if not subcats:
log.info("→ no products in this category")
continue
log.info("→ diving into %s subcategories", len(subcats))
for sub in subcats:
sub_url = "https://www.zarahome.com/pl/en/" + sub["url"]
sub_html = parser.parse(sub_url)
if not sub_html:
continue
sub_state = loads(BeautifulSoup(sub_html, "html.parser")
.select_one("#serverApp-state").string)
sub_prod_key = next((k for k in sub_state if "/product?" in k), None)
if sub_prod_key and "products" in sub_state[sub_prod_key]:
for grp in sub_state[sub_prod_key]["products"]:
summaries += grp.get("bundleProductSummaries", [])
log.debug("JSON summaries count: %s", len(summaries))
seen_ids = set()
for n, prod in enumerate(summaries, 1):
prod_id = prod.get("id")
short_url = prod.get("productUrl")
if not short_url and prod.get("seo"):
kw = prod["seo"].get("keyword", "")
sid = prod["seo"].get("seoProductId", "")
if kw and sid:
short_url = f"{kw}-p{sid}.html"
prod["productUrl"] = short_url
if not short_url or prod_id in seen_ids:
continue
seen_ids.add(prod_id)
log.info("Products: %s / %s %s", n, len(summaries),
f"https://www.zarahome.com/pl/{short_url}")
variants = prod.get("bundleProductSummaries") or [prod]
for vprod in variants:
det = vprod["detail"]
sec, fam, sub = (vprod.get("sectionNameEN") or "",
vprod.get("familyName") or "",
vprod.get("subFamilyName") or "")
cat_path = "Каталог/ZaraHome/" + "/".join(p for p in (sec, fam, sub) if p)
url_full = f"https://www.zarahome.com/pl/en/{vprod.get('productUrl','')}"
name = vprod.get("name", "")
article = det["displayReference"]
root_price = int(vprod.get("price", 0)) / 100
root_wt = vprod.get("weight", "")
raw_xmedia = det.get("xmedia") or vprod.get("xmedia") or []
default_idx = det.get("xmediaDefaultSet")
if isinstance(raw_xmedia, list) and raw_xmedia:
media_sets = [raw_xmedia[default_idx]] if isinstance(default_idx, int) else raw_xmedia
elif isinstance(raw_xmedia, dict):
media_sets = [raw_xmedia]
else:
media_sets = []
all_imgs = [f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in media_sets
for m in loc["xmediaItems"][0]["medias"]]
all_imgs_s = "\n".join(all_imgs)
comp_txt = ""
if det.get("compositionDetail") and det["compositionDetail"].get("parts"):
comp_txt = "\n".join(
extract_components_zarahome(det["compositionDetail"]["parts"])
)
care = "\n".join(c["description"] for c in det.get("care", []))
trace = ""
colors = det.get("colors") or [{
"id": 0, "name": "DEFAULT", "image": {"url": ""},
"sizes": [{
"visibilityValue": "SHOW",
"name": "", "description": "",
"weight": root_wt, "price": vprod.get("price", 0)
}]
}]
#serial = 0
for clr in colors:
clr_code = clr.get("id")
clr_name = clr.get("name", "")
clr_image = ""
if clr.get("image") and clr["image"].get("url"):
clr_image = f"https://static.zarahome.net/8/photos4{clr['image']['url']}_3_1_5.jpg"
clr_sets = [loc for loc in media_sets if loc.get("colorCode") == clr_code] or media_sets
clr_imgs = [f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in clr_sets
for m in loc["xmediaItems"][0]["medias"]]
clr_imgs_s = "\n".join(clr_imgs)
for size in clr["sizes"]:
vis = size.get("visibilityValue", "UNKNOWN")
price = int(size.get("price") or vprod.get("price", 0)) / 100
weight = size.get("weight") or root_wt
                            # ── country of manufacture (if present in size)
country = size.get("country") or ""
trace_local = f"Made in {country}" if country else trace
size_name = size.get("name", "")
size_descr = size.get("description", "")
size_full = f"{size_descr} ({size_name})" if size_descr else size_name
                            # ── SKU / PartNumber come from size ───────────────
sku_val = size.get("sku", "")
partnumber_val = size.get("partnumber", "")
table.append([
url_full,
article,
sku_val,
partnumber_val,
name,
det.get("longDescription", ""),
clr_image,
clr_name,
size_full,
price,
weight,
vis,
all_imgs_s,
clr_imgs_s,
comp_txt,
care,
trace_local,
cat_path,
USER_BRAND
])
csv_name = category.split("/")[-1]
recorder.record(csv_name, table)
# ────────────────────────────────────────────────────────────────────
def get_extractor():
with open(abspath("parse_settings.json"), "r", encoding="utf-8") as fh:
return Extractor(load(fh))


@@ -0,0 +1,379 @@
# extractor.py · v 2.0 · 2025-07-23
from json import load, loads
from os.path import abspath
from bs4 import BeautifulSoup
from lxml import etree
import logging, os, sys
# enable / disable the duplicate filter
DEL_SAME = "YES"   # "YES" → filter is active, "NO" → write everything as is
# ────────────────────── logging setup ─────────────────────
_log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=_log_level,
stream=sys.stdout,
    format="%(asctime)s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S"
)
log = logging.getLogger("extractor")
# ────────────────────── helper functions ───────────────────
def extract_components_zarahome(parts):
comp = []
for part in parts:
if part.get("areas") and part.get("description"):
if len(parts) != 1:
comp.append(part["description"])
for area in part["areas"]:
comp.append(f"{area['description']} ({area['percentageArea']})")
for c in area["components"]:
comp.append(f"{c['percentage']} {c['material']}")
elif part.get("components") and part.get("description"):
if len(parts) != 1:
comp.append(part["description"])
for c in part["components"]:
comp.append(f"{c['percentage']} {c['material']}")
return comp
# ────────────────────────────────────────────────────────────────────
# ────────────────── filter for "identical" products ──────────────────
def filter_duplicates(table, headers):
    """Drop rows according to the DEL_SAME rules. table[0] is the header row."""
if DEL_SAME != "YES" or len(table) <= 2:
return table
    # indexes of the relevant columns
idx = {h: i for i, h in enumerate(headers)}
art_i = idx["Артикул"]
name_i = idx["Название товара или услуги"]
size_i = idx["Свойство: Размер"]
price_i = idx["Цена закупки"]
clr_i = idx["Свойство: Цвет"]
pn_i = idx["PartNumber"]
vis_i = idx["Наличие на сайте"]
    keep_rows = [table[0]]                    # keep the header row
groups = {}
    # ── group by the 5 base fields ───────────────────────────────
for row in table[1:]:
key = (row[art_i], row[name_i], row[size_i], row[price_i], row[clr_i])
groups.setdefault(key, []).append(row)
    # ── apply the rules to each group ───────────────────────────
for rows in groups.values():
if len(rows) == 1:
keep_rows.append(rows[0])
continue
        # 2) all PartNumbers identical? → keep the first row
pn_set = {r[pn_i] for r in rows}
if len(pn_set) == 1:
keep_rows.append(rows[0])
continue
        # 3) is the visibility the same?
vis_set = {r[vis_i] for r in rows}
        if len(vis_set) == 1:                 # same visibility
            # 4) compare the 4-character codes
good = []
for r in rows:
art4 = r[art_i][:4]
pn4 = r[pn_i][1:5] if len(r[pn_i]) >= 5 else ""
if art4 == pn4:
good.append(r)
            # keep only the matching rows; if none match, keep the first
keep_rows.extend(good or [rows[0]])
        else:                                 # 5) visibility differs
show = [r for r in rows if r[vis_i] == "SHOW"]
            keep_rows.extend(show or rows)    # keep the SHOW rows, otherwise keep them all
return keep_rows
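A hedged usage sketch of the duplicate filter; the header list is trimmed to the columns the rules actually read, and the two rows are invented so that rule 5 fires (visibility differs → only the SHOW row survives):

demo_headers = ["Артикул", "Название товара или услуги", "Свойство: Размер",
                "Цена закупки", "Свойство: Цвет", "PartNumber", "Наличие на сайте"]
demo_table = [
    demo_headers,
    ["40001234", "Demo cushion", "50 x 50 cm", 19.99, "BEIGE", "140001234001", "SHOW"],
    ["40001234", "Demo cushion", "50 x 50 cm", 19.99, "BEIGE", "140001234002", "HIDDEN"],
]
print(len(filter_duplicates(demo_table, demo_headers)))   # 2 → the header plus the SHOW row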
class Extractor:
def __init__(self, json_data):
self.methods = {
"": (self.default_extract_method, []),
"zarahome": (self.zarahome_extract_method, [
"Краткое описание",
"Артикул",
"SKU",
"PartNumber",
"Название товара или услуги",
"Полное описание",
"Образец цвета",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Свойство: Вес(г)",
"Наличие на сайте",
"Изображения",
"Изображения варианта",
"Параметр: Состав",
"Параметр: Уход",
"Параметр: Происхождение",
"Размещение на сайте",
"Свойство: Бренд"
]),
"zara": (self.zara_extract_method, []),
"eobuwie": (self.eobuwie_extract_method, []),
"decathlon": (self.decathlon_extract_method, []),
"chanel": (self.chanel_extract_method, []),
}
self.method = json_data["method"]
self.tags = json_data["tags"]
self.headers = self.methods[self.method][1].copy()
for tag in self.tags:
self.headers.insert(tag["column_number"], tag["column_name"])
    # ────────────────────────── shared utilities ─────────────────
def extract(self, parser, recorder, categories):
self.methods[self.method][0](parser, recorder, categories)
def default_extract_method(self, *a, **kw):
log.info("Default extractor → nothing to do.")
def tags_extract(self, soup, row):
dom = etree.HTML(str(soup))
for tag in self.tags:
res = dom.xpath(tag["xpath"])
col = ""
if res:
for el in res:
col += ''.join(el.itertext()).strip() + "\n"
row.insert(tag["column_number"], col)
    # ─────────── stubs for the unused shops ────────────
def zara_extract_method(self, *_, **__): log.info("ZARA extractor disabled.")
def eobuwie_extract_method(self, *_, **__): log.info("Eobuwie extractor disabled.")
def decathlon_extract_method(self, *_, **__): log.info("Decathlon extractor disabled.")
def chanel_extract_method(self, *_, **__): log.info("Chanel extractor disabled.")
# ───────────────────── Z A R A H O M E ───────────────────────
def zarahome_extract_method(self, parser, recorder, categories):
BASE_API = "https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000"
USER_BRAND = "ZARAHOME"
def fetch_json(url):
try:
return parser.parse(url, return_type="json")
except Exception as err:
log.warning("Request Error: %s - %s", err, url)
alt = url.replace(
"ieec2cihslb3-zarahome.central.inditex.grp",
"www.zarahome.com"
)
if alt != url:
log.info("→ retry via public host")
return parser.parse(alt, return_type="json")
return None
for c_idx, category in enumerate(categories, 1):
table = [self.headers]
log.info("Categories: %s / %s %s", c_idx, len(categories), category)
html = parser.parse(category)
if html is None:
log.warning("Extractor Error: empty page"); continue
soup = BeautifulSoup(html, "html.parser")
script = soup.select_one("#serverApp-state")
if not script:
log.warning("Extractor Error: script not found for %s", category)
continue
state = loads(script.string)
cat_key = next(k for k in state if "/category?" in k)
cat_info = state[cat_key]
ids = [str(p["id"]) for p in cat_info.get("products", [])]
summaries = []
# (A) productIds
if ids:
log.debug("→ pulling %s productIds via API", len(ids))
CHUNK = 20
for p in range(0, len(ids), CHUNK):
api = (f"{BASE_API}/productsArray?languageId=-1&"
f"productIds={','.join(ids[p:p+CHUNK])}&appId=1")
data = fetch_json(api)
if not data or "products" not in data:
log.debug("Skip chunk (no data)")
continue
summaries += data["products"]
            # (B) products in the state blob, or a recursive walk of subcategories
else:
prod_key = next((k for k in state if "/product?" in k), None)
if prod_key and "products" in state[prod_key]:
log.debug("→ products array found in state")
for grp in state[prod_key]["products"]:
summaries += grp.get("bundleProductSummaries", [])
                # ★ no products array, but productIds are present → use the API
elif prod_key and "productIds" in state[prod_key]:
ids = state[prod_key]["productIds"]
log.debug("→ pulling %s productIds via API (from prod_block)", len(ids))
CHUNK = 60
for p in range(0, len(ids), CHUNK):
api = (f"{BASE_API}/productsArray?languageId=-1&"
f"productIds={','.join(map(str, ids[p:p+CHUNK]))}&appId=1")
data = fetch_json(api)
if not data or "products" not in data:
log.debug("Skip chunk (no data)")
continue
summaries += data["products"]
else:
subcats = cat_info.get("subcategories") or []
if not subcats:
log.info("→ no products in this category")
continue
log.info("→ diving into %s subcategories", len(subcats))
for sub in subcats:
sub_url = "https://www.zarahome.com/pl/en/" + sub["url"]
sub_html = parser.parse(sub_url)
if not sub_html:
continue
sub_state = loads(BeautifulSoup(sub_html, "html.parser")
.select_one("#serverApp-state").string)
sub_prod_key = next((k for k in sub_state if "/product?" in k), None)
if sub_prod_key and "products" in sub_state[sub_prod_key]:
for grp in sub_state[sub_prod_key]["products"]:
summaries += grp.get("bundleProductSummaries", [])
log.debug("JSON summaries count: %s", len(summaries))
seen_ids = set()
for n, prod in enumerate(summaries, 1):
prod_id = prod.get("id")
short_url = prod.get("productUrl")
if not short_url and prod.get("seo"):
kw = prod["seo"].get("keyword", "")
sid = prod["seo"].get("seoProductId", "")
if kw and sid:
short_url = f"{kw}-p{sid}.html"
prod["productUrl"] = short_url
if not short_url or prod_id in seen_ids:
continue
seen_ids.add(prod_id)
log.info("Products: %s / %s %s", n, len(summaries),
f"https://www.zarahome.com/pl/{short_url}")
variants = prod.get("bundleProductSummaries") or [prod]
for vprod in variants:
det = vprod["detail"]
sec, fam, sub = (vprod.get("sectionNameEN") or "",
vprod.get("familyName") or "",
vprod.get("subFamilyName") or "")
cat_path = "Каталог/ZaraHome/" + "/".join(p for p in (sec, fam, sub) if p)
url_full = f"https://www.zarahome.com/pl/en/{vprod.get('productUrl','')}"
name = vprod.get("name", "")
article = det["displayReference"]
root_price = int(vprod.get("price", 0)) / 100
root_wt = vprod.get("weight", "")
raw_xmedia = det.get("xmedia") or vprod.get("xmedia") or []
default_idx = det.get("xmediaDefaultSet")
if isinstance(raw_xmedia, list) and raw_xmedia:
media_sets = [raw_xmedia[default_idx]] if isinstance(default_idx, int) else raw_xmedia
elif isinstance(raw_xmedia, dict):
media_sets = [raw_xmedia]
else:
media_sets = []
all_imgs = [f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in media_sets
for m in loc["xmediaItems"][0]["medias"]]
all_imgs_s = "\n".join(all_imgs)
comp_txt = ""
if det.get("compositionDetail") and det["compositionDetail"].get("parts"):
comp_txt = "\n".join(
extract_components_zarahome(det["compositionDetail"]["parts"])
)
care = "\n".join(c["description"] for c in det.get("care", []))
trace = ""
colors = det.get("colors") or [{
"id": 0, "name": "DEFAULT", "image": {"url": ""},
"sizes": [{
"visibilityValue": "SHOW",
"name": "", "description": "",
"weight": root_wt, "price": vprod.get("price", 0)
}]
}]
#serial = 0
for clr in colors:
clr_code = clr.get("id")
clr_name = clr.get("name", "")
clr_image = ""
if clr.get("image") and clr["image"].get("url"):
clr_image = f"https://static.zarahome.net/8/photos4{clr['image']['url']}_3_1_5.jpg"
clr_sets = [loc for loc in media_sets if loc.get("colorCode") == clr_code] or media_sets
clr_imgs = [f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in clr_sets
for m in loc["xmediaItems"][0]["medias"]]
clr_imgs_s = "\n".join(clr_imgs)
for size in clr["sizes"]:
vis = size.get("visibilityValue", "UNKNOWN")
price = int(size.get("price") or vprod.get("price", 0)) / 100
weight = size.get("weight") or root_wt
                            # ── country of manufacture (if present in size)
country = size.get("country") or ""
trace_local = f"Made in {country}" if country else trace
size_name = size.get("name", "")
size_descr = size.get("description", "")
size_full = f"{size_descr} ({size_name})" if size_descr else size_name
                            # ── SKU / PartNumber come from size ───────────────
sku_val = size.get("sku", "")
partnumber_val = size.get("partnumber", "")
table.append([
url_full,
article,
sku_val,
partnumber_val,
name,
det.get("longDescription", ""),
clr_image,
clr_name,
size_full,
price,
weight,
vis,
all_imgs_s,
clr_imgs_s,
comp_txt,
care,
trace_local,
cat_path,
USER_BRAND
])
csv_name = category.split("/")[-1]
clean_table = filter_duplicates(table, self.headers)
recorder.record(csv_name, clean_table)
#csv_name = category.split("/")[-1]
#recorder.record(csv_name, table)
# ────────────────────────────────────────────────────────────────────
def get_extractor():
with open(abspath("parse_settings.json"), "r", encoding="utf-8") as fh:
return Extractor(load(fh))


@@ -0,0 +1,940 @@
from json import load, loads
from os.path import abspath
from bs4 import BeautifulSoup
from lxml import etree
from re import split, search, sub
def extract_components_zarahome(parts):
composition = []
for part in parts:
if part.get("areas") and part.get("description"):
if len(parts) != 1:
composition.append(part["description"])
for area in part["areas"]:
area_name = area["description"]
percentage_area = area["percentageArea"]
composition.append(f"{area_name} ({percentage_area})")
for component in area["components"]:
material = component["material"]
percentage = component["percentage"]
composition.append(f"{percentage} {material}")
elif part.get("components") and part.get("description"):
if len(parts) != 1:
composition.append(part["description"])
for component in part["components"]:
material = component["material"]
percentage = component["percentage"]
composition.append(f"{percentage} {material}")
return composition
# class that extracts the required data
class Extractor:
def __init__(self, json_data):
self.methods = {
"": (self.default_extract_method, []),
"zarahome": (self.zarahome_extract_method, [
"Краткое описание",
"Артикул",
"Название товара или услуги",
"Полное описание",
"Образец цвета",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Свойство: Вес(г)",
"Изображения",
"Изображения варианта",
"Параметр: Состав",
"Параметр: Уход",
"Параметр: Происхождение",
"Размещение на сайте",
"Свойство: Бренд"
]),
"eobuwie": (self.eobuwie_extract_method, [
"Краткое описание",
"Артикул",
"Свойство: Размер",
"Полное описание(Таблица)",
"Название товара или услуги",
"Изображения",
"Размещение на сайте",
"Цена",
"Наличие"
]),
"decathlon": (self.decathlon_extract_method, [
"Краткое описание",
"Артикул",
"Название товара или услуги",
"Полное описание",
"Наличие",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Параметр: Вес(г)",
"Изображения варианта",
"Размещение на сайте"
]),
"zara": (self.zara_extract_method, [
"Краткое описание",
"Артикул",
"Название товара или услуги",
"Наличие",
"Образец цвета",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Изображения",
"Параметр: Состав",
"Параметр: Уход",
"Параметр: Происхождение",
"Размещение на сайте",
"Свойство: Бренд"
]),
"chanel": (self.chanel_extract_method, [
"Краткое описание",
"Артикул",
"Наличие",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Изображения",
"Размещение на сайте",
"Свойство: Бренд"
])
}
self.method = json_data["method"]
self.tags = json_data["tags"]
self.headers = self.methods[self.method][1]
for tag in self.tags:
self.headers.insert(tag["column_number"], tag["column_name"])
def extract(self, parser, recorder, categories):
self.methods[self.method][0](parser, recorder, categories)
def default_extract_method(self):
pass
def tags_extract(self, soup, row):
dom_tree = etree.HTML(str(soup))
for tag in self.tags:
xpath_result = dom_tree.xpath(tag["xpath"])
column_data = ""
if len(xpath_result):
for element in xpath_result:
column_data = ''.join(element.itertext()).strip() + "\n"
row.insert(tag["column_number"], column_data)
def chanel_extract_method(self, parser, recorder, categories):
BASE_URL = "https://www.chanel.com"
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
continue_loop = True
category_page = 1
request_elements_count = 24
product_number = 1
category_pattern = r"\/pl\/[\w\d]+\/"
location = "chanel/" + search(category_pattern, category)[0].replace("pl", "").replace("/", "")
while continue_loop:
category_data = parser.parse(f"{category}?requestType=ajax&page={category_page}&totalElementsCount={request_elements_count}", return_type="json")
if not category_data["next"]:
continue_loop = False
products_count = category_data["totalProducts"]
for product in category_data["dataLayer"]["productList"].values():
first_variant = True
article_pattern = r"\/p\/[\d\w]+/"
base_link = BASE_URL + product["quickviewPopin"]["page"]
print(f"Products: {product_number} / {products_count}", base_link)
product_number += 1
links = [base_link]
while len(links):
product_url = links.pop(0)
product_page = parser.parse(product_url)
if product_page == None:
continue
soup = BeautifulSoup(product_page, "html.parser")
if first_variant:
first_variant = False
variants_links = soup.select(".link.js-tabpanel-anchor")
replace_pattern = r"\/p\/.+$"
for variant_link in variants_links:
article = variant_link.get("data-value")
if not article in product_url:
links.append(sub(replace_pattern, f"/p/{article}", product_url))
product_url = soup.select("[property=\"og:url\"]")[0].get("content")
article = search(article_pattern, product_url)[0].replace("/", "").replace("p", "")
product_info = parser.parse(f"{BASE_URL}/pl/yapi/product/{article}?options=basic,vto,variants,stock&site=chanel", return_type="json")
stock = 0
if product_info["stock"]["stockLevel"] == "IN_STOCK":
stock = 1
product_color_name = product_info["color"]["name"]
product_size = product_info.get("size")
product_price = product_info["buyNow"].get("priceValue")
images = "\n".join(map(lambda x: x["url"], product_info["basic"]["images"]))
product_brand = "chanel"
try:
table_data = []
table_data.append([
product_url,
article,
stock,
product_color_name,
product_size,
product_price,
images,
location,
product_brand
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
csv_name = category.replace(f"{BASE_URL}/pl/", "").replace("/", "_")
recorder.record(csv_name, table)
def zara_extract_method(self, parser, recorder, categories):
BASE_URL = "https://www.zara.com"
BASE_POLISH_URL = "https://www.zara.com/pl/en/"
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
category_page = parser.parse(category)
category_soup = BeautifulSoup(category_page, "html.parser")
verify_url = category_soup.select("[http-equiv=\"refresh\"]")[0].get("content").split("'")[1]
bm_verify = verify_url.split("?")[-1]
category_page = parser.parse(BASE_URL + verify_url)
category_soup = BeautifulSoup(category_page, "html.parser")
tag_script_inner = category_soup.select("[type=\"text/javascript\"][data-compress=\"true\"]")[0].text
analytics_data = loads(search(r"zara\.analyticsData\s?=\s?{.+};", tag_script_inner)[0].split("=")[1].replace(";", ""))
category_id = analytics_data["catGroupId"]
category_products = parser.parse(f"{BASE_POLISH_URL}category/{category_id}/products?ajax=true", return_type="json")
location = "ZARA/" + "/".join(category.split("/")[5].split("-")[:2]).upper()
all_products_count = 0
for element in category_products["productGroups"][0]["elements"]:
products = element.get("commercialComponents")
if not products:
continue
for product in products:
if not product.get("name"):
continue
all_products_count += 1
product_number = 0
for element in category_products["productGroups"][0]["elements"]:
products = element.get("commercialComponents")
if not products:
continue
for product in products:
product_name = product.get("name")
if not product_name:
continue
product_number += 1
seo_keyword = product["seo"]["keyword"]
seo_id = product["seo"]["seoProductId"]
if not seo_keyword:
continue
product_url = f"{BASE_POLISH_URL}{seo_keyword}-p{seo_id}.html"
print(f"Products: {product_number} / {all_products_count}", product_url)
article = product["detail"]["displayReference"]
product_color_hex = product["colorInfo"].get("mainColorHexCode")
product_color_name = product["detail"]["colors"][0]["name"]
product_price = product["price"] / 100
product_brand = product["brand"].get("brandGroupCode")
product_page = parser.parse(f"{product_url}?{bm_verify}")
if product_page == None:
continue
soup = BeautifulSoup(product_page, "html.parser")
sizes = soup.select("[data-qa-action][role=\"option\"]")
images = "\n".join(map(lambda x: x.get("srcset").split(", ")[-1].split(" ")[0], soup.select(f"source[sizes=\"32vw\"]")))
product_id = product["id"]
extra_data = parser.parse(f"https://www.zara.com/pl/pl/product/{product_id}/extra-detail?ajax=true", return_type="json")
extra_data_extracted = {}
for section in extra_data:
extra_data_extracted[section["sectionType"]] = ""
for component in section["components"]:
if component["datatype"] in ["subtitle", "paragraph"]:
extra_data_extracted[section["sectionType"]] += component["text"]["value"] + "\n"
elif component["datatype"] == "spacer":
extra_data_extracted[section["sectionType"]] += "\n"
elif component["datatype"] == "iconList":
for item in component["items"]:
if item["datatype"] == "iconListItem" and item["description"]["datatype"] == "text":
extra_data_extracted[section["sectionType"]] += item["description"]["value"] + "\n"
materials = extra_data_extracted.get("materials")
care = extra_data_extracted.get("care")
origin = extra_data_extracted.get("origin")
for size in sizes:
try:
table_data = []
if size.get("data-qa-action") == "size-in-stock":
stock = 1
else:
stock = 0
product_size = size.select(".product-size-info__main-label")[0].text
table_data.append([
product_url,
f"{article} - {product_size}",
product_name,
stock,
product_color_hex,
product_color_name,
product_size,
product_price,
images,
materials,
care,
origin,
location,
product_brand
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
csv_name = category.split("/")[-1].split("?")[0]
recorder.record(csv_name, table)
def decathlon_extract_method(self, parser, recorder, categories):
BASE_URL = "https://www.decathlon.pl"
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
continue_loop = True
category_from = 0
while continue_loop:
category_page = parser.parse(f"{category}?from={category_from}")
category_soup = BeautifulSoup(category_page, "html.parser")
offers_count = int(category_soup.select("h1 ~ span.count")[0].text.split(" ")[0])
products_links = category_soup.select("[class$=\"model-link\"]")
products_links_count = len(products_links)
for e, product_link in enumerate(products_links):
product_url = BASE_URL + product_link.get("href")
print(f"Products: {e + 1 + category_from} / {offers_count}", product_url)
product_page = parser.parse(product_url)
if product_page == None:
continue
soup = BeautifulSoup(product_page, "html.parser")
meta_script_tags = soup.select("[type=\"application/ld+json\"]")
if len(meta_script_tags) <= 1:
continue
meta_data = loads(meta_script_tags[1].text)
path_steps = []
for step in meta_data["itemListElement"]:
path_steps.append(step["item"]["name"])
product_path = "decathlon/" + "/".join(path_steps)
script_json = soup.select("#__dkt")[0]
__dkt = loads(script_json.text.replace("__DKT = ", ""))
if __dkt["_ctx"]["page"]["id"] != "product":
continue
models_data = __dkt["_ctx"]["data"][4]["data"]["models"]
for model in models_data:
color = ""
colors = []
if model.get("colors"):
for color_info in model["colors"]:
colors.append(color_info["label"])
color = " / ".join(colors)
images = []
for image_info in model["images"]["product"]:
images.append(image_info["url"].replace("/250x250", ""))
image_lines = "\n".join(images)
product_name = model["webLabel"]
product_description = soup.select("[id^=\"ProductFunctionalities\"]")
if len(product_description):
product_description = product_description[0].encode_contents()
else:
product_description = ""
skus_data = model["skus"]
sku_ids = []
for sku in skus_data:
sku_ids.append(sku["skuId"])
sku_ids = ",".join(sku_ids)
stocks = parser.parse(f"https://www.decathlon.pl/pl/ajax/nfs/stocks/online?skuIds={sku_ids}", return_type="json")
for sku in skus_data:
try:
sku_id = sku["skuId"]
stock = stocks[sku_id]["stockOnline"] if stocks.get(sku_id) else "unknown"
table_data = []
article = f'{model["modelId"]}-{sku_id}'
size = ""
if sku.get("size"):
size = sku["size"]
price = ""
if sku.get("price"):
price = sku["price"]
weight = ""
if sku.get("grossWeight"):
weight = float(sku["grossWeight"])
table_data.append([
product_url,
article,
product_name,
product_description,
stock,
color,
size,
price,
weight,
image_lines,
product_path
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
if offers_count == products_links_count + category_from:
continue_loop = False
else:
category_from += products_links_count
csv_name = "_".join(category.split("/")[4:]).replace(":", "-").replace("?", "_").replace("=", "_")
recorder.record(csv_name, table)
def eobuwie_extract_method(self, parser, recorder, categories):
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
category_page = 1
category_marka = category.split(":")[2].split("?")[0]
category_type = category.split("/")[4]
while True:
category_products_data = parser.parse(f"https://eobuwie.com.pl/t-api/rest/search/eobuwie/v5/search?channel=eobuwie&currency=PLN&locale=pl_PL&limit=100&page={category_page}&filters[marka][in][]={category_marka}&categories[]={category_type}&select[]=url_key&select[]=product_group_associated&select[]=images&select[]=final_price&select[]=footwear_size&select_locales[]=pl_PL", return_type="json")
total = category_products_data["total"]
products = category_products_data["products"]
for e, product in enumerate(products):
short_url = product["values"]["url_key"]["value"]["pl_PL"]
product_url = f"https://eobuwie.com.pl/p/{short_url}"
print(f"Products: {e + 1 + ((category_page - 1) * 100)} / {total}", product_url)
product_page = parser.parse(product_url)
if product_page == None:
continue
soup = BeautifulSoup(product_page, "html.parser")
links = soup.select(".breadcrumb-list .text-link")[2:]
product_location = "/".join(list(map(lambda x: x.text, links)))
product_group = ""
if product["values"].get("product_group_associated") and product["values"]["product_group_associated"].get("value"):
product_group = product["values"]["product_group_associated"]["value"]
product_name = soup.select("[data-test-id=\"product-name\"]")[0].text.strip()
product_name = split(r"\d", product_name)[0]
product_name = f"{product_name} - {product_group}"
images_list = []
if product["values"].get("images") and product["values"]["images"].get("value"):
for image in product["values"]["images"]["value"]:
if image.get("url"):
images_list.append(f'https://img.modivo.cloud/eob_product_1800w_1800h({image["url"]}.jpg,webp)')
images_list = "\n".join(images_list)
for i, variant in enumerate(product["variants"].values()):
try:
table_data = []
size_url = variant["size"]
variant_url = f"{product_url}?size={size_url}"
article = variant["id"]
size_name = ""
if variant["values"].get("footwear_size"):
size_name = variant["values"]["footwear_size"]["value"]["label"]
description = ""
location = f"Каталог/Обувь и аксессуары/{product_location}"
availability = variant["stock_quantity"]
if variant["stock_quantity"]:
price = variant["offers"][0]["final_price"]["amount"]
else:
price = product["values"]["final_price"]["value"]["pl_PL"]["PLN"]["amount"]
table_data.append([
variant_url,
article,
size_name,
description,
product_name,
images_list,
location,
price,
availability
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
if category_page * 100 >= total:
break
category_page += 1
csv_name = category.split("/")[-1].replace(":", "-").replace("?", "_").replace("=", "_")
recorder.record(csv_name, table)
def zarahome_extract_method(self, parser, recorder, categories):
BASE_API = "https://ieec2cihslb3-zarahome.central.inditex.grp/itxrest/3/catalog/store/85009924/80290000"
USER_BRAND = "ZARAHOME"
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)} {category}")
            # ── category HTML ───────────────────────────────────────
html = parser.parse(category)
if html is None:
print("Extractor Error: empty page"); continue
soup = BeautifulSoup(html, "html.parser")
script = soup.select_one("#serverApp-state")
            ####### Debug dump of the state (start)
# dump_name = f"state_dump_{int(time.time())}.json"
# pathlib.Path(dump_name).write_text(script.string, encoding="utf-8")
# print(f"🛈 serverApp-state saved → {dump_name}")
#
# state = loads(script.string)
# print("TOP-LEVEL KEYS:", list(state.keys())[:20])
# print("inditex-data KEYS:", list(state.get("inditex-data", {}).keys()))
            ####### Debug dump of the state (end)
if not script:
print("Extractor Error: script#serverApp-state not found"); continue
try:
state = loads(script.string)
except Exception as e:
print(f"Extractor Error: bad JSON ({e})"); continue
# ── category_id ──────────────────────────────────────────
cdata = state.get("inditex-data", {})
cat_id = (cdata.get("iCategoryId") or
cdata.get("categoryId") or
cdata.get("iCategoryJSON", {}).get("id"))
if not cat_id:
for k in state:
m = search(r"/category/(\d+)/product", k)
if m: cat_id = m.group(1); break
if not cat_id:
print("Extractor Error: cannot detect category_id"); continue
            # ── block with the products or their IDs ─────────────────
key = next((k for k in state if f"/category/{cat_id}/product" in k), None)
if not key:
print("Extractor Error: products block not found"); continue
prod_block = state[key]
summaries = []
            # ★ Old layout: products are already inlined
if "products" in prod_block:
for grp in prod_block["products"]:
summaries += grp["bundleProductSummaries"]
            # ★ New layout: pull the products by productIds
else:
ids = (prod_block.get("productIds") or
prod_block.get("sortedProductIds") or
prod_block.get("sortedProductIdsByPricesAsc") or [])
print(f"→ pulling {len(ids)} products via API")
CHUNK = 20
for p in range(0, len(ids), CHUNK):
ids_chunk = ",".join(map(str, ids[p:p+CHUNK]))
api = (f"{BASE_API}/productsArray"
f"?languageId=-1&productIds={ids_chunk}&appId=1")
data = parser.parse(api, return_type="json")
summaries += data.get("products", [])
print("DEBUG summaries count:", len(summaries))
for p in summaries:
print("", p.get("id"), p.get("productUrl"))
            # ── category path for the CSV ───────────────────────────
# cat_json = cdata.get("iCategoryJSON", {})
# cat_title = "/".join(cat_json.get("parentNames", []) +
# [cat_json.get("name", "")])
# cat_path = f"Каталог/ZaraHome/{cat_title}"
seen = set()
for n, prod in enumerate(summaries, 1):
short_url = prod.get("productUrl")
if not short_url or short_url in seen:
continue
seen.add(short_url)
print(f"Products: {n} / {len(summaries)} "
f"https://www.zarahome.com/pl/{short_url}")
                # ── some products have no variants → look one level deeper ──
variant_products = []
if prod.get("detail", {}).get("colors"):
variant_products.append(prod)
elif prod.get("bundleProductSummaries"):
variant_products += prod["bundleProductSummaries"]
else:
                    variant_products.append(prod)  # single product without variants
                # ── process each vprod (a variant or the product itself) ─
for vprod in variant_products:
det = vprod["detail"]
url_full = f"https://www.zarahome.com/pl/en/{vprod.get('productUrl','')}"
name = vprod.get("name", "")
article = det["displayReference"]
root_price = int(vprod.get("price", 0)) / 100
root_wt = vprod.get("weight", "")
                    # ── all images ────────────────────────────
raw_xmedia = (det.get("xmedia") or vprod.get("xmedia") or [])
default_idx = det.get("xmediaDefaultSet")
if isinstance(raw_xmedia, list) and raw_xmedia:
media_sets = [raw_xmedia[default_idx]] if isinstance(default_idx, int) else raw_xmedia
elif isinstance(raw_xmedia, dict):
media_sets = [raw_xmedia]
else:
media_sets = []
all_imgs = [
f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in media_sets
for m in loc["xmediaItems"][0]["medias"]
]
all_imgs_s = "\n".join(all_imgs)
                    # ── composition / care / origin ───────────────
comp_block = det.get("compositionDetail")
comp_txt = ""
if comp_block and comp_block.get("parts"):
comp_txt = "\n".join(
extract_components_zarahome(comp_block["parts"])
)
care = "\n".join(c["description"] for c in det.get("care", []))
trace = ""
if det.get("traceability"):
trace = "\n".join(
f"{v['name']}\n" + "\n".join(v["country"])
for v in det["traceability"].values()
if isinstance(v, dict) and v.get("country") and v.get("name")
)
                    # ── colors and sizes ─────────────────────────────
                    colors_list = det.get("colors") or []
                    if not colors_list:  # single product without colors
colors_list = [{
"id": 0,
"name": "DEFAULT",
"image": {"url": ""},
"sizes": [{
# "visibilityValue": "SHOW",
"name": "",
"description": "",
"weight": root_wt,
"price": vprod.get("price", 0)
}]
}]
serial = 0
for clr in colors_list:
if clr.get("image") is None and clr["name"] != "DEFAULT":
continue
clr_code = clr.get("id")
clr_name = clr.get("name", "")
clr_image = ""
if clr.get("image") and clr["image"].get("url"):
clr_image = (f"https://static.zarahome.net/8/photos4"
f"{clr['image']['url']}_3_1_5.jpg")
                        # images for this specific color
media_sets = [loc for loc in media_sets
if loc.get("colorCode") == clr_code] or media_sets
clr_imgs = [
f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in media_sets
for m in loc["xmediaItems"][0]["medias"]
]
clr_imgs_s = "\n".join(clr_imgs)
for size in clr["sizes"]:
# if size["visibilityValue"] != "SHOW":
# continue
                            # suffix = "" if serial == 0 else f"-{serial}"   # uncomment to append "-1,2,3" to the article when it repeats
serial += 1
visibility = size.get("visibilityValue", "UNKNOWN")
size_name = size.get("name", "")
size_descr = size.get("description", "")
size_full = f"{size_descr} ({size_name})" if size_descr else size_name
size_weight = size.get("weight") or root_wt
size_price = int(size.get("price") or vprod.get("price", 0)) / 100
                            # ── category path from sectionNameEN / familyName / subFamilyName
                            sec = vprod.get("sectionNameEN") or ""   # top level
                            fam = vprod.get("familyName") or ""      # family
                            sub = vprod.get("subFamilyName") or ""   # subfamily
                            cat_parts = [p for p in (sec, fam, sub) if p]   # drop empty parts
cat_path = "Каталог/ZaraHome/" + "/".join(cat_parts)
sku_val = size.get("sku", "")
partnumber_val = size.get("partnumber", "")
table.append([
url_full,
                                f"{article}",  # use f"{article}{suffix}" here if the "-1,2,3" repeat suffix is re-enabled
name,
sku_val, # ← SKU
partnumber_val, # ← PartNumber
det.get("longDescription", ""),
clr_image,
clr_name,
size_full,
size_price,
size_weight,
visibility,
all_imgs_s,
clr_imgs_s,
comp_txt,
care,
trace,
cat_path,
USER_BRAND
])
            # ── CSV output ──────────────────────────────────────────
csv_name = category.split("/")[-1]
recorder.record(csv_name, table)
def get_extractor():
with open(abspath("parse_settings.json"), "r", encoding="utf-8") as file:
return Extractor(load(file))


@@ -0,0 +1,11 @@
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
opt = Options()
#opt.add_argument("--headless=new")      # optional; leave it out to see the browser window
driver = webdriver.Chrome(options=opt)   # no Service object, no explicit driver paths!
driver.get("https://www.zarahome.com/pl/en")
print("Title:", driver.title)
print("ChromeDriver:", driver.capabilities['chrome']['chromedriverVersion'])
driver.quit()


@@ -0,0 +1,71 @@
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import pandas as pd
from urllib.parse import urljoin
BASE_URL = "https://www.zarahome.com"
START_URL = f"{BASE_URL}/pl/en/"
TIMEOUT = 30
opt = Options()
#opt.add_argument("--headless=new")
opt.add_argument("--window-size=1920,1080")
opt.add_argument("--disable-gpu")
opt.add_argument("--disable-blink-features=AutomationControlled")
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option("useAutomationExtension", False)
driver = webdriver.Chrome(options=opt)
wait = WebDriverWait(driver, TIMEOUT)
try:
driver.get(START_URL)
# cookies
try:
wait.until(EC.element_to_be_clickable((
By.XPATH, "//button[contains(.,'Accept') or contains(.,'Akcept')]"))
).click()
except Exception:
pass
# open the burger menu (if present)
try:
wait.until(EC.element_to_be_clickable((
By.CSS_SELECTOR,
"button[aria-label='Menu'], button[data-testid='menu-button']"))
).click()
except Exception:
pass
# wait for the menu items
wait.until(EC.presence_of_element_located((
By.XPATH, "//nav//ul//a[normalize-space(text())!='']")))
html = driver.page_source
finally:
driver.quit()
# ── parsing
soup = BeautifulSoup(html, "lxml")
links = soup.select("nav ul a[href]") # any href, not only https
print("Links found in DOM:", len(links))
records = set()
for a in links:
name = a.get_text(strip=True)
href = a["href"]
if not name or href.startswith("javascript"):
continue
full_url = urljoin(BASE_URL, href) # /pl/en/... → https://www.zarahome.com/pl/en/...
records.add((full_url, name))
print("После фильтрации уникальных:", len(records))
df = pd.DataFrame(sorted(records), columns=["URL", "Category"])
df.to_excel(r"C:\Users\valis\YandexDisk\Python3\Parsing ZARAHOME\src_2024-09-05categories.xlsx", index=False)
print(f"✔ Собрано {len(df)} ссылок → categories.xlsx")

View File

@ -0,0 +1,173 @@
#!/usr/bin/env python3
# zarahome_product_links.py
# — extracts data directly from Zara Home product pages —
# The column layout matches the category parser.
import json, re, sys, time, pathlib, requests, pandas as pd
from bs4 import BeautifulSoup
# ── constants ────────────────────────────────────────────────────
HEADERS = {"User-Agent": "Mozilla/5.0"}
PID_RE = re.compile(r"-l(\d+)(?:[/?]|$)") # productId from the URL
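# e.g. a hypothetical URL ".../linen-duvet-cover-l04520410?colorId=712" yields productId "04520410"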
REST_API = ("https://www.zarahome.com/itxrest/3/catalog/store/"
"85009924/80290000/productsArray?languageId=-1"
"&productIds={ids}&appId=1")
BRAND = "ZARAHOME"
# ── helpers ──────────────────────────────────────────────────────
def fetch(url: str, json_flag=False):
"""GET-обёртка c timeout и user-agent."""
r = requests.get(url, headers=HEADERS, timeout=15)
r.raise_for_status()
return r.json() if json_flag else r.text
def try_json_ld(soup: BeautifulSoup):
"""Ищем <script type='application/ld+json'> с Product."""
for tag in soup.find_all("script", attrs={"type": lambda t: t and "ld+json" in t}):
try:
data = json.loads(tag.string)
except Exception:
continue
if isinstance(data, list):
data = next((d for d in data if d.get("@type") == "Product"), None)
if data and data.get("@type") == "Product":
offers = data.get("offers", {})
return {
# minimal field set, enough for the output table
"name": data.get("name",""),
"longDescription": data.get("description",""),
"displayReference": data.get("sku") or data.get("gtin13",""),
"price": float(offers.get("price",0))*100,
"weight": "",
"colors": [],
"xmedia": [],
"xmediaDefaultSet": None,
"image": data.get("image", []),
"care": [],
"traceability": {},
"compositionDetail": {}
}
return None
def try_server_state(soup: BeautifulSoup):
"""Пробуем detail из #serverApp-state."""
tag = soup.select_one("#serverApp-state")
if not tag:
return None
state = json.loads(tag.string)
for k, v in state.items():
if "/detail" in k and isinstance(v, dict):
return v
prod = state.get("inditex-data", {}).get("product")
return prod if isinstance(prod, dict) else None
def try_rest_api(url: str):
"""REST /productsArray по productId."""
m = PID_RE.search(url)
if not m:
return None
pid = m.group(1)
api_json = fetch(REST_API.format(ids=pid), json_flag=True)
for p in api_json.get("products", []):
if p.get("detail"):
return p["detail"]
bs = p.get("bundleProductSummaries")
if bs and bs[0].get("detail"):
return bs[0]["detail"]
return None
def get_detail(url: str):
"""Return the detail dict (raise ValueError if nothing is found)."""
soup = BeautifulSoup(fetch(url), "html.parser")
detail = try_json_ld(soup) or try_server_state(soup) or try_rest_api(url)
if not detail:
raise ValueError("detail not found")
return detail
def join_imgs(paths):
return "\n".join(paths)
def rows_from_detail(det: dict, url: str):
"""Собирает все строки CSV-формата из detail."""
# --- общие картинки ---
all_imgs = det.get("image", [])
if det.get("xmedia"):
media_sets = det["xmedia"]
if isinstance(media_sets, dict):
media_sets = [media_sets]
all_imgs = [f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in media_sets
for m in loc["xmediaItems"][0]["medias"]]
all_imgs_s = join_imgs(all_imgs)
# --- text fields ---
comp_txt = ""
cdet = det.get("compositionDetail", {})
if cdet.get("parts"):
comp_txt = "\n".join(
f"{p['name']}: " + ", ".join(f"{c['percentage']}% {c['name']}"
for c in p["composition"])
for p in cdet["parts"]
)
care_txt = "\n".join(c.get("description","") for c in det.get("care", []))
trace_txt = ""
if det.get("traceability"):
trace_txt = "\n".join(
f"{v['name']}\n" + "\n".join(v["country"])
for v in det["traceability"].values()
if isinstance(v, dict) and v.get("country") and v.get("name")
)
# --- no colour variants → a single row ---
return [[
url,
det.get("displayReference",""),
det.get("name",""),
"", "", # SKU / PartNumber
det.get("longDescription",""),
all_imgs[0] if all_imgs else "",
"", "", # Color / Size
det.get("price",0)/100,
det.get("weight",""),
"",
all_imgs_s,
all_imgs_s,
comp_txt,
care_txt,
trace_txt,
"",
BRAND
]]
# ── main ────────────────────────────────────────────────────────────
def main():
base = pathlib.Path(__file__).resolve().parent
src = base / "links.xlsx"
if not src.exists():
sys.exit("⚠️ Поместите links.xlsx в ту же папку")
urls = pd.read_excel(src, header=None).iloc[:,0].dropna().tolist()
print("Ссылок:", len(urls))
headers = [
"URL","Article","Name","SKU","PartNumber","Description","ColorImage",
"Color","Size","Price","Weight","Visibility","AllImages","ColorImages",
"Composition","Care","Traceability","CategoryPath","Brand"
]
rows = []
for i, u in enumerate(urls, 1):
print(f"[{i}/{len(urls)}] {u}", end=" ")
try:
rows.extend(rows_from_detail(get_detail(u), u))
print("")
except Exception as e:
print("⚠️", e)
time.sleep(1.0) # delay so we do not hammer the server
pd.DataFrame(rows, columns=headers).to_excel(base/"result.xlsx", index=False)
print("\n✅ result.xlsx сохранён")
if __name__ == "__main__":
main()
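The script expects links.xlsx next to itself, read with header=None and only the first column used. One quick way to produce such a file (hypothetical URLs) is sketched below:

import pandas as pd

# one product URL per row, no header row — matches pd.read_excel(src, header=None).iloc[:, 0]
urls = [
    "https://www.zarahome.com/pl/en/some-product-l04520410",      # hypothetical
    "https://www.zarahome.com/pl/en/another-product-l07310124",   # hypothetical
]
pd.DataFrame(urls).to_excel("links.xlsx", index=False, header=False)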

View File

@ -0,0 +1,996 @@
from json import load, loads
from os.path import abspath
from bs4 import BeautifulSoup
from lxml import etree
from re import split, search, sub
import json, textwrap
from pathlib import Path, PurePath
import json, time
def extract_components_zarahome(parts):
composition = []
for part in parts:
if part.get("areas") and part.get("description"):
if len(parts) != 1:
composition.append(part["description"])
for area in part["areas"]:
area_name = area["description"]
percentage_area = area["percentageArea"]
composition.append(f"{area_name} ({percentage_area})")
for component in area["components"]:
material = component["material"]
percentage = component["percentage"]
composition.append(f"{percentage} {material}")
elif part.get("components") and part.get("description"):
if len(parts) != 1:
composition.append(part["description"])
for component in part["components"]:
material = component["material"]
percentage = component["percentage"]
composition.append(f"{percentage} {material}")
return composition
# class that extracts the required data
class Extractor:
def __init__(self, json_data):
self.methods = {
"": (self.default_extract_method, []),
"zarahome": (self.zarahome_extract_method, [
"Краткое описание",
"Артикул",
"Название товара или услуги",
"Полное описание",
"Образец цвета",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Свойство: Вес(г)",
"Изображения",
"Изображения варианта",
"Параметр: Состав",
"Параметр: Уход",
"Параметр: Происхождение",
"Размещение на сайте",
"Свойство: Бренд"
]),
"eobuwie": (self.eobuwie_extract_method, [
"Краткое описание",
"Артикул",
"Свойство: Размер",
"Полное описание(Таблица)",
"Название товара или услуги",
"Изображения",
"Размещение на сайте",
"Цена",
"Наличие"
]),
"decathlon": (self.decathlon_extract_method, [
"Краткое описание",
"Артикул",
"Название товара или услуги",
"Полное описание",
"Наличие",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Параметр: Вес(г)",
"Изображения варианта",
"Размещение на сайте"
]),
"zara": (self.zara_extract_method, [
"Краткое описание",
"Артикул",
"Название товара или услуги",
"Наличие",
"Образец цвета",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Изображения",
"Параметр: Состав",
"Параметр: Уход",
"Параметр: Происхождение",
"Размещение на сайте",
"Свойство: Бренд"
]),
"chanel": (self.chanel_extract_method, [
"Краткое описание",
"Артикул",
"Наличие",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Изображения",
"Размещение на сайте",
"Свойство: Бренд"
])
}
self.method = json_data["method"]
self.tags = json_data["tags"]
self.headers = self.methods[self.method][1]
for tag in self.tags:
self.headers.insert(tag["column_number"], tag["column_name"])
def extract(self, parser, recorder, categories):
self.methods[self.method][0](parser, recorder, categories)
def default_extract_method(self):
pass
def tags_extract(self, soup, row):
dom_tree = etree.HTML(str(soup))
for tag in self.tags:
xpath_result = dom_tree.xpath(tag["xpath"])
column_data = ""
if len(xpath_result):
for element in xpath_result:
column_data = ''.join(element.itertext()).strip() + "\n"
row.insert(tag["column_number"], column_data)
def chanel_extract_method(self, parser, recorder, categories):
BASE_URL = "https://www.chanel.com"
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
continue_loop = True
category_page = 1
request_elements_count = 24
product_number = 1
category_pattern = r"\/pl\/[\w\d]+\/"
location = "chanel/" + search(category_pattern, category)[0].replace("pl", "").replace("/", "")
while continue_loop:
category_data = parser.parse(f"{category}?requestType=ajax&page={category_page}&totalElementsCount={request_elements_count}", return_type="json")
if not category_data["next"]:
continue_loop = False
products_count = category_data["totalProducts"]
for product in category_data["dataLayer"]["productList"].values():
first_variant = True
article_pattern = r"\/p\/[\d\w]+/"
base_link = BASE_URL + product["quickviewPopin"]["page"]
print(f"Products: {product_number} / {products_count}", base_link)
product_number += 1
links = [base_link]
while len(links):
product_url = links.pop(0)
product_page = parser.parse(product_url)
if product_page == None:
continue
soup = BeautifulSoup(product_page, "html.parser")
if first_variant:
first_variant = False
variants_links = soup.select(".link.js-tabpanel-anchor")
replace_pattern = r"\/p\/.+$"
for variant_link in variants_links:
article = variant_link.get("data-value")
if not article in product_url:
links.append(sub(replace_pattern, f"/p/{article}", product_url))
product_url = soup.select("[property=\"og:url\"]")[0].get("content")
article = search(article_pattern, product_url)[0].replace("/", "").replace("p", "")
product_info = parser.parse(f"{BASE_URL}/pl/yapi/product/{article}?options=basic,vto,variants,stock&site=chanel", return_type="json")
stock = 0
if product_info["stock"]["stockLevel"] == "IN_STOCK":
stock = 1
product_color_name = product_info["color"]["name"]
product_size = product_info.get("size")
product_price = product_info["buyNow"].get("priceValue")
images = "\n".join(map(lambda x: x["url"], product_info["basic"]["images"]))
product_brand = "chanel"
try:
table_data = []
table_data.append([
product_url,
article,
stock,
product_color_name,
product_size,
product_price,
images,
location,
product_brand
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
csv_name = category.replace(f"{BASE_URL}/pl/", "").replace("/", "_")
recorder.record(csv_name, table)
def zara_extract_method(self, parser, recorder, categories):
BASE_URL = "https://www.zara.com"
BASE_POLISH_URL = "https://www.zara.com/pl/en/"
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
category_page = parser.parse(category)
category_soup = BeautifulSoup(category_page, "html.parser")
verify_url = category_soup.select("[http-equiv=\"refresh\"]")[0].get("content").split("'")[1]
bm_verify = verify_url.split("?")[-1]
category_page = parser.parse(BASE_URL + verify_url)
category_soup = BeautifulSoup(category_page, "html.parser")
tag_script_inner = category_soup.select("[type=\"text/javascript\"][data-compress=\"true\"]")[0].text
analytics_data = loads(search(r"zara\.analyticsData\s?=\s?{.+};", tag_script_inner)[0].split("=")[1].replace(";", ""))
category_id = analytics_data["catGroupId"]
category_products = parser.parse(f"{BASE_POLISH_URL}category/{category_id}/products?ajax=true", return_type="json")
location = "ZARA/" + "/".join(category.split("/")[5].split("-")[:2]).upper()
all_products_count = 0
for element in category_products["productGroups"][0]["elements"]:
products = element.get("commercialComponents")
if not products:
continue
for product in products:
if not product.get("name"):
continue
all_products_count += 1
product_number = 0
for element in category_products["productGroups"][0]["elements"]:
products = element.get("commercialComponents")
if not products:
continue
for product in products:
product_name = product.get("name")
if not product_name:
continue
product_number += 1
seo_keyword = product["seo"]["keyword"]
seo_id = product["seo"]["seoProductId"]
if not seo_keyword:
continue
product_url = f"{BASE_POLISH_URL}{seo_keyword}-p{seo_id}.html"
print(f"Products: {product_number} / {all_products_count}", product_url)
article = product["detail"]["displayReference"]
product_color_hex = product["colorInfo"].get("mainColorHexCode")
product_color_name = product["detail"]["colors"][0]["name"]
product_price = product["price"] / 100
product_brand = product["brand"].get("brandGroupCode")
product_page = parser.parse(f"{product_url}?{bm_verify}")
if product_page == None:
continue
soup = BeautifulSoup(product_page, "html.parser")
sizes = soup.select("[data-qa-action][role=\"option\"]")
images = "\n".join(map(lambda x: x.get("srcset").split(", ")[-1].split(" ")[0], soup.select(f"source[sizes=\"32vw\"]")))
product_id = product["id"]
extra_data = parser.parse(f"https://www.zara.com/pl/pl/product/{product_id}/extra-detail?ajax=true", return_type="json")
extra_data_extracted = {}
for section in extra_data:
extra_data_extracted[section["sectionType"]] = ""
for component in section["components"]:
if component["datatype"] in ["subtitle", "paragraph"]:
extra_data_extracted[section["sectionType"]] += component["text"]["value"] + "\n"
elif component["datatype"] == "spacer":
extra_data_extracted[section["sectionType"]] += "\n"
elif component["datatype"] == "iconList":
for item in component["items"]:
if item["datatype"] == "iconListItem" and item["description"]["datatype"] == "text":
extra_data_extracted[section["sectionType"]] += item["description"]["value"] + "\n"
materials = extra_data_extracted.get("materials")
care = extra_data_extracted.get("care")
origin = extra_data_extracted.get("origin")
for size in sizes:
try:
table_data = []
if size.get("data-qa-action") == "size-in-stock":
stock = 1
else:
stock = 0
product_size = size.select(".product-size-info__main-label")[0].text
table_data.append([
product_url,
f"{article} - {product_size}",
product_name,
stock,
product_color_hex,
product_color_name,
product_size,
product_price,
images,
materials,
care,
origin,
location,
product_brand
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
csv_name = category.split("/")[-1].split("?")[0]
recorder.record(csv_name, table)
def decathlon_extract_method(self, parser, recorder, categories):
BASE_URL = "https://www.decathlon.pl"
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
continue_loop = True
category_from = 0
while continue_loop:
category_page = parser.parse(f"{category}?from={category_from}")
category_soup = BeautifulSoup(category_page, "html.parser")
offers_count = int(category_soup.select("h1 ~ span.count")[0].text.split(" ")[0])
products_links = category_soup.select("[class$=\"model-link\"]")
products_links_count = len(products_links)
for e, product_link in enumerate(products_links):
product_url = BASE_URL + product_link.get("href")
print(f"Products: {e + 1 + category_from} / {offers_count}", product_url)
product_page = parser.parse(product_url)
if product_page == None:
continue
soup = BeautifulSoup(product_page, "html.parser")
meta_script_tags = soup.select("[type=\"application/ld+json\"]")
if len(meta_script_tags) <= 1:
continue
meta_data = loads(meta_script_tags[1].text)
path_steps = []
for step in meta_data["itemListElement"]:
path_steps.append(step["item"]["name"])
product_path = "decathlon/" + "/".join(path_steps)
script_json = soup.select("#__dkt")[0]
__dkt = loads(script_json.text.replace("__DKT = ", ""))
if __dkt["_ctx"]["page"]["id"] != "product":
continue
models_data = __dkt["_ctx"]["data"][4]["data"]["models"]
for model in models_data:
color = ""
colors = []
if model.get("colors"):
for color_info in model["colors"]:
colors.append(color_info["label"])
color = " / ".join(colors)
images = []
for image_info in model["images"]["product"]:
images.append(image_info["url"].replace("/250x250", ""))
image_lines = "\n".join(images)
product_name = model["webLabel"]
product_description = soup.select("[id^=\"ProductFunctionalities\"]")
if len(product_description):
product_description = product_description[0].encode_contents()
else:
product_description = ""
skus_data = model["skus"]
sku_ids = []
for sku in skus_data:
sku_ids.append(sku["skuId"])
sku_ids = ",".join(sku_ids)
stocks = parser.parse(f"https://www.decathlon.pl/pl/ajax/nfs/stocks/online?skuIds={sku_ids}", return_type="json")
for sku in skus_data:
try:
sku_id = sku["skuId"]
stock = stocks[sku_id]["stockOnline"] if stocks.get(sku_id) else "unknown"
table_data = []
article = f'{model["modelId"]}-{sku_id}'
size = ""
if sku.get("size"):
size = sku["size"]
price = ""
if sku.get("price"):
price = sku["price"]
weight = ""
if sku.get("grossWeight"):
weight = float(sku["grossWeight"])
table_data.append([
product_url,
article,
product_name,
product_description,
stock,
color,
size,
price,
weight,
image_lines,
product_path
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
if offers_count == products_links_count + category_from:
continue_loop = False
else:
category_from += products_links_count
csv_name = "_".join(category.split("/")[4:]).replace(":", "-").replace("?", "_").replace("=", "_")
recorder.record(csv_name, table)
def eobuwie_extract_method(self, parser, recorder, categories):
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
category_page = 1
category_marka = category.split(":")[2].split("?")[0]
category_type = category.split("/")[4]
while True:
category_products_data = parser.parse(f"https://eobuwie.com.pl/t-api/rest/search/eobuwie/v5/search?channel=eobuwie&currency=PLN&locale=pl_PL&limit=100&page={category_page}&filters[marka][in][]={category_marka}&categories[]={category_type}&select[]=url_key&select[]=product_group_associated&select[]=images&select[]=final_price&select[]=footwear_size&select_locales[]=pl_PL", return_type="json")
total = category_products_data["total"]
products = category_products_data["products"]
for e, product in enumerate(products):
short_url = product["values"]["url_key"]["value"]["pl_PL"]
product_url = f"https://eobuwie.com.pl/p/{short_url}"
print(f"Products: {e + 1 + ((category_page - 1) * 100)} / {total}", product_url)
product_page = parser.parse(product_url)
if product_page == None:
continue
soup = BeautifulSoup(product_page, "html.parser")
links = soup.select(".breadcrumb-list .text-link")[2:]
product_location = "/".join(list(map(lambda x: x.text, links)))
product_group = ""
if product["values"].get("product_group_associated") and product["values"]["product_group_associated"].get("value"):
product_group = product["values"]["product_group_associated"]["value"]
product_name = soup.select("[data-test-id=\"product-name\"]")[0].text.strip()
product_name = split(r"\d", product_name)[0]
product_name = f"{product_name} - {product_group}"
images_list = []
if product["values"].get("images") and product["values"]["images"].get("value"):
for image in product["values"]["images"]["value"]:
if image.get("url"):
images_list.append(f'https://img.modivo.cloud/eob_product_1800w_1800h({image["url"]}.jpg,webp)')
images_list = "\n".join(images_list)
for i, variant in enumerate(product["variants"].values()):
try:
table_data = []
size_url = variant["size"]
variant_url = f"{product_url}?size={size_url}"
article = variant["id"]
size_name = ""
if variant["values"].get("footwear_size"):
size_name = variant["values"]["footwear_size"]["value"]["label"]
description = ""
location = f"Каталог/Обувь и аксессуары/{product_location}"
availability = variant["stock_quantity"]
if variant["stock_quantity"]:
price = variant["offers"][0]["final_price"]["amount"]
else:
price = product["values"]["final_price"]["value"]["pl_PL"]["PLN"]["amount"]
table_data.append([
variant_url,
article,
size_name,
description,
product_name,
images_list,
location,
price,
availability
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
if category_page * 100 >= total:
break
category_page += 1
csv_name = category.split("/")[-1].replace(":", "-").replace("?", "_").replace("=", "_")
recorder.record(csv_name, table)
# ────────────────────────────────────────────────────────────────
# ZARA HOME — updated method
# ────────────────────────────────────────────────────────────────
def zarahome_extract_method(self, parser, recorder, categories):
BASE_API = "https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000"
USER_BRAND = "ZARAHOME"
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)} {category}")
# ── category HTML ───────────────────────────────────────
html = parser.parse(category)
if html is None:
print("Extractor Error: empty category page"); continue
soup = BeautifulSoup(html, "html.parser")
script = soup.select_one("#serverApp-state")
if not script:
print("Extractor Error: script#serverApp-state not found"); continue
try:
state = loads(script.string)
except Exception as e:
print(f"Extractor Error: bad JSON ({e})"); continue
# ── category_id ──────────────────────────────────────────
cdata = state.get("inditex-data", {})
cat_id = (cdata.get("iCategoryId")
or cdata.get("categoryId")
or cdata.get("iCategoryJSON", {}).get("id"))
if not cat_id:
for k in state:
m = search(r"/category/(\d+)/product", k)
if m: cat_id = m.group(1); break
if not cat_id:
print("Extractor Error: cannot detect category_id"); continue
# ── block with products or their IDs ─────────────────────
key = next((k for k in state if f"/category/{cat_id}/product" in k), None)
if not key:
print("Extractor Error: products block not found"); continue
prod_block = state[key]
summaries = []
# ★ OLD schema: the JSON already contains ["products"]
if "products" in prod_block:
for grp in prod_block["products"]:
for s in grp["bundleProductSummaries"]:
summaries.append({
"productUrl": s.get("productUrl", ""),
"__full": None, # no full JSON yet
"detail": s["detail"] # reference is needed later
})
# ★ NEW schema: only product IDs are present, fetch them in batches
else:
ids = (prod_block.get("productIds")
or prod_block.get("sortedProductIds")
or prod_block.get("sortedProductIdsByPricesAsc")
or [])
print(f"→ pulling {len(ids)} products via API")
CHUNK = 1
for p in range(0, len(ids), CHUNK):
ids_chunk = ",".join(map(str, ids[p:p+CHUNK]))
api = (f"{BASE_API}/productsArray"
f"?languageId=-1&productIds={ids_chunk}&appId=1")
data = parser.parse(api, return_type="json")
# pretty-print (keep non-ASCII unescaped so the dump stays readable)
print("\n=== RAW API JSON ===")
print(textwrap.indent(json.dumps(data, ensure_ascii=False, indent=2), " "))
print("=== END ===\n")
#### dump the raw response to a file
fname = PurePath(api).parts[-1].split("?")[0] # productsArray
ts = int(time.time())
Path(f"/Users/valis/Yandex.Disk.localized/Python3/Parsing ZARAHOME/src_2024-09-05/records_folder/debug_{fname}_{ts}.json").write_text(
json.dumps(data, ensure_ascii=False, indent=2),
encoding="utf-8"
)
print(f"→ RAW saved to debug_{fname}_{ts}.json")
for prod in data.get("products", []):
summaries.append({
"productUrl": prod.get("productUrl", ""),
"__full": prod # уже полный JSON
})
# ── category path for the output table ─────────────────
cat_json = cdata.get("iCategoryJSON", {})
cat_title = "/".join(cat_json.get("parentNames", []) +
[cat_json.get("name", "")])
cat_path = f"Каталог/ZaraHome/{cat_title}"
seen = set()
for n, summary in enumerate(summaries, 1):
short_url = summary.get("productUrl")
if not short_url or short_url in seen:
continue
seen.add(short_url)
print(f"Products: {n} / {len(summaries)} "
f"https://www.zarahome.com/pl/{short_url}")
# ── fetch the full product JSON ─────────────────────
prod = summary.get("__full")
if prod is None: # old schema
ref_id = summary["detail"]["reference"].split("-")[0]
api = (f"{BASE_API}/productsArray"
f"?languageId=-1&referenceIds={ref_id}&appId=1")
data = parser.parse(api, return_type="json")
if not data or "products" not in data:
print(f"Skip (no data) → {short_url}"); continue
prod = data["products"][0]
det = prod["detail"]
url_full = f"https://www.zarahome.com/pl/en/{prod.get('productUrl','')}"
article = det["displayReference"]
name = prod["name"]
descr = det["longDescription"]
# ── debug dump before the "all images" block ────────────────────
print("DETAIL KEYS:", list(det.keys())[:20]) # show the first 20 keys
print(
textwrap.indent(
json.dumps(det, ensure_ascii=False, indent=2), # full JSON
prefix=" " # a little indent
)
)
# ─────────────────────────────────────────────────────────────────
# ── ALL IMAGES ───────────────────────────────────────────────────
# raw_xmedia → either a list of media sets or None
raw_xmedia = (det.get("xmedia") or
prod.get("xmedia") or
[])
# default_idx → an integer index or None
default_idx = det.get("xmediaDefaultSet")
# pick the media sets that need to be processed
if isinstance(raw_xmedia, list) and raw_xmedia:
if isinstance(default_idx, int):
media_sets = [raw_xmedia[default_idx]] # only the default set
else:
media_sets = raw_xmedia # all sets
elif isinstance(raw_xmedia, dict):
media_sets = [raw_xmedia] # occasionally a plain dict
else:
media_sets = []
all_imgs = [
f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in media_sets
for m in loc["xmediaItems"][0]["medias"]
]
all_imgs_s = "\n".join(all_imgs)
# composition (the colour list is resolved first because it may carry the composition)
colors_list = det.get("colors") or [] # may be []
if not colors_list: # pseudo-colour for single-variant items
colors_list = [{
"id": 0,
"name": "DEFAULT",
"image": {"url": ""},
"sizes": [{
"visibilityValue": "SHOW",
"name": "",
"description": "",
"weight": prod.get("weight", ""),
"price": prod.get("price", 0)
}]
}]
comp_block = det.get("compositionDetail") or \
(colors_list[0].get("compositionDetail") if colors_list else None)
comp_txt = ""
if comp_block and comp_block.get("parts"):
comp_txt = "\n".join(
extract_components_zarahome(comp_block["parts"])
)
# care instructions
care = "\n".join(c["description"] for c in det["care"])
# traceability
trace = ""
if colors_list and colors_list[0].get("traceability"):
trace = "\n".join(
f"{v['name']}\n" + "\n".join(v["country"])
for v in colors_list[0]["traceability"].values()
if isinstance(v, dict) and v.get("country") and v.get("name")
)
# ── colours / sizes ─────────────────────────────────
serial = 0
rows = []
if not colors_list: # the product has no colour variants at all
continue # move on to the next product
for clr in colors_list:
if clr.get("image") is None: continue
clr_code = clr.get("id")
clr_name = clr.get("name", "")
# build the swatch image safely: stays empty if the field is missing
clr_image = ""
if clr.get("image") and clr["image"].get("url"):
clr_image = f"https://static.zarahome.net/8/photos4{clr['image']['url']}_3_1_5.jpg"
# ── IMAGES FOR THIS COLOUR (reuses the media sets resolved above) ──
clr_imgs = [
f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in media_sets
if loc.get("colorCode") == clr_code
for m in loc["xmediaItems"][0]["medias"]
]
clr_imgs_s = "\n".join(clr_imgs)
for size in clr["sizes"]:
if size["visibilityValue"] != "SHOW": continue
suffix = "" if serial == 0 else f"-{serial}"
serial += 1
size_name = size["name"]
size_descr = size["description"]
size_full = f"{size_descr} ({size_name})" if size_descr else size_name
weight = size.get("weight") or prod.get("weight", "")
buy_price = int(size.get("price") or prod.get("price", 0)) / 100
rows.append([
url_full,
f"{article}{suffix}",
name,
descr,
clr_image,
clr_name,
size_full,
buy_price,
weight,
all_imgs_s,
clr_imgs_s,
comp_txt,
care,
trace,
cat_path,
USER_BRAND
])
table += rows
# ── save the category ────────────────────────────────
csv_name = category.split("/")[-1]
recorder.record(csv_name, table)
def get_extractor():
with open(abspath("parse_settings.json"), "r", encoding="utf-8") as file:
return Extractor(load(file))

View File

@ -0,0 +1,341 @@
# extractor.py · v 2.0 · 2025-07-24
from json import load, loads
from os.path import abspath
from bs4 import BeautifulSoup
from lxml import etree
import logging, os, sys
# ────────────────────────── configuration ──────────────────────────
DEL_SAME = "YES" # "YES" → duplicate filtering, "NO" → no filtering
_log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=_log_level,
stream=sys.stdout,
format="%(asctime)s%(levelname)-5s%(message)s",
datefmt="%H:%M:%S"
)
log = logging.getLogger("extractor")
# ────────────────────── Zara Home helper functions ────────────────
def extract_components_zarahome(parts):
comp = []
for part in parts:
if part.get("areas") and part.get("description"):
if len(parts) != 1:
comp.append(part["description"])
for area in part["areas"]:
comp.append(f"{area['description']} ({area['percentageArea']})")
for c in area["components"]:
comp.append(f"{c['percentage']} {c['material']}")
elif part.get("components") and part.get("description"):
if len(parts) != 1:
comp.append(part["description"])
for c in part["components"]:
comp.append(f"{c['percentage']} {c['material']}")
return comp
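# e.g. (illustrative) parts = [{"description": "LINING",
#                               "components": [{"material": "cotton", "percentage": "100%"}]}]
# → ["100% cotton"]   (the part description is only prepended when there is more than one part)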
# ───────────────────── on-the-fly duplicate filter ────────────────
def push_row_dedup(row, idx, seen, out):
"""Append a row to out, honouring the DEL_SAME rules."""
art, name, size, price, clr = (row[idx["Артикул"]],
row[idx["Название товара или услуги"]],
row[idx["Свойство: Размер"]],
row[idx["Цена закупки"]],
row[idx["Свойство: Цвет"]])
base = (art, name, size, price, clr)
if base not in seen:
seen[base] = row
out.append(row)
return
old = seen[base]
pn_old, pn_new = old[idx["PartNumber"]], row[idx["PartNumber"]]
vis_old, vis_new = old[idx["Наличие на сайте"]], row[idx["Наличие на сайте"]]
# 2) identical PartNumber -> ignore the new row
if pn_old == pn_new:
return
# 3) same visibility?
if vis_old == vis_new:
art4 = art[:4]
pn4_old = pn_old[1:5] if len(pn_old) >= 5 else ""
pn4_new = pn_new[1:5] if len(pn_new) >= 5 else ""
# 4) keep only the row whose PartNumber chars 2-5 match the first 4 chars of the article
if art4 == pn4_new and art4 != pn4_old:
# the new row is the better match -> replace the old one
seen[base] = row
out[out.index(old)] = row
# if the old row matches and the new one does not -> ignore
return
# 5) different visibility -> keep the SHOW row
if vis_new == "SHOW" and vis_old != "SHOW":
seen[base] = row
out[out.index(old)] = row
# otherwise keep the old row (SHOW is already stored), or skip when both are non-SHOW.
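# A minimal toy check of the rules above (illustrative data; only the columns the
# filter actually reads are filled). Guarded so it never runs on import:
if __name__ == "__main__":
    _cols = ["Артикул", "Название товара или услуги", "Свойство: Размер",
             "Цена закупки", "Свойство: Цвет", "PartNumber", "Наличие на сайте"]
    _idx = {h: i for i, h in enumerate(_cols)}
    _seen, _out = {}, []
    push_row_dedup(["0452/041", "Vase", "M", 19.99, "WHITE", "104520410123", "SHOW"], _idx, _seen, _out)
    push_row_dedup(["0452/041", "Vase", "M", 19.99, "WHITE", "204520410123", "HIDE"], _idx, _seen, _out)
    print(len(_out))   # → 1: the SHOW row wins, the HIDE duplicate is ignored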
class Extractor:
# ----------------------------------------------------------------
def __init__(self, json_data):
self.methods = {
"": (self.default_extract_method, []),
"zarahome": (self.zarahome_extract_method, [
"Краткое описание",
"Артикул",
"SKU",
"PartNumber",
"Название товара или услуги",
"Полное описание",
"Образец цвета",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Свойство: Вес(г)",
"Наличие на сайте",
"Изображения",
"Изображения варианта",
"Параметр: Состав",
"Параметр: Уход",
"Параметр: Происхождение",
"Размещение на сайте",
"Свойство: Бренд"
]),
"zara": (self.zara_extract_method, []),
"eobuwie": (self.eobuwie_extract_method, []),
"decathlon": (self.decathlon_extract_method, []),
"chanel": (self.chanel_extract_method, []),
}
self.method = json_data["method"]
self.tags = json_data["tags"]
self.headers = self.methods[self.method][1].copy()
for tag in self.tags:
self.headers.insert(tag["column_number"], tag["column_name"])
# ─────────────────────────── utilities ──────────────────────────
def extract(self, parser, recorder, categories):
self.methods[self.method][0](parser, recorder, categories)
def default_extract_method(self, *a, **kw):
log.info("Default extractor → nothing to do.")
def tags_extract(self, soup, row):
dom = etree.HTML(str(soup))
for tag in self.tags:
res = dom.xpath(tag["xpath"])
col = ""
if res:
for el in res:
col += ''.join(el.itertext()).strip() + "\n"
row.insert(tag["column_number"], col)
# ───── stubs ─────
def zara_extract_method(self, *_, **__): log.info("ZARA extractor disabled.")
def eobuwie_extract_method(self, *_, **__): log.info("Eobuwie extractor disabled.")
def decathlon_extract_method(self, *_, **__): log.info("Decathlon extractor disabled.")
def chanel_extract_method(self, *_, **__): log.info("Chanel extractor disabled.")
# ─────────────────────── ZARA HOME ─────────────────────────────
def zarahome_extract_method(self, parser, recorder, categories):
BASE_API = "https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000"
USER_BRAND = "ZARAHOME"
def fetch_json(url):
try:
return parser.parse(url, return_type="json")
except Exception as err:
log.warning("Request Error: %s - %s", err, url)
alt = url.replace("ieec2cihslb3-zarahome.central.inditex.grp",
"www.zarahome.com")
if alt != url:
log.info("→ retry via public host")
return parser.parse(alt, return_type="json")
return None
for c_idx, category in enumerate(categories, 1):
log.info("Categories: %s / %s %s", c_idx, len(categories), category)
# set up the dedup filter structures
clean_rows = [self.headers]
if DEL_SAME == "YES":
idx_map = {h: i for i, h in enumerate(self.headers)}
seen = {}
# ── category HTML
html = parser.parse(category)
if html is None:
log.warning("Extractor Error: empty page"); continue
soup = BeautifulSoup(html, "lxml")
script = soup.select_one("#serverApp-state")
if not script:
log.warning("Extractor Error: script not found"); continue
state = loads(script.string)
cat_key = next(k for k in state if "/category?" in k)
cat_info = state[cat_key]
ids = [str(p["id"]) for p in cat_info.get("products", [])]
summaries = []
# (A) via productIds
if ids:
CHUNK = 60
for p in range(0, len(ids), CHUNK):
api = (f"{BASE_API}/productsArray?languageId=-1&"
f"productIds={','.join(ids[p:p+CHUNK])}&appId=1")
data = fetch_json(api)
if data and "products" in data:
summaries += data["products"]
else:
prod_key = next((k for k in state if "/product?" in k), None)
if prod_key and "products" in state[prod_key]:
for grp in state[prod_key]["products"]:
summaries += grp.get("bundleProductSummaries", [])
elif prod_key and "productIds" in state[prod_key]:
ids = state[prod_key]["productIds"]
CHUNK = 60
for p in range(0, len(ids), CHUNK):
api = (f"{BASE_API}/productsArray?languageId=-1&"
f"productIds={','.join(map(str, ids[p:p+CHUNK]))}&appId=1")
data = fetch_json(api)
if data and "products" in data:
summaries += data["products"]
else:
subcats = cat_info.get("subcategories") or []
for sub in subcats:
sub_url = "https://www.zarahome.com/pl/en/" + sub["url"]
sub_html = parser.parse(sub_url)
if not sub_html:
continue
sub_state = loads(BeautifulSoup(sub_html, "lxml")
.select_one("#serverApp-state").string)
sub_prod_key = next((k for k in sub_state if "/product?" in k), None)
if sub_prod_key and "products" in sub_state[sub_prod_key]:
for grp in sub_state[sub_prod_key]["products"]:
summaries += grp.get("bundleProductSummaries", [])
seen_ids = set()
for prod in summaries:
prod_id = prod.get("id")
short_url = prod.get("productUrl") or (
f"{prod['seo']['keyword']}-p{prod['seo']['seoProductId']}.html"
if prod.get("seo") else "")
if not short_url or prod_id in seen_ids:
continue
seen_ids.add(prod_id)
variants = prod.get("bundleProductSummaries") or [prod]
for vprod in variants:
det = vprod["detail"]
sec, fam, subfam = (vprod.get("sectionNameEN") or "",
vprod.get("familyName") or "",
vprod.get("subFamilyName") or "")
cat_path = "Каталог/ZaraHome/" + "/".join(p for p in (sec, fam, subfam) if p)
url_full = f"https://www.zarahome.com/pl/en/{vprod.get('productUrl','')}"
name = vprod.get("name", "")
article = det["displayReference"]
root_price = int(vprod.get("price", 0)) / 100
root_wt = vprod.get("weight", "")
raw_xmedia = det.get("xmedia") or vprod.get("xmedia") or []
default_idx = det.get("xmediaDefaultSet")
if isinstance(raw_xmedia, list) and raw_xmedia:
media_sets = [raw_xmedia[default_idx]] if isinstance(default_idx, int) else raw_xmedia
elif isinstance(raw_xmedia, dict):
media_sets = [raw_xmedia]
else:
media_sets = []
all_imgs = [f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in media_sets for m in loc["xmediaItems"][0]["medias"]]
all_imgs_s = "\n".join(all_imgs)
comp_txt = ""
if det.get("compositionDetail") and det["compositionDetail"].get("parts"):
comp_txt = "\n".join(
extract_components_zarahome(det["compositionDetail"]["parts"])
)
care = "\n".join(c["description"] for c in det.get("care", []))
trace = ""
colors = det.get("colors") or [{
"id": 0, "name": "DEFAULT", "image": {"url": ""},
"sizes": [{
"visibilityValue": "SHOW",
"name": "", "description": "",
"weight": root_wt, "price": vprod.get("price", 0)
}]
}]
for clr in colors:
clr_code = clr.get("id")
clr_name = clr.get("name", "")
clr_image = ""
if clr.get("image") and clr["image"].get("url"):
clr_image = f"https://static.zarahome.net/8/photos4{clr['image']['url']}_3_1_5.jpg"
clr_sets = [loc for loc in media_sets if loc.get("colorCode") == clr_code] or media_sets
clr_imgs = [f"https://static.zarahome.net/8/photos4{loc['path']}/{m['idMedia']}2.jpg"
for loc in clr_sets for m in loc["xmediaItems"][0]["medias"]]
clr_imgs_s = "\n".join(clr_imgs)
for size in clr["sizes"]:
vis = size.get("visibilityValue", "UNKNOWN")
price = int(size.get("price") or vprod.get("price", 0)) / 100
weight = size.get("weight") or root_wt
size_name = size.get("name", "")
size_descr = size.get("description", "")
size_full = f"{size_descr} ({size_name})" if size_descr else size_name
sku_val = size.get("sku", "")
partnumber_val = size.get("partnumber", "")
country = size.get("country") or ""
trace_local = f"Страна изготовления {country}" if country else trace
row = [
url_full,
article,
sku_val,
partnumber_val,
name,
det.get("longDescription", ""),
clr_image,
clr_name,
size_full,
price,
weight,
vis,
all_imgs_s,
clr_imgs_s,
comp_txt,
care,
trace_local,
cat_path,
USER_BRAND
]
if DEL_SAME == "YES":
push_row_dedup(row, idx_map, seen, clean_rows)
else:
clean_rows.append(row)
csv_name = category.split("/")[-1]
recorder.record(csv_name, clean_rows)
# ───────────────────────────────────────────────────────────────────
def get_extractor():
with open(abspath("parse_settings.json"), "r", encoding="utf-8") as fh:
return Extractor(load(fh))

Binary file not shown.

View File

@ -0,0 +1,24 @@
from categories import get_categories
from xlsx_recorder import Recorder
from requester import get_parser
from extractor import get_extractor
def main():
recorder = Recorder()
try:
extractor = get_extractor()
except Exception as exc:
raise Exception("Error: parse_settings") from exc
try:
parser = get_parser()
except Exception as exc:
raise Exception("Error: request_settings") from exc
categories = get_categories()
extractor.extract(parser, recorder, categories)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
{
"method": "zarahome",
"tags": []
}
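The tags list is empty here; when present, each entry is expected to carry column_number, column_name and xpath, which Extractor.__init__ and tags_extract use to append an extra column. A hypothetical example with one custom column (column name and xpath are illustrative only):

{
    "method": "zarahome",
    "tags": [
        {
            "column_number": 2,
            "column_name": "Параметр: Материал",
            "xpath": "//div[@class='product-material']"
        }
    ]
}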

Binary file not shown.

Binary file not shown.

File diff suppressed because it is too large

File diff suppressed because it is too large

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff