# -*- coding: utf-8 -*-
"""Read RSS feed URLs from an OPML file, fetch their news items concurrently,
and search the collected items by keyword."""
import os
import re
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import fromstring, ParseError
import asyncio


class OPMLParser:
    def __init__(self, file_path):
        """
        Initialize the OPML parser.

        :param file_path: path to the OPML file
        """
        self.file_path = file_path
        self.data = None  # parsed outline tree, populated by parse()

    def clean_string(self, input_string):
        """
        Remove illegal characters from a string.

        Only word characters (letters, digits, underscore) are kept;
        everything else — including spaces — is stripped out.
        """
        return re.sub(r'[^\w]', '', input_string)

    def parse(self):
        """
        Parse the OPML file starting from its <body> node.

        :return: flat list of feed dicts (each with "title" and, when present,
                 "xmlUrl") taken from the second outline level
                 (body -> category -> feed).
        :raises ValueError: if the file contains no <body> node.
        """
        tree = ET.parse(self.file_path)
        root = tree.getroot()
        body = root.find(".//body")
        if body is None:
            raise ValueError("OPML文件中未找到body节点!")
        self.data = self._parse_outline(body)
        # Flatten two levels: category outlines -> feed outlines.
        # .get() avoids a KeyError on a body/category without children.
        result = []
        for category in self.data.get('children', []):
            for feed in category.get('children', []):
                result.append(feed)
        return result

    def _parse_outline(self, element):
        """Recursively convert an <outline> element (and its children) to a dict."""
        item = {
            "title": self.clean_string(element.get("text")) if element.get("text") else '',
            "xmlUrl": element.get("xmlUrl"),
        }
        # Drop keys whose value is None (category nodes carry no xmlUrl).
        item = {k: v for k, v in item.items() if v is not None}
        children = [self._parse_outline(child) for child in element]
        if children:
            item["children"] = children
        return item

    def get_data(self):
        """Return the raw parsed outline tree; parse() must be called first."""
        if self.data is None:
            raise ValueError("尚未解析数据,请先调用 parse 方法!")
        return self.data


class GetNews:
    def __init__(self, parsed_data):
        """
        Initialize the news fetcher.

        :param parsed_data: list of feed dicts produced by OPMLParser.parse()
        """
        self.parsed_data = parsed_data

    async def fetch_news(self, url):
        """
        Fetch a single RSS feed and parse its <item> entries.

        :param url: RSS feed URL
        :return: list of {"title", "link", "description"} dicts; an empty list
                 on any request error, non-200 status, or XML parse error.
        """
        # Local import so the OPML-parsing/search utilities stay usable
        # even when httpx is not installed.
        import httpx
        try:
            # Explicit timeout so a stalled feed cannot block the whole run.
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url)
                if response.status_code != 200:
                    return []
                try:
                    root = fromstring(response.text)
                except ParseError:
                    return []
                news_list = []
                for item in root.findall(".//item"):
                    # find() each tag exactly once; missing tags fall back
                    # to the original placeholder strings.
                    title_el = item.find("title")
                    link_el = item.find("link")
                    desc_el = item.find("description")
                    news_list.append({
                        "title": self.clean_text(title_el.text) if title_el is not None else "无标题",
                        "link": self.clean_text(link_el.text) if link_el is not None else "无链接",
                        "description": self.clean_text(desc_el.text) if desc_el is not None else "无描述",
                    })
                return news_list
        except httpx.RequestError:
            return []

    def clean_text(self, text):
        """Strip HTML tags and collapse whitespace; returns '' for falsy input."""
        if not text:
            return ""
        stripped = re.sub(r'<.*?>', '', text)
        return re.sub(r'\s+', ' ', stripped).strip()

    async def get_all_news(self):
        """
        Fetch every feed that has an xmlUrl, concurrently.

        :return: list of per-feed news lists, in the same order as the feeds.
        """
        tasks = [
            self.fetch_news(feed["xmlUrl"])
            for feed in self.parsed_data
            if feed.get("xmlUrl")
        ]
        return await asyncio.gather(*tasks)


class SearchByKeyword:
    def __init__(self, data):
        """:param data: list of news dicts, each with at least a "title" key."""
        self.data = data

    def search(self, keyword):
        """
        Case-insensitively search item titles for *keyword*.

        :return: {keyword: [matching items]}; empty dict when nothing matches.
        """
        needle = keyword.lower()
        result = {}
        for item in self.data:
            if needle in item['title'].lower():
                # BUGFIX: the old code tested `keyword.lower() not in result`
                # but stored under the original `keyword`, so any mixed-case
                # keyword reset the list on every match (only the last item
                # survived). setdefault keys consistently on `keyword`.
                result.setdefault(keyword, []).append(item)
        return result


# Usage example
if __name__ == "__main__":
    opml_file_path = "read_news.opml"
    opml_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), opml_file_path)
    if not os.path.exists(opml_file_path):
        print(f"文件 {opml_file_path} 不存在!")
        exit(1)
    parser = OPMLParser(opml_file_path)
    parsed_data = parser.parse()
    print(f'一共有 {len(parsed_data)} 个订阅源')
    get_news = GetNews(parsed_data)
    # asyncio.run replaces the deprecated get_event_loop()/run_until_complete.
    all_news = asyncio.run(get_news.get_all_news())
    # Flatten, skipping feeds that returned no items.
    valid_data = [news for news_list in all_news if news_list for news in news_list]
    S = SearchByKeyword(valid_data)
    result = S.search('deepseek')
    for keyword, item in result.items():
        print(f'关键词 {keyword} 的新闻有:{len(item)} 条')
        for news in item:
            print(f'标题:{news["title"]}')
            print(f'链接:{news["link"]}')
            print('-' * 200)