"""utils/completion_callback_fallback_cancel.py

task-2553+9a — CALLBACK_FALLBACK_CANCEL_ON_SUCCESS (회장 결정).

목적: normal completion callback collector 가 result/report/collector-result
marker 생성을 durable 하게 완료했을 때, dispatch 시점에 사전 등록된 fallback
callback cron 을 자동 제거하여 뒤늦은 redundant 발화를 없앤다.

설계 원칙 (task-2553+9a §4, §9-R.1~§9-R.5):
  * callback orchestrator(utils/anu_delegation_completion_callback.py) **무수정**.
    본 모듈은 완전 분리·독립 — orchestrator 를 import/호출하지 않는다(§9-R.4).
  * cron remove 는 §9-R.1 5조건 결합 검증 전부 충족 시에만 (오발 제거 0).
  * success gate 는 §9-R.2 durable evidence 기반 — caller boolean 단독 금지.
  * cron-remove 실행은 dependency-injected `remover` (§9-R.5). 실 subprocess
    호출은 운영 collector 만, 본 task 구현/테스트는 fake/dry-run only.

분류(CancelClassification):
  CANCELLED              fallback cron 제거 성공 → fallback_cancelled=true
  ALREADY_GONE           이미 삭제됨(또는 만료) — idempotent, 실패 아님
  ALREADY_FIRED          이미 발화함 — 기존 DUPLICATE_CALLBACK_IGNORED 경로 유지
  SKIPPED_NORMAL_FAILED  durable evidence 부재/HOLD/failure/partial → fallback 보존
  SKIPPED_UNTRUSTED      §9-R.1 5조건 중 하나라도 불충족 → remove 미실행
  REMOVE_FAILED_WARNING  remove 시도 실패 → warning marker, collector success 유지
"""
from __future__ import annotations

import json
import os
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Callable, Optional

from utils.callback_envelope_schema import ANU_CALLBACK_KEY as ANU_KEY
from utils import fallback_schedule_registry

# ── 소유권 상수 (회장 verbatim — 절대 하드 경계) ────────────────────────────
ANU_CHAT_ID = 6937032012
FALLBACK_ROLE = "fallback"

# durable evidence — result.json status 가 이 집합/패턴이면 실패·HOLD 로 간주
_FAILURE_STATUS_TOKENS = (
    "hold",
    "hold_for_chair",
    "fail",
    "failed",
    "failure",
    "error",
    "partial",
    "crash",
    "killed",
    "timeout",
    "aborted",
    "cancelled",
    "canceled",
    "running",
    "pending",
    "unknown",
)
_SUCCESS_STATUS_TOKENS = (
    "ok",
    "pass",
    "passed",
    "success",
    "succeeded",
    "complete",
    "completed",
    "done",
    "defensive_hold_pass",  # 회장 채택 정상 종료 분류 (PASS 계열)
)


def _now_utc() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


class CancelClassification(str, Enum):
    CANCELLED = "CANCELLED"
    ALREADY_GONE = "ALREADY_GONE"
    ALREADY_FIRED = "ALREADY_FIRED"
    SKIPPED_NORMAL_FAILED = "SKIPPED_NORMAL_FAILED"
    SKIPPED_UNTRUSTED = "SKIPPED_UNTRUSTED"
    REMOVE_FAILED_WARNING = "REMOVE_FAILED_WARNING"


@dataclass
class RemoverResult:
    """cron remover 호출 결과. status: removed|already_gone|already_fired|failed."""

    status: str
    detail: str = ""
    raw: Optional[dict] = None


# remover 시그니처: remover(cron_id: str, *, dry_run: bool) -> RemoverResult
Remover = Callable[..., RemoverResult]


