"""tests/regression/test_completion_callback_fallback_cancel_2553plus9a.py

task-2553+9a — CALLBACK_FALLBACK_CANCEL_ON_SUCCESS regression.

§6 필수 테스트 10 + §9-R.3 adversarial 추가 6 = 총 16.

100% offline. 실 callback cron / cokacdir subprocess / network / git 0.
모든 cron-remove 는 **주입된 fake remover**(§9-R.5). 실 subprocess `--cron-remove`
호출 시 테스트 즉시 FAIL(§9-R.3-13: subprocess.run 전면 차단 spy).

callback orchestrator(utils/anu_delegation_completion_callback.py) 는 본 테스트
어디에서도 import/호출하지 않는다 — §9-R.4 분리 seam 검증.
"""
from __future__ import annotations

import json
import sys
import threading
from pathlib import Path

import pytest

WORKSPACE = Path(__file__).resolve().parent.parent.parent
if str(WORKSPACE) in sys.path:
    sys.path.remove(str(WORKSPACE))
sys.path.insert(0, str(WORKSPACE))

from utils.completion_callback_fallback_cancel import (  # noqa: E402
    CancelClassification,
    RealCokacdirCronRemover,
    RemoverResult,
    cancel_fallback_on_success,
    evaluate_durable_evidence,
)

FIXDIR = WORKSPACE / "memory" / "fixtures"


# ── 헬퍼: fixture → tmp_path 산출물 materialize ─────────────────────────────


def _fx(name: str) -> dict:
    return json.loads((FIXDIR / f"{name}.json").read_text(encoding="utf-8"))


def _materialize(fx: dict, tmp: Path) -> dict:
    """fixture 의 marker/result/report/collector-result 를 tmp_path 에 기록."""
    paths: dict = {}

    dfm = tmp / f"{fx['task_id']}.dispatch-fired.json"
    dfm.write_text(json.dumps(fx["dispatch_fired_marker"]), encoding="utf-8")
    paths["dispatch_fired_marker_path"] = dfm

    rj = tmp / f"{fx['task_id']}.result.json"
    rj.write_text(json.dumps(fx["result_json"]), encoding="utf-8")
    paths["result_json_path"] = rj

    rep = tmp / f"{fx['task_id']}.report.md"
    rep.write_text(fx.get("report_text", ""), encoding="utf-8")
    paths["report_path"] = rep

    crm = tmp / f"{fx['task_id']}.collector-result.json"
    if fx.get("collector_result_marker_present", True) and "collector_result_marker" in fx:
        crm.write_text(
            json.dumps(fx["collector_result_marker"]), encoding="utf-8"
        )
    paths["collector_result_marker_path"] = crm

    paths["fallback_cancelled_marker_path"] = tmp / f"{fx['task_id']}.fallback-cancelled.json"
    paths["cancel_lock_path"] = tmp / f"{fx['task_id']}.cancel.lock"
    return paths


class SpyRemover:
    """주입 fake remover — 호출 인자/횟수 기록. 실 subprocess 0."""

    def __init__(self, status: str = "removed"):
        self.status = status
        self.calls: list = []

    def __call__(self, cron_id: str, *, dry_run: bool = True) -> RemoverResult:
        self.calls.append({"cron_id": cron_id, "dry_run": dry_run})
        return RemoverResult(status=self.status, detail=f"fake:{self.status}")


@pytest.fixture(autouse=True)
def _block_real_subprocess(monkeypatch):
    """§9-R.3-13 — 어떤 테스트에서도 실 subprocess --cron-remove 호출 0 강제."""
    import subprocess as _sp

    def _boom(*a, **k):  # noqa: ANN001, ANN002, ANN003
        raise AssertionError(
            "실 subprocess 호출 금지 (§9-R.5) — 주입 fake remover 만 허용"
        )

    monkeypatch.setattr(_sp, "run", _boom)


def _run(fx_name: str, tmp_path: Path, remover, **overrides):
    fx = _fx(fx_name)
    paths = _materialize(fx, tmp_path)
    kwargs = dict(
        task_id=fx["task_id"],
        target_cron_id=fx["target_cron_id"],
        normal_collector_success=fx.get("normal_collector_success_aux", False),
        callback_contract=fx.get("callback_contract"),
        remover=remover,
        dry_run=True,
        **paths,
    )
    kwargs.update(overrides)
    return fx, cancel_fallback_on_success(**kwargs)


# ── §6 필수 테스트 1~10 ─────────────────────────────────────────────────────


