You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
auto/test/my_proxy.py

103 lines
3.3 KiB

import asyncio
from aioquic.asyncio import connect
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import StreamDataReceived, ConnectionTerminated
from aioquic.asyncio.protocol import QuicConnectionProtocol
import ssl
# Trojan settings
# NOTE(review): the password below is stored in plain text in this file.
config = {
    # Remote server address
    "server": "lbso.bnnodeservice.com",
    # Remote server port
    "port": 443,
    # Domain name presented as SNI
    "sni": "cert.bitbyte.one",
    # Password sent to the server
    "password": "f2e8e50c-ffb8-48a1-a460-2e72dfaf7845",
    # Local listen address
    "local_address": "127.0.0.1",
    # Local listen port
    "local_port": 1080,
}

# QUIC client configuration.
# verify_mode=ssl.CERT_NONE turns off certificate verification, so the
# server's identity is NOT checked; server_name carries the SNI value.
quic_config = QuicConfiguration(
    is_client=True,
    alpn_protocols=["h3"],
    max_datagram_frame_size=65536,
    server_name=config["sni"],
    verify_mode=ssl.CERT_NONE,
)
# QUIC protocol that relays received stream data to a local writer
class TrojanClientProtocol(QuicConnectionProtocol):
    """QUIC client protocol that copies incoming stream data to ``writer``.

    Assign an ``asyncio.StreamWriter`` (or any object with ``write`` and
    ``close``) to ``self.writer`` after construction. While ``writer`` is
    ``None`` received data is dropped.
    """

    def __init__(self, *args, **kwargs):
        """Create the protocol.

        All arguments are forwarded to ``QuicConnectionProtocol``, which
        requires the ``QuicConnection`` instance; ``aioquic.asyncio.connect``
        calls ``create_protocol(connection, stream_handler=...)``, so the
        class itself can be passed as ``create_protocol``.
        """
        super().__init__(*args, **kwargs)
        self.writer = None

    def connection_established(self, quic):
        """Remember ``quic`` on the instance.

        NOTE(review): nothing in this file calls this method; confirm whether
        any caller still relies on it.
        """
        self.quic = quic

    def quic_event_received(self, event):
        """Forward stream data to ``writer`` and close it on termination.

        The base-class handler is intentionally not invoked, so events are
        consumed here only.
        """
        if isinstance(event, StreamDataReceived):
            if self.writer:
                # This callback is synchronous, so drain() cannot be awaited
                # here; calling it without await only created a coroutine
                # that never ran. write() alone queues the data.
                self.writer.write(event.data)
        elif isinstance(event, ConnectionTerminated):
            if self.writer:
                self.writer.close()
# Handle one local client connection
async def handle_client(reader, writer):
    """Relay one local TCP connection over a fresh QUIC connection.

    Opens a QUIC connection to ``config["server"]:config["port"]``, writes
    the configured password followed by CRLF on a new bidirectional stream,
    then copies bytes in both directions until either side finishes.

    Args:
        reader: ``asyncio.StreamReader`` of the local client.
        writer: ``asyncio.StreamWriter`` of the local client.

    Errors inside the two copy loops are printed; a failure to establish the
    QUIC connection propagates to the caller after ``writer`` is closed.
    """
    try:
        # connect() yields a QuicConnectionProtocol. Stream I/O is done via
        # the reader/writer pair returned by create_stream(); the protocol
        # object has no send_stream_data()/next_event() methods.
        async with connect(
            config["server"],
            config["port"],
            configuration=quic_config,
        ) as quic_connection:
            # The first client-initiated bidirectional stream gets ID 0,
            # the stream the original code addressed explicitly.
            remote_reader, remote_writer = await quic_connection.create_stream()

            # Send the password
            remote_writer.write(config["password"].encode() + b"\r\n")

            # Local client -> remote server
            async def client_to_server():
                try:
                    while True:
                        data = await reader.read(4096)
                        if not data:
                            break
                        remote_writer.write(data)
                except Exception as e:
                    print(f"Client to server error: {e}")
                finally:
                    # Closing the QUIC connection also ends the other
                    # direction once the termination is processed.
                    quic_connection.close()

            # Remote server -> local client
            async def server_to_client():
                try:
                    while True:
                        # Awaiting the stream reader yields to the event
                        # loop instead of polling for events in a tight loop.
                        data = await remote_reader.read(4096)
                        if not data:
                            break
                        writer.write(data)
                        await writer.drain()
                except Exception as e:
                    print(f"Server to client error: {e}")
                finally:
                    writer.close()

            # Run both directions concurrently
            await asyncio.gather(client_to_server(), server_to_client())
    finally:
        # Always release the local socket, including when connect() fails.
        writer.close()
# Start the local proxy server
async def start_proxy():
    """Listen on the configured local address and serve clients forever.

    Each accepted connection is handed to ``handle_client``.
    """
    listen_host = config["local_address"]
    listen_port = config["local_port"]
    listener = await asyncio.start_server(handle_client, listen_host, listen_port)
    print(f"代理服务器已启动,监听地址:{config['local_address']}:{config['local_port']}")
    async with listener:
        await listener.serve_forever()
# Script entry point: run the proxy until the process is stopped
if __name__ == "__main__":
    proxy_main = start_proxy()
    asyncio.run(proxy_main)