You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

189 lines
6.4 KiB

#!/usr/bin/env python3
"""Compute feature gain values (fixed version v2)."""

import sqlite3
from pathlib import Path

import pandas as pd

# The SQLite database file is looked up in the same directory as this script.
SQLITE_PATH = Path(__file__).with_name("alpha_analysis.db")
# A feature needs at least this many samples before its gain is reported.
_MIN_SAMPLES = 10
# Gains within +/- this band around the global mean are treated as neutral.
_GAIN_THRESHOLD = 0.05


def _effect_sign(gain, threshold=_GAIN_THRESHOLD):
    """Return 1 for a gain above ``threshold``, -1 below ``-threshold``, else 0."""
    if gain > threshold:
        return 1
    if gain < -threshold:
        return -1
    return 0


def _bias_weight(gain, threshold=_GAIN_THRESHOLD):
    """Turn a gain into a weight: boosted up to 3.0, damped down to 0.1, else 1.0."""
    if gain > threshold:
        return 1.0 + min(gain * 2, 2.0)
    if gain < -threshold:
        return max(0.1, 1.0 - abs(gain) * 2)
    return 1.0


def _print_ranked(frame, limit=30):
    """Print the first ``limit`` rows of ``frame`` as an aligned type/name/gain/count table."""
    for row in frame.head(limit).itertuples(index=False):
        print(f" {row.feature_type:15} {row.feature_name:35} 增益: {row.gain:+.4f} (样本: {int(row.sample_count):,})")


def _print_highlights(title, frame):
    """Print ``title`` followed by each row's name and gain; print nothing if ``frame`` is empty."""
    if frame.empty:
        return
    print(title)
    for row in frame.itertuples(index=False):
        print(f" {row.feature_name}: 增益 {row.gain:+.4f}")


