# -*- coding: utf-8 -*-
"""Regression — task-2553+43 STEP 3 checkpoint turn-boundary sweep.

Proves 9-R.1 Layer A: anu_v3.checkpoint_turn_boundary_sweep is a PURE
READ-ONLY deliverable —

  * correct candidate enumeration (drift/stale/recovery/NO-CRON/result-ready
    + benign + in-flight) from the +31 checkpoint read-only entrypoint
  * +31 runtime_reconcile_checkpoint.py + recovery layer + frozen anchors
    byte-0 (before == after)
  * STATIC side-effect-0: forbidden write/cron/merge/PR/subprocess tokens
    absent from the module source
  * DYNAMIC side-effect-0: live write / cron / subprocess sentinels capture
    ZERO events across a sweep run
  * idempotent: N calls with identical inputs => byte-identical output
  * emit=True is NEVER reached (read-only consumption of +31)
  * recovery-not-primary invariant preserved (callback primary / fallback
    safety paths NOT replaced — task §5)
  * callback / collector module files untouched
  * live git tracked HEAD + branch invariant (before == after)
  * schema conformance of sweep output + wiring-candidate doc

9-R.1 Layer B (the §8 executor completion callback cron) is a separate
process-lifecycle signal fired by the executor via external cron tooling and
is intentionally OUT OF SCOPE for this module-level regression.
"""
import ast
import builtins
import hashlib
import json
import os
import subprocess
import sys
import unittest
from pathlib import Path

_ROOT = Path(__file__).resolve().parents[2]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

from anu_v3.checkpoint_turn_boundary_sweep import (  # noqa: E402
    sweep_turn_boundary,
    wiring_candidates,
    recovery_invariant_status,
    SWEEP_RESULT_SCHEMA,
    WIRING_CANDIDATE_SCHEMA,
)

FIX = _ROOT / "memory" / "fixtures" / "task-2553+43.sweep-cases.json"
SWEEP_MOD = _ROOT / "anu_v3" / "checkpoint_turn_boundary_sweep.py"
SCHEMA = _ROOT / "schemas" / "checkpoint_turn_boundary_sweep.schema.json"

# Frozen byte-0 anchors (task §5 / frozen anchor list).
FROZEN = {
    "plus31": _ROOT / "anu_v3" / "runtime_reconcile_checkpoint.py",
    "plus31_recovery": _ROOT / "anu_v3"
    / "runtime_reconcile_checkpoint_recovery_layer.py",
    "policy_profile_engine": _ROOT / "anu_v3" / "policy_profile_engine.py",
    "parallel_batch_coordinator": _ROOT / "anu_v3"
    / "parallel_batch_coordinator.py",
    "callback_anchor": _ROOT / "utils"
    / "anu_delegation_completion_callback.py",
    "collector_entrypoint": _ROOT / "utils"
    / "normal_completion_callback_collector_entrypoint.py",
}

EXPECTED = {
    "task-2553+SYN43-nocron-done":
        ("NO_CRON_TASK_DONE", "NO_CRON_DONE_CANDIDATE", True),
    "task-2553+SYN43-recovery":
        ("RESULT_READY_NO_NORMAL_CALLBACK", "RECOVERY_CANDIDATE", True),
    "task-2553+SYN43-stale":
        ("STALE_OR_BOT_STUCK_CANDIDATE", "STALE_CANDIDATE", True),
    "task-2553+SYN43-drift":
        ("TRACK_MISMATCH", "DRIFT_CANDIDATE", True),
    "task-2553+SYN43-normal-completed":
        ("NORMAL_COLLECTOR_COMPLETED", "BENIGN_NORMAL_COMPLETED", False),
    "task-2553+SYN43-duplicate":
        ("DUPLICATE_CALLBACK_IGNORED", "BENIGN_DUPLICATE_IGNORED", False),
    "task-2553+SYN43-fallback-pending":
        ("FALLBACK_PENDING", "INFLIGHT_FALLBACK_PENDING", False),
    "task-2553+SYN43-running":
        ("RUNNING", "INFLIGHT_RUNNING", False),
}


def _sha(p: Path) -> str:
    return hashlib.sha256(Path(p).read_bytes()).hexdigest()


def _git(*args: str) -> str:
    return subprocess.run(
        ["git", "-C", str(_ROOT), *args],
        capture_output=True, text=True, check=False,
    ).stdout.strip()


