# -*- coding: utf-8 -*-
'''
chiphell

Fetch article listings from the chiphell.com portal pages, store unseen
articles in MongoDB and, when enabled, e-mail a digest of the new ones.
'''
import os
import random
import sys
import threading
import re
import time
from datetime import datetime

import httpx

# Make the project root (everything up to and including the first 'auto' path
# component) importable before pulling in the project-local packages below.
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
from utils.utils_mongo_handle import MongoHandle
from utils.utils_logs_handle import LogsHandle
from utils.utils_send_email import SendEmail
from base.base_load_config import load_config

config_json = load_config()
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']

# NOTE(review): the HTML inside the original pattern literals was lost from the
# stored copy of this file. The patterns below are reconstructed from the
# fragments that survived and from the stock Discuz! portal list markup --
# verify them against a live listing page before relying on them.
_ENTRY_RE = re.compile(r'<dt class="xs2">([\S\s]*?)</dl>')
# The trailing space keeps this from matching the thumbnail anchor, whose
# target attribute is immediately followed by '>'.
_URL_RE = re.compile(r'<a href="(.*?)" target="_blank" ')
_TITLE_RE = re.compile(r'class="xi2"[^>]*>(.*?)</a>')
_IMG_URL_RE = re.compile(r'target="_blank"><img src="(.*?)"')
_CONTEXT_RE = re.compile(r'class="tn" /></a></div>([\S\s]*?)</dd>')
_POST_TIME_RE = re.compile(r'<span class="xg1"> (.*?)</span>')


