You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
auto/ai_news/ai_news.py

425 lines
16 KiB

# -*- coding: utf-8 -*-
import os
import re
import json
import uuid
import httpx
import asyncio
import datetime
from bs4 import BeautifulSoup
from ollama import Client as oClient
from openai import OpenAI
from playwright.async_api import async_playwright
from matrix_client.client import MatrixClient
from matrix_client.api import MatrixHttpApi
# Config sections (keys in config.json) to process on each run.
key_list = ['web3']
# When truthy, each page's text is sent to the AI individually;
# when falsy, all pages are concatenated into one prompt.
text_batch = 1
class FREEAI(object):
    """Thin wrapper around an OpenAI-compatible chat-completion endpoint."""

    def call_ai(self, message):
        """Send *message* as a single user turn and return the reply text.

        Returns None when the API call fails (the error is printed rather
        than re-raised), so callers must handle a None result.
        """
        try:
            # SECURITY: the API key should come from the environment, not
            # source control.  The literal below is kept only as a
            # backward-compatible fallback -- rotate it and remove it.
            client = OpenAI(
                api_key=os.environ.get(
                    "KNOX_API_KEY",
                    "sk-rM32T5VuyyCFyZGyEe006aEdFe6e4301A7627f7a3973Df17",
                ),
                base_url="https://knox.chat/v1",
            )
            completion = client.chat.completions.create(
                model="deepseek-ai/DeepSeek-R1-Distill-Llama-70B-free",
                messages=[{"role": "user", "content": f"{message}"}],
                temperature=0.3,
            )
            return completion.choices[0].message.content
        except Exception as e:
            print(e)
            return None
class OllamaChat(object):
    """Small helper for chatting with a model served by an Ollama host."""

    def __init__(self, host='http://192.168.31.28:11434'):
        self.host = host

    def call_ollama(self, role, text, prompt_words, model='llava:13b', temperature=0.4):
        """Send *text* followed by *prompt_words* to the model and return its
        reply text, or None if the request fails.
        """
        combined = text + '\n\n' + prompt_words
        print(f'use model: {model}')
        chat_messages = [
            {'role': 'system', 'content': role},
            {'role': 'user', 'content': combined},
        ]
        try:
            reply = oClient(host=self.host).chat(
                model=model,
                messages=chat_messages,
                options={"temperature": temperature},
                stream=False,
            )
            return reply['message']['content']
        except Exception as e:
            print(f"\n发生错误: {e}")
            return None
class MatrixBot:
    """Minimal Matrix client: logs in once at construction and posts
    messages to a single room.

    Parameters:
        user, password: Matrix account credentials.
        key: the target room id that messages are sent to.
    """

    def __init__(self, user, password, key):
        self.base_url = "https://matrix.erhe.top"
        self.user = user
        self.password = password
        self.client = MatrixClient("https://matrix.erhe.top")
        self.token = self.login()
        self.to = key

    def login(self):
        """Log in and cache the access token; returns the token."""
        self.token = self.client.login(username=self.user, password=self.password)
        return self.token

    def send_message(self, message):
        """Send *message* to the configured room (best-effort)."""
        if not self.token:
            print("Bot is not logged in. Please login first.")
            return
        try:
            api = MatrixHttpApi(self.base_url, token=self.token)
            api.send_message(self.to, message)
        except Exception as e:
            print(e)
            # Bug fix: the original retried the send in the except branch
            # without any guard, so a second failure crashed the caller.
            # Reporting the error into the room stays best-effort.
            try:
                MatrixHttpApi(self.base_url, token=self.token).send_message(self.to, str(e))
            except Exception as report_err:
                print(report_err)
