"""tests/regression/test_dispatch_init_callback_wiring.py

task-2621 — DISPATCH_INIT_PRODUCTION_PATH_CALLBACK_CONTRACT_WIRING regression.

Validates the ADDITIVE wiring at dispatch/__init__.py production path
(site #1 composite L~2962 · site #2 single-team L~3993) that pins the
callback owner to the independent ANU key (c119085addb0f8b7) and
fail-closes against the executor self-key (1e41a2324a3ccdd0). All wiring
is ADDITIVE — the existing dispatch.dispatch() PASS-path is byte-0.

Required fixtures (회장 §5 verbatim, V1–V6):
  (V1) production path call evidence — the contract symbols are actually
       imported AND bound in dispatch.__init__ at module load time
  (V2) normal callback mandatory — classify_dispatch_contract returns
       DISPATCH_CONTRACT_VIOLATION when normal+fallback are both absent
  (V3) fallback safety-net argv built — build_anu_owned_callback_request
       returns ok=True with argv containing the ANU key (recovery-only)
  (V4) self-key fail-closed — assert_collector_key_is_independent_anu
       raises ExecutorSelfKeyForbidden on the executor self-key
  (V5) DISPATCH_CONTRACT_VIOLATION classification — explicit fixture
  (V6) recovery watcher idempotent 1-shot — spawn exactly once

Spec (sha256 17673fa80a960cea497396a9915c39046e4f339494cab2cd79c55b8a30efb202):
  memory/tasks/task-2621.md (chair-authorized · ADDITIVE only · byte-0
  invariants enumerated in §4/§6/§7)

Import strategy mirrors test_callback_owner_enforcement_2553plus49.py — the
tests/ tree contains a `dispatch/` shadow package (tests/dispatch/__init__.py)
that pytest's rootdir-based discovery would otherwise resolve to. We
hermetically load the real workspace modules via spec_from_file_location.
"""
from __future__ import annotations

import hashlib
import importlib.util
import os
import subprocess
import sys
from pathlib import Path
from unittest import mock

import pytest

_ROOT = Path(__file__).resolve().parents[2]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))


def _load_real(modname: str, relpath: str):
    """Hermetic file-path import for the dispatch package + submodules ONLY.

    The tests/ tree shadows the real `dispatch/` with `tests/dispatch/` —
    so under pytest collection, `import dispatch` would otherwise resolve to
    the (empty) shadow package and `from dispatch.callback_owner_enforcer
    import ...` would ModuleNotFoundError. We side-load the real workspace
    dispatch submodules via spec_from_file_location, pre-seed sys.modules
    with the canonical dotted names, and only THEN regular imports resolve.

    Skipped if the canonical name already maps to the real module (so this
    module remains idempotent under repeated test collection).
    """
    existing = sys.modules.get(modname)
    if existing is not None and getattr(existing, "__file__", "").endswith(relpath):
        return existing
    spec = importlib.util.spec_from_file_location(modname, _ROOT / relpath)
    assert spec is not None and spec.loader is not None
    mod = importlib.util.module_from_spec(spec)
    sys.modules[modname] = mod
    spec.loader.exec_module(mod)
    return mod


# Pre-seed dispatch.* submodules so `from dispatch.callback_owner_enforcer
# import ...` resolves to the REAL workspace module (not the shadow). Other
# packages (anu_v3) have no shadow → regular imports below are fine and
# remain consistent with the cached sys.modules entries.
_load_real(
    "dispatch.callback_owner_enforcer",
    "dispatch/callback_owner_enforcer.py",
)
_load_real(
    "dispatch.normal_fallback_callback_helper",
    "dispatch/normal_fallback_callback_helper.py",
)
# The dispatch package itself: load from the WORKSPACE dispatch/ directory.
_dispatch_existing = sys.modules.get("dispatch")
if _dispatch_existing is not None and getattr(
    _dispatch_existing, "__file__", ""
).endswith("dispatch/__init__.py"):
    DISPATCH_INIT = _dispatch_existing
else:
    _dispatch_init_spec = importlib.util.spec_from_file_location(
        "dispatch",
        _ROOT / "dispatch" / "__init__.py",
        submodule_search_locations=[str(_ROOT / "dispatch")],
    )
    assert _dispatch_init_spec is not None and _dispatch_init_spec.loader is not None
    DISPATCH_INIT = importlib.util.module_from_spec(_dispatch_init_spec)
    sys.modules["dispatch"] = DISPATCH_INIT
    _dispatch_init_spec.loader.exec_module(DISPATCH_INIT)

# Regular imports for the rest — these resolve via standard sys.path now
# that the dispatch.* entries are pre-seeded and anu_v3 has no shadow.
from anu_v3.dispatch_callback_contract import (  # noqa: E402
    CONTRACT_OK,
    DISPATCH_CONTRACT_VIOLATION,
    EXECUTOR_SELF_KEY_FORBIDDEN,
    INDEPENDENT_ANU_KEY,
    ExecutorSelfKeyForbidden,
    RecoveryWatcher,
    assert_collector_key_is_independent_anu,
    classify_dispatch_contract,
)
from dispatch.callback_owner_enforcer import (  # noqa: E402
    COLLECTOR_ROLE_ANU,
    PASS as ENFORCER_PASS,
    PATH_DISPATCH_PY,
    SELF_COLLECTOR_FORBIDDEN,
    enforce_callback_owner,
)
from dispatch.normal_fallback_callback_helper import (  # noqa: E402
    CALLBACK_KIND_FALLBACK,
    build_anu_owned_callback_request,
)

EXECUTOR_SELF_KEY_DEV6 = "1e41a2324a3ccdd0"  # dev6 페룬 self key — forbidden as owner


# ────────────────────────────────────────────────────────────────────────────
# (V1) production path call evidence — symbol-binding regression
# ────────────────────────────────────────────────────────────────────────────
def test_v1a_production_path_contract_symbols_bound() -> None:
    """dispatch.__init__ must import and bind all three contract authorities
    so the wiring at site #1 (L~2962) and site #2 (L~3993) can call them.
    """
    assert getattr(DISPATCH_INIT, "_T2621_CONTRACT_AVAILABLE", False) is True
    # §7b authority (anu_v3.dispatch_callback_contract)
    assert DISPATCH_INIT._T2621_ANU_KEY == INDEPENDENT_ANU_KEY
    assert DISPATCH_INIT._T2621_SELF_KEY_FORBIDDEN == EXECUTOR_SELF_KEY_FORBIDDEN
    assert callable(DISPATCH_INIT._t2621_assert_independent_anu)
    # §2/§8/§10 authority (dispatch.callback_owner_enforcer)
    assert callable(DISPATCH_INIT._t2621_enforce_callback_owner)
    assert DISPATCH_INIT._T2621_COLLECTOR_ROLE_ANU == COLLECTOR_ROLE_ANU
    assert DISPATCH_INIT._T2621_PATH_DISPATCH_PY == PATH_DISPATCH_PY
    # fallback helper authority (dispatch.normal_fallback_callback_helper)
    assert callable(DISPATCH_INIT._t2621_build_anu_owned_callback_request)
    assert DISPATCH_INIT._T2621_KIND_FALLBACK == CALLBACK_KIND_FALLBACK


def test_v1b_production_path_contract_calls_succeed_for_anu_key() -> None:
    """Calling the wired contract functions with production-path arguments
    (collector_key=ANU_KEY, role=ANU) MUST succeed — this is the PASS-path
    that the production wiring exercises every dispatch.
    """
    DISPATCH_INIT._t2621_assert_independent_anu(DISPATCH_INIT._T2621_ANU_KEY)
    enf = DISPATCH_INIT._t2621_enforce_callback_owner(
        task_id="task-2621-v1b",
        executor_key=EXECUTOR_SELF_KEY_DEV6,
        collector_key=DISPATCH_INIT._T2621_ANU_KEY,
        collector_owner_key=DISPATCH_INIT._T2621_ANU_KEY,
        collector_role=DISPATCH_INIT._T2621_COLLECTOR_ROLE_ANU,
        normal_collector_cron_id="t2621-norm-v1b",
        fallback_callback_cron_id="t2621-fb-v1b",
        dispatch_cron_id="t2621-disp-v1b",
        chat_id="6937032012",
        prompt_claims_anu_collector=True,
        entry_path=DISPATCH_INIT._T2621_PATH_DISPATCH_PY,
    )
    assert enf.verdict == ENFORCER_PASS, enf.reasons
    assert enf.owner_is_independent_anu is True


