import json
import random
import subprocess
import sys

import httpx


def load_config():
    """Return the parsed contents of ``config.json`` in the working directory."""
    with open("config.json", "r", encoding="utf-8") as f:
        return json.load(f)


config = load_config()
END = config["total_container"]
START_PORT = config["start_web_port"]
START_API_PORT = config["start_port"]

# Host on which every container is reachable; ports are consecutive per container.
HOST = "192.168.31.201"

# base_url_list: roots used for the ``/api/...`` control requests.
# base_api_list: addresses handed to ``curl -x`` as the proxy to test.
base_url_list = [f'http://{HOST}:{START_PORT + i}' for i in range(END)]
base_api_list = [f'http://{HOST}:{START_API_PORT + i}' for i in range(END)]


class ProxyManager:
    """Client for the proxy-control HTTP API rooted at ``base_url``.

    NOTE(review): the endpoints (``/api/configs``, ``/api/proxies``) look like
    a Clash-style controller -- confirm against the deployed service.
    """

    def __init__(self, base_url):
        self.base_url = base_url

    def switch_to_global(self):
        """PATCH ``/api/configs`` with ``{"mode": "Global"}``.

        Terminates the process with status 1 when the response is not
        204 or when the request itself fails.
        """
        print(f"{self.base_url}/api/configs 切换全局代理")
        # Use the instance's own URL rather than a module-level variable, so
        # the method works regardless of what the caller's globals are named.
        headers = {
            "accept": "application/json, text/plain, */*",
            "accept-encoding": "gzip, deflate, br, zstd",
            "accept-language": "zh-CN,zh",
            "connection": "keep-alive",
            "content-type": "application/json",
            "host": self.base_url.replace('http://', ''),
            "origin": self.base_url,
            "referer": self.base_url,
            "sec-ch-ua": '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"macOS"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "sec-gpc": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
        }
        data = {"mode": "Global"}

        try:
            with httpx.Client() as client:
                response = client.patch(
                    self.base_url + "/api/configs", json=data, headers=headers
                )
        except httpx.RequestError as exc:
            print(f"请求失败: {exc}")
            sys.exit(1)

        if response.status_code == 204:
            print(f"{self.base_url}/api/configs 切换全局代理成功")
        else:
            print(f"{self.base_url}/api/configs 切换全局代理失败 {response.status_code}")
            sys.exit(1)

    def get_proxy_name(self):
        """Return the names listed under ``proxies.GLOBAL.all`` of ``/api/proxies``.

        ``DIRECT`` and ``REJECT`` are filtered out. Any failure (request
        error, non-2xx status, unexpected payload) is printed and terminates
        the process with status 1.
        """
        print(f'{self.base_url}/api/proxies 获取代理名称')
        try:
            response = httpx.get(self.base_url + "/api/proxies")
            response.raise_for_status()
            proxies = response.json()
            return [
                item
                for item in proxies['proxies']['GLOBAL']['all']
                if item not in ('DIRECT', 'REJECT')
            ]
        except Exception as e:
            print(f'get_proxy_name: {e}')
            sys.exit(1)

    def switch_proxy(self, proxy_name, base_api):
        """Select ``proxy_name`` for the GLOBAL group and verify it works.

        Sends PUT ``/api/proxies/GLOBAL`` with ``{"name": proxy_name}``. On a
        204 response the result of ``check_proxy(base_api)`` is returned;
        any other status or any exception yields ``False``.
        """
        print(f'{self.base_url}/api/proxies/GLOBAL 切换代理')
        try:
            target_url = self.base_url + '/api/proxies/GLOBAL'
            response = httpx.put(target_url, json={"name": proxy_name})
            if response.status_code == 204:
                # Switch accepted; now verify traffic actually flows.
                return self.check_proxy(base_api)
            print(f"切换代理失败: {response.status_code} - {proxy_name}\n重试...")
            return False
        except Exception as e:
            print(f"切换代理失败: {e}\n重试...")
            return False

    def check_proxy(self, proxy_url, timeout=30.0):
        """Return True when ``curl -x proxy_url ip.sb`` exits successfully.

        Args:
            proxy_url: Proxy address (no credentials) passed to ``curl -x``.
            timeout: Seconds to wait for ``curl`` before treating the proxy
                as unusable.

        Returns:
            True if curl exits with status 0, False if it exits non-zero or
            does not finish within ``timeout``.
        """
        command = ["curl", "-x", proxy_url, "ip.sb"]
        try:
            # check=True turns a non-zero exit status into CalledProcessError.
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                check=True,
                timeout=timeout,
            )
        except subprocess.CalledProcessError as e:
            # The command failed; show what curl reported.
            print("Error:", e.stderr)
            return False
        except subprocess.TimeoutExpired:
            print("Error:", f"curl timed out after {timeout}s")
            return False
        print("Output:", result.stdout)
        return True


def _assign_unused_proxy(manager, base_api, used_proxy):
    """Switch ``manager`` to a working proxy not yet in ``used_proxy``.

    Candidates are tried once each in random order, so the search always
    terminates. On success the chosen name is appended to ``used_proxy`` and
    returned; ``None`` is returned when no candidate works.
    """
    candidates = [
        name for name in manager.get_proxy_name() if name not in used_proxy
    ]
    random.shuffle(candidates)
    for proxy_name in candidates:
        if manager.switch_proxy(proxy_name, base_api):
            used_proxy.append(proxy_name)
            return proxy_name
    return None


used_proxy = []
unassigned = []
for base_url, base_api in zip(base_url_list, base_api_list):
    manager = ProxyManager(base_url)
    manager.switch_to_global()
    if _assign_unused_proxy(manager, base_api, used_proxy) is None:
        print(f"{base_url}: no unused working proxy available")
        unassigned.append(base_url)

# Keep processing the remaining containers, but report failure to the caller.
if unassigned:
    sys.exit(1)