# -*- coding: utf-8 -*-
"""anu_v3.generic_batch_coordinator — parallel_batch_coordinator v0 generalized.

task-2553+30 TRACK B (NO-CRON variant, 9-R.1/9-R.2/9-R.3).

Goal (chair verbatim, §2): generalize parallel_batch_coordinator v0 so that for
ANY future parallel batch, ANU auto-integrates per-track state · callback ·
fallback · dependency · loop progress · final authority packet · closeout
eligibility into ONE batch_state — code/file automation, not documentation.

Generalization approach (§4):
  * REUSE / EXTEND the +29 read-only registry layer as a generic tier.
  * NEW standalone generic module (this file) + NEW generic schemas.
  * additive only — zero edit / zero API break of:
        anu_v3.parallel_batch_coordinator  (+17/+19 entry, never imported here)
        anu_v3.batch_join_policy           (frozen anchor — imported read-only)
        anu_v3.parallel_runtime_registry   (+29 — imported read-only)
        anu_v3.batch_runtime_join_policy   (+29 — imported read-only)
  * generic batch_state authority == NEW separate path
        memory/events/task-2553.generic-batch-state.json
    distinct from the chair durable v1 memory/events/...parallel-batch-state.json
    (read-only, never mutated/coupled).

9-R.1 (read-only decision logic): every coordinator component below —
next-action resolver · final authority packet selector · consolidated summary
generator · result-ready classifier · overlap/forbidden checker — only
derive / propose / read. They NEVER execute, confirm, write, report, merge,
or touch cron. The single file emission helper (`emit_generic_batch_state`)
is an explicit, hard-guarded I/O function OUTSIDE the decision boundary; the
chair / ANU performs the actual closeout confirmation (closeout.confirmed is
hard-pinned False here).

9-R.2 (fixture verbs): the +26~+29 batch-closeout artifacts are read / parsed
/ referenced only. This module never modifies / registers / overwrites them;
the generic fixture is a NEW normalized copy.

9-R.3 (single dispatch): pure stdlib, no subprocess dispatch / delegation /
cron. Loop iteration is an in-process state derivation, never a new dispatch.

§7 NO-CRON: zero cron register/remove. Self-completion / recovery is
recognised purely by result.json + .done existence (dogfooding, §12).
"""
from __future__ import annotations

import hashlib
import json
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Sequence

# --- reused frozen anchor (read-only import — additive, zero edit) ----------
from anu_v3.batch_join_policy import (
    PacketCandidate,
    TrackArtifacts,
    cross_track_contamination,
    final_authority_packet_selector,
)

# --- reused +17 leaf modules (read-only import) -----------------------------
from anu_v3.batch_dependency_matrix import BatchDependencyMatrix, TrackSpec
from anu_v3.callback_track_registry import CallbackTrackRecord
from anu_v3.track_loop_state import (
    ACCEPTED,
    HOLD_FOR_CHAIR,
    MERGED,
    RUNNING,
    TrackLoopState,
    is_terminal,
)

# --- reused +29 read-only registry layer ------------------------------------
from anu_v3.callback_4tuple_index import Callback4TupleIndex, Tuple4
from anu_v3.parallel_runtime_registry import (
    ParallelRuntimeRegistry,
    TaskRuntimeRecord,
)
from anu_v3.result_ready_recovery import (
    RuntimeObservation,
    classify_runtime,
    is_recovery_eligible,
    recovery_note,
)

# Schema identifier written under the "schema" key of every derived
# batch_state and embedded in the marker that recognises an existing
# generic-state deliverable on re-emit.
SCHEMA = "anu_v3.generic_batch_state.v0"
# Schema identifier for generic batch plans. Not referenced by the code
# visible in this module — presumably consumed by plan fixtures (verify).
PLAN_SCHEMA = "anu_v3.generic_batch_plan.v0"

# generic settled set — extends +29 {MERGED,PASS,DONE} with ACCEPT so the
# registry-style accepted track (+29) counts as independently settled
# (§5 regression 1). +29 modules are NOT edited; this set lives here.
# Compared against GenericTrackPlan.terminal_outcome; "HOLD" is deliberately
# absent, so a held track is never settled.
_SETTLED = {"MERGED", "PASS", "DONE", "ACCEPT"}

