# -*- coding: utf-8 -*-
"""utils.gate_snapshot_validator — pre_merge_gate_snapshot.json validator.

task-2637 — real merge executor wiring 코드화 (activation false default · 실제 merge 실행 0).

Spec: memory/specs/system_real_merge_executor_wiring_spec_260523.md §4 + §7.2
sha256: bcaf654e981a43083af50879164021c918eeac9753cad3b3ad146209a1a62765

회장 verbatim (회장 10결정 #3):
    gate_snapshot TTL = 5분 확정.

11종 gate (spec §4 Table):
    1. ci_checks (11/11 success)
    2. gemini_review_gate (PASS, fresh head)
    3. phase3_merge_gate (PASS)
    4. unresolved_threads (value=0)
    5. mergeStateStatus (CLEAN/MERGEABLE)
    6. critical7_hits (value=0)
    7. blocking_secret (value=0; net_new_identifier_exposure=0)
    8. expected_files_exact (value=True)
    9. forbidden_path (value=0)
    10. admin_override_required (value=False)
    11. callback_lifecycle_artifact (value="normal" delivery_outcome 정상)

TTL 검증:
    snapshot.ts_kst + snapshot_ttl_seconds < now → STALE_SNAPSHOT
"""
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

GATE_SNAPSHOT_SCHEMA = "real_merge.pre_gate_snapshot.v1"
GATE_SNAPSHOT_TTL_SECONDS = 300  # 회장 결정 #3 — 5분 확정

# Required gate names in deterministic order (spec §4 Table).
REQUIRED_GATE_NAMES = (
    "ci_checks",
    "gemini_review_gate",
    "phase3_merge_gate",
    "unresolved_threads",
    "mergeStateStatus",
    "critical7_hits",
    "blocking_secret",
    "expected_files_exact",
    "forbidden_path",
    "admin_override_required",
    "callback_lifecycle_artifact",
)

# Result codes (mirrored into the real_merge_execute result_enum).
GATE_OK = "GATE_OK"
GATE_INVALID_SCHEMA = "GATE_INVALID_SCHEMA"
GATE_STALE = "GATE_STALE"
GATE_FAIL = "GATE_FAIL"

_KST = timezone(timedelta(hours=9))


def _parse_kst(text: str) -> Optional[datetime]:
    candidates = (
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
    )
    for fmt in candidates:
        try:
            dt = datetime.strptime(text, fmt)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=_KST)
            return dt
        except ValueError:
            continue
    return None


def _now_kst(clock: Optional[Any]) -> datetime:
    if clock is None:
        return datetime.now(timezone.utc).astimezone(_KST)
    val = clock()
    if isinstance(val, datetime):
        return val if val.tzinfo else val.replace(tzinfo=_KST)
    parsed = _parse_kst(str(val))
    if parsed is None:
        raise ValueError(f"clock() returned unparsable value: {val!r}")
    return parsed


def _gate_passes(gate: Dict[str, Any]) -> Tuple[bool, str]:
    """Per-gate pass evaluator.

    Returns (passed, reason_if_fail). Each gate has a deterministic interpretation
    so the snapshot can be authored by any collector with the same semantics.
    """
    name = gate.get("name")
    if name == "ci_checks":
        if gate.get("conclusion") != "success":
            return False, f"ci_checks.conclusion={gate.get('conclusion')!r}"
        if gate.get("count") not in (None, "", "11/11"):
            return False, f"ci_checks.count={gate.get('count')!r} (expected '11/11')"
        return True, ""
    if name == "gemini_review_gate":
        return (
            gate.get("conclusion") == "PASS",
            f"gemini_review_gate.conclusion={gate.get('conclusion')!r}",
        )
    if name == "phase3_merge_gate":
        return (
            gate.get("conclusion") == "PASS",
            f"phase3_merge_gate.conclusion={gate.get('conclusion')!r}",
        )
    if name == "unresolved_threads":
        return (
            gate.get("value") == 0,
            f"unresolved_threads.value={gate.get('value')!r}",
        )
    if name == "mergeStateStatus":
        val = gate.get("value")
        if val not in ("CLEAN", "MERGEABLE"):
            return False, f"mergeStateStatus.value={val!r}"
        return True, ""
    if name == "critical7_hits":
        return (
            gate.get("value") == 0,
            f"critical7_hits.value={gate.get('value')!r}",
        )
    if name == "blocking_secret":
        if gate.get("value") != 0:
            return False, f"blocking_secret.value={gate.get('value')!r}"
        if gate.get("net_new_identifier_exposure") not in (0, None):
            return False, (
                f"blocking_secret.net_new_identifier_exposure="
                f"{gate.get('net_new_identifier_exposure')!r}"
            )
        return True, ""
    if name == "expected_files_exact":
        return (
            bool(gate.get("value")) is True,
            f"expected_files_exact.value={gate.get('value')!r}",
        )
    if name == "forbidden_path":
        return (
            gate.get("value") == 0,
            f"forbidden_path.value={gate.get('value')!r}",
        )
    if name == "admin_override_required":
        return (
            bool(gate.get("value")) is False,
            f"admin_override_required.value={gate.get('value')!r}",
        )
    if name == "callback_lifecycle_artifact":
        return (
            gate.get("value") == "normal",
            f"callback_lifecycle_artifact.value={gate.get('value')!r}",
        )
    return False, f"unknown gate name {name!r}"


