This content originally appeared on DEV Community and was authored by Henry Lin
第2章:数据层深度解析
学习目标
通过本章学习,您将能够:
- 理解Qlib数据框架的设计原理
- 掌握数据获取、清洗和预处理技术
- 熟悉Alpha158和Alpha360数据集
- 学会构建自定义数据集
- 掌握数据质量检查和优化方法
2.1 数据框架设计
2.1.1 Qlib数据存储格式
数据存储架构
Qlib采用高效的数据存储格式,专门为量化投资场景优化设计。
存储层次结构:
qlib_data/
├── cn_data/ # 中国市场数据
│ ├── calendars/ # 交易日历
│ ├── features/ # 特征数据
│ ├── instruments/ # 股票列表
│ └── cache/ # 缓存数据
└── us_data/ # 美国市场数据
├── calendars/
├── features/
├── instruments/
└── cache/
数据格式特点
1. 二进制存储
- 使用高效的二进制格式存储
- 支持快速读取和写入
- 减少存储空间占用
2. 列式存储
- 按特征列存储数据
- 便于向量化计算
- 支持高效的数据查询
3. 索引优化
- 时间索引优化
- 股票代码索引
- 支持快速数据定位
数据访问接口
from qlib.data import D
# 基础数据访问
data = D.features(
instruments=['SH600000', 'SH600036'], # 股票代码
fields=['$close', '$volume', '$open'], # 字段
start_time='2020-01-01', # 开始时间
end_time='2020-12-31', # 结束时间
freq='day' # 频率
)
2.1.2 数据访问接口设计
核心接口介绍
1. 日历接口
# 获取交易日历
calendar = D.calendar(
start_time='2020-01-01',
end_time='2020-12-31',
freq='day'
)
# 获取交易日
trading_days = D.calendar(
start_time='2020-01-01',
end_time='2020-12-31',
freq='day',
future=False
)
2. 股票列表接口
# 获取股票列表
instruments = D.instruments('csi300') # CSI300成分股
stock_list = D.list_instruments(
instruments=instruments,
start_time='2020-01-01',
end_time='2020-12-31',
as_list=True
)
3. 特征数据接口
# 获取特征数据
data = D.features(
instruments=['SH600000'],
fields=['$close', '$volume', 'Ref($close, 1)'],
start_time='2020-01-01',
end_time='2020-12-31',
freq='day'
)
高级查询功能
1. 表达式支持
# 支持复杂表达式
fields = [
'$close', # 收盘价
'Ref($close, 1)', # 前一日收盘价
'Mean($close, 20)', # 20日移动平均
'$close / Ref($close, 1) - 1', # 日收益率
'Std($close, 20)', # 20日标准差
]
2. 条件过滤
# 条件过滤
data = D.features(
instruments=['SH600000'],
fields=['$close'],
start_time='2020-01-01',
end_time='2020-12-31',
freq='day',
where='$volume > 1000000' # 成交量大于100万
)
2.1.3 数据缓存机制
缓存策略
1. 多级缓存
- 内存缓存:热点数据常驻内存
- 磁盘缓存:中间结果缓存到磁盘
- 网络缓存:分布式环境下的网络缓存
2. 缓存更新策略
# 缓存配置
qlib.init(
mount_path="~/.qlib/qlib_data/cn_data",
region=REG_CN,
cache_dir="~/.qlib/cache", # 缓存目录
cache_size=1000, # 缓存大小
cache_timeout=3600 # 缓存超时时间
)
3. 缓存优化
# 预加载数据
D.features(
instruments=['SH600000'],
fields=['$close'],
start_time='2020-01-01',
end_time='2020-12-31',
freq='day',
cache=True # 启用缓存
)
2.2 数据获取与处理
2.2.1 股票数据获取
数据源介绍
1. 官方数据源
- Yahoo Finance:免费股票数据
- 本地数据:预处理的二进制数据
- 自定义数据:用户提供的数据
2. 数据频率
- 日频数据:每日OHLCV数据
- 分钟数据:1分钟、5分钟等高频数据
- 实时数据:实时行情数据
数据获取方法
1. 使用官方脚本
# 获取日频数据
python scripts/get_data.py qlib_data \
--target_dir ~/.qlib/qlib_data/cn_data \
--region cn
# 获取分钟数据
python scripts/get_data.py qlib_data \
--target_dir ~/.qlib/qlib_data/cn_data_1min \
--region cn \
--interval 1min
2. 使用模块接口
# 获取日频数据
python -m qlib.run.get_data qlib_data \
--target_dir ~/.qlib/qlib_data/cn_data \
--region cn
# 获取分钟数据
python -m qlib.run.get_data qlib_data \
--target_dir ~/.qlib/qlib_data/cn_data_1min \
--region cn \
--interval 1min
3. 自定义数据获取
# 自定义数据获取脚本
import pandas as pd
from qlib.data import D
def get_custom_data():
"""获取自定义数据"""
# 获取股票列表
instruments = D.instruments('csi300')
# 获取基础数据
data = D.features(
instruments=instruments,
fields=['$close', '$volume', '$open', '$high', '$low'],
start_time='2020-01-01',
end_time='2020-12-31',
freq='day'
)
return data
2.2.2 数据清洗和预处理
数据质量问题
1. 常见问题
- 缺失值:某些时间点数据缺失
- 异常值:价格或成交量异常
- 重复数据:同一时间点重复记录
- 数据不一致:不同数据源数据不一致
2. 数据清洗方法
import pandas as pd
import numpy as np
def clean_data(data):
"""数据清洗函数"""
# 处理缺失值
data = data.fillna(method='ffill') # 前向填充
data = data.fillna(method='bfill') # 后向填充
# 处理异常值
# 价格异常值处理
price_cols = ['$open', '$high', '$low', '$close']
for col in price_cols:
if col in data.columns:
# 使用3倍标准差过滤异常值
mean_val = data[col].mean()
std_val = data[col].std()
data[col] = data[col].clip(
lower=mean_val - 3 * std_val,
upper=mean_val + 3 * std_val
)
# 成交量异常值处理
if '$volume' in data.columns:
data['$volume'] = data['$volume'].clip(lower=0)
return data
数据预处理技术
1. 数据标准化
from sklearn.preprocessing import StandardScaler
def normalize_data(data):
"""数据标准化"""
scaler = StandardScaler()
normalized_data = scaler.fit_transform(data)
return pd.DataFrame(normalized_data, index=data.index, columns=data.columns)
2. 数据对齐
def align_data(data_dict):
"""数据对齐"""
# 获取所有数据的索引交集
common_index = None
for data in data_dict.values():
if common_index is None:
common_index = data.index
else:
common_index = common_index.intersection(data.index)
# 对齐数据
aligned_data = {}
for name, data in data_dict.items():
aligned_data[name] = data.loc[common_index]
return aligned_data
2.2.3 特征工程基础
技术指标特征
1. 移动平均线
def calculate_ma(data, window=20):
"""计算移动平均线"""
return data.rolling(window=window).mean()
# 使用示例
data['MA5'] = calculate_ma(data['$close'], 5)
data['MA20'] = calculate_ma(data['$close'], 20)
2. 相对强弱指数(RSI)
def calculate_rsi(data, window=14):
"""计算RSI指标"""
delta = data.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
# 使用示例
data['RSI'] = calculate_rsi(data['$close'])
3. 布林带
def calculate_bollinger_bands(data, window=20, num_std=2):
"""计算布林带"""
ma = data.rolling(window=window).mean()
std = data.rolling(window=window).std()
upper_band = ma + (std * num_std)
lower_band = ma - (std * num_std)
return upper_band, ma, lower_band
# 使用示例
upper, middle, lower = calculate_bollinger_bands(data['$close'])
data['BB_upper'] = upper
data['BB_middle'] = middle
data['BB_lower'] = lower
价格特征
1. 收益率特征
def calculate_returns(data):
"""计算收益率特征"""
# 日收益率
data['daily_return'] = data['$close'].pct_change()
# 对数收益率
data['log_return'] = np.log(data['$close'] / data['$close'].shift(1))
# 累积收益率
data['cumulative_return'] = (1 + data['daily_return']).cumprod() - 1
return data
2. 波动率特征
def calculate_volatility(data, window=20):
"""计算波动率特征"""
# 滚动波动率
data['volatility'] = data['daily_return'].rolling(window=window).std()
# 年化波动率
data['annualized_volatility'] = data['volatility'] * np.sqrt(252)
return data
成交量特征
1. 成交量指标
def calculate_volume_features(data):
"""计算成交量特征"""
# 成交量移动平均
data['volume_ma5'] = data['$volume'].rolling(window=5).mean()
data['volume_ma20'] = data['$volume'].rolling(window=20).mean()
# 成交量比率
data['volume_ratio'] = data['$volume'] / data['volume_ma20']
# 成交量变化率
data['volume_change'] = data['$volume'].pct_change()
return data
2.2.4 数据质量检查
数据质量指标
1. 完整性检查
def check_completeness(data):
"""检查数据完整性"""
# 缺失值统计
missing_stats = data.isnull().sum()
missing_ratio = missing_stats / len(data)
print("缺失值统计:")
for col, ratio in missing_ratio.items():
if ratio > 0:
print(f"{col}: {ratio:.2%}")
return missing_ratio
2. 一致性检查
def check_consistency(data):
"""检查数据一致性"""
# 价格逻辑检查
if all(col in data.columns for col in ['$open', '$high', '$low', '$close']):
# 检查OHLC逻辑
invalid_high = data['$high'] < data[['$open', '$close']].max(axis=1)
invalid_low = data['$low'] > data[['$open', '$close']].min(axis=1)
print(f"高价逻辑错误: {invalid_high.sum()} 条")
print(f"低价逻辑错误: {invalid_low.sum()} 条")
# 成交量检查
if '$volume' in data.columns:
negative_volume = data['$volume'] < 0
print(f"负成交量: {negative_volume.sum()} 条")
3. 异常值检测
def detect_outliers(data, method='iqr'):
"""异常值检测"""
outliers = {}
for col in data.select_dtypes(include=[np.number]).columns:
if method == 'iqr':
# IQR方法
Q1 = data[col].quantile(0.25)
Q3 = data[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers[col] = (data[col] < lower_bound) | (data[col] > upper_bound)
elif method == 'zscore':
# Z-score方法
z_scores = np.abs((data[col] - data[col].mean()) / data[col].std())
outliers[col] = z_scores > 3
return outliers
数据质量报告
def generate_quality_report(data):
"""生成数据质量报告"""
report = {
'基本信息': {
'数据形状': data.shape,
'数据类型': data.dtypes.to_dict(),
'内存使用': data.memory_usage(deep=True).sum()
},
'缺失值': data.isnull().sum().to_dict(),
'重复值': data.duplicated().sum(),
'唯一值': {col: data[col].nunique() for col in data.columns}
}
# 数值型数据统计
numeric_cols = data.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
report['数值统计'] = data[numeric_cols].describe().to_dict()
return report
2.3 量化数据集介绍
2.3.1 Alpha158数据集详解
数据集概述
Alpha158是Qlib提供的一个经典量化数据集,包含158个经过精心设计的特征。
数据集特点:
- 特征数量:158个特征
- 数据频率:日频数据
- 时间范围:2008-2020年
- 股票范围:A股市场
- 特征类型:价格、成交量、技术指标等
特征分类
1. 价格特征(Price Features)
# 基础价格特征
price_features = [
'$open', '$high', '$low', '$close', # OHLC
'$vwap', '$adj', # 成交量加权平均价、复权价
]
2. 收益率特征(Return Features)
# 收益率特征
return_features = [
'Ref($close, 1)', # 前一日收盘价
'Ref($close, 2)', # 前二日收盘价
'$close / Ref($close, 1) - 1', # 日收益率
'Ref($close, 1) / Ref($close, 2) - 1', # 前一日收益率
]
3. 技术指标特征(Technical Indicators)
# 移动平均线
ma_features = [
'Mean($close, 5)', # 5日移动平均
'Mean($close, 10)', # 10日移动平均
'Mean($close, 20)', # 20日移动平均
'Mean($close, 60)', # 60日移动平均
]
# 波动率指标
volatility_features = [
'Std($close, 5)', # 5日标准差
'Std($close, 10)', # 10日标准差
'Std($close, 20)', # 20日标准差
]
4. 成交量特征(Volume Features)
# 成交量特征
volume_features = [
'$volume', # 成交量
'Mean($volume, 5)', # 5日平均成交量
'Mean($volume, 10)', # 10日平均成交量
'$volume / Mean($volume, 20)', # 成交量比率
]
数据集使用
1. 加载Alpha158数据
from qlib.contrib.data.handler import Alpha158
# 创建数据处理器
handler = Alpha158(
instruments='csi300',
start_time='2020-01-01',
end_time='2020-12-31',
freq='day'
)
# 获取训练数据
train_data = handler.fetch(
segments={
'train': ('2020-01-01', '2020-06-30'),
'valid': ('2020-07-01', '2020-09-30'),
'test': ('2020-10-01', '2020-12-31')
}
)
2. 特征工程
# 特征处理
features = train_data['train']['feature']
labels = train_data['train']['label']
# 特征选择
selected_features = [
'$close', '$volume', 'Ref($close, 1)',
'Mean($close, 20)', 'Std($close, 20)',
'$volume / Mean($volume, 20)'
]
X = features[selected_features]
y = labels['LABEL0'] # 未来收益率
2.3.2 Alpha360数据集详解
数据集概述
Alpha360是一个更高级的数据集,包含360个特征,主要基于原始价格和成交量数据构建。
数据集特点:
- 特征数量:360个特征
- 数据频率:日频数据
- 时间范围:2008-2020年
- 股票范围:A股市场
- 特征类型:原始价格数据、技术指标、统计特征
特征构建原理
1. 原始数据特征
# 基础价格数据
raw_price_features = [
'$open', '$high', '$low', '$close', '$volume',
'$vwap', '$adj', '$factor' # 复权因子
]
2. 时间序列特征
# 时间窗口特征
time_window_features = [
# 过去1-60天的价格特征
'Ref($close, 1)', 'Ref($close, 2)', ..., 'Ref($close, 60)',
'Ref($volume, 1)', 'Ref($volume, 2)', ..., 'Ref($volume, 60)',
# 统计特征
'Mean($close, 5)', 'Mean($close, 10)', 'Mean($close, 20)',
'Std($close, 5)', 'Std($close, 10)', 'Std($close, 20)',
]
3. 技术指标特征
# 技术指标
technical_features = [
# 移动平均线
'Mean($close, 5)', 'Mean($close, 10)', 'Mean($close, 20)',
# 相对强弱指数
'RSI($close, 14)',
# 布林带
'BB_upper($close, 20, 2)', 'BB_lower($close, 20, 2)',
# MACD
'MACD($close, 12, 26, 9)',
]
数据集优势
1. 特征丰富性
- 包含更多原始数据特征
- 时间序列特征更完整
- 技术指标更全面
2. 数据质量
- 经过严格的数据清洗
- 特征工程更规范
- 数据一致性更好
3. 适用性
- 适合深度学习模型
- 支持复杂特征组合
- 便于模型优化
2.3.3 自定义数据集构建
数据集构建流程
1. 数据准备
import pandas as pd
import numpy as np
from qlib.data import D
def prepare_custom_data():
"""准备自定义数据"""
# 获取股票列表
instruments = D.instruments('csi300')
# 获取基础数据
data = D.features(
instruments=instruments,
fields=['$close', '$volume', '$open', '$high', '$low'],
start_time='2020-01-01',
end_time='2020-12-31',
freq='day'
)
return data
2. 特征工程
def engineer_features(data):
"""特征工程"""
# 计算技术指标
data['MA5'] = data['$close'].rolling(window=5).mean()
data['MA20'] = data['$close'].rolling(window=20).mean()
data['RSI'] = calculate_rsi(data['$close'])
data['VOLUME_MA5'] = data['$volume'].rolling(window=5).mean()
# 计算收益率
data['RETURN'] = data['$close'].pct_change()
data['RETURN_5'] = data['$close'].pct_change(periods=5)
# 计算波动率
data['VOLATILITY'] = data['RETURN'].rolling(window=20).std()
return data
3. 标签生成
def generate_labels(data, forward_days=1):
"""生成标签"""
# 计算未来收益率
future_return = data['$close'].shift(-forward_days) / data['$close'] - 1
# 生成分类标签(1表示上涨,0表示下跌)
labels = (future_return > 0).astype(int)
return labels
数据集处理器
from qlib.contrib.data.handler import DataHandler
class CustomDataHandler(DataHandler):
"""自定义数据处理器"""
def __init__(self, instruments, start_time, end_time, freq='day'):
super().__init__(instruments, start_time, end_time, freq)
def fetch(self, segments, **kwargs):
"""获取数据"""
# 准备数据
data = prepare_custom_data()
# 特征工程
data = engineer_features(data)
# 生成标签
labels = generate_labels(data)
# 分割数据
result = {}
for segment, (start, end) in segments.items():
mask = (data.index >= start) & (data.index <= end)
segment_data = data[mask]
segment_labels = labels[mask]
result[segment] = {
'feature': segment_data,
'label': segment_labels
}
return result
2.3.4 数据集的评估指标
数据质量指标
1. 完整性指标
def calculate_completeness(data):
"""计算数据完整性"""
total_elements = data.size
non_null_elements = data.count().sum()
completeness = non_null_elements / total_elements
return completeness
2. 一致性指标
def calculate_consistency(data):
"""计算数据一致性"""
# 检查价格逻辑
if all(col in data.columns for col in ['$open', '$high', '$low', '$close']):
valid_high = (data['$high'] >= data[['$open', '$close']].max(axis=1)).sum()
valid_low = (data['$low'] <= data[['$open', '$close']].min(axis=1)).sum()
total_records = len(data)
consistency = (valid_high + valid_low) / (2 * total_records)
return consistency
return 1.0
3. 时效性指标
def calculate_timeliness(data, expected_freq='D'):
"""计算数据时效性"""
# 检查数据频率
time_diff = data.index.to_series().diff()
expected_diff = pd.Timedelta(expected_freq)
# 计算频率一致性
freq_consistency = (time_diff == expected_diff).mean()
return freq_consistency
特征质量指标
1. 特征相关性
def calculate_feature_correlation(features, labels):
"""计算特征与标签的相关性"""
correlations = {}
for col in features.columns:
corr = features[col].corr(labels)
correlations[col] = corr
return pd.Series(correlations)
2. 特征重要性
from sklearn.ensemble import RandomForestRegressor
def calculate_feature_importance(features, labels):
"""计算特征重要性"""
rf = RandomForestRegressor(n_estimators=100, random_state=42)
rf.fit(features, labels)
importance = pd.Series(rf.feature_importances_, index=features.columns)
return importance.sort_values(ascending=False)
3. 特征稳定性
def calculate_feature_stability(features, time_periods):
"""计算特征稳定性"""
stability_scores = {}
for col in features.columns:
feature_data = features[col]
stability = []
for i in range(len(time_periods) - 1):
period1 = feature_data[time_periods[i]]
period2 = feature_data[time_periods[i + 1]]
# 计算分布相似性
similarity = calculate_distribution_similarity(period1, period2)
stability.append(similarity)
stability_scores[col] = np.mean(stability)
return pd.Series(stability_scores)
本章小结
本章深入介绍了Qlib数据层的各个方面,包括:
- 数据框架设计:理解了Qlib的数据存储格式、访问接口和缓存机制
- 数据获取与处理:掌握了数据获取、清洗、预处理和特征工程技术
- 量化数据集:熟悉了Alpha158和Alpha360数据集的特点和使用方法
- 数据质量检查:学会了数据质量评估和优化方法
课后练习
练习1:数据获取实践
- 下载并初始化Qlib数据
- 获取CSI300成分股的基础数据
- 计算简单的技术指标
练习2:特征工程
- 基于价格数据计算移动平均线
- 计算RSI、布林带等技术指标
- 构建自定义特征
练习3:数据质量检查
- 编写数据质量检查脚本
- 分析数据完整性和一致性
- 生成数据质量报告
扩展阅读
-
数据处理技术
- 《Python数据科学手册》
- 《特征工程:从入门到实践》
-
量化数据相关
- 《量化投资数据挖掘》
- 《金融时间序列分析》
-
技术指标
- 《技术分析指标大全》
- 《量化技术分析》
This content originally appeared on DEV Community and was authored by Henry Lin