# -*- coding: utf-8 -*-
"""Core class of the Alpha simulator."""

import time
import httpx
from datetime import datetime
from typing import List, Optional, Tuple

from config.settings import settings
from models.entities import SimulationResult, TokenInfo
from core.database import DatabaseManager
from services.auth_service import AuthService
from services.alpha_service import AlphaService
from services.notification_service import NotificationService


class AlphaSimulator:
    """Main entry point for running alpha simulations.

    Owns the database manager, the auth service and (after ``initialize()``)
    the HTTP client and the alpha service built on top of it. Expressions are
    simulated one by one in batches; every outcome is persisted and the
    expression is marked as used.
    """

    def __init__(self):
        self.db_manager = DatabaseManager()
        self.auth_service = AuthService()
        # The three attributes below stay None until initialize()/login() run.
        self.client: Optional[httpx.Client] = None
        self.token_info: Optional[TokenInfo] = None
        self.alpha_service: Optional[AlphaService] = None

    def __del__(self):
        """Release the HTTP client and the database connection on teardown."""
        try:
            self._close_client()
        finally:
            # __init__ may have failed before db_manager was assigned, so
            # guard the attribute access; the DB close must run even if
            # closing the HTTP client raised.
            if hasattr(self, 'db_manager'):
                self.db_manager.close_connection()

    def _close_client(self) -> None:
        """Close the current HTTP client, if any, and forget it."""
        client = getattr(self, "client", None)
        if client is not None:
            self.client = None
            client.close()

    def initialize(self) -> None:
        """Create a fresh HTTP client and log in with it.

        When called again on an already initialized simulator, the previous
        client is closed once the new login has succeeded, so repeated
        initialization does not leak connections.
        """
        previous_client = self.client
        self.client = httpx.Client()
        self.login()
        if previous_client is not None:
            previous_client.close()

    def login(self) -> TokenInfo:
        """Log in through the auth service and (re)build the alpha service.

        Returns:
            The token information returned by ``AuthService.login``.
        """
        self.token_info = self.auth_service.login(self.client)
        self.alpha_service = AlphaService(self.client)
        return self.token_info

    def needs_token_refresh(self) -> bool:
        """Tell whether the token should be refreshed.

        Returns:
            True when no token is held, or when ``token_info.expiry`` is below
            ``settings.TOKEN_REFRESH_THRESHOLD``.
        """
        if not self.token_info:
            return True
        return self.token_info.expiry < settings.TOKEN_REFRESH_THRESHOLD

    def load_alpha_list(self) -> List[str]:
        """Load the alpha expressions the database reports as unused."""
        return self.db_manager.get_unused_alpha()

    def run_batch_simulation(self, alpha_list: List[str]) -> Tuple[int, int]:
        """Simulate every expression in ``alpha_list``, batch by batch.

        Batches are slices of ``settings.BATCH_SIZE`` expressions. After all
        batches are processed a summary is printed and a notification sent.

        Args:
            alpha_list: Alpha expressions to simulate.

        Returns:
            A ``(success_count, fail_count)`` tuple; a simulation counts as a
            success only when its status is ``"ok"``.
        """
        success_count = 0
        fail_count = 0
        batch_size = settings.BATCH_SIZE

        batch_starts = range(0, len(alpha_list), batch_size)
        for batch_number, start in enumerate(batch_starts, start=1):
            batch = alpha_list[start:start + batch_size]
            print(f"\n开始处理第 {batch_number} 批因子,共 {len(batch)} 个")

            for expression in batch:
                result = self._simulate_single_alpha(expression)
                if result.status == "ok":
                    success_count += 1
                else:
                    fail_count += 1

            print(f"第 {batch_number} 批处理完成")

        self._print_summary(success_count, fail_count)
        return success_count, fail_count

    def _simulate_single_alpha(self, expression: str) -> SimulationResult:
        """Simulate one alpha expression and persist the outcome.

        Any exception raised by the simulation itself is converted into a
        result with status ``"err"`` so that one bad expression does not abort
        the batch.

        Args:
            expression: The alpha expression to simulate.

        Returns:
            The ``SimulationResult`` that was saved to the database.

        Raises:
            RuntimeError: If the simulator has not been initialized yet.
        """
        # Fail fast: without this guard the AttributeError on None would be
        # swallowed below and every expression would be marked as used with
        # an "err" result.
        if self.alpha_service is None:
            raise RuntimeError(
                "AlphaSimulator is not initialized; call initialize() first"
            )

        print(f"\n模拟因子: {expression}")
        start_time = time.time()

        try:
            result = self.alpha_service.simulate_alpha(expression)
            time_consuming = round(time.time() - start_time, 2)
            alpha_id = result.get("alpha_id")

            simulation_result = SimulationResult(
                expression=expression,
                time_consuming=time_consuming,
                status=result["status"],
                timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                alpha_id=alpha_id,
                message=result.get("message", ""),
            )
        except Exception as error:
            time_consuming = round(time.time() - start_time, 2)

            simulation_result = SimulationResult(
                expression=expression,
                time_consuming=time_consuming,
                status="err",
                message=str(error),
                timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            )
            print(f"❌ 模拟异常 - {error}")
        else:
            # Reporting happens outside the try and uses the already fetched
            # alpha_id, so a missing key can no longer turn a successful
            # simulation into an "err" record.
            if result["status"] == "ok":
                print(f"✅ 模拟成功 - Alpha ID: {alpha_id} - 耗时: {time_consuming}秒")
            else:
                print(f"❌ 模拟失败 - {result.get('message', '未知错误')}")

        # Save the result and mark the expression as used.
        self._save_simulation_result(simulation_result)
        return simulation_result

    def _save_simulation_result(self, result: SimulationResult) -> None:
        """Mark the expression as used and store its simulation result."""
        self.db_manager.mark_alpha_used(result.expression)
        self.db_manager.insert_simulation_result(result)

    def _print_summary(self, success_count: int, fail_count: int) -> None:
        """Print the run totals and send them as a notification.

        Args:
            success_count: Number of simulations that finished with "ok".
            fail_count: Number of simulations that did not.
        """
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(f"\n总计: 成功 {success_count} 个, 失败 {fail_count} 个")
        print(f"完成时间: {now}")
        print(f"所有结果已保存到 PostgreSQL 数据库 {settings.DATABASE_CONFIG['database']} 的 simulation 表中")

        # Send the notification.
        NotificationService.send_to_gotify(success_count, fail_count)