"""utils/completion_callback_fallback_cancel.py

task-2553+9a — CALLBACK_FALLBACK_CANCEL_ON_SUCCESS (회장 결정).

목적: normal completion callback collector 가 result/report/collector-result
marker 생성을 durable 하게 완료했을 때, dispatch 시점에 사전 등록된 fallback
callback cron 을 자동 제거하여 뒤늦은 redundant 발화를 없앤다.

설계 원칙 (task-2553+9a §4, §9-R.1~§9-R.5):
  * callback orchestrator(utils/anu_delegation_completion_callback.py) **무수정**.
    본 모듈은 완전 분리·독립 — orchestrator 를 import/호출하지 않는다(§9-R.4).
  * cron remove 는 §9-R.1 5조건 결합 검증 전부 충족 시에만 (오발 제거 0).
  * success gate 는 §9-R.2 durable evidence 기반 — caller boolean 단독 금지.
  * cron-remove 실행은 dependency-injected `remover` (§9-R.5). 실 subprocess
    호출은 운영 collector 만, 본 task 구현/테스트는 fake/dry-run only.

분류(CancelClassification):
  CANCELLED              fallback cron 제거 성공 → fallback_cancelled=true
  ALREADY_GONE           이미 삭제됨(또는 만료) — idempotent, 실패 아님
  ALREADY_FIRED          이미 발화함 — 기존 DUPLICATE_CALLBACK_IGNORED 경로 유지
  SKIPPED_NORMAL_FAILED  durable evidence 부재/HOLD/failure/partial → fallback 보존
  SKIPPED_UNTRUSTED      §9-R.1 5조건 중 하나라도 불충족 → remove 미실행
  REMOVE_FAILED_WARNING  remove 시도 실패 → warning marker, collector success 유지
"""
from __future__ import annotations

import json
import os
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Callable, Optional

# ── Ownership constants (verbatim from the chairman's decision — absolute hard boundary) ──
# ANU_CHAT_ID / ANU_KEY are compared against the dispatch-fired marker's
# callback_policy_a.chat_id / anu_key and are passed as --chat / --key to the
# cokacdir CLI. FALLBACK_ROLE is the only role value accepted for removal.
# NOTE(review): ANU_KEY looks like key material hard-coded in source — confirm
# that committing it here is intended.
ANU_CHAT_ID = 6937032012
ANU_KEY = "c119085addb0f8b7"
FALLBACK_ROLE = "fallback"

# Durable evidence — a result.json status that matches this set/pattern is
# treated as failure / HOLD.
_FAILURE_STATUS_TOKENS = (
    "hold",
    "hold_for_chair",
    "fail",
    "failed",
    "failure",
    "error",
    "partial",
    "crash",
    "killed",
    "timeout",
    "aborted",
    "cancelled",
    "canceled",
    "running",
    "pending",
    "unknown",
)
# Status tokens that count as a successful completion.
_SUCCESS_STATUS_TOKENS = (
    "ok",
    "pass",
    "passed",
    "success",
    "succeeded",
    "complete",
    "completed",
    "done",
    "defensive_hold_pass",  # normal-termination classification adopted by the chairman (PASS family)
)


def _now_utc() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    stamp_format = "%Y-%m-%dT%H:%M:%SZ"
    current = datetime.now(timezone.utc)
    return current.strftime(stamp_format)


class CancelClassification(str, Enum):
    """Outcome categories reported for a fallback-cancel attempt.

    Members are ``str`` subclasses, so each one compares equal to its own name.
    """

    # The fallback cron was removed (fallback_cancelled is true).
    CANCELLED = "CANCELLED"
    # The cron was already deleted or expired — idempotent, not a failure.
    ALREADY_GONE = "ALREADY_GONE"
    # The fallback already fired, or another handler holds the cancel lock;
    # the existing DUPLICATE_CALLBACK_IGNORED path is left in charge.
    ALREADY_FIRED = "ALREADY_FIRED"
    # Durable evidence is missing / HOLD / failure / partial — fallback is kept.
    SKIPPED_NORMAL_FAILED = "SKIPPED_NORMAL_FAILED"
    # At least one of the five safe-remove conditions failed — remove not run.
    SKIPPED_UNTRUSTED = "SKIPPED_UNTRUSTED"
    # The remove attempt failed — warning only, collector success is kept.
    REMOVE_FAILED_WARNING = "REMOVE_FAILED_WARNING"


