# MacOS_Parsers/Pars_Decathlon/extractor.py

import json
import os
import re
from json import load, loads
from os.path import abspath
from re import split

from bs4 import BeautifulSoup
from lxml import etree


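# Flattens the Zara Home composition "parts" structure into readable lines such as
# "65% cotton"; handles both the nested "areas" layout and the flat "components" layout.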
def extract_components_zarahome(parts):
composition = []
for part in parts:
if part.get("areas") and part.get("description"):
if len(parts) != 1:
composition.append(part["description"])
for area in part["areas"]:
area_name = area["description"]
percentage_area = area["percentageArea"]
composition.append(f"{area_name} ({percentage_area})")
for component in area["components"]:
material = component["material"]
percentage = component["percentage"]
composition.append(f"{percentage} {material}")
elif part.get("components") and part.get("description"):
if len(parts) != 1:
composition.append(part["description"])
for component in part["components"]:
material = component["material"]
percentage = component["percentage"]
composition.append(f"{percentage} {material}")
return composition
# Class for extracting the required data from each supported source.
class Extractor:
def __init__(self, json_data):
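        # Maps the "method" value from parse_settings.json to a pair of
        # (extraction function, output column headers for that source).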
self.methods = {
"": (self.default_extract_method, []),
"zarahome": (self.zarahome_extract_method, [
"Краткое описание",
"Артикул",
"Название товара или услуги",
"Полное описание",
"Образец цвета",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Свойство: Вес(г)",
"Изображения",
"Изображения варианта",
"Параметр: Состав",
"Параметр: Уход",
"Параметр: Происхождение",
"Размещение на сайте",
"Свойство: Бренд"
]),
"eobuwie": (self.eobuwie_extract_method, [
"Краткое описание",
"Артикул",
"Свойство: Размер",
"Полное описание(Таблица)",
"Название товара или услуги",
"Изображения",
"Размещение на сайте",
"Цена",
"Наличие"
]),
"decathlon": (self.decathlon_extract_method, [
"Краткое описание",
"Артикул",
"Название товара или услуги",
"Полное описание",
"Наличие",
"Свойство: Цвет",
"Свойство: Размер",
"Цена закупки",
"Параметр: Вес(г)",
"Изображения варианта",
"Размещение на сайте"
])
}
self.method = json_data["method"]
self.tags = json_data["tags"]
        # Copy the header template so the tag columns added below do not mutate self.methods
        self.headers = list(self.methods[self.method][1])
        for tag in self.tags:
            self.headers.insert(tag["column_number"], tag["column_name"])
def extract(self, parser, recorder, categories):
self.methods[self.method][0](parser, recorder, categories)
    def default_extract_method(self, parser, recorder, categories):
        # No-op fallback invoked when "method" is empty in parse_settings.json.
        pass
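    # Evaluates the user-configured XPath expressions (the "tags" from parse_settings.json)
    # against the product page and inserts their text into the output row.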
    def tags_extract(self, soup, row):
        dom_tree = etree.HTML(str(soup))
        for tag in self.tags:
            xpath_result = dom_tree.xpath(tag["xpath"])
            column_data = ""
            if xpath_result:
                for element in xpath_result:
                    # Accumulate the text of every matched element, one match per line
                    column_data += ''.join(element.itertext()).strip() + "\n"
            row.insert(tag["column_number"], column_data)
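    # Walks each Decathlon category listing, follows every product link, extracts the
    # embedded __DKT JSON state and writes one row per SKU into an .xlsx file.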
def decathlon_extract_method(self, parser, recorder, categories):
BASE_URL = "https://www.decathlon.pl"
for i, category in enumerate(categories):
csv_name = "_".join(category.split("/")[4:]).replace(":", "-").replace("?", "_").replace("=", "_")
filepath = os.path.join(recorder.record_folder, f"{csv_name}.xlsx")
            # If the output file already exists, skip parsing this category
            if os.path.isfile(filepath):
                print(f"File {csv_name}.xlsx already exists. Skipping category: {category}")
                continue
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
continue_loop = True
category_from = 0
while continue_loop:
                # Request the category listing page at the current pagination offset
                category_page = parser.parse(f"{category}?from={category_from}")
category_soup = BeautifulSoup(category_page, "html.parser")
dom_tree = etree.HTML(str(category_soup))
offers_count_element = dom_tree.xpath('//*[@id="start-of-listing"]/div[2]/div/span[1]')
if offers_count_element:
offers_count = int(offers_count_element[0].text.strip())
                else:
                    print("Product count not found")
                    offers_count = 0
                    break  # without the total count there is no point in continuing
products_links = category_soup.select('a.dpb-product-link')
products_links_count = len(products_links)
for e, product_link in enumerate(products_links):
product_url = BASE_URL + product_link.get("href")
print(f"Products: {e + 1 + category_from} / {offers_count}", product_url)
product_page = parser.parse(product_url)
                    if product_page is None:
                        continue
soup = BeautifulSoup(product_page, "html.parser")
meta_script_tags = soup.select("[type=\"application/ld+json\"]")
if len(meta_script_tags) <= 1:
continue
meta_data = loads(meta_script_tags[1].text)
path_steps = []
for step in meta_data["itemListElement"]:
path_steps.append(step["item"]["name"])
product_path = "decathlon/" + "/".join(path_steps)
script_json = soup.select("#__dkt")[0].text
                    # Locate the start of the embedded __DKT JSON object
                    json_start_match = re.search(r'__DKT\s*=\s*({)', script_json)
                    if json_start_match:
                        # Index of the opening brace of the JSON object
                        start = json_start_match.start(1)
                        # Track the brace balance to cut out exactly one complete JSON object
                        bracket_count = 0
                        for char_index in range(start, len(script_json)):
                            if script_json[char_index] == '{':
                                bracket_count += 1
                            elif script_json[char_index] == '}':
                                bracket_count -= 1
                            if bracket_count == 0:
                                # The complete JSON object has been found
                                json_text = script_json[start:char_index + 1]
                                break
                        else:
                            print("Error: JSON braces are not balanced.")
                            continue
                        # Now parse it
                        try:
                            __dkt = json.loads(json_text)
                        except json.JSONDecodeError as err:
                            print(f"JSON parse error: {err}")
                            continue
                    else:
                        print("Error: start of the __DKT JSON object not found.")
                        continue
                    # Debug helper: print(json.dumps(__dkt["_ctx"]["data"], indent=2)[:2000])
                    # Optional guard against non-product pages:
                    # if __dkt["_ctx"]["page"]["id"] != "product":
                    #     continue
                    # The models used to live at __dkt["_ctx"]["data"][4]["data"]["models"];
                    # in the current page structure they are under data[2]:
model_info = __dkt["_ctx"]["data"][2]["data"]
if "models" not in model_info or not model_info["models"]:
print(f"Ошибка: нет 'models' для товара {product_url}")
continue
model = model_info["models"][0]
# print(json.dumps(model_info, indent=2)) #### Отадка посмотреть что в json
color = ""
colors = []
                    # The colors list may differ in the new __DKT structure; read the fields defensively
                    if model.get("colors"):
                        for color_info in model["colors"]:
                            # Prefer label, then name, then title, then id
                            label = color_info.get("label") or color_info.get("name") or color_info.get("title") or color_info.get("id") or ""
                            if label:
                                colors.append(label.strip())
                        color = " / ".join(colors)
                    else:
                        color = ""
images = []
if model.get("images"):
for image_info in model["images"]["product"]:
images.append(image_info["url"].replace("/250x250", ""))
image_lines = "\n".join(images)
product_name = model["webLabel"]
                    # The description used to be taken from soup.select("[id^=\"ProductFunctionalities\"]");
                    # it is now assembled from several __DKT sections below.
                    description_parts = []
                    description_set = set()  # used to enforce uniqueness of description blocks
def add_unique_description(text):
text_cleaned = text.strip()
if text_cleaned and text_cleaned not in description_set:
description_parts.append(text_cleaned)
description_set.add(text_cleaned)
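                    # Build the description from several __DKT sections, skipping duplicate blocks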
# 1. MarketplaceProductDescription
description_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "MarketplaceProductDescription"), None)
if description_data and "data" in description_data and "description" in description_data["data"]:
add_unique_description(description_data["data"]["description"])
# 2. ProductConception
description_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductConception"), None)
if description_data and "data" in description_data and "conception" in description_data["data"]:
conception = description_data["data"]["conception"]
if isinstance(conception, list) and conception:
conception_text = conception[0].get("description", "")
add_unique_description(conception_text)
# 3. ProductFunctionalities
description_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductFunctionalities"), None)
if description_data and "data" in description_data and "functionalities" in description_data["data"]:
functionalities = description_data["data"]["functionalities"]
if isinstance(functionalities, list):
func_text = "\n".join(f"{func.get('title', '')}: {func.get('value', '')}" for func in functionalities)
add_unique_description(func_text)
# 4. MarketplaceProductTechnicalInformations
tech_info = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "MarketplaceProductTechnicalInformations"), None)
if tech_info and "data" in tech_info and "information" in tech_info["data"]:
information = tech_info["data"]["information"]
if isinstance(information, list):
info_text = "\n".join(f"{info.get('key', '')}: {info.get('value', '')}" for info in information)
add_unique_description(info_text)
# 5. ProductGuarantee
# guarantee_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductGuarantee"), None)
# if guarantee_data and "data" in guarantee_data and "guarantee" in guarantee_data["data"]:
# guarantee_text = f"Gwarancja: {guarantee_data['data']['guarantee']} lat"
# add_unique_description(guarantee_text)
# 6. ProductBenefits
benefits_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductBenefits"), None)
if benefits_data and "data" in benefits_data and "benefits" in benefits_data["data"]:
benefits = benefits_data["data"]["benefits"]
if isinstance(benefits, list):
benefits_text = "\n".join(f"{benefit.get('label', '')}: {benefit.get('value', '')}" for benefit in benefits)
add_unique_description(benefits_text)
                    # Join all unique description blocks with a blank line between them
                    product_description = "\n\n".join(description_parts)
                    # print("Full product description:", product_description[:500])  # debug: first 500 characters
                    # Next, extract the SKU data, again checking the new structures defensively
skus_data = model["skus"]
sku_ids = []
for sku in skus_data:
sku_ids.append(sku["skuId"])
sku_ids = ",".join(sku_ids)
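                    # Fetch online stock for every SKU of the model in a single AJAX request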
stocks = parser.parse(f"https://www.decathlon.pl/pl/ajax/nfs/stocks/online?skuIds={sku_ids}", return_type="json")
for sku in skus_data:
try:
sku_id = sku["skuId"]
sku_id1 = model["modelId"]
stock = stocks[sku_id]["stockOnline"] if stocks.get(sku_id) else "unknown"
table_data = []
                            # The old approach (f"{sku_id1}-{sku_id}" or the model-id suffix) clashed
                            # with articles of items already listed on Ozon, so the article is now
                            # derived from product_url instead.
                            article = ""
                            try:
                                base_part = product_url.split("?")[0]  # drop everything after "?"
                                article_part = base_part.split("-")[-1]  # part after the last hyphen
                                article = f"{article_part}_Decathlon"
                            except Exception:
                                article = ""
size = ""
if sku.get("size"):
size = sku["size"]
price = ""
if sku.get("price"):
price = sku["price"]
weight = ""
if sku.get("grossWeight"):
weight = float(sku["grossWeight"])
table_data.append([
product_url,
article,
product_name,
product_description,
stock,
color,
size,
price,
weight,
image_lines,
product_path
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
                # Stop once all offers have been paged through (or no links were found,
                # which would otherwise loop forever)
                if products_links_count == 0 or category_from + products_links_count >= offers_count:
                    continue_loop = False
                else:
                    category_from += products_links_count
csv_name = "_".join(category.split("/")[4:]).replace(":", "-").replace("?", "_").replace("=", "_")
recorder.record(csv_name, table)
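    # Pages through the eobuwie search API (100 products per page), opens each product
    # page for the name and breadcrumbs, and writes one row per size variant.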
def eobuwie_extract_method(self, parser, recorder, categories):
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
category_page = 1
category_marka = category.split(":")[2].split("?")[0]
category_type = category.split("/")[4]
while True:
category_products_data = parser.parse(f"https://eobuwie.com.pl/t-api/rest/search/eobuwie/v5/search?channel=eobuwie&currency=PLN&locale=pl_PL&limit=100&page={category_page}&filters[marka][in][]={category_marka}&categories[]={category_type}&select[]=url_key&select[]=product_group_associated&select[]=images&select[]=final_price&select[]=footwear_size&select_locales[]=pl_PL", return_type="json")
total = category_products_data["total"]
products = category_products_data["products"]
for e, product in enumerate(products):
short_url = product["values"]["url_key"]["value"]["pl_PL"]
product_url = f"https://eobuwie.com.pl/p/{short_url}"
print(f"Products: {e + 1 + ((category_page - 1) * 100)} / {total}", product_url)
product_page = parser.parse(product_url)
                    if product_page is None:
                        continue
soup = BeautifulSoup(product_page, "html.parser")
links = soup.select(".breadcrumb-list .text-link")[2:]
product_location = "/".join(list(map(lambda x: x.text, links)))
product_group = ""
if product["values"].get("product_group_associated") and product["values"]["product_group_associated"].get("value"):
product_group = product["values"]["product_group_associated"]["value"]
product_name = soup.select("[data-test-id=\"product-name\"]")[0].text.strip()
product_name = split(r"\d", product_name)[0]
product_name = f"{product_name} - {product_group}"
images_list = []
if product["values"].get("images") and product["values"]["images"].get("value"):
for image in product["values"]["images"]["value"]:
if image.get("url"):
images_list.append(f'https://img.modivo.cloud/productcard({image["url"]},jpg)')
images_list = "\n".join(images_list)
                    for variant in product["variants"].values():
try:
table_data = []
size_url = variant["size"]
variant_url = f"{product_url}?size={size_url}"
article = variant["id"]
size_name = ""
if variant["values"].get("footwear_size"):
size_name = variant["values"]["footwear_size"]["value"]["label"]
description = ""
location = f"Каталог/Обувь и аксессуары/{product_location}"
availability = variant["stock_quantity"]
if variant["stock_quantity"]:
price = variant["offers"][0]["final_price"]["amount"]
else:
price = product["values"]["final_price"]["value"]["pl_PL"]["PLN"]["amount"]
table_data.append([
variant_url,
article,
size_name,
description,
product_name,
images_list,
location,
price,
availability
])
self.tags_extract(soup, table_data[-1])
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
if category_page * 100 >= total:
break
category_page += 1
csv_name = category.split("/")[-1].replace(":", "-").replace("?", "_").replace("=", "_")
recorder.record(csv_name, table)
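    # Resolves each Zara Home category via the itxrest catalog API and writes one row
    # per color/size combination, including composition, care and traceability data.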
def zarahome_extract_method(self, parser, recorder, categories):
for i, category in enumerate(categories):
table = [self.headers]
print(f"Categories: {i + 1} / {len(categories)}", category)
category_seo = parser.parse(f"{category}?itxSeo=true", return_type="json")
category_id = category_seo["categoryId"]
category_title = "/".join(category_seo["metaTitle"].split(" | ")[0].split(" - ")[::-1])
category_products_data = parser.parse(f"https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000/category/{category_id}/product?showProducts=true&languageId=-22&appId=1", return_type="json")
products = category_products_data["products"].values()
for e, product in enumerate(products):
if product.get("productUrlParam"):
continue
short_url = product.get("productUrl")
print(f"Products: {e + 1} / {len(products)}", f"https://www.zarahome.com/pl/{short_url}")
product_reference_id = product["detail"]["reference"].split("-")[0][1:9]
product_url = f"https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000/productsArray?languageId=-22&referenceIds={product_reference_id}&appId=1"
product_json = parser.parse(product_url, return_type="json")
if not product_json["products"][0].get("productUrl"):
continue
try:
table_data = []
category_path = f"Каталог/ZaraHome/{category_title}"
product_short_url = product_json["products"][0]["productUrl"]
url = f"https://www.zarahome.com/pl/{product_short_url}"
article = product_json["products"][0]["detail"]["displayReference"]
name = product_json["products"][0]["name"]
description = product_json["products"][0]["detail"]["longDescription"]
all_images = []
for location in product_json["products"][0]["detail"]["xmedia"]:
path = location["path"]
for media in location["xmediaItems"][0]["medias"]:
id_media = media["idMedia"]
all_images.append(f"https://static.zarahome.net/8/photos4{path}/{id_media}2.jpg")
all_images = "\n".join(all_images)
components = product_json["products"][0]["detail"]["compositionDetail"]
if components:
components = components["parts"]
else:
components = product_json["products"][0]["detail"]["colors"][0]["compositionDetail"]
if components:
components = components["parts"]
else:
components = {}
if components != {}:
composition = extract_components_zarahome(components)
composition = "\n".join(composition)
else:
composition = ""
care = []
for part in product_json["products"][0]["detail"]["care"]:
care_description = part["description"]
care.append(care_description)
care = "\n".join(care)
traceability = []
for part in product_json["products"][0]["detail"]["colors"][0]["traceability"].values():
if type(part) is dict and part.get("country") and part.get("name"):
traceability_name = part["name"]
traceability_country = "\n".join(part["country"])
traceability.append(f"{traceability_name}\n{traceability_country}")
traceability = "\n".join(traceability)
number = 0
                    for color in product_json["products"][0]["detail"]["colors"]:
                        if color["image"] is None:
                            continue
color_code = color["id"]
current_images = []
for location in product_json["products"][0]["detail"]["xmedia"]:
if location["colorCode"] == color_code:
path = location["path"]
for media in location["xmediaItems"][0]["medias"]:
id_media = media["idMedia"]
current_images.append(
f"https://static.zarahome.net/8/photos4{path}/{id_media}2.jpg")
break
current_images = "\n".join(current_images)
color_url = color["image"]["url"]
color_image = f"https://static.zarahome.net/8/photos4{color_url}_3_1_5.jpg"
color_name = color["name"]
                        for size in color["sizes"]:
                            if size["visibilityValue"] != "SHOW":
                                continue
article_number = "" if number == 0 else f"-{number}"
size_weight = size["weight"]
size_name = size["name"]
size_description = size["description"]
size_full_name = f"{size_description} ({size_name})" if size_description else size_name
size_buy_price = int(size["price"]) / 100
table_data.append([
url,
f"{article}{article_number}",
name,
description,
color_image,
color_name,
size_full_name,
size_buy_price,
size_weight,
all_images,
current_images,
composition,
care,
traceability,
category_path,
"ZARAHOME"
])
number += 1
table += table_data.copy()
except Exception as error:
print(f"Extractor Error: {error}")
csv_name = category.split("/")[-1]
recorder.record(csv_name, table)
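# Builds an Extractor from parse_settings.json in the current working directory.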
def get_extractor():
with open(abspath("parse_settings.json"), "r", encoding="utf-8") as file:
return Extractor(load(file))
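

# Minimal usage sketch. Extractor.extract() expects a parser object exposing
# parse(url, return_type=...) and a recorder exposing record_folder and record(name, table);
# how those objects are constructed is project-specific and assumed here, so the
# example is left commented out:
#
#     extractor = get_extractor()
#     extractor.extract(parser, recorder, categories)  # categories: list of category URLs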