#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 异步批量下载 EH 画廊真实图片 python download_images.py """ from __future__ import annotations import asyncio import json import logging import re import sys from pathlib import Path from typing import Dict, List, Optional import aiofiles import httpx from aiopath import AsyncPath from tqdm.asyncio import tqdm_asyncio # -------------------- 可配置常量 -------------------- CONCURRENCY = 20 # 并发下载数 RETRY_PER_IMG = 3 # 单图重试 TIMEOUT = httpx.Timeout(15.0) # 请求超时 PROXY = "http://127.0.0.1:7890" # 科学上网代理,不需要留空 FAILED_RECORD = "failed_downloads.json" LOG_LEVEL = logging.INFO # ---------------------------------------------------- logging.basicConfig( level=LOG_LEVEL, format="[%(asctime)s] [%(levelname)s] %(message)s", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler("download.log", encoding="utf-8"), ], ) log = logging.getLogger("img_downloader") # 预编译正则 IMG_URL_RE = re.compile(r' List[Dict[str, str]]: if Path(FAILED_RECORD).exists(): try: return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8")) except Exception as exc: log.warning(f"加载失败记录失败 -> {exc}") return [] def save_failed(failed: List[Dict[str, str]]) -> None: Path(FAILED_RECORD).write_text(json.dumps(failed, ensure_ascii=False, indent=2), encoding="utf-8") # -------------------- 下载核心 -------------------- async def download_one( client: httpx.AsyncClient, sem: asyncio.Semaphore, item: Dict[str, str] ) -> bool: """下载单张图,成功返回 True""" img_path, img_url = Path(item["img_path"]), item["img_url"] await sem.acquire() try: for attempt in range(1, RETRY_PER_IMG + 1): try: # 1. 获取详情页 resp = await client.get(img_url) resp.raise_for_status() real_url_match = IMG_URL_RE.search(resp.text) if not real_url_match: log.warning(f"未解析到真实图片链接: {img_url}") return False # <- 这里不会触发 await real_url = real_url_match.group(1) # 2. 下载真实图片(流式) ext_match = EXT_RE.search(real_url) ext = ext_match.group(1).lower() if ext_match else "jpg" final_path = img_path.with_suffix(f".{ext}") if await AsyncPath(final_path).exists(): log.info(f"已存在,跳过: {final_path.name}") return True async with client.stream("GET", real_url) as img_resp: img_resp.raise_for_status() await AsyncPath(final_path).parent.mkdir(parents=True, exist_ok=True) async with aiofiles.open(final_path, "wb") as fp: async for chunk in img_resp.aiter_bytes(chunk_size=65536): await fp.write(chunk) log.info(f"[OK] {final_path.name}") return True except httpx.HTTPStatusError as exc: if exc.response.status_code == 429: wait = 2 ** (attempt - 1) log.warning(f"[429] 等待 {wait}s 后重试({attempt}/{RETRY_PER_IMG})") await asyncio.sleep(wait) else: log.error(f"[HTTP {exc.response.status_code}] {img_url}") break except Exception as exc: log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_IMG})") await asyncio.sleep(1) return False finally: sem.release() # -------------------- 扫描待下载列表 -------------------- async def scan_tasks() -> List[Dict[str, str]]: """扫描 downloads/ 下所有 json,返回待下载列表""" result = [] root = AsyncPath("downloads") if not await root.exists(): return result async for json_path in root.rglob("*.json"): folder = json_path.parent try: data: Dict[str, str] = json.loads(await json_path.read_text(encoding="utf-8")) except Exception as exc: log.warning(f"读取 json 失败 {json_path} -> {exc}") continue for img_name, img_url in data.items(): img_path = folder / img_name # 无后缀 # 异步判断任意后缀是否存在 exists = False for ext in (".jpg", ".jpeg", ".png", ".gif", ".webp"): if await img_path.with_suffix(ext).exists(): exists = True break if not exists: result.append({"img_path": str(img_path), "img_url": img_url}) return result # -------------------- 主流程 -------------------- async def main() -> None: # 1. 优先重试上次失败 failed_tasks = load_failed() if failed_tasks: log.info(f"优先重试上次失败任务: {len(failed_tasks)} 张") tasks = failed_tasks + await scan_tasks() if not tasks: log.info("没有需要下载的图片,收工!") return proxy = PROXY if PROXY else None limits = httpx.Limits(max_keepalive_connections=20, max_connections=50) async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True) as client: sem = asyncio.Semaphore(CONCURRENCY) results = await tqdm_asyncio.gather( *[download_one(client, sem, t) for t in tasks], desc="Downloading", total=len(tasks), ) # 统计 & 持久化新失败 failed_again = [t for t, ok in zip(tasks, results) if not ok] if failed_again: save_failed(failed_again) log.warning(f"本轮仍有 {len(failed_again)} 张下载失败,已写入 {FAILED_RECORD}") else: Path(FAILED_RECORD).unlink(missing_ok=True) log.info("全部下载完成!") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: log.info("用户中断,下载结束")