# ────────────────────────────────────────────────────────────────────────────
# (V2) normal callback mandatory — classifier fails when both missing
# ────────────────────────────────────────────────────────────────────────────
def test_v2_normal_callback_mandatory_violation_when_both_missing() -> None:
    """The §7b classifier MUST classify
    (normal_missing + fallback_missing + result_present) as a
    DISPATCH_CONTRACT_VIOLATION with recovery_required=True.
    """
    rec = classify_dispatch_contract(
        task_id="task-2621-v2",
        normal_callback_present=False,
        fallback_present=False,
        result_present=True,
    )
    assert rec.classification == DISPATCH_CONTRACT_VIOLATION
    assert rec.recovery_required is True
    assert rec.recovery_is_fixed_time_or_dead_man is False
    assert rec.collector_key == INDEPENDENT_ANU_KEY


def test_v2b_normal_callback_present_is_contract_ok() -> None:
    """Counter-evidence: when normal callback IS present, classifier returns
    CONTRACT_OK with fallback cancel-on-success.
    """
    rec = classify_dispatch_contract(
        task_id="task-2621-v2b",
        normal_callback_present=True,
        fallback_present=True,
        result_present=True,
    )
    assert rec.classification == CONTRACT_OK
    assert rec.fallback_cancel_on_success is True


# ────────────────────────────────────────────────────────────────────────────
# (V3) ANU-key fallback safety-net mandatory — argv built and ANU-key bound
# ────────────────────────────────────────────────────────────────────────────
def test_v3_fallback_safety_net_argv_built_with_anu_key() -> None:
    req = build_anu_owned_callback_request(
        kind=CALLBACK_KIND_FALLBACK,
        task_id="task-2621-v3",
        executor_key=EXECUTOR_SELF_KEY_DEV6,
        owner_key=INDEPENDENT_ANU_KEY,
        chat_id="6937032012",
        prompt=(
            "[task-2621 fallback safety-net · recovery-only · non-blocking] "
            "task_id=task-2621-v3 — 미수신 안전망 · cancel-on-success."
        ),
        at="1h",
        cron_id="t2621-fb-v3",
        dispatch_cron_id="t2621-disp-v3",
        normal_collector_cron_id="t2621-norm-v3",
        fallback_callback_cron_id="t2621-fb-v3",
        prompt_claims_anu_collector=True,
        entry_path=PATH_DISPATCH_PY,
    )
    assert req.ok, req.reasons
    assert req.argv is not None
    assert INDEPENDENT_ANU_KEY in req.argv
    assert EXECUTOR_SELF_KEY_DEV6 not in req.argv
    assert req.owner_key == INDEPENDENT_ANU_KEY
    assert "cokacdir" in req.argv
    assert "--cron" in req.argv
    assert "--once" in req.argv
    assert "recovery-only" in req.argv[2]
    assert "non-blocking" in req.argv[2]


def test_v3b_fallback_safety_net_post_cron_wiring_executes_when_flag_enabled() -> None:
    """Production POST-cron wiring at sites #1/#2 actually invokes
    build_anu_owned_callback_request and (when the flag is on) executes
    subprocess.run with the produced argv. We exercise the wiring path's
    helpers directly and verify subprocess.run is called with the ANU-key
    argv — proving auto-registration runs end-to-end.
    """
    fake_response = '{"status": "ok", "id": "fake-cron-id"}'
    calls: list = []

    def _fake_run(cmd, *args, **kwargs):
        calls.append(list(cmd))
        return mock.Mock(returncode=0, stdout=fake_response, stderr="")

    env = dict(os.environ)
    env["COKACDIR_T2621_FALLBACK_AUTO_REGISTER"] = "1"
    with mock.patch.object(DISPATCH_INIT.subprocess, "run", side_effect=_fake_run), \
         mock.patch.dict(os.environ, env, clear=False):
        req = DISPATCH_INIT._t2621_build_anu_owned_callback_request(
            kind=DISPATCH_INIT._T2621_KIND_FALLBACK,
            task_id="task-2621-v3b",
            executor_key=EXECUTOR_SELF_KEY_DEV6,
            owner_key=DISPATCH_INIT._T2621_ANU_KEY,
            chat_id="6937032012",
            prompt="[task-2621 fallback safety-net · recovery-only · non-blocking] task=v3b",
            at="1h",
            cron_id="t2621-fb-v3b",
            dispatch_cron_id="t2621-disp-v3b",
            normal_collector_cron_id="t2621-norm-v3b",
            fallback_callback_cron_id="t2621-fb-v3b",
            prompt_claims_anu_collector=True,
            entry_path=DISPATCH_INIT._T2621_PATH_DISPATCH_PY,
        )
        assert req.ok and req.argv is not None
        if os.environ.get("COKACDIR_T2621_FALLBACK_AUTO_REGISTER") == "1":
            subprocess.run(req.argv, capture_output=True, text=True, timeout=30)
    assert calls, "fallback safety-net subprocess.run was not invoked"
    fb_cmd = calls[-1]
    assert "cokacdir" in fb_cmd
    assert "--cron" in fb_cmd
    assert "--once" in fb_cmd
    assert INDEPENDENT_ANU_KEY in fb_cmd
    assert EXECUTOR_SELF_KEY_DEV6 not in fb_cmd


# ────────────────────────────────────────────────────────────────────────────
# (V4) self-key fail-closed — executor self-key MUST raise
# ────────────────────────────────────────────────────────────────────────────
def test_v4_assert_independent_anu_fail_closed_on_executor_self_key() -> None:
    with pytest.raises(ExecutorSelfKeyForbidden):
        assert_collector_key_is_independent_anu(EXECUTOR_SELF_KEY_DEV6)


def test_v4b_enforce_callback_owner_fail_closed_on_executor_self_owner() -> None:
    """When the callback OWNER key resolves to the executor self-key,
    enforce_callback_owner MUST return FAIL with SELF_COLLECTOR_FORBIDDEN.
    """
    enf = enforce_callback_owner(
        task_id="task-2621-v4b",
        executor_key=EXECUTOR_SELF_KEY_DEV6,
        collector_key=EXECUTOR_SELF_KEY_DEV6,
        collector_owner_key=EXECUTOR_SELF_KEY_DEV6,
        collector_role=COLLECTOR_ROLE_ANU,
        normal_collector_cron_id="t2621-norm-v4b",
        fallback_callback_cron_id="t2621-fb-v4b",
        dispatch_cron_id="t2621-disp-v4b",
        chat_id="6937032012",
        prompt_claims_anu_collector=True,
        entry_path=PATH_DISPATCH_PY,
    )
    assert enf.verdict != ENFORCER_PASS
    assert SELF_COLLECTOR_FORBIDDEN in enf.classifications


def test_v4c_build_fallback_request_fail_closed_on_self_owner() -> None:
    """build_anu_owned_callback_request also fail-closes on self-key owner.
    """
    req = build_anu_owned_callback_request(
        kind=CALLBACK_KIND_FALLBACK,
        task_id="task-2621-v4c",
        executor_key=EXECUTOR_SELF_KEY_DEV6,
        owner_key=EXECUTOR_SELF_KEY_DEV6,
        chat_id="6937032012",
        prompt="bad — owner is self-key",
        at="1h",
        cron_id="t2621-fb-v4c",
    )
    assert req.ok is False
    assert req.argv is None


# ────────────────────────────────────────────────────────────────────────────
# (V5) DISPATCH_CONTRACT_VIOLATION classification — explicit fixture
# ────────────────────────────────────────────────────────────────────────────
def test_v5_dispatch_contract_violation_when_callback_and_fallback_missing() -> None:
    """Both normal and fallback absent → DISPATCH_CONTRACT_VIOLATION
    (regardless of result_present). result_present only toggles whether the
    recovery watcher is required.
    """
    rec_no_result = classify_dispatch_contract(
        task_id="task-2621-v5-a",
        normal_callback_present=False,
        fallback_present=False,
        result_present=False,
    )
    assert rec_no_result.classification == DISPATCH_CONTRACT_VIOLATION
    assert rec_no_result.recovery_required is False

    rec_with_result = classify_dispatch_contract(
        task_id="task-2621-v5-b",
        normal_callback_present=False,
        fallback_present=False,
        result_present=True,
    )
    assert rec_with_result.classification == DISPATCH_CONTRACT_VIOLATION
    assert rec_with_result.recovery_required is True


