# -*- coding: utf-8 -*-
"""anu_v3.runtime_event_loop — ANU runtime event loop driven by the durable
4-tuple registry (task-2553+54, 회장 정정·결정).

회장 §1 verbatim: "우리가 원하는 것은 사후 복원이 아니라 즉시 진행이다.
normal callback durable-success → registry write-back → ANU runtime event
loop가 즉시 감지 → batch coordinator 취합 → 다음 작업 dispatch 또는
consolidated summary 생성까지 자동 진행."

Problem (회장 §2 diagnosis): +44 durable registry, +47 read-only event
trigger, +49 self-chain quarantine selector and +53 durable-success
write-back / batch-settle evaluator each exist, but there was no single
*runtime event loop* that interprets a durable-registry append/update as a
progress event and immediately drives the next step (single-task next_action
/ batch all-settled / consolidated summary candidate / next-phase ANU-key
dispatch candidate) without waiting for a fixed-time gate or a dead-man
fallback.

This module is that loop. It is a pure, read-only consumer (9-R.1):

  * ``callback_4tuple_index.jsonl`` is the **state source** — read-only.
    This module NEVER writes the registry; the durable-success write-back
    is +53's sole responsibility (no new write-back logic here).
  * ``schedule_history`` is an audit log — it is NEVER a progress trigger.
  * ``cron-list`` is the current reservation state — NEVER an authority.
  * a fixed-time gate / dead-man fallback is a missed-callback safety net —
    NEVER a progress trigger. Offering one as the progress trigger is a
    hard FAIL (``FORBIDDEN_TRIGGER_SOURCE``, 회장 §3/§5 verbatim).
  * the ONLY accepted progress trigger is a durable-registry
    ``registry_completed_event`` (a normal-callback durable-success
    append/update).

Reuse, not re-implement (회장 §10 9-R.1):

  * +44 ``anu_v3.callback_4tuple_registry`` — read-only registry scan.
  * +47 ``anu_v3.callback_event_trigger.CallbackEventTrigger`` — the
    single-task next_action resolver (already registry-event-driven).
  * +49 ``anu_v3.authoritative_verdict_selector`` /
    ``anu_v3.self_collector_guard`` — self-chain verdict is permanently
    quarantined; a non-ANU collector record is HOLD.
  * +53 ``anu_v3.batch_settle_writeback.evaluate_batch_settle`` — batch
    all-settled over the durable-success write-back lines.

Authority boundary (회장 §2.12 / §8): the loop only *detects / evaluates /
proposes*. It has NO dispatch / merge / write authority. Every next-phase
ANU-key dispatch and every consolidated summary is emitted as a **proposal
JSON only** — zero auto-dispatch / auto-merge / auto-closeout.

Idempotency (회장 §2.9 / §2.10): the loop is a pure deterministic function
of the registry state. Each progress event has a deterministic ``event_id``;
duplicate events (e.g. a dead-man-fired duplicate COMPLETED line, or the
same event scanned twice) are de-duplicated so the loop yields exactly one
next_action / one consolidated summary / one dispatch candidate per
task/batch and never a duplicate.

Layer A / NO-CRON: ZERO cron register/remove, ZERO dispatch, ZERO merge,
ZERO ``cokacdir`` / ``subprocess`` exec, ZERO registry write.
"""
from __future__ import annotations

import hashlib
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Sequence, Tuple

from anu_v3.authoritative_verdict_selector import (  # pyright: ignore[reportMissingImports]  # noqa: E501
    AUTHORITATIVE_PASS,
    COLLECTOR_ROLE_ANU,
    VerdictRecord,
    select_authoritative_verdict,
)
from anu_v3.batch_settle_writeback import (  # pyright: ignore[reportMissingImports]  # noqa: E501
    TERMINAL_WRITEBACK_VERDICTS,
    DURABLE_SUCCESS_WRITEBACK_SCHEMA,
    evaluate_batch_settle,
)
from anu_v3.callback_4tuple_registry import (  # pyright: ignore[reportMissingImports]  # noqa: E501
    Callback4TupleRegistry,
)
from anu_v3.callback_event_trigger import (  # pyright: ignore[reportMissingImports]  # noqa: E501
    CallbackEventTrigger,
    FORBIDDEN_PROGRESS_TRIGGERS,
    NEXT_ACTION_READY,
    TRIGGER_DEAD_MAN,
    TRIGGER_FIXED_TIME,
    TRIGGER_REGISTRY_COMPLETED,
)
from anu_v3.self_collector_guard import (  # pyright: ignore[reportMissingImports]  # noqa: E501
    guard_self_collector_session,
)

