"""anu_v2.merge_queue_executor — ANU v2 자동 머지 큐 실행기 v0 (task-2531).

회장 §명시 (2026-05-10) 9 기능 박제:
  1. queue head 자동 검증
  2. expected_files diff gate
  3. forbidden path gate
  4. CI / Gemini / mergeStateStatus CLEAN / HEAD SHA lock 확인
  5. BOT GITHUB_TOKEN process-local injection으로 squash merge
  6. post-merge smoke (회귀 재실행)
  7. downstream stale revalidation
  8. Critical 7종 분류 (회장 보고 대상)
  9. 비critical 자동 처리 (escalation 없이 self-resolve)

설계 원칙:
  - one-way isolation: anu_v2/* 만 import. utils/dispatch/scripts/dashboard 의존성 0.
  - 외부 부수효과는 모두 주입 가능한 callable 로 추상화 (gh_runner, git_runner,
    pytest_runner, audit_writer) — 테스트 시 mock 으로 대체 가능.
  - admin override / force / rebase / owner_pat fallback / manual .done 일체 사용 금지.
"""

from __future__ import annotations

import json
import os
import re
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, Mapping, Sequence

# task-2554+2 §3: owner_trigger_only capability 와의 통합 hook.
# anu_v2 내부 import 만 허용 (one-way isolation 유지).
from anu_v2.owner_trigger_decision import (
    ALLOWED_ACTION as OWNER_TRIGGER_ALLOWED_ACTION,
    ALLOWED_COMMENT_BODY as OWNER_TRIGGER_COMMENT_BODY,
    SCHEMA_NAME as OWNER_TRIGGER_DECISION_SCHEMA,
)


# ─── Decision codes (gate 결과) ───────────────────────────────────────────────
AUTO_MERGE_ALLOWED = "AUTO_MERGE_ALLOWED"
AUTO_MERGE_SUCCESS = "AUTO_MERGE_SUCCESS"
WAITING_FOR_PREDECESSOR = "WAITING_FOR_PREDECESSOR"
BLOCKED_WITH_REASON = "BLOCKED_WITH_REASON"
HEAD_SHA_LOCK_BROKEN = "HEAD_SHA_LOCK_BROKEN"
CI_FAILURE_BLOCK = "CI_FAILURE_BLOCK"
GEMINI_UNRESOLVED_BLOCK = "GEMINI_UNRESOLVED_BLOCK"
MERGE_STATE_NOT_CLEAN = "MERGE_STATE_NOT_CLEAN"
DIFF_CONTAMINATION = "DIFF_CONTAMINATION"
FORBIDDEN_PATH_HIT = "FORBIDDEN_PATH_HIT"
NON_CRITICAL_AUTO_RESOLVED = "NON_CRITICAL_AUTO_RESOLVED"


# ─── Critical 7종 (회장 §) ───────────────────────────────────────────────────
CRITICAL_FORBIDDEN_PATH = "FORBIDDEN_PATH_INVASION"
CRITICAL_DIFF_REPLACEMENT_FAILED = "EFFECTIVE_DIFF_CONTAMINATION_REPLACEMENT_FAILED"
CRITICAL_GEMINI_SCOPE_EXPANSION = "GEMINI_REAL_BUG_SCOPE_EXPANSION"
CRITICAL_BLOCK_OVERRIDE = "BLOCK_OVERRIDE_REQUIRED_OR_INSUFFICIENT_REASON"
CRITICAL_DEPENDENCY_CYCLE = "DEPENDENCY_CYCLE_OR_SERIAL_ONLY_CONFLICT"
CRITICAL_REPLACEMENT_FAILED = "REPLACEMENT_PR_ALSO_FAILED"
CRITICAL_POST_MERGE_SMOKE = "POST_MERGE_SMOKE_FAILURE"

# task-2554+2 §3: GEMINI stale-on-head decision codes (owner trigger integration)
GEMINI_STALE_ON_HEAD = "GEMINI_STALE_ON_HEAD"
OWNER_TRIGGER_REQUIRED = "OWNER_TRIGGER_REQUIRED"
OWNER_TRIGGER_REQUESTED = "OWNER_TRIGGER_REQUESTED"
OWNER_TRIGGER_POSTED = "OWNER_TRIGGER_POSTED"
OWNER_TRIGGER_FAILED = "OWNER_TRIGGER_FAILED"
GEMINI_FRESH_DETECTED = "GEMINI_FRESH_DETECTED"
POSTED_BUT_NO_FRESH_EVIDENCE = "POSTED_BUT_NO_FRESH_EVIDENCE"

CRITICAL_CODES: frozenset[str] = frozenset({
    CRITICAL_FORBIDDEN_PATH,
    CRITICAL_DIFF_REPLACEMENT_FAILED,
    CRITICAL_GEMINI_SCOPE_EXPANSION,
    CRITICAL_BLOCK_OVERRIDE,
    CRITICAL_DEPENDENCY_CYCLE,
    CRITICAL_REPLACEMENT_FAILED,
    CRITICAL_POST_MERGE_SMOKE,
})

# 회장 §6 — 절대 금지 git/gh flag (정적 차단)
FORBIDDEN_GIT_FLAGS: frozenset[str] = frozenset({
    "--admin", "--force", "--force-with-lease", "-f", "--rebase",
})

# Gemini 상태값
GEMINI_COMPLETED = "GEMINI_COMPLETED"
GEMINI_UNRESOLVED = "GEMINI_UNRESOLVED"
GEMINI_REAL_BUG = "GEMINI_REAL_BUG"
GEMINI_SCOPE_EXPANSION = "GEMINI_SCOPE_EXPANSION"


# ─── Data types ──────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class TaskSpec:
    """task md 에서 추출한 머지 게이트 메타데이터."""
    task_id: str
    expected_files: tuple[str, ...]
    forbidden_paths: tuple[str, ...] = ()
    cherry_pick_allowed: bool = False


