# -*- coding: utf-8 -*- """数据库管理层""" import psycopg2 from typing import List from config.settings import settings from models.entities import SimulationResult class DatabaseManager: """数据库管理类""" def __init__(self): self.connection = None self.init_database() def create_database(self) -> None: """创建数据库(如果不存在)""" try: # 先连接到默认的postgres数据库来创建alpha数据库 conn = psycopg2.connect( host=settings.DATABASE_CONFIG["host"], port=settings.DATABASE_CONFIG["port"], database="postgres", user=settings.DATABASE_CONFIG["user"], password=settings.DATABASE_CONFIG["password"] ) conn.autocommit = True cursor = conn.cursor() # 检查数据库是否存在 cursor.execute("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s", (settings.DATABASE_CONFIG["database"],)) exists = cursor.fetchone() if not exists: cursor.execute(f"CREATE DATABASE {settings.DATABASE_CONFIG['database']}") print(f"数据库 {settings.DATABASE_CONFIG['database']} 创建成功") else: print(f"数据库 {settings.DATABASE_CONFIG['database']} 已存在") cursor.close() conn.close() except Exception as e: print(f"创建数据库时出错: {e}") raise def get_connection(self) -> psycopg2.extensions.connection: """获取数据库连接""" if self.connection is None or self.connection.closed: self.connection = psycopg2.connect( host=settings.DATABASE_CONFIG["host"], port=settings.DATABASE_CONFIG["port"], database=settings.DATABASE_CONFIG["database"], user=settings.DATABASE_CONFIG["user"], password=settings.DATABASE_CONFIG["password"] ) return self.connection def init_database(self) -> None: """初始化数据库和表结构""" # 先创建数据库 self.create_database() # 然后连接到此数据库创建表 conn = self.get_connection() cursor = conn.cursor() # 创建 alpha_prepare 表 cursor.execute(''' CREATE TABLE IF NOT EXISTS alpha_prepare ( id SERIAL PRIMARY KEY, alpha TEXT NOT NULL UNIQUE, unused BOOLEAN NOT NULL DEFAULT TRUE, created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建 simulation 表 cursor.execute(''' CREATE TABLE IF NOT EXISTS simulation ( id SERIAL PRIMARY KEY, expression TEXT NOT NULL, time_consuming REAL NOT NULL, status TEXT NOT NULL, timestamp TEXT NOT NULL, alpha_id TEXT, message TEXT, created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() print(f"数据库 {settings.DATABASE_CONFIG['database']} 表结构初始化完成") def get_unused_alpha(self) -> List[str]: """获取所有未使用的alpha表达式""" conn = self.get_connection() cursor = conn.cursor() cursor.execute("SELECT alpha FROM alpha_prepare WHERE unused = TRUE") results = cursor.fetchall() alpha_list = [result[0] for result in results] return alpha_list def mark_alpha_used(self, alpha: str) -> None: """将alpha标记为已使用""" conn = self.get_connection() cursor = conn.cursor() cursor.execute("UPDATE alpha_prepare SET unused = FALSE WHERE alpha = %s", (alpha,)) conn.commit() def insert_simulation_result(self, result: SimulationResult) -> None: """插入模拟结果到simulation表""" conn = self.get_connection() cursor = conn.cursor() cursor.execute(''' INSERT INTO simulation (expression, time_consuming, status, timestamp, alpha_id, message) VALUES (%s, %s, %s, %s, %s, %s) ''', ( result.expression, result.time_consuming, result.status, result.timestamp, result.alpha_id, result.message or "" )) conn.commit() def close_connection(self) -> None: """关闭数据库连接""" if self.connection and not self.connection.closed: self.connection.close()