# -*- coding: utf-8 -*-
"""utils.pr_watcher_terminal_state_classifier — PR watcher 5-enum classifier.

task-2673 — PR watcher terminal/callback fix implementation (RCA task-2670 follow-up).
chair_authorization_id: CHAIR-AUTH-PR-WATCHER-TERMINAL-CALLBACK-FIX-20260526-JJONGS-IMPLEMENT-001

본 모듈은 dev7 watcher.py (`/home/jay/.cokacdir/workspace/29C74592/watcher.py`) 에서
관측된 4 결함을 재발 방지하기 위한 순수함수 classifier + envelope helper +
ANU normal callback registrar 로 구성된다.

회장 verbatim 5 enum (terminal_states):
    - MERGE_READY                   — CI 11/11 + Gemini fresh + 0 unresolved + CLEAN
    - HOLD_FOR_CHAIR                — 자동수렴 불가 · 회장 판정 필요
                                       (head drift / fresh evidence + new unresolved /
                                        loop_boundary residual)
    - GEMINI_EXTERNAL_TRIGGER_STALE — OWNER nudge 후에도 fresh review 미도착
    - CI_FAILED_NON_REMEDIABLE      — Critical7 FAILURE 등 자동수렴 불가
    - LOOP_BOUNDARY                 — max_watch 도과 (residual 없을 때만)

수정 목표 6 (회장 verbatim · task md):
    1. fresh unresolved 발생 시 HOLD_FOR_CHAIR 조기 전환
    2. terminal_state 5 enum evaluation 보강
    3. ANU normal callback registrar 호출 보장
    4. max_watch_minutes 도달 전 HOLD 조건 보강
    5. LOOP_BOUNDARY elapsed-only 우선순위 보정
    6. regression 추가
"""
from __future__ import annotations

import os
import subprocess
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, List, Optional, Tuple

# The five terminal states a watcher run can end in
# (chair's verbatim wording; task md L21).
MERGE_READY = "MERGE_READY"
HOLD_FOR_CHAIR = "HOLD_FOR_CHAIR"
GEMINI_EXTERNAL_TRIGGER_STALE = "GEMINI_EXTERNAL_TRIGGER_STALE"
CI_FAILED_NON_REMEDIABLE = "CI_FAILED_NON_REMEDIABLE"
LOOP_BOUNDARY = "LOOP_BOUNDARY"

# Closed set of valid terminal states; build_callback_envelope() rejects any
# terminal_state that is not a member.
TERMINAL_STATES: Tuple[str, ...] = (
    MERGE_READY,
    HOLD_FOR_CHAIR,
    GEMINI_EXTERNAL_TRIGGER_STALE,
    CI_FAILED_NON_REMEDIABLE,
    LOOP_BOUNDARY,
)

# Default check names treated as critical: classify() returns
# CI_FAILED_NON_REMEDIABLE when any of them has conclusion "FAILURE".
DEFAULT_CRITICAL_CHECKS: frozenset = frozenset({
    "cancel-kill-switch",
    "qc-check",
    "hidden-path-audit",
    "lock-in-check",
    "merge-safety-check",
    "ci/guard",
    "guard",
})

# spec system_ci_watch_handoff_policy_spec_260523.md §11
# Upper bound, in UTF-8 bytes, for a callback envelope.
ENVELOPE_MAX_BYTES = 3900
# Fallback --key value used when neither the anu_key argument nor env ANU_KEY
# is set.
# NOTE(review): this looks like a credential committed to source — confirm
# that is acceptable or move it to configuration.
ANU_KEY_DEFAULT = "c119085addb0f8b7"
# Fallback --chat value used when neither the chat_id argument nor a numeric
# env ANU_CHAT_ID is available.
ANU_CHAT_ID_DEFAULT = 6937032012
# Path of the registrar executable; read from the environment once, at import.
COKACDIR_BIN = os.environ.get("COKACDIR_BIN", "/usr/local/bin/cokacdir")
CALLBACK_DELAY_SEC = 30  # absolute timestamp now+30s (task-2661 Phase 2b)


