#!/usr/bin/env python3
"""Batch process OHLCV data: clean, filter, and save."""

import argparse
import sys
from pathlib import Path
from typing import Optional

# Add project root to path so the ``src`` package is importable when this
# file is executed directly as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.core.enums import Timeframe  # noqa: E402
from src.data.database import get_db_session  # noqa: E402
from src.data.loaders import load_and_preprocess  # noqa: E402
from src.data.models import OHLCVData  # noqa: E402
from src.data.repositories import OHLCVRepository  # noqa: E402
from src.logging import get_logger  # noqa: E402

logger = get_logger(__name__)

# Single source of truth for the CLI / directory-name -> Timeframe mapping.
_TIMEFRAME_BY_NAME = {
    "1min": Timeframe.M1,
    "5min": Timeframe.M5,
    "15min": Timeframe.M15,
}


def _detect_timeframe(input_dir: Path) -> Optional[Timeframe]:
    """Infer the timeframe from the components of ``input_dir``.

    Path components are inspected innermost first, and candidate names are
    tried longest first so that a directory called ``15min`` is not mistaken
    for ``5min`` (``"5min"`` is a substring of ``"15min"``).

    Returns:
        The matching Timeframe, or None when no component contains a known
        timeframe name.
    """
    names_longest_first = sorted(_TIMEFRAME_BY_NAME, key=len, reverse=True)
    for part in reversed(input_dir.parts):
        for name in names_longest_first:
            if name in part:
                return _TIMEFRAME_BY_NAME[name]
    return None


def _save_records_to_db(df, symbol: str, timeframe: Timeframe) -> None:
    """Insert the rows of ``df`` that are not yet stored in the database.

    Each row is checked with ``repo.exists`` and skipped when a record for the
    same symbol, timeframe and timestamp is already present; the remaining
    rows are written with a single ``repo.create_batch`` call.
    """
    logger.info("Saving to database...")
    with get_db_session() as session:
        repo = OHLCVRepository(session=session)

        # Convert DataFrame rows to OHLCVData models, skipping existing ones.
        records = []
        for _, row in df.iterrows():
            if repo.exists(symbol, timeframe, row["timestamp"]):
                continue
            records.append(
                OHLCVData(
                    symbol=symbol,
                    timeframe=timeframe,
                    timestamp=row["timestamp"],
                    open=row["open"],
                    high=row["high"],
                    low=row["low"],
                    close=row["close"],
                    # .get() tolerates input without a volume column.
                    volume=row.get("volume"),
                )
            )

        if records:
            repo.create_batch(records)
            logger.info(f"Saved {len(records)} records to database")
        else:
            logger.info("No new records to save (all already exist)")


def process_file(
    input_file: Path,
    symbol: str,
    timeframe: Timeframe,
    output_dir: Path,
    save_to_db: bool = False,
    filter_session_hours: bool = True,
) -> None:
    """
    Process a single data file.

    The file is loaded and preprocessed, tagged with ``symbol`` and
    ``timeframe`` columns, and written to ``output_dir`` as both CSV and
    Parquet. The output file names depend only on the symbol and timeframe,
    so a later call with the same pair overwrites the earlier output.

    Args:
        input_file: Path to input file
        symbol: Trading symbol
        timeframe: Timeframe enum
        output_dir: Output directory (created if missing)
        save_to_db: Whether to save to database
        filter_session_hours: Whether to filter to trading session (3-4 AM EST)
    """
    logger.info(f"Processing file: {input_file}")

    # Load and preprocess
    df = load_and_preprocess(
        str(input_file),
        loader_type="auto",
        validate=True,
        preprocess=True,
        filter_to_session=filter_session_hours,
    )

    # Ensure symbol and timeframe columns
    df["symbol"] = symbol
    df["timeframe"] = timeframe.value

    output_dir.mkdir(parents=True, exist_ok=True)
    output_stem = f"{symbol}_{timeframe.value}_processed"

    # Save processed CSV
    output_csv = output_dir / f"{output_stem}.csv"
    df.to_csv(output_csv, index=False)
    logger.info(f"Saved processed CSV: {output_csv} ({len(df)} rows)")

    # Save processed Parquet
    output_parquet = output_dir / f"{output_stem}.parquet"
    df.to_parquet(output_parquet, index=False)
    logger.info(f"Saved processed Parquet: {output_parquet} ({len(df)} rows)")

    # Save to database if requested
    if save_to_db:
        _save_records_to_db(df, symbol, timeframe)


