You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
alpha_tools/alpha-forge/simple_frature_analysis.py

232 lines
8.0 KiB

#!/usr/bin/env python3
"""
Import alpha data from a CSV export into SQLite (fixed de-duplication version).
"""
import sqlite3
import pandas as pd
import ast
import re
from pathlib import Path

# Path configuration: input CSV and output SQLite DB live next to this script.
CSV_PATH = Path(__file__).parent / "alpha_list.csv"
SQLITE_PATH = Path(__file__).parent / "alpha_analysis.db"
# Set of 84 recognized operator names used to tag expression features.
# NOTE(review): the original comment said this was copied from another file
# ("same as above, copy the complete set") -- confirm it is complete.
OPERATORS_SET = {
    'add', 'abs', 'log', 'subtract', 'signed_power', 'sign', 'reverse', 'power',
    'multiply', 'min', 'max', 'inverse', 'sqrt', 's_log_1p', 'densify', 'divide',
    'not', 'and', 'less', 'equal', 'or', 'not_equal', 'greater', 'greater_equal',
    'less_equal', 'is_nan', 'if_else', 'ts_sum', 'ts_scale', 'ts_mean', 'ts_zscore',
    'ts_std_dev', 'kth_element', 'inst_tvr', 'ts_corr', 'ts_count_nans',
    'ts_target_tvr_decay', 'ts_median', 'ts_covariance', 'ts_decay_linear',
    'ts_product', 'ts_regression', 'ts_delta_limit', 'ts_step', 'ts_decay_exp_window',
    'ts_quantile', 'days_from_last_change', 'hump', 'last_diff_value', 'ts_arg_max',
    'ts_arg_min', 'ts_av_diff', 'ts_backfill', 'ts_rank', 'ts_delay', 'ts_delta',
    'winsorize', 'truncate', 'regression_neut', 'scale', 'rank', 'quantile',
    'normalize', 'zscore', 'vec_min', 'vec_count', 'vec_stddev', 'vec_range',
    'vec_avg', 'vec_sum', 'vec_max', 'left_tail', 'trade_when', 'right_tail',
    'bucket', 'group_rank', 'group_cartesian_product', 'group_backfill', 'group_mean',
    'group_neutralize', 'group_normalize', 'group_median', 'group_scale', 'group_zscore'
}

# Integer literals treated as meaningful lookback-window lengths when they
# appear in an expression (trading-day horizons; 252 = one trading year).
WINDOWS = {1, 2, 5, 10, 20, 30, 60, 90, 252}
def safe_parse_dict(s):
    """Safely parse a string containing a Python literal (typically a dict).

    Parameters
    ----------
    s : str | float | None
        Cell value from a CSV column; may be NaN (pandas missing value),
        an empty string, or a stringified Python dict.

    Returns
    -------
    dict | object
        The parsed literal on success (usually a dict), or {} when the
        input is missing/empty/unparseable.
    """
    if pd.isna(s) or s == '':
        return {}
    try:
        return ast.literal_eval(s)
    except (ValueError, SyntaxError, TypeError):
        # Only the exceptions literal_eval raises on malformed input --
        # the original bare `except:` also swallowed KeyboardInterrupt etc.
        return {}
def extract_features(expression):
    """Extract operator, data-field, and window-size features from an alpha expression.

    Parameters
    ----------
    expression : str
        Alpha expression string, e.g. "ts_mean(close, 20)".

    Returns
    -------
    tuple[list[str], list[str], list[int]]
        (operators, fields, windows). Each list is sorted so output is
        deterministic (the original converted unordered sets to lists).
        All three are empty when the expression is falsy.
    """
    if not expression:
        return [], [], []

    # Operators: known names immediately followed by an opening parenthesis.
    operators = {
        m.group(1)
        for m in re.finditer(r'\b([a-z_][a-z0-9_]*)\s*\(', expression)
        if m.group(1) in OPERATORS_SET
    }

    # Fields: remaining identifiers, excluding operators, short tokens and
    # literal keywords. (The original also checked `not c.isdigit()`, but the
    # pattern requires a leading letter, so that check was dead code.)
    stopwords = {'true', 'false', 'nan', 'null', 'constant', 'filter'}
    fields = {
        c
        for c in re.findall(r'\b([a-z][a-z0-9_]*)\b', expression)
        if c not in OPERATORS_SET and len(c) > 2 and c not in stopwords
    }

    # Windows: integer literals matching a known lookback length.
    windows = {
        n
        for n in (int(m.group(1)) for m in re.finditer(r'\b(\d+)\b', expression))
        if n in WINDOWS
    }

    return sorted(operators), sorted(fields), sorted(windows)