@dataclass(frozen=True)
class PRMeta:
    """PR 메타 (gh pr view 결과 정규화)."""
    number: int
    head_sha: str
    head_ref: str
    base_ref: str
    changed_files: tuple[str, ...]
    ci_required_all_success: bool
    gemini_status: str
    merge_state_status: str          # CLEAN / DIRTY / BLOCKED / BEHIND / UNKNOWN
    queue_predecessors_open: int     # 선행 PR 중 OPEN 상태 개수


@dataclass
class GateOutcome:
    """gate 평가 결과."""
    decision: str
    reason: str = ""
    critical_code: str = ""
    extra: dict[str, Any] = field(default_factory=dict)

    @property
    def passed(self) -> bool:
        return self.decision in (AUTO_MERGE_ALLOWED, AUTO_MERGE_SUCCESS)

    @property
    def is_critical(self) -> bool:
        return bool(self.critical_code) and self.critical_code in CRITICAL_CODES


# ─── 외부 부수효과 콜백 시그니처 ─────────────────────────────────────────────
GhRunner = Callable[[Sequence[str], Mapping[str, str] | None], subprocess.CompletedProcess]
GitRunner = Callable[[Sequence[str]], subprocess.CompletedProcess]
PytestRunner = Callable[[Sequence[str]], int]            # 종료 코드 반환
AuditWriter = Callable[[Mapping[str, Any]], None]


# ─── Helpers ────────────────────────────────────────────────────────────────
def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _coerce_stream(value: Any) -> str:
    """subprocess stdout/stderr 정규화: None → "", bytes → utf-8 decode, str → 그대로.

    `text=True` 미설정 호출이 섞여 들어와도 직렬화 시점에 `bytes` 가 dict 에 박히지
    않게 한다.
    """
    if value is None:
        return ""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value)


def assert_no_forbidden_git_flags(args: Sequence[str]) -> None:
    """gh / git 호출 인자에 admin/force/rebase 금지 플래그가 섞이면 즉시 RuntimeError.

    회장 §6 정적 차단 — 호출부에서 우회를 막는 단방향 hard-stop.

    검사 규칙:
      - exact: 인자 토큰이 FORBIDDEN 셋과 정확히 일치 (`--admin`, `--force`, ...).
      - prefix: 값 형태 (`--admin=true`, `--force-with-lease=...`) 도 차단.
      - 단축 결합 (`-af`): `-`/`--` 접두어가 1개일 때 각 문자가 단축 금지에 포함되는지.
        본 모듈은 `-f` 만 단축 형태로 사용 가정 (`-af`, `-fa` 모두 매칭).
    """
    short_chars = {flag[1:] for flag in FORBIDDEN_GIT_FLAGS if len(flag) == 2 and flag.startswith("-")}
    bad: list[str] = []
    for token in args:
        if not isinstance(token, str):
            continue
        if token in FORBIDDEN_GIT_FLAGS:
            bad.append(token)
            continue
        # `--admin=...` 등 값 포함형
        if "=" in token:
            head = token.split("=", 1)[0]
            if head in FORBIDDEN_GIT_FLAGS:
                bad.append(token)
                continue
        # 단축 결합 `-af` 처리: `--` (long) 가 아닌 `-X..` 형태에서만
        if (
            short_chars
            and len(token) > 2
            and token.startswith("-")
            and not token.startswith("--")
        ):
            if any(ch in short_chars for ch in token[1:]):
                bad.append(token)
                continue
    if bad:
        # 중복 제거 + 정렬 (테스트 결정성)
        unique = sorted(dict.fromkeys(bad))
        raise RuntimeError(f"FORBIDDEN_GIT_FLAG_BLOCKED: {unique}")


def _parse_yaml_list(block: str) -> tuple[str, ...]:
    """간단한 inline YAML list 파서 (`expected_files:` 블록의 `- "..."`).

    외부 yaml 모듈 의존성을 피하기 위해 직접 구현. 본 모듈이 다루는
    task md format 만 지원하면 되므로 minimal grammar.
    """
    items: list[str] = []
    for line in block.splitlines():
        # 인용된 항목 (큰/작은 따옴표 모두) 은 `#` 을 그대로 허용 (예: `C#_file.cs`),
        # 비인용 항목에서만 `#` 이후를 주석으로 절단한다.
        m = re.match(
            r"^\s*-\s*(?:\"([^\"]+)\"|'([^']+)'|([^#]+?))\s*(?:#.*)?$",
            line,
        )
        if m:
            value = next((g for g in m.groups() if g is not None), "")
            items.append(value.strip())
    return tuple(items)


