import time import logging import httpx from httpx import BasicAuth # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def login(): """ 登录 WorldQuant Brain API,返回 httpx.Client 对象。 注意:登录成功后 client 已包含认证信息,无需额外处理。 """ # 从 nacos 获取账号密码 nacos_resp = httpx.get('http://192.168.31.41:30848/nacos/v1/cs/configs?dataId=wq_account&group=quantify') if nacos_resp.status_code != 200: logger.error('获取账号密码失败') return None config = nacos_resp.json() username = config['user_name'] password = config['password'] logger.info(f"正在登录账户: {username}") # 创建客户端并认证 client = httpx.Client(auth=BasicAuth(username, password)) # 发送登录请求 response = client.post('https://api.worldquantbrain.com/authentication') logger.info(f"登录状态: {response.status_code}") if response.status_code == 201: logger.info("登录成功!") logger.debug(response.json()) return client else: logger.error(f"登录失败: {response.json()}") client.close() return None def submit_alpha(alpha_id): """ 提交 alpha,自动重试直到成功或达到最大重试次数。 返回 True 表示提交成功,False 表示最终失败。 """ TOTAL_RETRY_COUNT = 100000 retry_count = 0 client = None while retry_count < TOTAL_RETRY_COUNT: # 如果没有有效 client,则重新登录 if client is None: client = login() if client is None: logger.error("登录失败,等待 10 秒后重试") time.sleep(10) retry_count += 1 continue try: url = f"https://api.worldquantbrain.com/alphas/{alpha_id}/submit" logger.debug(f'url: {url}') # 1. 发送提交请求 res = client.post(url) # 处理特殊情况:已经提交过(需要轮询结果) if res.status_code == 400 and "The plain HTTP request was sent to HTTPS port" in res.text: logger.info("Alpha 已提交,正在轮询状态...") # 轮询获取最终结果 poll_interval = 1.0 # 初始轮询间隔 while True: time.sleep(poll_interval) print(".", end="", flush=True) # 使用 GET 请求查询当前状态 poll_res = client.get(url) # 如果服务器返回了 Retry-After,则使用它 if "retry-after" in poll_res.headers: poll_interval = max(float(poll_res.headers["retry-after"]), 3) else: poll_interval = 3 # 默认间隔 # 当状态不再是 400(处理中)时,退出轮询 if poll_res.status_code != 400 or "The plain HTTP request was sent to HTTPS port" not in poll_res.text: res = poll_res break logger.info(f"轮询结束,最终状态码: {res.status_code}") # 2. 处理各种状态码 if res.status_code == 429: logger.info("触发限流 (429),休眠 60 秒后重试") time.sleep(60) retry_count += 1 continue if res.status_code == 401: logger.warning("认证失效,重新登录") if client: client.close() client = None retry_count += 1 continue if res.status_code == 404: logger.warning(f"Alpha {alpha_id} 不存在或超时,重试 ({retry_count+1}/{TOTAL_RETRY_COUNT})") retry_count += 1 continue if res.status_code // 100 == 5: logger.warning(f"服务器错误 {res.status_code},5 秒后重试") time.sleep(5) retry_count += 1 continue if res.status_code == 403: logger.info(f"{alpha_id} 提交失败 (403)") fail_checks = [] try: checks = res.json()["is"]["checks"] fail_checks = [x for x in checks if x.get("result") == "FAIL"] except Exception as e: logger.error(f"解析失败原因时出错: {e}") logger.info(f"失败的检查项: {fail_checks}") # 如果是提交次数超限,则直接退出,不再重试 if any(x.get("name") in ["REGULAR_SUBMISSION", "SUPER_SUBMISSION"] for x in fail_checks): logger.error("提交次数超过限制,放弃重试") return False # 其他 403 错误也视为永久失败 return False if res.status_code == 200: logger.info(f"{alpha_id} 提交成功") return True # 其他非 2xx 状态码,视为未知错误,直接退出 logger.error(f"未处理的响应状态码 {res.status_code},放弃重试。响应内容: {res.text[:200]}") return False except httpx.RequestError as e: logger.error(f"网络请求异常 (alpha_id={alpha_id}, retry={retry_count}): {e}") retry_count += 1 time.sleep(10) continue except Exception as e: logger.error(f"未预期的异常 (alpha_id={alpha_id}): {e}") return False # 超过最大重试次数 logger.error(f"达到最大重试次数 {TOTAL_RETRY_COUNT},提交失败") return False # 使用示例 if __name__ == "__main__": # 测试提交一个 alpha alpha_id = "your_alpha_id_here" success = submit_alpha(alpha_id) if success: print("提交完成") else: print("提交失败")