# -*- coding: utf-8 -*-
"""anu_v3.batch_settle_writeback — independent-ANU durable-success write-back
+ batch settle evaluator (task-2553+53 §3).

회장 §2 진단: +50/+51/+52 normal completion callback collector 가 모두 독립
ANU 로 AUTHORITATIVE_PASS 를 산출했으나, +44 durable 4-tuple registry
(``memory/events/callback_4tuple_index.jsonl``) 에 durable-success(COMPLETED)
레코드가 0/3 → batch coordinator all-settled 자동 인식 실패 → last-settle
collector consolidated summary 미생성. track 실패가 아니라 normal collector
durable-success → registry write-back → batch settle trigger 결선이 미완.

이 모듈은 그 결선을 메운다:

  1. 독립 ANU collector 의 terminal verdict(AUTHORITATIVE_PASS / PASS /
     ACCEPTED) 를 ``anu_v3.authoritative_verdict_selector`` 정본으로 *선택*
     (self-chain 영구 QUARANTINE — §3.3) 한 뒤,
  2. ``anu_v3.self_collector_guard`` / ``anu_v3.writeback_binding_conflict_
     guard`` 를 fail-closed 로 경유해,
  3. §3.2 11종 필드를 담은 durable-success(COMPLETED) write-back 레코드를
     +44 ledger 에 **additive append-only** 로 기록하고,
  4. batch_id 기준 all-tracks-settled 를 normal-callback durable-success
     event 기준으로 즉시 평가한다 (고정시각/dead-man/fallback 진행 트리거
     금지 — §3.6/§3.7).

reuse, not re-implement:

  * +44 ``anu_v3.callback_4tuple_registry`` (append-only ledger, byte-0 —
    additive append 만, allowlist).
  * +49 ``anu_v3.authoritative_verdict_selector`` (self-chain vs independent
    ANU 권위 선택 정본).
  * +49 ``anu_v3.self_collector_guard`` (executor self-collector fail-closed).
  * +49 ``anu_v3.writeback_binding_conflict_guard`` (WRITEBACK_BINDING_
    CONFLICT / idempotent skip).
  * ``dispatch.callback_owner_enforcer.DEFAULT_ANU_KEYS`` (ANU key 정본).

Layer A / NO-CRON: 순수 분류 + append-only ledger WRITE (allowlist gated).
ZERO cron register/remove, ZERO dispatch, ZERO subprocess, ZERO cokacdir,
ZERO merge/PR/branch. fixed-time/dead-man/fallback 을 진행 트리거로 사용 0.
"""
from __future__ import annotations

import hashlib
import json
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple, Union

from anu_v3.authoritative_verdict_selector import (
    AUTHORITATIVE_PASS,
    VerdictRecord,
    select_authoritative_verdict,
)
from anu_v3.callback_4tuple_registry import (
    Callback4TupleRegistry,
    make_record,
    record_is_valid,
    validate_record,
)
from anu_v3.self_collector_guard import guard_self_collector_session
from anu_v3.writeback_binding_conflict_guard import guard_writeback_binding
from dispatch.callback_owner_enforcer import DEFAULT_ANU_KEYS

MODULE_SCHEMA = "anu_v3.batch_settle_writeback.v1"
DURABLE_SUCCESS_WRITEBACK_SCHEMA = "durable_success_writeback.v1"
BATCH_SETTLE_RESULT_SCHEMA = "batch_settle_result.v1"

CANONICAL_ROOT = "/home/jay/workspace"  # CLAUDE.md §1 canonical workspace root
DEFAULT_LEDGER_RELPATH = "memory/events/callback_4tuple_index.jsonl"

# §3.4/§3.5 — settle gate is judged ONLY on the normal-callback durable-
# success event, never on a fixed-time gate / dead-man / fallback (§3.7).
EVALUATED_AT_BASIS = "normal_callback_durable_success_event"

# Terminal verdicts that authorise a durable-success write-back (§3.1).
TERMINAL_WRITEBACK_VERDICTS = ("AUTHORITATIVE_PASS", "PASS", "ACCEPTED")

