#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os, json, re, math, time, html, requests, datetime
from bs4 import BeautifulSoup
from openpyxl import Workbook
# ───────────────────────── PATHS / FILES ───────────────────────────
# Directory containing this script; every other path is resolved relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Output folder for the workbook, JSON batches and the POST log.
RECORDS_DIR = os.path.join(BASE_DIR, "records_folder")
# Import-time side effect: the output folder is created if it does not exist.
os.makedirs(RECORDS_DIR, exist_ok=True)
# Input list of product page URLs, one per line (read by main()).
INPUT_FILE = os.path.join(BASE_DIR, "product_links.txt")
# Excel workbook written by main().
OUTPUT_FILE = os.path.join(RECORDS_DIR, "records.xlsx")
# Translation dictionary parsed by load_dictionary().
DICT_FILE = os.path.join(BASE_DIR, "dictionary_main.txt")
# Excluded-material tokens parsed by load_exclusions().
EXCL_FILE = os.path.join(BASE_DIR, "exclusion_materials.txt")
# Append-only text log written by _post_log().
POST_LOG = os.path.join(RECORDS_DIR, "post_log.txt")
# ───────────────────────── POST SETTINGS ──────────────────────────
# The bare string below is a no-op note kept verbatim (Russian). In English:
# at start-up the script asks whether to save JSON batches to disk and whether
# to send batches to the API; the answer is 1 (yes) / 0 (no), empty input = 1.
'''
На старте спросим:
- сохранять ли JSON батчи на диск
- отправлять ли батчи на API
Ответ: 1 (да) / 0 (нет). Пустой ввод = 1.
'''
# Target endpoint; overridable through the IKEA_POST_URL environment variable.
POST_URL = os.getenv("IKEA_POST_URL", "http://172.25.4.101:3005/parser/data")
# Bearer token; when empty, post_payload() sends no Authorization header.
POST_API_KEY = os.getenv("IKEA_POST_API_KEY", "")
# Passed as `timeout=` to requests.post in post_payload().
POST_TIMEOUT = 20
# Number of accepted items main() accumulates before flushing a batch.
BATCH_SIZE = 50
# ───────────────────────── SITE SETTINGS ──────────────────────────
# Request headers sent with every page fetch in extract_data().
HEADERS = {"User-Agent": "Mozilla/5.0"}
# Element whose `data-hydration-props` attribute is read by extract_data().
CSS_SELECTOR = ".pip-product__subgrid.product-pip.js-product-pip"
# Top-level keys of the hydration JSON that extract_data() flattens into
# "<block>.<key>" columns.
BLOCKS = [
"buyModule",
"productSummary",
"pipPricePackage",
"productInformationSection",
"keyFacts",
"stockcheckSection",
"availabilityGroup",
"productGallery",
]
# The bare string below is a no-op note kept verbatim; in English:
# "Whitelist of columns for Excel."
'''
Whitelist колонок для Excel.
'''
# Column order here is the column order of the workbook header written by
# main(); extract_data() also uses this list to filter its result dict.
# NOTE: "prductVariantColorMeasure" is misspelled, but it is the key actually
# used throughout the file, so it must not be "corrected" in one place only.
KEEP_COLUMNS = [
"availabilityGroup.serverOnlineSellable",
"availabilityGroup.storeHeader",
"buyModule.onlineSellable",
"buyModule.productName",
"buyModule.productPrice",
"buyModule.productType",
"keyFacts.ariaLabels",
"keyFacts.gaLabel",
"keyFacts.keyFacts",
"keyFacts.keyFacts_formatted",
"pipPricePackage.measurementText",
"pipPricePackage.productDescription",
"productGallery.urls",
"productInformationSection.dimensionProps",
"productInformationSection.dimensionProps_formatted",
"productInformationSection.dimensionProps_formatted_html_translated",
"productInformationSection.productDetailsProps",
"productInformationSection.productDetailsProps_formatted",
"productInformationSection.productDetailsProps_formatted_html",
"productSummary.description",
"productSummary.visibleItemNo",
"stockcheckSection.packagingProps",
"stockcheckSection.typeName",
"total brutto",
"prductVariantColorMeasure",
"categoryBreadcrumb",
"originalName", # NEW: column for Excel
"url",
]
# ───────────────────────── УТИЛИТЫ I/O ────────────────────────────
def ask_bool(prompt: str, default: str = "1") -> bool:
    """Ask a 1/0 question on stdin and return the answer as a bool.

    An empty answer, or end-of-input on stdin, falls back to *default*.
    Only the exact answer ``"1"`` counts as yes; anything else is no.
    """
    question = f"{prompt} (1=yes, 0=no) [{default}]: "
    try:
        answer = input(question).strip()
    except EOFError:
        answer = ""
    if not answer:
        answer = default
    return answer == "1"
