MacOS_Parsers/Парсер_IKEA/main.py

732 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os, json, re, math, time, html, requests, datetime
from bs4 import BeautifulSoup
from openpyxl import Workbook
# ───────────────────────── PATHS / FILES ───────────────────────────
# Every path is resolved relative to the folder that contains this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Input files expected next to the script.
INPUT_FILE = os.path.join(BASE_DIR, "product_links.txt")
DICT_FILE = os.path.join(BASE_DIR, "dictionary_main.txt")
EXCL_FILE = os.path.join(BASE_DIR, "exclusion_materials.txt")

# Output folder (created on import if missing) and the files written into it.
RECORDS_DIR = os.path.join(BASE_DIR, "records_folder")
os.makedirs(RECORDS_DIR, exist_ok=True)
OUTPUT_FILE = os.path.join(RECORDS_DIR, "records.xlsx")
POST_LOG = os.path.join(RECORDS_DIR, "post_log.txt")
# ───────────────────────── POST SETTINGS ──────────────────────────
# At start-up the script asks two questions:
#   - whether to save the JSON batches to disk
#   - whether to send the batches to the API
# Answer: 1 (yes) / 0 (no). Empty input counts as 1.

# Endpoint and API key can be overridden through environment variables.
POST_URL = os.environ.get("IKEA_POST_URL", "http://172.25.4.101:3005/parser/data")
POST_API_KEY = os.environ.get("IKEA_POST_API_KEY", "")

POST_TIMEOUT = 20  # passed as `timeout=` to requests.post
BATCH_SIZE = 50    # number of items collected before a batch is flushed
# ───────────────────────── SITE SETTINGS ──────────────────────────
# Headers sent with every product-page request.
HEADERS = {"User-Agent": "Mozilla/5.0"}

# Element whose `data-hydration-props` attribute carries the product JSON.
CSS_SELECTOR = ".pip-product__subgrid.product-pip.js-product-pip"

# Top-level keys of the hydration JSON that get flattened into the result row.
BLOCKS = [
    "buyModule",
    "productSummary",
    "pipPricePackage",
    "productInformationSection",
    "keyFacts",
    "stockcheckSection",
    "availabilityGroup",
    "productGallery",
]
# Whitelist of columns written to Excel; the order here is the column order.
KEEP_COLUMNS = [
    # raw fields taken from the hydration blocks
    "availabilityGroup.serverOnlineSellable",
    "availabilityGroup.storeHeader",
    "buyModule.onlineSellable",
    "buyModule.productName",
    "buyModule.productPrice",
    "buyModule.productType",
    "keyFacts.ariaLabels",
    "keyFacts.gaLabel",
    "keyFacts.keyFacts",
    "keyFacts.keyFacts_formatted",
    "pipPricePackage.measurementText",
    "pipPricePackage.productDescription",
    "productGallery.urls",
    "productInformationSection.dimensionProps",
    "productInformationSection.dimensionProps_formatted",
    "productInformationSection.dimensionProps_formatted_html_translated",
    "productInformationSection.productDetailsProps",
    "productInformationSection.productDetailsProps_formatted",
    "productInformationSection.productDetailsProps_formatted_html",
    "productSummary.description",
    "productSummary.visibleItemNo",
    "stockcheckSection.packagingProps",
    "stockcheckSection.typeName",
    # fields computed by this script
    "total brutto",
    "prductVariantColorMeasure",
    "categoryBreadcrumb",
    "originalName",  # Excel column for productName + typeName
    "url",
]
# ───────────────────────── УТИЛИТЫ I/O ────────────────────────────
def ask_bool(prompt: str, default: str = "1") -> bool:
    '''
    Ask a 1/0 question on stdin and return True only for the answer "1".

    An empty answer, or end-of-input, falls back to `default`.
    '''
    question = f"{prompt} (1=yes, 0=no) [{default}]: "
    try:
        answer = input(question).strip()
    except EOFError:
        answer = ""
    if not answer:
        answer = default
    return answer == "1"
