You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
wqb-server/simulator/simulator_wqb.py

583 lines
22 KiB

"""
Enhanced Alpha Template Generator Script
This script generates alpha templates with interactive user input for:
- JSON file path selection
- User authentication
- Simulation parameters
- Multi-simulation mode support
- Real-time log monitoring
"""
import asyncio
import getpass
import json
import os
import sys
import threading
import time
from pathlib import Path

try:
    import msvcrt  # For Windows password input with asterisks
except ImportError:
    # Not running on Windows: password entry has to fall back to getpass.
    msvcrt = None

import wqb
def get_password_with_asterisks(prompt, getch=None):
    """Read a password key by key, echoing ``*`` for every accepted character.

    Args:
        prompt: Text printed (without newline) before reading keys.
        getch: Optional zero-argument callable returning one key press as
            ``bytes`` (same contract as ``msvcrt.getch``). When omitted,
            ``msvcrt.getch`` is used; if ``msvcrt`` cannot be imported the
            function falls back to ``getpass.getpass`` (no asterisk echo).

    Returns:
        The entered password as a ``str``.

    Raises:
        KeyboardInterrupt: When the Ctrl+C byte (0x03) is read.
    """
    if getch is None:
        try:
            import msvcrt
        except ImportError:
            import getpass
            return getpass.getpass(prompt)
        getch = msvcrt.getch

    print(prompt, end='', flush=True)
    typed = []
    while True:
        key = getch()
        # Enter: finish input.
        if key in (b'\r', b'\n'):
            print()
            break
        # Ctrl+C: behave like a normal console interrupt.
        if key == b'\x03':
            print()
            raise KeyboardInterrupt
        # Special keys (arrows, F-keys, ...) arrive as a prefix byte followed
        # by a key-code byte; swallow the key code so it is not mistaken for
        # a typed character.
        if key in (b'\x00', b'\xe0'):
            getch()
            continue
        # Backspace: drop the last character and erase its asterisk.
        if key == b'\x08':
            if typed:
                typed.pop()
                # Move cursor back, overwrite with a space, move back again.
                print('\b \b', end='', flush=True)
            continue
        try:
            text = key.decode('utf-8')
        except UnicodeDecodeError:
            continue  # Not a decodable single-byte character; ignore it.
        # ``''.isprintable()`` is True, so also require a non-empty string.
        if text and text.isprintable():
            typed.append(text)
            print('*', end='', flush=True)
    return ''.join(typed)
def get_json_filepath():
    """Prompt until the user supplies a loadable expressions JSON file.

    The user may enter either the file path itself or a directory that
    contains ``expressions_with_settings.json``. Surrounding quotes (as added
    by "copy as path") are stripped.

    Returns:
        tuple: ``(json_path, data)`` - the resolved file path and the parsed
        JSON content.
    """
    while True:
        print("\n" + "="*60)
        print("JSON 文件配置")
        print("="*60)
        filepath = input("请复制粘贴 expressions_with_settings.json (即以json格式储存的带有setting的表达式列表)的目录或完整路径: ").strip()
        # Remove quotes if the user copied the path with quotes.
        filepath = filepath.strip('"').strip("'")

        # A directory means "look for the default file name inside it".
        if os.path.isdir(filepath):
            json_path = os.path.join(filepath, "expressions_with_settings.json")
        else:
            json_path = filepath

        if not os.path.isfile(json_path):
            print("❌ 错误: 文件未找到,请检查路径后重试。")
            continue

        try:
            # The rest of this script writes the file as UTF-8, so read it as
            # UTF-8 too instead of the platform default; "utf-8-sig" also
            # tolerates a BOM added by Windows editors.
            with open(json_path, 'r', encoding='utf-8-sig') as f:
                data = json.load(f)
        except json.JSONDecodeError:
            print("❌ 错误: JSON 文件格式无效,请检查文件。")
        except (OSError, UnicodeError) as e:
            print(f"❌ 读取文件错误: {e}")
        else:
            print(f"✓ 成功加载 JSON 文件: {json_path}")
            return json_path, data
def get_user_credentials():
    """Prompt for the BRAIN account name and password.

    The password is read through ``get_password_with_asterisks``.

    Returns:
        tuple: ``(username, password)``; the username is whitespace-stripped.
    """
    separator = "=" * 60
    print("\n" + separator)
    print("BRAIN 身份验证")
    print(separator)
    account = input("请输入您的 BRAIN 用户名: ").strip()
    secret = get_password_with_asterisks("请输入您的 BRAIN 密码 (显示为 *): ")
    return account, secret
