# -*- coding: utf-8 -*-
"""Regression — task-2553+47 NORMAL_CALLBACK_REGISTRY_WRITEBACK_AND_EVENT_
TRIGGER.

Covers 회장 §8 verbatim items 1~10 + §5/§6/9-R.1 invariants:

  1  normal collector success -> registry COMPLETED write-back
  2  completed write-back -> next_action READY
  3  fallback pending + normal completed -> non-blocking
  4  dead-man only -> NOT a primary trigger
  5  fixed-time gate as dependency trigger -> FAIL
  6  repeated scan idempotent
  7  missing fallback binding is NOT a cancel target
  8  +45 14:24 case -> automatic next-action detection reproduced
  9  registry mismatch -> TRACK_MISMATCH
  10 callback mandatory rule no-regression

9-R.1 Layer A: callback_event_trigger performs ZERO cron register/remove,
ZERO dispatch/merge/subprocess. The only write surface is the +44
append-only ledger via write_back_completed (the +44 module stays byte-0).
A next_action is a PROPOSAL ONLY (no auto dispatch/merge/closeout, §6).
"""
import ast
import hashlib
import importlib.util
import json
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from typing import Optional

import jsonschema

_ROOT = Path(__file__).resolve().parents[2]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))


def _load(modname: str, relpath: str):
    spec = importlib.util.spec_from_file_location(modname, _ROOT / relpath)
    mod = importlib.util.module_from_spec(spec)
    sys.modules[modname] = mod
    spec.loader.exec_module(mod)
    return mod


# Pre-seed the canonical dotted name so callback_event_trigger's
# `from anu_v3.callback_4tuple_registry import ...` resolves to the real
# workspace module (hermetic, collision-proof).
_reg = _load("anu_v3.callback_4tuple_registry",
             "anu_v3/callback_4tuple_registry.py")
_cet = _load("anu_v3.callback_event_trigger",
             "anu_v3/callback_event_trigger.py")

Callback4TupleRegistry = _reg.Callback4TupleRegistry
make_record = _reg.make_record

write_back_completed = _cet.write_back_completed
CallbackEventTrigger = _cet.CallbackEventTrigger
proposal_envelope = _cet.proposal_envelope
WRITEBACK_COMPLETED = _cet.WRITEBACK_COMPLETED
WRITEBACK_IDEMPOTENT_SKIP = _cet.WRITEBACK_IDEMPOTENT_SKIP
CALLBACK_MANDATORY_VIOLATION = _cet.CALLBACK_MANDATORY_VIOLATION
NEXT_ACTION_READY = _cet.NEXT_ACTION_READY
NEXT_ACTION_DEFERRED = _cet.NEXT_ACTION_DEFERRED
TRACK_MISMATCH = _cet.TRACK_MISMATCH
FORBIDDEN_TRIGGER_SOURCE = _cet.FORBIDDEN_TRIGGER_SOURCE
NO_LEDGER_RECORD = _cet.NO_LEDGER_RECORD
TRIGGER_REGISTRY_COMPLETED = _cet.TRIGGER_REGISTRY_COMPLETED
TRIGGER_FIXED_TIME = _cet.TRIGGER_FIXED_TIME
TRIGGER_DEAD_MAN = _cet.TRIGGER_DEAD_MAN

CET_SRC = _ROOT / "anu_v3" / "callback_event_trigger.py"
REG_SRC = _ROOT / "anu_v3" / "callback_4tuple_registry.py"
WB_SCHEMA = _ROOT / "schemas" / "callback_4tuple_writeback.schema.json"
ET_SCHEMA = _ROOT / "schemas" / "callback_event_trigger.schema.json"
FX_45 = (_ROOT / "memory" / "fixtures"
         / "task-2553plus47.plus45-1424-normal-callback-completed.json")
FX_CONS = (_ROOT / "memory" / "fixtures"
           / "task-2553plus47.consolidated-summary.json")