def _post_log(msg: str):
    '''Append one line to post_log.txt; any failure is ignored on purpose.'''
    try:
        with open(POST_LOG, mode="a", encoding="utf-8") as log_file:
            entry = msg.rstrip() + "\n"
            log_file.write(entry)
    except Exception:
        # Logging is best-effort and must never interrupt the run.
        return None
def _now_tag():
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def _save_json_batch(payload: dict, batch_index: int):
    '''
    Write `payload` as pretty-printed UTF-8 JSON into the records folder.

    The file is named "ikea_batch_<timestamp>_<4-digit batch index>.json".
    Returns the full path of the written file.
    '''
    file_name = "ikea_batch_{}_{:04d}.json".format(_now_tag(), batch_index)
    target = os.path.join(RECORDS_DIR, file_name)
    with open(target, mode="w", encoding="utf-8") as out:
        json.dump(payload, out, ensure_ascii=False, indent=2)
    print(f"💾 JSON saved: {file_name}")
    return target
# ───────────────────────── СЛОВАРИ / ФИЛЬТРЫ ──────────────────────
def load_dictionary(path: str) -> dict:
    '''
    Read the translation dictionary from a text file.

    The file is scanned for pairs written as
        "Wymiary" : "Размеры",
    Anything that does not match the "key" : "value" shape is ignored.

    Returns a {source: translation} dict, or an empty dict when the file
    does not exist.
    '''
    if not os.path.isfile(path):
        return {}
    # Context manager so the handle is closed deterministically
    # (the previous open(...).read() left it to the garbage collector).
    with open(path, "r", encoding="utf-8") as fh:
        txt = fh.read()
    return dict(re.findall(r'"([^"]+)"\s*:\s*"([^"]+)"', txt))
# Translation dictionary, loaded once when the module is imported.
DICT = load_dictionary(DICT_FILE)


def translate_token(token: str) -> str:
    '''Return the DICT translation of `token`, or `token` itself when there is none.'''
    if token in DICT:
        return DICT[token]
    return token
def load_exclusions(path: str) -> set:
    '''
    Load exclusion tokens from exclusion_materials.txt.

    Accepted layouts:
      - if the file contains any double-quoted strings, those (and only
        those) are the tokens;
      - otherwise tokens are separated by commas, semicolons or line breaks.

    Tokens are stripped and lower-cased; blanks are dropped.
    Returns an empty set when the file does not exist.
    '''
    if not os.path.isfile(path):
        return set()
    # Context manager so the handle is closed deterministically
    # (the previous open(...).read() left it to the garbage collector).
    with open(path, "r", encoding="utf-8") as fh:
        txt = fh.read()
    # Quoted tokens take precedence when present.
    quoted = re.findall(r'"([^"]+)"', txt, flags=re.S)
    tokens = quoted if quoted else re.split(r"[,;\n\r]+", txt)
    return {t.strip().lower() for t in tokens if t.strip()}
# Lower-cased exclusion tokens, loaded once when the module is imported.
EXCLUSIONS = load_exclusions(EXCL_FILE)
def materials_from_details_json(details: dict) -> list[str]:
    '''
    Collect every string stored under a "material" key at any depth.

    productDetailsProps comes in several layouts, so dicts and lists are
    traversed recursively. A "material" key whose value is not a string is
    descended into like any other key. Results keep traversal order.
    '''
    def iter_materials(node):
        if isinstance(node, dict):
            for key, value in node.items():
                if key == "material" and isinstance(value, str):
                    yield value
                else:
                    yield from iter_materials(value)
        elif isinstance(node, list):
            for child in node:
                yield from iter_materials(child)

    return list(iter_materials(details or {}))
def materials_match_exclusions(details: dict, exclusion_tokens: set) -> bool:
    '''
    Return True when any exclusion token occurs in any material string.

    The material strings are lower-cased before the substring test; the
    tokens are used as given. An empty token set never matches.
    '''
    if not exclusion_tokens:
        return False
    haystack = "\n".join(materials_from_details_json(details)).lower()
    for token in exclusion_tokens:
        if token in haystack:
            return True
    return False