def test_authentication(username, password):
    """Open a WQB session and probe it with a field lookup.

    Args:
        username: BRAIN account name.
        password: BRAIN account password.

    Returns:
        tuple: ``(session, logger)`` when the probe response is OK, otherwise
        ``(None, None)`` (also when any exception is raised on the way).
    """
    # NOTE(review): the ``test_`` prefix means pytest will try to collect this
    # function if it is imported into a test module.
    separator = "=" * 60
    print("\n" + separator)
    print("API连通验证")
    print(separator)
    try:
        log = wqb.wqb_logger()
        session = wqb.WQBSession((username, password), logger=log)
        # Locating a field is used as the connectivity/credential check.
        probe = session.locate_field('open')
        print(f"连接测试结果: resp.ok = {probe.ok}")
        if not probe.ok:
            print("❌ 身份验证失败,请检查您的用户名和密码。")
            return None, None
        print("✓ 身份验证成功!")
        return session, log
    except Exception as err:
        print(f"❌ 身份验证错误: {err}")
        return None, None
def get_simulation_parameters(expressions_count, json_path):
    """Ask for the start index and the number of concurrent simulations.

    Choosing a start index greater than 0 shows an overwrite warning for
    ``json_path``, waits through a 5 second countdown and then requires an
    explicit ``y`` confirmation; any other answer returns to the start-index
    prompt.

    Args:
        expressions_count: Number of expressions loaded from the JSON file.
        json_path: Path of the JSON file (shown in the overwrite warning).

    Returns:
        tuple: ``(where_to_start, concurrent_count)``. ``where_to_start`` is
        0 when ``expressions_count`` is not positive.
    """
    print("\n" + "="*60)
    print("回测参数设置")
    print("="*60)
    print(f"JSON 中的表达式总数: {expressions_count}")

    # Starting position. With an empty list no index can ever be valid, so
    # skip the prompt entirely instead of asking forever.
    where_to_start = 0
    while expressions_count > 0:
        try:
            where_to_start = int(input(f"从列表中第几个表达式开始 (0 到 {expressions_count-1}): "))
        except ValueError:
            print("❌ 请输入有效数字。")
            continue
        if not 0 <= where_to_start < expressions_count:
            print(f"❌ 起始位置无效,必须在 0 到 {expressions_count-1} 之间")
            continue
        if where_to_start == 0:
            break  # Nothing gets cut, so no confirmation is needed.

        print("\n 警告: 原始 JSON 文件将被直接覆盖!")
        print(f"📝 原始文件: {expressions_count} 个表达式")
        print(f"🔪 切割后: {expressions_count - where_to_start} 个表达式")
        print(f"📂 文件位置: {json_path}")
        print("\n🚨 重要提示: 如果您不希望覆盖原始文件,请立即关闭终端并手动备份文件!")
        print("⏰ 5秒后将继续执行覆盖操作...")
        # Give the user 5 seconds to think / close the terminal.
        for remaining in range(5, 0, -1):
            print(f"倒计时: {remaining} 秒...", end='\r')
            time.sleep(1)
        # Overwrite the whole countdown line, not just its first column.
        print(" " * 20)
        confirm = input("(继续程序,开始回测y/返回并重选列表起始位置n): ").lower().strip()
        if confirm == 'y':
            break
        print("请重新选择表达式列表起始位置。")

    # Concurrent simulation count.
    while True:
        try:
            concurrent_count = int(input("请输入并发回测数量 (最小值 1): "))
        except ValueError:
            print("❌ 请输入有效数字。")
            continue
        if concurrent_count >= 1:
            break
        print("❌ 并发数量必须大于等于 1。")

    return where_to_start, concurrent_count
def _write_json_atomically(json_path, payload):
    """Write *payload* as UTF-8 JSON to *json_path* via a temp file + rename.

    The target is only replaced once the dump has fully succeeded, so a
    failed write never leaves a truncated or half-written JSON file behind.
    Any exception from opening, dumping or replacing is re-raised after the
    temp file has been removed.
    """
    tmp_path = f"{json_path}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, json_path)
    except BaseException:
        # Best-effort cleanup; the original error is what matters.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def cut_json_file(json_path, expressions_with_settings, where_to_start):
    """Drop the first ``where_to_start`` expressions and overwrite the file.

    Args:
        json_path: File to overwrite with the remaining expressions.
        expressions_with_settings: Full list of expressions.
        where_to_start: Index of the first expression to keep.

    Returns:
        The cut list on success. The original list when ``where_to_start``
        is 0 or when the file could not be written (the file on disk is left
        untouched in that case).
    """
    if where_to_start == 0:
        return expressions_with_settings  # No cutting needed

    cut_expressions = expressions_with_settings[where_to_start:]
    try:
        _write_json_atomically(json_path, cut_expressions)
    except (OSError, TypeError, ValueError) as e:
        print(f"❌ 覆盖 JSON 文件失败: {e}")
        print(" 将使用原始数据继续运行")
        return expressions_with_settings

    print("✓ 原始 JSON 文件已被覆盖")
    print(f"📊 新文件包含 {len(cut_expressions)} 个表达式")
    return cut_expressions