class TestSweepEnumeration(unittest.TestCase):
    def test_01_all_candidate_types_classified(self):
        out = sweep_turn_boundary(_ROOT, FIX)
        got = {
            c["task_id"]: (
                c["classification"], c["candidate_type"], c["actionable"]
            )
            for c in out["candidates"]
        }
        self.assertEqual(got, EXPECTED)

    def test_02_actionable_set(self):
        out = sweep_turn_boundary(_ROOT, FIX)
        self.assertEqual(
            sorted(out["actionable_candidates"]),
            sorted(t for t, v in EXPECTED.items() if v[2]),
        )
        self.assertEqual(out["actionable_count"], 4)

    def test_03_recovery_invariant_preserved(self):
        ri = sweep_turn_boundary(_ROOT, FIX)["recovery_invariant"]
        self.assertTrue(ri["recovery_not_primary_ok"])
        self.assertEqual(ri["violations"], [])
        self.assertFalse(ri["replaces_callback_primary_path"])
        self.assertFalse(ri["discards_fallback_safety_path"])
        # standalone helper agrees
        self.assertTrue(
            recovery_invariant_status()["recovery_not_primary_ok"]
        )

    def test_04_candidate_order_deterministic(self):
        out = sweep_turn_boundary(_ROOT, FIX)
        ids = [c["task_id"] for c in out["candidates"]]
        self.assertEqual(ids, sorted(ids))


class TestZeroSideEffectStatic(unittest.TestCase):
    def test_05_no_forbidden_tokens_in_source(self):
        src = SWEEP_MOD.read_text(encoding="utf-8")
        # strip docstrings/comments so advisory prose (e.g. the word "cron"
        # in the §8 explanation) is not mistaken for a forbidden CALL.
        tree = ast.parse(src)
        code_only_lines = set()
        for node in ast.walk(tree):
            if isinstance(node, (ast.Call, ast.Attribute, ast.Name,
                                 ast.Import, ast.ImportFrom)):
                if hasattr(node, "lineno"):
                    code_only_lines.add(node.lineno)
        code_text = "\n".join(
            ln for i, ln in enumerate(src.splitlines(), 1)
            if i in code_only_lines
        )
        forbidden = [
            "subprocess", "os.system", "os.popen", "Popen",
            "shutil.", ".write_text(", ".write_bytes(",
            "os.remove(", "os.rename(", "os.replace(", "os.makedirs(",
            ".mkdir(", "cokacdir", "--cron", "git commit", "git merge",
            "git push", "emit=True",
        ]
        hits = [tok for tok in forbidden if tok in code_text]
        self.assertEqual(hits, [], f"forbidden tokens in code: {hits}")

    def test_06_no_write_mode_open_calls(self):
        tree = ast.parse(SWEEP_MOD.read_text(encoding="utf-8"))
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                fn = node.func
                name = getattr(fn, "id", "") or getattr(fn, "attr", "")
                self.assertNotIn(
                    name, ("open",),
                    "module must not call open() at all (read delegated "
                    "to +31)",
                )

    def test_07_no_emit_true_call(self):
        # emit=True must never be passed anywhere in the module.
        tree = ast.parse(SWEEP_MOD.read_text(encoding="utf-8"))
        for node in ast.walk(tree):
            if isinstance(node, ast.keyword) and node.arg == "emit":
                self.assertFalse(
                    isinstance(node.value, ast.Constant)
                    and node.value.value is True,
                    "emit=True passed — forbidden read-only violation",
                )


class TestZeroSideEffectDynamic(unittest.TestCase):
    def test_08_runtime_write_cron_sentinels_zero(self):
        events = []
        real_open = builtins.open
        real_system = os.system
        real_run = subprocess.run
        real_popen = subprocess.Popen
        real_wt = Path.write_text
        real_wb = Path.write_bytes
        real_mkdir = Path.mkdir

        def guard_open(file, mode="r", *a, **k):
            if any(m in mode for m in ("w", "a", "x", "+")):
                events.append(("open-write", str(file), mode))
            return real_open(file, mode, *a, **k)

        def boom_system(cmd):
            events.append(("os.system", cmd))
            return 0

        def boom_run(*a, **k):
            events.append(("subprocess.run", a, k))

            class _R:
                returncode = 0
                stdout = ""
                stderr = ""
            return _R()

        def boom_popen(*a, **k):
            events.append(("subprocess.Popen", a, k))
            raise AssertionError("subprocess.Popen attempted")

        def guard_wt(self, *a, **k):
            events.append(("Path.write_text", str(self)))

        def guard_wb(self, *a, **k):
            events.append(("Path.write_bytes", str(self)))

        def guard_mkdir(self, *a, **k):
            events.append(("Path.mkdir", str(self)))

        builtins.open = guard_open
        os.system = boom_system
        subprocess.run = boom_run
        subprocess.Popen = boom_popen
        Path.write_text = guard_wt
        Path.write_bytes = guard_wb
        Path.mkdir = guard_mkdir
        try:
            out = sweep_turn_boundary(_ROOT, FIX)
            wc = wiring_candidates()
        finally:
            builtins.open = real_open
            os.system = real_system
            subprocess.run = real_run
            subprocess.Popen = real_popen
            Path.write_text = real_wt
            Path.write_bytes = real_wb
            Path.mkdir = real_mkdir

        self.assertEqual(events, [], f"side-effects captured: {events}")
        self.assertEqual(out["zero_side_effect_proof"]["write"], 0)
        self.assertEqual(out["zero_side_effect_proof"]["cron_register"], 0)
        self.assertEqual(out["zero_side_effect_proof"]["cron_remove"], 0)
        self.assertEqual(out["zero_side_effect_proof"]["merge"], 0)
        self.assertFalse(out["zero_side_effect_proof"]["emit_true_reached"])
        self.assertFalse(wc["applied"])

    def test_09_idempotent_byte_identical(self):
        runs = [
            json.dumps(sweep_turn_boundary(
                _ROOT, FIX, generated_ts_kst="FIXED"), sort_keys=True)
            for _ in range(5)
        ]
        self.assertEqual(len(set(runs)), 1, "sweep is not idempotent")
        wc = [
            json.dumps(wiring_candidates(), sort_keys=True)
            for _ in range(3)
        ]
        self.assertEqual(len(set(wc)), 1, "wiring_candidates not idempotent")


