# -*- coding: utf-8 -*-
"""
Base for the message module: opens a browser and performs related operations
(visiting pages and extracting text from them).
"""
import random
from playwright.sync_api import sync_playwright
import sys
import os
import time

# Make the project root (the path up to and including the first 'auto'
# component) importable so that the `utils` package below resolves.
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
from utils.utils_logs_handle import LogsHandle


class CryptoCrawler:
    """Visit a set of URLs with a WebKit browser and extract text by selector.

    Attributes:
        url_list: Iterable of dicts; each dict maps a key to the URL to visit.
        selectors: Iterable of selectors passed to ``page.query_selector``.
        check_difference: Switch for detecting whether the data has changed
            (stored only; not read by this class).
        data_difference: Result of the change detection (defaults to False;
            not updated by this class).
        logs_handle: ``LogsHandle`` instance used to record errors.
        db: Database name, ``'CHECK'``.
        collection: Collection name passed to ``logs_handle.logs_write``.
        headless: Whether the browser is launched headless.
        proxy: Falsy for a direct connection, ``True`` to use
            ``DEFAULT_PROXY_SERVER``, or a string naming the proxy server.
    """

    # Proxy used when ``proxy`` is truthy but is not itself a server string.
    DEFAULT_PROXY_SERVER = '127.0.0.1:7890'

    def __init__(self, url_list, selectors, check_difference=False, headless=True, proxy=False):
        self.url_list = url_list
        self.selectors = selectors
        self.check_difference = check_difference  # switch: detect whether data changed
        self.data_difference = False  # result: whether data changed (default: no)
        self.logs_handle = LogsHandle()  # writes log records
        self.db = 'CHECK'
        self.collection = 'check'
        self.headless = headless
        self.proxy = proxy

    def _launch_browser(self, playwright):
        """Launch WebKit, routing through the configured proxy when enabled."""
        if self.proxy:
            if isinstance(self.proxy, str):
                server = self.proxy
            else:
                server = self.DEFAULT_PROXY_SERVER
            return playwright.webkit.launch(headless=self.headless, proxy={'server': server})
        return playwright.webkit.launch(headless=self.headless)

    def _scrape_page(self, page, key, url):
        """Open ``url`` and return a list of ``{key: text}`` for matched selectors."""
        page.goto(url)
        page.wait_for_load_state('load')
        time.sleep(5)  # give the page time to finish rendering after 'load'

        result_list = []
        for selector in self.selectors:
            element = page.query_selector(selector)
            if not element:
                continue
            text = element.text_content()
            if text is None:
                # text_content() may return None; skip this selector instead
                # of raising and losing the results already collected.
                continue
            result_list.append({key: text.strip()})
        return result_list

    def main(self):
        """Crawl every URL in ``url_list`` and collect the selected text.

        A failure on one URL is logged through ``logs_handle`` and that URL
        is skipped; the remaining URLs are still processed.

        Returns:
            A list with one entry per URL that yielded results, each entry
            being a list of ``{key: text}`` dicts; ``None`` when nothing was
            collected.
        """
        all_data = []
        with sync_playwright() as playwright:
            browser = self._launch_browser(playwright)
            try:
                context = browser.new_context(viewport={'width': 1920, 'height': 1080})
                page = context.new_page()

                for url_info in self.url_list:
                    for key, url in url_info.items():
                        try:
                            result_list = self._scrape_page(page, key, url)
                        except Exception as e:
                            err_str = f"Error fetching {url}: {e}"
                            self.logs_handle.logs_write(self.collection, err_str, 'error', False)
                            continue

                        if result_list:
                            all_data.append(result_list)

                        # Random pause between successful page visits.
                        time.sleep(random.randint(1, 3))
            finally:
                # Close the browser even if setup or crawling raised.
                browser.close()

        return all_data or None