def shuffle_json_file(json_path, expressions_with_settings):
    """Shuffle a copy of the expressions and overwrite the file with it.

    The input list itself is not modified.

    Args:
        json_path: File to overwrite with the shuffled expressions.
        expressions_with_settings: List of expressions to shuffle.

    Returns:
        The shuffled copy on success, or the original list when the file
        could not be written (the file on disk is left untouched).
    """
    import random

    shuffled_expressions = expressions_with_settings.copy()
    random.shuffle(shuffled_expressions)
    try:
        _write_json_atomically(json_path, shuffled_expressions)
    except (OSError, TypeError, ValueError) as e:
        print(f"❌ 打乱 JSON 文件失败: {e}")
        print(" 将使用原始顺序继续运行")
        return expressions_with_settings

    print("✓ JSON 文件已随机打乱并覆盖")
    print(f"🔀 已打乱 {len(shuffled_expressions)} 个表达式的顺序")
    return shuffled_expressions
def _ask_yes_no(prompt):
    """Repeat *prompt* until the answer is y/yes or n/no; True means yes."""
    while True:
        answer = input(prompt).lower().strip()
        if answer in ('y', 'yes'):
            return True
        if answer in ('n', 'no'):
            return False
        print("❌ 请输入 y 或 n")


def get_random_shuffle_choice():
    """Ask whether the expressions should be randomly shuffled.

    Returns:
        bool: True when the user answered yes.
    """
    print("\n" + "="*60)
    print("随机模式选择")
    print("="*60)
    print("是否要随机打乱表达式顺序?")
    print("💡 这将改变表达式在文件中的排列顺序,以达到随机回测的目的")
    return _ask_yes_no("选择随机模式? (y/n): ")


def get_multi_simulation_choice():
    """Ask whether multi-simulation mode should be used.

    Returns:
        bool: True when the user answered yes.
    """
    print("\n" + "="*60)
    print("多重回测(multi-simulation)模式选择")
    print("="*60)
    print("是否要使用多重回测(multi-simulation)模式?")
    print("💡 多重回测(multi-simulation)可以将多个alpha组合在一个回测槽中运行")
    return _ask_yes_no("使用多重回测(multi-simulation)模式? (y/n): ")
def get_alpha_count_per_slot(min_count=2, max_count=10):
    """Ask how many alphas go into one multi-simulation slot.

    Args:
        min_count: Smallest accepted value (default 2).
        max_count: Largest accepted value (default 10).

    Returns:
        int: The chosen count, ``min_count <= count <= max_count``.

    Raises:
        ValueError: If ``min_count`` is greater than ``max_count``, because
            no answer could ever be accepted.
    """
    if min_count > max_count:
        raise ValueError("min_count must not be greater than max_count")

    print("\n" + "="*60)
    print("多重回测(multi-simulation)槽配置")
    print("="*60)
    print("每个多重回测(multi-simulation)槽中放置多少个alpha?")
    print(f"💡 范围: {min_count}-{max_count} 个alpha")
    while True:
        try:
            alpha_count = int(input(f"每个槽的alpha数量 ({min_count}-{max_count}): "))
        except ValueError:
            print("❌ 请输入有效数字。")
            continue
        if min_count <= alpha_count <= max_count:
            return alpha_count
        print(f"❌ 数量必须在 {min_count} 到 {max_count} 之间")
def _list_wqb_logs(directory):
    """Return full paths of the ``wqb*.log`` files directly in *directory*."""
    return [
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.startswith('wqb') and name.endswith('.log')
    ]