@dataclass
class PRSnapshot:
    """State of one pull request as captured by a single watcher poll.

    Every field has an empty default, so a bare ``PRSnapshot()`` is valid
    (the original docstring describes this as parity with the test fixtures).
    """

    # Commit oid of the PR head; "" when the source payload had none.
    head_ref_oid: str = ""
    # Value of the payload's "mergeStateStatus" key (e.g. "CLEAN", "BLOCKED").
    merge_state_status: str = ""
    # Value of the payload's "reviewDecision" key; "" when absent or null.
    review_decision: str = ""
    # Entries of the payload's "statusCheckRollup" list.
    status_check_rollup: List[Dict[str, Any]] = field(default_factory=list)
    # Entries of the payload's "reviews" list.
    reviews: List[Dict[str, Any]] = field(default_factory=list)
    # Number of review threads whose "isResolved" flag is falsy.
    unresolved_thread_count: int = 0

    @classmethod
    def from_gh(cls, pr_data: Dict[str, Any], th_data: Dict[str, Any]) -> "PRSnapshot":
        """Build a snapshot from two already-decoded payloads.

        Args:
            pr_data: Mapping read for the keys ``headRefOid``,
                ``mergeStateStatus``, ``reviewDecision``,
                ``statusCheckRollup`` and ``reviews``. Missing or null values
                collapse to ``""`` / ``[]``. ``None`` is accepted.
            th_data: Mapping shaped like
                ``data.repository.pullRequest.reviewThreads.nodes``; any
                missing or null level is treated as empty. ``None`` is
                accepted.

        Returns:
            A populated ``PRSnapshot``.
        """
        pr_fields = pr_data or {}

        # Walk down to the reviewThreads object, substituting an empty mapping
        # for every missing or null level.
        cursor: Dict[str, Any] = th_data or {}
        for level in ("data", "repository", "pullRequest", "reviewThreads"):
            cursor = cursor.get(level) or {}
        thread_nodes = cursor.get("nodes") or []

        # Falsy nodes (None, {}) are skipped rather than counted.
        open_threads = len(
            [node for node in thread_nodes if node and not node.get("isResolved")]
        )

        return cls(
            head_ref_oid=pr_fields.get("headRefOid") or "",
            merge_state_status=pr_fields.get("mergeStateStatus") or "",
            review_decision=pr_fields.get("reviewDecision") or "",
            status_check_rollup=pr_fields.get("statusCheckRollup") or [],
            reviews=pr_fields.get("reviews") or [],
            unresolved_thread_count=open_threads,
        )


def latest_gemini_review(reviews: Optional[List[Dict[str, Any]]]) -> Optional[Dict[str, Any]]:
    """Return the most recent review authored by ``gemini-code-assist``.

    Reviews are ranked by their ``submittedAt`` value (a missing or null value
    ranks as ``""``); on a tie the earliest entry in ``reviews`` wins.

    Args:
        reviews: Review mappings, each optionally carrying ``author.login``
            and ``submittedAt``. ``None``, an empty list and falsy entries
            are tolerated.

    Returns:
        The winning review mapping, or ``None`` when there is no review by
        that author.
    """
    newest: Optional[Dict[str, Any]] = None
    newest_stamp: Any = ""
    for review in reviews or ():
        if not review:
            continue
        author = review.get("author") or {}
        if author.get("login") != "gemini-code-assist":
            continue
        stamp = review.get("submittedAt") or ""
        # Strict ">" keeps the first of several equally-recent reviews.
        if newest is None or stamp > newest_stamp:
            newest, newest_stamp = review, stamp
    return newest


def _is_gemini_fresh(snap: PRSnapshot, expected_head: str) -> bool:
    """Tell whether the latest Gemini review was made against ``expected_head``.

    Args:
        snap: Snapshot whose ``reviews`` list is inspected.
        expected_head: Commit oid the watcher expects the PR head to be.

    Returns:
        ``True`` only when a Gemini review exists, it carries a non-empty
        ``commit.oid``, and that oid equals ``expected_head``.
    """
    latest = latest_gemini_review(snap.reviews)
    if not latest:
        return False
    reviewed_oid = (latest.get("commit") or {}).get("oid") or ""
    # A review without a commit oid must never count as fresh. Without this
    # guard an empty expected_head would compare "" == "" and report a match
    # (fail-open), which could let an unverified review satisfy MERGE_READY.
    return bool(reviewed_oid) and reviewed_oid == expected_head


