"""task-2515 e2e auto-merge replay harness — 9 PR × 12 verifications.
Chairman's directive: this task = end-to-end verification + replay harness (no new features).
"""
from __future__ import annotations

import argparse
import json
import subprocess
import sys
from pathlib import Path
import pytest

# Directory two levels above the directory that contains this file
# (presumably the workspace root that holds the `utils` package — confirm).
WORKSPACE = Path(__file__).resolve().parent.parent.parent
# Remove an existing entry first so the insert below always leaves WORKSPACE
# at the front of sys.path, ahead of the `utils.*` imports that follow.
if str(WORKSPACE) in sys.path:
    sys.path.remove(str(WORKSPACE))
sys.path.insert(0, str(WORKSPACE))

from utils.merge_queue_executor import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    AUTO_MERGE_ALLOWED, AUTO_MERGE_SUCCESS, BLOCKED_WITH_REASON,
    DIFF_CONTAMINATION_REPLACEMENT, GEMINI_UNRESOLVED_BLOCK,
    CRITICAL_FORBIDDEN_PATH, CRITICAL_DIFF_REPLACEMENT_FAILED,
    CRITICAL_GEMINI_SCOPE_EXPANSION, CRITICAL_BLOCK_OVERRIDE,
    CRITICAL_DEPENDENCY_CYCLE, CRITICAL_REPLACEMENT_FAILED,
    CRITICAL_POST_MERGE_SMOKE,
    ExecutorContext, TaskSpec,
    evaluate_pr, verify_head_lock_then_merge,
    compare_effective_diff, detect_forbidden_paths,
    check_predecessor_merged, recheck_following_prs,
)
from utils.automation_contracts import ReplacementResult  # noqa: E402  # pyright: ignore[reportMissingImports]
from utils.auto_gemini_triage import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    triage_pr,
)
from utils.post_merge_smoke_runner import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    PostMergeSmokeRun, SmokeStatus,
)
from utils.automation_contracts import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    SmokeResult,
)
from utils.critical_escalation_reporter import process_event  # noqa: E402  # pyright: ignore[reportMissingImports]
from utils.replacement_pr_runner import ReplacementPRRunner  # noqa: E402  # pyright: ignore[reportMissingImports]

# JSON scenario file read by _load_fixtures(); located in the "fixtures"
# directory next to this module.
FIXTURE_PATH = Path(__file__).parent / "fixtures" / "auto_merge_replay_2515.json"


def _load_fixtures() -> dict:
    """Read the fixture file at ``FIXTURE_PATH`` as UTF-8 and return the parsed JSON."""
    raw_text = FIXTURE_PATH.read_text(encoding="utf-8")
    return json.loads(raw_text)


def _cp(returncode=0, stdout="", stderr=""):
    return subprocess.CompletedProcess(args=[], returncode=returncode, stdout=stdout, stderr=stderr)


# Forbidden git/CLI tokens — the fake runner built by _make_fake_runner asserts
# that none of these appear in any call it receives.
FORBIDDEN_TOKENS = {"--admin", "--force", "-f", "--force-with-lease", "cherry-pick", "--no-verify"}


def _make_fake_runner(scenario):
    """시나리오 fixture로부터 fake subprocess runner 합성. forbidden flag 호출 차단."""
    calls = []

    def runner(args, cwd=None, timeout=60):
        calls.append({"args": list(args), "cwd": cwd, "timeout": timeout})
        # forbidden 자동 차단 — `--flag=value` 형태도 검출
        for tok in FORBIDDEN_TOKENS:
            assert not any(
                arg == tok or arg.split("=", 1)[0] == tok
                for arg in args
            ), f"FORBIDDEN_TOKEN detected: {tok} in {args}"
        # squash merge 응답
        if "merge" in args and "--squash" in args:
            return _cp(returncode=scenario.get("merge_runner_returncode", 0), stdout="merged")
        # smoke 응답 (smoke_command 첫 토큰 매칭)
        sc = scenario.get("smoke_command") or []
        if sc and all(t in args for t in sc[:1]):
            return _cp(returncode=scenario.get("smoke_runner_returncode", 0), stdout="smoke output")
        # following PR view
        for fp in scenario.get("following_queue") or []:
            pn = str(fp["pr_number"])
            if pn in args and "--json" in args:
                payload = {
                    "mergeStateStatus": fp.get("mergeStateStatus", "CLEAN"),
                    "headRefOid": fp.get("headRefOid", ""),
                    "baseRefName": "main",
                    "files": [{"path": p} for p in fp.get("effective_files", [])],
                }
                return _cp(returncode=0, stdout=json.dumps(payload))
        return _cp()

    runner.calls = calls  # type: ignore[attr-defined]
    return runner


