# -*- coding: utf-8 -*-
"""Shared AutoInfo helpers: MongoDB logging, e-mail, Gotify push and config loading."""
import json
import os
import smtplib
import sys
import time
from datetime import datetime
from email.header import Header
from email.mime.text import MIMEText

import httpx
import pymongo

# Project root: the part of this file's absolute path up to the first
# occurrence of 'AutoInfo', with 'AutoInfo' appended again.
PROJECT_PATH = os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'
sys.path.append(PROJECT_PATH)


class LogsHandle(object):
    """Store log records in a per-day MongoDB collection and push them via Gotify."""

    # Layout of one log entry inside a notification body.
    _LINE_TEMPLATE = 'logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}'

    def __init__(self):
        """Bind to collection ``logs_<YYYY-MM-DD>`` of database ``logs``."""
        # The day is fixed at construction time; it names the collection
        # and appears in notification titles.
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.mongo = MongoHandle(
            db='logs',
            collection='logs_' + self.now_day,
            del_db=False,
            del_collection=False,
            auto_remove=0,
        )

    @staticmethod
    def _new_record(title, content, state):
        """Build a log document stamped with the current time."""
        return {
            "title": title,
            # NOTE: the detail text is stored under "context" (not "content");
            # the key is kept so previously written documents stay readable.
            "context": content,
            "state": state,
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    @classmethod
    def _format_record(cls, record):
        """Render one log document as a single notification line."""
        # Documents are written with the "context" key; reading "content"
        # (as the code used to) always produced None. Fall back to
        # "content" only when "context" is absent.
        detail = record.get('context')
        if detail is None:
            detail = record.get('content')
        return cls._LINE_TEMPLATE.format(
            record.get('title'),
            detail,
            record.get('state'),
            record.get('create_datetime'),
        )

    def logs_generate(self):
        """Insert a marker record noting that the log collection was created."""
        self.mongo.collection.insert_one(
            self._new_record("logs", 'generate logs', "create")
        )

    def logs_send(self):
        """Read every record of today's collection and push them as one Gotify message."""
        title = 'autoinfo - logs: {}'.format(self.now_day)
        # TODO: the original note also mentions sending this text by e-mail.
        text = ''.join(
            self._format_record(record) + '\n\n'
            for record in self.mongo.collection.find()
        )
        GotifyNotifier(title=title, message=text, token_name='logs').send_message()

    def logs_write(self, title_source=None, content=None, state=None, send_now=False):
        """Insert one log record and optionally push it immediately.

        Args:
            title_source: Value stored in the record's ``title`` field.
            content: Value stored in the record's ``context`` field.
            state: Value stored in the record's ``state`` field.
            send_now: When true, also send this record through Gotify.
        """
        data_to_insert = self._new_record(title_source, content, state)
        self.mongo.collection.insert_one(data_to_insert)

        if send_now:
            title = 'Auto Info - running logs: {}'.format(self.now_day)
            text = self._format_record(data_to_insert)
            GotifyNotifier(title=title, message=text, token_name='logs').send_message()


class MongoHandle(object):
    """Thin wrapper around one MongoDB collection, configured from config.json."""

    def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0):
        """Connect and bind ``self.db`` / ``self.collection``.

        Args:
            db: Database name.
            collection: Collection name.
            del_db: Drop the database first if it already exists.
            del_collection: Drop the collection first if it already exists.
            auto_remove: When non-zero, delete records older than this many
                days (see ``auto_remove_data``).
        """
        config_json = LoadConfig().load_config()

        db_user = config_json.get('DB_USER')
        db_password = config_json.get('DB_PASSWORD')
        db_ip = config_json.get('DB_IP')
        db_port = config_json.get('DB_PORT')

        # NOTE(review): user and password are interpolated verbatim; values
        # containing characters such as '@' or ':' would need percent-escaping.
        mongo_link = f'mongodb://{db_user}:{db_password}@{db_ip}:{db_port}/'
        self.client = pymongo.MongoClient(mongo_link)

        # Drop the database only when asked to and only if it exists.
        if del_db and db and db in self.client.list_database_names():
            self.client.drop_database(db)
        self.db = self.client[db]

        # Same for the collection.
        if del_collection and collection and collection in self.db.list_collection_names():
            self.db.drop_collection(collection)
        self.collection = self.db[collection]

        if auto_remove:
            self.auto_remove_data(auto_remove)

    def write_data(self, data):
        """Insert a single document into the collection."""
        self.collection.insert_one(data)

    def load_data(self):
        """Return all documents of the collection as a list, without ``_id``."""
        # MongoDB creates the database and collection lazily on first write,
        # so reading a collection that does not exist yet yields an empty list.
        return list(self.collection.find({}, {'_id': False}))

    def auto_remove_data(self, day):
        """Delete every document whose ``create_time`` is more than ``day`` days old."""
        cutoff = int(time.time()) - day * 24 * 60 * 60
        # One server-side delete instead of fetching and deleting row by row.
        self.collection.delete_many({'create_time': {'$lt': cutoff}})


