"""tests/regression/test_operational_collector_wiring_2553plus25.py

task-2553+25 — OPERATIONAL COLLECTOR WIRING regression.

§5.3 결선 경로 단위 ①~⑦ — 전건 PASS:
  ① durable-success → seam 1회 호출
  ② verifier 5조건 PASS → remove called
  ③ 각 mismatch/SKIP → preserve, remove 0
  ④ cron-remove 실패 → normal collector success 불변 (디커플)
  ⑤ 기존 DUPLICATE_CALLBACK_IGNORED 무회귀 + 비-PASS seam 미진입
  ⑥ frozen anchor byte-0 (sha 전후 동일) + wiring 모듈 frozen 무수정 import
  ⑦ seam 중복호출 0 / idempotent (동시 2호출 → 1 invoke · 1 no-op)

100% offline. 실 callback cron / cokacdir subprocess / network / git mutation 0.
모든 cron-list = 주입 fake lister, 모든 cron-remove = 주입 fake remover.
실 subprocess 호출 시 즉시 FAIL (차단 spy). 실 운영 cron 실제 삭제 0
(§6 / 9-R.4). live `/home/jay/workspace` git tracked HEAD/branch/ref 전후
assertEqual (§6 / 9-R.5 — repo root 기준).
"""
from __future__ import annotations

import hashlib
import json
import subprocess
import sys
from pathlib import Path

import pytest

WORKSPACE = Path(__file__).resolve().parent.parent.parent
if str(WORKSPACE) in sys.path:
    sys.path.remove(str(WORKSPACE))
sys.path.insert(0, str(WORKSPACE))

from utils.anu_delegation_completion_callback import (  # noqa: E402
    CallbackInput,
    CallbackType,
    Classification,
)
from utils.completion_callback_fallback_cancel import RemoverResult  # noqa: E402
from utils.operational_collector_wiring import (  # noqa: E402
    OperationalSeamParams,
    compute_event_id,
    run_operational_completion_callback_collector,
)

FIXDIR = WORKSPACE / "memory" / "fixtures"
FROZEN = WORKSPACE / "utils" / "anu_delegation_completion_callback.py"
FROZEN_SHA = "83b3e307c8207c76a3e311c408aab4951373bd317896e51687d3007907b0c3d4"


def _fx(name: str) -> dict:
    return json.loads((FIXDIR / f"{name}.json").read_text(encoding="utf-8"))


class FakeCronLister:
    def __init__(self, entries, status="ok"):
        self.entries = entries
        self.status = status
        self.calls = 0

    def __call__(self) -> dict:
        self.calls += 1
        if self.status != "ok":
            return {"status": self.status, "entries": [], "raw": {}}
        return {"status": "ok", "entries": list(self.entries), "raw": {"fake": True}}


class SpyRemover:
    def __init__(self, status: str = "removed"):
        self.status = status
        self.calls: list = []

    def __call__(self, cron_id: str, *, dry_run: bool = True) -> RemoverResult:
        self.calls.append({"cron_id": cron_id, "dry_run": dry_run})
        return RemoverResult(status=self.status, detail=f"fake:{self.status}")


@pytest.fixture(autouse=True)
def _block_real_subprocess(monkeypatch):
    def _boom(*a, **k):  # noqa: ANN001, ANN002, ANN003
        raise AssertionError(
            "실 subprocess 호출 금지 (§6 9-R.4) — 주입 fake lister/remover 만"
        )

    monkeypatch.setattr(subprocess, "run", _boom)


def _git_ref():
    git_dir = WORKSPACE / ".git"
    head_txt = (git_dir / "HEAD").read_text(encoding="utf-8").strip()
    branch = head_txt.split("ref: ", 1)[1] if head_txt.startswith("ref:") else head_txt
    ref_path = git_dir / branch if head_txt.startswith("ref:") else None
    sha = (
        (git_dir / branch).read_text(encoding="utf-8").strip()
        if ref_path and ref_path.exists()
        else head_txt
    )
    return (head_txt, branch, sha)