def _make_task_spec(s):
    """Translate scenario dict ``s`` into a ``TaskSpec``.

    ``task_id`` and ``expected_files`` are required keys; the remaining fields
    fall back to the defaults listed below, and ``risk_area`` is always
    "auto_merge_e2e_replay".
    """
    fallbacks = {
        "parallel_policy": "serial_only",
        "merge_queue_position": 1,
        "stale_recheck_required": True,
        "cherry_pick_allowed": False,
        "smoke_command": None,
    }
    task_id = s["task_id"]
    expected = list(s["expected_files"])
    dependencies = list(s.get("dependency", []))
    optional = {key: s.get(key, default) for key, default in fallbacks.items()}
    return TaskSpec(
        task_id=task_id,
        expected_files=expected,
        risk_area="auto_merge_e2e_replay",
        dependency=dependencies,
        **optional,
    )


def _make_replacement_runner(s, runner):
    """시나리오 expected_replacement_used에 맞춰 ReplacementPRRunner 또는 fake runner 반환.
    PR57_contamination_replacement → ReplacementPRRunner(dry_run=True) (success=True 합성).
    critical_diff_contamination_replacement_failed → fake runner returning success=False.
    그 외 → None.
    """
    sid = s["id"]
    if sid == "PR57_contamination_replacement":
        return ReplacementPRRunner(runner=runner, dry_run=True)
    if sid == "critical_diff_contamination_replacement_failed":
        class _FailRunner:
            def execute(self, pr_number, task_spec=None):
                return ReplacementResult(
                    source_pr=pr_number, replacement_pr=None,
                    original_pr_preserved=True,
                    expected_files=list(task_spec.expected_files) if task_spec else [],
                    effective_diff_files=list(s["effective_files"]),
                    forbidden_paths=[],
                    success=False,
                    failure_reason="REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF",
                )
        return _FailRunner()
    return None


def _make_smoke_envelope_fn(scenario):
    """task-2514 W1 circular import으로 run_pm_smoke_v2 가 None이라 직접 envelope 합성 fn 주입.

    fixture 의 smoke_runner_returncode 에 따라 PASS/FAIL envelope 반환.
    """
    rc = scenario.get("smoke_runner_returncode", 0)
    cmd = scenario.get("smoke_command") or []
    cmd_str = " ".join(cmd) if cmd else ""

    def _envelope_fn(*, task_file, merge_commit, dry_run, runner, pr_number, skip_stale_check, **_kw):
        if rc == 0:
            sr = SmokeResult(
                command=cmd_str, passed=True, exit_code=0,
                stdout_tail="", stderr_tail="", failure_reason=None,
            )
            return PostMergeSmokeRun(
                merge_commit=merge_commit or "fixture-merge-sha",
                task_id=scenario["task_id"],
                status=SmokeStatus.PASS,
                smoke_result=sr,
                duration_ms=100,
                smoke_command=list(cmd),
                allow_continuation=True,
                escalation=None,
                stale=False,
                dry_run=dry_run,
            )
        sr = SmokeResult(
            command=cmd_str, passed=False, exit_code=rc,
            stdout_tail="", stderr_tail="smoke fail simulated",
            failure_reason=f"EXIT_{rc}",
        )
        return PostMergeSmokeRun(
            merge_commit=merge_commit or "fixture-merge-sha",
            task_id=scenario["task_id"],
            status=SmokeStatus.FAIL,
            smoke_result=sr,
            duration_ms=100,
            smoke_command=list(cmd),
            allow_continuation=False,
            escalation=None,
            stale=False,
            dry_run=dry_run,
        )
    return _envelope_fn


def _make_triage_spy(scenario):
    """ctx.triage_fn 으로 주입할 spy. evaluate_pr W3 호출 여부를 외부에서 관찰 가능.

    실제 triage_pr 을 위임 호출. wiring 활성화 검증용.
    """
    observed = []

    def spy(pr_number, threads, pr_head_sha, fix_commits, expected_files,
            forbidden_paths, apply, task_id):
        observed.append({
            "pr_number": pr_number,
            "thread_count": len(threads),
            "task_id": task_id,
        })
        return triage_pr(
            pr_number, threads, pr_head_sha, fix_commits,
            expected_files, forbidden_paths, apply, task_id,
        )
    spy.observed = observed  # type: ignore[attr-defined]
    return spy


