Port Decathlon and rework it for the local API

va1is 2025-10-09 20:09:29 +03:00
parent 63c4c9b14f
commit cd3f1796ba
14 changed files with 1081 additions and 1 deletion


@@ -0,0 +1,17 @@
from openpyxl import load_workbook
from os.path import abspath

# collect all category links from column A of categories.xlsx
def get_categories():
    workbook = load_workbook(abspath("categories.xlsx"))
    worksheet = workbook.active
    categories = []
    for cell in worksheet["A"]:
        value = cell.value
        if value is not None:
            categories.append(value)
    return categories
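A minimal usage sketch (not part of the commit), assuming categories.xlsx sits next to the script and column A holds one category URL per row, which is how get_categories() reads it:

from categories import get_categories

categories = get_categories()
print(f"Loaded {len(categories)} category URLs")
for url in categories[:3]:
    print(url)  # e.g. https://www.decathlon.pl/sporty/...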


Pars_Decathlon/extractor.py Normal file
@@ -0,0 +1,719 @@
from json import load, loads
from os.path import abspath
from bs4 import BeautifulSoup
from lxml import etree
from re import split
import re
import json
import os

def extract_components_zarahome(parts):
    composition = []
    for part in parts:
        if part.get("areas") and part.get("description"):
            if len(parts) != 1:
                composition.append(part["description"])
            for area in part["areas"]:
                area_name = area["description"]
                percentage_area = area["percentageArea"]
                composition.append(f"{area_name} ({percentage_area})")
                for component in area["components"]:
                    material = component["material"]
                    percentage = component["percentage"]
                    composition.append(f"{percentage} {material}")
        elif part.get("components") and part.get("description"):
            if len(parts) != 1:
                composition.append(part["description"])
            for component in part["components"]:
                material = component["material"]
                percentage = component["percentage"]
                composition.append(f"{percentage} {material}")
    return composition

# class that extracts the required data from the parsed pages
class Extractor:
    def __init__(self, json_data):
        self.methods = {
            "": (self.default_extract_method, []),
            "zarahome": (self.zarahome_extract_method, [
                "Краткое описание",
                "Артикул",
                "Название товара или услуги",
                "Полное описание",
                "Образец цвета",
                "Свойство: Цвет",
                "Свойство: Размер",
                "Цена закупки",
                "Свойство: Вес(г)",
                "Изображения",
                "Изображения варианта",
                "Параметр: Состав",
                "Параметр: Уход",
                "Параметр: Происхождение",
                "Размещение на сайте",
                "Свойство: Бренд"
            ]),
            "eobuwie": (self.eobuwie_extract_method, [
                "Краткое описание",
                "Артикул",
                "Свойство: Размер",
                "Полное описание(Таблица)",
                "Название товара или услуги",
                "Изображения",
                "Размещение на сайте",
                "Цена",
                "Наличие"
            ]),
            "decathlon": (self.decathlon_extract_method, [
                "Краткое описание",
                "Артикул",
                "Название товара или услуги",
                "Полное описание",
                "Наличие",
                "Свойство: Цвет",
                "Свойство: Размер",
                "Цена закупки",
                "Параметр: Вес(г)",
                "Изображения варианта",
                "Размещение на сайте"
            ])
        }
        self.method = json_data["method"]
        self.tags = json_data["tags"]
        self.headers = self.methods[self.method][1]
        for tag in self.tags:
            self.headers.insert(tag["column_number"], tag["column_name"])

    def extract(self, parser, recorder, categories):
        self.methods[self.method][0](parser, recorder, categories)

    def default_extract_method(self):
        pass

    def tags_extract(self, soup, row):
        dom_tree = etree.HTML(str(soup))
        for tag in self.tags:
            xpath_result = dom_tree.xpath(tag["xpath"])
            column_data = ""
            if len(xpath_result):
                for element in xpath_result:
                    column_data = ''.join(element.itertext()).strip() + "\n"
            row.insert(tag["column_number"], column_data)

    def decathlon_extract_method(self, parser, recorder, categories):
        BASE_URL = "https://www.decathlon.pl"
        for i, category in enumerate(categories):
            csv_name = "_".join(category.split("/")[4:]).replace(":", "-").replace("?", "_").replace("=", "_")
            filepath = os.path.join(recorder.record_folder, f"{csv_name}.xlsx")
            # if the output file already exists, skip this category
            if os.path.isfile(filepath):
                print(f"File {csv_name}.xlsx already exists. Skipping category: {category}")
                continue
            table = [self.headers]
            print(f"Categories: {i + 1} / {len(categories)}", category)
            continue_loop = True
            category_from = 0
            while continue_loop:
                category_page = parser.parse(f"{category}?from={category_from}")
                category_soup = BeautifulSoup(category_page, "html.parser")
                dom_tree = etree.HTML(str(category_soup))
                offers_count_element = dom_tree.xpath('//*[@id="start-of-listing"]/div[2]/div/span[1]')
                if offers_count_element:
                    offers_count = int(offers_count_element[0].text.strip())
                else:
                    print("Offer count not found")
                    offers_count = 0
                    break  # no point continuing if the offer count could not be read
                products_links = category_soup.select('a.dpb-product-link')
                products_links_count = len(products_links)
                for e, product_link in enumerate(products_links):
                    product_url = BASE_URL + product_link.get("href")
                    print(f"Products: {e + 1 + category_from} / {offers_count}", product_url)
                    product_page = parser.parse(product_url)
                    if product_page is None:
                        continue
                    soup = BeautifulSoup(product_page, "html.parser")
                    meta_script_tags = soup.select("[type=\"application/ld+json\"]")
                    if len(meta_script_tags) <= 1:
                        continue
                    meta_data = loads(meta_script_tags[1].text)
                    path_steps = []
                    for step in meta_data["itemListElement"]:
                        path_steps.append(step["item"]["name"])
                    product_path = "decathlon/" + "/".join(path_steps)
                    script_json = soup.select("#__dkt")[0].text
                    # find where the JSON object starts
                    json_start_match = re.search(r'__DKT\s*=\s*({)', script_json)
                    if json_start_match:
                        # index of the opening brace of the JSON object
                        start = json_start_match.start(1)
                        # balance the curly braces to cut out the exact JSON text
                        bracket_count = 0
                        for pos in range(start, len(script_json)):
                            if script_json[pos] == '{':
                                bracket_count += 1
                            elif script_json[pos] == '}':
                                bracket_count -= 1
                            if bracket_count == 0:
                                # the whole JSON object has been found
                                json_text = script_json[start:pos + 1]
                                break
                        else:
                            print("Error: the JSON is not balanced.")
                            continue
                        # now parse it
                        try:
                            __dkt = json.loads(json_text)
                        except json.JSONDecodeError as e:
                            print(f"JSON parsing error: {e}")
                            continue
                    else:
                        print("Error: start of the JSON (__DKT) not found.")
                        continue
                    # previous approach, kept for reference:
                    # __dkt = loads(script_json.text.replace("__DKT = ", ""))
                    # debug dump of the data structure:
                    # print(json.dumps(__dkt["_ctx"]["data"], indent=2)[:2000])
                    # if __dkt["_ctx"]["page"]["id"] != "product":
                    #     continue
                    # the models used to live at __dkt["_ctx"]["data"][4]["data"]["models"];
                    # the new, correct path is below
                    model_info = __dkt["_ctx"]["data"][2]["data"]
                    if "models" not in model_info or not model_info["models"]:
                        print(f"Error: no 'models' for product {product_url}")
                        continue
                    model = model_info["models"][0]
                    # print(json.dumps(model_info, indent=2))  # debug: inspect the JSON structure
                    color = ""
                    colors = []
                    # check whether "colors" still exists in the new structure; this may also need adapting
                    if model.get("colors"):
                        for color_info in model["colors"]:
                            colors.append(color_info["label"])
                        color = " / ".join(colors)
                    images = []
                    if model.get("images"):
                        for image_info in model["images"]["product"]:
                            images.append(image_info["url"].replace("/250x250", ""))
                    image_lines = "\n".join(images)
                    product_name = model["webLabel"]
                    # previously: product_description = soup.select("[id^=\"ProductFunctionalities\"]")
                    description_parts = []
                    description_set = set()  # used to keep descriptions unique

                    def add_unique_description(text):
                        text_cleaned = text.strip()
                        if text_cleaned and text_cleaned not in description_set:
                            description_parts.append(text_cleaned)
                            description_set.add(text_cleaned)

                    # 1. MarketplaceProductDescription
                    description_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "MarketplaceProductDescription"), None)
                    if description_data and "data" in description_data and "description" in description_data["data"]:
                        add_unique_description(description_data["data"]["description"])
                    # 2. ProductConception
                    description_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductConception"), None)
                    if description_data and "data" in description_data and "conception" in description_data["data"]:
                        conception = description_data["data"]["conception"]
                        if isinstance(conception, list) and conception:
                            conception_text = conception[0].get("description", "")
                            add_unique_description(conception_text)
                    # 3. ProductFunctionalities
                    description_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductFunctionalities"), None)
                    if description_data and "data" in description_data and "functionalities" in description_data["data"]:
                        functionalities = description_data["data"]["functionalities"]
                        if isinstance(functionalities, list):
                            func_text = "\n".join(f"{func.get('title', '')}: {func.get('value', '')}" for func in functionalities)
                            add_unique_description(func_text)
                    # 4. MarketplaceProductTechnicalInformations
                    tech_info = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "MarketplaceProductTechnicalInformations"), None)
                    if tech_info and "data" in tech_info and "information" in tech_info["data"]:
                        information = tech_info["data"]["information"]
                        if isinstance(information, list):
                            info_text = "\n".join(f"{info.get('key', '')}: {info.get('value', '')}" for info in information)
                            add_unique_description(info_text)
                    # 5. ProductGuarantee (disabled)
                    # guarantee_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductGuarantee"), None)
                    # if guarantee_data and "data" in guarantee_data and "guarantee" in guarantee_data["data"]:
                    #     guarantee_text = f"Gwarancja: {guarantee_data['data']['guarantee']} lat"
                    #     add_unique_description(guarantee_text)
                    # 6. ProductBenefits
                    benefits_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductBenefits"), None)
                    if benefits_data and "data" in benefits_data and "benefits" in benefits_data["data"]:
                        benefits = benefits_data["data"]["benefits"]
                        if isinstance(benefits, list):
                            benefits_text = "\n".join(f"{benefit.get('label', '')}: {benefit.get('value', '')}" for benefit in benefits)
                            add_unique_description(benefits_text)
                    # join all unique descriptions with a blank line between them
                    product_description = "\n\n".join(description_parts)
                    # debug preview (first 500 characters):
                    # print("Full product description:", product_description[:500])
                    # next, extract the SKU data, checking the new structures in the same way
                    skus_data = model["skus"]
                    sku_ids = []
                    for sku in skus_data:
                        sku_ids.append(sku["skuId"])
                    sku_ids = ",".join(sku_ids)
                    stocks = parser.parse(f"https://www.decathlon.pl/pl/ajax/nfs/stocks/online?skuIds={sku_ids}", return_type="json")
                    for sku in skus_data:
                        try:
                            sku_id = sku["skuId"]
                            sku_id1 = model["modelId"]
                            stock = stocks[sku_id]["stockOnline"] if stocks.get(sku_id) else "unknown"
                            table_data = []
                            # old approach, not good for products already listed on OZON:
                            # article = f'{sku_id1}-{sku_id}'
                            # article = sku_id1.split("-")[-1]
                            # build the article from product_url instead
                            article = ""
                            try:
                                base_part = product_url.split("?")[0]  # drop everything after "?"
                                article_part = base_part.split("-")[-1]  # take the part after the last hyphen
                                article = f"{article_part}_Decathlon"
                            except Exception:
                                article = ""
                            size = ""
                            if sku.get("size"):
                                size = sku["size"]
                            price = ""
                            if sku.get("price"):
                                price = sku["price"]
                            weight = ""
                            if sku.get("grossWeight"):
                                weight = float(sku["grossWeight"])
                            table_data.append([
                                product_url,
                                article,
                                product_name,
                                product_description,
                                stock,
                                color,
                                size,
                                price,
                                weight,
                                image_lines,
                                product_path
                            ])
                            self.tags_extract(soup, table_data[-1])
                            table += table_data.copy()
                        except Exception as error:
                            print(f"Extractor Error: {error}")
                if offers_count == products_links_count + category_from:
                    continue_loop = False
                else:
                    category_from += products_links_count
            csv_name = "_".join(category.split("/")[4:]).replace(":", "-").replace("?", "_").replace("=", "_")
            recorder.record(csv_name, table)

    def eobuwie_extract_method(self, parser, recorder, categories):
        for i, category in enumerate(categories):
            table = [self.headers]
            print(f"Categories: {i + 1} / {len(categories)}", category)
            category_page = 1
            category_marka = category.split(":")[2].split("?")[0]
            category_type = category.split("/")[4]
            while True:
                category_products_data = parser.parse(f"https://eobuwie.com.pl/t-api/rest/search/eobuwie/v5/search?channel=eobuwie&currency=PLN&locale=pl_PL&limit=100&page={category_page}&filters[marka][in][]={category_marka}&categories[]={category_type}&select[]=url_key&select[]=product_group_associated&select[]=images&select[]=final_price&select[]=footwear_size&select_locales[]=pl_PL", return_type="json")
                total = category_products_data["total"]
                products = category_products_data["products"]
                for e, product in enumerate(products):
                    short_url = product["values"]["url_key"]["value"]["pl_PL"]
                    product_url = f"https://eobuwie.com.pl/p/{short_url}"
                    print(f"Products: {e + 1 + ((category_page - 1) * 100)} / {total}", product_url)
                    product_page = parser.parse(product_url)
                    if product_page is None:
                        continue
                    soup = BeautifulSoup(product_page, "html.parser")
                    links = soup.select(".breadcrumb-list .text-link")[2:]
                    product_location = "/".join(list(map(lambda x: x.text, links)))
                    product_group = ""
                    if product["values"].get("product_group_associated") and product["values"]["product_group_associated"].get("value"):
                        product_group = product["values"]["product_group_associated"]["value"]
                    product_name = soup.select("[data-test-id=\"product-name\"]")[0].text.strip()
                    product_name = split(r"\d", product_name)[0]
                    product_name = f"{product_name} - {product_group}"
                    images_list = []
                    if product["values"].get("images") and product["values"]["images"].get("value"):
                        for image in product["values"]["images"]["value"]:
                            if image.get("url"):
                                images_list.append(f'https://img.modivo.cloud/productcard({image["url"]},jpg)')
                    images_list = "\n".join(images_list)
                    for variant in product["variants"].values():
                        try:
                            table_data = []
                            size_url = variant["size"]
                            variant_url = f"{product_url}?size={size_url}"
                            article = variant["id"]
                            size_name = ""
                            if variant["values"].get("footwear_size"):
                                size_name = variant["values"]["footwear_size"]["value"]["label"]
                            description = ""
                            location = f"Каталог/Обувь и аксессуары/{product_location}"
                            availability = variant["stock_quantity"]
                            if variant["stock_quantity"]:
                                price = variant["offers"][0]["final_price"]["amount"]
                            else:
                                price = product["values"]["final_price"]["value"]["pl_PL"]["PLN"]["amount"]
                            table_data.append([
                                variant_url,
                                article,
                                size_name,
                                description,
                                product_name,
                                images_list,
                                location,
                                price,
                                availability
                            ])
                            self.tags_extract(soup, table_data[-1])
                            table += table_data.copy()
                        except Exception as error:
                            print(f"Extractor Error: {error}")
                if category_page * 100 >= total:
                    break
                category_page += 1
            csv_name = category.split("/")[-1].replace(":", "-").replace("?", "_").replace("=", "_")
            recorder.record(csv_name, table)

    def zarahome_extract_method(self, parser, recorder, categories):
        for i, category in enumerate(categories):
            table = [self.headers]
            print(f"Categories: {i + 1} / {len(categories)}", category)
            category_seo = parser.parse(f"{category}?itxSeo=true", return_type="json")
            category_id = category_seo["categoryId"]
            category_title = "/".join(category_seo["metaTitle"].split(" | ")[0].split(" - ")[::-1])
            category_products_data = parser.parse(f"https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000/category/{category_id}/product?showProducts=true&languageId=-22&appId=1", return_type="json")
            products = category_products_data["products"].values()
            for e, product in enumerate(products):
                if product.get("productUrlParam"):
                    continue
                short_url = product.get("productUrl")
                print(f"Products: {e + 1} / {len(products)}", f"https://www.zarahome.com/pl/{short_url}")
                product_reference_id = product["detail"]["reference"].split("-")[0][1:9]
                product_url = f"https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000/productsArray?languageId=-22&referenceIds={product_reference_id}&appId=1"
                product_json = parser.parse(product_url, return_type="json")
                if not product_json["products"][0].get("productUrl"):
                    continue
                try:
                    table_data = []
                    category_path = f"Каталог/ZaraHome/{category_title}"
                    product_short_url = product_json["products"][0]["productUrl"]
                    url = f"https://www.zarahome.com/pl/{product_short_url}"
                    article = product_json["products"][0]["detail"]["displayReference"]
                    name = product_json["products"][0]["name"]
                    description = product_json["products"][0]["detail"]["longDescription"]
                    all_images = []
                    for location in product_json["products"][0]["detail"]["xmedia"]:
                        path = location["path"]
                        for media in location["xmediaItems"][0]["medias"]:
                            id_media = media["idMedia"]
                            all_images.append(f"https://static.zarahome.net/8/photos4{path}/{id_media}2.jpg")
                    all_images = "\n".join(all_images)
                    components = product_json["products"][0]["detail"]["compositionDetail"]
                    if components:
                        components = components["parts"]
                    else:
                        components = product_json["products"][0]["detail"]["colors"][0]["compositionDetail"]
                        if components:
                            components = components["parts"]
                        else:
                            components = {}
                    if components != {}:
                        composition = extract_components_zarahome(components)
                        composition = "\n".join(composition)
                    else:
                        composition = ""
                    care = []
                    for part in product_json["products"][0]["detail"]["care"]:
                        care_description = part["description"]
                        care.append(care_description)
                    care = "\n".join(care)
                    traceability = []
                    for part in product_json["products"][0]["detail"]["colors"][0]["traceability"].values():
                        if isinstance(part, dict) and part.get("country") and part.get("name"):
                            traceability_name = part["name"]
                            traceability_country = "\n".join(part["country"])
                            traceability.append(f"{traceability_name}\n{traceability_country}")
                    traceability = "\n".join(traceability)
                    number = 0
                    for color in product_json["products"][0]["detail"]["colors"]:
                        if color["image"] is None:
                            continue
                        color_code = color["id"]
                        current_images = []
                        for location in product_json["products"][0]["detail"]["xmedia"]:
                            if location["colorCode"] == color_code:
                                path = location["path"]
                                for media in location["xmediaItems"][0]["medias"]:
                                    id_media = media["idMedia"]
                                    current_images.append(
                                        f"https://static.zarahome.net/8/photos4{path}/{id_media}2.jpg")
                                break
                        current_images = "\n".join(current_images)
                        color_url = color["image"]["url"]
                        color_image = f"https://static.zarahome.net/8/photos4{color_url}_3_1_5.jpg"
                        color_name = color["name"]
                        for size in color["sizes"]:
                            if size["visibilityValue"] != "SHOW":
                                continue
                            article_number = "" if number == 0 else f"-{number}"
                            size_weight = size["weight"]
                            size_name = size["name"]
                            size_description = size["description"]
                            size_full_name = f"{size_description} ({size_name})" if size_description else size_name
                            size_buy_price = int(size["price"]) / 100
                            table_data.append([
                                url,
                                f"{article}{article_number}",
                                name,
                                description,
                                color_image,
                                color_name,
                                size_full_name,
                                size_buy_price,
                                size_weight,
                                all_images,
                                current_images,
                                composition,
                                care,
                                traceability,
                                category_path,
                                "ZARAHOME"
                            ])
                            number += 1
                    table += table_data.copy()
                except Exception as error:
                    print(f"Extractor Error: {error}")
            csv_name = category.split("/")[-1]
            recorder.record(csv_name, table)

def get_extractor():
    with open(abspath("parse_settings.json"), "r", encoding="utf-8") as file:
        return Extractor(load(file))
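The __DKT extraction above (a regex to find where the object starts, then brace counting to find where it ends) is the part this commit reworks most, so here is a minimal standalone sketch of the same idea, not part of the commit, run on a made-up script string. Like the code above, it counts raw braces, so braces inside string literals would confuse it:

import json
import re

# made-up stand-in for soup.select("#__dkt")[0].text
script_json = 'window.foo = 1; __DKT = {"_ctx": {"data": [{"type": "demo", "data": {"models": []}}]}}; other();'

match = re.search(r'__DKT\s*=\s*({)', script_json)
if match:
    start = match.start(1)          # index of the opening brace
    depth = 0
    for pos in range(start, len(script_json)):
        if script_json[pos] == '{':
            depth += 1
        elif script_json[pos] == '}':
            depth -= 1
        if depth == 0:              # matching closing brace found
            dkt = json.loads(script_json[start:pos + 1])
            print(dkt["_ctx"]["data"][0]["type"])  # -> "demo"
            break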

Pars_Decathlon/main.py Normal file
@@ -0,0 +1,35 @@
from categories import get_categories
from xlsx_recorder import Recorder
from parser import get_parser
from extractor import get_extractor


def main():
    # --- startup settings ---
    print("Create JSON files while parsing? (0 = no, 1 = yes)")
    try_json = input("") or "1"
    print("Send the JSON files to the API after they are created? (0 = no, 1 = yes)")
    try_send = input("") or "1"
    try_json = try_json.strip() == "1"
    try_send = try_send.strip() == "1"

    recorder = Recorder(try_json=try_json, try_send=try_send)
    try:
        extractor = get_extractor()
    except Exception as error:
        raise Exception("Error: parse_settings") from error
    try:
        parser = get_parser()
    except Exception as error:
        raise Exception("Error: request_settings") from error
    categories = get_categories()
    extractor.extract(parser, recorder, categories)


if __name__ == "__main__":
    main()
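For a non-interactive run (not part of the commit), the same pipeline can be wired up directly, skipping the prompts; here the JSON files are built locally but not POSTed to the API:

from categories import get_categories
from xlsx_recorder import Recorder
from parser import get_parser
from extractor import get_extractor

recorder = Recorder(try_json=True, try_send=False)  # build JSON, do not send it
extractor = get_extractor()
parser = get_parser()
extractor.extract(parser, recorder, get_categories())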

@@ -0,0 +1,11 @@
{
    "method": "decathlon",
    "tags": [
        {
            "column_name": "Параметр: Бренд",
            "column_number": 12,
            "xpath": "//a[contains(@class, \"brand\")]"
        }
    ]
}
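For reference, a sketch (not from the commit) of how such a tag entry is applied by Extractor.tags_extract: the xpath is evaluated against the product page and the collected text is inserted into the row at column_number. The HTML fragment and the 12 placeholder columns below are made up:

from bs4 import BeautifulSoup
from lxml import etree

tag = {"column_name": "Параметр: Бренд", "column_number": 12,
       "xpath": "//a[contains(@class, \"brand\")]"}
html = '<html><body><a class="brand-link">QUECHUA</a></body></html>'  # made-up page fragment

soup = BeautifulSoup(html, "html.parser")
dom_tree = etree.HTML(str(soup))
column_data = ""
for element in dom_tree.xpath(tag["xpath"]):
    column_data = "".join(element.itertext()).strip() + "\n"

row = ["..."] * 12                   # stand-in for the columns built by the extractor
row.insert(tag["column_number"], column_data)
print(row[12])                       # "QUECHUA\n"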

@@ -0,0 +1,59 @@
from json import load
from time import sleep
import cloudscraper
from os.path import abspath


# parser class that goes through the Cloudflare protection
class Parser:
    def __init__(self, json_data):
        self.proxies = {
            "http": f'{json_data["proxy"]}',
            "https": f'{json_data["proxy"]}'
        } if json_data["proxy"] != "" else None
        self.request_delay = json_data["request_delay"]
        self.request_repeats = json_data["request_repeats"]
        self.request_repeat_delay = json_data["request_repeat_delay"]
        # scraper initialised with the Cloudflare bypass
        self.scraper = cloudscraper.create_scraper()
        if self.proxies:
            self.scraper.proxies.update(self.proxies)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
            'Accept-Language': 'pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7',
            'Accept': 'text/html,application/xhtml+xml',
            'Referer': 'https://www.google.com/'
        }

    def parse(self, url, method="GET", return_type="text"):
        sleep(self.request_delay)
        for i in range(self.request_repeats):
            try:
                if method == "GET":
                    response = self.scraper.get(url, headers=self.headers)
                else:
                    response = self.scraper.post(url, headers=self.headers)
            except Exception as error:
                print(f"Request Error: {error} - {url}")
                continue
            if response.status_code == 200:
                if return_type == "text":
                    return response.text
                else:
                    return response.json()
            else:
                print(f"bad response, status code -> {response.status_code} - {url}")
                if response.status_code == 404:
                    break
            sleep(self.request_repeat_delay)
        return None


# build a Parser object from the settings in request_settings.json
def get_parser():
    with open(abspath("request_settings.json"), "r", encoding="utf-8") as file:
        return Parser(load(file))

@@ -0,0 +1,10 @@
import cloudscraper

url = 'https://www.decathlon.pl/sporty/turystyka-trekking/namioty-kempingowe-rodzinne?from=0'
scraper = cloudscraper.create_scraper()  # automatically sets up the anti-bot bypass
response = scraper.get(url)

print('Response code:', response.status_code)
print('Page content:', response.text[:500])
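A related sketch (not part of the commit) that reuses the same cloudscraper session for the stock endpoint queried in decathlon_extract_method; the skuIds value is a made-up placeholder:

import cloudscraper

scraper = cloudscraper.create_scraper()
sku_ids = "123456,654321"  # hypothetical skuIds, normally collected from model["skus"]
stocks_url = f"https://www.decathlon.pl/pl/ajax/nfs/stocks/online?skuIds={sku_ids}"
response = scraper.get(stocks_url)
print(response.status_code)
if response.status_code == 200:
    print(response.json())  # the extractor expects {skuId: {"stockOnline": ...}, ...}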

Pars_Decathlon/parser.py Normal file
@@ -0,0 +1,69 @@
from json import load
from time import sleep
import cloudscraper
from os.path import abspath


# parser class that goes through the Cloudflare protection
class Parser:
    def __init__(self, json_data):
        self.proxies = {
            "http": f'{json_data["proxy"]}',
            "https": f'{json_data["proxy"]}'
        } if json_data["proxy"] != "" else None
        self.request_delay = json_data["request_delay"]
        self.request_repeats = json_data["request_repeats"]
        self.request_repeat_delay = json_data["request_repeat_delay"]
        # scraper initialised with the Cloudflare bypass
        self.scraper = cloudscraper.create_scraper()
        if self.proxies:
            self.scraper.proxies.update(self.proxies)
        # previous, shorter header set:
        # self.headers = {
        #     'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
        #     'Accept-Language': 'pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7',
        #     'Accept': 'text/html,application/xhtml+xml',
        #     'Referer': 'https://www.google.com/'
        # }
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7",
            "Connection": "keep-alive",
            "Referer": "https://www.google.com/",
            "DNT": "1"
        }

    def parse(self, url, method="GET", return_type="text"):
        sleep(self.request_delay)
        for i in range(self.request_repeats):
            try:
                if method == "GET":
                    response = self.scraper.get(url, headers=self.headers)
                else:
                    response = self.scraper.post(url, headers=self.headers)
            except Exception as error:
                print(f"Request Error: {error} - {url}")
                continue
            if response.status_code == 200:
                if return_type == "text":
                    return response.text
                else:
                    return response.json()
            else:
                print(f"bad response, status code -> {response.status_code} - {url}")
                if response.status_code == 404:
                    break
            sleep(self.request_repeat_delay)
        return None


# build a Parser object from the settings in request_settings.json
def get_parser():
    with open(abspath("request_settings.json"), "r", encoding="utf-8") as file:
        return Parser(load(file))
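A minimal usage sketch of this parser (not part of the commit), assuming the request_settings.json shown below is in the working directory:

from parser import get_parser

parser = get_parser()
# HTML page (default return_type="text")
html = parser.parse("https://www.decathlon.pl/sporty/turystyka-trekking/namioty-kempingowe-rodzinne?from=0")
print(html is not None)

# JSON endpoint, e.g. the stock service used by the extractor (123456 is a made-up skuId)
stocks = parser.parse("https://www.decathlon.pl/pl/ajax/nfs/stocks/online?skuIds=123456", return_type="json")
print(stocks)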

@@ -0,0 +1,6 @@
{
    "proxy": "",
    "request_delay": 2.5,
    "request_repeats": 10,
    "request_repeat_delay": 1
}

@@ -0,0 +1,6 @@
{
    "default": 2,
    "decathlon/Strona główna/Sporty/Turystyka, Trekking/Plecaki turystyczne i trekkingowe/Torby trekkingowe": 2,
    "decathlon/Strona główna/Sporty/Turystyka, Trekking/Namioty dmuchane": 12,
    "decathlon/Strona główna/Sporty/Kemping i piknik/Kubki i termosy": 0.5
}
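A small sketch (not part of the commit) of how get_weight() in xlsx_recorder.py below falls back to these defaults when the parsed weight is missing or zero; the record dicts are invented examples:

from xlsx_recorder import get_weight

no_weight = {"Параметр: Вес(г)": "",
             "Размещение на сайте": "decathlon/Strona główna/Sporty/Turystyka, Trekking/Namioty dmuchane"}
with_weight = {"Параметр: Вес(г)": "3.4",
               "Размещение на сайте": "decathlon/..."}  # category is not consulted when a weight is present

print(get_weight(no_weight))    # 12.0 -> taken from weight_defaults.json
print(get_weight(with_weight))  # 3.4  -> parsed value is kept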

@@ -0,0 +1,147 @@
from openpyxl import Workbook
from os.path import isdir, abspath, join
from os import mkdir
import json
import requests
import time
from datetime import datetime

# --- load the default-weight dictionary (weight_defaults.json) ---
try:
    with open("weight_defaults.json", "r", encoding="utf-8") as f:
        WEIGHT_DEFAULTS = json.load(f)
except Exception:
    print("⚠️ weight_defaults.json not found, falling back to default=2")
    WEIGHT_DEFAULTS = {"default": 2}


def get_weight(record):
    """Return a usable product weight, substituting a value from weight_defaults.json when needed."""
    try:
        weight = float(record.get("Параметр: Вес(г)", 0) or 0)
    except (ValueError, TypeError):
        weight = 0
    # if the weight is missing or non-positive, take it from the defaults dictionary
    if weight <= 0:
        category_name = record.get("Размещение на сайте", "")
        weight = float(WEIGHT_DEFAULTS.get(category_name, WEIGHT_DEFAULTS.get("default", 2)))
    return weight


class Recorder:
    def __init__(self, records_folder="records_folder", try_json=True, try_send=True):
        # create the output folder on first run
        if not isdir(abspath(records_folder)):
            mkdir(abspath(records_folder))
        self.record_folder = abspath(records_folder)
        # behaviour flags
        self.try_json = try_json
        self.try_send = try_send
        # API settings
        self.api_url = "http://172.25.4.101:3005/parser/data"
        # log file
        self.log_path = join(self.record_folder, "log.txt")

    # --- simple logging helper ---
    def log(self, msg: str):
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        line = f"[{ts}] {msg}"
        print(line)
        with open(self.log_path, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    def record(self, csv_name, table_data):
        # === 1. Save the XLSX ===
        workbook = Workbook()
        worksheet = workbook.active
        for row in table_data:
            worksheet.append(row)
        xlsx_path = join(self.record_folder, f"{csv_name}.xlsx")
        workbook.save(xlsx_path)
        self.log(f"✅ XLSX saved → {xlsx_path}")

        # === 2. JSON ===
        if not self.try_json:
            return  # the user chose not to create JSON
        headers = table_data[0]
        json_items = []
        filtered_out = 0
        for row in table_data[1:]:
            record = dict(zip(headers, row))
            brand = record.get("Свойство: Бренд", "") or record.get("Бренд", "")
            category = record.get("Размещение на сайте", "")
            # --- safe stock detection ---
            in_stock_raw = record.get("Наличие", "")
            in_stock = False
            try:
                val = float(in_stock_raw)
                in_stock = val > 2
            except (ValueError, TypeError):
                in_stock = False
            # --- cost and weight filters ---
            try:
                cost = float(record.get("Цена закупки", 0) or 0)
            except (ValueError, TypeError):
                cost = 0
            weight = get_weight(record)
            if not (50 <= cost <= 1500 and weight <= 31):
                filtered_out += 1
                continue  # skip this item
            json_items.append({
                "category": {"name": category},
                "brand": {"name": brand},
                "variant": {
                    "status_id": 1,
                    "color": record.get("Свойство: Цвет", ""),
                    "sku": record.get("Артикул", ""),
                    "size": record.get("Свойство: Размер", ""),
                    "cost": cost,
                    "originalUrl": record.get("Краткое описание", ""),
                    "originalName": record.get("Название товара или услуги", ""),
                    "originalDescription": record.get("Полное описание", ""),
                    "originalComposition": record.get("Параметр: Состав", ""),
                    "images": (record.get("Изображения варианта", "") or record.get("Изображения", "")).split("\n"),
                    "inStock": in_stock,
                    "weight": weight,
                }
            })
        json_data = {"parserName": "decathlon", "items": json_items}
        json_path = join(self.record_folder, f"{csv_name}.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(json_data, f, ensure_ascii=False, indent=2)
        self.log(
            f"✅ JSON saved → {json_path} | items: {len(json_items)} | filtered out: {filtered_out}"
        )

        # === 3. Send the JSON ===
        if self.try_send:
            self.send_json_to_api(json_data, csv_name)

    def send_json_to_api(self, json_data, csv_name):
        total_items = len(json_data.get("items", []))
        for attempt in range(1, 4):
            try:
                resp = requests.post(self.api_url, json=json_data, timeout=1)
                if resp.status_code == 200:
                    self.log(f"✅ [{csv_name}] JSON sent to API successfully (attempt {attempt}) | items: {total_items}")
                    return
                else:
                    self.log(f"⚠️ [{csv_name}] API response {resp.status_code}: {resp.text}")
            except Exception as e:
                self.log(f"❌ [{csv_name}] Error sending to API (attempt {attempt}): {e}")
            time.sleep(5)
        self.log(f"🚫 [{csv_name}] Failed to send JSON after 3 attempts.")
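For reference, a sketch of the payload shape that record() assembles and send_json_to_api() POSTs to the local endpoint; every concrete value below is invented, only the keys mirror the code above:

payload = {
    "parserName": "decathlon",
    "items": [
        {
            "category": {"name": "decathlon/Strona główna/Sporty/Turystyka, Trekking/Namioty dmuchane"},
            "brand": {"name": ""},
            "variant": {
                "status_id": 1,
                "color": "GRANATOWY",          # invented value
                "sku": "8576123_Decathlon",    # invented, but follows the f"{article_part}_Decathlon" pattern
                "size": "XL",
                "cost": 499.0,
                "originalUrl": "https://www.decathlon.pl/p/...",
                "originalName": "Namiot ...",
                "originalDescription": "...",
                "originalComposition": "",
                "images": ["https://example.com/image_1.jpg"],
                "inStock": True,
                "weight": 12.0,
            },
        }
    ],
}
# sent as: requests.post("http://172.25.4.101:3005/parser/data", json=payload, timeout=1)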

@@ -245,7 +245,7 @@ class Extractor:
 for vprod in variants:
     det = vprod["detail"]
-    sec, fam, subfam = (vprod.get("sectionNameEN") or "",
+    sec, fam, subfam = (#vprod.get("sectionNameEN") or "",
                         vprod.get("familyName") or "",
                         vprod.get("subFamilyName") or "")
     cat_path = "Каталог/ZaraHome/" + "/".join(p for p in (sec, fam, subfam) if p)
@@ -260,6 +260,7 @@ class Extractor:
 default_idx = det.get("xmediaDefaultSet")
 if isinstance(raw_xmedia, list) and raw_xmedia:
     media_sets = [raw_xmedia[default_idx]] if isinstance(default_idx, int) else raw_xmedia
 #elif isinstance(raw_xmedia, dict):
 #    media_sets = [raw_xmedia]
 #else:
