"""anu_v2.pr_open_gemini_trigger_prevention — ANU v2 PR open Gemini trigger miss 예방 + 조기 분류 (task-2544).

회장 §명시 (2026-05-10):
  목표: PR open 시점에서 Gemini trigger miss를 예방하고 조기 분류한다.
  사고 배경: task-2537 PR #86 — Gemini Code Assist 미수신 → 25분 polling 재발 → 회장 직접 kill 2회.

설계 원칙:
  - one-way isolation: anu_v2/ 외부 import 금지. 외부 모듈(utils/dispatch/scripts/dashboard)은
    anu_v2를 import 가능하나, anu_v2 내부에서 이들을 직접 import 하는 것은 엄격히 금지.
  - token raw 0: raw 토큰/secret 값 (Personal Access Token, Bot Token 등) 어떤 형태로도
    (string literal, env read, print 등) 본 모듈 내 노출 금지.
  - chat 격리: chat_id=6937032012 기준으로 외부 chat record 노출 차단.
  - 25분 polling 부재: poll_first_gemini_evidence는 grace_seconds(기본 180초) hard limit.
    무한 루프/self-register/60초×N chain 패턴 절대 금지. 초과 호출 시 raise 보장.

5 분류 코드 + 우선순위 (회장 §명시 priority):
  1. INTERNAL_HEAD_MISMATCH — pushed_sha != github headRefOid (auto_retry=True, repush_head)
  2. INTERNAL_FETCH_128     — git exit 128 (broken ref fetch 실패, auto_retry=True, refetch)
  3. INTERNAL_BEHIND_BASE   — behind_count>0 AND head_diverged AND evidence_arrived=False
  4. EXTERNAL_TRIGGER_REQUIRED — evidence_arrived=False, head_match=True, fetchable=True
                                  (human_only=True, 25분 polling 재발 금지)
  5. PR_OPEN_GEMINI_TRIGGER_OK — evidence_arrived=True (정상 진행)
"""

from __future__ import annotations

import math
from datetime import datetime, timezone
from typing import Callable
import time


# ─── Five classification code constants ─────────────────────────────────────
# String codes returned under the "classification" key by the classifier below.
CLASSIFICATION_OK = "PR_OPEN_GEMINI_TRIGGER_OK"
CLASSIFICATION_INTERNAL_HEAD_MISMATCH = "PR_OPEN_GEMINI_TRIGGER_MISSED_INTERNAL_HEAD_MISMATCH"
CLASSIFICATION_INTERNAL_FETCH_128 = "PR_OPEN_GEMINI_TRIGGER_MISSED_INTERNAL_FETCH_128"
CLASSIFICATION_INTERNAL_BEHIND_BASE = "PR_OPEN_GEMINI_TRIGGER_MISSED_INTERNAL_BEHIND_BASE"
CLASSIFICATION_EXTERNAL_TRIGGER_REQUIRED = "EXTERNAL_TRIGGER_REQUIRED"

# Default chat id; used as the default value of the class's chat_id argument.
DEFAULT_CHAT_ID = "6937032012"

