import httpx from httpx import BasicAuth import os def login(credentials_file='account.txt'): """登录WorldQuant Brain API""" # 读取本地账号密码 if not os.path.exists(credentials_file): print("未找到 account.txt 文件") with open(credentials_file, 'w') as f: f.write("") print("account.txt 文件已创建,请填写账号密码, 格式: ['username', 'password]") exit(1) with open(credentials_file) as f: credentials = eval(f.read()) username, password = credentials[0], credentials[1] print(f"正在登录账户: {username}") # 创建客户端并认证 client = httpx.Client(auth=BasicAuth(username, password)) # 发送登录请求 response = client.post('https://api.worldquantbrain.com/authentication') print(f"登录状态: {response.status_code}") if response.status_code == 201: print("登录成功!") print(response.json()) return client else: print(f"登录失败: {response.json()}") client.close() return None def get_alphas_data(client, limit=10, offset=0): """获取alpha数据""" # 使用params参数而不是直接拼接URL params = { 'limit': limit, 'offset': offset, 'status': 'UNSUBMITTED%1FIS_FAIL', # 注意:这里可能需要根据实际API要求调整编码 'is.sharpe%3E1.25': '', # 注意:这里可能需要调整参数格式 'order': '-dateSubmitted', 'hidden': 'false' } url = 'https://api.worldquantbrain.com/users/self/alphas' try: response = client.get(url, params=params) print(f"获取数据状态码: {response.status_code}") if response.status_code == 200: return response.json() else: print(f"获取数据失败: {response.text}") return None except Exception as e: print(f"请求发生错误: {e}") return None def main(): # 登录 client = login() if not client: return try: # 第一次请求获取count first_data = get_alphas_data(client, limit=10, offset=0) if first_data and 'count' in first_data: count = first_data['count'] print(f"获取到的count值: {count}") if int(count) == 0: print("没有符合条件的Alpha因子") return # 第二次请求,使用count作为offset second_data = get_alphas_data(client, limit=count, offset=count) if second_data: print("第二次请求返回的数据:") print(second_data) else: print("第二次请求失败") else: print("第一次请求失败或未找到count字段") finally: # 确保客户端被关闭 client.close() print("\n连接已关闭") # 使用示例 if __name__ == "__main__": main()