# durable v1 chair file — byte-0 immutable, never an emission target (§9).
# Matched against the file NAME only (not the full path) by the emitter.
_FROZEN_DURABLE_V1 = "task-2553.parallel-batch-state.json"


class FrozenWriteRefused(RuntimeError):
    """Raised by the emission guard when the target path must not be written.

    In this module that covers three cases: the chair durable v1 file name,
    any git-tracked path, and an existing untracked file that is not a
    generic batch-state deliverable. Generic batch_state authority MUST be
    the separate NEW untracked path (§4 / §9 / 9-R.2)."""


# ---------------------------------------------------------------------------
# generic plan model
# ---------------------------------------------------------------------------

@dataclass
class GenericTrackPlan:
    """Plan entry plus observed runtime facts for one track of a batch.

    Plain data only: the coordinator reads these fields to derive state and
    never writes them back.
    """

    # identity — track_id keys the loop-state map and the callback registry
    # view; task_id keys runtime records, join lists and authority packets.
    track_id: str
    task_id: str
    executor: str = ""
    # cron ids — together with task_id these form the callback 4-tuple.
    # A by-design NO-CRON track may leave normal as None and fallback empty.
    dispatch_cron_id: str = ""
    normal_collector_cron_id: Optional[str] = None
    fallback_callback_cron_id: str = ""
    # dependency / overlap inputs handed to the dependency matrix.
    expected_files: List[str] = field(default_factory=list)
    forbidden_write_targets: List[str] = field(default_factory=list)
    depends_on: List[str] = field(default_factory=list)
    # own vs cited artifacts feed the cross-track contamination check.
    own_artifacts: List[str] = field(default_factory=list)
    cited_artifacts: List[str] = field(default_factory=list)
    # runtime observation (read-only, derived from result/.done existence)
    dispatch_status: str = "ok"  # compared against the literal "ok"
    normal_collector_executed: bool = False
    by_design_no_normal_collector: bool = False
    result_present: bool = False
    done_present: bool = False
    # "PENDING" by default; "NONE" / "CANCELLED" together with
    # by_design_no_normal_collector exempt the track from needing a
    # fallback cron id.
    fallback_state: str = "PENDING"
    fallback_fire_kst: Optional[str] = None
    # recognised values: "HOLD", "MERGED", "ACCEPT", "PASS", "DONE";
    # anything else is treated as still running.
    terminal_outcome: str = "UNKNOWN"
    hold_for_chair: bool = False
    retry_ceiling: int = 2  # forwarded to the per-track loop state
    # optional explicit packets; each dict is read with the keys
    # source / schema / ts / path / status (missing keys get defaults).
    authority_packets: List[Dict[str, str]] = field(default_factory=list)


@dataclass
class GenericBatchPlan:
    """A labelled batch: the input GenericBatchCoordinator derives state from."""

    # human-readable label; also embedded in the derived batch id.
    batch_label: str
    # one entry per parallel track, in plan order.
    tracks: List[GenericTrackPlan]


# ---------------------------------------------------------------------------
# generic coordinator (all methods below = read-only decision logic, 9-R.1)
# ---------------------------------------------------------------------------

