""" BRAIN 表达式模板解码器 - Flask 网络应用程序 一个完整的网络应用程序,用于解码字符串模板并与 WorldQuant BRAIN 集成 """ # 自动安装缺失的依赖 import subprocess import sys import os def install_requirements(): """如果缺少必需的包,则从 requirements.txt 安装""" print("🔍 检查并安装必需的依赖...") print("📋 验证 BRAIN 表达式模板解码器所需的包...") # 获取此脚本所在的目录 script_dir = os.path.dirname(os.path.abspath(__file__)) # 检查 requirements.txt 是否存在于脚本目录中 req_file = os.path.join(script_dir, 'requirements.txt') if not os.path.exists(req_file): print("❌ 错误: 未找到 requirements.txt!") print(f"查找路径: {req_file}") return False # 如果存在镜像配置则读取 mirror_url = 'https://pypi.tuna.tsinghua.edu.cn/simple' # 默认为清华镜像 mirror_config_file = os.path.join(script_dir, 'mirror_config.txt') if os.path.exists(mirror_config_file): try: with open(mirror_config_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and line.startswith('http'): mirror_url = line break except Exception as e: print(f"警告: 无法读取镜像配置: {e}") # 尝试导入主要包以检查是否已安装 packages_to_check = { 'flask': 'flask', 'flask_cors': 'flask-cors', 'requests': 'requests', 'pandas': 'pandas', 'PyPDF2': 'PyPDF2', 'docx': 'python-docx', 'pdfplumber': 'pdfplumber', 'fitz': 'PyMuPDF', 'cozepy': 'cozepy', 'lxml': 'lxml', 'bs4': 'beautifulsoup4' } missing_packages = [] for import_name, pip_name in packages_to_check.items(): try: __import__(import_name) except ImportError: missing_packages.append(pip_name) print(f"缺失的包: {pip_name} (导入名: {import_name})") if missing_packages: print(f"⚠️ 检测到缺失的包: {', '.join(missing_packages)}") print("📦 从 requirements.txt 安装依赖...") print(f"🌐 使用镜像: {mirror_url}") try: # 使用配置的镜像安装所有 requirements subprocess.check_call([ sys.executable, '-m', 'pip', 'install', '-i', mirror_url, '-r', req_file ]) print("✅ 所有依赖安装成功!") return True except subprocess.CalledProcessError: print(f"❌ 错误: 使用 {mirror_url} 安装依赖失败") print("🔄 尝试使用默认 PyPI...") try: # 回退到默认 PyPI subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', req_file]) print("✅ 所有依赖安装成功!") return True except subprocess.CalledProcessError: print("❌ 错误: 安装依赖失败。请手动运行:") print(f" {sys.executable} -m pip install -i {mirror_url} -r requirements.txt") return False else: print("✅ 所有必需的依赖已安装!") return True # 在导入前检查并安装依赖 # 每次导入模块时都会运行,但只在需要时安装 def check_and_install_dependencies(): """检查并在需要时安装依赖""" if not globals().get('_dependencies_checked'): if install_requirements(): globals()['_dependencies_checked'] = True return True else: print("\n请手动安装依赖并重试。") return False return True # 导入此模块时始终运行依赖检查 print("🚀 初始化 BRAIN 表达式模板解码器...") if not check_and_install_dependencies(): if __name__ == "__main__": sys.exit(1) else: print("⚠️ 警告: 可能缺少某些依赖。请运行 'pip install -r requirements.txt'") print("🔄 继续导入,但某些功能可能无法正常工作。") # 现在导入包 try: from flask import Flask, render_template, request, jsonify, session as flask_session from flask_cors import CORS import requests import json import time import os from datetime import datetime print("📚 核心包导入成功!") except ImportError as e: print(f"❌ 导入核心包失败: {e}") print("请运行: pip install -r requirements.txt") if __name__ == "__main__": sys.exit(1) raise app = Flask(__name__) app.secret_key = 'brain_template_decoder_secret_key_change_in_production' CORS(app) print("🌐 Flask 应用程序已初始化,支持 CORS!") # BRAIN API 配置 BRAIN_API_BASE = 'https://api.worldquantbrain.com' # 存储 BRAIN 会话(在生产环境中,使用适当的会话管理如 Redis) brain_sessions = {} print("🧠 BRAIN API 集成已配置!") def sign_in_to_brain(username, password): """使用重试逻辑和生物识别认证支持登录 BRAIN API""" from urllib.parse import urljoin session = requests.Session() session.auth = (username, password) retry_count = 0 max_retries = 3 while retry_count < max_retries: try: response = session.post(f'{BRAIN_API_BASE}/authentication') # 检查是否需要生物识别认证 if response.status_code == 401: # 检查是否为生物识别认证要求 if response.headers.get("WWW-Authenticate") == "persona": # 获取 location 头 location = response.headers.get("Location") if location: # 构建生物识别认证的完整 URL biometric_url = urljoin(response.url, location) # 返回特殊响应,指示需要生物识别认证 return { 'requires_biometric': True, 'biometric_url': biometric_url, 'session': session, 'location': location } else: raise Exception("需要生物识别认证但未提供 Location 头") else: # 常规认证失败 raise requests.HTTPError("认证失败: 用户名或密码无效") # 如果执行到这里,认证成功 response.raise_for_status() print("认证成功。") return session except requests.HTTPError as e: if "Invalid username or password" in str(e): raise # 对于无效凭证不重试 print(f"发生 HTTP 错误: {e}。正在重试...") retry_count += 1 if retry_count >= max_retries: raise time.sleep(2) except Exception as e: print(f"认证期间发生错误: {e}") retry_count += 1 if retry_count >= max_retries: raise time.sleep(2) # 路由 @app.route('/') def index(): """主应用程序页面""" return render_template('index.html') @app.route('/simulator') def simulator(): """用户友好的模拟器界面""" return render_template('simulator.html') @app.route('/api/simulator/logs', methods=['GET']) def get_simulator_logs(): """获取模拟器目录中可用的日志文件""" try: import glob import os from datetime import datetime # 在当前目录和模拟器目录中查找日志文件 script_dir = os.path.dirname(os.path.abspath(__file__)) simulator_dir = os.path.join(script_dir, 'simulator') log_files = [] # 检查当前目录和模拟器目录 for directory in [script_dir, simulator_dir]: if os.path.exists(directory): pattern = os.path.join(directory, 'wqb*.log') for log_file in glob.glob(pattern): try: stat = os.stat(log_file) log_files.append({ 'filename': os.path.basename(log_file), 'path': log_file, 'size': f"{stat.st_size / 1024:.1f} KB", 'modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), 'mtime': stat.st_mtime }) except Exception as e: print(f"读取日志文件 {log_file} 时出错: {e}") # 按修改时间排序(最新的在前) log_files.sort(key=lambda x: x['mtime'], reverse=True) # 查找最新的日志文件 latest = log_files[0]['filename'] if log_files else None return jsonify({ 'logs': log_files, 'latest': latest, 'count': len(log_files) }) except Exception as e: return jsonify({'error': f'获取日志文件时出错: {str(e)}'}), 500 @app.route('/api/simulator/logs/', methods=['GET']) def get_simulator_log_content(filename): """获取特定日志文件的内容""" try: import os # 安全性: 只允许使用安全名称的日志文件 if not filename.startswith('wqb') or not filename.endswith('.log'): return jsonify({'error': '无效的日志文件名'}), 400 script_dir = os.path.dirname(os.path.abspath(__file__)) simulator_dir = os.path.join(script_dir, 'simulator') # 在两个目录中查找文件 log_path = None for directory in [script_dir, simulator_dir]: potential_path = os.path.join(directory, filename) if os.path.exists(potential_path): log_path = potential_path break if not log_path: return jsonify({'error': '未找到日志文件'}), 404 # 使用多种编码尝试读取文件内容 content = None encodings_to_try = ['utf-8', 'gbk', 'gb2312', 'big5', 'latin-1', 'cp1252'] for encoding in encodings_to_try: try: with open(log_path, 'r', encoding=encoding) as f: content = f.read() print(f"使用 {encoding} 编码成功读取日志文件") break except UnicodeDecodeError: continue except Exception as e: print(f"使用 {encoding} 读取时出错: {e}") continue if content is None: # 最后手段: 以二进制读取并使用错误处理解码 try: with open(log_path, 'rb') as f: raw_content = f.read() content = raw_content.decode('utf-8', errors='replace') print("使用 UTF-8 带错误替换读取日志内容") except Exception as e: content = f"错误: 无法解码文件内容 - {str(e)}" response = jsonify({ 'content': content, 'filename': filename, 'size': len(content) }) response.headers['Content-Type'] = 'application/json; charset=utf-8' return response except Exception as e: return jsonify({'error': f'读取日志文件时出错: {str(e)}'}), 500 @app.route('/api/simulator/test-connection', methods=['POST']) def test_simulator_connection(): """测试模拟器的 BRAIN API 连接""" try: data = request.get_json() username = data.get('username') password = data.get('password') if not username or not password: return jsonify({'error': '需要用户名和密码'}), 400 # 使用现有的 sign_in_to_brain 函数测试连接 result = sign_in_to_brain(username, password) # 处理生物识别认证要求 if isinstance(result, dict) and result.get('requires_biometric'): return jsonify({ 'success': False, 'error': '需要生物识别认证。请先使用主界面完成认证。', 'requires_biometric': True }) # 测试简单的 API 调用以验证连接 brain_session = result response = brain_session.get(f'{BRAIN_API_BASE}/data-fields/open') if response.ok: return jsonify({ 'success': True, 'message': '连接成功' }) else: return jsonify({ 'success': False, 'error': f'API 测试失败: {response.status_code}' }) except Exception as e: return jsonify({ 'success': False, 'error': f'连接失败: {str(e)}' }) @app.route('/api/simulator/run', methods=['POST']) def run_simulator_with_params(): """在新终端中使用用户提供的参数运行模拟器""" try: import subprocess import threading import json import os import tempfile import sys import time # 获取表单数据 json_file = request.files.get('jsonFile') username = request.form.get('username') password = request.form.get('password') start_position = int(request.form.get('startPosition', 0)) concurrent_count = int(request.form.get('concurrentCount', 3)) random_shuffle = request.form.get('randomShuffle') == 'true' use_multi_sim = request.form.get('useMultiSim') == 'true' alpha_count_per_slot = int(request.form.get('alphaCountPerSlot', 3)) if not json_file or not username or not password: return jsonify({'error': '缺少必需参数'}), 400 # 验证并读取 JSON 文件 try: json_content = json_file.read().decode('utf-8') expressions_data = json.loads(json_content) if not isinstance(expressions_data, list): return jsonify({'error': 'JSON 文件必须包含表达式数组'}), 400 except Exception as e: return jsonify({'error': f'无效的 JSON 文件: {str(e)}'}), 400 # 获取路径 script_dir = os.path.dirname(os.path.abspath(__file__)) simulator_dir = os.path.join(script_dir, 'simulator') # 为自动化运行创建临时文件 temp_json_path = os.path.join(simulator_dir, f'temp_expressions_{int(time.time())}.json') temp_script_path = os.path.join(simulator_dir, f'temp_automated_{int(time.time())}.py') temp_batch_path = os.path.join(simulator_dir, f'temp_run_{int(time.time())}.bat') try: # 将 JSON 数据保存到临时文件 with open(temp_json_path, 'w', encoding='utf-8') as f: json.dump(expressions_data, f, ensure_ascii=False, indent=2) # 创建调用 automated_main 的自动化脚本 script_content = f''' import asyncio import sys import os import json # 将当前目录添加到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import simulator_wqb async def run_automated(): """使用 Web 界面的参数运行自动化模拟器""" try: # 加载 JSON 数据 with open(r"{temp_json_path}", 'r', encoding='utf-8') as f: json_content = f.read() # 使用参数调用 automated_main result = await simulator_wqb.automated_main( json_file_content=json_content, username="{username}", password="{password}", start_position={start_position}, concurrent_count={concurrent_count}, random_shuffle={random_shuffle}, use_multi_sim={use_multi_sim}, alpha_count_per_slot={alpha_count_per_slot} ) if result['success']: print("\\n" + "="*60) print("🎉 Web 界面自动化成功 🎉") print("="*60) print(f"✅ 总模拟次数: {{result['results']['total']}}") print(f"✅ 成功: {{result['results']['successful']}}") print(f"❌ 失败: {{result['results']['failed']}}") if result['results']['alphaIds']: print(f"📊 生成 {{len(result['results']['alphaIds'])}} 个 Alpha ID") print("="*60) else: print("\\n" + "="*60) print("❌ Web 界面自动化失败") print("="*60) print(f"错误: {{result['error']}}") print("="*60) except Exception as e: print(f"\\n❌ 脚本执行错误: {{e}}") finally: # 清理临时文件 try: if os.path.exists(r"{temp_json_path}"): os.remove(r"{temp_json_path}") if os.path.exists(r"{temp_script_path}"): os.remove(r"{temp_script_path}") if os.path.exists(r"{temp_batch_path}"): os.remove(r"{temp_batch_path}") except: pass print("\\n🔄 按任意键关闭此窗口...") input() if __name__ == '__main__': asyncio.run(run_automated()) ''' # 保存脚本 with open(temp_script_path, 'w', encoding='utf-8') as f: f.write(script_content) # 为 Windows 创建批处理文件 batch_content = f'''@echo off cd /d "{simulator_dir}" python "{os.path.basename(temp_script_path)}" ''' with open(temp_batch_path, 'w', encoding='utf-8') as f: f.write(batch_content) # 在新终端中启动 def launch_simulator(): try: if os.name == 'nt': # Windows # 使用批处理文件避免路径问题 subprocess.Popen([ temp_batch_path ], creationflags=subprocess.CREATE_NEW_CONSOLE) else: # Unix 类系统 # 尝试不同的终端模拟器 terminals = ['gnome-terminal', 'xterm', 'konsole', 'terminal'] for terminal in terminals: try: if terminal == 'gnome-terminal': subprocess.Popen([ terminal, '--working-directory', simulator_dir, '--', 'python3', os.path.basename(temp_script_path) ]) else: subprocess.Popen([ terminal, '-e', f'cd "{simulator_dir}" && python3 "{os.path.basename(temp_script_path)}"' ]) break except FileNotFoundError: continue else: # 回退: 如果找不到终端则在后台运行 subprocess.Popen([ sys.executable, temp_script_path ], cwd=simulator_dir) except Exception as e: print(f"启动模拟器时出错: {e}") # 在单独的线程中启动模拟器 thread = threading.Thread(target=launch_simulator) thread.daemon = True thread.start() return jsonify({ 'success': True, 'message': '模拟器已在新终端窗口中启动', 'parameters': { 'expressions_count': len(expressions_data), 'concurrent_count': concurrent_count, 'use_multi_sim': use_multi_sim, 'alpha_count_per_slot': alpha_count_per_slot if use_multi_sim else None } }) except Exception as e: # 出错时清理 try: if os.path.exists(temp_json_path): os.remove(temp_json_path) if os.path.exists(temp_script_path): os.remove(temp_script_path) if os.path.exists(temp_batch_path): os.remove(temp_batch_path) except: pass raise e except Exception as e: return jsonify({'error': f'运行模拟器失败: {str(e)}'}), 500 @app.route('/api/simulator/stop', methods=['POST']) def stop_simulator(): """停止运行的模拟器""" try: # 这是一个占位符 - 在生产环境中,您需要 # 实现适当的进程管理来停止运行的模拟 return jsonify({ 'success': True, 'message': '停止信号已发送' }) except Exception as e: return jsonify({'error': f'停止模拟器失败: {str(e)}'}), 500 @app.route('/api/authenticate', methods=['POST']) def authenticate(): """使用 BRAIN API 进行认证""" try: data = request.get_json() username = data.get('username') password = data.get('password') if not username or not password: return jsonify({'error': '需要用户名和密码'}), 400 # 使用 BRAIN 进行认证 result = sign_in_to_brain(username, password) # 检查是否需要生物识别认证 if isinstance(result, dict) and result.get('requires_biometric'): # 临时存储具有生物识别待处理状态的会话 session_id = f"{username}_{int(time.time())}_biometric_pending" brain_sessions[session_id] = { 'session': result['session'], 'username': username, 'timestamp': time.time(), 'biometric_pending': True, 'biometric_location': result['location'] } # 在 Flask 会话中存储会话 ID flask_session['brain_session_id'] = session_id return jsonify({ 'success': False, 'requires_biometric': True, 'biometric_url': result['biometric_url'], 'session_id': session_id, 'message': '请通过访问提供的 URL 完成生物识别认证' }) # 常规成功认证 brain_session = result # 存储会话 session_id = f"{username}_{int(time.time())}" brain_sessions[session_id] = { 'session': brain_session, 'username': username, 'timestamp': time.time() } # 在 Flask 会话中存储会话 ID flask_session['brain_session_id'] = session_id return jsonify({ 'success': True, 'session_id': session_id, 'message': '认证成功' }) except requests.HTTPError as e: if e.response.status_code == 401: return jsonify({'error': '用户名或密码无效'}), 401 else: return jsonify({'error': f'认证失败: {str(e)}'}), 500 except Exception as e: return jsonify({'error': f'认证错误: {str(e)}'}), 500 @app.route('/api/complete-biometric', methods=['POST']) def complete_biometric(): """在用户在浏览器中完成生物识别认证后完成认证""" try: from urllib.parse import urljoin session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id') if not session_id or session_id not in brain_sessions: return jsonify({'error': '无效或过期的会话'}), 401 session_info = brain_sessions[session_id] # 检查此会话是否正在等待生物识别完成 if not session_info.get('biometric_pending'): return jsonify({'error': '会话不处于生物识别认证待处理状态'}), 400 brain_session = session_info['session'] location = session_info['biometric_location'] # 完成生物识别认证 try: # 向 location 发出后续 POST 请求 complete_url = urljoin(f'{BRAIN_API_BASE}/authentication', location) response = brain_session.post(complete_url) response.raise_for_status() # 更新会话信息 - 移除生物识别待处理状态 session_info['biometric_pending'] = False del session_info['biometric_location'] # 创建没有 biometric_pending 后缀的新会话 ID new_session_id = f"{session_info['username']}_{int(time.time())}" brain_sessions[new_session_id] = { 'session': brain_session, 'username': session_info['username'], 'timestamp': time.time() } # 删除旧会话 del brain_sessions[session_id] # 更新 Flask 会话 flask_session['brain_session_id'] = new_session_id return jsonify({ 'success': True, 'session_id': new_session_id, 'message': '生物识别认证成功完成' }) except requests.HTTPError as e: return jsonify({ 'error': f'完成生物识别认证失败: {str(e)}' }), 500 except Exception as e: return jsonify({'error': f'完成生物识别认证时出错: {str(e)}'}), 500 @app.route('/api/operators', methods=['GET']) def get_operators(): """从 BRAIN API 获取用户运算符""" try: session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id') if not session_id or session_id not in brain_sessions: return jsonify({'error': '无效或过期的会话'}), 401 session_info = brain_sessions[session_id] brain_session = session_info['session'] # 首先尝试不使用分页参数(大多数 API 一次返回所有运算符) try: response = brain_session.get(f'{BRAIN_API_BASE}/operators') response.raise_for_status() data = response.json() # 如果是列表,我们获得了所有运算符 if isinstance(data, list): all_operators = data print(f"从 BRAIN API 获取了 {len(all_operators)} 个运算符(直接)") # 如果是包含 results 的字典,处理分页 elif isinstance(data, dict) and 'results' in data: all_operators = [] total_count = data.get('count', len(data['results'])) print(f"找到 {total_count} 个总运算符,正在获取全部...") # 获取第一批 all_operators.extend(data['results']) # 如果需要,获取剩余批次 limit = 100 offset = len(data['results']) while len(all_operators) < total_count: params = {'limit': limit, 'offset': offset} batch_response = brain_session.get(f'{BRAIN_API_BASE}/operators', params=params) batch_response.raise_for_status() batch_data = batch_response.json() if isinstance(batch_data, dict) and 'results' in batch_data: batch_operators = batch_data['results'] if not batch_operators: # 没有更多数据 break all_operators.extend(batch_operators) offset += len(batch_operators) else: break print(f"从 BRAIN API 获取了 {len(all_operators)} 个运算符(分页)") else: # 未知格式,视为空 all_operators = [] print("运算符 API 的响应格式未知") except Exception as e: print(f"获取运算符时出错: {str(e)}") # 回退: 尝试显式分页 all_operators = [] limit = 100 offset = 0 while True: params = {'limit': limit, 'offset': offset} response = brain_session.get(f'{BRAIN_API_BASE}/operators', params=params) response.raise_for_status() data = response.json() if isinstance(data, list): all_operators.extend(data) if len(data) < limit: break elif isinstance(data, dict) and 'results' in data: batch_operators = data['results'] all_operators.extend(batch_operators) if len(batch_operators) < limit: break else: break offset += limit print(f"从 BRAIN API 获取了 {len(all_operators)} 个运算符(回退)") # 提取名称、类别、描述、定义和其他字段(如果可用) filtered_operators = [] for op in all_operators: operator_data = { 'name': op['name'], 'category': op['category'] } # 如果可用则包含描述 if 'description' in op and op['description']: operator_data['description'] = op['description'] # 如果可用则包含定义 if 'definition' in op and op['definition']: operator_data['definition'] = op['definition'] # 如果可用则包含使用计数 if 'usageCount' in op: operator_data['usageCount'] = op['usageCount'] # 如果可用则包含其他有用字段 if 'example' in op and op['example']: operator_data['example'] = op['example'] filtered_operators.append(operator_data) return jsonify(filtered_operators) except Exception as e: print(f"获取运算符时出错: {str(e)}") return jsonify({'error': f'获取运算符失败: {str(e)}'}), 500 @app.route('/api/datafields', methods=['GET']) def get_datafields(): """从 BRAIN API 获取数据字段""" try: session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id') if not session_id or session_id not in brain_sessions: return jsonify({'error': '无效或过期的会话'}), 401 session_info = brain_sessions[session_id] brain_session = session_info['session'] # 获取参数 region = request.args.get('region', 'USA') delay = request.args.get('delay', '1') universe = request.args.get('universe', 'TOP3000') dataset_id = request.args.get('dataset_id', 'fundamental6') search = '' # 基于笔记本实现构建 URL 模板 if len(search) == 0: url_template = f"{BRAIN_API_BASE}/data-fields?" + \ f"&instrumentType=EQUITY" + \ f"®ion={region}&delay={delay}&universe={universe}&dataset.id={dataset_id}&limit=50" + \ "&offset={x}" # 从第一个请求获取计数 first_response = brain_session.get(url_template.format(x=0)) first_response.raise_for_status() count = first_response.json()['count'] else: url_template = f"{BRAIN_API_BASE}/data-fields?" + \ f"&instrumentType=EQUITY" + \ f"®ion={region}&delay={delay}&universe={universe}&limit=50" + \ f"&search={search}" + \ "&offset={x}" count = 100 # 搜索查询的默认值 # 分批获取所有数据字段 datafields_list = [] for x in range(0, count, 50): response = brain_session.get(url_template.format(x=x)) response.raise_for_status() datafields_list.append(response.json()['results']) # 展平列表 datafields_list_flat = [item for sublist in datafields_list for item in sublist] # 过滤字段,只包含必要信息 filtered_fields = [ { 'id': field['id'], 'description': field['description'], 'type': field['type'], 'coverage': field.get('coverage', 0), 'userCount': field.get('userCount', 0), 'alphaCount': field.get('alphaCount', 0) } for field in datafields_list_flat ] return jsonify(filtered_fields) except Exception as e: return jsonify({'error': f'获取数据字段失败: {str(e)}'}), 500 @app.route('/api/dataset-description', methods=['GET']) def get_dataset_description(): """从 BRAIN API 获取数据集描述""" try: session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id') if not session_id or session_id not in brain_sessions: return jsonify({'error': '无效或过期的会话'}), 401 session_info = brain_sessions[session_id] brain_session = session_info['session'] # 获取参数 region = request.args.get('region', 'USA') delay = request.args.get('delay', '1') universe = request.args.get('universe', 'TOP3000') dataset_id = request.args.get('dataset_id', 'analyst10') # 构建数据集描述的 URL url = f"{BRAIN_API_BASE}/data-sets/{dataset_id}?" + \ f"instrumentType=EQUITY®ion={region}&delay={delay}&universe={universe}" print(f"从以下位置获取数据集描述: {url}") # 向 BRAIN API 发出请求 response = brain_session.get(url) response.raise_for_status() data = response.json() description = data.get('description', '无描述可用') print(f"数据集描述已检索: {description[:100]}...") return jsonify({ 'success': True, 'description': description, 'dataset_id': dataset_id }) except Exception as e: print(f"数据集描述错误: {str(e)}") return jsonify({'error': f'获取数据集描述失败: {str(e)}'}), 500 @app.route('/api/status', methods=['GET']) def check_status(): """检查会话是否仍然有效""" try: session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id') if not session_id or session_id not in brain_sessions: return jsonify({'valid': False}) session_info = brain_sessions[session_id] # 检查会话是否不太旧(24 小时) if time.time() - session_info['timestamp'] > 86400: del brain_sessions[session_id] return jsonify({'valid': False}) # 检查生物识别认证是否待处理 if session_info.get('biometric_pending'): return jsonify({ 'valid': False, 'biometric_pending': True, 'username': session_info['username'], 'message': '生物识别认证待处理' }) return jsonify({ 'valid': True, 'username': session_info['username'] }) except Exception as e: return jsonify({'error': f'状态检查失败: {str(e)}'}), 500 @app.route('/api/logout', methods=['POST']) def logout(): """注销并清理会话""" try: session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id') if session_id and session_id in brain_sessions: del brain_sessions[session_id] if 'brain_session_id' in flask_session: flask_session.pop('brain_session_id') return jsonify({'success': True, 'message': '注销成功'}) except Exception as e: return jsonify({'error': f'注销失败: {str(e)}'}), 500 @app.route('/api/test-expression', methods=['POST']) def test_expression(): """使用 BRAIN API 模拟测试表达式""" try: session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id') if not session_id or session_id not in brain_sessions: return jsonify({'error': '无效或过期的会话'}), 401 session_info = brain_sessions[session_id] brain_session = session_info['session'] # 从请求获取模拟数据 simulation_data = request.get_json() # 确保必需字段存在 if 'type' not in simulation_data: simulation_data['type'] = 'REGULAR' # 确保设置具有必需字段 if 'settings' not in simulation_data: simulation_data['settings'] = {} # 为缺失的设置设置默认值 default_settings = { 'instrumentType': 'EQUITY', 'region': 'USA', 'universe': 'TOP3000', 'delay': 1, 'decay': 15, 'neutralization': 'SUBINDUSTRY', 'truncation': 0.08, 'pasteurization': 'ON', 'testPeriod': 'P1Y6M', 'unitHandling': 'VERIFY', 'nanHandling': 'OFF', 'language': 'FASTEXPR', 'visualization': False } for key, value in default_settings.items(): if key not in simulation_data['settings']: simulation_data['settings'][key] = value # 将字符串布尔值转换为实际布尔值 if isinstance(simulation_data['settings'].get('visualization'), str): viz_value = simulation_data['settings']['visualization'].lower() simulation_data['settings']['visualization'] = viz_value == 'true' # 发送模拟请求(遵循笔记本模式) try: message = {} simulation_response = brain_session.post(f'{BRAIN_API_BASE}/simulations', json=simulation_data) # 检查是否收到 Location 头(遵循笔记本模式) if 'Location' in simulation_response.headers: # 跟随 location 获取实际状态 message = brain_session.get(simulation_response.headers['Location']).json() # 检查模拟是否正在运行或已完成 if 'progress' in message.keys(): info_to_print = "模拟正在运行" return jsonify({ 'success': True, 'status': 'RUNNING', 'message': info_to_print, 'full_response': message }) else: # 返回完整消息,如笔记本中所示 return jsonify({ 'success': message.get('status') != 'ERROR', 'status': message.get('status', 'UNKNOWN'), 'message': str(message), 'full_response': message }) else: # 尝试从响应体获取错误(遵循笔记本模式) try: message = simulation_response.json() return jsonify({ 'success': False, 'status': 'ERROR', 'message': str(message), 'full_response': message }) except: return jsonify({ 'success': False, 'status': 'ERROR', 'message': '网络连接错误', 'full_response': {} }) except Exception as e: return jsonify({ 'success': False, 'status': 'ERROR', 'message': '网络连接错误', 'full_response': {'error': str(e)} }) except Exception as e: import traceback return jsonify({ 'success': False, 'status': 'ERROR', 'message': f'测试表达式失败: {str(e)}', 'full_response': {'error': str(e), 'traceback': traceback.format_exc()} }), 500 @app.route('/api/test-operators', methods=['GET']) def test_operators(): """测试端点以检查原始 BRAIN 运算符 API 响应""" try: session_id = request.headers.get('Session-ID') or flask_session.get('brain_session_id') if not session_id or session_id not in brain_sessions: return jsonify({'error': '无效或过期的会话'}), 401 session_info = brain_sessions[session_id] brain_session = session_info['session'] # 从 BRAIN API 获取原始响应 response = brain_session.get(f'{BRAIN_API_BASE}/operators') response.raise_for_status() data = response.json() # 返回原始响应信息用于调试 result = { 'type': str(type(data)), 'is_list': isinstance(data, list), 'is_dict': isinstance(data, dict), 'length': len(data) if isinstance(data, list) else None, 'keys': list(data.keys()) if isinstance(data, dict) else None, 'count_key': data.get('count') if isinstance(data, dict) else None, 'first_few_items': data[:3] if isinstance(data, list) else (data.get('results', [])[:3] if isinstance(data, dict) else None) } return jsonify(result) except Exception as e: return jsonify({'error': f'测试失败: {str(e)}'}), 500 # 导入蓝图 try: from blueprints import idea_house_bp, paper_analysis_bp, feature_engineering_bp print("📦 蓝图导入成功!") except ImportError as e: print(f"❌ 导入蓝图失败: {e}") print("某些功能可能不可用。") # 注册蓝图 app.register_blueprint(idea_house_bp, url_prefix='/idea-house') app.register_blueprint(paper_analysis_bp, url_prefix='/paper-analysis') app.register_blueprint(feature_engineering_bp, url_prefix='/feature-engineering') print("🔧 所有蓝图注册成功!") print(" - idea库: /idea-house") print(" - 论文分析: /paper-analysis") print(" - 特征工程: /feature-engineering") # 模板管理路由 # 获取此脚本所在的目录以获取模板 script_dir = os.path.dirname(os.path.abspath(__file__)) TEMPLATES_DIR = os.path.join(script_dir, 'custom_templates') # 确保模板目录存在 if not os.path.exists(TEMPLATES_DIR): os.makedirs(TEMPLATES_DIR) print(f"📁 已创建模板目录: {TEMPLATES_DIR}") else: print(f"📁 模板目录已就绪: {TEMPLATES_DIR}") print("✅ BRAIN 表达式模板解码器完全初始化!") print("🎯 准备处理模板并与 BRAIN API 集成!") @app.route('/api/templates', methods=['GET']) def get_templates(): """获取所有自定义模板""" try: templates = [] templates_file = os.path.join(TEMPLATES_DIR, 'templates.json') if os.path.exists(templates_file): with open(templates_file, 'r', encoding='utf-8') as f: templates = json.load(f) return jsonify(templates) except Exception as e: return jsonify({'error': f'加载模板时出错: {str(e)}'}), 500 @app.route('/api/templates', methods=['POST']) def save_template(): """保存新的自定义模板""" try: data = request.get_json() name = data.get('name', '').strip() description = data.get('description', '').strip() expression = data.get('expression', '').strip() template_configurations = data.get('templateConfigurations', {}) if not name or not expression: return jsonify({'error': '名称和表达式是必需的'}), 400 # 加载现有模板 templates_file = os.path.join(TEMPLATES_DIR, 'templates.json') templates = [] if os.path.exists(templates_file): with open(templates_file, 'r', encoding='utf-8') as f: templates = json.load(f) # 检查重复名称 existing_index = next((i for i, t in enumerate(templates) if t['name'] == name), None) new_template = { 'name': name, 'description': description, 'expression': expression, 'templateConfigurations': template_configurations, 'createdAt': datetime.now().isoformat() } if existing_index is not None: # 更新现有模板但保留 createdAt(如果存在) if 'createdAt' in templates[existing_index]: new_template['createdAt'] = templates[existing_index]['createdAt'] new_template['updatedAt'] = datetime.now().isoformat() templates[existing_index] = new_template message = f'模板 "{name}" 更新成功' else: # 添加新模板 templates.append(new_template) message = f'模板 "{name}" 保存成功' # 保存到文件 with open(templates_file, 'w', encoding='utf-8') as f: json.dump(templates, f, indent=2, ensure_ascii=False) return jsonify({'success': True, 'message': message}) except Exception as e: return jsonify({'error': f'保存模板时出错: {str(e)}'}), 500 @app.route('/api/templates/', methods=['DELETE']) def delete_template(template_id): """删除自定义模板""" try: templates_file = os.path.join(TEMPLATES_DIR, 'templates.json') templates = [] if os.path.exists(templates_file): with open(templates_file, 'r', encoding='utf-8') as f: templates = json.load(f) if 0 <= template_id < len(templates): deleted_template = templates.pop(template_id) # 保存更新后的模板 with open(templates_file, 'w', encoding='utf-8') as f: json.dump(templates, f, indent=2, ensure_ascii=False) return jsonify({'success': True, 'message': f'模板 "{deleted_template["name"]}" 删除成功'}) else: return jsonify({'error': '未找到模板'}), 404 except Exception as e: return jsonify({'error': f'删除模板时出错: {str(e)}'}), 500 @app.route('/api/templates/export', methods=['GET']) def export_templates(): """将所有模板导出为 JSON""" try: templates_file = os.path.join(TEMPLATES_DIR, 'templates.json') templates = [] if os.path.exists(templates_file): with open(templates_file, 'r', encoding='utf-8') as f: templates = json.load(f) return jsonify(templates) except Exception as e: return jsonify({'error': f'导出模板时出错: {str(e)}'}), 500 @app.route('/api/templates/import', methods=['POST']) def import_templates(): """从 JSON 导入模板""" try: data = request.get_json() imported_templates = data.get('templates', []) overwrite = data.get('overwrite', False) if not isinstance(imported_templates, list): return jsonify({'error': '无效的模板格式'}), 400 # 验证模板结构 valid_templates = [] for template in imported_templates: if (isinstance(template, dict) and 'name' in template and 'expression' in template and template['name'].strip() and template['expression'].strip()): valid_templates.append({ 'name': template['name'].strip(), 'description': template.get('description', '').strip(), 'expression': template['expression'].strip(), 'templateConfigurations': template.get('templateConfigurations', {}), 'createdAt': template.get('createdAt', datetime.now().isoformat()) }) if not valid_templates: return jsonify({'error': '未找到有效模板'}), 400 # 加载现有模板 templates_file = os.path.join(TEMPLATES_DIR, 'templates.json') existing_templates = [] if os.path.exists(templates_file): with open(templates_file, 'r', encoding='utf-8') as f: existing_templates = json.load(f) # 处理重复项 duplicates = [] new_templates = [] for template in valid_templates: existing_index = next((i for i, t in enumerate(existing_templates) if t['name'] == template['name']), None) if existing_index is not None: duplicates.append(template['name']) if overwrite: existing_templates[existing_index] = template else: new_templates.append(template) # 添加新模板 existing_templates.extend(new_templates) # 保存到文件 with open(templates_file, 'w', encoding='utf-8') as f: json.dump(existing_templates, f, indent=2, ensure_ascii=False) result = { 'success': True, 'imported': len(new_templates), 'duplicates': duplicates, 'overwritten': len(duplicates) if overwrite else 0 } return jsonify(result) except Exception as e: return jsonify({'error': f'导入模板时出错: {str(e)}'}), 500 @app.route('/api/run-simulator', methods=['POST']) def run_simulator(): """运行 simulator_wqb.py 脚本""" try: import subprocess import threading from pathlib import Path # 获取脚本路径(现在在模拟器子文件夹中) script_dir = os.path.dirname(os.path.abspath(__file__)) simulator_dir = os.path.join(script_dir, 'simulator') simulator_path = os.path.join(simulator_dir, 'simulator_wqb.py') # 检查脚本是否存在 if not os.path.exists(simulator_path): return jsonify({'error': '在模拟器文件夹中未找到 simulator_wqb.py'}), 404 # 在新终端窗口中运行脚本 def run_script(): try: # 对于 Windows if os.name == 'nt': # 使用子进程和适当的工作目录(模拟器文件夹) subprocess.Popen(['cmd', '/k', 'python', 'simulator_wqb.py'], cwd=simulator_dir, creationflags=subprocess.CREATE_NEW_CONSOLE) else: # 对于 Unix 类系统 subprocess.Popen(['gnome-terminal', '--working-directory', simulator_dir, '--', 'python3', 'simulator_wqb.py']) except Exception as e: print(f"运行模拟器时出错: {e}") # 在单独线程中启动脚本 thread = threading.Thread(target=run_script) thread.daemon = True thread.start() return jsonify({ 'success': True, 'message': '模拟器脚本已在新终端窗口中启动' }) except Exception as e: return jsonify({'error': f'运行模拟器失败: {str(e)}'}), 500 if __name__ == '__main__': port = 5190 print("启动 BRAIN 表达式模板解码器 Web 应用程序...") print(f"应用程序将在 http://localhost:{port} 运行") print("包含 BRAIN API 集成 - 不需要单独的代理!") app.run(debug=True, host='0.0.0.0', port=port)