def _post_log(msg: str):
    """Append *msg* as one entry to the POST log file.

    Logging is best-effort: any failure while formatting or writing the
    entry is deliberately ignored so that it can never interrupt a run.
    """
    try:
        with open(POST_LOG, mode="a", encoding="utf-8") as log_file:
            # print() appends the single trailing newline for us.
            print(msg.rstrip(), file=log_file)
    except Exception:
        pass
def _now_tag():
    """Return the current local time as a ``YYYYMMDD_HHMMSS`` tag."""
    moment = datetime.datetime.now()
    return f"{moment:%Y%m%d_%H%M%S}"
def _save_json_batch(payload: dict, batch_index: int):
    """Write *payload* as pretty-printed UTF-8 JSON into RECORDS_DIR.

    The file name combines the current time tag and the zero-padded batch
    index. Returns the full path of the file that was written.
    """
    file_name = "ikea_batch_{}_{:04d}.json".format(_now_tag(), batch_index)
    target = os.path.join(RECORDS_DIR, file_name)
    with open(target, mode="w", encoding="utf-8") as out:
        json.dump(payload, out, ensure_ascii=False, indent=2)
    print(f"💾 JSON saved: {file_name}")
    return target
# ───────────────────────── СЛОВАРИ / ФИЛЬТРЫ ──────────────────────
def load_dictionary(path: str) -> dict:
    """Load the translation dictionary from a text file.

    The file is scanned for ``"source" : "target"`` pairs, for example
    ``"Wymiary" : "Размеры",``; everything else in the file is ignored.
    When the same source appears more than once, the last pair wins.

    Args:
        path: Path to the dictionary file (read as UTF-8).

    Returns:
        A dict mapping source tokens to translations; empty when the file
        does not exist.
    """
    if not os.path.isfile(path):
        return {}
    # Context manager so the handle is closed deterministically.
    with open(path, "r", encoding="utf-8") as fh:
        text = fh.read()
    return dict(re.findall(r'"([^"]+)"\s*:\s*"([^"]+)"', text))
# Translation table, loaded once at import time from DICT_FILE.
DICT = load_dictionary(DICT_FILE)


def translate_token(token: str) -> str:
    """Return the dictionary translation of *token*, or *token* itself if unknown."""
    if token in DICT:
        return DICT[token]
    return token
def load_exclusions(path: str) -> set:
    """Load excluded-material tokens from a text file.

    Two layouts are accepted:

    * if the file contains any double-quoted strings, exactly those strings
      are the tokens (everything outside quotes is ignored);
    * otherwise the text is split on commas, semicolons and line breaks.

    Tokens are stripped and lower-cased; blank tokens are dropped.

    Args:
        path: Path to the exclusions file (read as UTF-8).

    Returns:
        A set of lower-cased tokens; empty when the file does not exist.
    """
    if not os.path.isfile(path):
        return set()
    # Context manager so the handle is closed deterministically.
    with open(path, "r", encoding="utf-8") as fh:
        text = fh.read()
    quoted = re.findall(r'"([^"]+)"', text, flags=re.S)
    tokens = quoted if quoted else re.split(r"[,;\n\r]+", text)
    return {t.strip().lower() for t in tokens if t.strip()}
# Lower-cased exclusion tokens, loaded once at import time from EXCL_FILE;
# main() passes them to materials_match_exclusions().
EXCLUSIONS = load_exclusions(EXCL_FILE)
def materials_from_details_json(details: dict) -> list[str]:
    """Collect every string stored under a ``"material"`` key, at any depth.

    Dicts and lists are traversed recursively in document order. A
    ``"material"`` key whose value is not a string is itself descended into.
    A falsy *details* yields an empty list.
    """

    def iter_materials(node):
        if isinstance(node, dict):
            for key, value in node.items():
                if key == "material" and isinstance(value, str):
                    yield value
                else:
                    yield from iter_materials(value)
        elif isinstance(node, list):
            for child in node:
                yield from iter_materials(child)

    return list(iter_materials(details or {}))
