"""tests/regression/test_dispatch_callback_contract.py

task-2614 §7b — 회장 2026-05-20 야간 필수 보강. dispatch callback/
progress-trigger 인프라(`anu_v3.dispatch_callback_contract`)의 런타임
계약 regression. 실 모듈 + 실 entrypoint 검증(mock-only FAIL).

PASS 필수 케이스 (회장 verbatim §7b regression):
  (a) normal present → CONTRACT_OK · fallback cancel-on-success
  (b) normal missing · fallback present → fallback recovery 경로(정상 안전망)
  (c) result + normal missing + fallback missing → DISPATCH_CONTRACT_VIOLATION
      + recovery watcher required (회장 필수 fixture case-callback-gap)
  (d) recovery watcher idempotent — 동일 task 2회 호출 시 spawn 정확히 1회
  (e) recovery watcher 가 fixed-time/dead-man 아님 (조건 미충족 시 no-op)
  (f) executor self-key collector/adjudication/dispatch → fail-closed
"""
from __future__ import annotations

import json
import sys
from pathlib import Path

import jsonschema
import pytest

WORKSPACE = Path(__file__).resolve().parent.parent.parent
if str(WORKSPACE) not in sys.path:
    sys.path.insert(0, str(WORKSPACE))

import anu_v3.dispatch_callback_contract as DCC  # noqa: E402
from anu_v3.dispatch_callback_contract import (  # noqa: E402
    CONTRACT_OK,
    DISPATCH_CONTRACT_VIOLATION,
    EXECUTOR_SELF_KEY_FORBIDDEN,
    FALLBACK_RECOVERY,
    INDEPENDENT_ANU_KEY,
    ExecutorSelfKeyForbidden,
    RecoveryWatcher,
    classify_dispatch_contract,
    evaluate,
    run_self_check,
)

FIXTURES = WORKSPACE / "memory" / "fixtures"
SCHEMA_PATH = WORKSPACE / "schemas" / "dispatch_callback_contract.schema.json"
CALLBACK_GAP = FIXTURES / "task-2614.case-callback-gap.json"


def _schema() -> dict:
    return json.loads(SCHEMA_PATH.read_text(encoding="utf-8"))


def _gap_observation() -> dict:
    return json.loads(CALLBACK_GAP.read_text(encoding="utf-8"))["observation"]


# ── (a) ─────────────────────────────────────────────────────────────────────
def test_a_normal_present_contract_ok_fallback_cancel_on_success() -> None:
    rec = classify_dispatch_contract(
        task_id="task-2614",
        normal_callback_present=True,
        fallback_present=True,
        result_present=True,
    )
    assert rec.classification == CONTRACT_OK
    assert rec.fallback_cancel_on_success is True
    assert rec.recovery_required is False
    assert rec.collector_key == INDEPENDENT_ANU_KEY
    jsonschema.Draft7Validator(_schema()).validate(rec.to_dict())


# ── (b) ─────────────────────────────────────────────────────────────────────
def test_b_normal_missing_fallback_present_is_safety_net() -> None:
    rec = classify_dispatch_contract(
        task_id="task-2614",
        normal_callback_present=False,
        fallback_present=True,
        result_present=True,
    )
    assert rec.classification == FALLBACK_RECOVERY
    assert rec.recovery_required is False
    assert rec.recovery_is_fixed_time_or_dead_man is False
    jsonschema.Draft7Validator(_schema()).validate(rec.to_dict())


# ── (c) 회장 필수 fixture ────────────────────────────────────────────────────
def test_c_callback_gap_is_violation_and_requires_recovery() -> None:
    obs = _gap_observation()
    rec = evaluate(obs)
    assert rec.classification == DISPATCH_CONTRACT_VIOLATION
    assert rec.recovery_required is True
    assert rec.result_present is True
    assert rec.normal_callback_present is False
    assert rec.fallback_present is False
    assert rec.recovery_is_fixed_time_or_dead_man is False
    jsonschema.Draft7Validator(_schema()).validate(rec.to_dict())

    fx = json.loads(CALLBACK_GAP.read_text(encoding="utf-8"))
    exp = fx["expected"]
    assert rec.classification == exp["classification"]
    assert rec.recovery_required == exp["recovery_required"]


