"""utils/live_cron_state_verifier.py

task-2553+23 — CALLBACK_CANCEL_ON_SUCCESS_OPERATIONAL_INTEGRATION.

목적: 운영 collector 가 사전등록 fallback callback cron 을 실제로 제거하기
**직전**, 실 cron 상태를 live 조회하여 5조건 AND 교차검증한다. dispatch-fired
marker 의 ``fallback_cron_id`` 만으로 제거하지 않는다 — live 조회가 권위,
marker 는 교차입력(cross-input)일 뿐이다 (task-2553+23 §2, §4.2).

5조건 (전부 충족 시에만 remove 허용):
  ① live cron entry.task_id  == 처리 task_id
  ② live cron entry.chat_id  == 6937032012 (ANU 소유)
  ③ live cron entry.role     == "fallback"
  ④ dispatch-fired marker.fallback_callback_cron_id == target_cron_id
                                                    (== live entry.id)
  ⑤ 해당 cron 미발화(pending) AND 미제거

하나라도 불충족 / live 조회 실패 → SKIP(보존), remove 0.
idempotency: 이미 발화/제거된 fallback → no-op (이중제거 0, wrong id 제거 0).

설계 규율 (task-2553+9a 안전성 상속):
  * callback orchestrator(utils/anu_delegation_completion_callback.py) 무접촉
    — 본 모듈은 그것을 import/호출하지 않는다.
  * 본 모듈은 **판정만** 한다. cron 을 직접 제거하지 않는다(SRP — remove 는
    +9a remover 가, 호출 결선은 seam 이).
  * live 조회는 dependency-injected ``cron_lister``. 실 subprocess
    (`cokacdir --cron-list`) 는 운영 collector 만, 본 task 구현/테스트는
    fake/격리 only (§6 9-R.4 — 실 운영 cron 절대 비접촉).
  * fail-safe: live 상태를 5조건으로 **적극(positive) 확정**하지 못하면
    무조건 SKIP. "모르면 제거하지 않는다."
"""
from __future__ import annotations

import json
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Callable, Optional

# ── 소유권 상수 (회장 verbatim — 절대 하드 경계, +9a 와 동일값) ──────────────
ANU_CHAT_ID = 6937032012
ANU_KEY = "c119085addb0f8b7"
FALLBACK_ROLE = "fallback"


def _now_utc() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


class LiveVerifyClassification(str, Enum):
    VERIFIED = "VERIFIED"  # 5조건 AND 전부 충족 → remove 허용
    SKIP_QUERY_FAILED = "SKIP_QUERY_FAILED"  # live 조회 실패/파싱불가
    SKIP_NOT_FOUND = "SKIP_NOT_FOUND"  # live 목록에 대상 cron 없음
    SKIP_ALREADY_REMOVED = "SKIP_ALREADY_REMOVED"  # 이미 제거됨 (idempotent)
    SKIP_ALREADY_FIRED = "SKIP_ALREADY_FIRED"  # 이미 발화함 (idempotent)
    SKIP_MISMATCH = "SKIP_MISMATCH"  # task/chat/role/marker 불일치
    SKIP_MARKER_ABSENT = "SKIP_MARKER_ABSENT"  # dispatch-fired marker 부재/파싱불가


# cron_lister 시그니처: () -> {"status": "ok"|"error", "entries": [...], "raw": ...}
#   entries[i] = {"id","task_id","chat_id","role","fired","removed"}
CronLister = Callable[[], dict]


def normalize_cokacdir_schedules(raw: dict) -> dict:
    """실 `cokacdir --cron-list` 출력 → 정규화 entries.

    cokacdir cron-list 는 [{id,prompt,schedule,created_at},...] 를 돌려준다.
    task_id/role 은 prompt 내 embedded callback-policy 토큰에서 best-effort
    파싱한다. 확정 불가 필드는 None 으로 두어 verifier 가 보수적으로 SKIP
    하도록 한다(fail-safe). chat_id 는 chat-scoped 조회이므로 ANU_CHAT_ID.
    """
    if not isinstance(raw, dict) or raw.get("status") != "ok":
        return {"status": "error", "entries": [], "raw": raw}
    entries = []
    for sch in raw.get("schedules", []) or []:
        if not isinstance(sch, dict):
            continue
        prompt = str(sch.get("prompt", ""))
        task_id = None
        role = None
        for tok in prompt.replace("\n", " ").split():
            if tok.startswith("task_id=") or tok.startswith("task-id="):
                task_id = tok.split("=", 1)[1].strip().strip(",;")
            elif tok.startswith("role="):
                role = tok.split("=", 1)[1].strip().strip(",;")
        entries.append(
            {
                "id": sch.get("id"),
                "task_id": task_id,
                "chat_id": ANU_CHAT_ID,  # chat-scoped 조회 결과
                "role": role,
                "fired": bool(sch.get("fired", False)),
                "removed": bool(sch.get("removed", False)),
            }
        )
    return {"status": "ok", "entries": entries, "raw": raw}


