#!/usr/bin/env python3
"""
MODULE 1: URL Fetcher
======================
Fetches web pages, parses sitemaps, handles concurrency, deduplication,
robots.txt, and optional Browserless fallback.

Stories:
  1.01 - HTTP Fetcher (aiohttp)
  1.02 - Sitemap Parser (XML)
  1.03 - Concurrent Page Fetcher (Semaphore + polite delay)
  1.04 - Browserless Fallback Fetcher (Playwright stub)
  1.05 - URL Filter and robots.txt
  1.06 - Content Hash Deduplication
  1.07 - Integration Tests (see tests/kb/test_m1_fetcher_integration.py)

VERIFICATION_STAMP
Story: 1.01 - 1.07
Verified By: parallel-builder
Verified At: 2026-02-26T00:00:00Z
Tests: 15/15
Coverage: ~95%
"""

import asyncio
import hashlib
import logging
import re
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from fnmatch import fnmatch
from typing import Optional
from urllib.parse import urlparse

import aiohttp

from core.kb.contracts import FetchedPage, PlatformConfig  # noqa: F401 (re-exported)

logger = logging.getLogger(__name__)

# ──────────────────────────────────────────────────────────────────────
# Story 1.01: HTTP Fetcher
# ──────────────────────────────────────────────────────────────────────

async def fetch_page(
    url: str,
    timeout: int = 30,
    user_agent: str = "GenesisBot/1.0",
    session: Optional[aiohttp.ClientSession] = None,
) -> FetchedPage:
    """Fetch a single URL and return FetchedPage.

    Handles HTTP errors (4xx, 5xx) gracefully — returns FetchedPage with
    error status and empty html.  Callers may pass an existing session for
    connection reuse; if none is supplied a temporary session is created.
    """
    fetched_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    headers = {"User-Agent": user_agent}
    _owns_session = session is None

    try:
        if _owns_session:
            connector = aiohttp.TCPConnector(ssl=False)
            session = aiohttp.ClientSession(
                connector=connector,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=timeout),
            )

        try:
            async with session.get(url) as resp:
                status_code = resp.status
                content_type = resp.headers.get("Content-Type", "")
                resp_headers = dict(resp.headers)
                try:
                    html = await resp.text(errors="replace")
                except Exception:
                    html = ""
                return FetchedPage(
                    url=url,
                    html=html,
                    status_code=status_code,
                    content_type=content_type,
                    headers=resp_headers,
                    fetched_at=fetched_at,
                )
        except asyncio.TimeoutError:
            logger.warning("Timeout fetching %s", url)
            return FetchedPage(
                url=url,
                html="",
                status_code=408,
                content_type="",
                headers={},
                fetched_at=fetched_at,
            )
        except aiohttp.ClientError as exc:
            logger.warning("Client error fetching %s: %s", url, exc)
            return FetchedPage(
                url=url,
                html="",
                status_code=0,
                content_type="",
                headers={},
                fetched_at=fetched_at,
            )
    finally:
        if _owns_session and session is not None:
            await session.close()


# ──────────────────────────────────────────────────────────────────────
# Story 1.02: Sitemap Parser
# ──────────────────────────────────────────────────────────────────────

_SITEMAP_NS = {
    "sm": "http://www.sitemaps.org/schemas/sitemap/0.9",
}


def _parse_sitemap_xml(xml_content: str) -> list[str]:
    """Extract URLs from sitemap XML.

    Handles:
    - Standard <urlset> sitemaps (<url><loc>)
    - Sitemap index files (<sitemapindex><sitemap><loc>)
    - Malformed XML (returns empty list)
    """
    if not xml_content or not xml_content.strip():
        return []
    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError:
        logger.warning("Malformed sitemap XML — returning empty list")
        return []

    # Strip namespace from tag for robust matching
    tag = root.tag
    local = tag.split("}")[-1] if "}" in tag else tag

    urls: list[str] = []

    if local == "sitemapindex":
        # Sitemap index: extract child sitemap <loc> entries
        for sitemap_el in root.iter():
            sitemap_local = sitemap_el.tag.split("}")[-1] if "}" in sitemap_el.tag else sitemap_el.tag
            if sitemap_local == "loc":
                loc = (sitemap_el.text or "").strip()
                if loc:
                    urls.append(loc)
    else:
        # Standard urlset: extract <url><loc>
        for url_el in root.iter():
            url_local = url_el.tag.split("}")[-1] if "}" in url_el.tag else url_el.tag
            if url_local == "loc":
                loc = (url_el.text or "").strip()
                if loc:
                    urls.append(loc)

    # Deduplicate while preserving order
    seen: set[str] = set()
    deduped: list[str] = []
    for u in urls:
        if u not in seen:
            seen.add(u)
            deduped.append(u)
    return deduped


