#!/usr/bin/env python
"""Main pipeline orchestration script for Alpha158 0_7 vs 0_7_beta comparison.

This script orchestrates the full workflow:
1. Generate beta embeddings from alpha158_0_7_beta factors
2. Fetch original 0_7 predictions from DolphinDB
3. Generate predictions using beta embeddings
4. Generate actual returns from kline data
5. Compare predictions (IC, RankIC, correlation, etc.)

Usage:
    python pipeline.py --start-date 2019-01-01 --end-date 2020-11-30 --skip-embeddings --skip-fetch

Arguments:
    --start-date: Start date for data loading (default: 2019-01-01)
    --end-date: End date for data loading (default: 2020-11-30)
    --skip-embeddings: Skip embeddings generation (use existing)
    --skip-fetch: Skip fetching original predictions (use existing)
    --skip-returns: Skip returns generation (use existing)
    --skip-comparison: Skip final comparison
"""

import argparse
import os
import sys
import traceback
# NOTE(review): datetime and Path look unused in this file — kept in case
# another tool or a downstream patch relies on them; confirm before removing.
from datetime import datetime
from pathlib import Path

# Add scripts directory to path so the per-step modules
# (generate_beta_embedding, fetch_predictions, ...) can be imported lazily.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'scripts'))


def _print_header(title: str) -> None:
    """Print a framed banner for a pipeline step."""
    print("\n" + "=" * 70)
    print(title)
    print("=" * 70)


def _confirm_regenerate(path: str, exists_msg: str, prompt: str,
                        skip_msg: str) -> bool:
    """Return True when the step should (re)generate its output.

    If *path* does not exist, generation proceeds unconditionally.
    Otherwise the user is asked interactively; anything but 'y' skips.

    Args:
        path: Output file the step would produce.
        exists_msg: Message printed when the file already exists.
        prompt: Interactive prompt, e.g. "Regenerate? (y/N): ".
        skip_msg: Message printed when the user declines.

    Returns:
        True to proceed with generation, False to skip (keep existing file).
    """
    if not os.path.exists(path):
        return True
    print(exists_msg)
    response = input(prompt).strip().lower()
    if response != 'y':
        print(skip_msg)
        return False
    return True


def step_generate_embeddings(start_date: str, end_date: str, data_dir: str) -> bool:
    """Step 1: Generate beta embeddings.

    Args:
        start_date: Start date (YYYY-MM-DD) for factor loading.
        end_date: End date (YYYY-MM-DD) for factor loading.
        data_dir: Directory where the embeddings parquet is written.

    Returns:
        True on success or a user-requested skip, False on error.
    """
    _print_header("STEP 1: Generate Beta Embeddings")

    embedding_file = os.path.join(data_dir, "embedding_0_7_beta.parquet")
    if not _confirm_regenerate(
        embedding_file,
        f"Embeddings file already exists: {embedding_file}",
        "Regenerate? (y/N): ",
        "Skipping embeddings generation.",
    ):
        return True

    try:
        # Lazy import: module lives in ./scripts (added to sys.path above).
        from generate_beta_embedding import generate_embeddings
        df = generate_embeddings(
            start_date=start_date,
            end_date=end_date,
            output_file=embedding_file,
            use_vae=True,
        )
        print(f"\nGenerated {len(df)} embeddings")
        return True
    except Exception as e:
        print(f"Error generating embeddings: {e}")
        traceback.print_exc()
        return False


