"""tests/regression/test_runtime_event_loop_2553plus54.py

task-2553+54 — ANU_RUNTIME_EVENT_LOOP_FROM_DURABLE_REGISTRY regression.

Spec: memory/tasks/task-2553+54.md
(sha256 97ff6dd95c65775f40c6cac242d2fc10010479df863e11b8c2019502de2bd3f4).

회장 §4 필수 regression 1~16 — 실 entrypoint 직접 호출 (mock-only FAIL):

  1  single task COMPLETED write-back -> NEXT_ACTION_READY
  2  batch 1/3 completed -> WAIT
  3  batch 3/3 completed -> ALL_SETTLED
  4  all-settled -> consolidated summary candidate
  5  fallback pending + normal completed -> non-blocking
  6  dead-man fired duplicate -> no duplicate next_action
  7  same event scanned twice -> idempotent
  8  self-chain completed record -> ignored/quarantined
  9  non-ANU collector record -> ignored/HOLD
  10 fixed-time/dead-man progress trigger -> FAIL
  11 missing registry -> no action / recovery state
  12 registry mismatch -> TRACK_MISMATCH/HOLD
  13 unauthorized dispatch -> proposal only
  14 authorized ANU-key dispatch candidate 생성
  15 +50/+51/+52 fixture (real registry) -> all-settled 재현
  16 +53 result를 event loop input으로 -> next summary READY 재현

모든 테스트 100% offline — network / git mutation / cron / dispatch /
cokacdir / subprocess 0. The real +44 ledger
(memory/events/callback_4tuple_index.jsonl) is NEVER mutated by this test:
synthetic cases use ISOLATED tmp ledgers; the live cases (15/16) consume
the real registry READ-ONLY (sha256 asserted unchanged).
"""
from __future__ import annotations

import hashlib
import json
import sys
from pathlib import Path

import pytest

WORKSPACE = Path(__file__).resolve().parent.parent.parent
if str(WORKSPACE) in sys.path:
    sys.path.remove(str(WORKSPACE))
sys.path.insert(0, str(WORKSPACE))

from anu_v3.callback_4tuple_registry import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    Callback4TupleRegistry,
)
from anu_v3.runtime_event_loop import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    ACTION_MODE_PROPOSAL,
    AUTHORITY_NONE,
    EVENT_NON_ANU_HOLD,
    EVENT_SELF_CHAIN_QUARANTINED,
    LOOP_ALL_SETTLED,
    LOOP_FORBIDDEN_TRIGGER,
    LOOP_NEXT_ACTION_READY,
    LOOP_NO_REGISTRY,
    LOOP_TRACK_MISMATCH,
    LOOP_WAIT,
    BatchSpec,
    RuntimeEventLoop,
    SingleTaskSpec,
    loop_result_envelope,
)

FIX = json.loads(
    (WORKSPACE / "memory/fixtures/task-2553plus54.cases.json").read_text(
        encoding="utf-8"
    )
)
ANU_KEY = FIX["anu_key"]
SELF_KEY = FIX["executor_self_key"]
REAL_LEDGER = WORKSPACE / "memory/events/callback_4tuple_index.jsonl"


def _sha256(p: Path) -> str:
    return hashlib.sha256(p.read_bytes()).hexdigest()


def _seed(tmp_path: Path, lines) -> Callback4TupleRegistry:
    led = tmp_path / "callback_4tuple_index.jsonl"
    with open(led, "w", encoding="utf-8") as fh:
        for ln in lines:
            fh.write(json.dumps(ln, ensure_ascii=False, sort_keys=True))
            fh.write("\n")
    return Callback4TupleRegistry(led)


def _writeback_line(task_id, track_id, batch_id, tpl):
    d = dict(tpl)
    d.update(
        {
            "task_id": task_id,
            "track_id": track_id,
            "batch_id": batch_id,
            "dispatch_id": f"derived:{task_id}",
            "normal_collector_cron_id": "ANU-normal-callback:" + ANU_KEY,
            "completed_at": "2026-05-18 21:30 KST",
            "source_result_path": f"memory/events/{task_id}.synth.json",
            "writeback_id": hashlib.sha256(
                f"{task_id}|{track_id}|{batch_id}".encode()
            ).hexdigest(),
        }
    )
    return d


# ── 1. single task COMPLETED write-back -> NEXT_ACTION_READY ─────────────────
def test_01_single_task_completed_next_action_ready(tmp_path):
    reg = _seed(tmp_path, FIX["single_task_completed"]["registry_lines"])
    loop = RuntimeEventLoop(reg, anu_keys=[ANU_KEY])
    res = loop.run(
        single_tasks=[SingleTaskSpec(task_id="task-2553+99-single")]
    )
    assert res.verdict == LOOP_NEXT_ACTION_READY
    assert res.single_task_results[0]["verdict"] == "NEXT_ACTION_READY"
    assert res.single_task_results[0]["trigger_source"] == (
        "registry_completed_event"
    )
    assert res.fixed_time_used is False and res.dead_man_used is False