class RealCokacdirCronLister:
    """실 `cokacdir --cron-list` wrapper (운영 collector 전용 기본값).

    본 task 의 regression 은 fake lister 를 주입하므로 이 클래스의 subprocess
    경로는 테스트에서 절대 실행되지 않는다(§6 9-R.4). 조회 실패/파싱불가는
    status=error 로 반환되어 verifier 가 SKIP(보존)하도록 한다.
    """

    binary = "/usr/local/bin/cokacdir"

    def __call__(self) -> dict:  # pragma: no cover - 운영 collector 전용
        try:
            proc = subprocess.run(
                [
                    self.binary,
                    "--cron-list",
                    "--chat",
                    str(ANU_CHAT_ID),
                    "--key",
                    ANU_KEY,
                ],
                capture_output=True,
                text=True,
                timeout=60,
            )
            payload = json.loads(proc.stdout.strip() or "{}")
        except (OSError, ValueError, subprocess.SubprocessError) as exc:
            return {"status": "error", "entries": [], "raw": {"error": str(exc)}}
        return normalize_cokacdir_schedules(payload)


@dataclass
class LiveCronStateVerification:
    classification: LiveVerifyClassification
    task_id: str
    target_cron_id: str
    remove_allowed: bool
    checks: dict = field(default_factory=dict)
    live_entry: Optional[dict] = None
    marker_fallback_cron_id: Optional[str] = None
    skip_reason: str = ""
    notes: list = field(default_factory=list)
    ts_utc: str = ""

    def to_dict(self) -> dict:
        return {
            "schema": "live_cron_state_verification_v1",
            "task_id": self.task_id,
            "target_cron_id": self.target_cron_id,
            "classification": self.classification.value,
            "remove_allowed": self.remove_allowed,
            "checks": self.checks,
            "live_entry": self.live_entry,
            "marker_fallback_cron_id": self.marker_fallback_cron_id,
            "skip_reason": self.skip_reason,
            "notes": self.notes,
            "ts_utc": self.ts_utc,
        }


def _read_json(path: Path) -> Optional[dict]:
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None


def _marker_fallback_cron_id(marker: dict) -> Optional[str]:
    policy = marker.get("callback_policy_a")
    if not isinstance(policy, dict):
        return None
    cid = policy.get("fallback_callback_cron_id")
    return cid if isinstance(cid, str) and cid else None


