"""anu_v2.executor_scheduler — 자동 entry point (task-2556 §1 / §5 / §6 / §7 / §8 / §10 / §12).

회장 §명시 2026-05-12 KST (task-2556 본질):
  PR #106 → OWNER_TRIGGER_ONLY_CAPABILITY runner 는 main 반영. 단 PR #107 이 실증한 갭:
    - idle PR 을 자동 감지해 runner 를 호출하는 daemon/scheduler entry point 부재.
    - 봇 session 종료되면 ``FIRST_GEMINI_TRIGGER_MISSING`` / ``GEMINI_STALE_ON_HEAD``
      자동 처리 안 됨.
  본 task = 자동 entry point (executor scheduler) 구현. capability 자동 활성화 완성.

본 모듈 책임 (회장 §1 / §5 / §6 / §7 / §8 / §10 / §12):
  §1  OPEN PR idle scan — ``gh pr list --state open`` 결과를 받아 정기 scan.
  §5  OWNER_TRIGGER_REQUIRED decision write — ``emit_owner_trigger_decision`` 호출.
  §6  ``owner_trigger_only.trigger_gemini_review()`` runner 자동 호출.
  §7  decision.json / audit.jsonl / requested/posted/failed marker 박제.
  §8  scheduled/event-driven recheck (회장 chat 노출 0).
  §10 duplicate same-head dedupe — fcntl.flock atomic, 이미 trigger 된 head 재호출 차단.
  §12 bot session 종료 후 재진입 — state persisted markers 기반 scheduler 가 재진입.

본 모듈 NOT 책임:
  - HTTP call 직접 수행 (owner_trigger_only 가 함).
  - merge 실행 (merge_queue_executor 가 함).
  - 회장 chat 알림 (정책상 노출 0 — scheduler 는 silent).

부수효과 (제한):
  - decision_dir 에 owner_trigger_decision.json / marker 파일 생성.
  - scheduler lock 파일 fcntl.flock (same-head dedupe).
  - audit JSONL append (owner_trigger_audit 가 함).

one-way isolation: anu_v2/* 만 import. 외부 (utils/dispatch/scripts/dashboard) 의존성 0.
"""

from __future__ import annotations

import fcntl
import json
import logging
import os
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Final, Iterator, Sequence

from anu_v2.idle_pr_diagnoser import (
    IdlePRDiagnoser,
    IdlePRDiagnosis,
    IdlePRSnapshot,
    STATE_FIRST_GEMINI_TRIGGER_MISSING,
    STATE_FIRST_TRIGGER_PENDING,
    STATE_GEMINI_FRESH_ON_HEAD,
    STATE_GEMINI_STALE_ON_HEAD,
    STATE_WITHIN_GRACE_PERIOD,
)
from anu_v2.merge_queue_executor import (
    AUTO_MERGE_ALLOWED,
    GEMINI_FRESH_DETECTED,
    MergeQueueExecutor,
    OWNER_TRIGGER_FAILED,
    OWNER_TRIGGER_POSTED,
    PRMeta,
)
from anu_v2.owner_trigger_audit import (
    RESULT_DEDUPED,
    RESULT_FAILED,
    RESULT_POSTED,
)
from anu_v2.owner_trigger_decision import DecisionInvalidError
from anu_v2.owner_trigger_only import (
    OwnerTriggerOnly,
    TokenBoundaryViolation,
    assert_scheduler_token_boundary,
    invoke_from_scheduler,
)
from anu_v2.polling_policy import (
    BotSessionExitRequired,
    PollingState,
    advance_recheck,
    assert_first_timeout_not_exceeded,
    must_exit_now,
)
from anu_v2.second_review_recovery import (
    SecondReviewInput,
    auto_trigger_owner_review,
    determine_state,
    emit_phase2_markers,
)


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Locations of the scheduler lock file and the scheduler audit JSONL; both are
# joined onto the scheduler's workspace root by ``ExecutorScheduler``.
SCHEDULER_LOCK_REL_PATH: Final[str] = "memory/events/executor_scheduler.lock"
SCHEDULER_AUDIT_REL_PATH: Final[str] = "memory/events/executor_scheduler_audit.jsonl"
# Schema tag written into every scheduler audit record.
SCHEDULER_AUDIT_SCHEMA: Final[str] = "anu_v2.executor_scheduler.v1"

# ─── FUC-4 per-PR exception isolation (task-2560) ────────────────────────
#
# Chairman's explicit directive, 2026-05-12, Track A top priority:
#   An exception raised while handling one PR's diagnosis must not abort the
#   whole cycle. This module wraps ``_handle_single_diagnosis`` in
#   ``_safe_handle_single_diagnosis`` to enforce per-PR try/except isolation.
#
# Classification (directive item 6 — "critical 7 means critical escalation"):
#   non-critical exception → cycle continues; only that PR gets a FAILED marker + audit
#   critical exception     → cycle continues as well, but a critical-escalation
#                            marker is recorded; the exception is kept as an
#                            isolated record instead of being re-raised (the dual
#                            of directive item 7, "a non-critical exception keeps
#                            the scheduler cycle alive": critical ones also keep
#                            the cycle, with a separate marker that makes the
#                            escalation visible).
#
#   However, the ``BaseException``-only family (SystemExit, KeyboardInterrupt,
#   GeneratorExit) and the polling policy's ``BotSessionExitRequired`` are
#   cycle-termination signals themselves, so per-PR isolation does not swallow
#   them; they propagate unchanged (directive §9 / bot-exit policy).

# Schema tag written into per-PR exception summary records.
PR_EXCEPTION_AUDIT_SCHEMA: Final[str] = "anu_v2.executor_scheduler.pr_exception.v1"

# Action codes used for PRs whose handling raised an exception.
ACTION_PR_EXCEPTION_ISOLATED: Final[str] = "PR_EXCEPTION_ISOLATED"
ACTION_PR_EXCEPTION_CRITICAL_ESCALATED: Final[str] = "PR_EXCEPTION_CRITICAL_ESCALATED"


