You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
auto/message/message_rss_data_handel.py

197 lines
6.8 KiB

import datetime
import re
import smtplib
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.header import Header
from email.mime.text import MIMEText

import psycopg2
from psycopg2 import Error


class FreshRSSDatabase:
    """Search a FreshRSS PostgreSQL database by keyword and e-mail the matches.

    Each entry of ``self.keys`` maps a digest name to a ``|``-separated keyword
    list. ``main()`` runs one query per digest in a thread pool, de-duplicates
    the hits by title and sends one plain-text e-mail per non-empty digest.
    """

    def __init__(self):
        """Set connection settings, keyword groups and digest limits."""
        # NOTE(security): the database credentials below (and the SMTP
        # credentials in send_email) are hard-coded in source; they should be
        # moved to configuration/secrets storage and rotated.
        self.hostname = 'erhe.top'
        self.port = 20788
        self.database = 'freshrss'
        self.user = 'freshrss'
        self.password = 'freshrss'
        self.conn = None
        # main() runs several queries concurrently and each one may trigger
        # the lazy connect; the lock makes sure only one connection is opened.
        self._connect_lock = threading.Lock()
        self.keys = [
            {'web3新闻': 'web3|区块链|NFT|DeFi|NFT'},
            {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾|穿越机|礼物'},
            {'coin新闻': 'btc|eth|sui|degen'}
        ]
        # Maximum number of characters kept by remove_all_html_tags().
        self.ellipsis = 300
        # Look-back window, in days, used by both the SQL query and the
        # e-mail filter.
        self.days = 3

    def connect(self):
        """Open the PostgreSQL connection unless one is already open.

        Raises:
            psycopg2.Error: re-raised (after printing) when connecting fails.
        """
        with self._connect_lock:
            if self.conn is not None:
                return
            try:
                conn = psycopg2.connect(
                    dbname=self.database,
                    user=self.user,
                    password=self.password,
                    host=self.hostname,
                    port=self.port
                )
                # Read-only workload on a connection shared between threads:
                # autocommit keeps one failed statement from leaving the
                # connection in an aborted transaction for the other queries.
                conn.autocommit = True
            except Error as e:
                print(f"Error connecting to the database: {e}")
                raise  # let the caller decide how to handle the failure
            self.conn = conn

    @staticmethod
    def _split_keywords(keywords):
        """Return the unique, non-blank keywords of a ``|``-separated string."""
        terms = (term.strip() for term in (keywords or '').split('|'))
        return list(dict.fromkeys(term for term in terms if term))

    def _build_query(self, keywords):
        """Build the parameterised search statement for ``keywords``.

        Returns:
            ``(sql, params)`` ready for ``cursor.execute``, or ``None`` when
            ``keywords`` contains no usable keyword.
        """
        terms = self._split_keywords(keywords)
        if not terms:
            return None
        match_clause = " OR ".join(
            "(title ILIKE %s AND content ILIKE %s)" for _ in terms
        )
        # The OR group is parenthesised so the date filter applies to every
        # keyword; without it SQL precedence binds the filter to the last
        # keyword only.
        sql = (
            "SELECT * "
            "FROM freshrss_toor_entry "
            f"WHERE ({match_clause}) "
            "AND date > EXTRACT(EPOCH FROM NOW() - %s * INTERVAL '1 day') "
            "ORDER BY date DESC;"
        )
        params = []
        for term in terms:
            # LIKE wildcards inside a keyword are not escaped (same as before).
            pattern = f"%{term}%"
            params.extend((pattern, pattern))
        params.append(self.days)
        return sql, params

    def execute_query(self, keywords):
        """Return the recent entries whose title and content match a keyword.

        Args:
            keywords: ``|``-separated keywords; an entry matches when one
                keyword occurs (case-insensitively) in both title and content.

        Returns:
            A list of row tuples (newest first), an empty list when there are
            no usable keywords, or ``None`` when the query fails.
        """
        if self.conn is None:
            self.connect()
        if self.conn is None:
            print("Database connection failed")
            return None
        query = self._build_query(keywords)
        if query is None:
            return []
        sql, params = query
        try:
            # The context manager closes the cursor even if execute() raises.
            with self.conn.cursor() as cur:
                cur.execute(sql, params)
                return cur.fetchall()
        except Error as e:
            print(f"An error occurred: {e}")
            return None

    def close(self):
        """Close the database connection, if any, and forget the handle."""
        if self.conn:
            self.conn.close()
            # Reset so a later execute_query() reconnects instead of using a
            # closed connection.
            self.conn = None

    def remove_all_html_tags(self, text):
        """Strip HTML tags, spaces and newlines from ``text`` and truncate it.

        Args:
            text (str): raw text that may contain HTML tags; ``None`` or an
                empty string yields ``''``.

        Returns:
            str: the cleaned text, cut to ``self.ellipsis`` characters with
            ``'...'`` appended when it was longer.
        """
        if not text:
            return ''
        clean_text = re.sub(r'<[^>]+>', '', text)
        # NOTE(review): this removes every space character; the original may
        # have targeted '&nbsp;' before the file was rendered - confirm.
        clean_text = clean_text.replace(' ', '').replace('\n', '')
        if len(clean_text) > self.ellipsis:
            clean_text = clean_text[:self.ellipsis] + '...'
        return clean_text

    def send_email(self, subject='', title='', text=''):
        """Send ``text`` as a plain-text UTF-8 e-mail through smtp.163.com.

        Args:
            subject: value of the Subject header.
            title: value placed in the From header and echoed on success.
            text: message body.

        Failures are printed, not raised, so one failed digest does not stop
        the remaining ones.
        """
        mail_host = "smtp.163.com"
        mail_user = "pushmessagebot@163.com"
        mail_pass = "WSMSRKBKXIHIQWTU"
        sender = "pushmessagebot@163.com"
        receivers = ["pushmessagebot@163.com"]
        message = MIMEText(text, 'plain', 'utf-8')
        message['From'] = Header(title, 'utf-8')
        message['To'] = Header("RSS data", 'utf-8')
        message['Subject'] = Header(subject, 'utf-8')
        try:
            # The context manager sends QUIT and closes the socket.
            with smtplib.SMTP_SSL(mail_host) as smtp:
                smtp.login(mail_user, mail_pass)
                smtp.sendmail(sender, receivers, message.as_string())
            print(f"{title} 邮件发送成功")
        except (smtplib.SMTPException, OSError) as e:
            # OSError covers connection-level failures (DNS, refused, TLS).
            print("Error: 无法发送邮件", e)

    def _record_to_item(self, record):
        """Convert one result row into the dict used to build the e-mail.

        NOTE(review): columns are read by position from ``SELECT *`` (2 title,
        4 content, 5 link, 7 timestamp) - confirm against the table's actual
        column order.
        """
        timestamp = record[7]
        if timestamp:
            postdate = datetime.datetime.fromtimestamp(
                timestamp, datetime.timezone.utc
            ).strftime('%Y-%m-%d %H:%M:%S')
        else:
            postdate = ''
        return {
            "title": self.remove_all_html_tags(record[2]),
            "content": self.remove_all_html_tags(record[4]),
            "link": record[5],
            "postdate": postdate,
            "posttimestamp": timestamp or 0
        }

    def query_and_process_key(self, key_name, keywords):
        """Query one keyword group and return its hits de-duplicated by title.

        Args:
            key_name: digest name; unused here, kept for the caller's signature.
            keywords: ``|``-separated keywords passed to ``execute_query``.

        Returns:
            A list of item dicts (first occurrence of each cleaned title wins),
            or ``None`` when the query failed or matched nothing.
        """
        records = self.execute_query(keywords)
        if not records:
            return None
        unique_records = {}
        for record in records:
            item = self._record_to_item(record)
            unique_records.setdefault(item["title"], item)
        return list(unique_records.values())

    def _build_email_text(self, data):
        """Render the plain-text body for one digest.

        Only items newer than ``self.days`` days are listed, newest first; the
        reported total counts every item in ``data['data']``.
        """
        items = data.get('data') or []
        cutoff = (datetime.datetime.now()
                  - datetime.timedelta(days=self.days)).timestamp()
        recent = sorted(
            (item for item in items if item['posttimestamp'] >= cutoff),
            key=lambda item: item['posttimestamp'],
            reverse=True,
        )
        separator = '*' * 80
        parts = [
            '关键词:\n' + data.get('keys').replace('|', '\n') + '\n\n',
            '一共搜索到: ' + str(len(items)) + ' 条数据\n\n',
            separator + '\n',
        ]
        # NOTE(review): the source's indentation was lost; the separator is
        # assumed to follow every entry rather than appear once as a footer.
        for item in recent:
            parts.append(
                f"标题: {item.get('title')}\n"
                f"内容: {item.get('content')}\n"
                f"链接: {item.get('link')}\n"
                f"发布日期: {item.get('postdate')}\n"
                f"时间戳: {item.get('posttimestamp')}\n\n"
                f"{separator}\n\n"
            )
        return ''.join(parts)

    def prepare_to_send(self, data):
        """Build and send the e-mail for one digest.

        Args:
            data: dict with ``source_key`` (digest name), ``keys`` (the
                ``|``-separated keywords) and ``data`` (list of item dicts).
        """
        source_key = data.get('source_key')
        self.send_email(
            subject='RSS' + source_key,
            title=source_key,
            text=self._build_email_text(data),
        )

    def main(self):
        """Run every keyword query concurrently, then e-mail each digest."""
        loaded_data = {}
        try:
            # max(1, ...) keeps ThreadPoolExecutor valid when no keys are set.
            with ThreadPoolExecutor(max_workers=max(1, len(self.keys))) as executor:
                future_to_key = {
                    executor.submit(self.query_and_process_key, k, v): (k, v)
                    for sublist in self.keys
                    for k, v in sublist.items()
                }
                for future in as_completed(future_to_key):
                    key_name, keywords = future_to_key[future]
                    try:
                        data = future.result()
                    except Exception as exc:
                        print(f'{key_name} generated an exception: {exc}')
                        continue
                    if data:
                        loaded_data[key_name] = {
                            'source_key': key_name,
                            'keys': keywords,
                            'data': data
                        }
                    else:
                        print(f'key: {key_name} 数据为空')
        finally:
            # Always release the database connection, even on error.
            self.close()
        for data in loaded_data.values():
            self.prepare_to_send(data)
        print('done!')
if __name__ == "__main__":
    # Script entry point: run one full search-and-notify pass.
    f = FreshRSSDatabase()
    f.main()