# ───────────────────────── ФОРМАТТЕРЫ ─────────────────────────────
def _parse_json_value(val):
if isinstance(val, (dict, list)) or val is None:
return val
if isinstance(val, str):
s = val.strip()
if not s:
return val
try:
return json.loads(s)
except Exception:
return val
return val
def flatten_block(block_name, data):
    '''
    Flatten one hydration block into {"<block_name>.<key>": value} pairs.

    Special case: productGallery.mediaList is not copied as-is. When it is a
    list, the "url" of each item's "content" dict is collected and stored
    newline-joined under "productGallery.urls".

    Returns an empty dict when `data` is not a dict.
    '''
    if not isinstance(data, dict):
        return {}
    flat = {}
    for key, value in data.items():
        if block_name == "productGallery" and key == "mediaList":
            if isinstance(value, list):
                urls = []
                for item in value:
                    # Skip malformed entries instead of raising AttributeError,
                    # which would discard the whole product upstream.
                    content = item.get("content") if isinstance(item, dict) else None
                    url = content.get("url") if isinstance(content, dict) else None
                    if isinstance(url, str) and url:
                        urls.append(url)
                flat["productGallery.urls"] = "\n".join(urls)
            # `continue`, not `return`: keys that follow mediaList in the
            # block must still be flattened.
            continue
        flat[f"{block_name}.{key}"] = value
    return flat
def format_keyfacts(raw_keyfacts):
    '''
    Render the key-facts list as newline-separated text.

    The first line is the "name" of the first element ("Właściwości" when the
    element has no such key); each following line is a non-empty "label".
    Returns "" when the input is not a list.
    '''
    if not isinstance(raw_keyfacts, list):
        return ""
    rows = []
    for position, fact in enumerate(raw_keyfacts):
        entry = fact or {}
        if position == 0:
            # Only the first element contributes the header line.
            rows.append(entry.get("name", "Właściwości"))
        label = entry.get("label")
        if label:
            rows.append(label)
    return "\n".join(rows)
def _fmt_float(x):
try:
return f"{float(x):.2f}".rstrip("0").rstrip(".")
except Exception:
return ""
def _collect_packaging_total_kg(packaging):
total = 0.0
if not isinstance(packaging, dict):
return total
content = (packaging.get("contentProps") or {}).get("packages") or []
for pkg in content:
qty = ((pkg.get("quantity") or {}).get("value")) or 1
ms = pkg.get("measurements") or []
for block in ms:
if not isinstance(block, list):
continue
weight_lbl = next((m for m in block if (m.get("type") == "weight" or m.get("label") == "Waga")), None)
if weight_lbl and isinstance(weight_lbl.get("value"), (int, float)):
total += float(weight_lbl["value"]) * (qty or 1)
return total
def format_dimensions(raw_dim_props, with_html=False, translated=False):
    '''
    Render dimensionProps as a text block: a "Wymiary" section listing the
    dimensions, then an "Opakowanie" section listing every package (name,
    article number, measurements, quantity).

    raw_dim_props -- parsed dimensionProps dict; any other type yields "".
    with_html     -- join lines with "<br/>" instead of a line break.
    translated    -- pass section titles and labels through translate_token.

    Returns the rendered string without leading/trailing separators.
    '''
    if not isinstance(raw_dim_props, dict):
        return ""

    def tr(token):
        # Translate only when requested; unknown tokens come back unchanged.
        return translate_token(token) if translated else token

    br = "<br/>" if with_html else "\n"
    lines = [tr("Wymiary")]

    # `or []` also covers an explicit null under "dimensions".
    for dim in raw_dim_props.get("dimensions") or []:
        name = dim.get("name") or ""
        measure = dim.get("measure") or ""
        if not name and not measure:
            continue
        lines.append(f"{tr(name)}: {measure}".strip())

    # Blank separator, then the packaging section header.
    lines.append(br if with_html else "")
    lines.append(tr("Opakowanie"))

    packaging = raw_dim_props.get("packaging") or {}
    packages = (packaging.get("contentProps") or {}).get("packages") or []
    for pkg in packages:
        pkg_name = pkg.get("name") or ""
        if pkg_name:
            lines.append(pkg_name)

        article = (pkg.get("articleNumber") or {}).get("value")
        if article:
            lines.append(tr("Numer artykułu"))
            lines.append(f"{article}")

        for group in pkg.get("measurements") or []:
            if not isinstance(group, list):
                continue
            for entry in group:
                label = entry.get("label") or ""
                text = entry.get("text") or ""
                if label:
                    label = tr(label)
                if label or text:
                    # strip(": ") removes the dangling separator when either
                    # the label or the text is empty.
                    lines.append(f"{label}: {text}".strip(": "))

        quantity = (pkg.get("quantity") or {}).get("value")
        if quantity:
            lines.append(f"{tr('Paczka(i)')}: {quantity}")

    if not with_html:
        return "\n".join(lines).strip()

    joined = br.join(lines)
    sep = re.escape(br)
    # Collapse runs of separators down to exactly two.
    joined = re.sub(rf"(?:{sep}){{2,}}", br * 2, joined)
    # Trim whole "<br/>" tokens from the ends. str.strip("<br/>") would treat
    # the argument as a character set and eat real leading/trailing
    # characters ("<", "b", "r", "/", ">"), which is what used to turn
    # "<strong>..." into "strong>...". With that fixed, the old
    # startswith("strong>") repair is no longer needed here.
    return re.sub(rf"^(?:{sep})+|(?:{sep})+$", "", joined)