class RealCokacdirCronRemover:
    """실 `cokacdir --cron-remove` wrapper (운영 collector 전용 기본값).

    본 task 의 regression 은 fake remover 를 주입하므로 이 클래스의 subprocess
    경로는 테스트에서 절대 실행되지 않는다(§9-R.5). dry_run=True 면 subprocess
    호출 자체를 하지 않고 시뮬레이션 결과만 돌려준다(이중 안전장치).
    """

    binary = os.environ.get("COKACDIR_BIN", "/usr/local/bin/cokacdir")

    def __call__(self, cron_id: str, *, dry_run: bool = True) -> RemoverResult:
        if dry_run:
            return RemoverResult(
                status="removed",
                detail="dry-run: 실 subprocess 호출 0 (운영 collector 만 실제 제거)",
                raw={"dry_run": True, "cron_id": cron_id},
            )
        proc = subprocess.run(  # pragma: no cover - 운영 collector 전용
            [
                self.binary,
                "--cron-remove",
                cron_id,
                "--chat",
                str(ANU_CHAT_ID),
                "--key",
                ANU_KEY,
            ],
            capture_output=True,
            text=True,
            timeout=60,
        )
        try:
            payload = json.loads(proc.stdout.strip() or "{}")
        except json.JSONDecodeError:
            payload = {"status": "error", "message": proc.stdout.strip()}
        if payload.get("status") == "ok":
            return RemoverResult(status="removed", detail="cokacdir ok", raw=payload)
        msg = str(payload.get("message", "")).lower()
        if "not found" in msg or "no such" in msg or "already" in msg:
            return RemoverResult(status="already_gone", detail=msg, raw=payload)
        return RemoverResult(status="failed", detail=msg or "remove failed", raw=payload)


@dataclass
class CancelDecision:
    classification: CancelClassification
    task_id: str
    target_cron_id: str
    cron_remove_invoked: bool
    fallback_cancelled: bool
    cancel_skipped_reason: str
    safe_remove_checks: dict = field(default_factory=dict)
    durable_evidence: dict = field(default_factory=dict)
    remover_result: Optional[dict] = None
    hold_reasons: list = field(default_factory=list)
    notes: list = field(default_factory=list)
    ts_utc: str = ""

    def to_dict(self) -> dict:
        return {
            "schema": "callback_fallback_cancel_result_v1",
            "task_id": self.task_id,
            "target_cron_id": self.target_cron_id,
            "classification": self.classification.value,
            "cron_remove_invoked": self.cron_remove_invoked,
            "fallback_cancelled": self.fallback_cancelled,
            "cancel_skipped_reason": self.cancel_skipped_reason,
            "safe_remove_checks": self.safe_remove_checks,
            "durable_evidence": self.durable_evidence,
            "remover_result": self.remover_result,
            "hold_reasons": self.hold_reasons,
            "notes": self.notes,
            "ts_utc": self.ts_utc,
        }


# ── §9-R.2 durable-evidence success gate ────────────────────────────────────


def _read_json(path: Path) -> Optional[dict]:
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None


def _status_is_success(status: str) -> bool:
    s = (status or "").strip().lower()
    if not s:
        return False
    # 실패/HOLD 토큰이 하나라도 부분일치하면 비-성공으로 간주 (보수적)
    for tok in _FAILURE_STATUS_TOKENS:
        if tok in s:
            return False
    return any(tok == s or tok in s for tok in _SUCCESS_STATUS_TOKENS)


def evaluate_durable_evidence(
    *,
    result_json_path: Path,
    report_path: Path,
    collector_result_marker_path: Path,
) -> dict:
    """§9-R.2: result.json(존재 AND status 성공/비-HOLD/비-failure) + report +
    collector-result marker 가 실재·정합할 때만 success. boolean 은 권위 아님."""
    ev: dict = {
        "result_json_exists": False,
        "result_json_status": None,
        "result_json_status_ok": False,
        "report_exists": False,
        "collector_result_marker_exists": False,
        "satisfied": False,
        "reason": "",
    }
    rj = _read_json(result_json_path)
    ev["result_json_exists"] = rj is not None
    if rj is not None:
        status = (
            rj.get("status")
            or rj.get("classification")
            or rj.get("final_status")
            or rj.get("result")
            or ""
        )
        ev["result_json_status"] = status
        ev["result_json_status_ok"] = _status_is_success(str(status))
    ev["report_exists"] = (
        report_path.exists() and report_path.stat().st_size > 0
    )
    ev["collector_result_marker_exists"] = collector_result_marker_path.exists()

    if not ev["result_json_exists"]:
        ev["reason"] = "result.json 부재 → normal collector 미완료/실패"
    elif not ev["result_json_status_ok"]:
        ev["reason"] = (
            f"result.json status 비-성공/HOLD/failure ({ev['result_json_status']!r})"
        )
    elif not ev["report_exists"]:
        ev["reason"] = "report 부재 또는 비어 있음"
    elif not ev["collector_result_marker_exists"]:
        ev["reason"] = "collector-result marker 부재"
    else:
        ev["satisfied"] = True
        ev["reason"] = "durable evidence 정합 (result+status+report+marker)"
    return ev


