"""Daily news digest: collect recent articles from MongoDB and e-mail them."""
import os
import re
import smtplib
import sys
from datetime import datetime, timedelta
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr, parseaddr
from urllib.parse import quote_plus

# Make the project root importable when this file is run directly as a script.
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))

from pymongo import MongoClient  # noqa: E402

from utils.utils import *  # noqa: E402,F401,F403  (provides LoadConfig)

config_json = LoadConfig().load_config()
base_project = LoadConfig().get_base_path()

PROJECT_NAME = config_json.get('PROJECT_NAME')
DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')
MAIL_HOST = config_json.get('MAIL_HOST')
MAIL_USER = config_json.get('MAIL_USER')
MAIL_PASS = config_json.get('MAIL_PASS')
MAIL_SENDER = config_json.get('MAIL_SENDER')
MAIL_RECEIVERS = config_json.get('MAIL_RECEIVERS')
DB_NAME = config_json.get('DB_NAME')  # make sure the config file defines this key

# Credentials are percent-escaped so characters such as '@', ':' or '/' in the
# user name / password cannot corrupt the connection URI.
MONGO_LINK = 'mongodb://{}:{}@{}:{}/'.format(
    quote_plus(str(DB_USER)), quote_plus(str(DB_PASSWORD)), DB_IP, DB_PORT
)

now_day = datetime.now().strftime('%Y-%m-%d')  # today's date

filter_days = config_json.get('FILTER_DAYS')
filter_keys = config_json.get('FILTER_KEYS')
filter_switch = True

# Visual divider placed between articles in the plain-text mail body.
_SEPARATOR = '********************************************************'

# Document fields copied verbatim (missing / falsy values become '').
_TEXT_FIELDS = (
    'title', 'context', 'source_url', 'link', 'article_type',
    'article_source', 'img_url', 'keyword', 'posted_date',
    'create_time', 'create_datetime',
)

# Push budget assumed for documents that carry no usable 'repush_times'.
_DEFAULT_REPUSH_TIMES = 5