def load_task_spec_from_md(task_md_path: Path) -> TaskSpec:
    """task md 파일에서 expected_files / forbidden_paths / cherry_pick_allowed 추출."""
    text = task_md_path.read_text(encoding="utf-8")
    task_id = task_md_path.stem  # e.g. "task-2531"

    def _extract_block(key: str) -> str:
        # `key:` 다음 줄부터 들여쓰기된 라인을 모은다.
        # - 키 자체가 들여쓰기 (YAML 블록 / fenced code block 내부) 된 경우도 허용.
        # - 종료 조건: 키와 같거나 더 얕은 들여쓰기 깊이의 비빈 라인을 만나면 종료
        #   (sibling key 오인 방지).
        # - 빈 줄 / 주석 라인은 보존하여 _parse_yaml_list 에서 무시하도록 한다.
        head_re = re.compile(rf"^([ \t]*){re.escape(key)}\s*:\s*$", re.MULTILINE)
        head = head_re.search(text)
        if not head:
            return ""
        key_indent = len(head.group(1))
        lines = text[head.end():].splitlines()
        block: list[str] = []
        for ln in lines:
            stripped = ln.lstrip()
            if not stripped:
                block.append(ln)
                continue
            if stripped.startswith("#"):
                block.append(ln)
                continue
            # 들여쓰기 깊이 비교 — key 보다 깊은 라인만 블록에 포함
            current_indent = len(ln) - len(stripped)
            if current_indent <= key_indent:
                break
            block.append(ln)
        return "\n".join(block) + ("\n" if block else "")

    expected = _parse_yaml_list(_extract_block("expected_files"))
    forbidden = _parse_yaml_list(_extract_block("forbidden_paths"))

    # cherry_pick_allowed 는 YAML 블록 내부의 값만 신뢰한다. 본문 prose 에 같은
    # 키워드가 등장해도 영향을 주지 않게, ```yaml ... ``` 펜스 내부 텍스트와
    # expected_files / forbidden_paths 인접 영역을 우선 검색.
    cherry_search_targets: list[str] = []
    for fence in re.findall(r"```ya?ml\s*\n([\s\S]*?)```", text, re.IGNORECASE):
        cherry_search_targets.append(fence)
    cherry_search_targets.append(_extract_block("expected_files"))
    cherry_search_targets.append(_extract_block("forbidden_paths"))
    cherry = False
    cherry_re = re.compile(
        # 행 끝 주석 (`# ...`) 도 허용하여 다른 파서들과 일관성 유지.
        r"^[ \t]*cherry_pick_allowed\s*:\s*(true|false)\s*(?:#.*)?$",
        re.MULTILINE | re.IGNORECASE,
    )
    for chunk in cherry_search_targets:
        if not chunk:
            continue
        m = cherry_re.search(chunk)
        if m:
            cherry = m.group(1).lower() == "true"
            break

    return TaskSpec(
        task_id=task_id,
        expected_files=expected,
        forbidden_paths=forbidden,
        cherry_pick_allowed=cherry,
    )


