You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
FactorSimulator/core/database.py

176 lines
5.9 KiB

# -*- coding: utf-8 -*-
"""数据库管理层"""
import psycopg2
from typing import List
from config.settings import settings
from models.entities import SimulationResult
class DatabaseManager:
    """Manage the PostgreSQL connection and tables used by the simulator.

    Instantiating the class immediately calls :meth:`init_database`, which
    creates the configured database (when missing) and the ``alpha_prepare``
    and ``simulation`` tables. One connection is cached on
    ``self.connection`` and is reopened on demand once it has been closed.
    """

    def __init__(self):
        self.connection = None
        self.init_database()

    def create_database(self) -> None:
        """Create the configured database if it does not exist yet.

        Raises:
            Exception: any error raised while connecting or executing is
                printed and then re-raised unchanged.
        """
        # Local import: only this method needs the SQL composition helpers.
        from psycopg2 import sql

        db_name = settings.DATABASE_CONFIG["database"]
        conn = None
        try:
            # Connect to the default "postgres" database first, because the
            # target database may not exist yet.
            conn = psycopg2.connect(
                host=settings.DATABASE_CONFIG["host"],
                port=settings.DATABASE_CONFIG["port"],
                database="postgres",
                user=settings.DATABASE_CONFIG["user"],
                password=settings.DATABASE_CONFIG["password"]
            )
            # CREATE DATABASE cannot run inside a transaction block.
            conn.autocommit = True
            cursor = conn.cursor()
            try:
                # Check whether the database already exists.
                cursor.execute(
                    "SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s",
                    (db_name,),
                )
                exists = cursor.fetchone()
                if not exists:
                    # Quote the name as an identifier: this prevents SQL
                    # injection and keeps the created name identical to the
                    # one used by the existence check and by get_connection.
                    cursor.execute(
                        sql.SQL("CREATE DATABASE {}").format(
                            sql.Identifier(db_name)
                        )
                    )
                    print(f"数据库 {db_name} 创建成功")
                else:
                    print(f"数据库 {db_name} 已存在")
            finally:
                cursor.close()
        except Exception as e:
            print(f"创建数据库时出错: {e}")
            raise
        finally:
            # Release the administrative connection on failure as well.
            if conn is not None:
                conn.close()

    def get_connection(self) -> "psycopg2.extensions.connection":
        """Return the cached connection, opening a new one when needed.

        A new connection is opened when none has been created yet or when
        the cached one reports itself as closed.
        """
        if self.connection is None or self.connection.closed:
            self.connection = psycopg2.connect(
                host=settings.DATABASE_CONFIG["host"],
                port=settings.DATABASE_CONFIG["port"],
                database=settings.DATABASE_CONFIG["database"],
                user=settings.DATABASE_CONFIG["user"],
                password=settings.DATABASE_CONFIG["password"]
            )
        return self.connection

    def _execute(self, query, params=None, *, fetch=False):
        """Run one statement in its own transaction on the cached connection.

        Args:
            query: SQL text to execute.
            params: optional parameter tuple bound to the query placeholders.
            fetch: when true, return every row produced by the statement.

        Returns:
            The fetched rows when ``fetch`` is true, otherwise ``None``.

        Raises:
            Exception: whatever the driver raised; the transaction is rolled
                back first so the cached connection stays usable.
        """
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(query, params)
            rows = cursor.fetchall() if fetch else None
            # Commit reads as well, so the cached connection is not left
            # sitting inside an open transaction.
            conn.commit()
            return rows
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()

    def init_database(self) -> None:
        """Create the database and the tables if they do not exist yet."""
        # Create the database first ...
        self.create_database()
        # ... then connect to it and create the tables in one transaction.
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            # Create the alpha_prepare table.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS alpha_prepare
                (
                    id
                    SERIAL
                    PRIMARY
                    KEY,
                    alpha
                    TEXT
                    NOT
                    NULL
                    UNIQUE,
                    unused
                    BOOLEAN
                    NOT
                    NULL
                    DEFAULT
                    TRUE,
                    created_time
                    TIMESTAMP
                    DEFAULT
                    CURRENT_TIMESTAMP
                )
            ''')
            # Create the simulation table.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS simulation
                (
                    id
                    SERIAL
                    PRIMARY
                    KEY,
                    expression
                    TEXT
                    NOT
                    NULL,
                    time_consuming
                    REAL
                    NOT
                    NULL,
                    status
                    TEXT
                    NOT
                    NULL,
                    timestamp
                    TEXT
                    NOT
                    NULL,
                    alpha_id
                    TEXT,
                    message
                    TEXT,
                    created_time
                    TIMESTAMP
                    DEFAULT
                    CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
        except Exception:
            # Do not leave the cached connection in an aborted transaction.
            conn.rollback()
            raise
        finally:
            cursor.close()
        print(f"数据库 {settings.DATABASE_CONFIG['database']} 表结构初始化完成")

    def get_unused_alpha(self) -> List[str]:
        """Return every alpha expression whose ``unused`` flag is still true."""
        rows = self._execute(
            "SELECT alpha FROM alpha_prepare WHERE unused = TRUE",
            fetch=True,
        )
        return [row[0] for row in rows]

    def mark_alpha_used(self, alpha: str) -> None:
        """Mark the given alpha expression as used.

        Args:
            alpha: expression text matched against ``alpha_prepare.alpha``.
        """
        self._execute(
            "UPDATE alpha_prepare SET unused = FALSE WHERE alpha = %s",
            (alpha,),
        )

    def insert_simulation_result(self, result: "SimulationResult") -> None:
        """Insert one simulation result into the ``simulation`` table.

        Args:
            result: object providing ``expression``, ``time_consuming``,
                ``status``, ``timestamp``, ``alpha_id`` and ``message``.
                A falsy ``message`` is stored as an empty string.
        """
        self._execute('''
            INSERT INTO simulation
            (expression, time_consuming, status, timestamp, alpha_id, message)
            VALUES (%s, %s, %s, %s, %s, %s)
        ''', (
            result.expression,
            result.time_consuming,
            result.status,
            result.timestamp,
            result.alpha_id,
            result.message or ""
        ))

    def close_connection(self) -> None:
        """Close the cached connection if one exists and is still open."""
        if self.connection and not self.connection.closed:
            self.connection.close()