You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
alpha_tools/test/wqb-fetch-universe/fetch_universe.py

65 lines
1.8 KiB

# -*- coding: utf-8 -*-
import httpx
from httpx import BasicAuth
def login():
    """Log in to the WorldQuant Brain API.

    Reads the account credentials from the Nacos config endpoint, then
    POSTs to the Brain ``/authentication`` endpoint using HTTP basic auth.

    Returns:
        The open, authenticated ``httpx.Client`` when the API answers with
        status 201.  ``False`` when the Nacos lookup does not answer with
        status 200, and ``None`` when the authentication request is
        rejected.  Both failure values are falsy, so ``if client:`` covers
        either case.  On success the caller owns the client and must close
        it.

    Raises:
        Exceptions raised by ``httpx`` for a failed request, a
        ``ValueError`` from decoding a non-JSON Nacos or success body, and
        a ``KeyError`` for a config without ``user_name``/``password`` all
        propagate unchanged; the client is closed first if it was already
        created.
    """
    # Fetch the account name and password from Nacos.
    nacos_resp = httpx.get('http://192.168.31.41:30848/nacos/v1/cs/configs?dataId=wq_account&group=quantify')
    if nacos_resp.status_code != 200:
        print('获取账号密码失败')
        return False
    config = nacos_resp.json()
    username = config['user_name']
    password = config['password']
    print(f"正在登录账户: {username}")

    # Create the client; basic auth is attached to every request it sends.
    client = httpx.Client(auth=BasicAuth(username, password))
    authenticated = False
    try:
        # Send the login request.
        response = client.post('https://api.worldquantbrain.com/authentication')
        print(f"登录状态: {response.status_code}")
        if response.status_code == 201:
            print("登录成功!")
            print(response.json())
            authenticated = True
            return client

        # An error response is not guaranteed to carry a JSON body (e.g. a
        # gateway error page), so fall back to the raw text instead of
        # crashing while reporting the failure.
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        print(f"登录失败: {detail}")
        return None
    finally:
        # Release the connection pool on every path that does not hand the
        # client to the caller, including exceptions raised above.
        if not authenticated:
            client.close()
def fetch_universe(client):
    """Fetch the simulation option metadata from the WorldQuant Brain API.

    Sends an HTTP OPTIONS request to the ``/simulations`` endpoint and
    returns the decoded JSON body.

    Args:
        client: Object exposing ``options(url)`` whose result has a
            ``status_code`` attribute and a ``json()`` method; the script
            passes the ``httpx.Client`` returned by ``login()``.

    Returns:
        The decoded JSON object as a ``dict``.  An empty ``dict`` is
        returned when the request or the JSON decoding raises, or when the
        body decodes to something other than a JSON object, so callers can
        always chain ``.get(...)`` on the result.
    """
    try:
        response = client.options('https://api.worldquantbrain.com/simulations')
        print(f"Options请求状态: {response.status_code}")
        results = response.json()
    except Exception as e:
        # Deliberate best-effort: any transport or decoding failure is
        # reported and turned into an empty result rather than a crash.
        print(f"Options请求失败: {e}")
        return {}

    # A body such as a list or null would break the caller's .get() chain;
    # treat it like any other failed fetch.
    if not isinstance(results, dict):
        print(f"Options请求失败: 响应不是JSON对象 ({type(results).__name__})")
        return {}
    return results
# Usage example: log in, fetch the /simulations OPTIONS payload, and print the
# value stored under
# actions -> POST -> settings -> children -> universe -> choices
#   -> instrumentType -> EQUITY -> region.
client = login()
if client:
    # try/finally rather than `with client:` because the client has already
    # sent a request inside login(); the close must still run if anything
    # below raises.
    try:
        results = fetch_universe(client)
        universe_data = results
        for _key in (
            'actions',
            'POST',
            'settings',
            'children',
            'universe',
            'choices',
            'instrumentType',
            'EQUITY',
            'region',
        ):
            # A missing key, or a non-dict value part-way down the path,
            # yields {} instead of raising AttributeError.
            if not isinstance(universe_data, dict):
                universe_data = {}
                break
            universe_data = universe_data.get(_key, {})
        print(universe_data)
    finally:
        client.close()