# ─── 본체 ────────────────────────────────────────────────────────────────────
class MergeQueueExecutor:
    """ANU v2 자동 머지 큐 실행기 v0.

    9 기능을 method 단위로 분리. 부수효과는 생성자 주입 callable 로 분리되어
    테스트는 mock callable 만 교체하면 충분하다.
    """

    def __init__(
        self,
        *,
        gh_runner: GhRunner,
        git_runner: GitRunner,
        pytest_runner: PytestRunner,
        audit_writer: AuditWriter,
        task_md_root: Path,
        bot_token_env: str = "BOT_GITHUB_TOKEN",
    ) -> None:
        self._gh = gh_runner
        self._git = git_runner
        self._pytest = pytest_runner
        self._audit = audit_writer
        self._task_md_root = Path(task_md_root)
        self._bot_token_env = bot_token_env

    # ── 1. queue head 자동 검증 ──────────────────────────────────────────────
    def evaluate_queue_head(self, pr: PRMeta) -> GateOutcome:
        """선행 PR 모두 머지 완료 (queue_predecessors_open == 0) 인지 검증."""
        if pr.queue_predecessors_open > 0:
            return GateOutcome(
                decision=WAITING_FOR_PREDECESSOR,
                reason=f"{pr.queue_predecessors_open} predecessor(s) still open",
            )
        return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="queue_head_confirmed")

    # ── 2. expected_files diff gate ──────────────────────────────────────────
    def check_expected_files_diff(
        self,
        spec: TaskSpec,
        pr: PRMeta,
    ) -> GateOutcome:
        """PR diff 가 task md expected_files 와 정확히 일치하는지 검증.

        불일치는 DIFF_CONTAMINATION (비critical) — 정정 트랙은 본 v0 범위 외.
        """
        expected = set(spec.expected_files)
        actual = set(pr.changed_files)
        if expected == actual:
            return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="expected_files_match")
        missing = expected - actual
        extra = actual - expected
        return GateOutcome(
            decision=DIFF_CONTAMINATION,
            reason="expected_files_mismatch",
            extra={"missing": sorted(missing), "extra": sorted(extra)},
        )

    # ── 3. forbidden path gate ───────────────────────────────────────────────
    def check_forbidden_paths(self, spec: TaskSpec, pr: PRMeta) -> GateOutcome:
        """PR diff 에 task md forbidden_paths 매칭 파일이 있으면 Critical 7종."""
        if not spec.forbidden_paths:
            return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="no_forbidden_paths")

        hits: list[str] = []
        for changed in pr.changed_files:
            for pattern in spec.forbidden_paths:
                if _glob_match(pattern, changed):
                    hits.append(changed)
                    break
        if hits:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason=FORBIDDEN_PATH_HIT,
                critical_code=CRITICAL_FORBIDDEN_PATH,
                extra={"hits": sorted(set(hits))},
            )
        return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="forbidden_paths_clean")

    # ── 4. CI / Gemini / CLEAN / HEAD SHA lock ──────────────────────────────
    def check_ci_gemini_clean_sha_lock(
        self,
        pr: PRMeta,
        *,
        head_sha_at_lock: str,
    ) -> GateOutcome:
        """CI required all SUCCESS + Gemini unresolved 0 + CLEAN + HEAD SHA 일치."""
        if not pr.ci_required_all_success:
            return GateOutcome(
                decision=CI_FAILURE_BLOCK,
                reason="ci_required_not_all_success",
            )
        if pr.gemini_status == GEMINI_SCOPE_EXPANSION:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="gemini_scope_expansion",
                critical_code=CRITICAL_GEMINI_SCOPE_EXPANSION,
            )
        if pr.gemini_status not in (GEMINI_COMPLETED,):
            return GateOutcome(
                decision=GEMINI_UNRESOLVED_BLOCK,
                reason=f"gemini_status={pr.gemini_status}",
            )
        if pr.merge_state_status != "CLEAN":
            decision = MERGE_STATE_NOT_CLEAN
            critical = ""
            if pr.merge_state_status == "BLOCKED":
                decision = BLOCKED_WITH_REASON
                critical = CRITICAL_BLOCK_OVERRIDE
            return GateOutcome(
                decision=decision,
                reason=f"mergeStateStatus={pr.merge_state_status}",
                critical_code=critical,
            )
        if pr.head_sha != head_sha_at_lock:
            return GateOutcome(
                decision=HEAD_SHA_LOCK_BROKEN,
                reason="head_sha_changed_during_evaluation",
                extra={"locked": head_sha_at_lock, "current": pr.head_sha},
            )
        return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="all_gates_clean")

    # ── 5. BOT identity squash merge ─────────────────────────────────────────
    def execute_bot_squash_merge(
        self,
        pr: PRMeta,
        *,
        head_sha_at_lock: str,
    ) -> GateOutcome:
        """`GH_TOKEN=$BOT_GITHUB_TOKEN` process-local injection 으로 squash merge.

        - 호스트 환경의 GH_TOKEN / GITHUB_TOKEN 은 절대 그대로 사용하지 않는다.
        - admin / force / rebase 플래그는 정적 차단 (assert_no_forbidden_git_flags).
        """
        token = os.environ.get(self._bot_token_env, "").strip()
        if not token:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="bot_token_unavailable",
                critical_code=CRITICAL_BLOCK_OVERRIDE,
            )

        args = [
            "pr", "merge", str(pr.number),
            "--squash",
            "--match-head-commit", head_sha_at_lock,
        ]
        assert_no_forbidden_git_flags(args)

        # process-local env: parent의 PATH 등 실행 환경은 유지하되, 토큰은 BOT 으로 덮어쓴다.
        # 빈 dict 로 시작하면 PATH 누락 → gh/git 미발견. owner PAT 등 외부 토큰은 그대로
        # 두지 않고 BOT 토큰으로 명시적 덮어써서 격리한다.
        env = os.environ.copy()
        env["GH_TOKEN"] = token
        env["GITHUB_TOKEN"] = token
        cp = self._gh(args, env)
        if cp.returncode != 0:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="gh_pr_merge_failed",
                critical_code=CRITICAL_BLOCK_OVERRIDE,
                # capture_output=False 이거나 텍스트 모드 비활성화 등으로 stdout/stderr 가
                # None / bytes 일 수 있다. 항상 str 로 정규화하여 직렬화 안전성을 보장.
                extra={"stderr": _coerce_stream(getattr(cp, "stderr", None))[:512]},
            )
        return GateOutcome(
            decision=AUTO_MERGE_SUCCESS,
            reason="bot_squash_merge_completed",
            extra={"stdout": _coerce_stream(getattr(cp, "stdout", None))[:512]},
        )

    # ── 6. post-merge smoke ──────────────────────────────────────────────────
    def run_post_merge_smoke(
        self,
        *,
        smoke_test_paths: Sequence[str],
    ) -> GateOutcome:
        """머지 후 회귀 (smoke) 재실행."""
        if not smoke_test_paths:
            return GateOutcome(decision=AUTO_MERGE_SUCCESS, reason="smoke_skipped_no_targets")
        rc = self._pytest(list(smoke_test_paths))
        if rc != 0:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="post_merge_smoke_failed",
                critical_code=CRITICAL_POST_MERGE_SMOKE,
                extra={"pytest_exit_code": rc},
            )
        return GateOutcome(decision=AUTO_MERGE_SUCCESS, reason="post_merge_smoke_passed")

    # ── 7. downstream stale revalidation ─────────────────────────────────────
    def revalidate_downstream(
        self,
        *,
        merged_task_id: str,
        downstream_pr_numbers: Iterable[int],
    ) -> list[dict[str, Any]]:
        """머지로 stale 이 된 다른 OPEN PR 의 evidence 재검증 트리거.

        모든 gh 호출은 BOT 토큰 process-local injection 으로 격리한다 (회장 §6 — 봇
        identity 강제). 외부 owner_pat 누출 차단.
        """
        token = os.environ.get(self._bot_token_env, "").strip()
        if not token:
            # 토큰 없으면 머지 단계와 동일하게 실행 거부 (silent skip 금지).
            return [
                {
                    "pr_number": pr_no,
                    "ok": False,
                    "reason": "bot_token_unavailable",
                    "ts": _now_iso(),
                }
                for pr_no in downstream_pr_numbers
            ]
        env = os.environ.copy()
        env["GH_TOKEN"] = token
        env["GITHUB_TOKEN"] = token

        results: list[dict[str, Any]] = []
        for pr_no in downstream_pr_numbers:
            args = ["pr", "comment", str(pr_no), "-b",
                    f"[anu_v2.merge_queue_executor] base merged: {merged_task_id} → "
                    "evidence revalidation requested (CI rerun + Gemini retrigger)."]
            assert_no_forbidden_git_flags(args)
            cp = self._gh(args, env)
            results.append({
                "pr_number": pr_no,
                "ok": cp.returncode == 0,
                "ts": _now_iso(),
            })
        return results

    # ── 8. Critical 7종 분류 / 9. 비critical 자동 처리 ────────────────────────
    def classify_failure(self, outcome: GateOutcome) -> str:
        """Critical 7종 코드면 그대로 반환, 그 외는 NON_CRITICAL_AUTO_RESOLVED."""
        if outcome.is_critical:
            return outcome.critical_code
        return NON_CRITICAL_AUTO_RESOLVED

    def auto_handle_non_critical(self, outcome: GateOutcome) -> GateOutcome:
        """비critical 결과는 audit 에만 기록하고 self-resolve. 회장 보고 X.

        WAITING_FOR_PREDECESSOR 는 머지 큐의 정상적인 대기 상태라 짧은 주기로 반복
        실행 시 audit 로그가 과도하게 누적된다. 대기 상태는 로깅을 생략하고 그대로
        반환만 한다.
        """
        if outcome.decision != WAITING_FOR_PREDECESSOR:
            self._audit({
                "ts": _now_iso(),
                "kind": "non_critical_auto_resolved",
                "decision": outcome.decision,
                "reason": outcome.reason,
                "extra": dict(outcome.extra),
            })
        return GateOutcome(
            decision=NON_CRITICAL_AUTO_RESOLVED,
            reason=outcome.reason,
            extra=dict(outcome.extra),
        )

    # ── 통합 평가 (전체 파이프라인) ─────────────────────────────────────────
    def evaluate(
        self,
        *,
        pr: PRMeta,
        head_sha_at_lock: str,
    ) -> GateOutcome:
        """9 기능 중 게이트 1~4 를 순차 평가하고 최종 결과를 반환.

        실제 머지 (5) / 스모크 (6) / 다운스트림 재검증 (7) 은 호출부에서 명시 호출.
        """
        # head_ref 는 외부 입력값. branch 명에 `../` 등이 들어오면 task_md_root 외부의
        # 파일을 가리킬 수 있으므로 task_id 의 파일명 부분만 추출해 root 안으로 강제 한정.
        raw_task_id = _extract_task_id_from_branch(pr.head_ref)
        safe_task_id = Path(raw_task_id).name
        spec_path = self._task_md_root / f"{safe_task_id}.md"
        if not spec_path.exists():
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="task_md_missing",
                critical_code=CRITICAL_BLOCK_OVERRIDE,
                extra={"path": str(spec_path)},
            )
        spec = load_task_spec_from_md(spec_path)

        # 평가 순서: Critical-우선. forbidden_path 와 ci/gemini/scope_expansion 같은
        # Critical 7종 후보 게이트를 먼저 실행해서 보안 위반이 큐 대기/diff 불일치보다
        # 늦게 발견되는 일이 없게 한다. 모든 게이트는 부수효과가 없는 read-only 평가.
        for gate in (
            self.check_forbidden_paths(spec, pr),
            self.check_ci_gemini_clean_sha_lock(pr, head_sha_at_lock=head_sha_at_lock),
            self.evaluate_queue_head(pr),
            self.check_expected_files_diff(spec, pr),
        ):
            if not gate.passed:
                if not gate.is_critical:
                    return self.auto_handle_non_critical(gate)
                return gate

        return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="all_4_gates_pass")

    # ─── §3 owner trigger integration (task-2554+2) ─────────────────────────

    def detect_gemini_stale_on_head(
        self,
        *,
        pr: PRMeta,
        gemini_review_commit_id: str | None,
    ) -> GateOutcome:
        """PR head SHA != 최신 Gemini review commit_id 면 GEMINI_STALE_ON_HEAD.

        Args:
          pr: PR meta (head_sha = 현재 PR head 의 40-char hex).
          gemini_review_commit_id: 가장 최근 Gemini review 가 가리키는 commit_id (없으면 None).

        Returns:
          GateOutcome — stale 일 때 decision=GEMINI_STALE_ON_HEAD.
          fresh (head == commit_id) 일 때 AUTO_MERGE_ALLOWED (검사 통과).

        회장 §3: stale 감지 시 OWNER_TRIGGER_REQUIRED decision 생성 단계로 전환.
        """
        head = pr.head_sha
        if not isinstance(head, str) or len(head) != 40:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="invalid_pr_head_sha",
                critical_code=CRITICAL_BLOCK_OVERRIDE,
            )
        if gemini_review_commit_id is None:
            # Gemini review 가 PR 에 아직 1 건도 없음 — first-review-pending.
            # OWNER_TRIGGER_REQUIRED 까지는 아님 (외부 트리거 채널의 자동 1st review 대기).
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="gemini_first_review_pending",
                extra={"head": head},
            )
        if not isinstance(gemini_review_commit_id, str) or len(gemini_review_commit_id) != 40:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="invalid_gemini_review_commit_id",
                critical_code=CRITICAL_BLOCK_OVERRIDE,
            )
        if head.lower() == gemini_review_commit_id.lower():
            return GateOutcome(
                decision=AUTO_MERGE_ALLOWED,
                reason="gemini_review_fresh_on_head",
                extra={"head": head},
            )
        return GateOutcome(
            decision=GEMINI_STALE_ON_HEAD,
            reason="gemini_review_stale_against_head",
            extra={
                "head": head,
                "gemini_review_commit_id": gemini_review_commit_id.lower(),
            },
        )

    def emit_owner_trigger_decision(
        self,
        *,
        task_id: str,
        pr: PRMeta,
        decision_dir: Path,
    ) -> dict[str, Any]:
        """OWNER_TRIGGER_REQUIRED decision JSON 을 disk 에 기록.

        schema: ``anu_v2.owner_trigger_decision.v1`` (8 PASS 조건 1:1).

        파일 경로: ``<decision_dir>/<task_id>.owner_trigger_decision.json``
        marker 동반:
          - ``<task_id>.owner-trigger.requested`` (decision 생성 시점 표식)

        Returns:
          decision dict (memory 에서 schema 검증 가능). schema 위반 시 ValueError.
        """
        if not isinstance(task_id, str) or not task_id:
            raise ValueError("task_id must be non-empty string")
        head = pr.head_sha
        if not isinstance(head, str) or len(head) != 40:
            raise ValueError("pr.head_sha must be 40-char hex")
        decision = {
            "schema": OWNER_TRIGGER_DECISION_SCHEMA,
            "task_id": task_id,
            "pr": int(pr.number),
            "current_head": head.lower(),
            "queue_head": True,
            "current_head_confirmed": True,
            "gemini_evidence_fresh": False,
            "nudge_count_for_pr_head": 0,
            "allowed_action": OWNER_TRIGGER_ALLOWED_ACTION,
            "comment_body": OWNER_TRIGGER_COMMENT_BODY,
            "allowed": True,
        }
        decision_dir = Path(decision_dir)
        decision_dir.mkdir(parents=True, exist_ok=True)
        decision_path = decision_dir / f"{task_id}.owner_trigger_decision.json"
        decision_path.write_text(
            json.dumps(decision, ensure_ascii=False, sort_keys=True, indent=2),
            encoding="utf-8",
        )
        requested_marker = decision_dir / f"{task_id}.owner-trigger.requested"
        requested_marker.write_text(
            json.dumps(
                {
                    "ts": _now_iso(),
                    "task_id": task_id,
                    "pr": int(pr.number),
                    "head": head.lower(),
                    "decision_path": str(decision_path),
                    "decision_code": OWNER_TRIGGER_REQUIRED,
                },
                ensure_ascii=False,
                sort_keys=True,
            )
            + "\n",
            encoding="utf-8",
        )
        return decision

    def record_owner_trigger_outcome(
        self,
        *,
        task_id: str,
        pr: PRMeta,
        outcome_code: str,
        decision_dir: Path,
        extra: Mapping[str, Any] | None = None,
    ) -> Path:
        """owner trigger 결과 (POSTED / FAILED) marker 파일 생성.

        marker 종류:
          - ``<task_id>.owner-trigger.posted`` (outcome_code == OWNER_TRIGGER_POSTED)
          - ``<task_id>.owner-trigger.failed`` (outcome_code == OWNER_TRIGGER_FAILED)

        Returns:
          생성된 marker 파일 경로.

        Raises:
          ValueError: outcome_code 가 허용 2 종 외.
        """
        if outcome_code == OWNER_TRIGGER_POSTED:
            suffix = ".owner-trigger.posted"
        elif outcome_code == OWNER_TRIGGER_FAILED:
            suffix = ".owner-trigger.failed"
        else:
            raise ValueError(
                f"outcome_code must be OWNER_TRIGGER_POSTED|OWNER_TRIGGER_FAILED, got {outcome_code!r}"
            )
        head = pr.head_sha
        decision_dir = Path(decision_dir)
        decision_dir.mkdir(parents=True, exist_ok=True)
        marker_path = decision_dir / f"{task_id}{suffix}"
        payload: dict[str, Any] = {
            "ts": _now_iso(),
            "task_id": task_id,
            "pr": int(pr.number),
            "head": head.lower() if isinstance(head, str) and len(head) == 40 else head,
            "outcome_code": outcome_code,
        }
        if extra:
            payload["extra"] = dict(extra)
        marker_path.write_text(
            json.dumps(payload, ensure_ascii=False, sort_keys=True) + "\n",
            encoding="utf-8",
        )
        return marker_path

    def detect_fresh_gemini_review(
        self,
        *,
        pr: PRMeta,
        latest_gemini_review_commit_id: str | None,
    ) -> GateOutcome:
        """post 후 fresh Gemini review 도착 감지 (review.commit_id == pr.head_sha).

        Returns:
          AUTO_MERGE_ALLOWED + reason=GEMINI_FRESH_DETECTED 시 자동 resume 신호.
          그 외는 BLOCKED_WITH_REASON.
        """
        head = pr.head_sha
        if (
            isinstance(latest_gemini_review_commit_id, str)
            and len(latest_gemini_review_commit_id) == 40
            and isinstance(head, str)
            and head.lower() == latest_gemini_review_commit_id.lower()
        ):
            return GateOutcome(
                decision=AUTO_MERGE_ALLOWED,
                reason=GEMINI_FRESH_DETECTED,
                extra={
                    "head": head.lower(),
                    "gemini_review_commit_id": latest_gemini_review_commit_id.lower(),
                },
            )
        return GateOutcome(
            decision=BLOCKED_WITH_REASON,
            reason="gemini_fresh_review_not_yet",
            extra={
                "head": head if isinstance(head, str) else "",
                "latest_commit_id": latest_gemini_review_commit_id or "",
            },
        )

    def mark_gemini_fresh_detected(
        self,
        *,
        task_id: str,
        pr: PRMeta,
        gemini_review_commit_id: str,
        decision_dir: Path,
    ) -> Path:
        """fresh Gemini review 도착 시 ``<task_id>.gemini-fresh-detected`` marker 생성.

        marker payload: ts / task_id / pr / head / gemini_review_commit_id /
        decision_code=GEMINI_FRESH_DETECTED. 외부 executor 가 본 marker 를 보고 auto-resume.
        """
        if (
            not isinstance(gemini_review_commit_id, str)
            or len(gemini_review_commit_id) != 40
        ):
            raise ValueError("gemini_review_commit_id must be 40-char hex SHA")
        head = pr.head_sha
        if (
            not isinstance(head, str)
            or head.lower() != gemini_review_commit_id.lower()
        ):
            raise ValueError("pr.head_sha must match gemini_review_commit_id for fresh marker")
        decision_dir = Path(decision_dir)
        decision_dir.mkdir(parents=True, exist_ok=True)
        marker_path = decision_dir / f"{task_id}.gemini-fresh-detected"
        marker_path.write_text(
            json.dumps(
                {
                    "ts": _now_iso(),
                    "task_id": task_id,
                    "pr": int(pr.number),
                    "head": head.lower(),
                    "gemini_review_commit_id": gemini_review_commit_id.lower(),
                    "decision_code": GEMINI_FRESH_DETECTED,
                    "auto_resume": True,
                },
                ensure_ascii=False,
                sort_keys=True,
            )
            + "\n",
            encoding="utf-8",
        )
        return marker_path

    def orchestrate_owner_trigger_for_stale_pr(
        self,
        *,
        task_id: str,
        pr: PRMeta,
        stale_gemini_review_commit_id: str | None,
        owner_trigger_runner: Callable[[Path], str],
        decision_dir: Path,
    ) -> GateOutcome:
        """§3 통합 orchestration — stale 감지 → decision 생성 → runner 호출 → marker 기록.

        흐름:
          1. ``detect_gemini_stale_on_head`` 로 stale 여부 판정.
          2. stale 이면 ``emit_owner_trigger_decision`` 호출 (decision.json + requested marker).
          3. ``owner_trigger_runner(decision_path)`` 호출 (외부에서 OwnerTriggerOnly.trigger_gemini_review
             wrapping callable 주입).
          4. runner 결과 (POSTED / DEDUPED / FAILED) 에 따라 marker 기록.
          5. AUTO_MERGE_ALLOWED + reason=OWNER_TRIGGER_REQUESTED|OWNER_TRIGGER_POSTED 반환.

        Args:
          task_id: e.g. "task-2554+2".
          pr: PR meta.
          stale_gemini_review_commit_id: 가장 최근 Gemini review commit_id (stale 검증용).
          owner_trigger_runner: ``Callable[[Path], str]`` — decision_path 받아서 runner 결과
            string ("POSTED" / "DEDUPED" / "FAILED" / "PENDING") 반환. 본 모듈은 직접 OWNER token
            을 만지지 않고 callable 만 사용 (분리 원칙).
          decision_dir: marker / decision.json 저장 디렉토리.

        Returns:
          GateOutcome — orchestration 결과.
        """
        stale_outcome = self.detect_gemini_stale_on_head(
            pr=pr, gemini_review_commit_id=stale_gemini_review_commit_id,
        )
        if stale_outcome.decision != GEMINI_STALE_ON_HEAD:
            # not stale — pass through (no owner trigger needed)
            return stale_outcome

        # decision JSON + requested marker
        self.emit_owner_trigger_decision(
            task_id=task_id, pr=pr, decision_dir=decision_dir,
        )
        decision_path = Path(decision_dir) / f"{task_id}.owner_trigger_decision.json"

        # runner 호출 (외부 callable — OwnerTriggerOnly 분리)
        runner_result = owner_trigger_runner(decision_path)

        # 결과 → marker
        if runner_result == "POSTED":
            self.record_owner_trigger_outcome(
                task_id=task_id, pr=pr,
                outcome_code=OWNER_TRIGGER_POSTED,
                decision_dir=decision_dir,
            )
            return GateOutcome(
                decision=AUTO_MERGE_ALLOWED,
                reason=OWNER_TRIGGER_POSTED,
                extra={"runner_result": runner_result},
            )
        if runner_result == "DEDUPED":
            # 이미 다른 process 가 trigger 완료 — 정상 (auto-resume 가능)
            return GateOutcome(
                decision=AUTO_MERGE_ALLOWED,
                reason="owner_trigger_dedupe",
                extra={"runner_result": runner_result},
            )
        # FAILED / PENDING / other
        self.record_owner_trigger_outcome(
            task_id=task_id, pr=pr,
            outcome_code=OWNER_TRIGGER_FAILED,
            decision_dir=decision_dir,
            extra={"runner_result": runner_result},
        )
        return GateOutcome(
            decision=BLOCKED_WITH_REASON,
            reason=OWNER_TRIGGER_FAILED,
            critical_code=CRITICAL_BLOCK_OVERRIDE,
            extra={"runner_result": runner_result},
        )

    # ─── task-2556 §3 minimal patch: scheduler entry point hook ──────────────

    def auto_resume_after_fresh_evidence(
        self,
        *,
        task_id: str,
        pr: PRMeta,
        latest_gemini_review_commit_id: str | None,
        decision_dir: Path,
    ) -> GateOutcome:
        """ExecutorScheduler 가 호출하는 auto-resume 진입점 (task-2556 §3).

        흐름:
          1. fresh Gemini review 도착 여부 확인 (commit_id == pr.head_sha).
          2. fresh 도착 시 ``<task>.gemini-fresh-detected`` marker 생성.
          3. AUTO_MERGE_ALLOWED + reason=GEMINI_FRESH_DETECTED 반환 (executor 가 후속 진행).
          4. 미도착 시 BLOCKED_WITH_REASON + reason="gemini_fresh_review_not_yet".

        본 메서드는 task-2554+2 가 추가한 ``detect_fresh_gemini_review`` 와
        ``mark_gemini_fresh_detected`` 를 단일 진입점으로 묶는 **minimal patch** 이다.
        scheduler 는 본 메서드만 호출하면 fresh 도착 시 marker 생성 + AUTO_MERGE_ALLOWED
        결과를 받는다 — 회장 §12 (state persisted markers 기반 재진입).

        Args:
          task_id: e.g. "task-2554+2".
          pr: PR meta.
          latest_gemini_review_commit_id: 가장 최근 Gemini review commit_id (없으면 None).
          decision_dir: marker 저장 디렉토리.

        Returns:
          GateOutcome.
        """
        fresh_outcome = self.detect_fresh_gemini_review(
            pr=pr,
            latest_gemini_review_commit_id=latest_gemini_review_commit_id,
        )
        if (
            fresh_outcome.decision == AUTO_MERGE_ALLOWED
            and fresh_outcome.reason == GEMINI_FRESH_DETECTED
            and isinstance(latest_gemini_review_commit_id, str)
            and len(latest_gemini_review_commit_id) == 40
        ):
            marker_path = self.mark_gemini_fresh_detected(
                task_id=task_id,
                pr=pr,
                gemini_review_commit_id=latest_gemini_review_commit_id,
                decision_dir=decision_dir,
            )
            return GateOutcome(
                decision=AUTO_MERGE_ALLOWED,
                reason=GEMINI_FRESH_DETECTED,
                extra={
                    "head": pr.head_sha.lower() if isinstance(pr.head_sha, str) else "",
                    "gemini_review_commit_id": latest_gemini_review_commit_id.lower(),
                    "marker_path": str(marker_path),
                    "auto_resume": True,
                },
            )
        return fresh_outcome

    def record_no_fresh_evidence_after_post(
        self,
        *,
        task_id: str,
        pr: PRMeta,
        elapsed_seconds: int,
        threshold_seconds: int,
        decision_dir: Path,
    ) -> GateOutcome:
        """owner trigger post 후 ``threshold_seconds`` (회장 § = 24h) 경과해도 fresh 미도착 시
        ESCALATED marker 생성 + auto-resume 안 함.

        marker: ``<task_id>.posted-but-no-fresh-evidence``
        Returns:
          GateOutcome (decision=BLOCKED_WITH_REASON, reason=POSTED_BUT_NO_FRESH_EVIDENCE,
          critical_code=CRITICAL_BLOCK_OVERRIDE — 회장 결정 대기).
        """
        if elapsed_seconds < threshold_seconds:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="within_grace_period",
                extra={
                    "elapsed_seconds": elapsed_seconds,
                    "threshold_seconds": threshold_seconds,
                },
            )
        decision_dir = Path(decision_dir)
        decision_dir.mkdir(parents=True, exist_ok=True)
        marker_path = decision_dir / f"{task_id}.posted-but-no-fresh-evidence"
        head = pr.head_sha
        marker_path.write_text(
            json.dumps(
                {
                    "ts": _now_iso(),
                    "task_id": task_id,
                    "pr": int(pr.number),
                    "head": head.lower() if isinstance(head, str) and len(head) == 40 else head,
                    "elapsed_seconds": elapsed_seconds,
                    "threshold_seconds": threshold_seconds,
                    "decision_code": POSTED_BUT_NO_FRESH_EVIDENCE,
                    "escalation": "OWNER_DECISION_REQUIRED",
                },
                ensure_ascii=False,
                sort_keys=True,
            )
            + "\n",
            encoding="utf-8",
        )
        return GateOutcome(
            decision=BLOCKED_WITH_REASON,
            reason=POSTED_BUT_NO_FRESH_EVIDENCE,
            critical_code=CRITICAL_BLOCK_OVERRIDE,
            extra={
                "marker_path": str(marker_path),
                "elapsed_seconds": elapsed_seconds,
                "threshold_seconds": threshold_seconds,
            },
        )


