import aiofiles import httpx from typing import Optional import os class Downloader: def __init__(self): self.output_dir = "downloads" os.makedirs(self.output_dir, exist_ok=True) async def download(self, proxy_str: str, url: str) -> str: """ 下载文件的主要逻辑 """ try: # 如果proxy_str不为空,构建代理配置 proxy = None if proxy_str and ":" in proxy_str: ip, port = proxy_str.split(":", 1) proxy = f"http://{ip}:{port}" # 使用 httpx 异步下载 async with httpx.AsyncClient(proxies=proxy, timeout=30.0) as client: response = await client.get(url) response.raise_for_status() # 获取文件名 filename = self._get_filename(url, response) filepath = os.path.join(self.output_dir, filename) # 保存文件 async with aiofiles.open(filepath, 'wb') as f: await f.write(response.content) return f"下载成功: {filename}\n保存路径: {filepath}\n文件大小: {len(response.content)} bytes" except Exception as e: raise Exception(f"下载过程中出错: {str(e)}") def _get_filename(self, url: str, response: httpx.Response) -> str: """从 URL 或响应头中获取文件名""" # 从 URL 中提取文件名 if '/' in url: filename = url.split('/')[-1] if '?' in filename: filename = filename.split('?')[0] else: filename = "downloaded_file" # 如果没有扩展名,尝试从 Content-Type 推断 if '.' not in filename: content_type = response.headers.get('content-type', '') if 'image' in content_type: ext = content_type.split('/')[-1] filename = f"{filename}.{ext}" return filename or "downloaded_file" async def download_image(self, proxy_str: str, url: str) -> str: """专门下载图片的方法""" # 这里可以添加图片下载的特殊逻辑 return await self.download(proxy_str, url)