GIT_HEAD_PRE = "20456b5f83fc039f2fd6f50f4b94095c29b41bfb"
GIT_BRANCH_PRE = "task/task-2553p1-f1-clean-replacement"
# +44 registry must remain byte-0 (task md §5: byte-0 우선, additive only if
# unavoidable — here untouched; write-back uses its existing append API).
# Pinned to the +44 baseline literal so test_13 is a durable byte-0 guard,
# not a within-run tautology (ANU-Codex adjudication, task-2553+47 collector).
REG_SHA = (
    "774d550628410d36962c23a7663c4b6dbf72789de7c7fd940871e9ad8280e5ab"
)


def _git(*args):
    return subprocess.run(
        ["git", "-C", str(_ROOT), *args],
        capture_output=True, text=True, check=True,
    ).stdout.strip()


class _Base(unittest.TestCase):
    def setUp(self):
        self._tmp = tempfile.TemporaryDirectory()
        self.ledger = Path(self._tmp.name) / "callback_4tuple_index.jsonl"
        self.reg = Callback4TupleRegistry(self.ledger)

    def tearDown(self):
        self._tmp.cleanup()

    def _seed_registered(self, task_id="task-2553+45", role="dispatch",
                         ncc: Optional[str] = None,
                         fb: Optional[str] = "1D8D112A",
                         dispatch_id="2CCA4E15"):
        self.reg.append(make_record(
            task_id=task_id, dispatch_id=dispatch_id,
            dispatch_cron_id=dispatch_id, executor="dev",
            chat_id="6937032012", normal_collector_cron_id=ncc,
            fallback_callback_cron_id=fb, role=role, status="REGISTERED",
            ts_kst="2026-05-18 13:42 KST",
        ))