def validate_gate_snapshot(
    snapshot: Any,
    *,
    clock: Optional[Any] = None,
    ttl_seconds: int = GATE_SNAPSHOT_TTL_SECONDS,
) -> Tuple[bool, str, List[str]]:
    """Validate a pre_merge_gate_snapshot.json payload.

    Returns:
        (all_pass, code, reasons) — ``code`` is one of GATE_* enums.

    Order of checks:
      1. dict + schema
      2. ts_kst parsable + within TTL
      3. 11 gate names present, no extras
      4. Each gate passes its deterministic interpretation
    """
    reasons: List[str] = []
    if not isinstance(snapshot, dict):
        return False, GATE_INVALID_SCHEMA, [
            f"snapshot must be dict, got {type(snapshot).__name__}"
        ]
    if snapshot.get("schema") != GATE_SNAPSHOT_SCHEMA:
        reasons.append(
            f"schema must be {GATE_SNAPSHOT_SCHEMA!r}, got {snapshot.get('schema')!r}"
        )

    ts_raw = snapshot.get("ts_kst")
    if not isinstance(ts_raw, str) or not ts_raw:
        reasons.append("ts_kst must be a non-empty ISO string")
    if reasons:
        return False, GATE_INVALID_SCHEMA, reasons

    ts_kst = _parse_kst(ts_raw) if isinstance(ts_raw, str) else None
    if ts_kst is None:
        return False, GATE_INVALID_SCHEMA, [f"ts_kst unparsable: {ts_raw!r}"]

    declared_ttl = snapshot.get("snapshot_ttl_seconds", ttl_seconds)
    try:
        declared_ttl_int = int(declared_ttl)
    except (TypeError, ValueError):
        return False, GATE_INVALID_SCHEMA, [
            f"snapshot_ttl_seconds must be int, got {declared_ttl!r}"
        ]
    effective_ttl = min(int(ttl_seconds), declared_ttl_int)
    now = _now_kst(clock)
    age_seconds = (now - ts_kst).total_seconds()
    if age_seconds > effective_ttl:
        return False, GATE_STALE, [
            f"snapshot age {age_seconds:.0f}s > ttl {effective_ttl}s "
            f"(ts_kst={ts_kst.isoformat()}, now={now.isoformat()})"
        ]

    gates = snapshot.get("gates")
    if not isinstance(gates, list):
        return False, GATE_INVALID_SCHEMA, [
            f"gates must be a list, got {type(gates).__name__}"
        ]
    observed_names = {g.get("name") for g in gates if isinstance(g, dict)}
    missing = [n for n in REQUIRED_GATE_NAMES if n not in observed_names]
    if missing:
        return False, GATE_INVALID_SCHEMA, [
            f"missing required gate names: {missing}"
        ]
    extras = sorted(
        str(n) for n in observed_names
        if n is not None and n not in REQUIRED_GATE_NAMES
    )
    if extras:
        return False, GATE_INVALID_SCHEMA, [
            f"unexpected gate names: {extras}"
        ]

    fail_reasons: List[str] = []
    for gate in gates:
        if not isinstance(gate, dict):
            fail_reasons.append(f"gate entry not a dict: {gate!r}")
            continue
        ok, reason = _gate_passes(gate)
        if not ok:
            fail_reasons.append(reason)

    if fail_reasons:
        return False, GATE_FAIL, fail_reasons

    declared_all_pass = snapshot.get("all_pass")
    if declared_all_pass is False:
        return False, GATE_FAIL, [
            "snapshot.all_pass is False despite every gate passing — "
            "trust authored all_pass=False (fail-closed)"
        ]
    return True, GATE_OK, []


def is_snapshot_fresh(
    snapshot: Any,
    *,
    clock: Optional[Any] = None,
    ttl_seconds: int = GATE_SNAPSHOT_TTL_SECONDS,
) -> bool:
    """Single-purpose freshness predicate (no gate evaluation)."""
    if not isinstance(snapshot, dict):
        return False
    ts_raw = snapshot.get("ts_kst")
    ts = _parse_kst(ts_raw) if isinstance(ts_raw, str) else None
    if ts is None:
        return False
    now = _now_kst(clock)
    return (now - ts).total_seconds() <= ttl_seconds
