"""Fetch trending-news lists from the API at ``url`` and render them.

The script queries every endpoint in ``api_list`` concurrently (in batches),
collects the returned records in ``pending_to_send_data`` and renders them
both as an HTML document and as plain text.
"""
import asyncio
import html
import os
import sys
from datetime import datetime

# Make the project root (the path up to and including 'AutoInfo') importable
# so that ``utils.utils`` resolves when this file is run directly.
sys.path.append(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')

import httpx

from utils.utils import SendEmail

url = "https://60s.erhe.link"
headers = {"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"}

# Not used anywhere in this file; kept so the module namespace is unchanged.
text = ''
n = 0

api_list = [
    "/v2/60s",
    "/v2/douyin",
    "/v2/rednote",
    "/v2/bili",
    "/v2/weibo",
    "/v2/baidu/hot",
    "/v2/baidu/tieba",
    "/v2/toutiao",
    "/v2/zhihu",
    "/v2/hacker-news/top",
    "/v2/hacker-news/new",
    "/v2/hacker-news/best",
]

# Maps a short source name (e.g. 'baidu-hot') to its endpoint path.
api_name_map = {api.replace('/v2/', '').replace('/', '-'): api for api in api_list}

# Shared result store: source name -> list of record dicts. Filled by
# fetch_api() and read by the render functions.
pending_to_send_data = {}

# Source name -> (emoji, label). The HTML renderer shows "emoji label",
# the plain-text renderer shows only the label.
_SOURCE_LABELS = {
    '60s': ('🐯', '每日60秒'),
    'douyin': ('🎵', '抖音热榜'),
    'rednote': ('📕', '小红书'),
    'bili': ('📺', 'B站热门'),
    'weibo': ('🐧', '微博热搜'),
    'baidu-hot': ('🔍', '百度热点'),
    'baidu-tieba': ('💬', '百度贴吧'),
    'toutiao': ('📄', '今日头条'),
    'zhihu': ('📚', '知乎热榜'),
    'hacker-news-top': ('💻', 'Hacker News Top'),
    'hacker-news-new': ('💻', 'Hacker News New'),
    'hacker-news-best': ('💻', 'Hacker News Best'),
}

_HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>每日热点新闻汇总</title>
</head>
<body>
<h1>📰 每日热点新闻汇总</h1>
<p>更新时间: {update_time}</p>
{content}
<p>共 {total_news} 条热点</p>
</body>
</html>
"""


def _extract_records(data):
    """Normalise one endpoint's ``data`` payload into a list of record dicts.

    Two payload shapes are handled:

    * a dict with a ``news`` list and a ``created`` value, which yields
      records with ``title`` and ``active_time`` (set to ``created``);
    * a list of dicts, which yields records with ``title``, ``link`` and
      ``active_time``.

    Entries of an unexpected type are skipped instead of raising, so one odd
    item cannot discard the rest of a source.
    """
    if isinstance(data, dict):
        news_list = data.get('news')
        if not isinstance(news_list, list):
            return []
        created = data.get('created')
        records = []
        for news in news_list:
            # NOTE(review): entries may be plain headline strings rather than
            # dicts with a 'title' key - both are accepted; confirm against
            # the API response.
            if isinstance(news, str):
                title = news
            elif isinstance(news, dict):
                title = news.get('title')
            else:
                continue
            records.append({'title': title, 'active_time': created})
        return records

    if isinstance(data, list):
        return [
            {
                'title': record.get('title'),
                'link': record.get('link'),
                'active_time': record.get('active_time'),
            }
            for record in data
            if isinstance(record, dict)
        ]

    return []


async def fetch_api(client, api_name, api):
    """Fetch one endpoint and append its records to ``pending_to_send_data``.

    Args:
        client: The ``httpx.AsyncClient`` used to issue the request.
        api_name: Key under which the records are stored.
        api: Endpoint path appended to the module-level ``url``.

    Returns:
        None. Failures (request error, non-200 status, undecodable or empty
        payload) are printed and leave ``pending_to_send_data`` untouched.
    """
    source_name = api.replace('/v2/', '').replace('/', '-')
    print(f"Fetching {source_name}")

    try:
        response = await client.get(url + api, headers=headers, timeout=10.0)
    except Exception as e:
        # Deliberately broad: fetching is best-effort and one failing source
        # must not abort the whole run.
        print(f"Error fetching {source_name}: {str(e)}")
        return

    if response.status_code != 200:
        print(f"Error {response.status_code} for {source_name}")
        return

    try:
        payload = response.json()
    except ValueError as e:
        # A 200 response whose body is not valid JSON.
        print(f"Error decoding {source_name}: {e}")
        return

    data = payload.get('data') if isinstance(payload, dict) else None
    if not data:
        return

    records = _extract_records(data)
    if records:
        pending_to_send_data.setdefault(api_name, []).extend(records)


async def get_api_data(api_name_map, concurrent_limit=6):
    """Fetch every endpoint in ``api_name_map`` in concurrent batches.

    Args:
        api_name_map: Mapping of source name to endpoint path.
        concurrent_limit: Maximum number of requests issued at once.

    Raises:
        ValueError: If ``concurrent_limit`` is smaller than 1.
    """
    if concurrent_limit < 1:
        raise ValueError("concurrent_limit must be at least 1")

    api_items = list(api_name_map.items())

    async with httpx.AsyncClient() as client:
        for start in range(0, len(api_items), concurrent_limit):
            batch = api_items[start:start + concurrent_limit]

            # return_exceptions=True keeps one failing task from cancelling
            # its siblings; the failures are reported just below.
            results = await asyncio.gather(
                *(fetch_api(client, api_name, api) for api_name, api in batch),
                return_exceptions=True,
            )
            for (api_name, _), result in zip(batch, results):
                if isinstance(result, BaseException):
                    print(f"Error processing {api_name}: {result!r}")

            # Pause between batches, but not after the last one.
            if start + concurrent_limit < len(api_items):
                print("Waiting 3 seconds before next batch...")
                await asyncio.sleep(3)


def _format_active_time(active_time):
    """Return a display string for a record's ``active_time`` ('' if empty).

    Numeric values are rendered as a local date-time; values that cannot be
    converted, and all non-numeric values, are returned via ``str()``.
    """
    if not active_time:
        return ''
    if isinstance(active_time, (int, float)):
        # NOTE(review): assumes the number is seconds since the epoch -
        # confirm against the API.
        try:
            return datetime.fromtimestamp(active_time).strftime('%Y-%m-%d %H:%M')
        except (OverflowError, OSError, ValueError):
            return str(active_time)
    return str(active_time)


def _render_html_section(source, news_list):
    """Render one source's heading and its news items as an HTML section."""
    if source in _SOURCE_LABELS:
        emoji, label = _SOURCE_LABELS[source]
        display_name = f'{emoji} {label}'
    else:
        display_name = f'📊 {source}'

    heading = f'<h2>{html.escape(display_name)} ({len(news_list)}条)</h2>'
    if not news_list:
        return f'<section>\n{heading}\n<p>暂无数据</p>\n</section>'

    items = []
    for i, news in enumerate(news_list, 1):
        # Titles and links come from a remote API: escape before embedding.
        # ``or`` covers records whose 'title' key is present but None.
        title = html.escape(str(news.get('title') or '无标题'))
        link = news.get('link') or ''
        time_str = _format_active_time(news.get('active_time'))

        if link:
            href = html.escape(str(link), quote=True)
            title_html = f'<a href="{href}">{title}</a>'
        else:
            title_html = title
        time_html = f'\n<div>{html.escape(time_str)}</div>' if time_str else ''

        items.append(f'<li>\n<div>{i}. {title_html}</div>{time_html}\n</li>')

    # Items carry their own "N." prefix, so the list bullets are switched off.
    item_block = '\n'.join(items)
    return (
        f'<section>\n{heading}\n'
        f'<ul style="list-style: none; padding-left: 0;">\n{item_block}\n</ul>\n'
        f'</section>'
    )


def generate_html_content(pending_to_send_data):
    """Assemble the collected news into a complete HTML document.

    Args:
        pending_to_send_data: Mapping of source name to a list of record
            dicts with ``title`` and optionally ``link`` / ``active_time``.

    Returns:
        The HTML document as a string. Sources appear sorted by name.
    """
    # Sort by source name so the output order is stable.
    sorted_sources = sorted(pending_to_send_data)
    content_parts = [
        _render_html_section(source, pending_to_send_data[source])
        for source in sorted_sources
    ]
    total_news = sum(len(pending_to_send_data[source]) for source in sorted_sources)

    return _HTML_TEMPLATE.format(
        update_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        content='\n'.join(content_parts),
        total_news=total_news,
    )


def generate_text_content(pending_to_send_data):
    """Assemble the collected news into a plain-text report.

    Args:
        pending_to_send_data: Mapping of source name to a list of record
            dicts with ``title`` and optionally ``link`` / ``active_time``.

    Returns:
        The report as a newline-joined string. Sources appear sorted by name.
    """
    text_parts = [
        "📰 每日热点新闻汇总",
        f"更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "=" * 50,
    ]
    total_news = 0

    for source in sorted(pending_to_send_data):
        news_list = pending_to_send_data[source]
        total_news += len(news_list)

        display_name = _SOURCE_LABELS[source][1] if source in _SOURCE_LABELS else source
        text_parts.append(f"\n【{display_name}】({len(news_list)}条)")

        for i, news in enumerate(news_list, 1):
            # ``or`` covers records whose 'title' key is present but None.
            title = news.get('title') or '无标题'
            link = news.get('link') or ''
            active_time = _format_active_time(news.get('active_time'))

            time_str = f" ({active_time})" if active_time else ""
            text_parts.append(f"{i}. {title}{time_str}")
            # Emit the link line only when there is a link, so items without
            # one are not followed by a stray blank line.
            if link:
                text_parts.append(f" 链接: {link}")

    text_parts.append(f"\n共 {total_news} 条热点")
    text_parts.append("数据来源于各大平台 • 自动生成")

    return '\n'.join(text_parts)


async def main():
    """Fetch all sources and return the rendered results.

    Returns:
        A dict with the keys ``html`` (HTML document), ``text`` (plain-text
        report) and ``data`` (the ``pending_to_send_data`` mapping).
    """
    # Start from an empty store so repeated calls do not accumulate duplicates.
    pending_to_send_data.clear()

    await get_api_data(api_name_map, concurrent_limit=6)

    html_content = generate_html_content(pending_to_send_data)
    text_content = generate_text_content(pending_to_send_data)

    print("HTML内容已生成,长度:", len(html_content))
    print("文本内容已生成,长度:", len(text_content))

    # Both formats are returned; the caller picks whichever it needs.
    return {
        'html': html_content,
        'text': text_content,
        'data': pending_to_send_data,
    }


def send_email(text):
    """Send ``text`` by e-mail through the project's ``SendEmail`` helper."""
    title = '60S - ' + datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    sub = '60S'
    SendEmail(subject=sub, title=title, text=text).send()


if __name__ == "__main__":
    result = asyncio.run(main())
    # result['html'] is the HTML document, result['text'] the plain-text
    # report and result['data'] the raw records.
    # E-mail delivery is currently disabled; to enable it, call
    # send_email(result['text']) here.
    print("执行完成!")