first commit

main
jack 3 months ago
commit d62923f2b4
  1. 65
      .gitignore
  2. 223
      base/base_daily_logs_generate.py
  3. 121
      base/base_daily_logs_send.py
  4. 233
      base/base_news_data_collation.py
  5. 177
      base/base_timing_remove_data.py
  6. 20
      config.json
  7. 11
      gotify_config.json
  8. 143
      remind/remind_auto_remind.py
  9. 101
      requirements.txt
  10. 153
      to_email/anyknew.py
  11. 134
      to_email/apprcn.py
  12. 55
      to_email/chaincatcher.py
  13. 236
      to_email/chiphell.py
  14. 302
      to_email/dlt.py
  15. 143
      to_email/hello_github.py
  16. 197
      to_email/rss_data.py
  17. 62
      to_gotify/airdrop_tasks.py
  18. 201
      to_gotify/coin_detail.py
  19. 127
      to_gotify/coin_world.py
  20. 66
      to_gotify/weather7d.py
  21. 251
      to_gotify/web3_news.py
  22. 238
      tools/tools_ql_create_tasks.py
  23. 1
      utils/__init__.py
  24. 213
      utils/utils.py

65
.gitignore vendored

@ -0,0 +1,65 @@
.DS_Store
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
.idea/*
xml_files/
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
other/split_clash_config/split_config
ai_news/save_data
daily/*.txt

@ -0,0 +1,223 @@
# -*- coding: utf-8 -*-
"""
用于青龙面板的日志初始化脚本
每天 00:00:00 执行创建当天的日志记录
"""
import os
import sys
import time
import logging
from datetime import datetime
from typing import Dict, Any
# 添加项目路径
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class Config:
    """Singleton holding mail/database settings loaded from the JSON config.

    The first ``Config()`` call loads the config file; every later call
    returns the same populated instance.
    """

    _instance = None

    def __new__(cls):
        # Lazily create and populate the single shared instance.
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._load_config()
            cls._instance = instance
        return cls._instance

    def _load_config(self):
        """Read the JSON config and expose its values as attributes."""
        try:
            cfg = LoadConfig().load_config()
            defaults = {
                'PROJECT_NAME': 'AutoInfo',
                'DB_USER': '',
                'DB_PASSWORD': '',
                'DB_IP': 'localhost',
                'DB_PORT': 27017,
                'MAIL_HOST': '',
                'MAIL_USER': '',
                'MAIL_PASS': '',
                'MAIL_SENDER': '',
                'MAIL_RECEIVERS': [],
            }
            for key, fallback in defaults.items():
                setattr(self, key, cfg.get(key, fallback))
            # Credentialed URI only when both user and password are present.
            if self.DB_USER and self.DB_PASSWORD:
                self.MONGO_LINK = (
                    f'mongodb://{self.DB_USER}:{self.DB_PASSWORD}'
                    f'@{self.DB_IP}:{self.DB_PORT}/?authSource=admin'
                )
            else:
                self.MONGO_LINK = f'mongodb://{self.DB_IP}:{self.DB_PORT}/'
        except Exception as e:
            logger.error(f"加载配置失败: {e}")
            raise
class MongoHandle:
    """MongoDB helper: connects, optionally drops db/collection, purges old data."""
    # NOTE(review): pymongo is not among this file's visible imports; it is
    # presumably re-exported by `from utils.utils import *` — confirm.
    def __init__(self, db: str, collection: str, del_db: bool = False,
                 del_collection: bool = False, auto_remove: int = 0):
        """
        Open and verify the MongoDB connection, then bind db/collection handles.

        Args:
            db: database name.
            collection: collection name.
            del_db: drop the database first if it exists.
            del_collection: drop the collection first if it exists.
            auto_remove: delete documents older than this many days (0 = off).

        Raises:
            pymongo.errors.ServerSelectionTimeoutError: server unreachable.
            Exception: any other setup failure (logged, then re-raised).
        """
        self.config = Config()
        logger.info(f"连接数据库: {self.config.MONGO_LINK}")
        try:
            self.client = pymongo.MongoClient(
                self.config.MONGO_LINK,
                serverSelectionTimeoutMS=5000  # fail fast: 5s server-selection timeout
            )
            # Round-trip command to prove the connection actually works.
            self.client.admin.command('ismaster')
            self.db_name = db
            self.collection_name = collection
            self._setup_database(del_db, del_collection)
            if auto_remove > 0:
                self.auto_remove_data(auto_remove)
        except pymongo.errors.ServerSelectionTimeoutError:
            logger.error("无法连接到MongoDB服务器")
            raise
        except Exception as e:
            logger.error(f"数据库初始化失败: {e}")
            raise
    def _setup_database(self, del_db: bool, del_collection: bool):
        """Optionally drop existing db/collection, then bind fresh handles."""
        if del_db and self.db_name:
            if self.db_name in self.client.list_database_names():
                self.client.drop_database(self.db_name)
                logger.info(f"已删除数据库: {self.db_name}")
        self.db = self.client[self.db_name]
        if del_collection and self.collection_name:
            if self.collection_name in self.db.list_collection_names():
                self.db.drop_collection(self.collection_name)
                logger.info(f"已删除集合: {self.collection_name}")
        self.collection = self.db[self.collection_name]
    def write_data(self, data: Dict[str, Any]) -> bool:
        """Insert one document; return True on success, False on failure."""
        try:
            result = self.collection.insert_one(data)
            logger.debug(f"数据插入成功, ID: {result.inserted_id}")
            return True
        except Exception as e:
            logger.error(f"数据插入失败: {e}")
            return False
    def auto_remove_data(self, days: int):
        """Bulk-delete documents whose create_time is older than `days` days."""
        try:
            cutoff_time = int(time.time()) - days * 24 * 60 * 60
            result = self.collection.delete_many({
                'create_time': {'$lt': cutoff_time}
            })
            if result.deleted_count > 0:
                logger.info(f"已删除 {result.deleted_count} 条过期数据")
        except Exception as e:
            logger.error(f"自动删除数据失败: {e}")
    def close(self):
        """Close the underlying client connection (safe if never opened)."""
        if hasattr(self, 'client'):
            self.client.close()
            logger.info("数据库连接已关闭")
class LogsHandler:
    """Creates today's per-day log collection and writes its seed document."""
    def __init__(self):
        self.config = Config()
        # Collection suffix: logs_YYYY-MM-DD.
        self.now_day = datetime.now().strftime('%Y-%m-%d')
        self.mongo = None
        self._setup_mongo()
    def _setup_mongo(self):
        """Open the MongoDB handle for today's logs collection."""
        try:
            self.mongo = MongoHandle(
                db='logs',
                collection=f'logs_{self.now_day}',
                del_db=False,
                del_collection=False,
                auto_remove=0  # retention is handled by a separate mechanism
            )
        except Exception as e:
            logger.error(f"初始化MongoDB连接失败: {e}")
            raise
    def logs_generate(self) -> bool:
        """Insert today's seed log document; return True on success."""
        data_to_insert = {
            "title": "daily_log_init",
            "context": f"Daily log collection created for {self.now_day}",
            "state": "created",
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "project": self.config.PROJECT_NAME
        }
        try:
            success = self.mongo.write_data(data_to_insert)
            if success:
                logger.info(f"成功创建当天日志记录: {self.now_day}")
            return success
        except Exception as e:
            logger.error(f"创建日志记录失败: {e}")
            return False
    def cleanup(self):
        """Release the MongoDB connection."""
        if self.mongo:
            self.mongo.close()
def main():
    """Script entry point: create today's log record.

    Returns:
        Process exit code — 0 on success, 1 on any failure.
    """
    logger.info("开始创建当天日志记录...")
    handler = None
    try:
        handler = LogsHandler()
        if handler.logs_generate():
            logger.info("当天日志记录创建成功")
            return 0
        logger.error("当天日志记录创建失败")
        return 1
    except Exception as e:
        logger.error(f"执行过程中发生错误: {e}")
        return 1
    finally:
        if handler:
            handler.cleanup()


if __name__ == '__main__':
    sys.exit(main())

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
'''
设置每天 23:59 执行, 读取当天数据库中, 所有日志, 发送到指定邮箱
'''
import os
import sys
import time
import httpx
import pymongo
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import LoadConfig
# Shared settings loaded once at import time; both calls read the same JSON.
config_json = LoadConfig().load_config()
base_project = LoadConfig().get_base_path()
PROJECT_NAME = config_json.get('PROJECT_NAME')
DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')
# NOTE(review): unlike the generator script, this URI omits ?authSource=admin
# — confirm both scripts are meant to authenticate against the same source.
MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/'
MAIL_HOST = config_json.get('MAIL_HOST')
MAIL_USER = config_json.get('MAIL_USER')
MAIL_PASS = config_json.get('MAIL_PASS')
MAIL_SENDER = config_json.get('MAIL_SENDER')
MAIL_RECEIVERS = config_json.get('MAIL_RECEIVERS')
now_day = time.strftime('%Y-%m-%d', time.localtime())
class LogsHandle(object):
    """Collects today's error-state log documents and pushes them via Gotify."""

    def __init__(self):
        # One collection per day (logs_YYYY-MM-DD), same scheme as the generator.
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        db = 'logs'
        collection = 'logs_' + self.now_day
        self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)

    def logs_send(self):
        """Concatenate today's 'error' log entries into one Gotify message."""
        title = 'AutoInfo message - daily logs: {}'.format(self.now_day)
        text = ''
        query = {'state': 'error'}
        cursor = self.mongo.collection.find(query)
        for record in cursor:
            # BUGFIX: log documents store the body under 'context' (see the
            # generator scripts); the old code read 'content' via setdefault(),
            # which always yielded None AND mutated the fetched record.
            text += "logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}\n\n".format(
                record.get('title'),
                record.get('context'),
                record.get('state'),
                record.get('create_datetime'),
            )
        if text:
            G = GotifyNotifier(title=title, message=text, token_name='base')
            G.send_message()
        else:
            print("No error logs found for today.")
class MongoHandle(object):
    """Thin wrapper around a pymongo collection with optional drop/cleanup."""

    def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0):
        """
        Args:
            db: database name.
            collection: collection name.
            del_db: drop the database first if it exists.
            del_collection: drop the collection first if it exists.
            auto_remove: if > 0, delete documents older than this many days.
        """
        self.client = pymongo.MongoClient(MONGO_LINK)
        self.db = db
        self.collection = collection
        if del_db and db:
            # Drop the database only if it actually exists.
            if db in self.client.list_database_names():
                self.client.drop_database(db)
        self.db = self.client[db]
        if del_collection and self.collection:
            # Drop the collection only if it actually exists.
            if self.collection in self.db.list_collection_names():
                self.db.drop_collection(collection)
        self.collection = self.db[collection]
        if auto_remove:
            self.auto_remove_data(auto_remove)

    def write_data(self, data):
        """Insert one document."""
        self.collection.insert_one(data)

    def auto_remove_data(self, day):
        """Delete documents whose create_time is older than `day` days.

        Perf fix: one server-side delete_many instead of the old
        find() + delete_one() round trip per expired document.
        """
        cutoff = int(time.time()) - day * 24 * 60 * 60
        self.collection.delete_many({'create_time': {'$lt': cutoff}})
class GotifyNotifier:
    """Pushes a single message to a Gotify server over its HTTP API."""

    def __init__(self, title, message, token_name='A52cfQ1UZ2e.Z0B'):
        # NOTE: token_name is used directly as the application token.
        self.gotify_url = 'https://gotify.erhe.top'
        self.app_token = token_name
        self.title = title
        self.message = message

    def send_message(self):
        """POST the message and print whether delivery succeeded."""
        endpoint = f"{self.gotify_url}/message?token={self.app_token}"
        payload = {'title': self.title, 'message': self.message}
        with httpx.Client() as client:
            response = client.post(
                url=endpoint,
                headers={'Content-Type': 'application/json'},
                json=payload,
            )
            if response.status_code == 200:
                print('Gotify Message sent successfully!')
            else:
                print('Failed to send message:', response.text)
# Script entry point: push today's error logs as one Gotify message.
if __name__ == '__main__':
    print("开始执行日志处理")
    LogsHandle().logs_send()
    print("处理日志程序执行完毕")

@ -0,0 +1,233 @@
'''
每日从 mongo 数据库, 做新闻汇总,发送到邮箱
'''
import os
import sys
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from pymongo import MongoClient
from datetime import timedelta
import re
from utils.utils import *
# Shared settings loaded once at import time.
config_json = LoadConfig().load_config()
base_project = LoadConfig().get_base_path()
PROJECT_NAME = config_json.get('PROJECT_NAME')
DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')
MAIL_HOST = config_json.get('MAIL_HOST')
MAIL_USER = config_json.get('MAIL_USER')
MAIL_PASS = config_json.get('MAIL_PASS')
MAIL_SENDER = config_json.get('MAIL_SENDER')
MAIL_RECEIVERS = config_json.get('MAIL_RECEIVERS')
DB_NAME = config_json.get('DB_NAME')  # make sure config.json has this key
# BUGFIX: the old line chained .format(**config_json) onto an already
# interpolated f-string — a no-op at best, and a KeyError/IndexError crash
# whenever the credentials contained a literal '{' or '}'.
MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/'
now_day = datetime.now().strftime('%Y-%m-%d')  # today's date
filter_days = config_json.get('FILTER_DAYS')   # size of the lookback window in days
filter_keys = config_json.get('FILTER_KEYS')   # {series_name: 'kw1|kw2|...'}
filter_switch = True  # master switch for the keyword-filtered emails
class NewsDataCollation(object):
    """Aggregates recent news documents from the NEWS database and sends
    plain-text email digests, optionally filtered by keyword series."""

    def __init__(self):
        # Third-party SMTP service settings (from config.json).
        self.mail_host = MAIL_HOST   # SMTP server
        self.mail_user = MAIL_USER   # account
        self.mail_pass = MAIL_PASS   # password / auth code
        self.sender = MAIL_SENDER
        self.receivers = [MAIL_RECEIVERS]
        self.processed_data = []

    def load_data(self):
        """Read and normalize every document from the last `filter_days` days.

        Also decrements each matched document's repush_times counter in the
        database. Returns a list of processed record dicts.
        """
        processed_data = []
        print('程序正在读取数据')
        client = MongoClient(MONGO_LINK)
        db = client['NEWS']
        # BUGFIX: the old pattern f"^{start_date}|{end_date}" anchored only
        # the first alternative (the second matched anywhere in the string)
        # and skipped every day strictly between the two endpoints. Build an
        # anchored alternation covering each day of the window instead.
        window = [
            (datetime.now() - timedelta(days=offset)).strftime('%Y-%m-%d')
            for offset in range(filter_days)
        ]
        query = {
            "create_datetime": {
                "$regex": "^(?:{})".format("|".join(window)),
                "$options": "i"
            }
        }
        # Walk every collection in the NEWS database.
        for collection_name in db.list_collection_names():
            print(collection_name)
            collection = db[collection_name]
            for document in collection.find(query):
                if not document.get('title'):
                    continue
                # Decrement the remaining re-push budget (default 5).
                repush_times = document.get('repush_times', 5)
                collection.update_one(
                    {"_id": document['_id']},
                    {"$set": {"repush_times": repush_times - 1}}
                )
                # NOTE(review): process_data() ALSO subtracts 1 from the value
                # it emits, so the record lags the DB by one — confirm intended.
                data = self.process_data(document)
                if data:
                    processed_data.append(data)
        client.close()
        return processed_data

    def process_data(self, document):
        """Normalize one raw document into the digest record shape."""
        data = {
            "title": document.get('title') or '',
            "context": document.get('context') or '',
            "source_url": document.get('source_url') or '',
            'link': document.get('link') or '',
            "article_type": document.get('article_type') or '',
            "article_source": document.get('article_source') or '',
            "img_url": document.get('img_url') or '',
            'keyword': document.get('keyword') or '',
            "posted_date": document.get('posted_date') or '',
            "create_time": document.get('create_time') or '',
            "create_datetime": document.get('create_datetime') or '',
            "repush_times": document.get('repush_times', 5) - 1
        }
        data['title'] = self.clean_string(data['title'], 'title')
        data['context'] = self.clean_string(data['context'], 'context')
        return data

    def clean_string(self, input_string, text_type):
        """Strip ALL whitespace and truncate to 100 chars ('...' suffix).

        Non-string input yields ''. `text_type` is currently unused beyond
        a placeholder branch for 'context'.
        """
        if not isinstance(input_string, str):
            return ''
        cleaned_string = re.sub(r'\s+', '', input_string)
        if len(cleaned_string) > 100:
            cleaned_string = cleaned_string[:100] + '...'
        if text_type == 'context':
            pass
        return cleaned_string

    def send_email(self, processed_data):
        """Send one digest email containing every processed record."""
        print('准备发送邮件')
        subject = '新闻汇总sub'
        title = '新闻汇总title'
        text = '********************************************************\n'
        for data in processed_data:
            text += '标题: {}\n'.format(data['title'])
            text += '正文: {}\n'.format(data['context'])
            text += '文章地址: {}\n'.format(data['link'])
            text += '类型: {}\n'.format(data['article_type'])
            text += '板块: {}\n'.format(data['article_source'])
            text += '文章时间: {}\n'.format(data['posted_date'])
            text += '获取时间: {}\n'.format(data['create_datetime'])
            text += '********************************************************\n\n'
        message = MIMEText(text, 'plain', 'utf-8')
        message['From'] = Header(title, 'utf-8')
        message['To'] = Header("auto", 'utf-8')
        message['Subject'] = Header(subject, 'utf-8')
        try:
            smtpObj = smtplib.SMTP_SSL(self.mail_host)
            smtpObj.login(self.mail_user, self.mail_pass)
            smtpObj.sendmail(self.sender, self.receivers, message.as_string())
            print("邮件发送成功")
        except smtplib.SMTPException as e:
            print("Error: 无法发送邮件", e)

    def send_email_with_keyword(self, series, keys, processed_data):
        """Send a digest for one series containing records matching its keys.

        Args:
            series: series display name.
            keys: '|'-separated keyword string for this series.
            processed_data: records produced by load_data().
        """
        process_send_data = {}
        keys = keys.split('|')
        have_data_keys = []
        for key in keys:
            for data in processed_data:
                if key in data['title'] or key in data['context']:
                    # Backfill the matching keyword when the record lacks one.
                    if not data.get('keyword'):
                        data['keyword'] = key
                    if series not in process_send_data:
                        process_send_data[series] = [data]
                    else:
                        process_send_data[series].append(data)
                    have_data_keys.append(key)
        if process_send_data:
            print('{}系列, 以下关键字有数据\n{}'.format(series, list(set(have_data_keys))))
            print('程序正在准备发送邮件的数据')
            for key in process_send_data:
                subject = '新闻汇总sub - {}'.format(series)
                title = '新闻汇总title - {}'.format(series)
                text = '********************************************************\n'
                for data in process_send_data[key]:
                    text += '标题: {}\n'.format(data['title'])
                    text += '正文: {}\n'.format(data['context'])
                    text += '文章地址: {}\n'.format(data['link'])
                    text += '类型: {}\n'.format(data['article_type'])
                    text += '板块: {}\n'.format(data['article_source'])
                    text += '关键词: {}\n'.format(key)
                    text += '文章时间: {}\n'.format(data['posted_date'])
                    text += '获取时间: {}\n'.format(data['create_datetime'])
                    text += '********************************************************\n\n'
                message = MIMEText(text, 'plain', 'utf-8')
                message['From'] = Header(title, 'utf-8')
                message['To'] = Header("auto", 'utf-8')
                message['Subject'] = Header(subject, 'utf-8')
                try:
                    smtpObj = smtplib.SMTP_SSL(self.mail_host)
                    smtpObj.login(self.mail_user, self.mail_pass)
                    smtpObj.sendmail(self.sender, self.receivers, message.as_string())
                    print("关键字: {} 的邮件发送成功".format(series))
                except smtplib.SMTPException as e:
                    print("Error: 无法发送邮件", e)

    def main(self):
        """Load the lookback window of data and send filtered digests."""
        processed_data = self.load_data()
        if not processed_data:
            print("没有找到任何数据")
            exit(0)
        # The unfiltered全量 digest is currently disabled:
        # self.send_email(processed_data)
        if filter_switch and filter_keys:
            for series, keys in filter_keys.items():
                self.send_email_with_keyword(series, keys, processed_data)
# Script entry point: aggregate recent news and send keyword-filtered emails.
if __name__ == '__main__':
    NewsDataCollation().main()

@ -0,0 +1,177 @@
# -*- coding: utf-8 -*-
"""
自动清除大于指定天数的数据
"""
import threading
import time
import sys
import os
from datetime import datetime
import pymongo
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
base_project = os.path.join(os.getcwd().split('AutoInfo')[0], 'AutoInfo')
from utils.utils import LoadConfig, GotifyNotifier
# Shared settings loaded once at import time.
config_json = LoadConfig().load_config()
# NOTE(review): base_project is also computed a few lines above from
# os.getcwd(); this call overwrites it — confirm which value is wanted.
base_project = LoadConfig().get_base_path()
PROJECT_NAME = config_json.get('PROJECT_NAME')
DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')
MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/'
MAIL_HOST = config_json.get('MAIL_HOST')
MAIL_USER = config_json.get('MAIL_USER')
MAIL_PASS = config_json.get('MAIL_PASS')
MAIL_SENDER = config_json.get('MAIL_SENDER')
MAIL_RECEIVERS = config_json.get('MAIL_RECEIVERS')
now_day = time.strftime('%Y-%m-%d', time.localtime())
class MongoHandle(object):
    """Thin wrapper around a pymongo collection with optional drop/cleanup."""

    def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0):
        """
        Args:
            db: database name.
            collection: collection name.
            del_db: drop the database first if it exists.
            del_collection: drop the collection first if it exists.
            auto_remove: if > 0, delete documents older than this many days.
        """
        self.client = pymongo.MongoClient(MONGO_LINK)
        self.db = db
        self.collection = collection
        if del_db and db:
            # Drop the database only if it actually exists.
            if db in self.client.list_database_names():
                self.client.drop_database(db)
        self.db = self.client[db]
        if del_collection and self.collection:
            # Drop the collection only if it actually exists.
            if self.collection in self.db.list_collection_names():
                self.db.drop_collection(collection)
        self.collection = self.db[collection]
        if auto_remove:
            self.auto_remove_data(auto_remove)

    def write_data(self, data):
        """Insert one document."""
        self.collection.insert_one(data)

    def auto_remove_data(self, day):
        """Delete documents whose create_time is older than `day` days.

        Perf fix: one server-side delete_many instead of the old
        find() + delete_one() round trip per expired document.
        """
        cutoff = int(time.time()) - day * 24 * 60 * 60
        self.collection.delete_many({'create_time': {'$lt': cutoff}})
class SendEmail(object):
    # NOTE(review): despite the name, send() currently pushes via Gotify, not
    # SMTP; the mail_* fields and the `text` argument are stored but never
    # used — confirm whether SMTP delivery was intentionally disabled.
    def __init__(self, subject='AutoInfo subject', title='AutoInfo title', text='AutoInfo text') -> None:
        # Third-party SMTP service settings (kept for the unused email path).
        self.mail_host = MAIL_HOST  # SMTP server
        self.mail_user = MAIL_USER  # account
        self.mail_pass = MAIL_PASS  # password / auth code
        self.sender = MAIL_SENDER
        self.receivers = [MAIL_RECEIVERS]
        self.subject = subject
        self.title = title
        self.text = text
    def send(self):
        """Push the notification via Gotify; the message body is `subject`."""
        if self.title:
            G = GotifyNotifier(title=self.title, message=self.subject)
            G.send_message()
        else:
            print("No error logs found for today.")
class LogsHandle(object):
    """Writes structured log documents into today's logs collection."""

    def __init__(self):
        # One collection per day: logs_YYYY-MM-DD.
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        db = 'logs'
        collection = 'logs_' + self.now_day
        self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)

    def logs_write(self, title_source=None, content=None, state=None, send_now=False):
        """Insert one log document; optionally push it immediately.

        Args:
            title_source: short source/title of the log entry.
            content: log body (stored under the 'context' key).
            state: lifecycle tag such as 'start', 'done', 'error', 'delete'.
            send_now: when True, also push the entry via SendEmail/Gotify.
        """
        data_to_insert = {
            "title": title_source,
            "context": content,
            "state": state,
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        self.mongo.collection.insert_one(data_to_insert)
        if send_now:
            subject = 'auto collection'
            title = 'auto collection - running logs: {}'.format(self.now_day)
            # BUGFIX: the body is stored under 'context'; the old code read
            # 'content' via setdefault(), which always produced None and
            # polluted the dict with a spurious 'content': None key.
            text = 'logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}'.format(
                data_to_insert.get('title'),
                data_to_insert.get('context'),
                data_to_insert.get('state'),
                data_to_insert.get('create_datetime'),
            )
            Send = SendEmail(subject=subject, title=title, text=text)
            Send.send()
class AutoRemoveData(object):
    """Purges documents older than `day` days from a fixed list of databases,
    one worker thread per database."""

    def __init__(self):
        self.databases = [
            'spider_news',
            'apprcn',
            'HelloGithub'
        ]
        self.day = 60  # age threshold in days
        self.client = pymongo.MongoClient(MONGO_LINK)
        self.logs = LogsHandle()
        self.all_delete_count = 0
        # Workers run concurrently; guard the shared counter.
        self._count_lock = threading.Lock()

    def auto_remove_data(self, db_name, day):
        """Delete expired documents from every collection of one database."""
        print(f'准备删除时间大于: {self.day} 数据')
        if db_name not in self.client.list_database_names():
            return
        deleted_count = 0
        cutoff = int(time.time()) - day * 24 * 60 * 60
        db = self.client[db_name]
        for collection_name in db.list_collection_names():
            # Perf fix: one delete_many per collection instead of a
            # find() + delete_one() round trip per expired document.
            result = db[collection_name].delete_many({'create_time': {'$lt': cutoff}})
            deleted_count += result.deleted_count
        # BUGFIX: += on the shared total raced between threads; lock it.
        with self._count_lock:
            self.all_delete_count += deleted_count
        msg = f"删除 {db_name}{self.day} 天以上数据 {deleted_count}"
        if deleted_count:
            print(msg)
            self.logs.logs_write(f'自动删除 {self.day} 天以上数据', msg, 'delete', False)

    def main(self):
        """Fan out one worker thread per database and wait for completion."""
        self.logs.logs_write(f'自动删除 {self.day} 天以上数据', f'开始自动删除 {self.day} 天以上数据', 'start', False)
        threads = []
        for db_name in self.databases:
            thread = threading.Thread(target=self.auto_remove_data, args=(db_name, self.day))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()
        print(f'删除时间大于: {self.day} 数据, 已完成')
        print(f'本次运行共删除: {self.all_delete_count} 条数据')
        self.logs.logs_write(f'自动删除 {self.day} 天以上数据', f'自动删除 {self.day} 天数以上数据完成', 'done', False)
# Script entry point: purge expired documents from all configured databases.
if __name__ == "__main__":
    A = AutoRemoveData()
    A.main()

@ -0,0 +1,20 @@
{
"PROJECT_NAME": "AutoInfo",
"MAIL_HOST": "smtp.163.com",
"MAIL_USER": "pushmessagebot@163.com",
"MAIL_PASS": "WSMSRKBKXIHIQWTU",
"MAIL_SENDER": "pushmessagebot@163.com",
"MAIL_RECEIVERS": "pushmessagebot@163.com",
"DB_USER": "root",
"DB_PASSWORD": "aaaAAA111!!!",
"DB_IP": "localhost",
"DB_PORT": "27017",
"GOTIFY_URL": "https://gotify.erhe.top",
"FILTER_DAYS": 1,
"FILTER_KEYS": {
"新闻汇总": "经济|金融|失业率",
"web3新闻": "web3|btc|eth|区块链|NFT|数字货币|数字币|数字资产|Dapp|DeFi|NFT|稳定币|元宇宙|GameFi|跨链|以太坊",
"关注新闻": "grass|movement"
},
"DEFAULT_RE_PUSH_TIMES": 5
}

@ -0,0 +1,11 @@
{
"logs": "A52cfQ1UZ2e.Z0B",
"base": "A8EVb0Cmxnb2vfk",
"coin": "AgfOJESqDKftBTQ",
"dlt": "A3bqt9Dlbs.fPUb",
"AirdropTasksNews": "Aoe0VKt-kkZnm8d",
"weather": "A9KF--mx_12PjSu",
"news": "AT2QGp_vyCX4akW",
"CheckAndRemind": "Aw7XKE2Ppk7Dgwk",
"test": "A0Xg6ZE5946iBYg"
}

@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
import sys
import os

# BUGFIX: the project root must be on sys.path BEFORE anything is imported
# from utils; the old code ran `from utils.utils import LoadConfig` first,
# which fails unless the package happens to be importable already.
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import LoadConfig
from utils.utils import *
from pymongo import MongoClient

# Shared settings loaded once at import time.
config_json = LoadConfig().load_config()
base_project = LoadConfig().get_base_path()
DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')
MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/'
class AutoRemind:
    """Reads reminder tasks from MongoDB and pushes due ones via
    Gotify and ServerChan, decrementing each task's retry budget."""

    def __init__(self):
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.db = 'ReMind'
        self.collection = 'remind'
        self.client = MongoClient(MONGO_LINK)

    def send_message(self, task_data):
        """Push one reminder if it is due and still has retries left.

        A task with an empty set_time is always due; otherwise it is due once
        now >= set_time. Each push decrements the stored retry counter.
        """
        if task_data['retry'] <= 0:
            return None
        # BUGFIX: previously a task WITHOUT a set_time never fired, although
        # the documented behavior was "no time set => always send".
        if task_data['set_time'] and datetime.now() < datetime.strptime(task_data['set_time'], '%Y-%m-%d %H:%M:%S'):
            return None
        title = '消息提醒: {} - {}'.format('提醒消息', task_data['title'])
        context = '消息内容: {}\n'.format(task_data['context'])
        context += '设置时间: {}\n'.format(task_data['set_time'])
        context += '推送时间: {}\n'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        # Push to both channels.
        GotifyNotifier(title, context, 'news').send_message()
        ServerChanNotifier(title, context.replace('\n', '\n\n')).send_message()
        # Persist retry - 1 back to the database.
        self.write_config(task_data['title'])

    def load_config(self):
        """Return every reminder task as a plain dict."""
        collection = self.client['ReMind']['remind']
        result = []
        for document in collection.find({}):
            result.append({
                'title': document['title'],
                'context': document['context'],
                'set_time': document['set_time'],
                'retry': document['retry']
            })
        return result

    def write_config(self, task_title):
        """Atomically decrement the retry counter of the task with this title."""
        collection = self.client['ReMind']['remind']
        updated_document = collection.find_one_and_update(
            {"title": task_title},
            {"$inc": {"retry": -1}},
            upsert=False,
            return_document=True
        )
        if updated_document:
            print("找到并更新了文档:", updated_document)
        else:
            print("未找到匹配的文档")

    def check_config(self):
        """Bind db/collection handles and seed the default task if missing.

        MongoDB creates databases/collections lazily on access, so the old
        existence checks (whose if/else branches were identical) were dead
        code and have been removed.
        """
        self.db = self.client['ReMind']
        self.collection = self.db['remind']
        default = {
            "title": "消息标题 1 title",
            "context": "消息内容 1 context",
            "set_time": "9999-12-31 10:00:00",
            "retry": 99
        }
        if not self.collection.find_one({"title": default["title"]}):
            self.collection.insert_one(default)

    def create_config(self):
        """Seed the task list with any missing predefined reminders."""
        collection = self.client['ReMind']['remind']
        create_list = [
            {"title": "消息标题 1 title", "context": "消息内容 1 context", "set_time": "9999-12-31 10:00:00",
             "retry": 99},
        ]
        for task in create_list:
            if not collection.find_one({"title": task["title"]}):
                collection.insert_one(task)

    def main(self):
        """Ensure config exists, then evaluate every task once."""
        self.check_config()
        config_list = self.load_config()
        self.create_config()  # NOTE: tasks created here are picked up next run
        for task_data in config_list:
            self.send_message(task_data)
# Script entry point: evaluate all reminder tasks once.
if __name__ == '__main__':
    AutoRemind().main()
    print('消息发送完成,程序退出!')

@ -0,0 +1,101 @@
aiofiles==24.1.0
aiohappyeyeballs==2.4.0
aiohttp==3.10.5
aiohttp-socks==0.8.4
aiosignal==1.3.1
annotated-types==0.6.0
anyio==4.1.0
appdirs==1.4.4
APScheduler==3.10.4
async-timeout==4.0.3
attrs==23.1.0
beautifulsoup4==4.12.2
blinker==1.7.0
Brotli==1.1.0
bs4==0.0.1
certifi==2023.11.17
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
colorama==0.4.6
cryptography==41.0.7
distro==1.9.0
dnspython==2.4.2
exceptiongroup==1.2.0
fastapi==0.108.0
frozenlist==1.4.0
greenlet==3.0.3
h11==0.14.0
h2==4.1.0
helium==5.0.3
hpack==4.0.0
httpcore==1.0.2
httpx==0.25.2
hyperframe==6.0.1
idna==3.6
importlib-metadata==7.0.0
jieba==0.42.1
jiter==0.5.0
jsonschema==4.23.0
jsonschema-specifications==2023.12.1
kaitaistruct==0.10
lxml==5.3.0
matrix-client==0.4.0
matrix-nio==0.25.0
multidict==6.0.4
numpy==1.26.4
openai==1.42.0
outcome==1.3.0.post0
packaging==24.1
paho-mqtt==2.1.0
pillow==11.0.0
ping3==4.0.4
playwright==1.46.0
psycopg2-binary==2.9.10
pyasn1==0.5.1
pycparser==2.21
pycryptodome==3.19.0
pydantic==2.5.3
pydantic_core==2.14.6
pydivert==2.1.0
pyee==11.1.0
PyExecJS==1.5.1
pygpt4all==1.1.0
pygptj==2.0.3
pymongo==4.6.1
pyOpenSSL==23.3.0
pyparsing==3.1.1
pyppeteer==2.0.0
PyRSS2Gen==1.1
pyshark==0.6
PySocks==1.7.1
python-socks==2.5.1
pytz==2024.1
PyYAML==6.0.2
redis==5.0.1
referencing==0.35.1
requests==2.31.0
rpds-py==0.20.0
rsa==4.9
scapy==2.5.0
schedule==1.2.1
six==1.16.0
sniffio==1.3.0
sortedcontainers==2.4.0
soupsieve==2.5
starlette==0.32.0.post1
termcolor==2.4.0
tqdm==4.66.1
trio==0.23.1
trio-websocket==0.11.1
typing_extensions==4.12.2
tzlocal==5.2
unpaddedbase64==2.1.0
urllib3==1.26.18
uvicorn==0.25.0
websockets==10.4
wsproto==1.2.0
xmltodict==0.13.0
yarl==1.9.4
zipp==3.17.0
zstandard==0.22.0

@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
import os
import sys
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
# Shared settings; DEFAULT_RE_PUSH_TIMES is a required key (KeyError if absent).
config_json = LoadConfig().load_config()
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class HotNews():
    """Scrapes anyknew.com category feeds, stores previously-unseen items in
    MongoDB, then pushes a digest of the new ones."""

    # Display tag per category key; anything else falls back to 'Anyknew'.
    _TAGS = {
        'universal': 'Anyknew - 综合',
        'finance': 'Anyknew - 金融',
        'science': 'Anyknew - 科学',
        'life': 'Anyknew - 生活',
        'binary': 'Anyknew - 二进制',
    }

    def __init__(self):
        self.base_url = 'https://www.anyknew.com/go/'
        self.email_subject = '聚合新闻'
        self.email_title = 'Anyknew'
        self.email_text = '获取数据时间:\n{0}\n{1}\n\n\n\n'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                                                             ('-' * 90))
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.db = 'NEWS'
        self.collection = 'Anyknew_info'
        # Category key -> API endpoint.
        self.targets = {
            'universal': 'https://www.anyknew.com/api/v1/cats/universal',
            'finance': 'https://www.anyknew.com/api/v1/cats/aam',
            'science': 'https://www.anyknew.com/api/v1/cats/st',
            'life': 'https://www.anyknew.com/api/v1/cats/life',
            'binary': 'https://www.anyknew.com/api/v1/cats/binary'
        }
        self.temp_datas = []  # records newly inserted during this run

    @staticmethod
    def _tag_for(target):
        """Return the display tag for a category key ('Anyknew' fallback)."""
        return HotNews._TAGS.get(target, 'Anyknew')

    def main(self):
        """Run one fetch/store/notify cycle, writing lifecycle log entries."""
        self.logs_handle.logs_write('聚合新闻', '任务开始', 'start', False)
        resp_data = self.req()
        if resp_data:
            self.save_to_mongo(resp_data)
            if self.temp_datas:
                print('准备发送消息')
                self.send_to_gotify()
            else:
                print('无新数据')
        else:
            self.logs_handle.logs_write('聚合新闻', '获取数据为空', 'error', False)
            return False
        self.logs_handle.logs_write('聚合新闻', '任务完成', 'done', False)

    def req(self):
        """Fetch every category feed and flatten its items into record dicts."""
        print('开始请求数据')
        result_data = []
        for target, url in self.targets.items():
            try:
                resp = httpx.get(url=url)
            except Exception as e:
                print("请求出错{}, \nurl: {}".format(e, url))
                time.sleep(20)
                continue
            resp_json = resp.json()
            # Use .get() — the old setdefault() calls mutated the parsed
            # response for no benefit.
            data = resp_json.get('data')
            cat = data.get('cat')
            sites = cat.get('sites')
            for site in sites:
                site_name = site.get('site')
                subs = site.get('subs')
                target_and_site = '{}-{}'.format(target, site_name)
                tag = self._tag_for(target)
                for items in subs:
                    if 'items' not in items:
                        continue
                    for d in items['items']:
                        # BUGFIX: str(d.get('iid')) or '' turned a missing iid
                        # into the literal string 'None'; guard before str().
                        iid = d.get('iid')
                        result_data.append({
                            "title": d.get('title') or '',
                            "context": d.get('more') or '',
                            "source_url": url,
                            'link': self.base_url + (str(iid) if iid is not None else ''),
                            "article_type": target_and_site,
                            "article_source": tag,
                            "img_url": '',
                            'keyword': '',
                            "posted_date": d.get('add_date') or '',
                            "create_time": int(time.time()),
                            "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                            "repush_times": DEFAULT_RE_PUSH_TIMES
                        })
        print('已获取数据')
        return result_data

    def save_to_mongo(self, source_data):
        """Insert records whose title is not yet in the collection; remember them."""
        print(f'开始处理Anyknew数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False, auto_remove=0)
        for data_to_insert in source_data:
            try:
                # Dedupe on title only.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    mongo.collection.insert_one(data_to_insert)
                    self.temp_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('聚合新闻', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print(f'Anyknew数据处理')

    def send_to_gotify(self):
        """Format the newly-inserted records into one digest and push it."""
        text = '****************************************\n'
        for data in self.temp_datas:
            text += '标题: {}\n'.format(data['title'])
            text += '正文: {}\n'.format(data['context'])
            text += '文章地址: {}\n'.format(data['link'])
            text += '类型: {}\n'.format(data['article_type'])
            text += '板块: {}\n'.format(data['article_source'])
            text += '文章时间: {}\n'.format(data['posted_date'])
            text += '获取时间: {}\n'.format(data['create_datetime'])
            text += '***********************************\n\n'
        title = 'Anyknew新闻 - ' + str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        sub = 'Anyknew新闻'
        SendEmail(subject=sub, title=title, text=text).send()
        # GotifyNotifier(title=title, message=text, token_name='news').send_message()
# GotifyNotifier(title=title, message=text, token_name='news').send_message()
# Script entry point: run one full fetch/store/notify cycle.
if __name__ == '__main__':
    HotNews().main()

@ -0,0 +1,134 @@
# -*- coding: utf-8 -*-
'''
反斗限免
1, 获取反斗限免数据
2, 储存到mongodb
3, 发送到指定邮件
'''
import re
import sys
import os
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
# Shared settings; DEFAULT_RE_PUSH_TIMES is a required key (KeyError if absent).
config_json = LoadConfig().load_config()
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class APPRCN(object):
    def __init__(self):
        """Prepare logging, HTTP headers and the MongoDB target collection."""
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        # Listing pages 2..9; page 1 is the bare site root (see req()).
        self.base_url = 'https://free.apprcn.com/page/{}/'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        db = 'NEWS'
        collection = 'apprcn-info'
        self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)
        self.temp_datas = []
def main(self):
self.logs_handle.logs_write('apprcn', '开始获取反斗限免数据', 'start', False)
response_data = self.req()
if response_data:
self.save_to_mongo(response_data)
self.send_to_gotify()
self.logs_handle.logs_write('apprcn', '反斗限免数据获取完成', 'done', False)
print('done')
else:
self.logs_handle.logs_write('apprcn', '无法获取apprcn数据', 'error', False)
def req(self):
urls = ['https://free.apprcn.com/']
for i in range(2, 10):
urls.append(self.base_url.format(i))
response_data = []
for i in urls:
response = httpx.get(url=i, headers=self.headers)
if response.status_code != 200:
self.logs_handle.logs_write('apprcn', '请求失败, 状态码: %s' % response.status_code, 'error', False)
exit(0)
response.encoding = 'utf-8'
content_list = re.findall('<div class="content">([\S\s]*?)<div class="sidebar">', response.text)
# 清理content数据
content = ''
if content_list:
for i in ['\t', '\n']:
content = content_list[0].replace(i, '')
context_list = re.findall('<p class="note">(.*?)</p>', content)
title_list = re.findall('title="(.*?)"', content)
post_date_list = re.findall('<time>(.*?)</time>', content)
source_data_list = re.findall('<a class="cat" href="(.*?)"', content)
for title, context, post_date, source_data in zip(title_list, context_list, post_date_list,
source_data_list):
response_data.append({
"title": title,
"context": context,
"source_url": source_data,
'link': '',
"article_type": '',
"article_source": '',
"img_url": '',
'keyword': '',
"posted_date": post_date,
"create_time": int(time.time()),
"create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"repush_times": DEFAULT_RE_PUSH_TIMES
})
if response_data:
return response_data
else:
self.logs_handle.logs_write('apprcn', '获取数据失败', 'error', False)
def save_to_mongo(self, data):
print('开始储存 反斗限免 数据')
for data_to_insert in data:
try:
# 检查数据库中是否存在匹配的文档
filter_criteria = {'title': data_to_insert.get('title', '')} # 确保 title 字段有值
count = self.mongo.collection.count_documents(filter_criteria)
if count == 0:
# 如果没有找到匹配的文档,插入新文档
result = self.mongo.collection.insert_one(data_to_insert)
self.temp_datas.append(data_to_insert)
except TypeError as te:
print('\n%s' % te)
self.logs_handle.logs_write('反斗限免', '写入数据库报错: %s' % te, 'error', False)
return 0
print('储存数据完成', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
def send_to_gotify(self):
if self.temp_datas:
text = ''
for data in self.temp_datas:
text += '标题: %s\n内容: %s\n时间: %s\n链接: %s\n\n' % (
data['title'], data['context'], data['posted_date'], data['source_url'])
title = '反斗限免 - ' + str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
sub = '反斗限免'
SendEmail(subject=sub, title=title, text=text).send()
# GotifyNotifier(title=title, message=text, token_name='news').send_message()
self.logs_handle.logs_write('apprcn', '发送消息完成', 'done', False)
else:
self.logs_handle.logs_write('apprcn', '没有新数据, 不发送邮件', 'done', False)
# Script entry point: run the apprcn scrape/store/notify pipeline.
if __name__ == "__main__":
    APPRCN().main()

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
'''
网络爬虫抓取链捕手新闻(data-v-***** 此参数会失效, 定期更换)
'''
import sys
import os
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
def chaincatcher_news():
    """Scrape the ChainCatcher news feed with Playwright and e-mail the headlines.

    Scrolls the page for ~10s so lazy-loaded items render, extracts the
    news spans (the data-v-* attribute is site-version specific and must be
    refreshed periodically), and sends the joined text by e-mail.
    Raises on any scraping error so the caller's retry loop can re-run it.
    """
    url = "https://www.chaincatcher.com/news"
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        try:
            page.goto(url)
            time.sleep(2)
            # Keep scrolling for 10 seconds to trigger lazy loading.
            start_time = time.time()
            while time.time() - start_time < 10:
                page.mouse.wheel(0, 100)
                time.sleep(0.1)
            page_content = page.content()
            browser.close()
            soup = BeautifulSoup(page_content, 'html.parser')
            # BUG FIX: the original filter was `"微信扫码" not in span`, which tests
            # membership among the Tag's *child nodes* (exact-equality against each
            # child), not a substring of the span's text — so the WeChat-QR promo
            # line was not actually filtered. Test the extracted text instead.
            contents = [
                span.get_text(strip=True)
                for span in soup.find_all('span', class_='text', attrs={'data-v-6560eea9': True})
                if "微信扫码" not in span.get_text()
            ]
            result = '\n'.join(contents)
            if result:
                result += f'\n推送时间: {datetime.now().strftime("%Y年%m月%d%H时%M分%S秒")}'
                title = 'ChainCatcher' + str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                sub = 'ChainCatcher News'
                SendEmail(subject=sub, title=title, text=result).send()
                # GotifyNotifier(title='ChainCatcher News', message=result, token_name='news').send_message()
            else:
                print("No news found.")
        except Exception as e:
            raise e
        finally:
            # browser.close() is safe to call twice (after the success path above).
            browser.close()
# Drive the scrape with up to five attempts, pausing between failures.
for attempt in range(5):
    try:
        chaincatcher_news()
    except Exception as exc:
        delay = 20
        print(f"Error occurred: {exc}. Retrying... {attempt + 1} \t sleep time: {delay}")
        time.sleep(delay)
    else:
        # Success: stop retrying.
        break

@ -0,0 +1,236 @@
# -*- coding: utf-8 -*-
'''
chiphell
'''
import os
import random
import sys
import threading
import re
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
config_json = LoadConfig().load_config()
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class CHIPHELL(object):
    """Scraper for chiphell.com portal categories.

    Fetches the listing page of each configured category, de-duplicates the
    parsed articles into MongoDB by title (one worker thread per top-level
    板块), and e-mails the newly seen items.
    """
    def __init__(self):
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.base_url = 'https://www.chiphell.com/'
        self.href_url = 'portal.php?mod=list&catid={}'
        self.db = 'NEWS'
        self.collection = 'chiphell_info'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        # Newly inserted documents, appended from worker threads (list.append
        # is atomic under CPython), consumed by send_to_email().
        self.temp_datas = []
    def req(self, source, target):
        # Fetch one category listing and parse it into article dicts.
        # source is '板块 - 类型' (e.g. '评测 - 显卡'); target is the numeric catid.
        # Returns a list of dicts, or 0 on request/HTTP failure.
        print(f'正在获取 {source} 数据')
        # sleep_time = random.uniform(10, 15)
        sleep_time = random.uniform(1, 2)  # polite delay between requests
        print(f'睡眠 {sleep_time}')
        time.sleep(sleep_time)
        result_list = []
        try:
            url = self.base_url + self.href_url.format(target)
            print(url)
            resp = httpx.get(url=url, headers=self.headers)
        except Exception as e:
            print(e)
            return 0
        if resp.status_code == 200:
            resp.encoding = 'utf-8'
            # print(resp.text)
            # Each <dt class="xs2">...</dl> chunk is one article card.
            dl_list = re.findall('<dt class="xs2">([\S\s]*?)</dl>', resp.text)
            for dl in dl_list:
                if dl:
                    url_list = re.findall('<a href="(.*?)" target="_blank" ', dl)
                    title_list = re.findall('class="xi2" style="">(.*?)</a> </dt>', dl)
                    img_url_list = re.findall('target="_blank"><img src="(.*?)"', dl)
                    context_list = re.findall('class="tn" /></a></div>([\S\s]*?)</dd>', dl)
                    post_time_list = re.findall('<span class="xg1"> (.*?)</span>', dl)
                    for url, title, img_url, context, post_time in zip(url_list, title_list, img_url_list, context_list,
                                                                       post_time_list):
                        # Strip spaces/newlines from the body text; CRs become spaces.
                        if context:
                            for i in [' ', '\n']:
                                context = context.replace(i, '')
                            context = context.replace('\r', ' ')
                        result_list.append({
                            "title": title,
                            "context": context,
                            "source_url": self.base_url + url,
                            'link': '',
                            "article_type": source.split(' - ')[1],
                            "article_source": source.split(' - ')[0],
                            "img_url": img_url,
                            'keyword': '',
                            "posted_date": post_time,
                            "create_time": int(time.time()),
                            "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                            "repush_times": DEFAULT_RE_PUSH_TIMES
                        })
        else:
            print(resp.status_code)
            return 0
        return result_list
    def save_to_mongo(self, collection, source_data):
        # Insert documents whose title is not yet in MongoDB; runs in a worker
        # thread per 板块.
        # NOTE(review): everything is stored in the single self.collection
        # ('chiphell_info'); the `collection` argument is only used for the
        # final log print — confirm that is intended.
        print(f'正在处理 {self.collection} 数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False, auto_remove=0)
        for data_to_insert in source_data:
            try:
                # Skip documents whose title already exists (default '' keeps the
                # filter valid if title is missing).
                filter_criteria = {'title': data_to_insert.get('title', '')}
                count = mongo.collection.count_documents(filter_criteria)
                if count == 0:
                    # Not seen before: insert and queue for the e-mail digest.
                    result = mongo.collection.insert_one(data_to_insert)
                    self.temp_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('chiphell', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print(f'处理 chiphell - {collection}数据完成')
    def send_to_email(self):
        # Build one plain-text digest of all newly inserted articles and e-mail it.
        text = '********************************************************\n'
        for data in self.temp_datas:
            text += '标题: {}\n'.format(data['title'])
            text += '正文: {}\n'.format(data['context'])
            text += '板块: {}\n'.format(data['article_source'])
            text += '类型: {}\n'.format(data['article_type'])
            text += '文章地址: {}\n'.format(data['source_url'])
            text += '文章时间: {}\n'.format(data['posted_date'])
            text += '获取时间: {}\n'.format(data['create_datetime'])
            text += '********************************************************\n\n'
        title = 'chiphell - info - ' + str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        sub = 'chiphell - info'
        SendEmail(subject=sub, title=title, text=text).send()
        # GotifyNotifier(title=title, message=text, token_name='news').send_message()
        self.logs_handle.logs_write('chiphell', f'{title}-发送邮件完成', 'done', False)
    def main(self):
        # Category map: {板块: {类型: catid}}. Commented entries are disabled.
        category = {
            '评测': {
                '笔记本': '19',
                '机箱': '11',
                # '处理器': '13',
                # '散热器': '14',
                # '主板': '15',
                # '内存': '137',
                # '外设': '18',
                # '电源': '35',
                '存储': '23',
                '显示设备': '21',
                # '台式机': '88',
                '显卡': '10',
                # '相机': '116'
            },
            '电脑': {
                '配件开箱': '98',
                '整机搭建': '99',
                '桌面书房': '101'
            },
            '掌设': {
                '智能手机': '40',
                '智能穿戴': '89',
                '笔电平板': '41',
                # '周边附件': '92'
            },
            # '摄影': {
            #     '微单卡片': '52',
            #     '单反单电': '51',
            #     '经典旁轴': '53',
            #     '怀旧菲林': '54',
            #     '影音摄像': '57',
            #     '周边附件': '55'
            # },
            # '汽车': {
            #     '买菜车': '58',
            #     '商务车': '59',
            #     '性能车': '63',
            #     '旅行车': '60',
            #     'SUV': '61',
            #     'MPV': '95',
            #     '摩托轻骑': '65',
            #     '改装配件': '96'
            # },
            # '单车': {
            #     '山地车': '108',
            #     '公路车': '109',
            #     '折叠车': '110',
            #     '休旅车': '111'
            # },
            # '腕表': {
            #     '机械表': '128',
            #     '电子表': '126'
            # },
            '视听': {
                '耳机耳放': '71',
                '音箱功放': '72',
                # '解码转盘': '73',
                '随身设备': '74'
            },
            '美食': {
                '当地美食': '68',
                '世界美食': '117',
                '私房菜品': '69',
                '美食器材': '70'
            },
            # '家居': {
            #     '家居': '132'
            # },
        }
        response_datas = {}
        for source1, tags in category.items():
            # source1 acts as the bucket name: pre-create its list in response_datas.
            if source1 not in response_datas:
                response_datas[source1] = []
            for source2, target in tags.items():
                source = source1 + ' - ' + source2
                response_data = self.req(source, target)
                if response_data != 0:
                    response_datas[source1] += response_data
        if response_datas:
            # One DB-writer thread per top-level 板块.
            threads = []
            for k, v in response_datas.items():
                thread = threading.Thread(target=self.save_to_mongo, args=(k, v,))
                threads.append(thread)
                thread.start()
            for thread in threads:
                thread.join()
            if self.temp_datas:
                self.send_to_email()
            return None
        else:
            self.logs_handle.logs_write('chiphell - info', '获取数据为空', 'error', False)
            return False
# Script entry point: scrape all enabled chiphell categories.
if __name__ == '__main__':
    CHIPHELL().main()

@ -0,0 +1,302 @@
# -*-coding: utf-8 -*-
import os
import sys
import threading
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
class GetData(object):
    """Fetch 超级大乐透 draw history from the sporttery.cn API and normalize it.

    Each draw becomes a dict with the draw serial, sorted red balls
    (red1-red5), sorted blue balls (blue1-blue2), draw date, PDF url and
    prize-pool balance.
    """
    def __init__(self, get_num=9999999):
        # get_num: page size requested from the API (how many past draws).
        self.get_num = get_num
        self.url = 'https://webapi.sporttery.cn/gateway/lottery/getHistoryPageListV1.qry?gameNo=85&provinceId=0&pageSize={}&isVerify=1&pageNo=1'.format(
            get_num)
        # FIX: the original assigned self.logs_handle twice; once is enough.
        self.logs_handle = LogsHandle()
        self.email_subject = 'dlt'
        self.email_title = '超级大乐透最新一期开奖查询对比'
        self.email_text = '获取数据时间:\n{0}\n{1}\n\n\n\n'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                                                             ('-' * 90))
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        db = 'dlt'
        collection = 'dlt_' + self.now_day
        self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)
    def main(self):
        """Fetch and normalize; returns the list of draw dicts."""
        data_list = self.req()
        result_data = self.data_handle(data_list)
        return result_data
    def req(self):
        """Call the API; return the raw draw list, or None when the payload is empty.

        Aborts the whole run via exit(0) on HTTP failure (original behavior).
        """
        resp = httpx.get(self.url)
        if resp.status_code != 200:
            print('state code: {}'.format(resp.status_code))
            log_detail = '访问失败, 状态码:{},url:{}'.format(resp.status_code, self.url)
            self.logs_handle.logs_write('auto_get_and_check_dlt', log_detail, 'error', False)
            exit(0)
        resp_json = resp.json()
        # BUG FIX: .setdefault('value') could return None and then crash with
        # AttributeError on the next lookup; .get with a default keeps the
        # empty-payload case on the logged early-return branch below.
        value = resp_json.get('value') or {}
        data_list = value.get('list')
        if not data_list:
            self.logs_handle.logs_write('auto_get_and_check_dlt', '返回的数据为空, 获取数据失败', 'error', False)
            return
        print('已获取数据')
        return data_list
    def data_handle(self, data_list):
        """Normalize raw draws, skipping malformed rows; exit(0) if nothing parses."""
        result_data = []
        for d in data_list:
            numbers = d.get('lotteryUnsortDrawresult')
            try:
                # A valid draw has at least 5 red + 2 blue numbers.
                if len(numbers.split(' ')) < 7:
                    continue
            except Exception as e:
                print('numbers: {}, err: {}'.format(numbers, e))
                continue
            parts = numbers.split(' ')
            red_list = parts[:5]
            blue_list = parts[5:]
            red_list.sort()
            blue_list.sort()
            try:
                red1, red2, red3, red4, red5 = red_list
                blue1 = blue_list[0]
                blue2 = blue_list[1]
            except Exception:
                print('红球或蓝球数据丢失')
                continue
            result_data.append({
                'serial': d.get('lotteryDrawNum'),
                'red1': red1 or '',
                'red2': red2 or '',
                'red3': red3 or '',
                'red4': red4 or '',
                'red5': red5 or '',
                'blue1': blue1 or '',
                'blue2': blue2 or '',
                'drawPdfUrl': d.get('drawPdfUrl'),
                'date': d.get('lotteryDrawTime'),
                'pool': d.get('poolBalanceAfterdraw')
            })
        if result_data:
            return result_data
        else:
            self.logs_handle.logs_write('auto_get_and_check_dlt', '返回的数据为空, 获取数据失败', 'error', False)
            exit(0)
class CheckMyDLT(object):
    """Compare fetched draws against my fixed ticket numbers and e-mail the result."""
    def __init__(self, data):
        # Each entry: 5 red numbers followed by 2 blue numbers (as zero-padded strings).
        self.my_dlt = [
            ['10', '11', '16', '17', '18', '11', '12'],
            ['02', '03', '11', '12', '23', '05', '06'],
            ['07', '09', '15', '17', '22', '09', '11'],
            ['05', '06', '07', '34', '35', '02', '09'],
            ['09', '10', '11', '21', '22', '04', '05']
        ]
        self.data = data
    def main(self):
        """Build the comparison text and send it by e-mail."""
        print('开始数据对比')
        prepare_send_text, prepare_send_subject = self.process_text()
        self.send_data(prepare_send_subject, prepare_send_text)
    def process_text(self):
        """Return (text, subject): per-draw hit counts against every ticket.

        subject is the draw serial when exactly one draw was queried,
        otherwise None.
        """
        text = ''
        serial_text = None
        subject = None
        for data in self.data:
            red_list = [data['red1'], data['red2'], data['red3'], data['red4'], data['red5']]
            blue_list = [data['blue1'], data['blue2']]
            # BUG FIX: the original tested `len(data) == 1`, i.e. the number of
            # keys in one draw dict (always 11), so subject was never set. The
            # intent is "only one draw was queried".
            if len(self.data) == 1:
                subject = '{}'.format(data['serial'])
            # Header line for this draw.
            serial_text = 'serial: {}\t\tlottery draw date: {}\t\tbonus pool: {} RMB\n{}\nlottery draw num: {} + {}\n'.format(
                data['serial'], data['date'], data['pool'], '*' * 90,
                red_list, blue_list)
            for my_num in self.my_dlt:
                my_red_list = my_num[:5]
                my_blue_list = my_num[5:]
                # Numbers present in both the draw and my ticket.
                red_common_elements = [element for element in red_list if element in my_red_list]
                blue_common_elements = [element for element in blue_list if element in my_blue_list]
                red_equal_count = len(red_common_elements)
                blue_equal_count = len(blue_common_elements)
                serial_text += 'my nums: {} + {}\t\tred hit: {}\tblue hit: {}\n'.format(my_red_list, my_blue_list,
                                                                                       red_equal_count,
                                                                                       blue_equal_count)
            text += serial_text
            text += '{}\n\n\n\n'.format('*' * 90)
        return text, subject
    def send_data(self, subject, text):
        """E-mail the comparison report."""
        title = '超级大乐透最新一期开奖查询对比'
        SendEmail(subject, title, text).send()
class SaveToDB(object):
    """Persist draw records into a per-day MongoDB collection (dlt_YYYY-MM-DD)."""
    def __init__(self, data):
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        # del_collection=True: the day's collection is rebuilt on every run.
        self.mongo = MongoHandle(db='dlt', collection='dlt_' + self.now_day,
                                 del_db=False, del_collection=True, auto_remove=0)
        self.data = data
    def save_data(self):
        """Insert every draw record, stamping creation time fields."""
        print('开始保存数据')
        copied_fields = ('serial', 'red1', 'red2', 'red3', 'red4', 'red5',
                         'blue1', 'blue2', 'date', 'pool', 'drawPdfUrl')
        for record in self.data:
            document = {name: record.setdefault(name) for name in copied_fields}
            document['create_time'] = int(time.time())
            document['create_datetime'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.mongo.collection.insert_one(document)
        print('数据已储存, 共储存数据{}'.format(len(self.data)))
class DLT(object):
    """Pipeline driver: fetch draws, then run the e-mail check and DB save in parallel."""
    def start(self, n):
        # Fetch the latest n draws.
        return GetData(n).main()
    def check(self, data):
        # Compare against my ticket numbers and e-mail the report.
        CheckMyDLT(data).main()
    def mongo(self, data):
        # Persist the draws to MongoDB.
        SaveToDB(data).save_data()
    def main(self):
        logs = LogsHandle()
        logs.logs_write('auto_get_and_check_dlt', 'dlt任务开始', 'start', False)
        data = self.start(30)
        if not data:
            logs.logs_write('auto_get_and_check_dlt', '获取数据失败', 'error', False)
            return
        # Run the e-mail comparison and the DB save concurrently.
        workers = [threading.Thread(target=job, args=(data,))
                   for job in (self.check, self.mongo)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        logs.logs_write('auto_get_and_check_dlt', 'dlt任务结束', 'start', False)
        print('done')
class Luanch(object):
    """Pipeline driver (sic: 'Launch').

    NOTE(review): this class is an exact duplicate of DLT above; the
    __main__ guard runs this one. Consider removing one of the two.
    """
    def start(self, n):
        # Fetch the latest n draws.
        G = GetData(n)
        data = G.main()
        return data
    def check(self, data):
        # Compare against my ticket numbers and e-mail the report.
        Check = CheckMyDLT(data)
        Check.main()
    def mongo(self, data):
        # Persist the draws to MongoDB.
        Mongo = SaveToDB(data)
        Mongo.save_data()
    def main(self):
        Logs = LogsHandle()
        Logs.logs_write('auto_get_and_check_dlt', 'dlt任务开始', 'start', False)
        data = self.start(30)
        if data:
            tasks = [
                self.check,
                self.mongo
            ]
            # Run the e-mail comparison and the DB save concurrently.
            threads = []
            for i in tasks:
                thread = threading.Thread(target=i, args=(data,))
                threads.append(thread)
                thread.start()
            for thread in threads:
                thread.join()
            # NOTE(review): status tag is 'start' although this logs completion —
            # confirm whether 'done' was intended.
            Logs.logs_write('auto_get_and_check_dlt', 'dlt任务结束', 'start', False)
            print('done')
        else:
            Logs.logs_write('auto_get_and_check_dlt', '获取数据失败', 'error', False)
# Script entry point: run the full dlt fetch/check/store pipeline.
if __name__ == '__main__':
    Luanch().main()
# ## 单独获取数据
# G = GetData()
# data = G.main()
# re_data = data[::-1]
# save_txt = ''
# for item in re_data:
# save_txt += f'[[{item["red1"]}, {item["red2"]}, {item["red3"]}, {item["red4"]}, {item["red5"]}], [{item["blue1"]}, {item["blue2"]}]],\n'
#
# with open('dlt.txt', 'w') as f:
# f.write(save_txt)

@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
'''
Hello Github
'''
import os
import sys
import time
from datetime import datetime
import httpx
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
config_json = LoadConfig().load_config()
DEFAULT_RE_PUSH_TIMES = config_json.get('DEFAULT_RE_PUSH_TIMES')
class HelloGithub(object):
    """Scraper for the HelloGithub API: fetch featured repos, dedupe into MongoDB,
    and (optionally) e-mail newly seen items."""
    def __init__(self):
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        # NOTE(review): req() builds its own URL; this template appears unused.
        self.base_url = 'https://api.hellogithub.com/v1/?sort_by=last&tid=&page={}'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        self.db = 'NEWS'
        self.collection = 'HelloGithub_info'
        self.source_url = 'https://hellogithub.com/repository/'
        # Newly inserted documents; consumed by send_to_email().
        self.send_email_datas = []
        # E-mail is disabled by default (0); flip to 1 to send digests.
        self.send_email_now = 0
    def main(self):
        """Entry point: fetch every target feed, store, optionally e-mail."""
        self.logs_handle.logs_write('HelloGithub', '开始获取 HelloGithub 数据', 'start', False)
        targets = ['featured']
        response_datas = []
        for target in targets:
            response_data = self.req(target)
            # BUG FIX: req() returns None when every page fails; the original
            # `response_datas += response_data` then raised TypeError.
            if response_data:
                response_datas += response_data
        if response_datas:
            self.save_to_mongo(response_datas)
        else:
            self.logs_handle.logs_write('HelloGithub', '获取 HelloGithub 数据失败', 'error', False)
        self.logs_handle.logs_write('HelloGithub', 'HelloGithub 数据获取完成', 'done', False)
        print('获取 HelloGithub 数据 done')
        if self.send_email_now:
            if self.send_email_datas:
                self.send_to_email()
            else:
                print('没有新数据, 不发送邮件')
    def req(self, target):
        """Fetch pages 1-4 of one sort order; return article dicts or None.

        Aborts the whole run via exit(0) on an HTTP error status (original
        behavior); connection errors just skip the page.
        """
        print('开始获取 HelloGithub {} 数据'.format(target))
        response_data = []
        for i in range(1, 5):
            url = 'https://api.hellogithub.com/v1/?sort_by={}&tid=&page={}'.format(target, i)
            try:
                response = httpx.get(url=url, headers=self.headers)
            except Exception as e:
                print("请求出错{}, \nurl: {}".format(e, url))
                continue
            if response.status_code != 200:
                print(
                    '获取 HelloGithub {} 数据, 状态码: {}, 程序退出\n检查目标地址: https://api.hellogithub.com/v1/?sort_by={}&tid=&page={}'.format(
                        target, response.status_code, target, i))
                self.logs_handle.logs_write('HelloGithub', '请求失败, 状态码: %s' % response.status_code, 'error',
                                            False)
                exit(0)
            json_data = response.json()
            for d in json_data.get('data', []):
                response_data.append({
                    "title": d.get('title', ''),
                    "context": '---'.join([d.get('summary', ''), d.get('description', '')]),
                    "source_url": 'https://hellogithub.com',
                    'link': self.source_url + d.get('item_id'),
                    "article_type": '',
                    "article_source": target,
                    "img_url": '',
                    'keyword': '',
                    "posted_date": d.get('updated_at'),
                    "create_time": int(time.time()),
                    "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    "repush_times": DEFAULT_RE_PUSH_TIMES
                })
        if response_data:
            return response_data
        else:
            self.logs_handle.logs_write('HelloGithub', '获取数据失败', 'error', False)
    def save_to_mongo(self, data):
        """Insert only documents whose title is not yet in the collection."""
        print('开始储存 HelloGithub 数据')
        # FIX: the original constructed a new MongoHandle for every single
        # document inside the loop; one handle is enough.
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False,
                            auto_remove=0)
        for data_to_insert in data:
            try:
                # Default '' keeps the filter valid even if title is missing.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                count = mongo.collection.count_documents(filter_criteria)
                if count == 0:
                    # Not seen before: insert and queue for the e-mail digest.
                    result = mongo.collection.insert_one(data_to_insert)
                    self.send_email_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('HelloGithub', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print('处理 HelloGithub 数据完成', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    def send_to_email(self):
        """E-mail a plain-text digest of all newly inserted repos."""
        title = 'HelloGithub - info'
        subject = 'HelloGithub - info'
        text = '********************************************************\n'
        for data in self.send_email_datas:
            text += '标题: {}\n'.format(data['title'])
            text += '正文: {}\n'.format(data['context'])
            text += '文章地址: {}\n'.format(data['source_url'])
            text += '文章时间: {}\n'.format(data['posted_date'])
            text += '获取时间: {}\n'.format(data['create_datetime'])
            text += '********************************************************\n\n'
        SendEmail(subject=subject, title=title, text=text).send()
        # GotifyNotifier(title=title, message=text, token_name='news').send_message()
        self.logs_handle.logs_write('HelloGithub', f'{title}-发送邮件完成', 'done', False)
# Script entry point: run the HelloGithub fetch/store pipeline.
if __name__ == "__main__":
    HelloGithub().main()

@ -0,0 +1,197 @@
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import datetime
import re
import psycopg2
from psycopg2 import Error
from concurrent.futures import ThreadPoolExecutor, as_completed
class FreshRSSDatabase:
    """Query a FreshRSS PostgreSQL database for keyword-matched entries and
    e-mail one plain-text digest per keyword group."""
    def __init__(self):
        # NOTE(review): database and SMTP credentials are hardcoded in this file;
        # move them to config/environment before publishing.
        self.hostname = 'erhe.top'
        self.port = 20788
        self.database = 'freshrss'
        self.user = 'freshrss'
        self.password = 'freshrss'
        self.conn = None
        # One digest per mapping: {digest name: '|'-separated keywords}.
        self.keys = [
            {'web3新闻': 'web3|区块链|NFT|DeFi|NFT'},
            {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾|穿越机|礼物'},
            {'coin新闻': 'btc|eth|sui|degen'}
        ]
        self.ellipsis = 300  # truncate cleaned text beyond this many characters
        self.days = 3        # client-side recency window used by prepare_to_send()
    def connect(self):
        """Open the PostgreSQL connection and store it on self.conn."""
        try:
            self.conn = psycopg2.connect(
                dbname=self.database,
                user=self.user,
                password=self.password,
                host=self.hostname,
                port=self.port
            )
        except Error as e:
            print(f"Error connecting to the database: {e}")
            raise  # re-raise so callers see the failure
    def execute_query(self, keywords):
        """Return last-day entries whose title AND content match any keyword.

        keywords is a '|'-separated string; returns a list of row tuples,
        or None on failure.
        """
        if self.conn is None:
            self.connect()
        if self.conn is None:
            print("Database connection failed")
            return None
        try:
            cur = self.conn.cursor()
            keyword_list = keywords.split('|')
            # SECURITY/BUG FIX: keywords were interpolated straight into the SQL
            # string (injection-prone), and the OR chain was not parenthesised,
            # so (AND binding tighter than OR) the date filter only applied to
            # the LAST keyword. Use placeholders and parenthesise the group.
            conditions = " OR ".join(
                ["(title ILIKE %s AND content ILIKE %s)"] * len(keyword_list))
            sql = f"""
            SELECT *
            FROM freshrss_toor_entry
            WHERE ({conditions})
            AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day')
            ORDER BY date DESC;
            """
            # NOTE(review): the SQL window is 1 day but prepare_to_send() filters
            # by self.days (=3); confirm which window is intended.
            params = []
            for keyword in keyword_list:
                pattern = f"%{keyword}%"
                params.extend([pattern, pattern])
            cur.execute(sql, params)
            records = cur.fetchall()
            cur.close()
            return records
        except Error as e:
            print(f"An error occurred: {e}")
            return None
    def close(self):
        """Close the database connection if it is open."""
        if self.conn:
            self.conn.close()
    def remove_all_html_tags(self, text):
        """Strip HTML tags, spaces and newlines; truncate to self.ellipsis chars.

        Args:
            text (str): raw HTML-ish text.
        Returns:
            str: the cleaned (possibly '...'-truncated) text.
        """
        clean_text = re.sub(r'<[^>]+>', '', text)
        clean_text = clean_text.replace(' ', '')
        clean_text = clean_text.replace('\n', '')
        if len(clean_text) > self.ellipsis:
            clean_text = clean_text[:self.ellipsis] + '...'
        return clean_text
    def send_email(self, subject='', title='', text=''):
        """Send one plain-text e-mail via 163 SMTP (self-addressed)."""
        # NOTE(review): SMTP credentials are hardcoded — move to config/env.
        mail_host = "smtp.163.com"
        mail_user = "pushmessagebot@163.com"
        mail_pass = "WSMSRKBKXIHIQWTU"
        sender = "pushmessagebot@163.com"
        receivers = ["pushmessagebot@163.com"]
        message = MIMEText(text, 'plain', 'utf-8')
        message['From'] = Header(title, 'utf-8')
        message['To'] = Header("RSS data", 'utf-8')
        message['Subject'] = Header(subject, 'utf-8')
        try:
            smtpObj = smtplib.SMTP_SSL(mail_host)
            smtpObj.login(mail_user, mail_pass)
            smtpObj.sendmail(sender, receivers, message.as_string())
            print(f"{title} 邮件发送成功")
        except smtplib.SMTPException as e:
            print("Error: 无法发送邮件", e)
    def query_and_process_key(self, key_name, keywords):
        """Run the query for one digest and de-duplicate rows by cleaned title.

        NOTE(review): assumes entry columns [2]=title, [4]=content, [5]=link,
        [7]=date (epoch seconds) — confirm against the FreshRSS schema.
        """
        records = self.execute_query(keywords)
        if records:
            unique_records = {}
            for record in records:
                title = self.remove_all_html_tags(record[2])
                if title not in unique_records:
                    unique_records[title] = {
                        "title": title,
                        "content": self.remove_all_html_tags(record[4]),
                        "link": record[5],
                        # fromtimestamp(..., utc) replaces the deprecated
                        # utcfromtimestamp(); same formatted string.
                        "postdate": (datetime.datetime.fromtimestamp(record[7], datetime.timezone.utc)
                                     .strftime('%Y-%m-%d %H:%M:%S')) if record[7] else '',
                        "posttimestamp": record[7] or 0
                    }
            return list(unique_records.values())
        return None
    def prepare_to_send(self, data):
        """Filter one digest to the last self.days days, sort newest-first, e-mail it."""
        source_key = data.get('source_key')
        keys = data.get('keys')
        data_list = data.get('data')
        filter_data = []
        # Epoch timestamp marking the start of the recency window.
        one_day_ago = datetime.datetime.now() - datetime.timedelta(days=self.days)
        one_day_ago_timestamp = one_day_ago.timestamp()
        for value in data_list:
            if value['posttimestamp'] >= one_day_ago_timestamp:
                filter_data.append(value)
        sorted_list = sorted(filter_data, key=lambda x: x['posttimestamp'], reverse=True)
        subject = 'RSS' + source_key
        title = source_key
        key_data_total = len(data_list)
        text = '关键词:\n' + keys.replace('|', '\n') + '\n\n'
        text += '一共搜索到: ' + str(key_data_total) + ' 条数据\n\n'
        text += '*' * 80 + '\n'
        for d in sorted_list:
            text += '标题: ' + d.get('title') + '\n'
            text += '内容: ' + d.get('content') + '\n'
            text += '链接: ' + d.get('link') + '\n'
            text += '发布日期: ' + d.get('postdate') + '\n'
            text += '时间戳: ' + str(d.get('posttimestamp')) + '\n\n'
            text += '*' * 80
            text += '\n\n'
        self.send_email(subject=subject, title=title, text=text)
    def main(self):
        """Query every keyword group in parallel, then e-mail one digest each."""
        loaded_data = {}
        with ThreadPoolExecutor(max_workers=len(self.keys)) as executor:
            future_to_key = {executor.submit(self.query_and_process_key, k, v): (k, v) for sublist in self.keys for k, v
                             in sublist.items()}
            for future in as_completed(future_to_key):
                key_name, keywords = future_to_key[future]
                try:
                    data = future.result()
                    if data:
                        loaded_data[key_name] = {
                            'source_key': key_name,
                            'keys': keywords,
                            'data': data
                        }
                    else:
                        print(f'key: {key_name} 数据为空')
                except Exception as exc:
                    print(f'{key_name} generated an exception: {exc}')
        # All queries done: release the connection before sending mails.
        self.close()
        for source_key, data in loaded_data.items():
            self.prepare_to_send(data)
        print('done!')
# Script entry point: build the keyword digests and e-mail them.
if __name__ == "__main__":
    f = FreshRSSDatabase()
    f.main()

@ -0,0 +1,62 @@
from datetime import datetime
import json
import httpx
# Fetch airdrop task listings (pages 1-4) from chainalert, with up to 10
# retries per page, and push the collected text to Gotify.
url = "https://api.chainalert.me/"
headers = {
    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
}
text = ''
n = 0  # number of tasks collected
with httpx.Client() as client:
    for page in range(1, 5):
        payload = {
            "method": "listData",
            "params": ['', "CRYPTO_RANK_AIRDROP", page, 0, 0, '']
        }
        for retry in range(10):
            try:
                response = client.post(url, headers=headers, data=payload, timeout=3)
            except Exception as e:
                print(str(e))
                continue
            if response.status_code != 200:
                print(response.status_code)
                continue
            data = response.json()
            # BUG FIX: the original called client.close() here (and on parse
            # failure) before `continue`, then kept reusing the closed client.
            # The `with` block owns the client's lifetime; just retry.
            if not data:
                continue
            try:
                airdrop_list = json.loads(data['result'][0]['data'])
            except Exception as e:
                print(str(e))
                continue
            for airdrop in airdrop_list:
                name = airdrop['name']
                rank = airdrop['rank']
                task = airdrop['item1']
                update_date = airdrop['item2']['updateDate']
                financing = airdrop['item4']
                logoUrl = airdrop['logoUrl']
                # Skip entries with no active tasks (either locale).
                if task == '无任务':
                    continue
                if task == 'No active tasks':
                    continue
                task = '成本: {}, 耗时: {}, 任务类型: {}'.format(task.get('cost'), task.get('time'), task.get('task'))
                text += '任务名称: {}\n排名: {}\n任务详细: {}\n更新时间: {}\n融资: {}\nlogo: {}\n'.format(name, rank, task, update_date, financing, logoUrl)
                text += '=' * 50 + '\n'
                n += 1
            break  # page processed successfully: stop retrying
if text:
    print(f'一共 {n} 条数据')
    # NOTE(review): hardcoded Gotify token — move to config/env.
    httpx.post('https://gotify.erhe.top/message?token=Aoe0VKt-kkZnm8d', headers={'Content-Type': 'application/json'}, json={'title': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'message': text})

@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
import os
import sys
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
# Maximum attempts for each network fetch below.
retry_count = 5
def fetch_coin_data(target):
    """Fetch the real-time market row for one coin symbol from chainalert.

    Args:
        target: coin symbol, e.g. 'btc'.
    Returns:
        A formatted one-line summary ('name price change volume rank mcap')
        followed by a dashed separator, or False on any request failure.
    Raises:
        Exception: when the API payload cannot be parsed.
    """
    import ast  # local import: the utils wildcard import does not provide ast
    url = "https://api.chainalert.me/"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
    }
    payload = {
        "method": "listData",
        "params": [datetime.now().strftime("%Y-%m-%d"), "MARKETPRICE", '', 0, 9999.0, target]
    }
    with httpx.Client() as client:
        try:
            response = client.post(url, headers=headers, data=payload, timeout=3)
        except Exception:
            # Network error: caller's retry loop handles it.
            return False
        if response.status_code != 200:
            return False
        data = response.json()
        try:
            # SECURITY FIX: the payload is network-controlled text; the original
            # used eval(), which executes arbitrary code. ast.literal_eval only
            # parses Python literals and accepts the same repr-style payloads.
            target_data = ast.literal_eval(data['result'][0]['data'])
        except Exception as e:
            raise Exception(f"Failed to parse data: {data}, error: {str(e)}")
        row = target_data[0]
        name = row['name']
        rank = row['rank']
        price = row['item1']
        volume = row['item2']
        change = row['item3']
        market_cap = row['item4']
        text = '{} {} {} {} {} {}'.format(name, price, change, volume, rank, market_cap)
        print(text)
        return text + '\n' + ('-' * len(text)) + '\n'
def fetch_vix_data():
    """Fetch the crypto fear & greed ('VIX') index from chainalert.

    Returns:
        'VIX data: <greedy>\\nLevel: <level>' on success, False on failure.
    """
    import ast  # local import: the utils wildcard import does not provide ast
    url = "https://api.chainalert.me/"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
    }
    payload = {
        "method": "listData",
        "params": ['', "GREEDY_INDEX", 1, 0, 1, '']
    }
    with httpx.Client() as client:
        try:
            response = client.post(url, headers=headers, data=payload, timeout=3)
        except Exception:
            # Network error: caller's retry loop handles it.
            return False
        if response.status_code != 200:
            return False
        data = response.json()
        # SECURITY FIX: the original used eval() on network-supplied text;
        # ast.literal_eval only parses literals and accepts the same payloads.
        vix_data = ast.literal_eval(data['result'][0]['data'])[0]
        print(vix_data)
        greedy = vix_data['greedy']
        level = vix_data['level']
        text = f'VIX data: {greedy}\nLevel: {level}'
        return text
def fetch_gas_data():
    """Query the MathGas endpoint for current Ethereum gas prices.

    Returns a formatted multi-line string on success, False on any failure.
    Prices are reported with the last 9 decimal digits dropped
    (presumably wei -> gwei — original scheme kept verbatim).
    """
    url = "https://a5.maiziqianbao.net/api/v1/chains/EVM/1/gas_price"
    headers = {
        "Host": "a5.maiziqianbao.net",
        "Connection": "keep-alive",
        "x-req-token": "MDbO4FsaSUPdjCdvTUs2zY4V3rnvvYatvYyjz7SfY+aCJ8r+RFm06X2dGR8eEDK7Gc5g1TLEQySEhGerRXbDT/NS+e5QAWRU68yD8m4y/aKK+TBkIv90VwvxmvYId2BVoDPDHQCGG4o3EqRWkS93eV0twYQ7w7qvNUj2e3tpDcUZYuplPyLozgYVTegFPnDk",
        "Accept": "*/*",
        "x-app-type": "iOS-5",
        "x-app-ver": "1.0.1",
        "x-app-udid": "419815AD-3015-4B5A-92CA-3BCBED24ACEC",
        "x-app-locale": "en",
        "Accept-Language": "zh-Hans-CN;q=1.0, en-CN;q=0.9",
        "Accept-Encoding": "br;q=1.0, gzip;q=0.9, deflate;q=0.8",
        "User-Agent": "MathGas/1.0.1 (MathWallet.MathGas; build:3; macOS 13.5.0) Alamofire/5.4.4"
    }
    def trim_digits(value, digits=9):
        # Integer-divide by 10**digits via string slicing; values with <= digits
        # characters pass through unchanged (same rule as the original lambda).
        rendered = str(value)
        if len(rendered) > digits:
            return int(rendered[:-digits])
        return value
    with httpx.Client() as client:
        response = client.get(url, headers=headers)
        if response.status_code != 200:
            client.close()
            print("Error:", response.status_code)
            return False
        payload = response.json()
        if not payload:
            client.close()
            print("Not Find GAS Data. Error: No response")
            return False
        result = '\nGAS:\n'
        try:
            data = payload['data']
            tiers = {name: trim_digits(data[name]['price'])
                     for name in ('fastest', 'fast', 'standard', 'low', 'base')}
            print('fastest: {fastest} - fast: {fast} - standard: {standard} - low: {low} - base: {base}'.format(**tiers))
            result += 'fastest: {fastest}\nfast: {fast}\nstandard: {standard}\nlow: {low}\nbase: {base}'.format(**tiers)
            return result
        except Exception as e:
            print(e)
            return False