# ── (d) idempotent ──────────────────────────────────────────────────────────
def test_d_recovery_watcher_idempotent_exactly_one_spawn() -> None:
    calls: list = []
    w = RecoveryWatcher(lambda tid, key: calls.append((tid, key)))
    obs = _gap_observation()
    r1 = w.maybe_spawn(obs)
    r2 = w.maybe_spawn(obs)
    r3 = w.maybe_spawn(obs)
    assert r1["spawned"] is True
    assert r2["spawned"] is False and r2["duplicate_suppressed"] is True
    assert r3["spawned"] is False and r3["duplicate_suppressed"] is True
    assert len(calls) == 1
    assert calls[0][1] == INDEPENDENT_ANU_KEY
    assert len(w.spawned_keys) == 1


def test_d_idempotency_survives_shared_ledger() -> None:
    """영속 idempotency ledger 주입 시 새 watcher 인스턴스에서도 중복 0."""
    seen: set = set()
    obs = _gap_observation()
    w1 = RecoveryWatcher(lambda t, k: "s", seen=seen)
    w2 = RecoveryWatcher(lambda t, k: "s", seen=seen)
    assert w1.maybe_spawn(obs)["spawned"] is True
    assert w2.maybe_spawn(obs)["spawned"] is False  # 다른 인스턴스도 1회 보장


# ── (e) no fixed-time / dead-man ────────────────────────────────────────────
def test_e_recovery_watcher_is_not_fixed_time_or_dead_man() -> None:
    """조건 미충족 시 no-op — 시간 경과만으로 발화하는 진행트리거 0."""
    calls: list = []
    w = RecoveryWatcher(lambda tid, key: calls.append(tid))

    # normal present → 조건 미충족 → no-op
    r_ok = w.maybe_spawn(
        {"task_id": "t", "normal_callback_present": True,
         "fallback_present": False, "result_present": True}
    )
    # fallback present (안전망) → 조건 미충족 → no-op
    r_fb = w.maybe_spawn(
        {"task_id": "t2", "normal_callback_present": False,
         "fallback_present": True, "result_present": True}
    )
    # result 부재 → 위반이나 spawn 조건 미충족 → no-op
    r_nores = w.maybe_spawn(
        {"task_id": "t3", "normal_callback_present": False,
         "fallback_present": False, "result_present": False}
    )
    assert r_ok["spawned"] is False
    assert r_fb["spawned"] is False
    assert r_nores["spawned"] is False
    assert r_nores["classification"] == DISPATCH_CONTRACT_VIOLATION
    assert calls == []
    for r in (r_ok, r_fb, r_nores):
        assert r["fixed_time_or_dead_man"] is False


# ── (f) executor self-key fail-closed ───────────────────────────────────────
def test_f_executor_self_key_collector_fail_closed() -> None:
    with pytest.raises(ExecutorSelfKeyForbidden):
        classify_dispatch_contract(
            task_id="task-2614",
            normal_callback_present=True,
            fallback_present=True,
            result_present=True,
            collector_key=EXECUTOR_SELF_KEY_FORBIDDEN,
        )


def test_f_executor_self_key_recovery_watcher_fail_closed() -> None:
    with pytest.raises(ExecutorSelfKeyForbidden):
        RecoveryWatcher(lambda t, k: None,
                        collector_key=EXECUTOR_SELF_KEY_FORBIDDEN)


def test_f_arbitrary_non_anu_key_also_fail_closed() -> None:
    with pytest.raises(ExecutorSelfKeyForbidden):
        classify_dispatch_contract(
            task_id="task-2614",
            normal_callback_present=True,
            fallback_present=False,
            result_present=True,
            collector_key="deadbeefdeadbeef",
        )


# ── schema forward-reject ───────────────────────────────────────────────────
def test_schema_forward_rejects_weakened_invariants() -> None:
    v = jsonschema.Draft7Validator(_schema())
    good = classify_dispatch_contract(
        task_id="t", normal_callback_present=False,
        fallback_present=False, result_present=True,
    ).to_dict()
    v.validate(good)
    # recovery_is_fixed_time_or_dead_man 약화 시도 → forward-reject
    weak = dict(good)
    weak["recovery_is_fixed_time_or_dead_man"] = True
    with pytest.raises(jsonschema.ValidationError):
        v.validate(weak)
    # executor_self_key_forbidden 약화 시도 → forward-reject
    weak2 = dict(good)
    weak2["executor_self_key_forbidden"] = False
    with pytest.raises(jsonschema.ValidationError):
        v.validate(weak2)


# ── mock-only FAIL (실 entrypoint 입증) ─────────────────────────────────────
def test_self_check_real_entrypoint_passes() -> None:
    res = run_self_check(str(CALLBACK_GAP))
    assert res["all_passed"] is True, res
    assert res["mock_only_would_fail"] is True
    assert callable(DCC.classify_dispatch_contract)


def test_mock_only_constant_classifier_would_fail() -> None:
    """상수 분류기(항상 CONTRACT_OK)는 (c) callback-gap 에서 반드시 FAIL —
    실 로직 입증."""
    a = classify_dispatch_contract(
        task_id="t", normal_callback_present=True,
        fallback_present=True, result_present=True,
    ).classification
    c = evaluate(_gap_observation()).classification
    assert a == CONTRACT_OK
    assert c == DISPATCH_CONTRACT_VIOLATION
    assert a != c


# ── task-2620 §2.2 H1/H2/H3 hardening regression (additive) ─────────────────
# 회장 2026-05-20 — task-2614 forward-candidate (D 갈래) 봉합 검증.
# 기존 12 케이스 무회귀 + H1/H2/H3 + wiring proof 추가.


def test_h1_classify_truthy_non_bool_inputs_normalized_to_bool() -> None:
    """H1: mixed-signal truthy 비-bool 입력은 명시적 bool 로 정규화돼야 한다.

    분류 분기는 truthy 동등하지만 reocrd 의 signal 필드는 진짜 bool 만 노출
    (downstream 약화/JSON-schema validation 회피 차단).
    """
    rec = classify_dispatch_contract(
        task_id="task-h1-truthy",
        normal_callback_present=1,           # truthy int
        fallback_present="yes",               # truthy str
        result_present=[1, 2, 3],             # truthy list
    )
    assert rec.classification == CONTRACT_OK
    # 명시적 bool 정규화 — record 에는 절대 1/"yes"/list 가 남지 않는다.
    assert rec.normal_callback_present is True
    assert rec.fallback_present is True
    assert rec.result_present is True


def test_h1_classify_falsy_non_bool_inputs_normalized_to_bool() -> None:
    """H1: falsy 비-bool (0/""/None/[]) → False 정규화 → 약화 0."""
    rec = classify_dispatch_contract(
        task_id="task-h1-falsy",
        normal_callback_present=0,            # falsy int
        fallback_present="",                  # falsy str
        result_present=None,                  # falsy None
    )
    # 모든 falsy 입력 → fallback/normal/result 부재 → DISPATCH_CONTRACT_VIOLATION
    assert rec.classification == DISPATCH_CONTRACT_VIOLATION
    assert rec.normal_callback_present is False
    assert rec.fallback_present is False
    assert rec.result_present is False


def test_h1_classify_mixed_truthy_falsy_boundary() -> None:
    """H1: normal=truthy + fallback=falsy + result=falsy 경계.

    혼합 신호: normal truthy → CONTRACT_OK (fallback/result 와 무관).
    """
    rec = classify_dispatch_contract(
        task_id="task-h1-mixed",
        normal_callback_present="present",    # truthy
        fallback_present=0,                    # falsy
        result_present=False,                  # falsy
    )
    assert rec.classification == CONTRACT_OK
    assert rec.normal_callback_present is True
    assert rec.fallback_present is False
    # fallback_present=False 이므로 cancel-on-success 도 False
    assert rec.fallback_cancel_on_success is False