def format_product_details(raw_details, add_summary_desc="", with_html=False, skip_assembly=True):
    '''
    Render productDetailsProps as a text block (or "<br/>"-separated HTML).

    Sections, in order: optional summary description, "Informacje o produkcie"
    (paragraphs, designer, article number), "Dobrze wiedzieć",
    "Materiały i pielęgnacja", "Bezpieczeństwo i zgodność z przepisami".
    Sections without data are omitted.

    raw_details      -- parsed productDetailsProps dict; for any other type
                        `add_summary_desc` is returned unchanged.
    add_summary_desc -- text placed before the first section.
    with_html        -- join lines with "<br/>" instead of a line break.
    skip_assembly    -- kept for backward compatibility only; the assembly
                        ("Instrukcja montażu") section is no longer rendered,
                        so the flag has no effect.
    '''
    if not isinstance(raw_details, dict):
        return add_summary_desc

    br = "<br/>" if with_html else "\n"
    gap = br if with_html else ""  # blank separator between sections
    out = []

    if add_summary_desc:
        out.append(add_summary_desc)
        out.append(gap)

    # --- product information -------------------------------------------
    out.append("Informacje o produkcie")
    description = raw_details.get("productDescriptionProps") or {}
    out.extend(description.get("paragraphs") or [])
    designer_label = description.get("designerLabel")
    designer_name = description.get("designerName")
    if designer_label and designer_name:
        out.append(designer_label)
        out.append(designer_name)
    if raw_details.get("productId"):
        out.append("Numer artykułu")
        out.append(raw_details["productId"])

    accordion = raw_details.get("accordionObject") or {}

    # --- good to know ---------------------------------------------------
    good_to_know = ((accordion.get("goodToKnow") or {}).get("contentProps") or {}).get("goodToKnow") or []
    if good_to_know:
        out.append(gap)
        out.append("Dobrze wiedzieć")
        for item in good_to_know:
            text = item.get("text")
            if text:
                out.append(text)

    # --- materials and care --------------------------------------------
    mac = (accordion.get("materialsAndCare") or {}).get("contentProps") or {}
    materials = mac.get("materials") or []
    care = mac.get("careInstructions") or []
    if materials or care:
        out.append(gap)
        out.append("Materiały i pielęgnacja")
    if materials:
        out.append("Materiały")
        for group in materials:
            product_type = group.get("productType", "")
            for mat in group.get("materials") or []:
                material = mat.get("material", "")
                # The product type is repeated before each of its materials.
                if product_type:
                    out.append(product_type)
                if material:
                    out.append(material)
    if care:
        out.append(mac.get("detailsCareText", "Pielęgnacja"))
        for group in care:
            product_type = group.get("productType", "")
            if product_type:
                out.append(product_type)
            out.extend(group.get("texts") or [])

    # --- safety and compliance -----------------------------------------
    # NOTE(review): read from the top level of raw_details, unlike the two
    # sections above which come from accordionObject — confirm this is intended.
    safety = (raw_details.get("safetyAndCompliance") or {}).get("contentProps") or {}
    safety_items = safety.get("safetyAndCompliance") or []
    if safety_items:
        out.append(gap)
        out.append("Bezpieczeństwo i zgodność z przepisami")
        for item in safety_items:
            text = item.get("text")
            if text:
                out.append(text)

    # The former "Instrukcja montażu" (assembly) section was removed on request.

    parts = [x for x in out if x is not None]
    if not with_html:
        return "\n".join(parts).strip()

    joined = br.join(parts)
    sep = re.escape(br)
    # Collapse runs of separators down to exactly two.
    joined = re.sub(rf"(?:{sep}){{2,}}", br * 2, joined)
    # Trim whole "<br/>" tokens only. str.strip("<br/>") treats its argument
    # as a character set and would eat real text at the ends
    # (e.g. "regał" -> "egał", "kolor" -> "kolo").
    return re.sub(rf"^(?:{sep})+|(?:{sep})+$", "", joined)