LOOP_SCHEMA = "anu_v3.runtime_event_loop.v1"
RESULT_SCHEMA = "runtime_event_loop_result.v1"

# §3 — the durable registry append/update is the ONLY progress trigger.
PROGRESS_TRIGGER = TRIGGER_REGISTRY_COMPLETED

# ── progress-event interpretation of a durable registry line ─────────────────
EVENT_NORMAL_CALLBACK_DURABLE_SUCCESS = "normal_callback_durable_success"
EVENT_DURABLE_SUCCESS_WRITEBACK = "durable_success_writeback"
EVENT_REGISTERED_NO_PROGRESS = "registered_no_progress"
EVENT_SELF_CHAIN_QUARANTINED = "self_chain_quarantined_ignored"
EVENT_NON_ANU_HOLD = "non_anu_collector_hold"

# ── loop verdicts ────────────────────────────────────────────────────────────
LOOP_NEXT_ACTION_READY = "NEXT_ACTION_READY"
LOOP_WAIT = "WAIT"
LOOP_ALL_SETTLED = "ALL_SETTLED"
LOOP_NO_REGISTRY = "NO_REGISTRY_NO_ACTION"
LOOP_FORBIDDEN_TRIGGER = "FORBIDDEN_TRIGGER_SOURCE"
LOOP_TRACK_MISMATCH = "TRACK_MISMATCH"
LOOP_HOLD_FOR_CHAIR = "HOLD_FOR_CHAIR"

# Every loop output is a proposal — the loop has NO write/dispatch authority.
AUTHORITY_NONE = "none"
ACTION_MODE_PROPOSAL = "proposal"

# Independent-ANU collector key set is the single authority for "next phase
# ANU-key dispatch candidate" ownership (회장 §9). Resolved lazily so an
# unresolvable key set escalates instead of silently self-dispatching.
try:  # pragma: no cover - import-guard only
    from dispatch.callback_owner_enforcer import (  # pyright: ignore[reportMissingImports]  # noqa: E501
        DEFAULT_ANU_KEYS as _ANU_KEYS,
    )
    DEFAULT_ANU_KEYS: Tuple[str, ...] = tuple(sorted(_ANU_KEYS))
except Exception:  # pragma: no cover
    DEFAULT_ANU_KEYS = ()


def _sha(*parts: object) -> str:
    return hashlib.sha256(
        "|".join("" if p is None else str(p) for p in parts).encode("utf-8")
    ).hexdigest()


# ── single-task / batch input specs ──────────────────────────────────────────
@dataclass(frozen=True)
class SingleTaskSpec:
    """A single (non-parallel) task watched by the loop.

    ``next_phase`` (optional) declares a follow-up ANU-key dispatch — it is
    emitted as a proposal only (회장 §2.6 / §8). ``collector_key`` /
    ``executor_key`` (optional) let the loop run the +49 self-chain
    quarantine over a single-task durable-success record."""

    task_id: str
    expected_dispatch_id: Optional[str] = None
    expected_chat_id: Optional[str] = None
    expected_dispatch_cron_id: Optional[str] = None
    next_action_kind: str = "dispatch_next_task"
    next_action_payload: Optional[Dict[str, object]] = None
    next_phase: Optional[Dict[str, object]] = None
    collector_key: Optional[str] = None
    executor_key: Optional[str] = None


@dataclass(frozen=True)
class BatchSpec:
    """A parallel batch watched by the loop. ``expected_tracks`` =
    sequence of (track_id, task_id). ``next_phase`` (optional) declares the
    post-consolidation ANU-key dispatch — proposal only."""

    batch_id: str
    expected_tracks: Sequence[Tuple[str, str]]
    this_track_id: Optional[str] = None
    next_phase: Optional[Dict[str, object]] = None