def materials_match_exclusions(details: dict, exclusion_tokens: set) -> bool:
    """Tell whether any exclusion token occurs in any material string.

    The material strings are lower-cased before the substring search; the
    tokens are used as given, so they are expected to be lower-case already
    (load_exclusions() produces them that way). An empty token set never
    matches.
    """
    if not exclusion_tokens:
        return False
    haystack = "\n".join(materials_from_details_json(details)).lower()
    for token in exclusion_tokens:
        if token in haystack:
            return True
    return False
# ───────────────────────── ФОРМАТТЕРЫ ─────────────────────────────
def _parse_json_value(val):
    """Decode *val* when it is a JSON string; otherwise return it untouched.

    Non-string values are returned as-is. A blank string, or a string that
    is not valid JSON, is returned unchanged (not stripped).
    """
    if not isinstance(val, str):
        return val
    text = val.strip()
    if text:
        try:
            return json.loads(text)
        except Exception:
            pass
    return val
def flatten_block(block_name, data):
    """Flatten one hydration block into ``"<block_name>.<key>"`` entries.

    Special case: for the ``productGallery`` block the ``mediaList`` entry is
    reduced to a newline-joined string of image URLs stored under
    ``"productGallery.urls"``, and flattening stops there (keys that follow
    ``mediaList`` are not emitted).

    Args:
        block_name: Name of the block; used as the key prefix.
        data: The block's value from the hydration JSON.

    Returns:
        A flat dict; empty when *data* is not a dict.
    """
    if not isinstance(data, dict):
        return {}
    flat = {}
    for k, v in data.items():
        if block_name == "productGallery" and k == "mediaList":
            if isinstance(v, list):
                urls = []
                for item in v:
                    # A malformed (non-dict) media entry must not abort the
                    # whole product, so it is skipped instead of raising.
                    if not isinstance(item, dict):
                        continue
                    content = item.get("content", {})
                    if isinstance(content, dict) and "url" in content:
                        urls.append(content["url"])
                flat["productGallery.urls"] = "\n".join(urls)
            return flat
        flat[f"{block_name}.{k}"] = v
    return flat
def format_keyfacts(raw_keyfacts):
    """Render the key-facts list as newline-separated text.

    The first line is the ``name`` of the first entry (``"Właściwości"`` when
    that entry has no name); it is followed by every non-empty ``label`` in
    order. Returns ``""`` for a non-list or empty input.
    """
    if not isinstance(raw_keyfacts, list):
        return ""
    entries = [entry or {} for entry in raw_keyfacts]
    if not entries:
        return ""
    header = entries[0].get("name", "Właściwości")
    labels = [entry.get("label") for entry in entries]
    return "\n".join([header] + [label for label in labels if label])
def _fmt_float(x):
    """Format *x* with at most two decimals, dropping trailing zeros.

    Returns ``""`` when *x* cannot be converted to a float.
    """
    try:
        text = format(float(x), ".2f")
    except Exception:
        return ""
    if "." in text:
        # Drop trailing zeros, then a dangling decimal point ("10.00" -> "10").
        text = text.rstrip("0").rstrip(".")
    return text
def _collect_packaging_total_kg(packaging):
    """Sum the package weights found in a ``packaging`` dict.

    For every package, each measurement group (a list) contributes the numeric
    ``value`` of its first weight entry (``type == "weight"`` or
    ``label == "Waga"``), multiplied by the package quantity (1 when missing
    or falsy). Returns ``0.0`` when *packaging* is not a dict.
    """
    if not isinstance(packaging, dict):
        return 0.0

    def is_weight(entry):
        return entry.get("type") == "weight" or entry.get("label") == "Waga"

    total_kg = 0.0
    packages = (packaging.get("contentProps") or {}).get("packages") or []
    for package in packages:
        quantity = (package.get("quantity") or {}).get("value") or 1
        for group in package.get("measurements") or []:
            if not isinstance(group, list):
                continue
            weight_entry = None
            for entry in group:
                if is_weight(entry):
                    weight_entry = entry
                    break
            if not weight_entry:
                continue
            value = weight_entry.get("value")
            if isinstance(value, (int, float)):
                total_kg += float(value) * quantity
    return total_kg
