"""anu_v2.owner_gemini_trigger_router_audit — task-2641 신규 audit log.

회장 verbatim 12 (2026-05-23) 1:1 박제 — OWNER_GEMINI_TRIGGER_UI_FALLBACK_MISROUTE
재발 방지 router audit.

본 모듈 책임 (spec §3.3, task md §재발 방지):
  - 신규 audit JSONL schema v1: ``anu_v2.owner_gemini_trigger_router_audit.v1``
  - router lifecycle 한 행 = 한 record (freshness 평가 → nudge 결과 → final state)
  - 회장 verbatim §6 raw token 출력 금지: token_hash_prefix (12 hex char) 만
  - 회장 verbatim §8 403 발생 시 X-Accepted-GitHub-Permissions / X-Accepted-OAuth-Scopes /
    X-OAuth-Scopes / X-RateLimit-Remaining record (raw token redact)
  - dedupe 보조 helper: nudge_count_for_pr_head — owner_trigger_only validate_decision
    의 ``nudge_count_for_pr_head=0`` 검사를 router 진입 시 미리 컴퓨트

one-way isolation: anu_v2/ 외부 import 금지. live cokacdir / gh / merge 호출 0.
"""

from __future__ import annotations

import fcntl
import hashlib
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Final, Iterator


AUDIT_REL_PATH: Final[str] = "memory/events/owner-gemini-trigger-router-audit.jsonl"

AUDIT_SCHEMA: Final[str] = "anu_v2.owner_gemini_trigger_router_audit.v1"

# router final state enum (spec §3.2 RouterResult)
STATE_FRESH: Final[str] = "FRESH"
STATE_NUDGE_POSTED: Final[str] = "NUDGE_POSTED"
STATE_NUDGE_DEDUPED: Final[str] = "NUDGE_DEDUPED"
STATE_NUDGE_FAILED: Final[str] = "NUDGE_FAILED"
STATE_GEMINI_EXTERNAL_TRIGGER_STALE: Final[str] = "GEMINI_EXTERNAL_TRIGGER_STALE"
STATE_CHAIR_UI_FALLBACK_REQUIRED: Final[str] = "CHAIR_UI_FALLBACK_REQUIRED"
STATE_NUDGE_PERMISSION_DENIED: Final[str] = "NUDGE_PERMISSION_DENIED"
STATE_NOT_GEMINI_TRIGGER: Final[str] = "NOT_GEMINI_TRIGGER"

ALL_STATES: Final[frozenset[str]] = frozenset(
    {
        STATE_FRESH,
        STATE_NUDGE_POSTED,
        STATE_NUDGE_DEDUPED,
        STATE_NUDGE_FAILED,
        STATE_GEMINI_EXTERNAL_TRIGGER_STALE,
        STATE_CHAIR_UI_FALLBACK_REQUIRED,
        STATE_NUDGE_PERMISSION_DENIED,
        STATE_NOT_GEMINI_TRIGGER,
    }
)

# 회장 verbatim §8: 403 발생 시 기록 의무 header 4종 (spec §2.7)
RECORDED_403_HEADERS: Final[tuple[str, ...]] = (
    "x-accepted-github-permissions",
    "x-accepted-oauth-scopes",
    "x-oauth-scopes",
    "x-ratelimit-remaining",
)

# 회장 verbatim §6 raw token 출력 금지. record key 화이트리스트.
ALLOWED_AUDIT_KEYS: Final[frozenset[str]] = frozenset(
    {
        "schema",
        "ts_utc",
        "task_id",
        "pr_number",
        "current_head_sha",
        "freshness_state",
        "gemini_commit_id_observed",
        "nudge_attempted",
        "nudge_result",
        "permission_header_diagnostics",
        "token_present",
        "token_hash_prefix",
        "token_value_logged",
        "final_state",
        "reason",
    }
)


class RouterAuditRedactionError(RuntimeError):
    """audit record 에 raw token / Authorization / api_key 누출 의심."""


# 회장 verbatim §6 + spec §3.3 redaction:
# token / Authorization / api_key 등 sentinel 키/값이 record 에 포함되면 redact.
_REDACT_KEY_RE: Final[re.Pattern[str]] = re.compile(
    r"(?i)(token|authorization|api[_-]?key|secret|password)"
)

