"""
AIVA Data Lake Architecture - Petabyte-Scale Knowledge Management
=================================================================

A comprehensive Iceberg/Delta Lake-style data lake implementation for AIVA,
supporting petabyte-scale knowledge storage with time travel, schema evolution,
compaction, and smart partitioning.

Components:
- DataLakeManager: Core orchestration for Iceberg/Delta Lake-style operations
- PartitionStrategy: Smart partitioning schemes for optimal query performance
- CompactionEngine: Background compaction for storage optimization
- TimeTravel: Query historical states with snapshot isolation
- SchemaEvolution: Handle schema changes without data loss
- MetadataCatalog: Centralized metadata management

Author: Genesis AIVA System
Version: 1.0.0
"""

import os
import json
import uuid
import hashlib
import threading
import time
import logging
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import (
    Any, Dict, List, Optional, Set, Tuple, Union, Callable, Iterator, TypeVar, Generic
)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from collections import defaultdict
import struct
import pickle
import gzip
import shutil

# Configure logging.
# NOTE: basicConfig is a no-op if the host application has already
# configured handlers on the root logger, so embedding this module in a
# larger app will not clobber existing logging setup.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by all classes in this file.
logger = logging.getLogger(__name__)


# ============================================================================
# ENUMS AND CONSTANTS
# ============================================================================

class PartitionType(Enum):
    """Partition strategy types; dispatched on by PartitionStrategy.compute_partition."""
    HASH = "hash"              # even distribution via hashing of partition columns
    RANGE = "range"            # ordered buckets bounded by range_bounds
    LIST = "list"              # explicit categorical value -> partition mapping
    COMPOSITE = "composite"    # several columns joined into one key
    TIME_BASED = "time_based"  # calendar hierarchy (year/month/day/hour)
    IDENTITY = "identity"      # one partition per distinct column value


class FileFormat(Enum):
    """Supported file formats (DataFile defaults to PARQUET)."""
    PARQUET = "parquet"
    ORC = "orc"
    AVRO = "avro"
    JSON = "json"
    PICKLE = "pickle"


class SchemaAction(Enum):
    """Schema evolution actions recorded in the schema evolution log."""
    ADD_COLUMN = "add_column"
    DROP_COLUMN = "drop_column"
    RENAME_COLUMN = "rename_column"
    CHANGE_TYPE = "change_type"
    CHANGE_NULLABILITY = "change_nullability"


class CompactionStrategy(Enum):
    """Compaction strategies; CompactionEngine.run_compaction dispatches on these.

    Any value other than the first three falls through to the hybrid path.
    """
    SIZE_BASED = "size_based"            # merge small files into larger ones
    TIME_BASED = "time_based"            # compact files older than a cutoff
    PARTITION_BASED = "partition_based"  # size-based pass per partition dir
    HYBRID = "hybrid"                    # time-based pass, then size-based


class SnapshotOperation(Enum):
    """Snapshot operation types (rollbacks are recorded as OVERWRITE)."""
    APPEND = "append"
    OVERWRITE = "overwrite"
    DELETE = "delete"
    MERGE = "merge"


# ============================================================================
# DATA CLASSES
# ============================================================================

@dataclass
class ColumnSchema:
    """Definition of a single table column.

    Serializable via to_dict()/from_dict(), which are exact inverses of
    each other for JSON-compatible metadata.
    """
    name: str                  # column identifier, unique within a TableSchema
    data_type: str             # logical type name, stored as a plain string
    nullable: bool = True
    default_value: Any = None  # value implied for rows written before the column existed
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of this column definition."""
        keys = ("name", "data_type", "nullable", "default_value", "metadata")
        return {k: getattr(self, k) for k in keys}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ColumnSchema':
        """Rebuild a ColumnSchema from the dict produced by to_dict()."""
        return cls(**data)


@dataclass
class TableSchema:
    """Versioned schema definition for a table.

    ``version`` is incremented on every successful structural change, so it
    only advances when the column set actually changed.
    """
    schema_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    columns: List[ColumnSchema] = field(default_factory=list)
    partition_columns: List[str] = field(default_factory=list)
    sort_columns: List[str] = field(default_factory=list)
    primary_key: Optional[List[str]] = None
    created_at: datetime = field(default_factory=datetime.utcnow)
    version: int = 1

    def add_column(self, column: ColumnSchema) -> None:
        """Add a new column to the schema and bump the version.

        Raises:
            ValueError: if a column with the same name already exists.
        """
        if any(c.name == column.name for c in self.columns):
            raise ValueError(f"Column {column.name} already exists")
        self.columns.append(column)
        self.version += 1

    def drop_column(self, column_name: str) -> None:
        """Drop a column from the schema and bump the version.

        Raises:
            ValueError: if no column named ``column_name`` exists. (Mirrors
                add_column; previously a missing column silently bumped the
                version without changing anything.)
        """
        remaining = [c for c in self.columns if c.name != column_name]
        if len(remaining) == len(self.columns):
            raise ValueError(f"Column {column_name} does not exist")
        self.columns = remaining
        self.version += 1

    def get_column(self, name: str) -> Optional[ColumnSchema]:
        """Return the column named ``name``, or None if absent."""
        for col in self.columns:
            if col.name == name:
                return col
        return None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the schema (created_at rendered as ISO-8601)."""
        return {
            "schema_id": self.schema_id,
            "columns": [c.to_dict() for c in self.columns],
            "partition_columns": self.partition_columns,
            "sort_columns": self.sort_columns,
            "primary_key": self.primary_key,
            "created_at": self.created_at.isoformat(),
            "version": self.version
        }


@dataclass
class DataFile:
    """Metadata record describing one physical data file in the lake."""
    file_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    file_path: str = ""
    file_format: FileFormat = FileFormat.PARQUET
    file_size_bytes: int = 0
    record_count: int = 0
    partition_values: Dict[str, Any] = field(default_factory=dict)
    column_stats: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    checksum: Optional[str] = None  # SHA-256 hex digest, set via compute_checksum

    def compute_checksum(self, content: bytes) -> str:
        """Compute the SHA-256 hex digest of *content*, store and return it."""
        digest = hashlib.sha256(content).hexdigest()
        self.checksum = digest
        return digest

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to JSON-friendly primitives (enum -> value, datetime -> ISO)."""
        plain: Dict[str, Any] = {
            "file_id": self.file_id,
            "file_path": self.file_path,
            "file_format": self.file_format.value,
            "file_size_bytes": self.file_size_bytes,
            "record_count": self.record_count,
            "partition_values": self.partition_values,
            "column_stats": self.column_stats,
            "created_at": self.created_at.isoformat(),
            "checksum": self.checksum,
        }
        return plain


@dataclass
class Snapshot:
    """A point-in-time view of the table state.

    Snapshots form a chain via ``parent_snapshot_id`` and list their data
    files in ``manifest_list``. to_dict()/from_dict() round-trip losslessly.
    """
    snapshot_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    parent_snapshot_id: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.utcnow)
    operation: SnapshotOperation = SnapshotOperation.APPEND
    schema_id: str = ""
    manifest_list: List[str] = field(default_factory=list)
    summary: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to JSON-friendly primitives (enum -> value, datetime -> ISO)."""
        out: Dict[str, Any] = {
            "snapshot_id": self.snapshot_id,
            "parent_snapshot_id": self.parent_snapshot_id,
            "timestamp": self.timestamp.isoformat(),
            "operation": self.operation.value,
            "schema_id": self.schema_id,
            "manifest_list": self.manifest_list,
            "summary": self.summary,
        }
        return out

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Snapshot':
        """Inverse of to_dict(); tolerates missing parent/summary keys."""
        kwargs = dict(
            snapshot_id=data["snapshot_id"],
            parent_snapshot_id=data.get("parent_snapshot_id"),
            timestamp=datetime.fromisoformat(data["timestamp"]),
            operation=SnapshotOperation(data["operation"]),
            schema_id=data["schema_id"],
            manifest_list=data["manifest_list"],
            summary=data.get("summary", {}),
        )
        return cls(**kwargs)


@dataclass
class ManifestEntry:
    """Entry in a manifest file.

    Records one data file's membership relative to the snapshot that wrote
    this entry.
    """
    status: str  # "added", "existing", "deleted"
    data_file: DataFile  # metadata of the physical file this entry tracks
    snapshot_id: str  # snapshot that produced this entry
    sequence_number: int = 0  # ordering of entries within a manifest


@dataclass
class PartitionSpec:
    """Describes how a table's records map onto partitions.

    Each field entry records a source column, a transform name, and the
    resulting partition-field name.
    """
    spec_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    fields: List[Dict[str, Any]] = field(default_factory=list)

    def add_field(self, source_column: str, transform: str, name: Optional[str] = None) -> None:
        """Append a partition field derived from *source_column* via *transform*.

        When *name* is omitted (or falsy), the field is named after the
        source column.
        """
        entry = {
            "source_column": source_column,
            "transform": transform,
            "name": name if name else source_column,
        }
        self.fields.append(entry)