def test_01_success_pending_cancelled(tmp_path):
    """1. normal success + fallback pending → cron remove 호출, fallback_cancelled=true"""
    spy = SpyRemover("removed")
    fx, d = _run("callback_fallback_cancel_success", tmp_path, spy)
    assert d.classification == CancelClassification.CANCELLED
    assert d.fallback_cancelled is True
    assert d.cron_remove_invoked is True
    assert spy.calls == [{"cron_id": fx["target_cron_id"], "dry_run": True}]
    marker = json.loads(
        (tmp_path / f"{fx['task_id']}.fallback-cancelled.json").read_text()
    )
    assert marker["fallback_cancelled"] is True


def test_02_already_deleted_idempotent(tmp_path):
    """2. normal success + fallback already deleted → no failure, idempotent"""
    spy = SpyRemover("already_gone")
    _, d = _run("callback_fallback_cancel_success", tmp_path, spy)
    assert d.classification == CancelClassification.ALREADY_GONE
    assert d.fallback_cancelled is False
    assert d.cron_remove_invoked is True
    assert d.hold_reasons == []


def test_03_already_fired_duplicate_path(tmp_path):
    """3. normal success + fallback already fired → no failure, duplicate path 유지"""
    spy = SpyRemover("already_fired")
    _, d = _run("callback_fallback_cancel_already_fired", tmp_path, spy)
    assert d.classification == CancelClassification.ALREADY_FIRED
    assert d.fallback_cancelled is False
    assert "DUPLICATE_CALLBACK_IGNORED" in d.cancel_skipped_reason


def test_04_normal_failure_no_remove(tmp_path):
    """4. normal collector failure → fallback remove 호출 금지"""
    spy = SpyRemover("removed")
    _, d = _run("callback_normal_failed_fallback_remains", tmp_path, spy)
    assert d.classification == CancelClassification.SKIPPED_NORMAL_FAILED
    assert d.cron_remove_invoked is False
    assert d.fallback_cancelled is False
    assert spy.calls == []  # remove 절대 미호출 → fallback 보존


def test_05_fallback_id_missing_reason_recorded(tmp_path):
    """5. fallback_cron_id missing → cancel_skipped_reason 기록 (HOLD reason 동반)"""
    spy = SpyRemover("removed")
    fx = _fx("callback_fallback_cancel_success")
    paths = _materialize(fx, tmp_path)
    # dispatch-fired marker 자체를 제거 → 권위 부재
    paths["dispatch_fired_marker_path"].unlink()
    d = cancel_fallback_on_success(
        task_id=fx["task_id"],
        target_cron_id=fx["target_cron_id"],
        normal_collector_success=True,
        remover=spy,
        dry_run=True,
        **paths,
    )
    assert d.classification == CancelClassification.SKIPPED_UNTRUSTED
    assert d.cancel_skipped_reason
    assert spy.calls == []
    assert any("marker 부재" in r for r in d.hold_reasons)


def test_06_wrong_chat_id_no_remove(tmp_path):
    """6. wrong chat_id fallback → remove 금지"""
    spy = SpyRemover("removed")
    fx = _fx("callback_fallback_cancel_success")
    fx["dispatch_fired_marker"]["callback_policy_a"]["chat_id"] = 9999999999
    paths = _materialize(fx, tmp_path)
    d = cancel_fallback_on_success(
        task_id=fx["task_id"],
        target_cron_id=fx["target_cron_id"],
        normal_collector_success=True,
        remover=spy,
        dry_run=True,
        **paths,
    )
    assert d.classification == CancelClassification.SKIPPED_UNTRUSTED
    assert d.safe_remove_checks["c3_ownership"] is False
    assert spy.calls == []


def test_07_id_mismatch_no_remove(tmp_path):
    """7. fallback id mismatch → remove 금지"""
    spy = SpyRemover("removed")
    fx, d = _run(
        "callback_fallback_cancel_success",
        tmp_path,
        spy,
        target_cron_id="WRONG-ID-XYZ",
    )
    assert d.classification == CancelClassification.SKIPPED_UNTRUSTED
    assert d.safe_remove_checks["c1_marker_id_matches"] is False
    assert spy.calls == []


def test_08_remove_failed_warning_collector_success_preserved(tmp_path):
    """8. cron remove 실패 → warning marker, collector success 유지"""
    spy = SpyRemover("failed")
    _, d = _run("callback_fallback_cancel_success", tmp_path, spy)
    assert d.classification == CancelClassification.REMOVE_FAILED_WARNING
    assert d.cron_remove_invoked is True
    assert d.fallback_cancelled is False
    assert "실패로 바꾸지 않" in d.cancel_skipped_reason


