You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
AutoInfo/to_email/anyknew.py

153 lines
6.2 KiB

# -*- coding: utf-8 -*-
import os
import sys
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
# Load the project configuration once, at import time.
# NOTE(review): LoadConfig is presumably provided by the star import from utils.utils - confirm.
config_json = LoadConfig().load_config()
# Value written into the "repush_times" field of every newly scraped record.
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class HotNews:
    """Fetch Anyknew category feeds, store unseen entries and mail a digest.

    Workflow (see ``main``): request every category listed in ``self.targets``,
    flatten the responses into record dicts, insert the records whose title is
    not yet in the Mongo collection, then send one message that lists the
    newly inserted records.
    """

    # Label stored in ``article_source`` for each category key of ``self.targets``.
    _TAGS = {
        'universal': 'Anyknew - 综合',
        'finance': 'Anyknew - 金融',
        'science': 'Anyknew - 科学',
        'life': 'Anyknew - 生活',
        'binary': 'Anyknew - 二进制',
    }
    # Label used for a category that has no entry in ``_TAGS``.
    _DEFAULT_TAG = 'Anyknew'

    def __init__(self):
        """Set up endpoints, storage names and the per-run buffer of new records."""
        # Article links are built as base_url + item id.
        self.base_url = 'https://www.anyknew.com/go/'
        self.email_subject = '聚合新闻'
        self.email_title = 'Anyknew'
        self.email_text = '获取数据时间:\n{0}\n{1}\n\n\n\n'.format(
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            ('-' * 90),
        )
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        # Mongo database / collection the records are written to.
        self.db = 'NEWS'
        self.collection = 'Anyknew_info'
        # Category key -> API endpoint returning that category's site lists.
        self.targets = {
            'universal': 'https://www.anyknew.com/api/v1/cats/universal',
            'finance': 'https://www.anyknew.com/api/v1/cats/aam',
            'science': 'https://www.anyknew.com/api/v1/cats/st',
            'life': 'https://www.anyknew.com/api/v1/cats/life',
            'binary': 'https://www.anyknew.com/api/v1/cats/binary',
        }
        # Value written to each record's "repush_times" field.
        self.repush_times = DEFAULT_RE_PUSH_TIMES
        # Records inserted during this run; these are the ones that get sent.
        self.temp_datas = []

    def main(self):
        """Run one fetch / store / notify cycle.

        Returns:
            ``False`` when no record could be fetched; ``None`` otherwise.
        """
        self.logs_handle.logs_write('聚合新闻', '任务开始', 'start', False)

        resp_data = self.req()
        if not resp_data:
            self.logs_handle.logs_write('聚合新闻', '获取数据为空', 'error', False)
            return False

        self.save_to_mongo(resp_data)
        if self.temp_datas:
            print('准备发送消息')
            self.send_to_gotify()
        else:
            print('无新数据')

        self.logs_handle.logs_write('聚合新闻', '任务完成', 'done', False)

    def req(self):
        """Request every category endpoint and return all parsed records.

        A category whose request or JSON decoding fails is reported on stdout,
        followed by a 20 second pause, and then skipped; the remaining
        categories are still processed.

        Returns:
            list[dict]: one record per news item (see ``_parse_response``).
        """
        print('开始请求数据')
        result_data = []
        for target, url in self.targets.items():
            try:
                resp = httpx.get(url=url)
                # Decoding lives inside the try so a non-JSON body only skips
                # this category instead of aborting the whole run.
                resp_json = resp.json()
            except Exception as e:  # best-effort: any failure skips the category
                print("请求出错{}, \nurl: {}".format(e, url))
                time.sleep(20)
                continue
            result_data.extend(self._parse_response(target, url, resp_json))
        print('已获取数据')
        return result_data

    def _parse_response(self, target, url, resp_json):
        """Flatten one category payload into a list of record dicts.

        The payload is walked as
        ``data -> cat -> sites[] -> subs[] -> items[]``; any level that is
        missing or empty simply contributes no records.

        Args:
            target: category key (a key of ``self.targets``).
            url: endpoint the payload came from; stored as ``source_url``.
            resp_json: decoded JSON body of the response.

        Returns:
            list[dict]: records ready to be inserted into the collection.
        """
        if not isinstance(resp_json, dict):
            return []

        tag = self._TAGS.get(target, self._DEFAULT_TAG)
        data = resp_json.get('data') or {}
        cat = data.get('cat') or {}

        records = []
        for site in cat.get('sites') or []:
            target_and_site = '{}-{}'.format(target, site.get('site'))
            for sub in site.get('subs') or []:
                for entry in sub.get('items') or []:
                    iid = entry.get('iid')
                    records.append({
                        "title": entry.get('title') or '',
                        "context": entry.get('more') or '',
                        "source_url": url,
                        # str(None) is the truthy 'None', so test for a
                        # missing id explicitly instead of using ``or ''``.
                        'link': self.base_url + ('' if iid is None else str(iid)),
                        "article_type": target_and_site,
                        "article_source": tag,
                        "img_url": '',
                        'keyword': '',
                        "posted_date": entry.get('add_date') or '',
                        "create_time": int(time.time()),
                        "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                        "repush_times": self.repush_times,
                    })
        return records

    def save_to_mongo(self, source_data):
        """Insert the records whose title is not yet stored.

        Every inserted record is also appended to ``self.temp_datas`` so it can
        be included in the outgoing message.

        Args:
            source_data: iterable of record dicts as produced by ``req``.

        Returns:
            ``0`` if a ``TypeError`` interrupted processing (the remaining
            records are not handled); ``None`` otherwise.
        """
        print('开始处理Anyknew数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False, auto_remove=0)
        for data_to_insert in source_data:
            try:
                # Look for an existing document with the same title
                # (a missing title is treated as the empty string).
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    # Nothing matched: store the record and remember it as new.
                    mongo.collection.insert_one(data_to_insert)
                    self.temp_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('聚合新闻', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print('Anyknew数据处理')

    def _build_message(self):
        """Return the text body listing every record in ``self.temp_datas``."""
        parts = ['****************************************\n']
        for data in self.temp_datas:
            parts.extend((
                '标题: {}\n'.format(data['title']),
                '正文: {}\n'.format(data['context']),
                '文章地址: {}\n'.format(data['link']),
                '类型: {}\n'.format(data['article_type']),
                '板块: {}\n'.format(data['article_source']),
                '文章时间: {}\n'.format(data['posted_date']),
                '获取时间: {}\n'.format(data['create_datetime']),
                '***********************************\n\n',
            ))
        return ''.join(parts)

    def send_to_gotify(self):
        """Send the digest of newly stored records.

        Despite the method name, the message is currently delivered through
        ``SendEmail``; the Gotify call is kept below, commented out.
        """
        text = self._build_message()
        title = 'Anyknew新闻 - ' + str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        sub = 'Anyknew新闻'
        SendEmail(subject=sub, title=title, text=text).send()
        # GotifyNotifier(title=title, message=text, token_name='news').send_message()
if __name__ == '__main__':
    # Executed as a script: build the job and run a single cycle.
    news_job = HotNews()
    news_job.main()