class NewsDataCollation(object):
    """Load recent news documents from MongoDB and mail them as a digest."""

    def __init__(self):
        # Third-party SMTP service settings.
        self.mail_host = MAIL_HOST  # SMTP server
        self.mail_user = MAIL_USER  # login name
        self.mail_pass = MAIL_PASS  # password / authorization code
        self.sender = MAIL_SENDER
        self.receivers = self._normalize_receivers(MAIL_RECEIVERS)
        self.processed_data = []

    @staticmethod
    def _normalize_receivers(receivers):
        """Return the configured receivers as a flat list of addresses.

        Accepts ``None``, one address, a ``,``/``;`` separated string, or an
        iterable of addresses (wrapping a list again would nest it).
        """
        if receivers is None:
            return []
        if isinstance(receivers, str):
            parts = (part.strip() for part in re.split(r'[,;]', receivers))
            return [part for part in parts if part]
        return list(receivers)

    @staticmethod
    def _date_prefix_regex(days, today=None):
        """Build a regex matching strings that start with any date in range.

        Args:
            days: Number of days to cover, counting today. Invalid or
                non-positive values fall back to 1.
            today: Reference ``datetime``; defaults to ``datetime.now()``.

        Returns:
            A pattern such as ``^(?:2024-01-01|2024-01-02|2024-01-03)``.
        """
        try:
            span = int(days)
        except (TypeError, ValueError):
            span = 1
        span = max(span, 1)
        reference = today or datetime.now()
        dates = [
            (reference - timedelta(days=offset)).strftime('%Y-%m-%d')
            for offset in range(span - 1, -1, -1)
        ]
        # The group keeps '^' bound to every alternative, and listing each day
        # covers the whole range instead of only its first and last day.
        return '^(?:{})'.format('|'.join(dates))

    @staticmethod
    def _remaining_pushes(document):
        """Return the document's 'repush_times' (default 5) minus one."""
        current = document.get('repush_times', _DEFAULT_REPUSH_TIMES)
        if not isinstance(current, (int, float)):
            # A stored null / non-numeric value would otherwise raise TypeError.
            current = _DEFAULT_REPUSH_TIMES
        return current - 1

    def load_data(self):
        """Read every document created within the last ``filter_days`` days.

        Each matching document with a title has its ``repush_times`` field
        decremented in the database before being converted by
        ``process_data``.

        Returns:
            list[dict]: Processed article dicts (possibly empty).
        """
        processed_data = []
        print('程序正在读取数据')
        # 'create_datetime' is matched as a string that begins with the date.
        query = {"create_datetime": {"$regex": self._date_prefix_regex(filter_days)}}

        client = MongoClient(MONGO_LINK)
        try:
            # NOTE(review): the database name is hard-coded although DB_NAME is
            # loaded from the config - confirm which one is intended.
            db = client['NEWS']
            for collection_name in db.list_collection_names():
                print(collection_name)
                collection = db[collection_name]
                for document in collection.find(query):
                    if not document.get('title'):
                        continue
                    # Persist the decremented push counter for this document.
                    collection.update_one(
                        {"_id": document['_id']},
                        {"$set": {"repush_times": self._remaining_pushes(document)}},
                    )
                    data = self.process_data(document)
                    if data:
                        processed_data.append(data)
        finally:
            # Always release the connection, even if a query fails.
            client.close()
        return processed_data

    def process_data(self, document):
        """Convert a raw MongoDB document into the dict used for mailing.

        Args:
            document: Mapping returned by pymongo.

        Returns:
            dict: Text fields (missing values become ``''``), with cleaned
            ``title`` / ``context`` and the decremented ``repush_times``.
        """
        data = {field: document.get(field) or '' for field in _TEXT_FIELDS}
        data['repush_times'] = self._remaining_pushes(document)
        data['title'] = self.clean_string(data['title'], 'title')
        data['context'] = self.clean_string(data['context'], 'context')
        return data

    def clean_string(self, input_string, text_type, max_length=100):
        """Strip all whitespace from a string and truncate it.

        Args:
            input_string: Value to clean; non-strings yield ``''``.
            text_type: Kind of text ('title' / 'context'); currently unused
                and kept for interface compatibility.
            max_length: Maximum number of characters kept before '...' is
                appended.

        Returns:
            str: The cleaned, possibly truncated string.
        """
        if not isinstance(input_string, str):
            return ''
        # Removes every whitespace character: spaces, tabs, newlines, etc.
        cleaned_string = re.sub(r'\s+', '', input_string)
        if len(cleaned_string) > max_length:
            cleaned_string = cleaned_string[:max_length] + '...'
        return cleaned_string

    @staticmethod
    def _format_entry(data, keyword=None):
        """Render one article as a text block followed by a separator line."""
        lines = [
            '标题: {}'.format(data['title']),
            '正文: {}'.format(data['context']),
            '文章地址: {}'.format(data['link']),
            '类型: {}'.format(data['article_type']),
            '板块: {}'.format(data['article_source']),
        ]
        if keyword is not None:
            lines.append('关键词: {}'.format(keyword))
        lines.append('文章时间: {}'.format(data['posted_date']))
        lines.append('获取时间: {}'.format(data['create_datetime']))
        return '\n'.join(lines) + '\n' + _SEPARATOR + '\n\n'

    def _deliver(self, subject, title, text):
        """Send one plain-text mail; return True on success, False otherwise."""
        message = MIMEText(text, 'plain', 'utf-8')
        # Strict SMTP servers reject a From header without a mailbox, so the
        # display title is paired with the sender address when one is known.
        sender_address = parseaddr(self.sender or '')[1]
        if sender_address:
            message['From'] = formataddr((title, sender_address))
        else:
            message['From'] = Header(title, 'utf-8')
        message['To'] = Header("auto", 'utf-8')
        message['Subject'] = Header(subject, 'utf-8')
        try:
            # The context manager sends QUIT and closes the socket.
            with smtplib.SMTP_SSL(self.mail_host) as smtp:
                smtp.login(self.mail_user, self.mail_pass)
                smtp.sendmail(self.sender, self.receivers, message.as_string())
        except (smtplib.SMTPException, OSError) as e:
            # OSError covers DNS / connection failures raised below SMTP level.
            print("Error: 无法发送邮件", e)
            return False
        return True

    def send_email(self, processed_data):
        """Mail every processed article in a single digest.

        Args:
            processed_data: Article dicts produced by ``process_data``.
        """
        print('准备发送邮件')
        body = [_SEPARATOR + '\n']
        body.extend(self._format_entry(data) for data in processed_data)
        if self._deliver('新闻汇总sub', '新闻汇总title', ''.join(body)):
            print("邮件发送成功")

    def send_email_with_keyword(self, series, keys, processed_data):
        """Mail the articles whose title or context contains a keyword.

        Args:
            series: Name of the keyword group; used in subject and title.
            keys: Keywords as a ``'|'``-separated string (or an iterable of
                strings). Blank entries are ignored because an empty keyword
                would match every article.
            processed_data: Article dicts produced by ``process_data``; they
                are not modified.
        """
        raw_keys = keys.split('|') if isinstance(keys, str) else list(keys or [])
        # dict.fromkeys de-duplicates while preserving the configured order.
        wanted = list(dict.fromkeys(key.strip() for key in raw_keys if key and key.strip()))

        matches = []  # (article, keywords found in it); one entry per article
        keys_with_data = []
        for data in processed_data:
            hits = [key for key in wanted if key in data['title'] or key in data['context']]
            if not hits:
                continue
            matches.append((data, hits))
            keys_with_data.extend(key for key in hits if key not in keys_with_data)

        if not matches:
            return

        print('{}系列, 以下关键字有数据\n{}'.format(series, keys_with_data))
        print('程序正在准备发送邮件的数据')
        body = [_SEPARATOR + '\n']
        for data, hits in matches:
            # Prefer the article's own keyword; otherwise show what matched.
            keyword = data.get('keyword') or ', '.join(hits)
            body.append(self._format_entry(data, keyword=keyword))

        subject = '新闻汇总sub - {}'.format(series)
        title = '新闻汇总title - {}'.format(series)
        if self._deliver(subject, title, ''.join(body)):
            print("关键字: {} 的邮件发送成功".format(series))

    def main(self):
        """Load recent articles and send one keyword-filtered mail per series."""
        processed_data = self.load_data()
        if not processed_data:
            print("没有找到任何数据")
            return

        # self.send_email(processed_data) would mail the unfiltered digest;
        # only the keyword-filtered mails are sent.
        if filter_switch and filter_keys:
            for series, keys in filter_keys.items():
                self.send_email_with_keyword(series, keys, processed_data)


if __name__ == '__main__':
    NewsDataCollation().main()