# -*- coding: utf-8 -*-
"""anu_v3.dispatch_callback_contract — dispatch callback/progress-trigger 계약
(task-2614 §7b · 회장 2026-05-20 야간 필수 보강).

회장 verbatim mandate (BATCH_LEVEL_HOLD 시스템 필수 보강 — 새 범위 아님):

  모든 executor dispatch 는 (1) normal completion callback(독립 ANU key) 와
  (2) ANU-key fallback safety-net(미수신 recovery 전용·NON_BLOCKING) 둘 다
  등록하는 계약을 가져야 한다. normal callback durable-success 즉시 fallback
  은 cancel-on-success 로 제거된다(노이즈 0). fallback 은 *recovery 전용* —
  진행 트리거가 아니다(fixed-time/dead-man 진행트리거 금지). normal·fallback
  둘 다 부재면 ``DISPATCH_CONTRACT_VIOLATION`` 이다. ``result exists + normal
  missing + fallback missing`` 조건(=DISPATCH_CONTRACT_VIOLATION)에서만
  recovery watcher 가 **idempotent** 하게 독립 ANU collector 를 1회 spawn 한다
  (중복 0). 무조율 dead-man/fixed-time 진행트리거는 절대 금지 — 640665C8
  안티패턴(무조율 fixed-time ANU-key cron 진행트리거) 재발 차단. executor
  self-key 는 collector/adjudication/dispatch 에 절대 사용 불가(독립 ANU key
  전용 · +49 정본).

데몬 안전원칙 (§7b.7): sense / act / think 분리 · dry-run 우선 · 관측 후
행동. recovery watcher 는 상태를 변경하지 않고 조건만 탐지(sense)하고,
조건 충족 시 정확히 1회 spawn(act)하며, self-record 로 재진입을 차단한다.

Layer A / NO-CRON / NO-WRITE: 본 모듈은 ZERO cron register/remove, ZERO
subprocess, ZERO cokacdir, ZERO network, ZERO 파일쓰기(엔트리포인트 CLI 의
``--output`` 명시 경로 제외)다. spawn 은 *주입된* collback(``spawn_fn``)으로
위임 — 본 모듈은 실제 dispatch/cron/credential 표면을 import 하지 않는다.
기존 ``anu_v3.executor_callback_contract``(+32 doctrine) 는 무수정 — 본
모듈은 그 doctrine 을 dispatch 계약 *런타임 분류기*로 확장(중복 구현 0).
"""
from __future__ import annotations

import argparse
import json
import os
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Callable, Dict, List, Optional, Sequence

CONTRACT_SCHEMA = "anu_v3.dispatch_callback_contract.v1"

# 독립 ANU collector key (callback owner = ANU key 전용 · +49 정본).
INDEPENDENT_ANU_KEY = "c119085addb0f8b7"
# executor self-key — collector/adjudication/dispatch 에 절대 사용 금지.
EXECUTOR_SELF_KEY_FORBIDDEN = "1e41a2324a3ccdd0"

# ── classification taxonomy (§7b schema) ────────────────────────────────────
CONTRACT_OK = "CONTRACT_OK"
DISPATCH_CONTRACT_VIOLATION = "DISPATCH_CONTRACT_VIOLATION"
RECOVERY_SPAWNED = "RECOVERY_SPAWNED"
# normal 미수신·fallback 존재 — 정상 안전망 recovery 경로(진행트리거 아님).
FALLBACK_RECOVERY = "FALLBACK_RECOVERY"

CLASSIFICATIONS = (
    CONTRACT_OK,
    DISPATCH_CONTRACT_VIOLATION,
    RECOVERY_SPAWNED,
    FALLBACK_RECOVERY,
)


class ContractViolation(RuntimeError):
    """dispatch callback 계약 위반 — fail-closed."""


class ExecutorSelfKeyForbidden(RuntimeError):
    """executor self-key 를 collector/adjudication/dispatch 에 사용 시도.

    독립 ANU key(c119085addb0f8b7) 전용 — executor self-key
    (1e41a2324a3ccdd0) 는 절대 금지(+49 정본). fail-closed.
    """