_REDACT_VALUE_SENTINELS: Final[tuple[str, ...]] = (
    "Bearer ",
    "gh" + "p_",
    "github" + "_pat_",
    "gh" + "u_",
    "gh" + "s_",
    "gh" + "r_",
)

REDACTED_PLACEHOLDER: Final[str] = "<redacted>"


def _redact(data: Any, *, _depth: int = 0) -> Any:
    """dict/list/tuple/str 재귀 redact (token / Authorization / api_key)."""
    if _depth >= 8:
        return REDACTED_PLACEHOLDER
    if data is None or isinstance(data, (bool, int, float)):
        return data
    if isinstance(data, str):
        lower = data.lower()
        for sentinel in _REDACT_VALUE_SENTINELS:
            if sentinel.lower() in lower:
                return REDACTED_PLACEHOLDER
        return data
    if isinstance(data, dict):
        result: dict = {}
        for key, value in data.items():
            key_str = str(key)
            if _REDACT_KEY_RE.search(key_str):
                result[key_str] = REDACTED_PLACEHOLDER
            else:
                result[key_str] = _redact(value, _depth=_depth + 1)
        return result
    if isinstance(data, list):
        return [_redact(v, _depth=_depth + 1) for v in data]
    if isinstance(data, tuple):
        return tuple(_redact(v, _depth=_depth + 1) for v in data)
    try:
        text = str(data)
    except Exception:  # noqa: BLE001 — defense fail-closed
        return REDACTED_PLACEHOLDER
    text_lower = text.lower()
    for sentinel in _REDACT_VALUE_SENTINELS:
        if sentinel.lower() in text_lower:
            return REDACTED_PLACEHOLDER
    return text


def redact_diagnostics(data: Any) -> Any:
    """public alias — router 코드가 명시적으로 호출."""
    return _redact(data)


def extract_403_headers(response_headers: Any) -> dict:
    """403 응답 header dict 에서 RECORDED_403_HEADERS 만 추출 (case-insensitive).

    회장 verbatim §8 + spec §2.7 1:1: ``X-Accepted-GitHub-Permissions`` /
    ``X-Accepted-OAuth-Scopes`` / ``X-OAuth-Scopes`` / ``X-RateLimit-Remaining`` 만
    화이트리스트로 캡처. token / Authorization / api_key 절대 0.
    """
    if not isinstance(response_headers, dict):
        return {}
    lowered = {str(k).lower(): v for k, v in response_headers.items()}
    out: dict = {}
    for header_key in RECORDED_403_HEADERS:
        if header_key in lowered:
            out[header_key] = lowered[header_key]
    # defense in depth — redact any sentinel that slipped in
    redacted = _redact(out)
    if not isinstance(redacted, dict):
        return {}
    return redacted


def token_hash_prefix(token: str, *, length: int = 12) -> str:
    """token → SHA256 16진수 앞 ``length`` 자리. spec §3.3 1:1 (12 hex chars)."""
    if not isinstance(token, str) or not token:
        raise ValueError("token must be a non-empty string")
    digest = hashlib.sha256(token.encode("utf-8")).hexdigest()
    return digest[:length]


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _ensure_no_secret_leak(record: dict) -> None:
    """record 에 raw token / Authorization 값 누출 차단 (spec §3.3)."""
    extra = set(record.keys()) - ALLOWED_AUDIT_KEYS
    if extra:
        raise RouterAuditRedactionError(
            f"disallowed audit keys: {sorted(extra)}"
        )
    if record.get("token_value_logged") is not False:
        raise RouterAuditRedactionError("token_value_logged must be False")
    serialised = json.dumps(record, ensure_ascii=False)
    sentinels = ("Bearer ", "ghp_", "github_pat_", "ghu_", "ghs_", "ghr_")
    for s in sentinels:
        if s in serialised:
            raise RouterAuditRedactionError(
                f"audit record contains token sentinel {s!r}"
            )