def build_variant_color_measure(desc: str, type_name: str, measurement: str) -> str:
    '''
    Build the "<Variant>, <measurement>" label for a product.

    A leading `type_name` (case-insensitive) and the punctuation after it are
    removed from `desc`. If what remains has no letter or digit it is
    dropped. The remainder gets an upper-cased first character and is joined
    with the measurement by ", "; when one part is empty the other is
    returned alone.
    '''
    remainder = desc or ""
    type_label = (type_name or "").strip()
    if type_label:
        prefix = r"^\s*" + re.escape(type_label) + r"[\s,;:\-–—/]*"
        remainder = re.sub(prefix, "", remainder, flags=re.IGNORECASE)

    has_content = re.search(r"[0-9A-Za-zА-Яа-яЁёÀ-ž]", remainder or "")
    remainder = remainder.strip() if has_content else ""

    size = (measurement or "").strip()
    if not remainder:
        return size
    remainder = remainder[:1].upper() + remainder[1:]
    if size:
        return f"{remainder}, {size}"
    return remainder
# ───────────────────── СКРАПИНГ КАРТОЧКИ ──────────────────────────
def extract_data(url: str) -> dict:
    '''
    Fetch one product page and return a flat dict restricted to KEEP_COLUMNS.

    Besides the raw block fields, the following values are computed here:
    keyFacts_formatted, dimensionProps_formatted,
    dimensionProps_formatted_html_translated, productDetailsProps_formatted,
    productDetailsProps_formatted_html, total brutto,
    prductVariantColorMeasure, categoryBreadcrumb and originalName.

    On any failure (request error, missing selector or attribute, bad JSON)
    a dict of the form {"url": url, "error": "<message>"} is returned instead.
    '''
    try:
        resp = requests.get(url, headers=HEADERS, timeout=15)
        resp.raise_for_status()
        # DEBUG: print basic information about the response to the console
        print("\n=== FETCH DEBUG ===")
        print("URL: ", url)
        print("Final URL: ", resp.url)
        print("Status: ", resp.status_code)
        print("ContentType:", resp.headers.get("Content-Type"))
        print("Length: ", len(resp.text))
        print("Snippet ↓↓↓")
        print(resp.text[:1000])  # shows the first 1000 characters of the HTML
        soup = BeautifulSoup(resp.text, "html.parser")
        # The product JSON is stored in the data-hydration-props attribute
        # of the element matched by CSS_SELECTOR.
        target = soup.select_one(CSS_SELECTOR)
        if not target:
            return {"url": url, "error": "CSS selector not found"}
        raw = target.get("data-hydration-props")
        if not raw:
            return {"url": url, "error": "data-hydration-props not found"}
        decoded = html.unescape(raw)
        full_json = json.loads(decoded)
        result = {"url": url}
        # Flatten every configured block into "<block>.<key>" entries.
        for block in BLOCKS:
            result.update(flatten_block(block, full_json.get(block, {})))
        # These values may arrive as dicts/lists or as JSON strings.
        kf_json = _parse_json_value(result.get("keyFacts.keyFacts"))
        dim_json = _parse_json_value(result.get("productInformationSection.dimensionProps"))
        det_json = _parse_json_value(result.get("productInformationSection.productDetailsProps"))
        result["keyFacts.keyFacts_formatted"] = format_keyfacts(kf_json)
        result["productInformationSection.dimensionProps_formatted"] = format_dimensions(dim_json, with_html=False, translated=False)
        html_trans = format_dimensions(dim_json, with_html=True, translated=True)
        # Extra safeguard: restore the leading '<' if it is missing.
        if isinstance(html_trans, str) and html_trans.startswith("strong>"):
            html_trans = "<" + html_trans
        result["productInformationSection.dimensionProps_formatted_html_translated"] = html_trans
        # Total packaging weight, formatted with at most two decimals.
        total_kg = _collect_packaging_total_kg((dim_json or {}).get("packaging") or {})
        result["total brutto"] = _fmt_float(total_kg)
        summary_desc = result.get("productSummary.description", "") or ""
        result["productInformationSection.productDetailsProps_formatted"] = format_product_details(det_json, add_summary_desc=summary_desc, with_html=False, skip_assembly=True)
        result["productInformationSection.productDetailsProps_formatted_html"] = format_product_details(det_json, add_summary_desc=summary_desc, with_html=True, skip_assembly=True)
        desc = result.get("pipPricePackage.productDescription", "") or ""
        tname = result.get("stockcheckSection.typeName", "") or ""
        meas = result.get("pipPricePackage.measurementText", "") or ""
        result["prductVariantColorMeasure"] = build_variant_color_measure(desc, tname, meas)
        # Breadcrumb: taken from the first ld+json script that holds a
        # BreadcrumbList; item names are joined with "/".
        breadcrumb = None
        for tag in soup.find_all("script", attrs={"type": lambda t: t and "ld+json" in t}):
            try:
                data = json.loads(tag.string)
            except Exception:
                # Unparseable or empty script tag: try the next one.
                continue
            if isinstance(data, list):
                data = next((d for d in data if isinstance(d, dict) and d.get("@type") == "BreadcrumbList"), None)
            if isinstance(data, dict) and data.get("@type") == "BreadcrumbList":
                items = data.get("itemListElement", [])
                names = [it.get("name", "") for it in items]
                breadcrumb = "/".join(names)
                break
        if breadcrumb:
            result["categoryBreadcrumb"] = breadcrumb
        # Apply the whitelist (originalName is filled in separately below).
        filtered = {k: result.get(k) for k in KEEP_COLUMNS if k != "originalName"}
        # The string literal below is a legacy change note with no runtime
        # effect: originalName = productName + " " + typeName, without
        # double spaces.
        '''
### NEW: originalName = productName + " " + typeName (без двойных пробелов)
'''
        pn = (result.get("buyModule.productName") or "").strip()
        tn = (result.get("stockcheckSection.typeName") or "").strip()
        if pn and tn:
            orig_name = f"{pn} {tn}"
        else:
            orig_name = pn or tn
        filtered["originalName"] = orig_name
        return filtered
    except Exception as e:
        # Any failure is reported as an error row rather than raised.
        print(e)
        return {"url": url, "error": str(e)}
