# -*- coding: utf-8 -*-
"""Authentication service."""
import ast
import os
import sys
from typing import Tuple, Dict, Any

import httpx
from httpx import BasicAuth

from config.settings import settings
from models.entities import TokenInfo


class AuthService:
    """Load account credentials from disk and log in to obtain a token."""

    def __init__(self):
        # Path of the credentials file, taken from the project settings.
        self.credentials_file = settings.credentials_file

    def load_credentials(self) -> Tuple[str, str]:
        """Read the username/password pair from the credentials file.

        The file is expected to hold a Python literal sequence such as
        ``['username', 'password']``. If the file does not exist, an empty
        one is created and the process exits (see
        ``_create_credentials_file``).

        Returns:
            The first two elements of the parsed sequence, as
            ``(username, password)``.

        Raises:
            ValueError: If the file content is empty, is not a valid Python
                literal, or is not a list/tuple with at least two elements.
        """
        if not os.path.exists(self.credentials_file):
            self._create_credentials_file()

        with open(self.credentials_file, 'r', encoding='utf-8') as f:
            raw = f.read()

        # SECURITY: this used to be ``eval(raw)``, which executes arbitrary
        # code placed in the file. ``ast.literal_eval`` only accepts plain
        # literals, which is all the documented format needs.
        try:
            credentials = ast.literal_eval(raw.strip())
        except (ValueError, SyntaxError) as err:
            raise ValueError(
                f"凭证文件格式错误: {self.credentials_file}, "
                "格式: ['username', 'password']"
            ) from err

        if not isinstance(credentials, (list, tuple)) or len(credentials) < 2:
            raise ValueError(
                f"凭证文件格式错误: {self.credentials_file}, "
                "格式: ['username', 'password']"
            )

        return credentials[0], credentials[1]

    def _create_credentials_file(self) -> None:
        """Create an empty credentials file, tell the user, and exit.

        Never returns normally: the process is terminated with exit status 1
        so the user can fill in the file before running again.
        """
        print("未找到 account.txt 文件")
        with open(self.credentials_file, 'w', encoding='utf-8') as f:
            f.write("")
        print("account.txt 文件已创建,请填写账号密码, 格式: ['username', 'password']")
        # sys.exit instead of the site-provided ``exit`` helper, which is not
        # guaranteed to exist (e.g. ``python -S`` or frozen executables).
        sys.exit(1)

    @staticmethod
    def _error_detail(response: httpx.Response) -> Any:
        """Return the decoded JSON body, or the raw text if it is not JSON."""
        try:
            return response.json()
        except ValueError:
            # Non-JSON error bodies must not mask the real login failure.
            return response.text

    def login(self, client: httpx.Client) -> TokenInfo:
        """Log in with HTTP basic auth and return the token information.

        Sets ``client.auth`` to the loaded credentials (this persists on the
        client after the call) and POSTs to
        ``{settings.BRAIN_API_URL}/authentication``.

        Args:
            client: The httpx client used for the request.

        Returns:
            A ``TokenInfo`` built from the ``token`` entry of the response
            JSON; ``expiry`` is ``token['expiry']`` converted to ``int``.

        Raises:
            SystemExit: If the server answers 429 (rate limited).
            Exception: For any other status code that is not 201.
        """
        username, password = self.load_credentials()

        # Configure basic auth on the client.
        client.auth = BasicAuth(username, password)

        response = client.post(f'{settings.BRAIN_API_URL}/authentication')
        print(f"登录状态: {response.status_code}")

        if response.status_code == 201:
            login_data = response.json()
            print(f"登录成功!: {login_data}")
            # NOTE(review): ``token`` receives the whole ``token`` object of
            # the response (the same object ``expiry`` is read from) --
            # confirm this matches what TokenInfo expects.
            return TokenInfo(
                token=login_data['token'],
                expiry=int(login_data['token']['expiry'])
            )

        if response.status_code == 429:
            print("API rate limit exceeded")
            sys.exit(1)

        # Decode the body once and reuse it for both the log and the error.
        detail = self._error_detail(response)
        print(f"登录失败: {detail}")
        raise Exception(f"登录失败: {detail}")