def main():
    """Gather real-time coin prices, the fear/greed (VIX) index and current
    gas fees, then push the combined report through Gotify ('coin' channel).
    """
    report = ''

    # --- real-time price for each watched symbol ---
    watchlist = ['btc', 'eth', 'sol', 'grass', 'sui', 'doge', 'arb', 'ath', 'move', 'pepe', 'degen', 'act', 'plume']
    for symbol in watchlist:
        for attempt in range(1, retry_count + 1):
            data = fetch_coin_data(symbol)
            if data:
                report += data
                break
            print(f"{symbol} Failed to fetch data. retry: {attempt}")
            if attempt == retry_count:
                report += f"{symbol} Failed to fetch data. retry count: {attempt}"

    # --- fear & greed (VIX) index ---
    for attempt in range(1, retry_count + 1):
        data = fetch_vix_data()
        if data:
            report += data + '\n\n'
            break
        print(f"Failed to fetch VIX data. retry: {attempt}")
        if attempt == retry_count:
            report += f"Failed to fetch VIX data. retry count: {attempt}"

    # --- current gas fees ---
    for attempt in range(1, retry_count + 1):
        data = fetch_gas_data()
        if data:
            report += '\n' + data + '\n\n'
            break
        if attempt == retry_count:
            report += f"Failed to fetch Gas data. retry count: {attempt}"

    if report:
        GotifyNotifier('Real-time coin price\n', report, 'coin').send_message()
    else:
        print('No Data')
