MacOS_Parsers/Parsing ZARAHOME/src/xlsx_recorder.py

159 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# xlsx_recorder.py · 2025-07-24
from openpyxl import Workbook
from os.path import isdir, abspath, join
from os import mkdir
import re, json, math, logging, requests, os, sys, pathlib
from pathlib import Path
log = logging.getLogger("recorder")
# ─────────────────────── интерактивные флаги ──────────────────────
def ask(flag: str, default: str = "1") -> bool:
"""Спрашивает 0/1; пустой ввод → default."""
try:
val = input(f"{flag} (1=yes, 0=no) [{default}]: ").strip() or default
except EOFError:
val = default
return val == "1"
SEND_JSON = ask("SEND_JSON") # отправка POST
SAVE_JSON = ask("SAVE_JSON") # сохранять копию JSON
# ─────────────────────── прочие настройки ─────────────────────────
POST_URL = "http://localhost:3005/parser/data"
MIN_PRICE = 40
MAX_PRICE = 1200
EXCLUSION_FILE = Path(__file__).with_name("exclusion_materials.txt") # в том же каталоге
# Только такие статусы допускаем в JSON/POST
ALLOWED_VIS = {"SHOW", "RUNNING_OUT"}
INVALID_FILE_CHARS = r'[<>:"/\\|*?]' # для имён файлов Windows
def sanitize_filename(name: str, repl: str = "_") -> str:
clean = re.sub(INVALID_FILE_CHARS, repl, name)
return clean.split("?", 1)[0].strip()
# ────────────────────────── Recorder ──────────────────────────────
class Recorder:
def __init__(self, records_folder="records_folder"):
rf_abs = abspath(records_folder)
if not isdir(rf_abs):
mkdir(rf_abs)
self.record_folder = rf_abs
self.forbidden = self._load_forbidden()
# ---------- загрузка словаря исключений ----------
def _load_forbidden(self):
path = pathlib.Path(EXCLUSION_FILE)
if not path.is_file():
log.warning("Exclusion file not found: %s (filter disabled)", path)
return set()
txt = path.read_text(encoding="utf-8")
# ищем элементы в кавычках или делим по запятой
tokens = re.findall(r'"([^"]+)"', txt) or [t.strip() for t in txt.split(",")]
cleaned = {t.lower() for t in tokens if t}
log.info("Loaded %s exclusion tokens", len(cleaned))
return cleaned
# ---------------- запись таблицы + JSON ----------------
def record(self, csv_name, table):
csv_name = sanitize_filename(csv_name)
# 1) сохраняем XLSX
wb = Workbook()
ws = wb.active
for row in table:
ws.append(row)
xlsx_path = join(self.record_folder, f"{csv_name}.xlsx")
wb.save(xlsx_path)
log.info("XLSX saved → %s", xlsx_path)
# индексы столбцов
headers = table[0]
idx = {h: i for i, h in enumerate(headers)}
items = []
for row in table[1:]:
# ----------- фильтр цены -----------
price_raw = row[idx["Цена закупки"]]
price_int = math.ceil(float(price_raw))
if not (MIN_PRICE <= price_int <= MAX_PRICE):
# вне ценового коридора — остаётся только в XLSX
continue
# ----------- фильтр статуса наличия --------
vis_val = row[idx["Наличие на сайте"]]
#Закомментить это если нужно выключить (начало)-------------------------------#
if vis_val not in ALLOWED_VIS: #
# любые статусы кроме SHOW/RUNNING_OUT — только в XLSX #
log.debug("Skip by availability: %s (%s)", row[idx["Артикул"]], vis_val) #
continue #
#Закомментить это если нужно выключить (конец)--------------------------------#
# ----------- фильтр по составу -----------
comp_txt = row[idx["Параметр: Состав"]].replace("\n", "<br/>")
comp_low = comp_txt.lower()
if any(tok in comp_low for tok in self.forbidden):
log.debug("Skip by exclusion token: %s", row[idx["Артикул"]])
continue
# ----------- формируем variant -----------
article = row[idx["Артикул"]]
partnumber = row[idx["PartNumber"]]
clr_name = row[idx["Свойство: Цвет"]].capitalize()
size_full = row[idx["Свойство: Размер"]].replace("\n", "<br/>")
vis = vis_val
weight_g = float(row[idx["Свойство: Вес(г)"]]) if row[idx["Свойство: Вес(г)"]] else 0.0
weight_kg = math.ceil(weight_g / 1000) if weight_g else 0
url_full = row[idx["Краткое описание"]]
name_orig = row[idx["Название товара или услуги"]].capitalize()
desc_orig = (
row[idx["Полное описание"]].replace("\n", "<br/>") + "<br/>" +
row[idx["Параметр: Уход"]].replace("\n", "<br/>") + "<br/>" +
row[idx["Параметр: Происхождение"]].replace("\n", "<br/>")
).strip("<br/>")
images = [img for img in row[idx["Изображения варианта"]].split("\n") if img]
cat_raw = row[idx["Размещение на сайте"]].replace("Каталог/ZaraHome/", "")
category_name = re.sub(r"[^\w/-]+|_+", "_", cat_raw)
variant = {
"status_id": 1,
"color": clr_name,
"sku": f"{article}-{partnumber}",
"size": size_full,
"cost": price_int,
"originalUrl": url_full,
"originalName": name_orig,
"originalDescription": desc_orig,
"originalComposition": comp_txt,
"images": images,
"inStock": vis in ALLOWED_VIS, # ← здесь привязали к ALLOWED_VIS
"weight": weight_kg
}
items.append({
"category": {"name": category_name},
"variant": variant,
"brand": {"name": "zara-home"}
})
payload = {"items": items, "parserName": "zara-home"}
# 3) сохраняем JSON
if SAVE_JSON:
json_path = join(self.record_folder, f"{csv_name}.json")
with open(json_path, "w", encoding="utf-8") as fh:
json.dump(payload, fh, ensure_ascii=False, indent=2)
log.info("JSON saved → %s", json_path)
# 4) отправляем POST
if SEND_JSON:
try:
resp = requests.post(POST_URL, json=payload, timeout=20)
resp.raise_for_status()
log.info("POST %s OK (%s items)", csv_name, len(items))
except Exception as err:
log.warning("POST %s FAILED: %s", csv_name, err)