def classify(
    snap: PRSnapshot,
    *,
    elapsed_watcher_sec: int,
    expected_head: str,
    max_watch_seconds: int = 60 * 60,
    head_committed_utc: Optional[int] = None,
    gemini_stale_threshold_sec: int = 60 * 60,
    critical_checks: frozenset = DEFAULT_CRITICAL_CHECKS,
    now_utc_sec: Optional[int] = None,
) -> Tuple[str, str]:
    """Map one poll snapshot onto a terminal state, or signal "keep polling".

    Rules are tried in this order and the first match wins
    (task md fix goals 1, 2, 4, 5):

      1. CI_FAILED_NON_REMEDIABLE - a check named in ``critical_checks`` has
         conclusion ``FAILURE``.
      2. HOLD_FOR_CHAIR - the snapshot head is non-empty and differs from
         ``expected_head`` (head drift).
      3. HOLD_FOR_CHAIR - fresh Gemini review, unresolved threads > 0 and
         merge state ``BLOCKED`` (early hand-off instead of silently
         falling through).
      4. MERGE_READY - merge state ``CLEAN``, review decision ``APPROVED`` or
         empty, a non-empty rollup whose entries all concluded ``SUCCESS``,
         fresh Gemini review, zero unresolved threads.
      5. GEMINI_EXTERNAL_TRIGGER_STALE - ``head_committed_utc`` was given, the
         head is older than ``gemini_stale_threshold_sec`` and there is no
         fresh Gemini review.
      6. HOLD_FOR_CHAIR - the watch budget is exhausted while unresolved
         threads or ``BLOCKED`` remain.
      7. LOOP_BOUNDARY - the watch budget is exhausted with no such residue.
      8. Otherwise ``("", "continue")``.

    Args:
        snap: Snapshot of the current poll.
        elapsed_watcher_sec: Seconds the watcher has been running.
        expected_head: Commit oid the watcher was started for.
        max_watch_seconds: Watch budget in seconds.
        head_committed_utc: Epoch seconds of the head commit; ``None``
            disables the staleness rule.
        gemini_stale_threshold_sec: Head age beyond which a missing fresh
            review counts as stale.
        critical_checks: Names of checks whose failure is non-remediable.
        now_utc_sec: Clock override in epoch seconds; ``time.time()`` is used
            when ``None``.

    Returns:
        ``(state, reason)``; ``state`` is ``""`` when polling should continue.
    """
    rollup = snap.status_check_rollup

    # Rule 1: index conclusions by check name; a later entry with the same
    # name overrides an earlier one.
    conclusion_by_name: Dict[Any, Any] = {}
    for entry in rollup:
        if entry:
            conclusion_by_name[entry.get("name", "")] = entry.get("conclusion", "")
    failed_critical = sorted(
        name for name in critical_checks
        if conclusion_by_name.get(name) == "FAILURE"
    )
    if failed_critical:
        return CI_FAILED_NON_REMEDIABLE, f"critical_failure: {failed_critical}"

    # Rule 2: head drift (skipped when the snapshot has no head oid at all).
    actual_head = snap.head_ref_oid
    if actual_head and actual_head != expected_head:
        drift_reason = (
            f"head_drift to {actual_head} "
            f"(expected {expected_head}; spec §5 admin override)"
        )
        return HOLD_FOR_CHAIR, drift_reason

    fresh = _is_gemini_fresh(snap, expected_head)
    open_threads = snap.unresolved_thread_count
    merge_state = snap.merge_state_status
    blocked = merge_state == "BLOCKED"

    # Rule 3: fresh evidence plus new unresolved threads on a blocked PR cannot
    # converge automatically - hand off to the chair right away.
    if fresh and open_threads > 0 and blocked:
        early_hold_reason = (
            f"fresh_gemini_head_match + unresolved={open_threads} + mss=BLOCKED "
            "(자동수렴 불가 · 회장 판정 필요)"
        )
        return HOLD_FOR_CHAIR, early_hold_reason

    # Rule 4: an empty rollup never counts as "all green".
    checks_green = bool(rollup)
    for entry in rollup:
        if not entry or entry.get("conclusion", "") != "SUCCESS":
            checks_green = False
            break
    decision_ok = snap.review_decision in ("APPROVED", "")
    if (
        merge_state == "CLEAN"
        and decision_ok
        and checks_green
        and fresh
        and open_threads == 0
    ):
        return (
            MERGE_READY,
            "mss=CLEAN + all_checks SUCCESS + gemini_fresh + 0 unresolved",
        )

    # Rule 5: only evaluated when the caller supplied the head commit time.
    if head_committed_utc is not None:
        reference_now = int(time.time()) if now_utc_sec is None else now_utc_sec
        head_age = reference_now - head_committed_utc
        if head_age > gemini_stale_threshold_sec and not fresh:
            stale_reason = (
                f"head committed {head_age}s ago, "
                "fresh gemini review missing"
            )
            return GEMINI_EXTERNAL_TRIGGER_STALE, stale_reason

    # Rules 6-8: elapsed time alone yields LOOP_BOUNDARY only when nothing is
    # left over; leftover unresolved threads or BLOCKED escalate to the chair.
    budget_exhausted = elapsed_watcher_sec >= max_watch_seconds
    if not budget_exhausted:
        return "", "continue"

    if open_threads > 0 or blocked:
        residual_reason = (
            "loop_boundary_with_residual: "
            f"unresolved={open_threads} mss={merge_state} "
            f"elapsed={elapsed_watcher_sec}s "
            "(자동수렴 불가 · 회장 판정 필요)"
        )
        return HOLD_FOR_CHAIR, residual_reason

    boundary_reason = (
        f"watcher elapsed {elapsed_watcher_sec}s "
        "(no residual unresolved · no BLOCKED)"
    )
    return LOOP_BOUNDARY, boundary_reason


