You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
auto/archive/clash-pi-proxy/switch_proxy.py

119 lines
4.2 KiB

import json
import random
import subprocess
import httpx
def load_config():
    """Load and return the parsed contents of ``config.json``.

    The path is relative to the current working directory. The file is
    decoded explicitly as UTF-8 so the result does not depend on the
    platform's locale encoding.

    Returns:
        The decoded JSON document (a dict for the configuration this
        script expects).

    Raises:
        FileNotFoundError: if ``config.json`` does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    with open("config.json", "r", encoding="utf-8") as f:
        return json.load(f)
# Settings are read once, at import time, from config.json.
config = load_config()

# How many containers to address; one URL of each kind is generated per container.
END = config["total_container"]
# First port of the web/API endpoints handed to ProxyManager.
START_PORT = config["start_web_port"]
# First port of the proxy listeners used as the `curl -x` address.
START_API_PORT = config["start_port"]

# Container number `offset` lives at the start port plus `offset`, always on the same host.
base_url_list = [f'http://192.168.31.201:{START_PORT + offset}' for offset in range(END)]
base_api_list = [f'http://192.168.31.201:{START_API_PORT + offset}' for offset in range(END)]
class ProxyManager:
    """Control one proxy container through the HTTP API served at ``base_url``.

    The endpoints used (``/api/configs``, ``/api/proxies``) look like the Clash
    RESTful API exposed behind a web dashboard -- NOTE(review): confirm.
    """

    def __init__(self, base_url: str) -> None:
        """Store the root URL (scheme, host and port) of the container's web API."""
        self.base_url = base_url

    def _browser_headers(self) -> dict:
        """Return the browser-like headers sent with the mode-switch request.

        The host/origin/referer values are derived from this instance's own
        ``base_url`` rather than from a module-level variable, so the class
        works no matter what the calling code names its variables.
        """
        return {
            "accept": "application/json, text/plain, */*",
            "accept-encoding": "gzip, deflate, br, zstd",
            "accept-language": "zh-CN,zh",
            "connection": "keep-alive",
            "content-type": "application/json",
            "host": self.base_url.replace('http://', ''),
            "origin": self.base_url,
            "referer": self.base_url,
            "sec-ch-ua": '"Not A(Brand";v="8", "Chromium";v="132", "Brave";v="132"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"macOS"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "sec-gpc": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
        }

    def switch_to_global(self) -> None:
        """PATCH ``/api/configs`` with ``{"mode": "Global"}``.

        Raises:
            SystemExit: with status 1 when the request fails or the server
                answers with anything other than 204.
        """
        print(f"{self.base_url}/api/configs 切换全局代理")
        data = {"mode": "Global"}
        try:
            with httpx.Client() as client:
                response = client.patch(
                    self.base_url + "/api/configs",
                    json=data,
                    headers=self._browser_headers(),
                )
        except httpx.RequestError as exc:
            print(f"请求失败: {exc}")
            # SystemExit is what the `exit` helper raises, but it does not
            # depend on the `site` module having installed that helper.
            raise SystemExit(1) from exc
        if response.status_code == 204:
            print(f"{self.base_url}/api/configs 切换全局代理成功")
        else:
            print(f"{self.base_url}/api/configs 切换全局代理失败 {response.status_code}")
            raise SystemExit(1)

    def get_proxy_name(self) -> list:
        """Return the names listed under the ``GLOBAL`` group of ``/api/proxies``.

        The built-in ``DIRECT`` and ``REJECT`` entries are filtered out.

        Raises:
            SystemExit: with status 1 on any request, HTTP-status or
                response-shape error.
        """
        print(f'{self.base_url}/api/proxies 获取代理名称')
        try:
            response = httpx.get(self.base_url + "/api/proxies")
            response.raise_for_status()
            proxies = response.json()
            return [
                item
                for item in proxies['proxies']['GLOBAL']['all']
                if item not in ('DIRECT', 'REJECT')
            ]
        except Exception as e:
            print(f'get_proxy_name: {e}')
            raise SystemExit(1) from e

    def switch_proxy(self, proxy_name, base_api) -> bool:
        """Select ``proxy_name`` in the ``GLOBAL`` group and verify it works.

        Args:
            proxy_name: name of the proxy to activate.
            base_api: proxy address handed to ``check_proxy`` for the
                connectivity test.

        Returns:
            True when the switch returned 204 and the connectivity check
            passed; False on any failure (failures are printed, not raised).
        """
        print(f'{self.base_url}/api/proxies/GLOBAL 切换代理')
        try:
            target_url = self.base_url + '/api/proxies/GLOBAL'
            response = httpx.put(target_url, json={"name": proxy_name})
            if response.status_code == 204:
                # The switch succeeded; now verify the proxy actually carries traffic.
                return self.check_proxy(base_api)
            print(f"切换代理失败: {response.status_code} - {proxy_name}\n重试...")
            return False
        except Exception as e:
            # Deliberately best-effort: the caller moves on to another candidate.
            print(f"切换代理失败: {e}\n重试...")
            return False

    def check_proxy(self, proxy_url, timeout: float = 30.0) -> bool:
        """Fetch ``ip.sb`` with ``curl`` through ``proxy_url``.

        Args:
            proxy_url: proxy address (no credentials) passed to ``curl -x``.
            timeout: seconds to wait for ``curl`` before giving up.

        Returns:
            True when ``curl`` exits successfully with non-empty output;
            False when it fails, times out, prints nothing, or cannot be
            started at all.
        """
        command = ["curl", "-x", proxy_url, "ip.sb"]
        try:
            result = subprocess.run(
                command, capture_output=True, text=True, check=True, timeout=timeout
            )
        except subprocess.CalledProcessError as e:
            # curl exited non-zero: show what it reported.
            print("Error:", e.stderr)
            return False
        except subprocess.TimeoutExpired:
            print("Error:", f"curl timed out after {timeout} seconds")
            return False
        except OSError as e:
            # curl is not installed or is not executable.
            print("Error:", e)
            return False
        # A CompletedProcess object is always truthy, so inspect the output itself.
        if not result.stdout.strip():
            return False
        print("Output:", result.stdout)
        return True
# Proxy names already assigned to an earlier container, so that every
# container ends up on a different proxy.
used_proxy = []
for base_url, base_api in zip(base_url_list, base_api_list):
    manager = ProxyManager(base_url)
    manager.switch_to_global()
    proxy_name_list = manager.get_proxy_name()
    # Try each not-yet-used candidate at most once, in random order. This
    # keeps the random selection but guarantees the search terminates even
    # when every candidate is already taken or fails its check.
    candidates = [name for name in proxy_name_list if name not in used_proxy]
    random.shuffle(candidates)
    for proxy_name in candidates:
        if manager.switch_proxy(proxy_name, base_api):
            # Record only the proxy that was actually selected and verified.
            used_proxy.append(proxy_name)
            break
    else:
        # No candidate worked (or none was available); report and move on
        # instead of spinning forever.
        print(f"{base_url}: no unused working proxy found")