# main.py import os import json import logging from pathlib import Path from typing import Dict, Any, List import aiofiles from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel import uvicorn # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 常量定义 DOWNLOADS_DIR = "downloads" MAX_FILENAME_LENGTH = 100 INVALID_FILENAME_CHARS = '<>:"/\\|?*' # FastAPI应用 app = FastAPI(title="eh-v2") # 数据模型 class SaveDataRequest(BaseModel): url: str title: str all_images: Dict[str, str] total_images: int class GalleryInfo(BaseModel): title: str path: str total_images: int downloaded_images: int # 工具函数 def setup_downloads_directory() -> Path: """创建并返回下载目录路径""" downloads_path = Path(DOWNLOADS_DIR) downloads_path.mkdir(exist_ok=True) logger.info(f"下载目录已准备: {downloads_path.absolute()}") return downloads_path def sanitize_filename(filename: str) -> str: """清理文件名,移除非法字符并限制长度""" sanitized = filename for char in INVALID_FILENAME_CHARS: sanitized = sanitized.replace(char, '_') # 限制文件名长度 if len(sanitized) > MAX_FILENAME_LENGTH: sanitized = sanitized[:MAX_FILENAME_LENGTH] return sanitized def create_title_directory(base_path: Path, title: str) -> Path: """创建标题对应的目录""" safe_title = sanitize_filename(title) title_dir = base_path / safe_title title_dir.mkdir(exist_ok=True) logger.info(f"创建标题目录: {title_dir}") return title_dir async def save_data_to_file(file_path: Path, data: Dict[str, Any]) -> None: """异步保存数据到JSON文件""" async with aiofiles.open(file_path, 'w', encoding='utf-8') as f: await f.write(json.dumps(data, ensure_ascii=False, indent=2)) def get_all_galleries() -> List[GalleryInfo]: """获取所有画廊信息""" galleries = [] downloads_path = Path(DOWNLOADS_DIR) if not downloads_path.exists(): return galleries for gallery_dir in downloads_path.iterdir(): if gallery_dir.is_dir(): data_file = gallery_dir / "data.json" if data_file.exists(): try: with open(data_file, 'r', encoding='utf-8') as f: data = json.load(f) # 计算已下载的图片数量 downloaded_count = 0 if 'all_images' in data: for filename, url in data['all_images'].items(): image_path = gallery_dir / filename if image_path.exists(): downloaded_count += 1 galleries.append(GalleryInfo( title=data.get('title', gallery_dir.name), path=str(gallery_dir), total_images=data.get('total_images', 0), downloaded_images=downloaded_count )) except Exception as e: logger.error(f"读取画廊数据失败 {gallery_dir}: {e}") return galleries # 初始化 downloads_path = setup_downloads_directory() # API路由 @app.post("/save_url") async def save_url(data: SaveDataRequest): """保存URL数据到文件系统""" try: logger.info("收到保存数据请求") logger.info(f"标题: {data.title}, URL: {data.url}, 图片数量: {data.total_images}") # 创建标题目录 title_dir = create_title_directory(downloads_path, data.title) # 数据文件路径 data_file = title_dir / "data.json" # 异步保存数据 await save_data_to_file(data_file, data.dict()) logger.info(f"数据已保存到: {data_file}") return { "status": "success", "message": "数据保存成功", "file_path": str(data_file), "title": data.title, "total_images": data.total_images } except Exception as e: error_msg = f"保存数据时出错: {str(e)}" logger.error(error_msg) logger.exception("详细错误信息:") raise HTTPException(status_code=500, detail=error_msg) @app.get("/", response_class=HTMLResponse) async def read_gallery_manager(): """画廊管理页面""" return """
管理您的画廊下载任务
点击"读取文件夹"按钮加载数据