You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
FactorSimulator/core/api_client.py

158 lines
5.3 KiB

# -*- coding: utf-8 -*-
import os.path
import httpx
import time
from httpx import BasicAuth
from typing import Dict, Any, Optional, Tuple
class WorldQuantBrainSimulate:
def __init__(self, credentials_file='account.txt'):
self.credentials_file = credentials_file
self.client = None
self.brain_api_url = 'https://api.worldquantbrain.com'
"""读取本地账号密码"""
def load_credentials(self) -> Tuple[str, str]:
if not os.path.exists(self.credentials_file):
print("未找到 account.txt 文件")
with open(self.credentials_file, 'w') as f:
f.write("")
print("account.txt 文件已创建,请填写账号密码, 格式: ['username', 'password]")
exit(1)
with open(self.credentials_file) as f:
credentials = eval(f.read())
return credentials[0], credentials[1]
"""登录认证"""
def login(self) -> bool:
username, password = self.load_credentials()
self.client = httpx.Client(auth=BasicAuth(username, password))
response = self.client.post(f'{self.brain_api_url}/authentication')
print(f"登录状态: {response.status_code}")
if response.status_code == 201:
print("登录成功!")
print(f"账户信息: {response.json()}")
return True
else:
print(f"登录失败: {response.json()}")
return False
"""模拟Alpha因子"""
def simulate_alpha(self, expression: str, settings: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
if self.client is None:
raise Exception("请先登录")
default_settings = {
'instrumentType': 'EQUITY',
'region': 'USA',
'universe': 'TOP3000',
'delay': 1,
'decay': 0,
'neutralization': 'INDUSTRY',
'truncation': 0.08,
'pasteurization': 'ON',
'unitHandling': 'VERIFY',
'nanHandling': 'OFF',
'language': 'FASTEXPR',
'visualization': False,
}
if settings:
default_settings.update(settings)
simulation_data = {
'type': 'REGULAR',
'settings': default_settings,
'regular': expression
}
sim_resp = self.client.post(f'{self.brain_api_url}/simulations', json=simulation_data)
print(f"模拟提交状态: {sim_resp.status_code}")
sim_progress_url = sim_resp.headers['location']
print(f"进度URL: {sim_progress_url}")
while True:
sim_progress_resp = self.client.get(sim_progress_url)
retry_after_sec = float(sim_progress_resp.headers.get("Retry-After", 0))
if retry_after_sec == 0:
break
if sim_progress_resp.json():
result = sim_progress_resp.json()
progress = result['progress']
if progress:
print(f"模拟进度: {float(progress)*100}%")
print(f"等待 {retry_after_sec} 秒...")
time.sleep(retry_after_sec)
# 如果因子模拟不通过, 获取一下失败信息
if sim_progress_resp.json()["status"] == "ERROR":
result = sim_progress_resp.json()["message"]
print(f"因子模拟失败: {result}")
# 返回一个特殊标识,表示模拟失败
return {"status": "error", "message": result}
alpha_id = sim_progress_resp.json()["alpha"]
print(f"生成的Alpha ID: {alpha_id}")
# 获取详细的性能指标
metrics = self.get_alpha_metrics(alpha_id)
return {"status": "success", "alpha_id": alpha_id, "metrics": metrics}
"""获取Alpha因子的详细指标"""
def get_alpha_metrics(self, alpha_id: str) -> Dict[str, Any]:
if self.client is None:
raise Exception("请先登录")
try:
# 获取Alpha的基本信息和指标
alpha_url = f'{self.brain_api_url}/alphas/{alpha_id}'
alpha_resp = self.client.get(alpha_url)
if alpha_resp.status_code in [200, 201]:
alpha_data = alpha_resp.json()
# 以后可能需要获取其他参数
if alpha_data.get('metrics'):
alpha_data = alpha_data.get('metrics')
return alpha_data or {}
else:
print(f"获取Alpha指标失败: {alpha_resp.status_code}")
# 返回一个空的字典结构
return {
"train": {},
"is": {},
"test": {},
"grade": None,
"stage": None,
"status": None,
"dateCreated": None,
"id": alpha_id
}
except Exception as e:
print(f"获取指标时出错: {str(e)}")
# 返回一个空的字典结构
return {
"train": {},
"is": {},
"test": {},
"grade": None,
"stage": None,
"status": None,
"dateCreated": None,
"id": alpha_id
}
def close(self):
"""关闭连接"""
if self.client:
self.client.close()