You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
FactorSimulator/main.py

173 lines
4.2 KiB

# -*- coding: utf-8 -*-
"""主程序入口 - FastAPI版本"""
import time
from fastapi import FastAPI, HTTPException
from core.simulator import AlphaSimulator
# 创建FastAPI应用
app = FastAPI(
title="Alpha因子模拟器API",
description="通过HTTP接口触发的因子模拟器",
version="1.0.0"
)
# 全局变量
_simulator = None
_last_result = None # 存储上次执行结果
_is_processing = False # 是否正在处理
def initialize_simulator():
"""初始化模拟器"""
global _simulator
if _simulator is None:
_simulator = AlphaSimulator()
_simulator.initialize()
print("模拟器初始化完成")
return _simulator
@app.get("/")
async def root():
"""根路径,返回服务信息"""
return {
"service": "Alpha因子模拟器",
"version": "1.0.0",
"status": "running",
"endpoints": {
"run_simulation": "POST /run - 触发一次模拟执行",
"last_result": "GET /result - 获取上次执行结果",
"status": "GET /status - 查看服务状态",
"health": "GET /health - 健康检查"
}
}
@app.post("/run")
async def run_simulation():
"""
触发一次模拟执行
执行顺序:
1. 刷新token
2. 加载alpha列表
3. 执行批量模拟
"""
global _is_processing, _last_result
# 检查是否正在处理
if _is_processing:
return {
"status": "busy",
"message": "已有模拟任务正在执行中,请稍后再试",
"timestamp": time.time()
}
try:
_is_processing = True
# 初始化模拟器
simulator = initialize_simulator()
# 1. 刷新token
print("刷新token...")
simulator.login()
# 2. 加载alpha列表
print("加载alpha列表...")
alpha_list = simulator.load_alpha_list()
if not alpha_list:
result = {
"status": "no_data",
"message": "数据库中没有待处理的alpha表达式",
"timestamp": time.time()
}
_last_result = result
return result
# 3. 执行批量模拟
print(f"共加载 {len(alpha_list)} 个需要模拟的因子表达式")
success_count, fail_count = simulator.run_batch_simulation(alpha_list)
result_message = f"处理完成: 成功 {success_count} 个, 失败 {fail_count}"
print(result_message)
result = {
"status": "completed",
"message": result_message,
"data": {
"alpha_count": len(alpha_list),
"success_count": success_count,
"fail_count": fail_count
},
"timestamp": time.time()
}
_last_result = result
return result
except Exception as e:
error_msg = f"模拟执行失败: {str(e)}"
print(error_msg)
result = {
"status": "error",
"message": error_msg,
"timestamp": time.time()
}
_last_result = result
return result
finally:
_is_processing = False
@app.get("/result")
async def get_last_result():
"""获取上次执行的结果"""
if _last_result is None:
raise HTTPException(status_code=404, detail="暂无执行结果")
return {
"status": "success",
"data": _last_result,
"timestamp": time.time()
}
@app.get("/status")
async def get_status():
"""获取服务状态"""
return {
"is_processing": _is_processing,
"simulator_initialized": _simulator is not None,
"has_last_result": _last_result is not None,
"timestamp": time.time()
}
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {
"status": "healthy",
"timestamp": time.time(),
"simulator_initialized": _simulator is not None
}
if __name__ == "__main__":
import uvicorn
# 首次初始化
initialize_simulator()
print("服务正在启动...")
# 启动FastAPI服务
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
log_level="info"
)