# Critical 7 — exceptions for which the cycle keeps running but an escalation
# marker must be recorded. This classification mirrors task-2560 directive
# item 6 one-to-one. Polling/system termination signals are not part of this
# set (they end the whole cycle and are handled separately).
# NOTE(review): PermissionError is a subclass of OSError, so entry (2) is
# already covered by entry (3); it is listed separately to mirror the directive.
_CRITICAL_EXCEPTIONS: Final[tuple[type[BaseException], ...]] = (
    TokenBoundaryViolation,   # (1) scheduler token boundary violation
    PermissionError,          # (2) file / resource permission
    OSError,                  # (3) disk / filesystem error
    MemoryError,              # (4) out of memory
    NotImplementedError,      # (5) contract violation (required interface not implemented)
    TypeError,                # (6) type contract violation (wrong dependency injected)
    AttributeError,           # (7) missing internal attribute (code interface drift)
)


def _is_critical_exception(exc: BaseException) -> bool:
    """Return True when ``exc`` belongs to the critical-escalation set.

    An exception is critical when it is an instance of any type listed in
    ``_CRITICAL_EXCEPTIONS`` (subclasses included).
    """
    return any(
        isinstance(exc, critical_type) for critical_type in _CRITICAL_EXCEPTIONS
    )


def _summarize_exception(exc: BaseException) -> dict:
    """exception 한 줄 summary (decision/audit 기록용). traceback frame 0 만 박제."""
    summary: dict = {
        "type": type(exc).__name__,
        "module": type(exc).__module__,
        "message": str(exc)[:512],   # decision 파일 비대화 방지
    }
    tb = exc.__traceback__
    if tb is not None:
        # 가장 안쪽 frame 까지 내려가 origin 위치만 박제 (PII / token leak 방지).
        last = tb
        while last.tb_next is not None:
            last = last.tb_next
        code = last.tb_frame.f_code
        summary["origin_filename"] = Path(code.co_filename).name
        summary["origin_function"] = code.co_name
        summary["origin_lineno"] = last.tb_lineno
    return summary


# ─── Result (action) codes ────────────────────────────────────────────────


# Action codes a scheduler decision can carry (see ``SchedulerPRAction.action``).
ACTION_OWNER_TRIGGER_DISPATCHED: Final[str] = "OWNER_TRIGGER_DISPATCHED"
ACTION_OWNER_TRIGGER_DEDUPED: Final[str] = "OWNER_TRIGGER_DEDUPED"
ACTION_OWNER_TRIGGER_FAILED: Final[str] = "OWNER_TRIGGER_FAILED"
ACTION_FRESH_RESUME: Final[str] = "FRESH_GEMINI_AUTO_RESUME"
ACTION_WITHIN_GRACE: Final[str] = "WITHIN_GRACE_PERIOD_SKIP"
ACTION_CI_FAILED_SKIP: Final[str] = "CI_FAILED_SKIP"
ACTION_MISSING_TASK_ID_SKIP: Final[str] = "MISSING_TASK_ID_SKIP"
ACTION_SAME_HEAD_DEDUPED: Final[str] = "SAME_HEAD_DEDUPED"

# task-2563 §1: in the FIRST_TRIGGER_PENDING state the owner trigger is held back
# when fast_path is false. The scheduler dispatches early only when
# ``dispatch_decision.owner_trigger_fast_path == true``. Otherwise it skips with
# this action — chat_notifications=0, no marker, audit record only.
ACTION_FIRST_TRIGGER_PENDING_SKIP: Final[str] = "FIRST_TRIGGER_PENDING_SKIP"

# Key of the fast_path flag inside the dispatch_decision JSON (task-2563 §1).
DISPATCH_DECISION_FAST_PATH_KEY: Final[str] = "owner_trigger_fast_path"


# ─── 결과 dataclass ───────────────────────────────────────────────────────


@dataclass(frozen=True)
class SchedulerPRAction:
    """Scheduler decision and outcome for a single PR.

    Construction raises ``ValueError`` when ``action`` is not one of the action
    codes this module recognises.
    """

    pr_number: int
    task_id: str
    head_sha: str
    state: str
    action: str
    runner_result: str = ""
    decision_path: str = ""
    marker_path: str = ""
    reason: str = ""

    def __post_init__(self) -> None:
        """Validate ``action`` against the closed set of known action codes."""
        allowed_actions = frozenset(
            (
                ACTION_OWNER_TRIGGER_DISPATCHED,
                ACTION_OWNER_TRIGGER_DEDUPED,
                ACTION_OWNER_TRIGGER_FAILED,
                ACTION_FRESH_RESUME,
                ACTION_WITHIN_GRACE,
                ACTION_CI_FAILED_SKIP,
                ACTION_MISSING_TASK_ID_SKIP,
                ACTION_SAME_HEAD_DEDUPED,
                ACTION_FIRST_TRIGGER_PENDING_SKIP,
                ACTION_PR_EXCEPTION_ISOLATED,
                ACTION_PR_EXCEPTION_CRITICAL_ESCALATED,
                "DIAGNOSIS_ONLY",
                "UNKNOWN_STATE_SKIP",
            )
        )
        if self.action in allowed_actions:
            return
        raise ValueError(f"action {self.action!r} not in allowed set")


@dataclass(frozen=True)
class SchedulerCycleResult:
    """Aggregate result of one ``run_one_cycle`` call (all PRs combined).

    Holds the per-PR ``SchedulerPRAction`` entries plus the chat-noise-free
    assertion (``chat_notifications == 0``).

    task-2560 FUC-4 per-PR isolation:
      - ``pr_exceptions_isolated`` — number of PRs that raised a non-critical
        exception (cycle kept running).
      - ``pr_exceptions_critical_escalated`` — number of PRs that raised one of
        the "critical 7" exceptions (cycle kept running + escalation marker
        recorded).
      - ``cycle_crashed`` — whether the cycle itself was aborted (False when
        per-PR isolation succeeded).
    """

    cycle_started_at: str
    cycle_finished_at: str
    pr_actions: tuple[SchedulerPRAction, ...]
    chat_notifications: int = 0   # Chat exposure is forbidden (directive §8) — this value is always 0.
    rechecks_done: int = 0
    bot_should_exit: bool = True   # Directive §9 — exit immediately after one recheck.
    pr_exceptions_isolated: int = 0
    pr_exceptions_critical_escalated: int = 0
    cycle_crashed: bool = False