def test_09_duplicate_callback_path_no_regression(tmp_path):
    """9. duplicate callback path 기존 regression 무회귀.

    본 메커니즘은 orchestrator 와 분리(§9-R.4) — dedup 경로 코드/테스트 무회귀를
    실제 기존 dedup regression 모듈을 재실행하여 입증한다.
    """
    import importlib

    sys.path.insert(0, str(WORKSPACE / "tests" / "regression"))
    dup = importlib.import_module(
        "test_completion_callback_dup_ignored_realworld_2553plus1"
    )
    fxd = dup._fixture()
    ackp = tmp_path / "dup.callback-ack.json"
    first = dup.run_completion_callback_collector(
        dup._normal_input(fxd), ackp, post_result_review_fn=dup._mock_review_advisory
    )
    second = dup.run_completion_callback_collector(
        dup._fallback_input(fxd), ackp, post_result_review_fn=dup._mock_review_advisory
    )
    assert first.classification == dup.Classification.PASS
    assert second.classification == dup.Classification.DUPLICATE_CALLBACK_IGNORED
    assert second.closeout_candidate is False
    # 본 모듈이 orchestrator 를 import/호출하지 않음(§9-R.4 분리 seam) — AST 정적 증명
    import ast

    src = (
        WORKSPACE / "utils" / "completion_callback_fallback_cancel.py"
    ).read_text(encoding="utf-8")
    tree = ast.parse(src)
    imported: list = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            imported += [a.name for a in node.names]
        elif isinstance(node, ast.ImportFrom):
            imported.append(node.module or "")
    assert not any(
        "anu_delegation_completion_callback" in (m or "") for m in imported
    ), f"orchestrator import 결합 발견: {imported}"


def test_10_callback_policy_a_collector_only_authority(tmp_path):
    """10. callback policy(a) collector-only 권한 유지 — 본 모듈은 fallback cron
    제거 메커니즘만 제공, write/closeout/재가동 코드 경로 0."""
    src = (
        WORKSPACE / "utils" / "completion_callback_fallback_cancel.py"
    ).read_text(encoding="utf-8")
    for forbidden in ("git push", "gh pr", "--merge", "load_owner_pat", "closeout"):
        assert forbidden not in src
    # 실제 동작도 cron remove 1회 외 부수효과 없음
    spy = SpyRemover("removed")
    _, d = _run("callback_fallback_cancel_success", tmp_path, spy)
    assert len(spy.calls) == 1
    assert d.cron_remove_invoked is True


# ── §9-R.3 adversarial 추가 11~16 ───────────────────────────────────────────


