# -*- coding: utf-8 -*- """主程序入口 - FastAPI版本""" import time from fastapi import FastAPI, HTTPException from core.simulator import AlphaSimulator # 创建FastAPI应用 app = FastAPI( title="Alpha因子模拟器API", description="通过HTTP接口触发的因子模拟器", version="1.0.0" ) # 全局变量 _simulator = None _last_result = None _is_processing = False _token_expiry_time = 0 # token过期时间戳 _TOKEN_REFRESH_THRESHOLD = 1800 # 30分钟刷新阈值 def initialize_simulator(): """初始化模拟器""" global _simulator if _simulator is None: _simulator = AlphaSimulator() _simulator.initialize() print("模拟器初始化完成") return _simulator def check_and_refresh_token(): """检查并刷新token""" global _token_expiry_time, _simulator current_time = time.time() # 如果token即将过期(剩余时间少于30分钟),则刷新 if current_time >= _token_expiry_time - _TOKEN_REFRESH_THRESHOLD: print("token即将过期,重新登录...") _simulator.login() # 更新token过期时间(假设token有效期14400秒) _token_expiry_time = current_time + 14400 print(f"token已刷新,有效期至: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(_token_expiry_time))}") return True else: remaining = int(_token_expiry_time - current_time) print(f"token仍然有效,剩余时间: {remaining // 3600}小时{(remaining % 3600) // 60}分钟") return False @app.get("/") async def root(): """根路径,返回服务信息""" return { "service": "Alpha因子模拟器", "version": "1.0.0", "status": "running", "endpoints": { "run_simulation": "POST /run - 触发一次模拟执行", "last_result": "GET /result - 获取上次执行结果", "status": "GET /status - 查看服务状态", "health": "GET /health - 健康检查" } } @app.post("/run") async def run_simulation(): """ 触发一次模拟执行 执行顺序: 1. 检查token并刷新 2. 加载alpha列表 3. 执行批量模拟 """ global _is_processing, _last_result, _simulator, _token_expiry_time # 检查是否正在处理 if _is_processing: return { "status": "busy", "message": "已有模拟任务正在执行中,请稍后再试", "timestamp": time.time() } try: _is_processing = True # 初始化模拟器 simulator = initialize_simulator() # 1. 检查并刷新token(只在需要时) print("检查token状态...") if _token_expiry_time == 0: # 第一次运行,需要登录 print("首次运行,进行登录...") simulator.login() _token_expiry_time = time.time() + 14400 print("登录成功") else: check_and_refresh_token() # 2. 加载alpha列表 print("加载alpha列表...") alpha_list = simulator.load_alpha_list() if not alpha_list: result = { "status": "no_data", "message": "数据库中没有待处理的alpha表达式", "timestamp": time.time() } _last_result = result return result # 3. 执行批量模拟 print(f"共加载 {len(alpha_list)} 个需要模拟的因子表达式") success_count, fail_count = simulator.run_batch_simulation(alpha_list) result_message = f"处理完成: 成功 {success_count} 个, 失败 {fail_count} 个" print(result_message) result = { "status": "completed", "message": result_message, "data": { "alpha_count": len(alpha_list), "success_count": success_count, "fail_count": fail_count }, "timestamp": time.time() } _last_result = result return result except Exception as e: error_msg = f"模拟执行失败: {str(e)}" print(error_msg) result = { "status": "error", "message": error_msg, "timestamp": time.time() } _last_result = result return result finally: _is_processing = False @app.get("/result") async def get_last_result(): """获取上次执行的结果""" if _last_result is None: raise HTTPException(status_code=404, detail="暂无执行结果") return { "status": "success", "data": _last_result, "timestamp": time.time() } @app.get("/status") async def get_status(): """获取服务状态""" global _token_expiry_time token_valid = False if _token_expiry_time > 0: remaining_time = _token_expiry_time - time.time() token_valid = remaining_time > 0 return { "status": "running", "is_processing": _is_processing, "simulator_initialized": _simulator is not None, "has_last_result": _last_result is not None, "token_valid": token_valid, "token_expires_in": int(_token_expiry_time - time.time()) if _token_expiry_time > 0 else 0, "timestamp": time.time() } @app.get("/health") async def health_check(): """健康检查端点""" return { "status": "healthy", "timestamp": time.time(), "simulator_initialized": _simulator is not None, "token_valid": _token_expiry_time > time.time() if _token_expiry_time > 0 else False } if __name__ == "__main__": import uvicorn # 首次初始化 initialize_simulator() print("服务正在启动...") # 启动FastAPI服务 uvicorn.run( app, host="0.0.0.0", port=8005, log_level="info" )