#!/usr/bin/env python3
"""
GENESIS DISTRIBUTED LOCK MANAGER
=================================
Coordination primitives for distributed systems.

Features:
    - Distributed mutex locks
    - Read/Write locks
    - Semaphores
    - Lock timeouts and TTL
    - Deadlock detection
    - Lock fencing tokens
    - Async lock support

Usage:
    lock_mgr = LockManager()

    # Mutex lock
    with lock_mgr.lock("resource_1"):
        # Exclusive access
        process_resource()

    # Async lock
    async with lock_mgr.async_lock("resource_2"):
        await process_async()

    # Semaphore
    sem = lock_mgr.semaphore("pool", permits=5)
    with sem:
        use_pool_connection()
"""

import asyncio
import hashlib
import json
import os
import threading
import time
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from contextlib import contextmanager, asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Any, Optional, Callable, Set


class LockType(Enum):
    """Types of locks."""
    MUTEX = "mutex"           # Exclusive lock
    READ = "read"             # Shared read lock
    WRITE = "write"           # Exclusive write lock
    SEMAPHORE = "semaphore"   # Counting semaphore


class LockState(Enum):
    """Lock states."""
    FREE = "free"
    HELD = "held"
    WAITING = "waiting"


@dataclass
class LockInfo:
    """Information about a held lock."""
    name: str
    lock_type: LockType
    holder_id: str
    acquired_at: float
    expires_at: Optional[float] = None
    fencing_token: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)

    def is_expired(self) -> bool:
        if self.expires_at is None:
            return False
        return time.time() > self.expires_at


@dataclass
class WaitingRequest:
    """A waiting lock request."""
    request_id: str
    holder_id: str
    lock_type: LockType
    requested_at: float
    timeout: Optional[float] = None
    event: threading.Event = field(default_factory=threading.Event)

    def is_timed_out(self) -> bool:
        if self.timeout is None:
            return False
        return time.time() - self.requested_at > self.timeout


class LockBackend(ABC):
    """Abstract backend for lock storage."""

    @abstractmethod
    def acquire(
        self,
        name: str,
        holder_id: str,
        lock_type: LockType,
        ttl: float = None
    ) -> Optional[int]:
        """Acquire a lock. Returns fencing token on success."""
        pass

    @abstractmethod
    def release(self, name: str, holder_id: str, fencing_token: int = None) -> bool:
        """Release a lock."""
        pass

    @abstractmethod
    def get_info(self, name: str) -> Optional[LockInfo]:
        """Get lock info."""
        pass

    @abstractmethod
    def extend(self, name: str, holder_id: str, ttl: float) -> bool:
        """Extend lock TTL."""
        pass


