#!/usr/bin/env python3
"""Batch process OHLCV data: clean, filter, and save."""

import argparse
import sys
from pathlib import Path

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.core.enums import Timeframe  # noqa: E402
from src.data.database import get_db_session  # noqa: E402
from src.data.loaders import load_and_preprocess  # noqa: E402
from src.data.models import OHLCVData  # noqa: E402
from src.data.repositories import OHLCVRepository  # noqa: E402
from src.logging import get_logger  # noqa: E402

logger = get_logger(__name__)


def process_file(
    input_file: Path,
    symbol: str,
    timeframe: Timeframe,
    output_dir: Path,
    save_to_db: bool = False,
    filter_session_hours: bool = True,
) -> None:
    """
    Process a single data file.

    Args:
        input_file: Path to input file
        symbol: Trading symbol
        timeframe: Timeframe enum
        output_dir: Output directory
        save_to_db: Whether to save to database
        filter_session_hours: Whether to filter to trading session (3-4 AM EST)
    """
    logger.info(f"Processing file: {input_file}")

    # Load and preprocess
    df = load_and_preprocess(
        str(input_file),
        loader_type="auto",
        validate=True,
        preprocess=True,
        filter_to_session=filter_session_hours,
    )
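
    # df now holds validated, preprocessed OHLCV bars, optionally
    # restricted to the 3-4 AM EST session window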

    # Ensure symbol and timeframe columns
    df["symbol"] = symbol
    df["timeframe"] = timeframe.value

    # Save processed CSV
    output_dir.mkdir(parents=True, exist_ok=True)
    output_csv = output_dir / f"{symbol}_{timeframe.value}_processed.csv"
    df.to_csv(output_csv, index=False)
    logger.info(f"Saved processed CSV: {output_csv} ({len(df)} rows)")

    # Save processed Parquet
    output_parquet = output_dir / f"{symbol}_{timeframe.value}_processed.parquet"
    df.to_parquet(output_parquet, index=False)
    logger.info(f"Saved processed Parquet: {output_parquet} ({len(df)} rows)")
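
    # CSV and Parquet carry the same rows; Parquet is the more compact
    # format for downstream reloads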

    # Save to database if requested
    if save_to_db:
        logger.info("Saving to database...")
        with get_db_session() as session:
            repo = OHLCVRepository(session=session)

            # Convert DataFrame to OHLCVData models
            records = []
            for _, row in df.iterrows():
                # Skip records that already exist so reruns stay idempotent
                if repo.exists(symbol, timeframe, row["timestamp"]):
                    continue

                record = OHLCVData(
                    symbol=symbol,
                    timeframe=timeframe,
                    timestamp=row["timestamp"],
                    open=row["open"],
                    high=row["high"],
                    low=row["low"],
                    close=row["close"],
                    volume=row.get("volume"),  # volume column may be absent
                )
                records.append(record)

            if records:
                repo.create_batch(records)
                logger.info(f"Saved {len(records)} records to database")
            else:
                logger.info("No new records to save (all already exist)")


def process_directory(
    input_dir: Path,
    output_dir: Path,
    symbol: str = "DAX",
    save_to_db: bool = False,
    filter_session_hours: bool = True,
) -> None:
    """
    Process all data files in a directory.

    Args:
        input_dir: Input directory
        output_dir: Output directory
        symbol: Trading symbol
        save_to_db: Whether to save to database
        filter_session_hours: Whether to filter to trading session
    """
    # Find all CSV and Parquet files (top level only; glob is non-recursive)
    files = list(input_dir.glob("*.csv")) + list(input_dir.glob("*.parquet"))

    if not files:
        logger.warning(f"No data files found in {input_dir}")
        return

    # Detect timeframe from the directory name. Longer names come first
    # because "5min" is a substring of "15min" and would match 15min paths.
    timeframe_map = {
        "15min": Timeframe.M15,
        "5min": Timeframe.M5,
        "1min": Timeframe.M1,
    }
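
    # e.g. a path like data/raw/ohlcv/15min/ resolves to Timeframe.M15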

    timeframe = None
    for tf_name, tf_enum in timeframe_map.items():
        if tf_name in str(input_dir):
            timeframe = tf_enum
            break

    if timeframe is None:
        logger.error(f"Could not determine timeframe from directory: {input_dir}")
        return

    logger.info(f"Processing {len(files)} files from {input_dir}")

    for file_path in files:
        try:
            process_file(
                file_path,
                symbol,
                timeframe,
                output_dir,
                save_to_db,
                filter_session_hours,
            )
        except Exception as e:
            # Log and continue so one bad file does not abort the whole batch
            logger.error(f"Failed to process {file_path}: {e}", exc_info=True)
            continue

    logger.info("Batch processing completed")


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Batch process OHLCV data",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process a single file
  python scripts/process_data.py --input data/raw/ohlcv/1min/m1.csv \\
      --output data/processed/ --symbol DAX --timeframe 1min

  # Process a directory
  python scripts/process_data.py --input data/raw/ohlcv/1min/ \\
      --output data/processed/ --symbol DAX

  # Process a directory and save to the database
  python scripts/process_data.py --input data/raw/ohlcv/1min/ \\
      --output data/processed/ --save-db
""",
    )

    parser.add_argument(
        "--input",
        type=str,
        required=True,
        help="Input file or directory",
    )
    parser.add_argument(
        "--output",
        type=str,
        required=True,
        help="Output directory",
    )
    parser.add_argument(
        "--symbol",
        type=str,
        default="DAX",
        help="Trading symbol (default: DAX)",
    )
    parser.add_argument(
        "--timeframe",
        type=str,
        choices=["1min", "5min", "15min"],
        help="Timeframe (required when processing a single file)",
    )
    parser.add_argument(
        "--save-db",
        action="store_true",
        help="Save processed data to database",
    )
    parser.add_argument(
        "--no-session-filter",
        action="store_true",
        help="Don't filter to trading session hours (3-4 AM EST)",
    )

    args = parser.parse_args()

    try:
        input_path = Path(args.input)
        output_dir = Path(args.output)

        if not input_path.exists():
            logger.error(f"Input path does not exist: {input_path}")
            return 1

        # Process a single file or a whole directory; a file needs an
        # explicit --timeframe, a directory infers it from its name
        if input_path.is_file():
            if not args.timeframe:
                # parser.error() prints usage and exits, so no return is needed
                parser.error("--timeframe is required when processing a single file")

            timeframe_map = {
                "1min": Timeframe.M1,
                "5min": Timeframe.M5,
                "15min": Timeframe.M15,
            }
            timeframe = timeframe_map[args.timeframe]

            process_file(
                input_path,
                args.symbol,
                timeframe,
                output_dir,
                save_to_db=args.save_db,
                filter_session_hours=not args.no_session_filter,
            )

        elif input_path.is_dir():
            process_directory(
                input_path,
                output_dir,
                symbol=args.symbol,
                save_to_db=args.save_db,
                filter_session_hours=not args.no_session_filter,
            )

        else:
            logger.error(f"Input path is neither file nor directory: {input_path}")
            return 1

        logger.info("Data processing completed successfully")
        return 0

    except Exception as e:
        logger.error(f"Data processing failed: {e}", exc_info=True)
        return 1


if __name__ == "__main__":
    sys.exit(main())