You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
eh-fastapi/main.py

305 lines
10 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
EH-Downloader 主应用
"""
import glob
import os
from pathlib import Path
from typing import List
from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel
import uvicorn
import asyncio
import threading
import json
from config import config
from logger import get_logger
from realtime_logger import realtime_logger
import step2
from utils import run_step1, run_step2
# 设置日志
logger = get_logger("main", "app.log")
app = FastAPI(
title=config.app_name,
version=config.app_version,
description="E-Hentai 画廊下载工具"
)
@app.on_event("startup")
async def startup_event():
"""应用启动事件"""
logger.info(f"启动 {config.app_name} v{config.app_version}")
# 注册事件循环到实时日志器,便于跨线程广播
try:
realtime_logger.set_loop(asyncio.get_running_loop())
except RuntimeError:
# 若获取失败则忽略
pass
# 确保目录存在
config._ensure_directories()
# 创建默认targets.txt文件
if not config.targets_path.exists():
with open(config.targets_path, 'w', encoding='utf-8') as f:
f.write("# 在这里添加目标URL,每行一个\n")
f.write("# 示例:\n")
f.write("https://e-hentai.org/g/3550066/47d6393550\n")
logger.info(f"创建文件: {config.targets_path}")
# 创建默认proxy.txt文件
if not config.proxy_path.exists():
with open(config.proxy_path, 'w', encoding='utf-8') as f:
f.write("127.0.0.1:7890\n")
logger.info(f"创建文件: {config.proxy_path}")
logger.info("应用启动完成")
# 挂载静态文件和模板
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# WebSocket 路由
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket连接处理"""
await websocket.accept()
realtime_logger.add_connection(websocket)
try:
# 发送最近的日志
recent_logs = await realtime_logger.get_recent_logs(20)
for log_entry in recent_logs:
await websocket.send_text(json.dumps(log_entry, ensure_ascii=False))
# 保持连接
while True:
try:
# 等待客户端消息(心跳检测)
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
break
except Exception as e:
logger.error(f"WebSocket错误: {e}")
finally:
realtime_logger.remove_connection(websocket)
# favicon 路由
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
return FileResponse("static/favicon.ico")
@app.get("/")
async def home(request: Request):
"""主页面"""
try:
proxies = config.get_proxies()
return templates.TemplateResponse("index.html", {
"request": request,
"proxies": proxies,
"default_proxy": proxies[0] if proxies else "127.0.0.1:7890"
})
except Exception as e:
logger.error(f"渲染主页失败: {e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
@app.post("/load_urls")
async def load_urls():
"""读取 targets.txt 文件中的URL"""
try:
urls = config.get_targets()
if not urls:
return JSONResponse({
"success": True,
"message": "targets.txt 文件为空,请在data/targets.txt中添加URL",
"urls": []
})
logger.info(f"成功读取 {len(urls)} 个URL")
return JSONResponse({
"success": True,
"message": f"成功读取 {len(urls)} 个URL",
"urls": urls
})
except Exception as e:
logger.error(f"读取URL失败: {e}")
return JSONResponse({
"success": False,
"message": f"读取文件时出错: {str(e)}",
"urls": []
})
@app.post("/clear")
async def clear_output():
"""清除输出"""
return JSONResponse({
"success": True,
"message": "输出已清除",
"output": ""
})
class ProxyRequest(BaseModel):
proxy: str # 修改为单个proxy字段
@app.post("/download_urls")
async def download_urls(req: ProxyRequest):
"""下载画廊链接"""
try:
# 解析proxy字符串为ip和port
if ":" in req.proxy:
ip, port = req.proxy.split(":", 1)
proxy = f"http://{ip}:{port}"
else:
proxy = None
# 发送实时日志
await realtime_logger.broadcast_log(f"开始抓取画廊链接,代理: {proxy}", "INFO", "step1")
# 在后台线程中执行,避免阻塞
def run_step1_sync():
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(run_step1(proxy))
finally:
loop.close()
# 使用线程池执行
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_step1_sync)
msg = future.result()
await realtime_logger.broadcast_log(f"画廊链接抓取完成: {msg}", "SUCCESS", "step1")
return JSONResponse({"success": True, "message": msg})
except Exception as e:
await realtime_logger.broadcast_log(f"抓取画廊链接失败: {e}", "ERROR", "step1")
logger.error(f"抓取画廊链接失败: {e}")
return JSONResponse({"success": False, "message": f"抓取失败: {str(e)}"})
@app.post("/download_images")
async def download_images(req: ProxyRequest):
"""下载图片"""
try:
# 解析proxy字符串为ip和port
if ":" in req.proxy:
ip, port = req.proxy.split(":", 1)
proxy = f"http://{ip}:{port}"
else:
proxy = None
# 发送实时日志
await realtime_logger.broadcast_log(f"开始下载图片,代理: {proxy}", "INFO", "step2")
# 在后台线程中执行,避免阻塞
def run_step2_sync():
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(run_step2(proxy))
finally:
loop.close()
# 使用线程池执行
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_step2_sync)
msg = future.result()
await realtime_logger.broadcast_log(f"图片下载完成: {msg}", "SUCCESS", "step2")
return JSONResponse({"success": True, "message": msg})
except Exception as e:
await realtime_logger.broadcast_log(f"下载图片失败: {e}", "ERROR", "step2")
logger.error(f"下载图片失败: {e}")
return JSONResponse({"success": False, "message": f"下载失败: {str(e)}"})
@app.post("/clean_files")
async def clean_files():
"""清理项目目录下的所有 .log 和 .json 文件"""
try:
deleted_files = []
error_files = []
# 使用配置中的清理模式
for pattern in config.cleanup_patterns:
for file_path in glob.glob(pattern, recursive=True):
try:
# 跳过排除的文件
if file_path in config.cleanup_exclude:
continue
os.remove(file_path)
deleted_files.append(file_path)
logger.info(f"已删除文件: {file_path}")
except Exception as e:
error_files.append(f"{file_path}: {str(e)}")
logger.error(f"删除文件失败 {file_path}: {str(e)}")
if error_files:
logger.warning(f"清理完成,但部分文件删除失败: {len(error_files)}")
return JSONResponse({
"success": False,
"message": f"清理完成,但部分文件删除失败",
"deleted_count": len(deleted_files),
"error_count": len(error_files),
"deleted_files": deleted_files,
"error_files": error_files
})
else:
logger.info(f"成功清理 {len(deleted_files)} 个文件")
return JSONResponse({
"success": True,
"message": f"成功清理 {len(deleted_files)} 个文件",
"deleted_count": len(deleted_files),
"error_count": 0,
"deleted_files": deleted_files
})
except Exception as e:
logger.error(f"清理过程中出错: {e}")
return JSONResponse({
"success": False,
"message": f"清理过程中出错: {str(e)}",
"deleted_count": 0,
"error_count": 0
})
@app.post("/check_incomplete")
async def check_incomplete():
"""检查未完成文件"""
try:
result = await step2.scan_tasks()
logger.info(f"检查未完成文件: {len(result)}")
return JSONResponse({
"success": True,
"message": "检查未完成文件功能已就绪",
"data": f"{len(result)} 个文件未下载"
})
except Exception as e:
logger.error(f"检查未完成文件失败: {e}")
return JSONResponse({
"success": False,
"message": f"检查失败: {str(e)}"
})
if __name__ == "__main__":
uvicorn.run(
"main:app",
host=config.host,
port=config.port,
reload=config.debug
)