class GenericBatchCoordinator:
    """Single generic entry: given any parallel batch plan it derives the
    fully integrated generic batch_state. Pure / read-only (9-R.1)."""

    def __init__(self, plan: GenericBatchPlan) -> None:
        self.plan = plan
        self.batch_id = self._derive_batch_id(plan)

        # 4-tuple ownership index (+29 reuse) — cross-track callback ownership.
        self._index = Callback4TupleIndex()
        for tp in plan.tracks:
            self._index.register(
                Tuple4(
                    task_id=tp.task_id,
                    dispatch_cron_id=tp.dispatch_cron_id,
                    normal_collector_cron_id=tp.normal_collector_cron_id,
                    fallback_callback_cron_id=tp.fallback_callback_cron_id,
                )
            )

        # runtime records (+29 reuse) — classification via classify_runtime.
        self._reg = ParallelRuntimeRegistry()
        for tp in plan.tracks:
            self._reg.register_dispatch(
                TaskRuntimeRecord(
                    task_id=tp.task_id,
                    executor=tp.executor,
                    dispatch_cron_id=tp.dispatch_cron_id,
                    fallback_callback_cron_id=tp.fallback_callback_cron_id,
                    expected_artifacts=list(tp.expected_files),
                    dispatch_status=tp.dispatch_status,
                    expected_normal_collector_cron_id=tp.normal_collector_cron_id,
                    normal_collector_executed=tp.normal_collector_executed,
                    by_design_no_normal_collector=tp.by_design_no_normal_collector,
                    result_present=tp.result_present,
                    done_present=tp.done_present,
                    fallback_state=tp.fallback_state,
                    fallback_fire_kst=tp.fallback_fire_kst,
                    terminal_outcome=tp.terminal_outcome,
                    hold_for_chair=tp.hold_for_chair,
                )
            )

        # dependency / overlap matrix (+17 reuse).
        self._depmatrix = BatchDependencyMatrix(
            [
                TrackSpec(
                    track_id=tp.track_id,
                    expected_files=tp.expected_files,
                    forbidden_write_targets=tp.forbidden_write_targets,
                    depends_on=tp.depends_on,
                    own_artifacts=tp.own_artifacts,
                )
                for tp in plan.tracks
            ]
        )

        # 13-state loop machine per track (+17 reuse).
        self._loop: Dict[str, TrackLoopState] = {
            tp.track_id: TrackLoopState(
                track_id=tp.track_id,
                state=self._initial_loop_state(tp),
                retry_ceiling=tp.retry_ceiling,
            )
            for tp in plan.tracks
        }

    # -- deterministic batch id -----------------------------------------
    @staticmethod
    def _derive_batch_id(plan: GenericBatchPlan) -> str:
        basis = plan.batch_label + "|" + "|".join(
            sorted(f"{t.track_id}:{t.task_id}" for t in plan.tracks)
        )
        digest = hashlib.sha256(basis.encode("utf-8")).hexdigest()[:12]
        return f"batch-{plan.batch_label}-{digest}"

    @staticmethod
    def _initial_loop_state(tp: GenericTrackPlan) -> str:
        """Pick the loop state a track starts in from its observed outcome
        (read-only derivation — no transition side effects).

        A hold (flag or "HOLD" outcome) wins over everything; "MERGED" maps
        to MERGED; "ACCEPT" / "PASS" / "DONE" map to ACCEPTED; any other
        outcome is treated as RUNNING.
        """
        outcome = tp.terminal_outcome
        held = tp.hold_for_chair or outcome == "HOLD"
        if held:
            return HOLD_FOR_CHAIR
        if outcome == "MERGED":
            return MERGED
        return ACCEPTED if outcome in ("ACCEPT", "PASS", "DONE") else RUNNING

    # -- §4 callback 4-tuple registry (5-field record view) -------------
    def callback_track_registry(self) -> Dict[str, Dict[str, object]]:
        """track_id-keyed 5-field callback_track_record view. by-design /
        NO-CRON tracks may carry a null normal cron and empty fallback cron;
        the strict +17 CallbackTrackRegistry is NOT used for those so its
        non-empty invariant stays intact (zero API break)."""
        out: Dict[str, Dict[str, object]] = {}
        for tp in self.plan.tracks:
            strict = bool(
                tp.dispatch_cron_id
                and tp.normal_collector_cron_id
                and tp.fallback_callback_cron_id
            )
            if strict:
                rec = CallbackTrackRecord(
                    track_id=tp.track_id,
                    task_id=tp.task_id,
                    dispatch_cron_id=tp.dispatch_cron_id,
                    normal_collector_cron_id=tp.normal_collector_cron_id or "",
                    fallback_callback_cron_id=tp.fallback_callback_cron_id,
                ).to_dict()
            else:
                rec = {
                    "track_id": tp.track_id,
                    "task_id": tp.task_id,
                    "dispatch_cron_id": tp.dispatch_cron_id,
                    "normal_collector_cron_id": tp.normal_collector_cron_id,
                    "fallback_callback_cron_id": tp.fallback_callback_cron_id,
                }
            out[tp.track_id] = rec
        return out

    def validate_callback_identity(
        self,
        *,
        claimed_task_id: str,
        event_kind: str,
        event_task_id: Optional[str] = None,
        event_cron_id: Optional[str] = None,
    ) -> List[str]:
        """4-tuple ownership check (+29 reuse). Non-empty == TRACK_MISMATCH
        reasons (§5 regression 5)."""
        return self._index.classify_event(
            claimed_task_id=claimed_task_id,
            event_kind=event_kind,
            event_task_id=event_task_id,
            event_cron_id=event_cron_id,
        )

    def tuple_consistency(self, task_id: str) -> List[str]:
        """Generic per-track 4-tuple consistency. dispatch cron always
        required; fallback cron required UNLESS the track is by-design
        NO-CRON (by_design_no_normal_collector & fallback_state in
        {NONE,CANCELLED}). Keeps the strict +29 validate_tuple unedited."""
        tp = next((t for t in self.plan.tracks if t.task_id == task_id), None)
        if tp is None:
            return [f"{task_id}: no track registered"]
        reasons: List[str] = []
        if not tp.dispatch_cron_id:
            reasons.append(f"{task_id}: dispatch_cron_id empty")
        no_cron_ok = tp.by_design_no_normal_collector and tp.fallback_state in (
            "NONE",
            "CANCELLED",
        )
        if not tp.fallback_callback_cron_id and not no_cron_ok:
            reasons.append(f"{task_id}: fallback_callback_cron_id empty")
        return reasons

    def classify_fallback_fire(
        self, *, claimed_task_id: str, fallback_cron_id: str,
        fallback_task_id: Optional[str] = None,
    ) -> str:
        """Classify a fallback firing against registry truth (+29 reuse):
        result ready + no normal -> DUPLICATE_CALLBACK_IGNORED (§5 reg 4);
        4-tuple mismatch -> TRACK_MISMATCH (§5 reg 5). read-only."""
        self._reg.reconcile_all()
        return self._reg.classify_fallback(
            claimed_task_id=claimed_task_id,
            fallback_cron_id=fallback_cron_id,
            fallback_task_id=fallback_task_id,
        )

    # -- §4 dependency / overlap / forbidden checkers -------------------
    def dependency_matrix(self) -> Dict[str, object]:
        return self._depmatrix.to_dict()

    def expected_files_overlap(self):
        return self._depmatrix.expected_files_overlap()

    def forbidden_write_overlap(self):
        return self._depmatrix.forbidden_write_overlap()

    # -- §4 result-ready / recovery classifier (+29 leaf reuse) ---------
    def track_runtime_records(self) -> Dict[str, Dict[str, object]]:
        """Per-track runtime record. Classification is derived directly via
        the +29 result_ready_recovery leaf (read-only reuse) so a by-design
        NO-CRON track (empty fallback cron, §7) is NOT mis-flagged by the
        strict +29 validate_tuple — the strict +29 module stays unedited
        (zero API break, §9). Cross-track 4-tuple ownership is still enforced
        via Callback4TupleIndex (regression 5)."""
        out: Dict[str, Dict[str, object]] = {}
        for tp in self.plan.tracks:
            reasons = self.tuple_consistency(tp.task_id)
            if reasons:
                classification = "TRACK_MISMATCH"
                rec_eligible = False
                note = ""
            else:
                obs = RuntimeObservation(
                    dispatch_ok=(tp.dispatch_status == "ok"),
                    result_present=tp.result_present,
                    done_present=tp.done_present,
                    normal_collector_executed=tp.normal_collector_executed,
                    by_design_no_normal_collector=tp.by_design_no_normal_collector,
                    fallback_state=tp.fallback_state,
                )
                classification = classify_runtime(obs)
                rec_eligible = is_recovery_eligible(classification)
                note = recovery_note(
                    classification, tp.by_design_no_normal_collector
                )
            out[tp.task_id] = {
                "task_id": tp.task_id,
                "executor": tp.executor,
                "dispatch_cron_id": tp.dispatch_cron_id,
                "dispatch_status": tp.dispatch_status,
                "expected_normal_collector_cron_id": tp.normal_collector_cron_id,
                "fallback_callback_cron_id": tp.fallback_callback_cron_id,
                "expected_artifacts": list(tp.expected_files),
                "normal_collector_registered": False,
                "normal_collector_executed": tp.normal_collector_executed,
                "by_design_no_normal_collector": tp.by_design_no_normal_collector,
                "result_present": tp.result_present,
                "done_present": tp.done_present,
                "fallback_state": tp.fallback_state,
                "fallback_fire_kst": tp.fallback_fire_kst,
                "classification": classification,
                "recovery_eligible": rec_eligible,
                "recovery_note": note,
                "terminal_outcome": tp.terminal_outcome,
                "hold_for_chair": tp.hold_for_chair,
                "track_mismatch_reasons": reasons,
            }
        return out

    # -- §4 contamination (frozen anchor reuse) -------------------------
    def contamination(self):
        return cross_track_contamination(
            [
                TrackArtifacts(
                    track_id=tp.track_id,
                    own_artifacts=tp.own_artifacts,
                    cited_artifacts=tp.cited_artifacts,
                )
                for tp in self.plan.tracks
            ]
        )

    # -- §4 final authority packet selector (frozen anchor reuse) -------
    def authority_packets(self) -> Dict[str, Dict[str, object]]:
        candidates: List[PacketCandidate] = []
        for tp in self.plan.tracks:
            if tp.authority_packets:
                for p in tp.authority_packets:
                    candidates.append(
                        PacketCandidate(
                            task_id=tp.task_id,
                            track_id=tp.track_id,
                            source=p.get("source", "fallback"),
                            schema=p.get("schema", "generic.result.v0"),
                            ts=p.get("ts", tp.fallback_fire_kst or ""),
                            path=p.get("path", ""),
                            status=p.get("status", tp.terminal_outcome),
                        )
                    )
            elif tp.result_present or tp.done_present:
                # synthesize the single observed packet (read-only derive).
                candidates.append(
                    PacketCandidate(
                        task_id=tp.task_id,
                        track_id=tp.track_id,
                        source="normal" if tp.normal_collector_executed
                        else "fallback",
                        schema="generic.result.v0",
                        ts=tp.fallback_fire_kst or "",
                        path=(tp.expected_files[0] if tp.expected_files else ""),
                        status=tp.terminal_outcome,
                    )
                )
        chosen = final_authority_packet_selector(candidates)
        return {
            task_id: {
                "task_id": c.task_id,
                "track_id": c.track_id,
                "source": c.source,
                "schema": c.schema,
                "ts": c.ts,
                "path": c.path,
                "status": c.status,
            }
            for task_id, c in chosen.items()
        }

    # -- §4 pending-fallback non-blocking join --------------------------
    def join(self) -> Dict[str, object]:
        """Independence join — a settled track whose fallback is still
        PENDING is NON-blocking (§5 regression 3); a HOLD track never demotes
        an independent settled track (§5 regression 7). Generic settled set
        includes ACCEPT (registry track). Pure read-only derivation."""
        records = self.track_runtime_records()
        contaminated = bool(self.contamination())
        independent: List[str] = []
        held: List[str] = []
        waiting: List[str] = []
        for tp in self.plan.tracks:
            rec = records[tp.task_id]
            if tp.hold_for_chair or tp.terminal_outcome == "HOLD":
                held.append(tp.task_id)
                continue
            if tp.terminal_outcome in _SETTLED:
                independent.append(tp.task_id)  # fallback PENDING ok
                continue
            waiting.append(tp.task_id)
        blocking: List[str] = []
        if held and independent:
            blocking.append(
                "held=%s do NOT block independent settled=%s "
                "(join by independence)" % (held, independent)
            )
        if contaminated:
            action = "BATCH_HOLD"
            blocking.append("cross_track_contamination -> BATCH_HOLD")
        elif held:
            action = "BATCH_HOLD"
        elif waiting:
            action = "WAIT_FOR_FALLBACK"
        else:
            action = "BATCH_ACCEPT"
        return {
            "independent_done_tracks": sorted(independent),
            "held_tracks": sorted(held),
            "waiting_tracks": sorted(waiting),
            "blocking_relations": blocking,
            "batch_next_action": action,
        }

    # -- §4 batch-level next-action resolver ----------------------------
    def batch_next_action(self) -> str:
        """Single batch-level next action (read-only resolve, 9-R.1).

        Precedence: contamination -> BATCH_HOLD_CONTAMINATION; any
        HOLD/HOLD_FOR_CHAIR -> CHAIR_DECISION_REQUIRED; any non-terminal loop
        -> CONTINUE_LOOP; pending fallback w/o result -> AWAIT_PENDING_CALLBACK;
        all terminal & clean -> CONSOLIDATE_FOR_CHAIR."""
        if self.contamination():
            return "BATCH_HOLD_CONTAMINATION"
        if any(
            tp.hold_for_chair
            or is_terminal(self._loop[tp.track_id].state)
            and self._loop[tp.track_id].state == HOLD_FOR_CHAIR
            for tp in self.plan.tracks
        ):
            return "CHAIR_DECISION_REQUIRED"
        if any(
            not tp.result_present
            and not tp.done_present
            and tp.fallback_state == "PENDING"
            and tp.terminal_outcome not in _SETTLED
            for tp in self.plan.tracks
        ):
            return "AWAIT_PENDING_CALLBACK"
        if any(
            not is_terminal(self._loop[tp.track_id].state)
            for tp in self.plan.tracks
        ):
            return "CONTINUE_LOOP"
        return "CONSOLIDATE_FOR_CHAIR"

    # -- §4 closeout proposal (PROPOSE only — confirmed hard-pinned False)
    def closeout_proposal(self) -> Dict[str, object]:
        """Evidence-based closeout eligibility, DERIVED from batch_state
        only (§5 regression 8). 9-R.1 / §7: registry/coordinator never
        confirms — confirmed is hard-pinned False; chair confirmation
        required out of band."""
        tracks = self.plan.tracks
        contaminated = bool(self.contamination())
        all_settled = bool(tracks) and all(
            tp.terminal_outcome in _SETTLED and not tp.hold_for_chair
            for tp in tracks
        )
        action = self.join()["batch_next_action"]
        eligible = all_settled and not contaminated and action == "BATCH_ACCEPT"
        if not tracks:
            reason = "no tracks in batch_state"
        elif contaminated:
            reason = "cross-task contamination present -> ineligible"
        elif not all_settled:
            unsettled = [
                tp.task_id
                for tp in tracks
                if tp.terminal_outcome not in _SETTLED or tp.hold_for_chair
            ]
            reason = f"tracks not all settled: {unsettled}"
        else:
            reason = (
                "all tracks settled (MERGED/PASS/DONE/ACCEPT), no "
                "contamination — closeout PROPOSED; chair confirmation "
                "still required (§7)."
            )
        return {
            "eligible": bool(eligible),
            "derived_from": "batch_state",
            "confirmed": False,  # §7 / 9-R.1 — never confirmed here
            "reason": reason,
        }

    # -- §4 consolidated summary generator (code-generated, final only) -
    def consolidated_summary(self) -> Dict[str, object]:
        records = self.track_runtime_records()
        return {
            "tracks": {
                tp.task_id: {
                    "terminal_outcome": tp.terminal_outcome,
                    "classification": records[tp.task_id]["classification"],
                    "hold_for_chair": tp.hold_for_chair,
                }
                for tp in self.plan.tracks
            },
            "batch_next_action": self.join()["batch_next_action"],
            "closeout_eligible": bool(self.closeout_proposal()["eligible"]),
        }

    # -- assemble the generic batch_state (read-only derive) ------------
    def build_state(self) -> Dict[str, object]:
        records = self.track_runtime_records()
        join = self.join()
        loop_states = {
            tid: st.to_dict() for tid, st in self._loop.items()
        }
        tuple_reasons = {
            tp.task_id: self.tuple_consistency(tp.task_id)
            for tp in self.plan.tracks
        }
        return {
            "schema": SCHEMA,
            "batch_id": self.batch_id,
            "batch_label": self.plan.batch_label,
            "decision_logic": "read-only: derive/propose/read only; zero "
            "execute/confirm/write/merge/cron (9-R.1).",
            "callback_track_registry": self.callback_track_registry(),
            "callback_4tuple_consistency": tuple_reasons,
            "dependency_matrix": self.dependency_matrix(),
            "track_runtime_records": records,
            "track_loop_states": loop_states,
            "contamination": [
                {"citing": a, "owner": b, "artifact": c}
                for (a, b, c) in self.contamination()
            ],
            "authority_packets": self.authority_packets(),
            "join_policy": join,
            "batch_next_action": join["batch_next_action"],
            "batch_next_action_resolver": self.batch_next_action(),
            "closeout_proposal": self.closeout_proposal(),
            "consolidated_summary": self.consolidated_summary(),
        }

    def build_consolidated_summary_md(self) -> str:
        st = self.build_state()
        cs = st["consolidated_summary"]
        lines: List[str] = []
        lines.append(
            f"# Generic Batch Consolidated Summary — {self.batch_id}"
        )
        lines.append("")
        lines.append(
            "> Code-generated (anu_v3.generic_batch_coordinator). Chair-only "
            "final decision fields. read-only decision logic (9-R.1)."
        )
        lines.append("")
        lines.append(f"- batch_label: `{self.plan.batch_label}`")
        lines.append(
            f"- batch_next_action: **{cs['batch_next_action']}**"
        )
        lines.append(
            f"- closeout_eligible: **{cs['closeout_eligible']}** "
            f"(confirmed: **{st['closeout_proposal']['confirmed']}** — "
            f"chair out-of-band, §7)"
        )
        contam = st["contamination"]
        lines.append(
            f"- cross_track_contamination: "
            f"{'NONE' if not contam else contam}"
        )
        lines.append("")
        lines.append("## Per-track final view")
        for tid, rec in cs["tracks"].items():
            lines.append(
                f"- `{tid}` → outcome **{rec['terminal_outcome']}**, "
                f"class `{rec['classification']}`, "
                f"hold={rec['hold_for_chair']}"
            )
        lines.append("")
        return "\n".join(lines) + "\n"

    # -- §7 NO-CRON dogfooding self-completion --------------------------
    @staticmethod
    def self_completion_recognized(
        result_path: Path, done_path: Path
    ) -> bool:
        """+30 recognises its own completion by result.json + .done
        existence — zero cron register/remove (§7, 9-R.1)."""
        return Path(result_path).is_file() and Path(done_path).is_file()


