import time import httpx import datetime def login(): username = "jack0210_@hotmail.com" password = "!QAZ2wsx+0913" timeout = httpx.Timeout(60.0, connect=10.0) limits = httpx.Limits(max_keepalive_connections=20, max_connections=100) transport = httpx.HTTPTransport(retries=3) s = httpx.Client( auth=(username, password), timeout=timeout, limits=limits, transport=transport ) response = s.post('https://api.worldquantbrain.com/authentication') print(response.content) return s def wait_get(s, url: str, max_retries: int = 10): retries = 0 while retries < max_retries: while True: simulation_progress = s.get(url) if simulation_progress.headers.get("Retry-After", 0) == 0: break time.sleep(float(simulation_progress.headers["Retry-After"])) if simulation_progress.status_code < 400: break else: time.sleep(2 ** retries) retries += 1 return simulation_progress def get_alphas(start_date, end_date, sharpe_th, fitness_th, region, alpha_num, usage): s = login() output = [] count = 0 for i in range(0, alpha_num, 100): print(i) url_e = "https://api.worldquantbrain.com/users/self/alphas?limit=100&offset=%d" % (i) \ + "&status=UNSUBMITTED%1FIS_FAIL&dateCreated%3E=2025-" + start_date \ + "T00:00:00-04:00&dateCreated%3C2025-" + end_date \ + "T00:00:00-04:00&is.fitness%3E" + str(fitness_th) + "&is.sharpe%3E" \ + str(sharpe_th) + "&settings.region=" + region + "&order=-is.sharpe&hidden=false&type!=SUPER" url_c = "https://api.worldquantbrain.com/users/self/alphas?limit=100&offset=%d" % (i) \ + "&status=UNSUBMITTED%1FIS_FAIL&dateCreated%3E=2025-" + start_date \ + "T00:00:00-04:00&dateCreated%3C2025-" + end_date \ + "T00:00:00-04:00&is.fitness%3C-" + str(fitness_th) + "&is.sharpe%3C-" \ + str(sharpe_th) + "&settings.region=" + region + "&order=is.sharpe&hidden=false&type!=SUPER" urls = [url_e] if usage != "submit": urls.append(url_c) for url in urls: response = s.get(url) try: alpha_list = response.json()["results"] for j in range(len(alpha_list)): alpha_id = alpha_list[j]["id"] name = alpha_list[j]["name"] dateCreated = alpha_list[j]["dateCreated"] sharpe = alpha_list[j]["is"]["sharpe"] fitness = alpha_list[j]["is"]["fitness"] turnover = alpha_list[j]["is"]["turnover"] margin = alpha_list[j]["is"]["margin"] longCount = alpha_list[j]["is"]["longCount"] shortCount = alpha_list[j]["is"]["shortCount"] decay = alpha_list[j]["settings"]["decay"] exp = alpha_list[j]['regular']['code'] count += 1 if (longCount + shortCount) > 100: if sharpe < -sharpe_th: exp = "-%s" % exp rec = [alpha_id, exp, sharpe, turnover, fitness, margin, dateCreated, decay] print(rec) if turnover > 0.7: rec.append(decay * 4) elif turnover > 0.6: rec.append(decay * 3 + 3) elif turnover > 0.5: rec.append(decay * 3) elif turnover > 0.4: rec.append(decay * 2) elif turnover > 0.35: rec.append(decay + 4) elif turnover > 0.3: rec.append(decay + 2) output.append(rec) except: print("%d finished re-login" % i) s = login() print("count: %d" % count) return output, s def check_consecutive_non_zero_values(alpha_id, data, required_streak=200): if not data or len(data) < required_streak: return True def check_column(column_data): if len(column_data) < required_streak: return True current_streak_count = 0 current_streak_value = None for value in column_data: if value != 0: if value == current_streak_value: current_streak_count += 1 else: current_streak_value = value current_streak_count = 1 else: current_streak_value = None current_streak_count = 0 if current_streak_count >= required_streak: return False return True column1_values = [] column2_values = [] for row in data: if len(row) >= 3: column1_values.append(row[1]) column2_values.append(row[2]) if column1_values and column2_values: is_col1_all_zeros = all(v == 0 for v in column1_values) is_col2_all_zeros = all(v == 0 for v in column2_values) if is_col1_all_zeros or is_col2_all_zeros: print(alpha_id, "不合法") return False if not check_column(column1_values): print(alpha_id, "不合法") return False if not check_column(column2_values): print(alpha_id, "不合法") return False return True def get_alpha_pnl_legal(s, alpha_id: str) -> bool: not_legal_id = [] pnl = wait_get(s, "https://api.worldquantbrain.com/alphas/" + alpha_id + "/recordsets/pnl").json() records = pnl["records"] if not records: return False date_list = [] for record in records: try: date_obj = datetime.datetime.strptime(record[0], '%Y-%m-%d').date() date_list.append(date_obj) except Exception: return False min_date = min(date_list) max_date = max(date_list) total_days = (max_date - min_date).days if total_days < 2920: return False zero_streak_threshold = 5 * 252 col1_zeros = [record[1] == 0 for record in records] def max_consecutive_zeros(arr): max_streak = current_streak = 0 for val in arr: current_streak = current_streak + 1 if val else 0 max_streak = max(max_streak, current_streak) return max_streak col1_max_zero_streak = max_consecutive_zeros(col1_zeros) if col1_max_zero_streak >= zero_streak_threshold: print(f"{alpha_id} 不合法:存在连续{zero_streak_threshold // 252}年零值") not_legal_id.append(str(alpha_id)) return False if not check_consecutive_non_zero_values(alpha_id, records): return False return True def mute(s, alpha_id): url = "https://api.worldquantbrain.com/alphas/" + alpha_id data = { "hidden": True } response = s.patch(url, json=data) def main(): fo_tracker, s = get_alphas('12-01', '12-31', 1, 0.5, 'USA', 1000, 'submit') f_num = len(fo_tracker) print(f_num, "个alpha 进行pnl合法检测,请耐心等待") count = 0 print(len(fo_tracker)) for i in fo_tracker[::-1][0:]: if count % 25 == 0: print('===========', count, '===========') count += 1 if get_alpha_pnl_legal(s, i[0]) == False: print(i[0], '已经隐藏') mute(s, i[0]) if __name__ == "__main__": main()