# -*- coding: utf-8 -*-
"""Collect aggregated news from the Anyknew API.

The script downloads every configured Anyknew category, flattens the
articles into plain dicts, stores the ones whose title is not yet present in
MongoDB and, when enabled, mails the newly stored articles.
"""
import time
import httpx
from datetime import datetime
import os
import sys

# Put the project root (everything up to and including the first 'auto' in
# this file's absolute path) on sys.path so the project imports below resolve.
# This must run before those imports.
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
from utils.utils_mongo_handle import MongoHandle
from utils.utils_logs_handle import LogsHandle
from utils.utils_send_email import SendEmail
from base.base_load_config import load_config

config_json = load_config()
# Initial value of the "repush_times" field written on every stored article.
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']


class HotNews:
    """Scraper for the Anyknew news aggregator.

    Typical use is ``HotNews().main()``: fetch all categories, persist unseen
    articles and optionally send them by email.
    """

    # Category key (see ``self.targets``) -> value of the "article_source" field.
    _TARGET_TAGS = {
        'universal': 'Anyknew - 综合',
        'finance': 'Anyknew - 金融',
        'science': 'Anyknew - 科学',
        'life': 'Anyknew - 生活',
        'binary': 'Anyknew - 二进制',
    }
    # Label used for a category key that has no entry in _TARGET_TAGS.
    _DEFAULT_TAG = 'Anyknew'

    def __init__(self):
        """Set up endpoints, storage names and the email buffer."""
        # Article links are built as base_url + article iid.
        self.base_url = 'https://www.anyknew.com/go/'
        # NOTE(review): email_subject / email_title / email_text / now_day are
        # not read anywhere in this file; kept in case external code uses them.
        self.email_subject = '聚合新闻'
        self.email_title = 'Anyknew'
        self.email_text = '获取数据时间:\n{0}\n{1}\n\n\n\n'.format(
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ('-' * 90))
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        # MongoDB database / collection the articles are written to.
        self.db = 'NEWS'
        self.collection = 'Anyknew_info'
        # Category key -> API endpoint returning that category's sites.
        self.targets = {
            'universal': 'https://www.anyknew.com/api/v1/cats/universal',
            'finance': 'https://www.anyknew.com/api/v1/cats/aam',
            'science': 'https://www.anyknew.com/api/v1/cats/st',
            'life': 'https://www.anyknew.com/api/v1/cats/life',
            'binary': 'https://www.anyknew.com/api/v1/cats/binary'
        }
        # Articles inserted during this run; they form the email body.
        self.send_email_datas = []
        # Truthy -> main() mails the newly inserted articles.
        self.send_email_now = 0

    def main(self):
        """Run one fetch / store / notify cycle.

        Returns:
            ``False`` when no article could be fetched, otherwise ``None``.
        """
        self.logs_handle.logs_write('聚合新闻', '任务开始', 'start', False)

        resp_data = self.req()
        if not resp_data:
            self.logs_handle.logs_write('聚合新闻', '获取数据为空', 'error', False)
            return False

        self.save_to_mongo(resp_data)

        if self.send_email_now:
            if self.send_email_datas:
                print('准备发送邮件')
                self.send_to_email()
            else:
                print('无新数据')

        self.logs_handle.logs_write('聚合新闻', '任务完成', 'done', False)

    def req(self):
        """Download every category in ``self.targets`` and flatten the articles.

        A category whose request fails, or whose body is not valid JSON, is
        reported on stdout and skipped after a 20 second pause, so one bad
        endpoint does not abort the remaining categories.

        Returns:
            A list of article dicts (see ``_build_record`` for the fields);
            empty when nothing could be fetched.
        """
        print('开始请求数据')
        result_data = []

        for target, url in self.targets.items():
            try:
                resp = httpx.get(url=url)
                # Decoding is inside the try on purpose: an HTML error page or
                # truncated body must not crash the whole run.
                resp_json = resp.json()
            except Exception as e:
                print("请求出错{}, \nurl: {}".format(e, url))
                time.sleep(20)
                continue

            result_data.extend(self._parse_response(target, url, resp_json))

        print('已获取数据')
        return result_data

    def _parse_response(self, target, url, resp_json):
        """Turn one decoded category response into a list of article dicts.

        The payload is read as ``data -> cat -> sites[] -> subs[] -> items[]``.
        Any missing or null level yields no articles instead of raising.

        Args:
            target: Category key from ``self.targets``.
            url: Endpoint the payload was fetched from.
            resp_json: Decoded JSON body of the response.
        """
        if not isinstance(resp_json, dict):
            return []

        data = resp_json.get('data') or {}
        cat = data.get('cat') or {}
        sites = cat.get('sites') or []
        # The label depends only on the category, so resolve it once.
        tag = self._TARGET_TAGS.get(target, self._DEFAULT_TAG)

        records = []
        for site in sites:
            target_and_site = '{}-{}'.format(target, site.get('site'))
            # Assumes every entry of "subs" is a dict - TODO confirm against the API.
            for sub in site.get('subs') or []:
                for article in sub.get('items') or []:
                    records.append(
                        self._build_record(article, url, target_and_site, tag))
        return records

    def _build_record(self, article, url, target_and_site, tag):
        """Map one raw API article onto the document stored in MongoDB."""
        iid = article.get('iid')
        # str(None) is the truthy string 'None', so the fallback has to be
        # applied before the conversion; otherwise links end in '/go/None'.
        iid_text = '' if iid is None else str(iid)
        return {
            "title": article.get('title') or '',
            "context": article.get('more') or '',
            "source_url": url,
            'link': self.base_url + iid_text,
            "article_type": target_and_site,
            "article_source": tag,
            "img_url": '',
            'keyword': '',
            "posted_date": article.get('add_date') or '',
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "repush_times": DEFAULT_RE_PUSH_TIMES
        }

    def save_to_mongo(self, source_data):
        """Insert the articles whose title is not stored yet.

        Every inserted article is also appended to ``self.send_email_datas``.

        Args:
            source_data: Iterable of article dicts as produced by ``req``.

        Returns:
            ``0`` if a ``TypeError`` interrupted the processing (remaining
            articles are skipped), otherwise ``None``.
        """
        print('开始处理Anyknew数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False,
                            del_collection=False, auto_remove=0)

        for data_to_insert in source_data:
            try:
                # The title is the de-duplication key.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    mongo.collection.insert_one(data_to_insert)
                    self.send_email_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('聚合新闻', '写入数据库报错: %s' % te, 'error', False)
                return 0

        print('Anyknew数据处理')

    def send_to_email(self):
        """Mail every article collected in ``self.send_email_datas``."""
        # Build the body from parts and join once instead of repeated "+=".
        parts = ['********************************************************\n']
        for data in self.send_email_datas:
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            parts.append('文章地址: {}\n'.format(data['link']))
            parts.append('类型: {}\n'.format(data['article_type']))
            parts.append('板块: {}\n'.format(data['article_source']))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append('********************************************************\n\n')
        text = ''.join(parts)

        send_email = SendEmail(subject='Anyknew', title='Anyknew_info', text=text)
        send_email.send()
        print('邮件已发送')


if __name__ == '__main__':
    HotNews().main()