第2章:Qlib数据层深度解析



This content originally appeared on DEV Community and was authored by Henry Lin

第2章:数据层深度解析

学习目标

通过本章学习,您将能够:

  • 理解Qlib数据框架的设计原理
  • 掌握数据获取、清洗和预处理技术
  • 熟悉Alpha158和Alpha360数据集
  • 学会构建自定义数据集
  • 掌握数据质量检查和优化方法

2.1 数据框架设计

2.1.1 Qlib数据存储格式

数据存储架构

Qlib采用高效的数据存储格式,专门为量化投资场景优化设计。

存储层次结构:

qlib_data/
├── cn_data/                    # 中国市场数据
│   ├── calendars/              # 交易日历
│   ├── features/               # 特征数据
│   ├── instruments/            # 股票列表
│   └── cache/                  # 缓存数据
└── us_data/                    # 美国市场数据
    ├── calendars/
    ├── features/
    ├── instruments/
    └── cache/

数据格式特点

1. 二进制存储

  • 使用高效的二进制格式存储
  • 支持快速读取和写入
  • 减少存储空间占用

2. 列式存储

  • 按特征列存储数据
  • 便于向量化计算
  • 支持高效的数据查询

3. 索引优化

  • 时间索引优化
  • 股票代码索引
  • 支持快速数据定位

数据访问接口

from qlib.data import D

# 基础数据访问
data = D.features(
    instruments=['SH600000', 'SH600036'],  # 股票代码
    fields=['$close', '$volume', '$open'],  # 字段
    start_time='2020-01-01',               # 开始时间
    end_time='2020-12-31',                 # 结束时间
    freq='day'                             # 频率
)

2.1.2 数据访问接口设计

核心接口介绍

1. 日历接口

# 获取交易日历
calendar = D.calendar(
    start_time='2020-01-01',
    end_time='2020-12-31',
    freq='day'
)

# 获取交易日
trading_days = D.calendar(
    start_time='2020-01-01',
    end_time='2020-12-31',
    freq='day',
    future=False
)

2. 股票列表接口

# 获取股票列表
instruments = D.instruments('csi300')  # CSI300成分股
stock_list = D.list_instruments(
    instruments=instruments,
    start_time='2020-01-01',
    end_time='2020-12-31',
    as_list=True
)

3. 特征数据接口

# 获取特征数据
data = D.features(
    instruments=['SH600000'],
    fields=['$close', '$volume', 'Ref($close, 1)'],
    start_time='2020-01-01',
    end_time='2020-12-31',
    freq='day'
)

高级查询功能

1. 表达式支持

# 支持复杂表达式
fields = [
    '$close',                    # 收盘价
    'Ref($close, 1)',           # 前一日收盘价
    'Mean($close, 20)',         # 20日移动平均
    '$close / Ref($close, 1) - 1',  # 日收益率
    'Std($close, 20)',          # 20日标准差
]

2. 条件过滤

# 条件过滤
data = D.features(
    instruments=['SH600000'],
    fields=['$close'],
    start_time='2020-01-01',
    end_time='2020-12-31',
    freq='day',
    where='$volume > 1000000'  # 成交量大于100万
)

2.1.3 数据缓存机制

缓存策略

1. 多级缓存

  • 内存缓存:热点数据常驻内存
  • 磁盘缓存:中间结果缓存到磁盘
  • 网络缓存:分布式环境下的网络缓存

2. 缓存更新策略

# 缓存配置
qlib.init(
    mount_path="~/.qlib/qlib_data/cn_data",
    region=REG_CN,
    cache_dir="~/.qlib/cache",  # 缓存目录
    cache_size=1000,            # 缓存大小
    cache_timeout=3600          # 缓存超时时间
)

3. 缓存优化

# 预加载数据
D.features(
    instruments=['SH600000'],
    fields=['$close'],
    start_time='2020-01-01',
    end_time='2020-12-31',
    freq='day',
    cache=True  # 启用缓存
)

2.2 数据获取与处理

2.2.1 股票数据获取

数据源介绍

1. 官方数据源

  • Yahoo Finance:免费股票数据
  • 本地数据:预处理的二进制数据
  • 自定义数据:用户提供的数据

2. 数据频率

  • 日频数据:每日OHLCV数据
  • 分钟数据:1分钟、5分钟等高频数据
  • 实时数据:实时行情数据

数据获取方法

1. 使用官方脚本