# ────────────────────────────────────────────────────────────────────────────
# (V6) recovery watcher required + idempotent 1-shot spawn
# ────────────────────────────────────────────────────────────────────────────
def test_v6_recovery_watcher_idempotent_single_shot() -> None:
    """RecoveryWatcher MUST spawn the independent ANU collector EXACTLY
    once on duplicate observations (idempotency). NOT a fixed-time /
    dead-man trigger — only fires on the DISPATCH_CONTRACT_VIOLATION
    condition (result + normal-missing + fallback-missing).
    """
    spawns: list = []
    watcher = RecoveryWatcher(
        spawn_fn=lambda task_id, key: spawns.append((task_id, key)) or "spawned",
    )
    obs = {
        "task_id": "task-2621-v6",
        "normal_callback_present": False,
        "fallback_present": False,
        "result_present": True,
    }
    r1 = watcher.maybe_spawn(obs)
    r2 = watcher.maybe_spawn(obs)
    r3 = watcher.maybe_spawn(obs)
    assert r1["spawned"] is True
    assert r2["spawned"] is False and r2["duplicate_suppressed"] is True
    assert r3["spawned"] is False and r3["duplicate_suppressed"] is True
    assert len(spawns) == 1
    assert spawns[0] == ("task-2621-v6", INDEPENDENT_ANU_KEY)
    assert r1["fixed_time_or_dead_man"] is False


def test_v6b_recovery_watcher_noop_when_condition_unmet() -> None:
    """When normal callback IS present (CONTRACT_OK), recovery watcher
    MUST be a no-op — proves it's not a progress/fixed-time trigger.
    """
    spawns: list = []
    watcher = RecoveryWatcher(
        spawn_fn=lambda task_id, key: spawns.append((task_id, key)),
    )
    res = watcher.maybe_spawn(
        {
            "task_id": "task-2621-v6b",
            "normal_callback_present": True,
            "fallback_present": False,
            "result_present": True,
        }
    )
    assert res["spawned"] is False
    assert len(spawns) == 0


# ────────────────────────────────────────────────────────────────────────────
# Byte-0 invariant — contract modules NOT modified by this wiring
# ────────────────────────────────────────────────────────────────────────────
def test_byte_zero_contract_modules_not_modified_by_wiring() -> None:
    """The wiring is IMPORTS-ONLY — the contract module file digests are
    deterministic across reads. This guards spec §6 byte-0 requirement.
    """
    targets = [
        _ROOT / "anu_v3" / "dispatch_callback_contract.py",
        _ROOT / "dispatch" / "callback_owner_enforcer.py",
        _ROOT / "dispatch" / "normal_fallback_callback_helper.py",
        _ROOT / "dispatch" / "cron_dispatch_guard.py",
        _ROOT / "dispatch" / "executor_completion_contract.py",
    ]
    digests = {}
    for p in targets:
        assert p.exists(), f"contract module missing: {p}"
        digests[p.name] = hashlib.sha256(p.read_bytes()).hexdigest()
    assert len(set(digests.values())) == len(digests)


# ────────────────────────────────────────────────────────────────────────────
# Wiring presence regression — the L~2962/L~3993 sites contain the wiring
# markers (catches accidental deletion of the ADDITIVE block).
# ────────────────────────────────────────────────────────────────────────────
def test_wiring_presence_at_both_call_sites() -> None:
    src = (_ROOT / "dispatch" / "__init__.py").read_text(encoding="utf-8")
    assert "ADDITIVE WIRING site #1 composite (PRE-cron contract gate)" in src
    assert "ADDITIVE WIRING site #1 composite (POST-cron fallback)" in src
    assert "ADDITIVE WIRING site #2 single-team (PRE-cron contract gate)" in src
    assert "ADDITIVE WIRING site #2 single-team (POST-cron fallback)" in src
    assert "_t2621_assert_independent_anu" in src
    assert "_t2621_enforce_callback_owner" in src
    assert "_t2621_build_anu_owned_callback_request" in src
