You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
203 lines
7.1 KiB
203 lines
7.1 KiB
# -*- coding: utf-8 -*-
|
|
import os
|
|
import sys
|
|
|
|
# Extend the import path with this file's absolute path truncated just after
# the first occurrence of the substring 'auto' (os.path.join with a single
# argument returns it unchanged). Presumably this is the project root that
# holds the `utils` package imported below -- confirm against the repo layout.
# NOTE(review): split('auto') matches any substring, so a parent directory
# whose name merely contains "auto" would truncate the path early.
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
|
|
|
|
import httpx
|
|
from datetime import datetime
|
|
from utils.utils_send_gotify import *
|
|
|
|
# Maximum number of attempts main() makes for each fetch before it records a
# failure message instead of data.
retry_count = 5
|
|
|
|
|
|
def fetch_coin_data(target):
    """Fetch today's market-price record for one coin and format it as text.

    Posts a ``listData`` / ``MARKETPRICE`` query for ``target`` to
    api.chainalert.me (3 second timeout), takes the first record of the
    reply, prints a one-line summary and returns it.

    Args:
        target: Coin symbol to query, e.g. ``'btc'``.

    Returns:
        The summary line ``"<name> <item1> <item3> <item2> <rank> <item4>"``
        followed by a newline, a rule of dashes as wide as the summary and a
        final newline. ``False`` when the request raises or the HTTP status
        is not 200.

    Raises:
        Exception: when ``result[0]['data']`` is missing from the reply or
            cannot be decoded. Errors from decoding the response body as
            JSON, from an empty record list, or from a record lacking an
            expected key propagate unchanged.
    """
    # Function-scope imports: only the payload decoding below needs them.
    import ast
    import json

    url = "https://api.chainalert.me/"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
    }
    payload = {
        "method": "listData",
        "params": [datetime.now().strftime("%Y-%m-%d"), "MARKETPRICE", '', 0, 9999.0, target]
    }

    # The context manager closes the client on every exit path, so no
    # explicit client.close() is needed before the early returns.
    with httpx.Client() as client:
        try:
            response = client.post(url, headers=headers, data=payload, timeout=3)
        except Exception:
            # Any request failure is reported as False so the caller can retry.
            return False

        if response.status_code != 200:
            return False

        data = response.json()
        try:
            raw = data['result'][0]['data']
            # SECURITY: this string comes from a remote server. It used to be
            # passed to eval(), which would execute any expression the server
            # (or anyone tampering with the reply) sent. Decode it as data
            # only: JSON first, then a plain Python literal as a fallback.
            try:
                records = json.loads(raw)
            except ValueError:
                records = ast.literal_eval(raw)
        except Exception as e:
            raise Exception(f"Failed to parse data: {data}, error: {str(e)}") from e

        record = records[0]

        # item1..item4 are the API's field names; the labels on the right are
        # the ones this script has always used for them.
        name = record['name']
        rank = record['rank']
        price = record['item1']
        volume = record['item2']
        change = record['item3']
        market_cap = record['item4']

        text = f'{name} {price} {change} {volume} {rank} {market_cap}'
        print(text)

        return text + '\n' + ('-' * len(text)) + '\n'
|
|
|
|
|
|
def fetch_vix_data():
    """Fetch the latest ``GREEDY_INDEX`` record and format it as text.

    Posts a ``listData`` / ``GREEDY_INDEX`` query to api.chainalert.me
    (3 second timeout), prints the first record of the reply and returns a
    two-line summary built from its ``greedy`` and ``level`` fields.

    Returns:
        A string of the form ``VIX data: <greedy>`` and ``Level: <level>``
        separated by a newline, or ``False`` when the request raises or the
        HTTP status is not 200.

    Raises:
        Errors from decoding the reply (invalid JSON, missing keys, empty
        record list, undecodable ``data`` field) are not caught here and
        propagate to the caller.
    """
    # Function-scope imports: only the payload decoding below needs them.
    import ast
    import json

    url = "https://api.chainalert.me/"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
    }
    payload = {
        "method": "listData",
        "params": ['', "GREEDY_INDEX", 1, 0, 1, '']
    }

    # The context manager closes the client on every exit path, so no
    # explicit client.close() is needed before the early returns.
    with httpx.Client() as client:
        try:
            response = client.post(url, headers=headers, data=payload, timeout=3)
        except Exception:
            # Any request failure is reported as False so the caller can retry.
            return False

        if response.status_code != 200:
            return False

        data = response.json()
        raw = data['result'][0]['data']
        # SECURITY: this string comes from a remote server. It used to be
        # passed to eval(), which would execute any expression the server
        # sent. Decode it as data only: JSON first, then a plain Python
        # literal as a fallback.
        try:
            records = json.loads(raw)
        except ValueError:
            records = ast.literal_eval(raw)

        vix_data = records[0]
        print(vix_data)

        greedy = vix_data['greedy']
        level = vix_data['level']
        return f'VIX data: {greedy}\nLevel: {level}'