# ── 2. batch 1/3 completed -> WAIT ──────────────────────────────────────────
def test_02_batch_partial_wait(tmp_path):
    bp = FIX["batch_partial"]
    tpl = bp["track_writeback_template"]
    only_a = _writeback_line(
        "task-2553+90", "TRACK A", bp["batch_id"], tpl
    )
    reg = _seed(tmp_path, [only_a])
    loop = RuntimeEventLoop(reg, anu_keys=[ANU_KEY])
    res = loop.run(
        batches=[
            BatchSpec(
                batch_id=bp["batch_id"],
                expected_tracks=[tuple(t) for t in bp["expected_tracks"]],
            )
        ]
    )
    assert res.verdict == LOOP_WAIT
    sj = res.batch_results[0]
    assert sj["all_settled"] is False
    assert len(sj["tracks_settled"]) == 1
    assert res.consolidated_summary_candidates == []


# ── 3. batch 3/3 completed -> ALL_SETTLED ───────────────────────────────────
def test_03_batch_all_settled(tmp_path):
    bp = FIX["batch_partial"]
    tpl = bp["track_writeback_template"]
    lines = [
        _writeback_line(tid, trk, bp["batch_id"], tpl)
        for trk, tid in [
            ("TRACK A", "task-2553+90"),
            ("TRACK B", "task-2553+91"),
            ("TRACK C", "task-2553+92"),
        ]
    ]
    reg = _seed(tmp_path, lines)
    loop = RuntimeEventLoop(reg, anu_keys=[ANU_KEY])
    res = loop.run(
        batches=[
            BatchSpec(
                batch_id=bp["batch_id"],
                expected_tracks=[tuple(t) for t in bp["expected_tracks"]],
            )
        ]
    )
    assert res.verdict == LOOP_ALL_SETTLED
    assert res.batch_results[0]["all_settled"] is True
    assert res.batch_results[0]["all_authoritative_pass"] is True
    assert res.batch_results[0]["evaluated_at_basis"] == (
        "normal_callback_durable_success_event"
    )


# ── 4. all-settled -> consolidated summary candidate ────────────────────────
def test_04_all_settled_consolidated_summary_candidate(tmp_path):
    bp = FIX["batch_partial"]
    tpl = bp["track_writeback_template"]
    lines = [
        _writeback_line(tid, trk, bp["batch_id"], tpl)
        for trk, tid in [
            ("TRACK A", "task-2553+90"),
            ("TRACK B", "task-2553+91"),
            ("TRACK C", "task-2553+92"),
        ]
    ]
    reg = _seed(tmp_path, lines)
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        batches=[
            BatchSpec(
                batch_id=bp["batch_id"],
                expected_tracks=[tuple(t) for t in bp["expected_tracks"]],
            )
        ]
    )
    cands = res.consolidated_summary_candidates
    assert len(cands) == 1
    c = cands[0]
    assert c["status"] == "CONSOLIDATED_SUMMARY_CANDIDATE_READY"
    assert c["authority"] == AUTHORITY_NONE
    assert c["action_mode"] == ACTION_MODE_PROPOSAL
    assert c["auto_executed"] is False


# ── 5. fallback pending + normal completed -> non-blocking ──────────────────
def test_05_fallback_pending_non_blocking(tmp_path):
    # +99 single task carries a fallback_callback_cron_id (FB-99) that is
    # still pending; the COMPLETED task must still proceed.
    reg = _seed(tmp_path, FIX["single_task_completed"]["registry_lines"])
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        single_tasks=[SingleTaskSpec(task_id="task-2553+99-single")]
    )
    assert res.verdict == LOOP_NEXT_ACTION_READY
    assert res.fallback_pending_non_blocking is True
    st = res.single_task_results[0]
    assert st["ready"] is True
    assert st["fallback_pending_non_blocking"] is True


# ── 6. dead-man fired duplicate -> no duplicate next_action ─────────────────
def test_06_dead_man_duplicate_no_duplicate_next_action(tmp_path):
    lines = list(FIX["single_task_completed"]["registry_lines"])
    lines.append(FIX["dead_man_duplicate"]["duplicate_completed_line"])
    reg = _seed(tmp_path, lines)
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        single_tasks=[
            SingleTaskSpec(task_id="task-2553+99-single"),
            # an accidental duplicate spec must also be deduped.
            SingleTaskSpec(task_id="task-2553+99-single"),
        ],
        dead_man_signal=True,
    )
    # exactly one single-task verdict despite a duplicate COMPLETED line +
    # a duplicate spec + a dead-man signal.
    assert len(res.single_task_results) == 1
    assert res.verdict == LOOP_NEXT_ACTION_READY
    assert res.dead_man_signal_observed is True
    assert res.dead_man_used is False
    assert res.duplicate_events_suppressed >= 1


