from json import load, loads from os.path import abspath from bs4 import BeautifulSoup from lxml import etree from re import split import re import json import os def extract_components_zarahome(parts): composition = [] for part in parts: if part.get("areas") and part.get("description"): if len(parts) != 1: composition.append(part["description"]) for area in part["areas"]: area_name = area["description"] percentage_area = area["percentageArea"] composition.append(f"{area_name} ({percentage_area})") for component in area["components"]: material = component["material"] percentage = component["percentage"] composition.append(f"{percentage} {material}") elif part.get("components") and part.get("description"): if len(parts) != 1: composition.append(part["description"]) for component in part["components"]: material = component["material"] percentage = component["percentage"] composition.append(f"{percentage} {material}") return composition # класс для извлечения нужных данных class Extractor: def __init__(self, json_data): self.methods = { "": (self.default_extract_method, []), "zarahome": (self.zarahome_extract_method, [ "Краткое описание", "Артикул", "Название товара или услуги", "Полное описание", "Образец цвета", "Свойство: Цвет", "Свойство: Размер", "Цена закупки", "Свойство: Вес(г)", "Изображения", "Изображения варианта", "Параметр: Состав", "Параметр: Уход", "Параметр: Происхождение", "Размещение на сайте", "Свойство: Бренд" ]), "eobuwie": (self.eobuwie_extract_method, [ "Краткое описание", "Артикул", "Свойство: Размер", "Полное описание(Таблица)", "Название товара или услуги", "Изображения", "Размещение на сайте", "Цена", "Наличие" ]), "decathlon": (self.decathlon_extract_method, [ "Краткое описание", "Артикул", "Название товара или услуги", "Полное описание", "Наличие", "Свойство: Цвет", "Свойство: Размер", "Цена закупки", "Параметр: Вес(г)", "Изображения варианта", "Размещение на сайте" ]) } self.method = json_data["method"] self.tags = json_data["tags"] self.headers = self.methods[self.method][1] for tag in self.tags: self.headers.insert(tag["column_number"], tag["column_name"]) def extract(self, parser, recorder, categories): self.methods[self.method][0](parser, recorder, categories) def default_extract_method(self): pass def tags_extract(self, soup, row): dom_tree = etree.HTML(str(soup)) for tag in self.tags: xpath_result = dom_tree.xpath(tag["xpath"]) column_data = "" if len(xpath_result): for element in xpath_result: column_data = ''.join(element.itertext()).strip() + "\n" row.insert(tag["column_number"], column_data) def decathlon_extract_method(self, parser, recorder, categories): BASE_URL = "https://www.decathlon.pl" for i, category in enumerate(categories): csv_name = "_".join(category.split("/")[4:]).replace(":", "-").replace("?", "_").replace("=", "_") filepath = os.path.join(recorder.record_folder, f"{csv_name}.xlsx") # Проверяем наличие файла, и если он есть — пропускаем парсинг категории if os.path.isfile(filepath): print(f"Файл {csv_name}.xlsx уже существует. Пропускаем категорию: {category}") continue table = [self.headers] print(f"Categories: {i + 1} / {len(categories)}", category) continue_loop = True category_from = 0 while continue_loop: # Дальше без изменений твой текущий код category_page = parser.parse(f"{category}?from={category_from}") category_soup = BeautifulSoup(category_page, "html.parser") dom_tree = etree.HTML(str(category_soup)) offers_count_element = dom_tree.xpath('//*[@id="start-of-listing"]/div[2]/div/span[1]') if offers_count_element: offers_count = int(offers_count_element[0].text.strip()) else: print("Не найдено количество товаров") offers_count = 0 break # если не нашли количество товаров, нет смысла продолжать products_links = category_soup.select('a.dpb-product-link') products_links_count = len(products_links) for e, product_link in enumerate(products_links): product_url = BASE_URL + product_link.get("href") print(f"Products: {e + 1 + category_from} / {offers_count}", product_url) product_page = parser.parse(product_url) if product_page == None: continue soup = BeautifulSoup(product_page, "html.parser") meta_script_tags = soup.select("[type=\"application/ld+json\"]") if len(meta_script_tags) <= 1: continue meta_data = loads(meta_script_tags[1].text) path_steps = [] for step in meta_data["itemListElement"]: path_steps.append(step["item"]["name"]) product_path = "decathlon/" + "/".join(path_steps) script_json = soup.select("#__dkt")[0].text # Находим начало JSON json_start_match = re.search(r'__DKT\s*=\s*({)', script_json) if json_start_match: # Индекс начала JSON-объекта start = json_start_match.start(1) # Считаем баланс фигурных скобок, чтобы извлечь точный JSON bracket_count = 0 for i in range(start, len(script_json)): if script_json[i] == '{': bracket_count += 1 elif script_json[i] == '}': bracket_count -= 1 if bracket_count == 0: # JSON найден полностью json_text = script_json[start:i+1] break else: print("Ошибка: JSON не сбалансирован.") continue # Теперь парсим try: __dkt = json.loads(json_text) except json.JSONDecodeError as e: print(f"Ошибка парсинга JSON: {e}") continue else: print("Ошибка: не найдено начало JSON (__DKT).") continue #__dkt = loads(script_json.text.replace("__DKT = ", "")) #try: # __dkt = json.loads(json_text) #except json.JSONDecodeError as e: # print(f"Ошибка парсинга JSON: {e}") # continue # Отладочный вывод структуры данных #print(json.dumps(__dkt["_ctx"]["data"], indent=2)[:2000]) #if __dkt["_ctx"]["page"]["id"] != "product": # continue ########было вот так models_data = __dkt["_ctx"]["data"][4]["data"]["models"] # Новый правильный путь model_info = __dkt["_ctx"]["data"][2]["data"] if "models" not in model_info or not model_info["models"]: print(f"Ошибка: нет 'models' для товара {product_url}") continue model = model_info["models"][0] # print(json.dumps(model_info, indent=2)) #### Отадка посмотреть что в json color = "" colors = [] # проверь есть ли сейчас colors в новой структуре, возможно нужно будет адаптировать и это if model.get("colors"): for color_info in model["colors"]: # Берём label, если нет — name, если и этого нет — id label = color_info.get("label") or color_info.get("name") or color_info.get("title") or color_info.get("id") or "" if label: colors.append(label.strip()) color = " / ".join(colors) else: color = "" images = [] if model.get("images"): for image_info in model["images"]["product"]: images.append(image_info["url"].replace("/250x250", "")) image_lines = "\n".join(images) product_name = model["webLabel"] #product_description = soup.select("[id^=\"ProductFunctionalities\"]") ТАК БЫЛО description_parts = [] description_set = set() # Для проверки уникальности def add_unique_description(text): text_cleaned = text.strip() if text_cleaned and text_cleaned not in description_set: description_parts.append(text_cleaned) description_set.add(text_cleaned) # 1. MarketplaceProductDescription description_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "MarketplaceProductDescription"), None) if description_data and "data" in description_data and "description" in description_data["data"]: add_unique_description(description_data["data"]["description"]) # 2. ProductConception description_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductConception"), None) if description_data and "data" in description_data and "conception" in description_data["data"]: conception = description_data["data"]["conception"] if isinstance(conception, list) and conception: conception_text = conception[0].get("description", "") add_unique_description(conception_text) # 3. ProductFunctionalities description_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductFunctionalities"), None) if description_data and "data" in description_data and "functionalities" in description_data["data"]: functionalities = description_data["data"]["functionalities"] if isinstance(functionalities, list): func_text = "\n".join(f"{func.get('title', '')}: {func.get('value', '')}" for func in functionalities) add_unique_description(func_text) # 4. MarketplaceProductTechnicalInformations tech_info = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "MarketplaceProductTechnicalInformations"), None) if tech_info and "data" in tech_info and "information" in tech_info["data"]: information = tech_info["data"]["information"] if isinstance(information, list): info_text = "\n".join(f"{info.get('key', '')}: {info.get('value', '')}" for info in information) add_unique_description(info_text) # 5. ProductGuarantee # guarantee_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductGuarantee"), None) # if guarantee_data and "data" in guarantee_data and "guarantee" in guarantee_data["data"]: # guarantee_text = f"Gwarancja: {guarantee_data['data']['guarantee']} lat" # add_unique_description(guarantee_text) # 6. ProductBenefits benefits_data = next((item for item in __dkt["_ctx"]["data"] if item["type"] == "ProductBenefits"), None) if benefits_data and "data" in benefits_data and "benefits" in benefits_data["data"]: benefits = benefits_data["data"]["benefits"] if isinstance(benefits, list): benefits_text = "\n".join(f"{benefit.get('label', '')}: {benefit.get('value', '')}" for benefit in benefits) add_unique_description(benefits_text) # Соединяем все уникальные описания через двойной перевод строки product_description = "\n\n".join(description_parts) # Проверочный вывод (первые 500 символов) #print("Полное описание продукта:", product_description[:500]) # Дальше извлекаем данные о SKU и т.д., аналогично проверяя новые структуры skus_data = model["skus"] sku_ids = [] for sku in skus_data: sku_ids.append(sku["skuId"]) sku_ids = ",".join(sku_ids) stocks = parser.parse(f"https://www.decathlon.pl/pl/ajax/nfs/stocks/online?skuIds={sku_ids}", return_type="json") for sku in skus_data: try: sku_id = sku["skuId"] sku_id1 = model["modelId"] stock = stocks[sku_id]["stockOnline"] if stocks.get(sku_id) else "unknown" table_data = [] # Старый метод - не хорошо для существующих товаров на ОЗОН #article = f'{sku_id1}-{sku_id}' #article = sku_id1.split("-")[-1] # Извлекаем article из product_url article = "" try: base_part = product_url.split("?")[0] # убираем всё после ? article_part = base_part.split("-")[-1] article = f"{article_part}_Decathlon" # берём часть после последнего дефиса except Exception: article = "" size = "" if sku.get("size"): size = sku["size"] price = "" if sku.get("price"): price = sku["price"] weight = "" if sku.get("grossWeight"): weight = float(sku["grossWeight"]) table_data.append([ product_url, article, product_name, product_description, stock, color, size, price, weight, image_lines, product_path ]) self.tags_extract(soup, table_data[-1]) table += table_data.copy() except Exception as error: print(f"Extractor Error: {error}") if offers_count == products_links_count + category_from: continue_loop = False else: category_from += products_links_count csv_name = "_".join(category.split("/")[4:]).replace(":", "-").replace("?", "_").replace("=", "_") recorder.record(csv_name, table) def eobuwie_extract_method(self, parser, recorder, categories): for i, category in enumerate(categories): table = [self.headers] print(f"Categories: {i + 1} / {len(categories)}", category) category_page = 1 category_marka = category.split(":")[2].split("?")[0] category_type = category.split("/")[4] while True: category_products_data = parser.parse(f"https://eobuwie.com.pl/t-api/rest/search/eobuwie/v5/search?channel=eobuwie¤cy=PLN&locale=pl_PL&limit=100&page={category_page}&filters[marka][in][]={category_marka}&categories[]={category_type}&select[]=url_key&select[]=product_group_associated&select[]=images&select[]=final_price&select[]=footwear_size&select_locales[]=pl_PL", return_type="json") total = category_products_data["total"] products = category_products_data["products"] for e, product in enumerate(products): short_url = product["values"]["url_key"]["value"]["pl_PL"] product_url = f"https://eobuwie.com.pl/p/{short_url}" print(f"Products: {e + 1 + ((category_page - 1) * 100)} / {total}", product_url) product_page = parser.parse(product_url) if product_page == None: continue soup = BeautifulSoup(product_page, "html.parser") links = soup.select(".breadcrumb-list .text-link")[2:] product_location = "/".join(list(map(lambda x: x.text, links))) product_group = "" if product["values"].get("product_group_associated") and product["values"]["product_group_associated"].get("value"): product_group = product["values"]["product_group_associated"]["value"] product_name = soup.select("[data-test-id=\"product-name\"]")[0].text.strip() product_name = split(r"\d", product_name)[0] product_name = f"{product_name} - {product_group}" images_list = [] if product["values"].get("images") and product["values"]["images"].get("value"): for image in product["values"]["images"]["value"]: if image.get("url"): images_list.append(f'https://img.modivo.cloud/productcard({image["url"]},jpg)') images_list = "\n".join(images_list) for i, variant in enumerate(product["variants"].values()): try: table_data = [] size_url = variant["size"] variant_url = f"{product_url}?size={size_url}" article = variant["id"] size_name = "" if variant["values"].get("footwear_size"): size_name = variant["values"]["footwear_size"]["value"]["label"] description = "" location = f"Каталог/Обувь и аксессуары/{product_location}" availability = variant["stock_quantity"] if variant["stock_quantity"]: price = variant["offers"][0]["final_price"]["amount"] else: price = product["values"]["final_price"]["value"]["pl_PL"]["PLN"]["amount"] table_data.append([ variant_url, article, size_name, description, product_name, images_list, location, price, availability ]) self.tags_extract(soup, table_data[-1]) table += table_data.copy() except Exception as error: print(f"Extractor Error: {error}") if category_page * 100 >= total: break category_page += 1 csv_name = category.split("/")[-1].replace(":", "-").replace("?", "_").replace("=", "_") recorder.record(csv_name, table) def zarahome_extract_method(self, parser, recorder, categories): for i, category in enumerate(categories): table = [self.headers] print(f"Categories: {i + 1} / {len(categories)}", category) category_seo = parser.parse(f"{category}?itxSeo=true", return_type="json") category_id = category_seo["categoryId"] category_title = "/".join(category_seo["metaTitle"].split(" | ")[0].split(" - ")[::-1]) category_products_data = parser.parse(f"https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000/category/{category_id}/product?showProducts=true&languageId=-22&appId=1", return_type="json") products = category_products_data["products"].values() for e, product in enumerate(products): if product.get("productUrlParam"): continue short_url = product.get("productUrl") print(f"Products: {e + 1} / {len(products)}", f"https://www.zarahome.com/pl/{short_url}") product_reference_id = product["detail"]["reference"].split("-")[0][1:9] product_url = f"https://www.zarahome.com/itxrest/3/catalog/store/85009924/80290000/productsArray?languageId=-22&referenceIds={product_reference_id}&appId=1" product_json = parser.parse(product_url, return_type="json") if not product_json["products"][0].get("productUrl"): continue try: table_data = [] category_path = f"Каталог/ZaraHome/{category_title}" product_short_url = product_json["products"][0]["productUrl"] url = f"https://www.zarahome.com/pl/{product_short_url}" article = product_json["products"][0]["detail"]["displayReference"] name = product_json["products"][0]["name"] description = product_json["products"][0]["detail"]["longDescription"] all_images = [] for location in product_json["products"][0]["detail"]["xmedia"]: path = location["path"] for media in location["xmediaItems"][0]["medias"]: id_media = media["idMedia"] all_images.append(f"https://static.zarahome.net/8/photos4{path}/{id_media}2.jpg") all_images = "\n".join(all_images) components = product_json["products"][0]["detail"]["compositionDetail"] if components: components = components["parts"] else: components = product_json["products"][0]["detail"]["colors"][0]["compositionDetail"] if components: components = components["parts"] else: components = {} if components != {}: composition = extract_components_zarahome(components) composition = "\n".join(composition) else: composition = "" care = [] for part in product_json["products"][0]["detail"]["care"]: care_description = part["description"] care.append(care_description) care = "\n".join(care) traceability = [] for part in product_json["products"][0]["detail"]["colors"][0]["traceability"].values(): if type(part) is dict and part.get("country") and part.get("name"): traceability_name = part["name"] traceability_country = "\n".join(part["country"]) traceability.append(f"{traceability_name}\n{traceability_country}") traceability = "\n".join(traceability) number = 0 for i, color in enumerate(product_json["products"][0]["detail"]["colors"]): if color["image"] == None: continue color_code = color["id"] current_images = [] for location in product_json["products"][0]["detail"]["xmedia"]: if location["colorCode"] == color_code: path = location["path"] for media in location["xmediaItems"][0]["medias"]: id_media = media["idMedia"] current_images.append( f"https://static.zarahome.net/8/photos4{path}/{id_media}2.jpg") break current_images = "\n".join(current_images) color_url = color["image"]["url"] color_image = f"https://static.zarahome.net/8/photos4{color_url}_3_1_5.jpg" color_name = color["name"] for e, size in enumerate(color["sizes"]): if size["visibilityValue"] != "SHOW": continue article_number = "" if number == 0 else f"-{number}" size_weight = size["weight"] size_name = size["name"] size_description = size["description"] size_full_name = f"{size_description} ({size_name})" if size_description else size_name size_buy_price = int(size["price"]) / 100 table_data.append([ url, f"{article}{article_number}", name, description, color_image, color_name, size_full_name, size_buy_price, size_weight, all_images, current_images, composition, care, traceability, category_path, "ZARAHOME" ]) number += 1 table += table_data.copy() except Exception as error: print(f"Extractor Error: {error}") csv_name = category.split("/")[-1] recorder.record(csv_name, table) def get_extractor(): with open(abspath("parse_settings.json"), "r", encoding="utf-8") as file: return Extractor(load(file))