class SendEmail(object):
    """Send a plain-text e-mail through the SMTP server named in config.json."""

    def __init__(self, subject='AutoInfo subject', title='AutoInfo title', text='auto text') -> None:
        """Load SMTP settings and remember the message parts.

        Args:
            subject: Value of the ``Subject`` header.
            title: Value of the ``From`` header.
            text: Plain-text message body.
        """
        config_json = LoadConfig().load_config()

        # Third-party SMTP service settings.
        self.mail_host = config_json.get('MAIL_HOST')  # server
        self.mail_user = config_json.get('MAIL_USER')  # login name
        self.mail_pass = config_json.get('MAIL_PASS')  # password

        self.sender = config_json.get('MAIL_SENDER')

        mail_receivers = config_json.get('MAIL_RECEIVERS')
        # Accept either a single address or a list of addresses; wrapping a
        # list again would hand smtplib a nested list.
        if isinstance(mail_receivers, (list, tuple)):
            self.receivers = list(mail_receivers)
        else:
            self.receivers = [mail_receivers]

        self.subject = subject
        self.title = title
        self.text = text

    def send(self):
        """Build the message and send it; SMTP errors are printed, not raised."""
        message = MIMEText(self.text, 'plain', 'utf-8')
        message['From'] = Header(self.title, 'utf-8')
        message['To'] = Header("auto", 'utf-8')
        message['Subject'] = Header(self.subject, 'utf-8')

        try:
            # The context manager issues QUIT and closes the socket even when
            # login or sendmail fails; previously the connection was leaked.
            with smtplib.SMTP_SSL(self.mail_host) as smtp_conn:
                smtp_conn.login(self.mail_user, self.mail_pass)
                smtp_conn.sendmail(self.sender, self.receivers, message.as_string())
                print("邮件发送成功")
        except smtplib.SMTPException as e:
            print("Error: 无法发送邮件", e)


class GotifyNotifier:
    """Post a single message to a Gotify server."""

    def __init__(self, title, message, token_name=''):
        """Resolve the server URL and application token.

        Args:
            title: Message title.
            message: Message body.
            token_name: Key into gotify_config.json; ``'test'`` is used when empty.
        """
        config_json = LoadConfig().load_config()
        self.gotify_url = config_json.get('GOTIFY_URL', 'https://gotify.erhe.top')
        self.app_token = self.match_token_name(token_name or 'test')
        self.title = title
        self.message = message

    def match_token_name(self, name):
        """Return the token stored under ``name``, or the ``'base'`` token as fallback.

        Raises:
            KeyError: If neither ``name`` nor ``'base'`` yields a token.
        """
        # Tokens live in gotify_config.json in the project root.
        gotify_config_path = os.path.join(str(PROJECT_PATH), 'gotify_config.json')
        with open(gotify_config_path, 'r') as f:
            token_name_dict = json.load(f)
        # A missing or empty entry falls back to the 'base' token.
        return token_name_dict.get(name) or token_name_dict['base']

    def send_message(self):
        """POST the message to ``<gotify_url>/message`` and print the outcome."""
        with httpx.Client() as client:
            response = client.post(
                url=f"{self.gotify_url}/message?token={self.app_token}",
                headers={'Content-Type': 'application/json'},
                json={'title': self.title, 'message': self.message}
            )

        # Equivalent curl call (token redacted; never commit a real token):
        # curl -k "https://gotify.erhe.top/message?token=<APP_TOKEN>" -F "title=..." -F "message=..." -F "priority=5"

        # Only HTTP 200 is treated as success.
        if response.status_code == 200:
            print(self.title)
            print('Gotify Message sent successfully!')
        else:
            print('Failed to send message:', response.text)


class LoadConfig:
    """Locate and read the project's config.json."""

    def load_config(self):
        """Return the parsed config.json from the project root.

        Terminates the process (``SystemExit``) when the file cannot be read
        or parsed, or when it parses to an empty value.
        """
        # NOTE(review): both failure paths exit with status 0, which reports
        # success to the calling shell; a nonzero status is probably intended.
        # Left unchanged to avoid altering process-level behaviour.
        try:
            config_path = os.path.join(PROJECT_PATH, 'config.json')
            with open(config_path, 'r') as f:
                config_json = json.load(f)
            if not config_json:
                print('No config file found')
                # sys.exit instead of the site-provided exit(), which is not
                # guaranteed to exist (e.g. under ``python -S``).
                sys.exit(0)
        except Exception as e:
            print(e)
            sys.exit(0)
        return config_json

    def get_base_path(self):
        """Return the project root derived from the current working directory."""
        # Unlike PROJECT_PATH this is based on os.getcwd(), not on this file's location.
        return os.path.join(os.getcwd().split('AutoInfo')[0], 'AutoInfo')