Consolidate Python code under src/ directories

- Move cta_1d/loader.py, train.py, backtest.py to cta_1d/src/
- Move stock_15m/loader.py, train.py to stock_15m/src/
- Update root __init__.py files to re-export from src/ submodules
- Update src/__init__.py files with proper public API exports
- Update imports to use relative paths within src/
- Add missing create_experiment_dir export to common/__init__.py
- Remove legacy/ directories with old pandas implementations
- Remove test_new_loaders.py development test file

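Resulting layout (sketch inferred from the moves above; labels.py placement
assumed from the src/__init__.py imports):

alpha_lab/
├── common/
│   └── __init__.py        (re-exports paths/plotting helpers)
├── cta_1d/
│   ├── __init__.py        (re-exports from src/)
│   └── src/
│       ├── __init__.py
│       ├── labels.py
│       ├── loader.py
│       ├── train.py
│       └── backtest.py
└── stock_15m/
    ├── __init__.py        (re-exports from src/)
    └── src/
        ├── __init__.py
        ├── loader.py
        └── train.py
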
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

@@ -1,12 +1,13 @@
"""Common utilities for alpha_lab experiments."""
from .paths import ensure_dir, get_results_dir, get_task_results_dir
from .paths import ensure_dir, get_results_dir, get_task_results_dir, create_experiment_dir
from .plotting import setup_plot_style, plot_ic_series, plot_cumulative_returns
__all__ = [
'ensure_dir',
'get_results_dir',
'get_task_results_dir',
'create_experiment_dir',
'setup_plot_style',
'plot_ic_series',
'plot_cumulative_returns',
]

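A hypothetical usage sketch of the newly exported helper; create_experiment_dir's signature is not shown in this diff, so the call below is an assumption:

from alpha_lab.common import create_experiment_dir

# Assumed behavior: create (and return the path of) a fresh experiment directory
exp_dir = create_experiment_dir('results/cta_1d')
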
@@ -0,0 +1,75 @@
"""
CTA 1-day return prediction experiments.
This module provides dataset loading and experiment utilities for
CTA (Commodity Trading Advisor) 1-day return prediction.
Example:
>>> from alpha_lab.cta_1d import CTA1DLoader
>>>
>>> loader = CTA1DLoader(
... return_type='o2c_twap1min',
... normalization='dual',
... feature_sets=['alpha158', 'hffactor']
... )
>>> dataset = loader.load(dt_range=['2020-01-01', '2023-12-31'])
>>>
>>> # Define train/test split
>>> dataset = dataset.with_segments({
... 'train': ('2020-01-01', '2022-12-31'),
... 'test': ('2023-01-01', '2023-12-31')
... })
>>>
>>> # Extract training data
>>> X_train, y_train, w_train = dataset.split('train').to_numpy()
Training:
>>> from alpha_lab.cta_1d import train_model, TrainConfig
>>>
>>> config = TrainConfig(
... dt_range=['2020-01-01', '2023-12-31'],
... feature_sets=['alpha158'],
... normalization='dual'
... )
>>> model, metrics = train_model(config, output_dir='results/exp01')
Backtesting:
>>> from alpha_lab.cta_1d import run_backtest, BacktestConfig
>>>
>>> config = BacktestConfig(
... model_path='results/exp01/model.json',
... dt_range=['2023-01-01', '2023-12-31'],
... feature_sets=['alpha158']
... )
>>> results = run_backtest(config)
"""
# Re-export all public APIs from src submodules
from .src import (
CTA1DLoader,
get_blend_weights,
describe_blend_config,
BLEND_CONFIGS,
)
try:
from .src import train_model, TrainConfig
from .src import run_backtest, BacktestConfig
__all__ = [
'CTA1DLoader',
'get_blend_weights',
'describe_blend_config',
'BLEND_CONFIGS',
'train_model',
'TrainConfig',
'run_backtest',
'BacktestConfig',
]
except ImportError:
# xgboost or sklearn not installed
__all__ = [
'CTA1DLoader',
'get_blend_weights',
'describe_blend_config',
'BLEND_CONFIGS',
]

@@ -1,5 +1,26 @@
"""CTA 1-day task-specific utilities."""
from .labels import get_blend_weights, describe_blend_config
from .loader import CTA1DLoader
from .labels import get_blend_weights, describe_blend_config, BLEND_CONFIGS
__all__ = ['get_blend_weights', 'describe_blend_config']
try:
from .train import train_model, TrainConfig
from .backtest import run_backtest, BacktestConfig
__all__ = [
'CTA1DLoader',
'train_model',
'TrainConfig',
'run_backtest',
'BacktestConfig',
'get_blend_weights',
'describe_blend_config',
'BLEND_CONFIGS',
]
except ImportError:
# xgboost or sklearn not installed
__all__ = [
'CTA1DLoader',
'get_blend_weights',
'describe_blend_config',
'BLEND_CONFIGS',
]

@@ -0,0 +1,197 @@
"""
Backtest script for CTA 1-day return prediction models.
Example usage:
python -m alpha_lab.cta_1d.src.backtest \
--model results/experiment_01/model.json \
--dt-range 2023-01-01 2023-12-31 \
--output results/backtest_01
Or programmatically:
from alpha_lab.cta_1d import run_backtest, BacktestConfig
config = BacktestConfig(
model_path='results/exp01/model.json',
dt_range=['2023-01-01', '2023-12-31'],
feature_sets=['alpha158'],
num_trades=4
)
results = run_backtest(config)
"""
import argparse
import json
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import numpy as np
import pandas as pd
import xgboost as xgb
from .loader import CTA1DLoader
from qshare.eval.cta.backtest import CTABacktester
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class BacktestConfig:
"""Backtest configuration."""
model_path: str
dt_range: list[str]
feature_sets: list[str]
normalization: str = 'dual'
num_trades: int = 4
signal_dist: str = 'normal'
pos_weight: bool = True
output_dir: Optional[str] = None
def parse_args() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description='Backtest CTA 1-day return prediction model'
)
parser.add_argument(
'--model', '-m',
type=str,
required=True,
help='Path to trained model JSON file'
)
parser.add_argument(
'--output', '-o',
type=str,
default='results/cta_1d_backtest',
help='Output directory for results'
)
parser.add_argument(
'--dt-range',
nargs=2,
metavar=('START', 'END'),
required=True,
help='Date range [start_date, end_date]'
)
parser.add_argument(
'--feature-sets',
nargs='+',
default=['alpha158'],
help='Feature sets to use (must match training)'
)
parser.add_argument(
'--normalization',
default='dual',
choices=['zscore', 'cs_zscore', 'rolling_20', 'rolling_60', 'dual'],
help='Label normalization (must match training)'
)
parser.add_argument(
'--num-trades',
type=int,
default=4,
help='Number of trades per day'
)
return parser.parse_args()
def load_model(model_path: str) -> xgb.Booster:
"""Load trained XGBoost model."""
model = xgb.Booster()
model.load_model(model_path)
return model
def run_backtest(
config: BacktestConfig
) -> Dict[str, Any]:
"""
Run backtest with given configuration.
Args:
config: Backtest configuration
Returns:
Dictionary with backtest results
"""
logger.info(f"Loading model from {config.model_path}")
model = load_model(config.model_path)
logger.info(f"Loading dataset for range {config.dt_range}")
loader = CTA1DLoader(
return_type='o2c_twap1min',
normalization=config.normalization,
feature_sets=config.feature_sets
)
dataset = loader.load(dt_range=config.dt_range)
X, y, _ = dataset.to_numpy()
feature_cols = dataset.features
logger.info(f"Loaded {len(X)} samples with {len(feature_cols)} features")
# Generate predictions
dmatrix = xgb.DMatrix(X)
predictions = model.predict(dmatrix)
# Run backtest
logger.info("Running backtest...")
backtester = CTABacktester(
num_trades=config.num_trades,
signal_dist=config.signal_dist,
pos_weight=config.pos_weight
)
results = backtester.run(y, predictions)
summary = backtester.summary()
logger.info(f"Backtest Results:")
logger.info(f" IC: {summary.get('ic', 'N/A')}")
logger.info(f" Return: {summary.get('total_return', 'N/A')}")
logger.info(f" Sharpe: {summary.get('sharpe', 'N/A')}")
# Save results if output_dir specified
if config.output_dir:
output_path = Path(config.output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Save summary
with open(output_path / 'backtest_summary.json', 'w') as f:
json.dump(summary, f, indent=2)
# Save predictions
pred_df = pd.DataFrame({
'actual': y,
'predicted': predictions,
'signal': backtester.signals if hasattr(backtester, 'signals') else predictions
})
pred_df.to_csv(output_path / 'predictions.csv', index=False)
logger.info(f"Results saved to {config.output_dir}")
return {
'summary': summary,
'predictions': predictions,
'actual': y
}
def main():
"""Main entry point."""
args = parse_args()
config = BacktestConfig(
model_path=args.model,
dt_range=args.dt_range,
feature_sets=args.feature_sets,
normalization=args.normalization,
num_trades=args.num_trades,
output_dir=args.output
)
results = run_backtest(config)
logger.info("Backtest complete!")
if __name__ == '__main__':
main()

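A minimal sketch of inspecting the artifacts run_backtest writes when output_dir is set (file names follow the code above; assumes pandas is installed):

import json
import pandas as pd

with open('results/backtest_01/backtest_summary.json') as f:
    summary = json.load(f)
preds = pd.read_csv('results/backtest_01/predictions.csv')
# Correlation of saved predictions vs actuals, as a sanity check on the stored IC
print(summary.get('sharpe'), preds[['actual', 'predicted']].corr().iloc[0, 1])
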
@@ -0,0 +1,448 @@
"""
CTA 1-day return prediction dataset loader.
Uses the new qshare.data framework with Dataset class and processors.
"""
from dataclasses import dataclass
from datetime import date
from typing import List, Optional
import numpy as np
import pandas as pd
import polars as pl
from qshare.data import pl_Dataset, pl_pipe, pl_clip, pl_cs_zscore
from qshare.data.universal import DataSpec
from qshare.io.ddb import get_ddb_sess, reset_index_from_ddb
from qshare.config.research.cta.features import HFFACTOR_COLS
from .labels import get_blend_weights
@dataclass
class CTA1DLoader:
"""
CTA 1-day return prediction dataset loader.
Loads features (alpha158, hffactor), labels, and calculates weights
for CTA futures daily return prediction tasks.
Example:
>>> loader = CTA1DLoader(
... return_type='o2c_twap1min',
... normalization='dual',
... feature_sets=['alpha158', 'hffactor']
... )
>>> dataset = loader.load(dt_range=['2020-01-01', '2023-12-31'])
>>> dataset = dataset.with_segments({
... 'train': ('2020-01-01', '2022-12-31'),
... 'test': ('2023-01-01', '2023-12-31')
... })
>>> X_train, y_train, w_train = dataset.split('train').to_numpy()
"""
return_type: str = 'o2c_twap1min'
normalization: str = 'dual'
feature_sets: Optional[List[str]] = None
weight_factors: Optional[dict] = None
blend_weights: str | List[float] | None = None
ddb_host: str = '192.168.1.146'
label_cap_upper: float = 0.5
label_cap_lower: float = -0.5
def __post_init__(self):
if self.feature_sets is None:
self.feature_sets = ['alpha158', 'hffactor']
if self.weight_factors is None:
self.weight_factors = {'positive': 1.0, 'negative': 2.0}
def load(
self,
dt_range: List[str],
fit_range: Optional[List[str]] = None
) -> pl_Dataset:
"""
Load and prepare CTA 1-day training dataset.
Args:
dt_range: Date range [start_date, end_date] for dataset
fit_range: Date range [start, end] for fitting normalization params.
If None, uses first 60% of dt_range.
Returns:
pl_Dataset with features, label, and weight columns
"""
start_date, end_date = dt_range
if fit_range is None:
# Default: use first 60% for fit
all_dates = pd.date_range(start_date, end_date)
split_idx = int(len(all_dates) * 0.6)
fit_range = [
all_dates[0].strftime('%Y-%m-%d'),
all_dates[split_idx].strftime('%Y-%m-%d')
]
# Load extended history for rolling normalization
load_start = (pd.to_datetime(start_date) - pd.Timedelta(days=120)).strftime('%Y-%m-%d')
# Load features
df_features = self._load_features(load_start, end_date)
# Load and normalize labels
df_label = self._load_labels(load_start, end_date, fit_range)
# Combine
df = df_features.join(df_label, on=['datetime', 'instrument'], how='inner')
# Filter to requested date range
df = df.filter(
(pl.col('datetime') >= start_date) &
(pl.col('datetime') <= end_date)
)
# Calculate weights
df = self._calculate_weights(df)
# Clean data
df = self._clean_data(df)
# Get feature columns
feature_cols = [c for c in df.columns
if any(c.startswith(prefix) for prefix in ['f_a158_', 'f_hf_'])]
return pl_Dataset(
data=df,
features=feature_cols,
label='label',
weight='weight' if self.weight_factors else None
)
def _load_features(self, start_date: str, end_date: str) -> pl.DataFrame:
"""Load feature data from DolphinDB."""
sess = get_ddb_sess(host=self.ddb_host)
try:
feature_dfs = []
if 'alpha158' in self.feature_sets:
df_alpha = self._load_alpha158(sess, start_date, end_date)
feature_dfs.append(df_alpha)
if 'hffactor' in self.feature_sets:
df_hf = self._load_hffactor(sess, start_date, end_date)
feature_dfs.append(df_hf)
# Join all feature sets
result = feature_dfs[0]
for df in feature_dfs[1:]:
result = result.join(df, on=['datetime', 'instrument'], how='inner')
return result
finally:
sess.close()
def _load_alpha158(self, sess, start_date: str, end_date: str) -> pl.DataFrame:
"""Load alpha158 features from DolphinDB."""
since_ddb = pd.to_datetime(start_date).strftime('%Y.%m.%d')
df = sess.run(f"""
select code, m_nDate, *
from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_alpha159_0_7_beta')
where m_nDate >= {since_ddb}
""")
df = reset_index_from_ddb(df)
# Drop non-numeric columns
if 'code_init' in df.columns:
df = df.drop(columns=['code_init'])
# Convert to polars and add prefix
pl_df = pl.from_pandas(df.reset_index())
pl_df = pl_df.rename({
c: f'f_a158_{c}'
for c in pl_df.columns
if c not in ['datetime', 'instrument']
})
return pl_df
def _load_hffactor(self, sess, start_date: str, end_date: str) -> pl.DataFrame:
"""Load hffactor features from DolphinDB."""
since_ddb = pd.to_datetime(start_date).strftime('%Y.%m.%d')
# Load from factor table
df = sess.run(f"""
select code, m_nDate, factor_name, value
from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hffactor')
where m_nDate >= {since_ddb}
and factor_name in [{','.join([f"'{c}'" for c in HFFACTOR_COLS])}]
""")
# Pivot to wide format
df = df.pivot_table(
index=['code', 'm_nDate'],
columns='factor_name',
values='value'
).reset_index()
df = reset_index_from_ddb(df)
# Convert to polars and add prefix
pl_df = pl.from_pandas(df.reset_index())
pl_df = pl_df.rename({
c: f'f_hf_{c}'
for c in pl_df.columns
if c not in ['datetime', 'instrument']
})
return pl_df
def _load_labels(
self,
start_date: str,
end_date: str,
fit_range: List[str]
) -> pl.DataFrame:
"""Load and normalize labels."""
sess = get_ddb_sess(host=self.ddb_host)
try:
# Map return type to indicator name
indicator_map = {
'o2c_twap1min': 'twap_open1m@1_twap_close1m@1',
'o2o_twap1min': 'twap_open1m@1_twap_open1m@2',
}
indicator = indicator_map.get(self.return_type, self.return_type)
since_ddb = pd.to_datetime(start_date).strftime('%Y.%m.%d')
# Load dominant contract mapping
df_contract = sess.run(f"""
select first(code) as code, m_nDate, code_init
from loadTable('dfs://daily_stock_run', 'dwm_1day_cta_dom')
where m_nDate >= {since_ddb} and version='vp_csmax_roll2_cummax'
group by m_nDate, code_init
""")
# Load returns
df_return = sess.run(f"""
select code, m_nDate, value as ret
from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hfvalue')
where indicator='{indicator}' and m_nDate >= {since_ddb}
""")
# Merge with dominant contract mapping
df_return = pd.merge(
left=df_return[['code', 'm_nDate', 'ret']],
right=df_contract,
on=['code', 'm_nDate'],
how='inner'
)
# Convert to index format
df_return['code'] = df_return['code_init'] + 'Ind'
df_return = df_return[['code', 'm_nDate', 'ret']]
df_return = reset_index_from_ddb(df_return)
# Convert to polars
pl_df = pl.from_pandas(df_return.reset_index())
# Apply normalization
pl_df = self._normalize_label(pl_df, fit_range)
return pl_df
finally:
sess.close()
def _normalize_label(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame:
"""Apply specified normalization to label."""
fit_start, fit_end = fit_range
# Ensure datetime column is string for comparison
if pl_df['datetime'].dtype == pl.Date:
pl_df = pl_df.with_columns(
pl.col('datetime').dt.strftime('%Y-%m-%d').alias('datetime_str')
)
date_col = 'datetime_str'
else:
date_col = 'datetime'
if self.normalization == 'zscore':
# Calculate mean/std on fit range
fit_data = pl_df.filter(
(pl.col(date_col) >= fit_start) &
(pl.col(date_col) <= fit_end)
)
mean = fit_data['ret'].mean()
std = fit_data['ret'].std()
result = pl_df.with_columns(
((pl.col('ret') - mean) / std).clip(
self.label_cap_lower, self.label_cap_upper
).alias('label')
).select(['datetime', 'instrument', 'label'])
return result
elif self.normalization == 'cs_zscore':
# Cross-sectional z-score per datetime
return pl_df.with_columns(
((pl.col('ret') - pl.col('ret').mean().over('datetime')) /
pl.col('ret').std().over('datetime')).clip(
self.label_cap_lower, self.label_cap_upper
).alias('label')
).select(['datetime', 'instrument', 'label'])
elif self.normalization == 'rolling_20':
return self._apply_rolling_norm(pl_df, window=20, fit_range=fit_range)
elif self.normalization == 'rolling_60':
return self._apply_rolling_norm(pl_df, window=60, fit_range=fit_range)
elif self.normalization == 'dual':
# Create all normalization variants
label_zscore = self._normalize_zscore(pl_df, fit_range)
label_cszscore = self._normalize_cs_zscore(pl_df)
label_roll20 = self._normalize_rolling(pl_df, window=20, fit_range=fit_range)
label_roll60 = self._normalize_rolling(pl_df, window=60, fit_range=fit_range)
# Get blend weights
weights = get_blend_weights(self.blend_weights)
# Join and blend
pl_df = label_zscore.join(label_cszscore, on=['datetime', 'instrument'])
pl_df = pl_df.join(label_roll20, on=['datetime', 'instrument'])
pl_df = pl_df.join(label_roll60, on=['datetime', 'instrument'])
return pl_df.with_columns(
(weights[0] * pl.col('label_zscore') +
weights[1] * pl.col('label_cszscore') +
weights[2] * pl.col('label_roll20') +
weights[3] * pl.col('label_roll60')).clip(
self.label_cap_lower, self.label_cap_upper
).alias('label')
).select(['datetime', 'instrument', 'label'])
else:
raise ValueError(f"Unknown normalization: {self.normalization}")
def _normalize_zscore(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame:
"""Create z-score normalized label."""
fit_start, fit_end = fit_range
# Handle date type conversion for comparison
if pl_df['datetime'].dtype == pl.Date:
fit_data = pl_df.filter(
(pl.col('datetime').dt.strftime('%Y-%m-%d') >= fit_start) &
(pl.col('datetime').dt.strftime('%Y-%m-%d') <= fit_end)
)
else:
fit_data = pl_df.filter(
(pl.col('datetime') >= fit_start) &
(pl.col('datetime') <= fit_end)
)
mean = fit_data['ret'].mean()
std = fit_data['ret'].std()
return pl_df.with_columns(
((pl.col('ret') - mean) / std).alias('label_zscore')
).select(['datetime', 'instrument', 'label_zscore'])
def _normalize_cs_zscore(self, pl_df: pl.DataFrame) -> pl.DataFrame:
"""Create cross-sectional z-score normalized label."""
return pl_df.with_columns(
((pl.col('ret') - pl.col('ret').mean().over('datetime')) /
pl.col('ret').std().over('datetime')).alias('label_cszscore')
).select(['datetime', 'instrument', 'label_cszscore'])
def _normalize_rolling(
self,
pl_df: pl.DataFrame,
window: int,
fit_range: List[str]
) -> pl.DataFrame:
"""Create rolling window normalized label."""
# Convert to pandas for rolling calculation
pd_df = pl_df.to_pandas().set_index(['datetime', 'instrument'])
# Unstack to wide format
df_wide = pd_df['ret'].unstack('instrument')
# Calculate rolling mean and std
rolling_mean = df_wide.rolling(window=window, min_periods=window//2).mean()
rolling_std = df_wide.rolling(window=window, min_periods=window//2).std()
# Normalize
df_normalized = (df_wide - rolling_mean) / rolling_std
# Restack
rolling_label = df_normalized.stack().reset_index()
rolling_label.columns = ['datetime', 'instrument', f'label_roll{window}']
return pl.from_pandas(rolling_label)
def _apply_rolling_norm(
self,
pl_df: pl.DataFrame,
window: int,
fit_range: List[str]
) -> pl.DataFrame:
"""Apply rolling normalization and cap."""
result = self._normalize_rolling(pl_df, window, fit_range)
return result.with_columns(
pl.col(f'label_roll{window}').clip(
self.label_cap_lower, self.label_cap_upper
).alias('label')
).select(['datetime', 'instrument', 'label'])
def _calculate_weights(self, pl_df: pl.DataFrame) -> pl.DataFrame:
"""Calculate sample weights based on return magnitude."""
# Base weights by return magnitude tiers
pl_df = pl_df.with_columns(
pl.when(pl.col('label').abs() > 1.5).then(pl.lit(2.5))
.when(pl.col('label').abs() > 1.0).then(pl.lit(2.0))
.when(pl.col('label').abs() > 0.5).then(pl.lit(1.5))
.when(pl.col('label').abs() > 0.2).then(pl.lit(1.0))
.otherwise(0.0).alias('weight')
)
# Apply negative return multiplier
if self.weight_factors.get('negative'):
pl_df = pl_df.with_columns(
pl.when(pl.col('label') < -0.5)
.then(pl.col('weight') * self.weight_factors['negative'])
.otherwise(pl.col('weight'))
.alias('weight')
)
# Apply positive return multiplier
if self.weight_factors.get('positive'):
pl_df = pl_df.with_columns(
pl.when(pl.col('label') > 0.5)
.then(pl.col('weight') * self.weight_factors['positive'])
.otherwise(pl.col('weight'))
.alias('weight')
)
return pl_df
def _clean_data(self, pl_df: pl.DataFrame) -> pl.DataFrame:
"""Clean data: remove inf/nan values."""
# Get numeric columns only
numeric_cols = [
c for c in pl_df.columns
if pl_df[c].dtype in [pl.Float32, pl.Float64, pl.Int32, pl.Int64]
]
# Replace inf with null, then drop nulls
pl_df = pl_df.with_columns([
pl.when(pl.col(c).is_infinite()).then(None).otherwise(pl.col(c)).alias(c)
for c in numeric_cols
])
return pl_df.drop_nulls()

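The 'dual' branch above calls get_blend_weights from labels.py, which this diff does not show. A minimal sketch of what that lookup might look like, assuming BLEND_CONFIGS maps preset names to four weights in (zscore, cs_zscore, roll20, roll60) order; the names and values here are hypothetical:

from typing import List, Union

# Hypothetical contents; the real BLEND_CONFIGS lives in labels.py and is not shown here.
BLEND_CONFIGS = {
    'default': [0.25, 0.25, 0.25, 0.25],
}

def get_blend_weights(spec: Union[str, List[float], None]) -> List[float]:
    """Resolve a preset name or explicit list into four blend weights."""
    if spec is None:
        return BLEND_CONFIGS['default']
    if isinstance(spec, str):
        return BLEND_CONFIGS[spec]
    if len(spec) != 4:
        raise ValueError('expected exactly four blend weights')
    return list(spec)
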
@@ -0,0 +1,286 @@
"""
Training script for CTA 1-day return prediction models.
Example usage:
python -m alpha_lab.cta_1d.src.train \
--config config.yaml \
--output results/experiment_01
Or programmatically:
from alpha_lab.cta_1d import train_model, TrainConfig
config = TrainConfig(
dt_range=['2020-01-01', '2023-12-31'],
feature_sets=['alpha158'],
normalization='dual',
model_type='xgb'
)
model, metrics = train_model(config, output_dir='results/exp01')
"""
import argparse
import json
import logging
from pathlib import Path
from typing import Optional, Dict, Any
from dataclasses import dataclass
import numpy as np
import pandas as pd
from sklearn.metrics import r2_score
import xgboost as xgb
from .loader import CTA1DLoader
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class TrainConfig:
"""Training configuration."""
dt_range: list[str]
feature_sets: list[str]
normalization: str = 'dual'
blend_weights: Optional[str] = None
model_type: str = 'xgb'
model_params: Optional[Dict[str, Any]] = None
segments: Optional[Dict[str, tuple]] = None
def __post_init__(self):
if self.model_params is None:
self.model_params = {}
if self.segments is None:
# Default: fixed date-based split (train through 2022-06, valid 2022-H2, test from 2023)
self.segments = {
'train': (self.dt_range[0], '2022-06-30'),
'valid': ('2022-07-01', '2022-12-31'),
'test': ('2023-01-01', self.dt_range[1])
}
def parse_args() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description='Train CTA 1-day return prediction model'
)
parser.add_argument(
'--config', '-c',
type=str,
help='Path to config YAML file'
)
parser.add_argument(
'--output', '-o',
type=str,
default='results/cta_1d_experiment',
help='Output directory for results'
)
parser.add_argument(
'--dt-range',
nargs=2,
metavar=('START', 'END'),
help='Date range [start_date, end_date]'
)
parser.add_argument(
'--feature-sets',
nargs='+',
default=['alpha158'],
help='Feature sets to use'
)
parser.add_argument(
'--normalization',
default='dual',
choices=['zscore', 'cs_zscore', 'rolling_20', 'rolling_60', 'dual'],
help='Label normalization method'
)
parser.add_argument(
'--model-type',
default='xgb',
choices=['xgb', 'linear'],
help='Model type'
)
return parser.parse_args()
def load_config(args: argparse.Namespace) -> TrainConfig:
"""Load configuration from args or YAML file."""
if args.config:
import yaml
with open(args.config) as f:
config_dict = yaml.safe_load(f)
return TrainConfig(**config_dict)
# Create config from CLI args
return TrainConfig(
dt_range=args.dt_range,
feature_sets=args.feature_sets,
normalization=args.normalization,
model_type=args.model_type
)
def train_xgb_model(
X_train: np.ndarray,
y_train: np.ndarray,
w_train: Optional[np.ndarray],
X_valid: np.ndarray,
y_valid: np.ndarray,
params: Optional[Dict] = None
) -> xgb.Booster:
"""Train XGBoost model."""
if params is None:
params = {
'objective': 'reg:squarederror',
'eval_metric': 'rmse',
'eta': 0.05,
'max_depth': 6,
'subsample': 0.8,
'colsample_bytree': 0.8,
'seed': 42
}
dtrain = xgb.DMatrix(X_train, label=y_train, weight=w_train)
dvalid = xgb.DMatrix(X_valid, label=y_valid)
logger.info("Training XGBoost model...")
model = xgb.train(
params,
dtrain,
num_boost_round=500,
evals=[(dtrain, 'train'), (dvalid, 'valid')],
early_stopping_rounds=50,
verbose_eval=50
)
return model
def evaluate_model(
model: xgb.Booster,
X: np.ndarray,
y: np.ndarray,
dataset_name: str = 'dataset'
) -> Dict[str, float]:
"""Evaluate model and return metrics."""
dmatrix = xgb.DMatrix(X)
predictions = model.predict(dmatrix)
# Calculate metrics
ic = np.corrcoef(predictions, y)[0, 1]
r2 = r2_score(y, predictions)
metrics = {
f'{dataset_name}_ic': ic,
f'{dataset_name}_r2': r2,
}
logger.info(f"{dataset_name} - IC: {ic:.4f}, R²: {r2:.4f}")
return metrics
def train_model(
config: TrainConfig,
output_dir: Optional[str] = None
) -> tuple[xgb.Booster, Dict]:
"""
Train CTA model with given configuration.
Args:
config: Training configuration
output_dir: Directory to save results (optional)
Returns:
Tuple of (trained_model, metrics_dict)
"""
logger.info(f"Loading dataset for range {config.dt_range}")
# Load dataset
loader = CTA1DLoader(
return_type='o2c_twap1min',
normalization=config.normalization,
feature_sets=config.feature_sets,
blend_weights=config.blend_weights
)
dataset = loader.load(dt_range=config.dt_range)
dataset = dataset.with_segments(config.segments)
feature_cols = dataset.features
logger.info(f"Loaded {len(feature_cols)} features")
# Extract data for each split
train_data = dataset.split('train')
valid_data = dataset.split('valid')
test_data = dataset.split('test')
X_train, y_train, w_train = train_data.to_numpy()
X_valid, y_valid, _ = valid_data.to_numpy()
X_test, y_test, _ = test_data.to_numpy()
logger.info(f"Train size: {len(X_train)}, Valid: {len(X_valid)}, Test: {len(X_test)}")
# Train model
if config.model_type == 'xgb':
model = train_xgb_model(
X_train, y_train, w_train,
X_valid, y_valid,
config.model_params
)
else:
raise ValueError(f"Unsupported model type: {config.model_type}")
# Evaluate
metrics = {}
metrics.update(evaluate_model(model, X_train, y_train, 'train'))
metrics.update(evaluate_model(model, X_valid, y_valid, 'valid'))
metrics.update(evaluate_model(model, X_test, y_test, 'test'))
# Save results if output_dir specified
if output_dir:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Save model
model.save_model(str(output_path / 'model.json'))
# Save metrics
with open(output_path / 'metrics.json', 'w') as f:
json.dump(metrics, f, indent=2)
# Save config
with open(output_path / 'config.json', 'w') as f:
json.dump({
'dt_range': config.dt_range,
'feature_sets': config.feature_sets,
'normalization': config.normalization,
'model_type': config.model_type,
'segments': config.segments
}, f, indent=2)
# Save feature importance
importance = model.get_score(importance_type='gain')
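# XGBoost's get_score keys default to 'f0', 'f1', ...; strip the 'f' to recover column indices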
importance_df = pd.DataFrame([
{'feature': feature_cols[int(k[1:])], 'importance': v}
for k, v in importance.items()
]).sort_values('importance', ascending=False)
importance_df.to_csv(output_path / 'feature_importance.csv', index=False)
logger.info(f"Results saved to {output_dir}")
return model, metrics
def main():
"""Main entry point."""
args = parse_args()
config = load_config(args)
model, metrics = train_model(config, output_dir=args.output)
logger.info("Training complete!")
logger.info(f"Final metrics: {metrics}")
if __name__ == '__main__':
main()

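For the --config path, load_config above expands the YAML directly into TrainConfig, so the file's top-level keys mirror the dataclass fields. A minimal sketch of an equivalent config built in Python (values illustrative, using the TrainConfig defined above):

import yaml

config_text = """
dt_range: ['2020-01-01', '2023-12-31']
feature_sets: [alpha158, hffactor]
normalization: dual
model_type: xgb
"""
config = TrainConfig(**yaml.safe_load(config_text))
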
@@ -0,0 +1,43 @@
"""
Stock 15-minute return prediction experiments.
This module provides dataset loading and experiment utilities for
stock 15-minute (intraday) return prediction.
Example:
>>> from alpha_lab.stock_15m import Stock15mLoader
>>>
>>> loader = Stock15mLoader(
... normalization_mode='dual',
... positive_factor=1.0,
... negative_factor=2.0
... )
>>> dataset = loader.load(
... dt_range=['2020-01-01', '2023-12-31'],
... feature_path='/data/parquet/stock_1min_alpha158',
... kline_path='/data/parquet/stock_1min_kline'
... )
>>>
>>> # Extract training data
>>> X_train, y_train, w_train = dataset.to_numpy()
Training:
>>> from alpha_lab.stock_15m import train_model, TrainConfig
>>>
>>> config = TrainConfig(
... dt_range=['2020-01-01', '2023-12-31'],
... feature_path='/data/parquet/stock_1min_alpha158',
... kline_path='/data/parquet/stock_1min_kline'
... )
>>> model, metrics = train_model(config, output_dir='results/exp01')
"""
# Re-export all public APIs from src submodules
from .src import Stock15mLoader
try:
from .src import train_model, TrainConfig
__all__ = ['Stock15mLoader', 'train_model', 'TrainConfig']
except ImportError:
# xgboost or sklearn not installed
__all__ = ['Stock15mLoader']

@@ -1,3 +1,10 @@
"""Stock 15m task-specific utilities."""
# Add task-specific functions here as needed
from .loader import Stock15mLoader
try:
from .train import train_model, TrainConfig
__all__ = ['Stock15mLoader', 'train_model', 'TrainConfig']
except ImportError:
# xgboost or sklearn not installed
__all__ = ['Stock15mLoader']

@@ -0,0 +1,389 @@
"""
Stock 15-minute return prediction dataset loader.
Uses the new qshare.data framework with Dataset class and processors.
"""
from dataclasses import dataclass
from typing import List, Optional
import numpy as np
import polars as pl
from qshare.data import pl_Dataset, pl_pipe, pl_clip, pl_cs_zscore
from qshare.io.polars import load_from_pq
from qshare.algo.polars import ntrl_by_idx, cs_zscore
from qshare.algo.polars.limit import detect_limit_stopping
# Column definitions
COLS_INDEX_1MIN = ['datetime', 'instrument', 'm_nTime']
COLS_INDEX_1DAY = ['datetime', 'instrument']
@dataclass
class Stock15mLoader:
"""
Stock 15-minute return prediction dataset loader.
Loads Alpha158 features at 1-minute frequency and constructs 15-minute
forward returns for stock return prediction tasks.
Example:
>>> loader = Stock15mLoader(
... normalization_mode='dual',
... positive_factor=1.0,
... negative_factor=2.0
... )
>>> dataset = loader.load(
... dt_range=['2020-01-01', '2023-12-31'],
... feature_path='/data/parquet/stock_1min_alpha158',
... kline_path='/data/parquet/stock_1min_kline'
... )
"""
normalization_mode: str = 'industry'
positive_factor: Optional[float] = None
negative_factor: Optional[float] = None
label_cap_upper: float = 0.5
label_cap_lower: float = -0.5
def __post_init__(self):
valid_modes = ['industry', 'cs_zscore', 'dual']
if self.normalization_mode not in valid_modes:
raise ValueError(f"normalization_mode must be one of {valid_modes}")
def load(
self,
dt_range: List[str],
feature_path: str,
kline_path: str,
industry_path: Optional[str] = None
) -> pl_Dataset:
"""
Load and prepare 15-minute return training dataset.
Args:
dt_range: Date range [start_date, end_date]
feature_path: Path to Alpha158 features parquet
kline_path: Path to kline data parquet
industry_path: Path to industry index data (optional)
Returns:
pl_Dataset with features, label, and weight columns
"""
# Load data
pl_ldf_a158 = self._load_features(feature_path, dt_range)
pl_ldf_kline_1min = self._load_kline(kline_path, '1min', dt_range)
pl_ldf_kline_1day = self._load_kline(kline_path, '1day', dt_range)
if industry_path:
pl_ldf_indus = self._load_industry(industry_path, dt_range)
else:
pl_ldf_indus = None
# Get feature columns from schema
a158_schema = pl_ldf_a158.collect_schema()['a158']
cols_a158 = [f.name for f in a158_schema.fields]
# Determine feature columns and target name based on mode
if self.normalization_mode == 'dual':
cols_feature = (
cols_a158 +
[f"{name}_ntrl" for name in cols_a158] +
['dist_to_limit', 'dist_to_stopping', 'timestep']
)
target_col = 'return_15min_dual'
else:
cols_feature = cols_a158 + ['dist_to_limit', 'dist_to_stopping', 'timestep']
target_col = (
'return_15min_ntrlz_1min'
if self.normalization_mode == 'industry'
else 'return_15min_csz'
)
# Compute limit/stopping features
pl_ldf_limit = self._compute_limit_features(pl_ldf_kline_1day, pl_ldf_kline_1min)
# Calculate forward returns
pl_ldf_return = self._calculate_returns(pl_ldf_kline_1min)
# Normalize returns
pl_ldf_return = self._normalize_returns(pl_ldf_return, pl_ldf_indus, target_col)
# Cap returns at limits
pl_ldf_return = self._cap_returns(pl_ldf_return.join(pl_ldf_limit, on=COLS_INDEX_1MIN), target_col)
# Normalize features
pl_ldf_a158 = self._normalize_features(pl_ldf_a158, pl_ldf_indus)
# Create extra features
pl_ldf_extra = self._create_extra_features(pl_ldf_limit, pl_ldf_kline_1min)
# Join features with extra
pl_ldf_features = self._join_features(pl_ldf_a158, pl_ldf_extra)
# Join everything
pl_df = pl_ldf_features.join(
pl_ldf_return.select(COLS_INDEX_1MIN + [target_col]),
on=COLS_INDEX_1MIN,
how='inner'
).collect()
# Clean data
pl_df = self._clean_data(pl_df, cols_feature)
# Calculate weights
pl_df = self._calculate_weights(pl_df, target_col)
return pl_Dataset(
data=pl_df,
features=cols_feature,
label=target_col,
weight='weight' if self.positive_factor or self.negative_factor else None
)
def _load_features(self, path: str, dt_range: List[str]) -> pl.LazyFrame:
"""Load Alpha158 features."""
return load_from_pq(
path=path,
table_alias='a158',
start_time=dt_range[0],
as_struct=True
)
def _load_kline(self, path: str, freq: str, dt_range: List[str]) -> pl.LazyFrame:
"""Load kline data."""
return load_from_pq(
path=path,
table_alias=f'kline_{freq}',
start_time=dt_range[0]
)
def _load_industry(self, path: str, dt_range: List[str]) -> pl.LazyFrame:
"""Load industry index data."""
return load_from_pq(
path=path,
table_alias='indus_idx',
start_time=dt_range[0]
)
def _compute_limit_features(
self,
pl_ldf_kline_1day: pl.LazyFrame,
pl_ldf_kline_1min: pl.LazyFrame
) -> pl.LazyFrame:
"""Compute limit/stopping features."""
pl_ldf_limit = detect_limit_stopping(
pl_ldf_kline_1day=pl_ldf_kline_1day,
pl_ldf_kline_1min=pl_ldf_kline_1min,
)
return pl_ldf_limit.select(
COLS_INDEX_1MIN,
(pl.col('Limit') - pl.col('return_from_yclose')).alias('dist_to_limit'),
(pl.col('return_from_yclose') - pl.col('Stopping')).alias('dist_to_stopping'),
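# shift(-16) pulls each bar's t+16 limit/stopping flag back to t, i.e. the forward return's exit bar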
pl.col(['meet_limit', 'meet_stopping']).shift(-16).over(
['datetime', 'instrument'], order_by='m_nTime'
).fill_null(pl.lit(False))
)
def _calculate_returns(self, pl_ldf_kline_1min: pl.LazyFrame) -> pl.LazyFrame:
"""Calculate 15-minute forward returns: close[t+16] / close[t+1] - 1."""
return pl_ldf_kline_1min.select(
COLS_INDEX_1MIN,
pl.struct(
(pl.col('close').shift(-16) / pl.col('close').shift(-1) - 1)
.over(['datetime', 'instrument'], order_by='m_nTime')
.alias('return_15min')
).alias('return')
)
def _normalize_returns(
self,
pl_ldf_return: pl.LazyFrame,
pl_ldf_indus: Optional[pl.LazyFrame],
target_col: str
) -> pl.LazyFrame:
"""Normalize returns based on mode."""
if self.normalization_mode == 'cs_zscore':
return pl_ldf_return.with_columns(
((pl.col('return').struct.field('return_15min') -
pl.col('return').struct.field('return_15min').mean()) /
pl.col('return').struct.field('return_15min').std())
.over(['datetime', 'm_nTime']).alias(target_col)
)
elif self.normalization_mode == 'dual':
# Create both normalizations
pl_ldf_csz = pl_ldf_return.with_columns(
((pl.col('return').struct.field('return_15min') -
pl.col('return').struct.field('return_15min').mean()) /
pl.col('return').struct.field('return_15min').std())
.over(['datetime', 'm_nTime']).alias('return_15min_csz')
)
if pl_ldf_indus is not None:
pl_ldf_ntrl = pl_ldf_return.join(
pl_ldf_indus, on=COLS_INDEX_1DAY, how='inner'
).pipe(
ntrl_by_idx,
input_col='return',
idx_col='indus_idx',
partition_cols=['datetime', 'm_nTime']
).unnest('return').rename({'return_15min': 'return_15min_ntrlz_1min'})
pl_ldf_csz = pl_ldf_csz.join(
pl_ldf_ntrl, on=COLS_INDEX_1MIN, how='inner'
)
return pl_ldf_csz.with_columns(
(0.8 * pl.col('return_15min_ntrlz_1min') +
0.2 * pl.col('return_15min_csz')).alias(target_col)
)
else: # industry
if pl_ldf_indus is None:
raise ValueError("industry_path required for industry normalization")
return pl_ldf_return.join(
pl_ldf_indus, on=COLS_INDEX_1DAY, how='inner'
).pipe(
ntrl_by_idx,
input_col='return',
idx_col='indus_idx',
partition_cols=['datetime', 'm_nTime']
).unnest('return').rename({'return_15min': target_col})
def _cap_returns(
self,
pl_ldf_return: pl.LazyFrame,
target_col: str
) -> pl.LazyFrame:
"""Cap returns at limit up/down."""
return pl_ldf_return.with_columns(
pl.when(pl.col('meet_limit'))
.then(pl.lit(self.label_cap_upper))
.when(pl.col('meet_stopping'))
.then(pl.lit(self.label_cap_lower))
.otherwise(pl.col(target_col))
.alias(target_col)
)
def _normalize_features(
self,
pl_ldf_a158: pl.LazyFrame,
pl_ldf_indus: Optional[pl.LazyFrame]
) -> pl.LazyFrame:
"""Normalize Alpha158 features."""
if self.normalization_mode == 'cs_zscore':
return pl_ldf_a158.collect().lazy().pipe(
cs_zscore,
input_col='a158',
partition_cols=['datetime', 'm_nTime']
).unnest('a158')
elif self.normalization_mode == 'dual':
# Create both normalizations
pl_ldf_csz = pl_ldf_a158.collect().lazy().pipe(
cs_zscore,
input_col='a158',
partition_cols=['datetime', 'm_nTime']
).unnest('a158')
if pl_ldf_indus is not None:
pl_ldf_ntrl = pl_ldf_a158.join(
pl_ldf_indus, on=COLS_INDEX_1DAY, how='inner'
).collect().lazy().pipe(
ntrl_by_idx,
input_col='a158',
idx_col='indus_idx',
partition_cols=['datetime', 'm_nTime']
).unnest('a158').with_columns(
pl.all().name.suffix('_ntrl')
)
return pl_ldf_csz.join(pl_ldf_ntrl, on=COLS_INDEX_1MIN, how='inner')
return pl_ldf_csz
else: # industry
if pl_ldf_indus is None:
raise ValueError("industry_path required for industry normalization")
return pl_ldf_a158.join(
pl_ldf_indus, on=COLS_INDEX_1DAY, how='inner'
).collect().lazy().pipe(
ntrl_by_idx,
input_col='a158',
idx_col='indus_idx',
partition_cols=['datetime', 'm_nTime']
).unnest('a158')
def _create_extra_features(
self,
pl_ldf_limit: pl.LazyFrame,
pl_ldf_kline_1min: pl.LazyFrame
) -> pl.DataFrame:
"""Create extra features: timestep."""
# Infer timestep from m_nTime
pl_df_timestep = pl_ldf_kline_1min.select(
pl.col('m_nTime'),
((pl.col('m_nTime').rank(method='ordinal').over(['datetime', 'instrument']) - 1) / 239)
.alias('timestep')
).unique('m_nTime').sort('m_nTime').collect()
return pl_ldf_limit.select(
COLS_INDEX_1MIN,
pl.col(['dist_to_limit', 'dist_to_stopping'])
).join(
pl_df_timestep,
on='m_nTime',
how='inner'
)
def _join_features(
self,
pl_ldf_a158: pl.LazyFrame,
pl_ldf_extra: pl.DataFrame
) -> pl.LazyFrame:
"""Join features with extra features."""
if self.normalization_mode != 'dual':
return pl_ldf_a158.join(
pl_ldf_extra, on=COLS_INDEX_1MIN, how='inner'
).unnest('a158')
else:
return pl_ldf_a158.join(
pl_ldf_extra, on=COLS_INDEX_1MIN, how='inner'
)
def _clean_data(self, pl_df: pl.DataFrame, cols_feature: List[str]) -> pl.DataFrame:
"""Clean data: remove inf/nan values."""
pl_df = pl_df.with_columns(
pl.col(cols_feature).replace([np.inf, -np.inf], None)
).drop_nulls().fill_nan(0.)
return pl_df
def _calculate_weights(self, pl_df: pl.DataFrame, target_col: str) -> pl.DataFrame:
"""Calculate sample weights."""
# Base weights by return magnitude tiers
pl_df = pl_df.with_columns(
pl.when(pl.col(target_col).abs() > 1.5).then(pl.lit(2.5))
.when(pl.col(target_col).abs() > 1.0).then(pl.lit(2.0))
.when(pl.col(target_col).abs() > 0.5).then(pl.lit(1.5))
.when(pl.col(target_col).abs() > 0.2).then(pl.lit(1.0))
.otherwise(0.0).alias('weight')
)
# Apply multipliers
if self.negative_factor:
pl_df = pl_df.with_columns(
pl.when(pl.col(target_col) < -0.5)
.then(pl.col('weight') * self.negative_factor)
.otherwise(pl.col('weight'))
.alias('weight')
)
if self.positive_factor:
pl_df = pl_df.with_columns(
pl.when(pl.col(target_col) > 0.5)
.then(pl.col('weight') * self.positive_factor)
.otherwise(pl.col('weight'))
.alias('weight')
)
return pl_df

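A standalone quick check of the weight tiers used in _calculate_weights above (same when/then chain, before the positive/negative multipliers; assumes polars is installed):

import polars as pl

df = pl.DataFrame({'label': [0.1, 0.3, 0.6, -0.9, 1.2, 1.8]})
df = df.with_columns(
    pl.when(pl.col('label').abs() > 1.5).then(pl.lit(2.5))
    .when(pl.col('label').abs() > 1.0).then(pl.lit(2.0))
    .when(pl.col('label').abs() > 0.5).then(pl.lit(1.5))
    .when(pl.col('label').abs() > 0.2).then(pl.lit(1.0))
    .otherwise(0.0).alias('weight')
)
print(df)  # weights: 0.0, 1.0, 1.5, 1.5, 2.0, 2.5
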
@@ -0,0 +1,208 @@
"""
Training script for stock 15-minute return prediction models.
Example usage:
python -m alpha_lab.stock_15m.src.train \
--config config.yaml \
--output results/stock_15m_exp01
"""
import argparse
import json
import logging
from pathlib import Path
from typing import Optional, Dict, Any
from dataclasses import dataclass
import numpy as np
import pandas as pd
from sklearn.metrics import r2_score
import xgboost as xgb
from .loader import Stock15mLoader
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class TrainConfig:
"""Training configuration."""
dt_range: list[str]
feature_path: str
kline_path: str
industry_path: Optional[str] = None
normalization_mode: str = 'dual'
model_type: str = 'xgb'
model_params: Optional[Dict[str, Any]] = None
positive_factor: Optional[float] = None
negative_factor: Optional[float] = None
def __post_init__(self):
if self.model_params is None:
self.model_params = {}
def parse_args() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description='Train stock 15-minute return prediction model'
)
parser.add_argument(
'--config', '-c',
type=str,
help='Path to config YAML file'
)
parser.add_argument(
'--output', '-o',
type=str,
default='results/stock_15m_experiment',
help='Output directory for results'
)
parser.add_argument(
'--dt-range',
nargs=2,
metavar=('START', 'END'),
help='Date range [start_date, end_date]'
)
parser.add_argument(
'--feature-path',
type=str,
help='Path to Alpha158 features parquet'
)
parser.add_argument(
'--kline-path',
type=str,
help='Path to kline data parquet'
)
parser.add_argument(
'--normalization-mode',
default='dual',
choices=['industry', 'cs_zscore', 'dual'],
help='Feature normalization mode'
)
return parser.parse_args()
def load_config(args: argparse.Namespace) -> TrainConfig:
"""Load configuration from args or YAML file."""
if args.config:
import yaml
with open(args.config) as f:
config_dict = yaml.safe_load(f)
return TrainConfig(**config_dict['data'], **config_dict.get('model', {}))
return TrainConfig(
dt_range=args.dt_range,
feature_path=args.feature_path,
kline_path=args.kline_path,
normalization_mode=args.normalization_mode
)
def train_model(
config: TrainConfig,
output_dir: Optional[str] = None
) -> tuple[xgb.Booster, Dict]:
"""Train stock 15m model with given configuration."""
logger.info(f"Loading dataset for range {config.dt_range}")
# Load dataset
loader = Stock15mLoader(
normalization_mode=config.normalization_mode,
positive_factor=config.positive_factor,
negative_factor=config.negative_factor
)
dataset = loader.load(
dt_range=config.dt_range,
feature_path=config.feature_path,
kline_path=config.kline_path,
industry_path=config.industry_path
)
feature_cols = dataset.features
logger.info(f"Loaded {len(feature_cols)} features")
# Extract data
X, y, w = dataset.to_numpy()
# Simple 80/20 train/test split by row order (no shuffling)
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
w_train = w[:split_idx] if w is not None else None
logger.info(f"Train size: {len(X_train)}, Test: {len(X_test)}")
# Train model
dtrain = xgb.DMatrix(X_train, label=y_train, weight=w_train)
dtest = xgb.DMatrix(X_test, label=y_test)
params = config.model_params or {
'objective': 'reg:squarederror',
'eval_metric': 'rmse',
'eta': 0.05,
'max_depth': 6,
'subsample': 0.8,
'colsample_bytree': 0.8,
'seed': 42
}
logger.info("Training XGBoost model...")
model = xgb.train(
params,
dtrain,
num_boost_round=500,
evals=[(dtrain, 'train'), (dtest, 'test')],
early_stopping_rounds=50,
verbose_eval=50
)
# Evaluate
predictions = model.predict(dtest)
ic = np.corrcoef(predictions, y_test)[0, 1]
r2 = r2_score(y_test, predictions)
metrics = {
'test_ic': ic,
'test_r2': r2,
}
logger.info(f"Test IC: {ic:.4f}, R²: {r2:.4f}")
# Save results
if output_dir:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
model.save_model(str(output_path / 'model.json'))
with open(output_path / 'metrics.json', 'w') as f:
json.dump(metrics, f, indent=2)
with open(output_path / 'config.json', 'w') as f:
json.dump({
'dt_range': config.dt_range,
'normalization_mode': config.normalization_mode,
'model_type': config.model_type
}, f, indent=2)
logger.info(f"Results saved to {output_dir}")
return model, metrics
def main():
"""Main entry point."""
args = parse_args()
config = load_config(args)
model, metrics = train_model(config, output_dir=args.output)
logger.info("Training complete!")
if __name__ == '__main__':
main()
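
load_config above splits the YAML into a 'data' section and an optional 'model' section before expanding both into TrainConfig. A minimal sketch of an equivalent dict (values illustrative, using the TrainConfig defined above):

# Equivalent of a config.yaml for this script; load_config expands it as
# TrainConfig(**config_dict['data'], **config_dict.get('model', {}))
config_dict = {
    'data': {
        'dt_range': ['2020-01-01', '2023-12-31'],
        'feature_path': '/data/parquet/stock_1min_alpha158',
        'kline_path': '/data/parquet/stock_1min_kline',
    },
    'model': {
        'normalization_mode': 'dual',
    },
}
config = TrainConfig(**config_dict['data'], **config_dict.get('model', {}))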