def assert_collector_key_is_independent_anu(key: str) -> None:
    """collector/adjudication/dispatch key 가 독립 ANU key 인지 강제.

    executor self-key 또는 그 외 임의 key 면 ``ExecutorSelfKeyForbidden``
    (fail-closed). 약화 우회 불가.
    """
    if key == EXECUTOR_SELF_KEY_FORBIDDEN:
        raise ExecutorSelfKeyForbidden(
            f"executor self-key {key!r} 는 collector/adjudication/dispatch 에 "
            "절대 사용 불가 — 독립 ANU key 전용(+49 정본)"
        )
    if key != INDEPENDENT_ANU_KEY:
        raise ExecutorSelfKeyForbidden(
            f"collector key {key!r} 는 독립 ANU key({INDEPENDENT_ANU_KEY}) 가 "
            "아님 — fail-closed(self-chain 약화 금지)"
        )


@dataclass(frozen=True)
class DispatchContractRecord:
    """dispatch_callback_contract.v1 레코드 (schema 1:1)."""

    schema: str
    task_id: str
    normal_callback_present: bool
    fallback_present: bool
    result_present: bool
    classification: str
    idempotency_key: str
    collector_key: str
    executor_self_key_forbidden: bool
    fallback_cancel_on_success: bool
    recovery_required: bool
    recovery_is_fixed_time_or_dead_man: bool
    reasons: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        return asdict(self)

    def to_json(self, indent: int = 2) -> str:
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)


def classify_dispatch_contract(
    *,
    task_id: str,
    normal_callback_present: bool,
    fallback_present: bool,
    result_present: bool,
    collector_key: str = INDEPENDENT_ANU_KEY,
) -> DispatchContractRecord:
    """단일 dispatch 의 callback 계약 분류 (공개 entrypoint · 순수 함수).

    §7b 런타임 계약 1~6 을 코드로 강제한다:

      * normal callback 수신 → ``CONTRACT_OK``. fallback 은
        cancel-on-success(노이즈 0). (계약 1·3)
      * normal 미수신 · fallback 존재 → ``FALLBACK_RECOVERY`` —
        미수신 recovery 안전망(진행트리거 아님·NON_BLOCKING). (계약 2·4)
      * normal·fallback 둘 다 부재 → ``DISPATCH_CONTRACT_VIOLATION``.
        result 까지 존재(=계약 6 조건)면 recovery watcher 가 독립 ANU
        collector 를 1회 spawn 해야 한다(``recovery_required=True``).
        result 부재면 위반이되 spawn 조건 미충족(no-op·진행트리거화 0).
        (계약 5·6)

    collector_key 는 독립 ANU key 만 허용 — executor self-key 면 fail-closed.
    recovery 는 *조건 탐지* 결과일 뿐 fixed-time/dead-man 진행트리거가
    아니다(``recovery_is_fixed_time_or_dead_man`` 는 항상 False — 640665C8
    안티패턴 재발 차단의 코드 명문).

    task-2620 §2.2 H1 hardening: CONTRACT_OK edge case 입력 정규화.
    mixed-signal (예: normal_callback_present=True 인데 fallback_present
    가 None/non-bool) 도 ``bool(...)`` 으로 강제하여 truthy 잡음을 차단한다.
    truthy 비-bool 입력은 boolean 정규화 후 그대로 평가 — 약화 없음.
    """
    assert_collector_key_is_independent_anu(collector_key)

    # H1 edge-case 정규화: classify 분기 전에 모든 signal 을 명시적 bool 로
    # 강제. truthy 비-bool 입력은 정규화 후 동일 분기, 그러나 분류 record
    # 의 signal 필드는 항상 진짜 bool 이 되어 downstream 약화/혼동을 차단.
    normal_callback_present = bool(normal_callback_present)
    fallback_present = bool(fallback_present)
    result_present = bool(result_present)

    reasons: List[str] = []
    idempotency_key = f"dispatch_callback_contract:{task_id}"

    if normal_callback_present:
        classification = CONTRACT_OK
        fallback_cancel_on_success = bool(fallback_present)
        recovery_required = False
        reasons.append(
            "normal completion callback 수신 → CONTRACT_OK (계약 1). "
            + (
                "fallback durable-success 즉시 cancel-on-success 제거(노이즈 "
                "0·계약 3)."
                if fallback_present
                else "fallback 미등록이나 normal 수신으로 계약 충족."
            )
        )
    elif fallback_present:
        classification = FALLBACK_RECOVERY
        fallback_cancel_on_success = False
        recovery_required = False
        reasons.append(
            "normal callback 미수신 · fallback safety-net 존재 → "
            "FALLBACK_RECOVERY (계약 2·4). fallback 은 미수신 recovery "
            "전용·NON_BLOCKING — 진행 트리거 아님(fixed-time/dead-man 금지)."
        )
    else:
        classification = DISPATCH_CONTRACT_VIOLATION
        fallback_cancel_on_success = False
        # 계약 6: result 존재 + normal 미수신 + fallback 미수신 일 때만
        # recovery watcher 가 독립 ANU collector spawn 대상.
        recovery_required = bool(result_present)
        reasons.append(
            "normal callback · fallback safety-net 둘 다 부재 → "
            "DISPATCH_CONTRACT_VIOLATION (계약 5)."
        )
        if result_present:
            reasons.append(
                "result 존재 + normal 미수신 + fallback 미수신 — recovery "
                "watcher 가 독립 ANU collector 를 idempotent 1회 spawn 해야 함 "
                "(계약 6). 무조율 dead-man/fixed-time 진행트리거 금지 "
                "(640665C8 안티패턴 차단)."
            )
        else:
            reasons.append(
                "result 부재 — 계약 6 spawn 조건 미충족. recovery watcher "
                "no-op(진행트리거화 0 · fixed-time/dead-man 아님)."
            )

    return DispatchContractRecord(
        schema=CONTRACT_SCHEMA,
        task_id=task_id,
        normal_callback_present=bool(normal_callback_present),
        fallback_present=bool(fallback_present),
        result_present=bool(result_present),
        classification=classification,
        idempotency_key=idempotency_key,
        collector_key=collector_key,
        executor_self_key_forbidden=True,
        fallback_cancel_on_success=fallback_cancel_on_success,
        recovery_required=recovery_required,
        recovery_is_fixed_time_or_dead_man=False,
        reasons=reasons,
    )


