You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

135 lines
5.3 KiB

from WindPy import *
import numpy as np
import pandas as pd
import numpy.linalg as la
import seaborn as sns
# Start the WindPy session; the w.* data calls further down go through it.
w.start()
## Combine the selected single factors into one multi-factor table
# NOTE(review): `data` is not defined in the visible source - presumably a
# sequence of per-factor DataFrames built earlier; confirm before running.
lc = data.copy()
num_list = [42,44,36,15,4]  # factor numbers, treated as 1-based
num_list = [i-1 for i in num_list]  # shift to 0-based positions in `lc`
m = []  # NOTE(review): not used anywhere in the visible source
alpha_m = lc[num_list[0]].copy()  # start from the first selected factor's table
for i in num_list[1:]:
    # Attach each remaining factor's 'alpha<N>' column, N being the 1-based number.
    alpha_m['alpha'+str(i+1)] = lc[i]['alpha'+str(i+1)]
## Factor data preprocessing
# Heatmap of the pairwise factor correlations
# NOTE(review): `plt` is not imported in the visible source - presumably
# matplotlib.pyplot; confirm it is in scope.
fig=plt.figure(figsize=(10,6))
relations= alpha_m.corr()
sns.heatmap(relations,annot=True,linewidths=0.05,linecolor='white',annot_kws={'size':8,'weight':'bold'})
# 中位数去极值
def extreme_process_MAD(sample):
    """Winsorize every column of a cross-section with the median/MAD rule.

    For each column the median and the median absolute deviation (MAD) are
    computed, and values outside ``median +/- 3 * 1.4826 * MAD`` are pulled
    back to the nearest bound. NaN entries are left as NaN.

    Args:
        sample: DataFrame holding one time cross-section, one column per
            factor. It is modified in place.

    Returns:
        The same ``sample`` object, with every column clipped.
    """
    for name in list(sample.columns):
        x = sample[name]
        median = x.median()
        mad = (x - median).abs().median()
        # 1.4826 * MAD estimates the standard deviation for normal data, so
        # this is a robust three-sigma band around the median.
        half_width = 3 * 1.4826 * mad
        # clip() builds a new Series instead of writing through a column
        # extracted from the frame, which avoids chained-assignment warnings
        # and lets integer columns be upcast rather than rejected.
        sample[name] = x.clip(lower=median - half_width, upper=median + half_width)
    return sample
# 行业市值中性化
def data_scale_neutral(sample,date):
    """Neutralize each factor column against industry dummies and log size.

    Two fields are fetched from Wind for the stocks in ``sample.index`` on
    ``date``: "industry_citic" and "val_lnmv". The industry field is expanded
    into 0/1 dummy columns, and every factor column is regressed (ordinary
    least squares, no separate intercept) on the dummies plus the size field.
    The regression residual replaces the factor value.

    Args:
        sample: DataFrame indexed by stock code, one column per factor.
        date: trade date as a string; it is concatenated into the Wind
            options string.

    Returns:
        A new DataFrame restricted to the stocks that have complete
        regressor data, holding the residuals. If the regressor matrix is
        rank deficient the factor values are returned unchanged.
    """
    stocks = list(sample.index)
    options = "unit=1;tradeDate="+date+";industryType=1"
    ind = w.wss(stocks, "industry_citic", options, usedf=True)[1]
    Incap = w.wss(stocks, "val_lnmv", options, usedf=True)[1]
    data_med = pd.get_dummies(ind, columns=['INDUSTRY_CITIC'])  # 0/1 indicator matrix
    x = pd.concat([data_med, Incap], axis=1).dropna()
    # Force a float matrix: recent pandas versions emit boolean dummies, which
    # mixed with the float size column would otherwise give an object array
    # that numpy.linalg cannot handle.
    X = np.asarray(x, dtype=float)
    # Copy so the column writes below never target a view of the caller's frame.
    sample = sample.loc[list(x.index)].copy()

    # The Gram matrix and its rank do not depend on the factor column, so they
    # are computed once instead of once per column.
    gram = X.T.dot(X)
    if la.matrix_rank(gram) != X.shape[1]:
        # Collinear regressors: the normal equations have no unique solution,
        # so the factors are deliberately left un-neutralized.
        return sample

    for name in list(sample.columns):
        y = np.array(sample[name])
        # Solve the normal equations directly; this is more accurate than
        # forming the explicit inverse.
        beta_ols = la.solve(gram, X.T.dot(y))
        sample[name] = y - X.dot(beta_ols)  # residual = neutralized factor
    return sample