def main(db_path=None):
    """Compute per-feature fitness gains and rewrite the derived tables.

    Reads ``alpha_success`` (id, alpha_id, fitness) and ``alpha_feature_long``
    (alpha_id, feature_type, feature_name), joins them by matching
    ``alpha_feature_long.alpha_id`` against ``alpha_success.id``, and computes
    for every (feature_type, feature_name) pair the mean fitness minus the
    global mean fitness (the "gain"). Features with fewer than 10 samples are
    dropped. The results replace the contents of ``feature_statistics`` and
    ``generation_bias``, and a report is printed to stdout.

    Args:
        db_path: Path of the SQLite database to use. Defaults to the
            module-level ``SQLITE_PATH``.
    """
    if db_path is None:
        db_path = SQLITE_PATH

    print("=" * 60)
    print("开始计算特征增益值")
    print("=" * 60)

    conn = sqlite3.connect(str(db_path))
    try:
        # 1. Load the fitness of every alpha (keyed by the `id` column).
        print("\n📊 获取 Alpha 数据...")
        df_alpha = pd.read_sql_query("""
            SELECT id, alpha_id, fitness
            FROM alpha_success
            WHERE fitness IS NOT NULL
        """, conn)
        print(f" 列名: {list(df_alpha.columns)}")
        global_avg = df_alpha['fitness'].mean()
        print(f" 全局平均 fitness: {global_avg:.4f}")
        print(f" 样本数: {len(df_alpha):,}")

        # 2. Load every (alpha, feature) row.
        print("\n🔧 获取特征数据...")
        df_features = pd.read_sql_query("""
            SELECT alpha_id, feature_type, feature_name
            FROM alpha_feature_long
        """, conn)
        print(f" 总特征数: {len(df_features):,}")

        # 3. Attach each alpha's fitness to its feature rows.
        print("\n📈 计算每个特征的平均分...")
        # The feature table's alpha_id is matched against alpha_success.id.
        fitness_map = dict(zip(df_alpha['id'], df_alpha['fitness']))
        df_features['fitness'] = df_features['alpha_id'].map(fitness_map)
        # Drop feature rows whose alpha has no known fitness.
        df_features = df_features[df_features['fitness'].notna()]
        print(f" 有效特征数: {len(df_features):,}")

        # Aggregate per feature, then express the mean relative to the global mean.
        stats = df_features.groupby(['feature_type', 'feature_name']).agg(
            avg_score=('fitness', 'mean'),
            sample_count=('fitness', 'count'),
            total_fitness=('fitness', 'sum'),
        ).reset_index()
        stats['gain'] = stats['avg_score'] - global_avg

        # Ignore features that occur too rarely to be meaningful.
        stats_filtered = stats[stats['sample_count'] >= _MIN_SAMPLES].copy()
        print(f" 过滤后特征数(样本≥10): {len(stats_filtered):,}")

        # 4. Rank: positives best-first, negatives worst-first.
        gains = stats_filtered['gain']
        stats_positive = stats_filtered[gains > _GAIN_THRESHOLD].sort_values('gain', ascending=False)
        stats_negative = stats_filtered[gains < -_GAIN_THRESHOLD].sort_values('gain', ascending=True)
        print("\n✅ 统计完成:")
        print(f" 总特征类型数: {len(stats_filtered):,}")
        print(f" 正向特征 (gain > 0.05): {len(stats_positive):,}")
        print(f" 负向特征 (gain < -0.05): {len(stats_negative):,}")

        # 5. Report the 30 strongest positive features.
        print("\n" + "=" * 60)
        print("📈 正向特征 Top 30 (增益值 > 0.05)")
        print("=" * 60)
        _print_ranked(stats_positive)

        # 6. Report the 30 strongest negative features, if there are any.
        if not stats_negative.empty:
            print("\n" + "=" * 60)
            print("📉 负向特征 Bottom 30 (增益值 < -0.05)")
            print("=" * 60)
            _print_ranked(stats_negative)

        # 7. Replace the contents of feature_statistics.
        print("\n💾 保存到 feature_statistics 表...")
        # Cast to native Python types so sqlite3 never has to bind numpy scalars.
        stat_rows = [
            (
                row.feature_type,
                row.feature_name,
                float(row.gain),
                float(row.avg_score),
                float(global_avg),
                int(row.sample_count),
                _effect_sign(row.gain),
            )
            for row in stats_filtered.itertuples(index=False)
        ]
        # `with conn` commits on success and rolls back if an insert fails, so
        # the old rows are not lost to a half-finished rewrite.
        with conn:
            conn.execute("DELETE FROM feature_statistics")
            conn.executemany("""
                INSERT INTO feature_statistics
                (feature_type, feature_name, gain_value, avg_score, global_avg_score, sample_count, positive_effect)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, stat_rows)

        # 8. Replace the contents of generation_bias.
        print("\n💾 更新 generation_bias 表...")
        # Only features with |gain| above the threshold (and enough samples) get a bias.
        bias_features = stats_filtered[stats_filtered['gain'].abs() > _GAIN_THRESHOLD]
        bias_rows = [
            (
                row.feature_type,
                row.feature_name,
                _bias_weight(row.gain),
                float(row.gain),
                int(row.sample_count),
            )
            for row in bias_features.itertuples(index=False)
        ]
        with conn:
            conn.execute("DELETE FROM generation_bias")
            conn.executemany("""
                INSERT INTO generation_bias
                (feature_type, feature_name, bias_weight, gain_value, sample_count, is_active)
                VALUES (?, ?, ?, ?, ?, 1)
            """, bias_rows)

        # 9. Read back the row counts as a sanity check.
        stats_count = conn.execute("SELECT COUNT(*) FROM feature_statistics").fetchone()[0]
        bias_count = conn.execute("SELECT COUNT(*) FROM generation_bias").fetchone()[0]
        print("\n✅ 完成!")
        print(f" feature_statistics: {stats_count:,}")
        print(f" generation_bias: {bias_count:,}")

        # 10. Highlight a few key findings.
        print("\n" + "=" * 60)
        print("💡 关键发现")
        print("=" * 60)
        # Best operators (stats_positive is sorted best-first).
        _print_highlights(
            "\n🏆 最佳算子 (增益最高):",
            stats_positive[stats_positive['feature_type'] == 'operator'].head(5),
        )
        # Best fields.
        _print_highlights(
            "\n🏆 最佳字段 (增益最高):",
            stats_positive[stats_positive['feature_type'] == 'field'].head(5),
        )
        # Worst operators: stats_negative is sorted with the lowest gain first,
        # so the worst five are at the head of the frame, not the tail.
        _print_highlights(
            "\n 最差算子 (增益最低):",
            stats_negative[stats_negative['feature_type'] == 'operator'].head(5),
        )
    finally:
        # Release the database handle even when a query or insert raises.
        conn.close()
# Run the gain computation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()