class InvalidObservation(ValueError):
    """task-2620 §2.2 H2 — evaluate input coercion fail-closed.

    빈 dict · None · wrong type · required-key 누락 입력을 받았을 때
    silent normalization 대신 즉시 명시 예외를 발생시킨다. 약화 우회 0.
    """


_OBSERVATION_BOOL_KEYS = (
    "normal_callback_present",
    "fallback_present",
    "result_present",
)
_OBSERVATION_REQUIRED_KEYS = ("task_id",) + _OBSERVATION_BOOL_KEYS


def evaluate(observation: Dict[str, object]) -> DispatchContractRecord:
    """실 entrypoint: observation dict(또는 fixture) → 계약 레코드.

    mock-only 경로는 본 함수를 우회할 수 없다 — 실제 관측값을 분류한다.

    task-2620 §2.2 H2 hardening: input-coercion fail-closed.

      * ``observation is None`` / dict 아님 / 빈 dict → InvalidObservation.
      * required key (task_id + 3 boolean signal) 부재 → InvalidObservation.
      * boolean signal 이 None / non-bool non-truthy-coerce 가능 type → 즉시
        InvalidObservation 으로 차단 (실수로 dict / list 가 들어오면 정상
        bool(...) 이 항상 True 가 되어 분류가 왜곡되는 경로 봉합).

    기존 PASS-path (정합 fixture · dict 입력) 는 byte-0 — 어느 fixture 도
    실패하지 않는다(H2 regression 케이스 + 기존 cases (a)~(f) 둘 다 PASS).
    """
    if observation is None:
        raise InvalidObservation(
            "observation is None — dispatch contract 평가에는 dict 가 필요 "
            "(H2 fail-closed)"
        )
    if not isinstance(observation, dict):
        raise InvalidObservation(
            f"observation must be dict, got {type(observation).__name__} — "
            "wrong type fail-closed (H2)"
        )
    if not observation:
        raise InvalidObservation(
            "observation is empty dict — required keys missing "
            f"({_OBSERVATION_REQUIRED_KEYS}) — H2 fail-closed"
        )
    missing = [k for k in _OBSERVATION_REQUIRED_KEYS if k not in observation]
    if missing:
        raise InvalidObservation(
            f"observation missing required key(s): {missing} — H2 fail-closed"
        )
    for k in _OBSERVATION_BOOL_KEYS:
        v = observation[k]
        # None / non-bool-non-numeric-non-str 등 dict/list 가 들어오면
        # silently True 가 되는 truthy-잡음 경로 봉합. bool/int/str 만 허용.
        if v is None:
            raise InvalidObservation(
                f"observation[{k!r}] is None — boolean signal required "
                "(H2 fail-closed)"
            )
        if not isinstance(v, (bool, int, str)):
            raise InvalidObservation(
                f"observation[{k!r}] type {type(v).__name__} is not "
                "bool/int/str — wrong type fail-closed (H2)"
            )

    task_id = str(observation.get("task_id", "task-unknown"))
    collector_key = str(
        observation.get("collector_key", INDEPENDENT_ANU_KEY)
    )
    return classify_dispatch_contract(
        task_id=task_id,
        normal_callback_present=bool(
            observation.get("normal_callback_present", False)
        ),
        fallback_present=bool(observation.get("fallback_present", False)),
        result_present=bool(observation.get("result_present", False)),
        collector_key=collector_key,
    )