def _make_reporter_hook_spy(scenario):
    """ctx.reporter_hook 으로 주입할 spy. evaluate_pr emit_critical_escalation 활성 검증.

    실제 process_event 호출 후 decision.escalations 에 결과를 직접 push.
    W1 circular import 으로 default report_critical_event=None 이지만, hook 명시 주입 시 작동.
    """
    observed = []

    def spy(code, decision):
        evidence = {
            "decision": decision.decision,
            "reason": decision.reason,
            "critical_code": code,
            "effective_files": list(scenario["effective_files"]),
        }
        result = process_event(
            {
                "task_id": scenario["task_id"],
                "pr_number": scenario["pr_number"],
                "event_type": code,
                "source": "e2e_reporter_hook_spy",
                "evidence": evidence,
            },
            no_audit=True,
            dry_run=True,
        )
        observed.append({"code": code, "result": result})
        if not hasattr(decision, "escalations") or decision.escalations is None:
            decision.escalations = []
        decision.escalations.append(result)
    spy.observed = observed  # type: ignore[attr-defined]
    return spy


def _make_executor_context(s, runner):
    """Assemble the ``ExecutorContext`` for scenario ``s`` around the fake ``runner``.

    Hooks are injected only when the scenario calls for them:
      * ``triage_fn`` spy      - ``triage_threads`` is non-empty
      * ``reporter_hook`` spy  - ``expected_classification`` is "critical"
      * ``smoke_envelope_fn``  - ``smoke_command`` is non-empty
    Both spies are also stored on the context as ``triage_spy`` / ``reporter_spy``
    (possibly ``None``) so the tests can inspect them afterwards.
    """
    threads = s.get("triage_threads")
    smoke_command = s.get("smoke_command")

    triage_spy = None
    if threads:
        triage_spy = _make_triage_spy(s)

    reporter_spy = None
    if s.get("expected_classification") == "critical":
        reporter_spy = _make_reporter_hook_spy(s)

    smoke_env_fn = None
    if smoke_command:
        smoke_env_fn = _make_smoke_envelope_fn(s)

    context_kwargs = dict(
        runner=runner,
        smoke_command=smoke_command,
        no_audit=True,
        fixture_main_sha=s.get("fixture_main_sha", "fixture-main-sha"),
        replacement_runner=_make_replacement_runner(s, runner),
        triage_fn=triage_spy,
        triage_threads=threads or None,
        triage_fix_commits=s.get("fix_commits") or None,
        triage_pr_head_sha=s.get("pr_head_sha"),
        main_log_grep=(lambda _t: True) if s.get("main_log_grep_returns_true") else None,
        reporter_hook=reporter_spy,
        smoke_envelope_fn=smoke_env_fn,
    )
    ctx = ExecutorContext(**context_kwargs)

    # Original author's note: task_file is required to activate the W4 envelope
    # path; only its presence is checked, so any path will do.
    if smoke_env_fn is not None:
        ctx.task_file = Path(__file__)  # type: ignore[attr-defined]

    following = s.get("following_queue")
    if following:
        ctx.following_queue = list(following)  # type: ignore[attr-defined]

    # Keep the spy handles reachable for the verification step.
    ctx.triage_spy = triage_spy  # type: ignore[attr-defined]
    ctx.reporter_spy = reporter_spy  # type: ignore[attr-defined]
    return ctx


def _resolve_expected_decision(name):
    """Map a fixture decision name to the executor's decision constant.

    Raises ``KeyError`` for a name that is not one of the five known decisions.
    """
    known = dict(
        AUTO_MERGE_ALLOWED=AUTO_MERGE_ALLOWED,
        AUTO_MERGE_SUCCESS=AUTO_MERGE_SUCCESS,
        BLOCKED_WITH_REASON=BLOCKED_WITH_REASON,
        DIFF_CONTAMINATION_REPLACEMENT=DIFF_CONTAMINATION_REPLACEMENT,
        GEMINI_UNRESOLVED_BLOCK=GEMINI_UNRESOLVED_BLOCK,
    )
    return known[name]


def _resolve_critical_code(name):
    if name is None:
        return None
    return {
        "CRITICAL_FORBIDDEN_PATH": CRITICAL_FORBIDDEN_PATH,
        "CRITICAL_DIFF_REPLACEMENT_FAILED": CRITICAL_DIFF_REPLACEMENT_FAILED,
        "CRITICAL_GEMINI_SCOPE_EXPANSION": CRITICAL_GEMINI_SCOPE_EXPANSION,
        "CRITICAL_BLOCK_OVERRIDE": CRITICAL_BLOCK_OVERRIDE,
        "CRITICAL_DEPENDENCY_CYCLE": CRITICAL_DEPENDENCY_CYCLE,
        "CRITICAL_REPLACEMENT_FAILED": CRITICAL_REPLACEMENT_FAILED,
        "CRITICAL_POST_MERGE_SMOKE": CRITICAL_POST_MERGE_SMOKE,
    }[name]