def verify_live_cron_state(
    *,
    task_id: str,
    target_cron_id: str,
    dispatch_fired_marker_path: Path,
    cron_lister: Optional[CronLister] = None,
    now_fn: Callable[[], str] = _now_utc,
) -> LiveCronStateVerification:
    """실 cron-remove 직전 live cron-state 5조건 AND 교차검증.

    live 조회 결과가 권위. marker 의 fallback_cron_id 는 교차입력(④)일 뿐
    단독 권위가 아니다. 5조건 전부 positive 확정 시에만 remove_allowed=True.
    그 외 모든 경로(조회실패·부재·발화·제거·불일치·marker부재)는 SKIP
    (remove_allowed=False) → fallback 보존, 실 remove 0.
    """
    ts = now_fn()
    checks = {
        "c1_task_id_match": False,
        "c2_chat_id_owned": False,
        "c3_role_fallback": False,
        "c4_marker_id_crosscheck": False,
        "c5_pending_not_fired_not_removed": False,
        "all_satisfied": False,
    }

    # (0) dispatch-fired marker → fallback_cron_id (교차입력, 권위 아님)
    marker = _read_json(dispatch_fired_marker_path)
    if marker is None:
        return LiveCronStateVerification(
            classification=LiveVerifyClassification.SKIP_MARKER_ABSENT,
            task_id=task_id,
            target_cron_id=target_cron_id,
            remove_allowed=False,
            checks=checks,
            skip_reason="dispatch-fired marker 부재/파싱불가 → 교차입력 없음, SKIP",
            ts_utc=ts,
        )
    marker_cron_id = _marker_fallback_cron_id(marker)

    # (1) live 조회 (DI). 실패 → SKIP(보존).
    lister = cron_lister or RealCokacdirCronLister()
    try:
        listed = lister()
    except Exception as exc:  # noqa: BLE001 - 어떤 조회 예외든 보수적 SKIP
        return LiveCronStateVerification(
            classification=LiveVerifyClassification.SKIP_QUERY_FAILED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            remove_allowed=False,
            checks=checks,
            marker_fallback_cron_id=marker_cron_id,
            skip_reason=f"live cron 조회 예외 → SKIP(보존): {exc}",
            ts_utc=ts,
        )
    if not isinstance(listed, dict) or listed.get("status") != "ok":
        return LiveCronStateVerification(
            classification=LiveVerifyClassification.SKIP_QUERY_FAILED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            remove_allowed=False,
            checks=checks,
            marker_fallback_cron_id=marker_cron_id,
            skip_reason="live cron 조회 실패/비정상 응답 → SKIP(보존)",
            ts_utc=ts,
        )

    entries = listed.get("entries") or []
    entry = next(
        (e for e in entries if isinstance(e, dict) and e.get("id") == target_cron_id),
        None,
    )

    # (2) 대상 cron 이 live 목록에 없음 → 이미 제거됨(idempotent no-op).
    if entry is None:
        return LiveCronStateVerification(
            classification=LiveVerifyClassification.SKIP_ALREADY_REMOVED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            remove_allowed=False,
            checks=checks,
            live_entry=None,
            marker_fallback_cron_id=marker_cron_id,
            skip_reason=(
                "live 목록에 대상 cron 부재 → 이미 제거/만료, idempotent no-op "
                "(이중 remove 0)"
            ),
            notes=["idempotent: already-removed 분기 — 실 remove 0"],
            ts_utc=ts,
        )

    # (3) 이미 발화 → idempotent no-op (DUPLICATE 경로 유지).
    if entry.get("fired"):
        return LiveCronStateVerification(
            classification=LiveVerifyClassification.SKIP_ALREADY_FIRED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            remove_allowed=False,
            checks=checks,
            live_entry=entry,
            marker_fallback_cron_id=marker_cron_id,
            skip_reason=(
                "live cron 이미 발화(fired) → idempotent no-op, 기존 "
                "DUPLICATE_CALLBACK_IGNORED 경로 유지"
            ),
            notes=["idempotent: already-fired 분기 — 실 remove 0"],
            ts_utc=ts,
        )

    # (3') removed 플래그가 명시된 경우 → 이미 제거 (idempotent).
    if entry.get("removed"):
        return LiveCronStateVerification(
            classification=LiveVerifyClassification.SKIP_ALREADY_REMOVED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            remove_allowed=False,
            checks=checks,
            live_entry=entry,
            marker_fallback_cron_id=marker_cron_id,
            skip_reason="live cron entry.removed=true → idempotent no-op (이중 remove 0)",
            notes=["idempotent: already-removed 플래그 분기 — 실 remove 0"],
            ts_utc=ts,
        )

    # (4) 5조건 AND.
    checks["c1_task_id_match"] = entry.get("task_id") == task_id
    checks["c2_chat_id_owned"] = entry.get("chat_id") == ANU_CHAT_ID
    checks["c3_role_fallback"] = entry.get("role") == FALLBACK_ROLE
    checks["c4_marker_id_crosscheck"] = bool(
        marker_cron_id
        and isinstance(marker_cron_id, str)
        and marker_cron_id == target_cron_id
        and entry.get("id") == target_cron_id
    )
    checks["c5_pending_not_fired_not_removed"] = (
        not entry.get("fired") and not entry.get("removed")
    )
    checks["all_satisfied"] = all(
        (
            checks["c1_task_id_match"],
            checks["c2_chat_id_owned"],
            checks["c3_role_fallback"],
            checks["c4_marker_id_crosscheck"],
            checks["c5_pending_not_fired_not_removed"],
        )
    )

    if checks["all_satisfied"]:
        return LiveCronStateVerification(
            classification=LiveVerifyClassification.VERIFIED,
            task_id=task_id,
            target_cron_id=target_cron_id,
            remove_allowed=True,
            checks=checks,
            live_entry=entry,
            marker_fallback_cron_id=marker_cron_id,
            skip_reason="",
            notes=[
                "5조건 AND 전부 충족 — live 조회 권위 확정, remove 허용",
            ],
            ts_utc=ts,
        )

    failed = [
        k
        for k in (
            "c1_task_id_match",
            "c2_chat_id_owned",
            "c3_role_fallback",
            "c4_marker_id_crosscheck",
            "c5_pending_not_fired_not_removed",
        )
        if not checks[k]
    ]
    return LiveCronStateVerification(
        classification=LiveVerifyClassification.SKIP_MISMATCH,
        task_id=task_id,
        target_cron_id=target_cron_id,
        remove_allowed=False,
        checks=checks,
        live_entry=entry,
        marker_fallback_cron_id=marker_cron_id,
        skip_reason=(
            "live cron-state 5조건 미충족 → SKIP(보존), 실 remove 0: "
            + ",".join(failed)
        ),
        notes=["mismatch — 추정 remove 0, wrong cron id 제거 0"],
        ts_utc=ts,
    )
