#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Asynchronously batch-download the real images of EH galleries.

Usage:
    python download_images.py
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import re
import sys
from pathlib import Path
from typing import Dict, List

import aiofiles
import httpx
from tqdm.asyncio import tqdm_asyncio

# -------------------- Configurable constants --------------------
from config import config

CONCURRENCY = config.concurrency
RETRY_PER_IMG = config.retry_per_image
TIMEOUT = httpx.Timeout(config.image_timeout)
FAILED_RECORD = "data/failed_downloads.json"
LOG_LEVEL = getattr(logging, config.log_level.upper())
# -----------------------------------------------------------------

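# A minimal sketch of the attributes this script assumes `config` (from the
# project-local config.py) exposes; the names are taken from the usages
# above, the values shown are illustrative only:
#
#   class Config:
#       concurrency: int = 5          # simultaneous downloads
#       retry_per_image: int = 3      # attempts per image
#       image_timeout: float = 30.0   # seconds, fed to httpx.Timeout
#       log_level: str = "INFO"       # stdlib logging level name
#   config = Config()
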
# Ensure the data directory exists
if not os.path.exists("data"):
    os.mkdir("data")

# Shared logging setup
from logger import get_logger
from realtime_logger import realtime_logger

log = get_logger("step2", "download.log")

# Pre-compiled regexes
IMG_URL_RE = re.compile(r'<img id="img" src="(.*?)"', re.S)
EXT_RE = re.compile(r"\.(jpg|jpeg|png|gif|webp)$", re.I)

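# For reference, IMG_URL_RE pulls the real image URL out of a detail page
# whose markup looks roughly like this (illustrative snippet, not captured
# from the live site):
#
#   <img id="img" src="https://example.org/ab/cd/12345.jpg" style="...">
#
# group(1) would then be "https://example.org/ab/cd/12345.jpg", and EXT_RE
# picks the "jpg" extension off its tail.
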
# -------------------- Utilities --------------------
def load_failed() -> List[Dict[str, str]]:
    if Path(FAILED_RECORD).exists():
        try:
            return json.loads(Path(FAILED_RECORD).read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"Failed to load the failure record -> {exc}")
    return []


def save_failed(failed: List[Dict[str, str]]) -> None:
    Path(FAILED_RECORD).write_text(json.dumps(failed, ensure_ascii=False, indent=2), encoding="utf-8")

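# The failure record mirrors the task items built by scan_tasks(); a
# hypothetical data/failed_downloads.json (path and URL are made up):
#
#   [
#     {
#       "img_path": "data/downloads/some-gallery/0003",
#       "img_url": "https://example.org/s/abc123/0003"
#     }
#   ]
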
# -------------------- Download core --------------------
async def download_one(
    client: httpx.AsyncClient, sem: asyncio.Semaphore, item: Dict[str, str]
) -> bool:
    """Download a single image; return True on success."""
    img_path, img_url = Path(item["img_path"]), item["img_url"]

    await sem.acquire()
    try:
        for attempt in range(1, RETRY_PER_IMG + 1):
            try:
                # 1. Fetch the detail page
                resp = await client.get(img_url)
                resp.raise_for_status()
                real_url_match = IMG_URL_RE.search(resp.text)
                if not real_url_match:
                    log.warning(f"Could not parse the real image URL from: {img_url}")
                    return False  # <- no await is triggered here
                real_url = real_url_match.group(1)

                # 2. Download the real image (streamed)
                ext_match = EXT_RE.search(real_url)
                ext = ext_match.group(1).lower() if ext_match else "jpg"
                final_path = img_path.with_suffix(f".{ext}")

                if final_path.exists():
                    log.info(f"Already exists, skipping: {final_path.name}")
                    return True

                async with client.stream("GET", real_url) as img_resp:
                    img_resp.raise_for_status()
                    final_path.parent.mkdir(parents=True, exist_ok=True)
                    async with aiofiles.open(final_path, "wb") as fp:
                        async for chunk in img_resp.aiter_bytes(chunk_size=65536):
                            await fp.write(chunk)

                # log.info(f"[OK] {final_path.name}")
                # Push a realtime log entry
                try:
                    realtime_logger.broadcast_log_sync(f"Download finished: {final_path.name}", "SUCCESS", "step2")
                except Exception as e:
                    log.warning(f"Failed to send realtime log: {e}")
                return True

            except httpx.HTTPStatusError as exc:
                if exc.response.status_code == 429:
                    wait = 2 ** (attempt - 1)
                    log.warning(f"[429] waiting {wait}s before retry ({attempt}/{RETRY_PER_IMG})")
                    await asyncio.sleep(wait)
                else:
                    log.error(f"[HTTP {exc.response.status_code}] {img_url}")
                    break
            except Exception as exc:
                log.error(f"[ERROR] {img_url} -> {exc} ({attempt}/{RETRY_PER_IMG})")
                await asyncio.sleep(1)

        return False
    finally:
        sem.release()

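# Note: the explicit sem.acquire()/sem.release() pair in download_one is
# equivalent to wrapping the body in `async with sem:`; the try/finally form
# makes it visible that the slot is returned even on the early returns above.
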
# -------------------- Scan for pending downloads --------------------
async def scan_tasks() -> List[Dict[str, str]]:
    """Scan every JSON file under data/downloads/ and return the pending list."""
    result = []
    root = Path("data/downloads")
    if not root.exists():
        return result

    for json_path in root.rglob("*.json"):
        folder = json_path.parent
        try:
            data: Dict[str, str] = json.loads(json_path.read_text(encoding="utf-8"))
        except Exception as exc:
            log.warning(f"Failed to read json {json_path} -> {exc}")
            continue

        for img_name, img_url in data.items():
            img_path = folder / img_name  # no extension yet
            # Check whether the image already exists under any known extension
            exists = False
            for ext in (".jpg", ".jpeg", ".png", ".gif", ".webp"):
                if img_path.with_suffix(ext).exists():
                    exists = True
                    break
            if not exists:
                result.append({"img_path": str(img_path), "img_url": img_url})

    return result

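# Each data/downloads/<gallery>/*.json is assumed to map an image stem (file
# name without extension) to its detail-page URL, e.g. (made-up values):
#
#   {
#     "0001": "https://example.org/s/abc123/0001",
#     "0002": "https://example.org/s/abc123/0002"
#   }
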
# -------------------- Main flow --------------------
async def main(proxy: str | None = None) -> None:
    # 1. Retry the previous run's failures first
    failed_tasks = load_failed()
    if failed_tasks:
        log.info(f"Retrying {len(failed_tasks)} previously failed images first")

    # De-duplicate: scan_tasks() re-lists any image whose file is still
    # missing, and downloading the same path twice concurrently would race.
    seen = {t["img_path"] for t in failed_tasks}
    tasks = failed_tasks + [t for t in await scan_tasks() if t["img_path"] not in seen]
    if not tasks:
        log.info("Nothing left to download, we're done!")
        return

    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    # NB: httpx 0.26 deprecated the `proxies=` keyword in favour of `proxy=`
    # (removed in 0.28); adjust to match the installed version.
    async with httpx.AsyncClient(limits=limits, timeout=TIMEOUT, proxies=proxy, verify=True) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        results = await tqdm_asyncio.gather(
            *[download_one(client, sem, t) for t in tasks],
            desc="Downloading",
            total=len(tasks),
        )

    # Tally & persist the new failures; gather preserves input order,
    # so zip(tasks, results) pairs each task with its outcome.
    failed_again = [t for t, ok in zip(tasks, results) if not ok]
    if failed_again:
        save_failed(failed_again)
        log.warning(f"{len(failed_again)} images still failed this round; recorded in {FAILED_RECORD}")
    else:
        Path(FAILED_RECORD).unlink(missing_ok=True)
        log.info("All downloads complete!")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log.info("Interrupted by user; download stopped")