# ── 7. same event scanned twice -> idempotent ───────────────────────────────
def test_07_idempotent_repeated_runs(tmp_path):
    bp = FIX["batch_partial"]
    tpl = bp["track_writeback_template"]
    lines = [
        _writeback_line(tid, trk, bp["batch_id"], tpl)
        for trk, tid in [
            ("TRACK A", "task-2553+90"),
            ("TRACK B", "task-2553+91"),
            ("TRACK C", "task-2553+92"),
        ]
    ]
    reg = _seed(tmp_path, lines)
    pre = _sha256(reg.ledger_path)
    loop = RuntimeEventLoop(reg, anu_keys=[ANU_KEY])
    spec = [
        BatchSpec(
            batch_id=bp["batch_id"],
            expected_tracks=[tuple(t) for t in bp["expected_tracks"]],
        )
    ]
    r1 = loop.run(batches=spec).to_json()
    r2 = loop.run(batches=spec).to_json()
    # idempotent: identical result, registry byte-0 (read-only consumer),
    # exactly one summary candidate (never duplicated across runs).
    assert r1 == r2
    assert _sha256(reg.ledger_path) == pre
    assert len(r2["consolidated_summary_candidates"]) == 1


# ── 8. self-chain completed record -> ignored/quarantined ───────────────────
def test_08_self_chain_quarantined(tmp_path):
    reg = _seed(tmp_path, [FIX["self_chain_writeback"]["registry_line"]])
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run()
    kinds = [q["event_kind"] for q in res.quarantined_events]
    assert EVENT_SELF_CHAIN_QUARANTINED in kinds
    # the self-chain line is NOT counted as a valid progress event.
    assert all(
        e["event_kind"] != EVENT_SELF_CHAIN_QUARANTINED
        for e in res.events_observed
    )


# ── 9. non-ANU collector record -> ignored/HOLD ─────────────────────────────
def test_09_non_anu_collector_hold(tmp_path):
    reg = _seed(tmp_path, [FIX["non_anu_writeback"]["registry_line"]])
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run()
    kinds = [q["event_kind"] for q in res.quarantined_events]
    assert EVENT_NON_ANU_HOLD in kinds
    assert all(
        e["event_kind"] != EVENT_NON_ANU_HOLD
        for e in res.events_observed
    )


# ── 10. fixed-time / dead-man progress trigger -> FAIL ──────────────────────
@pytest.mark.parametrize("bad", ["fixed_time_gate", "dead_man_fallback"])
def test_10_forbidden_progress_trigger(tmp_path, bad):
    reg = _seed(tmp_path, FIX["single_task_completed"]["registry_lines"])
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        single_tasks=[SingleTaskSpec(task_id="task-2553+99-single")],
        progress_trigger_source=bad,
    )
    assert res.verdict == LOOP_FORBIDDEN_TRIGGER
    assert res.ok is False
    assert res.progress_trigger is None
    if bad == "fixed_time_gate":
        assert res.fixed_time_used is True
    else:
        assert res.dead_man_used is True
    # no next_action / summary / dispatch produced on a forbidden trigger.
    assert res.single_task_results == []
    assert res.consolidated_summary_candidates == []
    assert res.dispatch_candidates == []


# ── 11. missing registry -> no action / recovery state ──────────────────────
def test_11_missing_registry_no_action(tmp_path):
    reg = Callback4TupleRegistry(tmp_path / "does-not-exist.jsonl")
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        single_tasks=[SingleTaskSpec(task_id="task-anything")]
    )
    assert res.verdict == LOOP_NO_REGISTRY
    assert res.single_task_results == []
    assert res.consolidated_summary_candidates == []
    assert res.dispatch_candidates == []
    assert any("recovery state" in r for r in res.reasons)


# ── 12. registry mismatch -> TRACK_MISMATCH/HOLD ────────────────────────────
def test_12_registry_mismatch_hold(tmp_path):
    m = FIX["mismatch"]
    reg = _seed(tmp_path, [m["registry_line"]])
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        single_tasks=[
            SingleTaskSpec(
                task_id="task-2553+96-mismatch",
                expected_dispatch_id=m["expected_dispatch_id"],
            )
        ]
    )
    assert res.verdict == LOOP_TRACK_MISMATCH
    assert res.hold_for_chair is True
    assert res.single_task_results[0]["verdict"] == "TRACK_MISMATCH"


