You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
auto/spider/news_get_news.py

159 lines
6.4 KiB

# -*- coding: utf-8 -*-
import time
import httpx
from datetime import datetime
import os
import sys
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
from utils.utils_mongo_handle import MongoHandle
from utils.utils_logs_handle import LogsHandle
from utils.utils_send_email import SendEmail
from base.base_load_config import load_config
# Load the configuration via base.base_load_config.load_config(); this runs at import time.
config_json = load_config()
# Value stored in the "repush_times" field of every record built by HotNews.req().
# A missing 'DEFAULT_RE_PUSH_TIMES' key raises KeyError here, at import time.
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class HotNews():
    """Collect Anyknew aggregated news, store unseen entries and optionally email them.

    Workflow (see ``main``): request every category endpoint in ``self.targets``,
    flatten the responses into record dicts, insert the records whose title is not
    yet present in the MongoDB collection, and - when ``self.send_email_now`` is
    truthy - email the newly inserted records.
    """

    # Display label stored in "article_source" for each key of ``self.targets``.
    _TAGS = {
        'universal': 'Anyknew - 综合',
        'finance': 'Anyknew - 金融',
        'science': 'Anyknew - 科学',
        'life': 'Anyknew - 生活',
        'binary': 'Anyknew - 二进制',
    }
    # Label used for a target that has no entry in ``_TAGS``.
    _DEFAULT_TAG = 'Anyknew'

    def __init__(self):
        """Set up endpoints, storage names, logging and the email buffer."""
        # Prefix joined with an item's ``iid`` to form the article link.
        self.base_url = 'https://www.anyknew.com/go/'
        self.email_subject = '聚合新闻'
        self.email_title = 'Anyknew'
        self.email_text = '获取数据时间:\n{0}\n{1}\n\n\n\n'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                                                              ('-' * 90))
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        # MongoDB database / collection names handed to MongoHandle.
        self.db = 'NEWS'
        self.collection = 'Anyknew_info'
        # Category name -> API endpoint.
        self.targets = {
            'universal': 'https://www.anyknew.com/api/v1/cats/universal',
            'finance': 'https://www.anyknew.com/api/v1/cats/aam',
            'science': 'https://www.anyknew.com/api/v1/cats/st',
            'life': 'https://www.anyknew.com/api/v1/cats/life',
            'binary': 'https://www.anyknew.com/api/v1/cats/binary'
        }
        # Records inserted during this run; they form the email body.
        self.send_email_datas = []
        # Emails are only sent when this flag is truthy.
        self.send_email_now = 0

    def main(self):
        """Run one fetch -> store -> (optional) email cycle.

        Returns:
            ``False`` when no data could be fetched, otherwise ``None``.
        """
        self.logs_handle.logs_write('聚合新闻', '任务开始', 'start', False)
        resp_data = self.req()
        if not resp_data:
            self.logs_handle.logs_write('聚合新闻', '获取数据为空', 'error', False)
            return False

        self.save_to_mongo(resp_data)
        if self.send_email_now:
            if self.send_email_datas:
                print('准备发送邮件')
                self.send_to_email()
            else:
                print('无新数据')
        self.logs_handle.logs_write('聚合新闻', '任务完成', 'done', False)

    @classmethod
    def _tag_for(cls, target):
        """Return the display label for *target*, falling back to the generic one."""
        return cls._TAGS.get(target, cls._DEFAULT_TAG)

    def _build_link(self, iid):
        """Return the article URL for *iid*; a missing id yields the bare base URL."""
        # str(None) is the truthy string 'None', so None must be tested explicitly.
        return self.base_url + ('' if iid is None else str(iid))

    @staticmethod
    def _iter_items(resp_json):
        """Yield ``(site_name, item)`` for every item dict in an API payload.

        Walks ``data -> cat -> sites[] -> subs[] -> items[]``. Any level that is
        missing, ``None`` or not of the expected type is skipped instead of raising.
        """
        if not isinstance(resp_json, dict):
            return
        data = resp_json.get('data')
        cat = data.get('cat') if isinstance(data, dict) else None
        sites = cat.get('sites') if isinstance(cat, dict) else None
        for site in sites or []:
            if not isinstance(site, dict):
                continue
            site_name = site.get('site')
            for sub in site.get('subs') or []:
                if not isinstance(sub, dict):
                    continue
                for item in sub.get('items') or []:
                    if isinstance(item, dict):
                        yield site_name, item

    def _build_record(self, target, url, site_name, item):
        """Convert one API item into the record dict stored in MongoDB."""
        return {
            "title": item.get('title') or '',
            "context": item.get('more') or '',
            "source_url": url,
            'link': self._build_link(item.get('iid')),
            "article_type": '{}-{}'.format(target, site_name),
            "article_source": self._tag_for(target),
            "img_url": '',
            'keyword': '',
            "posted_date": item.get('add_date') or '',
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "repush_times": DEFAULT_RE_PUSH_TIMES
        }

    def req(self):
        """Request every endpoint in ``self.targets`` and flatten the results.

        A failing request or an undecodable body is reported, followed by a
        20 second pause, and that category is skipped; the others are still
        processed.

        Returns:
            list[dict]: one record per news item (possibly empty).
        """
        print('开始请求数据')
        result_data = []
        for target, url in self.targets.items():
            try:
                resp = httpx.get(url=url)
                # Decoding stays inside the try so a non-JSON body cannot abort the run.
                resp_json = resp.json()
            except Exception as e:
                print("请求出错{}, \nurl: {}".format(e, url))
                time.sleep(20)
                continue

            for site_name, item in self._iter_items(resp_json):
                result_data.append(self._build_record(target, url, site_name, item))
        print('已获取数据')
        return result_data

    def save_to_mongo(self, source_data):
        """Insert records whose title is not stored yet and queue them for email.

        Args:
            source_data: iterable of record dicts as produced by ``req``.

        Returns:
            ``0`` when a ``TypeError`` interrupts processing, otherwise ``None``.
        """
        print('开始处理Anyknew数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False, auto_remove=0)
        for data_to_insert in source_data:
            try:
                # The title is the de-duplication key.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    # No stored document matches: insert it and remember it for the email.
                    mongo.collection.insert_one(data_to_insert)
                    self.send_email_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('聚合新闻', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print('Anyknew数据处理')

    def send_to_email(self):
        """Format every record in ``self.send_email_datas`` and send one email."""
        separator = '********************************************************\n'
        parts = [separator]
        for data in self.send_email_datas:
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            parts.append('文章地址: {}\n'.format(data['link']))
            parts.append('类型: {}\n'.format(data['article_type']))
            parts.append('板块: {}\n'.format(data['article_source']))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append(separator + '\n')
        text = ''.join(parts)
        send_email = SendEmail(subject='Anyknew', title='Anyknew_info', text=text)
        send_email.send()
        print('邮件已发送')
if __name__ == '__main__':
    # Run a single fetch/store cycle when the file is executed as a script.
    HotNews().main()