# -*- coding: utf-8 -*-

import csv
import json
import os
import random
import time

import httpx
from httpx import BasicAuth


def read_category_json(filename):
    """Read and parse a category JSON file from the category_files directory."""
    # Build the file path
    file_path = os.path.join('category_files', filename)

    try:
        # Read and parse the JSON file
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        print(f"Successfully read file: {file_path}")
        return data
    except FileNotFoundError:
        print(f"Error: file not found - {file_path}")
        return None
    except json.JSONDecodeError:
        print(f"Error: invalid JSON - {file_path}")
        return None
    except PermissionError:
        print(f"Error: no permission to read file - {file_path}")
        return None
    except Exception as e:
        print(f"Unexpected error while reading file: {e}")
        return None
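
# A minimal sketch of the category file layout this script assumes (inferred
# from the main block below, which reads item['id'], item['region'], and
# item['universe']; the example values are hypothetical, not verified IDs):
#
# category_files/analyst.json
# [
#     {"id": "dataset1", "region": "USA", "universe": "TOP3000"},
#     {"id": "dataset2", "region": "EUR", "universe": "TOP2500"}
# ]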


class DataSetDownloader:
    def __init__(self):
        self.base_api_url = 'https://api.worldquantbrain.com'
        self.client = self.login()

    def login(self):
        """Log in and return a client instance."""
        # Read credentials from environment variables instead of hard-coding
        # them in the source (BRAIN_USERNAME / BRAIN_PASSWORD are assumed
        # variable names; set them before running).
        username = os.environ.get('BRAIN_USERNAME')
        password = os.environ.get('BRAIN_PASSWORD')
        client = httpx.Client(auth=BasicAuth(username, password))

        try:
            response = client.post(f'{self.base_api_url}/authentication')
            print(f"Login status: {response.status_code}")

            if response.status_code in [200, 201]:
                print("Login successful!")
                return client
            else:
                print(f"Login failed: {response.json()}")
                return None
        except Exception as e:
            print(f"Error during login: {e}")
            return None
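
    # Note: httpx.Client(auth=...) attaches the Basic Auth credentials to every
    # request sent through this client, so the GET calls below reuse the same
    # authenticated client without posting to /authentication again.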

    def _debug_response(self, endpoint, data_set_id, region, universe, offset=0, limit=20):
        """Debug a request and its response."""
        print(f"\n=== Debug request: {endpoint} ===")
        # Use the endpoint (not the dataset id) in the URL, and pass region and
        # universe through so _build_params receives its arguments in order.
        url = f"{self.base_api_url}/{endpoint}"
        params = self._build_params(data_set_id, region, universe, offset, limit)

        response = self.client.get(url, params=params)

        if response.status_code == 200:
            data = response.json()
            print(f"count: {data.get('count')}")
            print(f"results length: {len(data.get('results', []))}")
            print(f"response keys: {list(data.keys())}")

    def _build_params(self, data_set_id, region, universe, offset=0, limit=50):
        """Build the request query parameters."""
        return {
            'dataset.id': data_set_id,
            'delay': 1,
            'instrumentType': 'EQUITY',
            'limit': limit,
            'offset': offset,
            'region': region,
            'universe': universe
        }
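
    # For example, _build_params('dataset1', 'USA', 'TOP3000') produces query
    # parameters for a request like (dataset id is a hypothetical value):
    #   GET {base_api_url}/data-fields?dataset.id=dataset1&delay=1
    #       &instrumentType=EQUITY&limit=50&offset=0&region=USA&universe=TOP3000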

    def _process_item(self, item):
        """Flatten a single data-field record into a CSV-ready row."""
        # (item.get(...) or {}) guards against nested objects that come back
        # as null instead of being absent.
        return {
            'id': item.get('id', ''),
            'description': item.get('description', ''),
            'dataset_id': (item.get('dataset') or {}).get('id', ''),
            'dataset_name': (item.get('dataset') or {}).get('name', ''),
            'category_id': (item.get('category') or {}).get('id', ''),
            'category_name': (item.get('category') or {}).get('name', ''),
            'region': item.get('region', ''),
            'delay': item.get('delay', ''),
            'universe': item.get('universe', ''),
            'type': item.get('type', '')
        }

    def _process_data(self, raw_data):
        """Process records in bulk."""
        return [self._process_item(item) for item in raw_data]
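
    # A sketch of the transformation (field layout inferred from the .get()
    # calls above; all values are hypothetical):
    #   raw:  {"id": "f1", "description": "...",
    #          "dataset": {"id": "dataset1", "name": "Dataset One"},
    #          "category": {"id": "analyst", "name": "Analyst"},
    #          "region": "USA", "delay": 1, "universe": "TOP3000", "type": "MATRIX"}
    #   row:  {"id": "f1", "description": "...", "dataset_id": "dataset1",
    #          "dataset_name": "Dataset One", "category_id": "analyst",
    #          "category_name": "Analyst", "region": "USA", "delay": 1,
    #          "universe": "TOP3000", "type": "MATRIX"}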

    def download_data_set(self, data_set_id, region, universe):
        """Download all data fields for a data set and save them as CSV."""
        endpoint = 'data-fields'

        # Check login state
        if not self.client:
            print("❌ Client not initialized; cannot download data")
            return

        # Debug request
        self._debug_response(endpoint, data_set_id, region, universe, offset=0, limit=20)

        # Fetch the total record count
        url = f"{self.base_api_url}/{endpoint}"
        params = self._build_params(data_set_id, region, universe, limit=1)

        response = self.client.get(url, params=params)
        data = response.json()
        total_count = data.get('count', 0)

        print(f"📊 Total records: {total_count}")

        if total_count == 0:
            print("❌ No data found")
            return

        # Download all pages
        limit = 50
        all_data = []
        print("🚀 Starting download...")

        for offset in range(0, total_count, limit):
            time.sleep(random.uniform(1.0, 1.5))

            params = self._build_params(data_set_id, region, universe, offset, limit)
            print(f"📥 Progress: {offset}/{total_count} ({offset / total_count * 100:.1f}%)")

            results = []

            retry = 3
            while retry > 0:
                try:
                    response = self.client.get(url, params=params)

                    if response.status_code == 200:
                        data = response.json()
                        results = data.get('results', [])

                        print(f"✅ Fetched {len(results)} records on this page")
                        all_data.extend(results)

                        # Exit the retry loop on success
                        break
                    else:
                        print(f"❌ Request failed: {response.status_code}")
                        retry -= 1
                        if retry > 0:
                            print(f"🔄 Retrying... ({retry} attempts left)")
                            time.sleep(random.uniform(2, 3))

                except Exception as e:
                    print(f"❌ Error during download: {e}")
                    retry -= 1
                    if retry > 0:
                        print(f"🔄 Retrying... ({retry} attempts left)")
                        time.sleep(random.uniform(2, 3))

            # If all retries failed, skip this offset and move on to the next
            if retry == 0:
                print(f"⚠️ Skipping offset {offset}")
                continue

            if len(results) < limit:
                print("🎯 Reached the end of the data")
                break

            time.sleep(random.uniform(10, 15))

        # Process the data
        print("🔄 Processing data...")
        processed_data = self._process_data(all_data)

        # Ensure the output directory exists
        output_dir = 'reference_fields'
        os.makedirs(output_dir, exist_ok=True)

        # Save as CSV
        output_file = os.path.join(output_dir, f"{data_set_id}_{region.lower()}_{universe.lower()}.csv")

        if processed_data:
            fieldnames = list(processed_data[0].keys())
            with open(output_file, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(processed_data)

            print(f"💾 Processed data saved to: {output_file}")
            print(f"🎉 Processed {len(processed_data)} records in total")


if __name__ == "__main__":
    # category_list = [
    #     'analyst',
    #     'broker',
    #     'earnings',
    #     'fundamental',
    #     'imbalance',
    #     'insiders',
    #     'institutions',
    #     'macro',
    #     'model',
    #     'news',
    #     'option',
    #     'other',
    #     'pv',
    #     'risk',
    #     'sentiment',
    #     'shortinterest',
    #     'socialmedia'
    # ]

    plan_to_download = read_category_json('analyst.json')

    downloader = DataSetDownloader()

    # Guard against both a failed login and a missing/invalid category file
    # (read_category_json returns None on failure).
    if downloader.client and plan_to_download:
        for item in plan_to_download:
            downloader.download_data_set(item['id'], item['region'], item['universe'])
            time.sleep(random.uniform(20, 30))
    else:
        print("❌ Login failed or no download plan; cannot download data")