You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
auto/archive/message_coinmarketcap.py

77 lines
2.5 KiB

# -*- coding: utf-8 -*-
'''
获取 coinmarketcap 数字货币实时价格
'''
import sys
import os
from datetime import datetime
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
from base.base_load_config import load_config, get_base_path
config_json = load_config()
base_project = get_base_path()
from utils.utils_check_base import *
from utils.utils_send_gotify import *
class CheckCoinmarketcap:
def __init__(self):
self.currency = '¥'
self.url_list = [
{'BTC': 'https://www.coinmarketcap.com/currencies/bitcoin/'},
{'ETH': 'https://www.coinmarketcap.com/currencies/ethereum/'},
{'DOGE': 'https://coinmarketcap.com/currencies/dogecoin/'},
{'ARB': 'https://coinmarketcap.com/currencies/arbitrum/'},
{'ATH': 'https://coinmarketcap.com/currencies/aethir/'},
{'SUI': 'https://coinmarketcap.com/currencies/sui/'},
]
# self.selectors = ['#section-coin-overview > div.sc-65e7f566-0.czwNaM.flexStart.alignBaseline > span']
self.selectors = ['div#section-coin-overview > div:nth-of-type(2)']
def process_data(self, all_data):
result = {}
for data in all_data:
for key, value in data[0].items():
value_list = value.split('\xa0')
value_list = [item for item in value_list if item]
if key not in result:
result[key] = value_list
return result
def send_data(self, all_data):
# 打印结果,只包含货币名称和价格
context = ''
for key, value in all_data.items():
context += f'{key}: {value[0]} {value[1]} {value[2]}\n'
if context:
context += '\n{}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print(context)
# 推送到 message
GotifyNotifier('实时coin价格', context, 'coin').send_message()
else:
print('no data!')
def main(self):
# 运行异步函数
all_data = None
crawler = CryptoCrawler(self.url_list, self.selectors, headless=True)
all_data = crawler.main() # 返回的数据格式: [[{'BTC': '¥483,784.76\xa0\xa01.02%\xa0(1天)'}], ...
if not all_data:
print('获取数据失败')
return
# 清理一下数据
all_data = self.process_data(all_data) # {'BTC': ['¥483,482.98', '0.95%', '(1天)'], ...
self.send_data(all_data)
if __name__ == '__main__':
CheckCoinmarketcap().main()