#!/usr/bin/env python3
"""Validate data pipeline implementation (v0.2.0)."""
import argparse
import sys
from pathlib import Path

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.logging import get_logger  # noqa: E402

logger = get_logger(__name__)


def validate_imports():
    """Validate that all data pipeline modules can be imported."""
    logger.info("Validating imports...")
    try:
        # Database
        from src.data.database import get_engine, get_session, init_database  # noqa: F401

        # Loaders
        from src.data.loaders import (  # noqa: F401
            CSVLoader,
            DatabaseLoader,
            ParquetLoader,
            load_and_preprocess,
        )

        # Models
        from src.data.models import (  # noqa: F401
            DetectedPattern,
            OHLCVData,
            PatternLabel,
            SetupLabel,
            Trade,
        )

        # Preprocessors
        from src.data.preprocessors import (  # noqa: F401
            filter_session,
            handle_missing_data,
            remove_duplicates,
        )

        # Repositories
        from src.data.repositories import (  # noqa: F401
            OHLCVRepository,
            PatternRepository,
            Repository,
        )

        # Schemas
        from src.data.schemas import OHLCVSchema, PatternSchema  # noqa: F401

        # Validators
        from src.data.validators import (  # noqa: F401
            check_continuity,
            detect_outliers,
            validate_ohlcv,
        )

        logger.info("✅ All imports successful")
        return True
    except Exception as e:
        logger.error(f"❌ Import validation failed: {e}", exc_info=True)
        return False


def validate_database():
    """Validate database connection and tables."""
    logger.info("Validating database...")
    try:
        from src.data.database import get_engine, init_database

        # Initialize database
        init_database(create_tables=True)

        # Check engine
        engine = get_engine()
        if engine is None:
            raise RuntimeError("Failed to get database engine")

        # Check connection
        with engine.connect():
            logger.debug("Database connection successful")

        logger.info("✅ Database validation successful")
        return True
    except Exception as e:
        logger.error(f"❌ Database validation failed: {e}", exc_info=True)
        return False


def validate_loaders():
    """Validate data loaders with sample data."""
    logger.info("Validating data loaders...")
    try:
        from src.core.enums import Timeframe
        from src.data.loaders import CSVLoader

        # Check for sample data
        sample_file = project_root / "tests" / "fixtures" / "sample_data" / "sample_ohlcv.csv"
        if not sample_file.exists():
            logger.warning(f"Sample file not found: {sample_file}")
            return True  # Not critical

        # Load sample data
        loader = CSVLoader()
        df = loader.load(str(sample_file), symbol="TEST", timeframe=Timeframe.M1)
        if df.empty:
            raise RuntimeError("Loaded DataFrame is empty")

        logger.info(f"✅ Data loaders validated (loaded {len(df)} rows)")
        return True
    except Exception as e:
        logger.error(f"❌ Data loader validation failed: {e}", exc_info=True)
        return False


def validate_preprocessors():
    """Validate data preprocessors."""
    logger.info("Validating preprocessors...")
    try:
        import numpy as np
        import pandas as pd

        from src.data.preprocessors import handle_missing_data, remove_duplicates

        # Create test data with issues: a missing value and a repeated value
        df = pd.DataFrame(
            {
                "timestamp": pd.date_range("2024-01-01", periods=10, freq="1min"),
                "value": [1, 2, np.nan, 4, 5, 5, 7, 8, 9, 10],
            }
        )

        # Test missing data handling
        df_clean = handle_missing_data(df.copy(), method="forward_fill")
        if df_clean["value"].isna().any():
            raise RuntimeError("Missing data not handled correctly")

        # Test duplicate removal
        df_nodup = remove_duplicates(df.copy())
        if len(df_nodup) >= len(df):
            logger.warning(
                "remove_duplicates removed no rows (expected if deduplication "
                "is timestamp-based, since the test timestamps are unique)"
            )

        logger.info("✅ Preprocessors validated")
        return True
    except Exception as e:
        logger.error(f"❌ Preprocessor validation failed: {e}", exc_info=True)
        return False


def validate_validators():
    """Validate data validators."""
    logger.info("Validating validators...")
    try:
        import pandas as pd

        from src.data.validators import validate_ohlcv

        # Create valid test data
        df = pd.DataFrame(
            {
                "timestamp": pd.date_range("2024-01-01", periods=10, freq="1min"),
                "open": [100.0] * 10,
                "high": [100.5] * 10,
                "low": [99.5] * 10,
                "close": [100.2] * 10,
                "volume": [1000] * 10,
            }
        )

        # Validate
        df_validated = validate_ohlcv(df)
        if df_validated.empty:
            raise RuntimeError("Validation removed all data")

        logger.info("✅ Validators validated")
        return True
    except Exception as e:
        logger.error(f"❌ Validator validation failed: {e}", exc_info=True)
        return False


def validate_directories():
    """Validate required directory structure."""
    logger.info("Validating directory structure...")
    required_dirs = [
        "data/raw/ohlcv/1min",
        "data/raw/ohlcv/5min",
        "data/raw/ohlcv/15min",
        "data/processed/features",
        "data/processed/patterns",
        "data/processed/snapshots",
        "data/labels/individual_patterns",
        "data/labels/complete_setups",
        "data/labels/anchors",
        "data/screenshots/patterns",
        "data/screenshots/setups",
    ]

    missing = []
    for dir_path in required_dirs:
        full_path = project_root / dir_path
        if not full_path.exists():
            missing.append(dir_path)

    if missing:
        logger.error(f"❌ Missing directories: {missing}")
        return False

    logger.info("✅ All required directories exist")
    return True


def validate_scripts():
    """Validate that utility scripts exist."""
    logger.info("Validating utility scripts...")
    required_scripts = [
        "scripts/setup_database.py",
        "scripts/download_data.py",
        "scripts/process_data.py",
    ]

    missing = []
    for script_path in required_scripts:
        full_path = project_root / script_path
        if not full_path.exists():
            missing.append(script_path)

    if missing:
        logger.error(f"❌ Missing scripts: {missing}")
        return False

    logger.info("✅ All required scripts exist")
    return True


def main():
    """Run all validations and return a process exit code (0 = all passed)."""
    parser = argparse.ArgumentParser(description="Validate data pipeline implementation")
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable verbose logging",
    )
    parser.add_argument(
        "--quick",
        action="store_true",
        help="Skip detailed validations (run import, directory, and script checks only)",
    )
    args = parser.parse_args()
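
    # The --verbose flag was parsed but otherwise unused above; a minimal
    # sketch of wiring it up, assuming the project logger from src.logging
    # propagates to the standard library's root logger:
    if args.verbose:
        import logging

        logging.getLogger().setLevel(logging.DEBUG)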
print("\n" + "=" * 70)
print("Data Pipeline Validation (v0.2.0)")
print("=" * 70 + "\n")
results = []
# Always run these
results.append(("Imports", validate_imports()))
results.append(("Directory Structure", validate_directories()))
results.append(("Scripts", validate_scripts()))
# Detailed validations
if not args.quick:
results.append(("Database", validate_database()))
results.append(("Loaders", validate_loaders()))
results.append(("Preprocessors", validate_preprocessors()))
results.append(("Validators", validate_validators()))
# Summary
print("\n" + "=" * 70)
print("Validation Summary")
print("=" * 70)
for name, passed in results:
status = "✅ PASS" if passed else "❌ FAIL"
print(f"{status:12} {name}")
total = len(results)
passed = sum(1 for _, p in results if p)
print(f"\nTotal: {passed}/{total} checks passed")
if passed == total:
print("\n🎉 All validations passed! v0.2.0 Data Pipeline is complete.")
return 0
else:
print("\n⚠️ Some validations failed. Please review the errors above.")
return 1


if __name__ == "__main__":
    sys.exit(main())