class AINEWS:
    """Fetch target web pages, optionally summarize them with an AI model,
    and forward the summaries through a Matrix bot."""

    # Example section written into a freshly created config.json.
    # Bug fix: 'use_ai' is included because main() reads it; the original
    # example config lacked it and caused a KeyError on first real run.
    DEFAULT_CONFIG = {
        "example": {
            "use_browser": 0,
            "use_ai": 0,
            "ai_host": 'http://127.0.0.1:11434(不需要此功能留空)',
            "message_bot_key": '填入matrix的key(不需要此功能留空)',
            "target_url_list": ['目标网站'],
            "role": "AI的角色, 例如: 你是一个聊天机器人",
            "prompt_words": "提示词: 帮我总结, 用中文回复"
        }
    }

    def _config_path(self):
        """Absolute path of config.json next to this file."""
        return os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')

    def create_config_if_not_exists(self):
        """Create config.json with the example section if it is missing."""
        config_path = self._config_path()
        if not os.path.exists(config_path):
            with open(config_path, 'w', encoding='utf-8') as f:
                json.dump(self.DEFAULT_CONFIG, f, indent=4)
            print(f"Created {config_path} with default configuration.")

    def mkdir_save_data(self):
        """Create save_data/<timestamp>/ next to this file; return its path."""
        current_file_path = os.path.dirname(__file__)
        save_file_path = os.path.join(current_file_path, 'save_data')
        # os.makedirs(..., exist_ok=True) replaces the exists()+makedirs pair
        os.makedirs(save_file_path, exist_ok=True)
        datetime_file_name = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
        datetime_file_path = os.path.join(save_file_path, datetime_file_name)
        os.makedirs(datetime_file_path, exist_ok=True)
        return datetime_file_path

    def save_to_txt(self, url_to_text, datetime_file_path):
        """Dump the scraped {url: text} mapping to all_page_data.txt.

        Bug fix: the original shadowed the path variable with the file
        handle ('file' was both the path and the open file object).
        """
        out_path = os.path.join(datetime_file_path, 'all_page_data.txt')
        with open(out_path, 'w', encoding='utf-8') as fh:
            fh.write(str(url_to_text))

    # region load config
    def load_config(self, key):
        """Return the config section for *key* from config.json.

        When no usable config exists, writes the default example and exits.
        Bug fix: the original checked a cwd-relative 'config.json' but wrote
        an absolute one, so behaviour depended on the working directory.
        """
        config_path = self._config_path()
        if not os.path.exists(config_path):
            self.create_config_if_not_exists()
            exit(0)
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
        if not config:
            # File exists but is empty -- nothing to run with.
            print(f"{config_path} is empty; fill it in before running.")
            exit(0)
        return config[key]
    # endregion

    # region fetch pages with httpx
    async def get_htmls(self, urls):
        """Fetch *urls* concurrently with httpx and return {url: cleaned_text}.

        A failed fetch maps its url to an empty string.
        """
        async with httpx.AsyncClient() as client:
            async def get_html(url):
                try:
                    print(f'正在打开: {url}')
                    response = await client.get(url)
                    response.raise_for_status()  # raise on non-2xx status
                    # extract visible text only
                    soup = BeautifulSoup(response.text, 'html.parser')
                    text = soup.get_text(separator=' ', strip=True)
                    # collapse runs of whitespace into single spaces
                    cleaned_text = re.sub(r'\s+', ' ', text).strip()
                    return url, cleaned_text
                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    return url, ""
            # fetch every url concurrently
            tasks = [get_html(url) for url in urls]
            results = await asyncio.gather(*tasks)
        return {url: text for url, text in results}
    # endregion

    # region fetch pages with Playwright
    async def get_htmls_with_browser(self, urls, datetime_file_path):
        """Fetch *urls* in headless Chromium (for JS-rendered pages),
        screenshot each page into *datetime_file_path*, and return
        {url: cleaned_text}.  A failed fetch maps its url to "".
        """
        url_to_text = {}
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()

            async def get_html(url):
                try:
                    print(f'正在打开: {url}')
                    page = await context.new_page()
                    await page.goto(url)
                    # neutralize alert/confirm/prompt so they cannot block
                    await self.disable_dialogs(page)
                    # blank out images to save bandwidth
                    await self.disable_images(page)
                    # scroll through the page to trigger lazy-loaded content
                    await self.scroll_to_percentage(page)
                    # keep a full-page screenshot as a side product
                    await self.screenshot(page, datetime_file_path)
                    html = await page.content()
                    soup = BeautifulSoup(html, 'html.parser')
                    text = soup.get_text(separator=' ', strip=True)
                    cleaned_text = re.sub(r'\s+', ' ', text).strip()
                    await page.close()
                    return url, cleaned_text
                except Exception as e:
                    print(f"Error fetching {url}: {e}")
                    return url, ""

            tasks = [get_html(url) for url in urls]
            results = await asyncio.gather(*tasks)
            url_to_text = {url: text for url, text in results}
            await context.close()
            await browser.close()
        return url_to_text
    # endregion

    # region scroll the page
    @staticmethod
    async def scroll_to_percentage(page):
        """Smooth-scroll down the page in 2% steps, then back to the top,
        giving lazily loaded content a chance to render."""
        title = await page.title()
        print(f'正在滚动浏览器页面: {title}')
        for percentage in range(5, 101, 2):
            # recompute height each step: it can grow as content loads
            height = await page.evaluate("() => document.body.scrollHeight")
            scroll_position = height * (percentage / 100)
            await page.evaluate(f"window.scrollTo({{top: {scroll_position}, behavior: 'smooth'}})")
            await asyncio.sleep(0.5)  # async sleep keeps the loop responsive
        await page.evaluate("window.scrollTo({top: 0, behavior: 'smooth'})")
    # endregion

    # region page screenshot
    @staticmethod
    async def screenshot(page, datetime_file_path):
        """Save a full-page screenshot named after the (sanitized) page title."""
        title = await page.title()
        # strip characters that are illegal in filenames; fall back to a default
        cleaned_title = re.sub(r'[\\/:*?"<>|]', '', title) or "untitled"
        # short uuid suffix avoids collisions between same-titled pages
        screenshot_path = os.path.join(datetime_file_path, f"{cleaned_title}_{uuid.uuid4().hex[:6]}.png")
        await page.screenshot(path=screenshot_path, full_page=True)
        print(f"截图已保存到: {screenshot_path}")
    # endregion

    # region hide page images
    async def disable_images(self, page):
        """Blank out and hide every <img> currently in the DOM."""
        await page.evaluate('''() => {
            function disableImages() {
                // 阻止所有图片加载
                document.querySelectorAll('img').forEach(img => {
                    img.src = ''; // 清空 src 属性
                    img.removeAttribute('srcset'); // 移除 srcset 属性(如果有)
                });
                // 隐藏所有图片
                document.querySelectorAll('img').forEach(img => {
                    img.style.display = 'none';
                });
            }
            disableImages(); // 调用函数
        }''')
    # endregion

    # region disable JS dialogs
    async def disable_dialogs(self, page):
        """Override window.alert/confirm/prompt so dialogs can never block."""
        await page.evaluate('''() => {
            window.alert = () => {};
            window.confirm = () => true; // confirm 默认返回 true
            window.prompt = () => null; // prompt 默认返回 null
        }''')
    # endregion

    # region AI processing
    def process_data(self, result_text, prompt_words, role, ai_host):
        """Run the scraped texts through the AI; return messages ready to send.

        With module-level ``text_batch`` truthy, each page is summarized
        separately via the hosted FREEAI endpoint (NOTE(review): that path
        currently ignores *role* and *prompt_words* -- confirm intended).
        Otherwise all pages are concatenated and sent to the Ollama host in
        one request.
        """
        process_send = []
        if text_batch:
            for url, page_text in result_text.items():
                response_context = FREEAI().call_ai(page_text)
                if response_context:
                    process_send.append(f'{url}\n{response_context}\n')
        else:
            combined = ''.join(f'{k}\n{v}\n' for k, v in result_text.items())
            response_context = OllamaChat(ai_host).call_ollama(role, combined, prompt_words)
            if response_context:
                process_send.append(response_context)
        return process_send
    # endregion

    # region main
    def main(self, config, datetime_file_path=None):
        """Scrape, save, and (optionally) summarize-and-send one config section.

        ``datetime_file_path`` is the output folder for screenshots/text.
        Bug fix: the original silently read a module-level global; the new
        optional parameter keeps that behaviour as a fallback while letting
        callers pass the folder explicitly.
        """
        if datetime_file_path is None:
            # Backward compatible: the script entry point stores the folder
            # in a module-level global of the same name.
            datetime_file_path = globals().get('datetime_file_path') or self.mkdir_save_data()
        target_url_list = config['target_url_list']
        prompt_words = config['prompt_words']
        role = config['role']
        use_browser = config['use_browser']
        ai_host = config['ai_host']
        message_bot_key = config['message_bot_key']
        # Robustness: older config files may predate the 'use_ai' key.
        use_ai = config.get('use_ai', 0)
        # fetch every page's text, with or without a real browser
        if use_browser:
            result_text = asyncio.run(self.get_htmls_with_browser(target_url_list, datetime_file_path))
        else:
            result_text = asyncio.run(self.get_htmls(target_url_list))
        if not result_text:
            print('无数据, 程序退出')
            exit(0)
        print(f'共获取 {len(result_text)} 个网址的数据')
        self.save_to_txt(result_text, datetime_file_path)
        print(f'{role}\n{prompt_words}')
        if use_ai:
            process_send = self.process_data(result_text, prompt_words, role, ai_host)
            # SECURITY: credentials are hard-coded; move them to config/env.
            bot = MatrixBot('message-bot', 'aaaAAA111!!!', message_bot_key)
            for process_text in process_send:
                bot.send_message(process_text)
    # endregion
if __name__ == "__main__":
    # Script entry point: ensure a config exists, create one output folder
    # for the whole run, then process every configured keyword section.
    news = AINEWS()
    news.create_config_if_not_exists()
    # NOTE: AINEWS.main reads this module-level name; it must stay global.
    datetime_file_path = news.mkdir_save_data()
    for key in key_list:
        section = news.load_config(key)
        target_url_list = section['target_url_list']
        print(f'关键词 {key} 共有 {len(target_url_list)} 个网址')
        news.main(section)
    print('done!')