# ── §9-R.1 safe-remove 5조건 결합 검증 ──────────────────────────────────────


def evaluate_safe_remove(
    *,
    task_id: str,
    target_cron_id: str,
    dispatch_fired_marker_path: Path,
    callback_contract: Optional[dict] = None,
) -> dict:
    """§9-R.1 5조건. dispatch-fired marker 의
    callback_policy_a.fallback_callback_cron_id 가 단일 권위.
    callback contract 는 동일값 교차확인용 보조일 뿐 단독 권위 아님."""
    checks = {
        "c1_marker_id_matches": False,
        "c2_task_binding": False,
        "c3_ownership": False,
        "c4_role_fallback": False,
        "c5_not_stale_or_typo": False,
        "all_satisfied": False,
        "marker_present": False,
        "authority_cron_id": None,
        "contract_cross_check": "n/a",
        "fail_reason": "",
    }
    marker = _read_json(dispatch_fired_marker_path)
    if marker is None:
        checks["fail_reason"] = (
            "dispatch-fired marker 부재/파싱불가 → 추정 remove 0 (SKIPPED_UNTRUSTED)"
        )
        return checks
    checks["marker_present"] = True

    policy = marker.get("callback_policy_a")
    if not isinstance(policy, dict):
        checks["fail_reason"] = (
            "marker.callback_policy_a 부재 → fallback_cron_id 권위 없음"
        )
        return checks

    authority_id = policy.get("fallback_callback_cron_id")
    checks["authority_cron_id"] = authority_id

    # C1 — marker 권위 id == 제거 대상 id (정확 문자열 일치)
    checks["c1_marker_id_matches"] = bool(
        authority_id
        and isinstance(authority_id, str)
        and authority_id == target_cron_id
    )
    # C2 — marker.task_id == 처리 task_id (task-level binding)
    checks["c2_task_binding"] = marker.get("task_id") == task_id
    # C3 — ownership: chat_id == 6937032012 AND anu-key
    checks["c3_ownership"] = (
        policy.get("chat_id") == ANU_CHAT_ID
        and policy.get("anu_key") == ANU_KEY
    )
    # C4 — 제거 대상 역할 == fallback
    checks["c4_role_fallback"] = (
        policy.get("fallback_role") == FALLBACK_ROLE
    )
    # C5 — stale/typo/marker부재 아님: 권위 id 가 비어있지 않고 C1 충족
    checks["c5_not_stale_or_typo"] = bool(authority_id) and checks[
        "c1_marker_id_matches"
    ]

    # callback contract 교차확인 (보조). 불일치 시 신뢰 박탈.
    if callback_contract is not None:
        contract_id = callback_contract.get("fallback_callback_cron_id")
        if contract_id is None:
            checks["contract_cross_check"] = "contract_no_fallback_id"
        elif contract_id == authority_id:
            checks["contract_cross_check"] = "match"
        else:
            checks["contract_cross_check"] = "MISMATCH"
            checks["c5_not_stale_or_typo"] = False

    checks["all_satisfied"] = all(
        (
            checks["c1_marker_id_matches"],
            checks["c2_task_binding"],
            checks["c3_ownership"],
            checks["c4_role_fallback"],
            checks["c5_not_stale_or_typo"],
        )
    )
    if not checks["all_satisfied"]:
        failed = [
            k
            for k in (
                "c1_marker_id_matches",
                "c2_task_binding",
                "c3_ownership",
                "c4_role_fallback",
                "c5_not_stale_or_typo",
            )
            if not checks[k]
        ]
        checks["fail_reason"] = "5조건 미충족: " + ",".join(failed)
    return checks


# ── 동시성 단일 승자 락 (§9-R.3 #11 race) ───────────────────────────────────


def _acquire_cancel_lock(lock_path: Path) -> bool:
    """O_CREAT|O_EXCL atomic — normal-success-cancel vs fallback-fire race 에서
    단일 처리만 허용(이중 처리·재escalate 0)."""
    try:
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(json.dumps({"locked_at": _now_utc(), "pid": os.getpid()}))
    return True


# ── 메인 진입점 ─────────────────────────────────────────────────────────────