class TestFrozenByteZero(unittest.TestCase):
    def setUp(self):
        self.before = {k: _sha(p) for k, p in FROZEN.items()}
        self.git_head_before = _git("rev-parse", "HEAD")
        self.git_branch_before = _git("rev-parse", "--abbrev-ref", "HEAD")

    def test_10_frozen_anchors_byte0_and_git_invariant(self):
        sweep_turn_boundary(_ROOT, FIX)
        wiring_candidates()
        recovery_invariant_status()
        after = {k: _sha(p) for k, p in FROZEN.items()}
        self.assertEqual(self.before, after, "frozen anchor byte-0 broken")
        self.assertEqual(
            self.git_head_before, _git("rev-parse", "HEAD"),
            "git tracked HEAD changed",
        )
        self.assertEqual(
            self.git_branch_before,
            _git("rev-parse", "--abbrev-ref", "HEAD"),
            "git branch changed",
        )
        # +31 frozen anchor pinned exactly (task frozen anchor 83b3e307…)
        self.assertEqual(
            after["callback_anchor"],
            "83b3e307c8207c76a3e311c408aab4951373bd317896e51687d3007907b0c3d4",
        )

    def test_11_new_files_untracked(self):
        for p in (SWEEP_MOD, SCHEMA, FIX, Path(__file__)):
            tracked = _git("ls-files", "--error-unmatch",
                           str(p.relative_to(_ROOT)))
            self.assertEqual(
                tracked, "",
                f"{p} unexpectedly git-tracked (must stay untracked)",
            )


class TestSchemaConformance(unittest.TestCase):
    def test_12_sweep_output_schema(self):
        schema = json.loads(SCHEMA.read_text(encoding="utf-8"))
        out = sweep_turn_boundary(_ROOT, FIX)
        for req in schema["required"]:
            self.assertIn(req, out)
        self.assertEqual(out["schema"], SWEEP_RESULT_SCHEMA)
        self.assertEqual(
            out["schema"], schema["properties"]["schema"]["const"]
        )
        cand_props = set(
            schema["$defs"]["sweepCandidate"]["properties"].keys()
        )
        for c in out["candidates"]:
            self.assertEqual(set(c.keys()), cand_props)
        # zero-side-effect proof const-checks
        zse = schema["$defs"]["zeroSideEffectProof"]["properties"]
        for k, spec in zse.items():
            self.assertEqual(out["zero_side_effect_proof"][k], spec["const"])

    def test_13_wiring_candidate_schema(self):
        schema = json.loads(SCHEMA.read_text(encoding="utf-8"))
        wc = wiring_candidates()
        wdoc = schema["$defs"]["wiringCandidateDoc"]
        for req in wdoc["required"]:
            self.assertIn(req, wc)
        self.assertEqual(wc["schema"], WIRING_CANDIDATE_SCHEMA)
        self.assertFalse(wc["applied"])
        wcand_props = set(
            schema["$defs"]["wiringCandidate"]["properties"].keys()
        )
        for c in wc["candidates"]:
            self.assertEqual(set(c.keys()), wcand_props)
            self.assertIn(c["risk_tier"], ("LOW", "MEDIUM", "HIGH"))


if __name__ == "__main__":  # pragma: no cover
    unittest.main(verbosity=2)
