# -*- coding: utf-8 -*-
"""Crawl configured news sites, summarize their text with an AI model,
and forward the summaries to a Matrix room.

Pipeline (see ``__main__``): load per-keyword config from config.json,
fetch pages (plain httpx or headless Chromium), save the raw text, run the
AI summarizer, and push each summary through a Matrix bot.
"""
import os
import re
import json
import uuid
import httpx
import asyncio
import datetime
from bs4 import BeautifulSoup
from ollama import Client as oClient
from openai import OpenAI
from playwright.async_api import async_playwright
from matrix_client.client import MatrixClient
from matrix_client.api import MatrixHttpApi

# Names of the config.json sections to process.
key_list = ['web3']
# Truthy: summarize each page separately; falsy: one combined summary per run.
text_batch = 1


class FREEAI(object):
    """Client for an OpenAI-compatible hosted chat endpoint."""

    def call_ai(self, message):
        """Send *message* to the remote model and return the reply text.

        Returns None when the request fails (callers treat a falsy result
        as "skip this page").
        """
        try:
            # SECURITY NOTE(review): API key is hardcoded in source control —
            # move it to an environment variable or a secrets store.
            client = OpenAI(
                api_key="sk-rM32T5VuyyCFyZGyEe006aEdFe6e4301A7627f7a3973Df17",
                base_url="https://knox.chat/v1",
            )
            completion = client.chat.completions.create(
                model="deepseek-ai/DeepSeek-R1-Distill-Llama-70B-free",
                messages=[{"role": "user", "content": f"{message}"}],
                temperature=0.3,
            )
            return completion.choices[0].message.content
        except Exception as e:
            # Best-effort: log and return None explicitly (was an implicit None).
            print(e)
            return None


class OllamaChat(object):
    """Client for a local/LAN Ollama server."""

    def __init__(self, host='http://192.168.31.28:11434'):
        # Base URL of the Ollama HTTP API.
        self.host = host

    def call_ollama(self, role, text, prompt_words, model='llava:13b', temperature=0.4):
        """Run one chat completion against the Ollama host.

        role         -- system prompt describing the assistant persona
        text         -- page text to process
        prompt_words -- instruction appended after the text
        Returns the reply text, or None on error.
        """
        message = text + '\n\n' + prompt_words
        print(f'use model: {model}')
        try:
            response_iter = oClient(host=self.host).chat(
                model=model,
                messages=[
                    {'role': 'system', 'content': role},
                    {'role': 'user', 'content': message},
                ],
                options={"temperature": temperature},
                stream=False,
            )
            return response_iter['message']['content']
        except Exception as e:
            print(f"\n发生错误: {e}")
            return None


class MatrixBot:
    """Minimal Matrix client: logs in once, then sends text messages to one room."""

    def __init__(self, user, password, key):
        self.base_url = "https://matrix.erhe.top"
        self.user = user
        self.password = password
        self.client = MatrixClient("https://matrix.erhe.top")
        self.token = self.login()
        # Target room identifier (the "key" from config).
        self.to = key

    def login(self):
        """Authenticate against the homeserver and cache the access token."""
        self.token = self.client.login(username=self.user, password=self.password)
        return self.token

    def send_message(self, message):
        """Send *message* to the configured room.

        On failure, best-effort: report the error text into the room itself.
        """
        if not self.token:
            print("Bot is not logged in. Please login first.")
            return
        try:
            api = MatrixHttpApi(self.base_url, token=self.token)
            api.send_message(self.to, message)
        except Exception as e:
            print(e)
            # ROBUSTNESS FIX: the error report is itself a network call; guard
            # it so a second failure cannot crash the caller's send loop.
            try:
                api = MatrixHttpApi(self.base_url, token=self.token)
                api.send_message(self.to, str(e))
            except Exception as report_err:
                print(report_err)


