You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
AutoInfo/base/base_news_data_collation.py

233 lines
9.1 KiB

'''
Daily job: read news from the MongoDB database, build a digest, and send it by e-mail.
'''
import os
import sys
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from pymongo import MongoClient
from datetime import timedelta
import re
from utils.utils import *
# Module-level configuration, read once at import time from the project config.
config_json = LoadConfig().load_config()
base_project = LoadConfig().get_base_path()
PROJECT_NAME = config_json.get('PROJECT_NAME')
# MongoDB connection settings.
DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')
# SMTP settings used for sending the digest.
MAIL_HOST = config_json.get('MAIL_HOST')
MAIL_USER = config_json.get('MAIL_USER')
MAIL_PASS = config_json.get('MAIL_PASS')
MAIL_SENDER = config_json.get('MAIL_SENDER')
MAIL_RECEIVERS = config_json.get('MAIL_RECEIVERS')
DB_NAME = config_json.get('DB_NAME')  # make sure the config file defines this key
# The f-string already interpolates the values. A second str.format() pass over
# the result would raise or mis-substitute whenever a credential contains
# '{' or '}', so none is applied.
# NOTE(review): PyMongo expects user name and password in a URI to be
# percent-escaped when they contain reserved characters - confirm how the
# credentials are stored in the config before adding escaping here.
MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/'
now_day = datetime.now().strftime('%Y-%m-%d')  # today's date
# Number of days (including today) whose documents are loaded.
filter_days = config_json.get('FILTER_DAYS')
# Mapping of series name -> '|'-separated keywords used to filter the digest.
filter_keys = config_json.get('FILTER_KEYS')
# When True (and filter_keys is set), one keyword-filtered mail is sent per series.
filter_switch = True
class NewsDataCollation(object):
    """Collect recent news documents from MongoDB and e-mail them as a digest.

    Workflow (see ``main``): load the documents created in the last
    ``filter_days`` days, then send one mail per keyword series configured in
    ``filter_keys``.
    """

    # Separator line printed before the first entry and after every entry.
    _SEPARATOR = '********************************************************\n'

    # Document fields copied verbatim (missing / falsy values become '').
    _TEXT_FIELDS = (
        'title',
        'context',
        'source_url',
        'link',
        'article_type',
        'article_source',
        'img_url',
        'keyword',
        'posted_date',
        'create_time',
        'create_datetime',
    )

    def __init__(self):
        # Third-party SMTP service settings taken from the module-level config.
        self.mail_host = MAIL_HOST  # SMTP server
        self.mail_user = MAIL_USER  # login user name
        self.mail_pass = MAIL_PASS  # login password
        self.sender = MAIL_SENDER
        # Accept either a single address or an already-built list of addresses;
        # wrapping a list again would produce a nested list that smtplib rejects.
        if isinstance(MAIL_RECEIVERS, (list, tuple)):
            self.receivers = list(MAIL_RECEIVERS)
        else:
            self.receivers = [MAIL_RECEIVERS]
        self.processed_data = []

    @staticmethod
    def _build_date_query(days, today=None):
        """Return the Mongo filter selecting documents of the last *days* days.

        Args:
            days: Number of days to cover, today included. Values that are
                missing, non-numeric or below 1 fall back to 1.
            today: Reference ``datetime``; defaults to ``datetime.now()``.

        Returns:
            A filter dict whose ``create_datetime`` regex is anchored at the
            start of the string and lists every day of the range.
        """
        try:
            days = max(int(days), 1)
        except (TypeError, ValueError):
            days = 1
        today = today or datetime.now()
        wanted_days = [
            (today - timedelta(days=offset)).strftime('%Y-%m-%d')
            for offset in range(days - 1, -1, -1)
        ]
        # Every day of the range is listed explicitly and the alternation is
        # grouped, so the '^' anchor applies to all alternatives and the days
        # between the first and the last one are matched as well.
        return {"create_datetime": {"$regex": "^(?:{})".format("|".join(wanted_days))}}

    def load_data(self):
        """Read the recent documents from every collection of the ``NEWS`` database.

        For each document that has a title, ``repush_times`` (default 5) is
        decremented by one in the database and the document is normalised
        with ``process_data``.

        Returns:
            list of dicts as produced by ``process_data``.
        """
        print('程序正在读取数据')
        processed_data = []
        query = self._build_date_query(filter_days)
        client = MongoClient(MONGO_LINK)
        try:
            # NOTE(review): the database name is hard-coded although the module
            # also reads DB_NAME from the config - confirm which one is intended.
            db = client['NEWS']
            for collection_name in db.list_collection_names():
                print(collection_name)
                collection = db[collection_name]
                for document in collection.find(query):
                    if not document.get('title'):
                        continue
                    # A document without 'repush_times' is treated as having 5.
                    repush_times = document.get('repush_times', 5)
                    collection.update_one(
                        {"_id": document['_id']},
                        {"$set": {"repush_times": repush_times - 1}},
                    )
                    data = self.process_data(document)
                    if data:
                        processed_data.append(data)
        finally:
            # Close the connection even when a query or update fails.
            client.close()
        return processed_data

    def process_data(self, document):
        """Normalise one Mongo document into the dict used for mailing.

        Args:
            document: Mapping holding the raw document.

        Returns:
            dict with the text fields (missing / falsy values become ``''``),
            the cleaned ``title`` and ``context``, and ``repush_times``
            decremented by one (default 5 before decrementing).
        """
        data = {field: document.get(field) or '' for field in self._TEXT_FIELDS}
        data['repush_times'] = document.get('repush_times', 5) - 1
        data['title'] = self.clean_string(data['title'], 'title')
        data['context'] = self.clean_string(data['context'], 'context')
        return data

    def clean_string(self, input_string, text_type):
        """Strip all whitespace from *input_string* and cap it at 100 characters.

        Args:
            input_string: Text to clean; non-string values yield ``''``.
            text_type: ``'title'`` or ``'context'``; accepted for interface
                compatibility, both kinds are treated the same way.

        Returns:
            The cleaned text; anything longer than 100 characters is cut to
            100 characters followed by ``'...'``.
        """
        if not isinstance(input_string, str):
            return ''
        # Removes every whitespace character (spaces, tabs, newlines, ...).
        cleaned = re.sub(r'\s+', '', input_string)
        if len(cleaned) > 100:
            cleaned = cleaned[:100] + '...'
        return cleaned

    def _format_entry(self, data, with_keyword=False):
        """Render one article as the plain-text paragraph used in the mail body."""
        lines = [
            '标题: {}\n'.format(data['title']),
            '正文: {}\n'.format(data['context']),
            '文章地址: {}\n'.format(data['link']),
            '类型: {}\n'.format(data['article_type']),
            '板块: {}\n'.format(data['article_source']),
        ]
        if with_keyword:
            lines.append('关键词: {}\n'.format(data.get('keyword', '')))
        lines.append('文章时间: {}\n'.format(data['posted_date']))
        lines.append('获取时间: {}\n'.format(data['create_datetime']))
        lines.append(self._SEPARATOR + '\n')
        return ''.join(lines)

    def _build_body(self, entries, with_keyword=False):
        """Join all rendered entries into one mail body, led by a separator line."""
        return self._SEPARATOR + ''.join(
            self._format_entry(entry, with_keyword) for entry in entries
        )

    def _send_message(self, subject, title, text, success_message):
        """Send *text* as a UTF-8 plain-text mail; failures are printed, not raised."""
        message = MIMEText(text, 'plain', 'utf-8')
        message['From'] = Header(title, 'utf-8')
        message['To'] = Header("auto", 'utf-8')
        message['Subject'] = Header(subject, 'utf-8')
        try:
            # The context manager issues QUIT and closes the socket on exit.
            with smtplib.SMTP_SSL(self.mail_host) as smtp:
                smtp.login(self.mail_user, self.mail_pass)
                smtp.sendmail(self.sender, self.receivers, message.as_string())
                print(success_message)
        except (smtplib.SMTPException, OSError) as e:
            # Connection-level failures (DNS, refused, timeout) are OSError;
            # report them like SMTP errors so one failed mail does not abort
            # the remaining ones.
            print("Error: 无法发送邮件", e)

    def _match_keywords(self, keys, processed_data):
        """Select the articles whose title or context contains one of *keys*.

        Args:
            keys: ``'|'``-separated keywords; empty parts are ignored because
                an empty keyword would match every article.
            processed_data: Iterable of dicts from ``process_data``.

        Returns:
            ``(entries, hit_keys)``: *entries* holds one shallow copy per
            matching article (each article at most once), with ``keyword``
            filled with the first matching key when the article has none;
            *hit_keys* lists every keyword that matched, with repeats.
        """
        wanted_keys = [key for key in (keys or '').split('|') if key]
        entries = []
        hit_keys = []
        for data in processed_data:
            hits = [
                key for key in wanted_keys
                if key in data['title'] or key in data['context']
            ]
            if not hits:
                continue
            hit_keys.extend(hits)
            # Work on a copy so the fallback keyword of one series does not
            # leak into the mails of the following series.
            entry = dict(data)
            if not entry.get('keyword'):
                entry['keyword'] = hits[0]
            entries.append(entry)
        return entries, hit_keys

    def send_email(self, processed_data):
        """Send one mail containing every article in *processed_data*."""
        print('准备发送邮件')
        text = self._build_body(processed_data, with_keyword=False)
        self._send_message('新闻汇总sub', '新闻汇总title', text, "邮件发送成功")

    def send_email_with_keyword(self, series, keys, processed_data):
        """Send one mail with the articles matching the keywords of *series*.

        Args:
            series: Series name, used in the subject and sender title.
            keys: ``'|'``-separated keywords belonging to the series.
            processed_data: Iterable of dicts from ``process_data``.

        Nothing is sent when no article matches.
        """
        entries, hit_keys = self._match_keywords(keys, processed_data)
        if not entries:
            return
        print('{}系列, 以下关键字有数据\n{}'.format(series, sorted(set(hit_keys))))
        print('程序正在准备发送邮件的数据')
        self._send_message(
            '新闻汇总sub - {}'.format(series),
            '新闻汇总title - {}'.format(series),
            self._build_body(entries, with_keyword=True),
            "关键字: {} 的邮件发送成功".format(series),
        )

    def main(self):
        """Load the recent articles and send the keyword-filtered mails.

        Exits the process with status 0 when no article was found.
        """
        processed_data = self.load_data()
        if not processed_data:
            print("没有找到任何数据")
            sys.exit(0)
        # The unfiltered digest (self.send_email) is currently disabled; only
        # the keyword-filtered mails below are sent.
        if filter_switch and filter_keys:
            for series, keys in filter_keys.items():
                self.send_email_with_keyword(series, keys, processed_data)
# Script entry point: build the collator and run the daily digest job.
if __name__ == '__main__':
    collator = NewsDataCollation()
    collator.main()