# write-back classifications (§3.8/§3.9).
DURABLE_SUCCESS_WRITTEN = "DURABLE_SUCCESS_WRITTEN"
WRITEBACK_IDEMPOTENT_SKIP = "WRITEBACK_IDEMPOTENT_SKIP"
WRITEBACK_BINDING_CONFLICT = "WRITEBACK_BINDING_CONFLICT"
TRACK_MISMATCH = "TRACK_MISMATCH"
SELF_CHAIN_NO_WRITEBACK = "SELF_CHAIN_NO_WRITEBACK"

# §3.2 mandatory write-back fields (11).
WRITEBACK_FIELDS_11: Tuple[str, ...] = (
    "task_id",
    "track_id",
    "dispatch_id",
    "normal_collector_cron_id",
    "collector_key",
    "collector_role",
    "authoritative_verdict",
    "terminal_status",
    "completed_at",
    "source_result_path",
    "batch_id",
)

# Identity fields that, on mismatch, are NEVER silent-skipped (§3.9).
BINDING_IDENTITY_FIELDS: Tuple[str, ...] = (
    "task_id",
    "track_id",
    "collector_key",
    "dispatch_id",
    "authoritative_verdict",
)


# ── §9 allowlist write guard (fail-closed) ───────────────────────────────────
class WritebackPathRefused(RuntimeError):
    """A write outside the §9 allowlist was attempted (fail-closed)."""


def _assert_ledger_allowlisted(ledger_path: Path) -> None:
    """The ONLY durable WRITE target is the +44 ledger (additive append).

    The write target is keyed to the ledger *identity* (the canonical
    allowlisted relpath, OR a path whose basename is the durable
    ledger file name — the latter only so isolated regression ledgers
    never touch the real +44 ledger). Any other target (a differently
    named file) = §9 violation -> refuse (fail-closed, never silent)."""
    rel = _relpath(ledger_path)
    ledger_name = Path(DEFAULT_LEDGER_RELPATH).name
    if rel != DEFAULT_LEDGER_RELPATH and Path(ledger_path).name != (
        ledger_name
    ):
        raise WritebackPathRefused(
            f"durable-success write-back target {rel!r} is NOT the §9 "
            f"allowlisted append-only ledger {DEFAULT_LEDGER_RELPATH!r} "
            f"(nor a {ledger_name!r}-named isolated ledger) — refused "
            "(fail-closed)."
        )


def _relpath(p: Path, root: object = CANONICAL_ROOT) -> str:
    try:
        return str(Path(p).resolve().relative_to(Path(root)))
    except (ValueError, OSError):
        return str(p)


# ── source verdict extraction (independent-ANU verdict file -> binding) ───────
def _normalize_task_id(raw: str) -> str:
    """``task-2553+50 (TRACK 1 — ...)`` -> ``task-2553+50``."""
    tid = (raw or "").strip()
    for sep in (" ", "\t", "("):
        if sep in tid:
            tid = tid.split(sep, 1)[0].strip()
    return tid


def _first_str(*vals: object) -> str:
    for v in vals:
        if isinstance(v, str) and v.strip():
            return v.strip()
    return ""


def _strip_key(raw: str) -> str:
    """``7943afbe12c12f7d (self-chain, 영구 비권위)`` -> ``7943afbe12c12f7d``."""
    return _first_str((raw or "").split("(", 1)[0]).split()[0] if raw else ""