# ---------------------------------------------------------------------------
# fixture loader (9-R.2 — read/parse/reference only; never mutates source)
# ---------------------------------------------------------------------------

def load_plan_from_fixture(fixture_path: str | Path) -> GenericBatchPlan:
    """Parse a UTF-8 JSON fixture into a GenericBatchPlan (read-only).

    The fixture must provide ``batch_label`` and ``tracks``; each track must
    provide ``task_id``. Everything else is optional and falls back to the
    GenericTrackPlan defaults. Two legacy spellings are accepted:
    ``expected_normal_collector_cron_id`` when ``normal_collector_cron_id``
    is missing or falsy, and ``expected_artifacts`` when ``expected_files``
    is absent. List values are copied, so the parsed JSON is never shared
    with the plan.

    Raises KeyError for a missing required key; file and JSON errors from
    reading/parsing propagate unchanged.
    """
    raw_text = Path(fixture_path).read_text(encoding="utf-8")
    payload = json.loads(raw_text)

    parsed: List[GenericTrackPlan] = []
    for entry in payload["tracks"]:
        fallback_track_id = entry["task_id"]
        normal_cron = entry.get("normal_collector_cron_id") or entry.get(
            "expected_normal_collector_cron_id"
        )
        files = entry.get("expected_files", entry.get("expected_artifacts", []))
        parsed.append(
            GenericTrackPlan(
                track_id=entry.get("track_id", fallback_track_id),
                task_id=entry["task_id"],
                executor=entry.get("executor", ""),
                dispatch_cron_id=entry.get("dispatch_cron_id", ""),
                normal_collector_cron_id=normal_cron,
                fallback_callback_cron_id=entry.get(
                    "fallback_callback_cron_id", ""
                ),
                expected_files=list(files),
                forbidden_write_targets=list(
                    entry.get("forbidden_write_targets", [])
                ),
                depends_on=list(entry.get("depends_on", [])),
                own_artifacts=list(entry.get("own_artifacts", [])),
                cited_artifacts=list(entry.get("cited_artifacts", [])),
                dispatch_status=entry.get("dispatch_status", "ok"),
                normal_collector_executed=entry.get(
                    "normal_collector_executed", False
                ),
                by_design_no_normal_collector=entry.get(
                    "by_design_no_normal_collector", False
                ),
                result_present=entry.get("result_present", False),
                done_present=entry.get("done_present", False),
                fallback_state=entry.get("fallback_state", "PENDING"),
                fallback_fire_kst=entry.get("fallback_fire_kst"),
                terminal_outcome=entry.get("terminal_outcome", "UNKNOWN"),
                hold_for_chair=entry.get("hold_for_chair", False),
                retry_ceiling=entry.get("retry_ceiling", 2),
                authority_packets=list(entry.get("authority_packets", [])),
            )
        )
    return GenericBatchPlan(batch_label=payload["batch_label"], tracks=parsed)