def process_directory(
    input_dir: Path,
    output_dir: Path,
    symbol: str = "DAX",
    save_to_db: bool = False,
    filter_session_hours: bool = True,
) -> bool:
    """
    Process all data files in a directory.

    The timeframe is inferred from the directory path (see
    ``_detect_timeframe``). A failure in one file is logged and does not stop
    the remaining files from being processed.

    Args:
        input_dir: Input directory
        output_dir: Output directory
        symbol: Trading symbol
        save_to_db: Whether to save to database
        filter_session_hours: Whether to filter to trading session

    Returns:
        True when there was nothing to do or every file was processed;
        False when the timeframe could not be determined or at least one
        file failed.
    """
    # Find all CSV and Parquet files; sort within each group so the
    # processing order does not depend on filesystem enumeration order.
    files = sorted(input_dir.glob("*.csv")) + sorted(input_dir.glob("*.parquet"))

    if not files:
        logger.warning(f"No data files found in {input_dir}")
        return True

    # Detect timeframe from directory name
    timeframe = _detect_timeframe(input_dir)
    if timeframe is None:
        logger.error(f"Could not determine timeframe from directory: {input_dir}")
        return False

    logger.info(f"Processing {len(files)} files from {input_dir}")

    if len(files) > 1:
        # NOTE(review): process_file names its outputs from symbol and
        # timeframe only, which are identical for every file here.
        logger.warning(
            "All files in this directory share one output name; "
            "later files overwrite the output of earlier ones"
        )

    failed = 0
    for file_path in files:
        try:
            process_file(
                file_path,
                symbol,
                timeframe,
                output_dir,
                save_to_db,
                filter_session_hours,
            )
        except Exception as e:
            # Best-effort batch: record the failure and keep going.
            failed += 1
            logger.error(f"Failed to process {file_path}: {e}", exc_info=True)

    if failed:
        logger.warning(
            f"Batch processing completed: {failed} of {len(files)} files failed"
        )
        return False

    logger.info("Batch processing completed")
    return True


def main() -> int:
    """Main entry point.

    Returns:
        Process exit code: 0 on success, 1 on failure. Invalid command-line
        usage exits through argparse with status 2.
    """
    parser = argparse.ArgumentParser(
        description="Batch process OHLCV data",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process single file
  python scripts/process_data.py --input data/raw/ohlcv/1min/m1.csv \\
      --output data/processed/ --symbol DAX --timeframe 1min

  # Process directory
  python scripts/process_data.py --input data/raw/ohlcv/1min/ \\
      --output data/processed/ --symbol DAX

  # Process and save to database
  python scripts/process_data.py --input data/raw/ohlcv/1min/ \\
      --output data/processed/ --save-db
        """,
    )

    parser.add_argument(
        "--input",
        type=str,
        required=True,
        help="Input file or directory",
    )
    parser.add_argument(
        "--output",
        type=str,
        required=True,
        help="Output directory",
    )
    parser.add_argument(
        "--symbol",
        type=str,
        default="DAX",
        help="Trading symbol (default: DAX)",
    )
    parser.add_argument(
        "--timeframe",
        type=str,
        choices=list(_TIMEFRAME_BY_NAME),
        help="Timeframe (required if processing single file)",
    )
    parser.add_argument(
        "--save-db",
        action="store_true",
        help="Save processed data to database",
    )
    parser.add_argument(
        "--no-session-filter",
        action="store_true",
        help="Don't filter to trading session hours (3-4 AM EST)",
    )

    args = parser.parse_args()

    try:
        input_path = Path(args.input)
        output_dir = Path(args.output)
        filter_session_hours = not args.no_session_filter

        if not input_path.exists():
            logger.error(f"Input path does not exist: {input_path}")
            return 1

        # Process single file or directory
        if input_path.is_file():
            if not args.timeframe:
                # parser.error() prints usage and raises SystemExit(2), which
                # is not caught by the ``except Exception`` below.
                parser.error("--timeframe is required when processing a single file")

            process_file(
                input_path,
                args.symbol,
                _TIMEFRAME_BY_NAME[args.timeframe],
                output_dir,
                save_to_db=args.save_db,
                filter_session_hours=filter_session_hours,
            )

        elif input_path.is_dir():
            succeeded = process_directory(
                input_path,
                output_dir,
                symbol=args.symbol,
                save_to_db=args.save_db,
                filter_session_hours=filter_session_hours,
            )
            if not succeeded:
                # Do not report success when files failed or the timeframe
                # could not be inferred.
                logger.error("Data processing finished with errors")
                return 1

        else:
            logger.error(f"Input path is neither file nor directory: {input_path}")
            return 1

        logger.info("Data processing completed successfully")
        return 0

    except Exception as e:
        logger.error(f"Data processing failed: {e}", exc_info=True)
        return 1


if __name__ == "__main__":
    sys.exit(main())