@dataclass(frozen=True)
class WritebackBinding:
    """The §3.2 11-field durable-success binding extracted from an
    independent-ANU verdict file (untrusted free-text claims are NOT used
    to grant authority — origin is recomputed by the +49 selector)."""

    task_id: str
    track_id: str
    dispatch_id: str
    dispatch_id_source: str
    normal_collector_cron_id: str
    normal_collector_cron_id_source: str
    collector_key: str
    collector_role: str
    authoritative_verdict: str
    terminal_status: str
    completed_at: str
    source_result_path: str
    batch_id: str
    executor_key: str = ""
    chat_id: str = ""
    fallback_callback_cron_id: Optional[str] = None
    session_is_executor_self: bool = False

    def writeback_id(self) -> str:
        raw = "|".join(
            [
                self.task_id,
                self.track_id,
                self.dispatch_id,
                self.collector_key,
                self.authoritative_verdict,
                self.batch_id,
            ]
        )
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def to_record(self, classification: str) -> Dict[str, object]:
        rec: Dict[str, object] = {
            "schema": DURABLE_SUCCESS_WRITEBACK_SCHEMA,
            "task_id": self.task_id,
            "track_id": self.track_id,
            "dispatch_id": self.dispatch_id,
            "dispatch_id_source": self.dispatch_id_source,
            "normal_collector_cron_id": self.normal_collector_cron_id,
            "normal_collector_cron_id_source": (
                self.normal_collector_cron_id_source
            ),
            "collector_key": self.collector_key,
            "collector_role": self.collector_role,
            "authoritative_verdict": self.authoritative_verdict,
            "terminal_status": self.terminal_status,
            "completed_at": self.completed_at,
            "source_result_path": self.source_result_path,
            "batch_id": self.batch_id,
            "executor_key": self.executor_key,
            "chat_id": self.chat_id,
            "fallback_callback_cron_id": self.fallback_callback_cron_id,
            "writeback_classification": classification,
            "writeback_id": self.writeback_id(),
        }
        return rec


def extract_binding_from_source(
    source_path: Union[str, os.PathLike],
    *,
    batch_id: str,
    track_id: Optional[str] = None,
    executor_key: Optional[str] = None,
    canonical_root: Union[str, os.PathLike] = CANONICAL_ROOT,
) -> WritebackBinding:
    """Read an independent-ANU verdict JSON and extract the §3.2 binding.

    dispatch_id / normal_collector_cron_id, when not source-recorded, are
    *deterministically derived* with explicit ``*_source`` provenance — a
    transparent, idempotent derivation, never a silent fabrication (§3.8).
    """
    p = Path(source_path)
    data = json.loads(p.read_text(encoding="utf-8"))

    task_id = _normalize_task_id(
        _first_str(data.get("task_id"), data.get("task"))
    )
    trk = _first_str(track_id, data.get("track"), data.get("track_id"))
    if not trk:
        # last resort: derive from a 'three_track_batch' style mapping.
        trk = "UNKNOWN_TRACK"

    verdict = _first_str(
        data.get("authoritative_verdict"), data.get("verdict")
    )
    four = data.get("four_tuple_record") or {}
    bci = data.get("batch_coordinator_integration") or {}

    collector_key = _strip_key(
        _first_str(
            four.get("collector_key"),
            data.get("collector_owner_key"),
            data.get("collector_key"),
        )
    )
    if not collector_key:
        # Some verdict files only state the ANU key in prose; recompute from
        # the configured ANU key set only when the file explicitly asserts
        # collector_role=ANU and executor_self_collector is false.
        if (
            _first_str(data.get("collector_role")) == "ANU"
            and data.get("executor_self_collector") in (False, None)
        ):
            collector_key = next(iter(sorted(DEFAULT_ANU_KEYS)), "")

    collector_role = _first_str(
        four.get("collector_role"), data.get("collector_role")
    ) or "ANU"

    exec_key = _strip_key(
        _first_str(
            executor_key,
            four.get("executor_key"),
            data.get("executor_key"),
        )
    )

    chat_id = _first_str(
        four.get("chat_id"), data.get("chat_id"), bci.get("chat_id")
    )

    # dispatch_id — source-recorded or deterministically derived.
    disp = _first_str(
        four.get("dispatch_cron_id"),
        four.get("dispatch_id"),
        data.get("dispatch_cron_id"),
        data.get("dispatch_id"),
    )
    if disp:
        disp_src = "source_recorded"
    else:
        disp = f"derived:{task_id}"
        disp_src = "derived_from_task_id"

    # normal_collector_cron_id — source-recorded or derived from ANU key.
    ncc = _first_str(
        four.get("normal_collector_cron_id"),
        data.get("normal_collector_cron_id"),
    )
    if ncc:
        ncc_src = "source_recorded"
    else:
        ncc = f"ANU-normal-callback:{collector_key}"
        ncc_src = "derived_from_collector_key"

    fb = _first_str(
        four.get("fallback_callback_cron_id"),
        data.get("fallback_callback_cron_id"),
    ) or None

    completed_at = _first_str(
        data.get("ts_kst"), data.get("completed_at"), four.get("ts_kst")
    )

    self_session = bool(
        data.get("executor_self_collector")
        or data.get("session_is_executor_self")
        or data.get("executor_self_chain_authoritative")
    )

    return WritebackBinding(
        task_id=task_id,
        track_id=trk,
        dispatch_id=disp,
        dispatch_id_source=disp_src,
        normal_collector_cron_id=ncc,
        normal_collector_cron_id_source=ncc_src,
        collector_key=collector_key,
        collector_role=collector_role,
        authoritative_verdict=verdict,
        terminal_status="COMPLETED",
        completed_at=completed_at,
        source_result_path=_relpath(p, canonical_root),
        batch_id=batch_id,
        executor_key=exec_key,
        chat_id=chat_id,
        fallback_callback_cron_id=fb,
        session_is_executor_self=self_session,
    )


