# -*- coding: utf-8 -*-
"""Regression — task-2553+44 dispatch/cron-direct callback guard + durable
4-tuple registry.

Covers §5 verbatim items 1~9, 13~15, 17, 18 + §5/§8/9-R invariants. The
cron-direct (cokacdir --cron) path is now a code-level fail-closed gate, and
a durable append-only 4-tuple ledger makes a consumed normal callback
re-confirmable from a later session (회장 §2.1/§2.3).

9-R.1 Layer A: cron_dispatch_guard / callback_4tuple_registry perform ZERO
cron register/remove (validate + append-only record only). Layer B (the
executor's own normal completion callback, §10) is a designed lifecycle
signal and is asserted preserved, executed 0 here.
"""
import hashlib
import importlib.util
import json
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path

_ROOT = Path(__file__).resolve().parents[2]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))


def _load(modname: str, relpath: str):
    """Hermetic file-path import (collision-proof vs tests/dispatch shadow)."""
    spec = importlib.util.spec_from_file_location(modname, _ROOT / relpath)
    mod = importlib.util.module_from_spec(spec)
    sys.modules[modname] = mod
    spec.loader.exec_module(mod)
    return mod


# Pre-seed canonical dotted names so internal `from dispatch.* import` and
# `from anu_v3.* import` resolve to the real workspace modules.
_ecc = _load(
    "_p44_ecc", "dispatch/executor_completion_contract.py"
)
sys.modules["dispatch.executor_completion_contract"] = _ecc
_stv = _load("_p44_stv", "dispatch/spec_template_validator.py")
sys.modules["dispatch.spec_template_validator"] = _stv
_reg = _load("_p44_reg", "anu_v3/callback_4tuple_registry.py")
sys.modules["anu_v3.callback_4tuple_registry"] = _reg
_guard = _load("_p44_guard", "dispatch/cron_dispatch_guard.py")

Callback4Tuple = _ecc.Callback4Tuple
guard_dispatch = _guard.guard_dispatch
PATH_CRON_DIRECT = _guard.PATH_CRON_DIRECT
PATH_DISPATCH_PY = _guard.PATH_DISPATCH_PY
Callback4TupleRegistry = _reg.Callback4TupleRegistry
make_record = _reg.make_record
validate_record = _reg.validate_record
NORMAL_CALLBACK_COMPLETED = _reg.NORMAL_CALLBACK_COMPLETED
NO_LEDGER_RECORD = _reg.NO_LEDGER_RECORD
TRACK_MISMATCH = _reg.TRACK_MISMATCH

_MANDATORY_CLAUSE = _stv.MANDATORY_CALLBACK_CLAUSE

NEW_MODULES = [
    _ROOT / "dispatch" / "cron_dispatch_guard.py",
    _ROOT / "anu_v3" / "callback_4tuple_registry.py",
    _ROOT / "anu_v3" / "artifact_root_resolver.py",
    _ROOT / "anu_v3" / "collector_artifact_lookup.py",
]
FROZEN_ANCHOR = _ROOT / "utils" / "anu_delegation_completion_callback.py"
DURABLE_V1 = _ROOT / "memory" / "events" / "task-2553.parallel-batch-state.json"
ECC = _ROOT / "dispatch" / "executor_completion_contract.py"

GIT_HEAD_PRE = "20456b5f83fc039f2fd6f50f4b94095c29b41bfb"
GIT_BRANCH_PRE = "task/task-2553p1-f1-clean-replacement"
FROZEN_ANCHOR_SHA = (
    "83b3e307c8207c76a3e311c408aab4951373bd317896e51687d3007907b0c3d4"
)
DURABLE_V1_SHA = (
    "fe705d84274e8ae367aaa88c77df763b46bdf4c936efaa1dae78458aedd2a3bc"
)
# +32 executor_completion_contract.py must remain byte-0 (task md: byte-0
# 우선, additive only if unavoidable — here untouched).
ECC_SHA = hashlib.sha256(ECC.read_bytes()).hexdigest()

FX42 = _ROOT / "memory" / "fixtures" / "task-2553plus42.normal-callback-consumed.json"
FX43 = _ROOT / "memory" / "fixtures" / "task-2553plus43.normal-callback-consumed.json"


def _sha(p: Path) -> str:
    return hashlib.sha256(p.read_bytes()).hexdigest()