def build_callback_envelope(
    *,
    task_id: str,
    pr_number: int,
    terminal_state: str,
    reason: str,
    polls_completed: int,
    elapsed_sec: int,
    last_snapshot: Optional[Dict[str, Any]] = None,
    extras: Optional[Dict[str, Any]] = None,
) -> str:
    """Render the callback envelope text, capped at ENVELOPE_MAX_BYTES of UTF-8.

    The body is a newline-joined list of ``key: value`` lines carrying the
    minimum evidence needed for the consolidated report. When the full body is
    too large, whole lines are kept from the top until the next line would not
    fit; if even the header line does not fit, the header alone is cut at the
    byte limit and any partial trailing character is discarded.

    Args:
        task_id: Task identifier shown in the header line.
        pr_number: Pull request number shown in the header line.
        terminal_state: Must be one of ``TERMINAL_STATES``.
        reason: Classifier reason text.
        polls_completed: Number of polls performed.
        elapsed_sec: Watcher run time in seconds.
        last_snapshot: Optional mapping read for ``head_match_expected``,
            ``mergeStateStatus``, ``unresolved_thread_count`` and
            ``latest_gemini_review``.
        extras: Optional extra ``key: value`` lines, appended in sorted key
            order.

    Returns:
        The envelope text.

    Raises:
        ValueError: If ``terminal_state`` is not a known terminal state.
    """
    if terminal_state not in TERMINAL_STATES:
        raise ValueError(f"invalid terminal_state: {terminal_state!r}")

    snapshot = last_snapshot or {}
    review = snapshot.get("latest_gemini_review") or {}
    if isinstance(review, dict):
        review_time = review.get("submittedAt", "-")
    else:
        review_time = "-"
    head_matches = bool(snapshot.get("head_match_expected", False))
    merge_state = snapshot.get("mergeStateStatus", "")
    open_threads = int(snapshot.get("unresolved_thread_count", 0) or 0)

    lines = [
        f"[task-{task_id} PR #{pr_number} watcher terminal]",
        f"terminal_state: {terminal_state}",
        f"reason: {reason}",
        f"polls: {polls_completed}  elapsed: {elapsed_sec}s",
        f"head_match: {head_matches}",
        f"mss: {merge_state}",
        f"unresolved: {open_threads}",
        f"latest_gemini: {review_time}",
        "action: ANU consolidated report (handoff policy §6 step 5)",
    ]
    if extras:
        lines.extend(f"{name}: {extras[name]}" for name in sorted(extras.keys()))

    full_body = "\n".join(lines)
    if len(full_body.encode("utf-8")) <= ENVELOPE_MAX_BYTES:
        return full_body

    # Too large: keep whole lines only, so no multi-byte character is split.
    # Each line is charged one extra byte for its newline.
    kept: List[str] = []
    budget = ENVELOPE_MAX_BYTES
    for line in lines:
        cost = len(line.encode("utf-8")) + 1
        if cost > budget:
            break
        kept.append(line)
        budget -= cost
    if kept:
        return "\n".join(kept)

    # Even the header is over the limit: fall back to a byte-level cut of the
    # header so that at least the identifying prefix survives.
    header_bytes = lines[0].encode("utf-8")[:ENVELOPE_MAX_BYTES]
    return header_bytes.decode("utf-8", errors="ignore")


