|
|
"""
|
|
|
BRAIN 表达式模板解码器 - Flask 网络应用程序
|
|
|
一个完整的网络应用程序,用于解码字符串模板并与 WorldQuant BRAIN 集成
|
|
|
"""
|
|
|
|
|
|
# 自动安装缺失的依赖
|
|
|
import subprocess
|
|
|
import sys
|
|
|
import os
|
|
|
|
|
|
def install_requirements():
|
|
|
"""如果缺少必需的包,则从 requirements.txt 安装"""
|
|
|
print("🔍 检查并安装必需的依赖...")
|
|
|
print("📋 验证 BRAIN 表达式模板解码器所需的包...")
|
|
|
|
|
|
# 获取此脚本所在的目录
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
# 检查 requirements.txt 是否存在于脚本目录中
|
|
|
req_file = os.path.join(script_dir, 'requirements.txt')
|
|
|
if not os.path.exists(req_file):
|
|
|
print("❌ 错误: 未找到 requirements.txt!")
|
|
|
print(f"查找路径: {req_file}")
|
|
|
return False
|
|
|
|
|
|
# 如果存在镜像配置则读取
|
|
|
mirror_url = 'https://pypi.tuna.tsinghua.edu.cn/simple' # 默认为清华镜像
|
|
|
mirror_config_file = os.path.join(script_dir, 'mirror_config.txt')
|
|
|
|
|
|
if os.path.exists(mirror_config_file):
|
|
|
try:
|
|
|
with open(mirror_config_file, 'r', encoding='utf-8') as f:
|
|
|
for line in f:
|
|
|
line = line.strip()
|
|
|
if line and not line.startswith('#') and line.startswith('http'):
|
|
|
mirror_url = line
|
|
|
break
|
|
|
except Exception as e:
|
|
|
print(f"警告: 无法读取镜像配置: {e}")
|
|
|
|
|
|
# 尝试导入主要包以检查是否已安装
|
|
|
packages_to_check = {
|
|
|
'flask': 'flask',
|
|
|
'flask_cors': 'flask-cors',
|
|
|
'requests': 'requests',
|
|
|
'pandas': 'pandas',
|
|
|
'PyPDF2': 'PyPDF2',
|
|
|
'docx': 'python-docx',
|
|
|
'pdfplumber': 'pdfplumber',
|
|
|
'fitz': 'PyMuPDF',
|
|
|
'cozepy': 'cozepy',
|
|
|
'lxml': 'lxml',
|
|
|
'bs4': 'beautifulsoup4'
|
|
|
}
|
|
|
|
|
|
missing_packages = []
|
|
|
for import_name, pip_name in packages_to_check.items():
|
|
|
try:
|
|
|
__import__(import_name)
|
|
|
except ImportError:
|
|
|
missing_packages.append(pip_name)
|
|
|
print(f"缺失的包: {pip_name} (导入名: {import_name})")
|
|
|
|
|
|
if missing_packages:
|
|
|
print(f"⚠️ 检测到缺失的包: {', '.join(missing_packages)}")
|
|
|
print("📦 从 requirements.txt 安装依赖...")
|
|
|
print(f"🌐 使用镜像: {mirror_url}")
|
|
|
|
|
|
try:
|
|
|
# 使用配置的镜像安装所有 requirements
|
|
|
subprocess.check_call([
|
|
|
sys.executable, '-m', 'pip', 'install',
|
|
|
'-i', mirror_url,
|
|
|
'-r', req_file
|
|
|
])
|
|
|
print("✅ 所有依赖安装成功!")
|
|
|
return True
|
|
|
except subprocess.CalledProcessError:
|
|
|
print(f"❌ 错误: 使用 {mirror_url} 安装依赖失败")
|
|
|
print("🔄 尝试使用默认 PyPI...")
|
|
|
try:
|
|
|
# 回退到默认 PyPI
|
|
|
subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', req_file])
|
|
|
print("✅ 所有依赖安装成功!")
|
|
|
return True
|
|
|
except subprocess.CalledProcessError:
|
|
|
print("❌ 错误: 安装依赖失败。请手动运行:")
|
|
|
print(f" {sys.executable} -m pip install -i {mirror_url} -r requirements.txt")
|
|
|
return False
|
|
|
else:
|
|
|
print("✅ 所有必需的依赖已安装!")
|
|
|
return True
|
|
|
|
|
|
# 在导入前检查并安装依赖
|
|
|
# 每次导入模块时都会运行,但只在需要时安装
|
|
|
def check_and_install_dependencies():
|
|
|
"""检查并在需要时安装依赖"""
|
|
|
if not globals().get('_dependencies_checked'):
|
|
|
if install_requirements():
|
|
|
globals()['_dependencies_checked'] = True
|
|
|
return True
|
|
|
else:
|
|
|
print("\n请手动安装依赖并重试。")
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
# 导入此模块时始终运行依赖检查
|
|
|
print("🚀 初始化 BRAIN 表达式模板解码器...")
|
|
|
if not check_and_install_dependencies():
|
|
|
if __name__ == "__main__":
|
|
|
sys.exit(1)
|
|
|
else:
|
|
|
print("⚠️ 警告: 可能缺少某些依赖。请运行 'pip install -r requirements.txt'")
|
|
|
print("🔄 继续导入,但某些功能可能无法正常工作。")
|
|
|
|
|
|
# 现在导入包
|
|
|
try:
|
|
|
from flask import Flask, render_template, request, jsonify, session as flask_session
|
|
|
from flask_cors import CORS
|
|
|
import requests
|
|
|
import json
|
|
|
import time
|
|
|
import os
|
|
|
from datetime import datetime
|
|
|
print("📚 核心包导入成功!")
|
|
|
except ImportError as e:
|
|
|
print(f"❌ 导入核心包失败: {e}")
|
|
|
print("请运行: pip install -r requirements.txt")
|
|
|
if __name__ == "__main__":
|
|
|
sys.exit(1)
|
|
|
raise
|
|
|
|
|
|
app = Flask(__name__)
|
|
|
app.secret_key = 'brain_template_decoder_secret_key_change_in_production'
|
|
|
CORS(app)
|
|
|
|
|
|
print("🌐 Flask 应用程序已初始化,支持 CORS!")
|
|
|
|
|
|
# BRAIN API 配置
|
|
|
BRAIN_API_BASE = 'https://api.worldquantbrain.com'
|
|
|
|
|
|
# 存储 BRAIN 会话(在生产环境中,使用适当的会话管理如 Redis)
|
|
|
brain_sessions = {}
|
|
|
|
|
|
print("🧠 BRAIN API 集成已配置!")
|
|
|
|
|
|
def sign_in_to_brain(username, password):
|
|
|
"""使用重试逻辑和生物识别认证支持登录 BRAIN API"""
|
|
|
from urllib.parse import urljoin
|
|
|
|
|
|
session = requests.Session()
|
|
|
session.auth = (username, password)
|
|
|
|
|
|
retry_count = 0
|
|
|
max_retries = 3
|
|
|
|
|
|
while retry_count < max_retries:
|
|
|
try:
|
|
|
response = session.post(f'{BRAIN_API_BASE}/authentication')
|
|
|
|
|
|
# 检查是否需要生物识别认证
|
|
|
if response.status_code == 401:
|
|
|
# 检查是否为生物识别认证要求
|
|
|
if response.headers.get("WWW-Authenticate") == "persona":
|
|
|
# 获取 location 头
|
|
|
location = response.headers.get("Location")
|
|
|
if location:
|
|
|
# 构建生物识别认证的完整 URL
|
|
|
biometric_url = urljoin(response.url, location)
|
|
|
|
|
|
# 返回特殊响应,指示需要生物识别认证
|
|
|
return {
|
|
|
'requires_biometric': True,
|
|
|
'biometric_url': biometric_url,
|
|
|
'session': session,
|
|
|
'location': location
|
|
|
}
|
|
|
else:
|
|
|
raise Exception("需要生物识别认证但未提供 Location 头")
|
|
|
else:
|
|
|
# 常规认证失败
|
|
|
raise requests.HTTPError("认证失败: 用户名或密码无效")
|
|
|
|
|
|
# 如果执行到这里,认证成功
|
|
|
response.raise_for_status()
|
|
|
print("认证成功。")
|
|
|
return session
|
|
|
|
|
|
except requests.HTTPError as e:
|
|
|
if "Invalid username or password" in str(e):
|
|
|
raise # 对于无效凭证不重试
|
|
|
print(f"发生 HTTP 错误: {e}。正在重试...")
|
|
|
retry_count += 1
|
|
|
if retry_count >= max_retries:
|
|
|
raise
|
|
|
time.sleep(2)
|
|
|
except Exception as e:
|
|
|
print(f"认证期间发生错误: {e}")
|
|
|
retry_count += 1
|
|
|
if retry_count >= max_retries:
|
|
|
raise
|
|
|
time.sleep(2)
|
|
|
|
|
|
# 路由
|
|
|
@app.route('/')
|
|
|
def index():
|
|
|
"""主应用程序页面"""
|
|
|
return render_template('index.html')
|
|
|
|
|
|
@app.route('/simulator')
|
|
|
def simulator():
|
|
|
"""用户友好的模拟器界面"""
|
|
|
return render_template('simulator.html')
|
|
|
|
|
|
@app.route('/api/simulator/logs', methods=['GET'])
|
|
|
def get_simulator_logs():
|
|
|
"""获取模拟器目录中可用的日志文件"""
|
|
|
try:
|
|
|
import glob
|
|
|
import os
|
|
|
from datetime import datetime
|
|
|
|
|
|
# 在当前目录和模拟器目录中查找日志文件
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
simulator_dir = os.path.join(script_dir, 'simulator')
|
|
|
|
|
|
log_files = []
|
|
|
|
|
|
# 检查当前目录和模拟器目录
|
|
|
for directory in [script_dir, simulator_dir]:
|
|
|
if os.path.exists(directory):
|
|
|
pattern = os.path.join(directory, 'wqb*.log')
|
|
|
for log_file in glob.glob(pattern):
|
|
|
try:
|
|
|
stat = os.stat(log_file)
|
|
|
log_files.append({
|
|
|
'filename': os.path.basename(log_file),
|
|
|
'path': log_file,
|
|
|
'size': f"{stat.st_size / 1024:.1f} KB",
|
|
|
'modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
|
|
|
'mtime': stat.st_mtime
|
|
|
})
|
|
|
except Exception as e:
|
|
|
print(f"读取日志文件 {log_file} 时出错: {e}")
|
|
|
|
|
|
# 按修改时间排序(最新的在前)
|
|
|
log_files.sort(key=lambda x: x['mtime'], reverse=True)
|
|
|
|
|
|
# 查找最新的日志文件
|
|
|
latest = log_files[0]['filename'] if log_files else None
|
|
|
|
|
|
return jsonify({
|
|
|
'logs': log_files,
|
|
|
'latest': latest,
|
|
|
'count': len(log_files)
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'获取日志文件时出错: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/simulator/logs/<filename>', methods=['GET'])
|
|
|
def get_simulator_log_content(filename):
|
|
|
"""获取特定日志文件的内容"""
|
|
|
try:
|
|
|
import os
|
|
|
|
|
|
# 安全性: 只允许使用安全名称的日志文件
|
|
|
if not filename.startswith('wqb') or not filename.endswith('.log'):
|
|
|
return jsonify({'error': '无效的日志文件名'}), 400
|
|
|
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
simulator_dir = os.path.join(script_dir, 'simulator')
|
|
|
|
|
|
# 在两个目录中查找文件
|
|
|
log_path = None
|
|
|
for directory in [script_dir, simulator_dir]:
|
|
|
potential_path = os.path.join(directory, filename)
|
|
|
if os.path.exists(potential_path):
|
|
|
log_path = potential_path
|
|
|
break
|
|
|
|
|
|
if not log_path:
|
|
|
return jsonify({'error': '未找到日志文件'}), 404
|
|
|
|
|
|
# 使用多种编码尝试读取文件内容
|
|
|
content = None
|
|
|
encodings_to_try = ['utf-8', 'gbk', 'gb2312', 'big5', 'latin-1', 'cp1252']
|
|
|
|
|
|
for encoding in encodings_to_try:
|
|
|
try:
|
|
|
with open(log_path, 'r', encoding=encoding) as f:
|
|
|
content = f.read()
|
|
|
print(f"使用 {encoding} 编码成功读取日志文件")
|
|
|
break
|
|
|
except UnicodeDecodeError:
|
|
|
continue
|
|
|
except Exception as e:
|
|
|
print(f"使用 {encoding} 读取时出错: {e}")
|
|
|
continue
|
|
|
|
|
|
if content is None:
|
|
|
# 最后手段: 以二进制读取并使用错误处理解码
|
|
|
try:
|
|
|
with open(log_path, 'rb') as f:
|
|
|
raw_content = f.read()
|
|
|
content = raw_content.decode('utf-8', errors='replace')
|
|
|
print("使用 UTF-8 带错误替换读取日志内容")
|
|
|
except Exception as e:
|
|
|
content = f"错误: 无法解码文件内容 - {str(e)}"
|
|
|
|
|
|
response = jsonify({
|
|
|
'content': content,
|
|
|
'filename': filename,
|
|
|
'size': len(content)
|
|
|
})
|
|
|
response.headers['Content-Type'] = 'application/json; charset=utf-8'
|
|
|
return response
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'读取日志文件时出错: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/simulator/test-connection', methods=['POST'])
|
|
|
def test_simulator_connection():
|
|
|
"""测试模拟器的 BRAIN API 连接"""
|
|
|
try:
|
|
|
data = request.get_json()
|
|
|
username = data.get('username')
|
|
|
password = data.get('password')
|
|
|
|
|
|
if not username or not password:
|
|
|
return jsonify({'error': '需要用户名和密码'}), 400
|
|
|
|
|
|
# 使用现有的 sign_in_to_brain 函数测试连接
|
|
|
result = sign_in_to_brain(username, password)
|
|
|
|
|
|
# 处理生物识别认证要求
|
|
|
if isinstance(result, dict) and result.get('requires_biometric'):
|
|
|
return jsonify({
|
|
|
'success': False,
|
|
|
'error': '需要生物识别认证。请先使用主界面完成认证。',
|
|
|
'requires_biometric': True
|
|
|
})
|
|
|
|
|
|
# 测试简单的 API 调用以验证连接
|
|
|
brain_session = result
|
|
|
response = brain_session.get(f'{BRAIN_API_BASE}/data-fields/open')
|
|
|
|
|
|
if response.ok:
|
|
|
return jsonify({
|
|
|
'success': True,
|
|
|
'message': '连接成功'
|
|
|
})
|
|
|
else:
|
|
|
return jsonify({
|
|
|
'success': False,
|
|
|
'error': f'API 测试失败: {response.status_code}'
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({
|
|
|
'success': False,
|
|
|
'error': f'连接失败: {str(e)}'
|
|
|
})
|
|
|
|
|
|
@app.route('/api/simulator/run', methods=['POST'])
|
|
|
def run_simulator_with_params():
|
|
|
"""在新终端中使用用户提供的参数运行模拟器"""
|
|
|
try:
|
|
|
import subprocess
|
|
|
import threading
|
|
|
import json
|
|
|
import os
|
|
|
import tempfile
|
|
|
import sys
|
|
|
import time
|
|
|
|
|
|
# 获取表单数据
|
|
|
json_file = request.files.get('jsonFile')
|
|
|
username = request.form.get('username')
|
|
|
password = request.form.get('password')
|
|
|
start_position = int(request.form.get('startPosition', 0))
|
|
|
concurrent_count = int(request.form.get('concurrentCount', 3))
|
|
|
random_shuffle = request.form.get('randomShuffle') == 'true'
|
|
|
use_multi_sim = request.form.get('useMultiSim') == 'true'
|
|
|
alpha_count_per_slot = int(request.form.get('alphaCountPerSlot', 3))
|
|
|
|
|
|
if not json_file or not username or not password:
|
|
|
return jsonify({'error': '缺少必需参数'}), 400
|
|
|
|
|
|
# 验证并读取 JSON 文件
|
|
|
try:
|
|
|
json_content = json_file.read().decode('utf-8')
|
|
|
expressions_data = json.loads(json_content)
|
|
|
if not isinstance(expressions_data, list):
|
|
|
return jsonify({'error': 'JSON 文件必须包含表达式数组'}), 400
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'无效的 JSON 文件: {str(e)}'}), 400
|
|
|
|
|
|
# 获取路径
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
simulator_dir = os.path.join(script_dir, 'simulator')
|
|
|
|
|
|
# 为自动化运行创建临时文件
|
|
|
temp_json_path = os.path.join(simulator_dir, f'temp_expressions_{int(time.time())}.json')
|
|
|
temp_script_path = os.path.join(simulator_dir, f'temp_automated_{int(time.time())}.py')
|
|
|
temp_batch_path = os.path.join(simulator_dir, f'temp_run_{int(time.time())}.bat')
|
|
|
|
|
|
try:
|
|
|
# 将 JSON 数据保存到临时文件
|
|
|
with open(temp_json_path, 'w', encoding='utf-8') as f:
|
|
|
json.dump(expressions_data, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
# 创建调用 automated_main 的自动化脚本
|
|
|
script_content = f'''
|
|
|
import asyncio
|
|
|
import sys
|
|
|
import os
|
|
|
import json
|
|
|
|
|
|
# 将当前目录添加到路径
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
|
|
import simulator_wqb
|
|
|
|
|
|
async def run_automated():
|
|
|
"""使用 Web 界面的参数运行自动化模拟器"""
|
|
|
try:
|
|
|
# 加载 JSON 数据
|
|
|
with open(r"{temp_json_path}", 'r', encoding='utf-8') as f:
|
|
|
json_content = f.read()
|
|
|
|
|
|
# 使用参数调用 automated_main
|
|
|
result = await simulator_wqb.automated_main(
|
|
|
json_file_content=json_content,
|
|
|
username="{username}",
|
|
|
password="{password}",
|
|
|
start_position={start_position},
|
|
|
concurrent_count={concurrent_count},
|
|
|
random_shuffle={random_shuffle},
|
|
|
use_multi_sim={use_multi_sim},
|
|
|
alpha_count_per_slot={alpha_count_per_slot}
|
|
|
)
|
|
|
|
|
|
if result['success']:
|
|
|
print("\\n" + "="*60)
|
|
|
print("🎉 Web 界面自动化成功 🎉")
|
|
|
print("="*60)
|
|
|
print(f"✅ 总模拟次数: {{result['results']['total']}}")
|
|
|
print(f"✅ 成功: {{result['results']['successful']}}")
|
|
|
print(f"❌ 失败: {{result['results']['failed']}}")
|
|
|
if result['results']['alphaIds']:
|
|
|
print(f"📊 生成 {{len(result['results']['alphaIds'])}} 个 Alpha ID")
|
|
|
print("="*60)
|
|
|
else:
|
|
|
print("\\n" + "="*60)
|
|
|
print("❌ Web 界面自动化失败")
|
|
|
print("="*60)
|
|
|
print(f"错误: {{result['error']}}")
|
|
|
print("="*60)
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"\\n❌ 脚本执行错误: {{e}}")
|
|
|
|
|
|
finally:
|
|
|
# 清理临时文件
|
|
|
try:
|
|
|
if os.path.exists(r"{temp_json_path}"):
|
|
|
os.remove(r"{temp_json_path}")
|
|
|
if os.path.exists(r"{temp_script_path}"):
|
|
|
os.remove(r"{temp_script_path}")
|
|
|
if os.path.exists(r"{temp_batch_path}"):
|
|
|
os.remove(r"{temp_batch_path}")
|
|
|
except:
|
|
|
pass
|
|
|
|
|
|
print("\\n🔄 按任意键关闭此窗口...")
|
|
|
input()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
asyncio.run(run_automated())
|
|
|
'''
|
|
|
|
|
|
# 保存脚本
|
|
|
with open(temp_script_path, 'w', encoding='utf-8') as f:
|
|
|
f.write(script_content)
|
|
|
|
|
|
# 为 Windows 创建批处理文件
|
|
|
batch_content = f'''@echo off
|
|
|
cd /d "{simulator_dir}"
|
|
|
python "{os.path.basename(temp_script_path)}"
|
|
|
'''
|
|
|
with open(temp_batch_path, 'w', encoding='utf-8') as f:
|
|
|
f.write(batch_content)
|
|
|
|
|
|
# 在新终端中启动
|
|
|
def launch_simulator():
|
|
|
try:
|
|
|
if os.name == 'nt': # Windows
|
|
|
# 使用批处理文件避免路径问题
|
|
|
subprocess.Popen([
|
|
|
temp_batch_path
|
|
|
], creationflags=subprocess.CREATE_NEW_CONSOLE)
|
|
|
else: # Unix 类系统
|
|
|
# 尝试不同的终端模拟器
|
|
|
terminals = ['gnome-terminal', 'xterm', 'konsole', 'terminal']
|
|
|
for terminal in terminals:
|
|
|
try:
|
|
|
if terminal == 'gnome-terminal':
|
|
|
subprocess.Popen([
|
|
|
terminal, '--working-directory', simulator_dir,
|
|
|
'--', 'python3', os.path.basename(temp_script_path)
|
|
|
])
|
|
|
else:
|
|
|
subprocess.Popen([
|
|
|
terminal, '-e',
|
|
|
f'cd "{simulator_dir}" && python3 "{os.path.basename(temp_script_path)}"'
|
|
|
])
|
|
|
break
|
|
|
except FileNotFoundError:
|
|
|
continue
|
|
|
else:
|
|
|
# 回退: 如果找不到终端则在后台运行
|
|
|
subprocess.Popen([
|
|
|
sys.executable, temp_script_path
|
|
|
], cwd=simulator_dir)
|
|
|
except Exception as e:
|
|
|
print(f"启动模拟器时出错: {e}")
|
|
|
|
|
|
# 在单独的线程中启动模拟器
|
|
|
thread = threading.Thread(target=launch_simulator)
|
|
|
thread.daemon = True
|
|
|
thread.start()
|
|
|
|
|
|
return jsonify({
|
|
|
'success': True,
|
|
|
'message': '模拟器已在新终端窗口中启动',
|
|
|
'parameters': {
|
|
|
'expressions_count': len(expressions_data),
|
|
|
'concurrent_count': concurrent_count,
|
|
|
'use_multi_sim': use_multi_sim,
|
|
|
'alpha_count_per_slot': alpha_count_per_slot if use_multi_sim else None
|
|
|
}
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
# 出错时清理
|
|
|
try:
|
|
|
if os.path.exists(temp_json_path):
|
|
|
os.remove(temp_json_path)
|
|
|
if os.path.exists(temp_script_path):
|
|
|
os.remove(temp_script_path)
|
|
|
if os.path.exists(temp_batch_path):
|
|
|
os.remove(temp_batch_path)
|
|
|
except:
|
|
|
pass
|
|
|
raise e
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'运行模拟器失败: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/simulator/stop', methods=['POST'])
|
|
|
def stop_simulator():
|
|
|
"""停止运行的模拟器"""
|
|
|
try:
|
|
|
# 这是一个占位符 - 在生产环境中,您需要
|
|
|
# 实现适当的进程管理来停止运行的模拟
|
|
|
return jsonify({
|
|
|
'success': True,
|
|
|
'message': '停止信号已发送'
|
|
|
})
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'停止模拟器失败: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/authenticate', methods=['POST'])
|
|
|
def authenticate():
|
|
|
"""使用 BRAIN API 进行认证"""
|
|
|
try:
|
|
|
data = request.get_json()
|
|
|
username = data.get('username')
|
|
|
password = data.get('password')
|
|
|
|
|
|
if not username or not password:
|
|
|
return jsonify({'error': '需要用户名和密码'}), 400
|
|
|
|
|
|
# 使用 BRAIN 进行认证
|
|
|
result = sign_in_to_brain(username, password)
|
|
|
|
|
|
# 检查是否需要生物识别认证
|
|
|
if isinstance(result, dict) and result.get('requires_biometric'):
|
|
|
# 临时存储具有生物识别待处理状态的会话
|
|
|
session_id = f"{username}_{int(time.time())}_biometric_pending"
|
|
|
brain_sessions[session_id] = {
|
|
|
'session': result['session'],
|
|
|
'username': username,
|
|
|
'timestamp': time.time(),
|
|
|
'biometric_pending': True,
|
|
|
'biometric_location': result['location']
|
|
|
}
|
|
|
|
|
|
# 在 Flask 会话中存储会话 ID
|
|
|
flask_session['brain_session_id'] = session_id
|
|
|
|
|
|
return jsonify({
|
|
|
'success': False,
|
|
|
'requires_biometric': True,
|
|
|
'biometric_url': result['biometric_url'],
|
|
|
'session_id': session_id,
|
|
|
'message': '请通过访问提供的 URL 完成生物识别认证'
|
|
|
})
|
|
|
|
|
|
# 常规成功认证
|
|
|
brain_session = result
|
|
|
|
|
|
# 存储会话
|
|
|
session_id = f"{username}_{int(time.time())}"
|
|
|
brain_sessions[session_id] = {
|
|
|
'session': brain_session,
|
|
|
'username': username,
|
|
|
'timestamp': time.time()
|
|
|
}
|
|
|
|
|
|
# 在 Flask 会话中存储会话 ID
|
|
|
flask_session['brain_session_id'] = session_id
|
|
|
|
|
|
return jsonify({
|
|
|
'success': True,
|
|
|
'session_id': session_id,
|
|
|
'message': '认证成功'
|
|
|
})
|
|
|
|
|
|
except requests.HTTPError as e:
|
|
|
if e.response.status_code == 401:
|
|
|
return jsonify({'error': '用户名或密码无效'}), 401
|
|
|
else:
|
|
|
return jsonify({'error': f'认证失败: {str(e)}'}), 500
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'认证错误: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/complete-biometric', methods=['POST'])
|
|
|
def complete_biometric():
|
|
|
"""在用户在浏览器中完成生物识别认证后完成认证"""
|
|
|
try:
|
|
|
from urllib.parse import urljoin
|
|
|
|
|
|
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
|
|
|
if not session_id or session_id not in brain_sessions:
|
|
|
return jsonify({'error': '无效或过期的会话'}), 401
|
|
|
|
|
|
session_info = brain_sessions[session_id]
|
|
|
|
|
|
# 检查此会话是否正在等待生物识别完成
|
|
|
if not session_info.get('biometric_pending'):
|
|
|
return jsonify({'error': '会话不处于生物识别认证待处理状态'}), 400
|
|
|
|
|
|
brain_session = session_info['session']
|
|
|
location = session_info['biometric_location']
|
|
|
|
|
|
# 完成生物识别认证
|
|
|
try:
|
|
|
# 向 location 发出后续 POST 请求
|
|
|
complete_url = urljoin(f'{BRAIN_API_BASE}/authentication', location)
|
|
|
response = brain_session.post(complete_url)
|
|
|
response.raise_for_status()
|
|
|
|
|
|
# 更新会话信息 - 移除生物识别待处理状态
|
|
|
session_info['biometric_pending'] = False
|
|
|
del session_info['biometric_location']
|
|
|
|
|
|
# 创建没有 biometric_pending 后缀的新会话 ID
|
|
|
new_session_id = f"{session_info['username']}_{int(time.time())}"
|
|
|
brain_sessions[new_session_id] = {
|
|
|
'session': brain_session,
|
|
|
'username': session_info['username'],
|
|
|
'timestamp': time.time()
|
|
|
}
|
|
|
|
|
|
# 删除旧会话
|
|
|
del brain_sessions[session_id]
|
|
|
|
|
|
# 更新 Flask 会话
|
|
|
flask_session['brain_session_id'] = new_session_id
|
|
|
|
|
|
return jsonify({
|
|
|
'success': True,
|
|
|
'session_id': new_session_id,
|
|
|
'message': '生物识别认证成功完成'
|
|
|
})
|
|
|
|
|
|
except requests.HTTPError as e:
|
|
|
return jsonify({
|
|
|
'error': f'完成生物识别认证失败: {str(e)}'
|
|
|
}), 500
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'完成生物识别认证时出错: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/operators', methods=['GET'])
|
|
|
def get_operators():
|
|
|
"""从 BRAIN API 获取用户运算符"""
|
|
|
try:
|
|
|
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
|
|
|
if not session_id or session_id not in brain_sessions:
|
|
|
return jsonify({'error': '无效或过期的会话'}), 401
|
|
|
|
|
|
session_info = brain_sessions[session_id]
|
|
|
brain_session = session_info['session']
|
|
|
|
|
|
# 首先尝试不使用分页参数(大多数 API 一次返回所有运算符)
|
|
|
try:
|
|
|
response = brain_session.get(f'{BRAIN_API_BASE}/operators')
|
|
|
response.raise_for_status()
|
|
|
|
|
|
data = response.json()
|
|
|
|
|
|
# 如果是列表,我们获得了所有运算符
|
|
|
if isinstance(data, list):
|
|
|
all_operators = data
|
|
|
print(f"从 BRAIN API 获取了 {len(all_operators)} 个运算符(直接)")
|
|
|
# 如果是包含 results 的字典,处理分页
|
|
|
elif isinstance(data, dict) and 'results' in data:
|
|
|
all_operators = []
|
|
|
total_count = data.get('count', len(data['results']))
|
|
|
print(f"找到 {total_count} 个总运算符,正在获取全部...")
|
|
|
|
|
|
# 获取第一批
|
|
|
all_operators.extend(data['results'])
|
|
|
|
|
|
# 如果需要,获取剩余批次
|
|
|
limit = 100
|
|
|
offset = len(data['results'])
|
|
|
|
|
|
while len(all_operators) < total_count:
|
|
|
params = {'limit': limit, 'offset': offset}
|
|
|
batch_response = brain_session.get(f'{BRAIN_API_BASE}/operators', params=params)
|
|
|
batch_response.raise_for_status()
|
|
|
batch_data = batch_response.json()
|
|
|
|
|
|
if isinstance(batch_data, dict) and 'results' in batch_data:
|
|
|
batch_operators = batch_data['results']
|
|
|
if not batch_operators: # 没有更多数据
|
|
|
break
|
|
|
all_operators.extend(batch_operators)
|
|
|
offset += len(batch_operators)
|
|
|
else:
|
|
|
break
|
|
|
|
|
|
print(f"从 BRAIN API 获取了 {len(all_operators)} 个运算符(分页)")
|
|
|
else:
|
|
|
# 未知格式,视为空
|
|
|
all_operators = []
|
|
|
print("运算符 API 的响应格式未知")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"获取运算符时出错: {str(e)}")
|
|
|
# 回退: 尝试显式分页
|
|
|
all_operators = []
|
|
|
limit = 100
|
|
|
offset = 0
|
|
|
|
|
|
while True:
|
|
|
params = {'limit': limit, 'offset': offset}
|
|
|
response = brain_session.get(f'{BRAIN_API_BASE}/operators', params=params)
|
|
|
response.raise_for_status()
|
|
|
|
|
|
data = response.json()
|
|
|
if isinstance(data, list):
|
|
|
all_operators.extend(data)
|
|
|
if len(data) < limit:
|
|
|
break
|
|
|
elif isinstance(data, dict) and 'results' in data:
|
|
|
batch_operators = data['results']
|
|
|
all_operators.extend(batch_operators)
|
|
|
if len(batch_operators) < limit:
|
|
|
break
|
|
|
else:
|
|
|
break
|
|
|
|
|
|
offset += limit
|
|
|
|
|
|
print(f"从 BRAIN API 获取了 {len(all_operators)} 个运算符(回退)")
|
|
|
|
|
|
# 提取名称、类别、描述、定义和其他字段(如果可用)
|
|
|
filtered_operators = []
|
|
|
for op in all_operators:
|
|
|
operator_data = {
|
|
|
'name': op['name'],
|
|
|
'category': op['category']
|
|
|
}
|
|
|
# 如果可用则包含描述
|
|
|
if 'description' in op and op['description']:
|
|
|
operator_data['description'] = op['description']
|
|
|
# 如果可用则包含定义
|
|
|
if 'definition' in op and op['definition']:
|
|
|
operator_data['definition'] = op['definition']
|
|
|
# 如果可用则包含使用计数
|
|
|
if 'usageCount' in op:
|
|
|
operator_data['usageCount'] = op['usageCount']
|
|
|
# 如果可用则包含其他有用字段
|
|
|
if 'example' in op and op['example']:
|
|
|
operator_data['example'] = op['example']
|
|
|
filtered_operators.append(operator_data)
|
|
|
|
|
|
return jsonify(filtered_operators)
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"获取运算符时出错: {str(e)}")
|
|
|
return jsonify({'error': f'获取运算符失败: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/datafields', methods=['GET'])
|
|
|
def get_datafields():
|
|
|
"""从 BRAIN API 获取数据字段"""
|
|
|
try:
|
|
|
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
|
|
|
if not session_id or session_id not in brain_sessions:
|
|
|
return jsonify({'error': '无效或过期的会话'}), 401
|
|
|
|
|
|
session_info = brain_sessions[session_id]
|
|
|
brain_session = session_info['session']
|
|
|
|
|
|
# 获取参数
|
|
|
region = request.args.get('region', 'USA')
|
|
|
delay = request.args.get('delay', '1')
|
|
|
universe = request.args.get('universe', 'TOP3000')
|
|
|
dataset_id = request.args.get('dataset_id', 'fundamental6')
|
|
|
search = ''
|
|
|
|
|
|
# 基于笔记本实现构建 URL 模板
|
|
|
if len(search) == 0:
|
|
|
url_template = f"{BRAIN_API_BASE}/data-fields?" + \
|
|
|
f"&instrumentType=EQUITY" + \
|
|
|
f"®ion={region}&delay={delay}&universe={universe}&dataset.id={dataset_id}&limit=50" + \
|
|
|
"&offset={x}"
|
|
|
# 从第一个请求获取计数
|
|
|
first_response = brain_session.get(url_template.format(x=0))
|
|
|
first_response.raise_for_status()
|
|
|
count = first_response.json()['count']
|
|
|
else:
|
|
|
url_template = f"{BRAIN_API_BASE}/data-fields?" + \
|
|
|
f"&instrumentType=EQUITY" + \
|
|
|
f"®ion={region}&delay={delay}&universe={universe}&limit=50" + \
|
|
|
f"&search={search}" + \
|
|
|
"&offset={x}"
|
|
|
count = 100 # 搜索查询的默认值
|
|
|
|
|
|
# 分批获取所有数据字段
|
|
|
datafields_list = []
|
|
|
for x in range(0, count, 50):
|
|
|
response = brain_session.get(url_template.format(x=x))
|
|
|
response.raise_for_status()
|
|
|
datafields_list.append(response.json()['results'])
|
|
|
|
|
|
# 展平列表
|
|
|
datafields_list_flat = [item for sublist in datafields_list for item in sublist]
|
|
|
|
|
|
# 过滤字段,只包含必要信息
|
|
|
filtered_fields = [
|
|
|
{
|
|
|
'id': field['id'],
|
|
|
'description': field['description'],
|
|
|
'type': field['type'],
|
|
|
'coverage': field.get('coverage', 0),
|
|
|
'userCount': field.get('userCount', 0),
|
|
|
'alphaCount': field.get('alphaCount', 0)
|
|
|
}
|
|
|
for field in datafields_list_flat
|
|
|
]
|
|
|
|
|
|
return jsonify(filtered_fields)
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'获取数据字段失败: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/dataset-description', methods=['GET'])
|
|
|
def get_dataset_description():
|
|
|
"""从 BRAIN API 获取数据集描述"""
|
|
|
try:
|
|
|
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
|
|
|
if not session_id or session_id not in brain_sessions:
|
|
|
return jsonify({'error': '无效或过期的会话'}), 401
|
|
|
|
|
|
session_info = brain_sessions[session_id]
|
|
|
brain_session = session_info['session']
|
|
|
|
|
|
# 获取参数
|
|
|
region = request.args.get('region', 'USA')
|
|
|
delay = request.args.get('delay', '1')
|
|
|
universe = request.args.get('universe', 'TOP3000')
|
|
|
dataset_id = request.args.get('dataset_id', 'analyst10')
|
|
|
|
|
|
# 构建数据集描述的 URL
|
|
|
url = f"{BRAIN_API_BASE}/data-sets/{dataset_id}?" + \
|
|
|
f"instrumentType=EQUITY®ion={region}&delay={delay}&universe={universe}"
|
|
|
|
|
|
print(f"从以下位置获取数据集描述: {url}")
|
|
|
|
|
|
# 向 BRAIN API 发出请求
|
|
|
response = brain_session.get(url)
|
|
|
response.raise_for_status()
|
|
|
|
|
|
data = response.json()
|
|
|
description = data.get('description', '无描述可用')
|
|
|
|
|
|
print(f"数据集描述已检索: {description[:100]}...")
|
|
|
|
|
|
return jsonify({
|
|
|
'success': True,
|
|
|
'description': description,
|
|
|
'dataset_id': dataset_id
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"数据集描述错误: {str(e)}")
|
|
|
return jsonify({'error': f'获取数据集描述失败: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/status', methods=['GET'])
|
|
|
def check_status():
|
|
|
"""检查会话是否仍然有效"""
|
|
|
try:
|
|
|
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
|
|
|
if not session_id or session_id not in brain_sessions:
|
|
|
return jsonify({'valid': False})
|
|
|
|
|
|
session_info = brain_sessions[session_id]
|
|
|
# 检查会话是否不太旧(24 小时)
|
|
|
if time.time() - session_info['timestamp'] > 86400:
|
|
|
del brain_sessions[session_id]
|
|
|
return jsonify({'valid': False})
|
|
|
|
|
|
# 检查生物识别认证是否待处理
|
|
|
if session_info.get('biometric_pending'):
|
|
|
return jsonify({
|
|
|
'valid': False,
|
|
|
'biometric_pending': True,
|
|
|
'username': session_info['username'],
|
|
|
'message': '生物识别认证待处理'
|
|
|
})
|
|
|
|
|
|
return jsonify({
|
|
|
'valid': True,
|
|
|
'username': session_info['username']
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'状态检查失败: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/logout', methods=['POST'])
|
|
|
def logout():
|
|
|
"""注销并清理会话"""
|
|
|
try:
|
|
|
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
|
|
|
if session_id and session_id in brain_sessions:
|
|
|
del brain_sessions[session_id]
|
|
|
|
|
|
if 'brain_session_id' in flask_session:
|
|
|
flask_session.pop('brain_session_id')
|
|
|
|
|
|
return jsonify({'success': True, 'message': '注销成功'})
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'注销失败: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/test-expression', methods=['POST'])
|
|
|
def test_expression():
|
|
|
"""使用 BRAIN API 模拟测试表达式"""
|
|
|
try:
|
|
|
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
|
|
|
if not session_id or session_id not in brain_sessions:
|
|
|
return jsonify({'error': '无效或过期的会话'}), 401
|
|
|
|
|
|
session_info = brain_sessions[session_id]
|
|
|
brain_session = session_info['session']
|
|
|
|
|
|
# 从请求获取模拟数据
|
|
|
simulation_data = request.get_json()
|
|
|
|
|
|
# 确保必需字段存在
|
|
|
if 'type' not in simulation_data:
|
|
|
simulation_data['type'] = 'REGULAR'
|
|
|
|
|
|
# 确保设置具有必需字段
|
|
|
if 'settings' not in simulation_data:
|
|
|
simulation_data['settings'] = {}
|
|
|
|
|
|
# 为缺失的设置设置默认值
|
|
|
default_settings = {
|
|
|
'instrumentType': 'EQUITY',
|
|
|
'region': 'USA',
|
|
|
'universe': 'TOP3000',
|
|
|
'delay': 1,
|
|
|
'decay': 15,
|
|
|
'neutralization': 'SUBINDUSTRY',
|
|
|
'truncation': 0.08,
|
|
|
'pasteurization': 'ON',
|
|
|
'testPeriod': 'P1Y6M',
|
|
|
'unitHandling': 'VERIFY',
|
|
|
'nanHandling': 'OFF',
|
|
|
'language': 'FASTEXPR',
|
|
|
'visualization': False
|
|
|
}
|
|
|
|
|
|
for key, value in default_settings.items():
|
|
|
if key not in simulation_data['settings']:
|
|
|
simulation_data['settings'][key] = value
|
|
|
|
|
|
# 将字符串布尔值转换为实际布尔值
|
|
|
if isinstance(simulation_data['settings'].get('visualization'), str):
|
|
|
viz_value = simulation_data['settings']['visualization'].lower()
|
|
|
simulation_data['settings']['visualization'] = viz_value == 'true'
|
|
|
|
|
|
# 发送模拟请求(遵循笔记本模式)
|
|
|
try:
|
|
|
message = {}
|
|
|
simulation_response = brain_session.post(f'{BRAIN_API_BASE}/simulations', json=simulation_data)
|
|
|
|
|
|
# 检查是否收到 Location 头(遵循笔记本模式)
|
|
|
if 'Location' in simulation_response.headers:
|
|
|
# 跟随 location 获取实际状态
|
|
|
message = brain_session.get(simulation_response.headers['Location']).json()
|
|
|
|
|
|
# 检查模拟是否正在运行或已完成
|
|
|
if 'progress' in message.keys():
|
|
|
info_to_print = "模拟正在运行"
|
|
|
return jsonify({
|
|
|
'success': True,
|
|
|
'status': 'RUNNING',
|
|
|
'message': info_to_print,
|
|
|
'full_response': message
|
|
|
})
|
|
|
else:
|
|
|
# 返回完整消息,如笔记本中所示
|
|
|
return jsonify({
|
|
|
'success': message.get('status') != 'ERROR',
|
|
|
'status': message.get('status', 'UNKNOWN'),
|
|
|
'message': str(message),
|
|
|
'full_response': message
|
|
|
})
|
|
|
else:
|
|
|
# 尝试从响应体获取错误(遵循笔记本模式)
|
|
|
try:
|
|
|
message = simulation_response.json()
|
|
|
return jsonify({
|
|
|
'success': False,
|
|
|
'status': 'ERROR',
|
|
|
'message': str(message),
|
|
|
'full_response': message
|
|
|
})
|
|
|
except:
|
|
|
return jsonify({
|
|
|
'success': False,
|
|
|
'status': 'ERROR',
|
|
|
'message': '网络连接错误',
|
|
|
'full_response': {}
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({
|
|
|
'success': False,
|
|
|
'status': 'ERROR',
|
|
|
'message': '网络连接错误',
|
|
|
'full_response': {'error': str(e)}
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
import traceback
|
|
|
return jsonify({
|
|
|
'success': False,
|
|
|
'status': 'ERROR',
|
|
|
'message': f'测试表达式失败: {str(e)}',
|
|
|
'full_response': {'error': str(e), 'traceback': traceback.format_exc()}
|
|
|
}), 500
|
|
|
|
|
|
@app.route('/api/test-operators', methods=['GET'])
|
|
|
def test_operators():
|
|
|
"""测试端点以检查原始 BRAIN 运算符 API 响应"""
|
|
|
try:
|
|
|
session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id')
|
|
|
if not session_id or session_id not in brain_sessions:
|
|
|
return jsonify({'error': '无效或过期的会话'}), 401
|
|
|
|
|
|
session_info = brain_sessions[session_id]
|
|
|
brain_session = session_info['session']
|
|
|
|
|
|
# 从 BRAIN API 获取原始响应
|
|
|
response = brain_session.get(f'{BRAIN_API_BASE}/operators')
|
|
|
response.raise_for_status()
|
|
|
|
|
|
data = response.json()
|
|
|
|
|
|
# 返回原始响应信息用于调试
|
|
|
result = {
|
|
|
'type': str(type(data)),
|
|
|
'is_list': isinstance(data, list),
|
|
|
'is_dict': isinstance(data, dict),
|
|
|
'length': len(data) if isinstance(data, list) else None,
|
|
|
'keys': list(data.keys()) if isinstance(data, dict) else None,
|
|
|
'count_key': data.get('count') if isinstance(data, dict) else None,
|
|
|
'first_few_items': data[:3] if isinstance(data, list) else (data.get('results', [])[:3] if isinstance(data, dict) else None)
|
|
|
}
|
|
|
|
|
|
return jsonify(result)
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'测试失败: {str(e)}'}), 500
|
|
|
|
|
|
# 导入蓝图
|
|
|
try:
|
|
|
from blueprints import idea_house_bp, paper_analysis_bp, feature_engineering_bp
|
|
|
print("📦 蓝图导入成功!")
|
|
|
except ImportError as e:
|
|
|
print(f"❌ 导入蓝图失败: {e}")
|
|
|
print("某些功能可能不可用。")
|
|
|
|
|
|
# 注册蓝图
|
|
|
app.register_blueprint(idea_house_bp, url_prefix='/idea-house')
|
|
|
app.register_blueprint(paper_analysis_bp, url_prefix='/paper-analysis')
|
|
|
app.register_blueprint(feature_engineering_bp, url_prefix='/feature-engineering')
|
|
|
|
|
|
print("🔧 所有蓝图注册成功!")
|
|
|
print(" - idea库: /idea-house")
|
|
|
print(" - 论文分析: /paper-analysis")
|
|
|
print(" - 特征工程: /feature-engineering")
|
|
|
|
|
|
# 模板管理路由
|
|
|
# 获取此脚本所在的目录以获取模板
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
TEMPLATES_DIR = os.path.join(script_dir, 'custom_templates')
|
|
|
|
|
|
# 确保模板目录存在
|
|
|
if not os.path.exists(TEMPLATES_DIR):
|
|
|
os.makedirs(TEMPLATES_DIR)
|
|
|
print(f"📁 已创建模板目录: {TEMPLATES_DIR}")
|
|
|
else:
|
|
|
print(f"📁 模板目录已就绪: {TEMPLATES_DIR}")
|
|
|
|
|
|
print("✅ BRAIN 表达式模板解码器完全初始化!")
|
|
|
print("🎯 准备处理模板并与 BRAIN API 集成!")
|
|
|
|
|
|
@app.route('/api/templates', methods=['GET'])
|
|
|
def get_templates():
|
|
|
"""获取所有自定义模板"""
|
|
|
try:
|
|
|
templates = []
|
|
|
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
|
|
|
|
|
|
if os.path.exists(templates_file):
|
|
|
with open(templates_file, 'r', encoding='utf-8') as f:
|
|
|
templates = json.load(f)
|
|
|
|
|
|
return jsonify(templates)
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'加载模板时出错: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/templates', methods=['POST'])
|
|
|
def save_template():
|
|
|
"""保存新的自定义模板"""
|
|
|
try:
|
|
|
data = request.get_json()
|
|
|
name = data.get('name', '').strip()
|
|
|
description = data.get('description', '').strip()
|
|
|
expression = data.get('expression', '').strip()
|
|
|
template_configurations = data.get('templateConfigurations', {})
|
|
|
|
|
|
if not name or not expression:
|
|
|
return jsonify({'error': '名称和表达式是必需的'}), 400
|
|
|
|
|
|
# 加载现有模板
|
|
|
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
|
|
|
templates = []
|
|
|
|
|
|
if os.path.exists(templates_file):
|
|
|
with open(templates_file, 'r', encoding='utf-8') as f:
|
|
|
templates = json.load(f)
|
|
|
|
|
|
# 检查重复名称
|
|
|
existing_index = next((i for i, t in enumerate(templates) if t['name'] == name), None)
|
|
|
|
|
|
new_template = {
|
|
|
'name': name,
|
|
|
'description': description,
|
|
|
'expression': expression,
|
|
|
'templateConfigurations': template_configurations,
|
|
|
'createdAt': datetime.now().isoformat()
|
|
|
}
|
|
|
|
|
|
if existing_index is not None:
|
|
|
# 更新现有模板但保留 createdAt(如果存在)
|
|
|
if 'createdAt' in templates[existing_index]:
|
|
|
new_template['createdAt'] = templates[existing_index]['createdAt']
|
|
|
new_template['updatedAt'] = datetime.now().isoformat()
|
|
|
templates[existing_index] = new_template
|
|
|
message = f'模板 "{name}" 更新成功'
|
|
|
else:
|
|
|
# 添加新模板
|
|
|
templates.append(new_template)
|
|
|
message = f'模板 "{name}" 保存成功'
|
|
|
|
|
|
# 保存到文件
|
|
|
with open(templates_file, 'w', encoding='utf-8') as f:
|
|
|
json.dump(templates, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
return jsonify({'success': True, 'message': message})
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'保存模板时出错: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/templates/<int:template_id>', methods=['DELETE'])
|
|
|
def delete_template(template_id):
|
|
|
"""删除自定义模板"""
|
|
|
try:
|
|
|
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
|
|
|
templates = []
|
|
|
|
|
|
if os.path.exists(templates_file):
|
|
|
with open(templates_file, 'r', encoding='utf-8') as f:
|
|
|
templates = json.load(f)
|
|
|
|
|
|
if 0 <= template_id < len(templates):
|
|
|
deleted_template = templates.pop(template_id)
|
|
|
|
|
|
# 保存更新后的模板
|
|
|
with open(templates_file, 'w', encoding='utf-8') as f:
|
|
|
json.dump(templates, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
return jsonify({'success': True, 'message': f'模板 "{deleted_template["name"]}" 删除成功'})
|
|
|
else:
|
|
|
return jsonify({'error': '未找到模板'}), 404
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'删除模板时出错: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/templates/export', methods=['GET'])
|
|
|
def export_templates():
|
|
|
"""将所有模板导出为 JSON"""
|
|
|
try:
|
|
|
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
|
|
|
templates = []
|
|
|
|
|
|
if os.path.exists(templates_file):
|
|
|
with open(templates_file, 'r', encoding='utf-8') as f:
|
|
|
templates = json.load(f)
|
|
|
|
|
|
return jsonify(templates)
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'导出模板时出错: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/templates/import', methods=['POST'])
|
|
|
def import_templates():
|
|
|
"""从 JSON 导入模板"""
|
|
|
try:
|
|
|
data = request.get_json()
|
|
|
imported_templates = data.get('templates', [])
|
|
|
overwrite = data.get('overwrite', False)
|
|
|
|
|
|
if not isinstance(imported_templates, list):
|
|
|
return jsonify({'error': '无效的模板格式'}), 400
|
|
|
|
|
|
# 验证模板结构
|
|
|
valid_templates = []
|
|
|
for template in imported_templates:
|
|
|
if (isinstance(template, dict) and
|
|
|
'name' in template and 'expression' in template and
|
|
|
template['name'].strip() and template['expression'].strip()):
|
|
|
valid_templates.append({
|
|
|
'name': template['name'].strip(),
|
|
|
'description': template.get('description', '').strip(),
|
|
|
'expression': template['expression'].strip(),
|
|
|
'templateConfigurations': template.get('templateConfigurations', {}),
|
|
|
'createdAt': template.get('createdAt', datetime.now().isoformat())
|
|
|
})
|
|
|
|
|
|
if not valid_templates:
|
|
|
return jsonify({'error': '未找到有效模板'}), 400
|
|
|
|
|
|
# 加载现有模板
|
|
|
templates_file = os.path.join(TEMPLATES_DIR, 'templates.json')
|
|
|
existing_templates = []
|
|
|
|
|
|
if os.path.exists(templates_file):
|
|
|
with open(templates_file, 'r', encoding='utf-8') as f:
|
|
|
existing_templates = json.load(f)
|
|
|
|
|
|
# 处理重复项
|
|
|
duplicates = []
|
|
|
new_templates = []
|
|
|
|
|
|
for template in valid_templates:
|
|
|
existing_index = next((i for i, t in enumerate(existing_templates) if t['name'] == template['name']), None)
|
|
|
|
|
|
if existing_index is not None:
|
|
|
duplicates.append(template['name'])
|
|
|
if overwrite:
|
|
|
existing_templates[existing_index] = template
|
|
|
else:
|
|
|
new_templates.append(template)
|
|
|
|
|
|
# 添加新模板
|
|
|
existing_templates.extend(new_templates)
|
|
|
|
|
|
# 保存到文件
|
|
|
with open(templates_file, 'w', encoding='utf-8') as f:
|
|
|
json.dump(existing_templates, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
result = {
|
|
|
'success': True,
|
|
|
'imported': len(new_templates),
|
|
|
'duplicates': duplicates,
|
|
|
'overwritten': len(duplicates) if overwrite else 0
|
|
|
}
|
|
|
|
|
|
return jsonify(result)
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'导入模板时出错: {str(e)}'}), 500
|
|
|
|
|
|
@app.route('/api/run-simulator', methods=['POST'])
|
|
|
def run_simulator():
|
|
|
"""运行 simulator_wqb.py 脚本"""
|
|
|
try:
|
|
|
import subprocess
|
|
|
import threading
|
|
|
from pathlib import Path
|
|
|
|
|
|
# 获取脚本路径(现在在模拟器子文件夹中)
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
simulator_dir = os.path.join(script_dir, 'simulator')
|
|
|
simulator_path = os.path.join(simulator_dir, 'simulator_wqb.py')
|
|
|
|
|
|
# 检查脚本是否存在
|
|
|
if not os.path.exists(simulator_path):
|
|
|
return jsonify({'error': '在模拟器文件夹中未找到 simulator_wqb.py'}), 404
|
|
|
|
|
|
# 在新终端窗口中运行脚本
|
|
|
def run_script():
|
|
|
try:
|
|
|
# 对于 Windows
|
|
|
if os.name == 'nt':
|
|
|
# 使用子进程和适当的工作目录(模拟器文件夹)
|
|
|
subprocess.Popen(['cmd', '/k', 'python', 'simulator_wqb.py'],
|
|
|
cwd=simulator_dir,
|
|
|
creationflags=subprocess.CREATE_NEW_CONSOLE)
|
|
|
else:
|
|
|
# 对于 Unix 类系统
|
|
|
subprocess.Popen(['gnome-terminal', '--working-directory', simulator_dir, '--', 'python3', 'simulator_wqb.py'])
|
|
|
except Exception as e:
|
|
|
print(f"运行模拟器时出错: {e}")
|
|
|
|
|
|
# 在单独线程中启动脚本
|
|
|
thread = threading.Thread(target=run_script)
|
|
|
thread.daemon = True
|
|
|
thread.start()
|
|
|
|
|
|
return jsonify({
|
|
|
'success': True,
|
|
|
'message': '模拟器脚本已在新终端窗口中启动'
|
|
|
})
|
|
|
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': f'运行模拟器失败: {str(e)}'}), 500
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
port = 5190
|
|
|
print("启动 BRAIN 表达式模板解码器 Web 应用程序...")
|
|
|
print(f"应用程序将在 http://localhost:{port} 运行")
|
|
|
print("包含 BRAIN API 集成 - 不需要单独的代理!")
|
|
|
app.run(debug=True, host='0.0.0.0', port=port) |