auto/spider/spider_web3_news.py

# -*- coding: utf-8 -*-
'''
Crawl several web news sites.
Results are stored in Mongo, which is only queried to check whether a
story has already been pushed.
'''
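# Every spider below writes documents with the same shape:
# {title, context, source_url, link, article_type, article_source,
#  img_url, keyword, posted_date, create_time, create_datetime, repush_times}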
import os
import re
import sys
import threading
import time
from datetime import datetime
from html import unescape

import httpx

# Make the project's 'auto' directory importable so the local packages resolve.
sys.path.append(os.path.abspath(__file__).split('auto')[0] + 'auto')
from utils.utils_mongo_handle import MongoHandle
from base.base_load_config import load_config

config_json = load_config()
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
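# DEFAULT_RE_PUSH_TIMES is stamped onto every stored document below; it is not
# used elsewhere in this file, and is presumably decremented by the downstream
# pusher that reads this collection.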


class MessageSearchKey(object):

    def __init__(self):
        db_name = 'NEWS'
        collection_name = 'web3_news'
        self.mongo = MongoHandle(db=db_name, collection=collection_name, del_db=False, del_collection=False,
                                 auto_remove=0)
        # Browser-like headers shared by every request below.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Content-Type": "application/json"
        }
    def techflow(self):
        # 深潮TechFlow feed, published via 163.com: https://www.163.com/dy/media/T1561634363944.html
        tag_title = '深潮TechFlow'
        target = ['https://www.163.com/dy/media/T1561634363944.html']
        for url in target:
            print('Fetching url: {}'.format(url))
            resp = httpx.get(url, headers=self.headers, timeout=10)
            if resp.status_code != 200:
                print('深潮TechFlow - failed to fetch data, status code: {}'.format(resp.status_code))
                return False
            resp.encoding = 'utf-8'
            html = resp.text
            context_urls = re.findall('<a href="(.*?)" class="title">', html)
            title_list = re.findall('class="title">(.*?)</a>', html)
            posted_time_list = re.findall('<span class="time">(.*?)</span>', html)
            for title, context_url, posted_time in zip(title_list, context_urls, posted_time_list):
                data = {
                    'title': title,
                    # The listing page exposes no summary, so the title doubles as context.
                    'context': title,
                    'source_url': url,
                    'link': context_url,
                    'article_type': tag_title,
                    'article_source': tag_title,
                    'img_url': '',
                    'keyword': '',
                    'posted_date': posted_time,
                    'create_time': int(time.time()),
                    'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    'repush_times': DEFAULT_RE_PUSH_TIMES
                }
                # Insert only when no document with the same title exists yet.
                filter_criteria = {'title': data['title']}
                if self.mongo.collection.count_documents(filter_criteria) == 0:
                    self.mongo.collection.insert_one(data)
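
    # The check-then-insert pattern above is not atomic, so concurrent spiders
    # could in principle insert the same title twice. A minimal alternative
    # sketch (hypothetical helper, not wired in anywhere), assuming
    # self.mongo.collection is a standard pymongo Collection:
    def _save_if_new(self, data):
        # $setOnInsert only writes the document when the upsert finds no match,
        # collapsing the existence check and the insert into one operation.
        self.mongo.collection.update_one(
            {'title': data['title']},   # same dedup key the spiders filter on
            {'$setOnInsert': data},
            upsert=True,
        )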
    def panewslab(self):
        tag_title = 'panewslab'
        base_url = 'https://www.panewslab.com'
        # ------------------------------------------------------------------------------------------------------------
        # JSON feed; LastTime looks like a hardcoded pagination cursor.
        try:
            url = 'https://www.panewslab.com/webapi/index/list?Rn=20&LId=1&LastTime=1724891115&TagId=&tw=0'
            print('Fetching url: {}'.format(url))
            resp = httpx.get(url, headers=self.headers, timeout=10)
            if resp.status_code != 200:
                print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
                return False
            resp.encoding = 'utf-8'
            resp_json = resp.json()
            for resp_data in resp_json['data']:
                try:
                    data = {
                        'title': resp_data['share']['title'],
                        'context': resp_data['desc'],
                        'source_url': url,
                        'link': resp_data['share']['url'],
                        'article_type': tag_title,
                        'article_source': tag_title,
                        'img_url': '',
                        'keyword': '',
                        'posted_date': datetime.utcfromtimestamp(int(resp_data['publishTime'])).strftime(
                            '%Y-%m-%d %H:%M:%S'),
                        'create_time': int(time.time()),
                        'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        'repush_times': DEFAULT_RE_PUSH_TIMES
                    }
                    filter_criteria = {'title': data['title']}
                    if self.mongo.collection.count_documents(filter_criteria) == 0:
                        self.mongo.collection.insert_one(data)
                except Exception as e:
                    print(f'{tag_title}: failed to extract item, {e}')
                    continue
        except Exception as e:
            print(f'{tag_title}: failed to fetch or parse feed, {e}')
        # -------------------------------------------------------------------------------------------------------------
        # In-depth articles page, scraped with regexes.
        url = 'https://www.panewslab.com/zh/profundity/index.html'
        print('Fetching url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        context_urls = re.findall('<div class="list-left" data-v-559b28aa><a href="(.*?)" target="_blank"', html)
        title_list = re.findall('target="_blank" class="n-title" data-v-559b28aa>(.*?)</a>', html)
        context_list = re.findall('<p class="description" data-v-559b28aa>(.*?)</p>', html)
        for title, context, context_url in zip(title_list, context_list, context_urls):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': data['title']}
            if self.mongo.collection.count_documents(filter_criteria) == 0:
                self.mongo.collection.insert_one(data)
        # -------------------------------------------------------------------------------------------------------------
        # Newsflash page, scraped with regexes.
        url = 'https://www.panewslab.com/zh/news/index.html'
        print('Fetching url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        context_urls = re.findall('class="content" data-v-3376a1f2><a href="(.*?)" target="_blank"', html)
        title_list = re.findall('target="_blank" class="n-title" data-v-3376a1f2>(.*?)</a>', html)
        context_list = re.findall('</a> <p data-v-3376a1f2>(.*?)</p>', html)
        for title, context, context_url in zip(title_list, context_list, context_urls):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': data['title']}
            if self.mongo.collection.count_documents(filter_criteria) == 0:
                self.mongo.collection.insert_one(data)
    def foresightnews(self):
        # Fetch foresightnews front-page items.
        tag_title = 'foresightnews'
        base_url = 'https://foresightnews.pro/'
        # -------------------------------------------------------------------------------------------------------------
        url = 'https://foresightnews.pro/'
        print('Fetching url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        # The page embeds escaped markup, so unescape HTML entities before matching.
        html = unescape(resp.text)
        context_urls = re.findall('</div></div></div></a><a href="(.*?)" target="_blank"', html)
        title_list = re.findall('<div class="topic-body-title" data-v-3171afda>(.*?)</div>', html)
        context_list = re.findall('<div class="topic-body-content" data-v-3171afda>(.*?)</div>', html)
        posted_time_list = re.findall('div class="topic-time" data-v-3171afda>(.*?)</div>', html)
        # posted_time is extracted but not stored; posted_date falls back to crawl time.
        for title, context, context_url, posted_time in zip(title_list, context_list, context_urls, posted_time_list):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': title}
            if self.mongo.collection.count_documents(filter_criteria) == 0:
                self.mongo.collection.insert_one(data)
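
    # NOTE: the regex selectors in panewslab() and foresightnews() are pinned to
    # Vue scoped-style hashes (data-v-559b28aa, data-v-3376a1f2, data-v-3171afda).
    # When the sites redeploy, those hashes change and re.findall returns empty
    # lists, so "no new items" can mean stale selectors rather than no news.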
    def main(self):
        # Run each site's scraping rules concurrently. Every rule normalizes
        # items into the document shape described at the top of the file and
        # inserts them only if the title has not been seen (i.e. pushed) before.
        functions = [
            self.techflow,
            self.panewslab,
            self.foresightnews
        ]
        # Create and start one thread per spider.
        print('Creating and starting spider threads')
        threads = []
        for func in functions:
            thread = threading.Thread(target=func)
            thread.start()
            threads.append(thread)
        # Wait for all threads to finish.
        for thread in threads:
            thread.join()
        print('Program finished')


if __name__ == "__main__":
    m = MessageSearchKey()
    m.main()