"""Работает не только в корне, но и в records_folder — если ты запускаешь его из папки парсера, он сам найдёт нужную директорию. Пропускает пустые JSON-файлы (у которых items == []). Запоминает успешно отправленные файлы — чтобы не отправлять их повторно (создаёт sent_files.txt). Добавляет цветной вывод в консоль (удобно при большом объёме). Улучшенный лог с временем выполнения и статусами.""" import os import json import time import re import requests import openpyxl from datetime import datetime API_URL = "http://172.25.4.101:3005/parser/data" LOG_FILE = "send_log.txt" SENT_TRACK_FILE = "sent_files.txt" class Colors: OK = "\033[92m" WARN = "\033[93m" ERR = "\033[91m" END = "\033[0m" def log(msg: str, color=""): ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") line = f"[{ts}] {msg}" print(color + line + Colors.END) with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(line + "\n") def clean_description(text: str) -> str: """Очищает описание: заменяет переносы и удаляет HTML-теги, кроме
""" if not text: return "" text = re.sub(r"(\n\s*){1,}", "
", text) text = re.sub(r"\s*\n", "
", text, flags=re.IGNORECASE) text = re.sub(r"\n\s*", "
", text, flags=re.IGNORECASE) text = re.sub(r"\s*", "
", text, flags=re.IGNORECASE) text = re.sub(r"<(?!br\s*/?)[^>]+>", "", text) # удалить всё, кроме
return text.strip() def get_brand_map(xlsx_path: str) -> dict: """Создаёт словарь {Артикул: Параметр: Бренд} из XLSX""" brand_map = {} try: wb = openpyxl.load_workbook(xlsx_path) ws = wb.active # найдём номера нужных колонок headers = [cell.value for cell in ws[1]] if not headers: return {} try: art_idx = headers.index("Артикул") + 1 brand_idx = headers.index("Параметр: Бренд") + 1 except ValueError: log(f"⚠️ В {xlsx_path} не найдены нужные колонки.", Colors.WARN) return {} for row in ws.iter_rows(min_row=2, values_only=True): sku = row[art_idx - 1] brand = row[brand_idx - 1] if sku and brand: brand_map[str(sku).strip()] = str(brand).strip() except Exception as e: log(f"❌ Ошибка при чтении {xlsx_path}: {e}", Colors.ERR) return brand_map def enhance_json_with_brands(file_path: str, brand_map: dict) -> str: """Обновляет JSON: добавляет бренды и чистит описания""" try: with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) except Exception as e: log(f"❌ Ошибка чтения JSON {file_path}: {e}", Colors.ERR) return "" for item in data.get("items", []): variant = item.get("variant", {}) sku = str(variant.get("sku", "")).strip() if sku in brand_map: item["brand"] = {"name": brand_map[sku]} else: # если бренд не найден, оставляем пустым item["brand"] = {"name": ""} # чистим description if "originalDescription" in variant: variant["originalDescription"] = clean_description(variant["originalDescription"]) # формируем новое имя now_str = datetime.now().strftime("%Y%m%d_%H%M") new_name = os.path.splitext(file_path)[0] + f"_{now_str}.json" with open(new_name, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) log(f"🧩 Обновлён JSON → {new_name}", Colors.OK) return new_name def send_json_file(file_path: str): """Отправляет JSON на API""" try: with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) except Exception as e: log(f"❌ Ошибка чтения файла {file_path}: {e}", Colors.ERR) return False total_items = len(data.get("items", [])) if total_items == 0: log(f"⚠️ Файл {file_path} пуст — пропуск.", Colors.WARN) return True log(f"📤 Отправка: {file_path} | items: {total_items}") for attempt in range(1, 4): try: resp = requests.post(API_URL, json=data, timeout=30) if resp.status_code == 200: log(f"✅ Успешно отправлен ({attempt}-я попытка): {file_path}", Colors.OK) return True else: log(f"⚠️ Ошибка {resp.status_code}: {resp.text[:200]}", Colors.WARN) except Exception as e: log(f"❌ Ошибка сети (попытка {attempt}): {e}", Colors.ERR) time.sleep(5) log(f"🚫 Не удалось отправить {file_path} после 3 попыток.", Colors.ERR) return False def main(): cwd = os.path.abspath(".") records_folder = os.path.join(cwd, "records_folder") search_dir = records_folder if os.path.isdir(records_folder) else cwd json_files = [ os.path.join(search_dir, f) for f in os.listdir(search_dir) if f.lower().endswith(".json") ] if not json_files: log("⚠️ В папке нет JSON-файлов.", Colors.WARN) return log(f"🔍 Найдено {len(json_files)} JSON-файлов. Начинаем обработку...\n") for json_file in json_files: file_path = os.path.join(cwd, json_file) xlsx_path = os.path.splitext(file_path)[0] + ".xlsx" if not os.path.exists(xlsx_path): log(f"⚠️ Нет XLSX для {json_file} → пропуск добавления брендов.", Colors.WARN) brand_map = {} else: brand_map = get_brand_map(xlsx_path) new_json = enhance_json_with_brands(file_path, brand_map) if new_json: send_json_file(new_json) time.sleep(2) log("\n🏁 Отправка завершена.", Colors.OK) if __name__ == "__main__": main()