You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
auto/spider/news_get_hello_github.py

147 lines
5.8 KiB

# -*- coding: utf-8 -*-
'''
Hello Github
'''
import os
import sys
import time
from datetime import datetime
import httpx
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
from utils.utils_mongo_handle import MongoHandle
from utils.utils_logs_handle import LogsHandle
from utils.utils_send_email import SendEmail
from base.base_load_config import load_config
# Load the project configuration once, at import time. The shape of the
# returned object is defined by base.base_load_config (not visible here);
# this module only reads the 'DEFAULT_RE_PUSH_TIMES' key from it.
config_json = load_config()
# Value written into the "repush_times" field of every scraped record.
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class HelloGithub(object):
    '''
    Spider for the HelloGithub API.

    Fetches repository listings page by page, stores the ones whose title
    is not yet in the MongoDB collection, and (when ``send_email_now`` is
    truthy) e-mails a digest of the newly stored entries.
    '''

    # Divider line placed between entries of the e-mail digest.
    _EMAIL_SEPARATOR = '********************************************************\n'

    def __init__(self):
        # Project logger; every call in this class passes
        # (source name, message, level string, False).
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        # Kept for backward compatibility; req() builds its own URL because
        # it also needs to substitute the sort key.
        self.base_url = 'https://api.hellogithub.com/v1/?sort_by=last&tid=&page={}'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        # MongoDB database / collection the records are written to.
        self.db = 'NEWS'
        self.collection = 'HelloGithub_info'
        # Prefix joined with an item's id to form its public page URL.
        self.source_url = 'https://hellogithub.com/repository/'
        # Records inserted during this run; they form the e-mail digest.
        self.send_email_datas = []
        # Truthy -> main() sends the digest after saving. Off by default.
        self.send_email_now = 0

    def main(self):
        '''
        Run one full pass: fetch every target, save new records, and
        optionally e-mail them.
        '''
        self.logs_handle.logs_write('HelloGithub', '开始获取 HelloGithub 数据', 'start', False)

        targets = ['featured']
        response_datas = []
        for target in targets:
            # `or []` guards against a fetch that produced nothing, so the
            # in-place concatenation can never see None.
            response_datas += self.req(target) or []

        if response_datas:
            self.save_to_mongo(response_datas)
        else:
            self.logs_handle.logs_write('HelloGithub', '获取 HelloGithub 数据失败', 'error', False)

        self.logs_handle.logs_write('HelloGithub', 'HelloGithub 数据获取完成', 'done', False)
        print('获取 HelloGithub 数据 done')

        if self.send_email_now:
            if self.send_email_datas:
                self.send_to_email()
            else:
                print('没有新数据, 不发送邮件')

    @staticmethod
    def _extract_items(payload):
        '''Return the list of item dicts under ``payload['data']``, or ``[]`` if absent or malformed.'''
        if not isinstance(payload, dict):
            return []
        items = payload.get('data')
        if not isinstance(items, list):
            return []
        return [item for item in items if isinstance(item, dict)]

    def _build_record(self, item, target):
        '''Convert one API item dict into the document layout stored in MongoDB.'''
        # The API may send null for text fields; normalise to '' so that
        # join/concatenation below cannot raise TypeError.
        summary = item.get('summary') or ''
        description = item.get('description') or ''
        item_id = item.get('item_id') or ''
        return {
            "title": item.get('title') or '',
            "context": '---'.join([summary, description]),
            "source_url": 'https://hellogithub.com',
            'link': self.source_url + str(item_id),
            "article_type": '',
            "article_source": target,
            "img_url": '',
            'keyword': '',
            "posted_date": item.get('updated_at'),
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "repush_times": DEFAULT_RE_PUSH_TIMES
        }

    def req(self, target, max_pages=4):
        '''
        Fetch up to ``max_pages`` pages of the listing sorted by ``target``.

        :param target: value for the API's ``sort_by`` query parameter.
        :param max_pages: number of pages to request, starting at page 1
            (defaults to 4, the previously hard-coded amount).
        :return: list of record dicts; empty when nothing could be fetched.
        :raises SystemExit: when the API answers with a non-200 status.
        '''
        print('开始获取 HelloGithub {} 数据'.format(target))
        response_data = []
        for page in range(1, max_pages + 1):
            url = 'https://api.hellogithub.com/v1/?sort_by={}&tid=&page={}'.format(target, page)
            try:
                response = httpx.get(url=url, headers=self.headers)
            except Exception as e:
                # Best effort: a failed page is reported and skipped.
                print("请求出错{}, \nurl: {}".format(e, url))
                continue

            if response.status_code != 200:
                print(
                    '获取 HelloGithub {} 数据, 状态码: {}, 程序退出\n检查目标地址: {}'.format(
                        target, response.status_code, url))
                self.logs_handle.logs_write('HelloGithub', '请求失败, 状态码: %s' % response.status_code, 'error',
                                            False)
                # sys.exit instead of the site-provided exit() builtin, which
                # is not guaranteed to exist; exit status is kept at 0.
                sys.exit(0)

            try:
                json_data = response.json()
            except ValueError as e:
                # Body was not valid JSON; treat like a failed page.
                print("请求出错{}, \nurl: {}".format(e, url))
                continue

            for item in self._extract_items(json_data):
                response_data.append(self._build_record(item, target))

        if not response_data:
            self.logs_handle.logs_write('HelloGithub', '获取数据失败', 'error', False)
        # Always a list, so callers can concatenate without a None check.
        return response_data

    def save_to_mongo(self, data):
        '''
        Insert every record whose title is not already in the collection.

        Newly inserted records are appended to ``self.send_email_datas``.

        :param data: iterable of record dicts as produced by ``req``.
        :return: 0 if a TypeError interrupted the write, otherwise None.
        '''
        print('开始储存 HelloGithub 数据')
        records = data or []
        # One handle for the whole batch instead of a new one per document.
        mongo = None
        if records:
            mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False,
                                auto_remove=0)
        for data_to_insert in records:
            try:
                # The title is the de-duplication key.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    mongo.collection.insert_one(data_to_insert)
                    # Remember the new record for the e-mail digest.
                    self.send_email_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('HelloGithub', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print('处理 HelloGithub 数据完成', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

    @classmethod
    def _compose_email_text(cls, records):
        '''Build the plain-text digest body for the given record dicts.'''
        parts = [cls._EMAIL_SEPARATOR]
        for data in records:
            # Prefer the per-repository link; 'source_url' is only the site
            # root and is used as a fallback when no link is present.
            address = data.get('link') or data['source_url']
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            parts.append('文章地址: {}\n'.format(address))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append(cls._EMAIL_SEPARATOR + '\n')
        return ''.join(parts)

    def send_to_email(self):
        '''E-mail a digest of the records collected in ``self.send_email_datas``.'''
        title = 'HelloGithub - info'
        subject = 'HelloGithub - info'
        text = self._compose_email_text(self.send_email_datas)
        send_email = SendEmail(subject=subject, title=title, text=text)
        send_email.send()
        self.logs_handle.logs_write('HelloGithub', f'{title}-发送邮件完成', 'done', False)
# Script entry point: build the spider and run a single collection pass.
if __name__ == "__main__":
    spider = HelloGithub()
    spider.main()