"""Local TCP listener that relays each accepted connection over a QUIC stream.

For every client that connects to the local address, a new QUIC connection is
opened to the configured server, the configured password line is sent on a
fresh bidirectional stream, and bytes are then copied in both directions until
either side finishes.
"""

import asyncio
import ssl

from aioquic.asyncio import connect
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import ConnectionTerminated, StreamDataReceived

# Proxy configuration.
# NOTE(review): the password is a credential committed in source; consider
# loading it from the environment or a file that is kept out of version control.
config = {
    "server": "lbso.bnnodeservice.com",  # upstream server host
    "port": 443,  # upstream server port
    "sni": "cert.bitbyte.one",  # server name presented in the TLS handshake
    "password": "f2e8e50c-ffb8-48a1-a460-2e72dfaf7845",  # shared secret
    "local_address": "127.0.0.1",  # local listen address
    "local_port": 1080,  # local listen port
}

# Size of each read from the local client socket.
READ_CHUNK_SIZE = 4096

# QUIC client configuration.
quic_config = QuicConfiguration(
    is_client=True,
    alpn_protocols=["h3"],
    max_datagram_frame_size=65536,
)
# NOTE(review): certificate verification is disabled, so the upstream server is
# not authenticated and the link can be intercepted. Kept as in the original
# because the SNI differs from the host name; confirm this is really intended.
quic_config.verify_mode = ssl.CERT_NONE
quic_config.server_name = config["sni"]


class TrojanClientProtocol(QuicConnectionProtocol):
    """QUIC protocol that hands stream data to a coroutine through a queue.

    aioquic calls ``quic_event_received`` synchronously, so nothing can be
    awaited there. Relevant events are queued in ``events`` and a coroutine
    consumes them, which lets the consumer await ``writer.drain()``.
    """

    def __init__(self, *args, **kwargs):
        # aioquic instantiates the protocol as
        # ``create_protocol(connection, stream_handler=...)``; forward everything.
        super().__init__(*args, **kwargs)
        self.events = asyncio.Queue()

    def quic_event_received(self, event):
        """Queue stream data and connection termination events."""
        if isinstance(event, (StreamDataReceived, ConnectionTerminated)):
            self.events.put_nowait(event)

    def send_stream_data(self, stream_id, data, end_stream=False):
        """Queue ``data`` on ``stream_id`` and flush it to the network."""
        self._quic.send_stream_data(stream_id, data, end_stream=end_stream)
        # Data is only buffered by the QUIC connection until transmit() runs.
        self.transmit()

    def open_stream(self):
        """Return the id of the next unused bidirectional stream."""
        return self._quic.get_next_available_stream_id()


async def _client_to_server(reader, protocol, stream_id):
    """Copy bytes from the local client to the QUIC stream until EOF."""
    try:
        while True:
            data = await reader.read(READ_CHUNK_SIZE)
            if not data:
                break
            protocol.send_stream_data(stream_id, data)
    except Exception as e:
        print(f"Client to server error: {e}")


async def _server_to_client(protocol, writer, stream_id):
    """Copy bytes received on the QUIC stream back to the local client."""
    try:
        while True:
            event = await protocol.events.get()
            if isinstance(event, ConnectionTerminated):
                break
            if event.stream_id != stream_id:
                continue  # data for a stream this relay did not open
            if event.data:
                writer.write(event.data)
                await writer.drain()
            if event.end_stream:
                break
    except Exception as e:
        print(f"Server to client error: {e}")


async def handle_client(reader, writer):
    """Relay one local client connection through a new QUIC connection.

    Args:
        reader: ``asyncio.StreamReader`` for the accepted local connection.
        writer: ``asyncio.StreamWriter`` for the accepted local connection.
    """
    try:
        async with connect(
            config["server"],
            config["port"],
            configuration=quic_config,
            create_protocol=TrojanClientProtocol,
        ) as protocol:
            stream_id = protocol.open_stream()

            # Send the password line first.
            # NOTE(review): the reference Trojan protocol sends a hash of the
            # password followed by a request header; the raw password is kept
            # here as in the original. Confirm what the server expects.
            protocol.send_stream_data(
                stream_id, config["password"].encode() + b"\r\n"
            )

            tasks = [
                asyncio.ensure_future(
                    _client_to_server(reader, protocol, stream_id)
                ),
                asyncio.ensure_future(
                    _server_to_client(protocol, writer, stream_id)
                ),
            ]
            try:
                # Once one direction ends the relay is over; stop the other.
                await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            finally:
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        # Connection or handshake failure; report it instead of leaving an
        # unretrieved task exception behind.
        print(f"Upstream connection error: {e}")
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass  # the local peer is already gone


async def start_proxy():
    """Start the local listener and serve until cancelled."""
    server = await asyncio.start_server(
        handle_client, config["local_address"], config["local_port"]
    )
    print(f"代理服务器已启动,监听地址:{config['local_address']}:{config['local_port']}")
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(start_proxy())