#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 异步批量下载 EH 画廊真实图片 python download_images.py """ from __future__ import annotations import asyncio import json import logging import os import re import sys from pathlib import Path from typing import Dict, List import aiofiles import httpx from pathlib import Path from tqdm.asyncio import tqdm_asyncio # -------------------- 可配置常量 -------------------- from config import config CONCURRENCY = config.concurrency RETRY_PER_IMG = config.retry_per_image TIMEOUT = httpx.Timeout(config.image_timeout) FAILED_RECORD = "data/failed_downloads.json" LOG_LEVEL = getattr(logging, config.log_level.upper()) # ---------------------------------------------------- # 确保数据目录存在 if not os.path.exists("data"): os.mkdir("data") # 使用统一的日志配置 from logger import get_logger from realtime_logger import realtime_logger log = get_logger("step2", "download.log") # 预编译正则 IMG_URL_RE = re.compile(r' List[Dict[str, str]]: if Path(FAILED_RECORD).exists(): try: return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8")) except Exception as exc: log.warning(f"加载失败记录失败 -> {exc}") return [] def save_failed(failed: List[Dict[str, str]]) -> None: Path(FAILED_RECORD).write_text(json.dumps(failed, ensure_ascii=False, indent=2), encoding="utf-8") # -------------------- 下载核心 -------------------- async def download_one( client: httpx.AsyncClient, sem: asyncio.Semaphore, item: Dict[str, str] ) -> bool: """下载单张图,成功返回 True""" img_path, img_url = Path(item["img_path"]), item["img_url"] await sem.acquire() try: for attempt in range(1, RETRY_PER_IMG + 1): try: # 1. 获取详情页 resp = await client.get(img_url) resp.raise_for_status() real_url_match = IMG_URL_RE.search(resp.text) if not real_url_match: log.warning(f"未解析到真实图片链接: {img_url}") return False # <- 这里不会触发 await real_url = real_url_match.group(1) # 2. 下载真实图片(流式) ext_match = EXT_RE.search(real_url) ext = ext_match.group(1).lower() if ext_match else "jpg" final_path = img_path.with_suffix(f".{ext}") if final_path.exists(): log.info(f"已存在,跳过: {final_path.name}") return True async with client.stream("GET", real_url) as img_resp: img_resp.raise_for_status() final_path.parent.mkdir(parents=True, exist_ok=True) async with aiofiles.open(final_path, "wb") as fp: async for chunk in img_resp.aiter_bytes(chunk_size=65536): await fp.write(chunk) # log.info(f"[OK] {final_path.name}") # 发送实时日志 try: realtime_logger.broadcast_log_sync(f"下载完成: {final_path.name}", "SUCCESS", "step2") except Exception as e: log.warning(f"发送实时日志失败: {e}") return True except httpx.HTTPStatusError as exc: if exc.response.status_code == 429: wait = 2 ** (attempt - 1) log.warning(f"[429] 等待 {wait}s 后重试({attempt}/{RETRY_PER_IMG})") await asyncio.sleep(wait) else: log.error(f"[HTTP {exc.response.status_code}] {img_url}") break except Exception as exc: log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_IMG})") await asyncio.sleep(1) return False finally: sem.release() # -------------------- 扫描待下载列表 -------------------- async def scan_tasks() -> List[Dict[str, str]]: """扫描 downloads/ 下所有 json,返回待下载列表""" result = [] root = Path("data/downloads") if not root.exists(): return result for json_path in root.rglob("*.json"): folder = json_path.parent try: data: Dict[str, str] = json.loads(json_path.read_text(encoding="utf-8")) except Exception as exc: log.warning(f"读取 json 失败 {json_path} -> {exc}") continue for img_name, img_url in data.items(): img_path = folder / img_name # 无后缀 # 判断任意后缀是否存在 exists = False for ext in (".jpg", ".jpeg", ".png", ".gif", ".webp"): if img_path.with_suffix(ext).exists(): exists = True break if not exists: result.append({"img_path": str(img_path), "img_url": img_url}) return result # -------------------- 主流程 -------------------- async def main(proxy: str | None = None) -> None: # 1. 优先重试上次失败 failed_tasks = load_failed() if failed_tasks: log.info(f"优先重试上次失败任务: {len(failed_tasks)} 张") tasks = failed_tasks + await scan_tasks() if not tasks: log.info("没有需要下载的图片,收工!") return limits = httpx.Limits(max_keepalive_connections=20, max_connections=50) async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True) as client: sem = asyncio.Semaphore(CONCURRENCY) results = await tqdm_asyncio.gather( *[download_one(client, sem, t) for t in tasks], desc="Downloading", total=len(tasks), ) # 统计 & 持久化新失败 failed_again = [t for t, ok in zip(tasks, results) if not ok] if failed_again: save_failed(failed_again) log.warning(f"本轮仍有 {len(failed_again)} 张下载失败,已写入 {FAILED_RECORD}") else: Path(FAILED_RECORD).unlink(missing_ok=True) log.info("全部下载完成!") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: log.info("用户中断,下载结束")