# ============================================================================
# PARTITION STRATEGY
# ============================================================================

class PartitionStrategy:
    """
    Smart partitioning schemes for optimal query performance.

    Supports multiple partition types:
    - Hash partitioning for even distribution
    - Range partitioning for ordered data
    - List partitioning for categorical data
    - Time-based partitioning for temporal data
    - Composite partitioning combining multiple strategies

    Partition keys are deterministic across interpreter runs: hashing uses
    an MD5-derived value rather than the builtin ``hash()``, whose output
    for strings is salted per process (PYTHONHASHSEED) and would scatter
    the same record across different partitions between runs.
    """

    def __init__(
        self,
        partition_type: PartitionType = PartitionType.HASH,
        num_partitions: int = 256,
        partition_columns: Optional[List[str]] = None,
        range_bounds: Optional[List[Any]] = None,
        list_values: Optional[Dict[str, List[Any]]] = None,
        time_granularity: str = "day"
    ):
        """
        Args:
            partition_type: Which partitioning scheme compute_partition applies.
            num_partitions: Bucket count for hash partitioning.
            partition_columns: Record keys the partition key is derived from.
            range_bounds: Ascending upper bounds for range partitioning.
            list_values: Mapping of partition name -> accepted values (list type).
            time_granularity: "year" | "month" | "day" | "hour"; any other
                value falls back to a ``date=YYYY-MM-DD`` key.
        """
        self.partition_type = partition_type
        self.num_partitions = num_partitions
        self.partition_columns = partition_columns or []
        self.range_bounds = range_bounds or []
        self.list_values = list_values or {}
        self.time_granularity = time_granularity
        # Running record count per partition key, used for skew analysis.
        self._partition_stats: Dict[str, int] = defaultdict(int)

    def compute_partition(self, record: Dict[str, Any]) -> str:
        """Compute the partition key for a record (dispatches on partition_type)."""
        if self.partition_type == PartitionType.HASH:
            return self._hash_partition(record)
        elif self.partition_type == PartitionType.RANGE:
            return self._range_partition(record)
        elif self.partition_type == PartitionType.LIST:
            return self._list_partition(record)
        elif self.partition_type == PartitionType.TIME_BASED:
            return self._time_partition(record)
        elif self.partition_type == PartitionType.COMPOSITE:
            return self._composite_partition(record)
        else:
            return self._identity_partition(record)

    def _hash_partition(self, record: Dict[str, Any]) -> str:
        """Hash-based partitioning with a run-stable hash.

        Builtin ``hash()`` is salted per process for str/bytes, so it must
        not be used to place records into persistent partitions; an MD5
        digest of the repr of the column values is deterministic everywhere.
        """
        values = tuple(record.get(col) for col in self.partition_columns)
        digest = hashlib.md5(repr(values).encode("utf-8")).digest()
        hash_value = int.from_bytes(digest[:8], "big") % self.num_partitions
        partition_key = f"partition_{hash_value:04d}"
        self._partition_stats[partition_key] += 1
        return partition_key

    def _range_partition(self, record: Dict[str, Any]) -> str:
        """Range-based partitioning over ascending ``range_bounds``."""
        if not self.partition_columns or not self.range_bounds:
            return "partition_default"

        value = record.get(self.partition_columns[0])
        if value is None:
            # None cannot be ordered against the bounds (TypeError on
            # Python 3); route missing values to the default partition.
            return "partition_default"

        for i, bound in enumerate(self.range_bounds):
            if value < bound:
                partition_key = f"partition_range_{i:04d}"
                self._partition_stats[partition_key] += 1
                return partition_key

        # Value exceeds every bound: overflow bucket after the last bound.
        partition_key = f"partition_range_{len(self.range_bounds):04d}"
        self._partition_stats[partition_key] += 1
        return partition_key

    def _list_partition(self, record: Dict[str, Any]) -> str:
        """List-based partitioning for categorical data."""
        if not self.partition_columns:
            return "partition_default"

        col = self.partition_columns[0]
        value = record.get(col)

        for partition_name, values in self.list_values.items():
            if value in values:
                self._partition_stats[partition_name] += 1
                return partition_name

        # Values not covered by any configured list land in a catch-all.
        partition_key = "partition_other"
        self._partition_stats[partition_key] += 1
        return partition_key

    def _time_partition(self, record: Dict[str, Any]) -> str:
        """Time-based partitioning producing Hive-style key paths."""
        if not self.partition_columns:
            return "partition_default"

        time_value = record.get(self.partition_columns[0])
        if isinstance(time_value, str):
            time_value = datetime.fromisoformat(time_value)
        elif not isinstance(time_value, datetime):
            return "partition_default"

        if self.time_granularity == "year":
            partition_key = f"year={time_value.year}"
        elif self.time_granularity == "month":
            partition_key = f"year={time_value.year}/month={time_value.month:02d}"
        elif self.time_granularity == "day":
            partition_key = f"year={time_value.year}/month={time_value.month:02d}/day={time_value.day:02d}"
        elif self.time_granularity == "hour":
            partition_key = f"year={time_value.year}/month={time_value.month:02d}/day={time_value.day:02d}/hour={time_value.hour:02d}"
        else:
            partition_key = f"date={time_value.strftime('%Y-%m-%d')}"

        self._partition_stats[partition_key] += 1
        return partition_key

    def _composite_partition(self, record: Dict[str, Any]) -> str:
        """Composite partitioning: one path segment per partition column."""
        parts = []
        for col in self.partition_columns:
            value = record.get(col)
            if isinstance(value, datetime):
                parts.append(f"{col}={value.strftime('%Y-%m-%d')}")
            else:
                parts.append(f"{col}={value}")

        partition_key = "/".join(parts)
        self._partition_stats[partition_key] += 1
        return partition_key

    def _identity_partition(self, record: Dict[str, Any]) -> str:
        """Identity partitioning - one partition per unique value."""
        if not self.partition_columns:
            return "partition_default"

        col = self.partition_columns[0]
        value = record.get(col, "null")
        partition_key = f"{col}={value}"
        self._partition_stats[partition_key] += 1
        return partition_key

    def get_partition_stats(self) -> Dict[str, int]:
        """Return a copy of the per-partition record counts."""
        return dict(self._partition_stats)

    def get_skewed_partitions(self, threshold: float = 2.0) -> List[str]:
        """Return partitions holding more than ``threshold`` x the mean count."""
        if not self._partition_stats:
            return []

        avg = sum(self._partition_stats.values()) / len(self._partition_stats)
        return [p for p, count in self._partition_stats.items() if count > avg * threshold]

    def suggest_rebalancing(self) -> Dict[str, Any]:
        """Summarize the current distribution and suggest rebalancing if skewed."""
        skewed = self.get_skewed_partitions()
        stats = self.get_partition_stats()

        if not stats:
            return {"action": "none", "reason": "No data"}

        total = sum(stats.values())
        num_partitions = len(stats)
        ideal_per_partition = total / num_partitions

        return {
            "action": "rebalance" if skewed else "none",
            "skewed_partitions": skewed,
            "current_distribution": {
                "total_records": total,
                "num_partitions": num_partitions,
                "ideal_per_partition": ideal_per_partition,
                "max_records": max(stats.values()) if stats else 0,
                "min_records": min(stats.values()) if stats else 0
            },
            "recommendation": "Consider hash-based sub-partitioning for skewed partitions" if skewed else "Distribution is balanced"
        }


# ============================================================================
# COMPACTION ENGINE
# ============================================================================