# ─── core class ───────────────────────────────────────────────────────────


class ExecutorScheduler:
    """OPEN PR idle scan + 자동 entry point. 회장 §1/§5/§6/§7/§8/§10/§12 1:1.

    구성 (DI):
      - ``workspace_root``: anu_v2 workspace 루트 (decision/marker/audit 경로 계산).
      - ``decision_dir``: marker / decision.json 디렉토리.
      - ``snapshot_provider``: ``Callable[[], Sequence[IdlePRSnapshot]]`` —
        ``gh pr list --state open --json ...`` 결과를 정규화해 반환. 본 모듈은 외부 명령
        실행을 직접 안 한다 (one-way isolation).
      - ``owner_trigger``: ``OwnerTriggerOnly`` 인스턴스. 본 클래스는 본 runner 를
        ``invoke_from_scheduler`` 어댑터로 호출.
      - ``merge_executor``: ``MergeQueueExecutor`` 인스턴스. fresh evidence 도착 시
        ``auto_resume_after_fresh_evidence`` 를 호출하기 위함.
      - ``owner``, ``repo``: GitHub owner / repo.
      - ``diagnoser``: 선택. 미주입 시 default ``IdlePRDiagnoser``.

    설계 원칙:
      - **chat_notifications == 0**: 본 모듈은 telegram / cokacdir / slack 어떤 외부
        채널에도 알림을 보내지 않는다 (회장 §8 1:1).
      - **long polling 0**: 본 모듈은 자체 sleep loop 가 없다. ``run_one_cycle`` 은
        한 번만 scan + dispatch 후 즉시 반환. 재진입은 외부 cron/webhook 책임 (§8).
      - **same-head dedupe**: ``fcntl.flock`` 기반 atomic. 동일 (pr, head) 가 활성
        trigger 를 가진 경우 SchedulerPRAction.action == SAME_HEAD_DEDUPED.
      - **state persisted markers**: 본 모듈의 모든 결정은 audit/marker 로 박제되어
        다음 cycle 이 재진입 시 상태 복원 가능.
    """

    def __init__(
        self,
        *,
        workspace_root: str | Path,
        decision_dir: str | Path,
        snapshot_provider: Callable[[], Sequence[IdlePRSnapshot]],
        owner_trigger: OwnerTriggerOnly,
        merge_executor: MergeQueueExecutor,
        owner: str,
        repo: str,
        diagnoser: IdlePRDiagnoser | None = None,
        clock: Callable[[], str] | None = None,
    ) -> None:
        """Validate the injected dependencies and store them on the instance.

        Raises:
          NotImplementedError: ``snapshot_provider`` is None.
          TypeError: ``owner_trigger`` / ``merge_executor`` is not an instance
            of the expected class.
          ValueError: ``owner`` / ``repo`` is not a non-empty string, or it
            contains ``/``.
        """
        if snapshot_provider is None:
            raise NotImplementedError("snapshot_provider must be injected")

        # Collaborators are type-checked in a fixed order: owner_trigger first.
        collaborator_checks = (
            (
                owner_trigger,
                OwnerTriggerOnly,
                "owner_trigger must be OwnerTriggerOnly instance",
            ),
            (
                merge_executor,
                MergeQueueExecutor,
                "merge_executor must be MergeQueueExecutor instance",
            ),
        )
        for candidate, expected_type, message in collaborator_checks:
            if not isinstance(candidate, expected_type):
                raise TypeError(message)

        # owner / repo must each be a plain, slash-free, non-empty name.
        for label, value in (("owner", owner), ("repo", repo)):
            if not (isinstance(value, str) and value and "/" not in value):
                raise ValueError(f"{label} must be non-empty string without '/'")

        self._workspace_root = Path(workspace_root).resolve()
        self._decision_dir = Path(decision_dir).resolve()
        self._snapshot_provider = snapshot_provider
        self._owner_trigger = owner_trigger
        self._merge_executor = merge_executor
        self._owner = owner
        self._repo = repo
        # Fall back to the default diagnoser / clock when none is injected.
        if diagnoser is None:
            diagnoser = IdlePRDiagnoser()
        self._diagnoser = diagnoser
        if clock is None:
            clock = _now_iso
        self._clock = clock

    # ─── lock paths ──────────────────────────────────────────────────────

    @property
    def lock_path(self) -> Path:
        """Path of the scheduler's same-head dedupe lock file under the workspace root."""
        return self._workspace_root.joinpath(SCHEDULER_LOCK_REL_PATH)

    @property
    def audit_path(self) -> Path:
        """Path of the scheduler audit JSONL (cycle / action records) under the workspace root."""
        return self._workspace_root.joinpath(SCHEDULER_AUDIT_REL_PATH)

    # ─── §10 same-head dedupe lock ───────────────────────────────────────

    @contextmanager
    def _scheduler_lock(self) -> Iterator[None]:
        """Hold an exclusive ``fcntl.flock`` on the scheduler lock file (directive §10).

        The lock is requested with ``LOCK_EX`` and no non-blocking flag, so a
        second scheduler instance entering concurrently waits here (POSIX
        advisory locking). The lock lives on a sidecar file, separate from the
        lock taken on the audit JSONL itself. The parent directory is created
        on demand.
        """
        lock_file = self.lock_path
        lock_file.parent.mkdir(parents=True, exist_ok=True)
        with open(lock_file, "a", encoding="utf-8") as handle:
            descriptor = handle.fileno()
            fcntl.flock(descriptor, fcntl.LOCK_EX)
            try:
                yield
            finally:
                # Release explicitly even when the guarded body raised.
                fcntl.flock(descriptor, fcntl.LOCK_UN)

    def _has_active_trigger_for_head(
        self, *, pr_number: int, head_sha: str
    ) -> bool:
        """(pr, head) 에 대해 활성 owner trigger 가 이미 있는지 audit 으로 확인.

        owner_trigger_audit 의 bounded reverse scan 을 활용 (capability 재사용).
        """
        audit = self._owner_trigger._audit  # owner_trigger_audit.OwnerTriggerAudit
        return audit._has_active_trigger(pr=pr_number, head=head_sha)

    # ─── §6 / §7 owner trigger dispatch ──────────────────────────────────

    def _dispatch_owner_trigger(
        self,
        *,
        diag: IdlePRDiagnosis,
        snapshot: IdlePRSnapshot,
    ) -> SchedulerPRAction:
        """Dispatch an owner-trigger-required diagnosis to the owner-trigger runner.

        Steps (directive §5 / §6 / §7):
          1. Same-head dedupe check against the owner-trigger audit.
          2. Convert the snapshot into the PR meta object used by the merge executor.
          3. ``merge_executor.emit_owner_trigger_decision`` for this task / PR.
          4. Call the runner through the ``invoke_from_scheduler`` adapter.
          5. Record an outcome marker: POSTED on success, FAILED for an invalid
             decision or any other runner result; DEDUPED records no marker.
          6. Return the resulting ``SchedulerPRAction``.
        """

        def build_action(**fields) -> SchedulerPRAction:
            # Every action returned here shares the same PR identity fields.
            return SchedulerPRAction(
                pr_number=diag.pr_number,
                task_id=diag.task_id,
                head_sha=diag.head_sha,
                state=diag.state,
                **fields,
            )

        # §10 same-head dedupe — the audit shows another active trigger already.
        already_active = self._has_active_trigger_for_head(
            pr_number=diag.pr_number, head_sha=diag.head_sha
        )
        if already_active:
            return build_action(
                action=ACTION_SAME_HEAD_DEDUPED,
                runner_result=RESULT_DEDUPED,
                reason="audit shows active POSTED|PENDING trigger for (pr, head)",
            )

        pr_meta = _snapshot_to_pr_meta(snapshot)

        def record_outcome(outcome_code, **extra_kwargs):
            # Thin wrapper around the merge executor's outcome-marker writer.
            return self._merge_executor.record_owner_trigger_outcome(
                task_id=diag.task_id,
                pr=pr_meta,
                outcome_code=outcome_code,
                decision_dir=self._decision_dir,
                **extra_kwargs,
            )

        # §5 decision.json + requested marker.
        self._merge_executor.emit_owner_trigger_decision(
            task_id=diag.task_id,
            pr=pr_meta,
            decision_dir=self._decision_dir,
        )
        decision_file = self._decision_dir / f"{diag.task_id}.owner_trigger_decision.json"

        # §6 scheduler-initiated runner call.
        try:
            runner_result = invoke_from_scheduler(
                self._owner_trigger,
                decision_path=decision_file,
                owner=self._owner,
                repo=self._repo,
                current_head_actual=diag.head_sha,
            )
        except DecisionInvalidError as exc:
            # Decision schema violation — record a fail-closed marker.
            failed_marker = record_outcome(
                OWNER_TRIGGER_FAILED,
                extra={"decision_invalid_code": getattr(exc, "code", "")},
            )
            return build_action(
                action=ACTION_OWNER_TRIGGER_FAILED,
                runner_result=RESULT_FAILED,
                decision_path=str(decision_file),
                marker_path=str(failed_marker),
                reason=f"decision invalid: {getattr(exc, 'code', '')}",
            )

        # §7 marker per runner result.
        if runner_result == RESULT_POSTED:
            posted_marker = record_outcome(OWNER_TRIGGER_POSTED)
            return build_action(
                action=ACTION_OWNER_TRIGGER_DISPATCHED,
                runner_result=RESULT_POSTED,
                decision_path=str(decision_file),
                marker_path=str(posted_marker),
                reason=diag.reason,
            )

        if runner_result == RESULT_DEDUPED:
            return build_action(
                action=ACTION_OWNER_TRIGGER_DEDUPED,
                runner_result=RESULT_DEDUPED,
                decision_path=str(decision_file),
                reason="runner returned DEDUPED (atomic dedupe race)",
            )

        # Anything else (FAILED / PENDING / unknown) is recorded as a failure.
        fallback_marker = record_outcome(
            OWNER_TRIGGER_FAILED,
            extra={"runner_result": runner_result},
        )
        return build_action(
            action=ACTION_OWNER_TRIGGER_FAILED,
            runner_result=runner_result,
            decision_path=str(decision_file),
            marker_path=str(fallback_marker),
            reason=f"runner result {runner_result}",
        )

    # ─── §12 fresh evidence auto-resume ──────────────────────────────────

    def _handle_fresh_evidence(
        self,
        *,
        diag: IdlePRDiagnosis,
        snapshot: IdlePRSnapshot,
    ) -> SchedulerPRAction:
        """Handle a "gemini fresh on head" diagnosis via the merge executor.

        Calls ``merge_executor.auto_resume_after_fresh_evidence`` and always
        returns an ``ACTION_FRESH_RESUME`` action whose reason is the outcome's
        reason. ``marker_path`` is filled from ``outcome.extra["marker_path"]``
        only when the outcome is ``AUTO_MERGE_ALLOWED`` with reason
        ``GEMINI_FRESH_DETECTED``; otherwise it stays empty.
        """
        meta = _snapshot_to_pr_meta(snapshot)
        outcome = self._merge_executor.auto_resume_after_fresh_evidence(
            task_id=diag.task_id,
            pr=meta,
            latest_gemini_review_commit_id=diag.latest_gemini_commit_id,
            decision_dir=self._decision_dir,
        )
        resumed = (
            outcome.decision == AUTO_MERGE_ALLOWED
            and outcome.reason == GEMINI_FRESH_DETECTED
        )
        if resumed:
            marker = str((outcome.extra or {}).get("marker_path", ""))
        else:
            marker = ""
        return SchedulerPRAction(
            pr_number=diag.pr_number,
            task_id=diag.task_id,
            head_sha=diag.head_sha,
            state=diag.state,
            action=ACTION_FRESH_RESUME,
            decision_path="",
            marker_path=marker,
            reason=outcome.reason,
        )

    # ─── §1 / §2 single-cycle entry point ────────────────────────────────

    def run_one_cycle(
        self,
        *,
        env: dict | None = None,
        now: str | None = None,
        cycle_polling_state: PollingState | None = None,
    ) -> SchedulerCycleResult:
        """Run exactly one scan + dispatch pass, then return (directive §1/§2/§5-§8/§10/§12).

        This method contains no loop over cycles and no sleep; the next cycle
        is expected to come from an external re-invocation (cron / webhook).

        Flow:
          1. Token boundary check on ``env`` (``assert_scheduler_token_boundary``).
          2. Polling state check — raise if ``must_exit_now`` says so.
          3. Under the scheduler lock: fetch snapshots from ``snapshot_provider``
             and diagnose them in bulk with ``diagnoser.diagnose_all``.
          4. For each diagnosis, resolve the action through
             ``_safe_handle_single_diagnosis`` (per-PR exception isolation), or
             produce an ``UNKNOWN_STATE_SKIP`` action when no snapshot matches
             the diagnosis' PR number.
          5. Append one scheduler audit record per action. An audit write
             failure is logged and does not abort the remaining PRs, in line
             with the FUC-4 rule that one PR must not stop the cycle.
          6. Return the aggregated ``SchedulerCycleResult``.

        Args:
          env: scheduler environment mapping, validated by
            ``assert_scheduler_token_boundary``.
          now: timestamp forwarded to the diagnoser as ``now`` (lets tests be
            deterministic); ``None`` is forwarded as-is.
          cycle_polling_state: polling state of this cycle; a fresh
            ``PollingState()`` is used when omitted.

        Returns:
          SchedulerCycleResult with ``chat_notifications == 0`` and
          ``cycle_crashed == False``.

        Raises:
          TokenBoundaryViolation: raised by the boundary check when ``env``
            violates the token boundary.
          BotSessionExitRequired: the polling state requires an immediate exit.
        """
        # §11 token boundary.
        assert_scheduler_token_boundary(env)
        # §9 polling state check.
        state = cycle_polling_state if cycle_polling_state is not None else PollingState()
        if must_exit_now(state):
            raise BotSessionExitRequired(
                f"polling state requires exit (rechecks_done={state.rechecks_done}, "
                f"elapsed={state.elapsed_seconds}s)"
            )

        cycle_started = self._clock()
        actions: list[SchedulerPRAction] = []
        isolated_count = 0
        critical_count = 0
        with self._scheduler_lock():
            snapshots = list(self._snapshot_provider())
            diagnoses = self._diagnoser.diagnose_all(snapshots, now=now)

            # Index snapshots by PR number for direct lookup per diagnosis.
            snap_by_pr: dict[int, IdlePRSnapshot] = {s.number: s for s in snapshots}

            for diag in diagnoses:
                snap = snap_by_pr.get(diag.pr_number)
                if snap is None:
                    # No snapshot for this diagnosis: skip it, but still fall
                    # through to the audit append so the decision is recorded
                    # like every other action.
                    action = SchedulerPRAction(
                        pr_number=diag.pr_number,
                        task_id=diag.task_id,
                        head_sha=diag.head_sha,
                        state=diag.state,
                        action="UNKNOWN_STATE_SKIP",
                        reason="snapshot missing",
                    )
                else:
                    # task-2560 FUC-4 — per-PR try/except isolation: an exception
                    # for this PR must not block the following PRs.
                    action = self._safe_handle_single_diagnosis(
                        diag=diag, snapshot=snap, cycle_started=cycle_started
                    )
                actions.append(action)
                if action.action == ACTION_PR_EXCEPTION_ISOLATED:
                    isolated_count += 1
                elif action.action == ACTION_PR_EXCEPTION_CRITICAL_ESCALATED:
                    critical_count += 1
                try:
                    self._append_audit(cycle_started=cycle_started, action=action)
                except BotSessionExitRequired:
                    # Cycle-termination signal — never swallowed.
                    raise
                except Exception:  # noqa: BLE001 — audit failure must not abort the cycle.
                    logger.exception(
                        "scheduler audit append failed for pr=%s task_id=%s action=%s"
                        " - continuing cycle.",
                        action.pr_number,
                        action.task_id,
                        action.action,
                    )

        cycle_finished = self._clock()
        return SchedulerCycleResult(
            cycle_started_at=cycle_started,
            cycle_finished_at=cycle_finished,
            pr_actions=tuple(actions),
            chat_notifications=0,
            rechecks_done=state.rechecks_done,
            bot_should_exit=must_exit_now(state),
            pr_exceptions_isolated=isolated_count,
            pr_exceptions_critical_escalated=critical_count,
            cycle_crashed=False,
        )

    def _handle_single_diagnosis(
        self,
        *,
        diag: IdlePRDiagnosis,
        snapshot: IdlePRSnapshot,
    ) -> SchedulerPRAction:
        """Map one PR diagnosis onto a scheduler action.

        Skip states (within grace period, missing task id, CI failed) return a
        skip action without dispatching. A fresh-on-head diagnosis goes to
        ``_handle_fresh_evidence``. A pending first trigger is dispatched only
        when the fast-path flag loads as True. Missing-first-trigger and
        stale-on-head diagnoses are dispatched to the owner trigger. Any other
        state yields ``UNKNOWN_STATE_SKIP``.
        """

        def plain_action(action_code: str, reason: str) -> SchedulerPRAction:
            # Action without runner / marker details, bound to this PR.
            return SchedulerPRAction(
                pr_number=diag.pr_number,
                task_id=diag.task_id,
                head_sha=diag.head_sha,
                state=diag.state,
                action=action_code,
                reason=reason,
            )

        # Checked in order; the first matching state wins.
        skip_rules = (
            (STATE_WITHIN_GRACE_PERIOD, ACTION_WITHIN_GRACE),
            ("MISSING_TASK_ID", ACTION_MISSING_TASK_ID_SKIP),
            ("CI_FAILED", ACTION_CI_FAILED_SKIP),
        )
        for skip_state, skip_action in skip_rules:
            if diag.state == skip_state:
                return plain_action(skip_action, diag.reason)

        if diag.state == STATE_GEMINI_FRESH_ON_HEAD:
            return self._handle_fresh_evidence(diag=diag, snapshot=snapshot)

        if diag.state == STATE_FIRST_TRIGGER_PENDING:
            # task-2563 §1: PENDING holds dispatch by default; early dispatch is
            # allowed only when the dispatch decision sets the fast-path flag.
            if self._load_fast_path_flag(diag=diag):
                return self._dispatch_owner_trigger(diag=diag, snapshot=snapshot)
            return plain_action(
                ACTION_FIRST_TRIGGER_PENDING_SKIP,
                (
                    f"{diag.reason} — fast_path=false, owner trigger 보류 "
                    f"(FIRST_TIMEOUT_SECONDS 경과 후 MISSING 확정 시 dispatch)"
                ),
            )

        needs_owner_trigger = (
            diag.state == STATE_FIRST_GEMINI_TRIGGER_MISSING
            or diag.state == STATE_GEMINI_STALE_ON_HEAD
        )
        if needs_owner_trigger:
            return self._dispatch_owner_trigger(diag=diag, snapshot=snapshot)

        return plain_action("UNKNOWN_STATE_SKIP", diag.reason)

    # ─── task-2563 §1: fast_path flag loader ──────────────────────────────

    def _load_fast_path_flag(self, *, diag: IdlePRDiagnosis) -> bool:
        """``dispatch_decision.owner_trigger_fast_path`` flag 를 로드.

        task-2563 §1 1:1: FIRST_TRIGGER_PENDING 상태에서 owner trigger 를 조기 dispatch 하려면
        반드시 dispatch_decision JSON 에 ``owner_trigger_fast_path: true`` 가 명시되어야 한다.

        loader 동작 (fail-closed):
          1. ``decision_dir / "{task_id}.dispatch-decision.json"`` 경로 확인.
          2. 없거나 JSON 파싱 실패 → False (보수적).
          3. 키 누락 / non-bool / False → False.
          4. ``True`` (Python bool) → True 반환.

        본 loader 는 외부 dispatch_decision 작성자 (회장 / dispatcher) 가 명시적으로 fast_path=true
        를 박제해야만 활성화되도록 설계되어 있다.
        """
        try:
            candidates = []
            if diag.task_id:
                candidates.append(self._decision_dir / f"{diag.task_id}.dispatch-decision.json")
            # 추가 fallback: PR 번호 기반 (task_id 가 비어있을 경우)
            candidates.append(
                self._decision_dir / f"pr-{diag.pr_number}.dispatch-decision.json"
            )
            for path in candidates:
                if not path.exists():
                    continue
                try:
                    data = json.loads(path.read_text(encoding="utf-8"))
                except (json.JSONDecodeError, OSError):
                    return False
                if not isinstance(data, dict):
                    return False
                value = data.get(DISPATCH_DECISION_FAST_PATH_KEY)
                return value is True
        except Exception:  # noqa: BLE001 — fast_path 로딩 실패도 fail-closed (False).
            logger.warning(
                "fast_path flag load failed for pr=%s task_id=%s — fail-closed (False).",
                diag.pr_number,
                diag.task_id,
                exc_info=True,
            )
            return False
        return False

    # ─── FUC-4 per-PR exception isolation (task-2560) ────────────────────

    def _safe_handle_single_diagnosis(
        self,
        *,
        diag: IdlePRDiagnosis,
        snapshot: IdlePRSnapshot,
        cycle_started: str,
    ) -> SchedulerPRAction:
        """Run ``_handle_single_diagnosis`` inside a per-PR try/except (task-2560 FUC-4).

        Exception handling:
          - ``BotSessionExitRequired`` is re-raised (cycle-termination signal).
            ``KeyboardInterrupt`` / ``SystemExit`` / ``GeneratorExit`` are not
            ``Exception`` subclasses, so they propagate untouched as well.
          - An exception in the critical set → the cycle continues and the
            returned action is ``PR_EXCEPTION_CRITICAL_ESCALATED``.
          - Any other ``Exception`` → the cycle continues and the returned
            action is ``PR_EXCEPTION_ISOLATED``.

        Markers / audit:
          - ``_record_pr_exception_marker`` writes the exception summary and the
            marker file. If that recording itself fails, the failure is logged
            as a warning and the returned action carries an empty
            ``marker_path``; the cycle is never aborted for it.
          - Appending the returned action to the scheduler audit JSONL is the
            caller's (``run_one_cycle``) responsibility.

        Returns:
          The action from ``_handle_single_diagnosis``, or an exception action
          whose ``runner_result`` is the exception type name.
        """
        try:
            return self._handle_single_diagnosis(diag=diag, snapshot=snapshot)
        except BotSessionExitRequired:
            # Subclass of Exception per the original notes, so it must be
            # re-raised explicitly before the broad handler below.
            raise
        except Exception as exc:   # noqa: BLE001 — deliberately broad for per-PR isolation.
            critical = _is_critical_exception(exc)
            summary = _summarize_exception(exc)
            try:
                marker_path = self._record_pr_exception_marker(
                    diag=diag,
                    summary=summary,
                    critical=critical,
                    cycle_started=cycle_started,
                )
            except Exception:   # noqa: BLE001 — a marker failure must not abort the cycle.
                # Keep going, but leave a trace: without this log the missing
                # marker would be invisible to operators.
                logger.warning(
                    "exception marker write failed for pr=%s task_id=%s"
                    " - continuing without marker.",
                    diag.pr_number,
                    diag.task_id,
                    exc_info=True,
                )
                marker_path = ""
            if critical:
                action_code = ACTION_PR_EXCEPTION_CRITICAL_ESCALATED
                reason = (
                    f"critical exception {summary['type']}: "
                    f"{summary['message']} — cycle 유지, ESCALATED marker 박제"
                )
            else:
                action_code = ACTION_PR_EXCEPTION_ISOLATED
                reason = (
                    f"isolated exception {summary['type']}: "
                    f"{summary['message']} — cycle 유지, FAILED marker 박제"
                )
            return SchedulerPRAction(
                pr_number=diag.pr_number,
                task_id=diag.task_id,
                head_sha=diag.head_sha,
                state=diag.state,
                action=action_code,
                runner_result=summary["type"],
                marker_path=marker_path,
                reason=reason,
            )

    def _record_pr_exception_marker(
        self,
        *,
        diag: IdlePRDiagnosis,
        summary: dict,
        critical: bool,
        cycle_started: str,
    ) -> str:
        """Persist an exception summary and its marker file; return the marker path.

        Two files are written into the decision directory (created on demand).
        ``{task}`` is ``diag.task_id``, or ``pr-{pr_number}`` when the task id
        is empty:
          1. ``{task}.pr-{pr}.exception.json`` — the full summary record.
          2. ``{task}.pr-{pr}.failed`` or ``{task}.pr-{pr}.critical-escalated``
             — a marker file created with ``touch`` (lifecycle watching).

        Args:
          diag: diagnosis of the PR whose handling raised.
          summary: exception summary stored under ``exception_summary``.
          critical: selects the ``critical-escalated`` marker / action instead
            of ``failed`` / isolated.
          cycle_started: start timestamp of the current cycle.

        Returns:
          The marker file path as a string.
        """
        decision_dir = self._decision_dir
        decision_dir.mkdir(parents=True, exist_ok=True)
        suffix = "critical-escalated" if critical else "failed"
        task_part = diag.task_id or f"pr-{diag.pr_number}"
        summary_path = decision_dir / f"{task_part}.pr-{diag.pr_number}.exception.json"
        marker_path = decision_dir / f"{task_part}.pr-{diag.pr_number}.{suffix}"
        record = {
            "schema": PR_EXCEPTION_AUDIT_SCHEMA,
            "ts": self._clock(),
            "cycle_started_at": cycle_started,
            "pr_number": diag.pr_number,
            "task_id": diag.task_id,
            "head_sha": diag.head_sha,
            "state": diag.state,
            "critical": critical,
            "action": (
                ACTION_PR_EXCEPTION_CRITICAL_ESCALATED
                if critical
                else ACTION_PR_EXCEPTION_ISOLATED
            ),
            "exception_summary": summary,
            "owner": self._owner,
            "repo": self._repo,
            "chat_notifications": 0,
        }
        # Open in append mode so the existing content is NOT truncated at open
        # time: with mode "w" the file would already be emptied before the lock
        # below is acquired, so the lock would not protect the truncation.
        with open(summary_path, "a", encoding="utf-8") as fh:
            fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
            try:
                # Discard any previous summary only while holding the lock; in
                # append mode the following write lands at the new end (offset 0).
                fh.truncate(0)
                fh.write(json.dumps(record, ensure_ascii=False, sort_keys=True, indent=2))
                fh.flush()
                os.fsync(fh.fileno())
            finally:
                fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
        marker_path.touch(exist_ok=True)
        return str(marker_path)

    # ─── §7 / §12 audit append (state persisted markers) ─────────────────

    def _append_audit(
        self,
        *,
        cycle_started: str,
        action: SchedulerPRAction,
    ) -> None:
        """Append one JSON line describing ``action`` to the scheduler audit JSONL.

        The parent directory is created on demand. The line is written under an
        exclusive ``fcntl.flock`` on the audit file, then flushed and fsynced.
        Later cycles can consult this history when they re-enter.
        """
        audit_file = self.audit_path
        audit_file.parent.mkdir(parents=True, exist_ok=True)

        # Fields copied verbatim from the action.
        copied_fields = (
            "pr_number",
            "task_id",
            "head_sha",
            "state",
            "action",
            "runner_result",
            "decision_path",
            "marker_path",
            "reason",
        )
        entry = {
            "schema": SCHEDULER_AUDIT_SCHEMA,
            "ts": self._clock(),
            "cycle_started_at": cycle_started,
        }
        for field_name in copied_fields:
            entry[field_name] = getattr(action, field_name)
        entry["owner"] = self._owner
        entry["repo"] = self._repo
        entry["chat_notifications"] = 0

        with open(audit_file, "a", encoding="utf-8") as handle:
            descriptor = handle.fileno()
            fcntl.flock(descriptor, fcntl.LOCK_EX)
            try:
                # sort_keys makes the line independent of insertion order.
                handle.write(json.dumps(entry, ensure_ascii=False, sort_keys=True) + "\n")
                handle.flush()
                os.fsync(descriptor)
            finally:
                fcntl.flock(descriptor, fcntl.LOCK_UN)

    # ─── §8 scheduler-driven recheck (chat 노출 0) ────────────────────────

    def schedule_recheck(
        self,
        *,
        elapsed_seconds: int,
        cycle_polling_state: PollingState | None = None,
    ) -> PollingState:
        """Register one recheck after ``elapsed_seconds`` and return the next polling state.

        No chat exposure. The polling policy is enforced through code gates:
          - ``assert_first_timeout_not_exceeded(elapsed_seconds)`` validates the
            elapsed time.
          - ``advance_recheck`` produces the next ``PollingState``; per the
            original notes it raises ``BotSessionExitRequired`` once the
            recheck limit is exceeded.

        When ``cycle_polling_state`` is omitted, a new
        ``PollingState(elapsed_seconds=elapsed_seconds)`` is the starting point;
        when it is given, it is advanced as-is.

        The caller (scheduler) is expected to inject the returned state into
        the next cycle.
        """
        assert_first_timeout_not_exceeded(elapsed_seconds)
        if cycle_polling_state is None:
            current_state = PollingState(elapsed_seconds=elapsed_seconds)
        else:
            current_state = cycle_polling_state
        return advance_recheck(current_state)


