#!/usr/bin/env python3
"""
Import alpha data from CSV into SQLite (deduplicating version).

Reads ``alpha_list.csv`` next to this script, parses its dict-string columns
(``regular``, ``settings``, ``is``), keeps the highest-fitness row per alpha
``id`` and rewrites the ``alpha_success`` and ``alpha_feature_long`` tables in
``alpha_analysis.db``. The tables are not created here; they must already
exist.
"""
import ast
import json
import re
import sqlite3
from contextlib import closing
from pathlib import Path

import pandas as pd

# Path configuration (relative to this script).
CSV_PATH = Path(__file__).parent / "alpha_list.csv"
SQLITE_PATH = Path(__file__).parent / "alpha_analysis.db"

# The 84 known operator names. A called name is recorded as an operator
# feature only if it is listed here.
OPERATORS_SET = {
    'add', 'abs', 'log', 'subtract', 'signed_power', 'sign', 'reverse', 'power',
    'multiply', 'min', 'max', 'inverse', 'sqrt', 's_log_1p', 'densify', 'divide',
    'not', 'and', 'less', 'equal', 'or', 'not_equal', 'greater', 'greater_equal',
    'less_equal', 'is_nan', 'if_else',
    'ts_sum', 'ts_scale', 'ts_mean', 'ts_zscore', 'ts_std_dev', 'kth_element',
    'inst_tvr', 'ts_corr', 'ts_count_nans', 'ts_target_tvr_decay', 'ts_median',
    'ts_covariance', 'ts_decay_linear', 'ts_product', 'ts_regression',
    'ts_delta_limit', 'ts_step', 'ts_decay_exp_window', 'ts_quantile',
    'days_from_last_change', 'hump', 'last_diff_value', 'ts_arg_max',
    'ts_arg_min', 'ts_av_diff', 'ts_backfill', 'ts_rank', 'ts_delay', 'ts_delta',
    'winsorize', 'truncate', 'regression_neut', 'scale', 'rank', 'quantile',
    'normalize', 'zscore',
    'vec_min', 'vec_count', 'vec_stddev', 'vec_range', 'vec_avg', 'vec_sum',
    'vec_max',
    'left_tail', 'trade_when', 'right_tail', 'bucket',
    'group_rank', 'group_cartesian_product', 'group_backfill', 'group_mean',
    'group_neutralize', 'group_normalize', 'group_median', 'group_scale',
    'group_zscore',
}

# Integer literals that are recorded as 'window' features when they appear
# in an expression.
WINDOWS = {1, 2, 5, 10, 20, 30, 60, 90, 252}

# Quoted string arguments (e.g. range="0,1,0.1"); blanked out before feature
# extraction so their contents are not mistaken for fields or windows.
_STRING_LITERAL_RE = re.compile(r'"[^"]*"|\'[^\']*\'')
# A lowercase name immediately followed by "(" is a function call.
_CALL_RE = re.compile(r'\b([a-z_][a-z0-9_]*)\s*\(')
# Lowercase identifiers that are neither called ("name(") nor used as keyword
# arguments ("name=" -- but the comparison "name==" still counts).
_FIELD_RE = re.compile(r'\b([a-z][a-z0-9_]*)\b(?!\s*\(|\s*=(?!=))')
# Standalone integers: not part of an identifier and not part of a decimal
# number such as 0.5 (which previously produced a bogus window of 5).
_INT_RE = re.compile(r'(?<![\w.])(\d+)(?![\w.])')
# Bare words that are never data fields.
_NON_FIELD_WORDS = frozenset({'true', 'false', 'nan', 'null', 'constant', 'filter'})

