# -*- coding: utf-8 -*-
import asyncio
import os
import re
import sys
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import fromstring, ParseError

import httpx
class OPMLParser:
    """Parse an OPML subscription file into a flat list of feed entries."""

    def __init__(self, file_path):
        """
        Initialize the OPML parser.

        :param file_path: path to the OPML file
        """
        self.file_path = file_path
        self.data = None  # nested outline tree, filled in by parse()

    def clean_string(self, input_string):
        """
        Remove every character that is not a word character (letter,
        digit or underscore).  NOTE: this also strips spaces and
        punctuation — callers rely on this historical behavior.
        """
        return re.sub(r'[^\w]', '', input_string)

    def parse(self):
        """
        Parse the OPML file, starting from the <body> node, and return a
        flat list of feed dicts ({"title": ..., "xmlUrl": ...}).

        Feeds nested one level deep (inside a folder outline) and feeds
        placed directly under <body> are both returned.

        :return: list of feed dicts
        :raises ValueError: if the file has no <body> node
        """
        tree = ET.parse(self.file_path)
        root = tree.getroot()

        body = root.find(".//body")
        if body is None:
            raise ValueError("OPML文件中未找到body节点!")

        self.data = self._parse_outline(body)

        result = []
        # Bug fix: the old code did self.data['children'] (KeyError on an
        # empty <body>) and assumed every top-level outline was a folder,
        # silently dropping feeds placed directly under <body>.
        for child in self.data.get('children', []):
            if 'children' in child:
                # Folder: flatten its feed entries into the result.
                result.extend(child['children'])
            elif child.get('xmlUrl'):
                # Feed directly under <body>.
                result.append(child)
        return result

    def _parse_outline(self, element):
        """
        Recursively convert an <outline> (or <body>) element into a dict
        with "title", "xmlUrl" and an optional "children" key.
        """
        item = {
            "title": self.clean_string(element.get("text")) if element.get("text") else '',
            "xmlUrl": element.get("xmlUrl"),
        }
        # Drop keys whose value is None (e.g. folders have no xmlUrl).
        item = {k: v for k, v in item.items() if v is not None}

        children = [self._parse_outline(child) for child in element]
        if children:
            item["children"] = children

        return item

    def get_data(self):
        """
        Return the nested outline tree produced by parse().

        :raises ValueError: if parse() has not been called yet
        """
        if self.data is None:
            raise ValueError("尚未解析数据,请先调用 parse 方法!")
        return self.data
class GetNews:
    """Fetch and parse RSS feeds for a list of subscription entries."""

    def __init__(self, parsed_data):
        """
        Initialize the GetNews class.

        :param parsed_data: list of feed dicts from OPMLParser.parse(),
                            each optionally carrying an "xmlUrl" key
        """
        self.parsed_data = parsed_data

    async def fetch_news(self, url, client=None):
        """
        Asynchronously fetch one RSS URL and parse its XML payload.

        :param url: RSS feed URL
        :param client: optional shared httpx.AsyncClient; a temporary one
                       is created when omitted (backward compatible)
        :return: list of news dicts; empty list on request failure,
                 non-200 status, or malformed XML
        """
        try:
            if client is None:
                async with httpx.AsyncClient() as own_client:
                    return await self._fetch_with(own_client, url)
            return await self._fetch_with(client, url)
        except httpx.RequestError:
            return []  # network failure -> empty result

    async def _fetch_with(self, client, url):
        """Perform the GET and XML parsing on behalf of fetch_news."""
        response = await client.get(url)
        if response.status_code != 200:
            return []  # non-200 status -> empty result

        try:
            root = fromstring(response.text)
        except ParseError:
            return []  # malformed XML -> empty result

        news_list = []
        for item in root.findall(".//item"):
            # Look each child up once (the old code called find() twice
            # per field, once for the None check and once for .text).
            title = item.find("title")
            link = item.find("link")
            description = item.find("description")
            news_list.append({
                "title": self.clean_text(title.text) if title is not None else "无标题",
                "link": self.clean_text(link.text) if link is not None else "无链接",
                "description": self.clean_text(description.text) if description is not None else "无描述",
            })
        return news_list

    def clean_text(self, text):
        """
        Strip HTML tags and collapse runs of whitespace, returning plain
        text.  Returns "" for None/empty input.
        """
        if not text:
            return ""
        # Renamed the local (the old code shadowed the method name).
        without_tags = re.sub(r'<.*?>', '', text)
        return re.sub(r'\s+', ' ', without_tags).strip()

    async def get_all_news(self):
        """
        Concurrently fetch every feed that has an "xmlUrl".

        :return: list of per-feed news lists (one entry per URL, possibly
                 empty), in input order
        """
        urls = [d.get("xmlUrl") for d in self.parsed_data if d.get("xmlUrl")]
        # Share one client across all requests so connections are pooled
        # (the old code opened a fresh client per URL).
        async with httpx.AsyncClient() as client:
            results = await asyncio.gather(
                *(self.fetch_news(url, client) for url in urls)
            )
        return list(results)
class SearchByKeyword:
    """Case-insensitive keyword search over news items by title."""

    def __init__(self, data):
        """
        :param data: list of news dicts, each with a "title" key
        """
        self.data = data

    def search(self, keyword):
        """
        Find items whose title contains the keyword, case-insensitively.

        :param keyword: search term (original casing is used as the
                        result key)
        :return: {keyword: [matching items]}, empty dict when no match
        """
        needle = keyword.lower()
        result = {}
        for item in self.data:
            if needle in item['title'].lower():
                # Bug fix: the old code tested `keyword.lower() not in
                # result` while storing under the original-cased keyword,
                # so a mixed-case keyword re-created the list on every
                # match and only the last hit survived.
                result.setdefault(keyword, []).append(item)
        return result
# 使用示例 (usage example)
if __name__ == "__main__":
    opml_file_path = "read_news.opml"
    # Resolve the OPML file relative to this script, not the CWD.
    opml_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), opml_file_path)

    if not os.path.exists(opml_file_path):
        print(f"文件 {opml_file_path} 不存在!")
        sys.exit(1)

    parser = OPMLParser(opml_file_path)
    parsed_data = parser.parse()

    print(f'一共有 {len(parsed_data)} 个订阅源')

    get_news = GetNews(parsed_data)

    # asyncio.run() replaces the get_event_loop()/run_until_complete()
    # pattern, which is deprecated since Python 3.10.
    all_news = asyncio.run(get_news.get_all_news())

    # Flatten the per-feed lists, skipping feeds that returned nothing.
    valid_data = [news for news_list in all_news if news_list for news in news_list]

    S = SearchByKeyword(valid_data)
    result = S.search('deepseek')
    for keyword, items in result.items():
        print(f'关键词 {keyword} 的新闻有:{len(items)} 条')
        for news in items:
            print(f'标题:{news["title"]}')
            print(f'链接:{news["link"]}')
            print('-' * 200)