# ───────────────────── ПОСТРОЕНИЕ ВАРИАНТА / POST ─────────────────
def _split_color_size(text: str):
if not text:
return "", ""
parts = [p.strip() for p in text.split(",", 1)]
if len(parts) == 2:
return parts[0], parts[1]
return "", parts[0]
def _ceil_price(v):
try:
return int(math.ceil(float(v)))
except Exception:
return None
def _ceil_int(v):
try:
return int(math.ceil(float(v)))
except Exception:
return None
def build_variant(row: dict) -> dict:
    '''
    Turn one extracted row into the item structure sent to the API:
    {"category": {...}, "brand": {...}, "variant": {...}}.

    Row fields used:
      - sku: productSummary.visibleItemNo with spaces removed
      - color/size: split from prductVariantColorMeasure; when both come out
        empty, size falls back to pipPricePackage.measurementText
      - cost: buyModule.productPrice rounded up
      - originalName: the originalName column, else buyModule.productName
      - originalDescription / originalComposition: the HTML renderings of
        productDetailsProps / dimensionProps
      - images: productGallery.urls split on line breaks, blanks dropped
      - inStock: availabilityGroup.serverOnlineSellable, else
        buyModule.onlineSellable
      - weight: "total brutto" rounded up, 0 when unavailable
    '''
    def field(key: str):
        # Missing and falsy values are both treated as an empty string.
        return row.get(key) or ""

    color, size = _split_color_size(field("prductVariantColorMeasure").strip())
    if not color and not size:
        size = field("pipPricePackage.measurementText").strip()

    gallery = field("productGallery.urls")
    images = []
    if isinstance(gallery, str):
        images = [line for line in gallery.split("\n") if line.strip()]

    in_stock = bool(row.get("availabilityGroup.serverOnlineSellable")) or bool(
        row.get("buyModule.onlineSellable")
    )

    weight = _ceil_int(row.get("total brutto"))
    if weight is None:
        weight = 0

    variant = {
        "status_id": 1,
        "color": color.capitalize() if color else "none",
        "sku": field("productSummary.visibleItemNo").replace(" ", ""),
        "size": size,
        "cost": _ceil_price(row.get("buyModule.productPrice")),
        "originalUrl": field("url"),
        "originalName": row.get("originalName") or field("buyModule.productName"),
        "originalDescription": field("productInformationSection.productDetailsProps_formatted_html"),
        "originalComposition": field("productInformationSection.dimensionProps_formatted_html_translated"),
        "images": images,
        "inStock": in_stock,
        "weight": weight,
    }
    return {
        "category": {"name": field("categoryBreadcrumb")},
        "brand": {"name": "ikea"},
        "variant": variant,
    }
