import os, sys
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
import asyncio
from datetime import datetime
import httpx
from utils.utils import SendEmail
url = "https://60s.erhe.link"
headers = {"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"}
text = ''
n = 0
api_list = [
"/v2/60s",
"/v2/douyin",
"/v2/rednote",
"/v2/bili",
"/v2/weibo",
"/v2/baidu/hot",
"/v2/baidu/tieba",
"/v2/toutiao",
"/v2/zhihu",
"/v2/hacker-news/top",
"/v2/hacker-news/new",
"/v2/hacker-news/best"
]
api_name_map = {api.replace('/v2/', '').replace('/', '-'): api for api in api_list}
pending_to_send_data = {}
async def fetch_api(client, api_name, api):
source = api.replace('/v2/', '')
source_name = source.replace('/', '-')
print(f"Fetching {source_name}")
try:
response = await client.get(url + api, headers=headers, timeout=10.0)
except Exception as e:
print(f"Error fetching {source_name}: {str(e)}")
return
if response.status_code != 200:
print(f"Error {response.status_code} for {source_name}")
return
data = response.json()
data = data.get('data')
if not data:
return
if type(data) == dict:
news_list = data.get('news')
created = data.get('created')
if news_list:
if api_name not in pending_to_send_data:
pending_to_send_data[api_name] = []
for news in news_list:
pending_to_send_data[api_name].append({
'title': news.get('title'),
'active_time': created
})
elif type(data) == list:
for record in data:
if api_name not in pending_to_send_data:
pending_to_send_data[api_name] = []
pending_to_send_data[api_name].append({
'title': record.get('title'),
'link': record.get('link'),
'active_time': record.get('active_time')
})
async def get_api_data(api_name_map, concurrent_limit=6):
# 将API列表分成批次,每批最多concurrent_limit个
api_items = list(api_name_map.items())
async with httpx.AsyncClient() as client:
# 分批处理
for i in range(0, len(api_items), concurrent_limit):
batch = api_items[i:i + concurrent_limit]
# 创建并发的协程任务
tasks = [fetch_api(client, api_name, api) for api_name, api in batch]
# 等待这一批任务完成
await asyncio.gather(*tasks, return_exceptions=True)
# 批次之间等待3秒
if i + concurrent_limit < len(api_items):
print(f"Waiting 3 seconds before next batch...")
await asyncio.sleep(3)
def generate_html_content(pending_to_send_data):
"""将新闻数据组装成HTML格式"""
html_template = """
每日热点新闻汇总
{content}
"""
# 生成内容部分
content_parts = []
total_news = 0
# 按平台名称排序,让显示更有序
sorted_sources = sorted(pending_to_send_data.keys())
for source in sorted_sources:
news_list = pending_to_send_data[source]
total_news += len(news_list)
# 平台标题(美化显示名称)
display_name = {
'60s': '🐯 每日60秒',
'douyin': '🎵 抖音热榜',
'rednote': '📕 小红书',
'bili': '📺 B站热门',
'weibo': '🐧 微博热搜',
'baidu-hot': '🔍 百度热点',
'baidu-tieba': '💬 百度贴吧',
'toutiao': '📄 今日头条',
'zhihu': '📚 知乎热榜',
'hacker-news-top': '💻 Hacker News Top',
'hacker-news-new': '💻 Hacker News New',
'hacker-news-best': '💻 Hacker News Best'
}.get(source, f'📊 {source}')
source_html = f"""
"""
if news_list:
news_items = []
for i, news in enumerate(news_list, 1):
title = news.get('title', '无标题')
link = news.get('link', '')
active_time = news.get('active_time', '')
# 格式化时间
if active_time:
try:
if isinstance(active_time, int):
# 如果是时间戳
active_time = datetime.fromtimestamp(active_time).strftime('%Y-%m-%d %H:%M')
else:
# 如果是字符串时间
active_time = str(active_time)
except:
active_time = str(active_time)
title_html = f'
{title}' if link else title
time_html = f'
{active_time}
' if active_time else ''
news_item = f"""
{i}. {title_html}
{time_html}
"""
news_items.append(news_item)
source_html += f'
'
else:
source_html += '
暂无数据
'
source_html += '
'
content_parts.append(source_html)
# 组装完整HTML
html_content = html_template.format(
update_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
content='\n'.join(content_parts),
total_news=total_news
)
return html_content
def generate_text_content(pending_to_send_data):
"""生成纯文本版本的新闻内容"""
text_parts = []
text_parts.append("📰 每日热点新闻汇总")
text_parts.append(f"更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
text_parts.append("=" * 50)
total_news = 0
# 按平台名称排序
sorted_sources = sorted(pending_to_send_data.keys())
for source in sorted_sources:
news_list = pending_to_send_data[source]
total_news += len(news_list)
display_name = {
'60s': '每日60秒',
'douyin': '抖音热榜',
'rednote': '小红书',
'bili': 'B站热门',
'weibo': '微博热搜',
'baidu-hot': '百度热点',
'baidu-tieba': '百度贴吧',
'toutiao': '今日头条',
'zhihu': '知乎热榜',
'hacker-news-top': 'Hacker News Top',
'hacker-news-new': 'Hacker News New',
'hacker-news-best': 'Hacker News Best'
}.get(source, source)
text_parts.append(f"\n【{display_name}】({len(news_list)}条)")
for i, news in enumerate(news_list, 1):
title = news.get('title', '无标题')
link = news.get('link', '')
active_time = news.get('active_time', '')
time_str = f" ({active_time})" if active_time else ""
link_str = f" 链接: {link}" if link else ""
text_parts.append(f"{i}. {title}{time_str}\n{link_str}")
text_parts.append(f"\n共 {total_news} 条热点")
text_parts.append("数据来源于各大平台 • 自动生成")
return '\n'.join(text_parts)
# 在main函数中使用
async def main():
await get_api_data(api_name_map, concurrent_limit=6)
# 生成HTML内容
html_content = generate_html_content(pending_to_send_data)
# 如果你需要同时保留文本版本
text_content = generate_text_content(pending_to_send_data)
print("HTML内容已生成,长度:", len(html_content))
print("文本内容已生成,长度:", len(text_content))
# 返回两种格式的内容,根据需要选择使用
return {
'html': html_content,
'text': text_content,
'data': pending_to_send_data
}
# 执行
if __name__ == "__main__":
result = asyncio.run(main())
# print(result['html']) # HTML内容
# print(result['text']) # 是纯文本内容
# result['data'] 是原始数据
title = '60S - ' + str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
sub = '60S'
SendEmail(subject=sub, title=title, text=result['text']).send()
print("执行完成!")