# ─── helpers ───────────────────────────────────────────────────────────


# ─── Phase 2: task-2565 second-review hook (helper-only, wire 후속 task에서) ─
#
# follow-up commit 감지 후 SecondReviewInput을 구성해 auto_trigger_owner_review를
# 호출하는 helper. 실제 wire(호출)는 IdlePRDiagnoser가 SHA 변경을 감지한 시점에
# _handle_single_diagnosis에서 연결 예정. 현재는 회귀 방지를 위해 helper 정의만 추가.
#
# 연결 위치 (예정): _handle_single_diagnosis 내 STATE_GEMINI_STALE_ON_HEAD 분기.
# try/except로 감싸 기존 _dispatch_owner_trigger 경로는 그대로 유지.


def invoke_phase2_second_review_hook(
    *,
    task_id: str,
    pr_number: int,
    old_head_sha: str,
    current_head_sha: str,
    latest_gemini_commit_id: str | None,
    ci_gate_failure_reason: str | None,
    unresolved_thread_outdated: bool,
    follow_up_commit_detected: bool,
    elapsed_since_follow_up_seconds: int,
    owner_trigger_audit_entries: tuple[dict, ...] = (),
    trigger_callable: Callable[[int, str], dict] | None = None,
    decision_dir: Path | None = None,
) -> dict:
    """Phase 2 second-review hook: entry point invoked after a follow-up commit.

    task-2565 §5 Phase 2 (stale detection + automatic owner trigger). The hook
    builds a ``SecondReviewInput``, derives the state with ``determine_state``,
    calls ``auto_trigger_owner_review`` and then records markers through
    ``emit_phase2_markers``.

    To protect the existing scheduler path from regressions, no ``Exception``
    escapes this function: any failure is logged with its traceback and
    converted into a ``HOOK_ERROR`` dict.

    Args:
      task_id: Related task ID.
      pr_number: PR number.
      old_head_sha: Head SHA before the follow-up commit.
      current_head_sha: Current head SHA.
      latest_gemini_commit_id: commit_id of the latest Gemini review, if any.
      ci_gate_failure_reason: Reason the CI gate failed, if any.
      unresolved_thread_outdated: Whether the unresolved thread is OUTDATED.
      follow_up_commit_detected: Whether a follow-up commit was detected.
      elapsed_since_follow_up_seconds: Seconds elapsed since the follow-up.
      owner_trigger_audit_entries: Existing audit entries (for dedupe checks).
      trigger_callable: ``(pr_number, head_sha) -> dict`` callable forwarded to
        ``auto_trigger_owner_review``; ``None`` is documented as dry-run.
      decision_dir: Directory forwarded as ``marker_dir``; ``None`` is
        documented as falling back to MARKER_DIR.

    Returns:
      The dict returned by ``auto_trigger_owner_review`` on success. On any
      exception: ``{"result": "HOOK_ERROR", "reason": ..., "triggered": ...,
      "dedupe_key": "<pr_number>+<current_head_sha>"}``. ``triggered`` is
      ``False`` when the failure happened before a decision was produced, and
      mirrors the decision's own ``triggered`` flag when the failure happened
      afterwards (e.g. while writing markers), so callers do not mistake an
      already-fired trigger for one that never happened.
    """
    # Stays None until auto_trigger_owner_review has returned; lets the error
    # path tell "failed before trigger" apart from "failed after trigger".
    decision: dict | None = None
    try:
        inp = SecondReviewInput(
            task_id=task_id,
            pr_number=pr_number,
            old_head_sha=old_head_sha,
            current_head_sha=current_head_sha,
            latest_gemini_commit_id=latest_gemini_commit_id,
            ci_gate_failure_reason=ci_gate_failure_reason,
            unresolved_thread_outdated=unresolved_thread_outdated,
            follow_up_commit_detected=follow_up_commit_detected,
            elapsed_since_follow_up_seconds=elapsed_since_follow_up_seconds,
            owner_trigger_audit_entries=owner_trigger_audit_entries,
        )
        state = determine_state(inp)
        decision = auto_trigger_owner_review(inp, trigger_callable=trigger_callable)
        emit_phase2_markers(task_id, state, decision, marker_dir=decision_dir)
        return decision
    except Exception as exc:  # noqa: BLE001 — deliberately broad: regression guard
        # Keep the traceback somewhere; the returned reason only carries the
        # exception type and message.
        logging.getLogger(__name__).exception(
            "phase2 second-review hook failed (pr=%s, head=%s)",
            pr_number,
            current_head_sha,
        )
        # If the decision was already produced, the owner trigger may have
        # fired; do not report it as untriggered just because a later step
        # (marker emission) failed.
        already_triggered = isinstance(decision, dict) and bool(
            decision.get("triggered", False)
        )
        return {
            "result": "HOOK_ERROR",
            "reason": f"phase2 hook 예외: {type(exc).__name__}: {exc}",
            "triggered": already_triggered,
            "dedupe_key": f"{pr_number}+{current_head_sha}",
        }


