import pandas as pd
from pathlib import Path
from typing import List, Dict, Any
from models import Product, RowOut
import hashlib, json, datetime
import re

# ---- Price parsing helpers ----
# Matches PLN amounts such as '1 299,00 zł', '1299 zł' or '49 PLN': an integer
# part with optional space/NBSP thousands separators, an optional two-digit
# decimal part after ',' or '.', followed by a 'zł' or 'PLN' suffix.
_PLN_PRICE_RE = re.compile(
    r'(?<!\d)(\d{1,3}(?:[ \u00A0]?\d{3})*(?:[.,]\d{2})?)(?:\s*(?:zł|PLN))',
    re.IGNORECASE,
)

def parse_pln_price_to_float(price_text: str | None) -> float | None:
    """
    Extract a float such as 1299.00 from strings like '1 299,00 zł',
    '1299 zł' or '1 299 zł'. Returns None if the text cannot be parsed.
    """
    if not price_text:
        return None
    t = (
        price_text.replace("\u00a0", " ")  # NBSP
        .replace("\u2009", " ")            # thin space
        .strip()
    )
    m = _PLN_PRICE_RE.search(t)
    if not m:
        return None
    num = m.group(1)
    # Drop any remaining separators, then turn the decimal comma into a dot.
    num = num.replace(" ", "").replace("\u00a0", "").replace("\u2009", "")
    num = num.replace(",", ".")
    try:
        return float(num)
    except Exception:
        return None
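
# Expected behavior, as a minimal doctest-style sketch (input strings are
# illustrative, not captured from a real page):
#
#     >>> parse_pln_price_to_float("1 299,00 zł")
#     1299.0
#     >>> parse_pln_price_to_float("1299 zł")
#     1299.0
#     >>> parse_pln_price_to_float("49.99 PLN")
#     49.99
#     >>> parse_pln_price_to_float("brak ceny") is None
#     True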

def _as_str(v) -> str:
    return str(v) if v is not None else ""

def _key_from_fields(product_id: str | None, url: str | None) -> str:
    # md5 serves as a compact, stable dedup key here, not as a security hash.
    base = f"{_as_str(product_id)}|{_as_str(url)}"
    return hashlib.md5(base.encode("utf-8")).hexdigest()

def _key(p: Product) -> str:
    # p.url may not be a plain str (e.g. a pydantic URL type), so coerce first.
    return _key_from_fields(p.product_id, _as_str(p.url))

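# Key stability, for illustration (the URL is hypothetical): the same
# (product_id, url) pair always yields the same digest, so repeated scrapes
# dedupe against each other:
#
#     >>> k1 = _key_from_fields("123", "https://example.com/p/123")
#     >>> k2 = _key_from_fields("123", "https://example.com/p/123")
#     >>> k1 == k2
#     True
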
def build_rows(category_name: str, category_url: str, products: List[Product]) -> List[Dict[str, Any]]:
    """Build the list of RowOut rows (as dicts) from products, de-duplicated by product_id|url."""
    rows: List[Dict[str, Any]] = []
    seen: set[str] = set()
    for p in products:
        k = _key(p)
        if k in seen:
            continue
        seen.add(k)
        rows.append(RowOut(
            category_name=category_name,
            category_url=category_url,
            product_id=_as_str(p.product_id) or None,
            url=_as_str(p.url) or None,
            name=p.name,
            price=p.price,
            currency=p.currency,
            color=p.color,
            images_joined="\n".join(p.image_urls) if p.image_urls else None,
        ).model_dump())
    return rows

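# Usage sketch for build_rows, assuming Product is a pydantic-style model with
# the fields referenced above (all example values are hypothetical):
#
#     >>> p = Product(product_id="123", url="https://example.com/p/123",
#     ...             name="Lampa", price=1299.0, currency="PLN",
#     ...             color="czarny", image_urls=["https://example.com/1.jpg"])
#     >>> rows = build_rows("Lampy", "https://example.com/c/lampy", [p, p])
#     >>> len(rows)   # the duplicate is dropped by the product_id|url key
#     1
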
def write_outputs(category_name: str, category_url: str, products: List[Product], out_folder: str, excel_prefix: str, csv_also: bool, jsonl_also: bool):
    """Write the per-category files (xlsx, plus optional csv/jsonl). Returns (excel_path, nrows, rows)."""
    Path(out_folder).mkdir(parents=True, exist_ok=True)
    rows = build_rows(category_name, category_url, products)

    # Timestamped file names keep repeated runs from overwriting each other.
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    excel_path = Path(out_folder) / f"{excel_prefix}_{ts}.xlsx"
    df = pd.DataFrame(rows)
    with pd.ExcelWriter(excel_path, engine="openpyxl") as w:
        df.to_excel(w, sheet_name="Products", index=False)

    if csv_also:
        df.to_csv(Path(out_folder) / f"{excel_prefix}_{ts}.csv", index=False)

    if jsonl_also:
        with open(Path(out_folder) / f"{excel_prefix}_{ts}.jsonl", "w", encoding="utf-8") as f:
            for r in rows:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")

    return str(excel_path), len(rows), rows

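# Per-category call sketch (folder, prefix and category values are
# illustrative): writes out/lampy_YYYYMMDD_HHMMSS.xlsx and returns the rows
# for later merging into the master workbook:
#
#     >>> xlsx, n, rows = write_outputs("Lampy", "https://example.com/c/lampy",
#     ...                               products, "out", "lampy",
#     ...                               csv_also=True, jsonl_also=False)
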
def write_master_excel(all_path: str, rows: List[Dict[str, Any]]):
    """Write the combined XLSX (one AllProducts sheet). Rewrites the file in full, once at the end of a run. Returns (path, nrows)."""
    Path(all_path).parent.mkdir(parents=True, exist_ok=True)
    if not rows:
        # Nothing to write; rows are empty.
        return str(all_path), 0
    # Dedupe just in case (by product_id|url), e.g. when categories overlap.
    seen: set[str] = set()
    deduped: List[Dict[str, Any]] = []
    for r in rows:
        k = _key_from_fields(r.get("product_id"), r.get("url"))
        if k in seen:
            continue
        seen.add(k)
        deduped.append(r)
    df = pd.DataFrame(deduped)
    with pd.ExcelWriter(all_path, engine="openpyxl") as w:
        df.to_excel(w, sheet_name="AllProducts", index=False)
    return str(all_path), len(deduped)
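
# End-of-run sketch: concatenate the per-category rows and write the combined
# workbook once (the path and the accumulator names are hypothetical):
#
#     >>> all_rows = rows_lampy + rows_krzesla
#     >>> write_master_excel("out/all_products.xlsx", all_rows)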