From 966c17d7a90d7e0ee277a314cda301069b432c94 Mon Sep 17 00:00:00 2001 From: guofu Date: Fri, 13 Feb 2026 13:25:08 +0800 Subject: [PATCH] Consolidate Python code under src/ directories - Move cta_1d/loader.py, train.py, backtest.py to cta_1d/src/ - Move stock_15m/loader.py, train.py to stock_15m/src/ - Update root __init__.py files to re-export from src/ submodules - Update src/__init__.py files with proper public API exports - Update imports to use relative paths within src/ - Add missing create_experiment_dir export to common/__init__.py - Remove legacy/ directories with old pandas implementations - Remove test_new_loaders.py development test file Co-Authored-By: Claude Sonnet 4.5 --- common/__init__.py | 3 +- cta_1d/__init__.py | 75 +++++++ cta_1d/src/__init__.py | 25 ++- cta_1d/src/backtest.py | 197 +++++++++++++++++ cta_1d/src/loader.py | 448 ++++++++++++++++++++++++++++++++++++++ cta_1d/src/train.py | 286 ++++++++++++++++++++++++ stock_15m/__init__.py | 43 ++++ stock_15m/src/__init__.py | 9 +- stock_15m/src/loader.py | 389 +++++++++++++++++++++++++++++++++ stock_15m/src/train.py | 208 ++++++++++++++++++ 10 files changed, 1679 insertions(+), 4 deletions(-) create mode 100644 cta_1d/__init__.py create mode 100644 cta_1d/src/backtest.py create mode 100644 cta_1d/src/loader.py create mode 100644 cta_1d/src/train.py create mode 100644 stock_15m/__init__.py create mode 100644 stock_15m/src/loader.py create mode 100644 stock_15m/src/train.py diff --git a/common/__init__.py b/common/__init__.py index c749aa7..ddc9bcc 100644 --- a/common/__init__.py +++ b/common/__init__.py @@ -1,12 +1,13 @@ """Common utilities for alpha_lab experiments.""" -from .paths import ensure_dir, get_results_dir, get_task_results_dir +from .paths import ensure_dir, get_results_dir, get_task_results_dir, create_experiment_dir from .plotting import setup_plot_style, plot_ic_series, plot_cumulative_returns __all__ = [ 'ensure_dir', 'get_results_dir', 'get_task_results_dir', + 'create_experiment_dir', 'setup_plot_style', 'plot_ic_series', 'plot_cumulative_returns', diff --git a/cta_1d/__init__.py b/cta_1d/__init__.py new file mode 100644 index 0000000..f7c6774 --- /dev/null +++ b/cta_1d/__init__.py @@ -0,0 +1,75 @@ +""" +CTA 1-day return prediction experiments. + +This module provides dataset loading and experiment utilities for +CTA (Commodity Trading Advisor) 1-day return prediction. + +Example: + >>> from alpha_lab.cta_1d import CTA1DLoader + >>> + >>> loader = CTA1DLoader( + ... return_type='o2c_twap1min', + ... normalization='dual', + ... feature_sets=['alpha158', 'hffactor'] + ... ) + >>> dataset = loader.load(dt_range=['2020-01-01', '2023-12-31']) + >>> + >>> # Define train/test split + >>> dataset = dataset.with_segments({ + ... 'train': ('2020-01-01', '2022-12-31'), + ... 'test': ('2023-01-01', '2023-12-31') + ... }) + >>> + >>> # Extract training data + >>> X_train, y_train, w_train = dataset.split('train').to_numpy() + +Training: + >>> from alpha_lab.cta_1d import train_model, TrainConfig + >>> + >>> config = TrainConfig( + ... dt_range=['2020-01-01', '2023-12-31'], + ... feature_sets=['alpha158'], + ... normalization='dual' + ... ) + >>> model, metrics = train_model(config, output_dir='results/exp01') + +Backtesting: + >>> from alpha_lab.cta_1d import run_backtest, BacktestConfig + >>> + >>> config = BacktestConfig( + ... model_path='results/exp01/model.json', + ... dt_range=['2023-01-01', '2023-12-31'], + ... feature_sets=['alpha158'] + ... 
)
+    >>> results = run_backtest(config)
+"""
+
+# Re-export all public APIs from src submodules
+from .src import (
+    CTA1DLoader,
+    get_blend_weights,
+    describe_blend_config,
+    BLEND_CONFIGS,
+)
+
+try:
+    from .src import train_model, TrainConfig
+    from .src import run_backtest, BacktestConfig
+    __all__ = [
+        'CTA1DLoader',
+        'get_blend_weights',
+        'describe_blend_config',
+        'BLEND_CONFIGS',
+        'train_model',
+        'TrainConfig',
+        'run_backtest',
+        'BacktestConfig',
+    ]
+except ImportError:
+    # xgboost or sklearn not installed
+    __all__ = [
+        'CTA1DLoader',
+        'get_blend_weights',
+        'describe_blend_config',
+        'BLEND_CONFIGS',
+    ]
diff --git a/cta_1d/src/__init__.py b/cta_1d/src/__init__.py
index 615b18d..3aa7baf 100644
--- a/cta_1d/src/__init__.py
+++ b/cta_1d/src/__init__.py
@@ -1,5 +1,26 @@
 """CTA 1-day task-specific utilities."""
 
-from .labels import get_blend_weights, describe_blend_config
+from .loader import CTA1DLoader
+from .labels import get_blend_weights, describe_blend_config, BLEND_CONFIGS
 
-__all__ = ['get_blend_weights', 'describe_blend_config']
+try:
+    from .train import train_model, TrainConfig
+    from .backtest import run_backtest, BacktestConfig
+    __all__ = [
+        'CTA1DLoader',
+        'train_model',
+        'TrainConfig',
+        'run_backtest',
+        'BacktestConfig',
+        'get_blend_weights',
+        'describe_blend_config',
+        'BLEND_CONFIGS',
+    ]
+except ImportError:
+    # xgboost or sklearn not installed
+    __all__ = [
+        'CTA1DLoader',
+        'get_blend_weights',
+        'describe_blend_config',
+        'BLEND_CONFIGS',
+    ]
diff --git a/cta_1d/src/backtest.py b/cta_1d/src/backtest.py
new file mode 100644
index 0000000..6b4ba1b
--- /dev/null
+++ b/cta_1d/src/backtest.py
@@ -0,0 +1,197 @@
+"""
+Backtest script for CTA 1-day return prediction models.
+
+Example usage:
+    python -m alpha_lab.cta_1d.src.backtest \
+        --model results/experiment_01/model.json \
+        --dt-range 2023-01-01 2023-12-31 \
+        --output results/backtest_01
+
+Or programmatically:
+    from alpha_lab.cta_1d import run_backtest, BacktestConfig
+
+    config = BacktestConfig(
+        model_path='results/exp01/model.json',
+        dt_range=['2023-01-01', '2023-12-31'],
+        feature_sets=['alpha158']
+    )
+    results = run_backtest(config)
+"""
+
+import argparse
+import json
+import logging
+from pathlib import Path
+from typing import Optional, Dict, Any, List
+from dataclasses import dataclass
+
+import numpy as np
+import pandas as pd
+import xgboost as xgb
+
+from .loader import CTA1DLoader
+from qshare.eval.cta.backtest import CTABacktester
+
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class BacktestConfig:
+    """Backtest configuration."""
+    model_path: str
+    dt_range: list[str]
+    feature_sets: list[str]
+    normalization: str = 'dual'
+    num_trades: int = 4
+    signal_dist: str = 'normal'
+    pos_weight: bool = True
+    output_dir: Optional[str] = None
+
+
+def parse_args() -> argparse.Namespace:
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(
+        description='Backtest CTA 1-day return prediction model'
+    )
+    parser.add_argument(
+        '--model', '-m',
+        type=str,
+        required=True,
+        help='Path to trained model JSON file'
+    )
+    parser.add_argument(
+        '--output', '-o',
+        type=str,
+        default='results/cta_1d_backtest',
+        help='Output directory for results'
+    )
+    parser.add_argument(
+        '--dt-range',
+        nargs=2,
+        metavar=('START', 'END'),
+        required=True,
+        help='Date range [start_date, end_date]'
+    )
+    parser.add_argument(
+        '--feature-sets',
+        nargs='+',
+        default=['alpha158'],
+        help='Feature sets to use (must match training)'
+    )
+    parser.add_argument(
+        '--normalization',
+        default='dual',
+ choices=['zscore', 'cs_zscore', 'rolling_20', 'rolling_60', 'dual'], + help='Label normalization (must match training)' + ) + parser.add_argument( + '--num-trades', + type=int, + default=4, + help='Number of trades per day' + ) + return parser.parse_args() + + +def load_model(model_path: str) -> xgb.Booster: + """Load trained XGBoost model.""" + model = xgb.Booster() + model.load_model(model_path) + return model + + +def run_backtest( + config: BacktestConfig +) -> Dict[str, Any]: + """ + Run backtest with given configuration. + + Args: + config: Backtest configuration + + Returns: + Dictionary with backtest results + """ + logger.info(f"Loading model from {config.model_path}") + model = load_model(config.model_path) + + logger.info(f"Loading dataset for range {config.dt_range}") + loader = CTA1DLoader( + return_type='o2c_twap1min', + normalization=config.normalization, + feature_sets=config.feature_sets + ) + + dataset = loader.load(dt_range=config.dt_range) + X, y, _ = dataset.to_numpy() + feature_cols = dataset.features + + logger.info(f"Loaded {len(X)} samples with {len(feature_cols)} features") + + # Generate predictions + dmatrix = xgb.DMatrix(X) + predictions = model.predict(dmatrix) + + # Run backtest + logger.info("Running backtest...") + backtester = CTABacktester( + num_trades=config.num_trades, + signal_dist=config.signal_dist, + pos_weight=config.pos_weight + ) + + results = backtester.run(y, predictions) + summary = backtester.summary() + + logger.info(f"Backtest Results:") + logger.info(f" IC: {summary.get('ic', 'N/A')}") + logger.info(f" Return: {summary.get('total_return', 'N/A')}") + logger.info(f" Sharpe: {summary.get('sharpe', 'N/A')}") + + # Save results if output_dir specified + if config.output_dir: + output_path = Path(config.output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + # Save summary + with open(output_path / 'backtest_summary.json', 'w') as f: + json.dump(summary, f, indent=2) + + # Save predictions + pred_df = pd.DataFrame({ + 'actual': y, + 'predicted': predictions, + 'signal': backtester.signals if hasattr(backtester, 'signals') else predictions + }) + pred_df.to_csv(output_path / 'predictions.csv', index=False) + + logger.info(f"Results saved to {config.output_dir}") + + return { + 'summary': summary, + 'predictions': predictions, + 'actual': y + } + + +def main(): + """Main entry point.""" + args = parse_args() + + config = BacktestConfig( + model_path=args.model, + dt_range=args.dt_range, + feature_sets=args.feature_sets, + normalization=args.normalization, + num_trades=args.num_trades, + output_dir=args.output + ) + + results = run_backtest(config) + + logger.info("Backtest complete!") + + +if __name__ == '__main__': + main() diff --git a/cta_1d/src/loader.py b/cta_1d/src/loader.py new file mode 100644 index 0000000..2c0dc46 --- /dev/null +++ b/cta_1d/src/loader.py @@ -0,0 +1,448 @@ +""" +CTA 1-day return prediction dataset loader. + +Uses the new qshare.data framework with Dataset class and processors. +""" + +from dataclasses import dataclass +from datetime import date +from typing import List, Optional + +import numpy as np +import pandas as pd +import polars as pl + +from qshare.data import pl_Dataset, pl_pipe, pl_clip, pl_cs_zscore +from qshare.data.universal import DataSpec +from qshare.io.ddb import get_ddb_sess, reset_index_from_ddb +from qshare.config.research.cta.features import HFFACTOR_COLS + +from .labels import get_blend_weights + + +@dataclass +class CTA1DLoader: + """ + CTA 1-day return prediction dataset loader. 
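+
+    Supported normalization modes: 'zscore', 'cs_zscore', 'rolling_20',
+    'rolling_60', and 'dual', a weighted blend of the other four (see
+    _normalize_label and get_blend_weights).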
+ + Loads features (alpha158, hffactor), labels, and calculates weights + for CTA futures daily return prediction tasks. + + Example: + >>> loader = CTA1DLoader( + ... return_type='o2c_twap1min', + ... normalization='dual', + ... feature_sets=['alpha158', 'hffactor'] + ... ) + >>> dataset = loader.load(dt_range=['2020-01-01', '2023-12-31']) + >>> dataset = dataset.with_segments({ + ... 'train': ('2020-01-01', '2022-12-31'), + ... 'test': ('2023-01-01', '2023-12-31') + ... }) + >>> X_train, y_train, w_train = dataset.split('train').to_numpy() + """ + + return_type: str = 'o2c_twap1min' + normalization: str = 'dual' + feature_sets: List[str] = None + weight_factors: dict = None + blend_weights: str | List[float] | None = None + ddb_host: str = '192.168.1.146' + label_cap_upper: float = 0.5 + label_cap_lower: float = -0.5 + + def __post_init__(self): + if self.feature_sets is None: + self.feature_sets = ['alpha158', 'hffactor'] + if self.weight_factors is None: + self.weight_factors = {'positive': 1.0, 'negative': 2.0} + + def load( + self, + dt_range: List[str], + fit_range: Optional[List[str]] = None + ) -> pl_Dataset: + """ + Load and prepare CTA 1-day training dataset. + + Args: + dt_range: Date range [start_date, end_date] for dataset + fit_range: Date range [start, end] for fitting normalization params. + If None, uses first 60% of dt_range. + + Returns: + pl_Dataset with features, label, and weight columns + """ + start_date, end_date = dt_range + + if fit_range is None: + # Default: use first 60% for fit + all_dates = pd.date_range(start_date, end_date) + split_idx = int(len(all_dates) * 0.6) + fit_range = [ + all_dates[0].strftime('%Y-%m-%d'), + all_dates[split_idx].strftime('%Y-%m-%d') + ] + + # Load extended history for rolling normalization + load_start = (pd.to_datetime(start_date) - pd.Timedelta(days=120)).strftime('%Y-%m-%d') + + # Load features + df_features = self._load_features(load_start, end_date) + + # Load and normalize labels + df_label = self._load_labels(load_start, end_date, fit_range) + + # Combine + df = df_features.join(df_label, on=['datetime', 'instrument'], how='inner') + + # Filter to requested date range + df = df.filter( + (pl.col('datetime') >= start_date) & + (pl.col('datetime') <= end_date) + ) + + # Calculate weights + df = self._calculate_weights(df) + + # Clean data + df = self._clean_data(df) + + # Get feature columns + feature_cols = [c for c in df.columns + if any(c.startswith(prefix) for prefix in ['f_a158_', 'f_hf_'])] + + return pl_Dataset( + data=df, + features=feature_cols, + label='label', + weight='weight' if self.weight_factors else None + ) + + def _load_features(self, start_date: str, end_date: str) -> pl.DataFrame: + """Load feature data from DolphinDB.""" + sess = get_ddb_sess(host=self.ddb_host) + + try: + feature_dfs = [] + + if 'alpha158' in self.feature_sets: + df_alpha = self._load_alpha158(sess, start_date, end_date) + feature_dfs.append(df_alpha) + + if 'hffactor' in self.feature_sets: + df_hf = self._load_hffactor(sess, start_date, end_date) + feature_dfs.append(df_hf) + + # Join all feature sets + result = feature_dfs[0] + for df in feature_dfs[1:]: + result = result.join(df, on=['datetime', 'instrument'], how='inner') + + return result + + finally: + sess.close() + + def _load_alpha158(self, sess, start_date: str, end_date: str) -> pl.DataFrame: + """Load alpha158 features from DolphinDB.""" + since_ddb = pd.to_datetime(start_date).strftime('%Y.%m.%d') + + df = sess.run(f""" + select code, m_nDate, * + from 
loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_alpha159_0_7_beta') + where m_nDate >= {since_ddb} + """) + + df = reset_index_from_ddb(df) + + # Drop non-numeric columns + if 'code_init' in df.columns: + df = df.drop(columns=['code_init']) + + # Convert to polars and add prefix + pl_df = pl.from_pandas(df.reset_index()) + pl_df = pl_df.rename({ + c: f'f_a158_{c}' + for c in pl_df.columns + if c not in ['datetime', 'instrument'] + }) + + return pl_df + + def _load_hffactor(self, sess, start_date: str, end_date: str) -> pl.DataFrame: + """Load hffactor features from DolphinDB.""" + since_ddb = pd.to_datetime(start_date).strftime('%Y.%m.%d') + + # Load from factor table + df = sess.run(f""" + select code, m_nDate, factor_name, value + from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hffactor') + where m_nDate >= {since_ddb} + and factor_name in [{','.join([f"'{c}'" for c in HFFACTOR_COLS])}] + """) + + # Pivot to wide format + df = df.pivot_table( + index=['code', 'm_nDate'], + columns='factor_name', + values='value' + ).reset_index() + + df = reset_index_from_ddb(df) + + # Convert to polars and add prefix + pl_df = pl.from_pandas(df.reset_index()) + pl_df = pl_df.rename({ + c: f'f_hf_{c}' + for c in pl_df.columns + if c not in ['datetime', 'instrument'] + }) + + return pl_df + + def _load_labels( + self, + start_date: str, + end_date: str, + fit_range: List[str] + ) -> pl.DataFrame: + """Load and normalize labels.""" + sess = get_ddb_sess(host=self.ddb_host) + + try: + # Map return type to indicator name + indicator_map = { + 'o2c_twap1min': 'twap_open1m@1_twap_close1m@1', + 'o2o_twap1min': 'twap_open1m@1_twap_open1m@2', + } + indicator = indicator_map.get(self.return_type, self.return_type) + + since_ddb = pd.to_datetime(start_date).strftime('%Y.%m.%d') + + # Load dominant contract mapping + df_contract = sess.run(f""" + select first(code) as code, m_nDate, code_init + from loadTable('dfs://daily_stock_run', 'dwm_1day_cta_dom') + where m_nDate >= {since_ddb} and version='vp_csmax_roll2_cummax' + group by m_nDate, code_init + """) + + # Load returns + df_return = sess.run(f""" + select code, m_nDate, value as ret + from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hfvalue') + where indicator='{indicator}' and m_nDate >= {since_ddb} + """) + + # Merge with dominant contract mapping + df_return = pd.merge( + left=df_return[['code', 'm_nDate', 'ret']], + right=df_contract, + on=['code', 'm_nDate'], + how='inner' + ) + + # Convert to index format + df_return['code'] = df_return['code_init'] + 'Ind' + df_return = df_return[['code', 'm_nDate', 'ret']] + df_return = reset_index_from_ddb(df_return) + + # Convert to polars + pl_df = pl.from_pandas(df_return.reset_index()) + + # Apply normalization + pl_df = self._normalize_label(pl_df, fit_range) + + return pl_df + + finally: + sess.close() + + def _normalize_label(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame: + """Apply specified normalization to label.""" + fit_start, fit_end = fit_range + + # Ensure datetime column is string for comparison + if pl_df['datetime'].dtype == pl.Date: + pl_df = pl_df.with_columns( + pl.col('datetime').dt.strftime('%Y-%m-%d').alias('datetime_str') + ) + date_col = 'datetime_str' + else: + date_col = 'datetime' + + if self.normalization == 'zscore': + # Calculate mean/std on fit range + fit_data = pl_df.filter( + (pl.col(date_col) >= fit_start) & + (pl.col(date_col) <= fit_end) + ) + mean = fit_data['ret'].mean() + std = fit_data['ret'].std() + + result = 
pl_df.with_columns( + ((pl.col('ret') - mean) / std).clip( + self.label_cap_lower, self.label_cap_upper + ).alias('label') + ).select(['datetime', 'instrument', 'label']) + return result + + elif self.normalization == 'cs_zscore': + # Cross-sectional z-score per datetime + return pl_df.with_columns( + ((pl.col('ret') - pl.col('ret').mean().over('datetime')) / + pl.col('ret').std().over('datetime')).clip( + self.label_cap_lower, self.label_cap_upper + ).alias('label') + ).select(['datetime', 'instrument', 'label']) + + elif self.normalization == 'rolling_20': + return self._apply_rolling_norm(pl_df, window=20, fit_range=fit_range) + + elif self.normalization == 'rolling_60': + return self._apply_rolling_norm(pl_df, window=60, fit_range=fit_range) + + elif self.normalization == 'dual': + # Create all normalization variants + label_zscore = self._normalize_zscore(pl_df, fit_range) + label_cszscore = self._normalize_cs_zscore(pl_df) + label_roll20 = self._normalize_rolling(pl_df, window=20, fit_range=fit_range) + label_roll60 = self._normalize_rolling(pl_df, window=60, fit_range=fit_range) + + # Get blend weights + weights = get_blend_weights(self.blend_weights) + + # Join and blend + pl_df = label_zscore.join(label_cszscore, on=['datetime', 'instrument']) + pl_df = pl_df.join(label_roll20, on=['datetime', 'instrument']) + pl_df = pl_df.join(label_roll60, on=['datetime', 'instrument']) + + return pl_df.with_columns( + (weights[0] * pl.col('label_zscore') + + weights[1] * pl.col('label_cszscore') + + weights[2] * pl.col('label_roll20') + + weights[3] * pl.col('label_roll60')).clip( + self.label_cap_lower, self.label_cap_upper + ).alias('label') + ).select(['datetime', 'instrument', 'label']) + + else: + raise ValueError(f"Unknown normalization: {self.normalization}") + + def _normalize_zscore(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame: + """Create z-score normalized label.""" + fit_start, fit_end = fit_range + + # Handle date type conversion for comparison + if pl_df['datetime'].dtype == pl.Date: + fit_data = pl_df.filter( + (pl.col('datetime').dt.strftime('%Y-%m-%d') >= fit_start) & + (pl.col('datetime').dt.strftime('%Y-%m-%d') <= fit_end) + ) + else: + fit_data = pl_df.filter( + (pl.col('datetime') >= fit_start) & + (pl.col('datetime') <= fit_end) + ) + + mean = fit_data['ret'].mean() + std = fit_data['ret'].std() + + return pl_df.with_columns( + ((pl.col('ret') - mean) / std).alias('label_zscore') + ).select(['datetime', 'instrument', 'label_zscore']) + + def _normalize_cs_zscore(self, pl_df: pl.DataFrame) -> pl.DataFrame: + """Create cross-sectional z-score normalized label.""" + return pl_df.with_columns( + ((pl.col('ret') - pl.col('ret').mean().over('datetime')) / + pl.col('ret').std().over('datetime')).alias('label_cszscore') + ).select(['datetime', 'instrument', 'label_cszscore']) + + def _normalize_rolling( + self, + pl_df: pl.DataFrame, + window: int, + fit_range: List[str] + ) -> pl.DataFrame: + """Create rolling window normalized label.""" + # Convert to pandas for rolling calculation + pd_df = pl_df.to_pandas().set_index(['datetime', 'instrument']) + + # Unstack to wide format + df_wide = pd_df['ret'].unstack('instrument') + + # Calculate rolling mean and std + rolling_mean = df_wide.rolling(window=window, min_periods=window//2).mean() + rolling_std = df_wide.rolling(window=window, min_periods=window//2).std() + + # Normalize + df_normalized = (df_wide - rolling_mean) / rolling_std + + # Restack + rolling_label = df_normalized.stack().reset_index() + 
rolling_label.columns = ['datetime', 'instrument', f'label_roll{window}'] + + return pl.from_pandas(rolling_label) + + def _apply_rolling_norm( + self, + pl_df: pl.DataFrame, + window: int, + fit_range: List[str] + ) -> pl.DataFrame: + """Apply rolling normalization and cap.""" + result = self._normalize_rolling(pl_df, window, fit_range) + return result.with_columns( + pl.col(f'label_roll{window}').clip( + self.label_cap_lower, self.label_cap_upper + ).alias('label') + ).select(['datetime', 'instrument', 'label']) + + def _calculate_weights(self, pl_df: pl.DataFrame) -> pl.DataFrame: + """Calculate sample weights based on return magnitude.""" + # Base weights by return magnitude tiers + pl_df = pl_df.with_columns( + pl.when(pl.col('label').abs() > 1.5).then(pl.lit(2.5)) + .when(pl.col('label').abs() > 1.0).then(pl.lit(2.0)) + .when(pl.col('label').abs() > 0.5).then(pl.lit(1.5)) + .when(pl.col('label').abs() > 0.2).then(pl.lit(1.0)) + .otherwise(0.0).alias('weight') + ) + + # Apply negative return multiplier + if self.weight_factors.get('negative'): + pl_df = pl_df.with_columns( + pl.when(pl.col('label') < -0.5) + .then(pl.col('weight') * self.weight_factors['negative']) + .otherwise(pl.col('weight')) + .alias('weight') + ) + + # Apply positive return multiplier + if self.weight_factors.get('positive'): + pl_df = pl_df.with_columns( + pl.when(pl.col('label') > 0.5) + .then(pl.col('weight') * self.weight_factors['positive']) + .otherwise(pl.col('weight')) + .alias('weight') + ) + + return pl_df + + def _clean_data(self, pl_df: pl.DataFrame) -> pl.DataFrame: + """Clean data: remove inf/nan values.""" + # Get numeric columns only + numeric_cols = [ + c for c in pl_df.columns + if pl_df[c].dtype in [pl.Float32, pl.Float64, pl.Int32, pl.Int64] + ] + + # Replace inf with null, then drop nulls + pl_df = pl_df.with_columns([ + pl.when(pl.col(c).is_infinite()).then(None).otherwise(pl.col(c)).alias(c) + for c in numeric_cols + ]) + + return pl_df.drop_nulls() diff --git a/cta_1d/src/train.py b/cta_1d/src/train.py new file mode 100644 index 0000000..274ea76 --- /dev/null +++ b/cta_1d/src/train.py @@ -0,0 +1,286 @@ +""" +Training script for CTA 1-day return prediction models. 
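+
+A YAML file passed via --config is unpacked directly into TrainConfig, so its
+keys must be TrainConfig field names; a minimal sketch (values illustrative):
+
+    dt_range: ['2020-01-01', '2023-12-31']
+    feature_sets: [alpha158]
+    normalization: dual
+    model_type: xgb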
+
+Example usage:
+    python -m alpha_lab.cta_1d.src.train \
+        --config config.yaml \
+        --output results/experiment_01
+
+Or programmatically:
+    from alpha_lab.cta_1d import train_model, TrainConfig
+
+    config = TrainConfig(
+        dt_range=['2020-01-01', '2023-12-31'],
+        feature_sets=['alpha158'],
+        normalization='dual',
+        model_type='xgb'
+    )
+    model, metrics = train_model(config, output_dir='results/exp01')
+"""
+
+import argparse
+import json
+import logging
+from pathlib import Path
+from typing import Optional, Dict, Any
+from dataclasses import dataclass
+
+import numpy as np
+import pandas as pd
+from sklearn.metrics import r2_score
+import xgboost as xgb
+
+from .loader import CTA1DLoader
+
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TrainConfig:
+    """Training configuration."""
+    dt_range: list[str]
+    feature_sets: list[str]
+    normalization: str = 'dual'
+    blend_weights: Optional[str] = None
+    model_type: str = 'xgb'
+    model_params: Optional[Dict[str, Any]] = None
+    segments: Optional[Dict[str, tuple]] = None
+
+    def __post_init__(self):
+        if self.model_params is None:
+            self.model_params = {}
+        if self.segments is None:
+            # Default calendar split: train through mid-2022, valid H2 2022,
+            # test from 2023 onwards
+            self.segments = {
+                'train': (self.dt_range[0], '2022-06-30'),
+                'valid': ('2022-07-01', '2022-12-31'),
+                'test': ('2023-01-01', self.dt_range[1])
+            }
+
+
+def parse_args() -> argparse.Namespace:
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(
+        description='Train CTA 1-day return prediction model'
+    )
+    parser.add_argument(
+        '--config', '-c',
+        type=str,
+        help='Path to config YAML file'
+    )
+    parser.add_argument(
+        '--output', '-o',
+        type=str,
+        default='results/cta_1d_experiment',
+        help='Output directory for results'
+    )
+    parser.add_argument(
+        '--dt-range',
+        nargs=2,
+        metavar=('START', 'END'),
+        help='Date range [start_date, end_date]'
+    )
+    parser.add_argument(
+        '--feature-sets',
+        nargs='+',
+        default=['alpha158'],
+        help='Feature sets to use'
+    )
+    parser.add_argument(
+        '--normalization',
+        default='dual',
+        choices=['zscore', 'cs_zscore', 'rolling_20', 'rolling_60', 'dual'],
+        help='Label normalization method'
+    )
+    parser.add_argument(
+        '--model-type',
+        default='xgb',
+        choices=['xgb', 'linear'],
+        help='Model type'
+    )
+    return parser.parse_args()
+
+
+def load_config(args: argparse.Namespace) -> TrainConfig:
+    """Load configuration from args or YAML file."""
+    if args.config:
+        import yaml
+        with open(args.config) as f:
+            config_dict = yaml.safe_load(f)
+        return TrainConfig(**config_dict)
+
+    # Create config from CLI args
+    return TrainConfig(
+        dt_range=args.dt_range,
+        feature_sets=args.feature_sets,
+        normalization=args.normalization,
+        model_type=args.model_type
+    )
+
+
+def train_xgb_model(
+    X_train: np.ndarray,
+    y_train: np.ndarray,
+    w_train: Optional[np.ndarray],
+    X_valid: np.ndarray,
+    y_valid: np.ndarray,
+    params: Optional[Dict] = None
+) -> xgb.Booster:
+    """Train XGBoost model."""
+    if not params:
+        # fall back to defaults when params is None or an empty dict
+        params = {
+            'objective': 'reg:squarederror',
+            'eval_metric': 'rmse',
+            'eta': 0.05,
+            'max_depth': 6,
+            'subsample': 0.8,
+            'colsample_bytree': 0.8,
+            'seed': 42
+        }
+
+    dtrain = xgb.DMatrix(X_train, label=y_train, weight=w_train)
+    dvalid = xgb.DMatrix(X_valid, label=y_valid)
+
+    logger.info("Training XGBoost model...")
+    model = xgb.train(
+        params,
+        dtrain,
+        num_boost_round=500,
+        evals=[(dtrain, 'train'), (dvalid, 'valid')],
+        early_stopping_rounds=50,
+        verbose_eval=50
+    )
+
+    return model
+
+
+def evaluate_model(
+    model: xgb.Booster,
+ X: np.ndarray, + y: np.ndarray, + dataset_name: str = 'dataset' +) -> Dict[str, float]: + """Evaluate model and return metrics.""" + dmatrix = xgb.DMatrix(X) + predictions = model.predict(dmatrix) + + # Calculate metrics + ic = np.corrcoef(predictions, y)[0, 1] + r2 = r2_score(y, predictions) + + metrics = { + f'{dataset_name}_ic': ic, + f'{dataset_name}_r2': r2, + } + + logger.info(f"{dataset_name} - IC: {ic:.4f}, R²: {r2:.4f}") + return metrics + + +def train_model( + config: TrainConfig, + output_dir: Optional[str] = None +) -> tuple[xgb.Booster, Dict]: + """ + Train CTA model with given configuration. + + Args: + config: Training configuration + output_dir: Directory to save results (optional) + + Returns: + Tuple of (trained_model, metrics_dict) + """ + logger.info(f"Loading dataset for range {config.dt_range}") + + # Load dataset + loader = CTA1DLoader( + return_type='o2c_twap1min', + normalization=config.normalization, + feature_sets=config.feature_sets, + blend_weights=config.blend_weights + ) + + dataset = loader.load(dt_range=config.dt_range) + dataset = dataset.with_segments(config.segments) + + feature_cols = dataset.features + logger.info(f"Loaded {len(feature_cols)} features") + + # Extract data for each split + train_data = dataset.split('train') + valid_data = dataset.split('valid') + test_data = dataset.split('test') + + X_train, y_train, w_train = train_data.to_numpy() + X_valid, y_valid, _ = valid_data.to_numpy() + X_test, y_test, _ = test_data.to_numpy() + + logger.info(f"Train size: {len(X_train)}, Valid: {len(X_valid)}, Test: {len(X_test)}") + + # Train model + if config.model_type == 'xgb': + model = train_xgb_model( + X_train, y_train, w_train, + X_valid, y_valid, + config.model_params + ) + else: + raise ValueError(f"Unsupported model type: {config.model_type}") + + # Evaluate + metrics = {} + metrics.update(evaluate_model(model, X_train, y_train, 'train')) + metrics.update(evaluate_model(model, X_valid, y_valid, 'valid')) + metrics.update(evaluate_model(model, X_test, y_test, 'test')) + + # Save results if output_dir specified + if output_dir: + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + # Save model + model.save_model(str(output_path / 'model.json')) + + # Save metrics + with open(output_path / 'metrics.json', 'w') as f: + json.dump(metrics, f, indent=2) + + # Save config + with open(output_path / 'config.json', 'w') as f: + json.dump({ + 'dt_range': config.dt_range, + 'feature_sets': config.feature_sets, + 'normalization': config.normalization, + 'model_type': config.model_type, + 'segments': config.segments + }, f, indent=2) + + # Save feature importance + importance = model.get_score(importance_type='gain') + importance_df = pd.DataFrame([ + {'feature': feature_cols[int(k[1:])], 'importance': v} + for k, v in importance.items() + ]).sort_values('importance', ascending=False) + importance_df.to_csv(output_path / 'feature_importance.csv', index=False) + + logger.info(f"Results saved to {output_dir}") + + return model, metrics + + +def main(): + """Main entry point.""" + args = parse_args() + config = load_config(args) + + model, metrics = train_model(config, output_dir=args.output) + + logger.info("Training complete!") + logger.info(f"Final metrics: {metrics}") + + +if __name__ == '__main__': + main() diff --git a/stock_15m/__init__.py b/stock_15m/__init__.py new file mode 100644 index 0000000..5add8c3 --- /dev/null +++ b/stock_15m/__init__.py @@ -0,0 +1,43 @@ +""" +Stock 15-minute return prediction experiments. 
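+
+Stock15mLoader.normalization_mode accepts 'industry', 'cs_zscore', or 'dual';
+in 'dual' mode labels blend industry-neutralized and cross-sectional z-scored
+returns with 0.8/0.2 weights.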
+ +This module provides dataset loading and experiment utilities for +stock 15-minute (intraday) return prediction. + +Example: + >>> from alpha_lab.stock_15m import Stock15mLoader + >>> + >>> loader = Stock15mLoader( + ... normalization_mode='dual', + ... positive_factor=1.0, + ... negative_factor=2.0 + ... ) + >>> dataset = loader.load( + ... dt_range=['2020-01-01', '2023-12-31'], + ... feature_path='/data/parquet/stock_1min_alpha158', + ... kline_path='/data/parquet/stock_1min_kline' + ... ) + >>> + >>> # Extract training data + >>> X_train, y_train, w_train = dataset.to_numpy() + +Training: + >>> from alpha_lab.stock_15m import train_model, TrainConfig + >>> + >>> config = TrainConfig( + ... dt_range=['2020-01-01', '2023-12-31'], + ... feature_path='/data/parquet/stock_1min_alpha158', + ... kline_path='/data/parquet/stock_1min_kline' + ... ) + >>> model, metrics = train_model(config, output_dir='results/exp01') +""" + +# Re-export all public APIs from src submodules +from .src import Stock15mLoader + +try: + from .src import train_model, TrainConfig + __all__ = ['Stock15mLoader', 'train_model', 'TrainConfig'] +except ImportError: + # xgboost or sklearn not installed + __all__ = ['Stock15mLoader'] diff --git a/stock_15m/src/__init__.py b/stock_15m/src/__init__.py index 7177588..e48f85f 100644 --- a/stock_15m/src/__init__.py +++ b/stock_15m/src/__init__.py @@ -1,3 +1,10 @@ """Stock 15m task-specific utilities.""" -# Add task-specific functions here as needed +from .loader import Stock15mLoader + +try: + from .train import train_model, TrainConfig + __all__ = ['Stock15mLoader', 'train_model', 'TrainConfig'] +except ImportError: + # xgboost or sklearn not installed + __all__ = ['Stock15mLoader'] diff --git a/stock_15m/src/loader.py b/stock_15m/src/loader.py new file mode 100644 index 0000000..a0b7cda --- /dev/null +++ b/stock_15m/src/loader.py @@ -0,0 +1,389 @@ +""" +Stock 15-minute return prediction dataset loader. + +Uses the new qshare.data framework with Dataset class and processors. +""" + +from dataclasses import dataclass +from typing import List, Optional + +import numpy as np +import polars as pl + +from qshare.data import pl_Dataset, pl_pipe, pl_clip, pl_cs_zscore +from qshare.io.polars import load_from_pq +from qshare.algo.polars import ntrl_by_idx, cs_zscore +from qshare.algo.polars.limit import detect_limit_stopping + + +# Column definitions +COLS_INDEX_1MIN = ['datetime', 'instrument', 'm_nTime'] +COLS_INDEX_1DAY = ['datetime', 'instrument'] + + +@dataclass +class Stock15mLoader: + """ + Stock 15-minute return prediction dataset loader. + + Loads Alpha158 features at 1-minute frequency and constructs 15-minute + forward returns for stock return prediction tasks. + + Example: + >>> loader = Stock15mLoader( + ... normalization_mode='dual', + ... positive_factor=1.0, + ... negative_factor=2.0 + ... ) + >>> dataset = loader.load( + ... dt_range=['2020-01-01', '2023-12-31'], + ... feature_path='/data/parquet/stock_1min_alpha158', + ... kline_path='/data/parquet/stock_1min_kline' + ... 
) + """ + + normalization_mode: str = 'industry' + positive_factor: Optional[float] = None + negative_factor: Optional[float] = None + label_cap_upper: float = 0.5 + label_cap_lower: float = -0.5 + + def __post_init__(self): + valid_modes = ['industry', 'cs_zscore', 'dual'] + if self.normalization_mode not in valid_modes: + raise ValueError(f"normalization_mode must be one of {valid_modes}") + + def load( + self, + dt_range: List[str], + feature_path: str, + kline_path: str, + industry_path: Optional[str] = None + ) -> pl_Dataset: + """ + Load and prepare 15-minute return training dataset. + + Args: + dt_range: Date range [start_date, end_date] + feature_path: Path to Alpha158 features parquet + kline_path: Path to kline data parquet + industry_path: Path to industry index data (optional) + + Returns: + pl_Dataset with features, label, and weight columns + """ + # Load data + pl_ldf_a158 = self._load_features(feature_path, dt_range) + pl_ldf_kline_1min = self._load_kline(kline_path, '1min', dt_range) + pl_ldf_kline_1day = self._load_kline(kline_path, '1day', dt_range) + + if industry_path: + pl_ldf_indus = self._load_industry(industry_path, dt_range) + else: + pl_ldf_indus = None + + # Get feature columns from schema + a158_schema = pl_ldf_a158.collect_schema()['a158'] + cols_a158 = [f.name for f in a158_schema.fields] + + # Determine feature columns and target name based on mode + if self.normalization_mode == 'dual': + cols_feature = ( + cols_a158 + + [f"{name}_ntrl" for name in cols_a158] + + ['dist_to_limit', 'dist_to_stopping', 'timestep'] + ) + target_col = 'return_15min_dual' + else: + cols_feature = cols_a158 + ['dist_to_limit', 'dist_to_stopping', 'timestep'] + target_col = ( + 'return_15min_ntrlz_1min' + if self.normalization_mode == 'industry' + else 'return_15min_csz' + ) + + # Compute limit/stopping features + pl_ldf_limit = self._compute_limit_features(pl_ldf_kline_1day, pl_ldf_kline_1min) + + # Calculate forward returns + pl_ldf_return = self._calculate_returns(pl_ldf_kline_1min) + + # Normalize returns + pl_ldf_return = self._normalize_returns(pl_ldf_return, pl_ldf_indus, target_col) + + # Cap returns at limits + pl_ldf_return = self._cap_returns(pl_ldf_return.join(pl_ldf_limit, on=COLS_INDEX_1MIN), target_col) + + # Normalize features + pl_ldf_a158 = self._normalize_features(pl_ldf_a158, pl_ldf_indus) + + # Create extra features + pl_ldf_extra = self._create_extra_features(pl_ldf_limit, pl_ldf_kline_1min) + + # Join features with extra + pl_ldf_features = self._join_features(pl_ldf_a158, pl_ldf_extra) + + # Join everything + pl_df = pl_ldf_features.join( + pl_ldf_return.select(COLS_INDEX_1MIN + [target_col]), + on=COLS_INDEX_1MIN, + how='inner' + ).collect() + + # Clean data + pl_df = self._clean_data(pl_df, cols_feature) + + # Calculate weights + pl_df = self._calculate_weights(pl_df, target_col) + + return pl_Dataset( + data=pl_df, + features=cols_feature, + label=target_col, + weight='weight' if self.positive_factor or self.negative_factor else None + ) + + def _load_features(self, path: str, dt_range: List[str]) -> pl.LazyFrame: + """Load Alpha158 features.""" + return load_from_pq( + path=path, + table_alias='a158', + start_time=dt_range[0], + as_struct=True + ) + + def _load_kline(self, path: str, freq: str, dt_range: List[str]) -> pl.LazyFrame: + """Load kline data.""" + return load_from_pq( + path=path, + table_alias=f'kline_{freq}', + start_time=dt_range[0] + ) + + def _load_industry(self, path: str, dt_range: List[str]) -> pl.LazyFrame: + """Load industry index 
data.""" + return load_from_pq( + path=path, + table_alias='indus_idx', + start_time=dt_range[0] + ) + + def _compute_limit_features( + self, + pl_ldf_kline_1day: pl.LazyFrame, + pl_ldf_kline_1min: pl.LazyFrame + ) -> pl.LazyFrame: + """Compute limit/stopping features.""" + pl_ldf_limit = detect_limit_stopping( + pl_ldf_kline_1day=pl_ldf_kline_1day, + pl_ldf_kline_1min=pl_ldf_kline_1min, + ) + + return pl_ldf_limit.select( + COLS_INDEX_1MIN, + (pl.col('Limit') - pl.col('return_from_yclose')).alias('dist_to_limit'), + (pl.col('return_from_yclose') - pl.col('Stopping')).alias('dist_to_stopping'), + pl.col(['meet_limit', 'meet_stopping']).shift(-16).over( + ['datetime', 'instrument'], order_by='m_nTime' + ).fill_null(pl.lit(False)) + ) + + def _calculate_returns(self, pl_ldf_kline_1min: pl.LazyFrame) -> pl.LazyFrame: + """Calculate 15-minute forward returns: close[t+16] / close[t+1] - 1.""" + return pl_ldf_kline_1min.select( + COLS_INDEX_1MIN, + pl.struct( + (pl.col('close').shift(-16) / pl.col('close').shift(-1) - 1) + .over(['datetime', 'instrument'], order_by='m_nTime') + .alias('return_15min') + ).alias('return') + ) + + def _normalize_returns( + self, + pl_ldf_return: pl.LazyFrame, + pl_ldf_indus: Optional[pl.LazyFrame], + target_col: str + ) -> pl.LazyFrame: + """Normalize returns based on mode.""" + if self.normalization_mode == 'cs_zscore': + return pl_ldf_return.with_columns( + ((pl.col('return').struct.field('return_15min') - + pl.col('return').struct.field('return_15min').mean()) / + pl.col('return').struct.field('return_15min').std()) + .over(['datetime', 'm_nTime']).alias(target_col) + ) + elif self.normalization_mode == 'dual': + # Create both normalizations + pl_ldf_csz = pl_ldf_return.with_columns( + ((pl.col('return').struct.field('return_15min') - + pl.col('return').struct.field('return_15min').mean()) / + pl.col('return').struct.field('return_15min').std()) + .over(['datetime', 'm_nTime']).alias('return_15min_csz') + ) + + if pl_ldf_indus is not None: + pl_ldf_ntrl = pl_ldf_return.join( + pl_ldf_indus, on=COLS_INDEX_1DAY, how='inner' + ).pipe( + ntrl_by_idx, + input_col='return', + idx_col='indus_idx', + partition_cols=['datetime', 'm_nTime'] + ).unnest('return').rename({'return_15min': 'return_15min_ntrlz_1min'}) + + pl_ldf_csz = pl_ldf_csz.join( + pl_ldf_ntrl, on=COLS_INDEX_1MIN, how='inner' + ) + + return pl_ldf_csz.with_columns( + (0.8 * pl.col('return_15min_ntrlz_1min') + + 0.2 * pl.col('return_15min_csz')).alias(target_col) + ) + else: # industry + if pl_ldf_indus is None: + raise ValueError("industry_path required for industry normalization") + + return pl_ldf_return.join( + pl_ldf_indus, on=COLS_INDEX_1DAY, how='inner' + ).pipe( + ntrl_by_idx, + input_col='return', + idx_col='indus_idx', + partition_cols=['datetime', 'm_nTime'] + ).unnest('return').rename({'return_15min': target_col}) + + def _cap_returns( + self, + pl_ldf_return: pl.LazyFrame, + target_col: str + ) -> pl.LazyFrame: + """Cap returns at limit up/down.""" + return pl_ldf_return.with_columns( + pl.when(pl.col('meet_limit')) + .then(pl.lit(self.label_cap_upper)) + .when(pl.col('meet_stopping')) + .then(pl.lit(self.label_cap_lower)) + .otherwise(pl.col(target_col)) + .alias(target_col) + ) + + def _normalize_features( + self, + pl_ldf_a158: pl.LazyFrame, + pl_ldf_indus: Optional[pl.LazyFrame] + ) -> pl.LazyFrame: + """Normalize Alpha158 features.""" + if self.normalization_mode == 'cs_zscore': + return pl_ldf_a158.collect().lazy().pipe( + cs_zscore, + input_col='a158', + partition_cols=['datetime', 
'm_nTime']
+            ).unnest('a158')
+        elif self.normalization_mode == 'dual':
+            # Create both normalizations
+            pl_ldf_csz = pl_ldf_a158.collect().lazy().pipe(
+                cs_zscore,
+                input_col='a158',
+                partition_cols=['datetime', 'm_nTime']
+            ).unnest('a158')
+
+            if pl_ldf_indus is not None:
+                pl_ldf_ntrl = pl_ldf_a158.join(
+                    pl_ldf_indus, on=COLS_INDEX_1DAY, how='inner'
+                ).collect().lazy().pipe(
+                    ntrl_by_idx,
+                    input_col='a158',
+                    idx_col='indus_idx',
+                    partition_cols=['datetime', 'm_nTime']
+                ).unnest('a158').with_columns(
+                    pl.all().name.suffix('_ntrl')
+                )
+                return pl_ldf_csz.join(pl_ldf_ntrl, on=COLS_INDEX_1MIN, how='inner')
+            return pl_ldf_csz
+        else:  # industry
+            if pl_ldf_indus is None:
+                raise ValueError("industry_path required for industry normalization")
+
+            return pl_ldf_a158.join(
+                pl_ldf_indus, on=COLS_INDEX_1DAY, how='inner'
+            ).collect().lazy().pipe(
+                ntrl_by_idx,
+                input_col='a158',
+                idx_col='indus_idx',
+                partition_cols=['datetime', 'm_nTime']
+            ).unnest('a158')
+
+    def _create_extra_features(
+        self,
+        pl_ldf_limit: pl.LazyFrame,
+        pl_ldf_kline_1min: pl.LazyFrame
+    ) -> pl.LazyFrame:
+        """Create extra features: timestep and limit distances."""
+        # Infer timestep from m_nTime
+        pl_df_timestep = pl_ldf_kline_1min.select(
+            pl.col('m_nTime'),
+            ((pl.col('m_nTime').rank(method='ordinal').over(['datetime', 'instrument']) - 1) / 239)
+            .alias('timestep')
+        ).unique('m_nTime').sort('m_nTime').collect()
+
+        return pl_ldf_limit.select(
+            COLS_INDEX_1MIN,
+            pl.col(['dist_to_limit', 'dist_to_stopping'])
+        ).join(
+            # pl_df_timestep is eager; re-enter lazy mode so the join types match
+            pl_df_timestep.lazy(),
+            on='m_nTime',
+            how='inner'
+        )
+
+    def _join_features(
+        self,
+        pl_ldf_a158: pl.LazyFrame,
+        pl_ldf_extra: pl.LazyFrame
+    ) -> pl.LazyFrame:
+        """Join features with extra features."""
+        # _normalize_features has already unnested the 'a158' struct in every
+        # mode, so a plain join on the index columns is all that is needed.
+        return pl_ldf_a158.join(
+            pl_ldf_extra, on=COLS_INDEX_1MIN, how='inner'
+        )
+
+    def _clean_data(self, pl_df: pl.DataFrame, cols_feature: List[str]) -> pl.DataFrame:
+        """Clean data: remove inf/nan values."""
+        pl_df = pl_df.with_columns(
+            pl.col(cols_feature).replace([np.inf, -np.inf], None)
+        ).drop_nulls().fill_nan(0.)
+        return pl_df
+
+    def _calculate_weights(self, pl_df: pl.DataFrame, target_col: str) -> pl.DataFrame:
+        """Calculate sample weights."""
+        # Base weights by return magnitude tiers
+        pl_df = pl_df.with_columns(
+            pl.when(pl.col(target_col).abs() > 1.5).then(pl.lit(2.5))
+            .when(pl.col(target_col).abs() > 1.0).then(pl.lit(2.0))
+            .when(pl.col(target_col).abs() > 0.5).then(pl.lit(1.5))
+            .when(pl.col(target_col).abs() > 0.2).then(pl.lit(1.0))
+            .otherwise(0.0).alias('weight')
+        )
+
+        # Apply multipliers
+        if self.negative_factor:
+            pl_df = pl_df.with_columns(
+                pl.when(pl.col(target_col) < -0.5)
+                .then(pl.col('weight') * self.negative_factor)
+                .otherwise(pl.col('weight'))
+                .alias('weight')
+            )
+
+        if self.positive_factor:
+            pl_df = pl_df.with_columns(
+                pl.when(pl.col(target_col) > 0.5)
+                .then(pl.col('weight') * self.positive_factor)
+                .otherwise(pl.col('weight'))
+                .alias('weight')
+            )
+
+        return pl_df
diff --git a/stock_15m/src/train.py b/stock_15m/src/train.py
new file mode 100644
index 0000000..586b219
--- /dev/null
+++ b/stock_15m/src/train.py
@@ -0,0 +1,208 @@
+"""
+Training script for stock 15-minute return prediction models.
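+
+A YAML file passed via --config needs a 'data' section plus an optional
+'model' section, both unpacked into TrainConfig; a minimal sketch (paths
+illustrative):
+
+    data:
+      dt_range: ['2020-01-01', '2023-12-31']
+      feature_path: /data/parquet/stock_1min_alpha158
+      kline_path: /data/parquet/stock_1min_kline
+    model:
+      normalization_mode: dual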
+
+Example usage:
+    python -m alpha_lab.stock_15m.src.train \
+        --config config.yaml \
+        --output results/stock_15m_exp01
+"""
+
+import argparse
+import json
+import logging
+from pathlib import Path
+from typing import Optional, Dict, Any
+from dataclasses import dataclass
+
+import numpy as np
+from sklearn.metrics import r2_score
+import xgboost as xgb
+
+from .loader import Stock15mLoader
+
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TrainConfig:
+    """Training configuration."""
+    dt_range: list[str]
+    feature_path: str
+    kline_path: str
+    industry_path: Optional[str] = None
+    normalization_mode: str = 'dual'
+    model_type: str = 'xgb'
+    model_params: Optional[Dict[str, Any]] = None
+    positive_factor: Optional[float] = None
+    negative_factor: Optional[float] = None
+
+    def __post_init__(self):
+        if self.model_params is None:
+            self.model_params = {}
+
+
+def parse_args() -> argparse.Namespace:
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(
+        description='Train stock 15-minute return prediction model'
+    )
+    parser.add_argument(
+        '--config', '-c',
+        type=str,
+        help='Path to config YAML file'
+    )
+    parser.add_argument(
+        '--output', '-o',
+        type=str,
+        default='results/stock_15m_experiment',
+        help='Output directory for results'
+    )
+    parser.add_argument(
+        '--dt-range',
+        nargs=2,
+        metavar=('START', 'END'),
+        help='Date range [start_date, end_date]'
+    )
+    parser.add_argument(
+        '--feature-path',
+        type=str,
+        help='Path to Alpha158 features parquet'
+    )
+    parser.add_argument(
+        '--kline-path',
+        type=str,
+        help='Path to kline data parquet'
+    )
+    parser.add_argument(
+        '--normalization-mode',
+        default='dual',
+        choices=['industry', 'cs_zscore', 'dual'],
+        help='Feature normalization mode'
+    )
+    return parser.parse_args()
+
+
+def load_config(args: argparse.Namespace) -> TrainConfig:
+    """Load configuration from args or YAML file."""
+    if args.config:
+        import yaml
+        with open(args.config) as f:
+            config_dict = yaml.safe_load(f)
+        return TrainConfig(**config_dict['data'], **config_dict.get('model', {}))
+
+    return TrainConfig(
+        dt_range=args.dt_range,
+        feature_path=args.feature_path,
+        kline_path=args.kline_path,
+        normalization_mode=args.normalization_mode
+    )
+
+
+def train_model(
+    config: TrainConfig,
+    output_dir: Optional[str] = None
+) -> tuple[xgb.Booster, Dict]:
+    """Train stock 15m model with given configuration."""
+    logger.info(f"Loading dataset for range {config.dt_range}")
+
+    # Load dataset
+    loader = Stock15mLoader(
+        normalization_mode=config.normalization_mode,
+        positive_factor=config.positive_factor,
+        negative_factor=config.negative_factor
+    )
+
+    dataset = loader.load(
+        dt_range=config.dt_range,
+        feature_path=config.feature_path,
+        kline_path=config.kline_path,
+        industry_path=config.industry_path
+    )
+
+    feature_cols = dataset.features
+    logger.info(f"Loaded {len(feature_cols)} features")
+
+    # Extract data
+    X, y, w = dataset.to_numpy()
+
+    # Simple train/test split (80/20)
+    split_idx = int(len(X) * 0.8)
+    X_train, X_test = X[:split_idx], X[split_idx:]
+    y_train, y_test = y[:split_idx], y[split_idx:]
+    w_train = w[:split_idx] if w is not None else None
+
+    logger.info(f"Train size: {len(X_train)}, Test: {len(X_test)}")
+
+    # Train model
+    dtrain = xgb.DMatrix(X_train, label=y_train, weight=w_train)
+    dtest = xgb.DMatrix(X_test, label=y_test)
+
+    params = config.model_params or {
+        'objective': 'reg:squarederror',
+        'eval_metric': 'rmse',
+        'eta': 0.05,
'max_depth': 6, + 'subsample': 0.8, + 'colsample_bytree': 0.8, + 'seed': 42 + } + + logger.info("Training XGBoost model...") + model = xgb.train( + params, + dtrain, + num_boost_round=500, + evals=[(dtrain, 'train'), (dtest, 'test')], + early_stopping_rounds=50, + verbose_eval=50 + ) + + # Evaluate + predictions = model.predict(dtest) + ic = np.corrcoef(predictions, y_test)[0, 1] + r2 = r2_score(y_test, predictions) + + metrics = { + 'test_ic': ic, + 'test_r2': r2, + } + + logger.info(f"Test IC: {ic:.4f}, R²: {r2:.4f}") + + # Save results + if output_dir: + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + model.save_model(str(output_path / 'model.json')) + + with open(output_path / 'metrics.json', 'w') as f: + json.dump(metrics, f, indent=2) + + with open(output_path / 'config.json', 'w') as f: + json.dump({ + 'dt_range': config.dt_range, + 'normalization_mode': config.normalization_mode, + 'model_type': config.model_type + }, f, indent=2) + + logger.info(f"Results saved to {output_dir}") + + return model, metrics + + +def main(): + """Main entry point.""" + args = parse_args() + config = load_config(args) + + model, metrics = train_model(config, output_dir=args.output) + + logger.info("Training complete!") + + +if __name__ == '__main__': + main()