# Entry point when run directly (scheduled via the QingLong panel).
if __name__ == "__main__":
    main()

@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
'''
币世界 文章板块
'''
import os
import sys
from httpx import HTTPStatusError
import re
# Put the 'AutoInfo' project root on sys.path so `utils` imports resolve
# no matter where the script is launched from.
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from utils.utils import *
# Project-wide settings loaded from config.json at the project root.
config_json = LoadConfig().load_config()
# Re-push budget stamped onto each stored article record.
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class BiShiJie(object):
    """Scraper for the 528btc.com ('币世界') article board.

    Pulls up to five pages of the AJAX article list, extracts article
    metadata with regexes, and inserts records that are not yet present
    (deduplicated by title) into the NEWS / '币世界-文章' collection.
    """

    def __init__(self):
        self.base_url = 'https://www.528btc.com'
        self.url = self.base_url + "/e/extend/api/v2/AjaxPageList/"
        self.send_email_datas = []  # articles newly inserted during this run
        self.send_email_now = 0
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.headers = {
            "Accept": "text/html, */*; q=0.01",
            "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            "Origin": "https://www.528btc.com",
            "Referer": "https://www.528btc.com/kx/",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:129.0) Gecko/20100101 Firefox/129.0",
            "X-Requested-With": "XMLHttpRequest",
        }
        db = 'NEWS'
        collection = '币世界-文章'
        self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)

    def req(self):
        """Fetch pages 1-5 of the article list.

        Returns:
            list[dict]: one metadata record per article found (may be empty
            when every page request fails).
        """
        max_page_num = 1 + 5  # pages 1..5 inclusive
        all_data = []
        for page in range(1, max_page_num):
            form_data = {
                "pageIndex": f"{page}",
                "module": "newslist-v2",
                "classid": "114",
                "limitpage": "15"
            }
            try:
                response = httpx.post(self.url, headers=self.headers, data=form_data)
                # Raise on non-2xx so the except blocks below handle it.
                response.raise_for_status()
                html = response.text
                # Patterns are raw strings now: '\S'/'\s' inside a non-raw
                # literal are invalid escape sequences (SyntaxWarning on
                # Python 3.12+, a future SyntaxError).
                div_list = re.findall(r'<div class="slices_item_content">([\S\s]*?)</div>\n.*?</div>\n.*?</div>', html)
                for div in div_list:
                    title_list = re.findall(r'<div class="title overflow">(.*?)</div>', div)
                    title = title_list[0] if len(title_list) > 0 else ''
                    context_list = re.findall(r'<div class="introduce overflow">(.*?)</div>', div)
                    context = context_list[0] if len(context_list) > 0 else ''
                    source_url_list = re.findall(r'<a target="_blank" href="(.*?)">', div)
                    source_url = source_url_list[0] if len(source_url_list) > 0 else ''
                    article_type_list = re.findall(r'<span class="span">(.*?)</span>', div)
                    article_type = article_type_list[0] if len(article_type_list) > 0 else ''
                    posted_date_list = re.findall(r'<span class="time">(.*?)</span>', div)
                    posted_date = posted_date_list[0] if len(posted_date_list) > 0 else ''
                    all_data.append({
                        "title": title,
                        "context": context,
                        # NOTE(review): the scraped href goes into 'link';
                        # 'source_url' is deliberately left blank — confirm.
                        "source_url": '',
                        'link': self.base_url + source_url,
                        "article_type": article_type,
                        "article_source": '',
                        "img_url": '',
                        'keyword': article_type,
                        "posted_date": posted_date,
                        "create_time": int(time.time()),
                        "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                        "repush_times": DEFAULT_RE_PUSH_TIMES
                    })
            except HTTPStatusError as http_err:
                print(f"HTTP error occurred: {http_err}")
            except Exception as err:
                print(f"An error occurred: {err}")
        return all_data

    def save_to_mongo(self, data):
        """Insert records whose title is not in the collection yet."""
        print('开始储存 币世界文章 数据')
        for data_to_insert in data:
            try:
                # Deduplicate on title before inserting.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                count = self.mongo.collection.count_documents(filter_criteria)
                if count == 0:
                    self.mongo.collection.insert_one(data_to_insert)
                    self.send_email_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('币世界-文章', '写入数据库报错: %s' % te, 'error', False)
                # NOTE(review): returning here skips all remaining records on
                # the first bad one — confirm that is intended.
                return 0
        print('储存数据完成', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

    def main(self):
        """Scrape, then persist; exits the process when nothing was fetched."""
        all_data = self.req()
        if not all_data:
            print('数据为空')
            exit(0)
        self.save_to_mongo(all_data)
# Entry point: scrape the article board once and persist new items.
if __name__ == '__main__':
    BiShiJie().main()

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
'''
获取天气预报
'''
import os
import sys
# Put the 'AutoInfo' project root on sys.path so `utils` imports resolve.
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
# Script identity / project-root constants (not referenced in the code
# below — apparently kept for consistency with the other scripts).
SUB_PROJECT_NAME = "获取天气预报"
PROJECT_PATH = os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')
from bs4 import BeautifulSoup
from utils.utils import *
class Weather():
    """Scrapes tianqi.2345.com and pushes a multi-day forecast via Gotify."""

    def main(self):
        """Fetch the forecast page, extract the next six days, push them."""
        print('开始获取天气预报数据')
        try:
            area_code = '59287'
            # Relative links for the six days after today, in page order.
            day_pages = ['/%s-%s.htm' % (day, area_code)
                         for day in ('tomorrow', 'third', 'fourth', 'fifth', 'sixth', 'seventh')]
            url = "https://tianqi.2345.com/today-%s.htm" % area_code
            header = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'}
            response = httpx.get(url=url, headers=header)
            response.encoding = "utf-8"
            soup = BeautifulSoup(response.text, 'html.parser')
            forecasts = []
            for page in day_pages:
                # The anchor pointing at each day's page carries its summary text.
                anchors = soup.find_all('a', href=page)
                forecasts.append(' '.join(anchors[0].text.split()))
        except Exception as e:
            print(e)
            print('Weather forecast')
            exit(0)
        text = "天气预报: {}获取并发送\n".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())) + '\n'.join(
            forecasts)
        # Push the report to the 'weather' Gotify channel.
        GotifyNotifier('天气预报数', text, 'weather').send_message()
        print('天气预报数据已获取')
