# -*- coding: utf-8 -*-
"""Regression — task-2553+31 ANU_RUNTIME_RECONCILE_CHECKPOINT.

Covers §6 verbatim (20 items) + §5 fixture self-checks + NO-CRON dogfood +
frozen-guard / no-mutation / schema-conformance extras.

NO-CRON (§15): zero cron register/remove; +31 self-completion via
result.json + .done existence (dogfooding). Checkpoint is read-only:
zero write / cron / merge / dispatch (regression 12).
"""
import hashlib
import json
import sys
import unittest
from pathlib import Path

_ROOT = Path(__file__).resolve().parents[2]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

from anu_v3.runtime_reconcile_checkpoint import (  # noqa: E402
    RuntimeReconcileCheckpoint,
    RuntimeTaskObservation,
    classify_observation,
)
from anu_v3.runtime_next_action_resolver import (  # noqa: E402
    resolve,
    TERMINAL,
    NONTERMINAL,
    ALL_CLASSIFICATIONS,
)
from anu_v3.runtime_batch_state_updater import (  # noqa: E402
    build_proposal,
    emit_runtime_batch_state,
    FrozenWriteRefused,
)

FIX_DIR = _ROOT / "memory" / "fixtures"
NO_CRON_30 = FIX_DIR / "task-2553.runtime-reconcile.no-cron-30.json"
RR_26_27 = FIX_DIR / "task-2553.runtime-reconcile.result-ready-no-normal-26-27.json"
TAXONOMY = FIX_DIR / "task-2553.runtime-reconcile.taxonomy-cases.json"

FROZEN_V1 = _ROOT / "memory" / "events" / "task-2553.parallel-batch-state.json"
FROZEN_CALLBACK = _ROOT / "utils" / "anu_delegation_completion_callback.py"
FROZEN_COORD = _ROOT / "anu_v3" / "parallel_batch_coordinator.py"

# +26/+27/+30 real artifacts — read/parse/reference only (§5/§9 fixture verbs).
SOURCE_ARTIFACTS = [
    _ROOT / "memory" / "events" / "task-2553+26.result.json",
    _ROOT / "memory" / "events" / "task-2553+27.result.json",
    _ROOT / "memory" / "events" / "task-2553+30.result.json",
    _ROOT / "memory" / "events" / "task-2553+30.done",
]


def _sha(p: Path) -> str:
    return hashlib.sha256(Path(p).read_bytes()).hexdigest()


def _ck() -> RuntimeReconcileCheckpoint:
    return RuntimeReconcileCheckpoint(_ROOT)


def _records(fixture: Path) -> dict:
    return _ck().run(fixture)["track_records"]


def _obs(**kw) -> RuntimeTaskObservation:
    base = dict(
        task_id="t", dispatch_ok=True, result_present=False,
        done_present=False, normal_collector_registered=False,
        normal_collector_executed=False, by_design_no_normal_collector=False,
        fallback_state="NONE", terminal_outcome="", stale=False,
        track_mismatch=False, track_mismatch_reasons=[],
    )
    base.update(kw)
    return RuntimeTaskObservation(**base)