# 获取日频数据
python scripts/get_data.py qlib_data \
  --target_dir ~/.qlib/qlib_data/cn_data \
  --region cn

# 获取分钟数据
python scripts/get_data.py qlib_data \
  --target_dir ~/.qlib/qlib_data/cn_data_1min \
  --region cn \
  --interval 1min

2. 使用模块接口

# 获取日频数据
python -m qlib.run.get_data qlib_data \
  --target_dir ~/.qlib/qlib_data/cn_data \
  --region cn

# 获取分钟数据
python -m qlib.run.get_data qlib_data \
  --target_dir ~/.qlib/qlib_data/cn_data_1min \
  --region cn \
  --interval 1min

3. 自定义数据获取

# 自定义数据获取脚本
import pandas as pd
from qlib.data import D

def get_custom_data():
    """获取自定义数据"""
    # 获取股票列表
    instruments = D.instruments('csi300')

    # 获取基础数据
    data = D.features(
        instruments=instruments,
        fields=['$close', '$volume', '$open', '$high', '$low'],
        start_time='2020-01-01',
        end_time='2020-12-31',
        freq='day'
    )

    return data

2.2.2 数据清洗和预处理

数据质量问题

1. 常见问题

  • 缺失值:某些时间点数据缺失
  • 异常值:价格或成交量异常
  • 重复数据:同一时间点重复记录
  • 数据不一致:不同数据源数据不一致

2. 数据清洗方法

import pandas as pd
import numpy as np

def clean_data(data):
    """数据清洗函数"""
    # 处理缺失值
    data = data.fillna(method='ffill')  # 前向填充
    data = data.fillna(method='bfill')  # 后向填充

    # 处理异常值
    # 价格异常值处理
    price_cols = ['$open', '$high', '$low', '$close']
    for col in price_cols:
        if col in data.columns:
            # 使用3倍标准差过滤异常值
            mean_val = data[col].mean()
            std_val = data[col].std()
            data[col] = data[col].clip(
                lower=mean_val - 3 * std_val,
                upper=mean_val + 3 * std_val
            )

    # 成交量异常值处理
    if '$volume' in data.columns:
        data['$volume'] = data['$volume'].clip(lower=0)

    return data

数据预处理技术

1. 数据标准化

from sklearn.preprocessing import StandardScaler

def normalize_data(data):
    """数据标准化"""
    scaler = StandardScaler()
    normalized_data = scaler.fit_transform(data)
    return pd.DataFrame(normalized_data, index=data.index, columns=data.columns)

2. 数据对齐

def align_data(data_dict):
    """数据对齐"""
    # 获取所有数据的索引交集
    common_index = None
    for data in data_dict.values():
        if common_index is None:
            common_index = data.index
        else:
            common_index = common_index.intersection(data.index)

    # 对齐数据
    aligned_data = {}
    for name, data in data_dict.items():
        aligned_data[name] = data.loc[common_index]

    return aligned_data

2.2.3 特征工程基础

技术指标特征

1. 移动平均线

def calculate_ma(data, window=20):
    """计算移动平均线"""
    return data.rolling(window=window).mean()

# 使用示例
data['MA5'] = calculate_ma(data['$close'], 5)
data['MA20'] = calculate_ma(data['$close'], 20)

2. 相对强弱指数(RSI)

def calculate_rsi(data, window=14):
    """计算RSI指标"""
    delta = data.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

# 使用示例
data['RSI'] = calculate_rsi(data['$close'])

3. 布林带

def calculate_bollinger_bands(data, window=20, num_std=2):
    """计算布林带"""
    ma = data.rolling(window=window).mean()
    std = data.rolling(window=window).std()
    upper_band = ma + (std * num_std)
    lower_band = ma - (std * num_std)
    return upper_band, ma, lower_band

# 使用示例
upper, middle, lower = calculate_bollinger_bands(data['$close'])
data['BB_upper'] = upper
data['BB_middle'] = middle
data['BB_lower'] = lower

价格特征

1. 收益率特征

def calculate_returns(data):
    """计算收益率特征"""
    # 日收益率
    data['daily_return'] = data['$close'].pct_change()

    # 对数收益率
    data['log_return'] = np.log(data['$close'] / data['$close'].shift(1))

    # 累积收益率
    data['cumulative_return'] = (1 + data['daily_return']).cumprod() - 1

    return data

2. 波动率特征