def cancel_fallback_on_success(
    *,
    task_id: str,
    target_cron_id: str,
    dispatch_fired_marker_path: Path,
    result_json_path: Path,
    report_path: Path,
    collector_result_marker_path: Path,
    fallback_cancelled_marker_path: Optional[Path] = None,
    cancel_lock_path: Optional[Path] = None,
    callback_contract: Optional[dict] = None,
    normal_collector_success: bool = False,
    remover: Optional[Remover] = None,
    dry_run: bool = True,
    now_fn: Callable[[], str] = _now_utc,
) -> CancelDecision:
    """normal collector 성공 시 사전등록 fallback callback cron 자동 제거.

    `normal_collector_success` 는 보조 신호일 뿐 §9-R.2 durable-evidence gate 에
    종속한다 — 단독으로 cancel 을 결정하지 않는다.
    """
    if remover is None:
        remover = RealCokacdirCronRemover()
    ts = now_fn()

    # (A) §9-R.2 durable-evidence success gate. boolean 단독 금지.
    ev = evaluate_durable_evidence(
        result_json_path=result_json_path,
        report_path=report_path,
        collector_result_marker_path=collector_result_marker_path,
    )
    ev["caller_boolean_aux"] = bool(normal_collector_success)
    if not ev["satisfied"]:
        return CancelDecision(
            classification=CancelClassification.SKIPPED_NORMAL_FAILED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            cron_remove_invoked=False,
            fallback_cancelled=False,
            cancel_skipped_reason=(
                "normal collector durable evidence 미충족 → fallback 보존 "
                f"(예정대로 발화): {ev['reason']}"
            ),
            durable_evidence=ev,
            notes=[
                "§9-R.2: caller boolean 은 권위 아님 — durable evidence 부재 시 "
                "boolean=true 라도 SKIPPED_NORMAL_FAILED",
            ],
            ts_utc=ts,
        )

    # (B) §9-R.1 safe-remove 5조건 결합 검증.
    checks = evaluate_safe_remove(
        task_id=task_id,
        target_cron_id=target_cron_id,
        dispatch_fired_marker_path=dispatch_fired_marker_path,
        callback_contract=callback_contract,
    )
    if not checks["all_satisfied"]:
        hold_reasons = []
        if not checks["marker_present"]:
            hold_reasons.append(
                "dispatch-fired marker 부재 — fallback_cron_id 신뢰 불가"
            )
        if checks["contract_cross_check"] == "MISMATCH":
            hold_reasons.append(
                "callback contract vs marker 권위 id MISMATCH — 타 cron 위험"
            )
        return CancelDecision(
            classification=CancelClassification.SKIPPED_UNTRUSTED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            cron_remove_invoked=False,
            fallback_cancelled=False,
            cancel_skipped_reason=(
                "§9-R.1 5조건 결합 검증 실패 → cron remove 미실행 "
                f"({checks['fail_reason']})"
            ),
            safe_remove_checks=checks,
            durable_evidence=ev,
            hold_reasons=hold_reasons,
            notes=[
                "오발 제거 0: 신뢰할 수 없는 fallback_cron_id 는 절대 추정 remove 0",
            ],
            ts_utc=ts,
        )

    # (C) 동시성 단일 승자 — race 에서 1회만 처리.
    if cancel_lock_path is not None and not _acquire_cancel_lock(cancel_lock_path):
        return CancelDecision(
            classification=CancelClassification.ALREADY_FIRED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            cron_remove_invoked=False,
            fallback_cancelled=False,
            cancel_skipped_reason=(
                "동시 cancel 락 획득 실패 — 다른 처리기가 단일 처리 중 "
                "(이중 처리·재escalate 0, DUPLICATE 경로 유지)"
            ),
            safe_remove_checks=checks,
            durable_evidence=ev,
            ts_utc=ts,
        )

    # (D) cron remove 실행 (dependency-injected remover).
    rr = remover(target_cron_id, dry_run=dry_run)
    remover_dict = {"status": rr.status, "detail": rr.detail, "raw": rr.raw}

    if rr.status == "removed":
        if fallback_cancelled_marker_path is not None:
            fallback_cancelled_marker_path.parent.mkdir(parents=True, exist_ok=True)
            fallback_cancelled_marker_path.write_text(
                json.dumps(
                    {
                        "schema": "fallback_cancelled_v1",
                        "task_id": task_id,
                        "fallback_callback_cron_id": target_cron_id,
                        "fallback_cancelled": True,
                        "dry_run": dry_run,
                        "ts_utc": ts,
                        "safe_remove_checks": checks,
                        "durable_evidence": ev,
                    },
                    ensure_ascii=False,
                    indent=2,
                ),
                encoding="utf-8",
            )
        return CancelDecision(
            classification=CancelClassification.CANCELLED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            cron_remove_invoked=True,
            fallback_cancelled=True,
            cancel_skipped_reason="",
            safe_remove_checks=checks,
            durable_evidence=ev,
            remover_result=remover_dict,
            notes=["fallback_cancelled=true marker persist"],
            ts_utc=ts,
        )

    if rr.status == "already_gone":
        return CancelDecision(
            classification=CancelClassification.ALREADY_GONE,
            task_id=task_id,
            target_cron_id=target_cron_id,
            cron_remove_invoked=True,
            fallback_cancelled=False,
            cancel_skipped_reason="fallback cron 이미 삭제/만료 — idempotent, 실패 아님",
            safe_remove_checks=checks,
            durable_evidence=ev,
            remover_result=remover_dict,
            ts_utc=ts,
        )

    if rr.status == "already_fired":
        return CancelDecision(
            classification=CancelClassification.ALREADY_FIRED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            cron_remove_invoked=True,
            fallback_cancelled=False,
            cancel_skipped_reason=(
                "fallback 이미 발화 — 기존 DUPLICATE_CALLBACK_IGNORED 경로 유지"
            ),
            safe_remove_checks=checks,
            durable_evidence=ev,
            remover_result=remover_dict,
            ts_utc=ts,
        )

    # rr.status == "failed" (또는 미지)
    return CancelDecision(
        classification=CancelClassification.REMOVE_FAILED_WARNING,
        task_id=task_id,
        target_cron_id=target_cron_id,
        cron_remove_invoked=True,
        fallback_cancelled=False,
        cancel_skipped_reason=(
            "cron remove 실패 — warning marker. normal collector success 는 "
            "실패로 바꾸지 않음 (§3.5). fallback 은 DUPLICATE 경로로 음소거됨"
        ),
        safe_remove_checks=checks,
        durable_evidence=ev,
        remover_result=remover_dict,
        notes=["collector success preserved despite remove failure"],
        ts_utc=ts,
    )