@pytest.fixture(autouse=True)
def _git_ref_invariant():
    before = _git_ref()
    yield
    assert _git_ref() == before, "git HEAD/branch/ref 변경 감지 (§6 위반)"


def _materialize(fx: dict, tmp: Path) -> dict:
    tmp.mkdir(parents=True, exist_ok=True)
    paths: dict = {}
    dfm = tmp / f"{fx['task_id']}.dispatch-fired.json"
    dfm.write_text(json.dumps(fx["dispatch_fired_marker"]), encoding="utf-8")
    paths["dispatch_fired_marker_path"] = dfm
    rj = tmp / f"{fx['task_id']}.result.json"
    rj.write_text(json.dumps(fx["result_json"]), encoding="utf-8")
    paths["result_json_path"] = rj
    rep = tmp / f"{fx['task_id']}.report.md"
    rep.write_text(fx.get("report_text", ""), encoding="utf-8")
    paths["report_path"] = rep
    crm = tmp / f"{fx['task_id']}.collector-result.json"
    crm.write_text(json.dumps(fx["collector_result_marker"]), encoding="utf-8")
    paths["collector_result_marker_path"] = crm
    paths["fallback_cancelled_marker_path"] = (
        tmp / f"{fx['task_id']}.fallback-cancelled.json"
    )
    paths["cancel_lock_path"] = tmp / f"{fx['task_id']}.cancel.lock"
    paths["audit_path"] = tmp / f"{fx['task_id']}.plus23-cancel-audit.json"
    return paths


def _seam_params(fx: dict, tmp: Path, *, lister, remover) -> OperationalSeamParams:
    p = _materialize(fx, tmp)
    return OperationalSeamParams(
        target_cron_id=fx["target_cron_id"],
        dispatch_fired_marker_path=p["dispatch_fired_marker_path"],
        result_json_path=p["result_json_path"],
        report_path=p["report_path"],
        collector_result_marker_path=p["collector_result_marker_path"],
        audit_path=p["audit_path"],
        cron_lister=lister,
        remover=remover,
        fallback_cancelled_marker_path=p["fallback_cancelled_marker_path"],
        cancel_lock_path=p["cancel_lock_path"],
        callback_contract=fx.get("callback_contract"),
    )


def _pass_input(task_id: str, *, callback_type=CallbackType.NORMAL) -> CallbackInput:
    """frozen collector 가 PASS(durable-success) 를 산출하는 최소 입력.

    ack 획득 + NORMAL + fallback-only 아님 + 필수 marker 부재 0 + Critical7
    0 + forbidden action 0 + anchor match + chair_gated False + dev_sunset
    True → §9 우선순위 6 PASS.
    """
    return CallbackInput(
        task_id=task_id,
        executor="dev2-sim",
        dispatch_cron_id="DISP2500",
        callback_type=callback_type,
        callback_cron_id="NORM2500",
        cron_status="ok",
        task_status="completed",
        required_closeout_markers={"result_json": True, "report": True},
        preservation_anchors={"frozen_anchor": "match"},
        dev_sunset=True,
    )


# ── ① durable-success → seam 1회 호출 ───────────────────────────────────────
def test_01_durable_success_invokes_seam_once(tmp_path):
    fx = _fx("task-2553+25.dry-run")
    spy = SpyRemover("removed")
    lister = FakeCronLister(fx["live_cron_entries"])
    sp = _seam_params(fx, tmp_path, lister=lister, remover=spy)
    res = run_operational_completion_callback_collector(
        _pass_input(fx["task_id"]),
        tmp_path / "ack.json",
        seam_params=sp,
        claim_dir=tmp_path / "claims",
    )
    assert res.collector_result.classification == Classification.PASS
    assert res.durable_success is True
    assert res.seam_invoked is True
    assert res.seam_outcome is not None
    assert lister.calls == 1  # seam 1회 진입 (live 조회 1회)
    assert Path(res.claim_marker_path).exists()


