You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
alpha_tools/wqb-simulate/simulate.py

309 lines
11 KiB

import os.path
import httpx
import json
from httpx import BasicAuth
import time
from random import uniform
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class WorldQuantBrainSimulate:
    """Small client for submitting alpha simulations to the WorldQuant Brain API.

    Typical use: construct, call :meth:`login`, call :meth:`simulate_alpha`
    one or more times, then :meth:`close`.
    """

    def __init__(self, credentials_file='account.txt'):
        """Remember the credentials path; no file or network access happens here.

        Args:
            credentials_file: Path of the file holding ``['username', 'password']``.
        """
        self.credentials_file = credentials_file
        self.client = None  # httpx.Client, created by login()
        self.brain_api_url = 'https://api.worldquantbrain.com'

    def load_credentials(self):
        """Read the account name and password from the local credentials file.

        The file must contain a Python list (or tuple) literal such as
        ``['username', 'password']``.

        Returns:
            A ``(username, password)`` tuple.

        Raises:
            SystemExit: If the file does not exist. An empty file is created
                first so the user only has to fill it in.
            ValueError: If the content is not a literal sequence with at least
                two items.
        """
        import ast

        if not os.path.exists(self.credentials_file):
            print("未找到 account.txt 文件")
            with open(self.credentials_file, 'w') as f:
                f.write("")
            print("account.txt 文件已创建,请填写账号密码, 格式: ['username', 'password']")
            raise SystemExit(1)

        with open(self.credentials_file) as f:
            raw = f.read()

        # NOTE(security): this used to be eval(), which executes arbitrary code
        # found in the file. literal_eval accepts the documented list-literal
        # format but refuses anything that is not a plain literal.
        try:
            credentials = ast.literal_eval(raw.strip())
        except (ValueError, SyntaxError) as err:
            raise ValueError(
                f"{self.credentials_file} 格式错误, 应为: ['username', 'password']"
            ) from err

        if not isinstance(credentials, (list, tuple)) or len(credentials) < 2:
            raise ValueError(
                f"{self.credentials_file} 格式错误, 应为: ['username', 'password']"
            )
        return credentials[0], credentials[1]

    def login(self):
        """Authenticate against the API and keep the session in ``self.client``.

        Returns:
            True when the server answers 201, False otherwise.
        """
        username, password = self.load_credentials()

        # Do not leak the previous connection pool when logging in again.
        if self.client is not None:
            self.client.close()

        self.client = httpx.Client(auth=BasicAuth(username, password))
        response = self.client.post(f'{self.brain_api_url}/authentication')
        print(f"登录状态: {response.status_code}")
        if response.status_code == 201:
            print("登录成功!")
            return True

        # The error body is not guaranteed to be JSON (e.g. a proxy error page).
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        print(f"登录失败: {detail}")
        return False

    def simulate_alpha(self, expression, settings=None):
        """Submit one alpha expression and block until the simulation finishes.

        Args:
            expression: The alpha expression text sent as the ``regular`` field.
            settings: Optional dict whose entries override the default
                simulation settings.

        Returns:
            ``{"status": "success", "alpha_id": ...}`` when the simulation
            produced an alpha, or ``{"status": "error", "message": ...}`` when
            the server reported status ``ERROR``.

        Raises:
            Exception: If :meth:`login` has not been called, if the submission
                was rejected (no progress URL returned), or if the final
                payload carries no ``alpha`` field.
        """
        if self.client is None:
            raise Exception("请先登录")

        merged_settings = {
            'instrumentType': 'EQUITY',
            'region': 'USA',
            'universe': 'TOP3000',
            'delay': 1,
            'decay': 0,
            'neutralization': 'INDUSTRY',
            'truncation': 0.08,
            'pasteurization': 'ON',
            'unitHandling': 'VERIFY',
            'nanHandling': 'OFF',
            'language': 'FASTEXPR',
            'visualization': False,
        }
        if settings:
            merged_settings.update(settings)

        simulation_data = {
            'type': 'REGULAR',
            'settings': merged_settings,
            'regular': expression,
        }
        sim_resp = self.client.post(f'{self.brain_api_url}/simulations', json=simulation_data)
        print(f"模拟提交状态: {sim_resp.status_code}")

        # A rejected submission has no Location header; report the real cause
        # instead of failing with an opaque KeyError('location').
        sim_progress_url = sim_resp.headers.get('location')
        if not sim_progress_url:
            raise Exception(
                f"模拟提交失败 (HTTP {sim_resp.status_code}): {sim_resp.text}"
            )
        print(f"进度URL: {sim_progress_url}")

        # Poll until the server stops sending a Retry-After header.
        while True:
            sim_progress_resp = self.client.get(sim_progress_url)
            retry_after_sec = float(sim_progress_resp.headers.get("Retry-After", 0))
            if retry_after_sec == 0:
                break
            print(sim_progress_resp.json())
            print(f"等待 {retry_after_sec} 秒...")
            time.sleep(retry_after_sec)

        progress = sim_progress_resp.json()  # parse the final payload once

        # The simulation did not pass: hand the failure message to the caller.
        if progress.get("status") == "ERROR":
            message = progress.get("message", progress)
            print(f"因子模拟失败: {message}")
            return {"status": "error", "message": message}

        alpha_id = progress.get("alpha")
        if alpha_id is None:
            raise Exception(f"模拟结果中缺少 alpha 字段: {progress}")
        print(f"生成的Alpha ID: {alpha_id}")
        return {"status": "success", "alpha_id": alpha_id}

    def close(self):
        """Close the HTTP client if one was created."""
        if self.client:
            self.client.close()
