# -*- coding: utf-8 -*-
"""Regression — task-2553+29 ANU parallel batch coordinator runtime registry.

Covers §5 cases 1-15 verbatim. NO-CRON variant (9-R.1): zero cron
register/remove; self-completion is recognized via the existence of
result.json + .done (dogfooding).

  1  dispatch ok + result + no normal callback -> RESULT_READY_NO_NORMAL_CALLBACK
  2  result ready + fallback pending -> does NOT block batch final state
  3  result ready + fallback later fires -> DUPLICATE_CALLBACK_IGNORED /
     RESULT_READY_ALREADY_COLLECTED
  4  normal callback completed -> NORMAL_COLLECTOR_COMPLETED
  5  dispatch ok + no result + fallback pending -> WAIT_FOR_FALLBACK
  6  fallback fires + no result -> RESULT_MISSING_BOT_STALE
  7  task_id mismatch -> TRACK_MISMATCH
  8  dispatch_cron_id mismatch -> TRACK_MISMATCH
  9  normal collector belongs to different task -> TRACK_MISMATCH
  10 fallback belongs to different task -> TRACK_MISMATCH
  11 one track HOLD does not block independent DONE track
  12 cross-task artifact contamination -> BATCH_HOLD
  13 batch_state represents +26 MERGED, +27 PASS, +28 DONE simultaneously
  14 closeout eligibility derived from batch_state, not chat memory
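  15 consolidated summary contains only final decision fields

Two further guards are covered after the §5 cases:

  16 NO-CRON self-completion: not recognized when neither file exists,
     recognized once both the result file and the .done marker exist
  17 writing authority state to the frozen file raises RuntimeError (9-R.2)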
"""
import sys
import unittest
from pathlib import Path

# Put the directory three levels above this file (parents[2]) on sys.path
# before the project imports below run. Presumably this is the repository
# root that contains the `anu_v3` package — TODO confirm against the layout.
# The membership check avoids inserting the same entry twice.
_ROOT = Path(__file__).resolve().parents[2]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

from anu_v3.parallel_runtime_registry import (  # noqa: E402
    ParallelRuntimeRegistry,
    TaskRuntimeRecord,
)
from anu_v3.callback_4tuple_index import (  # noqa: E402
    Callback4TupleIndex,
    Tuple4,
)
from anu_v3.batch_runtime_reconciler import reconcile  # noqa: E402

# First argument handed to reconcile() in the batch-level tests.
FIXTURE = _ROOT.joinpath(
    "memory", "fixtures", "task-2553.runtime-reconcile.fixture.json"
)
# Second argument handed to reconcile(); test_17 also expects
# write_authority_state() to refuse this path.
FROZEN = _ROOT.joinpath(
    "memory", "events", "task-2553.parallel-batch-state.json"
)


def _rec(**kw) -> TaskRuntimeRecord:
    """Build a TaskRuntimeRecord from baseline values plus overrides.

    The baseline describes track "task-X" on executor "devX" with dispatch
    cron "D1", fallback cron "F1", dispatch status "ok" and fallback state
    "PENDING". Any keyword argument replaces the baseline value of the same
    name or is passed through as an additional field.
    """
    fields = {
        "task_id": "task-X",
        "executor": "devX",
        "dispatch_cron_id": "D1",
        "fallback_callback_cron_id": "F1",
        "dispatch_status": "ok",
        "fallback_state": "PENDING",
        # Caller-supplied values win over the baseline above.
        **kw,
    }
    return TaskRuntimeRecord(**fields)


class RuntimeRegistryRegression(unittest.TestCase):
    """Regression cases for the runtime registry, 4-tuple index and reconciler.

    Per-test notes are kept as ``#`` comments rather than method docstrings
    on purpose: unittest's verbose runner includes a test method's docstring
    summary in its report, so adding docstrings would change the output.
    """

    @staticmethod
    def _index_with_tasks_a_and_b():
        # Shared setup for cases 8-10: an index holding the 4-tuples of two
        # distinct tasks, registered in A-then-B order.
        index = Callback4TupleIndex()
        index.register(Tuple4("task-A", "D1", "N1", "F1"))
        index.register(Tuple4("task-B", "D2", "N2", "F2"))
        return index

    # 1 -- record registered with result_present=True on top of the defaults
    def test_01_result_ready_no_normal_callback(self):
        registry = ParallelRuntimeRegistry()
        registry.register_dispatch(_rec(result_present=True))
        outcome = registry.reconcile_track("task-X")
        self.assertEqual(outcome.classification, "RESULT_READY_NO_NORMAL_CALLBACK")
        self.assertTrue(
            outcome.recovery_eligible, "must be recovery target, not failure"
        )

    # 2 -- +26/+27/+28 all have fallback PENDING yet the batch is not blocked
    def test_02_fallback_pending_non_blocking(self):
        batch_state = reconcile(FIXTURE, FROZEN)
        self.assertEqual(batch_state["batch_next_action"], "BATCH_ACCEPT")
        done_tracks = sorted(batch_state["join_policy"]["independent_done_tracks"])
        self.assertEqual(
            done_tracks,
            ["task-2553+26", "task-2553+27", "task-2553+28"],
        )

    # 3 -- fallback classified after the track was already reconciled; the
    # second row is the "already collected" variant
    def test_03_result_ready_fallback_later_fires(self):
        scenarios = (
            ({}, "DUPLICATE_CALLBACK_IGNORED"),
            ({"normal_collector_executed": True}, "RESULT_READY_ALREADY_COLLECTED"),
        )
        for extra_fields, expected in scenarios:
            registry = ParallelRuntimeRegistry()
            registry.register_dispatch(_rec(result_present=True, **extra_fields))
            registry.reconcile_track("task-X")
            verdict = registry.classify_fallback(
                claimed_task_id="task-X", fallback_cron_id="F1"
            )
            self.assertEqual(verdict, expected)

    # 4
    def test_04_normal_collector_completed(self):
        registry = ParallelRuntimeRegistry()
        record = _rec(result_present=True, normal_collector_executed=True)
        registry.register_dispatch(record)
        outcome = registry.reconcile_track("task-X")
        self.assertEqual(outcome.classification, "NORMAL_COLLECTOR_COMPLETED")

    # 5
    def test_05_no_result_fallback_pending_wait(self):
        registry = ParallelRuntimeRegistry()
        record = _rec(result_present=False, fallback_state="PENDING")
        registry.register_dispatch(record)
        outcome = registry.reconcile_track("task-X")
        self.assertEqual(outcome.classification, "WAIT_FOR_FALLBACK")

    # 6 -- both the reconciled track and the fallback classification must
    # report the stale-bot outcome
    def test_06_fallback_fires_no_result_stale(self):
        registry = ParallelRuntimeRegistry()
        record = _rec(result_present=False, fallback_state="FIRED")
        registry.register_dispatch(record)
        outcome = registry.reconcile_track("task-X")
        self.assertEqual(outcome.classification, "RESULT_MISSING_BOT_STALE")
        verdict = registry.classify_fallback(
            claimed_task_id="task-X", fallback_cron_id="F1"
        )
        self.assertEqual(verdict, "RESULT_MISSING_BOT_STALE")

    # 7 -- the event names task-B while the claim is for task-A
    def test_07_task_id_mismatch(self):
        index = Callback4TupleIndex()
        index.register(Tuple4("task-A", "D1", "N1", "F1"))
        reasons = index.classify_event(
            claimed_task_id="task-A",
            event_kind="fallback",
            event_task_id="task-B",
            event_cron_id="F1",
        )
        self.assertTrue(any("task_id mismatch" in reason for reason in reasons))

    # 8 -- dispatch cron D2 is registered under task-B, not task-A
    def test_08_dispatch_cron_id_mismatch(self):
        index = self._index_with_tasks_a_and_b()
        reasons = index.classify_event(
            claimed_task_id="task-A",
            event_kind="dispatch",
            event_cron_id="D2",
        )
        self.assertTrue(any("belongs to task-B" in reason for reason in reasons))

    # 9 -- normal collector cron N2 is registered under task-B
    def test_09_normal_collector_other_task(self):
        index = self._index_with_tasks_a_and_b()
        reasons = index.classify_event(
            claimed_task_id="task-A",
            event_kind="normal_collector",
            event_cron_id="N2",
        )
        self.assertTrue(any("belongs to task-B" in reason for reason in reasons))

    # 10 -- fallback cron F2 is registered under task-B
    def test_10_fallback_other_task(self):
        index = self._index_with_tasks_a_and_b()
        reasons = index.classify_event(
            claimed_task_id="task-A",
            event_kind="fallback",
            event_cron_id="F2",
        )
        self.assertTrue(any("belongs to task-B" in reason for reason in reasons))
        # and the registry surfaces TRACK_MISMATCH for the fallback fire
        registry = ParallelRuntimeRegistry()
        for task_id, dispatch_id, fallback_id in (
            ("task-A", "D1", "F1"),
            ("task-B", "D2", "F2"),
        ):
            registry.register_dispatch(
                _rec(
                    task_id=task_id,
                    dispatch_cron_id=dispatch_id,
                    fallback_callback_cron_id=fallback_id,
                )
            )
        verdict = registry.classify_fallback(
            claimed_task_id="task-A", fallback_cron_id="F2"
        )
        self.assertEqual(verdict, "TRACK_MISMATCH")

    # 11
    def test_11_hold_track_does_not_block_done(self):
        registry = ParallelRuntimeRegistry()
        done_record = _rec(
            task_id="task-DONE",
            dispatch_cron_id="D1",
            fallback_callback_cron_id="F1",
            result_present=True,
            terminal_outcome="DONE",
        )
        registry.register_dispatch(done_record)
        hold_record = _rec(
            task_id="task-HOLD",
            dispatch_cron_id="D2",
            fallback_callback_cron_id="F2",
            hold_for_chair=True,
            terminal_outcome="HOLD",
        )
        registry.register_dispatch(hold_record)
        registry.reconcile_all()
        from anu_v3.batch_runtime_join_policy import TrackJoinView, join
        # NOTE(review): the join views are written out by hand; nothing
        # returned by the registry above feeds the assertions below.
        done_view = TrackJoinView(
            "task-DONE", "DONE", "RESULT_READY_NO_NORMAL_CALLBACK",
            False, "PENDING", True,
        )
        hold_view = TrackJoinView(
            "task-HOLD", "HOLD", "RESULT_READY_NO_NORMAL_CALLBACK",
            True, "PENDING", False,
        )
        joined = join([done_view, hold_view])
        self.assertIn("task-DONE", joined.independent_done_tracks)
        self.assertIn("task-HOLD", joined.held_tracks)
        self.assertEqual(joined.batch_next_action, "BATCH_HOLD")
        self.assertTrue(
            joined.independent_done_tracks,
            "DONE track must remain independently settled",
        )

    # 12 -- a reported writer/victim pair must hold the batch and make the
    # closeout proposal ineligible
    def test_12_contamination_batch_hold(self):
        contamination_report = [
            {"writer": "task-2553+26", "victim": "task-2553+27"},
        ]
        batch_state = reconcile(FIXTURE, FROZEN, contamination=contamination_report)
        self.assertEqual(batch_state["batch_next_action"], "BATCH_HOLD")
        self.assertFalse(batch_state["closeout_proposal"]["eligible"])

    # 13
    def test_13_batch_state_three_outcomes_simultaneous(self):
        batch_state = reconcile(FIXTURE, FROZEN)
        tracks = batch_state["tracks"]
        expected_outcomes = (
            ("task-2553+26", "MERGED"),
            ("task-2553+27", "PASS"),
            ("task-2553+28", "DONE"),
        )
        for track_id, terminal_outcome in expected_outcomes:
            self.assertEqual(tracks[track_id]["terminal_outcome"], terminal_outcome)

    # 14
    def test_14_closeout_derived_from_batch_state(self):
        batch_state = reconcile(FIXTURE, FROZEN)
        proposal = batch_state["closeout_proposal"]
        self.assertEqual(proposal["derived_from"], "batch_state")
        self.assertFalse(
            proposal["confirmed"], "registry never confirms closeout (§7)"
        )
        self.assertTrue(proposal["eligible"])
        # derivation depends only on the batch_state dict
        from anu_v3.batch_runtime_join_policy import derive_closeout_proposal
        rederived = derive_closeout_proposal(batch_state)
        self.assertEqual(rederived["eligible"], proposal["eligible"])

    # 15 -- exact key sets, at the top level and for every track entry
    def test_15_consolidated_summary_final_fields_only(self):
        batch_state = reconcile(FIXTURE, FROZEN)
        summary = batch_state["consolidated_summary"]
        top_level_keys = set(summary.keys())
        self.assertEqual(
            top_level_keys,
            {"tracks", "batch_next_action", "closeout_eligible"},
        )
        for track_summary in summary["tracks"].values():
            track_keys = set(track_summary.keys())
            self.assertEqual(
                track_keys,
                {"terminal_outcome", "classification", "hold_for_chair"},
            )

    # NO-CRON dogfooding self-completion (§12 / 9-R.1)
    def test_16_self_completion_via_result_and_done(self):
        import tempfile
        with tempfile.TemporaryDirectory() as tmp_dir:
            workdir = Path(tmp_dir)
            result_path = workdir / "r.json"
            done_path = workdir / "x.done"
            # Neither file exists yet.
            recognized_before = ParallelRuntimeRegistry.self_completion_recognized(
                result_path, done_path
            )
            self.assertFalse(recognized_before)
            result_path.write_text("{}", encoding="utf-8")
            done_path.write_text("", encoding="utf-8")
            # Both files exist now.
            recognized_after = ParallelRuntimeRegistry.self_completion_recognized(
                result_path, done_path
            )
            self.assertTrue(recognized_after)

    # frozen authority guard (9-R.2)
    def test_17_refuse_write_to_frozen_v1(self):
        from anu_v3.batch_runtime_reconciler import write_authority_state
        # NOTE(review): this passes the real frozen path; if the guard ever
        # regressed the call would presumably write to that file -- confirm.
        empty_state = {}
        with self.assertRaises(RuntimeError):
            write_authority_state(empty_state, FROZEN)


# Allow running this file directly; verbosity=2 makes unittest report each
# test individually.
if __name__ == "__main__":
    unittest.main(verbosity=2)