|
|
|
|
|
|
def fetch_gas_data():
    """Fetch the current gas-price tiers and format them as text.

    Sends a GET request to the maiziqianbao ``chains/EVM/1/gas_price``
    endpoint, reads the ``fastest``, ``fast``, ``standard``, ``low`` and
    ``base`` prices from the reply's ``data`` object, prints them on one line
    and returns them as a multi-line block.

    Returns:
        A string starting with a blank line and ``GAS:`` followed by one
        ``<tier>: <price>`` line per tier, or ``False`` when the request
        fails, the HTTP status is not 200, the body is empty or not JSON, or
        an expected field is missing or malformed. Every failure path prints
        a short diagnostic first.
    """
    url = "https://a5.maiziqianbao.net/api/v1/chains/EVM/1/gas_price"

    # NOTE(review): the request token and device id below are hard-coded;
    # consider loading them from configuration rather than source control.
    headers = {
        "Host": "a5.maiziqianbao.net",
        "Connection": "keep-alive",
        "x-req-token": "MDbO4FsaSUPdjCdvTUs2zY4V3rnvvYatvYyjz7SfY+aCJ8r+RFm06X2dGR8eEDK7Gc5g1TLEQySEhGerRXbDT/NS+e5QAWRU68yD8m4y/aKK+TBkIv90VwvxmvYId2BVoDPDHQCGG4o3EqRWkS93eV0twYQ7w7qvNUj2e3tpDcUZYuplPyLozgYVTegFPnDk",
        "Accept": "*/*",
        "x-app-type": "iOS-5",
        "x-app-ver": "1.0.1",
        "x-app-udid": "419815AD-3015-4B5A-92CA-3BCBED24ACEC",
        "x-app-locale": "en",
        "Accept-Language": "zh-Hans-CN;q=1.0, en-CN;q=0.9",
        "Accept-Encoding": "br;q=1.0, gzip;q=0.9, deflate;q=0.8",
        "User-Agent": "MathGas/1.0.1 (MathWallet.MathGas; build:3; macOS 13.5.0) Alamofire/5.4.4"
    }

    def _drop_trailing_digits(value, n_chars=9):
        # Cut the last n_chars characters off the value's string form and
        # return the rest as an int; values with n_chars characters or fewer
        # are returned untouched. Presumably this turns wei into gwei --
        # confirm against the API's units.
        digits = str(value)
        return int(digits[:-n_chars]) if len(digits) > n_chars else value

    # A transport error or timeout used to escape this function and abort the
    # caller's retry loop; report it as a failed attempt like the sibling
    # fetchers do. The context manager closes the client on every path.
    try:
        with httpx.Client() as client:
            response = client.get(url, headers=headers)
    except httpx.HTTPError as e:
        print(f"Failed to fetch GAS data. error: {e}")
        return False

    if response.status_code != 200:
        print("Error:", response.status_code)
        return False

    # Decode the body once; a non-JSON body is a failed attempt, not a crash.
    try:
        body = response.json()
    except ValueError as e:
        print(e)
        return False

    if not body:
        print("Not Find GAS Data. Error: No response")
        return False

    try:
        data = body['data']
        # Insertion order of this dict fixes the order of the output lines.
        tiers = {
            tier: _drop_trailing_digits(data[tier]['price'])
            for tier in ('fastest', 'fast', 'standard', 'low', 'base')
        }
    except (LookupError, TypeError, ValueError) as e:
        print(e)
        return False

    print(' - '.join(f'{tier}: {price}' for tier, price in tiers.items()))
    return '\nGAS:\n' + '\n'.join(f'{tier}: {price}' for tier, price in tiers.items())
|
|
|
|
|
|
def _fetch_with_retry(fetch, *args, failure_label, log_attempts=True):
    """Call ``fetch(*args)`` up to ``retry_count`` times.

    Returns the first truthy result, or ``None`` when every attempt returned
    a falsy value. When ``log_attempts`` is true, each failed attempt prints
    ``"<failure_label> retry: <attempt>"``.
    """
    for attempt in range(1, retry_count + 1):
        result = fetch(*args)
        if result:
            return result
        if log_attempts:
            print(f"{failure_label} retry: {attempt}")
    return None


def main():
    """Collect coin prices, the greed index and gas prices, then send them.

    Each source is tried up to ``retry_count`` times. A source that never
    succeeds contributes a one-line failure message instead of data. The
    combined text is sent through ``GotifyNotifier``; if it is empty,
    ``'No Data'`` is printed instead.
    """
    parts = []

    # Real-time coin prices.
    target_list = ['btc', 'eth', 'sol', 'grass', 'sui', 'doge', 'arb', 'ath', 'move', 'pepe', 'degen', 'act', 'plume']
    for target in target_list:
        label = f"{target} Failed to fetch data."
        row = _fetch_with_retry(fetch_coin_data, target, failure_label=label)
        # Failure lines end with a newline so the next entry starts on its
        # own line instead of being glued to the message.
        parts.append(row if row else f"{label} retry count: {retry_count}\n")

    # Fear/greed index.
    label = "Failed to fetch VIX data."
    vix = _fetch_with_retry(fetch_vix_data, failure_label=label)
    parts.append(vix + '\n\n' if vix else f"{label} retry count: {retry_count}\n")

    # Real-time gas fee. fetch_gas_data prints its own diagnostics, so failed
    # attempts are not logged again here.
    label = "Failed to fetch Gas data."
    gas = _fetch_with_retry(fetch_gas_data, failure_label=label, log_attempts=False)
    parts.append('\n' + gas + '\n\n' if gas else f"{label} retry count: {retry_count}\n")

    text = ''.join(parts)
    if text:
        GotifyNotifier('Real-time coin price\n', text, 'coin').send_message()
    else:
        print('No Data')
|
|
|
|
|
|
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|