# ── ② verifier 5조건 PASS → remove called ──────────────────────────────────
def test_02_five_conditions_pass_remove_called(tmp_path):
    fx = _fx("task-2553+25.dry-run")
    spy = SpyRemover("removed")
    sp = _seam_params(
        fx, tmp_path, lister=FakeCronLister(fx["live_cron_entries"]), remover=spy
    )
    res = run_operational_completion_callback_collector(
        _pass_input(fx["task_id"]),
        tmp_path / "ack.json",
        seam_params=sp,
        claim_dir=tmp_path / "claims",
    )
    out = res.seam_outcome
    assert out.seam_classification == "PLUS9A_CANCELLED"
    assert out.remove_allowed_by_live_verifier is True
    assert out.cron_remove_invoked is True
    assert out.fallback_cancelled is True
    assert spy.calls == [{"cron_id": "FB25-0001", "dry_run": False}]  # operational
    five = res.cancel_audit["five_condition_results"]
    assert all(
        five[k] is True
        for k in (
            "c1_task_id_match",
            "c2_chat_id_owned",
            "c3_role_fallback",
            "c4_marker_id_crosscheck",
            "c5_pending_not_fired_not_removed",
        )
    )
    assert res.cancel_audit["remove_attempted"] is True
    assert res.cancel_audit["remove_result"] == "CANCELLED"
    assert res.cancel_audit["normal_success_unchanged"] is True


# ── ③ 각 mismatch/SKIP → preserve, remove 0 ────────────────────────────────
@pytest.mark.parametrize(
    "name,expect_sc",
    [
        ("task-2553+23.task-id-mismatch", "SKIP_LIVE_SKIP_MISMATCH"),
        ("task-2553+23.chat-id-mismatch", "SKIP_LIVE_SKIP_MISMATCH"),
        ("task-2553+23.role-not-fallback", "SKIP_LIVE_SKIP_MISMATCH"),
        ("task-2553+23.marker-id-mismatch", "SKIP_LIVE_SKIP_MISMATCH"),
        ("task-2553+23.live-missing", "SKIP_LIVE_SKIP_ALREADY_REMOVED"),
        ("task-2553+23.already-removed", "SKIP_LIVE_SKIP_ALREADY_REMOVED"),
        ("task-2553+23.already-fired", "SKIP_LIVE_SKIP_ALREADY_FIRED"),
    ],
)
def test_03_mismatch_skip_preserves_fallback(tmp_path, name, expect_sc):
    fx = _fx(name)
    spy = SpyRemover("removed")
    sp = _seam_params(
        fx, tmp_path, lister=FakeCronLister(fx["live_cron_entries"]), remover=spy
    )
    res = run_operational_completion_callback_collector(
        _pass_input(fx["task_id"]),
        tmp_path / "ack.json",
        seam_params=sp,
        claim_dir=tmp_path / "claims",
    )
    assert res.seam_invoked is True  # seam 은 진입하되 verifier 가 SKIP
    assert res.seam_outcome.seam_classification == expect_sc
    assert res.seam_outcome.cron_remove_invoked is False
    assert spy.calls == []  # 실 remove 0 — fallback 보존
    assert res.cancel_audit["remove_attempted"] is False
    assert res.cancel_audit["remove_result"] == "NOT_ATTEMPTED"
    assert res.cancel_audit["normal_success_unchanged"] is True
    # collector 성공은 seam SKIP 과 독립
    assert res.collector_result.classification == Classification.PASS


