You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
FactorSimulator/core/simulator.py

126 lines
4.6 KiB

# -*- coding: utf-8 -*-
"""Alpha模拟器核心类"""
import time
import httpx
from datetime import datetime
from typing import List, Tuple
from config.settings import settings
from models.entities import SimulationResult, TokenInfo
from core.database import DatabaseManager
from services.auth_service import AuthService
from services.alpha_service import AlphaService
from services.notification_service import NotificationService
class AlphaSimulator:
"""Alpha模拟器主类"""
def __init__(self):
self.db_manager = DatabaseManager()
self.auth_service = AuthService()
self.client = None
self.token_info = None
self.alpha_service = None
def __del__(self):
"""析构函数,确保数据库连接被关闭"""
if hasattr(self, 'db_manager'):
self.db_manager.close_connection()
def initialize(self) -> None:
"""初始化模拟器"""
self.client = httpx.Client()
self.login()
def login(self) -> TokenInfo:
"""登录并初始化alpha服务"""
self.token_info = self.auth_service.login(self.client)
self.alpha_service = AlphaService(self.client)
return self.token_info
def needs_token_refresh(self) -> bool:
"""检查是否需要刷新token"""
if not self.token_info:
return True
return self.token_info.expiry < settings.TOKEN_REFRESH_THRESHOLD
def load_alpha_list(self) -> List[str]:
"""从数据库加载未使用的alpha表达式"""
return self.db_manager.get_unused_alpha()
def run_batch_simulation(self, alpha_list: List[str]) -> Tuple[int, int]:
"""运行批量模拟"""
success_count = 0
fail_count = 0
for i in range(0, len(alpha_list), settings.BATCH_SIZE):
batch = alpha_list[i:i + settings.BATCH_SIZE]
print(f"\n开始处理第 {i // settings.BATCH_SIZE + 1} 批因子,共 {len(batch)}")
for expression in batch:
result = self._simulate_single_alpha(expression)
if result.status == "ok":
success_count += 1
else:
fail_count += 1
print(f"{i // settings.BATCH_SIZE + 1} 批处理完成")
self._print_summary(success_count, fail_count)
return success_count, fail_count
def _simulate_single_alpha(self, expression: str) -> SimulationResult:
"""模拟单个Alpha表达式"""
print(f"\n模拟因子: {expression}")
start_time = time.time()
try:
result = self.alpha_service.simulate_alpha(expression)
end_time = time.time()
time_consuming = round(end_time - start_time, 2)
simulation_result = SimulationResult(
expression=expression,
time_consuming=time_consuming,
status=result["status"],
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
alpha_id=result.get("alpha_id"),
message=result.get("message", "")
)
if result["status"] == "ok":
print(f"✅ 模拟成功 - Alpha ID: {result['alpha_id']} - 耗时: {time_consuming}")
else:
print(f"❌ 模拟失败 - {result.get('message', '未知错误')}")
except Exception as e:
end_time = time.time()
time_consuming = round(end_time - start_time, 2)
simulation_result = SimulationResult(
expression=expression,
time_consuming=time_consuming,
status="err",
message=str(e),
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
print(f"❌ 模拟异常 - {str(e)}")
# 保存结果并标记为已使用
self._save_simulation_result(simulation_result)
return simulation_result
def _save_simulation_result(self, result: SimulationResult) -> None:
"""保存模拟结果到数据库"""
self.db_manager.mark_alpha_used(result.expression)
self.db_manager.insert_simulation_result(result)
def _print_summary(self, success_count: int, fail_count: int) -> None:
"""打印总结信息"""
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"\n总计: 成功 {success_count} 个, 失败 {fail_count}")
print(f"完成时间: {now}")
print(f"所有结果已保存到 PostgreSQL 数据库 {settings.DATABASE_CONFIG['database']} 的 simulation 表中")
# 发送通知
NotificationService.send_to_gotify(success_count, fail_count)