def _run_pipeline(scenario):
    """Run ``evaluate_pr`` and, when requested, ``verify_head_lock_then_merge``.

    The merge step runs only if the scenario sets ``verify_merge`` and the
    evaluation returned ``AUTO_MERGE_ALLOWED``; the head-SHA fetcher handed to
    it always returns the scenario's own ``pr_head_sha``.

    Returns:
        ``(decision, ctx, runner, spec)`` where ``decision`` is the final one.
    """
    runner = _make_fake_runner(scenario)
    spec = _make_task_spec(scenario)
    ctx = _make_executor_context(scenario, runner)

    evaluation_inputs = {
        "pr_number": scenario["pr_number"],
        "task_spec": spec,
        "pr_head_sha": scenario["pr_head_sha"],
        "effective_files": list(scenario["effective_files"]),
        "merge_state": scenario["merge_state"],
        "ci_state": scenario["ci_state"],
        "gemini_state": scenario["gemini_state"],
        "ctx": ctx,
    }
    decision = evaluate_pr(**evaluation_inputs)

    merge_requested = scenario.get("verify_merge") and decision.decision == AUTO_MERGE_ALLOWED
    if not merge_requested:
        return decision, ctx, runner, spec

    merged = verify_head_lock_then_merge(
        decision=decision,
        pr_number=scenario["pr_number"],
        ctx=ctx,
        fetch_pr_head_at_merge=lambda _n: scenario["pr_head_sha"],
        dry_run=False,
    )
    return merged, ctx, runner, spec


