"""merge_queue_executor.py — task-2509 5 모듈 #1.

회장 명시: queue 선두 PR이 자동 머지 10조건 만족 시 회장 승인 없이 squash merge +
post-merge smoke + 후행 PR stale 재검증을 수행한다.
산출물은 정책 문서가 아니라 실행 가능한 코드 + 회귀 테스트.

자동 머지 10조건:
  1. queue head 위치 (선행 PR 모두 merged)
  2. origin/main fetch 성공 + main HEAD SHA 잠금
  3. PR base sync (BEHIND → merge sync, rebase/force 금지)
  4. effective diff = expected_files (불일치 → replacement_pr_runner 분기)
  5. forbidden path 0건
  6. CI required all SUCCESS
  7. Gemini reviewThreads unresolved 0
  8. mergeStateStatus == CLEAN
  9. HEAD SHA lock 유지 (검증 시작 == merge 직전)
 10. cherry_pick_allowed != true / serial_only conflict 0

Critical 7종 (회장 §14):
  - FORBIDDEN_PATH_INVASION
  - EFFECTIVE_DIFF_CONTAMINATION_REPLACEMENT_FAILED
  - GEMINI_REAL_BUG_SCOPE_EXPANSION
  - BLOCK_OVERRIDE_REQUIRED_OR_INSUFFICIENT_REASON
  - DEPENDENCY_CYCLE_OR_SERIAL_ONLY_CONFLICT
  - REPLACEMENT_PR_ALSO_FAILED
  - POST_MERGE_SMOKE_FAILURE

CLI:
  python3 utils/merge_queue_executor.py --pr <N> --dry-run
    → AUTO_MERGE_ALLOWED 또는 BLOCKED_WITH_REASON: <code> JSON

후속 모듈 (인터페이스만 박제):
  - replacement_pr_runner (task-2510)
  - auto_gemini_triage (task-2511)
  - post_merge_smoke_runner (task-2512)
  - critical_escalation_reporter (task-2513)
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import re
import subprocess
import sys
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Optional

# Workspace root; taken from the WORKSPACE_ROOT environment variable when set.
WORKSPACE = Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))
# Directory that receives the per-task decision snapshot written by write_audit().
EVENTS_DIR = WORKSPACE / "memory" / "events"
# Directory holding the append-only merge-queue audit log (one JSON object per line).
AUDIT_DIR = WORKSPACE / "memory" / "orchestration-audit"
GLOBAL_AUDIT_LOG = AUDIT_DIR / "merge-queue.jsonl"

logger = logging.getLogger(__name__)


# ─── Decision codes ────────────────────────────────────────────────────────
# String codes stored in QueueDecision.decision and in the status dicts
# returned by the fetch_* helpers.
AUTO_MERGE_ALLOWED = "AUTO_MERGE_ALLOWED"
AUTO_MERGE_SUCCESS = "AUTO_MERGE_SUCCESS"
WAITING_FOR_PREDECESSOR = "WAITING_FOR_PREDECESSOR"
BLOCKED_WITH_REASON = "BLOCKED_WITH_REASON"
HEAD_SHA_LOCK_BROKEN = "HEAD_SHA_LOCK_BROKEN"
CI_FAILURE_BLOCK = "CI_FAILURE_BLOCK"
CI_IN_PROGRESS = "CI_IN_PROGRESS"
GEMINI_UNRESOLVED_BLOCK = "GEMINI_UNRESOLVED_BLOCK"
MERGE_STATE_NOT_CLEAN = "MERGE_STATE_NOT_CLEAN"
DIFF_CONTAMINATION_REPLACEMENT = "DIFF_CONTAMINATION_REPLACEMENT"

# ─── The 7 critical codes (chairman §14) ───────────────────────────────────
CRITICAL_FORBIDDEN_PATH = "FORBIDDEN_PATH_INVASION"
CRITICAL_DIFF_REPLACEMENT_FAILED = "EFFECTIVE_DIFF_CONTAMINATION_REPLACEMENT_FAILED"
CRITICAL_GEMINI_SCOPE_EXPANSION = "GEMINI_REAL_BUG_SCOPE_EXPANSION"
CRITICAL_BLOCK_OVERRIDE = "BLOCK_OVERRIDE_REQUIRED_OR_INSUFFICIENT_REASON"
CRITICAL_DEPENDENCY_CYCLE = "DEPENDENCY_CYCLE_OR_SERIAL_ONLY_CONFLICT"
CRITICAL_REPLACEMENT_FAILED = "REPLACEMENT_PR_ALSO_FAILED"
CRITICAL_POST_MERGE_SMOKE = "POST_MERGE_SMOKE_FAILURE"

# Membership set used by emit_critical_escalation() to reject unknown codes.
CRITICAL_CODES = {
    CRITICAL_FORBIDDEN_PATH,
    CRITICAL_DIFF_REPLACEMENT_FAILED,
    CRITICAL_GEMINI_SCOPE_EXPANSION,
    CRITICAL_BLOCK_OVERRIDE,
    CRITICAL_DEPENDENCY_CYCLE,
    CRITICAL_REPLACEMENT_FAILED,
    CRITICAL_POST_MERGE_SMOKE,
}

# Chairman §6 — admin override / force / rebase are strictly forbidden.
# Checked token-by-token in assert_no_forbidden_git_flags().
FORBIDDEN_GIT_FLAGS = {"--admin", "--force", "--force-with-lease", "-f"}

# Names of the follow-up modules (hooks). Per the original note, a change
# outside expected_files branches to the replacement path.
REPLACEMENT_PR_RUNNER_HOOK = "replacement_pr_runner"   # task-2510
AUTO_GEMINI_TRIAGE_HOOK = "auto_gemini_triage"         # task-2511
POST_MERGE_SMOKE_HOOK = "post_merge_smoke_runner"      # task-2512
CRITICAL_ESCALATION_HOOK = "critical_escalation_reporter"  # task-2513


# ─── Gemini status enum (task-2509+1 §2) ─────────────────────────────────
GEMINI_COMPLETED = "GEMINI_COMPLETED"            # inline review finished, 0 unresolved
GEMINI_UNRESOLVED = "GEMINI_UNRESOLVED"          # inline review finished, unresolved > 0
GEMINI_UNAVAILABLE_QUOTA = "GEMINI_UNAVAILABLE_QUOTA"  # daily quota limit (PR #58)
GEMINI_TIMEOUT = "GEMINI_TIMEOUT"                # polling timeout
GEMINI_STALE = "GEMINI_STALE"                    # PR head changed; review must be re-run
GEMINI_REAL_BUG = "GEMINI_REAL_BUG"              # unresolved + classified as a real bug
GEMINI_SCOPE_EXPANSION = "GEMINI_SCOPE_EXPANSION"  # asks for changes outside expected_files

# Every value classify_gemini_status() can return.
GEMINI_STATUS_VALUES = frozenset({
    GEMINI_COMPLETED, GEMINI_UNRESOLVED, GEMINI_UNAVAILABLE_QUOTA,
    GEMINI_TIMEOUT, GEMINI_STALE, GEMINI_REAL_BUG, GEMINI_SCOPE_EXPANSION,
})

# Gemini unavailable (candidates for the fallback review)
GEMINI_UNAVAILABLE_STATUSES = frozenset({
    GEMINI_UNAVAILABLE_QUOTA, GEMINI_TIMEOUT, GEMINI_STALE,
})


# ─── Risk level (task-2509+1 §4) ──────────────────────────────────────────
RISK_LEVEL_LOW = "LOW"
RISK_LEVEL_MEDIUM = "MEDIUM"
RISK_LEVEL_HIGH_CORE = "HIGH_CORE"

# A change set is classified HIGH_CORE if even one changed file matches (chairman §4)
HIGH_CORE_FILE_PATTERNS: list[re.Pattern] = [
    re.compile(r"^utils/merge_queue_executor\.py$"),
    re.compile(r"^utils/merge_topology_gate\.py$"),
    re.compile(r"^dispatch\.py$"),
    re.compile(r"^teams/shared/verifiers/.+"),
]


# ─── Static risky patterns (task-2509+1 §4 — HIGH_CORE hardening) ─────────
# Each entry is (compiled regex, short description). The static scan applies
# every regex to each source line with .search() and reports the description.
RISKY_PATTERNS = [
    (re.compile(r"git\s+push\s+.*--force"), "force push"),
    (re.compile(r"git\s+push\s+.*-f\b"), "force push (-f)"),
    (re.compile(r"gh\s+pr\s+merge\s+.*--admin"), "admin override"),
    (re.compile(r"git\s+cherry-pick\b"), "cherry-pick"),
    (re.compile(r'open\([^)]*"\.done"[^)]*"w"'), "manual .done write"),
    (
        # First alternative: the marker name appears inside the Path call that
        # is touched (the usual form, previously undetected). Second
        # alternative: the original form, where the marker name follows the
        # touch call on the same line. One regex keeps it to a single
        # violation per line.
        re.compile(
            r"Path\([^)]*\.done[^)]*\)\.touch\("
            r"|Path\([^)]*\)\.touch\(\).*\.done"
        ),
        "manual .done touch",
    ),
]


# ─── Final decision enum prefix (task-2509+1 §7) ──────────────────────────
# Values / prefixes used for QueueDecision.final_decision. The critical prefix
# is combined with a critical code as "<prefix>: <code>".
FINAL_AUTO_MERGE_ALLOWED = AUTO_MERGE_ALLOWED
FINAL_BLOCKED_PREFIX = "BLOCKED_WITH_REASON"
FINAL_CRITICAL_PREFIX = "CRITICAL_ESCALATION"
FALLBACK_REVIEW_FAILED = "FALLBACK_REVIEW_FAILED"
NON_DRY_RUN_REQUIRES_SMOKE_COMMAND = "NON_DRY_RUN_REQUIRES_SMOKE_COMMAND"


# ─── Data classes ──────────────────────────────────────────────────────────
@dataclass
class QueueDecision:
    """Outcome record of one merge-queue evaluation.

    Only ``decision`` is required; every other field has a default and is
    filled in as the gates run. ``to_dict()`` yields the plain-dict form that
    write_audit() serializes to JSON.
    """

    decision: str
    reason: str = ""
    pr_number: Optional[int] = None
    task_id: Optional[str] = None
    main_head_sha_start: Optional[str] = None
    main_head_sha_merge: Optional[str] = None
    pr_head_sha_start: Optional[str] = None
    pr_head_sha_merge: Optional[str] = None
    expected_files: list[str] = field(default_factory=list)
    effective_files: list[str] = field(default_factory=list)
    forbidden_paths: list[str] = field(default_factory=list)
    ci_status: str = ""
    gemini_unresolved_count: int = 0
    merge_state_status: str = ""
    smoke_status: str = ""
    critical_code: Optional[str] = None
    audit_path: Optional[str] = None
    timestamp: str = ""
    fixture_pr_replay: list[dict] = field(default_factory=list)
    # New fields (task-2509+1 §7)
    gemini_status: Optional[str] = None              # one of the GEMINI_* values
    fallback_review_used: bool = False
    fallback_review_passed: bool = False             # meaningful only when fallback_review_used is True
    risk_level: Optional[str] = None                 # one of the RISK_LEVEL_* values
    review_gate_passed: bool = False
    final_decision: Optional[str] = None             # AUTO_MERGE_ALLOWED / BLOCKED_WITH_REASON: <code> / CRITICAL_ESCALATION: <enum>
    critical_escalation: Optional[str] = None        # one of the 7 critical codes (None when there is none)
    fallback_check_details: dict = field(default_factory=dict)  # result of evaluate_fallback_review()
    static_scan_violations: list = field(default_factory=list)  # violations reported by static_risky_pattern_scan

    def to_dict(self) -> dict:
        """Return all fields as a plain dict (via dataclasses.asdict)."""
        return asdict(self)


@dataclass
class TaskSpec:
    """Normalized task spec metadata, as produced by load_task_spec()."""

    task_id: str
    expected_files: list[str]
    risk_area: str
    # Dependency entries; check_predecessor_merged() parses the form
    # "task-<n>[+<m>][.merged|.done]" and skips the literal "none".
    dependency: list[str]
    # evaluate_pr() accepts "serial_only", "limited_parallel" or "parallel_safe".
    parallel_policy: str
    # load_task_spec() stores an int when the raw value parses as one,
    # otherwise the raw string (or "n/a" when absent).
    merge_queue_position: Any
    stale_recheck_required: bool
    # load_task_spec() stores a bool here; typed Any because evaluate_pr()
    # treats the value as raw input.
    cherry_pick_allowed: Any
    # load_task_spec() always leaves this as None.
    smoke_command: Optional[list[str]] = None


# ─── Subprocess wrapper (테스트 inject 용) ────────────────────────────────
RunnerType = Callable[..., subprocess.CompletedProcess]


def _default_runner(args: list[str], cwd: Optional[str] = None, timeout: int = 60) -> subprocess.CompletedProcess:
    """Run *args* as a subprocess and return the completed process.

    Output is captured as text. The working directory is *cwd* when it is
    truthy, otherwise the workspace root. *timeout* (seconds) is handed to
    ``subprocess.run`` unchanged.
    """
    workdir = cwd if cwd else str(WORKSPACE)
    return subprocess.run(args, cwd=workdir, capture_output=True, text=True, timeout=timeout)


# ─── §10 admin/rebase/force 금지 검증 (정적) ──────────────────────────────
def assert_no_forbidden_git_flags(args: list[str]) -> None:
    """Reject a command line that contains a forbidden flag or a rebase.

    Raises:
        RuntimeError: when a token is in FORBIDDEN_GIT_FLAGS or starts with
            "--admin", or when the token "rebase" is present.
    """
    offending = []
    for token in args:
        if token in FORBIDDEN_GIT_FLAGS or token.startswith("--admin"):
            offending.append(token)
    if offending:
        raise RuntimeError(f"FORBIDDEN_GIT_FLAGS detected: {offending}")
    # Rebase is blocked explicitly (exact token match).
    if "rebase" in args:
        raise RuntimeError("REBASE_FORBIDDEN")


# ─── §1 queue head 확인 ───────────────────────────────────────────────────
def check_predecessor_merged(
    dependency: list[str],
    runner: RunnerType,
    main_log_grep: Optional[Callable[[str], bool]] = None,
) -> tuple[bool, list[str]]:
    """Check that every `.merged` dependency is already reflected on main.

    Entries look like ``task-<n>[+<m>][.merged|.done]``; a missing suffix
    means ``.merged``. Non-string entries, the literal "none" and `.done`
    entries are skipped. An entry that does not parse is reported as pending.

    A task counts as merged when *main_log_grep(task_id)* is truthy, or, when
    no callback is given, when a one-line grep of the origin/main log through
    *runner* prints anything.

    Returns:
        (all_merged, pending) where *pending* lists unmet task ids and
        unparseable entries in input order.
    """
    unmet: list[str] = []
    for raw in dependency or []:
        if not isinstance(raw, str) or raw == "none":
            continue
        parsed = re.match(r"^(task-\d+(?:\+\d+)?)(?:\.(merged|done))?$", raw.strip())
        if parsed is None:
            unmet.append(raw)
            continue
        task_id = parsed.group(1)
        if (parsed.group(2) or "merged") != "merged":
            continue
        if main_log_grep is None:
            # No callback: grep the main commit log through the runner.
            log = runner(["git", "log", "origin/main", "--oneline", f"--grep={task_id}", "-n", "1"])
            landed = bool(log.stdout.strip())
        else:
            landed = bool(main_log_grep(task_id))
        if not landed:
            unmet.append(task_id)
    return (not unmet), unmet


# ─── §2 origin/main fetch + HEAD SHA 잠금 ──────────────────────────────────
def fetch_main_head(runner: RunnerType) -> str:
    """Fetch origin and return the current origin/main HEAD SHA.

    Both commands must succeed. A failed fetch would otherwise lock onto a
    stale SHA, and a failed rev-parse can echo the ref name on stdout, which
    must not be mistaken for a SHA.

    Raises:
        RuntimeError: "MAIN_HEAD_SHA_FETCH_FAILED" when either command
            reports a non-zero exit code or no SHA is printed.
    """
    fetch = runner(["git", "fetch", "origin", "--quiet"])
    fetch_rc = getattr(fetch, "returncode", 0)
    # Only a real integer exit code counts as failure, so runner test doubles
    # without a numeric returncode keep working.
    if isinstance(fetch_rc, int) and fetch_rc != 0:
        raise RuntimeError("MAIN_HEAD_SHA_FETCH_FAILED")

    result = runner(["git", "rev-parse", "origin/main"])
    parse_rc = getattr(result, "returncode", 0)
    sha = (result.stdout or "").strip()
    if (isinstance(parse_rc, int) and parse_rc != 0) or not sha:
        raise RuntimeError("MAIN_HEAD_SHA_FETCH_FAILED")
    return sha


# ─── §3 PR base sync (BEHIND → merge sync) ────────────────────────────────
def sync_pr_base(
    _pr_branch: str,
    pr_workdir: str,
    runner: RunnerType,
    merge_state_status: str,
) -> dict:
    """Merge origin/main into the PR checkout when the PR is BEHIND.

    Only a plain merge is used; the command is passed through the
    forbidden-flag guard before it runs. Any status other than "BEHIND"
    performs nothing.

    Returns:
        {"performed": bool, "method": "merge_no_edit" | None, "conflict": bool}.
        "conflict" becomes True only when the merge exits non-zero and its
        combined output mentions "conflict" (case-insensitive).
    """
    outcome = {"performed": False, "method": None, "conflict": False}
    if merge_state_status == "BEHIND":
        merge_cmd = ["git", "merge", "origin/main", "--no-edit"]
        assert_no_forbidden_git_flags(merge_cmd)
        proc = runner(merge_cmd, cwd=pr_workdir)
        outcome.update(performed=True, method="merge_no_edit")
        if proc.returncode != 0:
            output = (proc.stdout or "") + (proc.stderr or "")
            # The lower-cased test also covers the upper-case "CONFLICT" marker.
            if "conflict" in output.lower():
                outcome["conflict"] = True
    return outcome


# ─── §4 effective diff vs expected_files ──────────────────────────────────
def _normalize_file_list(files: list[str]) -> set[str]:
    """Return the set of whitespace-trimmed, non-empty entries of *files*."""
    cleaned: set[str] = set()
    for entry in files:
        trimmed = entry.strip()
        if trimmed:
            cleaned.add(trimmed)
    return cleaned


def compare_effective_diff(
    effective_files: list[str],
    expected_files: list[str],
) -> tuple[bool, list[str], list[str]]:
    """Compare the effective diff with expected_files in both directions.

    Both lists are normalized (trimmed, blanks dropped, de-duplicated) first.

    Returns:
        (equal, extra, missing)
          - extra   = effective - expected (unexpected files), sorted
          - missing = expected - effective (absent files), sorted
          - equal   = True only when both lists are empty
    """
    wanted = _normalize_file_list(expected_files)
    actual = _normalize_file_list(effective_files)
    extra = sorted(actual.difference(wanted))
    missing = sorted(wanted.difference(actual))
    return (not extra and not missing), extra, missing


# ─── §5 forbidden path 검증 ────────────────────────────────────────────────
DEFAULT_FORBIDDEN_PATTERNS = [
    re.compile(r"^\.github/workflows/"),
    re.compile(r"^teams/.*/qc/verifiers/"),
    re.compile(r"^utils/task_id_parser\.py$"),
    re.compile(r"^scripts/finish_task\.py$"),
    re.compile(r"^scripts/qc_verify\.py$"),
]


def detect_forbidden_paths(
    effective_files: list[str],
    allowed_expected: list[str],
    extra_patterns: Optional[list[re.Pattern]] = None,
) -> list[str]:
    """Return changed files that hit a forbidden pattern and are not expected.

    A file listed in *allowed_expected* (after normalization) is never an
    invasion. *extra_patterns* are checked in addition to the defaults.
    Results keep the order of *effective_files*.
    """
    active = [*DEFAULT_FORBIDDEN_PATTERNS, *(extra_patterns or [])]
    allowed = _normalize_file_list(allowed_expected)
    return [
        name
        for name in effective_files
        if name not in allowed and any(rx.search(name) for rx in active)
    ]


# ─── §6 CI 상태 확인 ───────────────────────────────────────────────────────
def fetch_ci_status(
    pr_number: int,
    runner: RunnerType,
    max_polls: int = 1,
    backoff_seconds: float = 0.0,
    sleeper: Callable[[float], None] = time.sleep,
) -> dict:
    """Read the PR's statusCheckRollup through ``gh pr view`` and classify it.

    Each rollup item contributes the upper-cased first non-empty value of its
    "state", "conclusion" or "status" key. Classification, in priority order:
      - no checks, or any failing state      -> CI_FAILURE_BLOCK
      - any pending state                    -> poll again (exponential
        backoff via *sleeper*) until *max_polls* is used up, then CI_IN_PROGRESS
      - every state is a passing one         -> "SUCCESS"
      - anything else                        -> CI_FAILURE_BLOCK

    Unparseable output is handled like an empty rollup.

    Returns:
        {"status": str, "details": [state, ...], "raw": rollup}
    """
    failing = {"FAILURE", "ERROR", "CANCELLED", "TIMED_OUT", "ACTION_REQUIRED"}
    waiting = {"IN_PROGRESS", "PENDING", "QUEUED", "WAITING", "EXPECTED"}
    passing = {"SUCCESS", "COMPLETED", "NEUTRAL", "SKIPPED"}

    for attempt in range(max(max_polls, 1)):
        proc = runner([
            "gh", "pr", "view", str(pr_number),
            "--json", "statusCheckRollup",
        ])
        try:
            payload = json.loads(proc.stdout or "{}")
        except json.JSONDecodeError:
            payload = {}
        rollup = payload.get("statusCheckRollup") or []
        statuses = [
            (check.get("state") or check.get("conclusion") or check.get("status") or "").upper()
            for check in rollup
        ]

        # An empty rollup is blocked exactly like a failing one.
        if not statuses or not failing.isdisjoint(statuses):
            return {"status": CI_FAILURE_BLOCK, "details": statuses, "raw": rollup}
        if not waiting.isdisjoint(statuses):
            if attempt + 1 < max_polls:
                sleeper(backoff_seconds * (2 ** attempt))
                continue
            return {"status": CI_IN_PROGRESS, "details": statuses, "raw": rollup}
        verdict = "SUCCESS" if passing.issuperset(statuses) else CI_FAILURE_BLOCK
        return {"status": verdict, "details": statuses, "raw": rollup}
    return {"status": CI_IN_PROGRESS, "details": [], "raw": []}


# ─── §7 Gemini 상태 ───────────────────────────────────────────────────────
def fetch_gemini_status(
    pr_number: int,
    runner: RunnerType,
    expected_files: list[str],
) -> dict:
    """Query the PR's review threads via ``gh api graphql`` and classify them.

    The repository is passed as GraphQL variables whose values are gh's
    ``{owner}`` / ``{repo}`` field placeholders, so the query targets the
    repository gh resolves for the runner's working directory instead of a
    hard-coded one.

    Result "status" values:
      - "unavailable_quota" / "timeout": the response's "errors" mention a
        quota / rate limit or a timeout / deadline
      - "stale": the payload has ``stale`` or ``pr_head_changed`` set to True
      - "ok": no unresolved thread
      - "critical_scope_expansion": at least one unresolved thread's path is
        outside expected_files (carries CRITICAL_GEMINI_SCOPE_EXPANSION)
      - "auto_triage_candidate": every unresolved thread is inside
        expected_files (hook = AUTO_GEMINI_TRIAGE_HOOK)

    NOTE(review): an empty or unparseable response yields no threads and is
    therefore reported as "ok"; confirm callers are fine with that.
    """
    query = (
        "query($owner:String!,$name:String!,$number:Int!){ "
        "repository(owner:$owner,name:$name){ pullRequest(number:$number){ "
        "reviewThreads(first:50){ nodes{ isResolved comments(first:1){ nodes{ path body } } } } "
        "} } }"
    )
    result = runner([
        "gh", "api", "graphql",
        "-f", f"query={query}",
        # -F fields: gh substitutes the {owner}/{repo} placeholders and sends
        # the integer PR number as a JSON number.
        "-F", "owner={owner}",
        "-F", "name={repo}",
        "-F", f"number={pr_number}",
    ])
    try:
        payload = json.loads(result.stdout or "{}")
    except json.JSONDecodeError:
        payload = {}
    if not isinstance(payload, dict):
        # Valid JSON that is not an object carries no usable data.
        payload = {}

    # task-2509+1 §2 — detect quota/timeout/stale signals (prevents a repeat of the PR #58 incident)
    errors = payload.get("errors") or []
    if isinstance(errors, list) and errors:
        joined = " ".join(
            (e.get("message", "") if isinstance(e, dict) else str(e)).lower()
            for e in errors
        )
        if "quota" in joined or "rate limit" in joined or "rate-limit" in joined:
            return {
                "status": "unavailable_quota",
                "unresolved": [],
                "hook": None,
                "errors": errors,
            }
        if "timeout" in joined or "deadline" in joined:
            return {
                "status": "timeout",
                "unresolved": [],
                "hook": None,
                "errors": errors,
            }
    # Stale marker supplied in the payload (PR head changed since the review).
    if payload.get("stale") is True or payload.get("pr_head_changed") is True:
        return {
            "status": "stale",
            "unresolved": [],
            "hook": None,
        }

    data = payload.get("data") or {}
    repo = data.get("repository") or {}
    pr = repo.get("pullRequest") or {}
    threads = pr.get("reviewThreads") or {}
    nodes = threads.get("nodes") or []
    unresolved = []
    for n in nodes:
        if n.get("isResolved"):
            continue
        # Only the first comment of a thread is requested; it supplies path and body.
        comments = (n.get("comments") or {}).get("nodes") or []
        first = comments[0] if comments else {}
        unresolved.append({
            "path": first.get("path", ""),
            "body": first.get("body", ""),
        })
    if not unresolved:
        return {"status": "ok", "unresolved": [], "hook": None}
    expected = _normalize_file_list(expected_files)
    inside = [u for u in unresolved if u.get("path") in expected]
    outside = [u for u in unresolved if u.get("path") not in expected]
    if outside:
        return {
            "status": "critical_scope_expansion",
            "unresolved": unresolved,
            "outside": outside,
            "hook": CRITICAL_ESCALATION_HOOK,
            "critical_code": CRITICAL_GEMINI_SCOPE_EXPANSION,
        }
    return {
        "status": "auto_triage_candidate",
        "unresolved": unresolved,
        "inside": inside,
        "hook": AUTO_GEMINI_TRIAGE_HOOK,
    }


def classify_gemini_status(gemini_state: dict, *, real_bug: bool = False) -> str:
    """Map a fetch_gemini_status() result (plus *real_bug*) to a GEMINI_* value.

    Rules:
      - empty / missing state                             -> GEMINI_UNRESOLVED
      - status 'ok' with no unresolved entries            -> GEMINI_COMPLETED
      - status 'critical_scope_expansion'                 -> GEMINI_SCOPE_EXPANSION
      - status 'auto_triage_candidate' and real_bug=True  -> GEMINI_REAL_BUG
      - status 'auto_triage_candidate' otherwise          -> GEMINI_UNRESOLVED
      - status 'unavailable_quota'                        -> GEMINI_UNAVAILABLE_QUOTA
      - status 'timeout'                                  -> GEMINI_TIMEOUT
      - status 'stale'                                    -> GEMINI_STALE
      - anything else                                     -> GEMINI_UNRESOLVED (conservative)

    The status string is compared case-insensitively.
    """
    if not gemini_state:
        return GEMINI_UNRESOLVED
    status = (gemini_state.get("status") or "").lower()
    open_threads = gemini_state.get("unresolved") or []
    if status == "ok" and len(open_threads) == 0:
        return GEMINI_COMPLETED
    if status == "auto_triage_candidate":
        return GEMINI_REAL_BUG if real_bug else GEMINI_UNRESOLVED
    direct = {
        "critical_scope_expansion": GEMINI_SCOPE_EXPANSION,
        "unavailable_quota": GEMINI_UNAVAILABLE_QUOTA,
        "timeout": GEMINI_TIMEOUT,
        "stale": GEMINI_STALE,
    }
    # Unknown statuses (including 'ok' with leftover threads) stay unresolved.
    return direct.get(status, GEMINI_UNRESOLVED)


# ─── Risk level / static scan / fallback review (task-2509+1 §3~5) ────────
def assess_risk_level(effective_files: list[str]) -> str:
    """Classify the changed files into a risk level.

    Returns RISK_LEVEL_HIGH_CORE as soon as any file matches one of
    HIGH_CORE_FILE_PATTERNS; otherwise RISK_LEVEL_LOW. MEDIUM is never
    produced here (per the original note it is to be defined by a later task).
    """
    touches_core = any(
        rx.search(name)
        for name in (effective_files or [])
        for rx in HIGH_CORE_FILE_PATTERNS
    )
    return RISK_LEVEL_HIGH_CORE if touches_core else RISK_LEVEL_LOW


def static_risky_pattern_scan(
    effective_files: list[str],
    workspace_root: Path = WORKSPACE,
) -> dict:
    """Scan the changed files line by line for the risky static patterns.

    Paths are resolved relative to *workspace_root*. Entries that do not
    exist, are not regular files, or cannot be read are left out of the scan.
    A line may produce one violation per matching pattern.

    Returns:
      {"passed": bool, "violations": [{"file": str, "pattern": str, "line": int, "snippet": str}, ...]}
    """
    findings: list[dict] = []
    for rel in effective_files or []:
        target = workspace_root / rel
        if not (target.exists() and target.is_file()):
            continue
        try:
            content = target.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # File I/O error (permissions, path, symlink, ...) — skip this file.
            continue
        for lineno, line in enumerate(content.splitlines(), start=1):
            # Avoid self-reference: lines naming the pattern table or this
            # scanner are excluded.
            if "RISKY_PATTERNS" in line or "static_risky_pattern_scan" in line:
                continue
            findings.extend(
                {
                    "file": rel,
                    "pattern": desc,
                    "line": lineno,
                    "snippet": line.strip()[:200],
                }
                for pat, desc in RISKY_PATTERNS
                if pat.search(line)
            )
    return {"passed": not findings, "violations": findings}


def evaluate_fallback_review(
    *,
    diff_ok: bool,                  # effective == expected
    forbidden_count: int,
    ci_status: str,                 # only "SUCCESS" passes
    merge_state_status: str,        # only "CLEAN" passes
    head_sha_lock_ok: bool,
    smoke_command_defined: bool,
    dry_run_decision_pass: bool,
    risk_level: str,
    static_scan_passed: bool,       # meaningful for HIGH_CORE only; pass True for LOW/MEDIUM
) -> dict:
    """Run the 8 fallback-review checks; the review passes only if all pass.

    ci_status and merge_state_status are compared case-insensitively. The
    static-scan check is satisfied automatically unless risk_level is
    HIGH_CORE.

    Returns:
      {"passed": bool, "checks": {check name: bool}, "failed": [str, ...]}
    """
    ci_normalized = (ci_status or "").upper()
    merge_normalized = (merge_state_status or "").upper()
    is_high_core = risk_level == RISK_LEVEL_HIGH_CORE
    checks = {
        "effective_diff_equals_expected": bool(diff_ok),
        "forbidden_path_zero": forbidden_count == 0,
        "ci_all_success": ci_normalized == "SUCCESS",
        "merge_state_clean": merge_normalized == "CLEAN",
        "head_sha_lock_ok": bool(head_sha_lock_ok),
        "smoke_command_defined": bool(smoke_command_defined),
        "dry_run_decision_pass": bool(dry_run_decision_pass),
        "static_risky_scan_pass_if_high_core": (not is_high_core) or bool(static_scan_passed),
    }
    failed = [name for name, ok in checks.items() if not ok]
    return {"passed": not failed, "checks": checks, "failed": failed}


# ─── §8 mergeStateStatus ──────────────────────────────────────────────────
def fetch_merge_state(pr_number: int, runner: RunnerType) -> dict:
    """Read mergeStateStatus, headRefOid and baseRefName via ``gh pr view``.

    Unparseable or empty output yields empty strings for all three keys.
    mergeStateStatus is upper-cased.
    """
    proc = runner([
        "gh", "pr", "view", str(pr_number),
        "--json", "mergeStateStatus,headRefOid,baseRefName",
    ])
    try:
        info = json.loads(proc.stdout or "{}")
    except json.JSONDecodeError:
        info = {}
    state = info.get("mergeStateStatus") or ""
    return {
        "mergeStateStatus": state.upper(),
        "headRefOid": info.get("headRefOid", ""),
        "baseRefName": info.get("baseRefName", ""),
    }


# ─── §9 HEAD SHA lock ─────────────────────────────────────────────────────
def assert_head_sha_lock(start_sha: str, current_sha: str) -> bool:
    """Return True when *start_sha* is non-empty and equals *current_sha*."""
    if not start_sha:
        return False
    return start_sha == current_sha


# ─── §10 squash merge ────────────────────────────────────────────────────
def execute_squash_merge(pr_number: int, runner: RunnerType) -> dict:
    """Squash-merge the PR through gh and delete its branch.

    The command is passed through the forbidden-flag guard before it runs.

    Returns:
        {"returncode": int, "stdout": str, "stderr": str}; each stream is
        limited to its last 2000 characters.
    """
    merge_cmd = ["gh", "pr", "merge", str(pr_number), "--squash", "--delete-branch"]
    assert_no_forbidden_git_flags(merge_cmd)
    proc = runner(merge_cmd)
    stdout_tail = (proc.stdout or "")[-2000:]
    stderr_tail = (proc.stderr or "")[-2000:]
    return {"returncode": proc.returncode, "stdout": stdout_tail, "stderr": stderr_tail}


# ─── §11 post-merge smoke ─────────────────────────────────────────────────
def run_post_merge_smoke(
    smoke_command: Optional[list[str]],
    runner: RunnerType,
) -> dict:
    """Run the post-merge smoke command and report PASS / FAIL / skipped.

    The runner is called with a 600-second timeout. A command that times out
    or cannot be started is reported as FAIL with the post-merge-smoke
    critical code instead of raising, because the merge has already happened
    and the failure still has to be escalated.

    Returns:
        - {"status": "skipped", "details": ...} when no command is given
        - {"status": "PASS", "stdout": ...} on exit code 0
        - {"status": "FAIL", "stdout": ..., "stderr": ..., "critical_code":
          CRITICAL_POST_MERGE_SMOKE} otherwise
        Output streams are limited to their last 2000 characters.
    """
    if not smoke_command:
        return {"status": "skipped", "details": "no smoke_command in task spec"}
    try:
        result = runner(smoke_command, timeout=600)
    except (subprocess.TimeoutExpired, OSError) as exc:
        # Hung or unlaunchable smoke command: fail closed.
        return {
            "status": "FAIL",
            "stdout": "",
            "stderr": f"{type(exc).__name__}: {exc}"[-2000:],
            "critical_code": CRITICAL_POST_MERGE_SMOKE,
        }
    if result.returncode == 0:
        return {"status": "PASS", "stdout": (result.stdout or "")[-2000:]}
    return {
        "status": "FAIL",
        "stdout": (result.stdout or "")[-2000:],
        "stderr": (result.stderr or "")[-2000:],
        "critical_code": CRITICAL_POST_MERGE_SMOKE,
    }


# ─── §12 후행 stale 재검증 (state machine) ────────────────────────────────
def recheck_following_prs(
    queue: list[dict],
    runner: RunnerType,
) -> list[dict]:
    """Re-evaluate the PRs that follow in the queue (task-2509+1 §9).

    Each queue entry is a dict: {pr_number, expected_files?,
    prior_effective_files?, prior_main_sha?, gemini_head_sha_at_last_review?}.
    Entries without a pr_number are skipped.

    Each returned state dict keeps the earlier keys (needs_recheck / behind /
    conflict / blocked) and adds effective_diff_drift,
    expected_files_maintained, forbidden_path_present, gemini_stale,
    ci_rerun_needed and current_effective_files.
    """
    recheck_statuses = {"BEHIND", "DIRTY", "BLOCKED"}
    results: list[dict] = []
    for item in queue:
        number = item.get("pr_number")
        if number is None:
            continue
        snapshot = fetch_merge_state(number, runner)
        merge_status = snapshot.get("mergeStateStatus", "")
        previous_files = item.get("prior_effective_files", []) or []
        declared_files = item.get("expected_files", []) or []

        # Only external-call failures are absorbed; the prior values are then
        # kept (TC-12 regression). Anything else propagates.
        try:
            observed = observe_pr(number, runner)
            files_now = observed.get("effective_files", []) or []
            head_now = observed.get("headRefOid", "") or snapshot.get("headRefOid", "")
        except (subprocess.SubprocessError, json.JSONDecodeError, OSError) as exc:
            logger.debug("observe_pr fallback for PR %s: %s", number, exc)
            files_now = previous_files
            head_now = snapshot.get("headRefOid", "")

        # Drift is only defined when a prior file list exists.
        drifted = False
        if previous_files:
            drifted = sorted(set(files_now)) != sorted(set(previous_files))

        if declared_files:
            within_expected = set(files_now).issubset(set(declared_files))
        else:
            within_expected = True

        has_forbidden = bool(detect_forbidden_paths(files_now, declared_files))
        reviewed_head = item.get("gemini_head_sha_at_last_review")
        review_stale = bool(reviewed_head and reviewed_head != head_now)
        is_behind = merge_status == "BEHIND"

        results.append({
            "pr_number": number,
            "merge_state_status": merge_status,
            "needs_recheck": (
                merge_status in recheck_statuses
                or drifted or has_forbidden or review_stale
            ),
            "behind": is_behind,
            "conflict": merge_status == "DIRTY",
            "blocked": merge_status == "BLOCKED",
            "effective_diff_drift": drifted,
            "expected_files_maintained": within_expected,
            "forbidden_path_present": has_forbidden,
            "gemini_stale": review_stale,
            "ci_rerun_needed": is_behind or drifted,
            "current_effective_files": list(files_now),
        })
    return results


# ─── §13 audit/evidence ───────────────────────────────────────────────────
def write_audit(
    decision: QueueDecision,
    task_id: Optional[str],
    no_audit: bool = False,
) -> Optional[Path]:
    """Persist *decision* to the audit log and, if possible, a per-task file.

    The decision's timestamp is set to the current UTC time. One JSON line is
    appended to the global audit log. When *task_id* is given, the same
    record is also written (overwriting) to
    ``<events dir>/<task_id>.merge-queue.json``.

    ``decision.audit_path`` is assigned before serialization so that the
    persisted records contain the snapshot path rather than null.

    Returns:
        None when *no_audit* is set; the per-task file path when *task_id* is
        given; otherwise the global audit log path.
    """
    if no_audit:
        return None
    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    EVENTS_DIR.mkdir(parents=True, exist_ok=True)
    decision.timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    per_task = EVENTS_DIR / f"{task_id}.merge-queue.json" if task_id else None
    if per_task is not None:
        decision.audit_path = str(per_task)

    # Serialize once so both outputs carry the same record.
    record = decision.to_dict()
    with open(GLOBAL_AUDIT_LOG, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record, ensure_ascii=False) + "\n")
    if per_task is None:
        return GLOBAL_AUDIT_LOG
    with open(per_task, "w", encoding="utf-8") as fh:
        json.dump(record, fh, ensure_ascii=False, indent=2)
    return per_task


# ─── §14 critical_escalation_reporter 인터페이스 (task-2513 hook) ────────
def emit_critical_escalation(
    code: str,
    decision: QueueDecision,
    reporter_hook: Optional[Callable[[str, QueueDecision], None]] = None,
) -> None:
    """Record a critical code on *decision* and notify the reporter hook.

    Raises:
        ValueError: when *code* is not one of CRITICAL_CODES; the decision is
            left untouched and the hook is not called.
    """
    if code in CRITICAL_CODES:
        decision.critical_code = code
        if reporter_hook is not None:
            reporter_hook(code, decision)
        return
    raise ValueError(f"UNKNOWN_CRITICAL_CODE: {code}")


# ─── task spec loader ────────────────────────────────────────────────────
_TASK_FRONTMATTER_RE = re.compile(r"```yaml\s*\n(.*?)```", re.DOTALL)


def load_task_spec(task_file: Path) -> TaskSpec:
    """Build a TaskSpec from a task markdown file.

    Metadata comes from the first fenced yaml block in the file; when there
    is none, every value falls back to its default. Defaults:
    parallel_policy "serial_only", merge_queue_position "n/a", both boolean
    flags False. merge_queue_position becomes an int when the raw value
    parses as one. The task id comes from the first "# task-<n>" heading, or
    the file stem when there is no such heading. smoke_command is always None.
    """
    text = task_file.read_text(encoding="utf-8")
    fenced = _TASK_FRONTMATTER_RE.search(text)
    yaml_block = fenced.group(1) if fenced else ""

    def scalar(key: str) -> str:
        return _extract_yaml_scalar(yaml_block, key)

    def flag(key: str) -> bool:
        # Only the literal "true" (any case) enables a flag.
        return str(scalar(key) or "false").strip().lower() == "true"

    position_raw = scalar("merge_queue_position")
    position: Any
    if position_raw and position_raw != "n/a":
        try:
            position = int(position_raw)
        except ValueError:
            position = position_raw
    else:
        position = position_raw or "n/a"

    heading = re.search(r"^# (task-\d+(?:\+\d+)?)", text, re.MULTILINE)
    return TaskSpec(
        task_id=heading.group(1) if heading else task_file.stem,
        expected_files=_extract_yaml_list(yaml_block, "expected_files"),
        risk_area=scalar("risk_area") or "",
        dependency=_extract_yaml_list(yaml_block, "dependency"),
        parallel_policy=scalar("parallel_policy") or "serial_only",
        merge_queue_position=position,
        stale_recheck_required=flag("stale_recheck_required"),
        cherry_pick_allowed=flag("cherry_pick_allowed"),
        smoke_command=None,
    )


def _extract_yaml_list(yaml_block: str, key: str) -> list[str]:
    """Return the items of the block-style list stored under *key*.

    Only the "key:" line followed by "- item" lines is recognized. A quoted
    item yields the text between its quotes; an unquoted item is cut at the
    first "#" and trimmed. Empty items are dropped. A missing key yields [].
    """
    found = re.search(
        rf"^{re.escape(key)}:\s*\n((?:\s*-\s*.+\n?)+)",
        yaml_block,
        re.MULTILINE,
    )
    if found is None:
        return []
    values: list[str] = []
    for row in found.group(1).splitlines():
        row = row.strip()
        if not row.startswith("-"):
            continue
        body = row[1:].strip()
        quote = body[:1]
        if quote in ('"', "'"):
            # Quoted: take everything up to the matching closing quote.
            closing = body.find(quote, 1)
            value = body[1:closing] if closing > 0 else body.strip("\"'")
        else:
            value = body.split("#", 1)[0].strip().strip("\"'")
        if value:
            values.append(value)
    return values


def _extract_yaml_scalar(yaml_block: str, key: str) -> str:
    """Return the same-line scalar value stored under *key*, or "".

    The value is the text after "key:" up to an optional "#" comment, trimmed
    of whitespace and surrounding quotes. Only spaces and tabs are skipped
    after the colon, so a key with an empty value yields "" instead of
    capturing the following line.
    """
    pattern = rf"^{re.escape(key)}:[ \t]*([^\n#]+?)(?:[ \t]*#.*)?$"
    m = re.search(pattern, yaml_block, re.MULTILINE)
    if not m:
        return ""
    return m.group(1).strip().strip("\"'")


# ─── 핵심 오케스트레이션 ──────────────────────────────────────────────────
@dataclass
class ExecutorContext:
    """Injectable dependencies and options for one executor run.

    Every field has a default, so tests can override just the pieces they
    need (typically ``runner``).
    """

    # Command runner; defaults to the real subprocess-backed runner.
    runner: RunnerType = field(default_factory=lambda: _default_runner)
    # NOTE(review): presumably the PR checkout directory used for base sync — confirm against the caller.
    pr_workdir: Optional[str] = None
    smoke_command: Optional[list[str]] = None
    no_audit: bool = False
    # Optional callback handed to check_predecessor_merged() in place of the git log grep.
    main_log_grep: Optional[Callable[[str], bool]] = None
    extra_forbidden_patterns: Optional[list[re.Pattern]] = None
    fixture_main_sha: Optional[str] = None  # for fixture replay
    # Sleep function; the default is a no-op.
    sleeper: Callable[[float], None] = field(default=lambda _s: None)
    # Callback forwarded to emit_critical_escalation().
    reporter_hook: Optional[Callable[[str, QueueDecision], None]] = None


def evaluate_pr(
    pr_number: int,
    task_spec: TaskSpec,
    pr_head_sha: str,
    effective_files: list[str],
    merge_state: dict,
    ci_state: dict,
    gemini_state: dict,
    ctx: ExecutorContext,
) -> QueueDecision:
    """Run the auto-merge gates for one PR and return the resulting decision.

    The merge itself is not performed here, so the result can be used for a
    dry-run. Gates are checked in a fixed order and the first one that fails
    returns immediately with its blocking code and reason.

    Args:
        pr_number: PR number, recorded on the decision.
        task_spec: Task spec; ``task_id``, ``expected_files``, ``dependency``,
            ``parallel_policy`` and ``cherry_pick_allowed`` are read.
        pr_head_sha: PR head SHA at evaluation start; stored as
            ``pr_head_sha_start`` for the later head-lock check.
        effective_files: Paths changed by the PR.
        merge_state: Dict read for ``"mergeStateStatus"`` and ``"baseRefName"``.
        ci_state: Dict read for ``"status"`` and ``"details"``.
        gemini_state: Dict read for ``"real_bug"``, ``"status"`` and
            ``"unresolved"``.
        ctx: Runner, hooks and options used by the individual gates.

    Returns:
        The populated ``QueueDecision``. ``decision`` is ``AUTO_MERGE_ALLOWED``
        only when no gate blocked.
    """
    decision = QueueDecision(
        decision="UNKNOWN",
        pr_number=pr_number,
        task_id=task_spec.task_id,
        expected_files=list(task_spec.expected_files),
        effective_files=list(effective_files),
    )

    # task-2509+1 §3 — compute risk_level up front
    decision.risk_level = assess_risk_level(effective_files)
    # task-2509+1 §2 — classify gemini_status early; later branches read it
    _gem_real_bug = bool(gemini_state.get("real_bug")) if isinstance(gemini_state, dict) else False
    decision.gemini_status = classify_gemini_status(gemini_state, real_bug=_gem_real_bug)

    # §1 queue head check (all predecessor PRs merged)
    head_ok, pending = check_predecessor_merged(
        task_spec.dependency,
        ctx.runner,
        main_log_grep=ctx.main_log_grep,
    )
    if not head_ok:
        decision.decision = WAITING_FOR_PREDECESSOR
        decision.reason = f"pending: {','.join(pending)}"
        decision.final_decision = decision.decision
        return decision

    # §10 parallel_policy / cherry_pick_allowed gate (right after the predecessor check)
    _VALID_PARALLEL_POLICIES = {"serial_only", "limited_parallel", "parallel_safe"}
    if task_spec.parallel_policy not in _VALID_PARALLEL_POLICIES:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = f"INVALID_PARALLEL_POLICY: {task_spec.parallel_policy!r}"
        decision.critical_code = CRITICAL_DEPENDENCY_CYCLE
        decision.critical_escalation = CRITICAL_DEPENDENCY_CYCLE
        decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_DEPENDENCY_CYCLE}"
        emit_critical_escalation(CRITICAL_DEPENDENCY_CYCLE, decision, ctx.reporter_hook)
        return decision
    # Accept either a real bool or the string "true" (any case, padded).
    _cherry_raw = task_spec.cherry_pick_allowed
    _cherry_bool = (
        _cherry_raw is True
        or (isinstance(_cherry_raw, str) and _cherry_raw.strip().lower() == "true")
    )
    if _cherry_bool:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = "cherry_pick_allowed=true is forbidden (DEPENDENCY_CYCLE risk)"
        decision.critical_code = CRITICAL_DEPENDENCY_CYCLE
        decision.critical_escalation = CRITICAL_DEPENDENCY_CYCLE
        decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_DEPENDENCY_CYCLE}"
        emit_critical_escalation(CRITICAL_DEPENDENCY_CYCLE, decision, ctx.reporter_hook)
        return decision

    # §2 main HEAD SHA: a fixture value, when provided, replaces the fetch
    if ctx.fixture_main_sha:
        main_head_start = ctx.fixture_main_sha
    else:
        main_head_start = fetch_main_head(ctx.runner)
    decision.main_head_sha_start = main_head_start
    decision.pr_head_sha_start = pr_head_sha

    # §3 base sync — attempted only when the PR is BEHIND and a workdir is set
    ms = (merge_state.get("mergeStateStatus") or "").upper()
    decision.merge_state_status = ms
    if ms == "BEHIND" and ctx.pr_workdir:
        sync = sync_pr_base(
            _pr_branch=merge_state.get("baseRefName", "main"),
            pr_workdir=ctx.pr_workdir,
            runner=ctx.runner,
            merge_state_status=ms,
        )
        if sync.get("conflict"):
            # NOTE(review): unlike the §10 / §4 / §8 critical exits, this one
            # does not call emit_critical_escalation — confirm intentional.
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = "MERGE_CONFLICT_DURING_BASE_SYNC"
            decision.critical_code = CRITICAL_BLOCK_OVERRIDE
            decision.critical_escalation = CRITICAL_BLOCK_OVERRIDE
            decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_BLOCK_OVERRIDE}"
            return decision
        # NOTE(review): `ms` is not refreshed after a conflict-free sync, so a
        # BEHIND PR still stops at §8 with MERGE_STATE_BEHIND_AFTER_SYNC
        # unless an earlier gate returns first.

    # §4 effective diff vs expected files
    diff_ok, extra, missing = compare_effective_diff(effective_files, task_spec.expected_files)
    if not diff_ok:
        # forbidden paths take priority over the replacement branch
        invasions = detect_forbidden_paths(
            effective_files, task_spec.expected_files, ctx.extra_forbidden_patterns,
        )
        if invasions:
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = "FORBIDDEN_PATH"
            decision.forbidden_paths = invasions
            decision.critical_code = CRITICAL_FORBIDDEN_PATH
            decision.critical_escalation = CRITICAL_FORBIDDEN_PATH
            decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_FORBIDDEN_PATH}"
            emit_critical_escalation(CRITICAL_FORBIDDEN_PATH, decision, ctx.reporter_hook)
            return decision
        # no forbidden path → replacement branch (task-2510 hook)
        decision.decision = DIFF_CONTAMINATION_REPLACEMENT
        reason_parts = []
        if extra:
            reason_parts.append(f"extra={extra}")
        if missing:
            reason_parts.append(f"missing={missing}")
        decision.reason = f"diff contamination: {'; '.join(reason_parts)}; hook={REPLACEMENT_PR_RUNNER_HOOK}"
        decision.final_decision = decision.decision
        return decision

    # §5 forbidden path (re-checked even when the diff matches expected files)
    invasions = detect_forbidden_paths(
        effective_files, task_spec.expected_files, ctx.extra_forbidden_patterns,
    )
    if invasions:
        # NOTE(review): no emit_critical_escalation call here, although the §4
        # forbidden-path exit makes one — confirm intentional.
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = "FORBIDDEN_PATH_INSIDE_EXPECTED"
        decision.forbidden_paths = invasions
        decision.critical_code = CRITICAL_FORBIDDEN_PATH
        decision.critical_escalation = CRITICAL_FORBIDDEN_PATH
        decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_FORBIDDEN_PATH}"
        return decision

    # §6 CI — only the failure and in-progress statuses block here
    ci_status = ci_state.get("status", "")
    decision.ci_status = ci_status
    if ci_status == CI_FAILURE_BLOCK:
        decision.decision = CI_FAILURE_BLOCK
        decision.reason = f"CI failure: {ci_state.get('details')}"
        decision.final_decision = decision.decision
        return decision
    if ci_status == CI_IN_PROGRESS:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = "CI_IN_PROGRESS"
        decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: CI_IN_PROGRESS"
        return decision

    # §7 Gemini (task-2509+1 §7 — introduces review_gate_passed)
    # NOTE(review): gemini_state is dereferenced with .get() here without the
    # isinstance(dict) guard used above; a non-dict value would raise.
    gem_status = gemini_state.get("status", "")
    decision.gemini_unresolved_count = len(gemini_state.get("unresolved", []) or [])
    if gem_status == "critical_scope_expansion":
        # NOTE(review): critical exit without emit_critical_escalation —
        # confirm intentional.
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = "GEMINI_REAL_BUG_OUTSIDE_EXPECTED_FILES"
        decision.critical_code = CRITICAL_GEMINI_SCOPE_EXPANSION
        decision.critical_escalation = CRITICAL_GEMINI_SCOPE_EXPANSION
        decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_GEMINI_SCOPE_EXPANSION}"
        return decision
    if gem_status == "auto_triage_candidate":
        # regression preservation: keep the existing GEMINI_UNRESOLVED_BLOCK
        decision.decision = GEMINI_UNRESOLVED_BLOCK
        decision.reason = f"unresolved (auto_gemini_triage hook); count={decision.gemini_unresolved_count}"
        decision.final_decision = decision.decision
        return decision

    # task-2509+1 §7 — fallback review when Gemini is unavailable (quota/timeout/stale)
    if decision.gemini_status in GEMINI_UNAVAILABLE_STATUSES:
        decision.fallback_review_used = True
        # run the static risky-pattern scan for HIGH_CORE risk only
        if decision.risk_level == RISK_LEVEL_HIGH_CORE:
            scan = static_risky_pattern_scan(effective_files)
            decision.static_scan_violations = list(scan.get("violations") or [])
            static_scan_passed = bool(scan.get("passed"))
        else:
            static_scan_passed = True

        # 8-condition evaluation (keeps the PR from passing autonomously without a Gemini signal)
        fb = evaluate_fallback_review(
            diff_ok=diff_ok,
            forbidden_count=0,  # reaching this point means §4/§5 passed
            ci_status=ci_status if ci_status else "",
            merge_state_status=ms,
            head_sha_lock_ok=True,  # at evaluate time (re-checked in verify_head_lock_then_merge)
            smoke_command_defined=bool(ctx.smoke_command),
            dry_run_decision_pass=True,  # the evaluate stage counts as the dry-run decision passing
            risk_level=decision.risk_level or RISK_LEVEL_LOW,
            static_scan_passed=static_scan_passed,
        )
        decision.fallback_check_details = fb
        decision.fallback_review_passed = bool(fb.get("passed"))
        if not fb.get("passed"):
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = (
                f"{FALLBACK_REVIEW_FAILED}: {','.join(fb.get('failed') or [])}; gemini_status={decision.gemini_status}"
            )
            decision.review_gate_passed = False
            decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: {FALLBACK_REVIEW_FAILED}"
            return decision
        decision.review_gate_passed = True
        # fallthrough → §8 mergeStateStatus check
    elif decision.gemini_status == GEMINI_COMPLETED:
        decision.review_gate_passed = True
        decision.fallback_review_used = False

    # §8 mergeStateStatus must be CLEAN
    if ms != "CLEAN":
        if ms == "BLOCKED":
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = "MERGE_STATE_BLOCKED"
            decision.critical_code = CRITICAL_BLOCK_OVERRIDE
            decision.critical_escalation = CRITICAL_BLOCK_OVERRIDE
            decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_BLOCK_OVERRIDE}"
            emit_critical_escalation(CRITICAL_BLOCK_OVERRIDE, decision, ctx.reporter_hook)
            return decision
        if ms == "BEHIND":
            decision.decision = BLOCKED_WITH_REASON
            decision.reason = "MERGE_STATE_BEHIND_AFTER_SYNC"
            decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: MERGE_STATE_BEHIND_AFTER_SYNC"
            return decision
        # any other non-CLEAN status
        decision.decision = MERGE_STATE_NOT_CLEAN
        decision.reason = f"mergeStateStatus={ms}"
        decision.final_decision = decision.decision
        return decision

    # task-2509+1 §1 — defensive guard for review_gate_passed=False (accepted from a
    # critical Gemini code-review finding).
    # If GEMINI_UNRESOLVED or an unclassified Gemini status matches none of the explicit
    # §7 branches (auto_triage_candidate / critical_scope_expansion / UNAVAILABLE /
    # COMPLETED), review_gate_passed would stay False while §8/§9 pass, and the PR
    # could reach AUTO_MERGE_ALLOWED.
    # Block that scenario fail-closed.
    if not decision.review_gate_passed:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = (
            f"REVIEW_GATE_NOT_PASSED: gemini_status={decision.gemini_status}; "
            f"fallback_review_used={decision.fallback_review_used}"
        )
        decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: REVIEW_GATE_NOT_PASSED"
        return decision

    # §9 HEAD SHA lock — the caller re-checks right before merging (verify_head_lock_then_merge)
    decision.decision = AUTO_MERGE_ALLOWED
    decision.reason = "all 10 gates PASS"
    decision.final_decision = decision.decision
    return decision


def verify_head_lock_then_merge(
    decision: QueueDecision,
    pr_number: int,
    ctx: ExecutorContext,
    fetch_pr_head_at_merge: Callable[[int], str],
    fetch_main_head_at_merge: Optional[Callable[[], str]] = None,
    dry_run: bool = True,
) -> QueueDecision:
    """Re-check the head SHA locks, then squash-merge and run post-merge steps.

    A decision that is not ``AUTO_MERGE_ALLOWED`` is returned untouched.
    Otherwise the PR head (and, when a fetcher is supplied, the main head)
    is compared against the SHA recorded at evaluation start. With
    ``dry_run=True`` processing stops after those checks. A real run needs
    ``ctx.smoke_command``; it then merges, optionally fast-forwards the
    working directory, runs the smoke command and re-checks following PRs.

    Args:
        decision: Decision produced by the evaluation step; mutated in place.
        pr_number: PR to merge.
        ctx: Runner, working directory, smoke command and hooks.
        fetch_pr_head_at_merge: Returns the PR's current head SHA.
        fetch_main_head_at_merge: Optional; returns main's current head SHA.
        dry_run: When true, nothing is merged.

    Returns:
        The same ``decision`` object, updated with the outcome.
    """
    if decision.decision != AUTO_MERGE_ALLOWED:
        return decision

    def _smoke_critical(reason: str) -> QueueDecision:
        # Shared tail of every post-merge failure exit.
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = reason
        decision.critical_code = CRITICAL_POST_MERGE_SMOKE
        decision.critical_escalation = CRITICAL_POST_MERGE_SMOKE
        decision.final_decision = f"{FINAL_CRITICAL_PREFIX}: {CRITICAL_POST_MERGE_SMOKE}"
        emit_critical_escalation(CRITICAL_POST_MERGE_SMOKE, decision, ctx.reporter_hook)
        return decision

    # §9 PR head lock
    current_pr_head = fetch_pr_head_at_merge(pr_number)
    decision.pr_head_sha_merge = current_pr_head
    if not assert_head_sha_lock(decision.pr_head_sha_start or "", current_pr_head):
        decision.decision = HEAD_SHA_LOCK_BROKEN
        decision.reason = (
            f"PR head changed: start={decision.pr_head_sha_start} merge={current_pr_head}"
        )
        decision.final_decision = decision.decision
        return decision

    # main head lock (only when a fetcher was supplied and a start SHA exists)
    if fetch_main_head_at_merge is not None:
        current_main_head = fetch_main_head_at_merge()
        decision.main_head_sha_merge = current_main_head
        if decision.main_head_sha_start and current_main_head != decision.main_head_sha_start:
            decision.decision = HEAD_SHA_LOCK_BROKEN
            decision.reason = (
                f"main head changed: start={decision.main_head_sha_start} merge={current_main_head}"
            )
            decision.final_decision = decision.decision
            return decision

    if dry_run:
        decision.final_decision = decision.decision
        return decision

    # task-2509+1 §5 — a real merge without a smoke command is refused
    if not ctx.smoke_command:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = NON_DRY_RUN_REQUIRES_SMOKE_COMMAND
        decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: {NON_DRY_RUN_REQUIRES_SMOKE_COMMAND}"
        return decision

    merge_outcome = execute_squash_merge(pr_number, ctx.runner)
    if merge_outcome["returncode"] != 0:
        decision.decision = BLOCKED_WITH_REASON
        decision.reason = f"squash merge failed: {merge_outcome['stderr']}"
        decision.final_decision = f"{FINAL_BLOCKED_PREFIX}: SQUASH_MERGE_FAILED"
        return decision

    # §11 post-merge pipeline
    # (a) fetch + (b) fast-forward; None means "skipped, no working directory"
    workdir = ctx.pr_workdir
    fast_forwarded: Optional[bool] = None
    if workdir is not None:
        ctx.runner(["git", "fetch", "origin", "--quiet"], cwd=workdir)
        ff_outcome = ctx.runner(
            ["git", "merge", "--ff-only", "origin/main"],
            cwd=workdir,
        )
        fast_forwarded = ff_outcome.returncode == 0

    # (c) smoke; a failed fast-forward is reported without running it
    if fast_forwarded is False:
        decision.smoke_status = "FAIL_FAST_FORWARD"
        return _smoke_critical("POST_MERGE_SMOKE_FAILURE: fast-forward failed before smoke")

    smoke_report = run_post_merge_smoke(ctx.smoke_command, ctx.runner)
    decision.smoke_status = smoke_report.get("status", "")
    if smoke_report.get("status") == "FAIL":
        if fast_forwarded is None:
            return _smoke_critical("POST_MERGE_SMOKE_FAILURE (ff-only skipped: no pr_workdir)")
        return _smoke_critical("POST_MERGE_SMOKE_FAILURE")

    # (d) stale re-check of following PRs, when the context carries a queue
    pending_followers: list[dict] = getattr(ctx, "following_queue", []) or []
    if pending_followers:
        rechecked = recheck_following_prs(pending_followers, ctx.runner)
        decision.fixture_pr_replay = list(decision.fixture_pr_replay) + rechecked

    decision.decision = AUTO_MERGE_SUCCESS
    decision.reason = "merged + smoke PASS"
    decision.final_decision = decision.decision
    return decision


# ─── PR observation helpers (gh CLI) ──────────────────────────────────────
def observe_pr(pr_number: int, runner: RunnerType) -> dict:
    """Query ``gh pr view`` for a PR and normalise the fields the gates use.

    Observation is best-effort: output that is empty, not JSON, or JSON that
    is not an object yields the same empty observation instead of raising.

    Args:
        pr_number: PR number to inspect.
        runner: Callable that runs an argv list and returns an object with a
            ``stdout`` attribute.

    Returns:
        A dict with ``headRefOid`` (``""`` if unknown), ``baseRefName``
        (``"main"`` if unknown), ``mergeStateStatus`` (upper-cased, ``""`` if
        unknown) and ``effective_files`` (non-empty changed paths).
    """
    result = runner([
        "gh", "pr", "view", str(pr_number),
        "--json", "headRefOid,baseRefName,mergeStateStatus,files",
    ])
    try:
        payload = json.loads(result.stdout or "{}")
    except json.JSONDecodeError:
        payload = {}
    # Valid JSON need not be an object (e.g. ``null``); treat anything else
    # like unparsable output rather than failing on ``.get``.
    if not isinstance(payload, dict):
        payload = {}

    raw_files = payload.get("files")
    if not isinstance(raw_files, list):
        raw_files = []
    files = [
        entry["path"]
        for entry in raw_files
        if isinstance(entry, dict) and entry.get("path")
    ]
    return {
        # ``or`` also covers an explicit JSON null, not just a missing key.
        "headRefOid": payload.get("headRefOid") or "",
        "baseRefName": payload.get("baseRefName") or "main",
        "mergeStateStatus": (payload.get("mergeStateStatus") or "").upper(),
        "effective_files": files,
    }


# ─── CLI entrypoint ───────────────────────────────────────────────────────
def main(argv: Optional[list[str]] = None) -> int:
    """CLI entry point: evaluate one PR and, unless dry-run, merge it.

    The resulting decision is written to the audit log (unless ``--no-audit``)
    and printed as JSON.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit status: ``0`` when the decision is AUTO_MERGE_ALLOWED or
        AUTO_MERGE_SUCCESS, ``3`` when waiting for a predecessor, ``2`` for a
        usage problem (no/missing task file, unparsable smoke command) and
        ``1`` for every other decision.
    """
    import shlex  # local import: needed only on the CLI path

    parser = argparse.ArgumentParser(description="merge_queue_executor (task-2509)")
    parser.add_argument("--pr", type=int, required=True, help="PR 번호")
    parser.add_argument("--task-file", type=str, default="", help="queue 선두 task spec md")
    parser.add_argument("--dry-run", action="store_true", default=True, help="dry-run (default true)")
    parser.add_argument("--no-dry-run", dest="dry_run", action="store_false", help="실제 머지 수행")
    parser.add_argument("--no-audit", action="store_true", help="audit log 기록 안 함 (테스트용)")
    parser.add_argument("--smoke-command", type=str, default="", help="post-merge smoke 명령")
    parser.add_argument("--workspace", type=str, default=str(WORKSPACE), help="workspace root")
    parser.add_argument("--ci-max-polls", type=int, default=5, help="CI 상태 polling 최대 횟수 (default 5)")
    parser.add_argument("--ci-backoff-seconds", type=float, default=10.0, help="CI polling 지수 백오프 기본값 (default 10.0)")
    args = parser.parse_args(argv)

    # Load the task spec. The task ID cannot be inferred reliably from the PR
    # number, so --task-file is effectively mandatory.
    if not args.task_file:
        print(json.dumps({
            "decision": BLOCKED_WITH_REASON,
            "reason": "missing --task-file",
        }, ensure_ascii=False))
        return 2
    task_path = Path(args.task_file)
    if not task_path.exists():
        print(json.dumps({
            "decision": BLOCKED_WITH_REASON,
            "reason": f"task file not found: {task_path}",
        }, ensure_ascii=False))
        return 2
    spec = load_task_spec(task_path)

    # Tokenise the smoke command with shell quoting rules so quoted arguments
    # (e.g. -k 'a b') stay intact; unbalanced quotes are a usage error.
    try:
        smoke_command = shlex.split(args.smoke_command) if args.smoke_command else None
    except ValueError as exc:
        print(json.dumps({
            "decision": BLOCKED_WITH_REASON,
            "reason": f"invalid --smoke-command: {exc}",
        }, ensure_ascii=False))
        return 2

    runner = _default_runner
    pr_obs = observe_pr(args.pr, runner)
    ci = fetch_ci_status(args.pr, runner, max_polls=args.ci_max_polls, backoff_seconds=args.ci_backoff_seconds)
    gem = fetch_gemini_status(args.pr, runner, spec.expected_files)
    ctx = ExecutorContext(
        runner=runner,
        # Honour --workspace; its default is the module-level WORKSPACE, so
        # behaviour is unchanged when the flag is not given.
        pr_workdir=args.workspace or str(WORKSPACE),
        smoke_command=smoke_command,
        no_audit=args.no_audit,
    )
    decision = evaluate_pr(
        pr_number=args.pr,
        task_spec=spec,
        pr_head_sha=pr_obs.get("headRefOid", ""),
        effective_files=pr_obs.get("effective_files", []),
        merge_state={
            "mergeStateStatus": pr_obs.get("mergeStateStatus", ""),
            "baseRefName": pr_obs.get("baseRefName", "main"),
        },
        ci_state=ci,
        gemini_state=gem,
        ctx=ctx,
    )
    decision = verify_head_lock_then_merge(
        decision=decision,
        pr_number=args.pr,
        ctx=ctx,
        fetch_pr_head_at_merge=lambda n: observe_pr(n, runner).get("headRefOid", ""),
        fetch_main_head_at_merge=lambda: fetch_main_head(runner),
        dry_run=args.dry_run,
    )
    write_audit(decision, spec.task_id, no_audit=args.no_audit)
    print(json.dumps(decision.to_dict(), ensure_ascii=False, indent=2))
    if decision.decision in {AUTO_MERGE_ALLOWED, AUTO_MERGE_SUCCESS}:
        return 0
    if decision.decision == WAITING_FOR_PREDECESSOR:
        return 3
    return 1


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())