@dataclass
class RemoverResult:
    """Result of a cron remover call.

    ``status`` is one of ``removed``, ``already_gone``, ``already_fired`` or
    ``failed``; any other value is handled like ``failed`` by
    ``cancel_fallback_on_success``.
    """

    status: str
    # Human-readable explanation of the outcome (may be empty).
    detail: str = ""
    # Raw payload returned by the remover backend, when there is one.
    raw: Optional[dict] = None


# Remover signature: remover(cron_id: str, *, dry_run: bool) -> RemoverResult
Remover = Callable[..., RemoverResult]


class RealCokacdirCronRemover:
    """Wrapper around the real ``cokacdir --cron-remove`` command.

    This is the default remover for the production collector. With
    ``dry_run=True`` (the default) no subprocess is started at all and a
    simulated ``removed`` result is returned, which acts as a second safety
    net on top of injecting a fake remover in tests.

    Every failure mode of the real invocation (binary missing, timeout,
    non-JSON or non-object output) is reported as a ``RemoverResult`` instead
    of an exception, so the caller can downgrade it to a warning.
    """

    # Resolved once, when the class body is executed.
    binary = os.environ.get("COKACDIR_BIN", "/usr/local/bin/cokacdir")

    def __call__(self, cron_id: str, *, dry_run: bool = True) -> RemoverResult:
        """Remove *cron_id* and classify the CLI response.

        Args:
            cron_id: Identifier of the cron entry to remove.
            dry_run: When true, skip the subprocess and simulate success.

        Returns:
            ``RemoverResult`` whose status is ``removed``, ``already_gone`` or
            ``failed``.
        """
        if dry_run:
            return RemoverResult(
                status="removed",
                detail="dry-run: 실 subprocess 호출 0 (운영 collector 만 실제 제거)",
                raw={"dry_run": True, "cron_id": cron_id},
            )
        try:
            proc = subprocess.run(  # pragma: no cover - production collector only
                [
                    self.binary,
                    "--cron-remove",
                    cron_id,
                    "--chat",
                    str(ANU_CHAT_ID),
                    "--key",
                    ANU_KEY,
                ],
                capture_output=True,
                text=True,
                timeout=60,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            # Missing/non-executable binary or a hung CLI must not crash the
            # collector; report it as an ordinary remove failure.
            return RemoverResult(
                status="failed",
                detail=f"cokacdir invocation failed: {exc}",
                raw={"status": "error", "message": str(exc)},
            )

        stdout = (proc.stdout or "").strip()
        try:
            payload = json.loads(stdout or "{}")
        except json.JSONDecodeError:
            payload = {"status": "error", "message": stdout}
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (list, string, number) cannot
            # be queried with .get(); treat it like unparseable output.
            payload = {"status": "error", "message": stdout}

        if payload.get("status") == "ok":
            return RemoverResult(status="removed", detail="cokacdir ok", raw=payload)
        msg = str(payload.get("message", "")).lower()
        # NOTE(review): "already" is a broad match — confirm against the real
        # CLI messages that it only ever means "already removed".
        if "not found" in msg or "no such" in msg or "already" in msg:
            return RemoverResult(status="already_gone", detail=msg, raw=payload)
        # Fall back to stderr so a failure without a JSON message is diagnosable.
        detail = msg or (proc.stderr or "").strip() or "remove failed"
        return RemoverResult(status="failed", detail=detail, raw=payload)


@dataclass
class CancelDecision:
    """Record describing what a fallback-cancel attempt decided and why."""

    classification: CancelClassification
    task_id: str
    target_cron_id: str
    cron_remove_invoked: bool
    fallback_cancelled: bool
    cancel_skipped_reason: str
    safe_remove_checks: dict = field(default_factory=dict)
    durable_evidence: dict = field(default_factory=dict)
    remover_result: Optional[dict] = None
    hold_reasons: list = field(default_factory=list)
    notes: list = field(default_factory=list)
    ts_utc: str = ""

    def to_dict(self) -> dict:
        """Return the decision as a ``callback_fallback_cancel_result_v1`` dict.

        The classification is emitted as its ``.value``; every other entry is
        the attribute object itself (no copies are made).
        """
        payload: dict = {"schema": "callback_fallback_cancel_result_v1"}
        payload["task_id"] = self.task_id
        payload["target_cron_id"] = self.target_cron_id
        payload["classification"] = self.classification.value
        # The remaining keys share their name with the attribute they expose.
        passthrough = (
            "cron_remove_invoked",
            "fallback_cancelled",
            "cancel_skipped_reason",
            "safe_remove_checks",
            "durable_evidence",
            "remover_result",
            "hold_reasons",
            "notes",
            "ts_utc",
        )
        for attr_name in passthrough:
            payload[attr_name] = getattr(self, attr_name)
        return payload


# ── §9-R.2 durable-evidence success gate ────────────────────────────────────


def _read_json(path: Path) -> Optional[dict]:
    """Load *path* as a UTF-8 encoded JSON object.

    Args:
        path: File to read.

    Returns:
        The parsed dict, or ``None`` when the file is missing or unreadable,
        is not valid UTF-8, is not valid JSON, or its top-level value is not
        a JSON object.
    """
    try:
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
        # so a corrupted file is treated the same as an absent one.
        return None
    # Callers use .get() on the result; a list/str/number is not usable evidence.
    return loaded if isinstance(loaded, dict) else None


def _status_is_success(status: str) -> bool:
    """Decide whether a result status string denotes a successful completion.

    Matching is case-insensitive and ignores surrounding whitespace:

    1. A status that is exactly one of ``_SUCCESS_STATUS_TOKENS`` is a
       success. This runs first so that ``defensive_hold_pass`` is not
       vetoed by the ``hold`` failure token it contains.
    2. Otherwise any failure token occurring as a substring makes the status
       a non-success (deliberately conservative).
    3. Otherwise the status is split into alphanumeric words and is a success
       only if some word starts with a success token. Matching on word
       starts keeps ``task_completed`` and ``successful`` as successes while
       rejecting ``incomplete``, ``unsuccessful``, ``broken`` and ``bypass``,
       which a plain substring test would accept.

    Args:
        status: Raw status text; empty or ``None`` is never a success.

    Returns:
        ``True`` only when the status is positively recognised as success.
    """
    normalized = (status or "").strip().lower()
    if not normalized:
        return False
    if normalized in _SUCCESS_STATUS_TOKENS:
        return True
    if any(tok in normalized for tok in _FAILURE_STATUS_TOKENS):
        return False
    # Replace every non-alphanumeric character by a space, then split.
    words = "".join(ch if ch.isalnum() else " " for ch in normalized).split()
    return any(
        word.startswith(tok) for word in words for tok in _SUCCESS_STATUS_TOKENS
    )


def evaluate_durable_evidence(
    *,
    result_json_path: Path,
    report_path: Path,
    collector_result_marker_path: Path,
) -> dict:
    """Evaluate the durable-evidence success gate (§9-R.2).

    Success requires all of: a readable result.json whose status is a success
    (not HOLD / failure), a non-empty report file, and an existing
    collector-result marker. A caller-supplied boolean is never consulted.

    Args:
        result_json_path: Path of the collector's result.json.
        report_path: Path of the report file; it must have a size above zero.
        collector_result_marker_path: Path of the collector-result marker.

    Returns:
        A dict with the individual findings, ``satisfied`` (bool) and
        ``reason`` (text naming the first unmet requirement, or the success
        summary).
    """
    ev: dict = {
        "result_json_exists": False,
        "result_json_status": None,
        "result_json_status_ok": False,
        "report_exists": False,
        "collector_result_marker_exists": False,
        "satisfied": False,
        "reason": "",
    }
    rj = _read_json(result_json_path)
    ev["result_json_exists"] = rj is not None
    # Only a JSON object can carry a status; anything else leaves the status
    # unset, which fails the gate instead of raising on .get().
    if isinstance(rj, dict):
        # The first non-empty field among these is taken as the status.
        status = (
            rj.get("status")
            or rj.get("classification")
            or rj.get("final_status")
            or rj.get("result")
            or ""
        )
        ev["result_json_status"] = status
        ev["result_json_status_ok"] = _status_is_success(str(status))
    # A single stat() call avoids the exists()/stat() race; a report that
    # cannot be stat'ed counts as absent rather than crashing the collector.
    try:
        report_size = report_path.stat().st_size
    except OSError:
        report_size = 0
    ev["report_exists"] = report_size > 0
    ev["collector_result_marker_exists"] = collector_result_marker_path.exists()

    if not ev["result_json_exists"]:
        ev["reason"] = "result.json 부재 → normal collector 미완료/실패"
    elif not ev["result_json_status_ok"]:
        ev["reason"] = (
            f"result.json status 비-성공/HOLD/failure ({ev['result_json_status']!r})"
        )
    elif not ev["report_exists"]:
        ev["reason"] = "report 부재 또는 비어 있음"
    elif not ev["collector_result_marker_exists"]:
        ev["reason"] = "collector-result marker 부재"
    else:
        ev["satisfied"] = True
        ev["reason"] = "durable evidence 정합 (result+status+report+marker)"
    return ev


# ── §9-R.1 safe-remove 5조건 결합 검증 ──────────────────────────────────────


def evaluate_safe_remove(
    *,
    task_id: str,
    target_cron_id: str,
    dispatch_fired_marker_path: Path,
    callback_contract: Optional[dict] = None,
) -> dict:
    """Run the five combined safe-remove conditions (§9-R.1).

    The dispatch-fired marker's ``callback_policy_a.fallback_callback_cron_id``
    is the single authority for which cron may be removed. The optional
    callback contract is only a cross-check of that same value and never an
    authority on its own.

    Returns:
        A dict holding one boolean per condition (``c1`` .. ``c5``),
        ``all_satisfied``, ``marker_present``, ``authority_cron_id``,
        ``contract_cross_check`` and ``fail_reason``.
    """
    condition_keys = (
        "c1_marker_id_matches",
        "c2_task_binding",
        "c3_ownership",
        "c4_role_fallback",
        "c5_not_stale_or_typo",
    )
    report: dict = {key: False for key in condition_keys}
    report.update(
        all_satisfied=False,
        marker_present=False,
        authority_cron_id=None,
        contract_cross_check="n/a",
        fail_reason="",
    )

    fired_marker = _read_json(dispatch_fired_marker_path)
    if fired_marker is None:
        report["fail_reason"] = (
            "dispatch-fired marker 부재/파싱불가 → 추정 remove 0 (SKIPPED_UNTRUSTED)"
        )
        return report
    report["marker_present"] = True

    policy_section = fired_marker.get("callback_policy_a")
    if not isinstance(policy_section, dict):
        report["fail_reason"] = (
            "marker.callback_policy_a 부재 → fallback_cron_id 권위 없음"
        )
        return report

    authority = policy_section.get("fallback_callback_cron_id")
    report["authority_cron_id"] = authority

    # C1 — the authority id is a non-empty string equal to the removal target.
    id_matches = (
        isinstance(authority, str)
        and bool(authority)
        and authority == target_cron_id
    )
    report["c1_marker_id_matches"] = id_matches

    # C2 — the marker belongs to the task being processed.
    marker_task = fired_marker.get("task_id")
    report["c2_task_binding"] = marker_task == task_id

    # C3 — ownership: both the chat id and the key must match.
    chat_ok = policy_section.get("chat_id") == ANU_CHAT_ID
    report["c3_ownership"] = chat_ok and policy_section.get("anu_key") == ANU_KEY

    # C4 — the removal target is registered with the fallback role.
    declared_role = policy_section.get("fallback_role")
    report["c4_role_fallback"] = declared_role == FALLBACK_ROLE

    # C5 — not stale / mistyped: authority id present and C1 holds.
    report["c5_not_stale_or_typo"] = bool(authority) and id_matches

    # Auxiliary cross-check against the callback contract; a differing id
    # withdraws trust by clearing C5.
    if callback_contract is not None:
        contract_id = callback_contract.get("fallback_callback_cron_id")
        if contract_id is None:
            verdict = "contract_no_fallback_id"
        elif contract_id == authority:
            verdict = "match"
        else:
            verdict = "MISMATCH"
            report["c5_not_stale_or_typo"] = False
        report["contract_cross_check"] = verdict

    unmet = [key for key in condition_keys if not report[key]]
    report["all_satisfied"] = not unmet
    if unmet:
        report["fail_reason"] = "5조건 미충족: " + ",".join(unmet)
    return report


# ── 동시성 단일 승자 락 (§9-R.3 #11 race) ───────────────────────────────────


def _acquire_cancel_lock(lock_path: Path) -> bool:
    """Try to create *lock_path* exclusively; return whether this call won.

    The file is opened with ``O_CREAT | O_EXCL``, so it is created only if it
    does not exist yet. The winner records the lock time and its pid in the
    file. ``False`` is returned when ``FileExistsError`` is raised while
    preparing the parent directory or creating the file.
    """
    exclusive_create = os.O_CREAT | os.O_EXCL | os.O_WRONLY
    try:
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        descriptor = os.open(str(lock_path), exclusive_create, 0o644)
    except FileExistsError:
        return False
    with os.fdopen(descriptor, "w", encoding="utf-8") as handle:
        owner_record = {"locked_at": _now_utc(), "pid": os.getpid()}
        handle.write(json.dumps(owner_record))
    return True


# ── 메인 진입점 ─────────────────────────────────────────────────────────────


def cancel_fallback_on_success(
    *,
    task_id: str,
    target_cron_id: str,
    dispatch_fired_marker_path: Path,
    result_json_path: Path,
    report_path: Path,
    collector_result_marker_path: Path,
    fallback_cancelled_marker_path: Optional[Path] = None,
    cancel_lock_path: Optional[Path] = None,
    callback_contract: Optional[dict] = None,
    normal_collector_success: bool = False,
    remover: Optional[Remover] = None,
    dry_run: bool = True,
    now_fn: Callable[[], str] = _now_utc,
) -> CancelDecision:
    """Remove the pre-registered fallback callback cron after a normal success.

    Steps, in order; the first one that does not pass ends the call:

    (A) durable-evidence gate (§9-R.2) — otherwise ``SKIPPED_NORMAL_FAILED``;
    (B) five safe-remove conditions (§9-R.1) — otherwise ``SKIPPED_UNTRUSTED``;
    (C) optional single-winner lock — a loser gets ``ALREADY_FIRED``;
    (D) the injected remover is called and its status is mapped to
        ``CANCELLED`` / ``ALREADY_GONE`` / ``ALREADY_FIRED`` /
        ``REMOVE_FAILED_WARNING``.

    Args:
        task_id: Task whose fallback is being cancelled.
        target_cron_id: Cron id the caller wants removed.
        dispatch_fired_marker_path: Marker holding the authoritative cron id.
        result_json_path: Collector result.json used as durable evidence.
        report_path: Report file used as durable evidence.
        collector_result_marker_path: Collector-result marker path.
        fallback_cancelled_marker_path: Where to persist the cancelled marker
            after a successful removal; ``None`` skips persisting.
        cancel_lock_path: Lock file for single-winner handling; ``None``
            disables locking.
        callback_contract: Optional contract dict used only as a cross-check.
        normal_collector_success: Auxiliary signal only. It is recorded in
            the evidence but never decides the cancel by itself.
        remover: Callable performing the removal; defaults to
            ``RealCokacdirCronRemover()``.
        dry_run: Forwarded to the remover.
        now_fn: Supplies the timestamp stored in the decision.

    Returns:
        A ``CancelDecision`` describing the outcome. An exception raised by
        the remover is reported as ``REMOVE_FAILED_WARNING``, and a failure to
        persist the cancelled marker is recorded in ``notes``; neither is
        propagated, so the collector's own success is preserved.
    """
    if remover is None:
        remover = RealCokacdirCronRemover()
    ts = now_fn()

    # (A) §9-R.2 durable-evidence success gate. The boolean alone is never enough.
    ev = evaluate_durable_evidence(
        result_json_path=result_json_path,
        report_path=report_path,
        collector_result_marker_path=collector_result_marker_path,
    )
    ev["caller_boolean_aux"] = bool(normal_collector_success)

    def decide(
        classification: CancelClassification,
        *,
        invoked: bool,
        cancelled: bool,
        reason: str,
        **extra,
    ) -> CancelDecision:
        # Fills in the fields that are identical for every outcome.
        return CancelDecision(
            classification=classification,
            task_id=task_id,
            target_cron_id=target_cron_id,
            cron_remove_invoked=invoked,
            fallback_cancelled=cancelled,
            cancel_skipped_reason=reason,
            durable_evidence=ev,
            ts_utc=ts,
            **extra,
        )

    if not ev["satisfied"]:
        return decide(
            CancelClassification.SKIPPED_NORMAL_FAILED,
            invoked=False,
            cancelled=False,
            reason=(
                "normal collector durable evidence 미충족 → fallback 보존 "
                f"(예정대로 발화): {ev['reason']}"
            ),
            notes=[
                "§9-R.2: caller boolean 은 권위 아님 — durable evidence 부재 시 "
                "boolean=true 라도 SKIPPED_NORMAL_FAILED",
            ],
        )

    # (B) §9-R.1 combined five-condition safe-remove check.
    checks = evaluate_safe_remove(
        task_id=task_id,
        target_cron_id=target_cron_id,
        dispatch_fired_marker_path=dispatch_fired_marker_path,
        callback_contract=callback_contract,
    )
    if not checks["all_satisfied"]:
        hold_reasons = []
        if not checks["marker_present"]:
            hold_reasons.append(
                "dispatch-fired marker 부재 — fallback_cron_id 신뢰 불가"
            )
        if checks["contract_cross_check"] == "MISMATCH":
            hold_reasons.append(
                "callback contract vs marker 권위 id MISMATCH — 타 cron 위험"
            )
        return decide(
            CancelClassification.SKIPPED_UNTRUSTED,
            invoked=False,
            cancelled=False,
            reason=(
                "§9-R.1 5조건 결합 검증 실패 → cron remove 미실행 "
                f"({checks['fail_reason']})"
            ),
            safe_remove_checks=checks,
            hold_reasons=hold_reasons,
            notes=[
                "오발 제거 0: 신뢰할 수 없는 fallback_cron_id 는 절대 추정 remove 0",
            ],
        )

    # (C) Single winner under concurrency — only one handler proceeds.
    if cancel_lock_path is not None and not _acquire_cancel_lock(cancel_lock_path):
        return decide(
            CancelClassification.ALREADY_FIRED,
            invoked=False,
            cancelled=False,
            reason=(
                "동시 cancel 락 획득 실패 — 다른 처리기가 단일 처리 중 "
                "(이중 처리·재escalate 0, DUPLICATE 경로 유지)"
            ),
            safe_remove_checks=checks,
        )

    # (D) Run the cron remove through the dependency-injected remover.
    try:
        rr = remover(target_cron_id, dry_run=dry_run)
    except Exception as exc:  # noqa: BLE001 - boundary of an injected callable
        # A raising remover (timeout, missing binary, ...) is a failed remove
        # attempt, not a collector failure: fall through to the warning path.
        rr = RemoverResult(
            status="failed",
            detail=f"remover raised {type(exc).__name__}: {exc}",
            raw=None,
        )
    remover_dict = {"status": rr.status, "detail": rr.detail, "raw": rr.raw}

    if rr.status == "removed":
        notes = ["fallback_cancelled=true marker persist"]
        if fallback_cancelled_marker_path is not None:
            marker_text = json.dumps(
                {
                    "schema": "fallback_cancelled_v1",
                    "task_id": task_id,
                    "fallback_callback_cron_id": target_cron_id,
                    "fallback_cancelled": True,
                    "dry_run": dry_run,
                    "ts_utc": ts,
                    "safe_remove_checks": checks,
                    "durable_evidence": ev,
                },
                ensure_ascii=False,
                indent=2,
            )
            try:
                fallback_cancelled_marker_path.parent.mkdir(
                    parents=True, exist_ok=True
                )
                fallback_cancelled_marker_path.write_text(
                    marker_text, encoding="utf-8"
                )
            except OSError as exc:
                # The remover already reported success; losing the marker must
                # not turn that into an exception. Record it for the caller.
                notes = [f"fallback_cancelled marker persist FAILED: {exc}"]
        return decide(
            CancelClassification.CANCELLED,
            invoked=True,
            cancelled=True,
            reason="",
            safe_remove_checks=checks,
            remover_result=remover_dict,
            notes=notes,
        )

    if rr.status == "already_gone":
        return decide(
            CancelClassification.ALREADY_GONE,
            invoked=True,
            cancelled=False,
            reason="fallback cron 이미 삭제/만료 — idempotent, 실패 아님",
            safe_remove_checks=checks,
            remover_result=remover_dict,
        )

    if rr.status == "already_fired":
        return decide(
            CancelClassification.ALREADY_FIRED,
            invoked=True,
            cancelled=False,
            reason=(
                "fallback 이미 발화 — 기존 DUPLICATE_CALLBACK_IGNORED 경로 유지"
            ),
            safe_remove_checks=checks,
            remover_result=remover_dict,
        )

    # rr.status == "failed" (or any unrecognised status).
    return decide(
        CancelClassification.REMOVE_FAILED_WARNING,
        invoked=True,
        cancelled=False,
        reason=(
            "cron remove 실패 — warning marker. normal collector success 는 "
            "실패로 바꾸지 않음 (§3.5). fallback 은 DUPLICATE 경로로 음소거됨"
        ),
        safe_remove_checks=checks,
        remover_result=remover_dict,
        notes=["collector success preserved despite remove failure"],
    )
