# main.py
"""eh-v2: a small FastAPI service that saves gallery metadata posted by a
browser extension and downloads the gallery images in the background."""
import asyncio
import json
import logging
import os
import re
import shutil
from pathlib import Path
from typing import Any, Dict, List
from urllib.parse import urlsplit

import aiofiles
import httpx
import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Constants
DOWNLOADS_DIR = "downloads"
MAX_FILENAME_LENGTH = 100
INVALID_FILENAME_CHARS = '<>:"/\\|?*'
MAX_CONCURRENT_DOWNLOADS = 5
DOWNLOAD_TIMEOUT = 30

# Precompiled once: extracts the real image URL from the viewer page HTML.
IMG_SRC_RE = re.compile(r'img id="img" src="(.*?)"')

# FastAPI application
app = FastAPI(title="eh-v2")

# Global in-memory map tracking per-gallery download progress.
# Keyed by the (unsanitized) gallery title.
download_status: Dict[str, Dict[str, Any]] = {}


# Data models
class SaveDataRequest(BaseModel):
    url: str
    title: str
    all_images: Dict[str, str]
    total_images: int


class GalleryInfo(BaseModel):
    title: str
    path: str
    total_images: int
    downloaded_images: int


class DownloadStatusResponse(BaseModel):
    status: str
    message: str
    downloaded: int
    total: int
    current_progress: float


# Utility functions
def setup_downloads_directory() -> Path:
    """Ensure the downloads directory exists and return its path."""
    downloads_path = Path(DOWNLOADS_DIR)
    downloads_path.mkdir(exist_ok=True)
    return downloads_path


def sanitize_filename(filename: str) -> str:
    """Replace characters that are invalid in filenames with '_' and
    truncate to MAX_FILENAME_LENGTH characters."""
    sanitized = filename
    for char in INVALID_FILENAME_CHARS:
        sanitized = sanitized.replace(char, '_')
    return sanitized[:MAX_FILENAME_LENGTH]


def create_title_directory(base_path: Path, title: str) -> Path:
    """Create (if needed) and return the per-gallery directory for *title*."""
    title_dir = base_path / sanitize_filename(title)
    title_dir.mkdir(exist_ok=True)
    return title_dir


async def save_data_to_file(file_path: Path, data: Dict[str, Any]) -> None:
    """Write *data* to *file_path* as pretty-printed UTF-8 JSON."""
    async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
        await f.write(json.dumps(data, ensure_ascii=False, indent=2))


def _downloaded_stems(gallery_dir: Path) -> set:
    """Return the stems of every downloaded file in *gallery_dir*
    (all files except data.json, extension ignored)."""
    return {
        f.stem
        for f in gallery_dir.glob("*.*")
        if f.is_file() and f.name != "data.json"
    }


def _count_downloaded(gallery_dir: Path, all_images: Dict[str, str]) -> int:
    """Count how many entries of *all_images* already have a file on disk,
    comparing by filename stem so the extension does not matter."""
    stems = _downloaded_stems(gallery_dir)
    return sum(1 for name in all_images if Path(name).stem in stems)