class MemoryLockBackend(LockBackend):
    """In-memory lock backend."""

    def __init__(self):
        self._locks: Dict[str, LockInfo] = {}
        self._fencing_counter: Dict[str, int] = defaultdict(int)
        self._read_holders: Dict[str, Set[str]] = defaultdict(set)
        self._lock = threading.RLock()

    def acquire(
        self,
        name: str,
        holder_id: str,
        lock_type: LockType,
        ttl: float = None
    ) -> Optional[int]:
        with self._lock:
            # Clean up expired
            self._cleanup_expired(name)

            existing = self._locks.get(name)

            if lock_type == LockType.READ:
                # Read lock - allow multiple readers, no writers
                if existing and existing.lock_type == LockType.WRITE:
                    if not existing.is_expired():
                        return None

                self._read_holders[name].add(holder_id)
                self._fencing_counter[name] += 1
                token = self._fencing_counter[name]

                self._locks[name] = LockInfo(
                    name=name,
                    lock_type=LockType.READ,
                    holder_id=",".join(self._read_holders[name]),
                    acquired_at=time.time(),
                    expires_at=time.time() + ttl if ttl else None,
                    fencing_token=token
                )
                return token

            elif lock_type in (LockType.MUTEX, LockType.WRITE):
                # Exclusive lock
                if existing:
                    if not existing.is_expired():
                        if existing.holder_id != holder_id:
                            return None
                        # Re-entrant lock
                        return existing.fencing_token

                if name in self._read_holders and self._read_holders[name]:
                    # Has readers
                    return None

                self._fencing_counter[name] += 1
                token = self._fencing_counter[name]

                self._locks[name] = LockInfo(
                    name=name,
                    lock_type=lock_type,
                    holder_id=holder_id,
                    acquired_at=time.time(),
                    expires_at=time.time() + ttl if ttl else None,
                    fencing_token=token
                )
                return token

            elif lock_type == LockType.SEMAPHORE:
                # Handled separately by Semaphore class
                pass

            return None

    def release(self, name: str, holder_id: str, fencing_token: int = None) -> bool:
        with self._lock:
            existing = self._locks.get(name)

            if not existing:
                return True

            # Check if caller owns the lock
            if existing.lock_type == LockType.READ:
                if holder_id in self._read_holders[name]:
                    self._read_holders[name].discard(holder_id)
                    if not self._read_holders[name]:
                        del self._locks[name]
                    return True

            elif existing.holder_id == holder_id:
                if fencing_token and existing.fencing_token != fencing_token:
                    return False  # Stale token
                del self._locks[name]
                self._read_holders.pop(name, None)
                return True

            return False

    def get_info(self, name: str) -> Optional[LockInfo]:
        with self._lock:
            self._cleanup_expired(name)
            return self._locks.get(name)

    def extend(self, name: str, holder_id: str, ttl: float) -> bool:
        with self._lock:
            existing = self._locks.get(name)
            if existing and existing.holder_id == holder_id:
                existing.expires_at = time.time() + ttl
                return True
            return False

    def _cleanup_expired(self, name: str):
        existing = self._locks.get(name)
        if existing and existing.is_expired():
            del self._locks[name]
            self._read_holders.pop(name, None)


class FileLockBackend(LockBackend):
    """File-based lock backend for inter-process locking."""

    def __init__(self, lock_dir: Path):
        self.lock_dir = lock_dir
        self.lock_dir.mkdir(parents=True, exist_ok=True)
        self._process_id = os.getpid()

    def _lock_file(self, name: str) -> Path:
        safe_name = hashlib.md5(name.encode()).hexdigest()
        return self.lock_dir / f"{safe_name}.lock"

    def acquire(
        self,
        name: str,
        holder_id: str,
        lock_type: LockType,
        ttl: float = None
    ) -> Optional[int]:
        lock_file = self._lock_file(name)

        try:
            # Check existing lock
            if lock_file.exists():
                try:
                    with open(lock_file, 'r') as f:
                        data = json.load(f)

                    expires_at = data.get("expires_at")
                    if expires_at and time.time() > expires_at:
                        lock_file.unlink()  # Expired
                    elif data.get("holder_id") != holder_id:
                        return None
                    else:
                        return data.get("fencing_token", 1)

                except (json.JSONDecodeError, IOError):
                    lock_file.unlink(missing_ok=True)

            # Try to create lock
            fencing_token = int(time.time() * 1000000)

            data = {
                "name": name,
                "holder_id": holder_id,
                "lock_type": lock_type.value,
                "acquired_at": time.time(),
                "expires_at": time.time() + ttl if ttl else None,
                "fencing_token": fencing_token,
                "pid": self._process_id
            }

            # Atomic write
            temp_file = lock_file.with_suffix('.tmp')
            with open(temp_file, 'w') as f:
                json.dump(data, f)

            temp_file.rename(lock_file)
            return fencing_token

        except OSError:
            return None

    def release(self, name: str, holder_id: str, fencing_token: int = None) -> bool:
        lock_file = self._lock_file(name)

        try:
            if lock_file.exists():
                with open(lock_file, 'r') as f:
                    data = json.load(f)

                if data.get("holder_id") == holder_id:
                    lock_file.unlink()
                    return True

            return True

        except (json.JSONDecodeError, IOError, OSError):
            return False

    def get_info(self, name: str) -> Optional[LockInfo]:
        lock_file = self._lock_file(name)

        try:
            if lock_file.exists():
                with open(lock_file, 'r') as f:
                    data = json.load(f)

                return LockInfo(
                    name=data["name"],
                    lock_type=LockType(data["lock_type"]),
                    holder_id=data["holder_id"],
                    acquired_at=data["acquired_at"],
                    expires_at=data.get("expires_at"),
                    fencing_token=data.get("fencing_token", 0)
                )
        except (json.JSONDecodeError, IOError):
            pass

        return None

    def extend(self, name: str, holder_id: str, ttl: float) -> bool:
        lock_file = self._lock_file(name)

        try:
            if lock_file.exists():
                with open(lock_file, 'r') as f:
                    data = json.load(f)

                if data.get("holder_id") == holder_id:
                    data["expires_at"] = time.time() + ttl
                    with open(lock_file, 'w') as f:
                        json.dump(data, f)
                    return True

        except (json.JSONDecodeError, IOError):
            pass

        return False