# ── recovery watcher (sense/act/think 분리 · idempotent · NO fixed-time) ─────
SpawnFn = Callable[[str, str], object]


class RecoveryWatcher:
    """DISPATCH_CONTRACT_VIOLATION(result+normal-missing+fallback-missing)
    조건에서만 독립 ANU collector 를 **idempotent** 하게 1회 spawn.

    데몬 안전원칙 (§7b.7):

      * **sense**: ``observe`` 는 레코드를 분류하기만 한다 — 어떤 상태도
        변경하지 않는다(부수효과 0).
      * **act**: ``maybe_spawn`` 은 조건 충족 시에만 *정확히 1회* 주입된
        ``spawn_fn`` 을 호출하고 idempotency_key 를 self-record 하여
        재진입을 차단한다.
      * 무조율 dead-man / fixed-time 진행트리거 절대 금지(640665C8 차단).
        조건 미충족이면 **no-op** — 시간 경과만으로 발화하는 경로가 코드에
        존재하지 않는다.
      * spawn 은 독립 ANU key 로만 — executor self-key fail-closed.
    """

    def __init__(
        self,
        spawn_fn: SpawnFn,
        *,
        collector_key: str = INDEPENDENT_ANU_KEY,
        seen: Optional[set] = None,
    ) -> None:
        assert_collector_key_is_independent_anu(collector_key)
        self._spawn_fn = spawn_fn
        self._collector_key = collector_key
        # idempotency ledger (task 단위 1회 보장). 외부 주입 가능(영속).
        self._spawned: set = seen if seen is not None else set()

    # -- sense (부수효과 0) ---------------------------------------------------
    def observe(
        self, observation: Dict[str, object]
    ) -> DispatchContractRecord:
        """관측만 — 상태 변경 0. 분류 레코드를 반환한다."""
        return evaluate(observation)

    def _spawn_condition_met(self, rec: DispatchContractRecord) -> bool:
        """계약 6 조건: result 존재 ∧ normal 미수신 ∧ fallback 미수신
        (= DISPATCH_CONTRACT_VIOLATION ∧ recovery_required)."""
        return (
            rec.classification == DISPATCH_CONTRACT_VIOLATION
            and rec.result_present
            and not rec.normal_callback_present
            and not rec.fallback_present
            and rec.recovery_required
        )

    # -- act (조건 충족 시 정확히 1회) ---------------------------------------
    def maybe_spawn(
        self, observation: Dict[str, object]
    ) -> Dict[str, object]:
        """조건 충족 시에만 독립 ANU collector 를 1회 spawn (idempotent).

        반환: {spawned, classification, idempotency_key, duplicate_suppressed,
        reason}. 조건 미충족이면 spawned=False(no-op·진행트리거화 0).
        """
        rec = self.observe(observation)
        out: Dict[str, object] = {
            "spawned": False,
            "classification": rec.classification,
            "idempotency_key": rec.idempotency_key,
            "duplicate_suppressed": False,
            "collector_key": self._collector_key,
            "fixed_time_or_dead_man": False,
        }

        if not self._spawn_condition_met(rec):
            out["reason"] = (
                "spawn 조건 미충족(result+normal-missing+fallback-missing "
                "아님) — no-op. fixed-time/dead-man 진행트리거 아님."
            )
            return out

        if rec.idempotency_key in self._spawned:
            out["duplicate_suppressed"] = True
            out["reason"] = (
                "idempotency_key 이미 spawn 됨 — 중복 spawn 0 (재진입 차단)."
            )
            return out

        # task-2620 §2.2 H3 hardening: atomic record-order.
        #   write-then-mark 가 아니라 mark-then-act 로 바꾼다. spawn_fn 호출
        #   *전에* idempotency_key 를 self-record 하여 동시/재시도 경계에서
        #   spawn_fn 이 한 번 시작된 시점에 이미 ledger 가 잠긴 상태가 된다.
        #   set.add() 자체가 단일 hash-bucket 갱신으로 atomic — race 가 발생
        #   해도 같은 key 두 번째 add 는 no-op 이고, `if rec.idempotency_key
        #   in self._spawned` 가 두 번째 호출에서는 즉시 duplicate_suppressed
        #   를 반환한다. spawn_fn 에서 예외가 발생하면 rollback 하여 다음
        #   호출이 재시도할 수 있게 한다(fail-restorable).
        self._spawned.add(rec.idempotency_key)
        try:
            # act: 정확히 1회 독립 ANU collector spawn (주입된 spawn_fn 위임).
            spawn_result = self._spawn_fn(rec.task_id, self._collector_key)
        except BaseException:
            # spawn_fn 실패 시 idempotency 마킹 rollback — 다음 호출이 동일
            # task 를 다시 시도할 수 있게 (재시도 경계에서 deadlock 0).
            self._spawned.discard(rec.idempotency_key)
            raise
        out.update(
            spawned=True,
            spawn_result=spawn_result,
            reason=(
                "DISPATCH_CONTRACT_VIOLATION(result+normal-missing+"
                "fallback-missing) — 독립 ANU collector 1회 spawn. 무조율 "
                "dead-man/fixed-time 진행트리거 아님(640665C8 차단). "
                "atomic mark-then-act (H3) — 동시/재시도 경계 중복 0."
            ),
        )
        return out

    @property
    def spawned_keys(self) -> List[str]:
        return sorted(self._spawned)


