import ast
import json
import os
import random
import time

import httpx
from httpx import BasicAuth
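
# Logs in to the WorldQuant BRAIN API (https://api.worldquantbrain.com) with
# HTTP basic auth, then pages through a dataset's records and writes a
# trimmed JSON copy under reference_fields/.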


class BrainLogin:
    def __init__(self, credentials_file='account.txt'):
        self.credentials_file = credentials_file
        self.client = None
        self.brain_api_url = 'https://api.worldquantbrain.com'

    def load_credentials(self):
        # On first run, create an empty credentials file and ask the user to fill it in.
        if not os.path.exists(self.credentials_file):
            print("account.txt not found")
            with open(self.credentials_file, 'w') as f:
                f.write("")
            print("account.txt has been created; fill in your username and password in the format: ['username', 'password']")
            exit(1)
        with open(self.credentials_file) as f:
            # Parse the ['username', 'password'] list literal; ast.literal_eval
            # is a safer drop-in for eval() here.
            credentials = ast.literal_eval(f.read())
        return credentials[0], credentials[1]
    def login(self):
        try:
            username, password = self.load_credentials()
            self.client = httpx.Client(auth=BasicAuth(username, password))
            response = self.client.post(f'{self.brain_api_url}/authentication')
            print(f"Login status: {response.status_code}")
            if response.status_code in [200, 201]:
                print("Login succeeded!")
                print(f"Account info: {response.json()}")
                return self.client
            else:
                print(f"Login failed: {response.json()}")
                return None
        except Exception as e:
            print(f"Error during login: {e}")
            return None

    def get_client(self):
        return self.client


class DataSetDownloader:
    def __init__(self, client):
        self.client = client
        self.base_api_url = 'https://api.worldquantbrain.com'

    def debug_detailed_response(self, endpoint, data_set_id, offset, limit=20):
        # One-off request that prints the response shape, to sanity-check the endpoint.
        print(f"\n=== Debug request: {endpoint} ===")
        url = f"{self.base_api_url}/{endpoint}"
        params = {
            'dataset.id': data_set_id,
            'delay': 1,
            'instrumentType': 'EQUITY',
            'limit': limit,
            'offset': offset,
            'region': 'USA',
            'universe': 'TOP3000'
        }
        response = self.client.get(url, params=params)
        if response.status_code == 200:
            data = response.json()
            print(f"count: {data.get('count')}")
            print(f"results length: {len(data.get('results', []))}")
            print(f"response keys: {list(data.keys())}")
        else:
            print(f"Debug request failed: {response.status_code}")
    def process_data(self, raw_data):
        processed_data = []
        for item in raw_data:
            processed_item = {
                'id': item.get('id', ''),
                'description': item.get('description', ''),
                'dataset_id': item.get('dataset', {}).get('id', ''),
                'dataset_name': item.get('dataset', {}).get('name', ''),
                'category_id': item.get('category', {}).get('id', ''),
                'category_name': item.get('category', {}).get('name', ''),
                'region': item.get('region', ''),
                'delay': item.get('delay', ''),
                'universe': item.get('universe', ''),
                'type': item.get('type', '')
            }
            processed_data.append(processed_item)
        return processed_data
    def download_data_set(self, endpoint, data_set_id):
        output_dir = 'reference_fields'
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        self.debug_detailed_response(endpoint, data_set_id, offset=0, limit=20)
        url = f"{self.base_api_url}/{endpoint}"
        # First request with limit=1 just to read the total record count.
        params = {
            'dataset.id': data_set_id,
            'delay': 1,
            'instrumentType': 'EQUITY',
            'limit': 1,
            'offset': 0,
            'region': 'USA',
            'universe': 'TOP3000'
        }
        response = self.client.get(url, params=params)
        data = response.json()
        total_count = data.get('count', 0)
        print(f"📊 Total records in dataset: {total_count}")
        if total_count == 0:
            print("❌ No data found")
            return
        limit = 50
        all_data = []
        print("🚀 Starting download...")
        for offset in range(0, total_count, limit):
            # Randomized pause between pages to stay gentle on the API.
            sleep_time = random.uniform(1.0, 1.5)
            time.sleep(sleep_time)
            params = {
                'dataset.id': data_set_id,
                'delay': 1,
                'instrumentType': 'EQUITY',
                'limit': limit,
                'offset': offset,
                'region': 'USA',
                'universe': 'TOP3000'
            }
            print(f"📥 Progress: {offset}/{total_count} ({offset / total_count * 100:.1f}%)")
            try:
                response = self.client.get(url, params=params)
                if response.status_code == 200:
                    data = response.json()
                    results = data.get('results', [])
                    print(f"✅ Fetched {len(results)} records on this page")
                    all_data.extend(results)
                    print(f"✅ {len(all_data)} records fetched so far")
                    if len(results) < limit:
                        print("🎯 Reached end of data")
                        break
                else:
                    print(f"❌ Request failed: {response.status_code}")
                    break
            except Exception as e:
                print(f"❌ Error during download: {e}")
                break
        print("🔄 Processing data...")
        processed_data = self.process_data(all_data)
        output_file = os.path.join(output_dir, f"{data_set_id}_{endpoint}.json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(processed_data, f, ensure_ascii=False, indent=2)
        print(f"💾 Processed data saved to: {output_file}")
        print(f"🎉 Processed {len(processed_data)} records in total")
        if processed_data:
            print("\n📋 Sample of processed data:")
            print(json.dumps(processed_data[0], indent=2, ensure_ascii=False))


if __name__ == "__main__":
    brain_login = BrainLogin()
    client = brain_login.login()
    if client:
        downloader = DataSetDownloader(client)
        endpoint_list = ['data-sets', 'data-fields']
        endpoint = endpoint_list[0]
        data_set_id = 'analyst4'
        downloader.download_data_set(endpoint, data_set_id)
    else:
        print("❌ Login failed; cannot download data")