def format_dimensions(raw_dim_props, with_html=False, translated=False):
    """Render ``dimensionProps`` as a two-section text block.

    The first section ("Wymiary") lists ``name: measure`` for each dimension;
    the second ("Opakowanie") lists, per package, its name, article number,
    measurement lines and quantity.

    Args:
        raw_dim_props: The parsed ``dimensionProps`` dict; any other type
            yields ``""``.
        with_html: When true, lines are joined with ``<br>`` and section
            titles are wrapped in ``<strong>``; otherwise plain text with
            newlines is produced.
        translated: When true, titles and labels go through
            ``translate_token``.

    Returns:
        The formatted string, without leading or trailing separators.
    """
    if not isinstance(raw_dim_props, dict):
        return ""

    # NOTE(review): in the reviewed copy this literal was split across two
    # lines (a syntax error) and the section titles had no <strong> wrappers.
    # Both are restored here on the evidence of the former
    # `startswith("strong>")` workaround - confirm against deployed output.
    br = "<br>" if with_html else "\n"

    def tr(token):
        return translate_token(token) if translated and token else token

    def heading(text):
        return f"<strong>{text}</strong>" if with_html else text

    lines = [heading(tr("Wymiary"))]
    # `or []` also covers an explicit "dimensions": null in the source JSON.
    for dim in raw_dim_props.get("dimensions") or []:
        name = dim.get("name", "")
        measure = dim.get("measure", "")
        if not name and not measure:
            continue
        lines.append(f"{tr(name)}: {measure}".strip())

    # Blank separator between the two sections.
    lines.append(br if with_html else "")
    lines.append(heading(tr("Opakowanie")))

    packaging = raw_dim_props.get("packaging") or {}
    packages = (packaging.get("contentProps") or {}).get("packages") or []
    for pkg in packages:
        name = pkg.get("name") or ""
        if name:
            lines.append(name)
        art = (pkg.get("articleNumber") or {}).get("value")
        if art:
            lines.append(tr("Numer artykułu"))
            lines.append(f"{art}")
        for group in pkg.get("measurements") or []:
            if not isinstance(group, list):
                continue
            for m in group:
                lbl = tr(m.get("label", ""))
                txt = m.get("text", "")
                if lbl or txt:
                    # Character-set strip is intended here: it removes the
                    # dangling ": " left behind when label or text is empty.
                    lines.append(f"{lbl}: {txt}".strip(": "))
        q_val = (pkg.get("quantity") or {}).get("value")
        if q_val:
            lines.append(f"{tr('Paczka(i)')}: {q_val}")

    kept = [x for x in lines if x is not None]
    if not with_html:
        return "\n".join(kept).strip()

    sep = re.escape(br)
    s = br.join(kept)
    # Collapse runs of separators to at most one blank line.
    s = re.sub(r"(?:" + sep + r"){2,}", br * 2, s)
    # Remove whole leading/trailing separators. str.strip(br) must not be
    # used: it treats "<br>" as the character set {<, b, r, >} and would eat
    # the "<" of a leading "<strong>" and any trailing b/r/> characters.
    return re.sub(r"^(?:" + sep + r")+|(?:" + sep + r")+$", "", s)