def _verify_12_items(scenario, decision, ctx, runner, _spec):
    """Check the 12 verification items in order; every assertion must pass.

    Args:
        scenario: fixture dict holding the inputs and the ``expected_*`` values.
        decision: final decision object returned by ``_run_pipeline``.
        ctx: executor context; read here only for the ``triage_spy`` /
            ``reporter_spy`` handles attached by ``_make_executor_context``.
        runner: fake runner, reused for the direct §1 and §10 helper calls.
        _spec: accepted for call-site signature compatibility; never read.

    Raises:
        AssertionError: on the first item that does not match the fixture.
    """
    sid = scenario["id"]

    # §1 queue head — check_predecessor_merged
    main_grep = (lambda _t: True) if scenario.get("main_log_grep_returns_true") else None
    head_ok, pending = check_predecessor_merged(
        scenario.get("dependency") or [], runner, main_log_grep=main_grep,
    )
    assert head_ok is True and pending == [], f"{sid} §1 queue head failed: pending={pending}"

    # §2 effective diff == expected
    diff_ok, extra, missing = compare_effective_diff(
        scenario["effective_files"], scenario["expected_files"],
    )
    assert diff_ok == bool(scenario["expected_diff_equal"]), (
        f"{sid} §2 diff mismatch: diff_ok={diff_ok} extra={extra} missing={missing} expected_equal={scenario['expected_diff_equal']}"
    )

    # §3 forbidden path
    forbidden = detect_forbidden_paths(
        scenario["effective_files"], scenario["expected_files"],
    )
    assert len(forbidden) == int(scenario["expected_forbidden_count"]), (
        f"{sid} §3 forbidden mismatch: got={forbidden} expected_count={scenario['expected_forbidden_count']}"
    )

    # §4 replacement runner branch
    assert decision.replacement_used == bool(scenario["expected_replacement_used"]), (
        f"{sid} §4 replacement_used mismatch: got={decision.replacement_used} expected={scenario['expected_replacement_used']}"
    )

    # §5 triage branch — use the spy to verify the wiring is active
    # (separate from the fallthrough caused by the W1 circular import)
    triage_spy = getattr(ctx, "triage_spy", None)
    if scenario.get("triage_threads"):
        # with triage_threads present, ctx.triage_fn (the spy) must be invoked from evaluate_pr's W3 branch
        assert triage_spy is not None, f"{sid} §5 triage_spy must be injected for triage scenarios"
        assert len(triage_spy.observed) == 1, (
            f"{sid} §5 triage_fn must be invoked exactly once, got {len(triage_spy.observed)}"
        )
        assert triage_spy.observed[0]["pr_number"] == scenario["pr_number"], (
            f"{sid} §5 triage_fn pr_number mismatch: {triage_spy.observed}"
        )
    else:
        # without triage_threads the W3 branch is not entered (no triage_fn injected)
        assert triage_spy is None, (
            f"{sid} §5 no triage scenario but spy present"
        )

    # §6 review thread auto-resolve verdicts (compared against a direct triage_pr call)
    expected_verdicts = scenario.get("expected_triage_verdicts") or []
    if expected_verdicts:
        report = triage_pr(
            pr_number=scenario["pr_number"],
            threads=scenario["triage_threads"] or [],
            pr_head_sha=scenario["pr_head_sha"],
            fix_commits=scenario.get("fix_commits") or [],
            expected_files=list(scenario["expected_files"]),
            forbidden_paths=[],
            apply=False,
            task_id=scenario["task_id"],
        )
        actual = [o.verdict.value for o in report.threads]
        assert actual == expected_verdicts, (
            f"{sid} §6 verdict mismatch: actual={actual} expected={expected_verdicts}"
        )
    # else: nothing to compare when the scenario lists no expected verdicts
    # (for scenarios without triage_threads, §5 already pins that no spy was injected)

    # §7 review_gate_passed — also verifies fallback_review (Gemini quota)
    expected_rgp = scenario.get("expected_review_gate_passed")
    expected_fb_used = scenario.get("fallback_review_used_expected", False)
    expected_fb_passed = scenario.get("fallback_review_passed_expected", False)
    if expected_rgp is not None:
        # the gate counts as passed when either the primary review or the fallback review passed
        actual_rgp = decision.review_gate_passed or decision.fallback_review_passed
        assert actual_rgp == bool(expected_rgp), (
            f"{sid} §7 review_gate_passed mismatch: got={decision.review_gate_passed} fallback={decision.fallback_review_passed} expected={expected_rgp}"
        )
        # also pin whether the fallback review was activated (S3 PR58_gemini_quota_fallback)
        assert decision.fallback_review_used == bool(expected_fb_used), (
            f"{sid} §7 fallback_review_used mismatch: got={decision.fallback_review_used} expected={expected_fb_used}"
        )
        if expected_fb_used:
            assert decision.fallback_review_passed == bool(expected_fb_passed), (
                f"{sid} §7 fallback_review_passed mismatch: got={decision.fallback_review_passed} expected={expected_fb_passed}"
            )
    else:
        # BLOCKED/critical scenarios — review gate not entered, or N/A
        # decision.fallback_review_used must still match the fixture
        assert decision.fallback_review_used == bool(expected_fb_used), (
            f"{sid} §7 N/A path fallback_used mismatch: got={decision.fallback_review_used} expected={expected_fb_used}"
        )

    # §8 squash merge decision + zero admin/force/rebase calls (checked automatically by fake_runner)
    # NOTE(review): FORBIDDEN_TOKENS does not list a rebase token — confirm whether rebase is meant to be covered.
    expected_dec = _resolve_expected_decision(scenario["expected_decision"])
    assert decision.decision == expected_dec, (
        f"{sid} §8 decision mismatch: got={decision.decision} expected={expected_dec}"
    )
    expected_crit = _resolve_critical_code(scenario.get("expected_critical_code"))
    assert decision.critical_code == expected_crit, (
        f"{sid} §8 critical_code mismatch: got={decision.critical_code} expected={expected_crit}"
    )

    # §9 post_merge_smoke status — verify the envelope path is active
    # (only when verify_merge is set and the AUTO_MERGE step was entered)
    expected_smoke = scenario.get("expected_smoke_status")
    if expected_smoke is not None:
        assert decision.smoke_status == expected_smoke, (
            f"{sid} §9 smoke_status mismatch: got={decision.smoke_status} expected={expected_smoke}"
        )
        # Verify that smoke_envelope went through the W4 envelope path
        # (ctx.smoke_envelope_fn + task_file injected) and was actually populated.
        assert decision.smoke_envelope is not None, (
            f"{sid} §9 smoke_envelope must be populated when verify_merge=true; "
            f"smoke_status={decision.smoke_status}"
        )
        assert decision.smoke_envelope.get("status") == expected_smoke, (
            f"{sid} §9 envelope status mismatch: got={decision.smoke_envelope.get('status')}"
        )
        # allow_continuation is True on PASS, False on FAIL
        expected_allow = (expected_smoke == "PASS")
        assert decision.smoke_envelope.get("allow_continuation") is expected_allow, (
            f"{sid} §9 envelope allow_continuation mismatch: "
            f"got={decision.smoke_envelope.get('allow_continuation')} expected={expected_allow}"
        )
    else:
        # smoke_command undefined or evaluate-only scenario — no envelope is created
        # NOTE(review): with `or`, this passes whenever smoke_status is "" even if an
        # envelope is populated; confirm whether `and` was intended.
        assert decision.smoke_status == "" or decision.smoke_envelope is None, (
            f"{sid} §9 envelope must remain unpopulated for non-smoke scenario, got envelope={decision.smoke_envelope}"
        )

    # §10 stale recheck of following PRs — direct call + check of what verify_head_lock_then_merge produced
    expected_states = scenario.get("expected_following_states") or []
    fq = scenario.get("following_queue") or []
    if fq:
        states = recheck_following_prs(fq, runner)
        assert len(states) == len(expected_states), (
            f"{sid} §10 following count mismatch: got={len(states)} expected={len(expected_states)}"
        )
        # only the keys listed in each expected state are compared, position by position
        for s_actual, s_expected in zip(states, expected_states):
            for k, v in s_expected.items():
                assert s_actual.get(k) == v, (
                    f"{sid} §10 following PR#{s_actual.get('pr_number')} field {k}: got={s_actual.get(k)} expected={v}"
                )
        # for AUTO_MERGE_SUCCESS scenarios verify_head_lock_then_merge stores the states in decision.fixture_pr_replay
        if scenario.get("verify_merge") and decision.decision == AUTO_MERGE_SUCCESS:
            replay_states = decision.fixture_pr_replay
            assert replay_states and len(replay_states) >= 1, (
                f"{sid} §10 verify_head_lock_then_merge must populate decision.fixture_pr_replay, got {replay_states}"
            )
            recorded_pr_numbers = {s.get("pr_number") for s in replay_states}
            for ef in fq:
                assert ef["pr_number"] in recorded_pr_numbers, (
                    f"{sid} §10 PR#{ef['pr_number']} missing from decision.fixture_pr_replay"
                )
    else:
        # with no following queue, fixture_pr_replay must be empty too (guards against missing orchestration)
        assert decision.fixture_pr_replay == [] or decision.fixture_pr_replay is None, (
            f"{sid} §10 N/A path expects empty fixture_pr_replay, got {decision.fixture_pr_replay}"
        )

    # §11 critical reportable — pipeline wiring (reporter_hook spy) + mapping onto the 7 Critical enum codes
    expected_esc = scenario.get("expected_escalation_type")
    expected_cls = scenario["expected_classification"]
    reporter_spy = getattr(ctx, "reporter_spy", None)
    if expected_cls == "critical":
        # critical scenario — emit_critical_escalation → reporter_hook (spy) called → result recorded in decision.escalations
        assert reporter_spy is not None, f"{sid} §11 reporter_hook spy must be injected"
        assert len(reporter_spy.observed) >= 1, (
            f"{sid} §11 reporter_hook must be called for critical scenario, got {len(reporter_spy.observed)}"
        )
        first_obs = reporter_spy.observed[0]
        assert first_obs["result"]["classification"] == "critical", (
            f"{sid} §11 spy classification mismatch: {first_obs['result']['classification']}"
        )
        assert first_obs["result"]["packet"] is not None, f"{sid} §11 packet must be set"
        assert first_obs["result"]["escalation_type"] == expected_esc, (
            f"{sid} §11 escalation_type mismatch: got={first_obs['result']['escalation_type']} expected={expected_esc}"
        )
        # confirm the result was also pushed to decision.escalations (the actual wiring output)
        assert decision.escalations and len(decision.escalations) >= 1, (
            f"{sid} §11 decision.escalations must contain at least 1 entry, got {decision.escalations}"
        )
    else:
        # auto-handled scenario — zero reporter_hook calls + an extra non-critical event check
        if reporter_spy is not None:
            assert len(reporter_spy.observed) == 0, (
                f"{sid} §11 auto-handled scenario must not invoke reporter_hook, got {len(reporter_spy.observed)}"
            )
        # extra: process_event with an arbitrary non-critical event_type → auto-handled
        result = process_event(
            {"task_id": scenario["task_id"], "pr_number": scenario["pr_number"],
             "event_type": "AUTO_HANDLED_DUMMY_EVENT", "source": "e2e_replay",
             "evidence": {}},
            no_audit=True, dry_run=True,
        )
        assert result["classification"] == "auto-handled", (
            f"{sid} §11 expected auto-handled, got {result['classification']}"
        )
        assert result["packet"] is None, f"{sid} §11 packet must be None for auto-handled"

    # §12 non-critical outcomes are audit/evidence only (zero packets reported to the chairman)
    if expected_cls == "auto-handled":
        assert decision.critical_code is None, (
            f"{sid} §12 critical_code must be None for auto-handled, got {decision.critical_code}"
        )
        assert decision.critical_escalation is None, (
            f"{sid} §12 critical_escalation must be None for auto-handled, got {decision.critical_escalation}"
        )
    else:
        # critical — at least one of critical_code / critical_escalation must be set
        assert decision.critical_code is not None or decision.critical_escalation is not None, (
            f"{sid} §12 critical scenario must set critical_code or critical_escalation"
        )