class CHIPHELL(object):
    '''Scraper for chiphell.com portal category listings.'''

    def __init__(self):
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.base_url = 'https://www.chiphell.com/'
        # Relative listing URL; '{}' is filled with a category id.
        self.href_url = 'portal.php?mod=list&catid={}'
        self.db = 'NEWS'
        self.collection = 'chiphell_info'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        # Newly inserted records collected for the e-mail digest.
        self.send_email_datas = []
        # Truthy enables the e-mail digest at the end of main().
        self.send_email_now = 0

    def req(self, source, target):
        '''Fetch one category listing page and parse it into records.

        :param source: label of the form '<section> - <type>'.
        :param target: category id substituted into the listing URL.
        :return: list of record dicts on success (possibly empty), or 0 when
            the request raises or the status code is not 200.
        '''
        print(f'正在获取 {source} 数据')
        # sleep_time = random.uniform(10, 15)
        sleep_time = random.uniform(1, 2)
        print(f'睡眠 {sleep_time} 秒')
        time.sleep(sleep_time)

        try:
            url = self.base_url + self.href_url.format(target)
            print(url)
            resp = httpx.get(url=url, headers=self.headers)
        except Exception as e:
            # Best effort: one failing category must not abort the whole run.
            print(e)
            return 0

        if resp.status_code != 200:
            print(resp.status_code)
            return 0

        resp.encoding = 'utf-8'
        return self._parse_listing(resp.text, source)

    def _parse_listing(self, html, source):
        '''Extract article records from the HTML of one listing page.'''
        source_parts = source.split(' - ')
        result_list = []
        for dl in _ENTRY_RE.findall(html):
            if not dl:
                continue
            url_list = _URL_RE.findall(dl)
            title_list = _TITLE_RE.findall(dl)
            img_url_list = _IMG_URL_RE.findall(dl)
            context_list = _CONTEXT_RE.findall(dl)
            post_time_list = _POST_TIME_RE.findall(dl)
            # zip() stops at the shortest list, so an entry missing any one of
            # the five fields yields no record.
            for url, title, img_url, context, post_time in zip(
                    url_list, title_list, img_url_list, context_list, post_time_list):
                # Strip spaces and newlines from the summary text, then turn
                # carriage returns into single spaces.
                context = context.replace(' ', '').replace('\n', '').replace('\r', ' ')
                result_list.append({
                    "title": title,
                    "context": context,
                    "source_url": self.base_url + url,
                    'link': '',
                    "article_type": source_parts[1],
                    "article_source": source_parts[0],
                    "img_url": img_url,
                    'keyword': '',
                    "posted_date": post_time,
                    "create_time": int(time.time()),
                    "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    "repush_times": DEFAULT_RE_PUSH_TIMES
                })
        return result_list

    def save_to_mongo(self, collection, source_data):
        '''Insert records whose title is not yet stored and queue them for e-mail.

        :param collection: section label, used only in the progress message;
            the records always go to ``self.collection``.
        :param source_data: iterable of record dicts as produced by ``req``.
        :return: 0 if a TypeError aborts the run, otherwise None.
        '''
        print(f'正在处理 {self.collection} 数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False, auto_remove=0)

        for data_to_insert in source_data:
            try:
                # Look for an existing document with the same title.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                count = mongo.collection.count_documents(filter_criteria)
                if count == 0:
                    # Not stored yet: insert it and remember it for the digest.
                    mongo.collection.insert_one(data_to_insert)
                    self.send_email_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('chiphell', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print(f'处理 chiphell - {collection}数据完成')

    def send_to_email(self):
        '''E-mail a plain-text digest of every record in ``self.send_email_datas``.'''
        title = 'chiphell - info'
        subject = 'chiphell - info'
        parts = ['********************************************************\n']
        for data in self.send_email_datas:
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            parts.append('板块: {}\n'.format(data['article_source']))
            parts.append('类型: {}\n'.format(data['article_type']))
            parts.append('文章地址: {}\n'.format(data['source_url']))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append('********************************************************\n\n')
        text = ''.join(parts)

        send_email = SendEmail(subject=subject, title=title, text=text)
        send_email.send()

        self.logs_handle.logs_write('chiphell', f'{title}-发送邮件完成', 'done', False)

    def main(self):
        '''Fetch every enabled category, store new records, optionally e-mail them.

        :return: False when no category yielded any record, otherwise None.
        '''
        # section -> {type: category id}. Disabled entries are kept verbatim
        # as comments so they can be switched back on.
        category = {
            '评测': {
                '笔记本': '19',
                '机箱': '11',
                # '处理器': '13',
                # '散热器': '14',
                # '主板': '15',
                # '内存': '137',
                # '外设': '18',
                # '电源': '35',
                '存储': '23',
                '显示设备': '21',
                # '台式机': '88',
                '显卡': '10',
                # '相机': '116'
            },
            '电脑': {
                '配件开箱': '98',
                '整机搭建': '99',
                '桌面书房': '101'
            },
            '掌设': {
                '智能手机': '40',
                '智能穿戴': '89',
                '笔电平板': '41',
                # '周边附件': '92'
            },
            # '摄影': {
            #     '微单卡片': '52',
            #     '单反单电': '51',
            #     '经典旁轴': '53',
            #     '怀旧菲林': '54',
            #     '影音摄像': '57',
            #     '周边附件': '55'
            # },
            # '汽车': {
            #     '买菜车': '58',
            #     '商务车': '59',
            #     '性能车': '63',
            #     '旅行车': '60',
            #     'SUV': '61',
            #     'MPV': '95',
            #     '摩托轻骑': '65',
            #     '改装配件': '96'
            # },
            # '单车': {
            #     '山地车': '108',
            #     '公路车': '109',
            #     '折叠车': '110',
            #     '休旅车': '111'
            # },
            # '腕表': {
            #     '机械表': '128',
            #     '电子表': '126'
            # },
            '视听': {
                '耳机耳放': '71',
                '音箱功放': '72',
                # '解码转盘': '73',
                '随身设备': '74'
            },
            '美食': {
                '当地美食': '68',
                '世界美食': '117',
                '私房菜品': '69',
                '美食器材': '70'
            },
            # '家居': {
            #     '家居': '132'
            # },
        }

        response_datas = {}

        for source1, tags in category.items():
            # Group the fetched records under their top-level section name.
            if source1 not in response_datas:
                response_datas[source1] = []

            for source2, target in tags.items():
                source = source1 + ' - ' + source2
                response_data = self.req(source, target)
                if response_data != 0:
                    response_datas[source1] += response_data

        # Every section gets a key even when all of its requests failed, so the
        # dict itself is never empty; test for actual records instead.
        if not any(response_datas.values()):
            self.logs_handle.logs_write('chiphell', '获取数据为空', 'error', False)
            return False

        # One worker thread per section; wait for all of them to finish.
        threads = []
        for k, v in response_datas.items():
            thread = threading.Thread(target=self.save_to_mongo, args=(k, v,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

        # Send the digest only when it is enabled and there is something new.
        if self.send_email_now and self.send_email_datas:
            self.send_to_email()


if __name__ == '__main__':
    CHIPHELL().main()