def format_product_details(raw_details, add_summary_desc="", with_html=False, skip_assembly=True):
    """Render ``productDetailsProps`` as a multi-section text block.

    Sections, in order: optional summary description, "Informacje o
    produkcie" (paragraphs, designer, article number), "Dobrze wiedzieć",
    "Materiały i pielęgnacja" and "Bezpieczeństwo i zgodność z przepisami".
    Sections without data are omitted, except the first title which is
    always present.

    Args:
        raw_details: The parsed ``productDetailsProps`` dict; for any other
            type *add_summary_desc* is returned unchanged.
        add_summary_desc: Optional text placed before the first section.
        with_html: When true, lines are joined with ``<br>`` and section
            titles are wrapped in ``<strong>``; otherwise plain text with
            newlines is produced.
        skip_assembly: Accepted for backward compatibility; the assembly
            instructions section was removed, so the flag has no effect.

    Returns:
        The formatted string, without leading or trailing separators.
    """
    if not isinstance(raw_details, dict):
        return add_summary_desc

    # NOTE(review): in the reviewed copy this literal was split across two
    # lines (a syntax error) and the section titles had no <strong> wrappers;
    # both are restored here to match the HTML variant's evident intent -
    # confirm against deployed output.
    br = "<br>" if with_html else "\n"
    gap = br if with_html else ""

    def heading(text):
        return f"<strong>{text}</strong>" if with_html else text

    out = []
    if add_summary_desc:
        out.append(add_summary_desc)
        out.append(gap)
    out.append(heading("Informacje o produkcie"))

    pd = raw_details.get("productDescriptionProps") or {}
    out.extend(pd.get("paragraphs") or [])
    dlabel = pd.get("designerLabel")
    dname = pd.get("designerName")
    if dlabel and dname:
        out.append(dlabel)
        out.append(dname)
    if raw_details.get("productId"):
        out.append("Numer artykułu")
        out.append(raw_details["productId"])

    acc = raw_details.get("accordionObject") or {}

    gk = ((acc.get("goodToKnow") or {}).get("contentProps") or {}).get("goodToKnow") or []
    if gk:
        out.append(gap)
        out.append(heading("Dobrze wiedzieć"))
        for item in gk:
            txt = item.get("text")
            if txt:
                out.append(txt)

    mac = (acc.get("materialsAndCare") or {}).get("contentProps") or {}
    mats = mac.get("materials") or []
    care = mac.get("careInstructions") or []
    if mats or care:
        out.append(gap)
        out.append(heading("Materiały i pielęgnacja"))
    if mats:
        out.append("Materiały")
        for m in mats:
            ptype = m.get("productType", "")
            for mat in (m.get("materials") or []):
                material = mat.get("material", "")
                # The part name is repeated before each of its materials.
                if ptype:
                    out.append(ptype)
                if material:
                    out.append(material)
    if care:
        out.append(mac.get("detailsCareText", "Pielęgnacja"))
        for c in care:
            ptype = c.get("productType", "")
            if ptype:
                out.append(ptype)
            out.extend(c.get("texts") or [])

    # NOTE(review): read from the top level of raw_details, unlike the other
    # sections which come from "accordionObject" - confirm this is intended.
    safety = (raw_details.get("safetyAndCompliance") or {}).get("contentProps") or {}
    sc = safety.get("safetyAndCompliance") or []
    if sc:
        out.append(gap)
        out.append(heading("Bezpieczeństwo i zgodność z przepisami"))
        for entry in sc:
            txt = entry.get("text")
            if txt:
                out.append(txt)

    kept = [x for x in out if x is not None]
    if not with_html:
        return "\n".join(kept).strip()

    sep = re.escape(br)
    s = br.join(kept)
    # Collapse runs of separators to at most one blank line.
    s = re.sub(r"(?:" + sep + r"){2,}", br * 2, s)
    # Remove whole leading/trailing separators. str.strip(br) must not be
    # used: it treats "<br>" as the character set {<, b, r, >} and would eat
    # leading/trailing letters such as the "r" of a summary starting with it.
    return re.sub(r"^(?:" + sep + r")+|(?:" + sep + r")+$", "", s)
def build_variant_color_measure(desc: str, type_name: str, measurement: str) -> str:
    """Build the "<variant/colour>, <measurement>" label for a product.

    A leading occurrence of *type_name* (case-insensitive, with any following
    separators) is removed from *desc*; what remains is discarded unless it
    contains at least one letter or digit. The remainder gets an upper-case
    first character and is joined to the stripped *measurement* with ", ".
    When either part is empty, the other is returned alone.
    """
    text = desc or ""
    prefix = (type_name or "").strip()
    if prefix:
        leading_type = re.compile(
            r"^\s*" + re.escape(prefix) + r"[\s,;:\-–—/]*", re.IGNORECASE
        )
        text = leading_type.sub("", text)
    has_content = re.search(r"[0-9A-Za-zА-Яа-яЁёÀ-ž]", text or "") is not None
    text = text.strip() if has_content else ""
    size = (measurement or "").strip()
    if not text:
        return size
    headline = text[:1].upper() + text[1:]
    if size:
        return f"{headline}, {size}"
    return headline
