MacOS_Parsers/API-UI/main.py
2025-10-28 11:10:08 +03:00

543 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys, os, json, re
from datetime import datetime
from typing import Any, Dict, List
import requests
import openpyxl
from PyQt6 import QtWidgets
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QFileDialog,
QVBoxLayout, QHBoxLayout, QWidget,
QPushButton, QLabel, QComboBox, QTextEdit,
QTableWidget, QTableWidgetItem, QHeaderView
)
# === ПУТИ ===
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MAPPINGS_DIR = os.path.join(BASE_DIR, "mappings")
HISTORY_FILE = os.path.join(BASE_DIR, "history.json")
def ts() -> str:
return datetime.now().strftime("%H:%M:%S")
def split_list_cell(val: str) -> List[str]:
if val is None:
return []
s = str(val).strip()
if not s:
return []
parts = re.split(r"[\n,;|]+", s)
return [p.strip() for p in parts if p.strip()]
def cast_value(raw: Any, t: str):
if raw is None:
return None
s = str(raw).strip()
if t == "int":
try:
return int(float(s))
except:
return None
if t == "float":
try:
return float(str(s).replace(",", "."))
except:
return None
if t == "bool":
s_low = s.lower()
return s_low in ("1", "true", "yes", "y", "да", "истина")
if t == "list" or t == "list[string]":
return split_list_cell(s)
# string
return s
def resolve_endpoint(selected_base_url: str, mapping_endpoint: str) -> str:
"""
Правила:
- Если endpoint начинается с '/', приклеиваем к выбранному base_url.
- Если endpoint начинается с 'http', заменяем схему+хост на выбранный base_url (оставляем path).
- Иначе считаем это относительным путём от base_url.
"""
base = selected_base_url.rstrip("/")
ep = mapping_endpoint.strip()
if ep.startswith("/"):
return f"{base}{ep}"
if ep.startswith("http://") or ep.startswith("https://"):
# вытащим только путь
try:
from urllib.parse import urlparse
parsed = urlparse(ep)
path = parsed.path
if parsed.query:
path += f"?{parsed.query}"
return f"{base}{path}"
except:
return ep # fallback
return f"{base}/{ep.lstrip('/')}"
class ImportApp(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("🧩 Import Client v0.3")
self.resize(1200, 800)
self.api_history = self.load_history()
self.current_mapping: Dict[str, Any] = {}
self.loaded_data: List[List[Any]] = [] # с заголовком
self.headers: List[str] = [] # заголовки из Excel
self.map_pairs: List[Dict[str, str]] = [] # [{"json_field": "...", "excel_col": "..."}]
# === UI ===
main_widget = QWidget()
main_layout = QVBoxLayout(main_widget)
self.setCentralWidget(main_widget)
# Верхняя панель: API URL
top_layout = QHBoxLayout()
self.api_label = QLabel("API URL:")
self.api_box = QComboBox()
self.api_box.setEditable(True)
self.api_box.addItems(self.api_history or ["http://localhost:3005"])
self.btn_save_url = QPushButton("💾 Сохранить")
self.btn_save_url.clicked.connect(self.save_current_url)
top_layout.addWidget(self.api_label)
top_layout.addWidget(self.api_box)
top_layout.addWidget(self.btn_save_url)
main_layout.addLayout(top_layout)
# Тип импорта
import_layout = QHBoxLayout()
self.import_label = QLabel("Тип импорта:")
self.import_type = QComboBox()
self.import_type.addItems(["brands", "categories", "products"])
self.import_type.currentIndexChanged.connect(self.load_mapping)
import_layout.addWidget(self.import_label)
import_layout.addWidget(self.import_type)
# Кнопки Excel
self.btn_load_excel = QPushButton("📂 Загрузить Excel-файл")
self.btn_load_excel.clicked.connect(self.load_excel)
import_layout.addWidget(self.btn_load_excel)
main_layout.addLayout(import_layout)
# Таблица предпросмотра Excel
self.table = QTableWidget()
main_layout.addWidget(self.table)
# Таблица сопоставлений
main_layout.addWidget(QLabel("Сопоставление полей: JSON ↔ Колонка Excel"))
self.map_table = QTableWidget()
self.map_table.setColumnCount(3)
self.map_table.setHorizontalHeaderLabels(["Поле JSON", "Тип", "Колонка Excel"])
self.map_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeMode.Stretch)
self.map_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeMode.ResizeToContents)
self.map_table.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeMode.Stretch)
main_layout.addWidget(self.map_table)
# Кнопки действий
action_layout = QHBoxLayout()
self.btn_autofill = QPushButton("✨ Автосопоставление")
self.btn_autofill.clicked.connect(self.autofill_mapping)
self.btn_generate = QPushButton("🧱 Сформировать JSON")
self.btn_generate.clicked.connect(self.generate_json)
self.btn_send = QPushButton("📤 Отправить в API")
self.btn_send.clicked.connect(self.send_to_api)
action_layout.addWidget(self.btn_autofill)
action_layout.addWidget(self.btn_generate)
action_layout.addWidget(self.btn_send)
main_layout.addLayout(action_layout)
# Предпросмотр JSON
main_layout.addWidget(QLabel("Предпросмотр JSON:"))
self.json_preview = QTextEdit()
self.json_preview.setReadOnly(True)
main_layout.addWidget(self.json_preview)
# Лог
main_layout.addWidget(QLabel("Лог:"))
self.log_box = QTextEdit()
self.log_box.setReadOnly(True)
main_layout.addWidget(self.log_box)
# Загрузим первый шаблон
self.load_mapping()
# ---------- History ----------
def load_history(self):
if os.path.exists(HISTORY_FILE):
try:
with open(HISTORY_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except:
return []
return []
def save_current_url(self):
url = self.api_box.currentText().strip()
if not url:
return
if url not in self.api_history:
self.api_history.insert(0, url)
with open(HISTORY_FILE, "w", encoding="utf-8") as f:
json.dump(self.api_history[:10], f, ensure_ascii=False, indent=2)
self.log(f"✅ Сохранен URL: {url}")
# ---------- Mapping ----------
def load_mapping(self):
import_type = self.import_type.currentText()
path = os.path.join(MAPPINGS_DIR, f"{import_type}.json")
if not os.path.exists(path):
self.current_mapping = {}
self.log(f"⚠️ Не найден шаблон: {path}")
return
try:
with open(path, "r", encoding="utf-8") as f:
self.current_mapping = json.load(f)
self.log(f"📘 Загружен шаблон: {import_type}")
self.populate_map_table()
except Exception as e:
self.current_mapping = {}
self.log(f"❌ Ошибка чтения шаблона: {e}")
def populate_map_table(self):
self.map_table.setRowCount(0)
self.map_pairs.clear()
fields = (self.current_mapping.get("fields") or {})
# Список кандидатов полей JSON для сопоставления
# Пропускаем поля с "fixed" (их не мапим) — они выставляются автоматически
candidates = []
for k, meta in fields.items():
if isinstance(meta, dict) and "fixed" in meta:
continue
candidates.append((k, meta))
self.map_table.setRowCount(len(candidates))
for i, (json_field, meta) in enumerate(candidates):
# колонка 0: имя поля json
self.map_table.setItem(i, 0, QTableWidgetItem(json_field))
# колонка 1: тип
t = meta["type"] if isinstance(meta, dict) and "type" in meta else "string"
self.map_table.setItem(i, 1, QTableWidgetItem(t))
# колонка 2: выпадающий список — заголовки Excel
combo = QComboBox()
combo.addItem("") # пусто = не сопоставлено
for h in self.headers:
combo.addItem(h)
self.map_table.setCellWidget(i, 2, combo)
def autofill_mapping(self):
"""Простое автосопоставление по подстроке (без учета регистра)."""
rows = self.map_table.rowCount()
for i in range(rows):
json_field = self.map_table.item(i, 0).text()
jf_key = json_field.split(".")[-1].lower()
best = ""
score_best = 0
for h in self.headers:
hl = h.lower()
score = 0
if hl == jf_key:
score = 3
elif jf_key in hl or hl in jf_key:
score = 2
elif hl.replace(" ", "") == jf_key.replace("_", ""):
score = 1
if score > score_best:
score_best = score
best = h
combo = self.map_table.cellWidget(i, 2)
if combo and best:
idx = combo.findText(best)
if idx >= 0:
combo.setCurrentIndex(idx)
self.log("✨ Выполнено автосопоставление.")
# ---------- Excel ----------
def load_excel(self):
path, _ = QFileDialog.getOpenFileName(self, "Выбери Excel-файл", "", "Excel Files (*.xlsx)")
if not path:
return
try:
wb = openpyxl.load_workbook(path, data_only=True)
sheet = wb.active
# === читаем ВСЕ строки для обработки ===
all_data = [list(r) for r in sheet.iter_rows(values_only=True)]
if not all_data:
self.log("⚠️ Файл пуст.")
return
self.headers = [str(h) if h else "" for h in all_data[0]]
self.loaded_data = all_data # ВСЁ содержимое для дальнейшей работы
# === только первые 50 строк показываем в таблице ===
preview_data = all_data[:51]
self.table.clear()
self.table.setRowCount(0)
self.table.setColumnCount(len(self.headers))
self.table.setHorizontalHeaderLabels(self.headers)
for r_idx, row in enumerate(preview_data[1:], start=0):
self.table.insertRow(r_idx)
for c_idx, value in enumerate(row):
self.table.setItem(r_idx, c_idx, QTableWidgetItem("" if value is None else str(value)))
self.log(f"📄 Загружен файл: {os.path.basename(path)} ({sheet.max_row} строк)")
self.populate_map_table()
except Exception as e:
self.log(f"❌ Ошибка при чтении файла: {e}")
def display_sheet(self, sheet):
self.table.clear()
self.table.setRowCount(0)
self.table.setColumnCount(0)
data = []
for row in sheet.iter_rows(values_only=True):
data.append(list(row))
if len(data) > 50: # покажем первые 50 строк
break
if not data:
return
self.headers = [str(h) if h else "" for h in data[0]]
self.loaded_data = data
self.table.setColumnCount(len(self.headers))
self.table.setHorizontalHeaderLabels(self.headers)
for r_idx, row in enumerate(data[1:], start=0):
self.table.insertRow(r_idx)
for c_idx, value in enumerate(row):
self.table.setItem(r_idx, c_idx, QTableWidgetItem("" if value is None else str(value)))
# ---------- Генерация JSON ----------
def collect_mapping(self) -> Dict[str, str]:
"""Собираем словарь {json_field -> excel_header} для выбранных пользователем маппингов."""
mapping = {}
rows = self.map_table.rowCount()
for i in range(rows):
json_field = self.map_table.item(i, 0).text()
combo = self.map_table.cellWidget(i, 2)
header = combo.currentText().strip() if combo else ""
if header:
mapping[json_field] = header
return mapping
def generate_json(self):
if not self.loaded_data:
self.log("⚠️ Сначала загрузите Excel-файл.")
return
if not self.current_mapping:
self.log("⚠️ Не загружен шаблон JSON.")
return
import_type = self.import_type.currentText()
fields_meta = self.current_mapping.get("fields") or {}
mapping = self.collect_mapping()
rows = self.loaded_data[1:] # без заголовка
result: Any
# ===== BRANDS =====
if import_type == "brands":
# ожидается список строк в ключе "brands"
json_key = "brands"
selected_col = mapping.get(json_key)
values = []
if selected_col:
col_idx = self.headers.index(selected_col)
seen = set()
for r in rows:
if col_idx < len(r) and r[col_idx]:
val = str(r[col_idx]).strip()
if val and val not in seen:
seen.add(val)
values.append(val)
result = {json_key: values}
# ===== CATEGORIES =====
elif import_type == "categories":
# ожидаем объекты {"originPath": "...", "path": "..."}
origin_col = mapping.get("originPath", "")
path_col = mapping.get("path", "")
result = []
seen_origins = set()
idx_origin = self.headers.index(origin_col) if origin_col in self.headers else -1
idx_path = self.headers.index(path_col) if path_col in self.headers else -1
for r in rows:
origin_val = (
str(r[idx_origin]).strip()
if idx_origin >= 0 and idx_origin < len(r) and r[idx_origin]
else ""
)
path_val = (
str(r[idx_path]).strip()
if idx_path >= 0 and idx_path < len(r) and r[idx_path]
else ""
)
# пропускаем полностью пустые строки
if not origin_val and not path_val:
continue
# если originPath уже был, пропускаем (оставляем только первое вхождение)
if origin_val in seen_origins:
continue
seen_origins.add(origin_val)
# нормализуем разделители (чисто косметически)
path_val = " > ".join([p.strip() for p in path_val.split(">")])
result.append({"originPath": origin_val, "path": path_val})
self.log(f"🧩 Найдено уникальных originPath: {len(result)}")
# ===== PRODUCTS =====
else:
# структура:
# {
# "parserName": fixed или из поля,
# "items": [ { category:{name}, brand:{name}, variant:{...} } ]
# }
parser_name = fields_meta.get("parserName", {}).get("fixed", "ikea")
result = {"parserName": parser_name, "items": []}
# подготовим индекс колонок
col_index = {h: i for i, h in enumerate(self.headers)}
for r in rows:
# helpers для получения значения по json-полю
def get_cell(json_field: str):
header = mapping.get(json_field)
if not header:
return None
idx = col_index.get(header, -1)
if idx < 0 or idx >= len(r):
return None
return r[idx]
# категори/бренд
cat_name = get_cell("category.name")
brand_name = get_cell("brand.name")
# variant поля с типами
v = {}
for jf, meta in fields_meta.items():
if jf in ("parserName", "category.name", "brand.name"):
continue
if isinstance(meta, dict) and "fixed" in meta:
v_key = jf.split("variant.", 1)[-1] if jf.startswith("variant.") else jf
v[v_key] = meta["fixed"]
continue
header = mapping.get(jf)
if not header:
# оставим default если указан
default = meta.get("default") if isinstance(meta, dict) else None
if default is not None and jf.startswith("variant."):
v_key = jf.split("variant.", 1)[-1]
v[v_key] = default
continue
raw = get_cell(jf)
t = meta.get("type", "string") if isinstance(meta, dict) else "string"
# особый случай: images -> список ссылок
if jf.endswith(".images"):
v["images"] = cast_value(raw, "list")
else:
val = cast_value(raw, t)
v_key = jf.split("variant.", 1)[-1] if jf.startswith("variant.") else jf
v[v_key] = val
item = {
"category": {"name": cast_value(cat_name, "string") or ""},
"brand": {"name": cast_value(brand_name, "string") or ""},
"variant": {
# значения по умолчанию, если не заданы маппингом/фиксами
"status_id": v.get("status_id", 1),
"color": v.get("color", ""),
"sku": v.get("sku", ""),
"size": v.get("size", ""),
"cost": v.get("cost", 0),
"originalUrl": v.get("originalUrl", ""),
"originalName": v.get("originalName", ""),
"originalDescription": v.get("originalDescription", ""),
"originalComposition": v.get("originalComposition", ""),
"images": v.get("images", []),
"inStock": v.get("inStock", True),
"weight": v.get("weight", 0)
}
}
# фильтр пустых строк: нужен хотя бы sku или originalUrl или название
if any([item["variant"]["sku"], item["variant"]["originalUrl"], item["variant"]["originalName"]]):
result["items"].append(item)
# Вывод
self.json_preview.setText(json.dumps(result, ensure_ascii=False, indent=2))
self.log(f"🧱 Сформирован JSON для: {self.import_type.currentText()}")
# ---------- Отправка ----------
def send_to_api(self):
if not self.current_mapping:
self.log("⚠️ Не загружен шаблон JSON.")
return
text = self.json_preview.toPlainText().strip()
if not text:
self.log("⚠️ Сначала сформируйте JSON.")
return
try:
payload = json.loads(text)
except Exception as e:
self.log(f"❌ Некорректный JSON: {e}")
return
base_url = self.api_box.currentText().strip() or "http://localhost:3005"
endpoint = self.current_mapping.get("endpoint", "/")
url = resolve_endpoint(base_url, endpoint)
try:
self.log(f"➡️ POST {url}")
r = requests.post(url, json=payload, headers={"Content-Type": "application/json"}, timeout=60)
# plain text ответ
content_type = r.headers.get("Content-Type", "")
body = r.text if "text" in content_type or "html" in content_type or content_type == "" else r.content[:1000]
self.log(f"⬅️ {r.status_code} {r.reason}\n{body}")
except Exception as e:
self.log(f"❌ Ошибка запроса: {e}")
# ---------- Лог ----------
def log(self, text):
self.log_box.append(f"{ts()} - {text}")
if __name__ == "__main__":
app = QApplication(sys.argv)
window = ImportApp()
window.show()
sys.exit(app.exec())