# 标准化
def standardize(sample):
    """Z-score every column of ``sample`` in place.

    Each column has its mean subtracted and is divided by its population
    standard deviation (ddof=0). NaN entries are ignored when computing the
    statistics and stay NaN in the output.

    A column with zero spread is returned as all zeros instead of being
    divided by zero, which would turn the whole column into NaN.

    Args:
        sample: DataFrame with one column per factor; modified in place.

    Returns:
        The same ``sample`` object with standardized columns.
    """
    for name in list(sample.columns):
        x = sample[name]
        centered = x - x.mean()
        spread = x.std(ddof=0)
        if spread == 0:
            # Constant column: it carries no cross-sectional information, so
            # the demeaned zeros are kept as they are.
            sample[name] = centered
        else:
            sample[name] = centered / spread
    return sample
# 数据预处理
def data_process(sample,date):
    """Run the full preprocessing chain on one factor cross-section.

    The steps are applied in this order: MAD winsorization, industry/size
    neutralization for ``date``, then standardization.

    Args:
        sample: DataFrame of factor values for one date.
        date: trade date string forwarded to the neutralization step.

    Returns:
        The preprocessed DataFrame.
    """
    winsorized = extreme_process_MAD(sample)
    neutralized = data_scale_neutral(winsorized, date)
    return standardize(neutralized)
# 回测筛选股票池
def get_stocks(trDate,A_stocks):
    """Filter a stock universe down to the names usable on ``trDate``.

    A stock is kept when all four conditions hold in the Wind snapshot:
    TRADE_STATUS equals '交易' ("trading"), IPO_DATE is no later than the
    trading day six monthly periods before ``trDate``, MAXUPORDOWN equals 0,
    and RISKWARNING equals '否' ("no").

    Args:
        trDate: the date the snapshot is taken on.
        A_stocks: candidate stock codes.

    Returns:
        List of the codes that pass every filter.
    """
    fields = "trade_status,maxupordown,riskwarning,ipo_date"
    status = w.wss(A_stocks, fields, tradeDate=trDate, usedf=True)[1]
    # Latest listing date a stock may have and still be kept.
    date_least = w.tdaysoffset(-6, trDate, 'Period=M').Data[0][0]

    is_trading = status['TRADE_STATUS'] == '交易'
    listed_long_enough = status['IPO_DATE'] <= date_least
    # presumably 0 means the stock is not at its price limit - confirm
    not_at_limit = status['MAXUPORDOWN'] == 0
    no_risk_warning = status['RISKWARNING'] == '否'

    keep = is_trading & listed_long_enough & not_at_limit & no_risk_warning
    trade_codes = list(status[keep].index)
    return trade_codes
# 最大化历史ICIR加权/历史IC均值加权
def IR_weight(bar_datetime_str,stocks,alpha_data):
    """Compute max-ICIR factor weights from the last 12 weekly periods.

    For each of the 12 weekly dates before ``bar_datetime_str`` the Spearman
    rank correlation (IC) between every factor column and the following
    week's "pct_chg" is computed across ``stocks``. The weights that maximize
    mean(IC) / std(IC) of the combined factor are ``inv(cov(IC)) . mean(IC)``.

    Args:
        bar_datetime_str: end date of the look-back window.
        stocks: stock codes to evaluate.
        alpha_data: factor table indexable by a 'YYYY-MM-DD' string; after
            ``reset_index()`` the selection must expose 'codes' and 'date'
            columns, and every remaining column is treated as a factor.

    Returns:
        1-D numpy array with one weight per factor column, in column order.
        All entries are NaN when any IC in the window is undefined.
    """
    Period = "W"
    lookback = 12
    begin_time = w.tdaysoffset(-lookback, bar_datetime_str, Period=Period, usedf=True).Data[0][0].strftime('%Y-%m-%d')
    time_list = w.tdays(begin_time, bar_datetime_str, Period=Period, usedf=True).Data[0]
    time_list = [day.strftime('%Y-%m-%d') for day in time_list]
    # The first row is dropped so that row i holds the return of the period
    # that follows time_list[i].
    next_ret = w.wsd(stocks, "pct_chg", begin_time, bar_datetime_str, usedf=True, Period=Period)[1].fillna(value=0).iloc[1:]

    ic_rows = []
    for i in range(lookback):
        # Reduce the selection to a frame indexed by stock code only.
        factor = alpha_data.loc[time_list[i]].reset_index().set_index(['codes']).drop(columns=['date']).loc[stocks]
        try:
            period_ret = next_ret.iloc[i]
        except IndexError:
            # Fewer return rows than look-back periods: reuse the previous row.
            period_ret = next_ret.iloc[i - 1]
        ic_rows.append([
            factor[name].corr(period_ret, method='spearman')
            for name in factor.columns
        ])

    ic_matrix = np.array(ic_rows).T  # shape: (n_factors, lookback)
    ic_mean = ic_matrix.mean(axis=1)
    ic_cov = np.atleast_2d(np.cov(ic_matrix))
    if not np.all(np.isfinite(ic_cov)):
        # An undefined IC poisons the covariance; propagate NaN rather than
        # letting the decomposition inside pinv fail.
        return np.full(ic_mean.shape, np.nan)
    # Max-ICIR weights need the INVERSE covariance of the ICs. The
    # pseudo-inverse keeps the result defined when the covariance is singular.
    IC_IR = la.pinv(ic_cov).dot(ic_mean)
    return IC_IR