# ───────────────────── СКРАПИНГ КАРТОЧКИ ──────────────────────────
def extract_data(url: str) -> dict:
    """Fetch one product page and return a flat dict limited to KEEP_COLUMNS.

    The product JSON is read from the ``data-hydration-props`` attribute of
    the element matched by CSS_SELECTOR; the blocks listed in BLOCKS are
    flattened and the derived columns are added (formatted key facts,
    dimensions and product details, "total brutto",
    "prductVariantColorMeasure", "categoryBreadcrumb", "originalName").

    On any failure the function does not raise: it returns
    ``{"url": url, "error": <message>}`` instead.
    """

    def find_breadcrumb(page):
        # "/"-joined item names of the first JSON-LD BreadcrumbList, else None.
        scripts = page.find_all("script", attrs={"type": lambda t: t and "ld+json" in t})
        for script in scripts:
            try:
                payload = json.loads(script.string)
            except Exception:
                continue
            if isinstance(payload, list):
                payload = next(
                    (d for d in payload if isinstance(d, dict) and d.get("@type") == "BreadcrumbList"),
                    None,
                )
            if isinstance(payload, dict) and payload.get("@type") == "BreadcrumbList":
                return "/".join(it.get("name", "") for it in payload.get("itemListElement", []))
        return None

    try:
        resp = requests.get(url, headers=HEADERS, timeout=15)
        resp.raise_for_status()
        page_text = resp.text

        # DEBUG: print basic information about the response to the console.
        print("\n=== FETCH DEBUG ===")
        for label, value in (
            ("URL: ", url),
            ("Final URL: ", resp.url),
            ("Status: ", resp.status_code),
            ("ContentType:", resp.headers.get("Content-Type")),
            ("Length: ", len(page_text)),
        ):
            print(label, value)
        print("Snippet ↓↓↓")
        print(page_text[:1000])  # first 1000 characters of the HTML

        soup = BeautifulSoup(page_text, "html.parser")
        node = soup.select_one(CSS_SELECTOR)
        if not node:
            return {"url": url, "error": "CSS selector not found"}
        hydration = node.get("data-hydration-props")
        if not hydration:
            return {"url": url, "error": "data-hydration-props not found"}

        product = json.loads(html.unescape(hydration))
        record = {"url": url}
        for block_name in BLOCKS:
            record.update(flatten_block(block_name, product.get(block_name, {})))

        key_facts = _parse_json_value(record.get("keyFacts.keyFacts"))
        dims = _parse_json_value(record.get("productInformationSection.dimensionProps"))
        details = _parse_json_value(record.get("productInformationSection.productDetailsProps"))

        record["keyFacts.keyFacts_formatted"] = format_keyfacts(key_facts)
        record["productInformationSection.dimensionProps_formatted"] = format_dimensions(
            dims, with_html=False, translated=False
        )
        composition = format_dimensions(dims, with_html=True, translated=True)
        # Extra safety net: re-attach a missing leading "<".
        if isinstance(composition, str) and composition.startswith("strong>"):
            composition = "<" + composition
        record["productInformationSection.dimensionProps_formatted_html_translated"] = composition

        weight = _collect_packaging_total_kg((dims or {}).get("packaging") or {})
        record["total brutto"] = _fmt_float(weight)

        summary = record.get("productSummary.description", "") or ""
        for column, as_html in (
            ("productInformationSection.productDetailsProps_formatted", False),
            ("productInformationSection.productDetailsProps_formatted_html", True),
        ):
            record[column] = format_product_details(
                details, add_summary_desc=summary, with_html=as_html, skip_assembly=True
            )

        record["prductVariantColorMeasure"] = build_variant_color_measure(
            record.get("pipPricePackage.productDescription", "") or "",
            record.get("stockcheckSection.typeName", "") or "",
            record.get("pipPricePackage.measurementText", "") or "",
        )

        crumb = find_breadcrumb(soup)
        if crumb:
            record["categoryBreadcrumb"] = crumb

        # Apply the whitelist; originalName is computed below.
        filtered = {col: record.get(col) for col in KEEP_COLUMNS if col != "originalName"}

        # originalName = productName + " " + typeName, without a stray space
        # when one of the two parts is empty.
        product_name = (record.get("buyModule.productName") or "").strip()
        type_name = (record.get("stockcheckSection.typeName") or "").strip()
        filtered["originalName"] = " ".join(part for part in (product_name, type_name) if part)
        return filtered
    except Exception as e:
        print(e)
        return {"url": url, "error": str(e)}