def _absolute_fire_at(delay_seconds: int = CALLBACK_DELAY_SEC) -> str:
    """Return the absolute fire time ``delay_seconds`` from now.

    The time is computed in UTC and rendered as ``YYYY-MM-DD HH:MM:SS``
    (spec §11 + task-2661 Phase 2b absolute timestamp policy).

    NOTE(review): the rendered string carries no timezone marker - confirm
    that the consumer interprets it as UTC.
    """
    fire_time = datetime.now(timezone.utc) + timedelta(seconds=delay_seconds)
    return format(fire_time, "%Y-%m-%d %H:%M:%S")


@dataclass
class CallbackRegisterResult:
    """Outcome of one register_terminal_callback() attempt."""

    # True only when the registrar command ran and its return code was 0.
    fired: bool
    # Why nothing fired ("empty_envelope", "envelope_oversize: ...",
    # "anu_key_missing", "subprocess_exception: ...", "non_zero_exit");
    # "" on success.
    skipped_reason: str = ""
    # UTF-8 byte length of the envelope (0 for an empty envelope).
    envelope_bytes: int = 0
    # Absolute timestamp passed to the registrar; "" when the attempt was
    # skipped before a timestamp was computed.
    fire_at: str = ""
    # Return code of the registrar process; None when it never completed.
    returncode: Optional[int] = None
    # Captured standard output of the registrar process ("" when none).
    stdout: str = ""
    # Captured standard error of the registrar process ("" when none).
    stderr: str = ""