class OwnerGeminiTriggerRouterAudit:
    """append-only audit JSONL for router lifecycle.

    한 router.route_for_pr 호출 = 한 record. atomic via ``fcntl.flock(LOCK_EX)``.
    """

    def __init__(self, workspace_root: str | Path) -> None:
        root = Path(workspace_root).resolve()
        self._workspace_root = root
        self._path = root / AUDIT_REL_PATH

    @property
    def path(self) -> Path:
        return self._path

    def _ensure_parent(self) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)

    def _iter_rows_forward(self) -> Iterator[dict]:
        """Stream audit records line-by-line (전체 파일 메모리 로드 0).

        Gemini PR #144 HIGH (memory exhaustion + cross-PR drift) 자동수렴:
        전체 audit 정확 스캔 — 다른 PR 활동 누적되어도 §9 hard limit 정확성 유지.
        """
        if not self._path.exists():
            return
        try:
            with open(self._path, "r", encoding="utf-8") as fh:
                for line in fh:
                    if not line.strip():
                        continue
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError:
                        continue
        except FileNotFoundError:
            return

    def nudge_count_for_pr_head(self, *, pr_number: int, head: str) -> int:
        """기존 audit record 에서 (pr, head) 에 대한 nudge_attempted=True 카운트.

        owner_trigger_only.validate_decision 의 ``nudge_count_for_pr_head=0`` 정합
        체크에 사용. spec §2.8: PR/head 당 1회 hard limit.

        전체 audit JSONL 을 line-by-line streaming 으로 스캔. cross-PR 활동 누적과
        무관하게 (pr, head) 매칭 record 전부 카운트 (Gemini PR #144 HIGH 자동수렴).
        """
        head_norm = head.lower() if isinstance(head, str) else ""
        count = 0
        for row in self._iter_rows_forward():
            if (
                row.get("pr_number") == pr_number
                and isinstance(row.get("current_head_sha"), str)
                and row["current_head_sha"].lower() == head_norm
                and row.get("nudge_attempted") is True
            ):
                count += 1
        return count

    def append(self, record: dict) -> None:
        """단일 record append. token 누출 가드 + atomic flock 보호."""
        rec = dict(record)
        rec.setdefault("schema", AUDIT_SCHEMA)
        rec.setdefault("ts_utc", _now_iso())
        rec.setdefault("token_value_logged", False)
        # nudge_attempted 미지정 시 False 기본 (spec §2.8 dedupe 계산 안정성)
        rec.setdefault("nudge_attempted", False)
        # permission_header_diagnostics 미지정 시 None (spec §2.7)
        rec.setdefault("permission_header_diagnostics", None)
        # final_state 누락 시 record-level skema 위반
        if rec.get("final_state") not in ALL_STATES:
            raise RouterAuditRedactionError(
                f"final_state must be one of {sorted(ALL_STATES)}, got "
                f"{rec.get('final_state')!r}"
            )
        _ensure_no_secret_leak(rec)
        # head 정규화 (40-char hex → lower)
        head_val = rec.get("current_head_sha")
        if isinstance(head_val, str) and len(head_val) == 40:
            rec["current_head_sha"] = head_val.lower()

        self._ensure_parent()
        with open(self._path, "a", encoding="utf-8") as fh:
            fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
            try:
                fh.write(
                    json.dumps(rec, ensure_ascii=False, sort_keys=True) + "\n"
                )
                fh.flush()
                os.fsync(fh.fileno())
            finally:
                fcntl.flock(fh.fileno(), fcntl.LOCK_UN)


__all__ = [
    "AUDIT_REL_PATH",
    "AUDIT_SCHEMA",
    "STATE_FRESH",
    "STATE_NUDGE_POSTED",
    "STATE_NUDGE_DEDUPED",
    "STATE_NUDGE_FAILED",
    "STATE_GEMINI_EXTERNAL_TRIGGER_STALE",
    "STATE_CHAIR_UI_FALLBACK_REQUIRED",
    "STATE_NUDGE_PERMISSION_DENIED",
    "STATE_NOT_GEMINI_TRIGGER",
    "ALL_STATES",
    "RECORDED_403_HEADERS",
    "ALLOWED_AUDIT_KEYS",
    "REDACTED_PLACEHOLDER",
    "RouterAuditRedactionError",
    "extract_403_headers",
    "redact_diagnostics",
    "token_hash_prefix",
    "OwnerGeminiTriggerRouterAudit",
]