def test_h2_evaluate_none_observation_fail_closed() -> None:
    """H2: None observation → InvalidObservation fail-closed."""
    from anu_v3.dispatch_callback_contract import InvalidObservation
    with pytest.raises(InvalidObservation):
        evaluate(None)  # type: ignore[arg-type]


def test_h2_evaluate_empty_dict_fail_closed() -> None:
    """H2: 빈 dict observation → InvalidObservation."""
    from anu_v3.dispatch_callback_contract import InvalidObservation
    with pytest.raises(InvalidObservation):
        evaluate({})


def test_h2_evaluate_wrong_type_fail_closed() -> None:
    """H2: dict 가 아닌 입력 (list/str/int) → InvalidObservation."""
    from anu_v3.dispatch_callback_contract import InvalidObservation
    for bad in (["task-2614"], "task-2614", 42):
        with pytest.raises(InvalidObservation):
            evaluate(bad)  # type: ignore[arg-type]


def test_h2_evaluate_missing_required_key_fail_closed() -> None:
    """H2: required key (task_id + 3 boolean signal) 누락 → InvalidObservation."""
    from anu_v3.dispatch_callback_contract import InvalidObservation
    # task_id 부재
    with pytest.raises(InvalidObservation):
        evaluate({
            "normal_callback_present": True,
            "fallback_present": True,
            "result_present": True,
        })
    # boolean signal 누락
    with pytest.raises(InvalidObservation):
        evaluate({
            "task_id": "task-2614",
            "normal_callback_present": True,
            # fallback_present 누락
            "result_present": True,
        })


def test_h2_evaluate_none_or_wrong_typed_boolean_signal_fail_closed() -> None:
    """H2: boolean signal 자리에 None / dict / list 등 silent-truthy 잡음 차단."""
    from anu_v3.dispatch_callback_contract import InvalidObservation
    base = {
        "task_id": "task-2614",
        "normal_callback_present": True,
        "fallback_present": True,
        "result_present": True,
    }
    # None
    for k in ("normal_callback_present", "fallback_present", "result_present"):
        bad = dict(base)
        bad[k] = None
        with pytest.raises(InvalidObservation):
            evaluate(bad)
    # dict / list (silently-truthy → 분류 왜곡 봉합)
    for k in ("normal_callback_present", "fallback_present", "result_present"):
        for v in ({"x": 1}, [1, 2]):
            bad = dict(base)
            bad[k] = v
            with pytest.raises(InvalidObservation):
                evaluate(bad)


def test_h2_evaluate_pass_path_byte_0() -> None:
    """H2: 기존 PASS-path (정합 fixture / dict 입력) 는 byte-0."""
    obs = _gap_observation()
    rec = evaluate(obs)
    assert rec.classification == DISPATCH_CONTRACT_VIOLATION  # 기존 (c) 동일


def test_h3_idempotency_atomic_record_order_blocks_concurrent_double_spawn() -> None:
    """H3: mark-then-act atomic — 재진입 시도(spawn_fn 안에서 maybe_spawn
    재호출)에서도 spawn 정확히 1회 강화. write-then-mark 약점 봉합."""
    spawn_calls = []
    obs = _gap_observation()

    # spawn_fn 내부에서 같은 watcher 의 maybe_spawn 을 다시 호출하는
    # 재진입 시뮬레이션 — write-then-mark 였다면 2회 spawn, mark-then-act
    # 이면 두 번째는 duplicate_suppressed.
    captured = {}

    def reentrant_spawn(tid: str, _key: str) -> str:
        spawn_calls.append((tid, _key))
        # 재진입: spawn_fn 실행 중 같은 obs 로 다시 호출
        captured["reentry"] = w.maybe_spawn(obs)
        return "spawned"

    w = RecoveryWatcher(reentrant_spawn)
    r = w.maybe_spawn(obs)
    assert r["spawned"] is True
    assert len(spawn_calls) == 1, (
        "재진입 호출이 H3 atomic mark-then-act 위반으로 두 번째 spawn 호출 발생"
    )
    assert captured["reentry"]["spawned"] is False
    assert captured["reentry"]["duplicate_suppressed"] is True