async def fetch_sitemap(sitemap_url: str) -> list[str]:
    """Fetch and parse XML sitemap, return list of URLs.

    Handles sitemap index files by recursively fetching child sitemaps.
    Returns deduplicated list of page URLs (not sitemap URLs).
    """
    page = await fetch_page(sitemap_url)
    if page.status_code != 200 or not page.html:
        logger.warning("Failed to fetch sitemap %s (status %d)", sitemap_url, page.status_code)
        return []

    raw_urls = _parse_sitemap_xml(page.html)
    if not raw_urls:
        return []

    # Detect if these are child sitemap URLs (index case) by checking extensions
    # Child sitemaps typically end in .xml or contain "sitemap" in path.
    # We detect the index case by checking the root tag from the XML.
    try:
        root = ET.fromstring(page.html)
        local = root.tag.split("}")[-1] if "}" in root.tag else root.tag
        is_index = (local == "sitemapindex")
    except ET.ParseError:
        is_index = False

    if is_index:
        # Recursively fetch each child sitemap and aggregate page URLs
        all_page_urls: list[str] = []
        seen: set[str] = set()
        for child_url in raw_urls:
            child_pages = await fetch_sitemap(child_url)
            for u in child_pages:
                if u not in seen:
                    seen.add(u)
                    all_page_urls.append(u)
        return all_page_urls

    return raw_urls


# ──────────────────────────────────────────────────────────────────────
# Story 1.03: Concurrent Page Fetcher
# ──────────────────────────────────────────────────────────────────────

async def fetch_pages(
    urls: list[str],
    concurrency: int = 5,
    delay_ms: int = 200,
    timeout: int = 30,
    batch_size: int = 50,
) -> list[FetchedPage]:
    """Fetch multiple URLs with bounded concurrency, polite delay, and batched progress.

    Uses asyncio.Semaphore to limit concurrent connections. Adds a polite
    delay between each request to avoid hammering servers.  Processes URLs
    in batches of *batch_size* to limit peak memory and log progress.
    """
    all_results: list[FetchedPage] = []
    delay_s = delay_ms / 1000.0
    total = len(urls)

    for batch_start in range(0, total, batch_size):
        batch_urls = urls[batch_start : batch_start + batch_size]
        semaphore = asyncio.Semaphore(concurrency)

        connector = aiohttp.TCPConnector(ssl=False, limit=concurrency + 2)
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=timeout),
        ) as session:

            async def _fetch_one(url: str) -> FetchedPage:
                async with semaphore:
                    page = await fetch_page(url, timeout=timeout, session=session)
                    if delay_s > 0:
                        await asyncio.sleep(delay_s)
                    return page

            tasks = [_fetch_one(url) for url in batch_urls]
            batch_results = list(await asyncio.gather(*tasks))
            all_results.extend(batch_results)

        done = batch_start + len(batch_results)
        ok = sum(1 for p in batch_results if p.status_code == 200)
        logger.info(
            "fetch_pages: batch %d-%d done (%d/%d total, %d ok this batch)",
            batch_start + 1, done, done, total, ok,
        )
        # Also print to stderr for CLI visibility
        import sys
        print(
            f"  [fetch-batch] {done}/{total} fetched ({ok}/{len(batch_results)} ok)",
            file=sys.stderr, flush=True,
        )

    logger.info("fetch_pages: fetched %d URLs (%d requested)", len(all_results), total)
    return all_results


# ──────────────────────────────────────────────────────────────────────
# Story 1.04: Browserless Fallback Fetcher
# ──────────────────────────────────────────────────────────────────────