# ─── 9 pytest test functions (1 fn = 1 scenario) ──────────────────────────

@pytest.fixture(scope="module")
def replay():
    """Module-scoped fixture: the parsed content of the replay fixture file."""
    fixture_data = _load_fixtures()
    return fixture_data


def _scenario_by_id(replay, sid):
    for s in replay["scenarios"]:
        if s["id"] == sid:
            return s
    raise KeyError(sid)


def test_e2e_pr55_clean_auto_merge(replay):
    """Replay scenario "PR55_clean_auto_merge" and run the 12 verifications on it."""
    scenario = _scenario_by_id(replay, "PR55_clean_auto_merge")
    # _run_pipeline returns (decision, ctx, runner, spec) in the order the verifier expects.
    _verify_12_items(scenario, *_run_pipeline(scenario))


def test_e2e_pr57_false_positive_triage(replay):
    """Replay scenario "PR57_false_positive_triage" and run the 12 verifications on it."""
    scenario = _scenario_by_id(replay, "PR57_false_positive_triage")
    # _run_pipeline returns (decision, ctx, runner, spec) in the order the verifier expects.
    _verify_12_items(scenario, *_run_pipeline(scenario))


def test_e2e_pr58_gemini_quota_fallback(replay):
    """Replay scenario "PR58_gemini_quota_fallback" and run the 12 verifications on it."""
    scenario = _scenario_by_id(replay, "PR58_gemini_quota_fallback")
    # _run_pipeline returns (decision, ctx, runner, spec) in the order the verifier expects.
    _verify_12_items(scenario, *_run_pipeline(scenario))