# ── ④ cron-remove 실패 → normal collector success 불변 (디커플) ────────────
def test_04_remove_failure_decoupled_from_collector_success(tmp_path):
    fx = _fx("task-2553+25.dry-run")
    spy = SpyRemover("error")  # remover 가 실패 상태 반환
    sp = _seam_params(
        fx, tmp_path, lister=FakeCronLister(fx["live_cron_entries"]), remover=spy
    )
    res = run_operational_completion_callback_collector(
        _pass_input(fx["task_id"]),
        tmp_path / "ack.json",
        seam_params=sp,
        claim_dir=tmp_path / "claims",
    )
    # collector 성공은 cron-remove 실패와 무관하게 PASS 유지
    assert res.collector_result.classification == Classification.PASS
    assert res.collector_result.closeout_candidate is True
    assert res.seam_outcome.normal_success_preserved is True
    assert res.cancel_audit["normal_success_unchanged"] is True
    assert res.cancel_audit["remove_result"] in ("REMOVE_NONFATAL", "CANCELLED")


def test_04b_seam_exception_decoupled(tmp_path, monkeypatch):
    """seam 자체가 예외를 던져도 collector 결과 그대로 유지 (디커플)."""
    fx = _fx("task-2553+25.dry-run")
    sp = _seam_params(
        fx,
        tmp_path,
        lister=FakeCronLister(fx["live_cron_entries"]),
        remover=SpyRemover("removed"),
    )
    import utils.operational_collector_wiring as wiring

    def _boom(**k):  # noqa: ANN003
        raise RuntimeError("seam blew up")

    monkeypatch.setattr(wiring, "run_operational_cancel_seam", _boom)
    res = wiring.run_operational_completion_callback_collector(
        _pass_input(fx["task_id"]),
        tmp_path / "ack.json",
        seam_params=sp,
        claim_dir=tmp_path / "claims",
    )
    assert res.collector_result.classification == Classification.PASS
    assert res.seam_invoked is False
    assert "디커플" in res.seam_skipped_reason
    assert res.cancel_audit["normal_success_unchanged"] is True


# ── ⑤ DUPLICATE_CALLBACK_IGNORED 무회귀 + 비-PASS seam 미진입 ──────────────
def test_05_duplicate_callback_no_regression_no_seam(tmp_path):
    fx = _fx("task-2553+25.dry-run")
    ack = tmp_path / "shared-ack.json"
    claim = tmp_path / "claims"
    sp1 = _seam_params(
        fx,
        tmp_path / "a",
        lister=FakeCronLister(fx["live_cron_entries"]),
        remover=SpyRemover("removed"),
    )
    first = run_operational_completion_callback_collector(
        _pass_input(fx["task_id"], callback_type=CallbackType.NORMAL),
        ack,
        seam_params=sp1,
        claim_dir=claim,
    )
    spy2 = SpyRemover("removed")
    sp2 = _seam_params(
        fx, tmp_path / "b", lister=FakeCronLister(fx["live_cron_entries"]), remover=spy2
    )
    second = run_operational_completion_callback_collector(
        _pass_input(fx["task_id"], callback_type=CallbackType.FALLBACK_STALE),
        ack,  # 동일 ack → 후발 = ack loser
        seam_params=sp2,
        claim_dir=claim,
    )
    assert first.collector_result.classification == Classification.PASS
    assert first.seam_invoked is True
    # 기존 dedup 안전망 무회귀
    assert (
        second.collector_result.classification
        == Classification.DUPLICATE_CALLBACK_IGNORED
    )
    assert second.collector_result.closeout_candidate is False
    # 비-PASS → operational cancel seam 진입 0
    assert second.durable_success is False
    assert second.seam_invoked is False
    assert spy2.calls == []


# ── ⑥ frozen anchor byte-0 ─────────────────────────────────────────────────
def test_06_frozen_anchor_byte0():
    sha = hashlib.sha256(FROZEN.read_bytes()).hexdigest()
    assert sha == FROZEN_SHA, f"frozen anchor 변경 감지: {sha}"