def _snapshot_to_pr_meta(snapshot: IdlePRSnapshot) -> PRMeta:
    """Convert an ``IdlePRSnapshot`` into a ``PRMeta`` (merge_queue_executor shape).

    Copied from the snapshot: ``number``, ``head_sha`` (lower-cased),
    ``head_ref`` and ``ci_required_all_success``. Every other field is a fixed
    value; the Gemini status and merge state use the conservative
    ``GEMINI_UNRESOLVED`` / ``BLOCKED`` defaults so the result fails closed.
    """
    pr_fields = {
        # Values taken from the snapshot.
        "number": snapshot.number,
        "head_sha": snapshot.head_sha.lower(),
        "head_ref": snapshot.head_ref,
        # Fixed values not carried by the snapshot.
        "base_ref": "main",
        "changed_files": (),
        "ci_required_all_success": snapshot.ci_required_all_success,
        # Conservative (fail-closed) defaults.
        "gemini_status": "GEMINI_UNRESOLVED",
        "merge_state_status": "BLOCKED",
        "queue_predecessors_open": 0,
    }
    return PRMeta(**pr_fields)


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


# Public API of this module: the names exported by ``from ... import *``.
# Keep this list in sync when adding or renaming public names.
__all__ = [
    "ACTION_OWNER_TRIGGER_DISPATCHED",
    "ACTION_OWNER_TRIGGER_DEDUPED",
    "ACTION_OWNER_TRIGGER_FAILED",
    "ACTION_FRESH_RESUME",
    "ACTION_WITHIN_GRACE",
    "ACTION_CI_FAILED_SKIP",
    "ACTION_MISSING_TASK_ID_SKIP",
    "ACTION_SAME_HEAD_DEDUPED",
    "ACTION_FIRST_TRIGGER_PENDING_SKIP",
    "ACTION_PR_EXCEPTION_ISOLATED",
    "ACTION_PR_EXCEPTION_CRITICAL_ESCALATED",
    "DISPATCH_DECISION_FAST_PATH_KEY",
    "PR_EXCEPTION_AUDIT_SCHEMA",
    "SCHEDULER_AUDIT_REL_PATH",
    "SCHEDULER_AUDIT_SCHEMA",
    "SCHEDULER_LOCK_REL_PATH",
    "SchedulerCycleResult",
    "SchedulerPRAction",
    "ExecutorScheduler",
    # Phase 2 (task-2565)
    "invoke_phase2_second_review_hook",
]