# ── task-2728: 6 cause enum ──────────────────────────────────────────────────


class PruneCause(str, Enum):
    CANCEL_NOT_WIRED = "cancel_not_wired"
    CANCEL_FAILED = "cancel_failed"
    SCHEDULE_NOT_FOUND = "schedule_not_found"
    STALE_ROUND = "stale_round"
    STALE_HEAD = "stale_head"
    NORMAL_CALLBACK_ALREADY_COLLECTED = "normal_callback_already_collected"


# ── task-2728: PruneOutcome dataclass ────────────────────────────────────────


@dataclass
class PruneOutcome:
    task_id: str
    round: int
    head_sha: str
    cron_id: str
    classification: CancelClassification
    cause: Optional[str]
    cron_remove_invoked: bool
    pruned: bool
    detail: str
    ts_utc: str

    def to_dict(self) -> dict:
        return {
            "task_id": self.task_id,
            "round": self.round,
            "head_sha": self.head_sha,
            "cron_id": self.cron_id,
            "classification": self.classification.value,
            "cause": self.cause,
            "cron_remove_invoked": self.cron_remove_invoked,
            "pruned": self.pruned,
            "detail": self.detail,
            "ts_utc": self.ts_utc,
        }


# ── task-2728: SelfCheckDecision dataclass ───────────────────────────────────


@dataclass
class SelfCheckDecision:
    proceed: bool
    cause: Optional[str]
    detail: str
    task_id: str
    round: int
    head_sha: str
    ts_utc: str

    def to_dict(self) -> dict:
        return {
            "proceed": self.proceed,
            "cause": self.cause,
            "detail": self.detail,
            "task_id": self.task_id,
            "round": self.round,
            "head_sha": self.head_sha,
            "ts_utc": self.ts_utc,
        }


# ── task-2728: registry 기반 idempotent prune ────────────────────────────────