# ─── glob 매칭 (forbidden path 용) ───────────────────────────────────────────
def _glob_match(pattern: str, path: str) -> bool:
    """간단한 glob 매칭.

    지원 규칙:
      - `**/`  → `(?:.*/)?` (0개 이상의 디렉토리. 루트 파일도 매칭).
      - `**`   → `.*`       (`/` 포함 임의 문자열).
      - `*`    → `[^/]*`    (디렉토리 경계를 넘지 않는 임의 문자열).

    `**/` 처리를 `**` 보다 먼저 두어 `**/foo.py` 패턴이 `foo.py` (루트 파일) 도
    매칭할 수 있게 한다 (re.escape 가 `/` 를 이스케이프하지 않는 환경 가정 외에도
    이스케이프된 `\\/` 형태도 함께 처리).
    """
    escaped = re.escape(pattern)
    regex = (
        "^"
        + escaped.replace(r"\*\*\/", r"(?:.*/)?")
                 .replace(r"\*\*/", r"(?:.*/)?")
                 .replace(r"\*\*", ".*")
                 .replace(r"\*", "[^/]*")
        + "$"
    )
    return re.match(regex, path) is not None


def _extract_task_id_from_branch(branch: str) -> str:
    """`task/task-2531-dev4` → `task-2531`."""
    m = re.match(r"^task/(?P<id>task-\d+(?:\+\d+)?)", branch)
    if not m:
        return branch
    return m.group("id")