def get_all_galleries() -> List[GalleryInfo]:
    """Scan the downloads directory and return every gallery whose download
    is still incomplete (downloaded count below total_images)."""
    galleries: List[GalleryInfo] = []
    downloads_root = Path(DOWNLOADS_DIR)
    if not downloads_root.exists():
        return galleries
    for gallery_dir in downloads_root.iterdir():
        if not gallery_dir.is_dir():
            continue
        data_file = gallery_dir / "data.json"
        if not data_file.exists():
            continue
        try:
            with open(data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            downloaded_count = _count_downloaded(
                gallery_dir, data.get('all_images', {})
            )
            # Only expose galleries that are not yet 100% downloaded.
            if downloaded_count < data.get('total_images', 0):
                galleries.append(GalleryInfo(
                    title=data.get('title', gallery_dir.name),
                    path=str(gallery_dir),
                    total_images=data.get('total_images', 0),
                    downloaded_images=downloaded_count
                ))
        except Exception as e:
            logger.error(f"读取画廊数据失败 {gallery_dir}: {e}")
    return galleries


async def download_single_image(client: httpx.AsyncClient, url: str,
                                file_path: Path,
                                semaphore: asyncio.Semaphore) -> bool:
    """Download one image: fetch the viewer page at *url*, extract the real
    image URL, and save the bytes next to *file_path* with the correct
    extension. Returns True on success (or if the file already exists)."""
    async with semaphore:
        # Skip all network work if the image is already on disk
        # (any extension). The original checked this only *after* the
        # first HTTP request, wasting a round-trip per existing file.
        if check_image_exists(file_path):
            return True
        try:
            response = await client.get(url, timeout=DOWNLOAD_TIMEOUT)
            response.raise_for_status()
            match = IMG_SRC_RE.search(response.text)
            if not match:
                return False
            real_img_url = match.group(1)
            # Derive the extension from the URL *path* so query strings or
            # fragments ("....jpg?token=x") cannot leak into the filename.
            # Fall back to .jpg when the path carries no extension.
            suffix = Path(urlsplit(real_img_url).path).suffix or ".jpg"
            target_path = file_path.with_suffix(suffix)
            img_response = await client.get(real_img_url,
                                            timeout=DOWNLOAD_TIMEOUT)
            img_response.raise_for_status()
            async with aiofiles.open(target_path, 'wb') as f:
                await f.write(img_response.content)
            return True
        except Exception as e:
            logger.error(f"下载失败 {url}: {e}")
            return False


def check_image_exists(file_path: Path) -> bool:
    """检查图片文件是否存在(忽略扩展名)

    True if *file_path* exists exactly, or a sibling with the same stem and
    any common image extension exists."""
    if file_path.exists():
        return True
    parent_dir = file_path.parent
    stem = file_path.stem
    # Common image extensions to probe for.
    image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'}
    return any(
        (parent_dir / f"{stem}{ext}").exists() for ext in image_extensions
    )


async def download_gallery_images(title: str) -> DownloadStatusResponse:
    """Download every missing image of gallery *title*, updating the global
    download_status map, and return a summary response."""
    safe_title = sanitize_filename(title)
    gallery_path = downloads_path / safe_title
    data_file = gallery_path / "data.json"

    if not data_file.exists():
        return DownloadStatusResponse(
            status="error",
            message="画廊数据文件不存在",
            downloaded=0,
            total=0,
            current_progress=0.0
        )
    try:
        async with aiofiles.open(data_file, 'r', encoding='utf-8') as f:
            content = await f.read()
        data = json.loads(content)
        all_images = data.get('all_images', {})
        total_images = len(all_images)
        if total_images == 0:
            return DownloadStatusResponse(
                status="error",
                message="没有可下载的图片",
                downloaded=0,
                total=0,
                current_progress=0.0
            )

        download_status[title] = {
            "downloaded": 0,
            "total": total_images,
            "status": "downloading"
        }
        logger.info(f"开始下载画廊 '{title}',共 {total_images} 张图片")

        semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)
        async with httpx.AsyncClient(
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            },
            follow_redirects=True
        ) as client:
            tasks = []
            for filename, url in all_images.items():
                image_path = gallery_path / filename
                # Already present on disk (any extension): count it as done
                # without scheduling a download task.
                if check_image_exists(image_path):
                    download_status[title]["downloaded"] += 1
                    continue
                tasks.append(
                    download_single_image(client, url, image_path, semaphore)
                )
            if tasks:
                results = await asyncio.gather(*tasks, return_exceptions=True)
                # Exceptions returned by gather are simply not True,
                # so they are not counted as successes.
                successful_downloads = sum(
                    1 for result in results if result is True
                )
                download_status[title]["downloaded"] += successful_downloads

        downloaded_count = download_status[title]["downloaded"]
        progress = (downloaded_count / total_images) * 100
        if downloaded_count == total_images:
            download_status[title]["status"] = "completed"
            message = f"下载完成!共下载 {downloaded_count}/{total_images} 张图片"
        else:
            download_status[title]["status"] = "partial"
            message = f"部分完成!下载 {downloaded_count}/{total_images} 张图片"
        return DownloadStatusResponse(
            status="success",
            message=message,
            downloaded=downloaded_count,
            total=total_images,
            current_progress=progress
        )
    except Exception as e:
        logger.error(f"下载画廊 '{title}' 时发生错误: {e}")
        download_status[title] = {
            "status": "error",
            "message": str(e)
        }
        return DownloadStatusResponse(
            status="error",
            message=f"下载失败: {str(e)}",
            downloaded=0,
            total=0,
            current_progress=0.0
        )