# ── 실 entrypoint self-check (mock-only FAIL) ───────────────────────────────
def run_self_check(fixture_path: Optional[str] = None) -> Dict[str, object]:
    """실 entrypoint regression. 회장 필수 fixture(callback-gap)를 read-only
    consume 하여 DISPATCH_CONTRACT_VIOLATION + recovery 경로를 실증한다.

    mock-only 경로는 본질적으로 실패한다 — 상수 분류기는 (a)CONTRACT_OK 와
    (c)DISPATCH_CONTRACT_VIOLATION 을 동시에 만족시킬 수 없다.
    """
    failures: List[str] = []

    # (a) normal present → CONTRACT_OK · fallback cancel-on-success.
    a = classify_dispatch_contract(
        task_id="task-selftest-a",
        normal_callback_present=True,
        fallback_present=True,
        result_present=True,
    )
    if a.classification != CONTRACT_OK or not a.fallback_cancel_on_success:
        failures.append(f"(a) {a.classification} fc={a.fallback_cancel_on_success}")

    # (b) normal missing · fallback present → FALLBACK_RECOVERY.
    b = classify_dispatch_contract(
        task_id="task-selftest-b",
        normal_callback_present=False,
        fallback_present=True,
        result_present=True,
    )
    if b.classification != FALLBACK_RECOVERY or b.recovery_required:
        failures.append(f"(b) {b.classification} rr={b.recovery_required}")

    # (c) 회장 필수 fixture: result + normal missing + fallback missing.
    fx = fixture_path or (
        "memory/fixtures/task-2614.case-callback-gap.json"
    )
    fxp = Path(fx)
    if not fxp.is_absolute():
        fxp = Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace")) / fx
    try:
        obs = json.loads(fxp.read_text(encoding="utf-8"))
    except Exception as exc:  # noqa: BLE001
        return {
            "schema": "anu_v3.dispatch_callback_contract.self_check.v1",
            "all_passed": False,
            "error": f"fixture read fail: {type(exc).__name__}: {exc}",
            "fixture": str(fxp),
        }
    c = evaluate(obs.get("observation", obs))
    if c.classification != DISPATCH_CONTRACT_VIOLATION or not c.recovery_required:
        failures.append(
            f"(c) {c.classification} rr={c.recovery_required}"
        )

    # (d) idempotent — 동일 task 2회 → spawn 정확히 1회.
    calls: List[str] = []
    w = RecoveryWatcher(lambda tid, _key: calls.append(tid) or "spawned")
    r1 = w.maybe_spawn(obs.get("observation", obs))
    r2 = w.maybe_spawn(obs.get("observation", obs))
    if not (r1["spawned"] and not r2["spawned"] and len(calls) == 1):
        failures.append(f"(d) spawns={len(calls)} r1={r1} r2={r2}")

    # (e) 조건 미충족 → no-op (진행트리거화 0).
    calls_e: List[str] = []
    w_e = RecoveryWatcher(lambda tid, _key: calls_e.append(tid))
    noop = w_e.maybe_spawn(
        {
            "task_id": "task-selftest-e",
            "normal_callback_present": True,
            "fallback_present": False,
            "result_present": True,
        }
    )
    if noop["spawned"] or calls_e:
        failures.append(f"(e) unexpected spawn noop={noop}")

    # (f) executor self-key → fail-closed.
    self_key_fail_closed = False
    try:
        classify_dispatch_contract(
            task_id="task-selftest-f",
            normal_callback_present=True,
            fallback_present=True,
            result_present=True,
            collector_key=EXECUTOR_SELF_KEY_FORBIDDEN,
        )
    except ExecutorSelfKeyForbidden:
        self_key_fail_closed = True
    if not self_key_fail_closed:
        failures.append("(f) executor self-key NOT fail-closed")

    # mock-only guard: 상수 분류기(항상 CONTRACT_OK)는 (c) 에서 반드시 실패.
    mock_only_would_fail = a.classification != c.classification

    return {
        "schema": "anu_v3.dispatch_callback_contract.self_check.v1",
        "all_passed": not failures,
        "failures": failures,
        "fixture": str(fxp),
        "mock_only_would_fail": mock_only_would_fail,
        "cases": ["a", "b", "c", "d", "e", "f"],
    }


def _build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(
        prog="dispatch_callback_contract",
        description="task-2614 §7b — dispatch callback/progress-trigger 계약 "
        "런타임 (회장 야간 필수 보강 · 문서-only 금지).",
    )
    p.add_argument("--selftest", action="store_true",
                   help="실 entrypoint regression 실행")
    p.add_argument("--fixture", default=None,
                   help="callback-gap fixture 경로 (기본: 회장 필수 fixture)")
    p.add_argument("--evaluate", default=None,
                   help="observation JSON 경로 → 계약 분류 출력")
    return p


def _main(argv: Optional[Sequence[str]] = None) -> int:
    args = _build_parser().parse_args(argv)
    if args.evaluate:
        obs = json.loads(Path(args.evaluate).read_text(encoding="utf-8"))
        rec = evaluate(obs.get("observation", obs))
        print(rec.to_json())
        return 0
    res = run_self_check(args.fixture)
    print(json.dumps(res, ensure_ascii=False, indent=2))
    return 0 if res.get("all_passed") else 1


if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(_main())
