feat(v0.2.0): data pipeline

This commit is contained in:
0x_n3m0_
2026-01-05 11:34:18 +02:00
parent 2527938680
commit b5e7043df6
23 changed files with 2813 additions and 7 deletions

269
scripts/process_data.py Executable file
View File

@@ -0,0 +1,269 @@
#!/usr/bin/env python3
"""Batch process OHLCV data: clean, filter, and save."""
import argparse
import sys
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.core.enums import Timeframe # noqa: E402
from src.data.database import get_db_session # noqa: E402
from src.data.loaders import load_and_preprocess # noqa: E402
from src.data.models import OHLCVData # noqa: E402
from src.data.repositories import OHLCVRepository # noqa: E402
from src.logging import get_logger # noqa: E402
logger = get_logger(__name__)
def process_file(
input_file: Path,
symbol: str,
timeframe: Timeframe,
output_dir: Path,
save_to_db: bool = False,
filter_session_hours: bool = True,
) -> None:
"""
Process a single data file.
Args:
input_file: Path to input file
symbol: Trading symbol
timeframe: Timeframe enum
output_dir: Output directory
save_to_db: Whether to save to database
filter_session_hours: Whether to filter to trading session (3-4 AM EST)
"""
logger.info(f"Processing file: {input_file}")
# Load and preprocess
df = load_and_preprocess(
str(input_file),
loader_type="auto",
validate=True,
preprocess=True,
filter_to_session=filter_session_hours,
)
# Ensure symbol and timeframe columns
df["symbol"] = symbol
df["timeframe"] = timeframe.value
# Save processed CSV
output_dir.mkdir(parents=True, exist_ok=True)
output_csv = output_dir / f"{symbol}_{timeframe.value}_processed.csv"
df.to_csv(output_csv, index=False)
logger.info(f"Saved processed CSV: {output_csv} ({len(df)} rows)")
# Save processed Parquet
output_parquet = output_dir / f"{symbol}_{timeframe.value}_processed.parquet"
df.to_parquet(output_parquet, index=False)
logger.info(f"Saved processed Parquet: {output_parquet} ({len(df)} rows)")
# Save to database if requested
if save_to_db:
logger.info("Saving to database...")
with get_db_session() as session:
repo = OHLCVRepository(session=session)
# Convert DataFrame to OHLCVData models
records = []
for _, row in df.iterrows():
# Check if record already exists
if repo.exists(symbol, timeframe, row["timestamp"]):
continue
record = OHLCVData(
symbol=symbol,
timeframe=timeframe,
timestamp=row["timestamp"],
open=row["open"],
high=row["high"],
low=row["low"],
close=row["close"],
volume=row.get("volume"),
)
records.append(record)
if records:
repo.create_batch(records)
logger.info(f"Saved {len(records)} records to database")
else:
logger.info("No new records to save (all already exist)")
def process_directory(
input_dir: Path,
output_dir: Path,
symbol: str = "DAX",
save_to_db: bool = False,
filter_session_hours: bool = True,
) -> None:
"""
Process all data files in a directory.
Args:
input_dir: Input directory
output_dir: Output directory
symbol: Trading symbol
save_to_db: Whether to save to database
filter_session_hours: Whether to filter to trading session
"""
# Find all CSV and Parquet files
files = list(input_dir.glob("*.csv")) + list(input_dir.glob("*.parquet"))
if not files:
logger.warning(f"No data files found in {input_dir}")
return
# Detect timeframe from directory name or file
timeframe_map = {
"1min": Timeframe.M1,
"5min": Timeframe.M5,
"15min": Timeframe.M15,
}
timeframe = None
for tf_name, tf_enum in timeframe_map.items():
if tf_name in str(input_dir):
timeframe = tf_enum
break
if timeframe is None:
logger.error(f"Could not determine timeframe from directory: {input_dir}")
return
logger.info(f"Processing {len(files)} files from {input_dir}")
for file_path in files:
try:
process_file(
file_path,
symbol,
timeframe,
output_dir,
save_to_db,
filter_session_hours,
)
except Exception as e:
logger.error(f"Failed to process {file_path}: {e}", exc_info=True)
continue
logger.info("Batch processing completed")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Batch process OHLCV data",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Process single file
python scripts/process_data.py --input data/raw/ohlcv/1min/m1.csv \\
--output data/processed/ --symbol DAX --timeframe 1min
# Process directory
python scripts/process_data.py --input data/raw/ohlcv/1min/ \\
--output data/processed/ --symbol DAX
# Process and save to database
python scripts/process_data.py --input data/raw/ohlcv/1min/ \\
--output data/processed/ --save-db
""",
)
parser.add_argument(
"--input",
type=str,
required=True,
help="Input file or directory",
)
parser.add_argument(
"--output",
type=str,
required=True,
help="Output directory",
)
parser.add_argument(
"--symbol",
type=str,
default="DAX",
help="Trading symbol (default: DAX)",
)
parser.add_argument(
"--timeframe",
type=str,
choices=["1min", "5min", "15min"],
help="Timeframe (required if processing single file)",
)
parser.add_argument(
"--save-db",
action="store_true",
help="Save processed data to database",
)
parser.add_argument(
"--no-session-filter",
action="store_true",
help="Don't filter to trading session hours (3-4 AM EST)",
)
args = parser.parse_args()
try:
input_path = Path(args.input)
output_dir = Path(args.output)
if not input_path.exists():
logger.error(f"Input path does not exist: {input_path}")
return 1
# Process single file or directory
if input_path.is_file():
if not args.timeframe:
parser.error("--timeframe is required when processing a single file")
return 1
timeframe_map = {
"1min": Timeframe.M1,
"5min": Timeframe.M5,
"15min": Timeframe.M15,
}
timeframe = timeframe_map[args.timeframe]
process_file(
input_path,
args.symbol,
timeframe,
output_dir,
save_to_db=args.save_db,
filter_session_hours=not args.no_session_filter,
)
elif input_path.is_dir():
process_directory(
input_path,
output_dir,
symbol=args.symbol,
save_to_db=args.save_db,
filter_session_hours=not args.no_session_filter,
)
else:
logger.error(f"Input path is neither file nor directory: {input_path}")
return 1
logger.info("Data processing completed successfully")
return 0
except Exception as e:
logger.error(f"Data processing failed: {e}", exc_info=True)
return 1
if __name__ == "__main__":
sys.exit(main())