# Polling interval in seconds. grace_seconds is divided by this value to cap
# the number of evidence-poller calls.
_POLL_INTERVAL_SECONDS: int = 30


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with whole-second precision."""
    utc_now = datetime.now(tz=timezone.utc)
    # Zeroing the microseconds makes isoformat() omit the fractional part.
    return utc_now.replace(microsecond=0).isoformat()


class PROpenGeminiTriggerPrevention:
    """Prevent and early-classify Gemini trigger misses around PR open (ANU v2).

    Pipeline: preflight before PR open (base/head freshness, ref fetchability)
    -> verification right after PR open (headRefOid match, gemini-review-gate
    check run) -> first-evidence polling inside a bounded grace window ->
    classification of a miss as internal or external.

    Design rules stated for this module:
      - one-way isolation: nothing outside anu_v2/ is imported here. External
        modules may import anu_v2, but anu_v2 must not import
        utils/dispatch/scripts/dashboard directly.
      - token raw 0: raw token/secret values are never exposed anywhere.
      - chat isolation: records are scoped to a chat id (DEFAULT_CHAT_ID by
        default); run() stamps that id on its result.
      - no 25-minute polling: every poll is bounded by grace_seconds.

    Every external side effect is an injectable callable. The required ones
    (gh_runner, git_runner, evidence_poller, clock) raise NotImplementedError
    at first use when left as None; audit_writer is optional and the sleeper
    falls back to time.sleep.
    """

    def __init__(
        self,
        *,
        gh_runner: Callable[[str, list[str]], dict] | None = None,
        git_runner: Callable[[list[str]], dict] | None = None,
        evidence_poller: Callable[[int, str], list[dict]] | None = None,
        clock: Callable[[], float] | None = None,
        audit_writer: Callable[[dict], None] | None = None,
        chat_id: str = DEFAULT_CHAT_ID,
        sleeper: Callable[[float], None] | None = None,
    ) -> None:
        """Store the injected collaborators.

        Args:
            gh_runner: Called as gh_runner(endpoint, args); must return a dict.
            git_runner: Called as git_runner(argv); the returned dict is read
                for "exit_code" and "stdout".
            evidence_poller: Called as evidence_poller(pr_number, head_sha);
                returns review dicts.
            clock: Returns the current time in seconds.
            audit_writer: Optional sink that receives the run() result dict.
            chat_id: Stored as a string and echoed in the run() result.
            sleeper: Called with the number of seconds to wait between polls.
                None (the default) means time.sleep.
        """
        self._gh_runner = gh_runner
        self._git_runner = git_runner
        self._evidence_poller = evidence_poller
        self._clock = clock
        self._audit_writer = audit_writer
        self._chat_id = str(chat_id)
        self._sleeper = sleeper

    # ─── Internal helpers ──────────────────────────────────────────────────

    def _require_gh_runner(self) -> Callable[[str, list[str]], dict]:
        """Return the injected gh_runner or raise NotImplementedError."""
        if self._gh_runner is None:
            raise NotImplementedError("gh_runner must be injected")
        return self._gh_runner

    def _require_git_runner(self) -> Callable[[list[str]], dict]:
        """Return the injected git_runner or raise NotImplementedError."""
        if self._git_runner is None:
            raise NotImplementedError("git_runner must be injected")
        return self._git_runner

    def _require_evidence_poller(self) -> Callable[[int, str], list[dict]]:
        """Return the injected evidence_poller or raise NotImplementedError."""
        if self._evidence_poller is None:
            raise NotImplementedError("evidence_poller must be injected")
        return self._evidence_poller

    def _require_clock(self) -> Callable[[], float]:
        """Return the injected clock or raise NotImplementedError."""
        if self._clock is None:
            raise NotImplementedError("clock must be injected")
        return self._clock

    def _sleep(self, seconds: float) -> None:
        """Wait via the injected sleeper, or time.sleep when none was given."""
        # getattr keeps instances built without __init__ working; time.sleep is
        # looked up at call time rather than captured at construction.
        sleeper = getattr(self, "_sleeper", None)
        if sleeper is None:
            sleeper = time.sleep
        sleeper(seconds)

    @staticmethod
    def _stdout(result: dict):
        """Return the runner's stripped "stdout"; "" when it is missing or None."""
        raw = result.get("stdout") or ""
        return raw.strip() if isinstance(raw, (str, bytes)) else raw

    # ─── 1. preflight ──────────────────────────────────────────────────────

    def preflight_base_head_freshness(self, base_branch: str, head_branch: str) -> dict:
        """Fetch origin, then check how far head is behind base and whether it diverged.

        Uses git_runner for:
          - git fetch origin
          - git rev-list --count {head_branch}..origin/{base_branch} -> behind_count
          - git rev-parse {head_branch} vs git rev-parse origin/{head_branch}
            -> head_diverged (True only when both SHAs are non-empty and differ)
        fresh is True iff behind_count == 0 and head_diverged is False.

        A fetch exit code of 128 short-circuits with behind_count=-1,
        head_diverged=True and the reason "git_fetch_exit_128". An unparseable
        or missing rev-list count is treated as 0.

        Returns:
            {"fresh": bool, "behind_count": int, "head_diverged": bool, "reasons": list[str]}

        Raises:
            NotImplementedError: git_runner was not injected.
        """
        git = self._require_git_runner()
        reasons: list[str] = []

        # 1) fetch origin; exit 128 means nothing below can be trusted
        fetch_result = git(["fetch", "origin"])
        if fetch_result.get("exit_code", 0) == 128:
            reasons.append("git_fetch_exit_128")
            return {
                "fresh": False,
                "behind_count": -1,
                "head_diverged": True,
                "reasons": reasons,
            }

        # 2) behind_count: commits on origin/base that head does not have
        behind_result = git(["rev-list", "--count", f"{head_branch}..origin/{base_branch}"])
        try:
            behind_count = int(self._stdout(behind_result))
        except (ValueError, TypeError):
            behind_count = 0

        # 3) head_diverged: local head SHA vs remote head SHA
        local_sha = self._stdout(git(["rev-parse", head_branch]))
        remote_sha = self._stdout(git(["rev-parse", f"origin/{head_branch}"]))
        head_diverged = bool(local_sha and remote_sha and local_sha != remote_sha)

        if behind_count > 0:
            reasons.append(f"behind_count={behind_count}")
        if head_diverged:
            reasons.append("head_diverged=True")

        return {
            "fresh": behind_count == 0 and not head_diverged,
            "behind_count": behind_count,
            "head_diverged": head_diverged,
            "reasons": reasons,
        }

    def preflight_ref_fetchability(self, head_sha: str) -> dict:
        """Check that head_sha can be fetched from origin and has a merge base with HEAD.

        Uses git_runner for:
          - git fetch origin {head_sha}: exit code 128 returns immediately with
            fetchable=False and merge_base_resolvable=False; otherwise fetchable
            is True only for exit code 0.
          - git merge-base HEAD {head_sha}: exit code 0 -> merge_base_resolvable=True.

        Returns:
            {"fetchable": bool, "merge_base_resolvable": bool, "git_exit_code": int|None}
            git_exit_code is the fetch exit code when non-zero, else None.

        Raises:
            NotImplementedError: git_runner was not injected.
        """
        git = self._require_git_runner()

        # 1) fetch the head SHA
        fetch_exit = git(["fetch", "origin", head_sha]).get("exit_code", 0)
        if fetch_exit == 128:
            return {
                "fetchable": False,
                "merge_base_resolvable": False,
                "git_exit_code": 128,
            }

        # 2) merge-base check
        merge_exit = git(["merge-base", "HEAD", head_sha]).get("exit_code", 0)

        return {
            "fetchable": fetch_exit == 0,
            "merge_base_resolvable": merge_exit == 0,
            "git_exit_code": fetch_exit if fetch_exit != 0 else None,
        }

    # ─── 2. Verification right after PR open ───────────────────────────────

    def verify_pr_head_sha_match(self, pr_number: int, pushed_sha: str) -> dict:
        """Compare the PR's headRefOid reported by gh_runner with pushed_sha.

        Calls gh_runner("pulls/{pr_number}", []) and reads "headRefOid" from the
        result. A missing, None or empty headRefOid never matches.

        Returns:
            {"match": bool, "github_head_sha": str, "pushed_sha": str}

        Raises:
            NotImplementedError: gh_runner was not injected.
        """
        gh = self._require_gh_runner()
        result = gh(f"pulls/{pr_number}", [])
        # Normalise None to "" so the returned value honours the declared str type.
        github_head_sha: str = result.get("headRefOid") or ""

        return {
            "match": bool(github_head_sha) and github_head_sha == pushed_sha,
            "github_head_sha": github_head_sha,
            "pushed_sha": pushed_sha,
        }

    def verify_gemini_review_gate_check_present(
        self, head_sha: str, max_wait_seconds: int = 60
    ) -> dict:
        """Look for the gemini-review-gate check run on head_sha and detect git exit 128.

        Calls gh_runner("commits/{head_sha}/check-runs", []) once and inspects the
        first entry of "check_runs" whose name is "gemini-review-gate". When its
        conclusion is "failure" and its output text contains "exit 128" or
        "fatal: ", git_exit_128 is True and both "git_exit_128" and
        "ref_fetch_failure" are listed as internal cause candidates. When no such
        check run exists, "check_missing" is listed.

        max_wait_seconds is accepted for the spec'd signature but unused: this is
        a single lookup, not a poll.

        Returns:
            {"check_present": bool, "git_exit_128": bool, "conclusion": str|None,
             "internal_cause_candidate": list[str]}

        Raises:
            NotImplementedError: gh_runner was not injected.
        """
        _ = max_wait_seconds  # spec parameter; no polling happens here
        gh = self._require_gh_runner()
        result = gh(f"commits/{head_sha}/check-runs", [])

        # Tolerate a runner that reports "check_runs": None.
        check_runs: list[dict] = result.get("check_runs") or []
        gate_run = next(
            (run for run in check_runs if run.get("name") == "gemini-review-gate"),
            None,
        )

        internal_cause: list[str] = []
        conclusion: str | None = None
        git_exit_128 = False

        if gate_run is None:
            internal_cause.append("check_missing")
        else:
            conclusion = gate_run.get("conclusion")

            # The output may be a dict with a "text" field or a plain string.
            output = gate_run.get("output")
            if isinstance(output, dict):
                output_text = output.get("text", "") or ""
            elif isinstance(output, str):
                output_text = output
            else:
                output_text = ""

            if conclusion == "failure" and (
                "exit 128" in output_text or "fatal: " in output_text
            ):
                git_exit_128 = True
                internal_cause.append("git_exit_128")
                internal_cause.append("ref_fetch_failure")

        return {
            "check_present": gate_run is not None,
            "git_exit_128": git_exit_128,
            "conclusion": conclusion,
            "internal_cause_candidate": internal_cause,
        }

    # ─── 3. Grace-window first-evidence polling ────────────────────────────

    def poll_first_gemini_evidence(
        self, pr_number: int, head_sha: str, grace_seconds: int = 180
    ) -> dict:
        """Poll for the first Gemini review within grace_seconds of PR open.

        The poller is called at most ceil(grace_seconds / _POLL_INTERVAL_SECONDS)
        times. A review whose user login is "gemini-code-assist[bot]" counts as
        evidence and ends the poll immediately. Between polls the method waits
        _POLL_INTERVAL_SECONDS through the injected sleeper (time.sleep by
        default); the last wait is clamped to the time left in the grace window,
        so the method does not run past grace_seconds. A poller returning None is
        treated as "no reviews".

        Returns:
            {"evidence_arrived": bool, "elapsed_seconds": int,
             "review_id": int|None, "review_commit_id": str|None}

        Raises:
            NotImplementedError: evidence_poller or clock was not injected.
            RuntimeError: the poller call cap would be exceeded, i.e. the clock
                failed to advance across full poll intervals.
        """
        poller = self._require_evidence_poller()
        clock = self._require_clock()

        max_calls = math.ceil(grace_seconds / _POLL_INTERVAL_SECONDS)
        call_count = 0
        start_time = clock()

        while True:
            # Stop as soon as the grace window is used up.
            if clock() - start_time >= grace_seconds:
                break

            call_count += 1
            if call_count > max_calls:
                raise RuntimeError(
                    f"poll_first_gemini_evidence: poller call count {call_count} "
                    f"exceeded hard limit {max_calls} (grace_seconds={grace_seconds}). "
                    "무한 polling 방지를 위한 안전 종료."
                )

            reviews: list[dict] = poller(pr_number, head_sha) or []

            for review in reviews:
                user = review.get("user") or {}
                if user.get("login", "") == "gemini-code-assist[bot]":
                    return {
                        "evidence_arrived": True,
                        "elapsed_seconds": int(clock() - start_time),
                        "review_id": review.get("id"),
                        "review_commit_id": review.get("commit_id"),
                    }

            remaining = grace_seconds - (clock() - start_time)
            if remaining <= 0:
                break
            if remaining <= _POLL_INTERVAL_SECONDS:
                # Final stretch: wait out what is left and stop. Exiting here
                # without re-reading the clock keeps a sleep that returns a
                # hair early from triggering one poll too many.
                self._sleep(remaining)
                break
            self._sleep(_POLL_INTERVAL_SECONDS)

        return {
            "evidence_arrived": False,
            "elapsed_seconds": int(clock() - start_time),
            "review_id": None,
            "review_commit_id": None,
        }

    # ─── 4. Classification ─────────────────────────────────────────────────

    def classify_trigger_miss(
        self,
        preflight: dict,
        head_match: dict,
        gate_present: dict,
        evidence: dict,
    ) -> dict:
        """Map the four input dicts to a classification and follow-up action.

        Priority, highest first:
          1. head_match["match"] is falsy
             -> INTERNAL_HEAD_MISMATCH (auto retry, repush_head_and_retry)
          2. preflight["fetchable"] is falsy, or gate_present["git_exit_128"] is
             truthy, or preflight["reasons"] contains "git_fetch_exit_128"
             (the freshness preflight's own fetch failed with exit 128)
             -> INTERNAL_FETCH_128 (auto retry, refetch_refs_and_retry)
          3. preflight["behind_count"] > 0 and preflight["head_diverged"] and
             no evidence arrived
             -> INTERNAL_BEHIND_BASE (auto retry, merge_base_and_reopen)
          4. no evidence arrived and nothing above applied
             -> EXTERNAL_TRIGGER_REQUIRED (no auto retry, human_only)
          5. evidence arrived
             -> PR_OPEN_GEMINI_TRIGGER_OK

        Missing keys default to the "healthy" value (match=True, fetchable=True,
        git_exit_128=False, behind_count=0, head_diverged=False,
        evidence_arrived=False, check_present=True).

        Returns:
            {"classification": str, "is_internal": bool, "auto_retry_allowed": bool,
             "human_only": bool, "next_action": str, "reasons": list[str]}
        """
        reasons: list[str] = []

        # priority 1: head SHA mismatch
        if not head_match.get("match", True):
            reasons.append("head_sha_mismatch")
            return {
                "classification": CLASSIFICATION_INTERNAL_HEAD_MISMATCH,
                "is_internal": True,
                "auto_retry_allowed": True,
                "human_only": False,
                "next_action": "repush_head_and_retry",
                "reasons": reasons,
            }

        # priority 2: git exit 128 / fetch failure
        fetchable = preflight.get("fetchable", True)
        git_exit_128 = gate_present.get("git_exit_128", False)
        # The freshness preflight records its own exit-128 fetch only as a
        # reason string; honour it so that failure is not reported as external.
        freshness_fetch_128 = "git_fetch_exit_128" in (preflight.get("reasons") or [])
        if (not fetchable) or git_exit_128 or freshness_fetch_128:
            if not fetchable:
                reasons.append("ref_fetch_failure_exit_128")
            if git_exit_128:
                reasons.append("git_exit_128")
            if freshness_fetch_128:
                reasons.append("git_fetch_exit_128")
            return {
                "classification": CLASSIFICATION_INTERNAL_FETCH_128,
                "is_internal": True,
                "auto_retry_allowed": True,
                "human_only": False,
                "next_action": "refetch_refs_and_retry",
                "reasons": reasons,
            }

        # priority 3: behind base AND diverged AND evidence not arrived
        behind_count = preflight.get("behind_count", 0)
        head_diverged = preflight.get("head_diverged", False)
        evidence_arrived = evidence.get("evidence_arrived", False)
        if behind_count > 0 and head_diverged and not evidence_arrived:
            reasons.append(f"behind_count={behind_count}")
            reasons.append("head_diverged=True")
            return {
                "classification": CLASSIFICATION_INTERNAL_BEHIND_BASE,
                "is_internal": True,
                "auto_retry_allowed": True,
                "human_only": False,
                "next_action": "merge_base_and_reopen",
                "reasons": reasons,
            }

        # priority 4: external trigger required (no evidence, internal checks OK)
        if not evidence_arrived:
            reasons.append("external_trigger_missing")
            if not gate_present.get("check_present", True):
                reasons.append("check_missing")
            return {
                "classification": CLASSIFICATION_EXTERNAL_TRIGGER_REQUIRED,
                "is_internal": False,
                "auto_retry_allowed": False,
                "human_only": True,
                "next_action": "wait_for_owner_/gemini_review_or_chairman_directive",
                "reasons": reasons,
            }

        # priority 5: OK
        return {
            "classification": CLASSIFICATION_OK,
            "is_internal": False,
            "auto_retry_allowed": False,
            "human_only": False,
            "next_action": "proceed_to_merge_gates",
            "reasons": reasons,
        }

    # ─── 5. Combined entry point ───────────────────────────────────────────

    def run(
        self,
        pr_number: int,
        base_branch: str,
        head_branch: str,
        head_sha: str,
        grace_seconds: int = 180,
    ) -> dict:
        """Run preflight -> verify -> poll -> classify in one call.

        Order:
          1. preflight_base_head_freshness
          2. preflight_ref_fetchability
          3. verify_pr_head_sha_match
          4. verify_gemini_review_gate_check_present (max_wait_seconds=60)
          5. poll_first_gemini_evidence (grace_seconds)
          6. classify_trigger_miss

        The result is passed to audit_writer when one was injected.

        Returns:
            {"classification": str, "preflight": dict, "head_match": dict,
             "gate_present": dict, "evidence": dict, "next_action": str,
             "is_internal": bool, "auto_retry_allowed": bool, "human_only": bool,
             "reasons": list[str], "chat_id": str, "run_at": str}

        Raises:
            NotImplementedError: a required callable was not injected.
            RuntimeError: propagated from poll_first_gemini_evidence.
        """
        # Steps 1 & 2: preflight. The two dicts are merged; fetchability keys
        # would win on a key conflict.
        freshness = self.preflight_base_head_freshness(base_branch, head_branch)
        fetchability = self.preflight_ref_fetchability(head_sha)
        preflight: dict = {**freshness, **fetchability}

        # Step 3: verify head SHA match
        head_match = self.verify_pr_head_sha_match(pr_number, head_sha)

        # Step 4: verify gemini-review-gate check
        gate_present = self.verify_gemini_review_gate_check_present(head_sha, max_wait_seconds=60)

        # Step 5: poll first evidence
        evidence = self.poll_first_gemini_evidence(pr_number, head_sha, grace_seconds=grace_seconds)

        # Step 6: classify
        verdict = self.classify_trigger_miss(preflight, head_match, gate_present, evidence)

        result: dict = {
            "classification": verdict["classification"],
            "preflight": preflight,
            "head_match": head_match,
            "gate_present": gate_present,
            "evidence": evidence,
            "next_action": verdict["next_action"],
            "is_internal": verdict["is_internal"],
            "auto_retry_allowed": verdict["auto_retry_allowed"],
            "human_only": verdict["human_only"],
            "reasons": verdict["reasons"],
            "chat_id": self._chat_id,
            "run_at": _now_iso(),
        }

        # audit: the result dict built above carries no token fields
        if self._audit_writer is not None:
            self._audit_writer(result)

        return result
