import ast
import json
import os
import random
import sys
import time


class BrainLogin:
    """Load credentials from disk and authenticate against the WorldQuant BRAIN API."""

    def __init__(self, credentials_file='account.txt'):
        self.credentials_file = credentials_file
        self.client = None  # becomes an httpx.Client after a successful login()
        self.brain_api_url = 'https://api.worldquantbrain.com'

    def load_credentials(self):
        """Return (username, password) parsed from the credentials file.

        If the file is missing, create an empty template and exit so the user
        can fill it in.  The file holds a Python-style list literal, e.g.
        ['username', 'password']; ast.literal_eval parses that format safely,
        unlike the original eval(), which would execute arbitrary code placed
        in the file.
        """
        if not os.path.exists(self.credentials_file):
            print("未找到 account.txt 文件")
            with open(self.credentials_file, 'w') as f:
                f.write("")
            print("account.txt 文件已创建,请填写账号密码, 格式: ['username', 'password']")
            sys.exit(1)  # sys.exit instead of exit(): exit() is only guaranteed in interactive sessions
        with open(self.credentials_file) as f:
            credentials = ast.literal_eval(f.read())
        return credentials[0], credentials[1]

    def login(self):
        """Authenticate via HTTP Basic auth; return the authenticated client or None."""
        try:
            # Imported lazily so the module itself can be imported (e.g. to
            # reuse DataSetDownloader.process_data) without httpx installed.
            import httpx

            username, password = self.load_credentials()
            self.client = httpx.Client(auth=httpx.BasicAuth(username, password))
            response = self.client.post(f'{self.brain_api_url}/authentication')
            print(f"登录状态: {response.status_code}")
            if response.status_code in (200, 201):
                print("登录成功!")
                print(f"账户信息: {response.json()}")
                return self.client
            print(f"登录失败: {response.json()}")
            return None
        except Exception as e:
            # Broad catch is deliberate: this is a top-level CLI boundary and
            # any failure (I/O, network, JSON) should be reported, not raised.
            print(f"登录过程中出现错误: {e}")
            return None

    def get_client(self):
        """Return the client created by login(), or None when not logged in."""
        return self.client


class DataSetDownloader:
    """Page through a BRAIN data-set endpoint and save the records as JSON."""

    def __init__(self, client):
        self.client = client  # authenticated httpx.Client from BrainLogin.login()
        self.base_api_url = 'https://api.worldquantbrain.com'

    @staticmethod
    def _build_params(data_set_id, limit, offset):
        """Query parameters shared by every request; previously copy-pasted three times."""
        return {
            'dataset.id': data_set_id,
            'delay': 1,
            'instrumentType': 'EQUITY',
            'limit': limit,
            'offset': offset,
            'region': 'USA',
            'universe': 'TOP3000',
        }

    def debug_detailed_response(self, endpoint, data_set_id, offset, limit=20):
        """Print a short summary of one page of results (debugging aid only)."""
        print(f"\n=== 调试请求: {endpoint} ===")
        url = f"{self.base_api_url}/{endpoint}"
        response = self.client.get(url, params=self._build_params(data_set_id, limit, offset))
        if response.status_code == 200:
            data = response.json()
            print(f"count: {data.get('count')}")
            print(f"results 长度: {len(data.get('results', []))}")
            print(f"响应键: {list(data.keys())}")

    def process_data(self, raw_data):
        """Flatten raw API records into a stable, JSON-friendly schema.

        Every output record carries the same field set; missing keys default
        to '' so downstream consumers never hit a KeyError.
        """
        return [
            {
                'id': item.get('id', ''),
                'description': item.get('description', ''),
                'dataset_id': item.get('dataset', {}).get('id', ''),
                'dataset_name': item.get('dataset', {}).get('name', ''),
                'category_id': item.get('category', {}).get('id', ''),
                'category_name': item.get('category', {}).get('name', ''),
                'region': item.get('region', ''),
                'delay': item.get('delay', ''),
                'universe': item.get('universe', ''),
                'type': item.get('type', ''),
            }
            for item in raw_data
        ]

    def download_data_set(self, endpoint, data_set_id):
        """Download all records of `data_set_id` from `endpoint` and save them.

        Pages through the endpoint 50 records at a time, pausing briefly
        between requests, then writes the processed records to
        reference_fields/<data_set_id>_<endpoint>.json.
        """
        output_dir = 'reference_fields'
        os.makedirs(output_dir, exist_ok=True)  # race-free vs. exists()-then-makedirs
        self.debug_detailed_response(endpoint, data_set_id, offset=0, limit=20)
        url = f"{self.base_api_url}/{endpoint}"
        # First request asks for a single record just to learn the total count.
        response = self.client.get(url, params=self._build_params(data_set_id, 1, 0))
        data = response.json()
        total_count = data.get('count', 0)
        print(f"📊 数据集总数: {total_count}")
        if total_count == 0:
            print("❌ 没有找到数据")
            return
        limit = 50
        all_data = []
        print("🚀 开始下载数据...")
        for offset in range(0, total_count, limit):
            # Small random pause between pages to avoid hammering the API.
            time.sleep(random.uniform(1.0, 1.5))
            print(f"📥 下载进度: {offset}/{total_count} ({offset / total_count * 100:.1f}%)")
            try:
                response = self.client.get(url, params=self._build_params(data_set_id, limit, offset))
                if response.status_code == 200:
                    results = response.json().get('results', [])
                    print(f"✅ 本页获取到 {len(results)} 条记录")
                    all_data.extend(results)
                    print(f"✅ 累计获取 {len(all_data)} 条记录")
                    if len(results) < limit:  # short page means the server is out of data
                        print("🎯 到达数据末尾")
                        break
                else:
                    print(f"❌ 请求失败: {response.status_code}")
                    break
            except Exception as e:
                # Best-effort: keep whatever pages were fetched so far.
                print(f"❌ 下载过程中出错: {e}")
                break
        print("🔄 处理数据中...")
        processed_data = self.process_data(all_data)
        output_file = os.path.join(output_dir, f"{data_set_id}_{endpoint}.json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(processed_data, f, ensure_ascii=False, indent=2)
        print(f"💾 处理后的数据已保存到: {output_file}")
        print(f"🎉 总共处理了 {len(processed_data)} 条记录")
        if processed_data:
            print("\n📋 处理后数据示例:")
            print(json.dumps(processed_data[0], indent=2, ensure_ascii=False))


if __name__ == "__main__":
    brain_login = BrainLogin()
    client = brain_login.login()
    if client:
        downloader = DataSetDownloader(client)
        endpoint_list = ['data-sets', 'data-fields']
        endpoint = endpoint_list[0]
        data_set_id = 'analyst4'
        downloader.download_data_set(endpoint, data_set_id)
    else:
        print("❌ 登录失败,无法下载数据")