def step_fetch_predictions(start_date: str, end_date: str, data_dir: str) -> bool:
    """Step 2: Fetch original predictions from DolphinDB.

    Args:
        start_date: Start date (YYYY-MM-DD) of the prediction window.
        end_date: End date (YYYY-MM-DD) of the prediction window.
        data_dir: Directory where the predictions parquet is written.

    Returns:
        True on success or a user-requested skip, False on error.
    """
    _print_header("STEP 2: Fetch Original Predictions from DolphinDB")

    predictions_file = os.path.join(data_dir, "original_predictions_0_7.parquet")
    if not _confirm_regenerate(
        predictions_file,
        f"Predictions file already exists: {predictions_file}",
        "Refetch? (y/N): ",
        "Skipping fetch.",
    ):
        return True

    try:
        from fetch_predictions import fetch_original_predictions
        df = fetch_original_predictions(
            start_date=start_date,
            end_date=end_date,
            output_file=predictions_file,
        )
        print(f"\nFetched {len(df)} predictions")
        return True
    except Exception as e:
        print(f"Error fetching predictions: {e}")
        traceback.print_exc()
        return False


def step_generate_beta_predictions(data_dir: str) -> bool:
    """Step 3: Generate predictions using beta embeddings.

    Requires the embeddings file produced by step 1.

    Args:
        data_dir: Directory holding the embeddings and receiving predictions.

    Returns:
        True on success or a user-requested skip, False on error or when
        the prerequisite embeddings file is missing.
    """
    _print_header("STEP 3: Generate Predictions with Beta Embeddings")

    embedding_file = os.path.join(data_dir, "embedding_0_7_beta.parquet")
    predictions_file = os.path.join(data_dir, "predictions_beta_embedding.parquet")

    # Hard prerequisite: step 1 output must be present.
    if not os.path.exists(embedding_file):
        print(f"Embeddings file not found: {embedding_file}")
        print("Run step 1 first.")
        return False

    if not _confirm_regenerate(
        predictions_file,
        f"Beta predictions file already exists: {predictions_file}",
        "Regenerate? (y/N): ",
        "Skipping prediction generation.",
    ):
        return True

    try:
        from predict_with_embedding import generate_predictions
        df = generate_predictions(
            embedding_file=embedding_file,
            output_file=predictions_file,
            seq_len=40,
            batch_size=1000,
        )
        print(f"\nGenerated {len(df)} predictions")
        return True
    except Exception as e:
        print(f"Error generating predictions: {e}")
        traceback.print_exc()
        return False


def step_generate_returns(data_dir: str) -> bool:
    """Step 4: Generate actual returns from kline data.

    Args:
        data_dir: Directory receiving the returns parquet; also checked for
            the step-2 predictions file, which (when present) bounds the
            date range of the generated returns.

    Returns:
        True on success or a user-requested skip, False on error.
    """
    _print_header("STEP 4: Generate Actual Returns")

    predictions_file = os.path.join(data_dir, "original_predictions_0_7.parquet")
    returns_file = os.path.join(data_dir, "actual_returns.parquet")

    if not _confirm_regenerate(
        returns_file,
        f"Returns file already exists: {returns_file}",
        "Regenerate? (y/N): ",
        "Skipping returns generation.",
    ):
        return True

    try:
        from generate_returns import generate_real_returns_from_kline
        # Use prediction file to determine date range if available
        prediction_file = predictions_file if os.path.exists(predictions_file) else None
        df = generate_real_returns_from_kline(
            input_kline_path="/data/parquet/dataset/stg_1day_wind_kline_adjusted_1D/",
            prediction_file=prediction_file,
            output_file=returns_file,
            return_days=5,
        )
        if df is not None:
            print(f"\nGenerated {len(df)} returns")
            return True
        print("\nFailed to generate returns")
        return False
    except Exception as e:
        print(f"Error generating returns: {e}")
        traceback.print_exc()
        return False


def step_compare_predictions(data_dir: str) -> bool:
    """Step 5: Compare 0_7 vs 0_7_beta predictions.

    Args:
        data_dir: Directory expected to hold both prediction parquets.

    Returns:
        True on success, False when a prerequisite file is missing or the
        comparison raises.
    """
    _print_header("STEP 5: Compare Predictions")

    required_files = [
        os.path.join(data_dir, "original_predictions_0_7.parquet"),
        os.path.join(data_dir, "predictions_beta_embedding.parquet"),
    ]
    for f in required_files:
        if not os.path.exists(f):
            print(f"Required file not found: {f}")
            return False

    try:
        # Import and run comparison
        from compare_predictions import main as compare_main
        compare_main()
        return True
    except Exception as e:
        print(f"Error comparing predictions: {e}")
        traceback.print_exc()
        return False