class CompactionEngine:
    """
    Background compaction engine for storage optimization.

    Supports multiple compaction strategies:
    - Size-based: Merge small files into larger ones
    - Time-based: Compact files based on age
    - Partition-based: Compact within partitions
    - Hybrid: Combine multiple strategies

    Data files are gzip-compressed pickles with a ``.data`` suffix; merging
    concatenates their record lists into one file. A source file is only
    deleted after its contents were successfully read into the merged
    output, so an unreadable/corrupt file never causes silent data loss.
    """

    def __init__(
        self,
        data_path: str,
        strategy: CompactionStrategy = CompactionStrategy.SIZE_BASED,
        target_file_size_mb: int = 128,
        min_files_to_compact: int = 4,
        max_files_per_compaction: int = 100,
        compaction_interval_seconds: int = 3600
    ):
        """
        Args:
            data_path: Root directory containing partitioned ``.data`` files.
            strategy: Strategy applied by run_compaction.
            target_file_size_mb: Files below this size are merge candidates.
            min_files_to_compact: Minimum candidates before a merge happens.
            max_files_per_compaction: Upper bound of files merged per pass.
            compaction_interval_seconds: Wait between background passes.
        """
        self.data_path = Path(data_path)
        self.strategy = strategy
        self.target_file_size_bytes = target_file_size_mb * 1024 * 1024
        self.min_files_to_compact = min_files_to_compact
        self.max_files_per_compaction = max_files_per_compaction
        self.compaction_interval = compaction_interval_seconds

        self._running = False
        # Event lets stop_background_compaction interrupt the inter-pass
        # wait immediately instead of blocking up to compaction_interval.
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._compaction_history: List[Dict[str, Any]] = []
        self._lock = threading.Lock()
        # NOTE(review): executor is currently unused; kept for compatibility
        # (threads are created lazily by ThreadPoolExecutor, so no cost).
        self._executor = ThreadPoolExecutor(max_workers=4)

    def start_background_compaction(self) -> None:
        """Start the background compaction thread (no-op if already running)."""
        if self._running:
            logger.warning("Compaction already running")
            return

        self._running = True
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._compaction_loop, daemon=True)
        self._thread.start()
        logger.info("Background compaction started")

    def stop_background_compaction(self) -> None:
        """Stop background compaction, waking the loop if it is waiting."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        logger.info("Background compaction stopped")

    def _compaction_loop(self) -> None:
        """Main compaction loop; runs until a stop is requested."""
        while self._running:
            try:
                self.run_compaction()
            except Exception as e:
                logger.error(f"Compaction error: {e}")

            # wait() returns True as soon as the stop event fires, ending
            # the loop promptly; otherwise it times out after one interval.
            if self._stop_event.wait(self.compaction_interval):
                break

    def run_compaction(self, partition: Optional[str] = None) -> Dict[str, Any]:
        """Run one compaction pass with the configured strategy.

        Args:
            partition: Restrict the pass to one partition directory, where
                the strategy supports it.

        Returns:
            Strategy-specific summary containing at least
            ``files_compacted`` and ``bytes_saved``.
        """
        with self._lock:
            if self.strategy == CompactionStrategy.SIZE_BASED:
                result = self._size_based_compaction(partition)
            elif self.strategy == CompactionStrategy.TIME_BASED:
                result = self._time_based_compaction(partition)
            elif self.strategy == CompactionStrategy.PARTITION_BASED:
                result = self._partition_based_compaction()
            else:
                result = self._hybrid_compaction(partition)

            self._compaction_history.append({
                "timestamp": datetime.utcnow().isoformat(),
                "strategy": self.strategy.value,
                "result": result
            })

            return result

    def _read_and_merge(self, files: List[Path]) -> Tuple[List[Any], List[Path], int]:
        """Read gzip-pickled files and concatenate their records.

        Returns:
            (merged records, files read successfully, total bytes read).
            Unreadable files are skipped AND excluded from the second list,
            so callers must delete only files that appear in it — deleting
            a file that failed to read would silently lose its data.
        """
        merged: List[Any] = []
        read_ok: List[Path] = []
        total_size = 0

        for file_path in files:
            try:
                with gzip.open(file_path, 'rb') as f:
                    data = pickle.load(f)
            except Exception as e:
                logger.warning(f"Failed to read {file_path}: {e}")
                continue

            if isinstance(data, list):
                merged.extend(data)
            else:
                merged.append(data)
            read_ok.append(file_path)
            total_size += file_path.stat().st_size

        return merged, read_ok, total_size

    def _write_merged(self, merged_file: Path, merged_data: List[Any]) -> int:
        """Write merged records as a gzip pickle; return the new file's size."""
        merged_file.parent.mkdir(parents=True, exist_ok=True)
        with gzip.open(merged_file, 'wb') as f:
            pickle.dump(merged_data, f)
        return merged_file.stat().st_size

    def _size_based_compaction(self, partition: Optional[str] = None) -> Dict[str, Any]:
        """Merge groups of small files (below target size) per partition."""
        files_compacted = 0
        bytes_saved = 0

        target_path = self.data_path / partition if partition else self.data_path

        if not target_path.exists():
            return {"files_compacted": 0, "bytes_saved": 0}

        # Group candidate files by their partition directory.
        partition_files: Dict[str, List[Path]] = defaultdict(list)
        for file_path in target_path.rglob("*.data"):
            partition_key = str(file_path.parent.relative_to(target_path))
            partition_files[partition_key].append(file_path)

        for part_key, files in partition_files.items():
            small_files = [f for f in files if f.stat().st_size < self.target_file_size_bytes]
            if len(small_files) < self.min_files_to_compact:
                continue

            to_compact = small_files[:self.max_files_per_compaction]
            merged_data, read_ok, total_size = self._read_and_merge(to_compact)
            if not merged_data:
                continue

            merged_file = target_path / part_key / f"compacted_{uuid.uuid4().hex[:8]}.data"
            new_size = self._write_merged(merged_file, merged_data)

            # Delete only the files whose records made it into the merge.
            for file_path in read_ok:
                try:
                    file_path.unlink()
                    files_compacted += 1
                except Exception as e:
                    logger.warning(f"Failed to remove {file_path}: {e}")

            bytes_saved += total_size - new_size

        return {
            "files_compacted": files_compacted,
            "bytes_saved": bytes_saved,
            "partitions_processed": len(partition_files)
        }

    def _time_based_compaction(self, partition: Optional[str] = None, max_age_hours: int = 24) -> Dict[str, Any]:
        """Compact files whose mtime is older than ``max_age_hours``."""
        cutoff = datetime.utcnow() - timedelta(hours=max_age_hours)
        target_path = self.data_path / partition if partition else self.data_path

        old_files = []
        if target_path.exists():
            for file_path in target_path.rglob("*.data"):
                mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                if mtime < cutoff:
                    old_files.append(file_path)

        return self._compact_file_list(old_files)

    def _partition_based_compaction(self) -> Dict[str, Any]:
        """Run size-based compaction independently in each partition directory."""
        total_result = {"files_compacted": 0, "bytes_saved": 0, "partitions_processed": 0}

        if not self.data_path.exists():
            return total_result

        for partition_dir in self.data_path.iterdir():
            if partition_dir.is_dir():
                result = self._size_based_compaction(partition_dir.name)
                total_result["files_compacted"] += result["files_compacted"]
                total_result["bytes_saved"] += result["bytes_saved"]
                total_result["partitions_processed"] += 1

        return total_result

    def _hybrid_compaction(self, partition: Optional[str] = None) -> Dict[str, Any]:
        """Time-based pass for old files, then size-based for remaining small files."""
        time_result = self._time_based_compaction(partition)
        size_result = self._size_based_compaction(partition)

        return {
            "files_compacted": time_result["files_compacted"] + size_result["files_compacted"],
            "bytes_saved": time_result["bytes_saved"] + size_result["bytes_saved"],
            "strategy_results": {
                "time_based": time_result,
                "size_based": size_result
            }
        }

    def _compact_file_list(self, files: List[Path]) -> Dict[str, Any]:
        """Compact an explicit list of files, grouped by parent directory."""
        if len(files) < self.min_files_to_compact:
            return {"files_compacted": 0, "bytes_saved": 0}

        by_parent: Dict[Path, List[Path]] = defaultdict(list)
        for f in files:
            by_parent[f.parent].append(f)

        total_compacted = 0
        total_saved = 0

        for parent, file_list in by_parent.items():
            batch = file_list[:self.max_files_per_compaction]
            merged_data, read_ok, total_size = self._read_and_merge(batch)
            if not merged_data:
                continue

            merged_file = parent / f"compacted_{uuid.uuid4().hex[:8]}.data"
            new_size = self._write_merged(merged_file, merged_data)

            # As above: never delete a file that was not successfully merged.
            for file_path in read_ok:
                try:
                    file_path.unlink()
                    total_compacted += 1
                except Exception:
                    pass

            total_saved += total_size - new_size

        return {"files_compacted": total_compacted, "bytes_saved": total_saved}

    def get_compaction_stats(self) -> Dict[str, Any]:
        """Return compaction history summary and effective configuration."""
        return {
            "total_compactions": len(self._compaction_history),
            "recent_compactions": self._compaction_history[-10:],
            "strategy": self.strategy.value,
            "config": {
                "target_file_size_mb": self.target_file_size_bytes / (1024 * 1024),
                "min_files_to_compact": self.min_files_to_compact,
                "max_files_per_compaction": self.max_files_per_compaction,
                "interval_seconds": self.compaction_interval
            }
        }


# ============================================================================
# TIME TRAVEL
# ============================================================================

