#!/usr/bin/env python3
"""
从 PostgreSQL 读取表达式,提取字段名
所有输出保存到文件,不在终端显示过多内容
"""
import re
import psycopg2
from collections import Counter
import csv
import sys
from datetime import datetime

# Database connection settings
DB_CONFIG = {
    'host': '192.168.31.41',
    'port': 30018,
    'user': 'jack',
    'password': 'aaaAAA111',
    'database': 'quantify'
}

# Operator list (84 operators, extracted from the CSV)
OPERATORS = {
    'add', 'abs', 'log', 'subtract', 'signed_power', 'sign', 'reverse', 'power',
    'multiply', 'min', 'max', 'inverse', 'sqrt', 's_log_1p', 'densify', 'divide',
    'not', 'and', 'less', 'equal', 'or', 'not_equal', 'greater', 'greater_equal',
    'less_equal', 'is_nan', 'if_else', 'ts_sum', 'ts_scale', 'ts_mean', 'ts_zscore',
    'ts_std_dev', 'kth_element', 'inst_tvr', 'ts_corr', 'ts_count_nans',
    'ts_target_tvr_decay', 'ts_median', 'ts_covariance', 'ts_decay_linear',
    'ts_product', 'ts_regression', 'ts_delta_limit', 'ts_step', 'ts_decay_exp_window',
    'ts_quantile', 'days_from_last_change', 'hump', 'last_diff_value', 'ts_arg_max',
    'ts_arg_min', 'ts_av_diff', 'ts_backfill', 'ts_rank', 'ts_delay', 'ts_delta',
    'winsorize', 'truncate', 'regression_neut', 'scale', 'rank', 'quantile',
    'normalize', 'zscore', 'vec_min', 'vec_count', 'vec_stddev', 'vec_range',
    'vec_avg', 'vec_sum', 'vec_max', 'left_tail', 'trade_when', 'right_tail',
    'bucket', 'group_rank', 'group_cartesian_product', 'group_backfill', 'group_mean',
    'group_neutralize', 'group_normalize', 'group_median', 'group_scale', 'group_zscore'
}
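
# extract_fields() below tests every candidate token against OPERATORS; a
# set keeps those membership checks O(1) rather than O(n) for a list.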


class TeeLogger:
    """Write every message to both the terminal and a log file."""

    def __init__(self, filename):
        self.terminal = sys.stdout
        self.log_file = open(filename, 'w', encoding='utf-8')

    def write(self, message):
        self.terminal.write(message)
        self.log_file.write(message)

    def flush(self):
        self.terminal.flush()
        self.log_file.flush()

    def close(self):
        self.log_file.close()
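
# Usage pattern (this is what main() below does): route sys.stdout through
# the logger so every print() lands in both places, then restore the real
# stream and close the file when finished:
#   logger = TeeLogger('run.log')
#   sys.stdout = logger
#   ...
#   sys.stdout = logger.terminal
#   logger.close()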


def extract_fields(expression):
    """
    Extract field names from an expression string.
    A field name consists of lowercase letters, digits, and underscores,
    is not in the operator list, and is not a pure number.
    """
    candidates = re.findall(r'\b[a-z][a-z0-9_]*\b', expression)
    fields = []
    for c in candidates:
        # Filter out operators, literals, and very short tokens
        if (c.isdigit() or
                c in OPERATORS or
                len(c) < 3 or
                c in ['true', 'false', 'nan', 'null', 'constant']):
            continue
        fields.append(c)
    return fields
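
# Illustrative example (the expression is made up, not taken from the
# database): 'ts_rank' and 'divide' are dropped as operators, and '20'
# never matches because tokens must start with a lowercase letter:
#   extract_fields("ts_rank(divide(close_price, volume_adj), 20)")
#   -> ['close_price', 'volume_adj']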


def main():
    # Set up the log file
    log_filename = f'extract_fields_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
    logger = TeeLogger(log_filename)
    sys.stdout = logger

    try:
        print("=" * 60)
        print(f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)

        print("\n🔌 Connecting to the database...")
        conn = psycopg2.connect(**DB_CONFIG)
        conn.autocommit = True  # no explicit transaction, avoids holding table locks
        try:
            cursor = conn.cursor()

            # Count the expressions first
            print("📊 Counting expressions...")
            cursor.execute("SELECT COUNT(*) FROM alpha_expression_line")
            total = cursor.fetchone()[0]
            print(f"  Total expressions: {total:,}")

            # Read in batches.
            # NOTE: LIMIT/OFFSET without an ORDER BY gives no stable row
            # order across queries; add an ORDER BY on a unique column if
            # exact, duplicate-free coverage matters.
            batch_size = 5000
            offset = 0
            processed = 0
            field_counter = Counter()

            while offset < total:
                print(f"📖 Progress: {processed:,} / {total:,} ({processed*100//total}%)")
                cursor.execute(
                    "SELECT expression FROM alpha_expression_line LIMIT %s OFFSET %s",
                    (batch_size, offset)
                )
                rows = cursor.fetchall()
                for row in rows:
                    expr = row[0]
                    if expr and isinstance(expr, str):
                        fields = extract_fields(expr)
                        field_counter.update(fields)
                        processed += 1
                offset += batch_size
print(f"\n✅ 提取完成,共处理 {processed:,} 条表达式")
print(f" 发现 {len(field_counter)} 个不同的字段")
# 按出现次数排序
sorted_fields = sorted(field_counter.items(), key=lambda x: x[1], reverse=True)
# 保存完整字段列表到 CSV
csv_filename = 'extracted_fields.csv'
with open(csv_filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['field_name', 'occurrence_count'])
for field, count in sorted_fields:
writer.writerow([field, count])
print(f"\n💾 完整字段列表已保存到: {csv_filename}")
print(f"{len(sorted_fields)} 个字段")
# 保存字段名列表(纯文本,方便复制)
txt_filename = 'field_names.txt'
with open(txt_filename, 'w', encoding='utf-8') as f:
for field, count in sorted_fields:
f.write(f"{field}\n")
print(f"💾 字段名列表(纯文本)已保存到: {txt_filename}")

            # Summary statistics
            print("\n" + "=" * 60)
            print("📊 Summary statistics")
            print("=" * 60)
            print(f"Total expressions: {total:,}")
            print(f"Valid expressions: {processed:,}")
            print(f"Distinct fields: {len(sorted_fields):,}")

            # Top 50 most frequent fields
            print("\n📋 Top 50 fields by occurrence count:")
            print("-" * 40)
            for i, (field, count) in enumerate(sorted_fields[:50], 1):
                pct = count * 100.0 / processed
                print(f"{i:3}. {field:30} {count:8,} times ({pct:.2f}%)")

            # Low-frequency fields (seen exactly once)
            once_fields = [f for f, c in sorted_fields if c == 1]
            print(f"\n📋 Fields seen only once: {len(once_fields)}")
            if once_fields:
                print(f"  Examples: {', '.join(once_fields[:10])}")

            # Save the detailed analysis report
            report_filename = f'field_analysis_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt'
            with open(report_filename, 'w', encoding='utf-8') as f:
                f.write("Field analysis report\n")
                f.write("=" * 60 + "\n")
                f.write(f"Generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Total expressions: {total:,}\n")
                f.write(f"Distinct fields: {len(sorted_fields):,}\n\n")
                f.write("Top 100 fields by occurrence count:\n")
                f.write("-" * 60 + "\n")
                for i, (field, count) in enumerate(sorted_fields[:100], 1):
                    pct = count * 100.0 / processed
                    f.write(f"{i:3}. {field:30} {count:8,} times ({pct:.2f}%)\n")
                f.write("\n\nFull field list:\n")
                f.write("-" * 60 + "\n")
                for field, count in sorted_fields:
                    f.write(f"{field},{count}\n")
            print(f"\n💾 Detailed analysis report saved to: {report_filename}")

            print("\n" + "=" * 60)
            print(f"Finish time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("=" * 60)
        finally:
            cursor.close()
            conn.close()
            print("\n🔌 Database connection closed")
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Restore standard output
        sys.stdout = logger.terminal
        logger.close()
        print(f"\n✅ All output saved to: {log_filename}")
if __name__ == "__main__":
main()