def prune_fallbacks_for_key(
    *,
    task_id: str,
    round: int,
    head_sha: str,
    trigger: str,
    remover: Optional[Remover] = None,
    dry_run: bool = True,
    canonical_root: Optional[str] = None,
    registry_path: Optional[str] = None,
    now_fn: Callable[[], str] = _now_utc,
) -> list:
    """registry 기반 idempotent prune.

    recs가 비어있으면 [] 반환 (이미 prune됨/없음 — idempotent 재실행 안전).
    각 rec에 대해 remover를 호출하고 PruneOutcome 리스트를 반환.
    """
    if remover is None:
        remover = RealCokacdirCronRemover()

    recs = fallback_schedule_registry.pending_for(
        task_id,
        round,
        head_sha,
        canonical_root=canonical_root,
        registry_path=registry_path,
    )

    if not recs:
        return []

    outcomes = []
    for rec in recs:
        try:
            rr = remover(rec.cron_id, dry_run=dry_run)
            ts = now_fn()

            if rr.status == "removed":
                classification = CancelClassification.CANCELLED
                cause = None
                cron_remove_invoked = True
                pruned = True
                detail = rr.detail or "removed"
                fallback_schedule_registry.mark_pruned(
                    task_id=rec.task_id,
                    round=rec.round,
                    head_sha=rec.head_sha,
                    cron_id=rec.cron_id,
                    cause=f"pruned_on_{trigger}",
                    canonical_root=canonical_root,
                    registry_path=registry_path,
                    now_fn=now_fn,
                )
            elif rr.status == "already_gone":
                classification = CancelClassification.ALREADY_GONE
                cause = PruneCause.SCHEDULE_NOT_FOUND.value
                cron_remove_invoked = True
                pruned = True
                detail = rr.detail or "already_gone"
                fallback_schedule_registry.mark_pruned(
                    task_id=rec.task_id,
                    round=rec.round,
                    head_sha=rec.head_sha,
                    cron_id=rec.cron_id,
                    cause=PruneCause.SCHEDULE_NOT_FOUND.value,
                    canonical_root=canonical_root,
                    registry_path=registry_path,
                    now_fn=now_fn,
                )
            elif rr.status == "already_fired":
                classification = CancelClassification.ALREADY_FIRED
                cause = PruneCause.NORMAL_CALLBACK_ALREADY_COLLECTED.value
                cron_remove_invoked = True
                pruned = True
                detail = rr.detail or "already_fired"
                fallback_schedule_registry.mark_pruned(
                    task_id=rec.task_id,
                    round=rec.round,
                    head_sha=rec.head_sha,
                    cron_id=rec.cron_id,
                    cause=PruneCause.NORMAL_CALLBACK_ALREADY_COLLECTED.value,
                    canonical_root=canonical_root,
                    registry_path=registry_path,
                    now_fn=now_fn,
                )
            else:
                # failed (또는 미지) — PENDING 유지, mark_pruned 호출 안 함
                classification = CancelClassification.REMOVE_FAILED_WARNING
                cause = PruneCause.CANCEL_FAILED.value
                cron_remove_invoked = True
                pruned = False
                detail = rr.detail or "remove failed"

            outcomes.append(
                PruneOutcome(
                    task_id=rec.task_id,
                    round=rec.round,
                    head_sha=rec.head_sha,
                    cron_id=rec.cron_id,
                    classification=classification,
                    cause=cause,
                    cron_remove_invoked=cron_remove_invoked,
                    pruned=pruned,
                    detail=detail,
                    ts_utc=ts,
                )
            )
        except Exception as exc:  # noqa: BLE001 — 1건 실패가 나머지 prune 차단 방지
            outcomes.append(
                PruneOutcome(
                    task_id=rec.task_id,
                    round=rec.round,
                    head_sha=rec.head_sha,
                    cron_id=rec.cron_id,
                    classification=CancelClassification.REMOVE_FAILED_WARNING,
                    cause=PruneCause.CANCEL_FAILED.value,
                    cron_remove_invoked=True,
                    pruned=False,
                    detail=f"REMOVE_FAILED_WARNING: {type(exc).__name__}: {exc}",
                    ts_utc=now_fn(),
                )
            )
            continue
    return outcomes


# ── task-2728: cancel_not_wired 근본원인 코드화 ──────────────────────────────