def monitor_log_file(logger, stop_event, use_multi_sim=False, alpha_count_per_slot=None):
    """Follow the WQB log file and print newly appended lines.

    The newest ``wqb*.log`` in the current working directory is followed. If
    none exists yet, the function waits up to 30 seconds for one whose
    creation time lies within the last 120 seconds. Only lines appended after
    monitoring starts are printed, each prefixed with ``[日志]``.

    Args:
        logger: Unused; kept so existing callers keep working.
        stop_event: ``threading.Event``; monitoring ends once it is set.
        use_multi_sim: Whether multi-simulation mode is active.
        alpha_count_per_slot: Alphas per multi-simulation slot; together with
            ``use_multi_sim`` it triggers an explanatory notice.

    Returns:
        None.
    """
    print("\n📊 开始监控日志文件...")
    current_dir = os.getcwd()

    # First, try to find any existing wqb log files (including older ones).
    print("🔍 查找 WQB 日志文件...")
    try:
        existing = _list_wqb_logs(current_dir)
        # Newest by modification time.
        log_file_path = max(existing, key=os.path.getmtime) if existing else None
    except OSError as e:
        print(f" 扫描目录失败: {e}")
        return

    if log_file_path:
        print(f"✓ 监控已找到的最新日志文件: {log_file_path}")
    else:
        # Wait for a new log file to be created (max 30 seconds).
        print("等待新的 WQB 日志文件创建...")
        deadline = time.monotonic() + 30
        while not stop_event.is_set() and time.monotonic() < deadline:
            try:
                for candidate in _list_wqb_logs(current_dir):
                    # Only accept files created within the last 120 seconds.
                    if os.path.getctime(candidate) > (time.time() - 120):
                        log_file_path = candidate
                        break
            except OSError:
                pass  # Transient listing/stat failure; retry on next poll.
            if log_file_path:
                break
            # Unlike time.sleep, this returns as soon as a stop is requested.
            stop_event.wait(1)
        if not log_file_path:
            print(" 未找到 WQB 日志文件,日志监控已禁用。")
            print("💡 提示: 日志文件通常在开始回测后才会创建")
            return
        print(f"✓ 找到新日志文件: {log_file_path}")

    if stop_event.is_set():
        return

    print("="*60)
    # Display multi-simulation information if applicable.
    if use_multi_sim and alpha_count_per_slot:
        print("📌 重要提示:")
        print(f"以下是multi simulation的记录,你的设计是1个multi simulation中有{alpha_count_per_slot}个alpha,")
        print("因此需将实际回测数乘以该乘数,才得到实际已完成的Alpha个数。")
        print("="*60)

    try:
        # errors='replace' keeps a stray undecodable byte from ending the
        # monitor.
        with open(log_file_path, 'r', encoding='utf-8', errors='replace') as f:
            # Start from the current end of the file.
            f.seek(0, os.SEEK_END)
            pending = ''
            while not stop_event.is_set():
                chunk = f.readline()
                if not chunk:
                    stop_event.wait(0.2)
                    continue
                pending += chunk
                # readline() on a growing file can return a fragment; hold it
                # until the writer has finished the line.
                if not pending.endswith('\n'):
                    continue
                clean_line = pending.rstrip()
                pending = ''
                if clean_line:  # Only print non-empty lines
                    print(f"[日志] {clean_line}")
            # Do not lose a final line that never received its newline.
            if pending.strip():
                print(f"[日志] {pending.rstrip()}")
    except Exception as e:
        # This runs as a thread target, so report instead of propagating.
        print(f" 监控日志文件时出错: {e}")