def main():
    """Import alpha records from CSV into SQLite, de-duplicated by id.

    Pipeline: read CSV -> parse stringified dict columns -> filter rows with
    a usable expression and fitness -> keep the best-fitness row per id ->
    truncate and repopulate `alpha_success` and `alpha_feature_long`.

    Side effects: deletes all existing rows in both tables, then inserts the
    new data. Assumes both tables already exist in SQLITE_PATH.
    """
    print("=" * 60)
    print("开始导入 CSV 到 SQLite")
    print("=" * 60)

    # 1. Read the raw CSV export.
    print("\n📖 读取 CSV...")
    df = pd.read_csv(CSV_PATH)
    print(f"{len(df):,} 条记录")

    # 2. Parse the stringified dict columns ('regular', 'settings', 'is')
    # into flat scalar columns.
    print("\n🔍 解析字段...")
    df['regular_dict'] = df['regular'].apply(safe_parse_dict)
    df['expression'] = df['regular_dict'].apply(lambda x: x.get('code', ''))
    df['settings_dict'] = df['settings'].apply(safe_parse_dict)
    df['neutralization'] = df['settings_dict'].apply(lambda x: x.get('neutralization'))
    df['universe'] = df['settings_dict'].apply(lambda x: x.get('universe'))
    df['is_dict'] = df['is'].apply(safe_parse_dict)
    df['sharpe'] = df['is_dict'].apply(lambda x: x.get('sharpe'))
    df['fitness'] = df['is_dict'].apply(lambda x: x.get('fitness'))
    df['drawdown'] = df['is_dict'].apply(lambda x: x.get('drawdown'))
    df['turnover'] = df['is_dict'].apply(lambda x: x.get('turnover'))

    # Keep only rows with a non-empty expression and a fitness value.
    df_valid = df[df['expression'].notna() & (df['expression'] != '') & df['fitness'].notna()].copy()
    print(f" 有效记录: {len(df_valid):,}")

    # 3. De-duplicate: for each alpha id keep the row with the highest fitness.
    print("\n🔄 去重(按 alpha_id 保留最高 fitness)...")
    df_unique = df_valid.sort_values('fitness', ascending=False).drop_duplicates(subset=['id'], keep='first')
    print(f" 去重后: {len(df_unique):,}")

    # 4. Open the SQLite database.
    conn = sqlite3.connect(str(SQLITE_PATH))
    cursor = conn.cursor()

    # Truncate previous import: child feature rows first, then parents.
    print("\n🗑 清空旧数据...")
    cursor.execute("DELETE FROM alpha_feature_long")
    cursor.execute("DELETE FROM alpha_success")
    conn.commit()

    # 5. Insert de-duplicated alphas into alpha_success.
    print("\n💾 写入 alpha_success 表...")
    for idx, row in df_unique.iterrows():
        try:
            cursor.execute("""
INSERT INTO alpha_success
(alpha_id, expression, sharpe, fitness, drawdown, turnover, neutralization, universe)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
                row['id'],
                row['expression'],
                row['sharpe'],
                row['fitness'],
                row['drawdown'],
                row['turnover'],
                row['neutralization'],
                row['universe']
            ))
        except sqlite3.IntegrityError:
            # Should not happen after the de-dup above; skip defensively.
            print(f" 跳过重复: {row['id']}")
            continue
    conn.commit()

    # Map business alpha_id -> SQLite rowid, used as the FK in the feature table.
    cursor.execute("SELECT alpha_id, rowid FROM alpha_success")
    id_map = {row[0]: row[1] for row in cursor.fetchall()}
    print(f" 已写入 {len(id_map):,}")

    # 6. Extract features per expression and write long-format rows.
    print("\n🔧 提取特征并写入 alpha_feature_long...")
    feature_count = 0
    processed = 0
    for idx, row in df_unique.iterrows():
        alpha_id = row['id']
        if alpha_id not in id_map:
            continue
        sqlite_id = id_map[alpha_id]
        expression = row['expression']
        operators, fields, windows = extract_features(expression)
        # One row per operator found in the expression.
        for op in operators:
            cursor.execute(
                "INSERT INTO alpha_feature_long (alpha_id, feature_type, feature_name) VALUES (?, ?, ?)",
                (sqlite_id, 'operator', op)
            )
            feature_count += 1
        # One row per data field referenced.
        for field in fields:
            cursor.execute(
                "INSERT INTO alpha_feature_long (alpha_id, feature_type, feature_name) VALUES (?, ?, ?)",
                (sqlite_id, 'field', field)
            )
            feature_count += 1
        # One row per window length; feature_value keeps the numeric form.
        for w in windows:
            cursor.execute(
                "INSERT INTO alpha_feature_long (alpha_id, feature_type, feature_name, feature_value) VALUES (?, ?, ?, ?)",
                (sqlite_id, 'window', str(w), w)
            )
            feature_count += 1
        # Neutralization setting, when present in the alpha's settings.
        if pd.notna(row['neutralization']):
            cursor.execute(
                "INSERT INTO alpha_feature_long (alpha_id, feature_type, feature_name) VALUES (?, ?, ?)",
                (sqlite_id, 'neutralization', row['neutralization'])
            )
            feature_count += 1
        processed += 1
        if processed % 1000 == 0:
            print(f" 已处理 {processed:,} 条,特征数 {feature_count:,}")
            conn.commit()  # Commit every 1000 rows to bound transaction size.
    conn.commit()

    # 7. Final statistics.
    cursor.execute("SELECT COUNT(*) FROM alpha_success")
    success_count = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(*) FROM alpha_feature_long")
    final_feature_count = cursor.fetchone()[0]
    print(f"\n✅ 导入完成!")
    print(f" alpha_success: {success_count:,}")
    print(f" alpha_feature_long: {final_feature_count:,} 条特征")

    # 8. Spot-check: print the first 10 feature rows joined back to their alpha.
    print("\n🔍 验证前 10 条特征:")
    cursor.execute("""
SELECT a.alpha_id, f.feature_type, f.feature_name
FROM alpha_success a
JOIN alpha_feature_long f ON a.rowid = f.alpha_id
LIMIT 10
""")
    for row in cursor.fetchall():
        print(f" {row[0]} | {row[1]}: {row[2]}")
    conn.close()
# Script entry point.
if __name__ == "__main__":
    main()