You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
wqb-server/app.py

1366 lines
52 KiB

"""
BRAIN 表达式模板解码器 - Flask 网络应用程序
一个完整的网络应用程序,用于解码字符串模板并与 WorldQuant BRAIN 集成
"""
# 自动安装缺失的依赖
import subprocess
import sys
import os
def install_requirements():
"""如果缺少必需的包,则从 requirements.txt 安装"""
print("🔍 检查并安装必需的依赖...")
print("📋 验证 BRAIN 表达式模板解码器所需的包...")
# 获取此脚本所在的目录
script_dir = os.path.dirname(os.path.abspath(__file__))
# 检查 requirements.txt 是否存在于脚本目录中
req_file = os.path.join(script_dir, 'requirements.txt')
if not os.path.exists(req_file):
print("❌ 错误: 未找到 requirements.txt!")
print(f"查找路径: {req_file}")
return False
# 如果存在镜像配置则读取
mirror_url = 'https://pypi.tuna.tsinghua.edu.cn/simple' # 默认为清华镜像
mirror_config_file = os.path.join(script_dir, 'mirror_config.txt')
if os.path.exists(mirror_config_file):
try:
with open(mirror_config_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and line.startswith('http'):
mirror_url = line
break
except Exception as e:
print(f"警告: 无法读取镜像配置: {e}")
# 尝试导入主要包以检查是否已安装
packages_to_check = {
'flask': 'flask',
'flask_cors': 'flask-cors',
'requests': 'requests',
'pandas': 'pandas',
'PyPDF2': 'PyPDF2',
'docx': 'python-docx',
'pdfplumber': 'pdfplumber',
'fitz': 'PyMuPDF',
'cozepy': 'cozepy',
'lxml': 'lxml',
'bs4': 'beautifulsoup4'
}
missing_packages = []
for import_name, pip_name in packages_to_check.items():
try:
__import__(import_name)
except ImportError:
missing_packages.append(pip_name)
print(f"缺失的包: {pip_name} (导入名: {import_name})")
if missing_packages:
print(f" 检测到缺失的包: {', '.join(missing_packages)}")
print("📦 从 requirements.txt 安装依赖...")
print(f"🌐 使用镜像: {mirror_url}")
try:
# 使用配置的镜像安装所有 requirements
subprocess.check_call([
sys.executable, '-m', 'pip', 'install',
'-i', mirror_url,
'-r', req_file
])
print("✅ 所有依赖安装成功!")
return True
except subprocess.CalledProcessError:
print(f"❌ 错误: 使用 {mirror_url} 安装依赖失败")
print("🔄 尝试使用默认 PyPI...")
try:
# 回退到默认 PyPI
subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', req_file])
print("✅ 所有依赖安装成功!")
return True
except subprocess.CalledProcessError:
print("❌ 错误: 安装依赖失败。请手动运行:")
print(f" {sys.executable} -m pip install -i {mirror_url} -r requirements.txt")
return False
else:
print("✅ 所有必需的依赖已安装!")
return True
# 在导入前检查并安装依赖
# 每次导入模块时都会运行,但只在需要时安装
def check_and_install_dependencies():
"""检查并在需要时安装依赖"""
if not globals().get('_dependencies_checked'):
if install_requirements():
globals()['_dependencies_checked'] = True
return True
else:
print("\n请手动安装依赖并重试。")
return False
return True
# 导入此模块时始终运行依赖检查
print("🚀 初始化 BRAIN 表达式模板解码器...")
if not check_and_install_dependencies():
if __name__ == "__main__":
sys.exit(1)
else:
print(" 警告: 可能缺少某些依赖。请运行 'pip install -r requirements.txt'")
print("🔄 继续导入,但某些功能可能无法正常工作。")
# 现在导入包
try:
from flask import Flask, render_template, request, jsonify, session as flask_session
from flask_cors import CORS
import requests
import json
import time
import os
from datetime import datetime
print("📚 核心包导入成功!")
except ImportError as e:
print(f"❌ 导入核心包失败: {e}")
print("请运行: pip install -r requirements.txt")
if __name__ == "__main__":
sys.exit(1)
raise
app = Flask(__name__)
app.secret_key = 'brain_template_decoder_secret_key_change_in_production'
CORS(app)
print("🌐 Flask 应用程序已初始化,支持 CORS!")
# BRAIN API 配置
BRAIN_API_BASE = 'https://api.worldquantbrain.com'
# 存储 BRAIN 会话(在生产环境中,使用适当的会话管理如 Redis)
brain_sessions = {}
print("🧠 BRAIN API 集成已配置!")
def sign_in_to_brain(username, password):
"""使用重试逻辑和生物识别认证支持登录 BRAIN API"""
from urllib.parse import urljoin
session = requests.Session()
session.auth = (username, password)
retry_count = 0
max_retries = 3
while retry_count < max_retries:
try:
response = session.post(f'{BRAIN_API_BASE}/authentication')
# 检查是否需要生物识别认证
if response.status_code == 401:
# 检查是否为生物识别认证要求
if response.headers.get("WWW-Authenticate") == "persona":
# 获取 location 头
location = response.headers.get("Location")
if location:
# 构建生物识别认证的完整 URL
biometric_url = urljoin(response.url, location)
# 返回特殊响应,指示需要生物识别认证
return {
'requires_biometric': True,
'biometric_url': biometric_url,
'session': session,
'location': location
}
else:
raise Exception("需要生物识别认证但未提供 Location 头")
else:
# 常规认证失败
raise requests.HTTPError("认证失败: 用户名或密码无效")
# 如果执行到这里,认证成功
response.raise_for_status()
print("认证成功。")
return session
except requests.HTTPError as e:
if "Invalid username or password" in str(e):
raise # 对于无效凭证不重试
print(f"发生 HTTP 错误: {e}。正在重试...")
retry_count += 1
if retry_count >= max_retries:
raise
time.sleep(2)
except Exception as e:
print(f"认证期间发生错误: {e}")
retry_count += 1
if retry_count >= max_retries:
raise
time.sleep(2)
# 路由
@app.route('/')
def index():
"""主应用程序页面"""
return render_template('index.html')
@app.route('/simulator')
def simulator():
"""用户友好的模拟器界面"""
return render_template('simulator.html')
@app.route('/api/simulator/logs', methods=['GET'])
def get_simulator_logs():
"""获取模拟器目录中可用的日志文件"""
try:
import glob
import os
from datetime import datetime
# 在当前目录和模拟器目录中查找日志文件
script_dir = os.path.dirname(os.path.abspath(__file__))
simulator_dir = os.path.join(script_dir, 'simulator')
log_files = []
# 检查当前目录和模拟器目录
for directory in [script_dir, simulator_dir]:
if os.path.exists(directory):
pattern = os.path.join(directory, 'wqb*.log')
for log_file in glob.glob(pattern):
try:
stat = os.stat(log_file)
log_files.append({
'filename': os.path.basename(log_file),
'path': log_file,
'size': f"{stat.st_size / 1024:.1f} KB",
'modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
'mtime': stat.st_mtime
})
except Exception as e:
print(f"读取日志文件 {log_file} 时出错: {e}")
# 按修改时间排序(最新的在前)
log_files.sort(key=lambda x: x['mtime'], reverse=True)
# 查找最新的日志文件
latest = log_files[0]['filename'] if log_files else None
return jsonify({
'logs': log_files,
'latest': latest,
'count': len(log_files)
})
except Exception as e:
return jsonify({'error': f'获取日志文件时出错: {str(e)}'}), 500
@app.route('/api/simulator/logs/<filename>', methods=['GET'])
def get_simulator_log_content(filename):
"""获取特定日志文件的内容"""
try:
import os
# 安全性: 只允许使用安全名称的日志文件
if not filename.startswith('wqb') or not filename.endswith('.log'):
return jsonify({'error': '无效的日志文件名'}), 400
script_dir = os.path.dirname(os.path.abspath(__file__))
simulator_dir = os.path.join(script_dir, 'simulator')
# 在两个目录中查找文件
log_path = None
for directory in [script_dir, simulator_dir]:
potential_path = os.path.join(directory, filename)
if os.path.exists(potential_path):
log_path = potential_path
break
if not log_path:
return jsonify({'error': '未找到日志文件'}), 404
# 使用多种编码尝试读取文件内容
content = None
encodings_to_try = ['utf-8', 'gbk', 'gb2312', 'big5', 'latin-1', 'cp1252']
for encoding in encodings_to_try:
try:
with open(log_path, 'r', encoding=encoding) as f:
content = f.read()
print(f"使用 {encoding} 编码成功读取日志文件")
break
except UnicodeDecodeError:
continue
except Exception as e:
print(f"使用 {encoding} 读取时出错: {e}")
continue
if content is None:
# 最后手段: 以二进制读取并使用错误处理解码
try:
with open(log_path, 'rb') as f:
raw_content = f.read()
content = raw_content.decode('utf-8', errors='replace')
print("使用 UTF-8 带错误替换读取日志内容")
except Exception as e:
content = f"错误: 无法解码文件内容 - {str(e)}"
response = jsonify({
'content': content,
'filename': filename,
'size': len(content)
})
response.headers['Content-Type'] = 'application/json; charset=utf-8'
return response
except Exception as e:
return jsonify({'error': f'读取日志文件时出错: {str(e)}'}), 500
@app.route('/api/simulator/test-connection', methods=['POST'])
def test_simulator_connection():
"""测试模拟器的 BRAIN API 连接"""
try:
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': '需要用户名和密码'}), 400
# 使用现有的 sign_in_to_brain 函数测试连接
result = sign_in_to_brain(username, password)
# 处理生物识别认证要求
if isinstance(result, dict) and result.get('requires_biometric'):
return jsonify({
'success': False,
'error': '需要生物识别认证。请先使用主界面完成认证。',
'requires_biometric': True
})
# 测试简单的 API 调用以验证连接
brain_session = result
response = brain_session.get(f'{BRAIN_API_BASE}/data-fields/open')
if response.ok:
return jsonify({
'success': True,
'message': '连接成功'
})
else:
return jsonify({
'success': False,
'error': f'API 测试失败: {response.status_code}'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'连接失败: {str(e)}'
})
@app.route('/api/simulator/run', methods=['POST'])
def run_simulator_with_params():
"""在新终端中使用用户提供的参数运行模拟器"""
try:
import subprocess
import threading
import json
import os
import tempfile
import sys
import time
# 获取表单数据
json_file = request.files.get('jsonFile')
username = request.form.get('username')
password = request.form.get('password')
start_position = int(request.form.get('startPosition', 0))
concurrent_count = int(request.form.get('concurrentCount', 3))
random_shuffle = request.form.get('randomShuffle') == 'true'
use_multi_sim = request.form.get('useMultiSim') == 'true'
alpha_count_per_slot = int(request.form.get('alphaCountPerSlot', 3))
if not json_file or not username or not password:
return jsonify({'error': '缺少必需参数'}), 400
# 验证并读取 JSON 文件
try:
json_content = json_file.read().decode('utf-8')
expressions_data = json.loads(json_content)
if not isinstance(expressions_data, list):
return jsonify({'error': 'JSON 文件必须包含表达式数组'}), 400
except Exception as e:
return jsonify({'error': f'无效的 JSON 文件: {str(e)}'}), 400
# 获取路径
script_dir = os.path.dirname(os.path.abspath(__file__))
simulator_dir = os.path.join(script_dir, 'simulator')
# 为自动化运行创建临时文件
temp_json_path = os.path.join(simulator_dir, f'temp_expressions_{int(time.time())}.json')
temp_script_path = os.path.join(simulator_dir, f'temp_automated_{int(time.time())}.py')
temp_batch_path = os.path.join(simulator_dir, f'temp_run_{int(time.time())}.bat')
try:
# 将 JSON 数据保存到临时文件
with open(temp_json_path, 'w', encoding='utf-8') as f:
json.dump(expressions_data, f, ensure_ascii=False, indent=2)
# 创建调用 automated_main 的自动化脚本
script_content = f'''
import asyncio
import sys
import os
import json
# 将当前目录添加到路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import simulator_wqb
async def run_automated():
"""使用 Web 界面的参数运行自动化模拟器"""
try:
# 加载 JSON 数据
with open(r"{temp_json_path}", 'r', encoding='utf-8') as f:
json_content = f.read()
# 使用参数调用 automated_main
result = await simulator_wqb.automated_main(
json_file_content=json_content,
username="{username}",
password="{password}",
start_position={start_position},
concurrent_count={concurrent_count},
random_shuffle={random_shuffle},
use_multi_sim={use_multi_sim},
alpha_count_per_slot={alpha_count_per_slot}
)
if result['success']:
print("\\n" + "="*60)
print("🎉 Web 界面自动化成功 🎉")
print("="*60)
print(f"✅ 总模拟次数: {{result['results']['total']}}")
print(f"✅ 成功: {{result['results']['successful']}}")
print(f"❌ 失败: {{result['results']['failed']}}")
if result['results']['alphaIds']:
print(f"📊 生成 {{len(result['results']['alphaIds'])}} 个 Alpha ID")
print("="*60)
else:
print("\\n" + "="*60)
print("❌ Web 界面自动化失败")
print("="*60)
print(f"错误: {{result['error']}}")
print("="*60)
except Exception as e:
print(f"\\n❌ 脚本执行错误: {{e}}")
finally:
# 清理临时文件
try:
if os.path.exists(r"{temp_json_path}"):
os.remove(r"{temp_json_path}")
if os.path.exists(r"{temp_script_path}"):
os.remove(r"{temp_script_path}")
if os.path.exists(r"{temp_batch_path}"):
os.remove(r"{temp_batch_path}")
except:
pass
print("\\n🔄 按任意键关闭此窗口...")
input()
if __name__ == '__main__':
asyncio.run(run_automated())
'''
# 保存脚本
with open(temp_script_path, 'w', encoding='utf-8') as f:
f.write(script_content)
# 为 Windows 创建批处理文件
batch_content = f'''@echo off
cd /d "{simulator_dir}"
python "{os.path.basename(temp_script_path)}"
'''
with open(temp_batch_path, 'w', encoding='utf-8') as f:
f.write(batch_content)
# 在新终端中启动
def launch_simulator():
try:
if os.name == 'nt': # Windows
# 使用批处理文件避免路径问题
subprocess.Popen([
temp_batch_path
], creationflags=subprocess.CREATE_NEW_CONSOLE)
else: # Unix 类系统
# 尝试不同的终端模拟器
terminals = ['gnome-terminal', 'xterm', 'konsole', 'terminal']
for terminal in terminals:
try:
if terminal == 'gnome-terminal':
subprocess.Popen([
terminal, '--working-directory', simulator_dir,
'--', 'python3', os.path.basename(temp_script_path)
])
else:
subprocess.Popen([
terminal, '-e',
f'cd "{simulator_dir}" && python3 "{os.path.basename(temp_script_path)}"'
])
break
except FileNotFoundError:
continue
else:
# 回退: 如果找不到终端则在后台运行
subprocess.Popen([
sys.executable, temp_script_path
], cwd=simulator_dir)
except Exception as e:
print(f"启动模拟器时出错: {e}")
# 在单独的线程中启动模拟器
thread = threading.Thread(target=launch_simulator)
thread.daemon = True
thread.start()
return jsonify({
'success': True,
'message': '模拟器已在新终端窗口中启动',
'parameters': {
'expressions_count': len(expressions_data),
'concurrent_count': concurrent_count,
'use_multi_sim': use_multi_sim,
'alpha_count_per_slot': alpha_count_per_slot if use_multi_sim else None
}
})
except Exception as e:
# 出错时清理
try:
if os.path.exists(temp_json_path):
os.remove(temp_json_path)
if os.path.exists(temp_script_path):
os.remove(temp_script_path)
if os.path.exists(temp_batch_path):
os.remove(temp_batch_path)
except:
pass
raise e
except Exception as e:
return jsonify({'error': f'运行模拟器失败: {str(e)}'}), 500
@app.route('/api/simulator/stop', methods=['POST'])
def stop_simulator():
"""停止运行的模拟器"""
try:
# 这是一个占位符 - 在生产环境中,您需要
# 实现适当的进程管理来停止运行的模拟
return jsonify({
'success': True,
'message': '停止信号已发送'
})
except Exception as e:
return jsonify({'error': f'停止模拟器失败: {str(e)}'}), 500
@app.route('/api/authenticate', methods=['POST'])
def authenticate():
"""使用 BRAIN API 进行认证"""
try:
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': '需要用户名和密码'}), 400
# 使用 BRAIN 进行认证
result = sign_in_to_brain(username, password)
# 检查是否需要生物识别认证
if isinstance(result, dict) and result.get('requires_biometric'):
# 临时存储具有生物识别待处理状态的会话
session_id = f"{username}_{int(time.time())}_biometric_pending"
brain_sessions[session_id] = {
'session': result['session'],
'username': username,
'timestamp': time.time(),
'biometric_pending': True,
'biometric_location': result['location']
}
# 在 Flask 会话中存储会话 ID
flask_session['brain_session_id'] = session_id
return jsonify({
'success': False,
'requires_biometric': True,
'biometric_url': result['biometric_url'],
'session_id': session_id,
'message': '请通过访问提供的 URL 完成生物识别认证'
})
# 常规成功认证
brain_session = result
# 存储会话
session_id = f"{username}_{int(time.time())}"
brain_sessions[session_id] = {
'session': brain_session,
'username': username,
'timestamp': time.time()
}
# 在 Flask 会话中存储会话 ID
flask_session['brain_session_id'] = session_id
return jsonify({
'success': True,
'session_id': session_id,
'message': '认证成功'
})
except requests.HTTPError as e:
if e.response.status_code == 401:
return jsonify({'error': '用户名或密码无效'}), 401
else:
return jsonify({'error': f'认证失败: {str(e)}'}), 500
except Exception as e:
return jsonify({'error': f'认证错误: {str(e)}'}), 500
@app.route('/api/complete-biometric', methods=['POST'])
def complete_biometric():
"""在用户在浏览器中完成生物识别认证后完成认证"""
try:
from urllib.parse import urljoin
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
if not session_id or session_id not in brain_sessions:
return jsonify({'error': '无效或过期的会话'}), 401
session_info = brain_sessions[session_id]
# 检查此会话是否正在等待生物识别完成
if not session_info.get('biometric_pending'):
return jsonify({'error': '会话不处于生物识别认证待处理状态'}), 400
brain_session = session_info['session']
location = session_info['biometric_location']
# 完成生物识别认证
try:
# 向 location 发出后续 POST 请求
complete_url = urljoin(f'{BRAIN_API_BASE}/authentication', location)
response = brain_session.post(complete_url)
response.raise_for_status()
# 更新会话信息 - 移除生物识别待处理状态
session_info['biometric_pending'] = False
del session_info['biometric_location']
# 创建没有 biometric_pending 后缀的新会话 ID
new_session_id = f"{session_info['username']}_{int(time.time())}"
brain_sessions[new_session_id] = {
'session': brain_session,
'username': session_info['username'],
'timestamp': time.time()
}
# 删除旧会话
del brain_sessions[session_id]
# 更新 Flask 会话
flask_session['brain_session_id'] = new_session_id
return jsonify({
'success': True,
'session_id': new_session_id,
'message': '生物识别认证成功完成'
})
except requests.HTTPError as e:
return jsonify({
'error': f'完成生物识别认证失败: {str(e)}'
}), 500
except Exception as e:
return jsonify({'error': f'完成生物识别认证时出错: {str(e)}'}), 500
@app.route('/api/operators', methods=['GET'])
def get_operators():
"""从 BRAIN API 获取用户运算符"""
try:
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
if not session_id or session_id not in brain_sessions:
return jsonify({'error': '无效或过期的会话'}), 401
session_info = brain_sessions[session_id]
brain_session = session_info['session']
# 首先尝试不使用分页参数(大多数 API 一次返回所有运算符)
try:
response = brain_session.get(f'{BRAIN_API_BASE}/operators')
response.raise_for_status()
data = response.json()
# 如果是列表,我们获得了所有运算符
if isinstance(data, list):
all_operators = data
print(f"从 BRAIN API 获取了 {len(all_operators)} 个运算符(直接)")
# 如果是包含 results 的字典,处理分页
elif isinstance(data, dict) and 'results' in data:
all_operators = []
total_count = data.get('count', len(data['results']))
print(f"找到 {total_count} 个总运算符,正在获取全部...")
# 获取第一批
all_operators.extend(data['results'])
# 如果需要,获取剩余批次
limit = 100
offset = len(data['results'])
while len(all_operators) < total_count:
params = {'limit': limit, 'offset': offset}
batch_response = brain_session.get(f'{BRAIN_API_BASE}/operators', params=params)
batch_response.raise_for_status()
batch_data = batch_response.json()
if isinstance(batch_data, dict) and 'results' in batch_data:
batch_operators = batch_data['results']
if not batch_operators: # 没有更多数据
break
all_operators.extend(batch_operators)
offset += len(batch_operators)
else:
break
print(f"从 BRAIN API 获取了 {len(all_operators)} 个运算符(分页)")
else:
# 未知格式,视为空
all_operators = []
print("运算符 API 的响应格式未知")
except Exception as e:
print(f"获取运算符时出错: {str(e)}")
# 回退: 尝试显式分页
all_operators = []
limit = 100
offset = 0
while True:
params = {'limit': limit, 'offset': offset}
response = brain_session.get(f'{BRAIN_API_BASE}/operators', params=params)
response.raise_for_status()
data = response.json()
if isinstance(data, list):
all_operators.extend(data)
if len(data) < limit:
break
elif isinstance(data, dict) and 'results' in data:
batch_operators = data['results']
all_operators.extend(batch_operators)
if len(batch_operators) < limit:
break
else:
break
offset += limit
print(f"从 BRAIN API 获取了 {len(all_operators)} 个运算符(回退)")
# 提取名称、类别、描述、定义和其他字段(如果可用)
filtered_operators = []
for op in all_operators:
operator_data = {
'name': op['name'],
'category': op['category']
}
# 如果可用则包含描述
if 'description' in op and op['description']:
operator_data['description'] = op['description']
# 如果可用则包含定义
if 'definition' in op and op['definition']:
operator_data['definition'] = op['definition']
# 如果可用则包含使用计数
if 'usageCount' in op:
operator_data['usageCount'] = op['usageCount']
# 如果可用则包含其他有用字段
if 'example' in op and op['example']:
operator_data['example'] = op['example']
filtered_operators.append(operator_data)
return jsonify(filtered_operators)
except Exception as e:
print(f"获取运算符时出错: {str(e)}")
return jsonify({'error': f'获取运算符失败: {str(e)}'}), 500
@app.route('/api/datafields', methods=['GET'])
def get_datafields():
"""从 BRAIN API 获取数据字段"""
try:
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
if not session_id or session_id not in brain_sessions:
return jsonify({'error': '无效或过期的会话'}), 401
session_info = brain_sessions[session_id]
brain_session = session_info['session']
# 获取参数
region = request.args.get('region', 'USA')
delay = request.args.get('delay', '1')
universe = request.args.get('universe', 'TOP3000')
dataset_id = request.args.get('dataset_id', 'fundamental6')
search = ''
# 基于笔记本实现构建 URL 模板
if len(search) == 0:
url_template = f"{BRAIN_API_BASE}/data-fields?" + \
f"&instrumentType=EQUITY" + \
f"&region={region}&delay={delay}&universe={universe}&dataset.id={dataset_id}&limit=50" + \
"&offset={x}"
# 从第一个请求获取计数
first_response = brain_session.get(url_template.format(x=0))
first_response.raise_for_status()
count = first_response.json()['count']
else:
url_template = f"{BRAIN_API_BASE}/data-fields?" + \
f"&instrumentType=EQUITY" + \
f"&region={region}&delay={delay}&universe={universe}&limit=50" + \
f"&search={search}" + \
"&offset={x}"
count = 100 # 搜索查询的默认值
# 分批获取所有数据字段
datafields_list = []
for x in range(0, count, 50):
response = brain_session.get(url_template.format(x=x))
response.raise_for_status()
datafields_list.append(response.json()['results'])
# 展平列表
datafields_list_flat = [item for sublist in datafields_list for item in sublist]
# 过滤字段,只包含必要信息
filtered_fields = [
{
'id': field['id'],
'description': field['description'],
'type': field['type'],
'coverage': field.get('coverage', 0),
'userCount': field.get('userCount', 0),
'alphaCount': field.get('alphaCount', 0)
}
for field in datafields_list_flat
]
return jsonify(filtered_fields)
except Exception as e:
return jsonify({'error': f'获取数据字段失败: {str(e)}'}), 500
@app.route('/api/dataset-description', methods=['GET'])
def get_dataset_description():
"""从 BRAIN API 获取数据集描述"""
try:
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
if not session_id or session_id not in brain_sessions:
return jsonify({'error': '无效或过期的会话'}), 401
session_info = brain_sessions[session_id]
brain_session = session_info['session']
# 获取参数
region = request.args.get('region', 'USA')
delay = request.args.get('delay', '1')
universe = request.args.get('universe', 'TOP3000')
dataset_id = request.args.get('dataset_id', 'analyst10')
# 构建数据集描述的 URL
url = f"{BRAIN_API_BASE}/data-sets/{dataset_id}?" + \
f"instrumentType=EQUITY&region={region}&delay={delay}&universe={universe}"
print(f"从以下位置获取数据集描述: {url}")
# 向 BRAIN API 发出请求
response = brain_session.get(url)
response.raise_for_status()
data = response.json()
description = data.get('description', '无描述可用')
print(f"数据集描述已检索: {description[:100]}...")
return jsonify({
'success': True,
'description': description,
'dataset_id': dataset_id
})
except Exception as e:
print(f"数据集描述错误: {str(e)}")
return jsonify({'error': f'获取数据集描述失败: {str(e)}'}), 500
@app.route('/api/status', methods=['GET'])
def check_status():
"""检查会话是否仍然有效"""
try:
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
if not session_id or session_id not in brain_sessions:
return jsonify({'valid': False})
session_info = brain_sessions[session_id]
# 检查会话是否不太旧(24 小时)
if time.time() - session_info['timestamp'] > 86400:
del brain_sessions[session_id]
return jsonify({'valid': False})
# 检查生物识别认证是否待处理
if session_info.get('biometric_pending'):
return jsonify({
'valid': False,
'biometric_pending': True,
'username': session_info['username'],
'message': '生物识别认证待处理'
})
return jsonify({
'valid': True,
'username': session_info['username']
})
except Exception as e:
return jsonify({'error': f'状态检查失败: {str(e)}'}), 500
@app.route('/api/logout', methods=['POST'])
def logout():
"""注销并清理会话"""
try:
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
if session_id and session_id in brain_sessions:
del brain_sessions[session_id]
if 'brain_session_id' in flask_session:
flask_session.pop('brain_session_id')
return jsonify({'success': True, 'message': '注销成功'})
except Exception as e:
return jsonify({'error': f'注销失败: {str(e)}'}), 500
@app.route('/api/test-expression', methods=['POST'])
def test_expression():
"""使用 BRAIN API 模拟测试表达式"""
try:
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
if not session_id or session_id not in brain_sessions:
return jsonify({'error': '无效或过期的会话'}), 401
session_info = brain_sessions[session_id]
brain_session = session_info['session']
# 从请求获取模拟数据
simulation_data = request.get_json()
# 确保必需字段存在
if 'type' not in simulation_data:
simulation_data['type'] = 'REGULAR'
# 确保设置具有必需字段
if 'settings' not in simulation_data:
simulation_data['settings'] = {}
# 为缺失的设置设置默认值
default_settings = {
'instrumentType': 'EQUITY',
'region': 'USA',
'universe': 'TOP3000',
'delay': 1,
'decay': 15,
'neutralization': 'SUBINDUSTRY',
'truncation': 0.08,
'pasteurization': 'ON',
'testPeriod': 'P1Y6M',
'unitHandling': 'VERIFY',
'nanHandling': 'OFF',
'language': 'FASTEXPR',
'visualization': False
}
for key, value in default_settings.items():
if key not in simulation_data['settings']:
simulation_data['settings'][key] = value
# 将字符串布尔值转换为实际布尔值
if isinstance(simulation_data['settings'].get('visualization'), str):
viz_value = simulation_data['settings']['visualization'].lower()
simulation_data['settings']['visualization'] = viz_value == 'true'
# 发送模拟请求(遵循笔记本模式)
try:
message = {}
simulation_response = brain_session.post(f'{BRAIN_API_BASE}/simulations', json=simulation_data)
# 检查是否收到 Location 头(遵循笔记本模式)
if 'Location' in simulation_response.headers:
# 跟随 location 获取实际状态
message = brain_session.get(simulation_response.headers['Location']).json()
# 检查模拟是否正在运行或已完成
if 'progress' in message.keys():
info_to_print = "模拟正在运行"
return jsonify({
'success': True,
'status': 'RUNNING',
'message': info_to_print,
'full_response': message
})
else:
# 返回完整消息,如笔记本中所示
return jsonify({
'success': message.get('status') != 'ERROR',
'status': message.get('status', 'UNKNOWN'),
'message': str(message),
'full_response': message
})
else:
# 尝试从响应体获取错误(遵循笔记本模式)
try:
message = simulation_response.json()
return jsonify({
'success': False,
'status': 'ERROR',
'message': str(message),
'full_response': message
})
except:
return jsonify({
'success': False,
'status': 'ERROR',
'message': '网络连接错误',
'full_response': {}
})
except Exception as e:
return jsonify({
'success': False,
'status': 'ERROR',
'message': '网络连接错误',
'full_response': {'error': str(e)}
})
except Exception as e:
import traceback
return jsonify({
'success': False,
'status': 'ERROR',
'message': f'测试表达式失败: {str(e)}',
'full_response': {'error': str(e), 'traceback': traceback.format_exc()}
}), 500
@app.route('/api/test-operators', methods=['GET'])
def test_operators():
"""测试端点以检查原始 BRAIN 运算符 API 响应"""
try:
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
if not session_id or session_id not in brain_sessions:
return jsonify({'error': '无效或过期的会话'}), 401
session_info = brain_sessions[session_id]
brain_session = session_info['session']
# 从 BRAIN API 获取原始响应
response = brain_session.get(f'{BRAIN_API_BASE}/operators')
response.raise_for_status()
data = response.json()
# 返回原始响应信息用于调试
result = {
'type': str(type(data)),
'is_list': isinstance(data, list),
'is_dict': isinstance(data, dict),
'length': len(data) if isinstance(data, list) else None,
'keys': list(data.keys()) if isinstance(data, dict) else None,
'count_key': data.get('count') if isinstance(data, dict) else None,
'first_few_items': data[:3] if isinstance(data, list) else (data.get('results', [])[:3] if isinstance(data, dict) else None)
}
return jsonify(result)
except Exception as e:
return jsonify({'error': f'测试失败: {str(e)}'}), 500
# 导入蓝图
try:
from blueprints import idea_house_bp, paper_analysis_bp, feature_engineering_bp
print("📦 蓝图导入成功!")
except ImportError as e:
print(f"❌ 导入蓝图失败: {e}")
print("某些功能可能不可用。")
# 注册蓝图
app.register_blueprint(idea_house_bp, url_prefix='/idea-house')
app.register_blueprint(paper_analysis_bp, url_prefix='/paper-analysis')
app.register_blueprint(feature_engineering_bp, url_prefix='/feature-engineering')
print("🔧 所有蓝图注册成功!")
print(" - idea库: /idea-house")
print(" - 论文分析: /paper-analysis")
print(" - 特征工程: /feature-engineering")
# 模板管理路由
# 获取此脚本所在的目录以获取模板
script_dir = os.path.dirname(os.path.abspath(__file__))
TEMPLATES_DIR = os.path.join(script_dir, 'custom_templates')
# 确保模板目录存在
if not os.path.exists(TEMPLATES_DIR):
os.makedirs(TEMPLATES_DIR)
print(f"📁 已创建模板目录: {TEMPLATES_DIR}")
else:
print(f"📁 模板目录已就绪: {TEMPLATES_DIR}")
print("✅ BRAIN 表达式模板解码器完全初始化!")
print("🎯 准备处理模板并与 BRAIN API 集成!")
@app.route('/api/templates', methods=['GET'])
def get_templates():
"""获取所有自定义模板"""
try:
templates = []
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
if os.path.exists(templates_file):
with open(templates_file, 'r', encoding='utf-8') as f:
templates = json.load(f)
return jsonify(templates)
except Exception as e:
return jsonify({'error': f'加载模板时出错: {str(e)}'}), 500
@app.route('/api/templates', methods=['POST'])
def save_template():
"""保存新的自定义模板"""
try:
data = request.get_json()
name = data.get('name', '').strip()
description = data.get('description', '').strip()
expression = data.get('expression', '').strip()
template_configurations = data.get('templateConfigurations', {})
if not name or not expression:
return jsonify({'error': '名称和表达式是必需的'}), 400
# 加载现有模板
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
templates = []
if os.path.exists(templates_file):
with open(templates_file, 'r', encoding='utf-8') as f:
templates = json.load(f)
# 检查重复名称
existing_index = next((i for i, t in enumerate(templates) if t['name'] == name), None)
new_template = {
'name': name,
'description': description,
'expression': expression,
'templateConfigurations': template_configurations,
'createdAt': datetime.now().isoformat()
}
if existing_index is not None:
# 更新现有模板但保留 createdAt(如果存在)
if 'createdAt' in templates[existing_index]:
new_template['createdAt'] = templates[existing_index]['createdAt']
new_template['updatedAt'] = datetime.now().isoformat()
templates[existing_index] = new_template
message = f'模板 "{name}" 更新成功'
else:
# 添加新模板
templates.append(new_template)
message = f'模板 "{name}" 保存成功'
# 保存到文件
with open(templates_file, 'w', encoding='utf-8') as f:
json.dump(templates, f, indent=2, ensure_ascii=False)
return jsonify({'success': True, 'message': message})
except Exception as e:
return jsonify({'error': f'保存模板时出错: {str(e)}'}), 500
@app.route('/api/templates/<int:template_id>', methods=['DELETE'])
def delete_template(template_id):
"""删除自定义模板"""
try:
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
templates = []
if os.path.exists(templates_file):
with open(templates_file, 'r', encoding='utf-8') as f:
templates = json.load(f)
if 0 <= template_id < len(templates):
deleted_template = templates.pop(template_id)
# 保存更新后的模板
with open(templates_file, 'w', encoding='utf-8') as f:
json.dump(templates, f, indent=2, ensure_ascii=False)
return jsonify({'success': True, 'message': f'模板 "{deleted_template["name"]}" 删除成功'})
else:
return jsonify({'error': '未找到模板'}), 404
except Exception as e:
return jsonify({'error': f'删除模板时出错: {str(e)}'}), 500
@app.route('/api/templates/export', methods=['GET'])
def export_templates():
"""将所有模板导出为 JSON"""
try:
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
templates = []
if os.path.exists(templates_file):
with open(templates_file, 'r', encoding='utf-8') as f:
templates = json.load(f)
return jsonify(templates)
except Exception as e:
return jsonify({'error': f'导出模板时出错: {str(e)}'}), 500
@app.route('/api/templates/import', methods=['POST'])
def import_templates():
"""从 JSON 导入模板"""
try:
data = request.get_json()
imported_templates = data.get('templates', [])
overwrite = data.get('overwrite', False)
if not isinstance(imported_templates, list):
return jsonify({'error': '无效的模板格式'}), 400
# 验证模板结构
valid_templates = []
for template in imported_templates:
if (isinstance(template, dict) and
'name' in template and 'expression' in template and
template['name'].strip() and template['expression'].strip()):
valid_templates.append({
'name': template['name'].strip(),
'description': template.get('description', '').strip(),
'expression': template['expression'].strip(),
'templateConfigurations': template.get('templateConfigurations', {}),
'createdAt': template.get('createdAt', datetime.now().isoformat())
})
if not valid_templates:
return jsonify({'error': '未找到有效模板'}), 400
# 加载现有模板
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
existing_templates = []
if os.path.exists(templates_file):
with open(templates_file, 'r', encoding='utf-8') as f:
existing_templates = json.load(f)
# 处理重复项
duplicates = []
new_templates = []
for template in valid_templates:
existing_index = next((i for i, t in enumerate(existing_templates) if t['name'] == template['name']), None)
if existing_index is not None:
duplicates.append(template['name'])
if overwrite:
existing_templates[existing_index] = template
else:
new_templates.append(template)
# 添加新模板
existing_templates.extend(new_templates)
# 保存到文件
with open(templates_file, 'w', encoding='utf-8') as f:
json.dump(existing_templates, f, indent=2, ensure_ascii=False)
result = {
'success': True,
'imported': len(new_templates),
'duplicates': duplicates,
'overwritten': len(duplicates) if overwrite else 0
}
return jsonify(result)
except Exception as e:
return jsonify({'error': f'导入模板时出错: {str(e)}'}), 500
@app.route('/api/run-simulator', methods=['POST'])
def run_simulator():
"""运行 simulator_wqb.py 脚本"""
try:
import subprocess
import threading
from pathlib import Path
# 获取脚本路径(现在在模拟器子文件夹中)
script_dir = os.path.dirname(os.path.abspath(__file__))
simulator_dir = os.path.join(script_dir, 'simulator')
simulator_path = os.path.join(simulator_dir, 'simulator_wqb.py')
# 检查脚本是否存在
if not os.path.exists(simulator_path):
return jsonify({'error': '在模拟器文件夹中未找到 simulator_wqb.py'}), 404
# 在新终端窗口中运行脚本
def run_script():
try:
# 对于 Windows
if os.name == 'nt':
# 使用子进程和适当的工作目录(模拟器文件夹)
subprocess.Popen(['cmd', '/k', 'python', 'simulator_wqb.py'],
cwd=simulator_dir,
creationflags=subprocess.CREATE_NEW_CONSOLE)
else:
# 对于 Unix 类系统
subprocess.Popen(['gnome-terminal', '--working-directory', simulator_dir, '--', 'python3', 'simulator_wqb.py'])
except Exception as e:
print(f"运行模拟器时出错: {e}")
# 在单独线程中启动脚本
thread = threading.Thread(target=run_script)
thread.daemon = True
thread.start()
return jsonify({
'success': True,
'message': '模拟器脚本已在新终端窗口中启动'
})
except Exception as e:
return jsonify({'error': f'运行模拟器失败: {str(e)}'}), 500
if __name__ == '__main__':
port = 5190
print("启动 BRAIN 表达式模板解码器 Web 应用程序...")
print(f"应用程序将在 http://localhost:{port} 运行")
print("包含 BRAIN API 集成 - 不需要单独的代理!")
app.run(debug=True, host='0.0.0.0', port=port)