_ALPHA_COLUMNS = [
    'id', 'expression', 'sharpe', 'fitness', 'drawdown', 'turnover',
    'neutralization', 'universe',
]
_INSERT_ALPHA_SQL = """
    INSERT INTO alpha_success
        (alpha_id, expression, sharpe, fitness, drawdown, turnover,
         neutralization, universe)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""
_INSERT_FEATURE_SQL = (
    "INSERT INTO alpha_feature_long (alpha_id, feature_type, feature_name) "
    "VALUES (?, ?, ?)"
)
_INSERT_VALUED_FEATURE_SQL = (
    "INSERT INTO alpha_feature_long "
    "(alpha_id, feature_type, feature_name, feature_value) VALUES (?, ?, ?, ?)"
)


def safe_parse_dict(s):
    """Safely parse a dict literal stored as text.

    Python literal syntax is tried first (``ast.literal_eval``, never
    ``eval``); JSON (``true``/``false``/``null``) is tried as a fallback.

    Args:
        s: A CSV cell value. Non-strings (e.g. NaN for an empty cell) and
            blank strings yield an empty dict.

    Returns:
        The parsed dict, or ``{}`` if the text is not a valid dict literal or
        parses to something other than a dict.
    """
    if not isinstance(s, str) or not s.strip():
        return {}
    for parse in (ast.literal_eval, json.loads):
        try:
            value = parse(s)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            continue
        # Callers use .get(), so anything that is not a dict is treated as
        # unparseable.
        return value if isinstance(value, dict) else {}
    return {}


def extract_features(expression):
    """Extract operator, field and window features from an alpha expression.

    Args:
        expression: Expression source text. Non-string or empty input yields
            no features.

    Returns:
        A tuple ``(operators, fields, windows)`` of sorted lists:

        * operators -- called names that appear in ``OPERATORS_SET``;
        * fields -- other lowercase identifiers longer than two characters
          that are not called, not keyword-argument names, not operator
          names and not in a small stop list;
        * windows -- standalone integer literals that appear in ``WINDOWS``.
    """
    if not isinstance(expression, str) or not expression:
        return [], [], []

    text = _STRING_LITERAL_RE.sub(' ', expression)

    operators = {name for name in _CALL_RE.findall(text) if name in OPERATORS_SET}
    fields = {
        name
        for name in _FIELD_RE.findall(text)
        if len(name) > 2
        and name not in OPERATORS_SET
        and name not in _NON_FIELD_WORDS
    }
    windows = {int(num) for num in _INT_RE.findall(text)} & WINDOWS

    # Sorted for deterministic output (set order of strings varies per run).
    return sorted(operators), sorted(fields), sorted(windows)


def _load_alphas(csv_path):
    """Read the CSV, parse its dict columns, drop invalid rows and keep the
    highest-fitness row for each alpha ``id``."""
    # 1. Read the CSV
    print("\n📖 读取 CSV...")
    df = pd.read_csv(csv_path)
    print(f" 共 {len(df):,} 条记录")

    # 2. Parse the dict-string columns
    print("\n🔍 解析字段...")
    df['regular_dict'] = df['regular'].apply(safe_parse_dict)
    df['expression'] = df['regular_dict'].apply(lambda d: d.get('code', ''))
    df['settings_dict'] = df['settings'].apply(safe_parse_dict)
    df['neutralization'] = df['settings_dict'].apply(lambda d: d.get('neutralization'))
    df['universe'] = df['settings_dict'].apply(lambda d: d.get('universe'))
    df['is_dict'] = df['is'].apply(safe_parse_dict)
    df['sharpe'] = df['is_dict'].apply(lambda d: d.get('sharpe'))
    df['fitness'] = df['is_dict'].apply(lambda d: d.get('fitness'))
    df['drawdown'] = df['is_dict'].apply(lambda d: d.get('drawdown'))
    df['turnover'] = df['is_dict'].apply(lambda d: d.get('turnover'))

    # Keep only rows with a non-empty expression and a fitness value.
    valid = (
        df['expression'].notna()
        & (df['expression'] != '')
        & df['fitness'].notna()
    )
    df_valid = df[valid].copy()
    print(f" 有效记录: {len(df_valid):,} 条")

    # 3. Dedupe: for each alpha id keep the row with the highest fitness.
    # A stable sort makes ties resolve to the earliest CSV row.
    print("\n🔄 去重(按 alpha_id 保留最高 fitness)...")
    df_unique = (
        df_valid.sort_values('fitness', ascending=False, kind='stable')
        .drop_duplicates(subset=['id'], keep='first')
    )
    print(f" 去重后: {len(df_unique):,} 条")
    return df_unique


def _write_alpha_success(cursor, df_unique):
    """Insert one alpha_success row per alpha; return ``{alpha_id: rowid}``."""
    # 5. Write alpha_success
    print("\n💾 写入 alpha_success 表...")
    for values in df_unique[_ALPHA_COLUMNS].itertuples(index=False, name=None):
        try:
            cursor.execute(_INSERT_ALPHA_SQL, values)
        except sqlite3.IntegrityError:
            # Should not happen after deduplication; skip the row if it does.
            print(f" 跳过重复: {values[0]}")

    # Map each alpha id to the rowid that alpha_feature_long references.
    cursor.execute("SELECT alpha_id, rowid FROM alpha_success")
    id_map = dict(cursor.fetchall())
    print(f" 已写入 {len(id_map):,} 条")
    return id_map


def _write_features(cursor, df_unique, id_map):
    """Insert operator, field, window and neutralization feature rows."""
    # 6. Extract features and write alpha_feature_long
    print("\n🔧 提取特征并写入 alpha_feature_long...")
    feature_count = 0
    processed = 0
    rows = zip(df_unique['id'], df_unique['expression'], df_unique['neutralization'])
    for alpha_id, expression, neutralization in rows:
        if alpha_id not in id_map:
            # Not present in alpha_success (e.g. skipped on insert).
            continue
        sqlite_id = id_map[alpha_id]
        operators, fields, windows = extract_features(expression)

        # Insertion order per alpha: operators, fields, windows, neutralization.
        named = [(sqlite_id, 'operator', op) for op in operators]
        named += [(sqlite_id, 'field', field) for field in fields]
        cursor.executemany(_INSERT_FEATURE_SQL, named)
        cursor.executemany(
            _INSERT_VALUED_FEATURE_SQL,
            [(sqlite_id, 'window', str(w), w) for w in windows],
        )
        feature_count += len(named) + len(windows)

        if pd.notna(neutralization):
            cursor.execute(
                _INSERT_FEATURE_SQL, (sqlite_id, 'neutralization', neutralization)
            )
            feature_count += 1

        processed += 1
        if processed % 1000 == 0:
            print(f" 已处理 {processed:,} 条,特征数 {feature_count:,}")


def _print_summary(cursor):
    """Print table row counts and a sample of joined feature rows."""
    # 7. Final statistics
    cursor.execute("SELECT COUNT(*) FROM alpha_success")
    success_count = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(*) FROM alpha_feature_long")
    final_feature_count = cursor.fetchone()[0]
    print("\n✅ 导入完成!")
    print(f" alpha_success: {success_count:,} 条")
    print(f" alpha_feature_long: {final_feature_count:,} 条特征")

    # 8. Spot-check the join between the two tables
    print("\n🔍 验证前 10 条特征:")
    cursor.execute("""
        SELECT a.alpha_id, f.feature_type, f.feature_name
        FROM alpha_success a
        JOIN alpha_feature_long f ON a.rowid = f.alpha_id
        LIMIT 10
    """)
    for alpha_id, feature_type, feature_name in cursor.fetchall():
        print(f" {alpha_id} | {feature_type}: {feature_name}")


def main(csv_path=CSV_PATH, sqlite_path=SQLITE_PATH):
    """Rebuild ``alpha_success`` and ``alpha_feature_long`` from the CSV.

    Args:
        csv_path: CSV file to import (defaults to ``CSV_PATH``).
        sqlite_path: SQLite database containing both target tables
            (defaults to ``SQLITE_PATH``).
    """
    print("=" * 60)
    print("开始导入 CSV 到 SQLite")
    print("=" * 60)

    df_unique = _load_alphas(csv_path)

    # 4. Connect to SQLite; closing() releases the connection even on error.
    with closing(sqlite3.connect(str(sqlite_path))) as conn:
        cursor = conn.cursor()
        # Clear + reload in ONE transaction: `with conn` commits on success and
        # rolls back on any exception, so a failed import keeps the previous
        # data instead of leaving both tables empty.
        with conn:
            print("\n🗑️ 清空旧数据...")
            cursor.execute("DELETE FROM alpha_feature_long")
            cursor.execute("DELETE FROM alpha_success")
            id_map = _write_alpha_success(cursor, df_unique)
            _write_features(cursor, df_unique, id_map)
        _print_summary(cursor)


if __name__ == "__main__":
    main()