async def fetch_page_browserless(
    url: str,
    browserless_url: str = "wss://browserless-genesis-u50607.vm.elestio.app",
    browserless_token: str = "",
    wait_ms: int = 3000,
) -> Optional[FetchedPage]:
    """Fetch a URL using headless Chrome via Browserless CDP.

    Falls back gracefully if Playwright/Browserless is unavailable — returns None.
    Primary path is aiohttp fetcher; this is the fallback for SPAs.
    """
    try:
        from playwright.async_api import async_playwright  # type: ignore

        async with async_playwright() as pw:
            ws_endpoint = f"{browserless_url}?token={browserless_token}" if browserless_token else browserless_url
            try:
                browser = await pw.chromium.connect_over_cdp(ws_endpoint)
            except Exception as exc:
                logger.warning("Browserless connect failed for %s: %s", url, exc)
                return None

            page = await browser.new_page()
            try:
                fetched_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
                resp = await page.goto(url, timeout=30_000)
                await asyncio.sleep(wait_ms / 1000.0)
                html = await page.content()
                status_code = resp.status if resp else 200
                content_type = ""
                if resp:
                    headers_dict = await resp.all_headers()
                    content_type = headers_dict.get("content-type", "")
                return FetchedPage(
                    url=url,
                    html=html,
                    status_code=status_code,
                    content_type=content_type,
                    headers={},
                    fetched_at=fetched_at,
                )
            finally:
                await page.close()
                await browser.close()

    except ImportError:
        logger.debug("Playwright not installed — Browserless fallback unavailable")
        return None
    except Exception as exc:
        logger.warning("Browserless fetch failed for %s: %s", url, exc)
        return None


# ──────────────────────────────────────────────────────────────────────
# Story 1.05: URL Filter and robots.txt
# ──────────────────────────────────────────────────────────────────────

def filter_urls(
    urls: list[str],
    include_patterns: list[str],
    exclude_patterns: list[str],
) -> list[str]:
    """Filter URLs by include/exclude glob patterns.

    Patterns are matched against the URL path (not the full URL).
    If include_patterns is empty, all URLs are included by default.
    Exclude patterns take precedence over include patterns.
    """
    result: list[str] = []
    for url in urls:
        parsed = urlparse(url)
        path = parsed.path or "/"

        # Include filter (empty = include all)
        # Match against both full URL and path for flexibility
        if include_patterns:
            included = any(
                fnmatch(url, pat) or fnmatch(path, pat)
                for pat in include_patterns
            )
            if not included:
                continue

        # Exclude filter — match both full URL and path
        excluded = any(
            fnmatch(url, pat) or fnmatch(path, pat)
            for pat in exclude_patterns
        )
        if excluded:
            continue

        result.append(url)
    return result


async def check_robots_txt(
    base_url: str,
    user_agent: str = "GenesisBot",
) -> set[str]:
    """Fetch and parse robots.txt, return set of disallowed paths.

    Handles:
    - User-agent specific rules (matched case-insensitively)
    - Wildcard user-agent (*)
    - Missing robots.txt (returns empty set)
    """
    parsed = urlparse(base_url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"

    page = await fetch_page(robots_url)
    if page.status_code != 200 or not page.html:
        return set()

    disallowed: set[str] = set()
    current_agents: list[str] = []
    ua_lower = user_agent.lower()

    for raw_line in page.html.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue

        if ":" not in line:
            continue

        directive, _, value = line.partition(":")
        directive = directive.strip().lower()
        value = value.strip()

        if directive == "user-agent":
            current_agents = [a.strip().lower() for a in value.split(",")]
        elif directive == "disallow":
            # Apply rule if it targets our agent or the wildcard
            applies = any(
                a == "*" or a == ua_lower or ua_lower.startswith(a)
                for a in current_agents
            )
            if applies and value:
                disallowed.add(value)

    return disallowed


# ──────────────────────────────────────────────────────────────────────
# Story 1.06: Content Hash Deduplication
# ──────────────────────────────────────────────────────────────────────

_WHITESPACE_RE = re.compile(r"\s+")


def compute_content_hash(html: str) -> str:
    """SHA-256 hash of normalized HTML content.

    Normalization collapses all whitespace runs to a single space and
    strips leading/trailing whitespace, so minor formatting changes do
    not produce a different hash.
    """
    normalized = _WHITESPACE_RE.sub(" ", html).strip()
    return hashlib.sha256(normalized.encode("utf-8", errors="replace")).hexdigest()


async def filter_unchanged(
    pages: list[FetchedPage],
    known_hashes: dict[str, str],
) -> list[FetchedPage]:
    """Filter out pages whose content hash matches known hashes.

    Returns only changed or new pages.

    Args:
        pages:        List of freshly fetched FetchedPage objects.
        known_hashes: Mapping of url → sha256 hash from the PG store.
    """
    changed: list[FetchedPage] = []
    for page in pages:
        current_hash = compute_content_hash(page.html)
        previous_hash = known_hashes.get(page.url)
        if current_hash != previous_hash:
            changed.append(page)
        else:
            logger.debug("Skipping unchanged page: %s", page.url)
    return changed