def calculate_volatility(data, window=20):
    """计算波动率特征"""
    # 滚动波动率
    data['volatility'] = data['daily_return'].rolling(window=window).std()

    # 年化波动率
    data['annualized_volatility'] = data['volatility'] * np.sqrt(252)

    return data

成交量特征

1. 成交量指标

def calculate_volume_features(data):
    """计算成交量特征"""
    # 成交量移动平均
    data['volume_ma5'] = data['$volume'].rolling(window=5).mean()
    data['volume_ma20'] = data['$volume'].rolling(window=20).mean()

    # 成交量比率
    data['volume_ratio'] = data['$volume'] / data['volume_ma20']

    # 成交量变化率
    data['volume_change'] = data['$volume'].pct_change()

    return data

2.2.4 数据质量检查

数据质量指标

1. 完整性检查

def check_completeness(data):
    """检查数据完整性"""
    # 缺失值统计
    missing_stats = data.isnull().sum()
    missing_ratio = missing_stats / len(data)

    print("缺失值统计:")
    for col, ratio in missing_ratio.items():
        if ratio > 0:
            print(f"{col}: {ratio:.2%}")

    return missing_ratio

2. 一致性检查

def check_consistency(data):
    """检查数据一致性"""
    # 价格逻辑检查
    if all(col in data.columns for col in ['$open', '$high', '$low', '$close']):
        # 检查OHLC逻辑
        invalid_high = data['$high'] < data[['$open', '$close']].max(axis=1)
        invalid_low = data['$low'] > data[['$open', '$close']].min(axis=1)

        print(f"高价逻辑错误: {invalid_high.sum()}")
        print(f"低价逻辑错误: {invalid_low.sum()}")

    # 成交量检查
    if '$volume' in data.columns:
        negative_volume = data['$volume'] < 0
        print(f"负成交量: {negative_volume.sum()}")

3. 异常值检测