async def download_all_pending_galleries():
    """Sequentially download every gallery that is not yet complete."""
    galleries = get_all_galleries()
    # get_all_galleries already filters to incomplete galleries; this filter
    # is kept as a cheap defensive guard.
    pending_galleries = [
        g for g in galleries if g.downloaded_images < g.total_images
    ]
    logger.info(f"找到 {len(pending_galleries)} 个待下载画廊")
    if not pending_galleries:
        logger.info("没有待下载的画廊")
        return
    for gallery in pending_galleries:
        logger.info(f"开始下载画廊: {gallery.title}")
        result = await download_gallery_images(gallery.title)
        if result.status == "success":
            logger.info(f"画廊 '{gallery.title}' 下载完成: {result.message}")
        else:
            logger.error(f"画廊 '{gallery.title}' 下载失败: {result.message}")
        # Brief pause between galleries to be polite to the remote server.
        await asyncio.sleep(1)
    logger.info("批量下载任务完成")


def delete_completed_json_files():
    """删除已完成任务的JSON文件

    Delete data.json for every gallery whose images are all downloaded.
    Returns the number of files deleted."""
    downloads_root = Path(DOWNLOADS_DIR)
    deleted_count = 0
    if not downloads_root.exists():
        return deleted_count
    for gallery_dir in downloads_root.iterdir():
        if not gallery_dir.is_dir():
            continue
        data_file = gallery_dir / "data.json"
        if not data_file.exists():
            continue
        try:
            with open(data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            all_images = data.get('all_images', {})
            downloaded_count = _count_downloaded(gallery_dir, all_images)
            total_images = len(all_images)
            # All images present: the metadata file is no longer needed.
            if downloaded_count == total_images and total_images > 0:
                data_file.unlink()
                deleted_count += 1
                logger.info(f"已删除已完成任务的JSON文件: {gallery_dir.name}")
        except Exception as e:
            logger.error(f"处理画廊目录失败 {gallery_dir}: {e}")
    return deleted_count


# Initialization (runs at import time, before any request is served).
downloads_path = setup_downloads_directory()


# API routes
@app.post("/save_url")
@app.options("/save_url")
async def save_url_data(request: SaveDataRequest = None):
    """Persist gallery metadata to downloads/<title>/data.json.

    The OPTIONS route shares this handler (CORS preflight sends no body),
    hence the None default and the early "ok" return."""
    if not request:
        return {"status": "ok"}
    try:
        title_dir = create_title_directory(downloads_path, request.title)
        data_file = title_dir / "data.json"
        await save_data_to_file(data_file, {
            "url": request.url,
            "title": request.title,
            "all_images": request.all_images,
            "total_images": request.total_images
        })
        logger.info(f"成功保存数据: {request.title}")
        return {
            "status": "success",
            "message": f"数据保存成功,共 {request.total_images} 张图片",
            "path": str(title_dir)
        }
    except Exception as e:
        logger.error(f"保存数据失败: {e}")
        raise HTTPException(status_code=500, detail=f"保存失败: {str(e)}")


@app.get("/")
async def read_gallery_manager():
    """Serve the gallery-manager frontend page."""
    return FileResponse("index.html")


@app.get("/api/galleries")
async def get_galleries():
    """Return every gallery with an incomplete download."""
    return get_all_galleries()


@app.post("/api/download/all")
async def download_all_galleries(background_tasks: BackgroundTasks):
    """Kick off a background batch download of all pending galleries."""
    background_tasks.add_task(download_all_pending_galleries)
    return {
        "status": "success",
        "message": "开始批量下载所有未完成的画廊"
    }


@app.post("/api/download/{title}")
async def download_gallery(title: str, background_tasks: BackgroundTasks):
    """Kick off a background download of a single gallery by title."""
    background_tasks.add_task(download_gallery_images, title)
    return {
        "status": "success",
        "message": f"开始下载画廊: {title}",
        "title": title
    }


@app.post("/api/cleanup")
async def cleanup_completed_galleries():
    """清理已完成任务的JSON文件"""
    try:
        deleted_count = delete_completed_json_files()
        return {
            "status": "success",
            "message": f"成功删除 {deleted_count} 个已完成任务的JSON文件",
            "deleted_count": deleted_count
        }
    except Exception as e:
        logger.error(f"清理JSON文件失败: {e}")
        raise HTTPException(status_code=500, detail=f"清理失败: {str(e)}")


@app.get("/health")
async def health_check():
    """Liveness probe."""
    return {"status": "healthy"}


if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=5100,
        reload=True
    )