From a9ec2702dbf79677049bf4638aae5cb9d5c11225 Mon Sep 17 00:00:00 2001 From: jack Date: Tue, 31 Mar 2026 16:41:55 +0800 Subject: [PATCH] =?UTF-8?q?alpha=20=E6=80=A7=E8=83=BD=E5=9B=9E=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fetch-alphas/wqb-get-alphas.py | 2 +- test/wqb-fetch_alpha_by_id/main.py | 71 ++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 test/wqb-fetch_alpha_by_id/main.py diff --git a/fetch-alphas/wqb-get-alphas.py b/fetch-alphas/wqb-get-alphas.py index 4455605..9936374 100644 --- a/fetch-alphas/wqb-get-alphas.py +++ b/fetch-alphas/wqb-get-alphas.py @@ -36,7 +36,7 @@ class AlphaManager: return logger - def login(self, credentials_file='account.txt'): + def login(self): """登录WorldQuant Brain API""" try: # 从nacos获取账号密码 diff --git a/test/wqb-fetch_alpha_by_id/main.py b/test/wqb-fetch_alpha_by_id/main.py new file mode 100644 index 0000000..1fbd810 --- /dev/null +++ b/test/wqb-fetch_alpha_by_id/main.py @@ -0,0 +1,71 @@ +import httpx +from httpx import BasicAuth, Timeout + + +def login(): + """登录WorldQuant Brain API""" + try: + # 从nacos获取账号密码 + with httpx.Client(timeout=10.0) as temp_client: + nacos_resp = temp_client.get( + 'http://192.168.31.41:30848/nacos/v1/cs/configs?dataId=wq_account&group=quantify' + ) + + if nacos_resp.status_code != 200: + print('获取账号密码失败') + return False + + config = nacos_resp.json() + username = config.get('user_name') + password = config.get('password') + + if not username or not password: + print('账号密码不完整') + return False + + print(f"正在登录账户: {username}") + + # 创建客户端并设置超时(关键修复) + # 设置更长的超时时间:连接30秒,读取60秒 + timeout = Timeout(connect=30.0, read=60.0, write=30.0, pool=30.0) + client = httpx.Client( + auth=BasicAuth(username, password), + timeout=timeout + ) + + # 发送登录请求 + response = client.post( + 'https://api.worldquantbrain.com/authentication') + + if response.status_code == 201: + print("登录成功!") + # 登录成功后将client对象返回出来 + return client + else: + print( + f"登录失败: {response.status_code} - {response.text}") + client.close() + return False + + except Exception as e: + print(f"登录异常: {e}") + return False + + +def fetch_alpha_by_id(alpha_id: str, client: httpx.Client): + """根据Alpha ID获取Alpha详情""" + url = f"https://api.worldquantbrain.com/alphas/{alpha_id}" + response = client.get(url) + if response.status_code == 200: + return response.json() + else: + return {} + + +if __name__ == '__main__': + client = login() + if not client: + exit(1) + + alpha = fetch_alpha_by_id('O0mnm8zd', client) + print(alpha)