def test_h3_idempotency_rollback_on_spawn_failure() -> None:
    """H3: spawn_fn 예외 → idempotency 마킹 rollback (재시도 deadlock 0)."""
    attempts = []
    obs = _gap_observation()

    def flaky_spawn(tid: str, _key: str) -> str:
        attempts.append(tid)
        if len(attempts) == 1:
            raise RuntimeError("transient — retry next time")
        return "spawned-on-retry"

    w = RecoveryWatcher(flaky_spawn)
    # 1st: spawn_fn raises, idempotency rollback
    with pytest.raises(RuntimeError):
        w.maybe_spawn(obs)
    assert w.spawned_keys == [], "idempotency rollback 미실행 — 재시도 deadlock"
    # 2nd: 재시도 PASS — spawn 1회 추가, ledger 1개 기록
    r2 = w.maybe_spawn(obs)
    assert r2["spawned"] is True
    assert len(attempts) == 2
    assert len(w.spawned_keys) == 1


# ── task-2620 §2.1 production wiring proof (additive) ───────────────────────
# 회장 verbatim — 파일 존재 단독 PASS 금지. 실 invoke trace 캡처와 동등한
# 정합성을 regression 으로 박제(드리프트 방지).


def _force_workspace_dispatch_package():
    """pytest test-collection 이 tests/ 를 sys.path 에 먼저 추가하여 비어 있는
    tests/dispatch/__init__.py 가 진짜 /home/jay/workspace/dispatch 패키지를
    가리는 환경을 우회한다. 본 헬퍼는 WORKSPACE 의 dispatch 패키지를 직접
    file-location 으로 로드하여 sys.modules 에 명시적으로 등록한다(테스트
    범위 한정, 다른 테스트에 영향 0 — 이미 동일 패키지로 표준화)."""
    import importlib.util
    import sys as _sys

    pkg_init = WORKSPACE / "dispatch" / "__init__.py"
    if not pkg_init.exists():
        return None
    existing = _sys.modules.get("dispatch")
    if existing is not None and getattr(existing, "__file__", "") == str(pkg_init):
        return existing
    # tests/dispatch 가 먼저 로드된 경우 강제 교체
    if existing is not None:
        # sub-modules 까지 제거하여 깨끗하게 재로딩
        for k in list(_sys.modules):
            if k == "dispatch" or k.startswith("dispatch."):
                _sys.modules.pop(k, None)
    spec = importlib.util.spec_from_file_location(
        "dispatch",
        str(pkg_init),
        submodule_search_locations=[str(WORKSPACE / "dispatch")],
    )
    assert spec is not None and spec.loader is not None
    pkg = importlib.util.module_from_spec(spec)
    _sys.modules["dispatch"] = pkg
    spec.loader.exec_module(pkg)
    return pkg