def post_payload(payload: dict) -> dict:
    '''
    POST `payload` as UTF-8 JSON to POST_URL and log the exchange.

    A bearer Authorization header is added when POST_API_KEY is set.

    Returns {"ok": bool, "status": int, "response": str} when a response is
    received (ok means a 2xx status), or
    {"ok": False, "status": None, "error": str} when the request raised.
    '''
    headers = {"Content-Type": "application/json"}
    if POST_API_KEY:
        headers["Authorization"] = f"Bearer {POST_API_KEY}"
    body = json.dumps(payload, ensure_ascii=False)

    # Log a redacted copy of the headers: the API key must never be written
    # to post_log.txt in clear text.
    logged_headers = {
        name: ("***" if name.lower() == "authorization" else value)
        for name, value in headers.items()
    }
    _post_log(f"→ POST {POST_URL}\nHeaders: {logged_headers}\nBody: {body}")

    try:
        r = requests.post(POST_URL, headers=headers, data=body.encode("utf-8"), timeout=POST_TIMEOUT)
        text = r.text
        _post_log(f"{r.status_code}\n{text}\n{'-'*60}")
        ok = 200 <= r.status_code < 300
        return {"ok": ok, "status": r.status_code, "response": text}
    except Exception as e:
        # Network/transport failures are reported to the caller, not raised.
        _post_log(f"× ERROR: {e}\n{'-'*60}")
        return {"ok": False, "status": None, "error": str(e)}
