# main.py import os import json import logging from pathlib import Path from typing import Dict, Any, List import asyncio import httpx import aiofiles from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel import uvicorn # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 常量定义 DOWNLOADS_DIR = "downloads" MAX_FILENAME_LENGTH = 100 INVALID_FILENAME_CHARS = '<>:"/\\|?*' MAX_CONCURRENT_DOWNLOADS = 5 DOWNLOAD_TIMEOUT = 30 # FastAPI应用 app = FastAPI(title="eh-v2") # 全局变量用于跟踪下载状态 download_status: Dict[str, Dict[str, Any]] = {} # 数据模型 class SaveDataRequest(BaseModel): url: str title: str all_images: Dict[str, str] total_images: int class GalleryInfo(BaseModel): title: str path: str total_images: int downloaded_images: int class DownloadStatusResponse(BaseModel): status: str message: str downloaded: int total: int current_progress: float # 工具函数 def setup_downloads_directory() -> Path: downloads_path = Path(DOWNLOADS_DIR) downloads_path.mkdir(exist_ok=True) return downloads_path def sanitize_filename(filename: str) -> str: sanitized = filename for char in INVALID_FILENAME_CHARS: sanitized = sanitized.replace(char, '_') if len(sanitized) > MAX_FILENAME_LENGTH: sanitized = sanitized[:MAX_FILENAME_LENGTH] return sanitized def create_title_directory(base_path: Path, title: str) -> Path: safe_title = sanitize_filename(title) title_dir = base_path / safe_title title_dir.mkdir(exist_ok=True) return title_dir async def save_data_to_file(file_path: Path, data: Dict[str, Any]) -> None: async with aiofiles.open(file_path, 'w', encoding='utf-8') as f: await f.write(json.dumps(data, ensure_ascii=False, indent=2)) def get_all_galleries() -> List[GalleryInfo]: galleries = [] downloads_path = Path(DOWNLOADS_DIR) if not downloads_path.exists(): return galleries for gallery_dir in downloads_path.iterdir(): if gallery_dir.is_dir(): data_file = gallery_dir / "data.json" if data_file.exists(): try: with open(data_file, 'r', encoding='utf-8') as f: data = json.load(f) downloaded_count = 0 if 'all_images' in data: for filename, url in data['all_images'].items(): image_path = gallery_dir / filename if image_path.exists(): downloaded_count += 1 galleries.append(GalleryInfo( title=data.get('title', gallery_dir.name), path=str(gallery_dir), total_images=data.get('total_images', 0), downloaded_images=downloaded_count )) except Exception as e: logger.error(f"读取画廊数据失败 {gallery_dir}: {e}") return galleries async def download_single_image(client: httpx.AsyncClient, url: str, file_path: Path, semaphore: asyncio.Semaphore) -> bool: async with semaphore: try: # 先获取图片后缀 response = await client.get(url, timeout=DOWNLOAD_TIMEOUT) response.raise_for_status() import re match = re.search(r'img id="img" src="(.*?)"', response.text) if not match: return False real_img_url = match.group(1) suffix = real_img_url.split('.')[-1] # 创建带后缀的文件路径 file_path_with_suffix = file_path.with_suffix('.' + suffix) if file_path_with_suffix.exists(): return True img_response = await client.get(real_img_url, timeout=DOWNLOAD_TIMEOUT) img_response.raise_for_status() async with aiofiles.open(file_path_with_suffix, 'wb') as f: await f.write(img_response.content) return True except Exception as e: logger.error(f"下载失败 {url}: {e}") return False async def download_gallery_images(title: str) -> DownloadStatusResponse: safe_title = sanitize_filename(title) gallery_path = downloads_path / safe_title data_file = gallery_path / "data.json" if not data_file.exists(): return DownloadStatusResponse( status="error", message="画廊数据文件不存在", downloaded=0, total=0, current_progress=0.0 ) try: async with aiofiles.open(data_file, 'r', encoding='utf-8') as f: content = await f.read() data = json.loads(content) all_images = data.get('all_images', {}) total_images = len(all_images) if total_images == 0: return DownloadStatusResponse( status="error", message="没有可下载的图片", downloaded=0, total=0, current_progress=0.0 ) download_status[title] = { "downloaded": 0, "total": total_images, "status": "downloading" } logger.info(f"开始下载画廊 '{title}',共 {total_images} 张图片") semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS) async with httpx.AsyncClient( headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }, follow_redirects=True ) as client: tasks = [] for filename, url in all_images.items(): image_path = gallery_path / filename if image_path.exists(): download_status[title]["downloaded"] += 1 continue task = download_single_image(client, url, image_path, semaphore) tasks.append(task) if tasks: results = await asyncio.gather(*tasks, return_exceptions=True) successful_downloads = sum(1 for result in results if result is True) download_status[title]["downloaded"] += successful_downloads downloaded_count = download_status[title]["downloaded"] progress = (downloaded_count / total_images) * 100 if downloaded_count == total_images: download_status[title]["status"] = "completed" message = f"下载完成!共下载 {downloaded_count}/{total_images} 张图片" else: download_status[title]["status"] = "partial" message = f"部分完成!下载 {downloaded_count}/{total_images} 张图片" return DownloadStatusResponse( status="success", message=message, downloaded=downloaded_count, total=total_images, current_progress=progress ) except Exception as e: logger.error(f"下载画廊 '{title}' 时发生错误: {e}") download_status[title] = { "status": "error", "message": str(e) } return DownloadStatusResponse( status="error", message=f"下载失败: {str(e)}", downloaded=0, total=0, current_progress=0.0 ) async def download_all_pending_galleries(): galleries = get_all_galleries() pending_galleries = [g for g in galleries if g.downloaded_images < g.total_images] logger.info(f"找到 {len(pending_galleries)} 个待下载画廊") if not pending_galleries: logger.info("没有待下载的画廊") return for gallery in pending_galleries: logger.info(f"开始下载画廊: {gallery.title}") result = await download_gallery_images(gallery.title) if result.status == "success": logger.info(f"画廊 '{gallery.title}' 下载完成: {result.message}") else: logger.error(f"画廊 '{gallery.title}' 下载失败: {result.message}") await asyncio.sleep(1) logger.info("批量下载任务完成") # 初始化 downloads_path = setup_downloads_directory() # API路由 @app.post("/save_url") @app.options("/save_url") async def save_url_data(request: SaveDataRequest = None): if not request: return {"status": "ok"} try: title_dir = create_title_directory(downloads_path, request.title) data_file = title_dir / "data.json" await save_data_to_file(data_file, { "url": request.url, "title": request.title, "all_images": request.all_images, "total_images": request.total_images }) logger.info(f"成功保存数据: {request.title}") return { "status": "success", "message": f"数据保存成功,共 {request.total_images} 张图片", "path": str(title_dir) } except Exception as e: logger.error(f"保存数据失败: {e}") raise HTTPException(status_code=500, detail=f"保存失败: {str(e)}") @app.get("/", response_class=HTMLResponse) async def read_gallery_manager(): return """
管理您的画廊下载任务
点击"读取文件夹"按钮加载数据