176 lines
6.6 KiB
Python
176 lines
6.6 KiB
Python
|
||
"""Работает не только в корне, но и в records_folder — если ты запускаешь его из папки парсера, он сам найдёт нужную директорию.
|
||
|
||
Пропускает пустые JSON-файлы (у которых items == []).
|
||
|
||
Запоминает успешно отправленные файлы — чтобы не отправлять их повторно (создаёт sent_files.txt).
|
||
|
||
Добавляет цветной вывод в консоль (удобно при большом объёме).
|
||
|
||
Улучшенный лог с временем выполнения и статусами."""
|
||
|
||
import os
|
||
import json
|
||
import time
|
||
import re
|
||
import requests
|
||
import openpyxl
|
||
from datetime import datetime
|
||
|
||
API_URL = "http://172.25.4.101:3005/parser/data"
|
||
LOG_FILE = "send_log.txt"
|
||
SENT_TRACK_FILE = "sent_files.txt"
|
||
|
||
class Colors:
|
||
OK = "\033[92m"
|
||
WARN = "\033[93m"
|
||
ERR = "\033[91m"
|
||
END = "\033[0m"
|
||
|
||
def log(msg: str, color=""):
|
||
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
line = f"[{ts}] {msg}"
|
||
print(color + line + Colors.END)
|
||
with open(LOG_FILE, "a", encoding="utf-8") as f:
|
||
f.write(line + "\n")
|
||
|
||
def clean_description(text: str) -> str:
|
||
"""Очищает описание: заменяет переносы и удаляет HTML-теги, кроме <br>"""
|
||
if not text:
|
||
return ""
|
||
text = re.sub(r"(\n\s*){1,}", "<br>", text)
|
||
text = re.sub(r"<br\s*/?>\s*\n", "<br>", text, flags=re.IGNORECASE)
|
||
text = re.sub(r"\n\s*<br\s*/?>", "<br>", text, flags=re.IGNORECASE)
|
||
text = re.sub(r"<br\s*/?>\s*<br\s*/?>", "<br>", text, flags=re.IGNORECASE)
|
||
text = re.sub(r"<(?!br\s*/?)[^>]+>", "", text) # удалить всё, кроме <br>
|
||
return text.strip()
|
||
|
||
def get_brand_map(xlsx_path: str) -> dict:
|
||
"""Создаёт словарь {Артикул: Параметр: Бренд} из XLSX"""
|
||
brand_map = {}
|
||
try:
|
||
wb = openpyxl.load_workbook(xlsx_path)
|
||
ws = wb.active
|
||
|
||
# найдём номера нужных колонок
|
||
headers = [cell.value for cell in ws[1]]
|
||
if not headers:
|
||
return {}
|
||
|
||
try:
|
||
art_idx = headers.index("Артикул") + 1
|
||
brand_idx = headers.index("Параметр: Бренд") + 1
|
||
except ValueError:
|
||
log(f"⚠️ В {xlsx_path} не найдены нужные колонки.", Colors.WARN)
|
||
return {}
|
||
|
||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||
sku = row[art_idx - 1]
|
||
brand = row[brand_idx - 1]
|
||
if sku and brand:
|
||
brand_map[str(sku).strip()] = str(brand).strip()
|
||
except Exception as e:
|
||
log(f"❌ Ошибка при чтении {xlsx_path}: {e}", Colors.ERR)
|
||
return brand_map
|
||
|
||
def enhance_json_with_brands(file_path: str, brand_map: dict) -> str:
|
||
"""Обновляет JSON: добавляет бренды и чистит описания"""
|
||
try:
|
||
with open(file_path, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
except Exception as e:
|
||
log(f"❌ Ошибка чтения JSON {file_path}: {e}", Colors.ERR)
|
||
return ""
|
||
|
||
for item in data.get("items", []):
|
||
variant = item.get("variant", {})
|
||
sku = str(variant.get("sku", "")).strip()
|
||
if sku in brand_map:
|
||
item["brand"] = {"name": brand_map[sku]}
|
||
else:
|
||
# если бренд не найден, оставляем пустым
|
||
item["brand"] = {"name": ""}
|
||
|
||
# чистим description
|
||
if "originalDescription" in variant:
|
||
variant["originalDescription"] = clean_description(variant["originalDescription"])
|
||
|
||
# формируем новое имя
|
||
now_str = datetime.now().strftime("%Y%m%d_%H%M")
|
||
new_name = os.path.splitext(file_path)[0] + f"_{now_str}.json"
|
||
|
||
with open(new_name, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
log(f"🧩 Обновлён JSON → {new_name}", Colors.OK)
|
||
return new_name
|
||
|
||
def send_json_file(file_path: str):
|
||
"""Отправляет JSON на API"""
|
||
try:
|
||
with open(file_path, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
except Exception as e:
|
||
log(f"❌ Ошибка чтения файла {file_path}: {e}", Colors.ERR)
|
||
return False
|
||
|
||
total_items = len(data.get("items", []))
|
||
if total_items == 0:
|
||
log(f"⚠️ Файл {file_path} пуст — пропуск.", Colors.WARN)
|
||
return True
|
||
|
||
log(f"📤 Отправка: {file_path} | items: {total_items}")
|
||
|
||
for attempt in range(1, 4):
|
||
try:
|
||
resp = requests.post(API_URL, json=data, timeout=30)
|
||
if resp.status_code == 200:
|
||
log(f"✅ Успешно отправлен ({attempt}-я попытка): {file_path}", Colors.OK)
|
||
return True
|
||
else:
|
||
log(f"⚠️ Ошибка {resp.status_code}: {resp.text[:200]}", Colors.WARN)
|
||
except Exception as e:
|
||
log(f"❌ Ошибка сети (попытка {attempt}): {e}", Colors.ERR)
|
||
time.sleep(5)
|
||
|
||
log(f"🚫 Не удалось отправить {file_path} после 3 попыток.", Colors.ERR)
|
||
return False
|
||
|
||
def main():
|
||
cwd = os.path.abspath(".")
|
||
records_folder = os.path.join(cwd, "records_folder")
|
||
search_dir = records_folder if os.path.isdir(records_folder) else cwd
|
||
|
||
json_files = [
|
||
os.path.join(search_dir, f)
|
||
for f in os.listdir(search_dir)
|
||
if f.lower().endswith(".json")
|
||
]
|
||
|
||
if not json_files:
|
||
log("⚠️ В папке нет JSON-файлов.", Colors.WARN)
|
||
return
|
||
|
||
log(f"🔍 Найдено {len(json_files)} JSON-файлов. Начинаем обработку...\n")
|
||
|
||
for json_file in json_files:
|
||
file_path = os.path.join(cwd, json_file)
|
||
xlsx_path = os.path.splitext(file_path)[0] + ".xlsx"
|
||
|
||
if not os.path.exists(xlsx_path):
|
||
log(f"⚠️ Нет XLSX для {json_file} → пропуск добавления брендов.", Colors.WARN)
|
||
brand_map = {}
|
||
else:
|
||
brand_map = get_brand_map(xlsx_path)
|
||
|
||
new_json = enhance_json_with_brands(file_path, brand_map)
|
||
if new_json:
|
||
send_json_file(new_json)
|
||
|
||
time.sleep(2)
|
||
|
||
log("\n🏁 Отправка завершена.", Colors.OK)
|
||
|
||
if __name__ == "__main__":
|
||
main()
|