"""task-2514 end-to-end 회귀 테스트 — 5 모듈 오케스트레이션 14 케이스.

QA 담당: 모리건(Morrigan)
대상: 5 모듈 wiring (merge_queue_executor + replacement_pr_runner +
        auto_gemini_triage + post_merge_smoke_runner + critical_escalation_reporter)

케이스 목록:
  TC-N1  clean PR + 자동 머지 10조건 충족 → AUTO_MERGE_SUCCESS + smoke PASS + 후행 stale 재검증 OK
  TC-N2  Gemini outdated thread × 5 → auto_gemini_triage가 모두 resolve → review_gate_passed → squash merge → smoke PASS
  TC-N3  Gemini quota → fallback_review_passed → review_gate_passed → squash merge → smoke PASS
  TC-N4  clean replacement (오염 PR) → replacement_pr_runner 새 PR 생성 + 원 PR 보존 → 새 PR 자동 머지
  TC-C1  Critical #1 forbidden path → critical_escalation_reporter packet 생성
  TC-C2  Critical #2 replacement_pr_auto_creation_failed → packet 생성
  TC-C3  Critical #3 gemini_real_bug_requires_scope_expansion → packet 생성
  TC-C4  Critical #4 block_override_required → packet 생성
  TC-C5  Critical #5 dependency_cycle_or_serial_only_collision → packet 생성
  TC-C6  Critical #6 replacement_pr_failed → packet 생성
  TC-C7  Critical #7 post_merge_smoke_failed → packet 생성
  TC-A1  false-positive Gemini suppression → auto_gemini_triage dismiss + review_gate_passed → 회장 보고 0건
  TC-A2  style-only Gemini suppression → 동일
  TC-A3  dependency satisfied 자동 판정 → merge_topology_gate ALLOW → 자동 진행

wiring 인터페이스 상태 (2026-05-09 기준):
  - ExecutorContext 신규 필드(replacement_runner, triage_fn, smoke_envelope_fn 등) 미구현 (루 WIP).
  - 현재 테스트는 공개 인터페이스(evaluate_pr / verify_head_lock_then_merge / process_event / triage_pr)
    를 직접 호출하여 wiring 완성 후와 동일한 흐름을 검증한다.
  - ExecutorContext 신규 필드가 추가될 경우 _make_ctx() 헬퍼만 업데이트하면 14 케이스 전부 재사용 가능.
"""

from __future__ import annotations

import dataclasses
import subprocess
import sys
from pathlib import Path
import pytest

# ─── sys.path setup ─────────────────────────────────────────────────────────
# Workspace root: three directory levels above this test file.
WORKSPACE = Path(__file__).resolve().parent.parent.parent
# Remove one existing occurrence (if present) and re-insert at index 0 so the
# workspace root is searched first; presumably this is what lets the
# `utils.*` imports below resolve — verify against the repository layout.
if str(WORKSPACE) in sys.path:
    sys.path.remove(str(WORKSPACE))
sys.path.insert(0, str(WORKSPACE))

# ─── 핵심 import ────────────────────────────────────────────────────────────
from utils.merge_queue_executor import (  # noqa: E402
    AUTO_MERGE_ALLOWED,
    AUTO_MERGE_SUCCESS,
    BLOCKED_WITH_REASON,
    CI_FAILURE_BLOCK,
    CRITICAL_FORBIDDEN_PATH,
    CRITICAL_DIFF_REPLACEMENT_FAILED,
    CRITICAL_GEMINI_SCOPE_EXPANSION,
    CRITICAL_BLOCK_OVERRIDE,
    CRITICAL_DEPENDENCY_CYCLE,
    CRITICAL_REPLACEMENT_FAILED,
    CRITICAL_POST_MERGE_SMOKE,
    DIFF_CONTAMINATION_REPLACEMENT,
    ExecutorContext,
    QueueDecision,
    TaskSpec,
    evaluate_pr,
    verify_head_lock_then_merge,
)
from utils.automation_contracts import (  # noqa: E402
    CriticalEscalationType,
    GeminiStatus,
    ReviewGateStatus,
    GeminiTriageResult,
    ReplacementResult,
    EscalationPacket,
    SmokeResult,
)
from utils.auto_gemini_triage import (  # noqa: E402
    TriageVerdict,
    ThreadTriageOutcome,
    TriageReport,
    triage_pr,
)
from utils.post_merge_smoke_runner import (  # noqa: E402
    PostMergeSmokeRun,
    SmokeStatus,
)
from utils.critical_escalation_reporter import (  # noqa: E402
    process_event,
    LEGACY_CRITICAL_MAP,
)
from utils.replacement_pr_runner import (  # noqa: E402
    ReplacementPRRunner,
)


# ═══════════════════════════════════════════════════════════════════════════════
# helpers
# ═══════════════════════════════════════════════════════════════════════════════