class AlphaSimulationManager:
    """Run alpha simulations in small concurrent batches and report the outcome.

    Each batch logs in with a fresh ``WorldQuantBrainSimulate`` client, runs
    its expressions in a thread pool, and yields one result dict per
    expression with the keys ``expression``, ``time_consuming``,
    ``formatted_time``, ``alpha_id``, ``status`` and ``description``.
    """

    # Upper bound on simulations submitted at the same time within one batch.
    MAX_WORKERS = 3

    def __init__(self, credentials_file='account.txt'):
        """Store the credentials path handed to every per-batch API client."""
        self.credentials_file = credentials_file
        self.results = []  # results of the most recent run_simulation() call

    def format_time(self, seconds):
        """Format a duration in seconds as ``x分y.yy秒`` (``y.yy秒`` under a minute)."""
        # Round first so 59.999 becomes "1分0.00秒" rather than "60.00秒".
        seconds = round(seconds, 2)
        if seconds < 60:
            return f"{seconds:.2f}秒"
        minutes, remaining_seconds = divmod(seconds, 60)
        return f"{int(minutes)}分{remaining_seconds:.2f}秒"

    def _build_result(self, expression, time_consuming, alpha_id, status, description):
        """Assemble one result record in the layout used by all reporting code."""
        return {
            "expression": expression,
            "time_consuming": time_consuming,
            "formatted_time": self.format_time(time_consuming),
            "alpha_id": alpha_id,
            "status": status,
            "description": description,
        }

    def simulate_single_alpha(self, api, expression, settings=None):
        """Simulate one expression and turn the outcome into a result record.

        Args:
            api: Logged-in client exposing ``simulate_alpha(expression, settings)``.
            expression: The alpha expression to simulate.
            settings: Optional settings overrides passed through to the client.

        Returns:
            A result dict whose ``status`` is ``"success"``, ``"error"`` (the
            API reported a simulation error) or ``"failed"`` (an exception was
            raised while simulating). This method does not raise ``Exception``.
        """
        alpha_start_time = time.time()
        try:
            simulation_result = api.simulate_alpha(expression, settings)
            if simulation_result["status"] == "success":
                status, alpha_id, description = "success", simulation_result["alpha_id"], "/"
            else:
                # Error reported by the API itself.
                status, alpha_id, description = "error", "/", simulation_result["message"]
        except Exception as e:
            # Anything else: network problems, malformed payloads, ...
            status, alpha_id, description = "failed", "/", str(e)

        time_consuming = time.time() - alpha_start_time
        result = self._build_result(expression, time_consuming, alpha_id, status, description)
        elapsed = result["formatted_time"]

        if status == "success":
            print(f"✓ 因子模拟成功: {expression}")
            print(f" 耗时: {elapsed},Alpha ID: {alpha_id}")
        elif status == "error":
            print(f"✗ 因子模拟失败: {expression}")
            print(f" 耗时: {elapsed},错误: {description}")
        else:
            print(f"✗ 因子模拟异常: {expression}")
            print(f" 耗时: {elapsed},异常: {description}")
        return result

    def simulate_alpha_batch(self, alpha_batch, batch_number):
        """Simulate one batch of expressions concurrently.

        Args:
            alpha_batch: Expressions belonging to this batch.
            batch_number: 1-based batch index, used only in log output.

        Returns:
            One result dict per expression, in the same order as
            ``alpha_batch``. Expressions that never ran (login failed or the
            batch raised) are reported with status ``"failed"`` instead of
            being dropped.
        """
        print(f"\n{'=' * 60}")
        print(f"开始第 {batch_number} 批因子模拟 (共 {len(alpha_batch)} 个因子)")
        print(f"因子列表: {alpha_batch}")
        print(f"{'=' * 60}")
        batch_start_time = time.time()

        results_by_index = {}
        failure_reason = None

        # One client per batch, shared by the worker threads of that batch.
        api = WorldQuantBrainSimulate(self.credentials_file)
        try:
            if api.login():
                with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
                    future_to_index = {
                        executor.submit(self.simulate_single_alpha, api, alpha): index
                        for index, alpha in enumerate(alpha_batch)
                    }
                    for future in as_completed(future_to_index):
                        index = future_to_index[future]
                        try:
                            results_by_index[index] = future.result()
                        except Exception as e:
                            print(f"因子 {alpha_batch[index]} 执行异常: {e}")
                            results_by_index[index] = self._build_result(
                                alpha_batch[index], 0.0, "/", "failed", str(e)
                            )
            else:
                failure_reason = "登录失败"
        except Exception as e:
            print(f"第 {batch_number} 批模拟过程中出错: {e}")
            failure_reason = str(e)
        finally:
            api.close()

        # Keep input order and account for every expression of the batch.
        batch_results = []
        for index, alpha in enumerate(alpha_batch):
            if index in results_by_index:
                batch_results.append(results_by_index[index])
            else:
                batch_results.append(
                    self._build_result(alpha, 0.0, "/", "failed", failure_reason or "未执行")
                )

        batch_total_time = time.time() - batch_start_time
        print(f"\n第 {batch_number} 批模拟完成!")
        print(f"本批总耗时: {self.format_time(batch_total_time)}")
        print(f"{'=' * 60}")
        return batch_results

    def run_simulation(self, alpha_list, batch_size=3):
        """Simulate every expression in ``alpha_list``, batch by batch.

        Prints a summary and writes the results to a JSON file when done.

        Args:
            alpha_list: Expressions to simulate.
            batch_size: Number of expressions per batch; must be at least 1.

        Returns:
            The list of result dicts for all batches (also kept in ``self.results``).

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        print("开始Alpha因子批量模拟...")
        total_start_time = time.time()

        batches = [alpha_list[i:i + batch_size] for i in range(0, len(alpha_list), batch_size)]
        all_results = []
        for i, batch in enumerate(batches, 1):
            all_results.extend(self.simulate_alpha_batch(batch, i))
            # Pause 3-5 seconds between batches (not after the last one).
            if i < len(batches):
                sleep_time = uniform(3, 5)
                print(f"\n等待 {sleep_time:.2f} 秒后开始下一批...")
                time.sleep(sleep_time)

        total_time = time.time() - total_start_time
        self.results = all_results

        self.print_summary(all_results, total_time)
        self.save_results(all_results)
        return all_results

    def print_summary(self, results, total_time):
        """Print per-status counts followed by one entry per result.

        Args:
            results: Result dicts as produced by :meth:`simulate_single_alpha`.
            total_time: Total wall-clock duration in seconds.
        """
        print(f"\n{'=' * 60}")
        print("模拟结果汇总")
        print(f"{'=' * 60}")
        success_count = sum(1 for r in results if r['status'] == 'success')
        error_count = sum(1 for r in results if r['status'] == 'error')
        failed_count = sum(1 for r in results if r['status'] == 'failed')
        print(f"总模拟因子数: {len(results)}")
        print(f"成功: {success_count}")
        print(f"模拟错误: {error_count}")
        print(f"执行异常: {failed_count}")
        print(f"总耗时: {self.format_time(total_time)}")
        print(f"{'=' * 60}")
        for i, result in enumerate(results, 1):
            # Same markers as the per-factor log lines.
            status_icon = "✓" if result['status'] == 'success' else "✗"
            print(f"{i}. {status_icon} {result['expression']}")
            print(f" 状态: {result['status']}")
            print(f" 耗时: {result['formatted_time']}")
            print(f" Alpha ID: {result['alpha_id']}")
            if result['status'] != 'success':
                print(f" 原因: {result['description']}")
            print()

    def save_results(self, results):
        """Write the results as JSON to ``result/simulation_results-<unix time>.json``.

        ``time_consuming`` is rounded to two decimals in the saved copy; the
        dicts passed in are left untouched.
        """
        serializable_results = [
            {**result, 'time_consuming': round(result['time_consuming'], 2)}
            for result in results
        ]
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs('./result', exist_ok=True)
        result_name = f"result/simulation_results-{str(int(time.time()))}.json"
        with open(result_name, 'w', encoding='utf-8') as f:
            json.dump(serializable_results, f, ensure_ascii=False, indent=2)
        print(f"结果已保存到 {result_name}")
if __name__ == "__main__":
    # Load the expressions to simulate: one per line in alpha.txt.
    alpha_file = 'alpha.txt'

    # Check for the file first: opening a missing file raises
    # FileNotFoundError before any friendly message could be shown.
    if not os.path.exists(alpha_file):
        print("alpha.txt 文件不存在")
        with open(alpha_file, 'w', encoding='utf-8') as file:
            file.write("")
        print("已创建 alpha.txt 文件, 请添加因子后重新运行, 一行一个因子")
        raise SystemExit(1)

    with open(alpha_file, 'r', encoding='utf-8') as file:
        stripped_lines = (line.strip() for line in file)
        # Skip blank lines so empty expressions are never submitted.
        alpha_list = [expression for expression in stripped_lines if expression]

    if not alpha_list:
        print("alpha.txt 文件为空, 请添加因子后重新运行, 一行一个因子")
        raise SystemExit(1)

    # Create the simulation manager and run all batches.
    manager = AlphaSimulationManager()
    results = manager.run_simulation(alpha_list, batch_size=3)