# 最大化历史收益率加权
def rate_weight(bar_datetime_str,stocks,alpha_data,num=10):
    """Weight factors by the historical return of their top-ranked group.

    For each of the 12 weekly dates before ``bar_datetime_str`` and for each
    factor column, the stocks are sorted by factor value in descending order
    and the top ``1/num`` of them are selected. The mean of that group's
    "pct_chg" over the following week is recorded. A factor's weight is the
    average of those group returns over the 12 periods.

    Args:
        bar_datetime_str: end date of the look-back window.
        stocks: stock codes to evaluate.
        alpha_data: factor table indexable by a 'YYYY-MM-DD' string; after
            ``reset_index()`` the selection must expose 'codes' and 'date'
            columns, and every remaining column is treated as a factor.
        num: number of groups; the top group holds ``len(stocks) / num``
            names (rounded, at least one).

    Returns:
        1-D numpy array with one weight per factor column, in column order.
    """
    Period = "W"
    lookback = 12
    begin_time = w.tdaysoffset(-lookback, bar_datetime_str, Period=Period, usedf=True).Data[0][0].strftime('%Y-%m-%d')
    time_list = w.tdays(begin_time, bar_datetime_str, Period=Period, usedf=True).Data[0]
    time_list = [day.strftime('%Y-%m-%d') for day in time_list]
    # The first row is dropped so that row i holds the return of the period
    # that follows time_list[i].
    next_ret = w.wsd(stocks, "pct_chg", begin_time, bar_datetime_str, usedf=True, Period=Period)[1].fillna(value=0).iloc[1:]

    period_rows = []
    for i in range(lookback):
        factor = alpha_data.loc[time_list[i]].reset_index().set_index(['codes']).drop(columns=['date']).loc[stocks]
        try:
            period_ret = next_ret.iloc[i]
        except IndexError:
            # Fewer return rows than look-back periods: reuse the previous row.
            period_ret = next_ret.iloc[i - 1]
        # Keep at least one stock so a small universe never yields an empty
        # group, whose mean would be NaN.
        group_size = max(1, round(len(factor) / num))
        group_returns = []
        for name in factor.columns:
            top_codes = list(factor.sort_values([name], ascending=False).iloc[:group_size].index)
            # Append one value per factor; assigning here instead would
            # discard every factor but the last.
            group_returns.append(period_ret.loc[top_codes].mean())
        period_rows.append(group_returns)

    # Rows are periods and columns are factors; average over the periods.
    rate = np.array(period_rows).T.mean(axis=1)
    return rate
# 因子打分
def factor_sum(sample,weight_list):
    """Add an 'alpha_sum' column holding the weighted sum of the factor columns.

    Args:
        sample: DataFrame with one column per factor; modified in place.
        weight_list: weights indexed by column position, one per column.

    Returns:
        The same ``sample`` object with the extra 'alpha_sum' column.
    """
    columns = list(sample.columns)
    # Seed the score with zeros that share the first factor's index.
    sample['alpha_sum'] = sample[columns[0]].mul(0)
    for position, column in enumerate(columns):
        weighted = sample[column].mul(weight_list[position])
        sample['alpha_sum'] = sample['alpha_sum'].add(weighted)
    return sample