class RuntimeReconcileCheckpointRegression(unittest.TestCase):

    # 1 — NO-CRON task result.json + .done -> NO_CRON_TASK_DONE
    def test_01_no_cron_task_done(self):
        recs = _records(NO_CRON_30)
        self.assertEqual(
            recs["task-2553+30"]["classification"], "NO_CRON_TASK_DONE")
        self.assertTrue(recs["task-2553+30"]["terminal"])

    # 2 — dispatch ok + result exists + normal callback missing
    def test_02_result_ready_no_normal_callback(self):
        recs = _records(RR_26_27)
        for tid in ("task-2553+26", "task-2553+27"):
            self.assertEqual(
                recs[tid]["classification"],
                "RESULT_READY_NO_NORMAL_CALLBACK")
            self.assertTrue(recs[tid]["recovery_eligible"])

    # 3 — dispatch ok + result missing + not stale -> RUNNING|WAIT_FOR_RESULT
    def test_03_running_or_wait(self):
        c = classify_observation(_obs(result_present=False, stale=False,
                                      fallback_state="NONE"))
        self.assertIn(c, {"RUNNING", "WAIT_FOR_RESULT"})

    # 4 — dispatch ok + result missing + stale -> STALE_OR_BOT_STUCK_CANDIDATE
    def test_04_stale_candidate(self):
        c = classify_observation(_obs(result_present=False, stale=True))
        self.assertEqual(c, "STALE_OR_BOT_STUCK_CANDIDATE")

    # 5 — normal callback completed -> NORMAL_COLLECTOR_COMPLETED
    def test_05_normal_collector_completed(self):
        recs = _records(TAXONOMY)
        self.assertEqual(
            recs["task-2553+SYN-normal-completed"]["classification"],
            "NORMAL_COLLECTOR_COMPLETED")

    # 6 — fallback pending + result ready -> non-blocking
    def test_06_fallback_pending_result_ready_non_blocking(self):
        recs = _records(RR_26_27)
        rec = recs["task-2553+26"]
        self.assertEqual(rec["fallback_state"], "PENDING")
        self.assertFalse(rec["next_action"]["blocking"])
        self.assertFalse(rec["next_action"]["is_execution"])

    # 7 — fallback fired after result ready -> DUPLICATE_CALLBACK_IGNORED
    def test_07_duplicate_callback_ignored(self):
        recs = _records(TAXONOMY)
        self.assertEqual(
            recs["task-2553+SYN-fallback-duplicate"]["classification"],
            "DUPLICATE_CALLBACK_IGNORED")

    # 8 — task_id mismatch -> TRACK_MISMATCH
    def test_08_taskid_mismatch(self):
        recs = _records(TAXONOMY)
        self.assertEqual(
            recs["task-2553+SYN-mismatch-taskid"]["classification"],
            "TRACK_MISMATCH")

    # 9 — dispatch_cron_id mismatch -> TRACK_MISMATCH
    def test_09_dispatch_cron_mismatch(self):
        recs = _records(TAXONOMY)
        self.assertEqual(
            recs["task-2553+SYN-mismatch-dispatchcron"]["classification"],
            "TRACK_MISMATCH")

    # 10 — normal collector belongs to different task -> TRACK_MISMATCH
    def test_10_collector_other_task_mismatch(self):
        recs = _records(TAXONOMY)
        self.assertEqual(
            recs["task-2553+SYN-mismatch-collector"]["classification"],
            "TRACK_MISMATCH")

    # 11 — fallback belongs to different task -> TRACK_MISMATCH
    def test_11_fallback_other_task_mismatch(self):
        recs = _records(TAXONOMY)
        self.assertEqual(
            recs["task-2553+SYN-mismatch-fallback"]["classification"],
            "TRACK_MISMATCH")

    # 12 — checkpoint performs zero write/cron/merge/dispatch
    def test_12_zero_side_effects(self):
        events = _ROOT / "memory" / "events"
        before = {p.name: _sha(p) for p in events.glob("*") if p.is_file()}
        res = _ck().run(NO_CRON_30, emit=False)
        after = {p.name: _sha(p) for p in events.glob("*") if p.is_file()}
        self.assertEqual(before, after, "run(emit=False) wrote/changed a file")
        z = res["zero_side_effect_proof"]
        self.assertEqual(
            (z["write"], z["cron"], z["merge"], z["dispatch"]), (0, 0, 0, 0))

    # 13 — batch_state update is additive / versioned
    def test_13_batch_state_additive_versioned(self):
        res = _ck().run(RR_26_27, prior_version=4)
        bs = res["batch_state_proposal"]
        self.assertEqual(bs["artifact_kind"], "ADDITIVE_VERSIONED_PROPOSAL")
        self.assertFalse(bs["supersedes_durable_v1"])
        self.assertEqual(bs["version"], 5)
        self.assertEqual(
            bs["schema"],
            "anu_v3.runtime_reconcile_checkpoint.batch_state.v1")

    # 14 — frozen coordinator/source artifacts not mutated
    def test_14_frozen_not_mutated(self):
        pre = {p: _sha(p) for p in
               [FROZEN_V1, FROZEN_CALLBACK, FROZEN_COORD] + SOURCE_ARTIFACTS
               if p.is_file()}
        _ck().run(NO_CRON_30, emit=False)
        _ck().run(RR_26_27, emit=False)
        _ck().run(TAXONOMY, emit=False)
        post = {p: _sha(p) for p in
                [FROZEN_V1, FROZEN_CALLBACK, FROZEN_COORD] + SOURCE_ARTIFACTS
                if p.is_file()}
        self.assertEqual(pre, post)

    # 15 — next_action is recommendation only, not execution
    def test_15_next_action_recommendation_only(self):
        for fx in (NO_CRON_30, RR_26_27, TAXONOMY):
            for rec in _records(fx).values():
                self.assertIn("recommendation", rec["next_action"])
                self.assertFalse(rec["next_action"]["is_execution"])
                self.assertEqual(
                    rec["next_action"]["execution_authority"],
                    "CHAIR_OR_ANU")

    # 16 — closeout eligible may be proposed, never confirmed w/o authority
    def test_16_closeout_proposed_not_confirmed(self):
        cp = _ck().run(NO_CRON_30)["batch_state_proposal"]["closeout_proposal"]
        self.assertTrue(cp["eligible"])
        self.assertFalse(cp["confirmed"])
        self.assertIn("CHAIR", cp["authority_required"])

    # 17 — callback primary path remains documented & not disabled
    def test_17_primary_path_preserved(self):
        st = _ck().run(NO_CRON_30)["batch_state_proposal"]
        self.assertIn(
            "NOT disabled",
            st["callback_paths_status"]["primary_callback_path"])

    # 18 — fallback safety path remains documented & not disabled
    def test_18_fallback_path_preserved(self):
        st = _ck().run(NO_CRON_30)["batch_state_proposal"]
        self.assertIn(
            "NOT disabled",
            st["callback_paths_status"]["fallback_safety_path"])

    # 19 — cancel-on-success path remains compatible
    def test_19_cancel_on_success_compatible(self):
        st = _ck().run(NO_CRON_30)["batch_state_proposal"]
        self.assertIn(
            "compatible",
            st["callback_paths_status"]["cancel_on_success"])

    # 20 — +26/+27/+30 fixture batch_state reproduces actual statuses
    def test_20_reproduces_actual_statuses(self):
        recs = {**_records(NO_CRON_30), **_records(RR_26_27)}
        self.assertEqual(recs["task-2553+30"]["terminal_outcome"], "DONE")
        self.assertEqual(recs["task-2553+26"]["terminal_outcome"], "MERGED")
        self.assertEqual(recs["task-2553+27"]["terminal_outcome"], "PASS")
        # cross-check against the real result.json verdicts (read-only).
        det = _ck().detector
        self.assertIn(
            det.terminal_outcome_hint("task-2553+30"), ("DONE", "PASS"))
        self.assertIn(
            det.terminal_outcome_hint("task-2553+26"),
            ("COMPLETE_MERGED", "MERGED"))
        self.assertIn(
            det.terminal_outcome_hint("task-2553+27"), ("DONE", "PASS"))

    # --- §5 fixture self-checks ------------------------------------------
    def test_extra_fixture_self_check_all_pass(self):
        for fx in (NO_CRON_30, RR_26_27, TAXONOMY):
            cs = _ck().run(fx)["consolidated_summary"]
            self.assertTrue(cs["fixture_self_check_pass"], f"{fx} self-check")

    # --- taxonomy completeness (9-R.2/9-R.4) -----------------------------
    def test_extra_taxonomy_enumeration(self):
        self.assertEqual(len(TERMINAL), 5)
        self.assertEqual(len(NONTERMINAL), 4)
        self.assertEqual(len(ALL_CLASSIFICATIONS), 9)
        for c in ALL_CLASSIFICATIONS:
            na = resolve(c)
            self.assertFalse(na.to_json()["is_execution"])
        # unknown -> HOLD candidate, never silently accepted (§3 그 외 0)
        self.assertIn("HOLD", resolve("BOGUS").recommendation)

    # --- 9-R.1 emit guard: only allowlisted path; durable v1 never -------
    def test_extra_emit_guard_allowlist(self):
        res = _ck().run(NO_CRON_30, emit=True)
        emitted = res["batch_state_proposal_emitted"]
        self.assertTrue(Path(emitted["path"]).is_file())
        self.assertEqual(
            Path(emitted["path"]).name,
            "task-2553.runtime-reconcile-checkpoint.batch-state.json")
        self.assertNotEqual(Path(emitted["path"]), FROZEN_V1)

    def test_extra_emit_refuses_durable_v1(self):
        doc = build_proposal(
            {}, source_fixture="x", source_fixture_sha256="x",
            frozen_v1_ref="x", frozen_v1_sha256="x", generated_ts_kst="x")
        with self.assertRaises(FrozenWriteRefused):
            emit_runtime_batch_state(doc, FROZEN_V1, _ROOT)
        with self.assertRaises(FrozenWriteRefused):
            emit_runtime_batch_state(
                doc,
                _ROOT / "memory" / "events"
                / "task-2553.generic-batch-state.json",
                _ROOT)

    def test_extra_emit_durable_v1_untouched(self):
        pre = _sha(FROZEN_V1)
        _ck().run(NO_CRON_30, emit=True)
        self.assertEqual(_sha(FROZEN_V1), pre)

    # --- §11 entrypoint + §15 dogfooding ---------------------------------
    def test_extra_entrypoint_auto_wireable_no_chair_question(self):
        ep = _ck().checkpoint_entrypoint(NO_CRON_30)
        self.assertTrue(ep["auto_wireable"])
        self.assertFalse(ep["chair_question_required"])
        self.assertTrue(ep["no_cron_completion_detected"])

    def test_extra_dogfooding_self_completion(self):
        sc = _ck().detect_self_completion()
        self.assertEqual(sc["task_id"], "task-2553+31")
        self.assertFalse(sc["chair_question_required"])
        # classification is deterministic on (result.json + .done) existence
        self.assertIn(sc["classification"], ALL_CLASSIFICATIONS)

    # --- schema conformance ----------------------------------------------
    def test_extra_result_schema_keys(self):
        res = _ck().run(RR_26_27)
        for k in ("schema", "task_id", "mode", "track_records",
                  "batch_state_proposal", "consolidated_summary",
                  "zero_side_effect_proof"):
            self.assertIn(k, res)
        self.assertEqual(res["mode"], "READ_ONLY")
        # emitted proposal must validate-shape against result schema subset
        schema = json.loads(
            (_ROOT / "schemas" / "runtime_reconcile_result.schema.json")
            .read_text())
        self.assertEqual(schema["title"],
                         "anu_v3.runtime_reconcile_checkpoint.result.v1")


if __name__ == "__main__":
    unittest.main()