class AINEWS:
    """Orchestrates crawling, saving, summarizing and message delivery."""

    @staticmethod
    def _config_path():
        # config.json lives next to this script, never in the CWD.
        return os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')

    @staticmethod
    def _write_default_config(config_path):
        # Write an example section the user can copy and edit.
        # (Shared by create_config_if_not_exists and load_config — the
        # original duplicated this literal in both places.)
        default_config = {
            "example": {
                "use_browser": 0,
                "ai_host": 'http://127.0.0.1:11434(不需要此功能留空)',
                "message_bot_key": '填入matrix的key(不需要此功能留空)',
                "target_url_list": ['目标网站'],
                "role": "AI的角色, 例如: 你是一个聊天机器人",
                "prompt_words": "提示词: 帮我总结, 用中文回复"
            }
        }
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(default_config, f, indent=4)
        print(f"Created {config_path} with default configuration.")

    def create_config_if_not_exists(self):
        """Create a default config.json next to the script if it is missing."""
        config_path = self._config_path()
        if not os.path.exists(config_path):
            self._write_default_config(config_path)

    def mkdir_save_data(self):
        """Create save_data/<timestamp>/ next to the script and return its path."""
        save_file_path = os.path.join(os.path.dirname(__file__), 'save_data')
        datetime_file_name = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
        datetime_file_path = os.path.join(save_file_path, datetime_file_name)
        # exist_ok also creates the save_data parent in one call.
        os.makedirs(datetime_file_path, exist_ok=True)
        return datetime_file_path

    def save_to_txt(self, url_to_text, datetime_file_path):
        """Persist the raw scraped {url: text} mapping as plain text."""
        out_path = os.path.join(datetime_file_path, 'all_page_data.txt')
        # FIX: the original rebound the name `file` (path -> file object).
        with open(out_path, 'w', encoding='utf-8') as f:
            f.write(str(url_to_text))

    # region Config loading
    def load_config(self, key):
        """Return the config section *key* from the script-relative config.json.

        When no usable config exists, writes the default example (if the file
        is missing) and exits so the user can edit it.
        Raises KeyError when *key* is not a section of the config.
        """
        config_path = self._config_path()
        config = {}
        # BUGFIX: the original read 'config.json' relative to the CWD but
        # created/checked it next to the script; use one path for both.
        if os.path.exists(config_path):
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        if not config:
            if not os.path.exists(config_path):
                self._write_default_config(config_path)
            exit(0)
        return config[key]
    # endregion

    # region Fetch pages with httpx
    async def get_htmls(self, urls):
        """Fetch *urls* concurrently with httpx and return {url: plain_text}.

        Failed fetches map to an empty string instead of raising.
        """
        async with httpx.AsyncClient() as client:
            async def get_html(url):
                try:
                    print(f'正在打开: {url}')
                    response = await client.get(url)
                    response.raise_for_status()
                    # Strip markup and collapse whitespace to a single space.
                    soup = BeautifulSoup(response.text, 'html.parser')
                    text = soup.get_text(separator=' ', strip=True)
                    cleaned_text = re.sub(r'\s+', ' ', text).strip()
                    return url, cleaned_text
                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    return url, ""

            tasks = [get_html(url) for url in urls]
            results = await asyncio.gather(*tasks)
            return {url: text for url, text in results}
    # endregion

    # region Fetch pages with Playwright
    async def get_htmls_with_browser(self, urls, datetime_file_path):
        """Render *urls* in headless Chromium and return {url: plain_text}.

        Also writes a full-page screenshot of each page into
        *datetime_file_path*. Failed pages map to an empty string.
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()

            async def get_html(url):
                try:
                    print(f'正在打开: {url}')
                    page = await context.new_page()
                    await page.goto(url)
                    await self.disable_dialogs(page)       # neutralize alert/confirm/prompt
                    await self.disable_images(page)        # drop images for speed
                    await self.scroll_to_percentage(page)  # trigger lazy-loaded content
                    await self.screenshot(page, datetime_file_path)
                    html = await page.content()
                    soup = BeautifulSoup(html, 'html.parser')
                    text = soup.get_text(separator=' ', strip=True)
                    cleaned_text = re.sub(r'\s+', ' ', text).strip()
                    await page.close()
                    return url, cleaned_text
                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    return url, ""

            tasks = [get_html(url) for url in urls]
            results = await asyncio.gather(*tasks)
            url_to_text = {url: text for url, text in results}
            await context.close()
            await browser.close()
        return url_to_text
    # endregion

    # region Scroll the page
    @staticmethod
    async def scroll_to_percentage(page):
        """Smooth-scroll the page in 2% steps (5%..100%), then back to the top."""
        title = await page.title()
        print(f'正在滚动浏览器页面: {title}')
        for percentage in range(5, 101, 2):
            # Recompute the height each step: lazy content can grow the page.
            height = await page.evaluate("() => document.body.scrollHeight")
            scroll_position = height * (percentage / 100)
            await page.evaluate(f"window.scrollTo({{top: {scroll_position}, behavior: 'smooth'}})")
            await asyncio.sleep(0.5)
        await page.evaluate("window.scrollTo({top: 0, behavior: 'smooth'})")
    # endregion

    # region Page screenshot
    @staticmethod
    async def screenshot(page, datetime_file_path):
        """Save a full-page screenshot named after the (sanitized) page title."""
        title = await page.title()
        # Strip characters that are illegal in file names.
        cleaned_title = re.sub(r'[\\/:*?"<>|]', '', title)
        if not cleaned_title:
            cleaned_title = "untitled"
        # Short uuid suffix avoids collisions between same-titled pages.
        screenshot_path = os.path.join(
            datetime_file_path, f"{cleaned_title}_{uuid.uuid4().hex[:6]}.png")
        await page.screenshot(path=screenshot_path, full_page=True)
        print(f"截图已保存到: {screenshot_path}")
    # endregion

    # region Block images on the page
    async def disable_images(self, page):
        """Blank and hide every <img> already in the DOM (saves bandwidth/render time)."""
        await page.evaluate('''() => {
            function disableImages() {
                // 阻止所有图片加载
                document.querySelectorAll('img').forEach(img => {
                    img.src = '';  // 清空 src 属性
                    img.removeAttribute('srcset');  // 移除 srcset 属性(如果有)
                });
                // 隐藏所有图片
                document.querySelectorAll('img').forEach(img => {
                    img.style.display = 'none';
                });
            }
            disableImages();  // 调用函数
        }''')
    # endregion

    # region Disable JavaScript dialogs
    async def disable_dialogs(self, page):
        """Override alert/confirm/prompt so pages cannot block the crawl."""
        await page.evaluate('''() => {
            window.alert = () => {};
            window.confirm = () => true;  // confirm 默认返回 true
            window.prompt = () => null;  // prompt 默认返回 null
        }''')
    # endregion

    # region AI processing
    def process_data(self, result_text, prompt_words, role, ai_host):
        """Summarize scraped text and return the list of messages to send.

        text_batch truthy: one AI call per page (currently via FREEAI, which
        ignores *role*/*prompt_words*); falsy: one Ollama call over the
        concatenation of all pages.
        """
        process_send = []
        O = OllamaChat(ai_host)
        if text_batch:
            for k, v in result_text.items():
                response_context = FREEAI().call_ai(v)
                # response_context = O.call_ollama(role, v, prompt_words)
                if response_context:
                    process_send.append(f'{k}\n{response_context}\n')
        else:
            combined = ''.join(f'{k}\n{v}\n' for k, v in result_text.items())
            response_context = O.call_ollama(role, combined, prompt_words)
            if response_context:
                process_send.append(response_context)
        return process_send
    # endregion

    # region Main pipeline
    def main(self, config, save_dir=None):
        """Run the full pipeline for one config section.

        save_dir -- output directory for text/screenshots; when None, falls
        back to the module-level ``datetime_file_path`` set in ``__main__``
        (kept for backward compatibility with existing callers).
        """
        target_url_list = config['target_url_list']
        prompt_words = config['prompt_words']
        role = config['role']
        use_browser = config['use_browser']
        ai_host = config['ai_host']
        message_bot_key = config['message_bot_key']
        # ROBUSTNESS FIX: the generated example config has no 'use_ai' key;
        # default to off instead of raising KeyError.
        use_ai = config.get('use_ai', 0)
        out_dir = save_dir if save_dir is not None else datetime_file_path
        # Fetch every page's text.
        if use_browser:
            result_text = asyncio.run(self.get_htmls_with_browser(target_url_list, out_dir))
        else:
            result_text = asyncio.run(self.get_htmls(target_url_list))
        # Persist the raw text (or bail out when nothing was fetched).
        if result_text:
            print(f'共获取 {len(result_text)} 个网址的数据')
            self.save_to_txt(result_text, out_dir)
        else:
            print('无数据, 程序退出')
            exit(0)
        print(f'{role}\n{prompt_words}')
        if use_ai:
            process_send = self.process_data(result_text, prompt_words, role, ai_host)
            # SECURITY NOTE(review): hardcoded Matrix credentials — move to
            # environment variables or the config file.
            bot = MatrixBot('message-bot', 'aaaAAA111!!!', message_bot_key)
            for process_text in process_send:
                bot.send_message(process_text)
    # endregion


if __name__ == "__main__":
    ainews = AINEWS()
    ainews.create_config_if_not_exists()
    datetime_file_path = ainews.mkdir_save_data()
    for key in key_list:
        config = ainews.load_config(key)
        target_url_list = config['target_url_list']
        print(f'关键词 {key} 共有 {len(target_url_list)} 个网址')
        ainews.main(config, datetime_file_path)
    print('done!')