def cp(returncode: int = 0, stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
    """Build a ``subprocess.CompletedProcess`` for use as a fake command result.

    Args:
        returncode: Exit status to report (default 0).
        stdout: Captured standard output to report (default empty string).
        stderr: Captured standard error to report (default empty string).

    Returns:
        A new ``CompletedProcess`` whose ``args`` is a fresh empty list.
    """
    # CompletedProcess takes (args, returncode, stdout, stderr) positionally.
    fake_result = subprocess.CompletedProcess([], returncode, stdout, stderr)
    return fake_result


def make_runner(returns_by_args: dict | None = None):
    """Create a fake command runner that answers by matching argument tokens.

    Args:
        returns_by_args: Mapping of ``tuple of tokens`` -> result object. A
            pattern matches when every one of its tokens is contained in the
            ``args`` passed to the runner. Patterns are tried in the mapping's
            iteration order and the first match wins. ``None`` means no
            patterns.

    Returns:
        A callable ``runner(args, cwd=None, timeout=60)``. Every invocation is
        appended to ``runner.calls`` as a dict with ``args`` (copied to a
        list), ``cwd`` and ``timeout``. When no pattern matches, the runner
        returns ``cp()`` (returncode 0, empty stdout/stderr).
    """
    table = {} if returns_by_args is None else returns_by_args
    recorded: list[dict] = []
    no_match = object()  # sentinel, distinct from any configured result

    def runner(args, cwd=None, timeout=60):
        recorded.append({"args": list(args), "cwd": cwd, "timeout": timeout})
        found = next(
            (
                result
                for tokens, result in table.items()
                if all(token in args for token in tokens)
            ),
            no_match,
        )
        if found is no_match:
            return cp()
        return found

    runner.calls = recorded  # type: ignore[attr-defined]
    return runner


# ─── shared constants ─────────────────────────────────────────────────────────

# File list used as the default `expected_files` of make_task_spec() and as the
# "clean" `effective_files` argument in the tests below.
EXPECTED_FILES = [
    "utils/merge_queue_executor.py",
    "tests/regression/test_orchestration_runtime_2514.py",
]

# Merge-state payload passed as `merge_state` to evaluate_pr() for the
# non-blocked scenarios (mergeStateStatus is "CLEAN").
CLEAN_MERGE_STATE = {
    "mergeStateStatus": "CLEAN",
    "headRefOid": "sha-clean-001",
    "baseRefName": "main",
}

# Passed as `ci_state` to evaluate_pr(): a successful CI result.
CI_OK = {"status": "SUCCESS", "details": ["SUCCESS"], "raw": []}
# Passed as `gemini_state` to evaluate_pr(): status "ok" with no unresolved threads.
GEMINI_OK = {"status": "ok", "unresolved": [], "hook": None}


def make_task_spec(**overrides) -> TaskSpec:
    """Build a ``TaskSpec`` from the task-2514 defaults, with keyword overrides.

    Args:
        **overrides: Field values that replace (or extend) the defaults below.

    Returns:
        ``TaskSpec`` constructed from the merged keyword arguments.
    """
    defaults = {
        "task_id": "task-2514",
        # Copy so each spec gets its own list rather than the shared constant.
        "expected_files": list(EXPECTED_FILES),
        "risk_area": "orchestration_runtime",
        "dependency": [],
        "parallel_policy": "serial_only",
        "merge_queue_position": 11,
        "stale_recheck_required": True,
        "cherry_pick_allowed": False,
        "smoke_command": None,
    }
    # Later entries win, so caller overrides take precedence over defaults.
    return TaskSpec(**{**defaults, **overrides})


def _make_ctx(
    runner=None,
    smoke_command=None,
    fixture_main_sha: str = "main-sha-fixture-001",
) -> ExecutorContext:
    """Create a baseline ``ExecutorContext`` with auditing disabled.

    Args:
        runner: Command runner to inject; when ``None`` a pattern-less
            ``make_runner()`` fake is used.
        smoke_command: Forwarded to ``ExecutorContext(smoke_command=...)``.
        fixture_main_sha: Forwarded to ``ExecutorContext(fixture_main_sha=...)``.

    Returns:
        ``ExecutorContext`` built with ``no_audit=True``.
    """
    effective_runner = runner if runner is not None else make_runner()
    ctx_kwargs = {
        "runner": effective_runner,
        "smoke_command": smoke_command,
        "no_audit": True,
        "fixture_main_sha": fixture_main_sha,
    }
    return ExecutorContext(**ctx_kwargs)


# ─── fake_triage_fn : OUTDATED thread × N → 모두 auto_resolved ───────────────

def fake_triage_fn(
    pr_number,
    threads,
    pr_head_sha,
    fix_commits,
    expected_files,
    forbidden_paths,
    apply,
    task_id,
) -> TriageReport:
    """Fake triage: treat every thread as OUTDATED and auto-resolved.

    Only ``pr_number`` and the number of items in ``threads`` influence the
    result; the remaining parameters are accepted and ignored.

    Returns:
        ``TriageReport`` with one OUTDATED/auto-resolved outcome per thread
        (ids ``th-0``, ``th-1``, ...), zero unresolved/blocking threads,
        ``merge_readiness=True`` and a review gate with
        ``review_gate_passed=True``.
    """
    resolved_outcomes = []
    for index, _thread in enumerate(threads):
        resolved_outcomes.append(
            ThreadTriageOutcome(
                thread_id=f"th-{index}",
                verdict=TriageVerdict.OUTDATED,
                auto_resolved=True,
                dismiss_comment="[AUTO-OUTDATED] ...",
                escalation_type=None,
                evidence={"outdated": True},
            )
        )

    gate_status = ReviewGateStatus(
        gemini_status=GeminiStatus.GEMINI_COMPLETED,
        unresolved_threads=0,
        fallback_review_used=False,
        fallback_review_passed=False,
        review_gate_passed=True,
        reason="all auto_resolved",
    )
    triage_summary = GeminiTriageResult(
        status=GeminiStatus.GEMINI_COMPLETED,
        false_positive_count=0,
        style_only_count=0,
        real_bug_small_count=0,
        scope_expansion_count=0,
        unresolved_count=0,
        actions_taken=[],
    )

    report_fields = {
        "pr_number": pr_number,
        "threads": resolved_outcomes,
        "unresolved_count": 0,
        "auto_resolved_count": len(resolved_outcomes),
        "blocking_thread_count": 0,
        "merge_readiness": True,
        "review_gate_status": gate_status,
        "triage_summary": triage_summary,
    }
    return TriageReport(**report_fields)


def fake_smoke_envelope_pass(
    *,
    task_file,
    merge_commit,
    dry_run,
    runner,
    pr_number,
    skip_stale_check,
) -> PostMergeSmokeRun:
    """Fake smoke envelope that always reports a passing run.

    Only ``merge_commit`` and ``dry_run`` are echoed into the result; the other
    keyword-only parameters are accepted and ignored.

    Returns:
        ``PostMergeSmokeRun`` with ``status=SmokeStatus.PASS``, a passing
        ``SmokeResult`` (exit code 0), ``allow_continuation=True``,
        ``stale=False`` and no escalation.
    """
    passing_result = SmokeResult(
        command="pytest",
        passed=True,
        exit_code=0,
        stdout_tail="",
        stderr_tail="",
        failure_reason=None,
    )
    envelope = PostMergeSmokeRun(
        merge_commit=merge_commit,
        task_id="task-2514",
        status=SmokeStatus.PASS,
        smoke_result=passing_result,
        duration_ms=1000,
        smoke_command=["pytest"],
        allow_continuation=True,
        escalation=None,
        stale=False,
        dry_run=dry_run,
    )
    return envelope


# ═══════════════════════════════════════════════════════════════════════════════
# 정상 흐름 (4건)
# ═══════════════════════════════════════════════════════════════════════════════

def test_tc_n1_clean_pr_auto_merge_success_with_stale_recheck() -> None:
    """TC-N1: a clean PR is auto-merged, smoke passes, and the following PR is rechecked.

    Scenario:
      - effective_files equals expected_files (no contamination)
      - CI success / Gemini OK / mergeStateStatus=CLEAN
      - the fake runner answers the squash merge and the smoke command with returncode 0
      - the following-queue PR #201 is reported as CLEAN
    Checks:
      - evaluate_pr returns AUTO_MERGE_ALLOWED
      - verify_head_lock_then_merge returns AUTO_MERGE_SUCCESS with smoke_status "PASS"
      - fixture_pr_replay contains an entry for PR #201 with needs_recheck False
    """
    import json

    smoke_cmd = ["pytest", "tests/regression/test_merge_queue_executor_2509.py", "-q"]
    # JSON answer for the stale recheck query on the following PR (#201).
    follower_state_json = json.dumps(
        {
            "mergeStateStatus": "CLEAN",
            "headRefOid": "sha-201-clean",
            "baseRefName": "main",
        }
    )

    def fake_runner(args, cwd=None, timeout=60):
        tokens = list(args)
        if "merge" in tokens and "--squash" in tokens:  # squash merge
            return cp(returncode=0, stdout="PR merged")
        if "pytest" in tokens:  # smoke command
            return cp(returncode=0, stdout="5 passed, 0 failed")
        if "201" in tokens and "--json" in tokens:  # following PR stale recheck
            return cp(returncode=0, stdout=follower_state_json)
        return cp()

    spec = make_task_spec(smoke_command=smoke_cmd)
    ctx = ExecutorContext(
        runner=fake_runner,
        no_audit=True,
        fixture_main_sha="main-sha-n1",
        smoke_command=smoke_cmd,
    )
    # Inject following_queue as a plain attribute; per the original note,
    # verify_head_lock_then_merge picks it up via a getattr pattern.
    ctx.following_queue = [{"pr_number": 201}]  # type: ignore[attr-defined]

    decision = evaluate_pr(
        pr_number=101,
        task_spec=spec,
        pr_head_sha="sha-pr-n1",
        effective_files=list(EXPECTED_FILES),
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=GEMINI_OK,
        ctx=ctx,
    )
    assert decision.decision == AUTO_MERGE_ALLOWED, (
        f"evaluate_pr must return AUTO_MERGE_ALLOWED, got {decision.decision}"
    )

    result = verify_head_lock_then_merge(
        decision=decision,
        pr_number=101,
        ctx=ctx,
        fetch_pr_head_at_merge=lambda n: "sha-pr-n1",
        dry_run=False,
    )

    assert result.decision == AUTO_MERGE_SUCCESS, (
        f"TC-N1: expected AUTO_MERGE_SUCCESS, got {result.decision} / reason={result.reason}"
    )
    assert result.smoke_status == "PASS", (
        f"TC-N1: expected smoke_status=PASS, got {result.smoke_status}"
    )

    # The stale recheck outcome for the follower must be recorded in fixture_pr_replay.
    pr201_entries = [
        entry for entry in result.fixture_pr_replay if entry.get("pr_number") == 201
    ]
    assert pr201_entries, (
        f"TC-N1: PR#201 stale recheck must appear in fixture_pr_replay: {result.fixture_pr_replay}"
    )
    pr201_state = pr201_entries[0]
    assert pr201_state["needs_recheck"] is False, (
        f"TC-N1: PR#201 CLEAN → needs_recheck must be False: {pr201_state}"
    )


def test_tc_n2_gemini_outdated_threads_auto_resolve_then_merge() -> None:
    """TC-N2: five outdated Gemini threads are auto-resolved, then the PR merges and smoke passes.

    Scenario:
      - triage_pr() receives 5 threads with isOutdated=True / isResolved=False
      - the report must show 5 auto-resolved threads, 0 blocking threads,
        review_gate_passed=True and merge_readiness=True
      - to_legacy_gemini_state(report) must yield status 'completed'
      - evaluate_pr with a Gemini "ok" state returns AUTO_MERGE_ALLOWED
      - verify_head_lock_then_merge returns AUTO_MERGE_SUCCESS with smoke_status "PASS"
    """
    # triage_pr is already imported at module level; only the converter is local.
    from utils.auto_gemini_triage import to_legacy_gemini_state

    outdated_threads = [
        {"id": f"th-{i}", "isOutdated": True, "isResolved": False, "comments": []}
        for i in range(5)
    ]
    report = triage_pr(
        pr_number=102,
        threads=outdated_threads,
        pr_head_sha="sha-pr-n2",
        fix_commits=[],
        expected_files=list(EXPECTED_FILES),
        forbidden_paths=[],
        apply=False,
        task_id="task-2514",
    )

    # Verify the triage outcome.
    assert report.auto_resolved_count == 5, (
        f"TC-N2: 5 outdated threads must all auto_resolve, got {report.auto_resolved_count}"
    )
    assert report.blocking_thread_count == 0, (
        f"TC-N2: blocking must be 0, got {report.blocking_thread_count}"
    )
    assert report.review_gate_status.review_gate_passed is True, (
        "TC-N2: review_gate_passed must be True"
    )
    assert report.merge_readiness is True, (
        "TC-N2: merge_readiness must be True"
    )

    # Convert the report to the legacy gemini-state shape and check its status.
    legacy_state = to_legacy_gemini_state(report)
    assert legacy_state["status"] == "completed", (
        f"TC-N2: legacy status must be 'completed', got {legacy_state['status']}"
    )

    smoke_cmd = ["pytest", "tests/regression/test_merge_queue_executor_2509.py", "-q"]
    spec = make_task_spec(smoke_command=smoke_cmd)

    def fake_runner(args, cwd=None, timeout=60):
        args_list = list(args)
        if "merge" in args_list and "--squash" in args_list:
            return cp(returncode=0, stdout="PR squash merged")
        if "pytest" in args_list:
            return cp(returncode=0, stdout="all passed")
        return cp()

    ctx = ExecutorContext(
        runner=fake_runner,
        no_audit=True,
        fixture_main_sha="main-sha-n2",
        smoke_command=smoke_cmd,
    )

    # NOTE(review): `legacy_state` above is only asserted on; evaluate_pr is fed
    # this hand-written "ok" state instead of the triage-derived one. Whether
    # evaluate_pr accepts status 'completed' is not visible from this file —
    # confirm, then consider passing `legacy_state` to exercise the real wiring.
    gemini_completed = {"status": "ok", "unresolved": [], "hook": None}
    decision = evaluate_pr(
        pr_number=102,
        task_spec=spec,
        pr_head_sha="sha-pr-n2",
        effective_files=list(EXPECTED_FILES),
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=gemini_completed,
        ctx=ctx,
    )
    assert decision.decision == AUTO_MERGE_ALLOWED, (
        f"TC-N2: evaluate_pr must return AUTO_MERGE_ALLOWED, got {decision.decision}"
    )

    result = verify_head_lock_then_merge(
        decision=decision,
        pr_number=102,
        ctx=ctx,
        fetch_pr_head_at_merge=lambda n: "sha-pr-n2",
        dry_run=False,
    )
    assert result.decision == AUTO_MERGE_SUCCESS, (
        f"TC-N2: expected AUTO_MERGE_SUCCESS, got {result.decision}"
    )
    assert result.smoke_status == "PASS", (
        f"TC-N2: expected smoke PASS, got {result.smoke_status}"
    )


def test_tc_n3_gemini_quota_fallback_review_then_merge() -> None:
    """TC-N3: Gemini quota exhaustion falls back to the fallback review, then merges and smoke passes.

    Scenario:
      - gemini_state.status='unavailable_quota' (with a "quota exceeded" error entry)
      - evaluate_pr must return AUTO_MERGE_ALLOWED with fallback_review_used,
        fallback_review_passed and review_gate_passed all True
      - verify_head_lock_then_merge returns AUTO_MERGE_SUCCESS with smoke_status "PASS"
    """
    smoke_cmd = ["pytest", "tests/regression/test_merge_queue_executor_2509.py", "-q"]
    spec = make_task_spec(smoke_command=smoke_cmd)

    def fake_runner(args, cwd=None, timeout=60):
        args_list = list(args)
        if "merge" in args_list and "--squash" in args_list:
            return cp(returncode=0, stdout="PR merged")
        if "pytest" in args_list:
            return cp(returncode=0, stdout="passed")
        return cp()

    ctx = ExecutorContext(
        runner=fake_runner,
        no_audit=True,
        fixture_main_sha="main-sha-n3",
        smoke_command=smoke_cmd,
    )

    gemini_quota = {
        "status": "unavailable_quota",
        "unresolved": [],
        "hook": None,
        "errors": [{"message": "quota exceeded"}],
    }
    decision = evaluate_pr(
        pr_number=103,
        task_spec=spec,
        pr_head_sha="sha-pr-n3",
        effective_files=list(EXPECTED_FILES),
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=gemini_quota,
        ctx=ctx,
    )
    assert decision.decision == AUTO_MERGE_ALLOWED, (
        f"TC-N3: fallback should pass → AUTO_MERGE_ALLOWED, got {decision.decision} / reason={decision.reason}"
    )
    # Plain strings: these messages have no placeholders, so no f-prefix.
    assert decision.fallback_review_used is True, (
        "TC-N3: fallback_review_used must be True"
    )
    assert decision.fallback_review_passed is True, (
        "TC-N3: fallback_review_passed must be True"
    )
    assert decision.review_gate_passed is True, (
        "TC-N3: review_gate_passed must be True"
    )

    result = verify_head_lock_then_merge(
        decision=decision,
        pr_number=103,
        ctx=ctx,
        fetch_pr_head_at_merge=lambda n: "sha-pr-n3",
        dry_run=False,
    )
    assert result.decision == AUTO_MERGE_SUCCESS, (
        f"TC-N3: expected AUTO_MERGE_SUCCESS, got {result.decision}"
    )
    assert result.smoke_status == "PASS", (
        f"TC-N3: expected smoke PASS, got {result.smoke_status}"
    )


def test_tc_n4_contaminated_pr_replacement_then_auto_merge() -> None:
    """TC-N4: a contaminated PR is replaced (original preserved) and the clean replacement auto-merges.

    Scenario:
      - the original PR's effective_files contain an extra rogue file, so
        evaluate_pr returns DIFF_CONTAMINATION_REPLACEMENT
      - ReplacementPRRunner.execute() with dry_run=True reports success=True,
        original_pr_preserved=True and source_pr=104
      - a separate PR number (199) with clean files is evaluated as
        AUTO_MERGE_ALLOWED and merges with AUTO_MERGE_SUCCESS
    """
    spec = make_task_spec()

    # Original PR: includes a file outside expected_files.
    ctx = _make_ctx()
    contaminated_decision = evaluate_pr(
        pr_number=104,
        task_spec=spec,
        pr_head_sha="sha-pr-n4-contaminated",
        effective_files=list(EXPECTED_FILES) + ["utils/rogue_extra_file.py"],
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=GEMINI_OK,
        ctx=ctx,
    )
    assert contaminated_decision.decision == DIFF_CONTAMINATION_REPLACEMENT, (
        f"TC-N4: contaminated PR must get DIFF_CONTAMINATION_REPLACEMENT, got {contaminated_decision.decision}"
    )

    # Simulate creation of the replacement PR via the runner's dry-run mode.
    replacement_runner = ReplacementPRRunner(
        runner=make_runner(),
        dry_run=True,
    )
    replacement_result = replacement_runner.execute(104, task_spec=spec)
    assert replacement_result.success is True, (
        f"TC-N4: ReplacementPRRunner.execute dry_run must succeed: {replacement_result.failure_reason}"
    )
    # Plain string: the message has no placeholders, so no f-prefix.
    assert replacement_result.original_pr_preserved is True, (
        "TC-N4: original PR must be preserved"
    )
    assert replacement_result.source_pr == 104, (
        f"TC-N4: source_pr must be 104, got {replacement_result.source_pr}"
    )

    # New (clean) PR: evaluate_pr must allow the auto merge.
    smoke_cmd = ["pytest", "-q"]
    spec_clean = make_task_spec(smoke_command=smoke_cmd)

    def fake_runner_clean(args, cwd=None, timeout=60):
        args_list = list(args)
        if "merge" in args_list and "--squash" in args_list:
            return cp(returncode=0, stdout="replacement PR merged")
        if "pytest" in args_list:
            return cp(returncode=0, stdout="passed")
        return cp()

    ctx_clean = ExecutorContext(
        runner=fake_runner_clean,
        no_audit=True,
        fixture_main_sha="main-sha-n4",
        smoke_command=smoke_cmd,
    )

    new_pr_decision = evaluate_pr(
        pr_number=199,  # number standing in for the new replacement PR
        task_spec=spec_clean,
        pr_head_sha="sha-pr-n4-clean",
        effective_files=list(EXPECTED_FILES),  # clean
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=GEMINI_OK,
        ctx=ctx_clean,
    )
    assert new_pr_decision.decision == AUTO_MERGE_ALLOWED, (
        f"TC-N4: new clean PR must be AUTO_MERGE_ALLOWED, got {new_pr_decision.decision}"
    )

    final_result = verify_head_lock_then_merge(
        decision=new_pr_decision,
        pr_number=199,
        ctx=ctx_clean,
        fetch_pr_head_at_merge=lambda n: "sha-pr-n4-clean",
        dry_run=False,
    )
    assert final_result.decision == AUTO_MERGE_SUCCESS, (
        f"TC-N4: replacement PR must AUTO_MERGE_SUCCESS, got {final_result.decision}"
    )


# ═══════════════════════════════════════════════════════════════════════════════
# Critical 7종 escalation (7건)
# ═══════════════════════════════════════════════════════════════════════════════

def test_tc_c1_critical_forbidden_path() -> None:
    """TC-C1: Critical #1 — a forbidden path blocks the PR and yields an escalation packet.

    Scenario:
      - effective_files additionally contains .github/workflows/foo.yml
      - evaluate_pr must return BLOCKED_WITH_REASON with
        critical_code=CRITICAL_FORBIDDEN_PATH and list the path in forbidden_paths
      - process_event, given event_type=CRITICAL_FORBIDDEN_PATH, must classify the
        event as 'critical', report escalation_type FORBIDDEN_PATH_INTRUSION and
        return an EscalationPacket
    """
    intruding_path = ".github/workflows/foo.yml"

    decision = evaluate_pr(
        pr_number=105,
        task_spec=make_task_spec(),
        pr_head_sha="sha-pr-c1",
        effective_files=[*EXPECTED_FILES, intruding_path],
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=GEMINI_OK,
        ctx=_make_ctx(),
    )

    assert decision.decision == BLOCKED_WITH_REASON, (
        f"TC-C1: expected BLOCKED_WITH_REASON, got {decision.decision}"
    )
    assert decision.critical_code == CRITICAL_FORBIDDEN_PATH, (
        f"TC-C1: expected critical_code=CRITICAL_FORBIDDEN_PATH, got {decision.critical_code}"
    )
    assert intruding_path in decision.forbidden_paths, (
        f"TC-C1: forbidden_paths must contain the invasion: {decision.forbidden_paths}"
    )

    # Hand the critical event to critical_escalation_reporter to build the packet.
    escalation_event = {
        "task_id": "task-2514",
        "pr_number": 105,
        "event_type": CRITICAL_FORBIDDEN_PATH,  # "FORBIDDEN_PATH_INVASION"
        "source": "evaluate_pr",
        "evidence": {"forbidden_paths": decision.forbidden_paths},
    }
    outcome = process_event(escalation_event, no_audit=True, dry_run=True)

    assert outcome["classification"] == "critical", (
        f"TC-C1: classification must be 'critical', got {outcome['classification']}"
    )
    expected_type = CriticalEscalationType.FORBIDDEN_PATH_INTRUSION.value
    assert outcome["escalation_type"] == expected_type, (
        f"TC-C1: escalation_type must be FORBIDDEN_PATH_INTRUSION, got {outcome['escalation_type']}"
    )
    packet = outcome["packet"]
    assert packet is not None, "TC-C1: packet must not be None"
    assert isinstance(packet, EscalationPacket), (
        f"TC-C1: packet must be EscalationPacket, got {type(outcome['packet'])}"
    )


def test_tc_c2_critical_replacement_pr_auto_creation_failed() -> None:
    """TC-C2: Critical #2 — replacement PR auto-creation fails and an escalation packet is produced.

    Scenario:
      - ReplacementPRRunner runs with dry_run=False and a fake runner whose
        `gh pr view` calls return exit code 1 ("API rate limit exceeded")
      - the result must have success=False, original_pr_preserved=True and a
        non-None failure_reason
      - process_event, given event_type=CRITICAL_DIFF_REPLACEMENT_FAILED, must
        classify the event as 'critical', report escalation_type
        REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF and return a packet
    """
    spec = make_task_spec()

    # Any command containing "gh", "pr" and "view" fails, so PR metadata
    # cannot be fetched.
    failing_runner = make_runner({
        ("gh", "pr", "view"): cp(returncode=1, stderr="API rate limit exceeded"),
    })

    replacement_runner = ReplacementPRRunner(
        runner=failing_runner,
        dry_run=False,
    )
    result = replacement_runner.execute(106, task_spec=spec)

    # Plain strings: these messages have no placeholders, so no f-prefix.
    assert result.success is False, (
        "TC-C2: expected failure, got success=True"
    )
    assert result.original_pr_preserved is True, (
        "TC-C2: original PR must be preserved even on failure"
    )
    assert result.failure_reason is not None, "TC-C2: failure_reason must be set"

    # Verify that the escalation packet is generated.
    event_result = process_event(
        {
            "task_id": "task-2514",
            "pr_number": 106,
            "event_type": CRITICAL_DIFF_REPLACEMENT_FAILED,  # "EFFECTIVE_DIFF_CONTAMINATION_REPLACEMENT_FAILED"
            "source": "replacement_pr_runner",
            "evidence": {"failure_reason": result.failure_reason},
        },
        no_audit=True,
        dry_run=True,
    )
    assert event_result["classification"] == "critical", (
        f"TC-C2: classification must be 'critical', got {event_result['classification']}"
    )
    assert event_result["escalation_type"] == CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF.value, (
        f"TC-C2: wrong escalation_type: {event_result['escalation_type']}"
    )
    assert event_result["packet"] is not None, "TC-C2: packet must not be None"


def test_tc_c3_critical_gemini_real_bug_scope_expansion() -> None:
    """TC-C3: Critical #3 — a Gemini real bug needing scope expansion blocks the PR and yields a packet.

    Scenario:
      - gemini_state.status='critical_scope_expansion' makes evaluate_pr return
        BLOCKED_WITH_REASON with critical_code=CRITICAL_GEMINI_SCOPE_EXPANSION
      - triage_pr is run on one unresolved, non-outdated thread mentioning an
        external module (only a loose sanity check is made on its report)
      - process_event, given event_type=CRITICAL_GEMINI_SCOPE_EXPANSION, must
        classify the event as 'critical', report escalation_type
        GEMINI_REAL_BUG_REQUIRES_SCOPE_EXPANSION and return a packet
    """
    outside_comment = {"path": "some/external/file.py", "body": "real bug outside expected"}
    gemini_scope_expansion = {
        "status": "critical_scope_expansion",
        "unresolved": [dict(outside_comment)],
        "outside": [dict(outside_comment)],
        "hook": "critical_escalation_reporter",
        "critical_code": CRITICAL_GEMINI_SCOPE_EXPANSION,
    }

    decision = evaluate_pr(
        pr_number=107,
        task_spec=make_task_spec(),
        pr_head_sha="sha-pr-c3",
        effective_files=list(EXPECTED_FILES),
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=gemini_scope_expansion,
        ctx=_make_ctx(),
    )
    assert decision.decision == BLOCKED_WITH_REASON, (
        f"TC-C3: expected BLOCKED_WITH_REASON, got {decision.decision}"
    )
    assert decision.critical_code == CRITICAL_GEMINI_SCOPE_EXPANSION, (
        f"TC-C3: expected CRITICAL_GEMINI_SCOPE_EXPANSION, got {decision.critical_code}"
    )

    # Run auto_gemini_triage on a thread that points at a file outside the scope.
    external_bug_thread = {
        "id": "th-scope-01",
        "isOutdated": False,
        "isResolved": False,
        "comments": [{"body": "This function in other/external_module.py has a bug"}],
    }
    report = triage_pr(
        pr_number=107,
        threads=[external_bug_thread],
        pr_head_sha="sha-pr-c3",
        fix_commits=[],
        expected_files=list(EXPECTED_FILES),
        forbidden_paths=[],
        apply=False,
        task_id="task-2514",
    )
    # The verdict may be REAL_BUG_SCOPE_EXPANSION or REAL_BUG_IN_SCOPE (when an
    # external file is mentioned), so no specific verdict is pinned here.
    # NOTE(review): for a non-negative counter this check always holds, so it
    # effectively only proves that triage_pr ran and returned a report.
    assert report.unresolved_count >= 0, "TC-C3: triage report must have valid counts"

    # Build the escalation packet through process_event.
    escalation_event = {
        "task_id": "task-2514",
        "pr_number": 107,
        "event_type": CRITICAL_GEMINI_SCOPE_EXPANSION,  # "GEMINI_REAL_BUG_SCOPE_EXPANSION"
        "source": "evaluate_pr",
        "evidence": {"gemini_status": "critical_scope_expansion"},
    }
    outcome = process_event(escalation_event, no_audit=True, dry_run=True)

    assert outcome["classification"] == "critical", (
        f"TC-C3: classification must be 'critical', got {outcome['classification']}"
    )
    expected_type = CriticalEscalationType.GEMINI_REAL_BUG_REQUIRES_SCOPE_EXPANSION.value
    assert outcome["escalation_type"] == expected_type, (
        f"TC-C3: wrong escalation_type: {outcome['escalation_type']}"
    )
    assert outcome["packet"] is not None, "TC-C3: packet must not be None"


def test_tc_c4_critical_block_override_required() -> None:
    """TC-C4: Critical #4 — a BLOCKED merge state requires an override and yields a packet.

    Scenario:
      - mergeStateStatus=BLOCKED makes evaluate_pr return BLOCKED_WITH_REASON
        with critical_code=CRITICAL_BLOCK_OVERRIDE
      - process_event, given event_type=CRITICAL_BLOCK_OVERRIDE, must classify
        the event as 'critical', report escalation_type
        BLOCK_OVERRIDE_REQUIRED_OR_REASON_INSUFFICIENT and return a packet
    """
    head_sha = "sha-pr-c4"
    blocked_merge_state = {
        "mergeStateStatus": "BLOCKED",
        "headRefOid": head_sha,
        "baseRefName": "main",
    }

    decision = evaluate_pr(
        pr_number=108,
        task_spec=make_task_spec(),
        pr_head_sha=head_sha,
        effective_files=list(EXPECTED_FILES),
        merge_state=blocked_merge_state,
        ci_state=CI_OK,
        gemini_state=GEMINI_OK,
        ctx=_make_ctx(),
    )
    assert decision.decision == BLOCKED_WITH_REASON, (
        f"TC-C4: expected BLOCKED_WITH_REASON, got {decision.decision}"
    )
    assert decision.critical_code == CRITICAL_BLOCK_OVERRIDE, (
        f"TC-C4: expected CRITICAL_BLOCK_OVERRIDE, got {decision.critical_code}"
    )

    escalation_event = {
        "task_id": "task-2514",
        "pr_number": 108,
        "event_type": CRITICAL_BLOCK_OVERRIDE,  # "BLOCK_OVERRIDE_REQUIRED_OR_INSUFFICIENT_REASON"
        "source": "evaluate_pr",
        "evidence": {"merge_state_status": "BLOCKED"},
    }
    outcome = process_event(escalation_event, no_audit=True, dry_run=True)

    assert outcome["classification"] == "critical", (
        f"TC-C4: classification must be 'critical', got {outcome['classification']}"
    )
    expected_type = CriticalEscalationType.BLOCK_OVERRIDE_REQUIRED_OR_REASON_INSUFFICIENT.value
    assert outcome["escalation_type"] == expected_type, (
        f"TC-C4: wrong escalation_type: {outcome['escalation_type']}"
    )
    assert outcome["packet"] is not None, "TC-C4: packet must not be None"


def test_tc_c5_critical_dependency_cycle() -> None:
    """TC-C5: Critical #5 — an invalid parallel policy is reported as a dependency-cycle/serial-only collision.

    Scenario:
      - a TaskSpec with parallel_policy='INVALID_POLICY_FOR_TEST' makes
        evaluate_pr return BLOCKED_WITH_REASON with
        critical_code=CRITICAL_DEPENDENCY_CYCLE
      - process_event, given event_type=CRITICAL_DEPENDENCY_CYCLE, must classify
        the event as 'critical', report escalation_type
        DEPENDENCY_CYCLE_OR_SERIAL_ONLY_COLLISION and return a packet
    """
    bad_policy_spec = make_task_spec(parallel_policy="INVALID_POLICY_FOR_TEST")

    decision = evaluate_pr(
        pr_number=109,
        task_spec=bad_policy_spec,
        pr_head_sha="sha-pr-c5",
        effective_files=list(EXPECTED_FILES),
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=GEMINI_OK,
        ctx=_make_ctx(),
    )
    assert decision.decision == BLOCKED_WITH_REASON, (
        f"TC-C5: expected BLOCKED_WITH_REASON, got {decision.decision}"
    )
    assert decision.critical_code == CRITICAL_DEPENDENCY_CYCLE, (
        f"TC-C5: expected CRITICAL_DEPENDENCY_CYCLE, got {decision.critical_code}"
    )

    escalation_event = {
        "task_id": "task-2514",
        "pr_number": 109,
        "event_type": CRITICAL_DEPENDENCY_CYCLE,  # "DEPENDENCY_CYCLE_OR_SERIAL_ONLY_CONFLICT"
        "source": "evaluate_pr",
        "evidence": {"parallel_policy": "INVALID_POLICY_FOR_TEST"},
    }
    outcome = process_event(escalation_event, no_audit=True, dry_run=True)

    assert outcome["classification"] == "critical", (
        f"TC-C5: classification must be 'critical', got {outcome['classification']}"
    )
    expected_type = CriticalEscalationType.DEPENDENCY_CYCLE_OR_SERIAL_ONLY_COLLISION.value
    assert outcome["escalation_type"] == expected_type, (
        f"TC-C5: wrong escalation_type: {outcome['escalation_type']}"
    )
    assert outcome["packet"] is not None, "TC-C5: packet must not be None"


def test_tc_c6_critical_replacement_pr_failed() -> None:
    """TC-C6: Critical #6 replacement_pr_failed produces a packet.

    Scenario:
      - ReplacementPRRunner runs with dry_run=False against a fake command
        runner: fetch succeeds, `pr view` reports a file list that includes
        an extra file ("utils/rogue_c6.py"), the working tree is clean, and
        `checkout -b` fails.
      - The resulting ReplacementResult must have success=False while
        original_pr_preserved stays True.
      - The failure is reported through process_event with the runner's
        failure_reason as evidence (its content is not asserted here) and
        must come back as a critical REPLACEMENT_PR_FAILED packet.
    """
    # Local import: hoisted out of the fake runner so it is not re-executed
    # on every simulated command.
    import json

    spec = make_task_spec()

    # Canned `gh pr view --json ...` answer; built once because it never varies.
    pr_view_stdout = json.dumps(
        {
            "headRefName": "task/task-2514-c6",
            "headRefOid": "sha-c6-001",
            "baseRefName": "main",
            "files": [
                {"path": "utils/merge_queue_executor.py"},
                {"path": "tests/regression/test_orchestration_runtime_2514.py"},
                {"path": "utils/rogue_c6.py"},  # contaminating file
            ],
            "title": "[task-2514] test C6",
        }
    )

    def branch_creation_fails_runner(args, cwd=None, timeout=60):
        """Fake command runner: everything succeeds except `checkout -b`."""
        args_list = list(args)
        # fetch succeeds
        if "fetch" in args_list:
            return cp(returncode=0)
        # pr view: returns the contaminated file list
        if "pr" in args_list and "view" in args_list and "--json" in args_list:
            return cp(returncode=0, stdout=pr_view_stdout)
        # git status --porcelain: clean working tree
        if "status" in args_list and "--porcelain" in args_list:
            return cp(returncode=0, stdout="")
        # checkout -b: simulated branch-creation failure
        if "checkout" in args_list and "-b" in args_list:
            return cp(returncode=1, stderr="fatal: cannot create branch")
        return cp()

    replacement_runner = ReplacementPRRunner(
        runner=branch_creation_fails_runner,
        dry_run=False,
    )
    result = replacement_runner.execute(110, task_spec=spec)

    # Report the actual value: `is False` also fails for None or other
    # non-bool values, so a hard-coded "success=True" could mislead.
    assert result.success is False, (
        f"TC-C6: expected success=False, got success={result.success!r}"
    )
    assert result.original_pr_preserved is True, (
        "TC-C6: original PR must always be preserved"
    )

    event_result = process_event(
        {
            "task_id": "task-2514",
            "pr_number": 110,
            "event_type": CRITICAL_REPLACEMENT_FAILED,
            "source": "replacement_pr_runner",
            "evidence": {"failure_reason": result.failure_reason},
        },
        no_audit=True,
        dry_run=True,
    )
    assert event_result["classification"] == "critical", (
        f"TC-C6: classification must be 'critical', got {event_result['classification']}"
    )
    assert event_result["escalation_type"] == CriticalEscalationType.REPLACEMENT_PR_FAILED.value, (
        f"TC-C6: wrong escalation_type: {event_result['escalation_type']}"
    )
    assert event_result["packet"] is not None, "TC-C6: packet must not be None"


def test_tc_c7_critical_post_merge_smoke_failed() -> None:
    """TC-C7: Critical #7 post_merge_smoke_failed produces a packet.

    Scenario:
      - The fake runner lets the squash merge succeed but makes the smoke
        command (pytest) exit with returncode=1.
      - evaluate_pr still allows the auto merge; verify_head_lock_then_merge
        (dry_run=False) must then return BLOCKED_WITH_REASON with
        CRITICAL_POST_MERGE_SMOKE and smoke_status "FAIL".
      - Reporting that code through process_event must yield a critical
        POST_MERGE_SMOKE_FAILED packet.
    """
    pr = 111
    head_sha = "sha-pr-c7"
    smoke_cmd = ["pytest", "tests/regression", "-q"]
    smoke_spec = make_task_spec(smoke_command=smoke_cmd)

    def merge_ok_smoke_red(args, cwd=None, timeout=60):
        """Fake command runner: squash merge passes, pytest fails."""
        tokens = list(args)
        is_squash_merge = "merge" in tokens and "--squash" in tokens
        if is_squash_merge:
            return cp(returncode=0, stdout="PR merged")
        if "pytest" in tokens:
            return cp(returncode=1, stderr="SMOKE FAILED: 3 tests failed")
        return cp()

    def head_at_merge(n):
        # The PR head is unchanged at merge time, so the head lock holds.
        return head_sha

    executor_ctx = ExecutorContext(
        runner=merge_ok_smoke_red,
        no_audit=True,
        fixture_main_sha="main-sha-c7",
        smoke_command=smoke_cmd,
    )

    # Step 1: pre-merge evaluation passes.
    verdict = evaluate_pr(
        pr_number=pr,
        task_spec=smoke_spec,
        pr_head_sha=head_sha,
        effective_files=list(EXPECTED_FILES),
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=GEMINI_OK,
        ctx=executor_ctx,
    )
    assert verdict.decision == AUTO_MERGE_ALLOWED, (
        f"TC-C7: evaluate_pr must return AUTO_MERGE_ALLOWED, got {verdict.decision}"
    )

    # Step 2: the real (non dry-run) merge path trips on the smoke failure.
    merged = verify_head_lock_then_merge(
        decision=verdict,
        pr_number=pr,
        ctx=executor_ctx,
        fetch_pr_head_at_merge=head_at_merge,
        dry_run=False,
    )
    assert merged.decision == BLOCKED_WITH_REASON, (
        f"TC-C7: expected BLOCKED_WITH_REASON (smoke fail), got {merged.decision}"
    )
    assert merged.critical_code == CRITICAL_POST_MERGE_SMOKE, (
        f"TC-C7: expected CRITICAL_POST_MERGE_SMOKE, got {merged.critical_code}"
    )
    assert merged.smoke_status == "FAIL", (
        f"TC-C7: expected smoke_status=FAIL, got {merged.smoke_status}"
    )

    # Step 3: escalate the smoke failure.
    smoke_event = {
        "task_id": "task-2514",
        "pr_number": pr,
        "event_type": CRITICAL_POST_MERGE_SMOKE,
        "source": "verify_head_lock_then_merge",
        "evidence": {"smoke_status": "FAIL", "critical_code": CRITICAL_POST_MERGE_SMOKE},
    }
    reported = process_event(smoke_event, no_audit=True, dry_run=True)

    assert reported["classification"] == "critical", (
        f"TC-C7: classification must be 'critical', got {reported['classification']}"
    )
    wanted_type = CriticalEscalationType.POST_MERGE_SMOKE_FAILED.value
    assert reported["escalation_type"] == wanted_type, (
        f"TC-C7: wrong escalation_type: {reported['escalation_type']}"
    )
    assert reported["packet"] is not None, "TC-C7: packet must not be None"


# ═══════════════════════════════════════════════════════════════════════════════
# 자동 처리 3건
# ═══════════════════════════════════════════════════════════════════════════════

def test_tc_a1_false_positive_gemini_suppression() -> None:
    """TC-A1: false-positive Gemini threads are suppressed with zero escalations.

    Scenario:
      - Two open, non-outdated review threads whose bodies contain the
        phrases this test expects to be treated as false positives
        ("wrapper pattern" / "이미 처리").
      - triage_pr (apply=False) must auto-resolve both with verdict
        FALSE_POSITIVE and leave no blocking thread.
      - review_gate_status.review_gate_passed must be True.
      - No thread carries an escalation_type and scope_expansion_count is 0,
        i.e. nothing is reported upward.
    """
    thread_bodies = (
        ("th-fp-01", "This is just a wrapper pattern and is already handled correctly"),
        ("th-fp-02", "이미 처리된 케이스입니다. 정상 동작입니다."),
    )
    open_threads = [
        {
            "id": thread_id,
            "isOutdated": False,
            "isResolved": False,
            "comments": [{"body": body}],
        }
        for thread_id, body in thread_bodies
    ]

    triage = triage_pr(
        pr_number=112,
        threads=open_threads,
        pr_head_sha="sha-pr-a1",
        fix_commits=[],
        expected_files=list(EXPECTED_FILES),
        forbidden_paths=[],
        apply=False,
        task_id="task-2514",
    )

    # Aggregate counters first.
    assert triage.auto_resolved_count == 2, (
        f"TC-A1: 2 false-positive threads must all auto_resolve, got {triage.auto_resolved_count}"
    )
    assert triage.blocking_thread_count == 0, (
        f"TC-A1: no blocking threads expected, got {triage.blocking_thread_count}"
    )
    assert triage.review_gate_status.review_gate_passed is True, (
        "TC-A1: review_gate_passed must be True after false-positive suppression"
    )

    # Then every individual thread outcome must be FALSE_POSITIVE.
    for item in triage.threads:
        assert item.verdict == TriageVerdict.FALSE_POSITIVE, (
            f"TC-A1: all threads must be FALSE_POSITIVE, got {item.verdict} for {item.thread_id}"
        )
        assert item.escalation_type is None, (
            f"TC-A1: FALSE_POSITIVE must have no escalation_type, got {item.escalation_type}"
        )

    # Zero reports upward: with no escalation_type there is nothing to pass
    # to process_event, so the triage summary counters are checked instead.
    summary = triage.triage_summary
    assert summary.scope_expansion_count == 0, (
        f"TC-A1: scope_expansion_count must be 0 (회장 보고 0건), got {summary.scope_expansion_count}"
    )
    assert summary.false_positive_count == 2, (
        f"TC-A1: false_positive_count must be 2, got {summary.false_positive_count}"
    )


def test_tc_a2_style_only_gemini_suppression() -> None:
    """TC-A2: style-only Gemini threads are suppressed with zero escalations.

    Scenario:
      - Three open, non-outdated review threads whose bodies talk only about
        naming, code style and formatting (none of them contains the word
        "bug").
      - triage_pr (apply=False) must auto-resolve all three with verdict
        STYLE_ONLY and leave no blocking thread.
      - review_gate_status.review_gate_passed must be True.
      - No thread carries an escalation_type and scope_expansion_count is 0,
        i.e. nothing is reported upward.
    """
    thread_bodies = (
        (
            "th-style-01",
            "The naming convention here is inconsistent. Consider renaming to follow the convention.",
        ),
        (
            "th-style-02",
            "code style nit: use f-string instead of .format() for readability",
        ),
        (
            "th-style-03",
            "Minor formatting issue: consistency with the rest of the codebase",
        ),
    )
    open_threads = [
        {
            "id": thread_id,
            "isOutdated": False,
            "isResolved": False,
            "comments": [{"body": body}],
        }
        for thread_id, body in thread_bodies
    ]

    triage = triage_pr(
        pr_number=113,
        threads=open_threads,
        pr_head_sha="sha-pr-a2",
        fix_commits=[],
        expected_files=list(EXPECTED_FILES),
        forbidden_paths=[],
        apply=False,
        task_id="task-2514",
    )

    # Aggregate counters first.
    assert triage.auto_resolved_count == 3, (
        f"TC-A2: 3 style-only threads must all auto_resolve, got {triage.auto_resolved_count}"
    )
    assert triage.blocking_thread_count == 0, (
        f"TC-A2: no blocking threads expected, got {triage.blocking_thread_count}"
    )
    assert triage.review_gate_status.review_gate_passed is True, (
        "TC-A2: review_gate_passed must be True after style-only suppression"
    )

    # Then every individual thread outcome must be STYLE_ONLY.
    for item in triage.threads:
        assert item.verdict == TriageVerdict.STYLE_ONLY, (
            f"TC-A2: all threads must be STYLE_ONLY, got {item.verdict} for {item.thread_id}"
        )
        assert item.escalation_type is None, (
            "TC-A2: STYLE_ONLY must have no escalation_type"
        )

    # Zero reports upward: no scope expansion was requested.
    summary = triage.triage_summary
    assert summary.scope_expansion_count == 0, (
        f"TC-A2: scope_expansion_count must be 0 (회장 보고 0건), got {summary.scope_expansion_count}"
    )
    assert summary.style_only_count == 3, (
        f"TC-A2: style_only_count must be 3, got {summary.style_only_count}"
    )


def test_tc_a3_dependency_satisfied_auto_merge_allowed() -> None:
    """TC-A3: a satisfied dependency is detected automatically and the merge proceeds.

    Scenario:
      - The task spec declares dependency=["task-2509.merged"] with
        parallel_policy="serial_only" and a smoke command.
      - main_log_grep answers True only for "task-2509", so
        check_predecessor_merged must return (True, []).
      - evaluate_pr must return AUTO_MERGE_ALLOWED (the test message shows
        decision.reason on failure).
      - verify_head_lock_then_merge with dry_run=True must keep
        AUTO_MERGE_ALLOWED.

    NOTE(review): the case title mentions merge_topology_gate, but the gate
    itself is not invoked here; a second check_predecessor_merged call with
    the raw dependency list stands in for its ALLOW outcome.
    """
    from utils.merge_queue_executor import check_predecessor_merged

    smoke_cmd = ["pytest", "-q"]
    spec = make_task_spec(
        dependency=["task-2509.merged"],
        parallel_policy="serial_only",
        smoke_command=smoke_cmd,
    )

    # main_log_grep stand-in: only task-2509 counts as merged.
    def _merged(task_id: str) -> bool:
        return task_id == "task-2509"

    # Direct check of the predecessor lookup, using the spec's own dependency.
    ok, pending = check_predecessor_merged(
        dependency=spec.dependency,
        runner=make_runner(),
        main_log_grep=_merged,
    )
    assert ok is True, f"TC-A3: predecessor must be merged, pending={pending}"
    assert pending == [], f"TC-A3: no pending predecessors expected, got {pending}"

    # Executor context whose runner answers the smoke and squash-merge commands.
    ctx = ExecutorContext(
        runner=make_runner({
            ("pytest",): cp(returncode=0, stdout="passed"),
            ("gh", "pr", "merge", "114", "--squash"): cp(returncode=0, stdout="merged"),
        }),
        no_audit=True,
        fixture_main_sha="main-sha-a3",
        main_log_grep=_merged,
        smoke_command=smoke_cmd,
    )

    # evaluate_pr must allow the auto merge because the dependency is satisfied.
    decision = evaluate_pr(
        pr_number=114,
        task_spec=spec,
        pr_head_sha="sha-pr-a3",
        effective_files=list(EXPECTED_FILES),
        merge_state=CLEAN_MERGE_STATE,
        ci_state=CI_OK,
        gemini_state=GEMINI_OK,
        ctx=ctx,
    )
    assert decision.decision == AUTO_MERGE_ALLOWED, (
        f"TC-A3: expected AUTO_MERGE_ALLOWED (dependency satisfied), got {decision.decision} / reason={decision.reason}"
    )

    # Re-check with the raw dependency list after evaluation (proxy for the
    # topology gate's ALLOW outcome, see the docstring note).
    ok2, pending2 = check_predecessor_merged(
        dependency=["task-2509.merged"],
        runner=make_runner(),
        main_log_grep=_merged,
    )
    assert ok2 is True, f"TC-A3: merge_topology_gate must ALLOW, pending={pending2}"

    # Automatic progress: dry_run=True must leave the decision untouched.
    result = verify_head_lock_then_merge(
        decision=decision,
        pr_number=114,
        ctx=ctx,
        fetch_pr_head_at_merge=lambda n: "sha-pr-a3",
        dry_run=True,
    )
    assert result.decision == AUTO_MERGE_ALLOWED, (
        f"TC-A3: dry_run=True must keep AUTO_MERGE_ALLOWED, got {result.decision}"
    )


# ═══════════════════════════════════════════════════════════════════════════════
# 보조 검증: LEGACY_CRITICAL_MAP 7종 완전성
# ═══════════════════════════════════════════════════════════════════════════════

def test_legacy_critical_map_covers_all_7_types() -> None:
    """Check that LEGACY_CRITICAL_MAP covers all seven CRITICAL codes of merge_queue_executor.

    Each of the seven constants imported from utils.merge_queue_executor must
    be a key of LEGACY_CRITICAL_MAP, and the value it maps to must be a
    CriticalEscalationType member.
    """
    from utils.merge_queue_executor import (
        CRITICAL_BLOCK_OVERRIDE,
        CRITICAL_DEPENDENCY_CYCLE,
        CRITICAL_DIFF_REPLACEMENT_FAILED,
        CRITICAL_FORBIDDEN_PATH,
        CRITICAL_GEMINI_SCOPE_EXPANSION,
        CRITICAL_POST_MERGE_SMOKE,
        CRITICAL_REPLACEMENT_FAILED,
    )

    # Checked in Critical #1..#7 order so the first failure names the
    # lowest-numbered missing code.
    legacy_codes = (
        CRITICAL_FORBIDDEN_PATH,
        CRITICAL_DIFF_REPLACEMENT_FAILED,
        CRITICAL_GEMINI_SCOPE_EXPANSION,
        CRITICAL_BLOCK_OVERRIDE,
        CRITICAL_DEPENDENCY_CYCLE,
        CRITICAL_REPLACEMENT_FAILED,
        CRITICAL_POST_MERGE_SMOKE,
    )
    for legacy_code in legacy_codes:
        assert legacy_code in LEGACY_CRITICAL_MAP, (
            f"LEGACY_CRITICAL_MAP must contain '{legacy_code}': {list(LEGACY_CRITICAL_MAP.keys())}"
        )
        target = LEGACY_CRITICAL_MAP[legacy_code]
        assert isinstance(target, CriticalEscalationType), (
            f"LEGACY_CRITICAL_MAP['{legacy_code}'] must be CriticalEscalationType, got {type(target)}"
        )
