# -*- coding: utf-8 -*-
"""Hello Github.

Fetch repository listings from the HelloGithub API, store entries whose
title is not yet in MongoDB, and optionally email the newly stored entries.
"""
import os
import sys
import time
from datetime import datetime

import httpx

# Make the project root (the path up to and including the first 'auto')
# importable so the project-local packages below resolve.
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))

from utils.utils_mongo_handle import MongoHandle
from utils.utils_logs_handle import LogsHandle
from utils.utils_send_email import SendEmail
from base.base_load_config import load_config

config_json = load_config()
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']


class HelloGithub(object):
    """Collect HelloGithub listings, persist new ones and report them by email."""

    # Listing endpoint; the placeholders are the sort key and the page number.
    API_URL_TEMPLATE = 'https://api.hellogithub.com/v1/?sort_by={}&tid=&page={}'
    # Pages 1..MAX_PAGES are requested for every target.
    MAX_PAGES = 4

    def __init__(self):
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.base_url = 'https://api.hellogithub.com/v1/?sort_by=last&tid=&page={}'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        self.db = 'NEWS'
        self.collection = 'HelloGithub_info'
        self.source_url = 'https://hellogithub.com/repository/'
        # Records inserted during this run; they form the email body.
        self.send_email_datas = []
        # Truthy enables the email step at the end of main().
        self.send_email_now = 0

    def main(self):
        """Run one full cycle: fetch every target, store new records, maybe email."""
        self.logs_handle.logs_write('HelloGithub', '开始获取 HelloGithub 数据', 'start', False)

        targets = ['featured']
        response_datas = []
        for target in targets:
            # `or []` keeps the accumulation safe even if req() yields nothing.
            response_datas += self.req(target) or []

        if response_datas:
            self.save_to_mongo(response_datas)
        else:
            self.logs_handle.logs_write('HelloGithub', '获取 HelloGithub 数据失败', 'error', False)

        self.logs_handle.logs_write('HelloGithub', 'HelloGithub 数据获取完成', 'done', False)
        print('获取 HelloGithub 数据 done')

        if self.send_email_now:
            if self.send_email_datas:
                self.send_to_email()
            else:
                print('没有新数据, 不发送邮件')

    def req(self, target):
        """Download the listing pages for ``target`` and return them as records.

        Args:
            target: Value used for the ``sort_by`` query parameter; it is also
                stored in each record as ``article_source``.

        Returns:
            A list of record dicts; empty when no page produced any item.

        Raises:
            SystemExit: When a page answers with a status code other than 200.
        """
        print('开始获取 HelloGithub {} 数据'.format(target))
        response_data = []
        for page in range(1, self.MAX_PAGES + 1):
            url = self.API_URL_TEMPLATE.format(target, page)
            try:
                response = httpx.get(url=url, headers=self.headers)
            except Exception as e:
                # Best effort: a failed page is reported and skipped.
                print("请求出错{}, \nurl: {}".format(e, url))
                continue

            if response.status_code != 200:
                print(
                    '获取 HelloGithub {} 数据, 状态码: {}, \n程序退出\n检查目标地址: {}'.format(
                        target, response.status_code, url))
                self.logs_handle.logs_write('HelloGithub', '请求失败, 状态码: %s' % response.status_code, 'error', False)
                # sys.exit does not depend on the `site` module the way the
                # bare exit() helper does; the exit status stays 0 as before.
                sys.exit(0)

            try:
                json_data = response.json()
            except ValueError as e:
                # A body that is not valid JSON is handled like a failed request.
                print("解析响应出错{}, \nurl: {}".format(e, url))
                continue

            # A missing or null 'data' field means "no items on this page".
            items = json_data.get('data') if isinstance(json_data, dict) else None
            for item in items or []:
                if isinstance(item, dict):
                    response_data.append(self._build_record(item, target))

        if not response_data:
            self.logs_handle.logs_write('HelloGithub', '获取数据失败', 'error', False)
        return response_data

    def _build_record(self, item, target):
        """Turn one API item into the document that gets stored in MongoDB."""
        # Null or absent fields become '' so joining/concatenating cannot fail.
        summary = item.get('summary') or ''
        description = item.get('description') or ''
        item_id = item.get('item_id')
        return {
            "title": item.get('title') or '',
            "context": '---'.join([summary, description]),
            "source_url": 'https://hellogithub.com',
            'link': self.source_url + ('' if item_id is None else str(item_id)),
            "article_type": '',
            "article_source": target,
            "img_url": '',
            'keyword': '',
            "posted_date": item.get('updated_at'),
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "repush_times": DEFAULT_RE_PUSH_TIMES
        }

    def save_to_mongo(self, data):
        """Insert every record whose title is not stored yet.

        Inserted records are also appended to ``self.send_email_datas``.

        Args:
            data: Iterable of record dicts as produced by ``req``.

        Returns:
            ``0`` when a ``TypeError`` interrupts the run (remaining records
            are skipped), otherwise ``None``.
        """
        print('开始储存 HelloGithub 数据')
        # One handle serves the whole batch instead of one per record.
        # NOTE(review): assumes constructing MongoHandle has no per-call side
        # effect with these arguments — confirm against MongoHandle.
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False, auto_remove=0)
        for data_to_insert in data:
            try:
                # The title is the de-duplication key.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    mongo.collection.insert_one(data_to_insert)
                    # Only records that were actually inserted get emailed.
                    self.send_email_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('HelloGithub', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print('处理 HelloGithub 数据完成', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

    def send_to_email(self):
        """Email a plain-text digest of the records collected in this run."""
        title = 'HelloGithub - info'
        subject = 'HelloGithub - info'
        separator = '********************************************************\n'

        parts = [separator]
        for data in self.send_email_datas:
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            # NOTE(review): this prints the site root ('source_url'), not the
            # per-repository 'link' — confirm which one is intended.
            parts.append('文章地址: {}\n'.format(data['source_url']))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append(separator + '\n')
        text = ''.join(parts)

        send_email = SendEmail(subject=subject, title=title, text=text)
        send_email.send()
        self.logs_handle.logs_write('HelloGithub', f'{title}-发送邮件完成', 'done', False)


if __name__ == "__main__":
    HelloGithub().main()