You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
eh-fastapi/src/services/downloader.py

43 lines
1.4 KiB

import aiofiles
import httpx
from typing import Optional
import os
from config import config
class Downloader:
def __init__(self, proxy_str: Optional[str] = None):
self.proxy = proxy_str
async def download(self, url: str, save_path: str):
os.makedirs(os.path.dirname(save_path), exist_ok=True)
async with httpx.AsyncClient(proxies=self.proxy) as client:
response = await client.get(url)
response.raise_for_status()
async with aiofiles.open(save_path, 'wb') as f:
await f.write(response.content)
def _get_filename(self, url: str, content_type: Optional[str] = None) -> str:
if not url:
raise Exception("URL cannot be empty")
filename = url.split('/')[-1]
if content_type:
ext = content_type.split('/')[-1]
if '.' not in filename:
filename = f"{filename}.{ext}"
return filename
async def download_image(self, url: str, save_dir: str):
async with httpx.AsyncClient(proxies=self.proxy) as client:
response = await client.get(url)
response.raise_for_status()
filename = self._get_filename(url, response.headers.get('content-type'))
save_path = os.path.join(save_dir, filename)
await self.download(url, save_path)
return save_path