async def automated_main(json_file_content, username, password, start_position=0, concurrent_count=3,
                         random_shuffle=False, use_multi_sim=False, alpha_count_per_slot=3):
    """Run the whole workflow non-interactively (used by the web interface).

    Args:
        json_file_content: JSON text holding the list of alpha configurations.
        username: BRAIN account name.
        password: BRAIN account password.
        start_position: Index of the first expression to simulate; values
            that are not positive leave the list untouched.
        concurrent_count: Number of concurrent simulations (at least 1).
        random_shuffle: Shuffle the expressions before simulating.
        use_multi_sim: Group expressions into multi-simulation slots.
        alpha_count_per_slot: Alphas per slot when ``use_multi_sim`` is set.

    Returns:
        dict: ``{"success": True, "results": {...}}`` with counts and alpha
        IDs, or ``{"success": False, "error": <message>}``. Exceptions are
        reported through the error dict rather than raised.
    """
    try:
        print("🧠 BRAIN Alpha 模板生成器 (自动模式)")
        print("="*60)

        # Parse and validate everything before touching the network, so bad
        # input fails fast.
        expressions_with_settings = json.loads(json_file_content)
        if not isinstance(expressions_with_settings, list):
            raise ValueError("JSON content must be a list of alpha configurations")
        if concurrent_count < 1:
            raise ValueError("concurrent_count must be at least 1")
        if use_multi_sim and alpha_count_per_slot < 1:
            raise ValueError("alpha_count_per_slot must be at least 1")

        expressions_count = len(expressions_with_settings)
        print(f"📊 已加载 {expressions_count} 个 alpha 配置")

        # Setup logger and session.
        logger = wqb.wqb_logger()
        wqbs = wqb.WQBSession((username, password), logger=logger)

        # A field lookup doubles as the connection/credential test.
        resp = wqbs.locate_field('open')
        print(f"连接测试结果: resp.ok = {resp.ok}")
        if not resp.ok:
            print("❌ 身份验证失败")
            return {"success": False, "error": "Authentication failed"}
        print("✅ 身份验证成功!")

        # Process expressions based on parameters.
        if start_position > 0:
            expressions_with_settings = expressions_with_settings[start_position:]
            print(f"🔪 已从位置 {start_position} 开始切割,剩余 {len(expressions_with_settings)} 个表达式")

        if random_shuffle:
            import random
            random.shuffle(expressions_with_settings)
            print(f"🔀 已随机打乱 {len(expressions_with_settings)} 个表达式的顺序")

        if use_multi_sim:
            # Convert to multi-alphas format.
            original_count = len(expressions_with_settings)
            expressions_with_settings = wqb.to_multi_alphas(expressions_with_settings, alpha_count_per_slot)
            print("✓ 已转换为多重回测(multi-simulation)格式")
            print(f"📊 原始表达式数: {original_count}")
            print(f"🎯 每槽alpha数: {alpha_count_per_slot}")
            # Write multi-simulation info to the log so readers of the log
            # know each record stands for several alphas.
            multi_sim_msg = (f"[MULTI-SIMULATION MODE] 以下是multi simulation的记录,"
                             f"你的设计是1个multi simulation中有{alpha_count_per_slot}个alpha,"
                             f"因此需将实际回测数乘以该乘数,才得到实际已完成的Alpha个数。")
            logger.info("="*80)
            logger.info(multi_sim_msg)
            logger.info("="*80)

        print(f"🔄 使用 {concurrent_count} 个并发回测")
        print("\n" + "="*60)
        print("运行回测")
        print("="*60)
        if use_multi_sim:
            print("开始多重回测(multi-simulation)并发回测...")
        else:
            print("开始并发回测...")

        # Run simulations.
        resps = await wqbs.concurrent_simulate(
            expressions_with_settings,
            concurrent_count,
            log_gap=10
        )

        # Collect results.
        alpha_ids = []
        failed_count = 0
        print("\n" + "="*60)
        print("回测结果")
        print("="*60)
        if use_multi_sim:
            print(f"成功完成 {len(resps)} 个多重回测(multi-simulation)槽的回测")
        else:
            print(f"成功完成 {len(resps)} 个回测")
        print("\nAlpha IDs:")
        for position, resp in enumerate(resps, start=1):
            try:
                alpha_id = resp.json()['alpha']
            except Exception as e:
                # A response without a usable alpha ID counts as failed but
                # must not abort the remaining results.
                failed_count += 1
                print(f" {position:4d}. 错误: {e}")
            else:
                alpha_ids.append(alpha_id)
                print(f" {position:4d}. {alpha_id}")

        print("\n✅ 处理完成!")
        return {
            "success": True,
            "results": {
                "total": len(resps),
                "successful": len(alpha_ids),
                "failed": failed_count,
                "alphaIds": alpha_ids,
                "use_multi_sim": use_multi_sim,
                "alpha_count_per_slot": alpha_count_per_slot if use_multi_sim else None
            }
        }
    except Exception as e:
        # Boundary for the web interface: report instead of raising.
        print(f"\n❌ 错误: {e}")
        return {"success": False, "error": str(e)}