def test_e2e_pr64_smoke_pass(replay):
    """Replay scenario "PR64_smoke_pass" and run the 12 verifications on it."""
    scenario = _scenario_by_id(replay, "PR64_smoke_pass")
    # _run_pipeline returns (decision, ctx, runner, spec) in the order the verifier expects.
    _verify_12_items(scenario, *_run_pipeline(scenario))


def test_e2e_pr57_contamination_replacement(replay):
    """Replay scenario "PR57_contamination_replacement" and run the 12 verifications on it."""
    scenario = _scenario_by_id(replay, "PR57_contamination_replacement")
    # _run_pipeline returns (decision, ctx, runner, spec) in the order the verifier expects.
    _verify_12_items(scenario, *_run_pipeline(scenario))


def test_e2e_pr61_review_thread_blocker(replay):
    """Replay scenario "PR61_review_thread_blocker" and run the 12 verifications on it."""
    scenario = _scenario_by_id(replay, "PR61_review_thread_blocker")
    # _run_pipeline returns (decision, ctx, runner, spec) in the order the verifier expects.
    _verify_12_items(scenario, *_run_pipeline(scenario))


def test_e2e_critical_smoke_failure(replay):
    """Replay scenario "critical_smoke_failure" and run the 12 verifications on it."""
    scenario = _scenario_by_id(replay, "critical_smoke_failure")
    # _run_pipeline returns (decision, ctx, runner, spec) in the order the verifier expects.
    _verify_12_items(scenario, *_run_pipeline(scenario))


def test_e2e_critical_forbidden_path(replay):
    """Replay scenario "critical_forbidden_path" and run the 12 verifications on it."""
    scenario = _scenario_by_id(replay, "critical_forbidden_path")
    # _run_pipeline returns (decision, ctx, runner, spec) in the order the verifier expects.
    _verify_12_items(scenario, *_run_pipeline(scenario))


def test_e2e_critical_diff_contamination_replacement_failed(replay):
    """Replay scenario "critical_diff_contamination_replacement_failed" and run the 12 verifications on it."""
    scenario = _scenario_by_id(replay, "critical_diff_contamination_replacement_failed")
    # _run_pipeline returns (decision, ctx, runner, spec) in the order the verifier expects.
    _verify_12_items(scenario, *_run_pipeline(scenario))


# ─── Extra check: confirm the audit append really happens (per Codex G1 recommendation) ──────────────

def test_audit_append_critical_writes_jsonl(tmp_path):
    """A critical event processed with ``no_audit=False`` must leave a record in the audit JSONL.

    Uses ``tmp_path`` as the workspace root and inspects the last line of
    ``memory/orchestration-audit/critical-escalations.jsonl``.
    """
    event = {
        "task_id": "task-2515",
        "pr_number": 9999,
        "event_type": "FORBIDDEN_PATH_INTRUSION",
        "source": "test_audit_append",
        "evidence": {"effective_files": [".github/workflows/foo.yml"]},
    }
    outcome = process_event(event, workspace_root=tmp_path, no_audit=False, dry_run=True)
    assert outcome["classification"] == "critical"
    assert outcome["audit_appended"] is True

    audit_log = tmp_path.joinpath("memory", "orchestration-audit", "critical-escalations.jsonl")
    assert audit_log.exists(), f"audit JSONL must exist at {audit_log}"
    content = audit_log.read_text(encoding="utf-8").strip()
    assert content, "audit JSONL must contain at least 1 line"

    # Only the most recent record is inspected.
    final_record = json.loads(content.splitlines()[-1])
    assert final_record["task_id"] == "task-2515"
    assert final_record["classification"] == "critical"
    assert final_record["escalation_type"] == "FORBIDDEN_PATH_INTRUSION"


