# -*- coding: utf-8 -*-
"""Download WorldQuant Brain data-field metadata and save it as CSV files.

Authenticates against the Brain API, pages through the ``data-fields``
endpoint for each configured dataset/region/universe triple, and writes the
flattened records to ``reference_fields/<dataset>_<region>_<universe>.csv``.
"""
import os
import json  # NOTE(review): currently unused at runtime; kept for compatibility
import random
import time
import csv

import httpx
from httpx import BasicAuth


class DataSetDownloader:
    """Paged downloader for the Brain ``data-fields`` endpoint."""

    def __init__(self):
        # Base URL for all API requests; self.client is None when login fails.
        self.base_api_url = 'https://api.worldquantbrain.com'
        self.client = self.login()

    def login(self):
        """Authenticate and return an ``httpx.Client``, or ``None`` on failure."""
        # SECURITY: credentials were hard-coded in source. Prefer environment
        # variables; the original literals remain as a backward-compatible
        # fallback but should be removed once the env vars are provisioned.
        username = os.environ.get('WQB_USERNAME', "jack0210_@hotmail.com")
        password = os.environ.get('WQB_PASSWORD', "!QAZ2wsx+0913")
        client = httpx.Client(auth=BasicAuth(username, password))
        try:
            response = client.post(f'{self.base_api_url}/authentication')
            print(f"登录状态: {response.status_code}")
            if response.status_code in (200, 201):
                print("登录成功!")
                return client
            print(f"登录失败: {response.json()}")
            return None
        except Exception as e:
            # Network/auth errors are reported and mapped to a None client;
            # callers must check self.client before downloading.
            print(f"登录过程中出现错误: {e}")
            return None

    def _debug_response(self, endpoint, data_set_id, region, universe,
                        offset=0, limit=20):
        """Issue one probe request and print count/keys for debugging.

        BUG FIX: the original built the URL from ``data_set_id`` instead of
        ``endpoint`` and passed ``offset``/``limit`` positionally into the
        ``region``/``universe`` slots of ``_build_params``, so the probe hit
        a nonexistent URL with garbage parameters.
        """
        print(f"\n=== 调试请求: {endpoint} ===")
        url = f"{self.base_api_url}/{endpoint}"
        params = self._build_params(data_set_id, region, universe,
                                    offset=offset, limit=limit)
        response = self.client.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            print(f"count: {data.get('count')}")
            print(f"results 长度: {len(data.get('results', []))}")
            print(f"响应键: {list(data.keys())}")

    def _build_params(self, data_set_id, region, universe, offset=0, limit=50):
        """Build the query-parameter dict for a data-fields listing request."""
        return {
            'dataset.id': data_set_id,
            'delay': 1,                  # fixed delay-1 equity universe
            'instrumentType': 'EQUITY',
            'limit': limit,
            'offset': offset,
            'region': region,
            'universe': universe,
        }

    def _process_item(self, item):
        """Flatten one raw API record into a single-level dict for CSV output."""
        dataset = item.get('dataset', {})
        category = item.get('category', {})
        return {
            'id': item.get('id', ''),
            'description': item.get('description', ''),
            'dataset_id': dataset.get('id', ''),
            'dataset_name': dataset.get('name', ''),
            'category_id': category.get('id', ''),
            'category_name': category.get('name', ''),
            'region': item.get('region', ''),
            'delay': item.get('delay', ''),
            'universe': item.get('universe', ''),
            'type': item.get('type', ''),
        }

    def _process_data(self, raw_data):
        """Flatten a list of raw API records (see ``_process_item``)."""
        return [self._process_item(item) for item in raw_data]

    def _fetch_page(self, url, params, retries=3):
        """GET one result page with retries.

        Returns the ``results`` list on success (may be empty), or ``None``
        when all retries are exhausted so the caller can skip this offset.
        """
        while retries > 0:
            try:
                response = self.client.get(url, params=params)
                if response.status_code == 200:
                    data = response.json()
                    results = data.get('results', [])
                    print(f"✅ 本页获取到 {len(results)} 条记录")
                    return results
                print(f"❌ 请求失败: {response.status_code}")
            except Exception as e:
                print(f"❌ 下载过程中出错: {e}")
            retries -= 1
            if retries > 0:
                print(f"🔄 重试中... ({retries}次剩余)")
                time.sleep(random.uniform(2, 3))
        return None

    def _save_csv(self, processed_data, data_set_id, region, universe):
        """Write flattened records to ``reference_fields/<id>_<region>_<universe>.csv``."""
        output_dir = 'reference_fields'
        os.makedirs(output_dir, exist_ok=True)
        output_file = os.path.join(
            output_dir,
            f"{data_set_id}_{region.lower()}_{universe.lower()}.csv")
        if processed_data:
            fieldnames = list(processed_data[0].keys())
            with open(output_file, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(processed_data)
            print(f"💾 处理后的数据已保存到: {output_file}")
        print(f"🎉 总共处理了 {len(processed_data)} 条记录")

    def download_data_set(self, data_set_id, region, universe):
        """Download all data-field records for one dataset/region/universe.

        Pages through the endpoint 50 records at a time with jittered sleeps
        (rate limiting), skips pages that fail all retries, and saves the
        flattened result as CSV. No-op when login failed or no data exists.
        """
        endpoint = 'data-fields'
        if not self.client:
            print("❌ 客户端未初始化,无法下载数据")
            return
        # Probe request for diagnostics (now passes region/universe through).
        self._debug_response(endpoint, data_set_id, region, universe,
                             offset=0, limit=20)
        # First ask for a single record to learn the total count.
        url = f"{self.base_api_url}/{endpoint}"
        params = self._build_params(data_set_id, region, universe, limit=1)
        response = self.client.get(url, params=params)
        data = response.json()
        total_count = data.get('count', 0)
        print(f"📊 数据集总数: {total_count}")
        if total_count == 0:
            print("❌ 没有找到数据")
            return
        limit = 50
        all_data = []
        print("🚀 开始下载数据...")
        for offset in range(0, total_count, limit):
            # Small jitter before every page to avoid hammering the API.
            time.sleep(random.uniform(1.0, 1.5))
            params = self._build_params(data_set_id, region, universe,
                                        offset, limit)
            print(f"📥 下载进度: {offset}/{total_count} "
                  f"({offset / total_count * 100:.1f}%)")
            results = self._fetch_page(url, params)
            if results is None:
                # All retries failed for this page: skip it, keep going.
                print(f"⚠️ 跳过 offset {offset}")
                continue
            all_data.extend(results)
            if len(results) < limit:
                # Short page means we reached the end of the data set.
                print("🎯 到达数据末尾")
                break
            time.sleep(random.uniform(4, 6))
        print("🔄 处理数据中...")
        processed_data = self._process_data(all_data)
        self._save_csv(processed_data, data_set_id, region, universe)


if __name__ == "__main__":
    downloader = DataSetDownloader()
    if downloader.client:
        plan_to_download = [
            {'id': 'analyst10', 'region': 'USA', 'universe': 'TOP3000'},
            {'id': 'analyst10', 'region': 'GLB', 'universe': 'TOP3000'},
        ]
        for item in plan_to_download:
            downloader.download_data_set(
                item['id'], item['region'], item['universe'])
            # Long pause between datasets to stay well under rate limits.
            time.sleep(random.uniform(20, 30))
    else:
        print("❌ 登录失败,无法下载数据")