"""merge_queue_executor.py — task-2509 5 모듈 #1.

회장 명시: queue 선두 PR이 자동 머지 10조건 만족 시 회장 승인 없이 squash merge +
post-merge smoke + 후행 PR stale 재검증을 수행한다.
산출물은 정책 문서가 아니라 실행 가능한 코드 + 회귀 테스트.

자동 머지 10조건:
  1. queue head 위치 (선행 PR 모두 merged)
  2. origin/main fetch 성공 + main HEAD SHA 잠금
  3. PR base sync (BEHIND → merge sync, rebase/force 금지)
  4. effective diff = expected_files (불일치 → replacement_pr_runner 분기)
  5. forbidden path 0건
  6. CI required all SUCCESS
  7. Gemini reviewThreads unresolved 0
  8. mergeStateStatus == CLEAN
  9. HEAD SHA lock 유지 (검증 시작 == merge 직전)
 10. cherry_pick_allowed != true / serial_only conflict 0

Critical 7종 (회장 §14):
  - FORBIDDEN_PATH_INVASION
  - EFFECTIVE_DIFF_CONTAMINATION_REPLACEMENT_FAILED
  - GEMINI_REAL_BUG_SCOPE_EXPANSION
  - BLOCK_OVERRIDE_REQUIRED_OR_INSUFFICIENT_REASON
  - DEPENDENCY_CYCLE_OR_SERIAL_ONLY_CONFLICT
  - REPLACEMENT_PR_ALSO_FAILED
  - POST_MERGE_SMOKE_FAILURE

CLI:
  python3 utils/merge_queue_executor.py --pr <N> --dry-run
    → AUTO_MERGE_ALLOWED 또는 BLOCKED_WITH_REASON: <code> JSON

후속 모듈 (인터페이스만 박제):
  - replacement_pr_runner (task-2510)
  - auto_gemini_triage (task-2511)
  - post_merge_smoke_runner (task-2512)
  - critical_escalation_reporter (task-2513)
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import re
import subprocess
import sys
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Optional

# Workspace root; overridable through the WORKSPACE_ROOT environment variable.
WORKSPACE = Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))
# Sub-directories of the workspace (presumably event and audit records —
# confirm against the code that writes to them).
EVENTS_DIR = WORKSPACE / "memory" / "events"
AUDIT_DIR = WORKSPACE / "memory" / "orchestration-audit"
# JSONL file path for the merge-queue audit log inside AUDIT_DIR.
GLOBAL_AUDIT_LOG = AUDIT_DIR / "merge-queue.jsonl"

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# ─── W1: import wiring for the five modules (task-2514) ──────────────────────
# The sibling modules are optional at import time: when any of them is missing
# the ImportError is logged at debug level, _WIRING_AVAILABLE is set to False
# and every imported name is rebound to None (LEGACY_CRITICAL_MAP to {}).
try:
    from utils.replacement_pr_runner import ReplacementPRRunner  # pyright: ignore[reportMissingImports]
    from utils.auto_gemini_triage import triage_pr, to_legacy_gemini_state, TriageReport  # pyright: ignore[reportMissingImports]
    from utils.post_merge_smoke_runner import (  # pyright: ignore[reportMissingImports]
        run_post_merge_smoke as run_pm_smoke_v2,
        PostMergeSmokeRun,
        SmokeStatus,
    )
    from utils.critical_escalation_reporter import (  # pyright: ignore[reportMissingImports]
        process_event as report_critical_event,
        LEGACY_CRITICAL_MAP,
    )
    from utils.automation_contracts import (  # pyright: ignore[reportMissingImports]
        CriticalEscalationType,
        EscalationPacket,
        ReviewGateStatus,
        GeminiTriageResult,
    )
    _WIRING_AVAILABLE = True
except ImportError as _wiring_import_err:  # pragma: no cover
    logger.debug("W1 wiring modules not available: %s", _wiring_import_err)
    _WIRING_AVAILABLE = False
    ReplacementPRRunner = None  # type: ignore[assignment,misc]
    triage_pr = None  # type: ignore[assignment]
    to_legacy_gemini_state = None  # type: ignore[assignment]
    TriageReport = None  # type: ignore[assignment,misc]
    run_pm_smoke_v2 = None  # type: ignore[assignment]
    PostMergeSmokeRun = None  # type: ignore[assignment,misc]
    SmokeStatus = None  # type: ignore[assignment,misc]
    report_critical_event = None  # type: ignore[assignment]
    LEGACY_CRITICAL_MAP = {}  # type: ignore[assignment]
    CriticalEscalationType = None  # type: ignore[assignment,misc]
    EscalationPacket = None  # type: ignore[assignment,misc]
    ReviewGateStatus = None  # type: ignore[assignment,misc]
    GeminiTriageResult = None  # type: ignore[assignment,misc]

# Re-export of the W1 wiring symbols: the interfaces that the fallback branch
# rebinds to None are exposed explicitly so external code can reference them
# directly, and listing them here also avoids Pyright "unused" warnings.
__wiring_exports__ = (
    _WIRING_AVAILABLE,
    ReplacementPRRunner,
    triage_pr,
    to_legacy_gemini_state,
    TriageReport,
    run_pm_smoke_v2,
    PostMergeSmokeRun,
    SmokeStatus,
    report_critical_event,
    LEGACY_CRITICAL_MAP,
    CriticalEscalationType,
    EscalationPacket,
    ReviewGateStatus,
    GeminiTriageResult,
)


# ─── Decision codes ────────────────────────────────────────────────────────
# String codes stored in QueueDecision.decision / returned by the gate checks.
AUTO_MERGE_ALLOWED = "AUTO_MERGE_ALLOWED"
AUTO_MERGE_SUCCESS = "AUTO_MERGE_SUCCESS"
WAITING_FOR_PREDECESSOR = "WAITING_FOR_PREDECESSOR"
BLOCKED_WITH_REASON = "BLOCKED_WITH_REASON"
HEAD_SHA_LOCK_BROKEN = "HEAD_SHA_LOCK_BROKEN"
CI_FAILURE_BLOCK = "CI_FAILURE_BLOCK"
CI_IN_PROGRESS = "CI_IN_PROGRESS"
GEMINI_UNRESOLVED_BLOCK = "GEMINI_UNRESOLVED_BLOCK"
MERGE_STATE_NOT_CLEAN = "MERGE_STATE_NOT_CLEAN"
DIFF_CONTAMINATION_REPLACEMENT = "DIFF_CONTAMINATION_REPLACEMENT"
# task-2521 §2 — Gemini async race: classified as "deferred" so that a
# premature fail/rerun is prevented.
WAITING_FOR_GEMINI_REVIEW = "WAITING_FOR_GEMINI_REVIEW"
# task-2522 §5 — separate code recording that auto-merge is refused when an
# owner_pat fallback is detected (not one of the seven Critical codes).
# Chairman's core principle: an owner_pat fallback is not recognized as an
# autonomy success even when it is a functional success; it is classified
# only with the AUTOMATION_CAPABILITY_GAP marker.
OWNER_PAT_FALLBACK_BLOCKED = "OWNER_PAT_FALLBACK_BLOCKED"

# task-2521 §2 — wait budget (adjustable by chairman decision): 6 minutes
# measured from pushed_at.
GEMINI_REVIEW_WAIT_BUDGET_SECONDS_DEFAULT = 360.0

# ─── Seven Critical codes (chairman §14) ───────────────────────────────────
CRITICAL_FORBIDDEN_PATH = "FORBIDDEN_PATH_INVASION"
CRITICAL_DIFF_REPLACEMENT_FAILED = "EFFECTIVE_DIFF_CONTAMINATION_REPLACEMENT_FAILED"
CRITICAL_GEMINI_SCOPE_EXPANSION = "GEMINI_REAL_BUG_SCOPE_EXPANSION"
CRITICAL_BLOCK_OVERRIDE = "BLOCK_OVERRIDE_REQUIRED_OR_INSUFFICIENT_REASON"
CRITICAL_DEPENDENCY_CYCLE = "DEPENDENCY_CYCLE_OR_SERIAL_ONLY_CONFLICT"
CRITICAL_REPLACEMENT_FAILED = "REPLACEMENT_PR_ALSO_FAILED"
CRITICAL_POST_MERGE_SMOKE = "POST_MERGE_SMOKE_FAILURE"

# Set of all seven Critical codes, for membership checks.
CRITICAL_CODES = {
    CRITICAL_FORBIDDEN_PATH,
    CRITICAL_DIFF_REPLACEMENT_FAILED,
    CRITICAL_GEMINI_SCOPE_EXPANSION,
    CRITICAL_BLOCK_OVERRIDE,
    CRITICAL_DEPENDENCY_CYCLE,
    CRITICAL_REPLACEMENT_FAILED,
    CRITICAL_POST_MERGE_SMOKE,
}

# Chairman §6 — admin override / force / rebase are strictly forbidden; this
# set lists the forbidden command-line flags.
FORBIDDEN_GIT_FLAGS = {"--admin", "--force", "--force-with-lease", "-f"}

# Hook names of the follow-up modules. When changes outside expected_files
# occur, processing branches to the replacement hook.
REPLACEMENT_PR_RUNNER_HOOK = "replacement_pr_runner"   # task-2510
AUTO_GEMINI_TRIAGE_HOOK = "auto_gemini_triage"         # task-2511
POST_MERGE_SMOKE_HOOK = "post_merge_smoke_runner"      # task-2512
CRITICAL_ESCALATION_HOOK = "critical_escalation_reporter"  # task-2513


# ─── Gemini status enum (task-2509+1 §2) ─────────────────────────────────
GEMINI_COMPLETED = "GEMINI_COMPLETED"            # inline review finished, 0 unresolved
GEMINI_UNRESOLVED = "GEMINI_UNRESOLVED"          # inline review finished, unresolved > 0
GEMINI_UNAVAILABLE_QUOTA = "GEMINI_UNAVAILABLE_QUOTA"  # daily quota limit (PR #58)
GEMINI_TIMEOUT = "GEMINI_TIMEOUT"                # polling timeout
GEMINI_STALE = "GEMINI_STALE"                    # PR head changed; rerun needed
GEMINI_REAL_BUG = "GEMINI_REAL_BUG"              # unresolved and classified as a real bug
GEMINI_SCOPE_EXPANSION = "GEMINI_SCOPE_EXPANSION"  # requests changes outside expected_files

# All seven Gemini status values.
GEMINI_STATUS_VALUES = frozenset({
    GEMINI_COMPLETED, GEMINI_UNRESOLVED, GEMINI_UNAVAILABLE_QUOTA,
    GEMINI_TIMEOUT, GEMINI_STALE, GEMINI_REAL_BUG, GEMINI_SCOPE_EXPANSION,
})

# Gemini unavailable (candidates for the fallback review).
GEMINI_UNAVAILABLE_STATUSES = frozenset({
    GEMINI_UNAVAILABLE_QUOTA, GEMINI_TIMEOUT, GEMINI_STALE,
})


# ─── Risk level (task-2509+1 §4) ──────────────────────────────────────────
RISK_LEVEL_LOW = "LOW"
RISK_LEVEL_MEDIUM = "MEDIUM"
RISK_LEVEL_HIGH_CORE = "HIGH_CORE"

# If even one changed file matches one of these patterns, the change is
# classified as HIGH_CORE (chairman §4).
HIGH_CORE_FILE_PATTERNS: list[re.Pattern] = [
    re.compile(r"^utils/merge_queue_executor\.py$"),
    re.compile(r"^utils/merge_topology_gate\.py$"),
    re.compile(r"^dispatch\.py$"),
    re.compile(r"^teams/shared/verifiers/.+"),
]


# ─── Static risky patterns (task-2509+1 §4 — HIGH_CORE hardening) ─────────
# Each entry is (compiled regex, short description); the description is what
# gets reported as the "pattern" of a violation.
RISKY_PATTERNS = [
    (re.compile(r"git\s+push\s+.*--force"), "force push"),
    (re.compile(r"git\s+push\s+.*-f\b"), "force push (-f)"),
    (re.compile(r"gh\s+pr\s+merge\s+.*--admin"), "admin override"),
    (re.compile(r"git\s+cherry-pick\b"), "cherry-pick"),
    (re.compile(r'open\([^)]*"\.done"[^)]*"w"'), "manual .done write"),
    (re.compile(r'Path\([^)]*\)\.touch\(\).*\.done'), "manual .done touch"),
]


# ─── Final decision enum prefix (task-2509+1 §7) ──────────────────────────
# Values / prefixes used for QueueDecision.final_decision.
FINAL_AUTO_MERGE_ALLOWED = AUTO_MERGE_ALLOWED
FINAL_BLOCKED_PREFIX = "BLOCKED_WITH_REASON"
FINAL_CRITICAL_PREFIX = "CRITICAL_ESCALATION"
FALLBACK_REVIEW_FAILED = "FALLBACK_REVIEW_FAILED"
NON_DRY_RUN_REQUIRES_SMOKE_COMMAND = "NON_DRY_RUN_REQUIRES_SMOKE_COMMAND"


# ─── Data classes ──────────────────────────────────────────────────────────
@dataclass
class QueueDecision:
    """Record of one merge-queue decision together with the evidence behind it.

    Only ``decision`` is required; every other field has a default.
    ``to_dict`` converts the record with ``dataclasses.asdict``.
    """

    decision: str
    reason: str = ""
    pr_number: Optional[int] = None
    task_id: Optional[str] = None
    main_head_sha_start: Optional[str] = None
    main_head_sha_merge: Optional[str] = None
    pr_head_sha_start: Optional[str] = None
    pr_head_sha_merge: Optional[str] = None
    expected_files: list[str] = field(default_factory=list)
    effective_files: list[str] = field(default_factory=list)
    forbidden_paths: list[str] = field(default_factory=list)
    ci_status: str = ""
    gemini_unresolved_count: int = 0
    merge_state_status: str = ""
    smoke_status: str = ""
    critical_code: Optional[str] = None
    audit_path: Optional[str] = None
    timestamp: str = ""
    fixture_pr_replay: list[dict] = field(default_factory=list)
    # New fields (task-2509+1 §7)
    gemini_status: Optional[str] = None              # one of the GEMINI_* values
    fallback_review_used: bool = False
    fallback_review_passed: bool = False             # meaningful only when fallback_review_used is True
    risk_level: Optional[str] = None                 # one of the RISK_LEVEL_* values
    review_gate_passed: bool = False
    final_decision: Optional[str] = None             # AUTO_MERGE_ALLOWED / BLOCKED_WITH_REASON: <code> / CRITICAL_ESCALATION: <enum>
    critical_escalation: Optional[str] = None        # one of the seven Critical codes (None if absent)
    fallback_check_details: dict = field(default_factory=dict)  # result of evaluate_fallback_review()
    static_scan_violations: list = field(default_factory=list)  # violations from static_risky_pattern_scan
    # ★ W7 new fields (task-2514)
    pipeline_step: Optional[str] = None                          # current chain step
    replacement_used: bool = False
    triage_summary: Optional[dict] = None
    smoke_envelope: Optional[dict] = None
    escalations: list = field(default_factory=list)
    # ★ task-2521 §2 — Gemini async race record (5 metrics)
    pushed_at: Optional[str] = None                               # head SHA push time, ISO8601
    gemini_submitted_at: Optional[str] = None                    # gemini review submitted_at, ISO8601
    ci_complete_at: Optional[str] = None                          # CI rollup completion time (if available)
    push_to_ci_complete_seconds: Optional[float] = None
    push_to_gemini_review_seconds: Optional[float] = None
    gemini_gate_wait_seconds: Optional[float] = None             # accumulated wait time
    rerun_count: int = 0                                          # number of async-race retries
    premature_gate_fail_detected: bool = False                   # detected gemini submitted_at < pushed_at

    def to_dict(self) -> dict:
        """Return all fields as a plain dict (recursively, via ``asdict``)."""
        return asdict(self)


@dataclass
class TaskSpec:
    """Normalized task spec metadata.

    All fields except ``smoke_command`` are required. ``merge_queue_position``
    and ``cherry_pick_allowed`` are typed ``Any``; their accepted value shapes
    are not established in this part of the file.
    """

    task_id: str
    expected_files: list[str]
    risk_area: str
    dependency: list[str]
    parallel_policy: str
    merge_queue_position: Any
    stale_recheck_required: bool
    cherry_pick_allowed: Any
    smoke_command: Optional[list[str]] = None


# ─── Subprocess wrapper (for test injection) ──────────────────────────────
# Callable type shared by every command runner in this module: takes the
# argument list (plus optional keyword arguments) and returns a
# subprocess.CompletedProcess.
RunnerType = Callable[..., subprocess.CompletedProcess]


def _default_runner(args: list[str], cwd: Optional[str] = None, timeout: int = 60) -> subprocess.CompletedProcess:
    return subprocess.run(
        args,
        cwd=cwd or str(WORKSPACE),
        capture_output=True,
        text=True,
        timeout=timeout,
    )


# ─── §10 admin/rebase/force 금지 검증 (정적) ──────────────────────────────
def assert_no_forbidden_git_flags(args: list[str]) -> None:
    """Raise if ``args`` contains a forbidden flag or an explicit ``rebase``.

    A token is forbidden when its flag name is in ``FORBIDDEN_GIT_FLAGS`` or
    when it starts with ``--admin``. The flag name is the part before the
    first ``=``, so value-carrying spellings such as
    ``--force-with-lease=origin/main`` are rejected as well (exact-token
    matching alone would let them through).

    Raises:
        RuntimeError: ``FORBIDDEN_GIT_FLAGS detected: [...]`` listing the
            offending tokens, or ``REBASE_FORBIDDEN`` when the literal token
            ``rebase`` is present.
    """
    bad = [
        a for a in args
        # Compare only the flag name so "--flag=value" cannot bypass the set.
        if a.split("=", 1)[0] in FORBIDDEN_GIT_FLAGS or a.startswith("--admin")
    ]
    if bad:
        raise RuntimeError(f"FORBIDDEN_GIT_FLAGS detected: {bad}")
    # Explicitly block the rebase sub-command.
    if "rebase" in args:
        raise RuntimeError("REBASE_FORBIDDEN")


# ─── task-2522 §2 — token selection wiring (좁은 보강) ────────────────────
# 회장 §본질:
#   1. GitHub App installation token 있음 → 사용 (정상 자동화).
#   2. App token 없음 → AUTOMATION_CAPABILITY_GAP 분류 + 자동 머지 거부.
#   3. owner_pat fallback → autonomy success로 간주하지 않음.
#
# 본 헬퍼는 신규 abstraction을 만들지 않고 deterministic 함수만 추가.
# 분류는 utils.bot_merge_identity.classify_token_source() 결과를 입력으로 받는다.
# (lazy import로 circular 방지)

def select_merge_token_decision(token_source: str) -> dict:
    """Map a token-source label to a merge-permission verdict.

    The label is compared after stripping whitespace and upper-casing
    (``None``/empty is treated as unknown):

      - GITHUB_APP_INSTALLATION_TOKEN -> allow_merge=True,  decision='APP_TOKEN_OK'
      - GITHUB_ACTIONS_TOKEN          -> allow_merge=True,  decision='ACTIONS_TOKEN_OK'
      - OWNER_PAT                     -> allow_merge=False, decision=OWNER_PAT_FALLBACK_BLOCKED
      - anything else                 -> allow_merge=False, decision='AUTOMATION_CAPABILITY_GAP'
                                         (fail-closed)

    ``capability_gap`` is True exactly for the two refusing verdicts. An
    owner PAT must never be allowed to merge.

    Returns:
        dict with keys ``allow_merge``, ``decision``, ``capability_gap`` and
        ``reason``.
    """

    def _verdict(allow: bool, code: str, gap: bool, why: str) -> dict:
        return {
            "allow_merge": allow,
            "decision": code,
            "capability_gap": gap,
            "reason": why,
        }

    label = (token_source or "").strip().upper()

    if label == "GITHUB_APP_INSTALLATION_TOKEN":
        return _verdict(True, "APP_TOKEN_OK", False, "GitHub App installation token detected")
    if label == "GITHUB_ACTIONS_TOKEN":
        return _verdict(True, "ACTIONS_TOKEN_OK", False, "GitHub Actions runner token detected")
    if label == "OWNER_PAT":
        return _verdict(
            False,
            OWNER_PAT_FALLBACK_BLOCKED,
            True,
            "owner_pat fallback detected; autonomy success NOT recognized",
        )
    # UNKNOWN or any unclassified label; the reason echoes the raw input.
    return _verdict(
        False,
        "AUTOMATION_CAPABILITY_GAP",
        True,
        f"token source unknown or unclassified: {token_source!r}",
    )


# ─── §1 queue head 확인 ───────────────────────────────────────────────────
def check_predecessor_merged(
    dependency: list[str],
    runner: RunnerType,
    main_log_grep: Optional[Callable[[str], bool]] = None,
) -> tuple[bool, list[str]]:
    """Check that every ``.merged`` dependency is already reflected on main.

    Each entry is expected to look like ``task-<n>[+<m>][.merged|.done]``;
    a missing suffix means ``merged``. Non-string entries, the literal
    ``"none"`` and ``.done`` entries are ignored. Entries that do not match
    the format are reported as pending verbatim.

    A task counts as landed when ``main_log_grep(task_id)`` is truthy or,
    without a callback, when ``git log origin/main --grep=<task_id>`` run via
    ``runner`` prints anything.

    Returns:
        ``(all_merged, pending)`` where ``pending`` lists unmet dependencies.
    """
    # NOTE(review): --grep is a pattern match, so "task-250" would also hit a
    # commit mentioning "task-2509" — confirm whether that can occur.
    spec_format = re.compile(r"^(task-\d+(?:\+\d+)?)(?:\.(merged|done))?$")

    def _landed_on_main(task_id: str) -> bool:
        if main_log_grep is not None:
            return bool(main_log_grep(task_id))
        log = runner(["git", "log", "origin/main", "--oneline", f"--grep={task_id}", "-n", "1"])
        return bool(log.stdout.strip())

    waiting: list[str] = []
    if not dependency:
        return True, waiting

    for entry in dependency:
        if not isinstance(entry, str) or entry == "none":
            continue
        parsed = spec_format.match(entry.strip())
        if parsed is None:
            # Unparseable spec: keep the raw entry so it blocks the queue.
            waiting.append(entry)
            continue
        if parsed.group(2) == "done":
            # Only ".merged" (explicit or implied) requires presence on main.
            continue
        task_id = parsed.group(1)
        if not _landed_on_main(task_id):
            waiting.append(task_id)

    return (not waiting), waiting


# ─── §2 origin/main fetch + HEAD SHA 잠금 ──────────────────────────────────
def fetch_main_head(runner: RunnerType) -> str:
    """Fetch ``origin`` and return the SHA that ``origin/main`` points to.

    Auto-merge condition 2 requires a successful fetch before the main HEAD
    SHA is locked. A non-zero exit code from ``git fetch`` is therefore an
    error: continuing would lock onto whatever stale ``origin/main`` the local
    clone still has.

    Raises:
        RuntimeError: ``MAIN_HEAD_SHA_FETCH_FAILED`` when the fetch exits
            non-zero or ``git rev-parse`` prints no SHA.
    """
    fetched = runner(["git", "fetch", "origin", "--quiet"])
    # Only an integer return code is trusted, so runner doubles that expose
    # no (or a non-integer) returncode keep the previous behaviour.
    fetch_rc = getattr(fetched, "returncode", 0)
    if isinstance(fetch_rc, int) and fetch_rc != 0:
        raise RuntimeError("MAIN_HEAD_SHA_FETCH_FAILED")

    result = runner(["git", "rev-parse", "origin/main"])
    sha = (result.stdout or "").strip()
    if not sha:
        raise RuntimeError("MAIN_HEAD_SHA_FETCH_FAILED")
    return sha


# ─── §3 PR base sync (BEHIND → merge sync) ────────────────────────────────
def sync_pr_base(
    _pr_branch: str,
    pr_workdir: str,
    runner: RunnerType,
    merge_state_status: str,
) -> dict:
    """Merge ``origin/main`` into the PR checkout when the PR is BEHIND.

    Nothing is run unless ``merge_state_status`` is exactly ``"BEHIND"``.
    The sync is a plain ``git merge origin/main --no-edit`` executed in
    ``pr_workdir``; the command is checked against the forbidden-flag guard
    first (no rebase, no force). ``_pr_branch`` is currently unused.

    Returns:
        dict with ``performed`` (bool), ``method`` (``"merge_no_edit"`` or
        None) and ``conflict`` (True when the merge exited non-zero and its
        output mentions a conflict).
    """
    if merge_state_status != "BEHIND":
        return {"performed": False, "method": None, "conflict": False}

    merge_cmd = ["git", "merge", "origin/main", "--no-edit"]
    assert_no_forbidden_git_flags(merge_cmd)
    completed = runner(merge_cmd, cwd=pr_workdir)

    conflicted = False
    if completed.returncode != 0:
        # NOTE(review): a non-zero exit without the word "conflict" is still
        # reported as performed with conflict=False — confirm this is intended.
        output = (completed.stdout or "") + (completed.stderr or "")
        conflicted = "conflict" in output.lower()

    return {"performed": True, "method": "merge_no_edit", "conflict": conflicted}


# ─── §4 effective diff vs expected_files ──────────────────────────────────
def _normalize_file_list(files: list[str]) -> set[str]:
    return {p.strip() for p in files if p.strip()}


def compare_effective_diff(
    effective_files: list[str],
    expected_files: list[str],
) -> tuple[bool, list[str], list[str]]:
    """Symmetrically compare the effective diff with ``expected_files``.

    Both lists are normalised (stripped, blanks dropped, de-duplicated)
    before comparison.

    Returns:
        ``(equal, extra, missing)`` where
          - ``extra``   = sorted files changed but not expected,
          - ``missing`` = sorted files expected but not changed,
          - ``equal``   = True when both lists are empty.
    """
    wanted = _normalize_file_list(expected_files)
    actual = _normalize_file_list(effective_files)

    extra = sorted(actual.difference(wanted))
    missing = sorted(wanted.difference(actual))
    equal = not extra and not missing
    return equal, extra, missing


# ─── §5 forbidden path 검증 ────────────────────────────────────────────────
# Path patterns that a PR must not touch unless the file is explicitly listed
# in its expected files. All patterns are anchored at the start of the path.
DEFAULT_FORBIDDEN_PATTERNS = [
    re.compile(expr)
    for expr in (
        r"^\.github/workflows/",
        r"^teams/.*/qc/verifiers/",
        r"^utils/task_id_parser\.py$",
        r"^scripts/finish_task\.py$",
        r"^scripts/qc_verify\.py$",
    )
]


def detect_forbidden_paths(
    effective_files: list[str],
    allowed_expected: list[str],
    extra_patterns: Optional[list[re.Pattern]] = None,
) -> list[str]:
    """Return changed files that hit a forbidden pattern and are not expected.

    A file is an invasion when it matches one of ``DEFAULT_FORBIDDEN_PATTERNS``
    (plus ``extra_patterns``) and is not listed in ``allowed_expected``.

    Each effective path is stripped before it is checked, the same
    normalisation ``allowed_expected`` receives. Without it, a path carrying
    stray whitespace would neither equal its expected entry nor match the
    ``^``-anchored patterns, so an invasion could go undetected. Blank
    entries are ignored.

    Returns:
        Stripped invading paths, in the order they appear in
        ``effective_files``.
    """
    patterns = list(DEFAULT_FORBIDDEN_PATTERNS)
    if extra_patterns:
        patterns.extend(extra_patterns)
    expected = _normalize_file_list(allowed_expected)

    invasions: list[str] = []
    for raw in effective_files:
        path = raw.strip()
        if not path or path in expected:
            continue
        if any(p.search(path) for p in patterns):
            invasions.append(path)
    return invasions


# ─── §6 CI 상태 확인 ───────────────────────────────────────────────────────
def fetch_ci_status(
    pr_number: int,
    runner: RunnerType,
    max_polls: int = 1,
    backoff_seconds: float = 0.0,
    sleeper: Callable[[float], None] = time.sleep,
) -> dict:
    """Summarise ``gh pr view --json statusCheckRollup`` for a PR.

    Every rollup item contributes one upper-cased state, taken from the first
    non-empty of ``state``, ``conclusion`` and ``status``. Classification, in
    priority order:

      1. no items, or any failing state       -> ``CI_FAILURE_BLOCK``
      2. any pending state                    -> poll again (exponential
         backoff through ``sleeper``) while attempts remain, otherwise
         ``CI_IN_PROGRESS``
      3. every state is a passing state       -> ``"SUCCESS"``
      4. anything else (unknown states)       -> ``CI_FAILURE_BLOCK``

    Output that is not valid JSON is treated as an empty payload.

    Returns:
        dict with ``status``, ``details`` (the state list) and ``raw`` (the
        rollup as received).
    """
    failing_states = {"FAILURE", "ERROR", "CANCELLED", "TIMED_OUT", "ACTION_REQUIRED"}
    pending_states = {"IN_PROGRESS", "PENDING", "QUEUED", "WAITING", "EXPECTED"}
    passing_states = {"SUCCESS", "COMPLETED", "NEUTRAL", "SKIPPED"}

    for attempt in range(max(max_polls, 1)):
        proc = runner([
            "gh", "pr", "view", str(pr_number),
            "--json", "statusCheckRollup",
        ])
        try:
            payload = json.loads(proc.stdout or "{}")
        except json.JSONDecodeError:
            payload = {}

        rollup = payload.get("statusCheckRollup") or []
        statuses = [
            (check.get("state") or check.get("conclusion") or check.get("status") or "").upper()
            for check in rollup
        ]
        observed = set(statuses)

        # An empty rollup is treated like a failure (nothing proves success).
        if not statuses or observed & failing_states:
            return {"status": CI_FAILURE_BLOCK, "details": statuses, "raw": rollup}

        if observed & pending_states:
            has_more_attempts = attempt + 1 < max_polls
            if not has_more_attempts:
                return {"status": CI_IN_PROGRESS, "details": statuses, "raw": rollup}
            sleeper(backoff_seconds * (2 ** attempt))
            continue

        if observed <= passing_states:
            return {"status": "SUCCESS", "details": statuses, "raw": rollup}
        return {"status": CI_FAILURE_BLOCK, "details": statuses, "raw": rollup}

    # Every iteration above returns or polls again; kept as a safe default.
    return {"status": CI_IN_PROGRESS, "details": [], "raw": []}


# ─── §7 Gemini 상태 ───────────────────────────────────────────────────────
def fetch_gemini_status(
    pr_number: int,
    runner: RunnerType,
    expected_files: list[str],
) -> dict:
    """Query the PR's review threads via ``gh api graphql`` and classify them.

    The repository is no longer hard-coded: owner and name are passed as
    GraphQL variables filled from gh's ``{owner}`` / ``{repo}`` placeholders
    (resolved by gh from the repository of the working directory), and the PR
    number is passed as an integer variable.

    Result ``status`` values, in evaluation order:
      - ``unavailable_quota`` — errors mention quota / rate limit
      - ``timeout``           — errors mention timeout / deadline
      - ``stale``             — payload carries ``stale`` or ``pr_head_changed`` == True
      - ``api_error``         — errors were returned and no pullRequest data
        came back (e.g. repository or PR not resolvable); previously this
        case fell through to ``ok`` with zero threads
      - ``ok``                — no unresolved thread
      - ``critical_scope_expansion`` — an unresolved thread is on a path
        outside ``expected_files`` (hook: critical escalation)
      - ``auto_triage_candidate``    — all unresolved threads are inside
        ``expected_files`` (hook: auto triage)

    NOTE(review): an empty or unparseable response that carries no ``errors``
    still yields ``ok`` (unchanged) — confirm whether that should fail closed.

    Returns:
        dict with at least ``status``, ``unresolved`` and ``hook``.
    """
    query = (
        "query($owner:String!,$name:String!,$number:Int!){ "
        "repository(owner:$owner,name:$name){ "
        "pullRequest(number:$number){ "
        "reviewThreads(first:50){ nodes{ isResolved "
        "comments(first:1){ nodes{ path body } } } } } } }"
    )
    result = runner([
        "gh", "api", "graphql",
        # -F values have {owner}/{repo} substituted by gh and are type-coerced,
        # so "number" arrives as a GraphQL Int.
        "-F", "owner={owner}",
        "-F", "name={repo}",
        "-F", f"number={pr_number}",
        "-f", f"query={query}",
    ])
    try:
        payload = json.loads(result.stdout or "{}")
    except json.JSONDecodeError:
        payload = {}
    if not isinstance(payload, dict):
        # A JSON document that is not an object carries nothing usable.
        payload = {}

    # task-2509+1 §2 — detect quota/timeout/stale signals (prevents a repeat
    # of the PR #58 incident).
    errors = payload.get("errors") or []
    if isinstance(errors, list) and errors:
        joined = " ".join(
            (e.get("message", "") if isinstance(e, dict) else str(e)).lower()
            for e in errors
        )
        if "quota" in joined or "rate limit" in joined or "rate-limit" in joined:
            return {
                "status": "unavailable_quota",
                "unresolved": [],
                "hook": None,
                "errors": errors,
            }
        if "timeout" in joined or "deadline" in joined:
            return {
                "status": "timeout",
                "unresolved": [],
                "hook": None,
                "errors": errors,
            }
    # PR head SHA mismatch -> stale (signalled by a payload key).
    if payload.get("stale") is True or payload.get("pr_head_changed") is True:
        return {
            "status": "stale",
            "unresolved": [],
            "hook": None,
        }

    data = payload.get("data") or {}
    repo = data.get("repository") or {}
    pr = repo.get("pullRequest") or {}
    if errors and not pr:
        # The API reported errors and returned no PR data: the review state is
        # unknown, so it must not be reported as "ok".
        return {
            "status": "api_error",
            "unresolved": [],
            "hook": None,
            "errors": errors,
        }

    threads = pr.get("reviewThreads") or {}
    nodes = threads.get("nodes") or []
    unresolved = []
    for n in nodes:
        if n.get("isResolved"):
            continue
        comments = (n.get("comments") or {}).get("nodes") or []
        first = comments[0] if comments else {}
        unresolved.append({
            "path": first.get("path", ""),
            "body": first.get("body", ""),
        })
    if not unresolved:
        return {"status": "ok", "unresolved": [], "hook": None}

    expected = _normalize_file_list(expected_files)
    inside = [u for u in unresolved if u.get("path") in expected]
    outside = [u for u in unresolved if u.get("path") not in expected]
    if outside:
        return {
            "status": "critical_scope_expansion",
            "unresolved": unresolved,
            "outside": outside,
            "hook": CRITICAL_ESCALATION_HOOK,
            "critical_code": CRITICAL_GEMINI_SCOPE_EXPANSION,
        }
    return {
        "status": "auto_triage_candidate",
        "unresolved": unresolved,
        "inside": inside,
        "hook": AUTO_GEMINI_TRIAGE_HOOK,
    }


def classify_gemini_status(gemini_state: dict, *, real_bug: bool = False) -> str:
    """Map a ``fetch_gemini_status()`` result to one of the GEMINI_* values.

    Rules (``status`` is compared case-insensitively):
      - falsy ``gemini_state``                        -> GEMINI_UNRESOLVED
      - 'ok' with no unresolved entries               -> GEMINI_COMPLETED
      - 'critical_scope_expansion'                    -> GEMINI_SCOPE_EXPANSION
      - 'auto_triage_candidate' and ``real_bug``      -> GEMINI_REAL_BUG
      - 'auto_triage_candidate' otherwise             -> GEMINI_UNRESOLVED
      - 'unavailable_quota'                           -> GEMINI_UNAVAILABLE_QUOTA
      - 'timeout'                                     -> GEMINI_TIMEOUT
      - 'stale'                                       -> GEMINI_STALE
      - anything else (including 'ok' with unresolved
        entries)                                      -> GEMINI_UNRESOLVED (conservative)
    """
    if not gemini_state:
        return GEMINI_UNRESOLVED

    label = (gemini_state.get("status") or "").lower()
    open_threads = gemini_state.get("unresolved") or []

    if label == "ok":
        return GEMINI_COMPLETED if len(open_threads) == 0 else GEMINI_UNRESOLVED
    if label == "auto_triage_candidate":
        return GEMINI_REAL_BUG if real_bug else GEMINI_UNRESOLVED

    direct_mapping = {
        "critical_scope_expansion": GEMINI_SCOPE_EXPANSION,
        "unavailable_quota": GEMINI_UNAVAILABLE_QUOTA,
        "timeout": GEMINI_TIMEOUT,
        "stale": GEMINI_STALE,
    }
    # Unknown statuses are classified conservatively.
    return direct_mapping.get(label, GEMINI_UNRESOLVED)


# ─── task-2521 §2 — Gemini async race hardening ─────────────────────────
def _parse_iso8601(value: Optional[str]) -> Optional[float]:
    """ISO8601 string → epoch seconds. None/invalid → None."""
    if not value:
        return None
    try:
        # GitHub uses 'Z' for UTC; Python 3.11+ datetime.fromisoformat supports it.
        normalised = value.strip()
        if normalised.endswith("Z"):
            normalised = normalised[:-1] + "+00:00"
        return datetime.fromisoformat(normalised).timestamp()
    except (ValueError, TypeError):
        return None


def evaluate_gemini_async_race(
    *,
    pushed_at: Optional[str],
    gemini_submitted_at: Optional[str],
    ci_complete_at: Optional[str] = None,
    gemini_status: Optional[str],
    now_epoch: float,
    wait_budget_seconds: float = GEMINI_REVIEW_WAIT_BUDGET_SECONDS_DEFAULT,
) -> dict:
    """task-2521 §2 — evaluate the Gemini async race and record its metrics.

    Timestamps are ISO8601 strings; unparseable ones count as missing.

    Decision table:
      0. ``pushed_at`` missing -> status "ok" (the race cannot be evaluated;
         the caller continues with its regular gate).
      1. review not submitted yet:
         a. waited less than ``wait_budget_seconds`` since the push
            -> "async_pending" (block with the waiting marker)
         b. budget used up and ``gemini_status`` is one of
            GEMINI_UNAVAILABLE_QUOTA / GEMINI_TIMEOUT / GEMINI_STALE
            -> "wait_budget_exhausted" (not blocking here; the caller
            classifies the unavailability)
         c. budget used up without such a signal
            -> still "async_pending" (conservative)
      2. review submitted before the push (stale: it does not cover the new
         head SHA) -> "async_pending"
      3. review submitted at or after the push -> "ok"

    In every "async_pending" outcome both ``premature_gate_fail_detected``
    and ``should_block_with_waiting_marker`` are True; otherwise both are
    False.

    Returns:
      {
        "status": "ok" | "async_pending" | "wait_budget_exhausted",
        "push_to_ci_complete_seconds": float | None,
        "push_to_gemini_review_seconds": float | None,
        "gemini_gate_wait_seconds": float | None,
        "premature_gate_fail_detected": bool,
        "should_block_with_waiting_marker": bool,
      }
    """
    t_push = _parse_iso8601(pushed_at)
    t_review = _parse_iso8601(gemini_submitted_at)
    t_ci = _parse_iso8601(ci_complete_at)

    push_known = t_push is not None
    # All three metrics are measured from the push, so they need pushed_at.
    metrics = {
        "push_to_ci_complete_seconds": (
            (t_ci - t_push) if (push_known and t_ci is not None) else None
        ),
        "push_to_gemini_review_seconds": (
            (t_review - t_push) if (push_known and t_review is not None) else None
        ),
        "gemini_gate_wait_seconds": (now_epoch - t_push) if push_known else None,
    }

    def _outcome(label: str, waiting: bool) -> dict:
        # The premature-fail flag and the block flag always move together.
        return {
            "status": label,
            **metrics,
            "premature_gate_fail_detected": waiting,
            "should_block_with_waiting_marker": waiting,
        }

    if not push_known:
        return _outcome("ok", False)

    if t_review is not None:
        # Cases 2 and 3: a review exists; it only counts if it is not older
        # than the push.
        review_is_stale = t_review < t_push
        return _outcome("async_pending" if review_is_stale else "ok", review_is_stale)

    # Case 1: no review has arrived yet.
    waited = metrics["gemini_gate_wait_seconds"] or 0
    if waited < wait_budget_seconds:
        return _outcome("async_pending", True)

    unavailable_signals = {
        "GEMINI_UNAVAILABLE_QUOTA",
        "GEMINI_TIMEOUT",
        "GEMINI_STALE",
    }
    if (gemini_status or "").upper() in unavailable_signals:
        return _outcome("wait_budget_exhausted", False)
    # Budget exceeded but no unavailability signal: keep waiting.
    return _outcome("async_pending", True)


# ─── Risk level / static scan / fallback review (task-2509+1 §3~5) ────────
def assess_risk_level(effective_files: list[str]) -> str:
    """Classify the risk level of a change from its file list.

    Returns ``RISK_LEVEL_HIGH_CORE`` as soon as any file matches one of
    ``HIGH_CORE_FILE_PATTERNS``; otherwise ``RISK_LEVEL_LOW``. MEDIUM is never
    returned here (left to a later task). A falsy list yields LOW.
    """
    touches_core = any(
        pattern.search(changed)
        for changed in (effective_files or [])
        for pattern in HIGH_CORE_FILE_PATTERNS
    )
    return RISK_LEVEL_HIGH_CORE if touches_core else RISK_LEVEL_LOW


def static_risky_pattern_scan(
    effective_files: list[str],
    workspace_root: Path = WORKSPACE,
) -> dict:
    """Statically scan changed files for ``RISKY_PATTERNS`` (used for HIGH_CORE).

    Each path is resolved against ``workspace_root``. Paths that do not exist,
    are not regular files, or cannot be read are skipped silently. Lines that
    mention ``RISKY_PATTERNS`` or ``static_risky_pattern_scan`` are skipped to
    avoid self-references.

    Returns:
      {"passed": bool, "violations": [{"file": str, "pattern": str, "line": int, "snippet": str}, ...]}
      with one violation per (line, matching pattern); snippets are stripped
      and cut to 200 characters.
    """
    findings: list[dict] = []
    for rel in effective_files or []:
        candidate = workspace_root / rel
        if not (candidate.exists() and candidate.is_file()):
            continue
        try:
            content = candidate.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # File I/O problem (permissions, path, symlink, ...): leave this
            # file out of the scan.
            continue
        for number, raw_line in enumerate(content.splitlines(), start=1):
            if "RISKY_PATTERNS" in raw_line or "static_risky_pattern_scan" in raw_line:
                continue
            findings.extend(
                {
                    "file": rel,
                    "pattern": label,
                    "line": number,
                    "snippet": raw_line.strip()[:200],
                }
                for regex, label in RISKY_PATTERNS
                if regex.search(raw_line)
            )
    return {"passed": not findings, "violations": findings}


def evaluate_fallback_review(
    *,
    diff_ok: bool,                  # effective == expected
    forbidden_count: int,
    ci_status: str,                 # only "SUCCESS" passes
    merge_state_status: str,        # only "CLEAN" passes
    head_sha_lock_ok: bool,
    smoke_command_defined: bool,
    dry_run_decision_pass: bool,
    risk_level: str,
    static_scan_passed: bool,       # only meaningful for HIGH_CORE; pass True for LOW/MEDIUM
) -> dict:
    """Evaluate the eight fallback-review conditions; all must hold to pass.

    The status strings are compared case-insensitively and ``None`` is
    treated as an empty string. The static-scan condition is only enforced
    when ``risk_level`` equals ``RISK_LEVEL_HIGH_CORE``.

    Returns:
        ``{"passed": bool, "checks": {condition name: bool},
        "failed": [names of the conditions that were False]}``.
    """
    is_high_core = risk_level == RISK_LEVEL_HIGH_CORE
    normalized_ci = (ci_status or "").upper()
    normalized_merge_state = (merge_state_status or "").upper()

    checks = {
        "effective_diff_equals_expected": bool(diff_ok),
        "forbidden_path_zero": forbidden_count == 0,
        "ci_all_success": normalized_ci == "SUCCESS",
        "merge_state_clean": normalized_merge_state == "CLEAN",
        "head_sha_lock_ok": bool(head_sha_lock_ok),
        "smoke_command_defined": bool(smoke_command_defined),
        "dry_run_decision_pass": bool(dry_run_decision_pass),
        "static_risky_scan_pass_if_high_core": (
            bool(static_scan_passed) if is_high_core else True
        ),
    }
    failed = [name for name, ok in checks.items() if not ok]
    return {"passed": not failed, "checks": checks, "failed": failed}


# ─── §8 mergeStateStatus ──────────────────────────────────────────────────
def fetch_merge_state(pr_number: int, runner: RunnerType) -> dict:
    """Query ``gh pr view`` for a PR's merge state, head SHA and base branch.

    Args:
        pr_number: Pull request number passed to ``gh pr view``.
        runner: Callable that executes the argv list and returns an object
            exposing ``stdout``.

    Returns:
        A dict with exactly the keys ``mergeStateStatus`` (upper-cased),
        ``headRefOid`` and ``baseRefName``. Every value is a string; a field
        that is missing or null, or output that is empty, unparsable or not a
        JSON object, yields ``""`` instead of raising.
    """
    result = runner([
        "gh", "pr", "view", str(pr_number),
        "--json", "mergeStateStatus,headRefOid,baseRefName",
    ])
    try:
        payload = json.loads(result.stdout or "{}")
    except json.JSONDecodeError:
        payload = {}
    # Valid JSON that is not an object ("null", "[]", a bare string) has no
    # .get(); treat it like missing output rather than crashing the caller.
    if not isinstance(payload, dict):
        payload = {}
    return {
        "mergeStateStatus": (payload.get("mergeStateStatus") or "").upper(),
        # `or ""` also normalizes an explicit JSON null to an empty string.
        "headRefOid": payload.get("headRefOid") or "",
        "baseRefName": payload.get("baseRefName") or "",
    }


# ─── §9 HEAD SHA lock ─────────────────────────────────────────────────────
def assert_head_sha_lock(start_sha: str, current_sha: str) -> bool:
    """Return True only when a non-empty start SHA equals the current SHA."""
    if not start_sha:
        # Without a recorded starting SHA the lock cannot be shown to hold.
        return False
    return start_sha == current_sha


# ─── §10 squash merge ────────────────────────────────────────────────────
def execute_squash_merge(pr_number: int, runner: RunnerType) -> dict:
    """Squash-merge a PR through ``gh pr merge`` and delete its branch.

    The argv list is passed through ``assert_no_forbidden_git_flags`` before
    it is handed to ``runner``.

    Returns:
        ``{"returncode": int, "stdout": str, "stderr": str}`` where each
        stream is truncated to its last 2000 characters (``None`` becomes "").
    """
    merge_cmd = ["gh", "pr", "merge", str(pr_number), "--squash", "--delete-branch"]
    assert_no_forbidden_git_flags(merge_cmd)
    completed = runner(merge_cmd)

    outcome: dict = {"returncode": completed.returncode}
    for stream in ("stdout", "stderr"):
        # Keep only the tail so the record stays bounded in size.
        outcome[stream] = (getattr(completed, stream) or "")[-2000:]
    return outcome


# ─── §11 post-merge smoke ─────────────────────────────────────────────────
def run_post_merge_smoke(
    smoke_command: Optional[list[str]],
    runner: RunnerType,
) -> dict:
    """Run the post-merge smoke command and summarize its outcome.

    Args:
        smoke_command: argv list to execute; ``None`` or empty skips the run.
        runner: Callable invoked as ``runner(smoke_command, timeout=600)``.

    Returns:
        ``{"status": "skipped", ...}`` when no command is defined,
        ``{"status": "PASS", "stdout": ...}`` on exit code 0, otherwise a
        ``"FAIL"`` dict carrying stdout, stderr and
        ``CRITICAL_POST_MERGE_SMOKE`` as ``critical_code``. Streams are
        truncated to their last 2000 characters.
    """
    if not smoke_command:
        return {"status": "skipped", "details": "no smoke_command in task spec"}

    completed = runner(smoke_command, timeout=600)
    succeeded = completed.returncode == 0
    stdout_tail = (completed.stdout or "")[-2000:]
    if not succeeded:
        return {
            "status": "FAIL",
            "stdout": stdout_tail,
            "stderr": (completed.stderr or "")[-2000:],
            "critical_code": CRITICAL_POST_MERGE_SMOKE,
        }
    return {"status": "PASS", "stdout": stdout_tail}


# ─── §12 후행 stale 재검증 (state machine) ────────────────────────────────
def recheck_following_prs(
    queue: list[dict],
    runner: RunnerType,
) -> list[dict]:
    """Re-evaluate the PRs queued behind a merge for staleness (task-2509+1 §9).

    Each queue entry is a dict
    ``{pr_number, expected_files?, prior_effective_files?, prior_main_sha?,
    gemini_head_sha_at_last_review?}``; entries without ``pr_number`` are
    skipped.

    Returns:
        One state dict per processed PR. It keeps the legacy keys
        (``needs_recheck`` / ``behind`` / ``conflict`` / ``blocked``) and adds
        ``effective_diff_drift``, ``expected_files_maintained``,
        ``forbidden_path_present``, ``gemini_stale``, ``ci_rerun_needed`` and
        ``current_effective_files``.
    """
    stale_merge_states = {"BEHIND", "DIRTY", "BLOCKED"}
    results: list[dict] = []

    for item in queue:
        number = item.get("pr_number")
        if number is None:
            continue

        merge_view = fetch_merge_state(number, runner)
        status = merge_view.get("mergeStateStatus", "")
        previous_files = item.get("prior_effective_files", []) or []
        wanted_files = item.get("expected_files", []) or []

        # Only external-call failures (subprocess / JSON / OS) are absorbed so
        # the prior values are kept (TC-12 regression); anything else raises.
        try:
            observed = observe_pr(number, runner)
            files_now = observed.get("effective_files", []) or []
            head_now = observed.get("headRefOid", "") or merge_view.get("headRefOid", "")
        except (subprocess.SubprocessError, json.JSONDecodeError, OSError) as exc:
            logger.debug("observe_pr fallback for PR %s: %s", number, exc)
            files_now = previous_files
            head_now = merge_view.get("headRefOid", "")

        # Drift is only meaningful when a previous observation exists.
        drifted = bool(previous_files) and (
            sorted(set(files_now)) != sorted(set(previous_files))
        )
        within_expected = (not wanted_files) or set(files_now) <= set(wanted_files)
        forbidden_hit = bool(detect_forbidden_paths(files_now, wanted_files))
        reviewed_sha = item.get("gemini_head_sha_at_last_review")
        review_is_stale = bool(reviewed_sha and reviewed_sha != head_now)
        is_behind = status == "BEHIND"

        results.append({
            "pr_number": number,
            "merge_state_status": status,
            "needs_recheck": any(
                (status in stale_merge_states, drifted, forbidden_hit, review_is_stale)
            ),
            "behind": is_behind,
            "conflict": status == "DIRTY",
            "blocked": status == "BLOCKED",
            "effective_diff_drift": drifted,
            "expected_files_maintained": within_expected,
            "forbidden_path_present": forbidden_hit,
            "gemini_stale": review_is_stale,
            "ci_rerun_needed": is_behind or drifted,
            "current_effective_files": list(files_now),
        })
    return results


# ─── §13 audit/evidence ───────────────────────────────────────────────────
def write_audit(
    decision: QueueDecision,
    task_id: Optional[str],
    no_audit: bool = False,
) -> Optional[Path]:
    """Persist a decision to the audit log and, if given, a per-task file.

    Stamps ``decision.timestamp`` with the current UTC time, appends one JSON
    line to ``GLOBAL_AUDIT_LOG`` and, when ``task_id`` is truthy, overwrites
    ``EVENTS_DIR/<task_id>.merge-queue.json`` with the indented decision.

    Returns:
        ``None`` when ``no_audit`` is set (nothing is written), the per-task
        path when ``task_id`` is given, otherwise ``GLOBAL_AUDIT_LOG``.
    """
    if no_audit:
        return None

    for directory in (AUDIT_DIR, EVENTS_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    now_utc = datetime.now(timezone.utc)
    decision.timestamp = now_utc.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    record = json.dumps(decision.to_dict(), ensure_ascii=False) + "\n"
    with open(GLOBAL_AUDIT_LOG, "a", encoding="utf-8") as log_fh:
        log_fh.write(record)

    if not task_id:
        return GLOBAL_AUDIT_LOG

    snapshot_path = EVENTS_DIR / f"{task_id}.merge-queue.json"
    with open(snapshot_path, "w", encoding="utf-8") as snapshot_fh:
        json.dump(decision.to_dict(), snapshot_fh, ensure_ascii=False, indent=2)
    # audit_path is assigned after the dump, so the file itself does not carry it.
    decision.audit_path = str(snapshot_path)
    return snapshot_path


# ─── §14 critical_escalation_reporter 인터페이스 (task-2513 hook) ────────
def emit_critical_escalation(
    code: str,
    decision: QueueDecision,
    reporter_hook: Optional[Callable[[str, QueueDecision], None]] = None,
) -> None:
    """Record a critical code on the decision and notify a reporter.

    Args:
        code: Must be a member of ``CRITICAL_CODES``.
        decision: Mutated in place; ``critical_code`` is always set, and on
            the default path the reporter's result is appended to
            ``decision.escalations``.
        reporter_hook: When given it is called as ``reporter_hook(code,
            decision)`` and the default reporter is not used.

    Raises:
        ValueError: If ``code`` is not in ``CRITICAL_CODES``.
    """
    if code not in CRITICAL_CODES:
        raise ValueError(f"UNKNOWN_CRITICAL_CODE: {code}")
    decision.critical_code = code

    if reporter_hook is not None:
        reporter_hook(code, decision)
        return

    # W5: default path — call report_critical_event (dry_run=True, no_audit=False)
    # when that optional import is available.
    if report_critical_event is None:
        return

    try:
        event_payload = {
            "task_id": decision.task_id or "unknown",
            "pr_number": decision.pr_number or 0,
            "event_type": code,
            "source": "merge_queue_executor",
            "evidence": {
                "decision": decision.decision,
                "reason": decision.reason,
                "critical_code": code,
            },
        }
        outcome = report_critical_event(
            event_payload,
            workspace_root=WORKSPACE,
            dry_run=True,
            no_audit=False,
        )
        # W5: collect the reporter result on the decision.
        if getattr(decision, "escalations", None) is None:
            decision.escalations = []
        decision.escalations.append(outcome)
    except Exception as report_exc:
        # Best-effort: a reporter failure is logged and never propagated.
        logger.warning("W5 report_critical_event() failed (silent fallback): %s", report_exc)


# ─── task spec loader ────────────────────────────────────────────────────
# Captures the body of a fenced ```yaml block. DOTALL lets the body span
# several lines; the non-greedy group stops at the first closing fence.
_TASK_FRONTMATTER_RE = re.compile(r"```yaml\s*\n(.*?)```", re.DOTALL)


def load_task_spec(task_file: Path) -> TaskSpec:
    """Build a ``TaskSpec`` from the first fenced yaml block of a task markdown.

    Reads ``expected_files`` and ``dependency`` as lists and ``risk_area``,
    ``parallel_policy`` (default ``"serial_only"``), ``merge_queue_position``,
    ``stale_recheck_required`` and ``cherry_pick_allowed`` as scalars. The
    task id comes from a ``# task-N`` / ``# task-N+M`` heading, falling back
    to the file stem. ``smoke_command`` is always ``None`` here.
    """
    text = task_file.read_text(encoding="utf-8")
    fence = _TASK_FRONTMATTER_RE.search(text)
    yaml_block = fence.group(1) if fence else ""

    def _scalar(key: str) -> str:
        return _extract_yaml_scalar(yaml_block, key)

    def _flag(key: str) -> bool:
        # A missing flag defaults to "false".
        return str(_scalar(key) or "false").strip().lower() == "true"

    # merge_queue_position: int when numeric, "n/a" when absent, otherwise
    # the raw text is kept as-is.
    position_raw = _scalar("merge_queue_position")
    queue_pos: Any
    if not position_raw or position_raw == "n/a":
        queue_pos = "n/a"
    else:
        try:
            queue_pos = int(position_raw)
        except ValueError:
            queue_pos = position_raw

    heading = re.search(r"^# (task-\d+(?:\+\d+)?)", text, re.MULTILINE)

    return TaskSpec(
        task_id=heading.group(1) if heading else task_file.stem,
        expected_files=_extract_yaml_list(yaml_block, "expected_files"),
        risk_area=_scalar("risk_area") or "",
        dependency=_extract_yaml_list(yaml_block, "dependency"),
        parallel_policy=_scalar("parallel_policy") or "serial_only",
        merge_queue_position=queue_pos,
        stale_recheck_required=_flag("stale_recheck_required"),
        cherry_pick_allowed=_flag("cherry_pick_allowed"),
        smoke_command=None,
    )


def _extract_yaml_list(yaml_block: str, key: str) -> list[str]:
    pattern = rf"^{re.escape(key)}:\s*\n((?:\s*-\s*.+\n?)+)"
    m = re.search(pattern, yaml_block, re.MULTILINE)
    if not m:
        return []
    items: list[str] = []
    for line in m.group(1).splitlines():
        s = line.strip()
        if not s.startswith("-"):
            continue
        raw = s[1:].strip()
        if raw.startswith('"'):
            end = raw.find('"', 1)
            val = raw[1:end] if end > 0 else raw.strip("\"'")
        elif raw.startswith("'"):
            end = raw.find("'", 1)
            val = raw[1:end] if end > 0 else raw.strip("\"'")
        else:
            val = raw.split("#", 1)[0].strip().strip("\"'")
        if val:
            items.append(val)
    return items


def _extract_yaml_scalar(yaml_block: str, key: str) -> str:
    pattern = rf"^{re.escape(key)}:\s*([^\n#]+?)(?:\s*#.*)?$"
    m = re.search(pattern, yaml_block, re.MULTILINE)
    if not m:
        return ""
    val = m.group(1).strip().strip("\"'")
    return val


# ─── 핵심 오케스트레이션 ──────────────────────────────────────────────────
@dataclass
class ExecutorContext:
    """Injectable dependencies and per-run inputs for the merge-queue gates.

    Every field has a default, so ``ExecutorContext()`` is valid; tests can
    override individual collaborators (runner, hooks, clocks) with fixtures.
    """

    # Command runner; defaults to the module's _default_runner.
    runner: RunnerType = field(default_factory=lambda: _default_runner)
    # Working directory of the PR checkout; base sync only runs when this is set.
    pr_workdir: Optional[str] = None
    # Post-merge smoke argv; also feeds the fallback-review "smoke defined" check.
    smoke_command: Optional[list[str]] = None
    no_audit: bool = False
    main_log_grep: Optional[Callable[[str], bool]] = None
    extra_forbidden_patterns: Optional[list[re.Pattern]] = None
    fixture_main_sha: Optional[str] = None  # for fixture replay (skips fetching main HEAD)
    # No-op by default.
    sleeper: Callable[[float], None] = field(default=lambda _s: None)
    reporter_hook: Optional[Callable[[str, QueueDecision], None]] = None
    # ★ NEW (W1~W7, task-2514)
    replacement_runner: Optional[Any] = None          # ReplacementPRRunner instance
    triage_fn: Optional[Callable] = None              # triage_pr or fixture
    smoke_envelope_fn: Optional[Callable] = None      # run_pm_smoke_v2 or fixture
    task_file: Optional[Path] = None                  # post_merge_smoke_runner needs a task_file argument
    triage_threads: Optional[list] = None             # raw threads passed as triage_fn input (for test injection)
    triage_fix_commits: Optional[list] = None
    triage_pr_head_sha: Optional[str] = None
    following_queue: list = field(default_factory=list)
    apply_triage: bool = False                        # auto_resolve_threads(apply=...)
    # ★ task-2521 §2 — Gemini async race wiring
    pushed_at: Optional[str] = None                   # time the head SHA was pushed (ISO8601)
    gemini_submitted_at: Optional[str] = None         # gemini review submitted_at
    ci_complete_at: Optional[str] = None              # CI rollup completion time (if available)
    # Returns the current UTC time as epoch seconds.
    now_provider: Callable[[], float] = field(default=lambda: datetime.now(timezone.utc).timestamp())
    gemini_wait_budget_seconds: float = GEMINI_REVIEW_WAIT_BUDGET_SECONDS_DEFAULT
    rerun_count: int = 0                              # accumulated by the caller (number of previous evaluate_pr calls)


def evaluate_pr(
    pr_number: int,
    task_spec: TaskSpec,
    pr_head_sha: str,
    effective_files: list[str],
    merge_state: dict,
    ci_state: dict,
    gemini_state: dict,
    ctx: ExecutorContext,
) -> QueueDecision:
    """Run the auto-merge gates for one PR and return the resulting decision.

    This function only decides; it never merges. Gates are evaluated in a
    fixed order and the first failing gate returns immediately:
    predecessor check, parallel_policy / cherry_pick_allowed, main HEAD
    capture, base sync, effective-diff comparison (with optional replacement
    PR), forbidden paths, CI, Gemini review (optional triage, async-race wait,
    fallback review), mergeStateStatus, and the review-gate guard.

    Args:
        pr_number: PR being evaluated.
        task_spec: Task spec supplying expected_files, dependency,
            parallel_policy and cherry_pick_allowed.
        pr_head_sha: PR head SHA at the start of evaluation; stored on the
            decision as ``pr_head_sha_start``.
        effective_files: Files the PR effectively changes.
        merge_state: Dict read for ``mergeStateStatus`` and ``baseRefName``.
        ci_state: Dict read for ``status`` and ``details``.
        gemini_state: Dict read for ``status``, ``unresolved``, ``real_bug``
            and ``submitted_at`` / ``gemini_submitted_at``; replaced by the
            triage result when ``ctx.triage_fn`` is set and succeeds.
        ctx: Injected collaborators and per-run inputs.

    Returns:
        The populated ``QueueDecision``; ``decision`` is
        ``AUTO_MERGE_ALLOWED`` only when every gate passed.
    """
    decision = QueueDecision(
        decision="UNKNOWN",
        pr_number=pr_number,
        task_id=task_spec.task_id,
        expected_files=list(task_spec.expected_files),
        effective_files=list(effective_files),
    )

    # task-2509+1 §3 — compute risk_level up front
    decision.risk_level = assess_risk_level(effective_files)
    # task-2509+1 §2 — classify the gemini_status enum early (used by later branches)
    _gem_real_bug = bool(gemini_state.get("real_bug")) if isinstance(gemini_state, dict) else False
    decision.gemini_status = classify_gemini_status(gemini_state, real_bug=_gem_real_bug)
    # ★ task-2521 §2 — record the race timestamps (used by later branches)
    decision.pushed_at = ctx.pushed_at
    # Prefer submitted_at taken from gemini_state; fall back to ctx
    _gemini_submitted_from_state: Optional[str] = None
    if isinstance(gemini_state, dict):
        _gemini_submitted_from_state = (
            gemini_state.get("submitted_at")
            or gemini_state.get("gemini_submitted_at")
            or None
        )
    decision.gemini_submitted_at = _gemini_submitted_from_state or ctx.gemini_submitted_at
    decision.ci_complete_at = ctx.ci_complete_at
    decision.rerun_count = ctx.rerun_count

    # §1 queue head check (all predecessor PRs merged)
    head_ok, pending = check_predecessor_merged(
        task_spec.dependency,
        ctx.runner,
        main_log_grep=ctx.main_log_grep,
    )
    if not head_ok:
        decision.decision = WAITING_FOR_PREDECESSOR
        decision.reason = f"pending: {','.join(pending)}"
        decision.final_decision = decision.decision
        return decision

    # §10 parallel_policy / cherry_pick_allowed gate (right after the predecessor check)
    _VALID_PARALLEL_POLICIES = {"serial_only", "limited_parallel", "parallel_safe"}
    if task_spec.parallel_policy not in _VALID_PARALLEL_POLICIES:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = f"INVALID_PARALLEL_POLICY: {task_spec.parallel_policy!r}"
        decision.critical_code = CRITICAL_DEPENDENCY_CYCLE
        decision.critical_escalation = CRITICAL_DEPENDENCY_CYCLE
        decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_DEPENDENCY_CYCLE}"
        emit_critical_escalation(CRITICAL_DEPENDENCY_CYCLE, decision, ctx.reporter_hook)
        return decision
    # Accept either a real bool or the string "true" (any case, padded).
    _cherry_raw = task_spec.cherry_pick_allowed
    _cherry_bool = (
        _cherry_raw is True
        or (isinstance(_cherry_raw, str) and _cherry_raw.strip().lower() == "true")
    )
    if _cherry_bool:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = "cherry_pick_allowed=true is forbidden (DEPENDENCY_CYCLE risk)"
        decision.critical_code = CRITICAL_DEPENDENCY_CYCLE
        decision.critical_escalation = CRITICAL_DEPENDENCY_CYCLE
        decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_DEPENDENCY_CYCLE}"
        emit_critical_escalation(CRITICAL_DEPENDENCY_CYCLE, decision, ctx.reporter_hook)
        return decision

    # §2 main HEAD SHA fetch (a fixture SHA, when provided, replaces the fetch)
    if ctx.fixture_main_sha:
        main_head_start = ctx.fixture_main_sha
    else:
        main_head_start = fetch_main_head(ctx.runner)
    decision.main_head_sha_start = main_head_start
    decision.pr_head_sha_start = pr_head_sha

    # §3 base sync — only attempted when the PR is BEHIND and a workdir is known
    ms = (merge_state.get("mergeStateStatus") or "").upper()
    decision.merge_state_status = ms
    if ms == "BEHIND" and ctx.pr_workdir:
        sync = sync_pr_base(
            _pr_branch=merge_state.get("baseRefName", "main"),
            pr_workdir=ctx.pr_workdir,
            runner=ctx.runner,
            merge_state_status=ms,
        )
        if sync.get("conflict"):
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = "MERGE_CONFLICT_DURING_BASE_SYNC"
            decision.critical_code = CRITICAL_BLOCK_OVERRIDE
            decision.critical_escalation = CRITICAL_BLOCK_OVERRIDE
            decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_BLOCK_OVERRIDE}"
            # NOTE(review): unlike most critical branches in this function, this
            # one does not call emit_critical_escalation — confirm it is intended.
            return decision

    # §4 effective diff vs expected
    diff_ok, extra, missing = compare_effective_diff(effective_files, task_spec.expected_files)
    if not diff_ok:
        # Check forbidden paths first
        invasions = detect_forbidden_paths(
            effective_files, task_spec.expected_files, ctx.extra_forbidden_patterns,
        )
        if invasions:
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = "FORBIDDEN_PATH"
            decision.forbidden_paths = invasions
            decision.critical_code = CRITICAL_FORBIDDEN_PATH
            decision.critical_escalation = CRITICAL_FORBIDDEN_PATH
            decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_FORBIDDEN_PATH}"
            emit_critical_escalation(CRITICAL_FORBIDDEN_PATH, decision, ctx.reporter_hook)
            return decision
        # No forbidden path → replacement branch (task-2510 hook)
        # W2: when ctx.replacement_runner is provided, call its execute()
        decision.pipeline_step = "evaluate_pr"
        runner = getattr(ctx, "replacement_runner", None)
        if runner is not None:
            try:
                replacement_result = runner.execute(pr_number, task_spec)
                if replacement_result.success:
                    decision.replacement_used = True
                    repl_pr = replacement_result.replacement_pr
                    decision.decision = DIFF_CONTAMINATION_REPLACEMENT
                    reason_parts_r = []
                    if extra:
                        reason_parts_r.append(f"extra={extra}")
                    if missing:
                        reason_parts_r.append(f"missing={missing}")
                    reason_parts_r.append(f"replacement PR #{repl_pr} created")
                    decision.reason = f"diff contamination: {'; '.join(reason_parts_r)}; hook={REPLACEMENT_PR_RUNNER_HOOK}"
                    decision.final_decision = decision.decision
                    return decision
                else:
                    # ReplacementResult.success=False → critical escalation
                    _crit_code = CRITICAL_DIFF_REPLACEMENT_FAILED
                    decision.decision = BLOCKED_WITH_REASON
                    reason_parts_f = []
                    if extra:
                        reason_parts_f.append(f"extra={extra}")
                    if missing:
                        reason_parts_f.append(f"missing={missing}")
                    reason_parts_f.append(f"replacement_failed={replacement_result.failure_reason}")
                    decision.reason = f"diff contamination + replacement failed: {'; '.join(reason_parts_f)}"
                    decision.critical_code = _crit_code
                    decision.critical_escalation = _crit_code
                    decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {_crit_code}"
                    emit_critical_escalation(_crit_code, decision, ctx.reporter_hook)
                    return decision
            except Exception as _w2_exc:
                # NOTE(review): this broad except also covers exceptions raised by
                # emit_critical_escalation / the reporter hook in the branch above.
                logger.warning("W2 replacement_runner.execute() failed: %s", _w2_exc)
                # fallthrough → legacy behavior (return DIFF_CONTAMINATION_REPLACEMENT)
        # Legacy behavior is kept when ctx.replacement_runner is None or raised
        # (regression preservation)
        decision.decision = DIFF_CONTAMINATION_REPLACEMENT
        reason_parts = []
        if extra:
            reason_parts.append(f"extra={extra}")
        if missing:
            reason_parts.append(f"missing={missing}")
        decision.reason = f"diff contamination: {'; '.join(reason_parts)}; hook={REPLACEMENT_PR_RUNNER_HOOK}"
        decision.final_decision = decision.decision
        return decision

    # §5 forbidden path (re-check even when the files are inside expected_files)
    invasions = detect_forbidden_paths(
        effective_files, task_spec.expected_files, ctx.extra_forbidden_patterns,
    )
    if invasions:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = "FORBIDDEN_PATH_INSIDE_EXPECTED"
        decision.forbidden_paths = invasions
        decision.critical_code = CRITICAL_FORBIDDEN_PATH
        decision.critical_escalation = CRITICAL_FORBIDDEN_PATH
        decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_FORBIDDEN_PATH}"
        # NOTE(review): no emit_critical_escalation call here, unlike the §4
        # FORBIDDEN_PATH branch — confirm whether that is intentional.
        return decision

    # §6 CI
    ci_status = ci_state.get("status", "")
    decision.ci_status = ci_status
    if ci_status == CI_FAILURE_BLOCK:
        decision.decision = CI_FAILURE_BLOCK
        decision.reason = f"CI failure: {ci_state.get('details')}"
        decision.final_decision = decision.decision
        return decision
    if ci_status == CI_IN_PROGRESS:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = "CI_IN_PROGRESS"
        decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: CI_IN_PROGRESS"
        return decision

    # §7 Gemini (task-2509+1 §7 — introduces review_gate_passed)
    # W3: when ctx.triage_fn is provided, call it and convert the report with
    # to_legacy_gemini_state()
    _triage_fn = getattr(ctx, "triage_fn", None)
    if _triage_fn is not None:
        try:
            import dataclasses as _dc
            _triage_threads = getattr(ctx, "triage_threads", None) or []
            _triage_pr_head_sha = getattr(ctx, "triage_pr_head_sha", None) or pr_head_sha
            _triage_fix_commits = getattr(ctx, "triage_fix_commits", None) or []
            _apply_triage = getattr(ctx, "apply_triage", False)
            _triage_report = _triage_fn(
                pr_number,
                _triage_threads,
                _triage_pr_head_sha,
                _triage_fix_commits,
                list(task_spec.expected_files),
                [],
                _apply_triage,
                task_spec.task_id,
            )
            gemini_state = to_legacy_gemini_state(_triage_report)  # pyright: ignore[reportOptionalCall]
            # W3: normalize "completed" to "ok", the value classify_gemini_status recognizes
            if gemini_state.get("status") == "completed":
                gemini_state = dict(gemini_state)
                gemini_state["status"] = "ok"
            # W3: store the triage summary on the decision (None when the report has none)
            decision.triage_summary = _dc.asdict(_triage_report.triage_summary) if _triage_report.triage_summary is not None else None
            # W3: re-classify gemini_status so it reflects the triage result
            _gem_real_bug2 = bool(gemini_state.get("real_bug")) if isinstance(gemini_state, dict) else False
            decision.gemini_status = classify_gemini_status(gemini_state, real_bug=_gem_real_bug2)
        except Exception as _w3_exc:
            logger.warning("W3 triage_fn() failed: %s", _w3_exc)
            # fallthrough → keep using gemini_state as it stands (regression preservation)
    gem_status = gemini_state.get("status", "")
    decision.gemini_unresolved_count = len(gemini_state.get("unresolved", []) or [])
    # ★ task-2521 §2 — evaluate the Gemini async race (submitted_at < pushed_at)
    # Only evaluated when ctx.pushed_at is given (regression preservation; existing
    # callers are unaffected)
    if ctx.pushed_at:
        # gemini_state value first, ctx as fallback (resolved earlier on the decision)
        _submitted_for_race = decision.gemini_submitted_at
        _race = evaluate_gemini_async_race(
            pushed_at=ctx.pushed_at,
            gemini_submitted_at=_submitted_for_race,
            ci_complete_at=ctx.ci_complete_at,
            gemini_status=decision.gemini_status,
            now_epoch=ctx.now_provider(),
            wait_budget_seconds=ctx.gemini_wait_budget_seconds,
        )
        decision.push_to_ci_complete_seconds = _race["push_to_ci_complete_seconds"]
        decision.push_to_gemini_review_seconds = _race["push_to_gemini_review_seconds"]
        decision.gemini_gate_wait_seconds = _race["gemini_gate_wait_seconds"]
        decision.premature_gate_fail_detected = bool(_race["premature_gate_fail_detected"])
        if _race["should_block_with_waiting_marker"]:
            decision.decision = WAITING_FOR_GEMINI_REVIEW
            decision.reason = (
                f"WAITING_FOR_GEMINI_REVIEW: race=async_pending; "
                f"pushed_at={ctx.pushed_at}; submitted_at={_submitted_for_race or 'none'}; "
                f"gate_wait={decision.gemini_gate_wait_seconds:.1f}s; "
                f"budget={ctx.gemini_wait_budget_seconds:.1f}s; "
                f"premature_gate_fail_detected={decision.premature_gate_fail_detected}"
            )
            decision.final_decision = decision.decision
            return decision
    if gem_status == "critical_scope_expansion":
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = "GEMINI_REAL_BUG_OUTSIDE_EXPECTED_FILES"
        decision.critical_code = CRITICAL_GEMINI_SCOPE_EXPANSION
        decision.critical_escalation = CRITICAL_GEMINI_SCOPE_EXPANSION
        decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_GEMINI_SCOPE_EXPANSION}"
        # NOTE(review): critical code is set but emit_critical_escalation is not
        # called on this branch — confirm whether that is intentional.
        return decision
    if gem_status == "auto_triage_candidate":
        # Regression preservation: keep the existing GEMINI_UNRESOLVED_BLOCK
        decision.decision = GEMINI_UNRESOLVED_BLOCK
        decision.reason = f"unresolved (auto_gemini_triage hook); count={decision.gemini_unresolved_count}"
        decision.final_decision = decision.decision
        return decision

    # task-2509+1 §7 — fallback review when Gemini is unavailable (quota/timeout/stale)
    if decision.gemini_status in GEMINI_UNAVAILABLE_STATUSES:
        decision.fallback_review_used = True
        # For HIGH_CORE, run the static risky-pattern scan
        if decision.risk_level == RISK_LEVEL_HIGH_CORE:
            scan = static_risky_pattern_scan(effective_files)
            decision.static_scan_violations = list(scan.get("violations") or [])
            static_scan_passed = bool(scan.get("passed"))
        else:
            static_scan_passed = True

        # Evaluate the 8 conditions (prevents an autonomous pass without a Gemini signal)
        fb = evaluate_fallback_review(
            diff_ok=diff_ok,
            forbidden_count=0,  # reaching this point means §4/§5 above passed
            ci_status=ci_status if ci_status else "",
            merge_state_status=ms,
            head_sha_lock_ok=True,  # at evaluate time (re-checked in verify_head_lock_then_merge)
            smoke_command_defined=bool(ctx.smoke_command),
            dry_run_decision_pass=True,  # the evaluate stage counts as a passed dry-run decision
            risk_level=decision.risk_level or RISK_LEVEL_LOW,
            static_scan_passed=static_scan_passed,
        )
        decision.fallback_check_details = fb
        decision.fallback_review_passed = bool(fb.get("passed"))
        if not fb.get("passed"):
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = (
                f"{FALLBACK_REVIEW_FAILED}: {','.join(fb.get('failed') or [])}; gemini_status={decision.gemini_status}"
            )
            decision.review_gate_passed = False
            decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: {FALLBACK_REVIEW_FAILED}"
            return decision
        decision.review_gate_passed = True
        # fallthrough → §8 mergeStateStatus check
    elif decision.gemini_status == GEMINI_COMPLETED:
        decision.review_gate_passed = True
        decision.fallback_review_used = False

    # §8 mergeStateStatus must be CLEAN
    if ms != "CLEAN":
        if ms == "BLOCKED":
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = "MERGE_STATE_BLOCKED"
            decision.critical_code = CRITICAL_BLOCK_OVERRIDE
            decision.critical_escalation = CRITICAL_BLOCK_OVERRIDE
            decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_BLOCK_OVERRIDE}"
            emit_critical_escalation(CRITICAL_BLOCK_OVERRIDE, decision, ctx.reporter_hook)
            return decision
        if ms == "BEHIND":
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = "MERGE_STATE_BEHIND_AFTER_SYNC"
            decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: MERGE_STATE_BEHIND_AFTER_SYNC"
            return decision
        decision.decision = MERGE_STATE_NOT_CLEAN
        decision.reason = f"mergeStateStatus={ms}"
        decision.final_decision = decision.decision
        return decision

    # task-2509+1 §1 — defensive guard for review_gate_passed=False (accepted from a
    # critical Gemini code-review finding).
    # If GEMINI_UNRESOLVED or an unclassified Gemini state matches none of the explicit
    # §7 branches (auto_triage_candidate / critical_scope_expansion / UNAVAILABLE /
    # COMPLETED), review_gate_passed would stay False while §8/§9 pass, and the PR
    # could reach AUTO_MERGE_ALLOWED.
    # Block that scenario fail-closed.
    if not decision.review_gate_passed:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = (
            f"REVIEW_GATE_NOT_PASSED: gemini_status={decision.gemini_status}; "
            f"fallback_review_used={decision.fallback_review_used}"
        )
        decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: REVIEW_GATE_NOT_PASSED"
        return decision

    # §9 HEAD SHA lock — the caller re-checks right before merging (verify_head_lock_then_merge)
    decision.decision = AUTO_MERGE_ALLOWED
    decision.reason = "all 10 gates PASS"
    decision.final_decision = decision.decision
    return decision


def _flag_post_merge_smoke_failure(
    decision: QueueDecision,
    ctx: ExecutorContext,
    reason: str,
) -> None:
    """Mark *decision* as a critical post-merge smoke failure and escalate it.

    The decision fields are written before the escalation is emitted, so they
    stay populated even if ``emit_critical_escalation`` raises.
    """
    decision.decision = BLOCKED_WITH_REASON
    decision.reason = reason
    decision.critical_code = CRITICAL_POST_MERGE_SMOKE
    decision.critical_escalation = CRITICAL_POST_MERGE_SMOKE
    decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_POST_MERGE_SMOKE}"
    emit_critical_escalation(CRITICAL_POST_MERGE_SMOKE, decision, ctx.reporter_hook)


def _legacy_smoke_blocks(
    decision: QueueDecision,
    ctx: ExecutorContext,
    failure_reason: str,
) -> bool:
    """Run the dict-based smoke command; return True when it blocks the pipeline.

    Only a ``"FAIL"`` status blocks; any other status is recorded on the
    decision and allowed to continue.
    """
    smoke = run_post_merge_smoke(ctx.smoke_command, ctx.runner)
    status = smoke.get("status", "")
    decision.smoke_status = status
    if status != "FAIL":
        return False
    _flag_post_merge_smoke_failure(decision, ctx, failure_reason)
    return True


def _post_merge_smoke_blocks(
    decision: QueueDecision,
    pr_number: int,
    ctx: ExecutorContext,
    failure_reason: str,
) -> bool:
    """Run the post-merge smoke gate; return True when the pipeline must stop.

    W4: when ``ctx.task_file`` is set, the envelope-based smoke runner
    (``ctx.smoke_envelope_fn`` or, if absent, ``run_pm_smoke_v2``) is used and
    its ``allow_continuation`` flag is the verdict. If the envelope runner
    itself fails, the dict-based smoke command is used as a fallback. Without a
    task file the dict-based smoke command is used directly.

    Args:
        decision: Decision object updated in place with smoke results.
        pr_number: PR number forwarded to the envelope runner.
        ctx: Executor context supplying the runner, smoke command and hooks.
        failure_reason: Reason text recorded on failure; the envelope path
            appends `` [envelope]`` to it.
    """
    task_file = getattr(ctx, "task_file", None)
    if task_file is None:
        # No task file -> legacy dict-based behaviour (kept for regression safety).
        return _legacy_smoke_blocks(decision, ctx, failure_reason)

    envelope_fn = getattr(ctx, "smoke_envelope_fn", None)
    if envelope_fn is None:
        # May itself be None when the optional import failed; the resulting
        # TypeError is handled below and triggers the dict-based fallback.
        envelope_fn = run_pm_smoke_v2

    # Only the envelope call and the reading of its result are guarded. Acting
    # on the verdict happens outside the try so that a failure while escalating
    # can never downgrade a failed envelope into the fallback smoke path.
    try:
        envelope = envelope_fn(  # pyright: ignore[reportOptionalCall]
            task_file=task_file,
            # NOTE(review): both candidates are main HEAD values captured before
            # the squash merge, not the resulting merge commit - confirm that
            # this is what the envelope runner expects.
            merge_commit=decision.main_head_sha_merge or decision.main_head_sha_start or "",
            dry_run=False,
            runner=ctx.runner,
            pr_number=pr_number,
            skip_stale_check=True,
        )
        status = envelope.status
        decision.smoke_status = status.value if hasattr(status, "value") else str(status)
        decision.smoke_envelope = envelope.to_dict()
        continuation_allowed = bool(envelope.allow_continuation)
    except Exception as envelope_exc:
        logger.warning("W4 smoke_envelope_fn() failed (fallback to dict): %s", envelope_exc)
        return _legacy_smoke_blocks(decision, ctx, failure_reason)

    if continuation_allowed:
        return False
    try:
        _flag_post_merge_smoke_failure(decision, ctx, f"{failure_reason} [envelope]")
    except Exception as emit_exc:
        # The decision is already marked as blocked; a reporter failure must
        # not turn a failed smoke into a pass.
        logger.warning("W4 critical escalation emit failed: %s", emit_exc)
    return True


def verify_head_lock_then_merge(
    decision: QueueDecision,
    pr_number: int,
    ctx: ExecutorContext,
    fetch_pr_head_at_merge: Callable[[int], str],
    fetch_main_head_at_merge: Optional[Callable[[], str]] = None,
    dry_run: bool = True,
) -> QueueDecision:
    """Re-verify the HEAD SHA locks, then squash-merge and run post-merge steps.

    Gates §9 and §10: the PR head (and optionally main's head) is fetched
    again and compared with the values recorded at evaluation start. In
    dry-run mode the function stops after these checks. Otherwise it performs
    the squash merge, fast-forwards ``ctx.pr_workdir`` (when set), runs the
    post-merge smoke gate and re-checks the following PRs in the queue.

    Args:
        decision: Decision to finalize. Returned untouched unless its
            ``decision`` is ``AUTO_MERGE_ALLOWED``; otherwise updated in place.
        pr_number: Number of the PR to merge.
        ctx: Executor context (runner, smoke command, hooks, optional
            ``pr_workdir`` / ``task_file`` / ``following_queue``).
        fetch_pr_head_at_merge: Returns the current PR head SHA.
        fetch_main_head_at_merge: Optional; returns the current main head SHA.
            When omitted, the main head lock is not re-checked.
        dry_run: When True (default) nothing is merged.

    Returns:
        The same ``decision`` object, updated with the outcome.
    """
    if decision.decision != AUTO_MERGE_ALLOWED:
        return decision

    # §9 PR head lock: the head must not have moved since evaluation started.
    pr_head_now = fetch_pr_head_at_merge(pr_number)
    decision.pr_head_sha_merge = pr_head_now
    if not assert_head_sha_lock(decision.pr_head_sha_start or "", pr_head_now):
        decision.decision = HEAD_SHA_LOCK_BROKEN
        decision.reason = (
            f"PR head changed: start={decision.pr_head_sha_start} merge={pr_head_now}"
        )
        decision.final_decision = decision.decision
        return decision

    # §9 main head lock: only enforced when a start SHA was recorded.
    if fetch_main_head_at_merge is not None:
        main_now = fetch_main_head_at_merge()
        decision.main_head_sha_merge = main_now
        if decision.main_head_sha_start and main_now != decision.main_head_sha_start:
            decision.decision = HEAD_SHA_LOCK_BROKEN
            decision.reason = (
                f"main head changed: start={decision.main_head_sha_start} merge={main_now}"
            )
            decision.final_decision = decision.decision
            return decision

    # task-2509+1 §5: a real merge without a smoke command is refused.
    if not dry_run and not ctx.smoke_command:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = NON_DRY_RUN_REQUIRES_SMOKE_COMMAND
        decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: {NON_DRY_RUN_REQUIRES_SMOKE_COMMAND}"
        return decision
    if dry_run:
        decision.final_decision = decision.decision
        return decision

    merge_result = execute_squash_merge(pr_number, ctx.runner)
    if merge_result["returncode"] != 0:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = f"squash merge failed: {merge_result['stderr']}"
        decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: SQUASH_MERGE_FAILED"
        return decision

    # §11 post-merge pipeline.
    if ctx.pr_workdir is None:
        # No working directory: fetch / fast-forward are skipped and the
        # failure reason says so.
        failure_reason = "POST_MERGE_SMOKE_FAILURE (ff-only skipped: no pr_workdir)"
    else:
        # (a) refresh origin, (b) fast-forward the local checkout to origin/main.
        fetch_result = ctx.runner(["git", "fetch", "origin", "--quiet"], cwd=ctx.pr_workdir)
        if getattr(fetch_result, "returncode", 0) != 0:
            # Not fatal by itself, but the fast-forward below may then act on a
            # stale origin/main, so leave a trace.
            logger.warning("post-merge git fetch failed in %s", ctx.pr_workdir)
        ff_result = ctx.runner(
            ["git", "merge", "--ff-only", "origin/main"],
            cwd=ctx.pr_workdir,
        )
        if ff_result.returncode != 0:
            # Fast-forward failed: smoke is not run and the failure is reported
            # as a post-merge smoke failure with a dedicated status.
            decision.smoke_status = "FAIL_FAST_FORWARD"
            _flag_post_merge_smoke_failure(
                decision,
                ctx,
                "POST_MERGE_SMOKE_FAILURE: fast-forward failed before smoke",
            )
            return decision
        failure_reason = "POST_MERGE_SMOKE_FAILURE"

    # (c) post-merge smoke gate.
    if _post_merge_smoke_blocks(decision, pr_number, ctx, failure_reason):
        return decision

    # (d) stale re-validation of the PRs queued behind this one (empty by default).
    _following_queue: list[dict] = getattr(ctx, "following_queue", []) or []
    if _following_queue:
        following_states = recheck_following_prs(_following_queue, ctx.runner)
        decision.fixture_pr_replay = list(decision.fixture_pr_replay) + following_states
        # W6: escalate forbidden-path / blocked findings; reporter failures are
        # logged and never abort an already-completed merge.
        for _fstate in following_states:
            if _fstate.get("forbidden_path_present"):
                try:
                    emit_critical_escalation(CRITICAL_FORBIDDEN_PATH, decision, ctx.reporter_hook)
                except Exception as _w6_exc:
                    logger.warning("W6 forbidden_path_present emit failed: %s", _w6_exc)
            if _fstate.get("blocked"):
                try:
                    emit_critical_escalation(CRITICAL_BLOCK_OVERRIDE, decision, ctx.reporter_hook)
                except Exception as _w6b_exc:
                    logger.warning("W6 blocked emit failed: %s", _w6b_exc)

    decision.decision = AUTO_MERGE_SUCCESS
    decision.reason = "merged + smoke PASS"
    decision.final_decision = decision.decision
    return decision


# ─── PR observation helpers (gh CLI) ──────────────────────────────────────
def observe_pr(pr_number: int, runner: RunnerType) -> dict:
    """Query ``gh pr view`` for a PR and return a normalized observation.

    Args:
        pr_number: Number of the PR to inspect.
        runner: Callable that executes the command list and returns an object
            exposing ``stdout``.

    Returns:
        A dict with the keys ``headRefOid`` (``""`` when unknown),
        ``baseRefName`` (``"main"`` when unknown), ``mergeStateStatus``
        (upper-cased, ``""`` when unknown) and ``effective_files`` (list of
        non-empty file paths). Unparseable, empty or non-object output yields
        these defaults instead of raising.
    """
    result = runner([
        "gh", "pr", "view", str(pr_number),
        "--json", "headRefOid,baseRefName,mergeStateStatus,files",
    ])
    try:
        payload = json.loads(result.stdout or "{}")
    except json.JSONDecodeError:
        payload = {}
    # Valid JSON that is not an object (e.g. ``null`` or a list) carries no
    # usable fields; treat it like an empty response.
    if not isinstance(payload, dict):
        payload = {}

    raw_files = payload.get("files")
    if not isinstance(raw_files, list):
        raw_files = []
    files = [
        entry["path"]
        for entry in raw_files
        if isinstance(entry, dict) and entry.get("path")
    ]
    return {
        # ``or`` (rather than a .get default) also covers keys that are
        # present but null/empty, matching how mergeStateStatus is handled.
        "headRefOid": payload.get("headRefOid") or "",
        "baseRefName": payload.get("baseRefName") or "main",
        "mergeStateStatus": (payload.get("mergeStateStatus") or "").upper(),
        "effective_files": files,
    }


# ─── CLI entrypoint ───────────────────────────────────────────────────────
def main(argv: Optional[list[str]] = None) -> int:
    """CLI entry point: evaluate one PR and, unless dry-run, merge it.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit code: 0 when the decision is ``AUTO_MERGE_ALLOWED`` or
        ``AUTO_MERGE_SUCCESS``, 3 for ``WAITING_FOR_PREDECESSOR``, 2 for usage
        errors (missing/unknown task file, malformed ``--smoke-command``) and
        1 for every other decision.
    """
    import shlex  # local import: only the CLI needs shell-style tokenizing

    parser = argparse.ArgumentParser(description="merge_queue_executor (task-2509)")
    parser.add_argument("--pr", type=int, required=True, help="PR 번호")
    parser.add_argument("--task-file", type=str, default="", help="queue 선두 task spec md")
    # Dry-run is already the default; the flag is accepted for explicitness.
    parser.add_argument("--dry-run", action="store_true", default=True, help="dry-run (default true)")
    parser.add_argument("--no-dry-run", dest="dry_run", action="store_false", help="실제 머지 수행")
    parser.add_argument("--no-audit", action="store_true", help="audit log 기록 안 함 (테스트용)")
    parser.add_argument("--smoke-command", type=str, default="", help="post-merge smoke 명령")
    parser.add_argument("--workspace", type=str, default=str(WORKSPACE), help="workspace root")
    parser.add_argument("--ci-max-polls", type=int, default=5, help="CI 상태 polling 최대 횟수 (default 5)")
    parser.add_argument("--ci-backoff-seconds", type=float, default=10.0, help="CI polling 지수 백오프 기본값 (default 10.0)")
    args = parser.parse_args(argv)

    # The task spec cannot be inferred from the PR number, so --task-file is
    # effectively mandatory.
    if not args.task_file:
        print(json.dumps({
            "decision": BLOCKED_WITH_REASON,
            "reason": "missing --task-file",
        }, ensure_ascii=False))
        return 2
    task_path = Path(args.task_file)
    if not task_path.exists():
        print(json.dumps({
            "decision": BLOCKED_WITH_REASON,
            "reason": f"task file not found: {task_path}",
        }, ensure_ascii=False))
        return 2

    # shlex keeps quoted arguments together (plain str.split would tear
    # `sh -c "a b"` apart); malformed quoting is reported as a usage error
    # before any remote call is made.
    try:
        smoke_command = shlex.split(args.smoke_command) if args.smoke_command else None
    except ValueError as exc:
        print(json.dumps({
            "decision": BLOCKED_WITH_REASON,
            "reason": f"invalid --smoke-command: {exc}",
        }, ensure_ascii=False))
        return 2

    spec = load_task_spec(task_path)

    runner = _default_runner
    pr_obs = observe_pr(args.pr, runner)
    ci = fetch_ci_status(args.pr, runner, max_polls=args.ci_max_polls, backoff_seconds=args.ci_backoff_seconds)
    gem = fetch_gemini_status(args.pr, runner, spec.expected_files)
    ctx = ExecutorContext(
        runner=runner,
        # Honour --workspace (its default is the module-level WORKSPACE).
        pr_workdir=args.workspace or str(WORKSPACE),
        smoke_command=smoke_command,
        no_audit=args.no_audit,
        # Default CLI wiring (W1~W7, task-2514). These names may be None when
        # the optional module imports failed; None is passed through as-is.
        replacement_runner=ReplacementPRRunner(runner=runner, dry_run=args.dry_run) if ReplacementPRRunner is not None else None,
        triage_fn=triage_pr,
        smoke_envelope_fn=run_pm_smoke_v2,
        task_file=task_path,
    )
    decision = evaluate_pr(
        pr_number=args.pr,
        task_spec=spec,
        pr_head_sha=pr_obs.get("headRefOid", ""),
        effective_files=pr_obs.get("effective_files", []),
        merge_state={
            "mergeStateStatus": pr_obs.get("mergeStateStatus", ""),
            "baseRefName": pr_obs.get("baseRefName", "main"),
        },
        ci_state=ci,
        gemini_state=gem,
        ctx=ctx,
    )
    decision = verify_head_lock_then_merge(
        decision=decision,
        pr_number=args.pr,
        ctx=ctx,
        fetch_pr_head_at_merge=lambda n: observe_pr(n, runner).get("headRefOid", ""),
        fetch_main_head_at_merge=lambda: fetch_main_head(runner),
        dry_run=args.dry_run,
    )
    write_audit(decision, spec.task_id, no_audit=args.no_audit)
    print(json.dumps(decision.to_dict(), ensure_ascii=False, indent=2))

    if decision.decision in {AUTO_MERGE_ALLOWED, AUTO_MERGE_SUCCESS}:
        return 0
    if decision.decision == WAITING_FOR_PREDECESSOR:
        return 3
    return 1


# Script entry point: run the CLI and use main()'s return value as the process exit status.
if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())
