#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronously batch-crawl E-H gallery image links and save one JSON file per album.

    python eh_crawler.py
"""
from __future__ import annotations

import asyncio
import json
import logging
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import aiofiles
import httpx
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm_asyncio
from aiopath import AsyncPath

# -------------------- Configurable constants --------------------
CONCURRENCY = 20                 # max concurrent galleries (semaphore size)
MAX_PAGE = 100                   # max pages to walk per gallery
RETRY_PER_PAGE = 5               # retries per page
TIMEOUT = httpx.Timeout(10.0)    # request timeout
PROXY = "http://127.0.0.1:7890"  # HTTP proxy; set to "" if not needed
IMG_SELECTOR = "#gdt"            # thumbnail / image entry area
FAILED_RECORD = "failed_keys.json"
LOG_LEVEL = logging.INFO
# ----------------------------------------------------

logging.basicConfig(
    level=LOG_LEVEL,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("crawl.log", encoding="utf-8"),
    ],
)
log = logging.getLogger("eh_crawler")

# Pre-compiled regex for characters that are illegal in folder names
ILLEGAL_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1F]')


# -------------------- Utility functions --------------------
def clean_folder_name(title: str) -> str:
    """Sanitize a gallery title into a usable folder name."""
    return ILLEGAL_CHARS.sub("_", title).replace(" ", "").replace("_", "").strip() or "gallery"


def load_targets() -> List[str]:
    """Read gallery URLs from targets.txt."""
    tgt = Path("targets.txt")
    if not tgt.exists():
        log.error("targets.txt not found; created an empty one, please fill in URLs first")
        tgt.touch()
        sys.exit(0)
    lines = [ln.strip() for ln in tgt.read_text(encoding="utf-8").splitlines() if ln.strip()]
    if not lines:
        log.error("targets.txt is empty, please fill in URLs first")
        sys.exit(0)
    return list(set(lines))  # deduplicate


def load_failed() -> List[str]:
    if Path(FAILED_RECORD).exists():
        try:
            return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"Failed to load failure record -> {exc}")
    return []


def save_failed(keys: List[str]) -> None:
    Path(FAILED_RECORD).write_text(json.dumps(keys, ensure_ascii=False, indent=2), encoding="utf-8")


# -------------------- Crawler core --------------------
async def fetch_page(client: httpx.AsyncClient, url: str) -> Optional[str]:
    """Fetch a single page of HTML, retrying with exponential backoff."""
    for attempt in range(1, RETRY_PER_PAGE + 1):
        try:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.text
        except httpx.HTTPError as exc:
            log.error(f"[{attempt}/{RETRY_PER_PAGE}] request failed {url} -> {exc}")
            await asyncio.sleep(2 ** attempt)
    return None


async def crawl_single_gallery(
    client: httpx.AsyncClient, sem: asyncio.Semaphore, gallery_url: str
) -> bool:
    """Crawl a single gallery; return True on success."""
    async with sem:
        base_url = gallery_url.rstrip("/")
        key = base_url.split("/")[-1]  # use the last URL segment as the key
        json_name = f"{key}.json"
        folder_path: Optional[AsyncPath] = None
        json_data: Dict[str, str] = {}
        img_count = 1
        last_page = False

        for page in range(MAX_PAGE):
            if last_page:
                break
            url = f"{base_url}?p={page}"
            html = await fetch_page(client, url)
            if html is None:
                continue

            soup = BeautifulSoup(html, "lxml")
            title = soup.title.string if soup.title and soup.title.string else "gallery"
            clean_title = clean_folder_name(title)
            folder_path = AsyncPath("downloads") / clean_title
            await folder_path.mkdir(parents=True, exist_ok=True)

            # Skip the whole gallery if its JSON already exists
            json_path = folder_path / json_name
            if await json_path.exists():
                log.info(f"{json_name} already exists, skipping")
                return True

            log.info(f"current page: {page + 1} {url}")
            selected = soup.select_one(IMG_SELECTOR)
            if not selected:
                log.warning(f"selector {IMG_SELECTOR} not found")
                continue

            # NOTE: the original extraction regex was lost in the source; the pattern
            # below is an assumption based on E-H image-page URLs of the form
            # https://e-hentai.org/s/<token>/<gid>-<num> inside the #gdt block.
            links = re.findall(r'href="(https://e-hentai\.org/s/[^"]+)"', str(selected))
            new_links = [link for link in links if link not in json_data.values()]
            if not new_links:
                # E-H repeats the final page once `p` runs past the end,
                # so a page with no new links means we are done.
                last_page = True
                continue
            for link in new_links:
                json_data[str(img_count)] = link
                img_count += 1

        if json_data and folder_path is not None:
            json_path = folder_path / json_name
            async with aiofiles.open(str(json_path), "w", encoding="utf-8") as f:
                await f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
            log.info(f"{key} saved -> {json_path} ({len(json_data)} images)")
            return True
        else:
            log.warning(f"{key}: no image links parsed")
            return False


# -------------------- Main flow --------------------
async def main() -> None:
    targets = load_targets()
    failed = load_failed()
    if failed:
        log.info(f"Retrying {len(failed)} galleries that failed last run first")
    all_urls = list(set(targets + failed))

    proxy = PROXY if PROXY else None
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    async with httpx.AsyncClient(
        limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True
    ) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        results = await tqdm_asyncio.gather(
            *[crawl_single_gallery(client, sem, u) for u in all_urls],
            desc="Galleries",
            total=len(all_urls),
        )

    # Persist failures
    new_failed = [u for u, ok in zip(all_urls, results) if not ok]
    if new_failed:
        save_failed(new_failed)
        log.warning(f"{len(new_failed)} galleries still failed this round; written to {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("All galleries crawled!")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Interrupted by user, crawl stopped")
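
# A minimal sketch of the expected targets.txt layout, assuming one gallery URL per
# line (blank lines are ignored and duplicates are removed by load_targets()); the
# URLs below are illustrative placeholders, not real galleries:
#
#   https://e-hentai.org/g/1234567/0123456789/
#   https://e-hentai.org/g/7654321/abcdef0123/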