Edwin Salguero
Initial commit: Enhanced Algorithmic Trading System with Synthetic Data Generation, Comprehensive Logging, and Extensive Testing
859af74
| import pandas as pd | |
| import logging | |
| import os | |
| from typing import Dict, Any | |
| from .synthetic_data_generator import SyntheticDataGenerator | |
| logger = logging.getLogger(__name__) | |
| def load_data(config: Dict[str, Any]) -> pd.DataFrame: | |
| """ | |
| Load market data from file or generate synthetic data if needed. | |
| Args: | |
| config: Configuration dictionary | |
| Returns: | |
| DataFrame with market data | |
| """ | |
| logger.info("Starting data ingestion process") | |
| try: | |
| data_source = config['data_source'] | |
| data_type = data_source['type'] | |
| if data_type == 'csv': | |
| return _load_csv_data(config) | |
| elif data_type == 'synthetic': | |
| return _generate_synthetic_data(config) | |
| else: | |
| raise ValueError(f"Unsupported data source type: {data_type}") | |
| except Exception as e: | |
| logger.error(f"Error in data ingestion: {e}", exc_info=True) | |
| raise | |
| def _load_csv_data(config: Dict[str, Any]) -> pd.DataFrame: | |
| """Load data from CSV file""" | |
| path = config['data_source']['path'] | |
| if not os.path.exists(path): | |
| logger.warning(f"CSV file not found at {path}, generating synthetic data instead") | |
| return _generate_synthetic_data(config) | |
| logger.info(f"Loading data from CSV: {path}") | |
| df = pd.read_csv(path) | |
| # Validate data | |
| required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] | |
| missing_columns = [col for col in required_columns if col not in df.columns] | |
| if missing_columns: | |
| logger.warning(f"Missing columns in CSV: {missing_columns}") | |
| logger.info("Generating synthetic data instead") | |
| return _generate_synthetic_data(config) | |
| # Convert timestamp to datetime | |
| df['timestamp'] = pd.to_datetime(df['timestamp']) | |
| logger.info(f"Successfully loaded {len(df)} data points from CSV") | |
| return df | |
| def _generate_synthetic_data(config: Dict[str, Any]) -> pd.DataFrame: | |
| """Generate synthetic data using the SyntheticDataGenerator""" | |
| logger.info("Generating synthetic market data") | |
| try: | |
| # Create data directory if it doesn't exist | |
| data_path = config['synthetic_data']['data_path'] | |
| os.makedirs(os.path.dirname(data_path), exist_ok=True) | |
| # Initialize synthetic data generator | |
| generator = SyntheticDataGenerator(config) | |
| # Generate OHLCV data | |
| df = generator.generate_ohlcv_data( | |
| symbol=config['trading']['symbol'], | |
| start_date='2024-01-01', | |
| end_date='2024-12-31', | |
| frequency=config['trading']['timeframe'] | |
| ) | |
| # Save to CSV if configured | |
| if config['synthetic_data'].get('generate_data', True): | |
| generator.save_to_csv(df, data_path) | |
| logger.info(f"Saved synthetic data to {data_path}") | |
| return df | |
| except Exception as e: | |
| logger.error(f"Error generating synthetic data: {e}", exc_info=True) | |
| raise | |
| def validate_data(df: pd.DataFrame) -> bool: | |
| """ | |
| Validate the loaded data for required fields and data quality. | |
| Args: | |
| df: DataFrame to validate | |
| Returns: | |
| True if data is valid, False otherwise | |
| """ | |
| logger.info("Validating data quality") | |
| try: | |
| # Check for required columns | |
| required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] | |
| missing_columns = [col for col in required_columns if col not in df.columns] | |
| if missing_columns: | |
| logger.error(f"Missing required columns: {missing_columns}") | |
| return False | |
| # Check for null values | |
| null_counts = df[required_columns].isnull().sum() | |
| if null_counts.sum() > 0: | |
| logger.warning(f"Found null values: {null_counts.to_dict()}") | |
| # Check for negative prices | |
| price_columns = ['open', 'high', 'low', 'close'] | |
| negative_prices = df[price_columns].lt(0).any().any() | |
| if negative_prices: | |
| logger.error("Found negative prices in data") | |
| return False | |
| # Check for negative volumes | |
| if (df['volume'] < 0).any(): | |
| logger.error("Found negative volumes in data") | |
| return False | |
| # Check OHLC consistency | |
| invalid_ohlc = ( | |
| (df['high'] < df['low']) | | |
| (df['open'] > df['high']) | | |
| (df['open'] < df['low']) | | |
| (df['close'] > df['high']) | | |
| (df['close'] < df['low']) | |
| ) | |
| if invalid_ohlc.any(): | |
| logger.error(f"Found {invalid_ohlc.sum()} rows with invalid OHLC data") | |
| return False | |
| logger.info("Data validation passed") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error during data validation: {e}", exc_info=True) | |
| return False | |