if __name__ == "__main__":
    try:
        # Weather.main() returns None; W is unused beyond the call itself.
        W = Weather().main()
    except Exception as e:
        # Record any scraping/pushing failure in the daily log store.
        print(e)
        L = LogsHandle()
        L.logs_write('Weather forecast', str(e), 'error')

@ -0,0 +1,251 @@
# -*- coding: utf-8 -*-
'''
爬取多个 web 新闻网站
mongo, 但只检索是否已发送过消息
'''
import os
import sys
import threading
# Put the 'AutoInfo' project root on sys.path so `utils` imports resolve.
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
from html import unescape
import re
from utils.utils import *
# Project-wide settings loaded from config.json at the project root.
config_json = LoadConfig().load_config()
# Re-push budget stamped onto each stored article record.
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class MessageSearchKey(object):
    """Scrapes several web3 news sites into the NEWS/web3_news collection.

    Each source method downloads a listing page (or JSON feed), extracts
    article metadata with regexes, and inserts any record whose title has
    not been stored before — the collection doubles as the "already pushed"
    dedup index. Sources run concurrently, one thread each.
    """

    def __init__(self):
        # Dedup/storage collection shared by every source method.
        db_name = 'NEWS'
        collection_name = 'web3_news'
        self.mongo = MongoHandle(db=db_name, collection=collection_name, del_db=False, del_collection=False,
                                 auto_remove=0)
        # Browser-like headers reused for every request.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Content-Type": "application/json"
        }

    def techflow(self):
        """Scrape the 深潮TechFlow article list hosted on 163.com."""
        # Source page: https://www.163.com/dy/media/T1561634363944.html
        tag_title = '深潮TechFlow'
        data_list = []  # NOTE(review): never used below — confirm it can be removed
        target = ['https://www.163.com/dy/media/T1561634363944.html']
        for url in target:
            print('前往 url: {}'.format(url))
            resp = httpx.get(url, headers=self.headers, timeout=10)
            if resp.status_code != 200:
                print('深潮TechFlow - 获取数据失败, 状态码: {}'.format(resp.status_code))
                return False
            resp.encoding = 'utf-8'
            html = resp.text
            # Extract URL / title / publish-time lists separately and zip
            # them back together positionally.
            context_urls = re.findall('<a href="(.*?)" class="title">', html)
            title_list = re.findall('class="title">(.*?)</a>', html)
            posted_time_list = re.findall('<span class="time">(.*?)</span>', html)
            for title, context_url, posted_time in zip(title_list, context_urls, posted_time_list):
                data = {
                    'title': title,
                    'context': title,
                    'source_url': url,
                    'link': context_url,
                    'article_type': tag_title,
                    'article_source': tag_title,
                    'img_url': '',
                    'keyword': '',
                    'posted_date': posted_time,
                    'create_time': int(time.time()),
                    'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    'repush_times': DEFAULT_RE_PUSH_TIMES
                }
                # Insert only when this title has never been stored before.
                filter_criteria = {'title': data['title']}
                count = self.mongo.collection.count_documents(filter_criteria)
                if count == 0:
                    result = self.mongo.collection.insert_one(data)

    def panewslab(self):
        """Scrape panewslab: the flash-news JSON API plus two HTML listings."""
        tag_title = 'panewslab'
        base_url = 'https://www.panewslab.com'
        # -- flash news via the JSON API -----------------------------------------------------------------------------
        try:
            # NOTE(review): LastTime is a hard-coded epoch timestamp — looks
            # like a paging cursor; confirm the API still returns the latest
            # items with this fixed value.
            url = 'https://www.panewslab.com/webapi/index/list?Rn=20&LId=1&LastTime=1724891115&TagId=&tw=0'
            print('前往 url: {}'.format(url))
            resp = httpx.get(url, headers=self.headers, timeout=10)
            if resp.status_code != 200:
                print('{} - 获取数据失败, 状态码: {}'.format(tag_title, resp.status_code))
                return False
            resp.encoding = 'utf-8'
            resp_json = resp.json()
            for resp_data in resp_json['data']:
                try:
                    data = {
                        'title': resp_data['share']['title'],
                        'context': resp_data['desc'],
                        'source_url': url,
                        'link': resp_data['share']['url'],
                        'article_type': tag_title,
                        'article_source': tag_title,
                        'img_url': '',
                        'keyword': '',
                        # NOTE(review): datetime.utcfromtimestamp() is
                        # deprecated since Python 3.12; the modern equivalent
                        # is datetime.fromtimestamp(..., tz=timezone.utc).
                        'posted_date': datetime.utcfromtimestamp(int(resp_data['publishTime'])).strftime(
                            '%Y-%m-%d %H:%M:%S'),
                        'create_time': int(time.time()),
                        'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        'repush_times': DEFAULT_RE_PUSH_TIMES
                    }
                    filter_criteria = {'title': data['title']}
                    count = self.mongo.collection.count_documents(filter_criteria)
                    if count == 0:
                        result = self.mongo.collection.insert_one(data)
                except Exception as e:
                    # Skip records missing an expected field.
                    print(f'{tag_title}: 数据取值失败, {e}')
                    continue
        except Exception as e:
            print(f'{tag_title}: 数据取值失败, {e}')
        # -- in-depth articles (HTML listing) ------------------------------------------------------------------------
        url = 'https://www.panewslab.com/zh/profundity/index.html'
        print('前往 url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - 获取数据失败, 状态码: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        context_urls = re.findall('<div class="list-left" data-v-559b28aa><a href="(.*?)" target="_blank"', html)
        title_list = re.findall('target="_blank" class="n-title" data-v-559b28aa>(.*?)</a>', html)
        context_list = re.findall('<p class="description" data-v-559b28aa>(.*?)</p>', html)
        for title, context, context_url in zip(title_list, context_list, context_urls):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': data['title']}
            count = self.mongo.collection.count_documents(filter_criteria)
            if count == 0:
                result = self.mongo.collection.insert_one(data)
        # -- news listing (HTML) -------------------------------------------------------------------------------------
        url = 'https://www.panewslab.com/zh/news/index.html'
        print('前往 url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - 获取数据失败, 状态码: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        context_urls = re.findall('class="content" data-v-3376a1f2><a href="(.*?)" target="_blank"', html)
        title_list = re.findall('target="_blank" class="n-title" data-v-3376a1f2>(.*?)</a>', html)
        context_list = re.findall('</a> <p data-v-3376a1f2>(.*?)</p>', html)
        for title, context, context_url in zip(title_list, context_list, context_urls):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': data['title']}
            count = self.mongo.collection.count_documents(filter_criteria)
            if count == 0:
                result = self.mongo.collection.insert_one(data)

    def foresightnews(self):
        """Scrape the foresightnews.pro front page."""
        tag_title = 'foresightnews'
        base_url = 'https://foresightnews.pro/'
        # ------------------------------------------------------------------------------------------------------------
        url = 'https://foresightnews.pro/'
        print('前往 url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - 获取数据失败, 状态码: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        # Decode HTML entities before regex matching.
        html = unescape(html)
        context_urls = re.findall('</div></div></div></a><a href="(.*?)" target="_blank"', html)
        title_list = re.findall('<div class="topic-body-title" data-v-3171afda>(.*?)</div>', html)
        context_list = re.findall('<div class="topic-body-content" data-v-3171afda>(.*?)</div>', html)
        posted_time_list = re.findall('div class="topic-time" data-v-3171afda>(.*?)</div>', html)
        for title, context, context_url, posted_time in zip(title_list, context_list, context_urls, posted_time_list):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': title}
            count = self.mongo.collection.count_documents(filter_criteria)
            if count == 0:
                result = self.mongo.collection.insert_one(data)

    def main(self):
        """Run every source scraper concurrently and wait for completion."""
        # Each source method applies its own site-specific extraction rules;
        # the Mongo collection decides whether an item was already pushed
        # (deduplicated by title).
        functions = [
            self.techflow,
            self.panewslab,
            self.foresightnews
        ]
        # Create and start one thread per source.
        print('创建并启动线程')
        threads = []
        for func in functions:
            thread = threading.Thread(target=func)
            thread.start()
            threads.append(thread)
        # Wait for all threads to finish.
        for thread in threads:
            thread.join()
        print('程序运行结束')
# Entry point: run all source scrapers concurrently once per invocation.
if __name__ == "__main__":
    m = MessageSearchKey()
    m.main()

@ -0,0 +1,238 @@
# -*- coding: utf-8 -*-
import json
import os
import time
import urllib.error
import urllib.parse
import urllib.request
# 青龙面板的地址
url = "https://ql.erhe.link"
# 登录青龙面板
def login():
    """Log in to the QingLong panel and return a bearer token.

    Credentials come from the QL_USERNAME / QL_PASSWORD environment
    variables, falling back to the historical hard-coded values.
    SECURITY: committing credentials to the repository is unsafe — rotate
    them and rely on the environment variables instead.

    Exits the process with a non-zero status when authentication fails
    (the old code exited 0, signalling success to schedulers).
    """
    username = os.environ.get("QL_USERNAME", "toor")
    password = os.environ.get("QL_PASSWORD", "!QAZ2wsx+0913")
    data = json.dumps({"username": username, "password": password}).encode('utf-8')
    req = urllib.request.Request(
        f"{url}/api/user/login",
        data=data,
        headers={'Content-Type': 'application/json'}
    )
    try:
        with urllib.request.urlopen(req) as response:
            result = json.loads(response.read().decode('utf-8'))
            return result['data']['token']
    except urllib.error.HTTPError as e:
        print(f"Login failed with status code: {e.code}")
        print(e.read().decode('utf-8'))
        exit(1)
# 获取任务列表
def get_tasks(token):
    """Return the list of cron tasks currently defined on the panel."""
    request = urllib.request.Request(
        f"{url}/api/crons",
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request) as resp:
        payload = json.loads(resp.read().decode('utf-8'))
    return payload['data']['data']
# 创建任务
def create_task(task_template, token):
    """POST one new cron task built from `task_template`; return the JSON reply."""
    # Forward only the fields the API expects, preserving their order.
    body = {key: task_template[key] for key in ("name", "command", "schedule", "labels")}
    request = urllib.request.Request(
        f"{url}/api/crons",
        data=json.dumps(body).encode('utf-8'),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(request) as resp:
        return json.loads(resp.read().decode('utf-8'))
# 创建视图分类
def create_view_type(token):
    """Create label-based view filters ('base', 'spider_common') on the panel."""
    for view_name in ('base', 'spider_common'):
        body = {
            "name": view_name,
            "filters": {
                'property': 'labels',
                'operation': 'Reg',
                'value': view_name
            },
            'filterRelation': 'and'
        }
        request = urllib.request.Request(
            # NOTE(review): this posts to /api/crons, the task endpoint —
            # confirm a dedicated views endpoint was not intended here.
            f"{url}/api/crons",
            data=json.dumps(body).encode('utf-8'),
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json"
            }
        )
        try:
            with urllib.request.urlopen(request) as resp:
                reply = json.loads(resp.read().decode('utf-8'))
            print(f"View type {view_name} created: {reply}")
        except urllib.error.HTTPError as e:
            print(f"Failed to create view type {view_name}: {e}")
# Main logic: ensure every AutoInfo cron task exists on the panel.
def main():
    """Create any missing AutoInfo cron tasks on the QingLong panel.

    Logs in, lists existing tasks, then walks the task templates and
    creates each task whose name is not present yet. On any exception the
    whole sequence is retried after a short delay.
    """
    while True:
        try:
            token = login()
            print(f"已连接到 {url}")
            tasks = get_tasks(token)
            tasks_names = [task['name'] for task in tasks]
            if tasks:
                print("Current tasks name: \n{}, \ntotal: {}".format('\n'.join(tasks_names), str(len(tasks_names))))
            else:
                print("Tasks list is empty")
            # Script locations inside the QingLong container.
            project_path = '/ql/data/scripts/AutoInfo/'
            base_path = os.path.join(project_path, 'base')
            to_email_tasks_path = os.path.join(project_path, 'to_email')
            to_gotify_tasks_path = os.path.join(project_path, 'to_gotify')
            manual_path = os.path.join(project_path, 'manual')
            # NOTE(review): some schedules below use six fields (seconds
            # first) while others use five — confirm the panel's cron
            # parser accepts both forms.
            tasks_template = [{
                'base': [
                    {
                        "name": "每天开始自动创建日志",
                        "command": "python3 {}/base_daily_logs_generate.py".format(base_path),
                        "schedule": "0 0 * * *",
                        "labels": ["base"]
                    },
                    {
                        "name": "每天结束自动发送日志",
                        "command": "python3 {}/base_daily_logs_send.py".format(base_path),
                        "schedule": "58 23 * * *",
                        "labels": ["base"]
                    },
                    {
                        "name": "每天自动删除旧数据",
                        "command": "python3 {}/base_timing_remove_data.py".format(base_path),
                        "schedule": "1 0 * * *",
                        "labels": ["base"]
                    },
                    {
                        "name": "每天新闻汇总,发送到邮箱",
                        "command": "python3 {}/base_news_data_collation.py".format(base_path),
                        "schedule": "0 10 6,12,18 * * *",
                        "labels": ["base"]
                    }
                ],
                'to-email': [
                    {
                        "name": "对比大乐透最新一期数据,匹配已购买号码,发送消息",
                        "command": "python3 {}/dlt.py".format(to_email_tasks_path),
                        "schedule": "30 22 * * 1,3,6",
                        "labels": ["to-email"]
                    },
                    {
                        "name": "链捕手快讯消息推送",
                        "command": "python3 {}/chaincatcher.py".format(to_email_tasks_path),
                        "schedule": "0 */2 * * *",
                        "labels": ["to-email"]
                    },
                    {
                        "name": "anyknew聚合新闻消息推送",
                        "command": "python3 {}/anyknew.py".format(to_email_tasks_path),
                        "schedule": "0 */3 * * *",
                        "labels": ["to-email"]
                    },
                    {
                        "name": "反斗限免消息推送",
                        "command": "python3 {}/apprcn.py".format(to_email_tasks_path),
                        "schedule": "0 */12 * * *",
                        "labels": ["to-email"]
                    },
                    {
                        "name": "chiphell消息推送",
                        "command": "python3 {}/chiphell.py".format(to_email_tasks_path),
                        "schedule": "0 */12 * * *",
                        "labels": ["to-email"]
                    },
                    {
                        "name": "hello-github消息推送",
                        "command": "python3 {}/hello_github.py".format(to_email_tasks_path),
                        "schedule": "0 */12 * * *",
                        "labels": ["to-email"]
                    }
                ],
                'to-gotify': [
                    {
                        "name": "获取未来 7 天的天气预报",
                        # Fixed: the script file is weather7d.py — the old
                        # template referenced weather7day.py, which does not
                        # exist in to_gotify/, so the task could never run.
                        "command": "python3 {}/weather7d.py".format(to_gotify_tasks_path),
                        "schedule": "0 0 6,22 * * *",
                        "labels": ["to-gotify"]
                    },
                    {
                        "name": "获取coin实时数据",
                        "command": "python3 {}/coin_detail.py".format(to_gotify_tasks_path),
                        "schedule": "0 * * * *",
                        "labels": ["to-gotify"]
                    },
                    {
                        "name": "空投任务消息",
                        "command": "python3 {}/airdrop_tasks.py".format(to_gotify_tasks_path),
                        "schedule": "0 8,20 * * *",
                        "labels": ["to-gotify"]
                    },
                    {
                        "name": "币界网消息推送",
                        "command": "python3 {}/coin_world.py".format(to_gotify_tasks_path),
                        "schedule": "0 8,20 * * *",
                        "labels": ["to-gotify"]
                    },
                    {
                        "name": " web3新闻消息推送",
                        "command": "python3 {}/web3_news.py".format(to_gotify_tasks_path),
                        "schedule": "0 9,21 * * *",
                        "labels": ["to-gotify"]
                    }
                ],
                'manual': [
                ]
            }]
            for task_template in tasks_template:
                for task_type, task_list in task_template.items():
                    for task in task_list:
                        task_name = task["name"]
                        if task_name in tasks_names:
                            print("Task {} already exists.".format(task_name))
                        else:
                            result = create_task(task, token)
                            print("Task creation result:", result)
            # After creating all tasks, optionally create the view filters.
            # create_view_type(token)
            break  # exit the retry loop after a clean run
        except Exception as e:
            print("An error occurred: ", e)
            print("Retrying...")
            # Back off instead of hot-looping on a persistent failure.
            time.sleep(5)
# Entry point: keep retrying until the task sync completes successfully.
if __name__ == "__main__":
    main()
    print('done!')

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

@ -0,0 +1,213 @@
# -*- coding: utf-8 -*-
import json
import smtplib
import time
from datetime import datetime
import os
import sys
from email.header import Header
from email.mime.text import MIMEText
import httpx
import pymongo
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
PROJECT_PATH = os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')
class LogsHandle(object):
    """Daily run-log storage (MongoDB 'logs' db, one collection per day)
    with optional push through the Gotify 'logs' channel."""

    def __init__(self):
        # The collection name embeds the date, so each day gets its own set.
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        db = 'logs'
        collection = 'logs_' + self.now_day
        self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)

    def logs_generate(self):
        """Insert the day's initial 'create' marker record."""
        data_to_insert = {
            "title": "logs",
            "context": 'generate logs',
            "state": "create",
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        self.mongo.collection.insert_one(data_to_insert)

    def logs_send(self):
        """Concatenate every record of the day and push it via Gotify."""
        title = 'autoinfo - logs: {}'.format(self.now_day)
        text = ''
        cursor = self.mongo.collection.find()
        for record in cursor:
            # Records store the message under 'context'; the old code read
            # the non-existent 'content' key, so every pushed log showed
            # None. `.get` also avoids `setdefault`'s side effect of
            # inserting missing keys into the dict.
            text += "logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}\n\n".format(
                record.get('title'),
                record.get('context'),
                record.get('state'),
                record.get('create_datetime'),
            )
        GotifyNotifier(title=title, message=text, token_name='logs').send_message()

    def logs_write(self, title_source=None, content=None, state=None, send_now=False):
        """Store one log record; optionally push it immediately.

        Args:
            title_source: originating script/module name.
            content: log message body (stored under the 'context' key).
            state: severity/state tag, e.g. 'error'.
            send_now: when True, also push the record via Gotify right away.
        """
        data_to_insert = {
            "title": title_source,
            "context": content,
            "state": state,
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        self.mongo.collection.insert_one(data_to_insert)
        if send_now:
            title = 'Auto Info - running logs: {}'.format(self.now_day)
            # Same key fix as logs_send: read 'context', not 'content'.
            text = 'logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}'.format(
                data_to_insert.get('title'),
                data_to_insert.get('context'),
                data_to_insert.get('state'),
                data_to_insert.get('create_datetime'),
            )
            GotifyNotifier(title=title, message=text, token_name='logs').send_message()
class MongoHandle(object):
    """Thin wrapper around a pymongo collection with optional drop/cleanup.

    Args:
        db: database name.
        collection: collection name.
        del_db: drop the whole database first when True.
        del_collection: drop the collection first when True.
        auto_remove: when non-zero, delete documents whose 'create_time'
            epoch is older than that many days.
    """

    def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0):
        config_json = LoadConfig().load_config()
        db_user = config_json.get('DB_USER')
        db_password = config_json.get('DB_PASSWORD')
        db_ip = config_json.get('DB_IP')
        db_port = config_json.get('DB_PORT')
        mongo_link = f'mongodb://{db_user}:{db_password}@{db_ip}:{db_port}/'
        self.client = pymongo.MongoClient(mongo_link)
        self.db = db
        self.collection = collection
        if del_db and db:
            # Drop the database only when it actually exists.
            if db in self.client.list_database_names():
                self.client.drop_database(db)
        self.db = self.client[db]
        if del_collection and self.collection:
            # Drop the collection only when it actually exists.
            if self.collection in self.db.list_collection_names():
                self.db.drop_collection(collection)
        self.collection = self.db[collection]
        if auto_remove:
            self.auto_remove_data(auto_remove)

    def write_data(self, data):
        """Insert a single document."""
        self.collection.insert_one(data)

    def load_data(self):
        """Return every document (without '_id'). MongoDB creates the
        db/collection lazily on first write, so this may be empty."""
        return list(self.collection.find({}, {'_id': False}))

    def auto_remove_data(self, day):
        """Delete documents older than `day` days (by 'create_time' epoch).

        Uses a single delete_many instead of the previous
        one-round-trip-per-document loop.
        """
        cutoff = int(time.time()) - day * 24 * 60 * 60
        self.collection.delete_many({'create_time': {'$lt': cutoff}})
class SendEmail(object):
    """Send a plain-text notification email using the SMTP-over-SSL
    settings from config.json."""

    def __init__(self, subject='AutoInfo subject', title='AutoInfo title', text='auto text') -> None:
        config_json = LoadConfig().load_config()
        # Third-party SMTP service credentials/addresses.
        self.mail_host = config_json.get('MAIL_HOST')    # SMTP server
        self.mail_user = config_json.get('MAIL_USER')    # login user
        self.mail_pass = config_json.get('MAIL_PASS')    # login password
        self.sender = config_json.get('MAIL_SENDER')
        self.receivers = [config_json.get('MAIL_RECEIVERS')]
        self.subject = subject
        self.title = title
        self.text = text

    def send(self):
        """Build the MIME message and send it; errors are printed, not raised."""
        message = MIMEText(self.text, 'plain', 'utf-8')
        message['From'] = Header(self.title, 'utf-8')
        message['To'] = Header("auto", 'utf-8')
        message['Subject'] = Header(self.subject, 'utf-8')
        try:
            # The context manager guarantees the connection is closed (QUIT)
            # even on failure — the old code leaked the socket.
            with smtplib.SMTP_SSL(self.mail_host) as smtp:
                smtp.login(self.mail_user, self.mail_pass)
                smtp.sendmail(self.sender, self.receivers, message.as_string())
            print("邮件发送成功")
        except smtplib.SMTPException as e:
            print("Error: 无法发送邮件", e)
class GotifyNotifier:
    """Push a titled message to a Gotify server, resolving the app token
    from a logical channel name in gotify_config.json."""

    def __init__(self, title, message, token_name=''):
        config_json = LoadConfig().load_config()
        self.gotify_url = config_json.get('GOTIFY_URL', 'https://gotify.erhe.top')
        # An empty token_name falls back to the 'test' channel.
        self.app_token = self.match_token_name(token_name or 'test')
        self.title = title
        self.message = message

    def match_token_name(self, name):
        """Resolve a channel name to its Gotify app token.

        Unknown (or falsy-valued) names fall back to the 'base' token;
        raises KeyError if that is missing from gotify_config.json.
        """
        gotify_config_path = os.path.join(str(PROJECT_PATH), 'gotify_config.json')
        # Explicit encoding so the JSON parses identically on every platform.
        with open(gotify_config_path, 'r', encoding='utf-8') as f:
            token_name_dict = json.load(f)
        return token_name_dict.get(name) or token_name_dict['base']

    def send_message(self):
        """POST the message; prints the outcome instead of raising.

        Equivalent curl:
        curl -k "https://gotify.example/message?token=TOKEN" \
             -F "title=..." -F "message=..." -F "priority=5"
        """
        with httpx.Client() as client:
            response = client.post(
                url=f"{self.gotify_url}/message?token={self.app_token}",
                headers={'Content-Type': 'application/json'},
                json={'title': self.title, 'message': self.message}
            )
        if response.status_code == 200:
            print(self.title)
            print('Gotify Message sent successfully!')
        else:
            print('Failed to send message:', response.text)
class LoadConfig:
    """Load project-wide configuration from config.json at the project root."""

    def load_config(self):
        """Return the parsed config dict; exits the process when missing/invalid.

        Exits with a non-zero status — the old code exited 0, masking the
        failure from schedulers.
        """
        try:
            config_path = os.path.join(PROJECT_PATH, 'config.json')
            with open(config_path, 'r', encoding='utf-8') as f:
                config_json = json.load(f)
            if not config_json:
                print('No config file found')
                exit(1)
        except Exception as e:
            print(e)
            exit(1)
        return config_json

    def get_base_path(self):
        """Return the 'AutoInfo' project root.

        Derived from this file's location (consistent with PROJECT_PATH)
        rather than os.getcwd(), which broke whenever a script was launched
        from outside the project directory.
        """
        return os.path.join(os.path.abspath(__file__).split('AutoInfo')[0], 'AutoInfo')
Loading…
Cancel
Save