You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
197 lines
6.8 KiB
197 lines
6.8 KiB
import datetime
import re
import smtplib
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr

import psycopg2
from psycopg2 import Error
|
|
|
|
class FreshRSSDatabase:
    """Search recent FreshRSS entries by keyword group and e-mail one digest per group.

    Each entry of ``self.keys`` maps a group name to a ``|``-separated keyword
    list. For every group, entries from the last ``self.days`` days whose title
    and content both contain one of the keywords are fetched from PostgreSQL,
    de-duplicated by cleaned title, and sent as a plain-text e-mail.
    """

    # Matches anything that looks like an HTML tag; compiled once for reuse.
    _TAG_RE = re.compile(r'<[^>]+>')

    def __init__(self):
        # NOTE(review): database credentials are hard-coded in source; consider
        # loading them from the environment or a config file.
        self.hostname = 'erhe.top'
        self.port = 20788
        self.database = 'freshrss'
        self.user = 'freshrss'
        self.password = 'freshrss'
        self.conn = None

        # Keyword groups: [{group name: "kw1|kw2|..."}, ...].
        self.keys = [
            {'web3新闻': 'web3|区块链|NFT|DeFi|NFT'},
            {'购物类新闻': '大疆|无人机|硬盘|鼠标|纸巾|穿越机|礼物'},
            {'coin新闻': 'btc|eth|sui|degen'}
        ]

        # Maximum number of characters kept from a cleaned title/content.
        self.ellipsis = 300

        # Only entries published within this many days are queried and mailed.
        self.days = 3

    def connect(self):
        """Connect to the PostgreSQL database and store the connection on ``self.conn``.

        Raises:
            psycopg2.Error: if the connection cannot be established (the error
                is printed first).
        """
        try:
            self.conn = psycopg2.connect(
                dbname=self.database,
                user=self.user,
                password=self.password,
                host=self.hostname,
                port=self.port
            )
        except Error as e:
            print(f"Error connecting to the database: {e}")
            raise  # re-raise so the caller decides how to handle the failure
        # Only read-only SELECTs are issued. Autocommit keeps one failed query
        # from leaving the shared connection in an aborted transaction, which
        # would make every later query on it fail too.
        self.conn.autocommit = True

    @staticmethod
    def _escape_like(term):
        """Escape LIKE/ILIKE wildcards (and the escape char) so *term* matches literally."""
        return term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')

    def _build_query(self, keywords):
        """Return ``(sql, params)`` selecting recent entries that match any keyword.

        An entry matches a keyword when both its title and its content contain
        it (case-insensitive). Empty and duplicate keywords are ignored. Returns
        ``None`` when *keywords* contains no non-empty keyword.
        """
        terms = list(dict.fromkeys(term for term in keywords.split('|') if term))
        if not terms:
            return None

        params = []
        for term in terms:
            pattern = '%' + self._escape_like(term) + '%'
            params.extend([pattern, pattern])
        params.append(self.days)

        # The OR-group is parenthesized so the date window applies to every
        # keyword (AND binds tighter than OR in SQL).
        where = " OR ".join(["(title ILIKE %s AND content ILIKE %s)"] * len(terms))
        sql = f"""
            SELECT *
            FROM freshrss_toor_entry
            WHERE ({where})
              AND date > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 day' * %s)
            ORDER BY date DESC;
        """
        return sql, params

    def execute_query(self, keywords):
        """Run the keyword search and return the matching rows.

        Args:
            keywords: ``|``-separated keywords; an entry matches when its title
                and content both contain one of them (case-insensitive) and it
                was posted within the last ``self.days`` days.

        Returns:
            A list of ``freshrss_toor_entry`` rows, newest first; an empty list
            when no keyword was given; ``None`` if the query failed.

        Raises:
            psycopg2.Error: if no connection exists yet and connecting fails.
        """
        if self.conn is None:
            self.connect()

        query = self._build_query(keywords)
        if query is None:
            return []
        sql, params = query

        try:
            # The cursor context manager closes the cursor even when execute fails.
            with self.conn.cursor() as cur:
                cur.execute(sql, params)
                return cur.fetchall()
        except Error as e:
            print(f"An error occurred: {e}")
            return None

    def close(self):
        """Close the database connection, if open, so a later query reconnects."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def remove_all_html_tags(self, text):
        """Strip HTML tags, spaces and newlines from *text* and truncate it.

        Args:
            text (str | None): raw text possibly containing HTML tags; ``None``
                (e.g. a NULL column) is treated as an empty string.

        Returns:
            str: the cleaned text, cut to ``self.ellipsis`` characters with
            ``'...'`` appended when it was longer.
        """
        clean_text = self._TAG_RE.sub('', text or '')
        # NOTE(review): every ASCII space is removed, not collapsed; presumably
        # aimed at CJK text — confirm, since it also glues English words together.
        clean_text = clean_text.replace(' ', '').replace('\n', '')
        if len(clean_text) > self.ellipsis:
            clean_text = clean_text[:self.ellipsis] + '...'
        return clean_text

    def send_email(self, subject='', title='', text=''):
        """Send *text* as a UTF-8 plain-text e-mail through 163.com SMTP over SSL.

        Args:
            subject: the e-mail subject.
            title: display name used in the ``From`` header and the log line.
            text: the message body.

        Failures are printed rather than raised, so one failed mail does not
        stop the remaining groups from being sent.
        """
        mail_host = "smtp.163.com"
        mail_user = "pushmessagebot@163.com"
        # NOTE(review): the SMTP authorization code is hard-coded and committed;
        # move it to an environment variable / secret store and rotate it.
        mail_pass = "WSMSRKBKXIHIQWTU"

        sender = "pushmessagebot@163.com"
        receivers = ["pushmessagebot@163.com"]

        message = MIMEText(text, 'plain', 'utf-8')
        # From/To must carry real addresses (RFC 5322); the previous text-only
        # headers are kept as display names. formataddr encodes non-ASCII names.
        message['From'] = formataddr((title, sender), 'utf-8')
        message['To'] = ', '.join(formataddr(("RSS data", r), 'utf-8') for r in receivers)
        message['Subject'] = Header(subject, 'utf-8')

        try:
            # The context manager sends QUIT and closes the socket afterwards.
            with smtplib.SMTP_SSL(mail_host) as smtp:
                smtp.login(mail_user, mail_pass)
                smtp.sendmail(sender, receivers, message.as_string())
            print(f"{title} 邮件发送成功")
        except (smtplib.SMTPException, OSError) as e:
            # OSError covers DNS/connection failures raised by SMTP_SSL itself.
            print("Error: 无法发送邮件", e)

    def _records_to_entries(self, records):
        """Convert raw rows into entry dicts, keeping the first row per cleaned title.

        Column positions 2 (title), 4 (content), 5 (link) and 7 (post date as
        Unix seconds) are taken from the ``SELECT *`` layout of
        ``freshrss_toor_entry`` — TODO confirm against the schema.
        """
        unique_records = {}
        for record in records:
            title = self.remove_all_html_tags(record[2])
            if title in unique_records:
                continue
            timestamp = record[7]
            if timestamp:
                postdate = datetime.datetime.fromtimestamp(
                    timestamp, datetime.timezone.utc
                ).strftime('%Y-%m-%d %H:%M:%S')
            else:
                postdate = ''
            unique_records[title] = {
                "title": title,
                "content": self.remove_all_html_tags(record[4]),
                "link": record[5],
                "postdate": postdate,
                "posttimestamp": timestamp or 0,
            }
        return list(unique_records.values())

    def query_and_process_key(self, key_name, keywords):
        """Fetch the entries of one keyword group, cleaned and de-duplicated by title.

        Args:
            key_name: group name (not used here; kept for the caller's interface).
            keywords: ``|``-separated keywords passed to :meth:`execute_query`.

        Returns:
            A list of dicts with ``title``, ``content``, ``link``, ``postdate``
            (UTC, ``YYYY-mm-dd HH:MM:SS``) and ``posttimestamp`` keys, or
            ``None`` when nothing was found or the query failed.
        """
        records = self.execute_query(keywords)
        if not records:
            return None
        return self._records_to_entries(records)

    def _compose_email(self, data):
        """Return ``(subject, title, text)`` for the digest of one keyword group.

        Only entries posted within the last ``self.days`` days are listed,
        newest first; the header count reports every entry in ``data['data']``.
        """
        source_key = data.get('source_key')
        keys = data.get('keys') or ''
        data_list = data.get('data') or []

        # Cut-off timestamp: entries older than self.days days are skipped.
        cutoff = (datetime.datetime.now(datetime.timezone.utc)
                  - datetime.timedelta(days=self.days)).timestamp()
        recent = sorted(
            (entry for entry in data_list if entry['posttimestamp'] >= cutoff),
            key=lambda entry: entry['posttimestamp'],
            reverse=True,
        )

        separator = '*' * 80
        parts = [
            '关键词:\n' + keys.replace('|', '\n') + '\n\n',
            '一共搜索到: ' + str(len(data_list)) + ' 条数据\n\n',
            separator + '\n',
        ]
        for entry in recent:
            # `or ''` guards against NULL columns, which would break concatenation.
            parts.append(
                '标题: ' + (entry.get('title') or '') + '\n'
                + '内容: ' + (entry.get('content') or '') + '\n'
                + '链接: ' + (entry.get('link') or '') + '\n'
                + '发布日期: ' + (entry.get('postdate') or '') + '\n'
                + '时间戳: ' + str(entry.get('posttimestamp')) + '\n\n'
                + separator + '\n\n'
            )
        return 'RSS' + source_key, source_key, ''.join(parts)

    def prepare_to_send(self, data):
        """Build and send the digest e-mail for one keyword group.

        Args:
            data: dict with ``source_key`` (group name), ``keys``
                (``|``-separated keywords) and ``data`` (entry dicts as
                returned by :meth:`query_and_process_key`).
        """
        subject, title, text = self._compose_email(data)
        self.send_email(subject=subject, title=title, text=text)

    def main(self):
        """Query every keyword group concurrently, then e-mail one digest per group."""
        loaded_data = {}

        # Connect once up front: execute_query() connects lazily, and several
        # worker threads taking that path at once would each open (and leak)
        # their own connection.
        try:
            self.connect()
        except Error:
            return  # connect() has already printed the error

        try:
            # Flatten [{name: keywords}, ...] into (name, keywords) pairs.
            jobs = [(k, v) for group in self.keys for k, v in group.items()]
            # max_workers must be >= 1 even when no keyword groups are configured.
            with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as executor:
                future_to_key = {
                    executor.submit(self.query_and_process_key, k, v): (k, v)
                    for k, v in jobs
                }
                for future in as_completed(future_to_key):
                    key_name, keywords = future_to_key[future]
                    try:
                        data = future.result()
                    except Exception as exc:
                        print(f'{key_name} generated an exception: {exc}')
                        continue
                    if data:
                        loaded_data[key_name] = {
                            'source_key': key_name,
                            'keys': keywords,
                            'data': data
                        }
                    else:
                        print(f'key: {key_name} 数据为空')
        finally:
            # Close the database connection even if querying failed.
            self.close()

        for data in loaded_data.values():
            self.prepare_to_send(data)

        print('done!')

if __name__ == "__main__":
    # Script entry point: run the search-and-mail job once.
    FreshRSSDatabase().main()