# ───────────────────────── СЕРДЦЕ СКРИПТА ─────────────────────────
def safe_cell(val):
    '''
    Make a value storable in a worksheet cell: dicts and lists are serialised
    to JSON text, None becomes "", everything else is returned unchanged.
    '''
    if val is None:
        return ""
    if isinstance(val, (dict, list)):
        return json.dumps(val, ensure_ascii=False)
    return val
def main():
    '''
    Run the whole pipeline: read the product links, scrape every page, write
    all rows to Excel and collect the rows that pass the price / weight /
    material filters into JSON batches that are saved and/or posted.
    '''
    SAVE_JSON = ask_bool("SAVE_JSON (сохранять JSON на диск?)", "1")
    SEND_JSON = ask_bool("SEND_JSON (отправлять на API?)", "1")
    # read the links (blank lines are skipped)
    with open(INPUT_FILE, "r", encoding="utf-8") as f:
        links = [line.strip() for line in f if line.strip()]
    print(f"Всего ссылок: {len(links)}")
    # prepare the Excel workbook; the header row is KEEP_COLUMNS
    wb = Workbook()
    ws = wb.active
    ws.title = "IKEA Products"
    ws.append(KEEP_COLUMNS)
    # batch accumulator for JSON/API
    batch_items = []
    batch_index = 1
    def flush_batch():
        # Save and/or send the accumulated items, then start a new batch.
        nonlocal batch_items, batch_index
        if not batch_items:
            return
        payload = {"parserName": "ikea", "items": batch_items}
        if SAVE_JSON:
            _save_json_batch(payload, batch_index)
        if SEND_JSON:
            res = post_payload(payload)
            ok = res.get("ok")
            print(f"POST batch {batch_index}: {'OK' if ok else 'FAIL'} (status={res.get('status')})")
        batch_index += 1
        batch_items = []
    for idx, link in enumerate(links, 1):
        print(f"[{idx}/{len(links)}] {link}")
        row = extract_data(link)
        # The string literal below is a legacy change note with no runtime
        # effect: originalName is already built in extract_data and is
        # present in row.
        '''
### NEW: originalName уже сформирован в extract_data и попал в row
'''
        # Excel receives EVERY row (no filters applied)
        ws.append([safe_cell(row.get(col, "")) for col in KEEP_COLUMNS])
        # FILTERS for JSON/API; unparseable numbers count as 0
        try:
            price = float(row.get("buyModule.productPrice") or 0)
        except Exception:
            price = 0.0
        try:
            total_kg = float(row.get("total brutto") or 0)
        except Exception:
            total_kg = 0.0
        details_json = row.get("productInformationSection.productDetailsProps") or {}
        # 1) price filter: only 20..1500 inclusive passes
        if not (20 <= price <= 1500):
            pass
        # 2) weight filter: anything above 30 is skipped
        elif total_kg > 30:
            pass
        # 3) materials filter: skip rows whose materials hit an exclusion token
        elif materials_match_exclusions(details_json, EXCLUSIONS):
            pass
        else:
            # passed all filters → add to the batch
            try:
                item = build_variant(row)
                batch_items.append(item)
            except Exception as e:
                _post_log(f"× build_variant error for {link}: {e}")
        # autosave the Excel file every 50 rows
        if idx % 50 == 0:
            wb.save(OUTPUT_FILE)
            print(f"💾 autosave: {OUTPUT_FILE}")
        # flush the batch once it reaches the size limit
        if len(batch_items) >= BATCH_SIZE:
            flush_batch()
    # finish: write the Excel file and save/send the remaining batch items
    wb.save(OUTPUT_FILE)
    print(f"\n✅ Excel готов: {OUTPUT_FILE}")
    flush_batch()
    print("🎯 Готово.")
if __name__ == "__main__":
    main()