# -*- coding: utf-8 -*-
"""Collect headlines from the Anyknew category APIs, store unseen ones and notify.

Running this file as a script executes ``HotNews().main()`` once.
"""
import os
import sys

# Put the directory named 'AutoInfo' (derived from this file's absolute path)
# on sys.path so that the project-local ``utils`` package can be imported.
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))

from utils.utils import *

config_json = LoadConfig().load_config()
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']


class HotNews:
    """Fetch Anyknew headlines, insert the new ones into MongoDB and send them out."""

    # Label stored in ``article_source`` for each known category key.
    # Categories missing from this table fall back to DEFAULT_TAG.
    CATEGORY_TAGS = {
        'universal': 'Anyknew - 综合',
        'finance': 'Anyknew - 金融',
        'science': 'Anyknew - 科学',
        'life': 'Anyknew - 生活',
        'binary': 'Anyknew - 二进制',
    }
    DEFAULT_TAG = 'Anyknew'

    def __init__(self):
        """Set up endpoints, storage names and the buffer of newly stored records."""
        # Prefix that is combined with an article's ``iid`` to build its link.
        self.base_url = 'https://www.anyknew.com/go/'
        self.email_subject = '聚合新闻'
        self.email_title = 'Anyknew'
        self.email_text = '获取数据时间:\n{0}\n{1}\n\n\n\n'.format(
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ('-' * 90))
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.db = 'NEWS'
        self.collection = 'Anyknew_info'
        # Category key -> API endpoint queried by req().
        self.targets = {
            'universal': 'https://www.anyknew.com/api/v1/cats/universal',
            'finance': 'https://www.anyknew.com/api/v1/cats/aam',
            'science': 'https://www.anyknew.com/api/v1/cats/st',
            'life': 'https://www.anyknew.com/api/v1/cats/life',
            'binary': 'https://www.anyknew.com/api/v1/cats/binary',
        }
        # Records inserted during this run; these are the ones that get sent.
        self.temp_datas = []

    def main(self):
        """Run one fetch -> store -> notify cycle.

        Returns:
            False when no data could be fetched; None otherwise.
        """
        self.logs_handle.logs_write('聚合新闻', '任务开始', 'start', False)

        resp_data = self.req()
        if not resp_data:
            self.logs_handle.logs_write('聚合新闻', '获取数据为空', 'error', False)
            return False

        self.save_to_mongo(resp_data)
        if self.temp_datas:
            print('准备发送消息')
            self.send_to_gotify()
        else:
            print('无新数据')

        self.logs_handle.logs_write('聚合新闻', '任务完成', 'done', False)

    def req(self):
        """Query every endpoint in ``self.targets`` and flatten the articles.

        A category whose request or JSON decoding fails is reported, followed
        by a 20 second pause, and then skipped; the remaining categories are
        still processed.

        Returns:
            list: one dict per article, in the shape stored by save_to_mongo().
        """
        print('开始请求数据')
        result_data = []

        for target, url in self.targets.items():
            try:
                resp = httpx.get(url=url)
                # Decoding is kept inside the try so that a non-JSON reply is
                # handled like a failed request instead of aborting the run.
                resp_json = resp.json()
            except Exception as e:
                print("请求出错{}, \nurl: {}".format(e, url))
                time.sleep(20)
                continue

            result_data.extend(self._extract_records(target, url, resp_json))

        print('已获取数据')
        return result_data

    def _extract_records(self, target, url, resp_json):
        """Turn one category response into a list of article records.

        The traversal follows data -> cat -> sites[] -> subs[] -> items[].
        Any level that is missing or empty simply yields no records.
        """
        records = []
        tag = self.CATEGORY_TAGS.get(target, self.DEFAULT_TAG)

        data = resp_json.get('data') or {}
        cat = data.get('cat') or {}

        for site in cat.get('sites') or []:
            target_and_site = '{}-{}'.format(target, site.get('site'))

            for sub in site.get('subs') or []:
                for d in sub.get('items') or []:
                    # Only stringify a real id: str(None) would be the truthy
                    # text 'None' and end up appended to the link.
                    iid = d.get('iid')
                    link_suffix = str(iid) if iid is not None else ''

                    records.append({
                        "title": d.get('title') or '',
                        "context": d.get('more') or '',
                        "source_url": url,
                        'link': self.base_url + link_suffix,
                        "article_type": target_and_site,
                        "article_source": tag,
                        "img_url": '',
                        'keyword': '',
                        "posted_date": d.get('add_date') or '',
                        "create_time": int(time.time()),
                        "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                        "repush_times": DEFAULT_RE_PUSH_TIMES,
                    })

        return records

    def save_to_mongo(self, source_data):
        """Insert every record whose title is not stored yet.

        Each inserted record is also appended to ``self.temp_datas``.

        Args:
            source_data: iterable of record dicts as produced by req().

        Returns:
            0 when a TypeError interrupts processing (remaining records are
            skipped); None otherwise.
        """
        print('开始处理Anyknew数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False,
                            del_collection=False, auto_remove=0)

        for data_to_insert in source_data:
            try:
                # Look for an existing document with the same title.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    # No match found: store the record and remember it for sending.
                    mongo.collection.insert_one(data_to_insert)
                    self.temp_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('聚合新闻', '写入数据库报错: %s' % te, 'error', False)
                return 0

        print('Anyknew数据处理')

    def send_to_gotify(self):
        """Format ``self.temp_datas`` as text and send it through SendEmail.

        NOTE(review): despite the method name, delivery currently goes through
        SendEmail; a GotifyNotifier call (token_name='news') was disabled in
        the original code.
        """
        parts = ['****************************************\n']
        for data in self.temp_datas:
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            parts.append('文章地址: {}\n'.format(data['link']))
            parts.append('类型: {}\n'.format(data['article_type']))
            parts.append('板块: {}\n'.format(data['article_source']))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append('***********************************\n\n')
        text = ''.join(parts)

        title = 'Anyknew新闻 - ' + str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        sub = 'Anyknew新闻'

        SendEmail(subject=sub, title=title, text=text).send()


if __name__ == '__main__':
    HotNews().main()