"""Работает не только в корне, но и в records_folder — если ты запускаешь его из папки парсера, он сам найдёт нужную директорию.
Пропускает пустые JSON-файлы (у которых items == []).
Запоминает успешно отправленные файлы — чтобы не отправлять их повторно (создаёт sent_files.txt).
Добавляет цветной вывод в консоль (удобно при большом объёме).
Улучшенный лог с временем выполнения и статусами."""
import os
import json
import time
import re
import requests
import openpyxl
from datetime import datetime
API_URL = "http://172.25.4.101:3005/parser/data"
LOG_FILE = "send_log.txt"
SENT_TRACK_FILE = "sent_files.txt"
class Colors:
OK = "\033[92m"
WARN = "\033[93m"
ERR = "\033[91m"
END = "\033[0m"
def log(msg: str, color=""):
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = f"[{ts}] {msg}"
print(color + line + Colors.END)
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(line + "\n")
def clean_description(text: str) -> str:
"""Очищает описание: заменяет переносы и удаляет HTML-теги, кроме
"""
if not text:
return ""
text = re.sub(r"(\n\s*){1,}", "
", text)
text = re.sub(r"
\s*\n", "
", text, flags=re.IGNORECASE)
text = re.sub(r"\n\s*
", "
", text, flags=re.IGNORECASE)
text = re.sub(r"
\s*
", "
", text, flags=re.IGNORECASE)
text = re.sub(r"<(?!br\s*/?)[^>]+>", "", text) # удалить всё, кроме
return text.strip()
def get_brand_map(xlsx_path: str) -> dict:
"""Создаёт словарь {Артикул: Параметр: Бренд} из XLSX"""
brand_map = {}
try:
wb = openpyxl.load_workbook(xlsx_path)
ws = wb.active
# найдём номера нужных колонок
headers = [cell.value for cell in ws[1]]
if not headers:
return {}
try:
art_idx = headers.index("Артикул") + 1
brand_idx = headers.index("Параметр: Бренд") + 1
except ValueError:
log(f"⚠️ В {xlsx_path} не найдены нужные колонки.", Colors.WARN)
return {}
for row in ws.iter_rows(min_row=2, values_only=True):
sku = row[art_idx - 1]
brand = row[brand_idx - 1]
if sku and brand:
brand_map[str(sku).strip()] = str(brand).strip()
except Exception as e:
log(f"❌ Ошибка при чтении {xlsx_path}: {e}", Colors.ERR)
return brand_map
def enhance_json_with_brands(file_path: str, brand_map: dict) -> str:
"""Обновляет JSON: добавляет бренды и чистит описания"""
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
log(f"❌ Ошибка чтения JSON {file_path}: {e}", Colors.ERR)
return ""
for item in data.get("items", []):
variant = item.get("variant", {})
sku = str(variant.get("sku", "")).strip()
if sku in brand_map:
item["brand"] = {"name": brand_map[sku]}
else:
# если бренд не найден, оставляем пустым
item["brand"] = {"name": ""}
# чистим description
if "originalDescription" in variant:
variant["originalDescription"] = clean_description(variant["originalDescription"])
# формируем новое имя
now_str = datetime.now().strftime("%Y%m%d_%H%M")
new_name = os.path.splitext(file_path)[0] + f"_{now_str}.json"
with open(new_name, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
log(f"🧩 Обновлён JSON → {new_name}", Colors.OK)
return new_name
def send_json_file(file_path: str):
"""Отправляет JSON на API"""
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
log(f"❌ Ошибка чтения файла {file_path}: {e}", Colors.ERR)
return False
total_items = len(data.get("items", []))
if total_items == 0:
log(f"⚠️ Файл {file_path} пуст — пропуск.", Colors.WARN)
return True
log(f"📤 Отправка: {file_path} | items: {total_items}")
for attempt in range(1, 4):
try:
resp = requests.post(API_URL, json=data, timeout=30)
if resp.status_code == 200:
log(f"✅ Успешно отправлен ({attempt}-я попытка): {file_path}", Colors.OK)
return True
else:
log(f"⚠️ Ошибка {resp.status_code}: {resp.text[:200]}", Colors.WARN)
except Exception as e:
log(f"❌ Ошибка сети (попытка {attempt}): {e}", Colors.ERR)
time.sleep(5)
log(f"🚫 Не удалось отправить {file_path} после 3 попыток.", Colors.ERR)
return False
def main():
cwd = os.path.abspath(".")
records_folder = os.path.join(cwd, "records_folder")
search_dir = records_folder if os.path.isdir(records_folder) else cwd
json_files = [
os.path.join(search_dir, f)
for f in os.listdir(search_dir)
if f.lower().endswith(".json")
]
if not json_files:
log("⚠️ В папке нет JSON-файлов.", Colors.WARN)
return
log(f"🔍 Найдено {len(json_files)} JSON-файлов. Начинаем обработку...\n")
for json_file in json_files:
file_path = os.path.join(cwd, json_file)
xlsx_path = os.path.splitext(file_path)[0] + ".xlsx"
if not os.path.exists(xlsx_path):
log(f"⚠️ Нет XLSX для {json_file} → пропуск добавления брендов.", Colors.WARN)
brand_map = {}
else:
brand_map = get_brand_map(xlsx_path)
new_json = enhance_json_with_brands(file_path, brand_map)
if new_json:
send_json_file(new_json)
time.sleep(2)
log("\n🏁 Отправка завершена.", Colors.OK)
if __name__ == "__main__":
main()