def test_11_concurrent_cancel_vs_fire_single_result(tmp_path):
    """11. concurrent: normal-success-cancel vs fallback-fire race → 단일 결과."""
    fx = _fx("callback_fallback_cancel_success")
    paths = _materialize(fx, tmp_path)
    results: list = []

    def worker():
        spy = SpyRemover("removed")
        d = cancel_fallback_on_success(
            task_id=fx["task_id"],
            target_cron_id=fx["target_cron_id"],
            normal_collector_success=True,
            callback_contract=fx.get("callback_contract"),
            remover=spy,
            dry_run=True,
            **paths,
        )
        results.append((d, spy))

    threads = [threading.Thread(target=worker) for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    cancelled = [r for r in results if r[0].classification == CancelClassification.CANCELLED]
    losers = [
        r for r in results if r[0].classification == CancelClassification.ALREADY_FIRED
    ]
    # 정확히 1개만 실제 cancel(이중 처리·재escalate 0), 나머지는 lock-loser
    assert len(cancelled) == 1
    assert len(losers) == len(results) - 1
    total_remove_calls = sum(len(r[1].calls) for r in results)
    assert total_remove_calls == 1


def test_12_same_owner_other_live_task_blocked(tmp_path):
    """12. same-owner 이나 다른 live task 의 cron id → task_id binding 불일치 → remove 0"""
    spy = SpyRemover("removed")
    fx = _fx("callback_fallback_cancel_success")
    # ownership 동일하지만 marker.task_id 가 다른 live task
    fx["dispatch_fired_marker"]["task_id"] = "task-OTHER-live"
    paths = _materialize(fx, tmp_path)
    d = cancel_fallback_on_success(
        task_id=fx["task_id"],  # 처리 task 와 marker task 불일치
        target_cron_id=fx["target_cron_id"],
        normal_collector_success=True,
        callback_contract=fx.get("callback_contract"),
        remover=spy,
        dry_run=True,
        **paths,
    )
    assert d.classification == CancelClassification.SKIPPED_UNTRUSTED
    assert d.safe_remove_checks["c2_task_binding"] is False
    assert spy.calls == []


def test_13_enforced_mock_transport_no_real_subprocess():
    """13. enforced mock transport — 실 subprocess --cron-remove 호출 시 FAIL.

    autouse fixture 가 subprocess.run 을 차단. 기본 RealCokacdirCronRemover 의
    dry_run=True 경로는 subprocess 를 호출하지 않음을 동적 증명.
    """
    real = RealCokacdirCronRemover()
    rr = real("ANYID", dry_run=True)  # subprocess 미호출 (차단되어도 통과)
    assert rr.status == "removed"
    assert rr.raw and rr.raw["dry_run"] is True
    # no_dry_run 이면 차단 spy 가 AssertionError 발생 — 실 호출 0 강제 입증
    with pytest.raises(AssertionError):
        real("ANYID", dry_run=False)


def test_14_two_source_extraction_cross_check(tmp_path):
    """14. fallback_cron_id 2-source(marker AND contract) 양쪽 경로 검증."""
    spy = SpyRemover("removed")
    # contract 가 marker 권위 id 와 불일치 → 신뢰 박탈, remove 0
    _, d = _run(
        "callback_fallback_cancel_success",
        tmp_path,
        spy,
        callback_contract={"fallback_callback_cron_id": "CONTRACT-DIFFERENT"},
    )
    assert d.classification == CancelClassification.SKIPPED_UNTRUSTED
    assert d.safe_remove_checks["contract_cross_check"] == "MISMATCH"
    assert spy.calls == []
    # contract 가 marker 와 일치 → 정상 CANCELLED
    spy2 = SpyRemover("removed")
    _, d2 = _run("callback_fallback_cancel_success", tmp_path, spy2)
    assert d2.classification == CancelClassification.CANCELLED
    assert d2.safe_remove_checks["contract_cross_check"] == "match"


def test_15_crash_window_fallback_preserved(tmp_path):
    """15. crash window: marker 생성 ~ cancel 호출 사이 중단 → fallback 보존.

    crash 를 'collector-result marker 미생성(durable evidence 미완)'으로 모사 →
    SKIPPED_NORMAL_FAILED, remove 0."""
    spy = SpyRemover("removed")
    fx = _fx("callback_fallback_cancel_success")
    paths = _materialize(fx, tmp_path)
    paths["collector_result_marker_path"].unlink()  # crash: marker 미완
    d = cancel_fallback_on_success(
        task_id=fx["task_id"],
        target_cron_id=fx["target_cron_id"],
        normal_collector_success=True,
        callback_contract=fx.get("callback_contract"),
        remover=spy,
        dry_run=True,
        **paths,
    )
    assert d.classification == CancelClassification.SKIPPED_NORMAL_FAILED
    assert spy.calls == []


def test_16_boolean_true_but_no_durable_evidence_skipped(tmp_path):
    """16. durable evidence 부재인데 boolean=true 주입 → SKIPPED_NORMAL_FAILED (§9-R.2)."""
    spy = SpyRemover("removed")
    _, d = _run(
        "callback_normal_failed_fallback_remains",
        tmp_path,
        spy,
        normal_collector_success=True,  # boolean 강제 true
    )
    assert d.classification == CancelClassification.SKIPPED_NORMAL_FAILED
    assert d.durable_evidence["caller_boolean_aux"] is True
    assert d.durable_evidence["satisfied"] is False
    assert spy.calls == []


def test_durable_evidence_unit(tmp_path):
    """보조 단위 — durable evidence 평가기 직접 검증."""
    rj = tmp_path / "r.json"
    rep = tmp_path / "r.md"
    crm = tmp_path / "c.json"
    rj.write_text(json.dumps({"status": "completed"}), encoding="utf-8")
    rep.write_text("ok", encoding="utf-8")
    crm.write_text("{}", encoding="utf-8")
    ev = evaluate_durable_evidence(
        result_json_path=rj, report_path=rep, collector_result_marker_path=crm
    )
    assert ev["satisfied"] is True
    rj.write_text(json.dumps({"status": "HOLD_FOR_CHAIR"}), encoding="utf-8")
    ev2 = evaluate_durable_evidence(
        result_json_path=rj, report_path=rep, collector_result_marker_path=crm
    )
    assert ev2["satisfied"] is False


if __name__ == "__main__":
    raise SystemExit(pytest.main([__file__, "-q"]))