class TimeTravel:
    """
    Query historical states with snapshot isolation.

    Supports:
    - Point-in-time queries
    - Snapshot rollback
    - Version history navigation
    - Garbage collection of old snapshots

    Snapshot metadata is persisted to ``snapshots.json`` atomically
    (temp file + rename), so a crash mid-write cannot corrupt the catalog.
    ``snapshot_timeline`` is append-only and therefore chronologically
    ordered; several methods rely on that invariant.
    """

    def __init__(self, metadata_path: str, retention_days: int = 30):
        """
        Args:
            metadata_path: Directory where snapshots.json is stored.
            retention_days: Age after which non-current snapshots may be GC'd.
        """
        self.metadata_path = Path(metadata_path)
        self.retention_days = retention_days
        self.snapshots: Dict[str, Snapshot] = {}
        self.snapshot_timeline: List[str] = []  # Ordered list of snapshot IDs
        self._lock = threading.Lock()

        self._load_snapshots()

    def _load_snapshots(self) -> None:
        """Load snapshots from disk (file order is the persisted timeline order)."""
        snapshot_file = self.metadata_path / "snapshots.json"
        if snapshot_file.exists():
            try:
                with open(snapshot_file, 'r') as f:
                    data = json.load(f)
                    for snap_data in data.get("snapshots", []):
                        snapshot = Snapshot.from_dict(snap_data)
                        self.snapshots[snapshot.snapshot_id] = snapshot
                        self.snapshot_timeline.append(snapshot.snapshot_id)
            except Exception as e:
                logger.error(f"Failed to load snapshots: {e}")

    def _save_snapshots(self) -> None:
        """Persist snapshots atomically.

        Writes to a temp file in the same directory, then renames it over
        snapshots.json with os.replace (atomic on POSIX and Windows), so a
        crash mid-write leaves the previous catalog intact. Snapshots are
        serialized in timeline order so _load_snapshots reconstructs the
        timeline exactly.
        """
        self.metadata_path.mkdir(parents=True, exist_ok=True)
        snapshot_file = self.metadata_path / "snapshots.json"

        data = {
            "snapshots": [self.snapshots[sid].to_dict() for sid in self.snapshot_timeline],
            "current_snapshot": self.snapshot_timeline[-1] if self.snapshot_timeline else None
        }

        tmp_file = snapshot_file.with_name(snapshot_file.name + ".tmp")
        with open(tmp_file, 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_file, snapshot_file)

    def create_snapshot(
        self,
        operation: SnapshotOperation,
        schema_id: str,
        manifest_list: List[str],
        summary: Optional[Dict[str, Any]] = None
    ) -> Snapshot:
        """Create and persist a new snapshot chained to the current one.

        Args:
            operation: The operation this snapshot records.
            schema_id: Schema version the snapshot was written under.
            manifest_list: Manifest files making up the snapshot's state.
            summary: Optional free-form operation metadata.

        Returns:
            The newly created Snapshot (now the current snapshot).
        """
        with self._lock:
            parent_id = self.snapshot_timeline[-1] if self.snapshot_timeline else None

            snapshot = Snapshot(
                parent_snapshot_id=parent_id,
                operation=operation,
                schema_id=schema_id,
                manifest_list=manifest_list,
                summary=summary or {}
            )

            self.snapshots[snapshot.snapshot_id] = snapshot
            self.snapshot_timeline.append(snapshot.snapshot_id)
            self._save_snapshots()

            logger.info(f"Created snapshot {snapshot.snapshot_id}")
            return snapshot

    def get_snapshot(self, snapshot_id: str) -> Optional[Snapshot]:
        """Return the snapshot with the given ID, or None."""
        return self.snapshots.get(snapshot_id)

    def get_current_snapshot(self) -> Optional[Snapshot]:
        """Return the latest snapshot, or None if the table has none."""
        if not self.snapshot_timeline:
            return None
        return self.snapshots.get(self.snapshot_timeline[-1])

    def query_at_timestamp(self, timestamp: datetime) -> Optional[Snapshot]:
        """Return the newest snapshot whose timestamp is <= ``timestamp``.

        Relies on snapshot_timeline being chronologically ordered; the scan
        stops at the first snapshot newer than the requested time.
        """
        valid_snapshot = None

        for snap_id in self.snapshot_timeline:
            snapshot = self.snapshots.get(snap_id)
            if snapshot and snapshot.timestamp <= timestamp:
                valid_snapshot = snapshot
            else:
                break

        return valid_snapshot

    def query_at_version(self, version: int) -> Optional[Snapshot]:
        """Return the snapshot at a specific version (1-indexed), or None."""
        if 1 <= version <= len(self.snapshot_timeline):
            snap_id = self.snapshot_timeline[version - 1]
            return self.snapshots.get(snap_id)
        return None

    def rollback_to_snapshot(self, snapshot_id: str) -> bool:
        """Roll back by appending a new OVERWRITE snapshot that mirrors the target.

        History is never rewritten: the rollback is itself recorded as a new
        snapshot whose manifest list equals the target's.

        Returns:
            True on success, False if the snapshot is unknown.
        """
        with self._lock:
            if snapshot_id not in self.snapshots:
                logger.error(f"Snapshot {snapshot_id} not found")
                return False

            # Confirm the snapshot is actually on the timeline.
            try:
                idx = self.snapshot_timeline.index(snapshot_id)
            except ValueError:
                return False

            target_snapshot = self.snapshots[snapshot_id]
            rollback_snapshot = Snapshot(
                parent_snapshot_id=self.snapshot_timeline[-1],
                operation=SnapshotOperation.OVERWRITE,
                schema_id=target_snapshot.schema_id,
                manifest_list=target_snapshot.manifest_list,
                summary={"rollback_to": snapshot_id, "rollback_timestamp": datetime.utcnow().isoformat()}
            )

            self.snapshots[rollback_snapshot.snapshot_id] = rollback_snapshot
            self.snapshot_timeline.append(rollback_snapshot.snapshot_id)
            self._save_snapshots()

            logger.info(f"Rolled back to snapshot {snapshot_id}")
            return True

    def get_history(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to ``limit`` snapshots as dicts, newest first."""
        history = []
        for snap_id in reversed(self.snapshot_timeline[-limit:]):
            snapshot = self.snapshots.get(snap_id)
            if snapshot:
                history.append(snapshot.to_dict())
        return history

    def get_changes_between(self, from_snapshot_id: str, to_snapshot_id: str) -> Dict[str, Any]:
        """Diff the manifest lists of two snapshots.

        Returns:
            Dict with ``added``/``removed``/``unchanged`` manifest paths and
            both timestamps, or an ``error`` key if either ID is unknown.
        """
        from_snap = self.snapshots.get(from_snapshot_id)
        to_snap = self.snapshots.get(to_snapshot_id)

        if not from_snap or not to_snap:
            return {"error": "Snapshot not found"}

        from_files = set(from_snap.manifest_list)
        to_files = set(to_snap.manifest_list)

        return {
            "added": list(to_files - from_files),
            "removed": list(from_files - to_files),
            "unchanged": list(from_files & to_files),
            "from_timestamp": from_snap.timestamp.isoformat(),
            "to_timestamp": to_snap.timestamp.isoformat()
        }

    def garbage_collect(self) -> Dict[str, Any]:
        """Remove snapshots older than the retention window.

        The latest snapshot is always kept, regardless of age.

        Returns:
            Summary of removed IDs and remaining snapshot count.
        """
        with self._lock:
            cutoff = datetime.utcnow() - timedelta(days=self.retention_days)
            removed = []

            # Exclude the latest snapshot so the table never loses its head.
            snapshots_to_check = self.snapshot_timeline[:-1] if self.snapshot_timeline else []

            for snap_id in list(snapshots_to_check):
                snapshot = self.snapshots.get(snap_id)
                if snapshot and snapshot.timestamp < cutoff:
                    del self.snapshots[snap_id]
                    self.snapshot_timeline.remove(snap_id)
                    removed.append(snap_id)

            if removed:
                self._save_snapshots()

            return {
                "removed_count": len(removed),
                "removed_snapshots": removed,
                "remaining_snapshots": len(self.snapshot_timeline)
            }


# ============================================================================
# SCHEMA EVOLUTION
# ============================================================================

class SchemaEvolution:
    """
    Handle schema changes without data loss.

    Supports:
    - Adding columns with defaults
    - Dropping columns safely
    - Renaming columns with mapping
    - Type changes with validation
    - Full schema history

    Every mutation produces a brand-new ``TableSchema`` version (all prior
    versions stay in ``schema_history``) and appends an audit entry to
    ``evolution_log``; both are persisted to ``schema_history.json``.
    """

    def __init__(self, metadata_path: str):
        # Directory holding schema_history.json (created lazily on save).
        self.metadata_path = Path(metadata_path)
        # All schema versions seen by this process, oldest first.
        self.schema_history: List["TableSchema"] = []
        # Append-only audit trail of every schema change.
        self.evolution_log: List[Dict[str, Any]] = []
        # Guards mutations of schema_history / evolution_log.
        self._lock = threading.Lock()

        self._load_schema_history()

    def _load_schema_history(self) -> None:
        """Load the persisted evolution log from disk.

        NOTE(review): only ``evolution_log`` is restored here. The serialized
        schemas written by ``_save_schema_history`` are never deserialized
        back into ``schema_history``, so schema versions do not survive a
        restart. Restoring them would need a ``TableSchema.from_dict`` —
        TODO confirm whether one exists and wire it in.
        """
        schema_file = self.metadata_path / "schema_history.json"
        if schema_file.exists():
            try:
                with open(schema_file, 'r') as f:
                    data = json.load(f)
                    self.evolution_log = data.get("evolution_log", [])
            except Exception as e:
                logger.error(f"Failed to load schema history: {e}")

    def _save_schema_history(self) -> None:
        """Persist all schema versions and the evolution log to disk."""
        self.metadata_path.mkdir(parents=True, exist_ok=True)
        schema_file = self.metadata_path / "schema_history.json"

        data = {
            "schemas": [s.to_dict() for s in self.schema_history],
            "current_schema": self.schema_history[-1].to_dict() if self.schema_history else None,
            "evolution_log": self.evolution_log
        }

        with open(schema_file, 'w') as f:
            json.dump(data, f, indent=2)

    def _commit(self, new_schema: "TableSchema", log_entry: Dict[str, Any]) -> None:
        """Record a new schema version plus its audit entry and persist both.

        Caller must already hold ``self._lock``. The timestamp is added last
        so the persisted key order matches the historical log format.
        """
        self.schema_history.append(new_schema)
        log_entry["timestamp"] = datetime.utcnow().isoformat()
        self.evolution_log.append(log_entry)
        self._save_schema_history()

    def _require_current_schema(self) -> "TableSchema":
        """Return the current schema or raise ValueError if none registered.

        Caller must already hold ``self._lock``.
        """
        current = self.get_current_schema()
        if not current:
            raise ValueError("No schema registered")
        return current

    def register_schema(self, schema: "TableSchema") -> None:
        """Register a new schema version (e.g. at table creation)."""
        with self._lock:
            self._commit(schema, {
                "action": "register",
                "schema_id": schema.schema_id,
                "version": schema.version,
            })

    def get_current_schema(self) -> Optional["TableSchema"]:
        """Return the latest schema version, or None if none registered."""
        return self.schema_history[-1] if self.schema_history else None

    def get_schema_by_id(self, schema_id: str) -> Optional["TableSchema"]:
        """Return the schema with the given id, or None if not found."""
        for schema in self.schema_history:
            if schema.schema_id == schema_id:
                return schema
        return None

    def add_column(
        self,
        column: "ColumnSchema",
        default_value: Any = None
    ) -> "TableSchema":
        """Add a new column, producing a new schema version.

        Raises ValueError if no schema has been registered yet.
        NOTE: ``column.default_value`` is overwritten with ``default_value``
        — the caller's ColumnSchema object is mutated in place.
        """
        with self._lock:
            current = self._require_current_schema()

            # New version carries over everything except the column list,
            # which is copied so the old version stays intact.
            new_schema = TableSchema(
                columns=current.columns.copy(),
                partition_columns=current.partition_columns,
                sort_columns=current.sort_columns,
                primary_key=current.primary_key,
                version=current.version + 1
            )

            column.default_value = default_value
            new_schema.add_column(column)

            self._commit(new_schema, {
                "action": SchemaAction.ADD_COLUMN.value,
                "column_name": column.name,
                "column_type": column.data_type,
                "default_value": str(default_value),
                "from_version": current.version,
                "to_version": new_schema.version,
            })
            return new_schema

    def drop_column(self, column_name: str) -> "TableSchema":
        """Remove a column, producing a new schema version.

        The column is also removed from partition/sort/primary-key lists.
        Raises ValueError if there is no schema or the column is unknown.
        """
        with self._lock:
            current = self._require_current_schema()

            if not current.get_column(column_name):
                raise ValueError(f"Column {column_name} does not exist")

            new_schema = TableSchema(
                columns=[c for c in current.columns if c.name != column_name],
                partition_columns=[p for p in current.partition_columns if p != column_name],
                sort_columns=[s for s in current.sort_columns if s != column_name],
                primary_key=[k for k in (current.primary_key or []) if k != column_name] or None,
                version=current.version + 1
            )

            self._commit(new_schema, {
                "action": SchemaAction.DROP_COLUMN.value,
                "column_name": column_name,
                "from_version": current.version,
                "to_version": new_schema.version,
            })
            return new_schema

    def rename_column(self, old_name: str, new_name: str) -> "TableSchema":
        """Rename a column, producing a new schema version.

        The old name is recorded in the column's metadata under
        ``renamed_from``; partition/sort/primary-key references follow.
        Raises ValueError if there is no schema or the column is unknown.
        """
        with self._lock:
            current = self._require_current_schema()

            old_column = current.get_column(old_name)
            if not old_column:
                raise ValueError(f"Column {old_name} does not exist")

            new_columns = []
            for col in current.columns:
                if col.name == old_name:
                    new_columns.append(ColumnSchema(
                        name=new_name,
                        data_type=col.data_type,
                        nullable=col.nullable,
                        default_value=col.default_value,
                        metadata={**col.metadata, "renamed_from": old_name}
                    ))
                else:
                    new_columns.append(col)

            new_schema = TableSchema(
                columns=new_columns,
                partition_columns=[new_name if p == old_name else p for p in current.partition_columns],
                sort_columns=[new_name if s == old_name else s for s in current.sort_columns],
                primary_key=[new_name if k == old_name else k for k in (current.primary_key or [])] or None,
                version=current.version + 1
            )

            self._commit(new_schema, {
                "action": SchemaAction.RENAME_COLUMN.value,
                "old_name": old_name,
                "new_name": new_name,
                "from_version": current.version,
                "to_version": new_schema.version,
            })
            return new_schema

    def change_column_type(
        self,
        column_name: str,
        new_type: str,
        coerce_function: Optional[Callable] = None
    ) -> "TableSchema":
        """Change a column's data type, producing a new schema version.

        The previous type is recorded in the column's metadata under
        ``previous_type``. Raises ValueError if there is no schema or the
        column is unknown.

        NOTE(review): ``coerce_function`` is accepted for API compatibility
        but is currently unused — existing data files are NOT rewritten or
        coerced to the new type here.
        """
        with self._lock:
            current = self._require_current_schema()

            old_column = current.get_column(column_name)
            if not old_column:
                raise ValueError(f"Column {column_name} does not exist")

            new_columns = []
            for col in current.columns:
                if col.name == column_name:
                    new_columns.append(ColumnSchema(
                        name=col.name,
                        data_type=new_type,
                        nullable=col.nullable,
                        default_value=col.default_value,
                        metadata={**col.metadata, "previous_type": col.data_type}
                    ))
                else:
                    new_columns.append(col)

            new_schema = TableSchema(
                columns=new_columns,
                partition_columns=current.partition_columns,
                sort_columns=current.sort_columns,
                primary_key=current.primary_key,
                version=current.version + 1
            )

            self._commit(new_schema, {
                "action": SchemaAction.CHANGE_TYPE.value,
                "column_name": column_name,
                "old_type": old_column.data_type,
                "new_type": new_type,
                "from_version": current.version,
                "to_version": new_schema.version,
            })
            return new_schema

    def get_evolution_history(self) -> List[Dict[str, Any]]:
        """Return the complete schema evolution audit log."""
        return self.evolution_log

    def validate_record(self, record: Dict[str, Any], schema: Optional["TableSchema"] = None) -> Tuple[bool, List[str]]:
        """Validate a record against a schema (current schema by default).

        Returns (is_valid, errors). A missing value is an error only when
        the column is non-nullable and has no default; present values get a
        loose Python-type check against the declared column type.
        """
        schema = schema or self.get_current_schema()
        if not schema:
            return False, ["No schema available"]

        errors = []

        for column in schema.columns:
            value = record.get(column.name)

            if value is None:
                if not column.nullable and column.default_value is None:
                    errors.append(f"Column {column.name} is required")
            else:
                # Loose mapping from declared types to acceptable Python
                # runtime type names (e.g. int is acceptable for float).
                expected_type = column.data_type.lower()
                actual_type = type(value).__name__.lower()

                type_mapping = {
                    "string": ["str"],
                    "integer": ["int"],
                    "float": ["float", "int"],
                    "boolean": ["bool"],
                    "datetime": ["datetime", "str"],
                    "json": ["dict", "list"]
                }

                valid_types = type_mapping.get(expected_type, [expected_type])
                if actual_type not in valid_types:
                    errors.append(f"Column {column.name} expected {expected_type}, got {actual_type}")

        return len(errors) == 0, errors


# ============================================================================
# METADATA CATALOG
# ============================================================================

class MetadataCatalog:
    """
    Centralized metadata management for the data lake.

    Manages:
    - Table metadata
    - Partition information
    - Schema registry
    - Statistics cache
    - Lineage tracking

    Tables and namespaces are persisted to ``catalog.json``; statistics and
    lineage are in-memory only (``_save_catalog`` does not write them, and
    they are not restored on restart).
    """

    def __init__(self, catalog_path: str):
        self.catalog_path = Path(catalog_path)
        self.catalog_path.mkdir(parents=True, exist_ok=True)

        # "namespace.table" -> table metadata dict (persisted).
        self.tables: Dict[str, Dict[str, Any]] = {}
        # namespace -> set of table names (persisted).
        self.namespaces: Dict[str, Set[str]] = defaultdict(set)
        # "namespace.table" -> latest stats dict (in-memory cache only).
        self.statistics: Dict[str, Dict[str, Any]] = {}
        # target table -> lineage entries (in-memory only).
        self.lineage: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        # Guards all mutations of the dicts above.
        self._lock = threading.Lock()

        self._load_catalog()

    def _load_catalog(self) -> None:
        """Load persisted tables/namespaces from catalog.json, if present."""
        catalog_file = self.catalog_path / "catalog.json"
        if catalog_file.exists():
            try:
                with open(catalog_file, 'r') as f:
                    data = json.load(f)
                    self.tables = data.get("tables", {})
                    self.namespaces = defaultdict(set, {
                        k: set(v) for k, v in data.get("namespaces", {}).items()
                    })
            except Exception as e:
                logger.error(f"Failed to load catalog: {e}")

    def _save_catalog(self) -> None:
        """Persist tables and namespaces to catalog.json.

        Callers are expected to hold ``self._lock``.
        """
        catalog_file = self.catalog_path / "catalog.json"
        data = {
            "tables": self.tables,
            "namespaces": {k: list(v) for k, v in self.namespaces.items()},
            "updated_at": datetime.utcnow().isoformat()
        }

        with open(catalog_file, 'w') as f:
            json.dump(data, f, indent=2)

    def create_namespace(self, namespace: str, properties: Optional[Dict[str, Any]] = None) -> None:
        """Create a namespace if it does not already exist.

        NOTE(review): ``properties`` is accepted for API compatibility but
        is currently ignored — namespace properties are not stored anywhere.
        """
        with self._lock:
            if namespace not in self.namespaces:
                self.namespaces[namespace] = set()
                logger.info(f"Created namespace: {namespace}")
                self._save_catalog()

    def list_namespaces(self) -> List[str]:
        """Return all namespace names."""
        return list(self.namespaces.keys())

    def register_table(
        self,
        namespace: str,
        table_name: str,
        schema: "TableSchema",
        location: str,
        properties: Optional[Dict[str, Any]] = None
    ) -> str:
        """Register a new table and return its fully qualified name.

        Raises ValueError if a table with the same qualified name exists.
        """
        with self._lock:
            full_name = f"{namespace}.{table_name}"

            if full_name in self.tables:
                raise ValueError(f"Table {full_name} already exists")

            self.tables[full_name] = {
                "namespace": namespace,
                "name": table_name,
                "schema_id": schema.schema_id,
                "schema": schema.to_dict(),
                "location": location,
                "properties": properties or {},
                "created_at": datetime.utcnow().isoformat(),
                "updated_at": datetime.utcnow().isoformat()
            }

            self.namespaces[namespace].add(table_name)
            self._save_catalog()

            logger.info(f"Registered table: {full_name}")
            return full_name

    def get_table(self, namespace: str, table_name: str) -> Optional[Dict[str, Any]]:
        """Return table metadata, or None if the table is not registered."""
        full_name = f"{namespace}.{table_name}"
        return self.tables.get(full_name)

    def list_tables(self, namespace: Optional[str] = None) -> List[str]:
        """List table names in a namespace, or all qualified names if None."""
        if namespace:
            return list(self.namespaces.get(namespace, set()))
        return list(self.tables.keys())

    def drop_table(self, namespace: str, table_name: str) -> bool:
        """Remove a table from the catalog. Returns False if not found."""
        with self._lock:
            full_name = f"{namespace}.{table_name}"

            if full_name not in self.tables:
                return False

            del self.tables[full_name]
            self.namespaces[namespace].discard(table_name)
            self._save_catalog()

            logger.info(f"Dropped table: {full_name}")
            return True

    def update_table_properties(
        self,
        namespace: str,
        table_name: str,
        properties: Dict[str, Any]
    ) -> None:
        """Merge new properties into a table and refresh its updated_at.

        Raises ValueError if the table is not registered.
        """
        with self._lock:
            full_name = f"{namespace}.{table_name}"

            if full_name not in self.tables:
                raise ValueError(f"Table {full_name} not found")

            self.tables[full_name]["properties"].update(properties)
            self.tables[full_name]["updated_at"] = datetime.utcnow().isoformat()
            self._save_catalog()

    def update_statistics(
        self,
        namespace: str,
        table_name: str,
        stats: Dict[str, Any]
    ) -> None:
        """Cache statistics for a table, stamped with computed_at.

        In-memory only; not persisted by _save_catalog. Locked for
        consistency with the other mutators of shared state.
        """
        full_name = f"{namespace}.{table_name}"
        with self._lock:
            self.statistics[full_name] = {
                **stats,
                "computed_at": datetime.utcnow().isoformat()
            }

    def get_statistics(self, namespace: str, table_name: str) -> Optional[Dict[str, Any]]:
        """Return cached statistics for a table, or None."""
        full_name = f"{namespace}.{table_name}"
        return self.statistics.get(full_name)

    def add_lineage(
        self,
        target_table: str,
        source_tables: List[str],
        transformation: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        """Record a lineage edge: source_tables -> target_table.

        In-memory only; not persisted by _save_catalog. Locked for
        consistency with the other mutators of shared state.
        """
        with self._lock:
            self.lineage[target_table].append({
                "source_tables": source_tables,
                "transformation": transformation,
                "metadata": metadata or {},
                "timestamp": datetime.utcnow().isoformat()
            })

    def get_lineage(self, table_name: str, direction: str = "upstream") -> List[Dict[str, Any]]:
        """Return lineage entries for a table.

        ``upstream`` returns the edges recorded for the table itself; any
        other value scans all edges for ones that list the table as a source.
        """
        if direction == "upstream":
            return self.lineage.get(table_name, [])
        else:
            downstream = []
            for target, lineage_list in self.lineage.items():
                for lineage in lineage_list:
                    if table_name in lineage.get("source_tables", []):
                        downstream.append({
                            "target_table": target,
                            **lineage
                        })
            return downstream

    def search_tables(self, query: str) -> List[Dict[str, Any]]:
        """Case-insensitive search over table names and property values.

        A table is returned at most once even if both its name and a
        property match.
        """
        results = []
        query_lower = query.lower()

        for full_name, metadata in self.tables.items():
            if query_lower in full_name.lower():
                results.append({"name": full_name, "metadata": metadata})
                continue

            for key, value in metadata.get("properties", {}).items():
                if query_lower in str(value).lower():
                    results.append({"name": full_name, "metadata": metadata})
                    break

        return results


# ============================================================================
# DATA LAKE MANAGER
# ============================================================================

class DataLakeManager:
    """
    Core orchestration for Iceberg/Delta Lake-style data lake operations.

    Integrates all components:
    - Partition management
    - Compaction scheduling
    - Time travel capabilities
    - Schema evolution
    - Metadata catalog

    Supports petabyte-scale knowledge management for AIVA.
    """

    def __init__(
        self,
        lake_path: str,
        default_file_format: FileFormat = FileFormat.PICKLE,
        enable_compaction: bool = True,
        compaction_interval_seconds: int = 3600,
        snapshot_retention_days: int = 30
    ):
        """Set up the lake directory layout, component services, and —
        unless disabled — the background compaction thread."""
        self.lake_path = Path(lake_path)
        self.lake_path.mkdir(parents=True, exist_ok=True)

        self.default_file_format = default_file_format

        # Standard layout: data files, snapshot/schema metadata, catalog.
        self.data_path = self.lake_path / "data"
        self.metadata_path = self.lake_path / "metadata"
        self.catalog_path = self.lake_path / "catalog"
        for directory in (self.data_path, self.metadata_path, self.catalog_path):
            directory.mkdir(exist_ok=True)

        # Component services share the lake's metadata/catalog directories.
        self.catalog = MetadataCatalog(str(self.catalog_path))
        self.time_travel = TimeTravel(str(self.metadata_path), snapshot_retention_days)
        self.schema_evolution = SchemaEvolution(str(self.metadata_path))

        self.compaction_engine = CompactionEngine(
            str(self.data_path),
            strategy=CompactionStrategy.HYBRID,
            compaction_interval_seconds=compaction_interval_seconds
        )

        # Per-table partition strategies, keyed by "namespace.table".
        self.partition_strategies: Dict[str, PartitionStrategy] = {}

        # In-memory bookkeeping of files and manifests written this session.
        self.data_files: Dict[str, List[DataFile]] = defaultdict(list)
        self.manifests: Dict[str, List[ManifestEntry]] = defaultdict(list)

        # Shared thread pool for asynchronous operations.
        self._executor = ThreadPoolExecutor(max_workers=8)
        self._lock = threading.Lock()

        if enable_compaction:
            self.compaction_engine.start_background_compaction()

        logger.info(f"DataLakeManager initialized at {self.lake_path}")

    def create_table(
        self,
        namespace: str,
        table_name: str,
        schema: TableSchema,
        partition_strategy: Optional[PartitionStrategy] = None,
        properties: Optional[Dict[str, Any]] = None
    ) -> str:
        """Create a new table in the data lake.

        Creates the table directory, registers schema and catalog entry,
        assigns a partition strategy (a heuristic default when none is
        given), and records an initial empty snapshot. Returns the fully
        qualified "namespace.table" name.
        """
        with self._lock:
            # Namespace creation is idempotent.
            self.catalog.create_namespace(namespace)

            table_path = self.data_path / namespace / table_name
            table_path.mkdir(parents=True, exist_ok=True)

            self.schema_evolution.register_schema(schema)

            full_name = self.catalog.register_table(
                namespace=namespace,
                table_name=table_name,
                schema=schema,
                location=str(table_path),
                properties=properties
            )

            if not partition_strategy:
                # Heuristic default: daily time-based partitioning when any
                # column name looks temporal, else hash into 256 buckets.
                temporal_columns = [
                    c.name for c in schema.columns
                    if 'time' in c.name.lower() or 'date' in c.name.lower()
                ]
                if temporal_columns:
                    partition_strategy = PartitionStrategy(
                        partition_type=PartitionType.TIME_BASED,
                        partition_columns=[temporal_columns[0]],
                        time_granularity="day"
                    )
                else:
                    partition_strategy = PartitionStrategy(
                        partition_type=PartitionType.HASH,
                        num_partitions=256
                    )
            self.partition_strategies[full_name] = partition_strategy

            # Initial snapshot marks table creation with an empty manifest.
            self.time_travel.create_snapshot(
                operation=SnapshotOperation.APPEND,
                schema_id=schema.schema_id,
                manifest_list=[],
                summary={"action": "create_table", "table": full_name}
            )

            logger.info(f"Created table: {full_name}")
            return full_name

    def write_records(
        self,
        namespace: str,
        table_name: str,
        records: List[Dict[str, Any]],
        validate: bool = True
    ) -> Dict[str, Any]:
        """Append records to a table.

        Records are optionally validated against the current schema, grouped
        by computed partition key, and each group is written as one
        gzip-compressed pickle file. An APPEND snapshot records the files
        created. Returns a summary of the write. Raises ValueError when the
        table is unknown or a record fails validation.
        """
        full_name = f"{namespace}.{table_name}"

        table_meta = self.catalog.get_table(namespace, table_name)
        if not table_meta:
            raise ValueError(f"Table {full_name} not found")

        if validate:
            current_schema = self.schema_evolution.get_current_schema()
            for idx, rec in enumerate(records):
                ok, problems = self.schema_evolution.validate_record(rec, current_schema)
                if not ok:
                    raise ValueError(f"Record {idx} validation failed: {problems}")

        # Group records by their partition key (fixed key when the table
        # has no strategy registered).
        strategy = self.partition_strategies.get(full_name)
        groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for rec in records:
            key = strategy.compute_partition(rec) if strategy else "partition_default"
            groups[key].append(rec)

        table_path = Path(table_meta["location"])
        written_files: List[str] = []
        total_bytes = 0

        for key, group in groups.items():
            partition_dir = table_path / key
            partition_dir.mkdir(parents=True, exist_ok=True)

            # One compressed pickle file per partition group.
            file_id = uuid.uuid4().hex[:12]
            file_path = partition_dir / f"data_{file_id}.data"
            with gzip.open(file_path, 'wb') as f:
                pickle.dump(group, f)

            size = file_path.stat().st_size
            total_bytes += size

            data_file = DataFile(
                file_id=file_id,
                file_path=str(file_path),
                file_format=self.default_file_format,
                file_size_bytes=size,
                record_count=len(group),
                partition_values={"partition": key}
            )
            self.data_files[full_name].append(data_file)
            written_files.append(data_file.file_path)

        # Record the append in the snapshot timeline.
        current_schema = self.schema_evolution.get_current_schema()
        self.time_travel.create_snapshot(
            operation=SnapshotOperation.APPEND,
            schema_id=current_schema.schema_id if current_schema else "",
            manifest_list=written_files,
            summary={
                "action": "append",
                "records_written": len(records),
                "files_created": len(written_files),
                "bytes_written": total_bytes
            }
        )

        return {
            "records_written": len(records),
            "files_created": len(written_files),
            "bytes_written": total_bytes,
            "partitions": list(groups.keys())
        }

    def read_table(
        self,
        namespace: str,
        table_name: str,
        partition_filter: Optional[Dict[str, Any]] = None,
        columns: Optional[List[str]] = None,
        limit: Optional[int] = None,
        snapshot_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Scan a table's data files and return its records.

        Supports partition-directory filtering, column projection, and a
        row limit. Raises ValueError when the table is unknown.

        NOTE(review): the snapshot resolved below is currently not used to
        scope the scan — every *.data file on disk is read, so passing
        ``snapshot_id`` does not change the result. Confirm whether
        manifest-based reads are intended here.
        """
        full_name = f"{namespace}.{table_name}"

        table_meta = self.catalog.get_table(namespace, table_name)
        if not table_meta:
            raise ValueError(f"Table {full_name} not found")

        # Resolve the snapshot to read from (see NOTE above).
        if snapshot_id:
            snapshot = self.time_travel.get_snapshot(snapshot_id)
        else:
            snapshot = self.time_travel.get_current_snapshot()

        table_path = Path(table_meta["location"])
        results: List[Dict[str, Any]] = []
        if not table_path.exists():
            return results

        for file_path in table_path.rglob("*.data"):
            # Skip files whose partition directory doesn't satisfy every
            # filter entry (matched as "key=value" or bare value substring).
            if partition_filter:
                partition_dir = str(file_path.parent.relative_to(table_path))
                matches = all(
                    f"{key}={value}" in partition_dir or str(value) in partition_dir
                    for key, value in partition_filter.items()
                )
                if not matches:
                    continue

            try:
                with gzip.open(file_path, 'rb') as f:
                    for rec in pickle.load(f):
                        if columns:
                            rec = {k: v for k, v in rec.items() if k in columns}
                        results.append(rec)
                        if limit and len(results) >= limit:
                            return results
            except Exception as e:
                logger.warning(f"Failed to read {file_path}: {e}")
                continue

        return results

    def delete_records(
        self,
        namespace: str,
        table_name: str,
        predicate: Callable[[Dict[str, Any]], bool]
    ) -> Dict[str, Any]:
        """Delete every record for which ``predicate(record)`` is true.

        Each data file is read, filtered, and rewritten in place (or removed
        entirely when no records survive). Files that fail to process are
        logged and skipped. A DELETE snapshot records the operation; returns
        counts of deleted records and modified files. Raises ValueError when
        the table is unknown.
        """
        full_name = f"{namespace}.{table_name}"

        table_meta = self.catalog.get_table(namespace, table_name)
        if not table_meta:
            raise ValueError(f"Table {full_name} not found")

        table_path = Path(table_meta["location"])
        removed_total = 0
        touched_files = 0

        for file_path in table_path.rglob("*.data"):
            try:
                with gzip.open(file_path, 'rb') as f:
                    contents = pickle.load(f)

                survivors = [rec for rec in contents if not predicate(rec)]
                dropped = len(contents) - len(survivors)
                if dropped == 0:
                    continue

                removed_total += dropped
                touched_files += 1

                if survivors:
                    # Rewrite the file with only the surviving records.
                    with gzip.open(file_path, 'wb') as f:
                        pickle.dump(survivors, f)
                else:
                    # Nothing left: drop the file entirely.
                    file_path.unlink()
            except Exception as e:
                logger.warning(f"Failed to process {file_path}: {e}")
                continue

        # Record the deletion in the snapshot timeline.
        schema = self.schema_evolution.get_current_schema()
        self.time_travel.create_snapshot(
            operation=SnapshotOperation.DELETE,
            schema_id=schema.schema_id if schema else "",
            manifest_list=[],
            summary={
                "action": "delete",
                "records_deleted": removed_total,
                "files_modified": touched_files
            }
        )

        return {
            "records_deleted": removed_total,
            "files_modified": touched_files
        }

    def query_at_time(
        self,
        namespace: str,
        table_name: str,
        timestamp: datetime,
        **read_kwargs
    ) -> List[Dict[str, Any]]:
        """Read the table as it stood at ``timestamp`` (time travel).

        Resolves the snapshot active at that moment and delegates to
        ``read_table``; returns an empty list when no snapshot existed yet.
        Extra keyword arguments are forwarded to ``read_table``.
        """
        snapshot = self.time_travel.query_at_timestamp(timestamp)
        if not snapshot:
            return []
        return self.read_table(
            namespace=namespace,
            table_name=table_name,
            snapshot_id=snapshot.snapshot_id,
            **read_kwargs
        )

    def rollback_table(self, namespace: str, table_name: str, snapshot_id: str) -> bool:
        """Roll the lake state back to ``snapshot_id``.

        Returns True when the rollback succeeded.
        NOTE(review): rollback is delegated to the lake-wide TimeTravel
        component; ``namespace`` and ``table_name`` do not scope it.
        """
        return self.time_travel.rollback_to_snapshot(snapshot_id)

    def evolve_schema(
        self,
        namespace: str,
        table_name: str,
        action: SchemaAction,
        **kwargs
    ) -> TableSchema:
        """Apply a schema-evolution action and return the new schema.

        Expected kwargs per action:
        - ADD_COLUMN: column_name, data_type, optional nullable/default_value
        - DROP_COLUMN: column_name
        - RENAME_COLUMN: old_name, new_name
        - CHANGE_TYPE: column_name, new_type
        Raises ValueError for an unrecognized action; missing required
        kwargs surface as KeyError.
        """
        if action == SchemaAction.ADD_COLUMN:
            new_column = ColumnSchema(
                name=kwargs["column_name"],
                data_type=kwargs["data_type"],
                nullable=kwargs.get("nullable", True)
            )
            return self.schema_evolution.add_column(new_column, kwargs.get("default_value"))
        if action == SchemaAction.DROP_COLUMN:
            return self.schema_evolution.drop_column(kwargs["column_name"])
        if action == SchemaAction.RENAME_COLUMN:
            return self.schema_evolution.rename_column(kwargs["old_name"], kwargs["new_name"])
        if action == SchemaAction.CHANGE_TYPE:
            return self.schema_evolution.change_column_type(
                kwargs["column_name"],
                kwargs["new_type"]
            )
        raise ValueError(f"Unknown schema action: {action}")

    def run_compaction(self, namespace: Optional[str] = None, table_name: Optional[str] = None) -> Dict[str, Any]:
        """Trigger a compaction pass immediately.

        When both *namespace* and *table_name* are given, compaction is
        scoped to that table's partition; otherwise the whole lake is
        compacted.
        """
        target = f"{namespace}/{table_name}" if namespace and table_name else None
        return self.compaction_engine.run_compaction(target)

    def get_table_stats(self, namespace: str, table_name: str) -> Dict[str, Any]:
        """Get comprehensive table statistics.

        Walks the table's ``*.data`` files to count files, bytes, records
        and partitions, merges in partition-strategy, schema-version and
        snapshot info, caches the result in the catalog, and returns it.

        Returns:
            A stats dict, or ``{"error": ...}`` when the table is unknown.
        """
        full_name = f"{namespace}.{table_name}"
        table_meta = self.catalog.get_table(namespace, table_name)

        if not table_meta:
            return {"error": f"Table {full_name} not found"}

        table_path = Path(table_meta["location"])

        total_files = 0
        total_bytes = 0
        total_records = 0
        partitions = set()

        if table_path.exists():
            for data_file_path in table_path.rglob("*.data"):
                total_files += 1
                total_bytes += data_file_path.stat().st_size

                # The partition id is the data file's directory relative
                # to the table root.
                partition = str(data_file_path.parent.relative_to(table_path))
                partitions.add(partition)

                # Record counting is best-effort: a corrupt or unreadable
                # file must not fail the stats call, but it should not be
                # silently invisible either.
                try:
                    with gzip.open(data_file_path, 'rb') as f:
                        records = pickle.load(f)
                        total_records += len(records)
                except Exception:
                    logger.warning("Could not read data file for stats: %s", data_file_path)

        # Get partition strategy stats
        partition_strategy = self.partition_strategies.get(full_name)
        partition_stats = partition_strategy.get_partition_stats() if partition_strategy else {}

        current_schema = self.schema_evolution.get_current_schema()

        stats = {
            "table_name": full_name,
            "location": str(table_path),
            "total_files": total_files,
            "total_bytes": total_bytes,
            "total_bytes_human": f"{total_bytes / (1024 * 1024):.2f} MB",
            "total_records": total_records,
            "num_partitions": len(partitions),
            # Sorted for deterministic output; capped to keep the payload small.
            "partitions": sorted(partitions)[:100],
            "partition_distribution": partition_stats,
            "schema_version": current_schema.version if current_schema else 0,
            "snapshot_count": len(self.time_travel.snapshot_timeline)
        }

        # Cache stats in the catalog so later reads can skip the file walk.
        self.catalog.update_statistics(namespace, table_name, stats)

        return stats

    def get_lake_status(self) -> Dict[str, Any]:
        """Summarize the overall state of the data lake.

        Includes the catalog's namespaces and tables, snapshot and schema
        version counts, compaction statistics, and the ten most recent
        schema-evolution log entries.
        """
        evolution_log = self.schema_evolution.get_evolution_history()
        status = {
            "lake_path": str(self.lake_path),
            "namespaces": self.catalog.list_namespaces(),
            "tables": self.catalog.list_tables(),
            "total_snapshots": len(self.time_travel.snapshot_timeline),
            "schema_versions": len(self.schema_evolution.schema_history),
            "compaction_stats": self.compaction_engine.get_compaction_stats(),
            "schema_evolution_log": evolution_log[-10:],
        }
        return status

    def shutdown(self) -> None:
        """Stop background work and release executor resources."""
        # Halt the compaction loop first so no new tasks reach the pool,
        # then drain the executor before reporting completion.
        self.compaction_engine.stop_background_compaction()
        self._executor.shutdown(wait=True)
        logger.info("DataLakeManager shutdown complete")


# ============================================================================
# EXAMPLE USAGE
# ============================================================================

if __name__ == "__main__":
    # Example usage demonstrating all features.
    # Local import: utcnow() is deprecated (Python 3.12+); use aware UTC times.
    from datetime import timezone

    print("=" * 60)
    print("AIVA Data Lake Architecture Demo")
    print("=" * 60)

    # Initialize data lake
    lake = DataLakeManager(
        lake_path="./aiva_data_lake",
        enable_compaction=False  # Disable for demo
    )

    # try/finally guarantees shutdown even if a demo step raises.
    try:
        # Define schema
        knowledge_schema = TableSchema(
            columns=[
                ColumnSchema("id", "string", nullable=False),
                ColumnSchema("content", "string"),
                ColumnSchema("category", "string"),
                ColumnSchema("confidence", "float"),
                ColumnSchema("created_at", "datetime"),
                ColumnSchema("metadata", "json")
            ],
            partition_columns=["category"],
            sort_columns=["created_at"],
            primary_key=["id"]
        )

        # Create table with time-based partitioning
        partition_strategy = PartitionStrategy(
            partition_type=PartitionType.COMPOSITE,
            partition_columns=["category", "created_at"]
        )

        try:
            lake.create_table(
                namespace="aiva",
                table_name="knowledge_base",
                schema=knowledge_schema,
                partition_strategy=partition_strategy,
                properties={"description": "Core knowledge storage for AIVA"}
            )
            print("Table created: aiva.knowledge_base")
        except ValueError as e:
            print(f"Table exists: {e}")

        # Write some records (timezone-aware UTC timestamps)
        test_records = [
            {
                "id": str(uuid.uuid4()),
                "content": "AIVA operates through autonomous cognitive loops",
                "category": "architecture",
                "confidence": 0.95,
                "created_at": datetime.now(timezone.utc).isoformat(),
                "metadata": {"source": "design_doc", "version": "1.0"}
            },
            {
                "id": str(uuid.uuid4()),
                "content": "Triple-gate validation ensures AI response accuracy",
                "category": "validation",
                "confidence": 0.92,
                "created_at": datetime.now(timezone.utc).isoformat(),
                "metadata": {"source": "patent", "patent_id": "P001"}
            },
            {
                "id": str(uuid.uuid4()),
                "content": "Genesis system provides self-evolving capabilities",
                "category": "evolution",
                "confidence": 0.88,
                "created_at": datetime.now(timezone.utc).isoformat(),
                "metadata": {"source": "specification"}
            }
        ]

        write_result = lake.write_records("aiva", "knowledge_base", test_records)
        print(f"\nWrite result: {write_result}")

        # Read records
        records = lake.read_table("aiva", "knowledge_base")
        print(f"\nRead {len(records)} records")

        # Get table stats
        stats = lake.get_table_stats("aiva", "knowledge_base")
        print(f"\nTable stats:")
        print(f"  Total files: {stats['total_files']}")
        print(f"  Total records: {stats['total_records']}")
        print(f"  Total size: {stats['total_bytes_human']}")

        # Schema evolution - add a new column
        new_schema = lake.evolve_schema(
            "aiva", "knowledge_base",
            SchemaAction.ADD_COLUMN,
            column_name="embedding_vector",
            data_type="json",
            nullable=True,
            default_value=None
        )
        print(f"\nSchema evolved to version: {new_schema.version}")

        # Get snapshot history
        history = lake.time_travel.get_history(limit=5)
        print(f"\nSnapshot history ({len(history)} snapshots):")
        for snap in history[:3]:
            print(f"  - {snap['snapshot_id'][:8]}... at {snap['timestamp']} ({snap['operation']})")

        # Get lake status
        status = lake.get_lake_status()
        print(f"\nData Lake Status:")
        print(f"  Path: {status['lake_path']}")
        print(f"  Namespaces: {status['namespaces']}")
        print(f"  Tables: {status['tables']}")
    finally:
        # Cleanup — stop background workers and the thread pool.
        lake.shutdown()
        print("\nData lake shutdown complete")