class WriteBackAndEventTrigger(_Base):

    # §8.1 — normal collector success -> registry COMPLETED write-back
    def test_01_normal_success_writeback(self):
        self._seed_registered()
        r = write_back_completed(
            self.reg, task_id="task-2553+45", dispatch_id="2CCA4E15",
            dispatch_cron_id="2CCA4E15", executor="dev2 오딘",
            chat_id="6937032012", normal_collector_cron_id="NC-2553p45-1424",
            fallback_callback_cron_id="1D8D112A", role="executor",
            ts_kst="2026-05-18 14:24 KST",
        )
        self.assertEqual(r.status, WRITEBACK_COMPLETED)
        self.assertTrue(r.appended)
        latest = self.reg.latest_for("task-2553+45")
        self.assertEqual(latest.status, "COMPLETED")
        # binding corrected — not a naive copy of the stale role=dispatch/
        # normal_collector_cron_id=null line.
        self.assertEqual(latest.role, "executor")
        self.assertEqual(latest.normal_collector_cron_id, "NC-2553p45-1424")
        self.assertEqual(latest.fallback_callback_cron_id, "1D8D112A")
        jsonschema.validate(
            r.to_json(), json.loads(WB_SCHEMA.read_text("utf-8")))

    # §8.2 — completed write-back -> next_action READY
    def test_02_completed_writeback_next_action_ready(self):
        self._seed_registered()
        write_back_completed(
            self.reg, task_id="task-2553+45", dispatch_id="2CCA4E15",
            dispatch_cron_id="2CCA4E15", executor="dev2 오딘",
            chat_id="6937032012", normal_collector_cron_id="NC-1",
            fallback_callback_cron_id="1D8D112A", role="executor",
        )
        et = CallbackEventTrigger(self.reg)
        res = et.scan(task_id="task-2553+45",
                      expected_dispatch_id="2CCA4E15",
                      expected_chat_id="6937032012")
        self.assertEqual(res.verdict, NEXT_ACTION_READY)
        self.assertTrue(res.ready)
        self.assertEqual(res.trigger_source, TRIGGER_REGISTRY_COMPLETED)
        self.assertFalse(res.fixed_time_used)
        self.assertFalse(res.dead_man_used)
        # PROPOSAL ONLY — no write/dispatch authority (§6).
        self.assertEqual(res.next_action["authority"], "none")
        self.assertEqual(res.next_action["action_mode"], "proposal")
        self.assertFalse(res.next_action["auto_executed"])
        jsonschema.validate(
            res.to_json(), json.loads(ET_SCHEMA.read_text("utf-8")))

    # §8.3 — fallback pending + normal completed -> non-blocking
    def test_03_fallback_pending_non_blocking(self):
        self._seed_registered()
        write_back_completed(
            self.reg, task_id="task-2553+45", dispatch_id="2CCA4E15",
            dispatch_cron_id="2CCA4E15", executor="dev", chat_id="6937032012",
            normal_collector_cron_id="NC-1",
            fallback_callback_cron_id="1D8D112A", role="executor",
        )
        res = CallbackEventTrigger(self.reg).scan(task_id="task-2553+45")
        # fallback still bound/pending (Layer A never cancels) yet the
        # completed task proceeds.
        self.assertTrue(res.fallback_pending_non_blocking)
        self.assertTrue(res.ready)
        self.assertEqual(res.verdict, NEXT_ACTION_READY)

    # §8.4 — dead-man only -> NOT a primary trigger
    def test_04_dead_man_only_not_primary(self):
        self._seed_registered()  # REGISTERED only, no COMPLETED
        res = CallbackEventTrigger(self.reg).scan(
            task_id="task-2553+45", dead_man_signal=True)
        self.assertFalse(res.ready)
        self.assertEqual(res.verdict, NEXT_ACTION_DEFERRED)
        self.assertFalse(res.dead_man_used)
        self.assertIsNone(res.trigger_source)
        self.assertTrue(any("dead-man" in x and "NOT" in x
                            for x in res.reasons))

    # §8.5 — fixed-time gate as dependency trigger -> FAIL
    def test_05_fixed_time_dependency_trigger_fail(self):
        self._seed_registered()
        write_back_completed(
            self.reg, task_id="task-2553+45", dispatch_id="2CCA4E15",
            dispatch_cron_id="2CCA4E15", executor="dev", chat_id="6937032012",
            normal_collector_cron_id="NC-1",
            fallback_callback_cron_id="1D8D112A", role="executor",
        )
        et = CallbackEventTrigger(self.reg)
        for bad in (TRIGGER_FIXED_TIME, TRIGGER_DEAD_MAN):
            res = et.scan(task_id="task-2553+45",
                          dependency_trigger_source=bad)
            self.assertEqual(res.verdict, FORBIDDEN_TRIGGER_SOURCE)
            self.assertFalse(res.ready)
            self.assertIsNone(res.next_action)

    # §8.6 — repeated scan idempotent (+ repeated write-back idempotent)
    def test_06_idempotent(self):
        self._seed_registered()
        kw = dict(task_id="task-2553+45", dispatch_id="2CCA4E15",
                  dispatch_cron_id="2CCA4E15", executor="dev",
                  chat_id="6937032012", normal_collector_cron_id="NC-1",
                  fallback_callback_cron_id="1D8D112A", role="executor")
        r1 = write_back_completed(self.reg, **kw)
        r2 = write_back_completed(self.reg, **kw)
        self.assertEqual(r1.status, WRITEBACK_COMPLETED)
        self.assertEqual(r2.status, WRITEBACK_IDEMPOTENT_SKIP)
        self.assertFalse(r2.appended)
        completed_lines = [
            ln for ln in self.ledger.read_text("utf-8").splitlines()
            if '"status": "COMPLETED"' in ln
        ]
        self.assertEqual(len(completed_lines), 1)
        et = CallbackEventTrigger(self.reg)
        a = et.scan(task_id="task-2553+45").to_json()
        b = et.scan(task_id="task-2553+45").to_json()
        self.assertEqual(a, b)

    # §8.7 — missing fallback binding is NOT a cancel target
    def test_07_missing_fallback_not_cancel_target(self):
        self._seed_registered(fb=None)
        write_back_completed(
            self.reg, task_id="task-2553+45", dispatch_id="2CCA4E15",
            dispatch_cron_id="2CCA4E15", executor="dev", chat_id="6937032012",
            normal_collector_cron_id="NC-1",
            fallback_callback_cron_id=None, role="executor",
            no_fallback=True,
        )
        res = CallbackEventTrigger(self.reg).scan(task_id="task-2553+45")
        self.assertTrue(res.ready)
        # absent / declared-no fallback -> never a cancel target.
        self.assertFalse(res.fallback_pending_non_blocking)
        self.assertEqual(res.verdict, NEXT_ACTION_READY)

    # §8.8 — +45 14:24 case -> automatic next-action detection reproduced
    def test_08_plus45_1424_autodetect(self):
        fx = json.loads(FX_45.read_text("utf-8"))
        d = fx["defect_case"]["ledger_record_at_defect"]
        self.reg.append(make_record(
            task_id=d["task_id"], dispatch_id=d["dispatch_id"],
            dispatch_cron_id=d["dispatch_cron_id"], executor=d["executor"],
            chat_id=d["chat_id"],
            normal_collector_cron_id=d["normal_collector_cron_id"],
            fallback_callback_cron_id=d["fallback_callback_cron_id"],
            role=d["role"], status=d["status"], ts_kst=d["ts_kst"],
        ))
        et = CallbackEventTrigger(self.reg)
        # before write-back: deferred (the exact 14:24->14:44 defect).
        before = et.scan(task_id="task-2553+45")
        self.assertEqual(before.verdict, NEXT_ACTION_DEFERRED)
        wi = fx["writeback_input"]
        wr = write_back_completed(
            self.reg, task_id=wi["task_id"], dispatch_id=wi["dispatch_id"],
            dispatch_cron_id=wi["dispatch_cron_id"], executor=wi["executor"],
            chat_id=wi["chat_id"],
            normal_collector_cron_id=wi["normal_collector_cron_id"],
            fallback_callback_cron_id=wi["fallback_callback_cron_id"],
            role=wi["role"], ts_kst=wi["ts_kst"],
        )
        self.assertEqual(wr.status, WRITEBACK_COMPLETED)
        after = et.scan(task_id="task-2553+45")
        self.assertEqual(after.verdict, NEXT_ACTION_READY)
        self.assertTrue(after.ready)
        self.assertEqual(after.trigger_source, TRIGGER_REGISTRY_COMPLETED)
        self.assertFalse(after.fixed_time_used)
        self.assertFalse(after.dead_man_used)

    # §8.8b — consolidated inputs all COMPLETED -> summary candidate now
    def test_08b_consolidated_summary_candidate(self):
        fx = json.loads(FX_CONS.read_text("utf-8"))
        for rec in fx["consolidated_case"]["completed_records"]:
            self.reg.append(make_record(
                task_id=rec["task_id"], dispatch_id=rec["dispatch_id"],
                dispatch_cron_id=rec["dispatch_cron_id"],
                executor=rec["executor"], chat_id=rec["chat_id"],
                normal_collector_cron_id=rec["normal_collector_cron_id"],
                fallback_callback_cron_id=rec["fallback_callback_cron_id"],
                role=rec["role"], status=rec["status"],
                ts_kst=rec["ts_kst"],
            ))
        res = CallbackEventTrigger(self.reg).scan(
            task_id="task-2553+47",
            consolidated_inputs=fx["consolidated_case"][
                "consolidated_inputs"],
        )
        self.assertTrue(res.ready)
        self.assertTrue(res.summary_candidate)
        self.assertEqual(
            sorted(res.summary_inputs_completed),
            sorted(fx["consolidated_case"]["consolidated_inputs"]),
        )
        self.assertTrue(res.next_action["summary_candidate"])

    # §8.9 — registry mismatch -> TRACK_MISMATCH
    def test_09_track_mismatch(self):
        self._seed_registered()
        write_back_completed(
            self.reg, task_id="task-2553+45", dispatch_id="2CCA4E15",
            dispatch_cron_id="2CCA4E15", executor="dev", chat_id="6937032012",
            normal_collector_cron_id="NC-1",
            fallback_callback_cron_id="1D8D112A", role="executor",
        )
        res = CallbackEventTrigger(self.reg).scan(
            task_id="task-2553+45", expected_dispatch_id="DEADBEEF")
        self.assertEqual(res.verdict, TRACK_MISMATCH)
        self.assertFalse(res.ready)
        self.assertIsNone(res.next_action)
        res2 = CallbackEventTrigger(self.reg).scan(
            task_id="task-2553+45", expected_chat_id="9999999999")
        self.assertEqual(res2.verdict, TRACK_MISMATCH)

    # §8.10 — callback mandatory rule no-regression
    def test_10_callback_mandatory_no_regression(self):
        self._seed_registered()
        r = write_back_completed(
            self.reg, task_id="task-2553+45", dispatch_id="2CCA4E15",
            dispatch_cron_id="2CCA4E15", executor="dev", chat_id="6937032012",
            normal_collector_cron_id=None,  # MANDATORY signal missing
            fallback_callback_cron_id="1D8D112A", role="executor",
        )
        self.assertEqual(r.status, CALLBACK_MANDATORY_VIOLATION)
        self.assertFalse(r.appended)
        # rule NOT weakened: no COMPLETED line was written.
        self.assertNotIn("COMPLETED",
                         self.ledger.read_text("utf-8")
                         if self.ledger.is_file() else "")
        jsonschema.validate(
            r.to_json(), json.loads(WB_SCHEMA.read_text("utf-8")))

    # NO_LEDGER_RECORD fail-safe DEFER (not a progress trigger)
    def test_11_no_ledger_record_defer(self):
        res = CallbackEventTrigger(self.reg).scan(task_id="task-unknown")
        self.assertEqual(res.verdict, NO_LEDGER_RECORD)
        self.assertFalse(res.ready)
        self.assertIsNone(res.trigger_source)