# ─── task-2565 Phase 3 helper hooks ──────────────────────────────────────────
# helper-only 방식: 실제 wire는 후속 task에서 추가. Phase 2 executor_scheduler와 동일.

def invoke_phase3_ci_rerun_hook(rerun_input: Any, rerun_callable: Any = None) -> dict:
    """task-2565 Phase 3 — CI rerun adapter hook. caller가 inputs를 채워 호출.

    CIRerunInput을 받아 auto_rerun_failed_ci_jobs를 호출한다.
    실제 wire는 후속 task에서 추가 (helper-only, Phase 2 executor_scheduler 방식과 동일).

    Args:
      rerun_input: CIRerunInput 인스턴스.
      rerun_callable: (pr_number, failed_jobs) → dict 형태 callable. None이면 DECISION_ONLY.

    Returns:
      dict with keys: result (str), reason (str), decision (dict)
    """
    try:
        from anu_v2.second_review_recovery import auto_rerun_failed_ci_jobs
        return auto_rerun_failed_ci_jobs(rerun_input, rerun_callable=rerun_callable)
    except Exception as exc:  # noqa: BLE001
        return {"result": "HOOK_FAILED", "reason": str(exc), "decision": {}}


def invoke_phase3_pre_merge_guard_hook(pre_merge_input: Any) -> dict:
    """task-2565 Phase 3 — pre-merge wording-only commit 차단 hook.

    PreMergeCommitInput을 받아 classify_pre_merge_commit + build_pre_merge_block_decision을 호출한다.
    실제 wire는 후속 task에서 추가 (helper-only, Phase 2 executor_scheduler 방식과 동일).

    Args:
      pre_merge_input: PreMergeCommitInput 인스턴스.

    Returns:
      dict with keys: change_type (str), allowed (bool), decision (dict)
    """
    try:
        from anu_v2.second_review_recovery import (
            build_pre_merge_block_decision,
            classify_pre_merge_commit,
        )
        change_type, allowed = classify_pre_merge_commit(pre_merge_input)
        decision = build_pre_merge_block_decision(pre_merge_input)
        return {"change_type": change_type, "allowed": allowed, "decision": decision}
    except Exception as exc:  # noqa: BLE001
        return {"change_type": "UNKNOWN", "allowed": True, "decision": {}, "error": str(exc)}