# ---------------------------------------------------------------------------
# explicit hard-guarded emission — OUTSIDE the decision boundary (9-R.1)
# ---------------------------------------------------------------------------

def _is_git_tracked(p: Path) -> bool:
    try:
        top = subprocess.run(
            ["git", "-C", str(p.parent), "rev-parse", "--show-toplevel"],
            capture_output=True,
            text=True,
        )
        if top.returncode != 0:
            return False
        repo = top.stdout.strip()
        rc = subprocess.run(
            ["git", "-C", repo, "ls-files", "--error-unmatch", str(p.resolve())],
            capture_output=True,
            text=True,
        )
        return rc.returncode == 0
    except Exception:
        return False


# a path is a sanctioned NEW generic-state deliverable iff its name matches
# the suffix OR (when it already exists) it already carries the generic
# state schema marker — never an arbitrary unrelated untracked file.
# The marker is a plain substring test against the file text, and it
# matches the `"schema": "<SCHEMA>"` spelling that json.dumps produces
# with its default `": "` key separator.
_DELIVERABLE_SUFFIX = ".generic-batch-state.json"
_DELIVERABLE_MARKER = f'"schema": "{SCHEMA}"'


def emit_generic_batch_state(
    batch_state: Dict[str, object], out_path: str | Path
) -> Path:
    """Write the NEW generic authority file. NOT a decision component —
    explicit I/O the runner/ANU performs (9-R.1 boundary).

    Hard write-guard envelope (§4 / §9 / 9-R.2) — precise, mirrors the +17
    coordinator guard so an unrelated existing file can NEVER be clobbered:

      * chair durable v1 name (literal OR
        symlink-resolved)                     -> REFUSE (byte-0 immutable)
      * git-tracked path                      -> REFUSE (tracked HEAD invariant)
      * untracked & non-existent              -> ALLOW  (NEW deliverable)
      * untracked & existing, IS a sanctioned
        generic-state deliverable             -> ALLOW  (idempotent re-emit)
      * untracked & existing, NOT a sanctioned
        deliverable                           -> REFUSE (never clobber an
                                                 unrelated untracked file)

    The state is serialised only after every guard has passed but before
    any directory is created, so a non-serialisable state leaves the
    filesystem untouched.

    Returns the path written. Raises FrozenWriteRefused when a guard
    refuses; TypeError / ValueError from JSON serialisation and OSError
    from the write propagate unchanged.
    """
    out = Path(out_path)

    # Guard on the literal name AND on the name the path resolves to, so a
    # differently-named symlink pointing at the durable v1 file is refused.
    guarded_names = {out.name}
    try:
        guarded_names.add(out.resolve().name)
    except (OSError, RuntimeError):
        # unresolvable path (e.g. symlink loop): only the literal name is
        # available; the write itself will fail later if the path is unusable
        pass
    if _FROZEN_DURABLE_V1 in guarded_names:
        raise FrozenWriteRefused(
            "refusing to write chair durable v1 (§9 byte-0); generic "
            "authority MUST be the separate task-2553.generic-batch-state.json"
        )
    if _is_git_tracked(out):
        raise FrozenWriteRefused(
            f"refusing to write git-tracked path {out} (§9 tracked HEAD "
            "invariant — only NEW untracked deliverables)"
        )
    if out.exists():
        sanctioned = out.name.endswith(_DELIVERABLE_SUFFIX)
        if not sanctioned:
            # fall back to content: an earlier emission carries the marker
            try:
                sanctioned = _DELIVERABLE_MARKER in out.read_text(
                    encoding="utf-8"
                )
            except (OSError, UnicodeDecodeError):
                sanctioned = False
        if not sanctioned:
            raise FrozenWriteRefused(
                f"refusing to overwrite existing untracked non-deliverable "
                f"{out} (9-R.2: only a NEW untracked path or the coordinator's "
                f"own generic-batch-state deliverable may be written)"
            )

    # serialise first: a failure here must not leave new directories behind
    payload = json.dumps(batch_state, ensure_ascii=False, indent=2) + "\n"
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(payload, encoding="utf-8")
    return out