# ───────────────────── ПОСТРОЕНИЕ ВАРИАНТА / POST ─────────────────
def _split_color_size(text: str):
    """Split ``"colour, size"`` at the first comma into ``(colour, size)``.

    Text without a comma is treated as a size only: ``("", text)``. Both
    parts are stripped. Empty input gives ``("", "")``.
    """
    if not text:
        return "", ""
    head, comma, tail = text.partition(",")
    if comma:
        return head.strip(), tail.strip()
    return "", head.strip()
def _ceil_price(v):
    """Round *v* up to the nearest integer.

    Args:
        v: A number or numeric string.

    Returns:
        The ceiling as an int, or None when *v* is missing, not numeric,
        NaN or infinite.
    """
    try:
        return int(math.ceil(float(v)))
    except (TypeError, ValueError, OverflowError):
        # TypeError/ValueError: not convertible (None, "abc") or NaN;
        # OverflowError: infinity, or an int too large for float.
        return None


def _ceil_int(v):
    """Round *v* up to the nearest integer; same contract as ``_ceil_price``.

    Kept as a separate name for existing callers; the logic lives in one place.
    """
    return _ceil_price(v)
def build_variant(row: dict) -> dict:
    """Convert one extracted row into the API item payload.

    The result has three parts: ``category`` (the breadcrumb), ``brand``
    (always "ikea") and ``variant`` with SKU, colour/size, price rounded up,
    names, HTML description/composition, image list, stock flag and weight
    rounded up (0 when unknown).
    """

    def text_of(key):
        return row.get(key) or ""

    category_name = text_of("categoryBreadcrumb")
    brand_name = "ikea"

    # The SKU is the visible item number with spaces removed.
    sku = text_of("productSummary.visibleItemNo").replace(" ", "")

    color, size = _split_color_size(text_of("prductVariantColorMeasure").strip())
    if not (color or size):
        size = text_of("pipPricePackage.measurementText").strip()

    cost = _ceil_price(row.get("buyModule.productPrice"))
    url = text_of("url")
    # originalName comes from its own column, falling back to productName.
    name = row.get("originalName") or text_of("buyModule.productName")
    desc_html = text_of("productInformationSection.productDetailsProps_formatted_html")
    # originalComposition = HTML from dimensionProps_formatted_html_translated.
    composition_html = text_of(
        "productInformationSection.dimensionProps_formatted_html_translated"
    )

    raw_imgs = text_of("productGallery.urls")
    if isinstance(raw_imgs, str):
        imgs = [line for line in raw_imgs.split("\n") if line.strip()]
    else:
        imgs = []

    in_stock = bool(row.get("availabilityGroup.serverOnlineSellable")) or bool(
        row.get("buyModule.onlineSellable")
    )

    weight_kg = _ceil_int(row.get("total brutto"))
    if weight_kg is None:
        weight_kg = 0

    return {
        "category": {"name": category_name},
        "brand": {"name": brand_name},
        "variant": {
            "status_id": 1,
            "color": color.capitalize() if color else "none",
            "sku": sku,
            "size": size,
            "cost": cost,
            "originalUrl": url,
            "originalName": name,
            "originalDescription": desc_html,
            "originalComposition": composition_html,
            "images": imgs,
            "inStock": in_stock,
            "weight": weight_kg,
        },
    }