def register_terminal_callback(
    *,
    envelope: str,
    anu_key: Optional[str] = None,
    chat_id: Optional[int] = None,
    delay_seconds: int = CALLBACK_DELAY_SEC,
    cokacdir_bin: str = COKACDIR_BIN,
    runner: Optional[Callable[..., subprocess.CompletedProcess]] = None,
    timeout_sec: int = 30,
) -> CallbackRegisterResult:
    """Register the one-shot ANU callback for a terminal watcher state.

    (Fix goal 3 / spec §6 step 4.) The registrar executable is invoked once,
    without retry, as
    ``<bin> --cron <envelope> --at <fire_at> --chat <chat> --key <key> --once``.
    Failures never raise: every outcome is reported through the returned
    ``CallbackRegisterResult`` so the watcher itself can exit normally.

    Args:
        envelope: Callback text; must be non-empty and at most
            ``ENVELOPE_MAX_BYTES`` of UTF-8.
        anu_key: Key for ``--key``. Falls back to env ``ANU_KEY``, then to
            ``ANU_KEY_DEFAULT``.
        chat_id: Chat for ``--chat``. Falls back to env ``ANU_CHAT_ID`` when
            it parses as an integer, otherwise to ``ANU_CHAT_ID_DEFAULT``.
        delay_seconds: Offset from now used to compute the absolute ``--at``
            timestamp (a relative form such as ``30s`` is never passed).
        cokacdir_bin: Path of the registrar executable.
        runner: Replacement for ``subprocess.run`` (same call signature),
            mainly for tests.
        timeout_sec: Timeout handed to the runner.

    Returns:
        A ``CallbackRegisterResult``; ``fired`` is ``True`` only when the
        registrar exited with return code 0.

    NOTE(review): the original docstring promised "self-key blocking", but no
    such check exists in this function - confirm whether it is still required.
    NOTE(review): the key is passed on the command line and may therefore be
    visible in process listings - confirm this is acceptable.
    """
    # Test for emptiness before encoding so that "" (or a stray None) is
    # reported as a skip instead of raising from .encode().
    if not envelope:
        return CallbackRegisterResult(
            fired=False, skipped_reason="empty_envelope", envelope_bytes=0
        )
    envelope_bytes = len(envelope.encode("utf-8"))
    if envelope_bytes > ENVELOPE_MAX_BYTES:
        return CallbackRegisterResult(
            fired=False,
            skipped_reason=f"envelope_oversize: {envelope_bytes} > {ENVELOPE_MAX_BYTES}",
            envelope_bytes=envelope_bytes,
        )

    key = anu_key or os.environ.get("ANU_KEY") or ANU_KEY_DEFAULT
    if chat_id is not None:
        chat = chat_id
    else:
        # A non-numeric ANU_CHAT_ID falls back to the default chat.
        raw_chat = os.environ.get("ANU_CHAT_ID", "") or ""
        try:
            chat = int(raw_chat) if raw_chat else ANU_CHAT_ID_DEFAULT
        except ValueError:
            chat = ANU_CHAT_ID_DEFAULT
    if not key:
        return CallbackRegisterResult(
            fired=False,
            skipped_reason="anu_key_missing",
            envelope_bytes=envelope_bytes,
        )

    fire_at = _absolute_fire_at(delay_seconds)
    cmd = [
        cokacdir_bin,
        "--cron",
        envelope,
        "--at",
        fire_at,
        "--chat",
        str(chat),
        "--key",
        key,
        "--once",
    ]
    run = runner or subprocess.run
    try:
        proc = run(cmd, capture_output=True, text=True, timeout=timeout_sec, check=False)
    except Exception as exc:  # deliberate best-effort boundary: never raise
        if isinstance(exc, subprocess.TimeoutExpired):
            # str(TimeoutExpired) embeds the full argv, which would copy the
            # key and the whole envelope into the result; report only the
            # timeout itself.
            detail = f"timed out after {exc.timeout} seconds"
        else:
            # Redact the key in case the exception text echoes the command.
            detail = str(exc).replace(str(key), "***")
        return CallbackRegisterResult(
            fired=False,
            skipped_reason=f"subprocess_exception: {exc.__class__.__name__}: {detail}",
            envelope_bytes=envelope_bytes,
            fire_at=fire_at,
        )

    # A runner result without a returncode attribute counts as a failure.
    returncode = getattr(proc, "returncode", None)
    succeeded = returncode == 0
    return CallbackRegisterResult(
        fired=succeeded,
        skipped_reason="" if succeeded else "non_zero_exit",
        envelope_bytes=envelope_bytes,
        fire_at=fire_at,
        returncode=returncode,
        stdout=getattr(proc, "stdout", "") or "",
        stderr=getattr(proc, "stderr", "") or "",
    )


# Public API of this module: the terminal-state constants, the tunables, the
# two data classes and the public functions. Underscore-prefixed helpers are
# intentionally left out.
__all__ = [
    "MERGE_READY",
    "HOLD_FOR_CHAIR",
    "GEMINI_EXTERNAL_TRIGGER_STALE",
    "CI_FAILED_NON_REMEDIABLE",
    "LOOP_BOUNDARY",
    "TERMINAL_STATES",
    "DEFAULT_CRITICAL_CHECKS",
    "ENVELOPE_MAX_BYTES",
    "ANU_KEY_DEFAULT",
    "ANU_CHAT_ID_DEFAULT",
    "COKACDIR_BIN",
    "CALLBACK_DELAY_SEC",
    "PRSnapshot",
    "CallbackRegisterResult",
    "latest_gemini_review",
    "classify",
    "build_callback_envelope",
    "register_terminal_callback",
]