def test_audit_append_auto_handled_writes_jsonl(tmp_path):
    """An auto-handled event processed with ``no_audit=False`` must still be appended to the audit JSONL.

    Uses ``tmp_path`` as the workspace root; the result must carry no packet.
    """
    event = {
        "task_id": "task-2515",
        "pr_number": 9998,
        "event_type": "AUTO_HANDLED_DUMMY_EVENT",
        "source": "test_audit_append",
        "evidence": {},
    }
    outcome = process_event(event, workspace_root=tmp_path, no_audit=False, dry_run=True)
    assert outcome["classification"] == "auto-handled"
    assert outcome["packet"] is None
    assert outcome["audit_appended"] is True

    audit_log = tmp_path.joinpath("memory", "orchestration-audit", "critical-escalations.jsonl")
    assert audit_log.exists(), f"audit JSONL must exist at {audit_log} (auto-handled append)"
    content = audit_log.read_text(encoding="utf-8").strip()
    assert content, "audit JSONL must contain at least 1 line"

    # Only the most recent record is inspected.
    final_record = json.loads(content.splitlines()[-1])
    assert final_record["task_id"] == "task-2515"
    assert final_record["classification"] == "auto-handled"


# ─── CLI entrypoint (optional) ─────────────────────────────────────────

def _cli_main(argv=None):
    """Command-line entry point for replaying fixture scenarios outside pytest.

    Args:
        argv: argument list handed to argparse; ``None`` lets argparse read
            the process arguments.

    Returns:
        Exit code: 0 after ``--list``, after every scenario matching ``--pr``
        verified successfully, or after printing help when no option was
        given; 2 when no scenario has the requested PR number.

    Raises:
        AssertionError: propagated from ``_verify_12_items`` when a
            verification item fails.
    """
    parser = argparse.ArgumentParser(description="task-2515 e2e replay harness")
    parser.add_argument("--pr", type=int, help="단일 PR 번호 replay")
    parser.add_argument("--list", action="store_true", help="9 시나리오 목록")
    args = parser.parse_args(argv)
    fixtures = _load_fixtures()

    if args.list:
        for s in fixtures["scenarios"]:
            print(f"  PR#{s['pr_number']:>3}  {s['id']:<55}  [{s['category']}]  → {s['expected_decision']}")
        return 0

    # Compare against None: a truthiness test would treat `--pr 0` as "no option given".
    if args.pr is not None:
        matches = [s for s in fixtures["scenarios"] if s["pr_number"] == args.pr]
        if not matches:
            print(f"PR #{args.pr} not found in fixtures")
            return 2
        # Replay every scenario with this PR number instead of stopping at the
        # first one; the scenario ids suggest several fixtures can share a PR
        # number (e.g. the two PR57_* scenarios) — TODO confirm against the fixture file.
        for s in matches:
            decision, ctx, runner, spec = _run_pipeline(s)
            print(json.dumps(decision.to_dict(), indent=2, ensure_ascii=False, default=str))
            _verify_12_items(s, decision, ctx, runner, spec)
            print(f"\n[PASS] {s['id']} — 12/12 verifications")
        return 0

    parser.print_help()
    return 0


def test_default_runtime_path_no_ctx_hooks_2516():
    """task-2516: a fresh import of the executor must expose the W1 wiring symbols.

    Re-imports ``utils.merge_queue_executor`` from scratch and checks that
    ``_WIRING_AVAILABLE`` is True and that ``ReplacementPRRunner``, ``triage_pr``,
    ``run_pm_smoke_v2`` and ``report_critical_event`` are not ``None`` — per the
    original author's note, the symbols the default runtime path uses when no
    ctx hook is injected. Guards against a regression of the circular-import fix.

    The previously loaded module objects are put back afterwards so the
    re-import does not leak duplicate modules into the rest of the session
    (names imported at the top of this file still point at the originals).
    """
    import importlib

    reloaded = ("utils.merge_queue_executor", "utils.replacement_pr_runner")
    # Drop the cached modules so the import below really re-executes them.
    saved = {name: sys.modules.pop(name) for name in reloaded if name in sys.modules}
    try:
        mqe = importlib.import_module("utils.merge_queue_executor")
        assert mqe._WIRING_AVAILABLE is True, "W1 wiring 비활성 — task-2516 fix 회귀"
        # A None symbol here means the wiring import failed, which is a blocker.
        assert mqe.ReplacementPRRunner is not None
        assert mqe.triage_pr is not None
        assert mqe.run_pm_smoke_v2 is not None
        assert mqe.report_critical_event is not None
    finally:
        for name in reloaded:
            original = saved.get(name)
            if original is None:
                # Was not loaded before this test: leave no trace of the fresh import.
                sys.modules.pop(name, None)
                continue
            sys.modules[name] = original
            # The import also rebound the submodule attribute on the parent package.
            parent_name, _, attr = name.rpartition(".")
            parent = sys.modules.get(parent_name)
            if parent is not None:
                setattr(parent, attr, original)


# When executed as a script, run the CLI and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(_cli_main())