def test_wiring_dispatch_py_shim_reexports_contract_symbols() -> None:
    """Site #1 결선 핵심: dispatch.py 가 dispatch_callback_contract 의 정본
    심볼을 재내보내야 한다(`import dispatch as pkg` 가 아니라 shim 파일 직접
    로드 — 'import dispatch' 는 패키지를 해석한다)."""
    import importlib.machinery
    import importlib.util

    # dispatch.py shim 은 자기 안에서 `from dispatch.core import main` 을
    # 실행하므로 진짜 dispatch 패키지가 sys.modules 에 먼저 로드돼 있어야 한다.
    _force_workspace_dispatch_package()

    shim_path = str(WORKSPACE / "dispatch.py")
    loader = importlib.machinery.SourceFileLoader("dispatch_shim_file_t", shim_path)
    spec = importlib.util.spec_from_loader("dispatch_shim_file_t", loader)
    assert spec is not None
    mod = importlib.util.module_from_spec(spec)
    loader.exec_module(mod)

    assert callable(getattr(mod, "classify_dispatch_contract", None))
    assert callable(getattr(mod, "assert_collector_key_is_independent_anu", None))
    assert getattr(mod, "INDEPENDENT_ANU_KEY", None) == INDEPENDENT_ANU_KEY
    assert getattr(mod, "EXECUTOR_SELF_KEY_FORBIDDEN", None) == EXECUTOR_SELF_KEY_FORBIDDEN
    # 실 invoke — 분류기가 shim 경로에서 동일 결과를 반환
    rec = mod.classify_dispatch_contract(
        task_id="task-2620-wiring", normal_callback_present=True,
        fallback_present=True, result_present=True,
    )
    assert rec.classification == CONTRACT_OK
    # 실 fail-closed
    with pytest.raises(mod.ExecutorSelfKeyForbidden):
        mod.assert_collector_key_is_independent_anu(EXECUTOR_SELF_KEY_FORBIDDEN)


def test_wiring_normal_fallback_helper_defends_against_self_key() -> None:
    """Site #2 결선 핵심: dispatch.normal_fallback_callback_helper.build_anu
    _owned_callback_request 가 dispatch_callback_contract.assert_collector
    _key_is_independent_anu 를 통해 executor self-key 를 추가로 차단해야
    한다. enforce_callback_owner 와 동일 케이스에서 FAIL — strictly 더 강함."""
    _force_workspace_dispatch_package()
    from dispatch.normal_fallback_callback_helper import (
        build_anu_owned_callback_request,
    )

    # Case A: ANU owner_key — PASS path 보존 (byte-0)
    req_pass = build_anu_owned_callback_request(
        kind="normal",
        task_id="task-2620-wiring",
        executor_key=EXECUTOR_SELF_KEY_FORBIDDEN,
        owner_key=INDEPENDENT_ANU_KEY,
        chat_id="6937032012",
        prompt="[task-2620 wiring test] normal",
        at="2026-05-20 23:59:00",
        cron_id="normal-trace-cron",
        normal_collector_cron_id="normal-trace-cron",
        fallback_callback_cron_id="fallback-trace-cron",
        dispatch_cron_id="dispatch-trace-cron",
        prompt_claims_anu_collector=True,
        entry_path="cokacdir_cron_direct",
    )
    assert req_pass.verdict == "PASS"
    assert req_pass.argv is not None
    assert INDEPENDENT_ANU_KEY in req_pass.argv

    # Case B: executor self-key owner — 새 §7b authority 가 차단
    req_self = build_anu_owned_callback_request(
        kind="normal",
        task_id="task-2620-wiring",
        executor_key=EXECUTOR_SELF_KEY_FORBIDDEN,
        owner_key=EXECUTOR_SELF_KEY_FORBIDDEN,
        chat_id="6937032012",
        prompt="[task-2620 wiring test] should never produce argv",
        at="2026-05-20 23:59:00",
    )
    assert req_self.verdict == "FAIL"
    assert req_self.argv is None
    # 추가 reason 이 dispatch_callback_contract 정본을 명시해야 함
    assert any(
        "dispatch_callback_contract.assert_collector_key_is_independent_anu" in r
        for r in req_self.reasons
    )

    # Case C: 임의 non-ANU key — 동일하게 fail-closed
    req_arb = build_anu_owned_callback_request(
        kind="fallback",
        task_id="task-2620-wiring",
        executor_key=EXECUTOR_SELF_KEY_FORBIDDEN,
        owner_key="deadbeefdeadbeef",
        chat_id="6937032012",
        prompt="[task-2620 wiring test] arbitrary non-ANU",
        at="2026-05-20 23:59:00",
    )
    assert req_arb.verdict == "FAIL"
    assert req_arb.argv is None