def _git(*args: str) -> str:
    return subprocess.run(
        ["git", "-C", str(_ROOT), *args],
        capture_output=True, text=True, check=True,
    ).stdout.strip()


class CronDispatchGuardAndRegistry(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.fx42 = json.loads(FX42.read_text(encoding="utf-8"))
        cls.fx43 = json.loads(FX43.read_text(encoding="utf-8"))

    def _good_tuple(self, tid="t"):
        return Callback4Tuple(tid, "D", "N", "F")

    # §5.1 — normal callback clause present dispatch -> PASS
    def test_01_clause_present_pass(self):
        r = guard_dispatch(
            spec_text=_MANDATORY_CLAUSE,
            entry_path=PATH_CRON_DIRECT,
            tuple_=self._good_tuple(),
        )
        self.assertEqual(r.verdict, "PASS")
        self.assertTrue(r.ok)

    # §5.2 — clause missing dispatch -> FAIL
    def test_02_clause_missing_fail(self):
        r = guard_dispatch(
            spec_text="executor 는 작업 후 result.json 을 남긴다.",
            entry_path=PATH_CRON_DIRECT,
            tuple_=self._good_tuple(),
        )
        self.assertEqual(r.verdict, "FAIL")
        self.assertTrue(any("clause MISSING" in x for x in r.reasons))

    # §5.3 — 4-tuple normal_collector_cron_id missing -> FAIL
    def test_03_missing_normal_collector_fail(self):
        r = guard_dispatch(
            spec_text=_MANDATORY_CLAUSE,
            entry_path=PATH_CRON_DIRECT,
            tuple_=Callback4Tuple("t", "D", None, "F"),
        )
        self.assertEqual(r.verdict, "FAIL")
        self.assertTrue(
            any("normal_collector_cron_id missing" in x for x in r.reasons)
        )

    # §5.4 — fallback_cron_id missing -> FAIL, OR explicit no-fallback PASS
    def test_04_missing_fallback_fail_or_explicit_nofallback(self):
        r_fail = guard_dispatch(
            spec_text=_MANDATORY_CLAUSE,
            entry_path=PATH_CRON_DIRECT,
            tuple_=Callback4Tuple("t", "D", "N", ""),
        )
        self.assertEqual(r_fail.verdict, "FAIL")
        # explicit no-fallback contract -> PASS (normal_collector still req'd)
        r_ok = guard_dispatch(
            spec_text=_MANDATORY_CLAUSE,
            entry_path=PATH_CRON_DIRECT,
            tuple_=Callback4Tuple("t", "D", "N", ""),
            no_fallback_contract=True,
        )
        self.assertEqual(r_ok.verdict, "PASS")
        # no-fallback must NOT rescue a missing normal_collector_cron_id
        r_still = guard_dispatch(
            spec_text=_MANDATORY_CLAUSE,
            entry_path=PATH_CRON_DIRECT,
            tuple_=Callback4Tuple("t", "D", None, ""),
            no_fallback_contract=True,
        )
        self.assertEqual(r_still.verdict, "FAIL")

    # §5.5 — NO-CRON registry/checkpoint task still keeps executor callback
    def test_05_no_cron_keeps_executor_callback(self):
        spec = (
            "NO-CRON variant (registry/checkpoint cron 0). 그래도 executor "
            "는 완료 직후 normal completion callback 을 반드시 발사한다 "
            "(mandatory)."
        )
        r = guard_dispatch(
            spec_text=spec,
            entry_path=PATH_CRON_DIRECT,
            tuple_=self._good_tuple(),
        )
        self.assertEqual(r.verdict, "PASS")

    # §5.6 — legacy/cokacdir direct path that bypasses contract -> FAIL
    def test_06_unknown_path_bypass_fail(self):
        r = guard_dispatch(
            spec_text=_MANDATORY_CLAUSE,
            entry_path="legacy_bot_raw_cokacdir",
            tuple_=self._good_tuple(),
        )
        self.assertEqual(r.verdict, "FAIL")
        self.assertTrue(any("bypass" in x for x in r.reasons))
        # the recognised cron-direct path with full contract -> PASS
        r_ok = guard_dispatch(
            spec_text=_MANDATORY_CLAUSE,
            entry_path=PATH_CRON_DIRECT,
            tuple_=self._good_tuple(),
        )
        self.assertEqual(r_ok.verdict, "PASS")

    # §5.7 — +42 fixture: consumed but no durable registry -> gap reproduced
    def test_07_plus42_gap_reproduced(self):
        with tempfile.TemporaryDirectory() as d:
            led = Callback4TupleRegistry(Path(d) / "ledger.jsonl")
            v = led.classify(
                task_id=self.fx42["gap_case"]["task_id"],
                expected_chat_id=self.fx42["chat_id"],
            )
            self.assertEqual(v, NO_LEDGER_RECORD)
            self.assertEqual(
                v, self.fx42["gap_case"]["classify_expected_without_ledger"]
            )

    # §5.8 — +42 fixture fixed -> registry NORMAL_CALLBACK_COMPLETED
    def test_08_plus42_fixed_completed(self):
        with tempfile.TemporaryDirectory() as d:
            led = Callback4TupleRegistry(Path(d) / "ledger.jsonl")
            rc = self.fx42["resolved_case"]["ledger_record"]
            rec = make_record(
                task_id=rc["task_id"], dispatch_id=rc["dispatch_id"],
                dispatch_cron_id=rc["dispatch_cron_id"],
                executor=rc["executor"], chat_id=rc["chat_id"],
                normal_collector_cron_id=rc["normal_collector_cron_id"],
                fallback_callback_cron_id=rc["fallback_callback_cron_id"],
                role=rc["role"], status="COMPLETED", ts_kst=rc["ts_kst"],
            )
            self.assertEqual(validate_record(rec), [])
            led.append(rec)
            v = led.classify(
                task_id=rc["task_id"],
                expected_dispatch_id=rc["dispatch_id"],
                expected_chat_id=rc["chat_id"],
            )
            self.assertEqual(v, NORMAL_CALLBACK_COMPLETED)
            # durable history survives even after the one-shot cron is gone
            self.assertEqual(len(led.history_for(rc["task_id"])), 1)

    # §5.9 — +43 fixture fixed -> registry NORMAL_CALLBACK_COMPLETED
    def test_09_plus43_fixed_completed(self):
        with tempfile.TemporaryDirectory() as d:
            led = Callback4TupleRegistry(Path(d) / "ledger.jsonl")
            rc = self.fx43["resolved_case"]["ledger_record"]
            rec = make_record(
                task_id=rc["task_id"], dispatch_id=rc["dispatch_id"],
                dispatch_cron_id=rc["dispatch_cron_id"],
                executor=rc["executor"], chat_id=rc["chat_id"],
                normal_collector_cron_id=rc["normal_collector_cron_id"],
                fallback_callback_cron_id=rc["fallback_callback_cron_id"],
                role=rc["role"], status="COMPLETED", ts_kst=rc["ts_kst"],
            )
            led.append(rec)
            self.assertEqual(
                led.classify(task_id=rc["task_id"],
                             expected_chat_id=rc["chat_id"]),
                NORMAL_CALLBACK_COMPLETED,
            )

    # §5.13 — task_id mismatch -> TRACK_MISMATCH
    def test_13_task_id_mismatch(self):
        with tempfile.TemporaryDirectory() as d:
            led = Callback4TupleRegistry(Path(d) / "ledger.jsonl")
            led.append(make_record(
                task_id="task-2553+42", dispatch_id="7A4D1633",
                dispatch_cron_id="7A4D1633", executor="x",
                chat_id="6937032012", normal_collector_cron_id="0B466540",
                fallback_callback_cron_id="FB", role="executor",
                status="COMPLETED",
            ))
            # unrelated task -> no record -> NO_LEDGER_RECORD (fail-safe,
            # never misjudged as completed; an unrelated callback not cited)
            self.assertEqual(
                led.classify(task_id="task-2553+99"), NO_LEDGER_RECORD
            )
            # a record EXISTS but explicit expected_task_id differs ->
            # TRACK_MISMATCH (Codex [1]/[5], regression 13 strengthened)
            self.assertEqual(
                led.classify(
                    task_id="task-2553+42",
                    expected_task_id="task-2553+OTHER",
                ),
                TRACK_MISMATCH,
            )
            ok, cls = led.validate_identity(
                "task-2553+42",
                {"task_id": "task-2553+99", "dispatch_cron_id": "7A4D1633",
                 "normal_collector_cron_id": "0B466540",
                 "fallback_callback_cron_id": "FB"},
            )
            self.assertFalse(ok)
            self.assertEqual(cls, TRACK_MISMATCH)

    # §5.14 — dispatch_id mismatch -> TRACK_MISMATCH
    def test_14_dispatch_id_mismatch(self):
        with tempfile.TemporaryDirectory() as d:
            led = Callback4TupleRegistry(Path(d) / "ledger.jsonl")
            led.append(make_record(
                task_id="task-2553+42", dispatch_id="7A4D1633",
                dispatch_cron_id="7A4D1633", executor="x",
                chat_id="6937032012", normal_collector_cron_id="0B466540",
                fallback_callback_cron_id="FB", role="executor",
                status="COMPLETED",
            ))
            self.assertEqual(
                led.classify(task_id="task-2553+42",
                             expected_dispatch_id="DEADBEEF"),
                TRACK_MISMATCH,
            )

    # §5.15 — chat_id mismatch -> TRACK_MISMATCH
    def test_15_chat_id_mismatch(self):
        with tempfile.TemporaryDirectory() as d:
            led = Callback4TupleRegistry(Path(d) / "ledger.jsonl")
            led.append(make_record(
                task_id="task-2553+42", dispatch_id="7A4D1633",
                dispatch_cron_id="7A4D1633", executor="x",
                chat_id="6937032012", normal_collector_cron_id="0B466540",
                fallback_callback_cron_id="FB", role="executor",
                status="COMPLETED",
            ))
            self.assertEqual(
                led.classify(task_id="task-2553+42",
                             expected_chat_id="9999999999"),
                TRACK_MISMATCH,
            )

    # §5.6b — guard PASS must NOT durably record an inconsistent/invalid
    # ledger record (Codex [2] fail-closed ledger integrity)
    def test_06b_guard_ledger_reconciliation_fail_closed(self):
        with tempfile.TemporaryDirectory() as d:
            led = Callback4TupleRegistry(Path(d) / "ledger.jsonl")
            good_tuple = Callback4Tuple(
                "task-2553+44_46", "DISP", "NORM", "FB"
            )
            # mismatched record vs the validated 4-tuple -> FAIL, no record
            bad_rec = make_record(
                task_id="task-2553+44_46", dispatch_id="DID",
                dispatch_cron_id="WRONG-DISP", executor="anu",
                chat_id="6937032012", normal_collector_cron_id="NORM",
                fallback_callback_cron_id="FB", role="executor",
            )
            r_bad = guard_dispatch(
                spec_text=_MANDATORY_CLAUSE, entry_path=PATH_CRON_DIRECT,
                tuple_=good_tuple, ledger=led, ledger_record=bad_rec,
            )
            self.assertEqual(r_bad.verdict, "FAIL")
            self.assertFalse(r_bad.recorded_to_ledger)
            self.assertTrue(
                any("ledger/4-tuple mismatch" in x for x in r_bad.reasons)
            )
            self.assertEqual(led.history_for("task-2553+44_46"), [])
            # consistent record -> PASS and durably recorded
            ok_rec = make_record(
                task_id="task-2553+44_46", dispatch_id="DID",
                dispatch_cron_id="DISP", executor="anu",
                chat_id="6937032012", normal_collector_cron_id="NORM",
                fallback_callback_cron_id="FB", role="executor",
                status="COMPLETED",
            )
            r_ok = guard_dispatch(
                spec_text=_MANDATORY_CLAUSE, entry_path=PATH_CRON_DIRECT,
                tuple_=good_tuple, ledger=led, ledger_record=ok_rec,
            )
            self.assertEqual(r_ok.verdict, "PASS")
            self.assertTrue(r_ok.recorded_to_ledger)
            self.assertEqual(
                led.classify(task_id="task-2553+44_46",
                             expected_chat_id="6937032012"),
                NORMAL_CALLBACK_COMPLETED,
            )

    # §5.6c — registry validation that cannot run -> fail-closed, NO record
    # (Codex [2] re-adjudication: no fail-open silent pass)
    def test_06c_guard_validation_unrunnable_fail_closed(self):
        class _Bogus:  # not a Callback4TupleRecord -> validate_record raises
            def identity(self):
                return {}

        with tempfile.TemporaryDirectory() as d:
            led = Callback4TupleRegistry(Path(d) / "ledger.jsonl")
            r = guard_dispatch(
                spec_text=_MANDATORY_CLAUSE, entry_path=PATH_CRON_DIRECT,
                tuple_=Callback4Tuple("t", "D", "N", "F"),
                ledger=led, ledger_record=_Bogus(),
            )
            self.assertEqual(r.verdict, "FAIL")
            self.assertFalse(r.recorded_to_ledger)
            self.assertTrue(
                any("fail-closed" in x for x in r.reasons),
                r.reasons,
            )
            self.assertFalse((Path(d) / "ledger.jsonl").exists())

    # §5.17 — dispatch.py import/help smoke PASS
    def test_17_dispatch_py_smoke(self):
        r = subprocess.run(
            [sys.executable, str(_ROOT / "dispatch.py"), "--help"],
            capture_output=True, text=True, cwd=str(_ROOT),
        )
        self.assertEqual(r.returncode, 0, r.stderr[-800:])

    # §5.18 — existing +32 mandatory callback regression no-regress
    def test_18_plus32_no_regression(self):
        r = subprocess.run(
            [sys.executable, "-m", "pytest", "-q",
             "tests/regression/test_executor_completion_callback_mandatory_2553plus32.py"],
            capture_output=True, text=True, cwd=str(_ROOT),
        )
        self.assertEqual(r.returncode, 0, (r.stdout + r.stderr)[-1500:])

    # §5.19 — callback/fallback/cancel-on-success structure preserved
    def test_19_structure_preserved(self):
        # guard never strips fallback; an explicit no-fallback is opt-in only.
        r = guard_dispatch(
            spec_text=_MANDATORY_CLAUSE, entry_path=PATH_DISPATCH_PY,
            tuple_=self._good_tuple(),
        )
        self.assertFalse(r.no_fallback_contract)
        self.assertEqual(r.verdict, "PASS")
        # a missing fallback is FAIL unless explicitly contracted away —
        # the safety path is preserved by default (not silently dropped).
        r2 = guard_dispatch(
            spec_text=_MANDATORY_CLAUSE, entry_path=PATH_DISPATCH_PY,
            tuple_=Callback4Tuple("t", "D", "N", ""),
        )
        self.assertEqual(r2.verdict, "FAIL")

    # ── invariants (§5 / §8 / 9-R) ───────────────────────────────────────
    def test_inv_git_head_branch_equal(self):
        self.assertEqual(_git("rev-parse", "HEAD"), GIT_HEAD_PRE)
        self.assertEqual(
            _git("rev-parse", "--abbrev-ref", "HEAD"), GIT_BRANCH_PRE
        )

    def test_inv_frozen_anchor_durable_unmutated(self):
        self.assertEqual(_sha(FROZEN_ANCHOR), FROZEN_ANCHOR_SHA)
        self.assertEqual(_sha(DURABLE_V1), DURABLE_V1_SHA)

    def test_inv_ecc_byte0_unmutated(self):
        # existing executor_completion_contract.py byte-0 (no additive patch
        # was needed — guard composes on top of it).
        self.assertEqual(_sha(ECC), ECC_SHA)

    def test_inv_new_modules_git_untracked(self):
        for m in NEW_MODULES:
            rc = subprocess.run(
                ["git", "-C", str(_ROOT), "ls-files", "--error-unmatch",
                 str(m.relative_to(_ROOT))],
                capture_output=True, text=True,
            ).returncode
            self.assertNotEqual(
                rc, 0, f"{m.name} must be git-untracked (§8 invariant)"
            )

    # 9-R.1 Layer A — deliverable modules execute ZERO cron/process.
    # Scan real execution syntax (not explanatory prose: the words
    # "cron"/"cokacdir"/"subprocess" legitimately appear in docstrings
    # stating the NO-CRON guarantee — same caveat as +32 test_04).
    def test_inv_layer_a_no_cron_side_effect(self):
        exec_syntax = (
            "import subprocess", "subprocess.run", "subprocess.Popen",
            "subprocess.call", "subprocess.check", "os.system(",
            "os.popen(", ".Popen(", '"--cron', "'--cron",
            '"--cron-remove', "'--cron-remove",
        )
        for m in NEW_MODULES:
            txt = m.read_text(encoding="utf-8")
            for tok in exec_syntax:
                self.assertNotIn(
                    tok, txt,
                    f"{m.name} (Layer A) must not execute cron/process "
                    f"({tok!r}) — 9-R.1",
                )


if __name__ == "__main__":
    unittest.main(verbosity=2)