async def main():
    """Run the interactive workflow end to end.

    Steps: load the JSON file, authenticate (with retry), ask for simulation
    parameters, optionally cut and shuffle the JSON file, optionally convert
    to multi-simulation format, start the background log monitor, run the
    concurrent simulations and print the resulting alpha IDs.

    Returns:
        None. Returns early if the user declines to retry authentication.
    """
    print("🧠 BRAIN Alpha 模板生成器")
    print("="*60)

    # Step 1: Get JSON file and load expressions.
    json_path, expressions_with_settings = get_json_filepath()
    expressions_count = len(expressions_with_settings)
    print(f"\n📊 已从以下位置加载 {expressions_count} 个 alpha 配置:")
    print(f" {json_path}")

    # Step 2: Get credentials and test authentication.
    wqbs = None
    logger = None
    while wqbs is None:
        username, password = get_user_credentials()
        wqbs, logger = test_authentication(username, password)
        if wqbs is None:
            retry = input("\n是否要重试? (y/n): ").lower().strip()
            if retry != 'y':
                print("正在退出...")
                return

    # Step 3: Get simulation parameters.
    where_to_start, concurrent_count = get_simulation_parameters(expressions_count, json_path)

    # Step 3.5: Cut JSON file if needed.
    if where_to_start > 0:
        print("\n🔪 正在切割 JSON 文件...")
        expressions_with_settings = cut_json_file(json_path, expressions_with_settings, where_to_start)

    # Step 3.6: Ask for random shuffle option.
    if get_random_shuffle_choice():
        print("\n🔀 正在随机打乱表达式顺序...")
        expressions_with_settings = shuffle_json_file(json_path, expressions_with_settings)

    # Step 3.7: Ask for multi-simulation mode.
    use_multi_sim = get_multi_simulation_choice()
    alpha_count_per_slot = None
    if use_multi_sim:
        alpha_count_per_slot = get_alpha_count_per_slot()
        # Convert to multi-alphas format.
        original_count = len(expressions_with_settings)
        expressions_with_settings = wqb.to_multi_alphas(expressions_with_settings, alpha_count_per_slot)
        print("\n✓ 已转换为多重回测(multi-simulation)格式")
        print(f"📊 原始表达式数: {original_count}")
        print(f"🎯 每槽alpha数: {alpha_count_per_slot}")
    print(f"🔄 使用 {concurrent_count} 个并发回测")

    # Step 4: Write multi-simulation info to log if applicable.
    if use_multi_sim and alpha_count_per_slot and logger:
        multi_sim_msg = (f"[MULTI-SIMULATION MODE] 以下是multi simulation的记录,"
                         f"你的设计是1个multi simulation中有{alpha_count_per_slot}个alpha,"
                         f"因此需将实际回测数乘以该乘数,才得到实际已完成的Alpha个数。")
        logger.info("="*80)
        logger.info(multi_sim_msg)
        logger.info("="*80)

    # Step 5: Start log monitoring in background.
    stop_log_monitor = threading.Event()
    log_thread = threading.Thread(
        target=monitor_log_file,
        args=(logger, stop_log_monitor, use_multi_sim, alpha_count_per_slot),
        daemon=True
    )
    log_thread.start()

    # Step 6: Run simulations.
    print("\n" + "="*60)
    print("运行回测")
    print("="*60)
    if use_multi_sim:
        print("开始多重回测(multi-simulation)并发回测...")
    else:
        print("开始并发回测...")

    try:
        resps = await wqbs.concurrent_simulate(
            expressions_with_settings,
            concurrent_count,
            log_gap=10
        )
        # Stop the monitor and give it a moment to finish, so its output
        # does not interleave with the result listing.
        stop_log_monitor.set()
        log_thread.join(timeout=2)

        print("\n" + "="*60)
        print("回测结果")
        print("="*60)
        if use_multi_sim:
            print(f"成功完成 {len(resps)} 个多重回测(multi-simulation)槽的回测")
        else:
            print(f"成功完成 {len(resps)} 个回测")
        print("\nAlpha IDs:")
        for position, resp in enumerate(resps, start=1):
            try:
                alpha_id = resp.json()['alpha']
            except Exception as e:
                print(f" {position:4d}. 错误: {e}")
            else:
                print(f" {position:4d}. {alpha_id}")
    except KeyboardInterrupt:
        print("\n\n 回测被用户中断")
    except asyncio.CancelledError:
        # Under asyncio.run(), Ctrl+C can reach the coroutine as a
        # cancellation; report it and let the cancellation propagate.
        print("\n\n 回测被用户中断")
        raise
    except Exception as e:
        print(f"\n❌ 回测错误: {e}")
    finally:
        # Always stop the monitor, whichever way the simulation ended.
        stop_log_monitor.set()
        log_thread.join(timeout=2)

    print("\n✅ 处理完成!")
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C at an input prompt, or re-raised by asyncio.run() after it
        # cancelled main(): exit without a traceback. 130 is the conventional
        # "terminated by Ctrl+C" exit status.
        sys.exit(130)