# -*- coding: utf-8 -*-
'''
Crawl several web news sites and store the results in Mongo;
only the title is checked to decide whether a message was already pushed.
'''
import os
import sys
import threading
import time

import httpx

sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))

from html import unescape
from datetime import datetime
import re

from utils.utils_mongo_handle import MongoHandle
from base.base_load_config import load_config

config_json = load_config()
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']


class MessageSearchKey(object):

    def __init__(self):
        db_name = 'NEWS'
        collection_name = 'web3_news'
        self.mongo = MongoHandle(db=db_name, collection=collection_name,
                                 del_db=False, del_collection=False, auto_remove=0)
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Content-Type": "application/json"
        }

    def techflow(self):
        # 深潮TechFlow url: https://www.163.com/dy/media/T1561634363944.html
        tag_title = '深潮TechFlow'
        data_list = []
        target = ['https://www.163.com/dy/media/T1561634363944.html']
        for url in target:
            print('Fetching url: {}'.format(url))
            resp = httpx.get(url, headers=self.headers, timeout=10)
            if resp.status_code != 200:
                print('深潮TechFlow - failed to fetch data, status code: {}'.format(resp.status_code))
                return False
            resp.encoding = 'utf-8'
            html = resp.text
            # NOTE: the HTML-tag portions of these patterns were lost from the
            # source file; the surviving fragments are kept verbatim below.
            context_urls = re.findall('', html)
            title_list = re.findall('class="title">(.*?)', html)
            posted_time_list = re.findall('(.*?)', html)
            for title, context_url, posted_time in zip(title_list, context_urls, posted_time_list):
                data = {
                    'title': title,
                    'context': title,
                    'source_url': url,
                    'link': context_url,
                    'article_type': tag_title,
                    'article_source': tag_title,
                    'img_url': '',
                    'keyword': '',
                    'posted_date': posted_time,
                    'create_time': int(time.time()),
                    'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    'repush_times': DEFAULT_RE_PUSH_TIMES
                }
                # Insert only when no document with the same title exists yet,
                # so an article is never pushed twice.
                filter_criteria = {'title': data['title']}
                count = self.mongo.collection.count_documents(filter_criteria)
                if count == 0:
                    result = self.mongo.collection.insert_one(data)

    def panewslab(self):
        tag_title = 'panewslab'
        base_url = 'https://www.panewslab.com'
        # ------------------------------------------------------------------------------------------------------------
        try:
            url = 'https://www.panewslab.com/webapi/index/list?Rn=20&LId=1&LastTime=1724891115&TagId=&tw=0'
            print('Fetching url: {}'.format(url))
            resp = httpx.get(url, headers=self.headers, timeout=10)
            if resp.status_code != 200:
                print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
                return False
            resp.encoding = 'utf-8'
            resp_json = resp.json()
            for resp_data in resp_json['data']:
                try:
                    data = {
                        'title': resp_data['share']['title'],
                        'context': resp_data['desc'],
                        'source_url': url,
                        'link': resp_data['share']['url'],
                        'article_type': tag_title,
                        'article_source': tag_title,
                        'img_url': '',
                        'keyword': '',
                        'posted_date': datetime.utcfromtimestamp(int(resp_data['publishTime'])).strftime(
                            '%Y-%m-%d %H:%M:%S'),
                        'create_time': int(time.time()),
                        'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        'repush_times': DEFAULT_RE_PUSH_TIMES
                    }
                    filter_criteria = {'title': data['title']}
                    count = self.mongo.collection.count_documents(filter_criteria)
                    if count == 0:
                        result = self.mongo.collection.insert_one(data)
                except Exception as e:
                    print(f'{tag_title}: failed to extract data, {e}')
                    continue
        except Exception as e:
            print(f'{tag_title}: failed to extract data, {e}')
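        # Editor's sketch (not part of the original flow): the
        # count_documents + insert_one pair used above is not atomic, so two
        # concurrent runs can both see count == 0 and insert the same title
        # twice. A single PyMongo upsert keyed on the same filter would
        # deduplicate atomically; a minimal sketch, assuming the same `data`
        # dict as above:
        #
        #   self.mongo.collection.update_one(
        #       {'title': data['title']},    # same dedup key as above
        #       {'$setOnInsert': data},      # written only on first insert
        #       upsert=True,
        #   )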
        # -------------------------------------------------------------------------------------------------------------
        url = 'https://www.panewslab.com/zh/profundity/index.html'
        print('Fetching url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
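        # Editor's sketch (not in the original): every fetch in this module
        # opens a fresh connection via httpx.get. A shared client would pool
        # and reuse connections across the repeated requests; a minimal
        # sketch using the same headers and timeout:
        #
        #   with httpx.Client(headers=self.headers, timeout=10) as client:
        #       resp = client.get(url)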
        # NOTE: the HTML-tag portions of the extraction patterns for this page
        # were lost from the source file; only this skeleton survives, and the
        # title/context extractions are left as labelled placeholders.
        context_urls = re.findall('(.*?)', html)
        title_list = []    # original extraction lost from the source
        context_list = []  # original extraction lost from the source
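        # For illustration only (hypothetical, not the original pattern): a
        # link-extraction regex for a listing page like this one typically
        # anchors on the href attribute, e.g.:
        #
        #   context_urls = re.findall(r'<a href="(.*?)"', html)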
        for title, context, context_url in zip(title_list, context_list, context_urls):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': data['title']}
            count = self.mongo.collection.count_documents(filter_criteria)
            if count == 0:
                result = self.mongo.collection.insert_one(data)
        # -------------------------------------------------------------------------------------------------------------
        url = 'https://www.panewslab.com/zh/news/index.html'
        print('Fetching url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        # NOTE: as above, the tag portions of these patterns were stripped
        # from the source; fragments kept verbatim, title extraction lost.
        context_urls = re.findall('class="content" data-v-3376a1f2>(.*?)', html)
        context_list = re.findall('(.*?)', html)
        title_list = []  # original extraction lost from the source
        for title, context, context_url in zip(title_list, context_list, context_urls):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': data['title']}
            count = self.mongo.collection.count_documents(filter_criteria)
            if count == 0:
                result = self.mongo.collection.insert_one(data)

    def foresightnews(self):
        # Fetch foresightnews news data
        tag_title = 'foresightnews'
        base_url = 'https://foresightnews.pro/'
        # -------------------------------------------------------------------------------------------------------------
        url = 'https://foresightnews.pro/'
        print('Fetching url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        # Un-escape HTML entities before regex extraction
        html = unescape(html)
        context_urls = re.findall('