You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

247 lines
8.8 KiB

import ast
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from os.path import expanduser, exists
from time import sleep

import httpx
from httpx import BasicAuth
class WorldQuantBrainAPI:
    """Small client for submitting alpha simulations to the WorldQuant BRAIN API.

    The client is created unauthenticated. ``login`` reads the credentials
    file and opens an HTTP session, ``simulate_alpha`` submits one expression
    and polls until it finishes, and ``batch_simulate_alphas`` does the same
    for every entry of a JSON config file using a thread pool. Call ``close``
    when done to release the HTTP session.
    """

    def __init__(self, credentials_file='brain_credentials.txt'):
        """Initialise the client without performing any I/O.

        Args:
            credentials_file: Path to a file containing ``[username, password]``.
                A leading ``~`` is expanded to the user's home directory.
        """
        self.credentials_file = expanduser(credentials_file)
        # HTTP session; set by a successful login() and reset by close().
        self.client = None
        self.brain_api_url = 'https://api.worldquantbrain.com'

    @staticmethod
    def _load_credentials(text):
        """Parse credential text into a ``(username, password)`` tuple.

        JSON is tried first. If that fails the text is parsed as a Python
        literal, so files written with single quotes (which the previous
        ``eval``-based loader accepted) keep working. Nothing is ever
        executed: ``ast.literal_eval`` only accepts literal syntax.

        Args:
            text: Raw content of the credentials file.

        Returns:
            A ``(username, password)`` tuple.

        Raises:
            ValueError: If the text cannot be parsed or is not a two-element
                list/tuple. The message never contains the file content.
        """
        stripped = text.strip()
        try:
            credentials = json.loads(stripped)
        except json.JSONDecodeError:
            try:
                credentials = ast.literal_eval(stripped)
            except (ValueError, SyntaxError) as err:
                raise ValueError("无法解析凭证文件内容") from err
        if not isinstance(credentials, (list, tuple)) or len(credentials) != 2:
            raise ValueError("需要恰好两个元素")
        username, password = credentials
        return username, password

    def login(self):
        """Authenticate against the API using the credentials file.

        On success the authenticated HTTP session is stored in ``self.client``.
        On any failure ``self.client`` is left as ``None`` and no session is
        kept open.

        Returns:
            ``True`` if the server answered the authentication request with
            status 201, ``False`` otherwise (missing file, bad format, HTTP
            error, or any exception, which is reported on stdout).
        """
        try:
            # Check that the credentials file exists.
            if not exists(self.credentials_file):
                print(f"❌ 凭证文件不存在: {self.credentials_file}")
                return False
            # utf-8-sig tolerates a BOM written by some Windows editors.
            with open(self.credentials_file, encoding='utf-8-sig') as f:
                file_content = f.read()
            try:
                username, password = self._load_credentials(file_content)
            except ValueError as e:
                # Deliberately do not echo the file content: it holds the password.
                print(f"❌ 凭证格式错误,应该是 [username, password] 格式: {e}")
                return False
            # Drop any session left over from an earlier login.
            self.close()
            client = httpx.Client(auth=BasicAuth(username, password))
            try:
                response = client.post(f'{self.brain_api_url}/authentication')
            except Exception:
                client.close()
                raise
            print(f"认证状态: {response.status_code}")
            if response.status_code == 201:
                # Only keep the session once the server has accepted it.
                self.client = client
                print("✅ 登录成功!")
                return True
            client.close()
            print(f"❌ 登录失败: {response.text}")
            return False
        except Exception as e:
            print(f"❌ 登录过程中出错: {str(e)}")
            return False

    def batch_simulate_alphas(self, alphas_config_file='alphas.json', max_workers=3):
        """Simulate every alpha listed in a JSON config file using a thread pool.

        The config file must contain a JSON array of objects; each object is
        passed unchanged to ``simulate_alpha``. The method logs in itself
        before submitting any work and prints a summary at the end.

        Args:
            alphas_config_file: Path to the JSON config file.
            max_workers: Number of worker threads running simulations
                concurrently.

        Returns:
            A list with one result dict per alpha, in completion order, or an
            empty list if the config could not be read or the login failed.
        """
        # Check that the config file exists.
        if not exists(alphas_config_file):
            print(f"❌ 配置文件 {alphas_config_file} 不存在")
            return []
        # Read the alpha configuration.
        file_content = None
        try:
            with open(alphas_config_file, 'r', encoding='utf-8') as f:
                file_content = f.read().strip()
            # Show only the first 200 characters.
            print(f"📄 Alpha配置文件内容预览: {file_content[:200]}...")
            alpha_configs = json.loads(file_content)
            if not isinstance(alpha_configs, list) or not all(
                isinstance(config, dict) for config in alpha_configs
            ):
                print("❌ 配置文件格式错误: 应为由Alpha配置对象组成的JSON数组")
                return []
            print(f"✅ 成功读取 {len(alpha_configs)} 个Alpha配置")
            for i, config in enumerate(alpha_configs):
                print(f" {i+1}. {config.get('name', '未命名')}")
        except json.JSONDecodeError as e:
            print(f"❌ 配置文件 JSON格式错误: {e}")
            print(f"📄 出错的文件内容: {file_content if file_content is not None else '无法读取'}")
            return []
        except Exception as e:
            print(f"❌ 读取配置文件时出错: {str(e)}")
            return []
        print(f"📁 读取到 {len(alpha_configs)} 个Alpha配置")
        # Log in.
        if not self.login():
            print("❌ 登录失败,无法继续")
            return []
        results = []
        # Run the simulations concurrently.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit every task, remembering which config each future belongs to.
            future_to_alpha = {
                executor.submit(self.simulate_alpha, config): config
                for config in alpha_configs
            }
            # Collect results as they finish.
            for future in as_completed(future_to_alpha):
                config = future_to_alpha[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    # simulate_alpha raises for e.g. a missing 'expression' key;
                    # record it as a failed entry instead of aborting the batch.
                    print(f"{config.get('name', '未知Alpha')} 执行失败: {str(e)}")
                    results.append({
                        'name': config.get('name', '未知Alpha'),
                        'success': False,
                        'error': str(e),
                    })
        # Print the summary.
        self._print_summary(results)
        return results

    def simulate_alpha(self, alpha_config):
        """Submit one alpha expression and poll until the simulation finishes.

        Args:
            alpha_config: Dict with a required ``'expression'`` key and
                optional ``'name'`` and ``'settings'`` keys.

        Returns:
            On success a dict with ``name``, ``expression``, ``alpha_id``,
            ``success=True`` and ``result_data`` (the decoded final response).
            On failure a dict with ``name``, ``success=False`` and ``error``.

        Raises:
            Exception: If ``login`` has not been called successfully.
            KeyError: If ``alpha_config`` has no ``'expression'`` key.
        """
        if self.client is None:
            raise Exception("请先调用 login() 方法登录")
        name = alpha_config.get('name', '未命名Alpha')
        expression = alpha_config['expression']
        settings = alpha_config.get('settings', {})
        print(f"\n🚀 开始模拟: {name}")
        print(f"📝 表达式: {expression}")
        simulation_data = {
            'type': 'REGULAR',
            'settings': settings,
            'regular': expression,
        }
        try:
            # Submit the simulation.
            sim_resp = self.client.post(
                f'{self.brain_api_url}/simulations',
                json=simulation_data,
            )
            print(f"📤 模拟提交状态: {sim_resp.status_code}")
            if sim_resp.status_code != 201:
                print(f"{name} 提交失败: {sim_resp.text}")
                return {'name': name, 'success': False, 'error': f'提交失败: {sim_resp.status_code}'}
            # The Location header points at the progress resource to poll.
            sim_progress_url = sim_resp.headers['location']
            print(f"{name} 进度URL获取成功")
            while True:
                sim_progress_resp = self.client.get(sim_progress_url)
                retry_after_sec = float(sim_progress_resp.headers.get("Retry-After", 0))
                if retry_after_sec == 0:
                    # No Retry-After header: the server has stopped asking us to wait.
                    break
                print(f"{name} 等待 {retry_after_sec} 秒...")
                sleep(retry_after_sec)
            # Extract the final alpha ID.
            result_data = sim_progress_resp.json()
            alpha_id = result_data.get("alpha") if isinstance(result_data, dict) else None
            if not alpha_id:
                # Polling ended without an alpha ID (e.g. the simulation was
                # rejected); report that instead of a bare KeyError message.
                detail = sim_progress_resp.text[:200]
                error = f'模拟未返回Alpha ID (HTTP {sim_progress_resp.status_code}): {detail}'
                print(f"{name} 模拟失败: {error}")
                return {'name': name, 'success': False, 'error': error}
            print(f"{name} 模拟完成! Alpha ID: {alpha_id}")
            return {
                'name': name,
                'expression': expression,
                'alpha_id': alpha_id,
                'success': True,
                'result_data': result_data,
            }
        except Exception as e:
            print(f"{name} 模拟过程中出错: {str(e)}")
            return {'name': name, 'success': False, 'error': str(e)}

    def _print_summary(self, results):
        """Print success/failure counts and per-alpha details to stdout.

        Args:
            results: List of result dicts as produced by ``simulate_alpha``.
        """
        print("\n" + "=" * 50)
        print("📊 Alpha测试结果汇总")
        print("=" * 50)
        succeeded = [r for r in results if r.get('success', False)]
        failed = [r for r in results if not r.get('success', False)]
        print(f"✅ 成功: {len(succeeded)}")
        print(f"❌ 失败: {len(failed)}")
        # Each detail section is printed only when it has entries.
        if succeeded:
            print("\n成功详情:")
            for result in succeeded:
                print(f"{result['name']}: {result['alpha_id']}")
        if failed:
            print("\n失败详情:")
            for result in failed:
                print(f"{result['name']}: {result.get('error', '未知错误')}")

    def close(self):
        """Close the HTTP session, if any, and reset ``self.client`` to ``None``."""
        if self.client:
            self.client.close()
            self.client = None
# Usage example
if __name__ == "__main__":
    # One API client is shared by the whole run.
    wq_api = WorldQuantBrainAPI()
    try:
        # Step 1: simulate every alpha from the config file and time the batch.
        print("🎯 开始批量测试Alpha因子...")
        start_time = datetime.now()
        results = wq_api.batch_simulate_alphas(
            alphas_config_file='alphas.json',
            max_workers=2,  # concurrency level; adjust to the API's limits
        )
        end_time = datetime.now()
        elapsed_seconds = (end_time - start_time).total_seconds()
        print(f"\n 测试总耗时: {elapsed_seconds:.2f}")

        # Step 2: pick out the successful runs together with their alpha IDs.
        successful_alphas = []
        alpha_ids = []
        for entry in results:
            if entry.get('success'):
                successful_alphas.append(entry)
                alpha_ids.append(entry['alpha_id'])
        print(f"\n📋 成功模拟的Alpha数量: {len(alpha_ids)}")
        for alpha in successful_alphas:
            print(f" - {alpha['name']}: {alpha['alpha_id']}")
    except Exception as e:
        print(f"❌ 程序执行出错: {str(e)}")
        import traceback
        # Print the full stack trace for debugging.
        traceback.print_exc()
    finally:
        # Always release the HTTP connection.
        wq_api.close()