def post_payload(payload: dict) -> dict:
    """POST *payload* as UTF-8 JSON to POST_URL and report the outcome.

    The request and the response (or the error) are written to the POST log.
    The bearer token is masked in the log so the API key never reaches disk.

    Args:
        payload: JSON-serialisable request body.

    Returns:
        ``{"ok": bool, "status": int, "response": str}`` when a response was
        received (``ok`` is true for 2xx), or
        ``{"ok": False, "status": None, "error": str}`` when the request
        itself failed.
    """
    headers = {"Content-Type": "application/json"}
    if POST_API_KEY:
        headers["Authorization"] = f"Bearer {POST_API_KEY}"

    # Log a copy of the headers with the credential redacted.
    logged_headers = dict(headers)
    if "Authorization" in logged_headers:
        logged_headers["Authorization"] = "Bearer ***"

    body = json.dumps(payload, ensure_ascii=False)
    _post_log(f"→ POST {POST_URL}\nHeaders: {logged_headers}\nBody: {body}")
    try:
        r = requests.post(POST_URL, headers=headers, data=body.encode("utf-8"), timeout=POST_TIMEOUT)
        text = r.text
        _post_log(f"← {r.status_code}\n{text}\n{'-'*60}")
        ok = 200 <= r.status_code < 300
        return {"ok": ok, "status": r.status_code, "response": text}
    except Exception as e:
        # Boundary handler: a failed batch is reported to the caller, not raised.
        _post_log(f"× ERROR: {e}\n{'-'*60}")
        return {"ok": False, "status": None, "error": str(e)}
# ───────────────────────── СЕРДЦЕ СКРИПТА ─────────────────────────
def safe_cell(val):
    """Convert *val* into something that can be written to a worksheet cell.

    ``None`` becomes ``""``, dicts and lists become JSON text (non-ASCII
    characters kept as-is), and every other value is returned unchanged.
    """
    if val is None:
        return ""
    if isinstance(val, (dict, list)):
        return json.dumps(val, ensure_ascii=False)
    return val
def main():
    """Run the scraper: read links, fill the workbook, emit JSON batches.

    Every link from INPUT_FILE is scraped and written to the workbook
    unfiltered. Rows that pass the price, weight and material filters are
    converted with build_variant() and collected into batches of BATCH_SIZE,
    which are saved to disk and/or posted depending on the start-up answers.
    """
    save_json = ask_bool("SAVE_JSON (сохранять JSON на диск?)", "1")
    send_json = ask_bool("SEND_JSON (отправлять на API?)", "1")

    # Read the links, skipping blank lines.
    with open(INPUT_FILE, "r", encoding="utf-8") as f:
        links = [entry for entry in (line.strip() for line in f) if entry]
    total = len(links)
    print(f"Всего ссылок: {total}")

    # Prepare the workbook with the header row.
    wb = Workbook()
    ws = wb.active
    ws.title = "IKEA Products"
    ws.append(KEEP_COLUMNS)

    # Pending batch for JSON/API output.
    pending = []
    batch_no = 1

    def flush_batch():
        nonlocal pending, batch_no
        if not pending:
            return
        payload = {"parserName": "ikea", "items": pending}
        if save_json:
            _save_json_batch(payload, batch_no)
        if send_json:
            res = post_payload(payload)
            verdict = "OK" if res.get("ok") else "FAIL"
            print(f"POST batch {batch_no}: {verdict} (status={res.get('status')})")
        batch_no += 1
        pending = []

    def as_number(value):
        # Missing or unparsable values count as 0.
        try:
            return float(value or 0)
        except Exception:
            return 0.0

    def passes_filters(row):
        # 1) price filter
        price = as_number(row.get("buyModule.productPrice"))
        if not (20 <= price <= 1500):
            return False
        # 2) weight filter
        if as_number(row.get("total brutto")) > 30:
            return False
        # 3) materials filter
        details = row.get("productInformationSection.productDetailsProps") or {}
        return not materials_match_exclusions(details, EXCLUSIONS)

    for idx, link in enumerate(links, 1):
        print(f"[{idx}/{total}] {link}")
        row = extract_data(link)

        # The workbook receives every row, regardless of the filters.
        ws.append([safe_cell(row.get(col, "")) for col in KEEP_COLUMNS])

        if passes_filters(row):
            try:
                pending.append(build_variant(row))
            except Exception as e:
                _post_log(f"× build_variant error for {link}: {e}")

        # Autosave the workbook every 50 rows.
        if idx % 50 == 0:
            wb.save(OUTPUT_FILE)
            print(f"💾 autosave: {OUTPUT_FILE}")

        # Flush the batch once it reaches the limit.
        if len(pending) >= BATCH_SIZE:
            flush_batch()

    # Final save, then send/save whatever is left in the batch.
    wb.save(OUTPUT_FILE)
    print(f"\n✅ Excel готов: {OUTPUT_FILE}")
    flush_batch()
    print("🎯 Готово.")


if __name__ == "__main__":
    main()