def detect_outliers(data, method='iqr'):
    """异常值检测"""
    outliers = {}

    for col in data.select_dtypes(include=[np.number]).columns:
        if method == 'iqr':
            # IQR方法
            Q1 = data[col].quantile(0.25)
            Q3 = data[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR

            outliers[col] = (data[col] < lower_bound) | (data[col] > upper_bound)

        elif method == 'zscore':
            # Z-score方法
            z_scores = np.abs((data[col] - data[col].mean()) / data[col].std())
            outliers[col] = z_scores > 3

    return outliers

数据质量报告

def generate_quality_report(data):
    """生成数据质量报告"""
    report = {
        '基本信息': {
            '数据形状': data.shape,
            '数据类型': data.dtypes.to_dict(),
            '内存使用': data.memory_usage(deep=True).sum()
        },
        '缺失值': data.isnull().sum().to_dict(),
        '重复值': data.duplicated().sum(),
        '唯一值': {col: data[col].nunique() for col in data.columns}
    }

    # 数值型数据统计
    numeric_cols = data.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        report['数值统计'] = data[numeric_cols].describe().to_dict()

    return report

2.3 量化数据集介绍

2.3.1 Alpha158数据集详解

数据集概述

Alpha158是Qlib提供的一个经典量化数据集,包含158个经过精心设计的特征。

数据集特点:

  • 特征数量:158个特征
  • 数据频率:日频数据
  • 时间范围:2008-2020年
  • 股票范围:A股市场
  • 特征类型:价格、成交量、技术指标等

特征分类

1. 价格特征(Price Features)

# 基础价格特征
price_features = [
    '$open', '$high', '$low', '$close',  # OHLC
    '$vwap', '$adj',                     # 成交量加权平均价、复权价
]

2. 收益率特征(Return Features)

# 收益率特征
return_features = [
    'Ref($close, 1)',                    # 前一日收盘价
    'Ref($close, 2)',                    # 前二日收盘价
    '$close / Ref($close, 1) - 1',      # 日收益率
    'Ref($close, 1) / Ref($close, 2) - 1',  # 前一日收益率
]

3. 技术指标特征(Technical Indicators)

# 移动平均线
ma_features = [
    'Mean($close, 5)',   # 5日移动平均
    'Mean($close, 10)',  # 10日移动平均
    'Mean($close, 20)',  # 20日移动平均
    'Mean($close, 60)',  # 60日移动平均
]

# 波动率指标
volatility_features = [
    'Std($close, 5)',    # 5日标准差
    'Std($close, 10)',   # 10日标准差
    'Std($close, 20)',   # 20日标准差
]

4. 成交量特征(Volume Features)

# 成交量特征
volume_features = [
    '$volume',                           # 成交量
    'Mean($volume, 5)',                 # 5日平均成交量
    'Mean($volume, 10)',                # 10日平均成交量
    '$volume / Mean($volume, 20)',      # 成交量比率
]

数据集使用

1. 加载Alpha158数据

from qlib.contrib.data.handler import Alpha158

# 创建数据处理器
handler = Alpha158(
    instruments='csi300',
    start_time='2020-01-01',
    end_time='2020-12-31',
    freq='day'
)

# 获取训练数据
train_data = handler.fetch(
    segments={
        'train': ('2020-01-01', '2020-06-30'),
        'valid': ('2020-07-01', '2020-09-30'),
        'test': ('2020-10-01', '2020-12-31')
    }
)

2. 特征工程

# 特征处理
features = train_data['train']['feature']
labels = train_data['train']['label']

# 特征选择
selected_features = [
    '$close', '$volume', 'Ref($close, 1)',
    'Mean($close, 20)', 'Std($close, 20)',
    '$volume / Mean($volume, 20)'
]

X = features[selected_features]
y = labels['LABEL0']  # 未来收益率

2.3.2 Alpha360数据集详解

数据集概述

Alpha360是一个更高级的数据集,包含360个特征,主要基于原始价格和成交量数据构建。

数据集特点:

  • 特征数量:360个特征
  • 数据频率:日频数据
  • 时间范围:2008-2020年
  • 股票范围:A股市场
  • 特征类型:原始价格数据、技术指标、统计特征

特征构建原理

1. 原始数据特征

# 基础价格数据
raw_price_features = [
    '$open', '$high', '$low', '$close', '$volume',
    '$vwap', '$adj', '$factor'  # 复权因子
]

2. 时间序列特征

# 时间窗口特征
time_window_features = [
    # 过去1-60天的价格特征
    'Ref($close, 1)', 'Ref($close, 2)', ..., 'Ref($close, 60)',
    'Ref($volume, 1)', 'Ref($volume, 2)', ..., 'Ref($volume, 60)',

    # 统计特征
    'Mean($close, 5)', 'Mean($close, 10)', 'Mean($close, 20)',
    'Std($close, 5)', 'Std($close, 10)', 'Std($close, 20)',
]

3. 技术指标特征

# 技术指标
technical_features = [
    # 移动平均线
    'Mean($close, 5)', 'Mean($close, 10)', 'Mean($close, 20)',

    # 相对强弱指数
    'RSI($close, 14)',

    # 布林带
    'BB_upper($close, 20, 2)', 'BB_lower($close, 20, 2)',

    # MACD
    'MACD($close, 12, 26, 9)',
]

数据集优势

1. 特征丰富性

  • 包含更多原始数据特征
  • 时间序列特征更完整
  • 技术指标更全面

2. 数据质量

  • 经过严格的数据清洗
  • 特征工程更规范
  • 数据一致性更好

3. 适用性

  • 适合深度学习模型
  • 支持复杂特征组合
  • 便于模型优化

2.3.3 自定义数据集构建

数据集构建流程

1. 数据准备

import pandas as pd
import numpy as np
from qlib.data import D

def prepare_custom_data():
    """准备自定义数据"""
    # 获取股票列表
    instruments = D.instruments('csi300')

    # 获取基础数据
    data = D.features(
        instruments=instruments,
        fields=['$close', '$volume', '$open', '$high', '$low'],
        start_time='2020-01-01',
        end_time='2020-12-31',
        freq='day'
    )

    return data

2. 特征工程

def engineer_features(data):
    """特征工程"""
    # 计算技术指标
    data['MA5'] = data['$close'].rolling(window=5).mean()
    data['MA20'] = data['$close'].rolling(window=20).mean()
    data['RSI'] = calculate_rsi(data['$close'])
    data['VOLUME_MA5'] = data['$volume'].rolling(window=5).mean()

    # 计算收益率
    data['RETURN'] = data['$close'].pct_change()
    data['RETURN_5'] = data['$close'].pct_change(periods=5)

    # 计算波动率
    data['VOLATILITY'] = data['RETURN'].rolling(window=20).std()

    return data

3. 标签生成

def generate_labels(data, forward_days=1):
    """生成标签"""
    # 计算未来收益率
    future_return = data['$close'].shift(-forward_days) / data['$close'] - 1

    # 生成分类标签(1表示上涨,0表示下跌)
    labels = (future_return > 0).astype(int)

    return labels

数据集处理器

from qlib.contrib.data.handler import DataHandler

class CustomDataHandler(DataHandler):
    """自定义数据处理器"""

    def __init__(self, instruments, start_time, end_time, freq='day'):
        super().__init__(instruments, start_time, end_time, freq)

    def fetch(self, segments, **kwargs):
        """获取数据"""
        # 准备数据
        data = prepare_custom_data()

        # 特征工程
        data = engineer_features(data)

        # 生成标签
        labels = generate_labels(data)

        # 分割数据
        result = {}
        for segment, (start, end) in segments.items():
            mask = (data.index >= start) & (data.index <= end)
            segment_data = data[mask]
            segment_labels = labels[mask]

            result[segment] = {
                'feature': segment_data,
                'label': segment_labels
            }

        return result

2.3.4 数据集的评估指标

数据质量指标

1. 完整性指标

def calculate_completeness(data):
    """计算数据完整性"""
    total_elements = data.size
    non_null_elements = data.count().sum()
    completeness = non_null_elements / total_elements
    return completeness

2. 一致性指标

def calculate_consistency(data):
    """计算数据一致性"""
    # 检查价格逻辑
    if all(col in data.columns for col in ['$open', '$high', '$low', '$close']):
        valid_high = (data['$high'] >= data[['$open', '$close']].max(axis=1)).sum()
        valid_low = (data['$low'] <= data[['$open', '$close']].min(axis=1)).sum()
        total_records = len(data)

        consistency = (valid_high + valid_low) / (2 * total_records)
        return consistency
    return 1.0

3. 时效性指标

def calculate_timeliness(data, expected_freq='D'):
    """计算数据时效性"""
    # 检查数据频率
    time_diff = data.index.to_series().diff()
    expected_diff = pd.Timedelta(expected_freq)

    # 计算频率一致性
    freq_consistency = (time_diff == expected_diff).mean()
    return freq_consistency

特征质量指标

1. 特征相关性

def calculate_feature_correlation(features, labels):
    """计算特征与标签的相关性"""
    correlations = {}
    for col in features.columns:
        corr = features[col].corr(labels)
        correlations[col] = corr

    return pd.Series(correlations)

2. 特征重要性

from sklearn.ensemble import RandomForestRegressor

def calculate_feature_importance(features, labels):
    """计算特征重要性"""
    rf = RandomForestRegressor(n_estimators=100, random_state=42)
    rf.fit(features, labels)

    importance = pd.Series(rf.feature_importances_, index=features.columns)
    return importance.sort_values(ascending=False)

3. 特征稳定性

def calculate_feature_stability(features, time_periods):
    """计算特征稳定性"""
    stability_scores = {}

    for col in features.columns:
        feature_data = features[col]
        stability = []

        for i in range(len(time_periods) - 1):
            period1 = feature_data[time_periods[i]]
            period2 = feature_data[time_periods[i + 1]]

            # 计算分布相似性
            similarity = calculate_distribution_similarity(period1, period2)
            stability.append(similarity)

        stability_scores[col] = np.mean(stability)

    return pd.Series(stability_scores)

本章小结

本章深入介绍了Qlib数据层的各个方面,包括:

  1. 数据框架设计:理解了Qlib的数据存储格式、访问接口和缓存机制
  2. 数据获取与处理:掌握了数据获取、清洗、预处理和特征工程技术
  3. 量化数据集:熟悉了Alpha158和Alpha360数据集的特点和使用方法
  4. 数据质量检查:学会了数据质量评估和优化方法

课后练习

练习1:数据获取实践

  1. 下载并初始化Qlib数据
  2. 获取CSI300成分股的基础数据
  3. 计算简单的技术指标

练习2:特征工程

  1. 基于价格数据计算移动平均线
  2. 计算RSI、布林带等技术指标
  3. 构建自定义特征

练习3:数据质量检查

  1. 编写数据质量检查脚本
  2. 分析数据完整性和一致性
  3. 生成数据质量报告

扩展阅读

  1. 数据处理技术

    • 《Python数据科学手册》
    • 《特征工程:从入门到实践》
  2. 量化数据相关

    • 《量化投资数据挖掘》
    • 《金融时间序列分析》
  3. 技术指标

    • 《技术分析指标大全》
    • 《量化技术分析》


This content originally appeared on DEV Community and was authored by Henry Lin