You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
AutoInfo/to_email/chiphell.py

236 lines
8.7 KiB

# -*- coding: utf-8 -*-
'''
chiphell
'''
import os
import random
import sys
import threading
import re
# Append the directory ending at the first 'AutoInfo' component of this file's
# absolute path to sys.path (os.path.join with a single argument returns it
# unchanged). Presumably this makes the project's ``utils`` package importable
# when the script is run directly; it has to happen before the import below.
sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
# NOTE(review): names used later in this file but not imported explicitly here
# (time, datetime, httpx, LoadConfig, LogsHandle, MongoHandle, SendEmail)
# presumably all come from this star import -- confirm against utils/utils.py.
from utils.utils import *
# Load the project configuration once, at import time.
config_json = LoadConfig().load_config()
# Value written to the "repush_times" field of every scraped record.
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class CHIPHELL(object):
    """Scraper for the chiphell.com portal article lists.

    For every configured category it downloads the list page, extracts the
    articles with regular expressions, stores articles whose title is not yet
    in MongoDB and finally e-mails a digest of the newly stored ones.
    """

    # Patterns are compiled once and written as raw strings so that ``\S`` and
    # ``\s`` reach the regex engine instead of being invalid string escapes.
    _ENTRY_RE = re.compile(r'<dt class="xs2">([\S\s]*?)</dl>')
    _URL_RE = re.compile(r'<a href="(.*?)" target="_blank" ')
    _TITLE_RE = re.compile(r'class="xi2" style="">(.*?)</a> </dt>')
    _IMG_RE = re.compile(r'target="_blank"><img src="(.*?)"')
    _CONTEXT_RE = re.compile(r'class="tn" /></a></div>([\S\s]*?)</dd>')
    _POST_TIME_RE = re.compile(r'<span class="xg1"> (.*?)</span>')

    def __init__(self):
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.base_url = 'https://www.chiphell.com/'
        # Relative list-page URL; ``{}`` is filled with the category id.
        self.href_url = 'portal.php?mod=list&catid={}'
        self.db = 'NEWS'
        self.collection = 'chiphell_info'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        # Newly inserted records waiting to be e-mailed; appended to by the
        # worker threads that main() starts for save_to_mongo().
        self.temp_datas = []

    @staticmethod
    def _clean_context(context):
        """Strip spaces and newlines from a summary; turn carriage returns into spaces."""
        return context.replace(' ', '').replace('\n', '').replace('\r', ' ')

    @classmethod
    def _parse_entries(cls, html):
        """Extract article fields from a list page.

        Args:
            html: Text of a portal list page.

        Returns:
            A list of ``(href, title, img_url, context, post_time)`` tuples,
            with ``context`` already cleaned by ``_clean_context``. Within one
            matched chunk the field lists are zipped, so a chunk missing any
            field yields no entry.
        """
        entries = []
        for chunk in cls._ENTRY_RE.findall(html):
            fields = zip(
                cls._URL_RE.findall(chunk),
                cls._TITLE_RE.findall(chunk),
                cls._IMG_RE.findall(chunk),
                cls._CONTEXT_RE.findall(chunk),
                cls._POST_TIME_RE.findall(chunk),
            )
            for href, title, img_url, context, post_time in fields:
                entries.append((href, title, img_url, cls._clean_context(context), post_time))
        return entries

    def req(self, source, target):
        """Download and parse one category list page.

        Args:
            source: Label of the form ``'<section> - <category>'``.
            target: Category id substituted into the list-page URL.

        Returns:
            A list of record dicts (possibly empty), or ``0`` when the request
            raised or the response status was not 200.
        """
        print(f'正在获取 {source} 数据')
        # Short random pause between requests.
        sleep_time = random.uniform(1, 2)
        print(f'睡眠 {sleep_time}')
        time.sleep(sleep_time)

        try:
            page_url = self.base_url + self.href_url.format(target)
            print(page_url)
            resp = httpx.get(url=page_url, headers=self.headers)
        except Exception as e:
            print(e)
            return 0

        if resp.status_code != 200:
            print(resp.status_code)
            return 0

        resp.encoding = 'utf-8'
        source_parts = source.split(' - ')
        result_list = []
        for href, title, img_url, context, post_time in self._parse_entries(resp.text):
            result_list.append({
                "title": title,
                "context": context,
                "source_url": self.base_url + href,
                'link': '',
                "article_type": source_parts[1],
                "article_source": source_parts[0],
                "img_url": img_url,
                'keyword': '',
                "posted_date": post_time,
                "create_time": int(time.time()),
                "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                "repush_times": DEFAULT_RE_PUSH_TIMES
            })
        return result_list

    def save_to_mongo(self, collection, source_data):
        """Insert records whose title is not stored yet and queue them for e-mail.

        Args:
            collection: Section label; only used in the completion message
                (records always go to ``self.collection``).
            source_data: Iterable of record dicts as produced by ``req``.

        Returns:
            ``0`` if a ``TypeError`` interrupted processing, otherwise ``None``.
        """
        print(f'正在处理 {self.collection} 数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False, auto_remove=0)

        for data_to_insert in source_data:
            try:
                # Look for an existing document with the same title
                # (falls back to '' when the record has no title).
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    # Not stored yet: insert it and remember it for the digest.
                    mongo.collection.insert_one(data_to_insert)
                    self.temp_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('chiphell', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print(f'处理 chiphell - {collection}数据完成')

    def send_to_email(self):
        """E-mail a plain-text digest of all records in ``self.temp_datas``."""
        separator = '********************************************************\n'
        parts = [separator]
        for data in self.temp_datas:
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            parts.append('板块: {}\n'.format(data['article_source']))
            parts.append('类型: {}\n'.format(data['article_type']))
            parts.append('文章地址: {}\n'.format(data['source_url']))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append(separator + '\n')
        text = ''.join(parts)

        title = 'chiphell - info - ' + datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        sub = 'chiphell - info'

        SendEmail(subject=sub, title=title, text=text).send()
        # GotifyNotifier(title=title, message=text, token_name='news').send_message()

        self.logs_handle.logs_write('chiphell', f'{title}-发送邮件完成', 'done', False)

    def main(self):
        """Scrape all enabled categories, store new articles and send the digest.

        Returns:
            ``False`` when no category produced any record (an error is
            logged), otherwise ``None``.
        """
        # section -> {category name: catid}. Commented-out entries are
        # categories that are currently disabled; uncomment to re-enable.
        category = {
            '评测': {
                '笔记本': '19',
                '机箱': '11',
                # '处理器': '13',
                # '散热器': '14',
                # '主板': '15',
                # '内存': '137',
                # '外设': '18',
                # '电源': '35',
                '存储': '23',
                '显示设备': '21',
                # '台式机': '88',
                '显卡': '10',
                # '相机': '116'
            },
            '电脑': {
                '配件开箱': '98',
                '整机搭建': '99',
                '桌面书房': '101'
            },
            '掌设': {
                '智能手机': '40',
                '智能穿戴': '89',
                '笔电平板': '41',
                # '周边附件': '92'
            },
            # '摄影': {
            #     '微单卡片': '52',
            #     '单反单电': '51',
            #     '经典旁轴': '53',
            #     '怀旧菲林': '54',
            #     '影音摄像': '57',
            #     '周边附件': '55'
            # },
            # '汽车': {
            #     '买菜车': '58',
            #     '商务车': '59',
            #     '性能车': '63',
            #     '旅行车': '60',
            #     'SUV': '61',
            #     'MPV': '95',
            #     '摩托轻骑': '65',
            #     '改装配件': '96'
            # },
            # '单车': {
            #     '山地车': '108',
            #     '公路车': '109',
            #     '折叠车': '110',
            #     '休旅车': '111'
            # },
            # '腕表': {
            #     '机械表': '128',
            #     '电子表': '126'
            # },
            '视听': {
                '耳机耳放': '71',
                '音箱功放': '72',
                # '解码转盘': '73',
                '随身设备': '74'
            },
            '美食': {
                '当地美食': '68',
                '世界美食': '117',
                '私房菜品': '69',
                '美食器材': '70'
            },
            # '家居': {
            #     '家居': '132'
            # },
        }

        response_datas = {}

        for source1, tags in category.items():
            # One result bucket per top-level section.
            if source1 not in response_datas:
                response_datas[source1] = []

            for source2, target in tags.items():
                source = source1 + ' - ' + source2
                response_data = self.req(source, target)
                if response_data != 0:
                    response_datas[source1] += response_data

        # Every section gets a (possibly empty) bucket above, so the dict
        # itself is always truthy; check the buckets' contents instead so the
        # "nothing fetched" case is actually reported.
        if not any(response_datas.values()):
            self.logs_handle.logs_write('chiphell - info', '获取数据为空', 'error', False)
            return False

        threads = []
        for k, v in response_datas.items():
            thread = threading.Thread(target=self.save_to_mongo, args=(k, v,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

        if self.temp_datas:
            self.send_to_email()

        return None
if __name__ == '__main__':
    # Run a single scrape / store / notify pass when executed as a script.
    crawler = CHIPHELL()
    crawler.main()