# -*- coding: utf-8 -*- ''' 设置每天 23:59 执行, 读取当天数据库中, 所有日志, 发送到指定邮箱 ''' import os import sys import time import httpx import pymongo sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')) from utils.utils import LoadConfig config_json = LoadConfig().load_config() base_project = LoadConfig().get_base_path() PROJECT_NAME = config_json.get('PROJECT_NAME') DB_USER = config_json.get('DB_USER') DB_PASSWORD = config_json.get('DB_PASSWORD') DB_IP = config_json.get('DB_IP') DB_PORT = config_json.get('DB_PORT') MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/' MAIL_HOST = config_json.get('MAIL_HOST') MAIL_USER = config_json.get('MAIL_USER') MAIL_PASS = config_json.get('MAIL_PASS') MAIL_SENDER = config_json.get('MAIL_SENDER') MAIL_RECEIVERS = config_json.get('MAIL_RECEIVERS') now_day = time.strftime('%Y-%m-%d', time.localtime()) class LogsHandle(object): def __init__(self): self.now_day = time.strftime('%Y-%m-%d', time.localtime()) db = 'logs' collection = 'logs_' + self.now_day self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0) def logs_send(self): title = 'AutoInfo message - daily logs: {}'.format(self.now_day) text = '' # TODO # 从 mongodb 读取日志, 拼接 text, 发送邮件 # 查询所有文档 query = {'state': 'error'} cursor = self.mongo.collection.find(query) # 遍历结果集 for record in cursor: text += "logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}\n\n".format( record.setdefault('title'), record.setdefault('content'), record.setdefault('state'), record.setdefault('create_datetime'), ) if text: G = GotifyNotifier(title=title, message=text, token_name='base') G.send_message() else: print("No error logs found for today.") class MongoHandle(object): def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0): self.client = pymongo.MongoClient(MONGO_LINK) self.db = db self.collection = collection if del_db and db: # 检查数据库是否存在 if db in self.client.list_database_names(): # 删除数据库 self.client.drop_database(db) self.db = self.client[db] if del_collection and self.collection: # 检查集合是否存在 if self.collection in self.db.list_collection_names(): # 删除集合 self.db.drop_collection(collection) self.collection = self.db[collection] if auto_remove: self.auto_remove_data(auto_remove) def write_data(self, data): self.collection.insert_one(data) def auto_remove_data(self, day): for data in self.collection.find({'create_time': {'$lt': int(time.time()) - day * 24 * 60 * 60}}): self.collection.delete_one({'_id': data['_id']}) class GotifyNotifier: def __init__(self, title, message, token_name='A52cfQ1UZ2e.Z0B'): self.gotify_url = 'https://gotify.erhe.top' self.app_token = token_name self.title = title self.message = message def send_message(self): # 发送POST请求 with httpx.Client() as client: response = client.post( url=f"{self.gotify_url}/message?token={self.app_token}", headers={'Content-Type': 'application/json'}, json={'title': self.title, 'message': self.message} ) # 检查响应状态码 if response.status_code == 200: print('Gotify Message sent successfully!') else: print('Failed to send message:', response.text) if __name__ == '__main__': print("开始执行日志处理") LogsHandle().logs_send() print("处理日志程序执行完毕")