# ── results ──────────────────────────────────────────────────────────────────
@dataclass
class RuntimeEventLoopResult:
    schema: str
    generated_at_kst: str
    registry_path: str
    progress_trigger: Optional[str]
    verdict: str
    fixed_time_used: bool
    dead_man_used: bool
    dead_man_signal_observed: bool
    fallback_pending_non_blocking: bool
    authority: str
    action_mode: str
    events_observed: List[Dict[str, object]]
    duplicate_events_suppressed: int
    single_task_results: List[Dict[str, object]]
    batch_results: List[Dict[str, object]]
    consolidated_summary_candidates: List[Dict[str, object]]
    dispatch_candidates: List[Dict[str, object]]
    quarantined_events: List[Dict[str, object]]
    hold_for_chair: bool
    reasons: List[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return self.verdict not in (
            LOOP_FORBIDDEN_TRIGGER,
            LOOP_HOLD_FOR_CHAIR,
        )

    def to_json(self) -> Dict[str, object]:
        return {
            "schema": self.schema,
            "generated_at_kst": self.generated_at_kst,
            "registry_path": self.registry_path,
            "progress_trigger": self.progress_trigger,
            "verdict": self.verdict,
            "fixed_time_used": self.fixed_time_used,
            "dead_man_used": self.dead_man_used,
            "dead_man_signal_observed": self.dead_man_signal_observed,
            "fallback_pending_non_blocking": (
                self.fallback_pending_non_blocking
            ),
            "authority": self.authority,
            "action_mode": self.action_mode,
            "events_observed": list(self.events_observed),
            "duplicate_events_suppressed": self.duplicate_events_suppressed,
            "single_task_results": list(self.single_task_results),
            "batch_results": list(self.batch_results),
            "consolidated_summary_candidates": list(
                self.consolidated_summary_candidates
            ),
            "dispatch_candidates": list(self.dispatch_candidates),
            "quarantined_events": list(self.quarantined_events),
            "hold_for_chair": self.hold_for_chair,
            "reasons": list(self.reasons),
        }


class RuntimeEventLoop:
    """Durable-registry-driven runtime event loop (read-only consumer).

    The loop never writes the registry, never registers/cancels a cron,
    never dispatches/merges. ``run`` is idempotent: repeated runs over the
    same registry state yield an identical result and zero side-effects.
    """

    def __init__(
        self,
        registry: Callback4TupleRegistry,
        *,
        anu_keys: Sequence[str] = DEFAULT_ANU_KEYS,
    ) -> None:
        self.registry = registry
        self.anu_keys = tuple(k for k in anu_keys if k)
        self._trigger = CallbackEventTrigger(registry)

    # -- registry read helpers (read-only) -----------------------------
    def _registry_present(self) -> bool:
        try:
            return self.registry.ledger_path.is_file()
        except OSError:
            return False

    def _all_lines(self) -> List[Dict[str, object]]:
        """All raw JSONL lines (fail-safe; corrupt lines skipped). The
        loop only READS — it never appends here (no +53 write-back)."""
        import json

        out: List[Dict[str, object]] = []
        try:
            if not self.registry.ledger_path.is_file():
                return out
            text = self.registry.ledger_path.read_text(encoding="utf-8")
        except OSError:
            return out
        for raw in text.splitlines():
            raw = raw.strip()
            if not raw:
                continue
            try:
                d = json.loads(raw)
            except ValueError:
                continue
            if isinstance(d, dict):
                out.append(d)
        return out

    # -- progress-event classification (회장 §2.1 / reg 8/9) ------------
    def _classify_writeback_event(
        self, line: Dict[str, object]
    ) -> Tuple[str, List[str]]:
        """Interpret a +53 durable_success_writeback line as a progress
        event, routing self-chain / non-ANU collectors through +49.

        Returns (event_kind, reasons). Self-chain origin -> permanently
        quarantined (ignored as a progress trigger). collector_role != ANU
        -> HOLD. Independent-ANU AUTHORITATIVE_PASS -> a valid durable-
        success progress event.
        """
        task_id = str(line.get("task_id") or "")
        collector_key = str(line.get("collector_key") or "")
        executor_key = str(line.get("executor_key") or "")
        collector_role = str(line.get("collector_role") or "")
        verdict = str(line.get("authoritative_verdict") or "")
        self_session = bool(collector_key) and collector_key == executor_key

        if collector_role and collector_role != COLLECTOR_ROLE_ANU:
            return EVENT_NON_ANU_HOLD, [
                f"{task_id}: collector_role={collector_role!r} != 'ANU' — "
                "non-ANU collector record is NOT a progress trigger; "
                "routed to HOLD via +49 guard (회장 §8 / reg 9)."
            ]

        g = guard_self_collector_session(
            executor_key=executor_key,
            collector_key=collector_key,
            collector_role=collector_role or COLLECTOR_ROLE_ANU,
            is_executor_self_session=self_session,
        )
        sel = select_authoritative_verdict(
            [
                VerdictRecord(
                    kind="collector_result",
                    verdict="PASS"
                    if verdict in TERMINAL_WRITEBACK_VERDICTS
                    else (verdict or "FAIL"),
                    task_id=task_id,
                    executor_key=executor_key,
                    collector_key=collector_key,
                    collector_role=collector_role or COLLECTOR_ROLE_ANU,
                    session_is_executor_self=self_session,
                    claimed_origin="independent_anu",
                    detail=verdict,
                )
            ],
            task_id=task_id,
            anu_keys=self.anu_keys,
        )
        if not g.ok or sel.classification != AUTHORITATIVE_PASS or (
            sel.independent_anu_count < 1
        ):
            return EVENT_SELF_CHAIN_QUARANTINED, [
                f"{task_id}: +49 selector/guard did NOT confirm an "
                f"independent-ANU AUTHORITATIVE_PASS (selector="
                f"{sel.classification!r}, guard_ok={g.ok}) — self-chain "
                "verdict permanently QUARANTINED, ignored as a progress "
                "trigger (회장 §5.D / reg 8)."
            ]
        return EVENT_DURABLE_SUCCESS_WRITEBACK, [
            f"{task_id}: independent-ANU AUTHORITATIVE_PASS durable-success "
            "write-back — valid registry progress event (회장 §2.1)."
        ]

    def _build_events(
        self,
    ) -> Tuple[List[Dict[str, object]], List[Dict[str, object]], int]:
        """Scan the registry and build the de-duplicated progress-event
        list (+ quarantined list + duplicate-suppressed count).

        Idempotency (회장 §2.10): each event has a deterministic
        ``event_id``; an identical event observed again (a dead-man-fired
        duplicate COMPLETED line, or the same registry scanned twice) is
        suppressed — counted once, never re-driven."""
        events: List[Dict[str, object]] = []
        quarantined: List[Dict[str, object]] = []
        seen: set[str] = set()
        suppressed = 0
        for ln in self._all_lines():
            schema = str(ln.get("schema") or "")
            if schema == DURABLE_SUCCESS_WRITEBACK_SCHEMA:
                kind, why = self._classify_writeback_event(ln)
                eid = _sha(
                    schema,
                    ln.get("task_id"),
                    ln.get("batch_id"),
                    ln.get("writeback_id"),
                    ln.get("writeback_classification"),
                    kind,
                )
                rec = {
                    "event_id": eid,
                    "event_kind": kind,
                    "schema_kind": schema,
                    "task_id": ln.get("task_id"),
                    "batch_id": ln.get("batch_id"),
                    "track_id": ln.get("track_id"),
                    "authoritative_verdict": ln.get(
                        "authoritative_verdict"
                    ),
                    "writeback_classification": ln.get(
                        "writeback_classification"
                    ),
                    "reasons": why,
                }
                if eid in seen:
                    suppressed += 1
                    continue
                seen.add(eid)
                if kind in (
                    EVENT_SELF_CHAIN_QUARANTINED,
                    EVENT_NON_ANU_HOLD,
                ):
                    quarantined.append(rec)
                else:
                    events.append(rec)
            else:
                # +44 callback_4tuple_ledger_record.v1 — a COMPLETED line is
                # a single-task normal-callback durable-success event; a
                # REGISTERED line is not yet a progress event.
                status = str(ln.get("status") or "")
                kind = (
                    EVENT_NORMAL_CALLBACK_DURABLE_SUCCESS
                    if status == "COMPLETED"
                    else EVENT_REGISTERED_NO_PROGRESS
                )
                eid = _sha(
                    schema or "callback_4tuple_ledger_record.v1",
                    ln.get("task_id"),
                    ln.get("dispatch_id"),
                    ln.get("normal_collector_cron_id"),
                    status,
                    kind,
                )
                rec = {
                    "event_id": eid,
                    "event_kind": kind,
                    "schema_kind": (
                        schema or "callback_4tuple_ledger_record.v1"
                    ),
                    "task_id": ln.get("task_id"),
                    "batch_id": None,
                    "status": status,
                    "role": ln.get("role"),
                    "reasons": [],
                }
                if eid in seen:
                    suppressed += 1
                    continue
                seen.add(eid)
                events.append(rec)
        return events, quarantined, suppressed

    # -- proposal builders (proposal-only — 회장 §2.12 / §8) -----------
    def _dispatch_candidate(
        self,
        *,
        source: str,
        source_id: str,
        next_phase: Dict[str, object],
        gate_evidence: Dict[str, object],
    ) -> Dict[str, object]:
        """Build a next-phase ANU-key dispatch **candidate** (proposal).

        The loop has NO dispatch authority. The candidate names the
        independent-ANU key set as the only legitimate owner; an
        unresolvable ANU key set is surfaced (never a silent self-dispatch
        — 회장 §5 verbatim forbids executor self-dispatch)."""
        anu_resolvable = bool(self.anu_keys)
        return {
            "schema": "anu_v3.runtime_event_loop.dispatch_candidate.v1",
            "authority": AUTHORITY_NONE,
            "action_mode": ACTION_MODE_PROPOSAL,
            "auto_executed": False,
            "source": source,
            "source_id": source_id,
            "anu_key_owner_required": True,
            "anu_keys": list(self.anu_keys),
            "anu_keys_resolvable": anu_resolvable,
            "next_phase": dict(next_phase),
            "gate_evidence": dict(gate_evidence),
            "note": (
                "PROPOSAL ONLY — next-phase dispatch must be enacted by an "
                "independent ANU-key session; the event loop performs ZERO "
                "auto-dispatch / merge / closeout (회장 §2.6/§2.12/§8). "
                "executor self-dispatch is forbidden (회장 §5)."
            ),
        }

    def _summary_candidate(
        self, batch_id: str, settle_json: Dict[str, object]
    ) -> Dict[str, object]:
        return {
            "schema": "anu_v3.runtime_event_loop.consolidated_summary"
            "_candidate.v1",
            "authority": AUTHORITY_NONE,
            "action_mode": ACTION_MODE_PROPOSAL,
            "auto_executed": False,
            "kind": "CANDIDATE",
            "status": "CONSOLIDATED_SUMMARY_CANDIDATE_READY",
            "batch_id": batch_id,
            "generated_basis": settle_json.get("evaluated_at_basis"),
            "all_settled": settle_json.get("all_settled"),
            "all_authoritative_pass": settle_json.get(
                "all_authoritative_pass"
            ),
            "track_states": settle_json.get("track_states"),
            "note": (
                "PROPOSAL ONLY — consolidated summary candidate generated "
                "by the runtime event loop on the normal-callback durable-"
                "success event (회장 §2.5). No auto-closeout."
            ),
        }

    # -- the loop ------------------------------------------------------
    def run(
        self,
        *,
        single_tasks: Sequence[SingleTaskSpec] = (),
        batches: Sequence[BatchSpec] = (),
        progress_trigger_source: str = PROGRESS_TRIGGER,
        dead_man_signal: bool = False,
        generated_at_kst: str = "",
    ) -> RuntimeEventLoopResult:
        """Interpret durable-registry append/update as progress events and
        immediately drive the next step (회장 §2 1~12).

        * ``progress_trigger_source`` MUST be ``registry_completed_event``.
          A ``fixed_time_gate`` / ``dead_man_fallback`` -> hard FAIL
          (``FORBIDDEN_TRIGGER_SOURCE``; 회장 §3/§5 verbatim, reg 10).
        * a ``dead_man_signal`` is recorded as observed but is NEVER
          promoted to a progress trigger (회장 §3, reg 6).
        * a missing registry -> ``NO_REGISTRY_NO_ACTION`` recovery state,
          no action (reg 11).
        * single task: +47 ``CallbackEventTrigger.scan`` -> READY/WAIT;
          identity mismatch -> ``TRACK_MISMATCH`` (reg 1/12).
        * batch: +53 ``evaluate_batch_settle`` -> WAIT (partial) /
          ALL_SETTLED (every track durable-success). all-settled &
          all-AUTHORITATIVE_PASS -> consolidated summary candidate (reg
          2/3/4/15/16).
        * a declared ``next_phase`` -> ANU-key dispatch **candidate**
          (proposal only, reg 13/14; 회장 §2.12/§8).
        * fallback/dead-man pending is NON-blocking (reg 5).
        * idempotent across repeated runs and duplicate events (reg 6/7).
        """
        reasons: List[str] = []

        # 1. §3/§5 — only the registry-completed event is a progress
        #    trigger; a fixed-time / dead-man source is a hard FAIL.
        if progress_trigger_source in FORBIDDEN_PROGRESS_TRIGGERS:
            reasons.append(
                f"progress_trigger_source={progress_trigger_source!r} is a "
                f"{TRIGGER_FIXED_TIME!r}/{TRIGGER_DEAD_MAN!r}-class trigger "
                "— FORBIDDEN as a progress trigger; only "
                f"{PROGRESS_TRIGGER!r} (a durable-registry append/update) "
                "drives progress (회장 §3/§5 verbatim, reg 10)."
            )
            return RuntimeEventLoopResult(
                schema=RESULT_SCHEMA,
                generated_at_kst=generated_at_kst,
                registry_path=str(self.registry.ledger_path),
                progress_trigger=None,
                verdict=LOOP_FORBIDDEN_TRIGGER,
                fixed_time_used=(
                    progress_trigger_source == TRIGGER_FIXED_TIME
                ),
                dead_man_used=(
                    progress_trigger_source == TRIGGER_DEAD_MAN
                ),
                dead_man_signal_observed=dead_man_signal,
                fallback_pending_non_blocking=True,
                authority=AUTHORITY_NONE,
                action_mode=ACTION_MODE_PROPOSAL,
                events_observed=[],
                duplicate_events_suppressed=0,
                single_task_results=[],
                batch_results=[],
                consolidated_summary_candidates=[],
                dispatch_candidates=[],
                quarantined_events=[],
                hold_for_chair=False,
                reasons=reasons,
            )

        # 2. missing registry -> no action, recovery state (reg 11).
        if not self._registry_present():
            reasons.append(
                "durable registry absent — NO action; this is the correct "
                "fail-safe recovery state (defer to schedule_history audit "
                "+ canonical artifact, never a misjudged progress). The "
                "registry is the state source; its absence is not a "
                "trigger (회장 §3, reg 11)."
            )
            return RuntimeEventLoopResult(
                schema=RESULT_SCHEMA,
                generated_at_kst=generated_at_kst,
                registry_path=str(self.registry.ledger_path),
                progress_trigger=PROGRESS_TRIGGER,
                verdict=LOOP_NO_REGISTRY,
                fixed_time_used=False,
                dead_man_used=False,
                dead_man_signal_observed=dead_man_signal,
                fallback_pending_non_blocking=True,
                authority=AUTHORITY_NONE,
                action_mode=ACTION_MODE_PROPOSAL,
                events_observed=[],
                duplicate_events_suppressed=0,
                single_task_results=[],
                batch_results=[],
                consolidated_summary_candidates=[],
                dispatch_candidates=[],
                quarantined_events=[],
                hold_for_chair=False,
                reasons=reasons,
            )

        if dead_man_signal:
            reasons.append(
                "a dead-man signal was OBSERVED but is EXPLICITLY NOT "
                "promoted to a progress trigger — missed-callback safety "
                "net only (회장 §3, reg 6)."
            )

        # 3. build de-duplicated progress events (reg 7/8/9).
        events, quarantined, suppressed = self._build_events()
        if suppressed:
            reasons.append(
                f"{suppressed} duplicate registry event(s) suppressed "
                "(deterministic event_id) — idempotent: a re-scanned / "
                "dead-man-duplicated event is counted once, never re-"
                "driven (회장 §2.10, reg 6/7)."
            )
        for q in quarantined:
            q_reasons = q.get("reasons", [])
            if isinstance(q_reasons, list):
                reasons.extend(str(x) for x in q_reasons)

        # 4. single-task next_action via the +47 registry-event resolver.
        single_results: List[Dict[str, object]] = []
        dispatch_candidates: List[Dict[str, object]] = []
        verdicts: List[str] = []
        hold = False
        seen_task: set[str] = set()
        for spec in single_tasks:
            if spec.task_id in seen_task:
                continue  # idempotent: one verdict per task (reg 7)
            seen_task.add(spec.task_id)
            r = self._trigger.scan(
                task_id=spec.task_id,
                expected_dispatch_id=spec.expected_dispatch_id,
                expected_chat_id=spec.expected_chat_id,
                expected_dispatch_cron_id=spec.expected_dispatch_cron_id,
                dependency_trigger_source=PROGRESS_TRIGGER,
                next_action_kind=spec.next_action_kind,
                next_action_payload=spec.next_action_payload,
                dead_man_signal=dead_man_signal,
            )
            rj = r.to_json()
            single_results.append(rj)
            verdicts.append(r.verdict)
            if r.verdict == "TRACK_MISMATCH":
                hold = True
                reasons.append(
                    f"{spec.task_id}: registry identity mismatch -> "
                    "TRACK_MISMATCH/HOLD (an unrelated track's callback is "
                    "never cited as progress; 회장 §6, reg 12)."
                )
            if (
                r.ready
                and r.verdict == NEXT_ACTION_READY
                and spec.next_phase
            ):
                dispatch_candidates.append(
                    self._dispatch_candidate(
                        source="single_task",
                        source_id=spec.task_id,
                        next_phase=spec.next_phase,
                        gate_evidence={
                            "trigger_source": r.trigger_source,
                            "fixed_time_used": r.fixed_time_used,
                            "dead_man_used": r.dead_man_used,
                        },
                    )
                )

        # 5. batch all-settled via the +53 durable-success evaluator.
        batch_results: List[Dict[str, object]] = []
        summary_candidates: List[Dict[str, object]] = []
        seen_batch: set[str] = set()
        for b in batches:
            if b.batch_id in seen_batch:
                continue  # idempotent: one settle per batch (reg 7)
            seen_batch.add(b.batch_id)
            settle = evaluate_batch_settle(
                batch_id=b.batch_id,
                expected_tracks=list(b.expected_tracks),
                ledger_path=self.registry.ledger_path,
                this_track_id=b.this_track_id,
            )
            sj = settle.to_json()
            batch_results.append(sj)
            if settle.all_settled and settle.all_authoritative_pass:
                verdicts.append(LOOP_ALL_SETTLED)
                summary_candidates.append(
                    self._summary_candidate(b.batch_id, sj)
                )
                reasons.append(
                    f"batch {b.batch_id}: 3/3 (all) tracks durable-success "
                    "settled & every verdict AUTHORITATIVE_PASS — "
                    "consolidated summary candidate generated immediately "
                    "on the normal-callback durable-success event (회장 "
                    "§2.4/§2.5, reg 3/4/15/16)."
                )
                if b.next_phase:
                    dispatch_candidates.append(
                        self._dispatch_candidate(
                            source="batch",
                            source_id=b.batch_id,
                            next_phase=b.next_phase,
                            gate_evidence={
                                "evaluated_at_basis": settle.evaluated_at_basis,  # noqa: E501
                                "all_settled": settle.all_settled,
                                "all_authoritative_pass": (
                                    settle.all_authoritative_pass
                                ),
                            },
                        )
                    )
            else:
                verdicts.append(LOOP_WAIT)
                reasons.append(
                    f"batch {b.batch_id}: "
                    f"{len(settle.tracks_settled)}/"
                    f"{len(list(b.expected_tracks))} tracks settled — WAIT "
                    "(not all-settled). A fallback/dead-man pending state "
                    "does NOT block a completed track's progress (회장 "
                    "§2.7, reg 2/5)."
                )

        # 6. fold the loop verdict.
        if hold:
            verdict = LOOP_TRACK_MISMATCH
        elif LOOP_ALL_SETTLED in verdicts:
            verdict = LOOP_ALL_SETTLED
        elif NEXT_ACTION_READY in verdicts:
            verdict = LOOP_NEXT_ACTION_READY
        elif verdicts:
            verdict = LOOP_WAIT
        else:
            verdict = LOOP_WAIT
            reasons.append(
                "no single-task / batch spec supplied — loop scanned the "
                "registry, recorded progress events, took no action (read-"
                "only consumer)."
            )

        reasons.append(
            "schedule_history=audit only · cron-list≠authority · durable "
            "registry=state source · event loop=progress trigger · "
            "fallback/dead-man=safety net (NOT a progress trigger). All "
            "outputs are proposals (NO auto-dispatch/merge/write — 회장 "
            "§2.12/§8)."
        )
        return RuntimeEventLoopResult(
            schema=RESULT_SCHEMA,
            generated_at_kst=generated_at_kst,
            registry_path=str(self.registry.ledger_path),
            progress_trigger=PROGRESS_TRIGGER,
            verdict=verdict,
            fixed_time_used=False,
            dead_man_used=False,
            dead_man_signal_observed=dead_man_signal,
            fallback_pending_non_blocking=True,
            authority=AUTHORITY_NONE,
            action_mode=ACTION_MODE_PROPOSAL,
            events_observed=events,
            duplicate_events_suppressed=suppressed,
            single_task_results=single_results,
            batch_results=batch_results,
            consolidated_summary_candidates=summary_candidates,
            dispatch_candidates=dispatch_candidates,
            quarantined_events=quarantined,
            hold_for_chair=hold,
            reasons=reasons,
        )


def loop_result_envelope(
    result: RuntimeEventLoopResult, *, generated_at_kst: str = ""
) -> Dict[str, object]:
    """Serialize a loop result as the standalone runtime-event-loop-result
    artifact (회장 §7 expected_files)."""
    return {
        "schema": "task-2553+54.runtime-event-loop-result.v1",
        "generated_at_kst": generated_at_kst
        or result.generated_at_kst,
        "authority": AUTHORITY_NONE,
        "action_mode": ACTION_MODE_PROPOSAL,
        "auto_executed": False,
        "result": result.to_json(),
    }


__all__ = [
    "LOOP_SCHEMA",
    "RESULT_SCHEMA",
    "PROGRESS_TRIGGER",
    "EVENT_NORMAL_CALLBACK_DURABLE_SUCCESS",
    "EVENT_DURABLE_SUCCESS_WRITEBACK",
    "EVENT_REGISTERED_NO_PROGRESS",
    "EVENT_SELF_CHAIN_QUARANTINED",
    "EVENT_NON_ANU_HOLD",
    "LOOP_NEXT_ACTION_READY",
    "LOOP_WAIT",
    "LOOP_ALL_SETTLED",
    "LOOP_NO_REGISTRY",
    "LOOP_FORBIDDEN_TRIGGER",
    "LOOP_TRACK_MISMATCH",
    "LOOP_HOLD_FOR_CHAIR",
    "AUTHORITY_NONE",
    "ACTION_MODE_PROPOSAL",
    "SingleTaskSpec",
    "BatchSpec",
    "RuntimeEventLoopResult",
    "RuntimeEventLoop",
    "loop_result_envelope",
]