def detect_unwired_fallback(
    *,
    task_id: str,
    round: int,
    head_sha: str,
    dispatch_fired: bool = True,
    canonical_root: Optional[str] = None,
    registry_path: Optional[str] = None,
    now_fn: Callable[[], str] = _now_utc,
) -> Optional[PruneOutcome]:
    """cancel_not_wired 근본원인 코드화.

    dispatch_fired가 True인데 registry에 (task_id, round) 매칭 레코드가
    하나도 없으면 → fallback이 등록됐으나 registry 미기록 상태.
    """
    if not dispatch_fired:
        return None

    recs = fallback_schedule_registry.read_records(
        canonical_root=canonical_root,
        registry_path=registry_path,
    )
    # (task_id, round) 매칭 레코드가 하나도 없으면 unwired
    matched = [r for r in recs if r.task_id == task_id and r.round == round]
    if not matched:
        return PruneOutcome(
            task_id=task_id,
            round=round,
            head_sha=head_sha,
            cron_id="",
            classification=CancelClassification.SKIPPED_UNTRUSTED,
            cause=PruneCause.CANCEL_NOT_WIRED.value,
            cron_remove_invoked=False,
            pruned=False,
            detail=(
                "fallback dispatch fired but absent from durable registry "
                "— collector cannot prune (A86DB611 root cause)"
            ),
            ts_utc=now_fn(),
        )
    return None


# ── task-2729 Phase 2: live_prune 요약 helper (MT-1 결함7) ──────────────────


def summarize_live_prune(outcomes: list) -> dict:
    """PruneOutcome list를 live_prune 요약 dict로 변환.

    real_tombstone: pruned=True AND cron_remove_invoked=True 인 건이 1건 이상 존재.
    pruned_count: pruned=True 건수.
    causes: 각 outcome의 cause 값 list (PruneCause enum 보존).
    """
    real_tombstone = any(
        o.pruned and o.cron_remove_invoked for o in outcomes
    )
    pruned_count = sum(1 for o in outcomes if o.pruned)
    causes = [o.cause for o in outcomes]
    return {
        "real_tombstone": real_tombstone,
        "pruned_count": pruned_count,
        "causes": causes,
    }


# ── task-2728: fallback cron 실행 시점 2계층 방어 stale self-check ───────────


def fallback_self_check(
    *,
    task_id: str,
    round: int,
    head_sha: str,
    current_round: Optional[int] = None,
    current_head_sha: Optional[str] = None,
    normal_callback_collected: bool = False,
    now_fn: Callable[[], str] = _now_utc,
) -> SelfCheckDecision:
    """fallback cron 실행 시점 2계층 방어 stale self-check.

    우선순위:
    1. normal_callback_collected True → NORMAL_CALLBACK_ALREADY_COLLECTED
    2. current_round is not None and round < current_round → STALE_ROUND
    3. current_head_sha and head_sha and head_sha != current_head_sha → STALE_HEAD
    4. 그 외 → proceed=True
    """
    ts = now_fn()

    if normal_callback_collected:
        return SelfCheckDecision(
            proceed=False,
            cause=PruneCause.NORMAL_CALLBACK_ALREADY_COLLECTED.value,
            detail=(
                "normal callback already collected — fallback must not fire "
                "(stale self-check: NORMAL_CALLBACK_ALREADY_COLLECTED)"
            ),
            task_id=task_id,
            round=round,
            head_sha=head_sha,
            ts_utc=ts,
        )

    if current_round is not None and round < current_round:
        return SelfCheckDecision(
            proceed=False,
            cause=PruneCause.STALE_ROUND.value,
            detail=(
                f"fallback round {round} < current round {current_round} "
                "— stale fallback must not fire (STALE_ROUND)"
            ),
            task_id=task_id,
            round=round,
            head_sha=head_sha,
            ts_utc=ts,
        )

    if current_head_sha and head_sha and head_sha != current_head_sha:
        return SelfCheckDecision(
            proceed=False,
            cause=PruneCause.STALE_HEAD.value,
            detail=(
                f"fallback head_sha {head_sha!r} != current head_sha "
                f"{current_head_sha!r} — stale fallback must not fire (STALE_HEAD)"
            ),
            task_id=task_id,
            round=round,
            head_sha=head_sha,
            ts_utc=ts,
        )

    return SelfCheckDecision(
        proceed=True,
        cause=None,
        detail="recovery-only: no stale condition, fallback may fire as recovery",
        task_id=task_id,
        round=round,
        head_sha=head_sha,
        ts_utc=ts,
    )