# ── §3.1/§3.3 independent-ANU terminal verdict gate (정본 selector) ──────────
@dataclass
class WritebackDecision:
    schema: str
    verdict: str  # PASS | FAIL
    classification: str
    task_id: str
    track_id: str
    batch_id: str
    durable_record: Optional[Dict[str, object]]
    appended: bool
    reasons: List[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return self.verdict == "PASS"

    def to_json(self) -> dict:
        return {
            "schema": self.schema,
            "verdict": self.verdict,
            "classification": self.classification,
            "task_id": self.task_id,
            "track_id": self.track_id,
            "batch_id": self.batch_id,
            "durable_record": self.durable_record,
            "appended": self.appended,
            "reasons": list(self.reasons),
        }


def _ledger_durable_lines(
    ledger_path: Path,
) -> List[Dict[str, object]]:
    """All durable_success_writeback.v1 lines (fail-safe; corrupt lines
    skipped). +44 Callback4TupleRecord lines are ignored here."""
    out: List[Dict[str, object]] = []
    try:
        if not ledger_path.is_file():
            return out
        text = ledger_path.read_text(encoding="utf-8")
    except OSError:
        return out
    for raw in text.splitlines():
        raw = raw.strip()
        if not raw:
            continue
        try:
            d = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if d.get("schema") == DURABLE_SUCCESS_WRITEBACK_SCHEMA:
            out.append(d)
    return out


def evaluate_durable_success_writeback(
    source_path: Union[str, os.PathLike],
    *,
    batch_id: str,
    track_id: Optional[str] = None,
    executor_key: Optional[str] = None,
    expected: Optional[Dict[str, object]] = None,
    ledger_path: Optional[Union[str, os.PathLike]] = None,
    anu_keys: Sequence[str] = tuple(sorted(DEFAULT_ANU_KEYS)),
    canonical_root: Union[str, os.PathLike] = CANONICAL_ROOT,
) -> WritebackDecision:
    """§3.1/§3.2/§3.3/§3.8/§3.9 — evaluate (NO write) a durable-success
    write-back candidate from an independent-ANU verdict file.

    Pipeline (every step fail-closed, never a silent skip):

      1. extract §3.2 binding from the source verdict file.
      2. §3.9 mismatch — compare against ``expected`` (if given); any
         identity-field mismatch -> TRACK_MISMATCH / WRITEBACK_BINDING_
         CONFLICT (RECORDED, never silent).
      3. §3.3 — +49 ``select_authoritative_verdict``: self-chain origin is
         permanently QUARANTINED; only an independent-ANU terminal verdict
         is authoritative. self-chain -> SELF_CHAIN_NO_WRITEBACK.
      4. §3 — ``guard_self_collector_session`` fail-closed (collector MUST
         be an independent ANU session).
      5. §3.8 — idempotency over existing durable lines: identical binding
         -> WRITEBACK_IDEMPOTENT_SKIP; same idempotency identity but a
         different authoritative_verdict -> WRITEBACK_BINDING_CONFLICT.
      6. §3.9 — ``guard_writeback_binding`` over the +44 ledger history.
      7. clean -> DURABLE_SUCCESS_WRITTEN candidate (the §3.2 record).
    """
    root = Path(canonical_root)
    lp = Path(ledger_path) if ledger_path else root / DEFAULT_LEDGER_RELPATH

    b = extract_binding_from_source(
        source_path,
        batch_id=batch_id,
        track_id=track_id,
        executor_key=executor_key,
        canonical_root=root,
    )

    def _decision(verdict: str, cls: str, reasons: List[str],
                   rec: Optional[Dict[str, object]] = None
                   ) -> WritebackDecision:
        return WritebackDecision(
            schema=MODULE_SCHEMA,
            verdict=verdict,
            classification=cls,
            task_id=b.task_id,
            track_id=b.track_id,
            batch_id=b.batch_id,
            durable_record=rec,
            appended=False,
            reasons=reasons,
        )

    # 2. §3.9 explicit mismatch vs caller expectation — never silent skip.
    if expected:
        mismatched = [
            f for f in BINDING_IDENTITY_FIELDS
            if f in expected and str(expected[f]) != str(getattr(b, f))
        ]
        if mismatched:
            cls = (
                TRACK_MISMATCH
                if ("task_id" in mismatched or "track_id" in mismatched)
                else WRITEBACK_BINDING_CONFLICT
            )
            return _decision(
                "FAIL", cls,
                [
                    f"binding identity mismatch on {mismatched} — RECORDED "
                    f"as {cls}; silent skip is FORBIDDEN (§3.9). expected="
                    f"{ {k: expected[k] for k in mismatched} } "
                    f"observed={ {k: getattr(b, k) for k in mismatched} }"
                ],
                b.to_record(cls),
            )

    # 3. §3.3 independent-ANU vs self-chain (+49 정본 selector).
    if b.authoritative_verdict not in TERMINAL_WRITEBACK_VERDICTS:
        return _decision(
            "FAIL", SELF_CHAIN_NO_WRITEBACK,
            [
                f"verdict {b.authoritative_verdict!r} is not a terminal "
                f"write-back verdict {TERMINAL_WRITEBACK_VERDICTS} — no "
                "durable-success write-back (§3.1)."
            ],
        )
    sel = select_authoritative_verdict(
        [
            VerdictRecord(
                kind="collector_result",
                verdict="PASS",
                task_id=b.task_id,
                executor_key=b.executor_key,
                collector_key=b.collector_key,
                collector_role=b.collector_role,
                session_is_executor_self=b.session_is_executor_self,
                claimed_origin="independent_anu",
                detail=b.authoritative_verdict,
            )
        ],
        task_id=b.task_id,
        anu_keys=anu_keys,
    )
    if sel.classification != AUTHORITATIVE_PASS or (
        sel.independent_anu_count < 1
    ):
        return _decision(
            "FAIL", SELF_CHAIN_NO_WRITEBACK,
            [
                "authoritative_verdict_selector (+49 정본) did NOT classify "
                f"this as AUTHORITATIVE_PASS (got {sel.classification!r}, "
                f"independent_anu_count={sel.independent_anu_count}) — "
                "self-chain verdict is permanently non-authoritative; "
                "executor self-chain is NEVER written back as durable-"
                "success (§3.3)."
            ]
            + list(sel.reasons),
        )

    # 4. §3 self-collector guard fail-closed.
    g = guard_self_collector_session(
        executor_key=b.executor_key,
        collector_key=b.collector_key,
        collector_role=b.collector_role,
        is_executor_self_session=b.session_is_executor_self,
    )
    if not g.ok:
        return _decision(
            "FAIL", SELF_CHAIN_NO_WRITEBACK,
            ["self-collector guard FAIL: " + "; ".join(g.reasons)],
        )

    # 5. §3.8 idempotency over existing durable lines.
    wid = b.writeback_id()
    for ln in _ledger_durable_lines(lp):
        if ln.get("writeback_id") == wid:
            same = all(
                str(ln.get(f)) == str(getattr(b, f))
                for f in BINDING_IDENTITY_FIELDS
            )
            if same:
                return _decision(
                    "PASS", WRITEBACK_IDEMPOTENT_SKIP,
                    [
                        "identical durable-success write-back already "
                        "present (same writeback_id AND identical binding) "
                        "— idempotent SKIP, NOT a duplicate append (§3.8)."
                    ],
                    ln,
                )
        # same idempotency identity (task/track/dispatch/collector/batch)
        # but a DIFFERENT authoritative_verdict -> binding conflict (§3.9).
        if (
            ln.get("task_id") == b.task_id
            and ln.get("track_id") == b.track_id
            and ln.get("dispatch_id") == b.dispatch_id
            and ln.get("collector_key") == b.collector_key
            and ln.get("batch_id") == b.batch_id
            and ln.get("authoritative_verdict") != b.authoritative_verdict
        ):
            return _decision(
                "FAIL", WRITEBACK_BINDING_CONFLICT,
                [
                    "same binding identity but a different "
                    f"authoritative_verdict (ledger="
                    f"{ln.get('authoritative_verdict')!r} vs candidate="
                    f"{b.authoritative_verdict!r}) — RECORDED as "
                    "WRITEBACK_BINDING_CONFLICT; silent skip FORBIDDEN "
                    "(§3.9)."
                ],
                b.to_record(WRITEBACK_BINDING_CONFLICT),
            )

    # 6. §3.9 +49 writeback_binding_conflict_guard over +44 ledger history.
    reg = Callback4TupleRegistry(lp)
    history = reg.history_for(b.task_id)
    wb = guard_writeback_binding(
        history,
        task_id=b.task_id,
        dispatch_id=b.dispatch_id,
        chat_id=b.chat_id or "6937032012",
        normal_collector_cron_id=b.normal_collector_cron_id,
        candidate_role="anu_collector_durable_success",
        candidate_fallback_cron_id=b.fallback_callback_cron_id,
        candidate_owner_key=b.collector_key,
        executor_key=b.executor_key,
        anu_keys=anu_keys,
        candidate_session_is_executor_self=b.session_is_executor_self,
    )
    if not wb.ok:
        return _decision(
            "FAIL", WRITEBACK_BINDING_CONFLICT,
            ["writeback binding guard FAIL: " + "; ".join(wb.reasons)],
            b.to_record(WRITEBACK_BINDING_CONFLICT),
        )

    # 7. clean -> durable-success write-back candidate (§3.2 11 fields).
    rec = b.to_record(DURABLE_SUCCESS_WRITTEN)
    missing = [f for f in WRITEBACK_FIELDS_11 if not rec.get(f)]
    if missing:
        return _decision(
            "FAIL", WRITEBACK_BINDING_CONFLICT,
            [
                f"§3.2 mandatory write-back fields missing/empty: {missing} "
                "— refused (fail-closed, never a silent partial record)."
            ],
            rec,
        )
    return _decision(
        "PASS", DURABLE_SUCCESS_WRITTEN,
        [
            "independent-ANU AUTHORITATIVE_PASS confirmed via +49 정본 "
            "selector; self-collector + writeback-binding guards PASS; "
            "§3.2 11-field durable-success record ready for additive "
            "append-only write-back."
        ],
        rec,
    )


def apply_durable_success_writeback(
    decision: WritebackDecision,
    *,
    ledger_path: Union[str, os.PathLike],
    canonical_root: Union[str, os.PathLike] = CANONICAL_ROOT,
) -> WritebackDecision:
    """§3.1/§3.10 — durable-success additive append-only WRITE.

    ONLY the §9-allowlisted +44 ledger is writable (fail-closed). Two
    additive lines are appended:

      * the §3.2 ``durable_success_writeback.v1`` 11-field record, AND
      * a +44 ``callback_4tuple_ledger_record.v1`` COMPLETED line so the
        existing +44 ``classify`` / batch coordinator immediately observes
        ``NORMAL_CALLBACK_COMPLETED`` (this is the diagnosed §2 root-cause
        fix). No historical line is rewritten (append-only — +44 byte-0).

    IDEMPOTENT_SKIP / any FAIL classification -> NO write (idempotent /
    fail-closed). Returns the decision with ``appended`` set.
    """
    lp = Path(ledger_path)
    _assert_ledger_allowlisted(lp)

    if decision.classification == WRITEBACK_IDEMPOTENT_SKIP:
        decision.reasons.append(
            "idempotent skip — no duplicate append performed (§3.8)."
        )
        return decision
    if not decision.ok or decision.classification != DURABLE_SUCCESS_WRITTEN:
        decision.reasons.append(
            f"classification={decision.classification!r} verdict="
            f"{decision.verdict!r} — NO durable-success write performed "
            "(fail-closed; mismatch/conflict/self-chain never silently "
            "written §3.9)."
        )
        return decision

    rec = dict(decision.durable_record or {})
    lp.parent.mkdir(parents=True, exist_ok=True)
    line = json.dumps(rec, ensure_ascii=False, sort_keys=True)
    with open(lp, "a", encoding="utf-8") as fh:
        fh.write(line + "\n")
        fh.flush()
        os.fsync(fh.fileno())

    # +44-compatible COMPLETED line (root-cause fix: classify() recognises
    # NORMAL_CALLBACK_COMPLETED). Validated, never an invalid record.
    fb = rec.get("fallback_callback_cron_id")
    c44 = make_record(
        task_id=str(rec["task_id"]),
        dispatch_id=str(rec["dispatch_id"]),
        dispatch_cron_id=str(rec["dispatch_id"]),
        executor=f"independent ANU collector (key {rec['collector_key']})",
        chat_id=str(rec.get("chat_id") or "6937032012"),
        normal_collector_cron_id=str(rec["normal_collector_cron_id"]),
        fallback_callback_cron_id=(str(fb) if fb else None),
        role="anu_collector_durable_success",
        status="COMPLETED",
        no_fallback=(not fb),
        ts_kst=str(rec["completed_at"]),
    )
    if not record_is_valid(c44):
        decision.reasons.append(
            "WARN: +44-compat COMPLETED line invalid ("
            + "; ".join(validate_record(c44))
            + ") — durable_success_writeback line written; +44 mirror "
            "skipped (fail-closed, never an invalid +44 record)."
        )
        decision.appended = True
        return decision
    Callback4TupleRegistry(lp).append(c44)

    decision.appended = True
    decision.reasons.append(
        "durable-success write-back appended (additive, append-only): "
        "durable_success_writeback.v1 + +44 COMPLETED mirror line — "
        "batch coordinator now observes NORMAL_CALLBACK_COMPLETED (§2 "
        "root-cause fixed)."
    )
    return decision


# ── §3.4/§3.5 batch settle evaluator (normal-callback event basis) ───────────
@dataclass
class BatchSettleResult:
    schema: str
    batch_id: str
    evaluated_at_basis: str
    tracks_expected: List[str]
    tracks_settled: List[str]
    all_settled: bool
    all_authoritative_pass: bool
    decision: str  # ALL_SETTLED_CONSOLIDATE | RECORD_AND_DEFER
    track_states: List[Dict[str, object]]
    this_collector_is_last_settle_track: bool = False
    fallback_pending_non_blocking: bool = True
    consolidated_summary_candidate_path: Optional[str] = None
    reasons: List[str] = field(default_factory=list)

    def to_json(self) -> dict:
        return {
            "schema": self.schema,
            "batch_id": self.batch_id,
            "evaluated_at_basis": self.evaluated_at_basis,
            "tracks_expected": list(self.tracks_expected),
            "tracks_settled": list(self.tracks_settled),
            "all_settled": self.all_settled,
            "all_authoritative_pass": self.all_authoritative_pass,
            "decision": self.decision,
            "this_collector_is_last_settle_track": (
                self.this_collector_is_last_settle_track
            ),
            "fallback_pending_non_blocking": (
                self.fallback_pending_non_blocking
            ),
            "track_states": list(self.track_states),
            "consolidated_summary_candidate_path": (
                self.consolidated_summary_candidate_path
            ),
            "reasons": list(self.reasons),
        }


def evaluate_batch_settle(
    *,
    batch_id: str,
    expected_tracks: Sequence[Tuple[str, str]],
    ledger_path: Union[str, os.PathLike],
    this_track_id: Optional[str] = None,
) -> BatchSettleResult:
    """§3.4/§3.5/§3.6/§3.7 — evaluate all-tracks-settled for ``batch_id``.

    ``expected_tracks`` = sequence of (track_id, task_id). A track is
    *settled* iff the +44 ledger has a ``durable_success_writeback.v1``
    line for it with ``terminal_status == COMPLETED`` and a terminal
    authoritative verdict. The gate is judged ONLY on these normal-callback
    durable-success events — fixed-time / dead-man / fallback are NEVER a
    progress trigger (§3.7) and a fallback/dead-man pending state is
    explicitly NON-blocking (§3.6).

    decision = ``ALL_SETTLED_CONSOLIDATE`` iff every expected track is
    settled AND every verdict is AUTHORITATIVE_PASS (§3.5); else
    ``RECORD_AND_DEFER``.
    """
    lp = Path(ledger_path)
    durable = _ledger_durable_lines(lp)
    by_track: Dict[str, Dict[str, object]] = {}
    for d in durable:
        if d.get("batch_id") != batch_id:
            continue
        if d.get("writeback_classification") != DURABLE_SUCCESS_WRITTEN:
            continue
        # last-write-wins per track within the batch.
        by_track[str(d.get("track_id"))] = d

    track_states: List[Dict[str, object]] = []
    settled: List[str] = []
    all_ap = True
    for trk, task_id in expected_tracks:
        rec = by_track.get(trk)
        present = bool(
            rec
            and rec.get("terminal_status") == "COMPLETED"
            and rec.get("authoritative_verdict")
            in TERMINAL_WRITEBACK_VERDICTS
        )
        verdict = rec.get("authoritative_verdict") if rec else None
        if present:
            settled.append(trk)
            if verdict != AUTHORITATIVE_PASS:
                all_ap = False
        else:
            all_ap = False
        track_states.append(
            {
                "track_id": trk,
                "task_id": task_id,
                "terminal_status": (
                    rec.get("terminal_status") if rec else "PENDING"
                ),
                "authoritative_verdict": verdict,
                "durable_success_present": present,
                "writeback_classification": (
                    rec.get("writeback_classification") if rec else None
                ),
            }
        )

    all_settled = len(settled) == len(list(expected_tracks)) and bool(
        expected_tracks
    )
    consolidate = all_settled and all_ap
    decision = "ALL_SETTLED_CONSOLIDATE" if consolidate else "RECORD_AND_DEFER"

    reasons = [
        f"batch settle judged on {EVALUATED_AT_BASIS} ONLY — fixed-time / "
        "dead-man / fallback are NOT progress triggers (§3.7); a "
        "fallback/dead-man pending state does NOT block all-settled (§3.6).",
        f"{len(settled)}/{len(list(expected_tracks))} tracks settled via "
        "durable_success_writeback.v1 (COMPLETED).",
    ]
    if consolidate:
        reasons.append(
            "ALL tracks settled AND every verdict is AUTHORITATIVE_PASS — "
            "last settle collector generates the consolidated summary "
            "candidate (§3.5)."
        )
    else:
        reasons.append(
            "not all-settled (or a non-AUTHORITATIVE_PASS verdict present) "
            "-> RECORD_AND_DEFER; the last settle track's independent ANU "
            "collector consolidates once 3/3 durable-success are recorded."
        )

    is_last = bool(
        this_track_id
        and this_track_id in [t for t, _ in expected_tracks]
        and all_settled
    )

    return BatchSettleResult(
        schema=BATCH_SETTLE_RESULT_SCHEMA,
        batch_id=batch_id,
        evaluated_at_basis=EVALUATED_AT_BASIS,
        tracks_expected=[t for t, _ in expected_tracks],
        tracks_settled=settled,
        all_settled=all_settled,
        all_authoritative_pass=consolidate,
        decision=decision,
        track_states=track_states,
        this_collector_is_last_settle_track=is_last,
        fallback_pending_non_blocking=True,
        reasons=reasons,
    )


__all__ = [
    "MODULE_SCHEMA",
    "DURABLE_SUCCESS_WRITEBACK_SCHEMA",
    "BATCH_SETTLE_RESULT_SCHEMA",
    "EVALUATED_AT_BASIS",
    "TERMINAL_WRITEBACK_VERDICTS",
    "DURABLE_SUCCESS_WRITTEN",
    "WRITEBACK_IDEMPOTENT_SKIP",
    "WRITEBACK_BINDING_CONFLICT",
    "TRACK_MISMATCH",
    "SELF_CHAIN_NO_WRITEBACK",
    "WRITEBACK_FIELDS_11",
    "WritebackPathRefused",
    "WritebackBinding",
    "WritebackDecision",
    "BatchSettleResult",
    "extract_binding_from_source",
    "evaluate_durable_success_writeback",
    "apply_durable_success_writeback",
    "evaluate_batch_settle",
]