class LayerANoCronInvariants(_Base):

    # 9-R.1 Layer A: ZERO cron/dispatch/merge/subprocess executable code
    # (prose in the docstring is allowed; only real imports/calls are a
    # Layer A violation).
    def test_12_no_cron_side_effects_in_source(self):
        tree = ast.parse(CET_SRC.read_text("utf-8"))
        banned_mods = {"subprocess", "os"}
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for a in node.names:
                    self.assertNotIn(
                        a.name.split(".")[0], {"subprocess"},
                        "Layer A violation: import subprocess",
                    )
            if isinstance(node, ast.ImportFrom):
                self.assertNotEqual(
                    node.module, "subprocess",
                    "Layer A violation: from subprocess import ...",
                )
            if isinstance(node, ast.Call):
                f = node.func
                # os.system / subprocess.* / *.Popen calls are forbidden.
                if isinstance(f, ast.Attribute):
                    base = f.value
                    if isinstance(base, ast.Name) and base.id in banned_mods:
                        self.assertNotEqual(
                            (base.id, f.attr), ("os", "system"),
                            "Layer A violation: os.system call",
                        )
                        self.assertNotEqual(
                            base.id, "subprocess",
                            "Layer A violation: subprocess.* call",
                        )
                    self.assertNotEqual(
                        f.attr, "Popen",
                        "Layer A violation: Popen call",
                    )
        # No literal cron/cokacdir command string is executed anywhere.
        for node in ast.walk(tree):
            if isinstance(node, ast.Constant) and isinstance(
                node.value, str
            ):
                low = node.value.lower()
                self.assertNotIn("cokacdir --", low)
                self.assertNotIn("--sendfile", low)

    # +44 registry stays byte-0 (write-back uses its existing append API).
    def test_13_plus44_registry_byte0(self):
        self.assertEqual(
            hashlib.sha256(REG_SRC.read_bytes()).hexdigest(), REG_SHA)

    # proposal envelope is proposal-only (no auto execution).
    def test_14_proposal_envelope_proposal_only(self):
        self._seed_registered()
        write_back_completed(
            self.reg, task_id="task-2553+45", dispatch_id="2CCA4E15",
            dispatch_cron_id="2CCA4E15", executor="dev", chat_id="6937032012",
            normal_collector_cron_id="NC-1",
            fallback_callback_cron_id="1D8D112A", role="executor",
        )
        res = CallbackEventTrigger(self.reg).scan(task_id="task-2553+45")
        env = proposal_envelope(res, generated_at_kst="2026-05-18 15:00 KST")
        self.assertEqual(env["action_mode"], "proposal")
        self.assertEqual(env["authority"], "none")
        self.assertFalse(env["auto_executed"])

    # git tracked HEAD/branch invariant — read-only regression.
    def test_15_git_invariant(self):
        self.assertEqual(_git("rev-parse", "HEAD"), GIT_HEAD_PRE)
        self.assertEqual(
            _git("rev-parse", "--abbrev-ref", "HEAD"), GIT_BRANCH_PRE)


if __name__ == "__main__":
    unittest.main(verbosity=2)