class Lock:
    """A distributed lock handle."""

    def __init__(
        self,
        manager: 'LockManager',
        name: str,
        lock_type: LockType = LockType.MUTEX,
        timeout: float = None,
        ttl: float = None
    ):
        self.manager = manager
        self.name = name
        self.lock_type = lock_type
        self.timeout = timeout
        self.ttl = ttl

        self._holder_id = f"{os.getpid()}-{threading.get_ident()}-{uuid.uuid4().hex[:8]}"
        self._fencing_token: Optional[int] = None
        self._held = False

    def acquire(self, blocking: bool = True) -> bool:
        """Acquire the lock."""
        start_time = time.time()

        while True:
            token = self.manager.backend.acquire(
                self.name,
                self._holder_id,
                self.lock_type,
                self.ttl
            )

            if token is not None:
                self._fencing_token = token
                self._held = True
                return True

            if not blocking:
                return False

            if self.timeout and time.time() - start_time > self.timeout:
                return False

            time.sleep(0.01)  # Small sleep before retry

    def release(self) -> bool:
        """Release the lock."""
        if not self._held:
            return True

        success = self.manager.backend.release(
            self.name,
            self._holder_id,
            self._fencing_token
        )

        if success:
            self._held = False
            self._fencing_token = None

        return success

    def extend(self, ttl: float = None) -> bool:
        """Extend lock TTL."""
        if not self._held:
            return False

        return self.manager.backend.extend(
            self.name,
            self._holder_id,
            ttl or self.ttl or 60.0
        )

    @property
    def held(self) -> bool:
        return self._held

    @property
    def fencing_token(self) -> Optional[int]:
        return self._fencing_token

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError(f"Failed to acquire lock: {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False


class AsyncLock:
    """An async distributed lock handle."""

    def __init__(
        self,
        manager: 'LockManager',
        name: str,
        timeout: float = None,
        ttl: float = None
    ):
        self.manager = manager
        self.name = name
        self.timeout = timeout
        self.ttl = ttl

        self._holder_id = f"{os.getpid()}-{id(asyncio.current_task())}-{uuid.uuid4().hex[:8]}"
        self._fencing_token: Optional[int] = None
        self._held = False

    async def acquire(self) -> bool:
        """Acquire the lock asynchronously."""
        start_time = time.time()

        while True:
            token = self.manager.backend.acquire(
                self.name,
                self._holder_id,
                LockType.MUTEX,
                self.ttl
            )

            if token is not None:
                self._fencing_token = token
                self._held = True
                return True

            if self.timeout and time.time() - start_time > self.timeout:
                return False

            await asyncio.sleep(0.01)

    async def release(self) -> bool:
        """Release the lock."""
        if not self._held:
            return True

        success = self.manager.backend.release(
            self.name,
            self._holder_id,
            self._fencing_token
        )

        if success:
            self._held = False

        return success

    async def __aenter__(self):
        if not await self.acquire():
            raise TimeoutError(f"Failed to acquire lock: {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.release()
        return False


class Semaphore:
    """A counting semaphore."""

    def __init__(
        self,
        manager: 'LockManager',
        name: str,
        permits: int,
        timeout: float = None
    ):
        self.manager = manager
        self.name = name
        self.permits = permits
        self.timeout = timeout

        self._holder_id = f"{os.getpid()}-{threading.get_ident()}-{uuid.uuid4().hex[:8]}"
        self._lock = threading.Lock()
        self._held = False

    def acquire(self, blocking: bool = True) -> bool:
        """Acquire a permit."""
        start_time = time.time()

        while True:
            with self._lock:
                # Get current count
                info = self.manager.backend.get_info(f"sem:{self.name}")
                current = 0 if not info else int(info.metadata.get("count", 0))

                if current < self.permits:
                    # Acquire by incrementing count
                    # Using a special lock to track semaphore
                    token = self.manager.backend.acquire(
                        f"sem:{self.name}:{self._holder_id}",
                        self._holder_id,
                        LockType.MUTEX,
                        ttl=None
                    )
                    if token:
                        self._held = True
                        return True

            if not blocking:
                return False

            if self.timeout and time.time() - start_time > self.timeout:
                return False

            time.sleep(0.01)

    def release(self) -> bool:
        """Release a permit."""
        if not self._held:
            return True

        success = self.manager.backend.release(
            f"sem:{self.name}:{self._holder_id}",
            self._holder_id
        )

        if success:
            self._held = False

        return success

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError(f"Failed to acquire semaphore: {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False


class ReadWriteLock:
    """A read-write lock (multiple readers, single writer)."""

    def __init__(
        self,
        manager: 'LockManager',
        name: str,
        timeout: float = None,
        ttl: float = None
    ):
        self.manager = manager
        self.name = name
        self.timeout = timeout
        self.ttl = ttl

    @contextmanager
    def read_lock(self):
        """Acquire a read lock."""
        lock = Lock(
            self.manager, self.name,
            lock_type=LockType.READ,
            timeout=self.timeout,
            ttl=self.ttl
        )
        try:
            if not lock.acquire():
                raise TimeoutError(f"Failed to acquire read lock: {self.name}")
            yield lock
        finally:
            lock.release()

    @contextmanager
    def write_lock(self):
        """Acquire a write lock."""
        lock = Lock(
            self.manager, self.name,
            lock_type=LockType.WRITE,
            timeout=self.timeout,
            ttl=self.ttl
        )
        try:
            if not lock.acquire():
                raise TimeoutError(f"Failed to acquire write lock: {self.name}")
            yield lock
        finally:
            lock.release()


class LockManager:
    """
    Central lock management system.
    """

    def __init__(
        self,
        backend: LockBackend = None,
        default_timeout: float = None,
        default_ttl: float = 60.0
    ):
        self.backend = backend or MemoryLockBackend()
        self.default_timeout = default_timeout
        self.default_ttl = default_ttl

        self._active_locks: Dict[str, Lock] = {}
        self._stats = {
            "acquired": 0,
            "released": 0,
            "timeouts": 0,
            "contention": 0
        }
        self._lock = threading.RLock()

    def lock(
        self,
        name: str,
        timeout: float = None,
        ttl: float = None
    ) -> Lock:
        """Create a mutex lock."""
        return Lock(
            self, name,
            lock_type=LockType.MUTEX,
            timeout=timeout or self.default_timeout,
            ttl=ttl or self.default_ttl
        )

    def async_lock(
        self,
        name: str,
        timeout: float = None,
        ttl: float = None
    ) -> AsyncLock:
        """Create an async lock."""
        return AsyncLock(
            self, name,
            timeout=timeout or self.default_timeout,
            ttl=ttl or self.default_ttl
        )

    def rwlock(
        self,
        name: str,
        timeout: float = None,
        ttl: float = None
    ) -> ReadWriteLock:
        """Create a read-write lock."""
        return ReadWriteLock(
            self, name,
            timeout=timeout or self.default_timeout,
            ttl=ttl or self.default_ttl
        )

    def semaphore(
        self,
        name: str,
        permits: int,
        timeout: float = None
    ) -> Semaphore:
        """Create a semaphore."""
        return Semaphore(
            self, name, permits,
            timeout=timeout or self.default_timeout
        )

    def get_lock_info(self, name: str) -> Optional[LockInfo]:
        """Get information about a lock."""
        return self.backend.get_info(name)

    def is_locked(self, name: str) -> bool:
        """Check if a resource is locked."""
        info = self.backend.get_info(name)
        return info is not None and not info.is_expired()

    def force_release(self, name: str) -> bool:
        """Force release a lock (admin operation)."""
        info = self.backend.get_info(name)
        if info:
            return self.backend.release(name, info.holder_id)
        return True

    def get_stats(self) -> Dict:
        """Get lock manager statistics."""
        return self._stats.copy()


# Global lock manager
_manager: Optional[LockManager] = None


def get_lock_manager() -> LockManager:
    """Get global lock manager."""
    global _manager
    if _manager is None:
        _manager = LockManager()
    return _manager


def main():
    """CLI and demo for lock manager."""
    import argparse
    parser = argparse.ArgumentParser(description="Genesis Lock Manager")
    parser.add_argument("command", choices=["demo", "status"])
    args = parser.parse_args()

    if args.command == "demo":
        print("Lock Manager Demo")
        print("=" * 40)

        lm = LockManager()

        # Basic mutex
        print("\n1. Basic Mutex Lock:")
        with lm.lock("resource_1") as lock:
            print(f"  Acquired lock with token: {lock.fencing_token}")
            print(f"  Lock held: {lock.held}")

        print(f"  Lock released: {not lock.held}")

        # Lock contention
        print("\n2. Lock Contention:")
        lock1 = lm.lock("resource_2", timeout=0.5)
        lock2 = lm.lock("resource_2", timeout=0.5)

        lock1.acquire()
        print(f"  Lock1 acquired: {lock1.held}")

        result = lock2.acquire(blocking=False)
        print(f"  Lock2 immediate acquire: {result}")

        lock1.release()
        result = lock2.acquire()
        print(f"  Lock2 after release: {result}")
        lock2.release()

        # TTL-based lock
        print("\n3. TTL-based Lock:")
        with lm.lock("resource_3", ttl=2.0) as lock:
            info = lm.get_lock_info("resource_3")
            print(f"  Lock acquired, expires at: {info.expires_at}")
            lock.extend(5.0)
            info = lm.get_lock_info("resource_3")
            print(f"  After extension: {info.expires_at}")

        # Read-Write Lock
        print("\n4. Read-Write Lock:")
        rwl = lm.rwlock("document_1")

        with rwl.read_lock() as rl:
            print(f"  Read lock acquired: {rl.held}")

        with rwl.write_lock() as wl:
            print(f"  Write lock acquired: {wl.held}")

        # Semaphore
        print("\n5. Semaphore (3 permits):")
        sem = lm.semaphore("connection_pool", permits=3)

        acquired = []
        for i in range(5):
            s = lm.semaphore("connection_pool", permits=3, timeout=0.1)
            result = s.acquire(blocking=False)
            acquired.append(result)
            if result:
                print(f"  Permit {i+1}: acquired")
            else:
                print(f"  Permit {i+1}: denied (pool exhausted)")

        # Async lock demo
        print("\n6. Async Lock (simulated):")

        async def async_demo():
            async with lm.async_lock("async_resource") as lock:
                print(f"  Async lock acquired: token={lock._fencing_token}")
                await asyncio.sleep(0.1)
            print(f"  Async lock released")

        asyncio.run(async_demo())

        # File-based lock
        print("\n7. File-based Lock (inter-process):")
        file_backend = FileLockBackend(Path("/tmp/genesis_locks"))
        file_lm = LockManager(backend=file_backend)

        with file_lm.lock("shared_resource") as lock:
            print(f"  File lock acquired: {lock.fencing_token}")
            info = file_lm.get_lock_info("shared_resource")
            print(f"  Lock file info: holder={info.holder_id[:30]}...")

        # Lock info
        print("\n8. Lock Information:")
        with lm.lock("info_test") as lock:
            info = lm.get_lock_info("info_test")
            print(f"  Name: {info.name}")
            print(f"  Type: {info.lock_type.value}")
            print(f"  Token: {info.fencing_token}")
            print(f"  Acquired: {datetime.fromtimestamp(info.acquired_at).isoformat()}")

    elif args.command == "status":
        lm = get_lock_manager()
        print(json.dumps(lm.get_stats(), indent=2))


if __name__ == "__main__":
    main()