# ── 13. unauthorized dispatch -> proposal only ──────────────────────────────
def test_13_dispatch_proposal_only(tmp_path):
    reg = _seed(tmp_path, FIX["single_task_completed"]["registry_lines"])
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        single_tasks=[
            SingleTaskSpec(
                task_id="task-2553+99-single",
                next_phase=FIX["single_task_completed"]["next_phase"],
            )
        ]
    )
    assert res.authority == AUTHORITY_NONE
    assert res.action_mode == ACTION_MODE_PROPOSAL
    assert len(res.dispatch_candidates) == 1
    dc = res.dispatch_candidates[0]
    assert dc["authority"] == AUTHORITY_NONE
    assert dc["action_mode"] == ACTION_MODE_PROPOSAL
    assert dc["auto_executed"] is False
    assert dc["anu_key_owner_required"] is True
    # the loop NEVER fires the executor self key.
    assert SELF_KEY not in dc["anu_keys"]


# ── 14. authorized ANU-key dispatch candidate 생성 ──────────────────────────
def test_14_authorized_anu_key_dispatch_candidate(tmp_path):
    lb = FIX["live_batch"]
    reg = Callback4TupleRegistry(REAL_LEDGER)
    pre = _sha256(REAL_LEDGER)
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        batches=[
            BatchSpec(
                batch_id=lb["batch_id"],
                expected_tracks=[tuple(t) for t in lb["expected_tracks"]],
                next_phase=lb["next_phase"],
            )
        ]
    )
    assert res.verdict == LOOP_ALL_SETTLED
    assert len(res.dispatch_candidates) == 1
    dc = res.dispatch_candidates[0]
    assert dc["anu_keys"] == [ANU_KEY]
    assert dc["anu_keys_resolvable"] is True
    assert dc["auto_executed"] is False
    # real registry consumed READ-ONLY — byte-0.
    assert _sha256(REAL_LEDGER) == pre


# ── 15. +50/+51/+52 fixture (real registry) -> all-settled 재현 ─────────────
def test_15_live_50_51_52_all_settled(tmp_path):
    lb = FIX["live_batch"]
    reg = Callback4TupleRegistry(REAL_LEDGER)
    pre = _sha256(REAL_LEDGER)
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        batches=[
            BatchSpec(
                batch_id=lb["batch_id"],
                expected_tracks=[tuple(t) for t in lb["expected_tracks"]],
            )
        ]
    )
    assert res.verdict == LOOP_ALL_SETTLED
    sj = res.batch_results[0]
    assert sj["all_settled"] is True
    assert sj["all_authoritative_pass"] is True
    assert sj["tracks_settled"] == ["TRACK 1", "TRACK 2", "TRACK 3"]
    assert _sha256(REAL_LEDGER) == pre  # read-only consumer


# ── 16. +53 result를 event loop input으로 -> next summary READY 재현 ────────
def test_16_plus53_result_as_input_summary_ready(tmp_path):
    lb = FIX["live_batch"]
    # The +53 durable-success write-back already populated the real
    # registry; the event loop consumes that state and immediately
    # reproduces the consolidated summary candidate READY.
    plus53 = json.loads(
        (WORKSPACE / "memory/events/task-2553+53.result.json").read_text(
            encoding="utf-8"
        )
    )
    assert plus53["hold_for_chair"] is False
    reg = Callback4TupleRegistry(REAL_LEDGER)
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        batches=[
            BatchSpec(
                batch_id=lb["batch_id"],
                expected_tracks=[tuple(t) for t in lb["expected_tracks"]],
                next_phase=lb["next_phase"],
            )
        ]
    )
    cands = res.consolidated_summary_candidates
    assert len(cands) == 1
    assert cands[0]["status"] == "CONSOLIDATED_SUMMARY_CANDIDATE_READY"
    assert cands[0]["all_authoritative_pass"] is True
    env = loop_result_envelope(res, generated_at_kst="2026-05-18 21:45 KST")
    assert env["schema"] == "task-2553+54.runtime-event-loop-result.v1"
    assert env["auto_executed"] is False
    assert env["result"]["verdict"] == LOOP_ALL_SETTLED


# ── invariants: no self key, proposal-only, schema-valid ────────────────────
def test_17_no_self_collector_no_self_dispatch_invariant(tmp_path):
    reg = _seed(tmp_path, FIX["single_task_completed"]["registry_lines"])
    res = RuntimeEventLoop(reg, anu_keys=[ANU_KEY]).run(
        single_tasks=[
            SingleTaskSpec(
                task_id="task-2553+99-single",
                next_phase={"phase": "x"},
            )
        ]
    )
    blob = json.dumps(res.to_json())
    assert SELF_KEY not in blob  # executor self key never appears
    assert res.authority == "none"
    assert res.action_mode == "proposal"