def _parse_args() -> argparse.Namespace:
    """Build and run the CLI argument parser."""
    parser = argparse.ArgumentParser(
        description="Alpha158 0_7 vs 0_7_beta Comparison Pipeline"
    )
    parser.add_argument(
        "--start-date", type=str, default="2019-01-01",
        help="Start date (YYYY-MM-DD)"
    )
    parser.add_argument(
        "--end-date", type=str, default="2020-11-30",
        help="End date (YYYY-MM-DD)"
    )
    parser.add_argument(
        "--skip-embeddings", action="store_true",
        help="Skip embeddings generation"
    )
    parser.add_argument(
        "--skip-fetch", action="store_true",
        help="Skip fetching original predictions"
    )
    parser.add_argument(
        "--skip-returns", action="store_true",
        help="Skip returns generation"
    )
    parser.add_argument(
        "--skip-comparison", action="store_true",
        help="Skip final comparison"
    )
    parser.add_argument(
        "--data-dir", type=str, default=None,
        help="Data directory (default: ./data)"
    )
    return parser.parse_args()


def main():
    """Main pipeline orchestration.

    Runs steps 1-5 in order, honoring the --skip-* flags, then prints a
    per-step summary. Exits with status 1 if any executed step failed.
    """
    args = _parse_args()

    # Determine data directory
    script_dir = os.path.dirname(os.path.abspath(__file__))
    data_dir = args.data_dir or os.path.join(script_dir, "data")

    print("=" * 70)
    print("Alpha158 0_7 vs 0_7_beta Comparison Pipeline")
    print("=" * 70)
    print(f"Date range: {args.start_date} to {args.end_date}")
    print(f"Data directory: {data_dir}")

    # Ensure data directory exists
    os.makedirs(data_dir, exist_ok=True)

    # Track per-step success; skipped steps count as successful.
    results = {}

    # Step 1: Generate embeddings
    if not args.skip_embeddings:
        results['embeddings'] = step_generate_embeddings(
            args.start_date, args.end_date, data_dir
        )
    else:
        print("\nSkipping embeddings generation (as requested)")
        results['embeddings'] = True

    # Step 2: Fetch original predictions
    if not args.skip_fetch:
        results['fetch'] = step_fetch_predictions(
            args.start_date, args.end_date, data_dir
        )
    else:
        print("\nSkipping fetch (as requested)")
        results['fetch'] = True

    # Step 3: Generate beta predictions (depends on step 1)
    if results.get('embeddings', True):
        results['beta_predictions'] = step_generate_beta_predictions(data_dir)
    else:
        print("\nSkipping beta predictions (embeddings generation failed)")
        results['beta_predictions'] = False

    # Step 4: Generate returns
    if not args.skip_returns:
        results['returns'] = step_generate_returns(data_dir)
    else:
        print("\nSkipping returns generation (as requested)")
        results['returns'] = True

    # Step 5: Compare predictions (depends on steps 2 and 3)
    if not args.skip_comparison:
        if all([
            results.get('fetch', True),
            results.get('beta_predictions', True),
        ]):
            results['comparison'] = step_compare_predictions(data_dir)
        else:
            print("\nSkipping comparison (previous steps failed)")
            results['comparison'] = False
    else:
        print("\nSkipping comparison (as requested)")
        results['comparison'] = True

    # Summary
    _print_header("PIPELINE SUMMARY")
    for step, success in results.items():
        status = "✓ PASSED" if success else "✗ FAILED"
        print(f"  {step:20s}: {status}")

    all_passed = all(results.values())
    print("=" * 70)
    if all_passed:
        print("Pipeline completed successfully!")
    else:
        print("Pipeline completed with errors.")
        sys.exit(1)


if __name__ == "__main__":
    main()