def test_06b_wiring_imports_frozen_readonly_no_mutation():
    """wiring 모듈은 frozen 을 read-only import 만 — frozen 무수정 재확인."""
    import ast

    src = (WORKSPACE / "utils" / "operational_collector_wiring.py").read_text(
        encoding="utf-8"
    )
    tree = ast.parse(src)
    froms = [
        n.module
        for n in ast.walk(tree)
        if isinstance(n, ast.ImportFrom) and n.module
    ]
    assert "utils.anu_delegation_completion_callback" in froms
    # frozen sha 는 import 후에도 불변
    assert hashlib.sha256(FROZEN.read_bytes()).hexdigest() == FROZEN_SHA


# ── ⑦ seam 중복호출 0 / idempotent (동시 2호출 → 1 invoke · 1 no-op) ───────
def test_07_exact_once_atomic_claim(tmp_path):
    fx = _fx("task-2553+25.dry-run")
    claim = tmp_path / "claims"
    # 동일 event_id (동일 inp/seam_params/결정입력) 인데 ack 는 분리 → 둘 다
    # durable-success 도달. atomic claim 이 정확히 1회만 seam 진입 허용.
    inp = _pass_input(fx["task_id"])
    # 동일 논리 durable-success 이벤트 = 동일 seam_params(동일 dispatch-fired
    # marker 경로·결정입력). 둘 다 PASS 도달을 위해 ack 만 분리 (ack 경로는
    # event_id 비결정 — 9-R.3). 공유 claim_dir 의 atomic claim 이 정확히
    # 1회만 seam 진입 허용.
    sp = _seam_params(
        fx, tmp_path / "shared", lister=FakeCronLister(fx["live_cron_entries"]),
        remover=SpyRemover("removed"),
    )
    r1 = run_operational_completion_callback_collector(
        inp, tmp_path / "ack1.json", seam_params=sp, claim_dir=claim
    )
    r2 = run_operational_completion_callback_collector(
        inp, tmp_path / "ack2.json", seam_params=sp, claim_dir=claim
    )
    assert r1.event_id == r2.event_id  # 동일 durable-success event identity
    invoked = [r1.seam_invoked, r2.seam_invoked]
    assert invoked.count(True) == 1, "exact-once 위반 — seam 2회 진입"
    assert invoked.count(False) == 1
    noop = r1 if not r1.seam_invoked else r2
    assert "exact-once" in noop.seam_skipped_reason or "claim" in (
        noop.seam_skipped_reason
    )
    # claim marker 1개만 생성 (event_id 기반)
    claims = list(claim.glob("task-2553+25.seam-claim.*.json"))
    assert len(claims) == 1


def test_07b_event_id_pre_seam_stable_no_resultjson_reread(tmp_path):
    """9-R.3 — event_id 는 in-hand 입력만으로 도출, result.json 재독 0.

    result.json 파일을 삭제해도 동일 collector 결과로 동일 event_id 가
    재현됨을 보여 pre-seam 안정성/비재독을 입증한다.
    """
    fx = _fx("task-2553+25.dry-run")
    sp = _seam_params(
        fx, tmp_path, lister=FakeCronLister(fx["live_cron_entries"]),
        remover=SpyRemover("removed"),
    )
    res = run_operational_completion_callback_collector(
        _pass_input(fx["task_id"]),
        tmp_path / "ack.json",
        seam_params=sp,
        claim_dir=tmp_path / "claims",
    )
    eid_runtime = res.event_id
    # result.json 삭제 후에도 동일 결과로부터 event_id 재계산 일치
    sp.result_json_path.unlink()
    eid_recompute = compute_event_id(
        task_id=fx["task_id"],
        fallback_cron_id=fx["target_cron_id"],
        dispatch_fired_marker_path=sp.dispatch_fired_marker_path,
        result=res.collector_result,
    )
    assert eid_runtime == eid_recompute


if __name__ == "__main__":
    raise SystemExit(pytest.main([__file__, "-q"]))
