You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
wqb/FactorSimulator.py

202 lines
7.2 KiB

# -*- coding: utf-8 -*-
import os
import time
import json
import httpx
from httpx import BasicAuth
class AlphaSimulator:
def __init__(self, credentials_file='account.txt'):
self.credentials_file = credentials_file
self.client = None
self.brain_api_url = 'https://api.worldquantbrain.com'
def load_credentials(self):
if not os.path.exists(self.credentials_file):
print("未找到 account.txt 文件")
with open(self.credentials_file, 'w') as f:
f.write("")
print("account.txt 文件已创建,请填写账号密码, 格式: ['username', 'password]")
exit(1)
with open(self.credentials_file) as f:
credentials = eval(f.read())
return credentials[0], credentials[1]
def login(self):
username, password = self.load_credentials()
self.client = httpx.Client(auth=BasicAuth(username, password))
response = self.client.post(f'{self.brain_api_url}/authentication')
print(f"登录状态: {response.status_code}")
if response.status_code == 201:
print(f"登录成功!:{response.json()}")
return True
else:
print(f"登录失败: {response.json()}")
return False
def load_alpha_list(self, file_path='alpha.txt'):
if not os.path.exists(file_path):
print(f"{file_path} 文件不存在")
with open(file_path, 'w', encoding='utf-8') as file:
file.write("")
print(f"已创建 {file_path} 文件, 请添加因子后重新运行, 一行一个因子")
return []
with open(file_path, 'r', encoding='utf-8') as file:
alpha_list = [line.strip() for line in file if line.strip()]
return alpha_list
def simulate_alpha(self, expression):
if self.client is None:
raise Exception("请先登录")
settings = {
'instrumentType': 'EQUITY',
'region': 'USA',
'universe': 'TOP3000',
'delay': 1,
'decay': 0,
'neutralization': 'INDUSTRY',
'truncation': 0.08,
'pasteurization': 'ON',
'unitHandling': 'VERIFY',
'nanHandling': 'OFF',
'language': 'FASTEXPR',
'visualization': False,
}
simulation_data = {
'type': 'REGULAR',
'settings': settings,
'regular': expression
}
sim_resp = self.client.post(f'{self.brain_api_url}/simulations', json=simulation_data)
print(f"模拟提交状态: {sim_resp.status_code}")
sim_progress_url = sim_resp.headers['location']
while True:
sim_progress_resp = self.client.get(sim_progress_url)
retry_after_sec = float(sim_progress_resp.headers.get("Retry-After", 0))
if retry_after_sec == 0:
break
if sim_progress_resp.json():
result = sim_progress_resp.json()
progress = result.get('progress', 0)
print(f"模拟进度: {float(progress) * 100}%")
print(f"等待 {retry_after_sec} 秒...")
time.sleep(retry_after_sec)
if sim_progress_resp.json().get("status") == "ERROR":
result = sim_progress_resp.json().get("message", "未知错误")
print(f"因子模拟失败: {result}")
return {"status": "error", "message": result}
alpha_id = sim_progress_resp.json().get("alpha")
print(f"生成的Alpha ID: {alpha_id}")
return {"status": "success", "alpha_id": alpha_id}
def run_batch_simulation(self, alpha_list, batch_size=3):
success_results = []
fail_results = []
for i in range(0, len(alpha_list), batch_size):
batch = alpha_list[i:i + batch_size]
print(f"\n开始处理第 {i // batch_size + 1} 批因子,共 {len(batch)}")
for expression in batch:
print(f"\n模拟因子: {expression}")
start_time = time.time()
try:
result = self.simulate_alpha(expression)
end_time = time.time()
time_consuming = end_time - start_time
result_data = {
"expression": expression,
"time_consuming": round(time_consuming, 2),
"status": result["status"],
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
if result["status"] == "success":
result_data["alpha_id"] = result["alpha_id"]
success_results.append(result_data)
print(f"✅ 模拟成功 - Alpha ID: {result['alpha_id']} - 耗时: {time_consuming:.2f}")
else:
result_data["message"] = result.get("message", "")
fail_results.append(result_data)
print(f"❌ 模拟失败 - {result.get('message', '未知错误')}")
except Exception as e:
end_time = time.time()
time_consuming = end_time - start_time
error_result = {
"expression": expression,
"time_consuming": round(time_consuming, 2),
"status": "error",
"message": str(e),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
fail_results.append(error_result)
print(f"❌ 模拟异常 - {str(e)}")
print(f"{i // batch_size + 1} 批处理完成")
self._save_results(success_results, fail_results)
return success_results, fail_results
def _save_results(self, success_results, fail_results):
base_dir = 'simulator_result'
success_dir = os.path.join(base_dir, 'success')
fail_dir = os.path.join(base_dir, 'fail')
os.makedirs(success_dir, exist_ok=True)
os.makedirs(fail_dir, exist_ok=True)
timestamp = str(int(time.time()))
if success_results:
success_file = os.path.join(success_dir, f"success_results_{timestamp}.json")
with open(success_file, 'w', encoding='utf-8') as f:
json.dump(success_results, f, ensure_ascii=False, indent=2)
print(f"成功结果已保存到 {success_file}")
if fail_results:
fail_file = os.path.join(fail_dir, f"fail_results_{timestamp}.json")
with open(fail_file, 'w', encoding='utf-8') as f:
json.dump(fail_results, f, ensure_ascii=False, indent=2)
print(f"失败结果已保存到 {fail_file}")
print(f"\n总计: 成功 {len(success_results)} 个, 失败 {len(fail_results)}")
def main():
simulator = AlphaSimulator()
if not simulator.login():
return
alpha_list = simulator.load_alpha_list('alpha.txt')
if not alpha_list:
print("未找到有效的因子表达式,请检查 alpha.txt 文件")
return
print(f"共加载 {len(alpha_list)} 个因子表达式")
simulator.run_batch_simulation(alpha_list, batch_size=3)
if __name__ == "__main__":
main()