# -*- coding: utf-8 -*-
"""dispatch.anu_owned_callback_enforcement — task-2717 ANU_OWNED_CALLBACK_
ENFORCEMENT (executor self-collector 구조 차단, executable automation).

회장 verbatim 2026-05-31: "문서/메모리 박제로 끝내지 말고 callback 생성 권한을
executor 에게 주지 않는 실행 구조로 바꾼다."

발단 (incident: memory/events/task-2716+2.self-collector-incident-260531.json):
dev4 executor 가 'ANU collector' 콜백을 자기 self-key(7943afbe…, schedule
A372DD6B) 로 등록(SELF_COLLECTOR). envelope 텍스트는 ``self_key_used=false /
collector_role=ANU / "ANU key <ANU_KEY_REDACTED> 사용"`` 이라 주장했지만 실제
schedule owner = dev4 self-key. ANU key 로 cron-history 조회 → "schedule not
found or access denied" (ANU 소유 아님). 즉 envelope 텍스트는 거짓이었다.

이 모듈은 그 재발을 **NEED 제거 + TRUST 제거** 양면 실행 구조로 막는다.

NEED 제거 (봇이 self-fire 할 이유 없앰)
--------------------------------------
* ``executor_write_result_json`` — executor 는 callback schedule 을 만들지
  않는다. **지정 result JSON 만** 작성한다(완료조건 = result JSON 작성, NOT
  callback schedule 생성). Layer A / NO-CRON: schedule register 0,
  subprocess 0.
* ``anu_runner_pickup_and_fire`` — 별도 **ANU-key runner** 가 result JSON 을
  pickup → **ANU-owned callback 발사 request(데이터 argv)** 를 생성한다.
  owner_key 가 독립 ANU key 가 아니면 fail-closed(argv=None). 실 cron 발사/
  subprocess 는 이 함수가 직접 하지 않는다 (자가발사 금지 — 본 task 가 고치는
  그 패턴; argv 만 surface, 권한 있는 ANU 세션이 실행).

TRUST 제거 (self-fire 해봤자 격리)
----------------------------------
* ``resolve_authoritative_owner`` — envelope 의 ``self_key_used`` /
  ``collector_role`` / "ANU key 사용" 문구를 **믿지 않는다**. ANU key 로
  ``cokacdir --cron-history <schedule_id>`` 를 실제 조회하여 schedule 의 실
  owner 를 authoritative 하게 판정한다. ANU key 가 조회 가능 ⇔ ANU 소유.
  조회 거부(access denied)/부재 ⇔ 비-ANU 소유.
* ``verify_collector_authoritative`` — 실 owner 가 ANU 가 아니면
  ``SELF_COLLECTOR_QUARANTINE`` 로 자동 격리 + ``NON_AUTHORITATIVE``. 격리된
  self-collector 결과는 보존하되(non-blocking, 작업물 미파기) 자동 push/merge
  판단에는 절대 사용하지 않는다(``usable_for_auto_merge=False``).
* ``write_quarantine_record`` — 격리 증거 artifact 를 durable 하게 남긴다.

정렬 (중복 신설 0)
------------------
owner-key 판정의 권위 게이트는 ``dispatch.callback_owner_enforcer.
enforce_callback_owner`` 를 **재사용**한다(신규 owner 검증 로직 미신설).
ANU-owned callback request argv 생성은 ``dispatch.normal_fallback_callback_
helper.build_anu_owned_callback_request`` 를 **재사용**한다(lazy import).
task-2713(CALLBACK_LOCATION_METADATA) 의 worktree-mismatch envelope 필드 ·
task-2715(REMEDIATION_HEAD_ADVANCE) 와 기능 중복 0 — 본 모듈은 callback
**ownership(소유권) authority** 축만 담당한다.

Layer A / NO-CRON: 이 모듈은 cron register/remove 0, dispatch 0, merge 0.
실 cokacdir 호출은 ``RealCokacdirCronHistoryProbe`` (운영 전용, 테스트는
주입 fake 만 사용) 의 read-only ``--cron-history`` 조회 1건뿐이다.
"""
from __future__ import annotations

import json
import os
import re
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, List, Optional, Sequence

# 재사용 (중복 신설 0): owner-key 권위 게이트는 +49 enforcer.
# (pyright extraPaths 가 main workspace 를 가리켜 worktree-local 분석에서
#  미해석될 수 있으나 origin/main · worktree 양쪽에 존재 — runtime 결선 정상.
#  sibling normal_fallback_callback_helper 와 동일 ignore 관용.)
from dispatch.callback_owner_enforcer import (  # pyright: ignore[reportMissingImports]
    ANU_KEY_2553 as ANU_KEY,
    DEFAULT_ANU_KEYS,
    SELF_COLLECTOR_FORBIDDEN,
    is_anu_key,
)

SCHEMA = "dispatch.anu_owned_callback_enforcement.v1"

# 운영 상수 (회장 verbatim — 하드 경계, live_cron_state_verifier 와 동일값).
ANU_CHAT_ID = "6937032012"
COKACDIR_BINARY = "/usr/local/bin/cokacdir"
CANONICAL_ROOT = "/home/jay/workspace"
DEFAULT_QUARANTINE_DIR = os.path.join(
    CANONICAL_ROOT, "memory", "events", "quarantine"
)
DEFAULT_RESULT_DIR = os.path.join(CANONICAL_ROOT, "memory", "events")

# envelope 가 stale 로 판정되는 기본 임계(초). 회장 doctrine: stale envelope
# 재사용은 reject — 너무 오래된 callback envelope 는 다른 run 의 잔재일 수 있다.
DEFAULT_MAX_ENVELOPE_AGE_SECONDS = 6 * 3600  # 6h

# ── verdicts (collector 권위 판정) ───────────────────────────────────────────
VERDICT_AUTHORITATIVE = "AUTHORITATIVE"          # 실 owner=ANU → 권위 PASS
VERDICT_QUARANTINED = "QUARANTINED"              # 실 owner≠ANU → 자동 격리
VERDICT_NON_AUTHORITATIVE = "NON_AUTHORITATIVE"  # 조회 실패/판정 불가 → 미권위(fail-closed)
VERDICT_PENDING_OWNER_PROOF = "PENDING_OWNER_PROOF"  # owner proof 미확정(재시도 가능)
VERDICT_REJECTED = "REJECTED"                    # stale/binding mismatch → reject

# ── classifications ──────────────────────────────────────────────────────────
CLS_ANU_OWNED_AUTHORITATIVE = "ANU_OWNED_AUTHORITATIVE"
CLS_SELF_COLLECTOR_QUARANTINE = "SELF_COLLECTOR_QUARANTINE"
CLS_OWNER_QUERY_FAILED = "OWNER_QUERY_FAILED"
CLS_STALE_ENVELOPE_REJECTED = "STALE_ENVELOPE_REJECTED"
CLS_ENVELOPE_BINDING_MISMATCH = "ENVELOPE_BINDING_MISMATCH"
CLS_SCHEDULE_ID_MISSING = "SCHEDULE_ID_MISSING"
# task-2717+1 (B) status enum 보강 — 상태기계 명확화. ★ 아래 둘은 서로 다른 상태:
#  · NO_RESULT_JSON      : executor 가 result JSON 자체를 미작성(runner pickup 부재).
#  · PENDING_OWNER_PROOF : result 는 있으나 owner proof 가 미확정(조회 보류/재시도
#                          가능) — fail-closed(NON_AUTHORITATIVE)와 명시적으로 구분.
CLS_NO_RESULT_JSON = "NO_RESULT_JSON"
CLS_PENDING_OWNER_PROOF = "PENDING_OWNER_PROOF"

# ── owner-resolution outcomes (authoritative cron-history 조회 결과) ──────────
OWNER_ANU = "ANU_OWNED"                          # ANU key 가 조회 성공
OWNER_NOT_ANU = "NOT_ANU_OWNED_OR_ACCESS_DENIED"  # access denied / not found
OWNER_QUERY_FAILED = "QUERY_FAILED"              # 명시적 error/구조적 실패 → fail-closed
OWNER_PENDING = "PENDING_OWNER_PROOF"            # 결정적 답 없음(예외/garbled/미상) → 재시도 가능

# ``cokacdir --cron-history`` 가 비-소유/부재 schedule 에 돌려주는 error 신호
# (실측: {"status":"error","message":"schedule not found or access denied: ..."}).
_ACCESS_DENIED_TOKENS = ("not found", "access denied")

# 인프라 장애(probe 실행 자체 실패: 바이너리 부재/타임아웃/JSON 파싱 등) 를 실
# cokacdir access-denied 응답과 구분하는 **message-비의존 distinct 신호**. 실
# cokacdir 응답은 이 키를 절대 내보내지 않으므로, 인프라 예외 message 에 우연히
# "not found" 등이 섞여도 OWNER_NOT_ANU(false quarantine)로 오분류되지 않는다.
PROBE_EXEC_FAILED_KIND = "probe_exec_failed"

# ── fire-path gating (Phase 1) ───────────────────────────────────────────────
# Phase 1 은 cokacdir cron 정상 fire/closeout 경로를 전제하지 않는다. argv dry-run/
# build 까지만 허용하고 실 fire/closeout 은 NOT_ACTIVATED/PHASE2_REQUIRED 로 막는다.
FIRE_NOT_ACTIVATED = "NOT_ACTIVATED"      # 실 fire 비활성(기본) — argv dry-run 까지만
FIRE_PHASE2_REQUIRED = "PHASE2_REQUIRED"  # 실 fire/closeout 은 Phase 2 필요


# cron-history probe 시그니처: (schedule_id) -> 원시 cokacdir json dict.
CronHistoryProbe = Callable[[str], dict]


def _now() -> datetime:
    return datetime.now(timezone.utc)


def _iso(dt: datetime) -> str:
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


def _sanitize_filename_component(value: object, *, default: str) -> str:
    """파일명 컴포넌트 sanitize — path separator(``/`` 와 ``\\`` 양쪽)를 ``_`` 로
    치환한다. ``str.replace(os.sep, "_")`` 는 Linux 에서 ``/`` 만 치환하고 ``\\``
    를 남겨 cross-platform 으로 안전하지 않으므로 양쪽을 명시 치환한다.
    ``value`` 가 None/빈/공백이면 ``str(None)='None'`` 같은 의도치 않은 파일명
    대신 ``default`` 를 반환한다(silent 'None' 파일명 0)."""
    if value is None:
        return default
    s = str(value).strip()
    if not s:
        return default
    sanitized = re.sub(r"[\\/]+", "_", s)
    return sanitized or default


# ─────────────────────────────────────────────────────────────────────────────
# TRUST 제거 — authoritative owner resolution (cron-history 실조회)
# ─────────────────────────────────────────────────────────────────────────────
class RealCokacdirCronHistoryProbe:
    """실 ``cokacdir --cron-history <sid> --chat <chat> --key <ANU_KEY>`` wrapper.

    ★ 핵심: 조회 key 는 **ANU key** 다. cron-history 는 ownership-checked API —
    ANU key 로 조회가 성공(``status=ok``)하면 그 schedule 은 ANU 소유, error(
    "not found or access denied")면 비-ANU 소유다. 즉 envelope 텍스트가 아니라
    실제 소유권을 묻는다.

    운영 collector 전용. 본 task regression 은 fake probe 를 주입하므로 이
    subprocess 경로는 테스트에서 실행되지 않는다.
    """

    binary = COKACDIR_BINARY

    def __init__(self, *, anu_key: str = ANU_KEY, chat_id: str = ANU_CHAT_ID,
                 timeout: int = 60) -> None:
        self.anu_key = anu_key
        self.chat_id = str(chat_id)
        self.timeout = timeout

    def __call__(self, schedule_id: str) -> dict:  # pragma: no cover - 운영 전용
        try:
            proc = subprocess.run(
                [
                    self.binary,
                    "--cron-history",
                    str(schedule_id),
                    "--chat",
                    self.chat_id,
                    "--key",
                    self.anu_key,
                ],
                capture_output=True,
                text=True,
                timeout=self.timeout,
            )
            return json.loads((proc.stdout or "").strip() or "{}")
        except (OSError, ValueError, subprocess.SubprocessError) as exc:
            # 인프라 실패 ≠ 실 access-denied. message 문자열에 의존하지 않는
            # distinct 신호(error_kind)로 표시 → resolve 가 OWNER_PENDING(재시도
            # 가능)으로 매핑하고, message 토큰 매칭으로 false quarantine 되지 않는다.
            return {
                "status": "error",
                "error_kind": PROBE_EXEC_FAILED_KIND,
                "message": f"probe exec failed: {exc}",
            }


@dataclass
class OwnerResolution:
    """authoritative cron-history 조회로 판정한 schedule 실 owner."""

    schema: str
    schedule_id: str
    outcome: str          # OWNER_ANU | OWNER_NOT_ANU | OWNER_QUERY_FAILED
    owner_is_anu: bool
    query_ok: bool        # 조회가 결정적이었나 (ok 또는 명시적 access-denied)
    run_count: Optional[int]
    raw_status: Optional[str]
    raw_message: str = ""
    reasons: List[str] = field(default_factory=list)

    def to_json(self) -> dict:
        return {
            "schema": self.schema,
            "schedule_id": self.schedule_id,
            "outcome": self.outcome,
            "owner_is_anu": self.owner_is_anu,
            "query_ok": self.query_ok,
            "run_count": self.run_count,
            "raw_status": self.raw_status,
            "raw_message": self.raw_message,
            "reasons": list(self.reasons),
        }


def resolve_authoritative_owner(
    schedule_id: str,
    *,
    probe: CronHistoryProbe,
) -> OwnerResolution:
    """schedule 의 실 owner 가 ANU 인지 cron-history(ANU key 조회)로 판정.

    * ``status == "ok"``  → ANU key 가 schedule 조회 성공 ⇒ **ANU 소유**.
    * ``status == "error"`` 이고 message 가 "not found"/"access denied" 포함
      → ANU key 로 조회 거부/부재 ⇒ **비-ANU 소유** (regression 2 의 access
      denied 판정 포함).
    * 그 외(파싱 불가 / subprocess 실패 / 알 수 없는 응답) → **QUERY_FAILED**
      (fail-closed; envelope 텍스트로 fallback 하지 않는다).

    envelope 의 owner_key/self_key_used/collector_role 텍스트는 이 함수에
    **입력되지 않는다** — 오직 실제 schedule 소유권만 본다.
    """
    if not schedule_id or not str(schedule_id).strip():
        return OwnerResolution(
            schema=SCHEMA, schedule_id=str(schedule_id or ""),
            outcome=OWNER_QUERY_FAILED, owner_is_anu=False, query_ok=False,
            run_count=None, raw_status=None,
            reasons=["schedule_id 없음 — authoritative owner 조회 불가."],
        )
    if not callable(probe):
        # probe 미주입/비-callable 은 일시 장애가 아니라 설정 오류다 —
        # PENDING(재시도 가능)이 아니라 QUERY_FAILED 로 즉시 fail-closed.
        return OwnerResolution(
            schema=SCHEMA, schedule_id=str(schedule_id),
            outcome=OWNER_QUERY_FAILED, owner_is_anu=False, query_ok=False,
            run_count=None, raw_status=None,
            reasons=["probe 가 실행 가능(callable)하지 않습니다 — 설정 오류(fail-closed)."],
        )
    try:
        raw = probe(str(schedule_id))
    except Exception as exc:  # noqa: BLE001 — probe 예외는 결정적 답 아님 → 재시도 가능
        return OwnerResolution(
            schema=SCHEMA, schedule_id=str(schedule_id),
            outcome=OWNER_PENDING, owner_is_anu=False, query_ok=False,
            run_count=None, raw_status=None,
            reasons=[
                f"cron-history probe 예외 → owner proof 미확정(PENDING, 재시도 "
                f"가능, fail-closed 아님): {exc}"
            ],
        )

    if not isinstance(raw, dict):
        return OwnerResolution(
            schema=SCHEMA, schedule_id=str(schedule_id),
            outcome=OWNER_PENDING, owner_is_anu=False, query_ok=False,
            run_count=None, raw_status=None,
            reasons=[
                "cron-history 응답이 dict 아님(garbled) → owner proof 미확정"
                "(PENDING, 재시도 가능)."
            ],
        )

    status = str(raw.get("status") or "").strip().lower()
    message = str(raw.get("message") or "")

    if status == "ok":
        count = raw.get("count")
        try:
            count_i = int(count) if count is not None else None
        except (TypeError, ValueError):
            count_i = None
        return OwnerResolution(
            schema=SCHEMA, schedule_id=str(schedule_id),
            outcome=OWNER_ANU, owner_is_anu=True, query_ok=True,
            run_count=count_i, raw_status=status,
            reasons=[
                "ANU key 로 cron-history 조회 성공 — schedule 은 ANU 소유 "
                "(authoritative)."
            ],
        )

    if status == "error":
        # 인프라 장애(probe 실행 실패)는 실 cokacdir access-denied 응답이 아니다.
        # distinct error_kind 신호가 있으면 message 토큰(not found/access denied)을
        # 보기 전에 OWNER_PENDING(재시도 가능, fail-closed 아님)으로 매핑한다.
        # 인프라 장애를 self-collector(OWNER_NOT_ANU)로 오분류 → false quarantine 차단.
        if str(raw.get("error_kind") or "") == PROBE_EXEC_FAILED_KIND:
            return OwnerResolution(
                schema=SCHEMA, schedule_id=str(schedule_id),
                outcome=OWNER_PENDING, owner_is_anu=False, query_ok=False,
                run_count=None, raw_status=status, raw_message=message,
                reasons=[
                    "cron-history probe 실행 실패(인프라 장애 — 바이너리 부재/"
                    "타임아웃/파싱 등) → owner proof 미확정(PENDING, 재시도 가능, "
                    "fail-closed 아님). 실 access-denied 응답이 아니므로 "
                    "OWNER_NOT_ANU(false quarantine) 로 분류하지 않는다."
                ],
            )
        low = message.lower()
        if any(tok in low for tok in _ACCESS_DENIED_TOKENS):
            return OwnerResolution(
                schema=SCHEMA, schedule_id=str(schedule_id),
                outcome=OWNER_NOT_ANU, owner_is_anu=False, query_ok=True,
                run_count=None, raw_status=status, raw_message=message,
                reasons=[
                    "ANU key 로 cron-history 조회 거부/부재 (not found / "
                    "access denied) — schedule 은 비-ANU 소유 (self-collector "
                    "신호)."
                ],
            )
        # 명시적 error 인데 access-denied 신호가 아님 → 보수적 fail-closed.
        return OwnerResolution(
            schema=SCHEMA, schedule_id=str(schedule_id),
            outcome=OWNER_QUERY_FAILED, owner_is_anu=False, query_ok=False,
            run_count=None, raw_status=status, raw_message=message,
            reasons=[
                f"cron-history error 가 access-denied 신호 아님 → fail-closed: "
                f"{message!r}"
            ],
        )

    # ok/error 도 아닌 알 수 없는 status → 결정적 답 없음 → PENDING(재시도 가능).
    return OwnerResolution(
        schema=SCHEMA, schedule_id=str(schedule_id),
        outcome=OWNER_PENDING, owner_is_anu=False, query_ok=False,
        run_count=None, raw_status=status or None, raw_message=message,
        reasons=[
            f"알 수 없는 cron-history status={status!r} → owner proof 미확정"
            "(PENDING, 재시도 가능)."
        ],
    )


@dataclass
class CollectorVerification:
    """collector 권위 판정 결과. self-collector 결과는 NON_AUTHORITATIVE 이며
    자동 push/merge 판단에 절대 사용하지 않는다(``usable_for_auto_merge``)."""

    schema: str
    verdict: str             # AUTHORITATIVE | QUARANTINED | NON_AUTHORITATIVE | REJECTED
    classification: str
    task_id: str
    schedule_id: str
    owner_resolution: Optional[dict]
    envelope_claims: dict    # 기록만 — 판정에 미사용 (TRUST 제거 증거)
    work_preserved: bool      # 격리해도 작업물은 파기하지 않음 (non-blocking)
    quarantine_record_path: Optional[str] = None
    reasons: List[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return self.verdict == VERDICT_AUTHORITATIVE

    @property
    def usable_for_auto_merge(self) -> bool:
        """오직 authoritative(실 owner=ANU) 일 때만 자동 push/merge 판단 사용 가능.
        quarantine / non-authoritative / rejected → 절대 0 (회장 doctrine)."""
        return self.verdict == VERDICT_AUTHORITATIVE

    def to_json(self) -> dict:
        return {
            "schema": self.schema,
            "verdict": self.verdict,
            "classification": self.classification,
            "task_id": self.task_id,
            "schedule_id": self.schedule_id,
            "owner_resolution": self.owner_resolution,
            "envelope_claims": dict(self.envelope_claims),
            "work_preserved": self.work_preserved,
            "usable_for_auto_merge": self.usable_for_auto_merge,
            "quarantine_record_path": self.quarantine_record_path,
            "reasons": list(self.reasons),
        }


def _extract_claims(envelope: dict) -> dict:
    """envelope 가 주장하는(=불신해야 하는) 소유권 텍스트만 추출(기록용)."""
    return {
        "claimed_owner_key": envelope.get("owner_key")
        or envelope.get("collector_key"),
        "claimed_self_key_used": envelope.get("self_key_used"),
        "claimed_collector_role": envelope.get("collector_role"),
        "claimed_schedule_id": envelope.get("schedule_id"),
    }


# timestamp 값이 존재하나 어떤 포맷으로도 파싱되지 않을 때, staleness 체크를
# silent bypass 하지 않기 위해 '무한히 오래됨' 으로 취급하는 sentinel(초). 어떤
# max_envelope_age_seconds 임계보다도 크므로 항상 stale 로 reject 된다 — 파싱
# 실패가 staleness 우회로 악용되지 않게 한다(silent pass 0).
_UNPARSEABLE_TS_STALE_SECONDS = 10 ** 12


def _parse_timestamp_value(val: object) -> Optional[datetime]:
    """단일 timestamp 값 → tz-aware UTC datetime.

    숫자형 epoch(int/float 및 숫자 문자열)와 ISO/날짜 문자열 포맷을 모두 처리한다.
    파싱 불가 시 None (TypeError/ValueError/OverflowError/OSError 모두 핸들 —
    숫자 epoch 가 ``strptime`` 에 들어가 TypeError 로 터지던 문제 포함).

    수정2 (task-2720 2차 fix, High #466):
      * "Z" suffix ISO(예: "2026-06-01T04:00:00Z") — 회귀 0 으로 계속 파싱.
      * timezone offset ISO(예: "2026-06-01T13:00:00+09:00") — 신규 지원.
      * datetime.fromisoformat 사용(Python 3.7+). "Z" 는 fromisoformat 이
        직접 처리 못하므로 끝의 "Z" 를 "+00:00" 으로 치환 후 파싱.
      * naive datetime 은 UTC 로 간주하여 tzinfo=timezone.utc 부여(tz-aware 정규화).
    """
    # None 은 즉시 None — str(None)='None' 으로 변환해 불필요한 float/strptime
    # 루프를 도는 비용 제거(결과는 동일하게 None).
    if val is None:
        return None
    # (1) 숫자형 epoch(int/float). bool 은 int subclass 이므로 명시 제외.
    if isinstance(val, (int, float)) and not isinstance(val, bool):
        try:
            return datetime.fromtimestamp(float(val), tz=timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            return None
    s = str(val).strip()
    if not s:
        return None
    # (2) 숫자 문자열 epoch(예: "1717000000" / "1717000000.5").
    try:
        return datetime.fromtimestamp(float(s), tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError):
        pass
    # (3) fromisoformat 경로: "Z" suffix 와 timezone offset("+09:00") 양쪽 처리.
    #     Python 3.7+ fromisoformat 은 "Z" 를 직접 파싱하지 못하므로 치환.
    try:
        iso_s = s
        if iso_s.endswith("Z"):
            iso_s = iso_s[:-1] + "+00:00"
        dt = datetime.fromisoformat(iso_s)
        # naive → UTC tz-aware 정규화 (aware 는 그대로, 비교 일관성 보장).
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt
    except (TypeError, ValueError):
        pass
    # (4) 날짜만 포맷(fallback).
    for fmt in (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
    ):
        try:
            return datetime.strptime(s, fmt).replace(tzinfo=timezone.utc)
        except (TypeError, ValueError):
            continue
    return None


def _envelope_age_seconds(envelope: dict, *, now: datetime) -> Optional[int]:
    """envelope 발급 시각 → now 까지 경과 초.

    * timestamp 키가 아예 없으면 None (계산 불가 — 호출부가 staleness 미적용).
    * 값이 하나라도 파싱되면 해당 경과 초를 반환(숫자 epoch 포함).
    * 값은 존재하나 전부 파싱 실패 → ``_UNPARSEABLE_TS_STALE_SECONDS`` 반환
      (항상 stale 로 reject). 파싱 실패를 None 으로 삼켜 staleness 를 silent
      bypass 하지 않는다(silent pass 0).
    """
    saw_value = False
    for key in ("recorded_at", "created_at", "ts", "ts_utc", "fired_at"):
        val = envelope.get(key)
        if val is None or (isinstance(val, str) and not val.strip()):
            continue
        saw_value = True
        dt = _parse_timestamp_value(val)
        if dt is not None:
            return int((now - dt).total_seconds())
    if saw_value:
        # 값은 있었으나 전부 파싱 실패 → stale 취급(silent bypass 금지).
        return _UNPARSEABLE_TS_STALE_SECONDS
    return None


def verify_collector_authoritative(
    *,
    task_id: str,
    envelope: dict,
    probe: CronHistoryProbe,
    executor_key: str = "",
    schedule_id: Optional[str] = None,
    now: Optional[datetime] = None,
    max_envelope_age_seconds: int = DEFAULT_MAX_ENVELOPE_AGE_SECONDS,
    quarantine_dir: Optional[str] = None,
    write_quarantine: bool = True,
) -> CollectorVerification:
    """collector 의 권위 판정 (TRUST 제거 핵심 진입점).

    회장 doctrine 적용 순서 (fail-closed):
      1. envelope ↔ task_id binding (stale receipt 재사용 차단, regression 3).
      2. envelope 발급 시각이 너무 오래됨 → ``STALE_ENVELOPE_REJECTED``
         (regression 3).
      3. schedule_id 확보(인자 우선, 없으면 envelope) — 없으면 격리 불가 판정.
      4. ``resolve_authoritative_owner`` 로 **실 owner** 판정 (envelope 텍스트
         미사용).
         * 실 owner=ANU → ``AUTHORITATIVE`` (regression 4).
         * 실 owner≠ANU (access denied/not found) → ``SELF_COLLECTOR_
           QUARANTINE`` + ``NON_AUTHORITATIVE`` (regression 1·2·6). 작업물은
           보존(non-blocking), 자동 push/merge 사용 0.
         * 조회 실패 → ``NON_AUTHORITATIVE`` / ``OWNER_QUERY_FAILED``
           (fail-closed).

    envelope.owner_key / self_key_used / collector_role 는 ``envelope_claims``
    로 기록만 하고 판정에는 절대 쓰지 않는다.
    """
    now = now or _now()
    if not isinstance(envelope, dict):
        # envelope 가 dict 아님/None → _extract_claims(envelope.get(...)) 가
        # AttributeError 로 비정상 종료하기 전에 reject(fail-closed).
        return CollectorVerification(
            schema=SCHEMA, verdict=VERDICT_REJECTED,
            classification=CLS_ENVELOPE_BINDING_MISMATCH,
            task_id=str(task_id), schedule_id="",
            owner_resolution=None, envelope_claims={},
            work_preserved=True,
            reasons=["envelope 이 올바른 dict 형식이 아닙니다 — 판정 불가(reject)."],
        )
    claims = _extract_claims(envelope)
    qdir = quarantine_dir or DEFAULT_QUARANTINE_DIR

    # (1) task_id binding — stale receipt 재사용 차단.
    env_task_id = str(envelope.get("task_id") or "").strip()
    if env_task_id != str(task_id).strip():
        return CollectorVerification(
            schema=SCHEMA, verdict=VERDICT_REJECTED,
            classification=CLS_ENVELOPE_BINDING_MISMATCH,
            task_id=str(task_id), schedule_id="",
            owner_resolution=None, envelope_claims=claims,
            work_preserved=True,
            reasons=[
                "envelope.task_id != caller task_id — stale/foreign receipt "
                f"재사용 reject: envelope={env_task_id!r} caller={task_id!r}."
            ],
        )

    # (2) staleness — 너무 오래된 envelope 재사용 reject.
    age = _envelope_age_seconds(envelope, now=now)
    if age is not None and age > max_envelope_age_seconds:
        return CollectorVerification(
            schema=SCHEMA, verdict=VERDICT_REJECTED,
            classification=CLS_STALE_ENVELOPE_REJECTED,
            task_id=str(task_id),
            schedule_id=str(schedule_id or envelope.get("schedule_id") or ""),
            owner_resolution=None, envelope_claims=claims,
            work_preserved=True,
            reasons=[
                f"envelope 발급 후 {age}s 경과 > {max_envelope_age_seconds}s "
                "임계 — stale envelope 재사용 reject (regression 3)."
            ],
        )

    # (3) schedule_id 확보 (인자 우선 — envelope schedule_id 도 신뢰 대상이 아님,
    #     단지 '어떤 schedule 을 조회할지' 의 식별자일 뿐 소유권 주장은 무시).
    sid = str(schedule_id or envelope.get("schedule_id") or "").strip()
    if not sid:
        return CollectorVerification(
            schema=SCHEMA, verdict=VERDICT_NON_AUTHORITATIVE,
            classification=CLS_SCHEDULE_ID_MISSING,
            task_id=str(task_id), schedule_id="",
            owner_resolution=None, envelope_claims=claims,
            work_preserved=True,
            reasons=[
                "schedule_id 없음 — authoritative owner 조회 불가 → "
                "NON_AUTHORITATIVE (자동 push/merge 사용 0)."
            ],
        )

    # (4) 실 owner authoritative 판정.
    res = resolve_authoritative_owner(sid, probe=probe)

    if res.owner_is_anu:
        return CollectorVerification(
            schema=SCHEMA, verdict=VERDICT_AUTHORITATIVE,
            classification=CLS_ANU_OWNED_AUTHORITATIVE,
            task_id=str(task_id), schedule_id=sid,
            owner_resolution=res.to_json(), envelope_claims=claims,
            work_preserved=True,
            reasons=[
                "schedule 실 owner = ANU (cron-history authoritative) — "
                "AUTHORITATIVE callback, 자동 push/merge 판단 사용 가능 "
                "(regression 4)."
            ] + list(res.reasons),
        )

    if res.outcome == OWNER_NOT_ANU:
        # self-collector — 자동 격리 (NON_AUTHORITATIVE, 작업물 보존/non-blocking).
        verification = CollectorVerification(
            schema=SCHEMA, verdict=VERDICT_QUARANTINED,
            classification=CLS_SELF_COLLECTOR_QUARANTINE,
            task_id=str(task_id), schedule_id=sid,
            owner_resolution=res.to_json(), envelope_claims=claims,
            work_preserved=True,
            reasons=[
                "schedule 실 owner ≠ ANU (envelope 텍스트가 ANU 주장이어도 "
                "cron-history 권위가 우선) — SELF_COLLECTOR_QUARANTINE 자동 "
                "격리, NON_AUTHORITATIVE. 작업물은 보존(non-blocking) 하되 "
                "자동 push/merge 판단 사용 0 (regression 1·2·6).",
                f"executor_key={executor_key!r} 가 self-key 로 collector 를 "
                "흉내냈을 가능성 — owner 주장은 무시되고 실 소유권으로 격리됨.",
            ] + list(res.reasons),
        )
        if write_quarantine:
            try:
                verification.quarantine_record_path = write_quarantine_record(
                    verification, quarantine_dir=qdir, now=now,
                    executor_key=executor_key,
                )
            except OSError as exc:  # 기록 실패해도 격리 판정은 유지.
                verification.reasons.append(
                    f"quarantine 기록 실패(판정은 유지): {exc}"
                )
        return verification

    if res.outcome == OWNER_PENDING:
        # owner proof 미확정(예외/garbled/미상 status) → 재시도 가능한 PENDING.
        # ★ fail-closed(NON_AUTHORITATIVE)와 구분되는 별도 상태. self-collector
        #   격리도 아니므로 quarantine artifact 는 남기지 않는다.
        return CollectorVerification(
            schema=SCHEMA, verdict=VERDICT_PENDING_OWNER_PROOF,
            classification=CLS_PENDING_OWNER_PROOF,
            task_id=str(task_id), schedule_id=sid,
            owner_resolution=res.to_json(), envelope_claims=claims,
            work_preserved=True,
            reasons=[
                "owner proof 미확정(cron-history 결정적 답 없음) → "
                "PENDING_OWNER_PROOF (재시도 가능). fail-closed 와 구분 — "
                "자동 push/merge 사용 0(미확정), 작업물 보존.",
            ] + list(res.reasons),
        )

    # 명시적 error/구조적 실패 → fail-closed NON_AUTHORITATIVE (재시도 전제 없음).
    return CollectorVerification(
        schema=SCHEMA, verdict=VERDICT_NON_AUTHORITATIVE,
        classification=CLS_OWNER_QUERY_FAILED,
        task_id=str(task_id), schedule_id=sid,
        owner_resolution=res.to_json(), envelope_claims=claims,
        work_preserved=True,
        reasons=[
            "authoritative owner 조회 fail-closed → NON_AUTHORITATIVE "
            "(envelope 텍스트로 fallback 0, 자동 push/merge 사용 0)."
        ] + list(res.reasons),
    )


def write_quarantine_record(
    verification: CollectorVerification,
    *,
    quarantine_dir: str = DEFAULT_QUARANTINE_DIR,
    now: Optional[datetime] = None,
    executor_key: str = "",
) -> str:
    """self-collector 격리 증거 artifact 를 durable 하게 기록.

    회장 doctrine: self-collector 결과 삭제 금지 — NON_AUTHORITATIVE 로 보존 후
    독립 ANU 재검증으로 대체. 본 함수는 그 보존 증거를 남긴다(작업물 파기 0).
    파일명: ``<task_id>.<schedule_id>.quarantine.json``.
    """
    now = now or _now()
    os.makedirs(quarantine_dir, exist_ok=True)
    safe_task = _sanitize_filename_component(
        verification.task_id, default="unknown"
    )
    safe_sid = _sanitize_filename_component(
        verification.schedule_id, default="nosid"
    )
    path = os.path.join(quarantine_dir, f"{safe_task}.{safe_sid}.quarantine.json")
    record = {
        "schema": "dispatch.anu_owned_callback_enforcement.quarantine.v1",
        "record_type": "SELF_COLLECTOR_QUARANTINE",
        "classification": "SELF_COLLECTOR_QUARANTINE — NON_AUTHORITATIVE",
        "recorded_at": _iso(now),
        "task_id": verification.task_id,
        "schedule_id": verification.schedule_id,
        "executor_key": executor_key,
        "work_preserved": True,
        "usable_for_auto_merge": False,
        "verification": verification.to_json(),
        "disposition": (
            "self-collector 결과 보존(삭제 금지). 독립 ANU 재검증으로 대체. "
            "self-collector 의 검증 주장 신뢰 금지."
        ),
    }
    tmp = f"{path}.{os.getpid()}.tmp"
    with open(tmp, "w", encoding="utf-8") as fh:
        json.dump(record, fh, ensure_ascii=False, indent=2)
    os.replace(tmp, path)
    return path


# ─────────────────────────────────────────────────────────────────────────────
# NEED 제거 — executor result-JSON-only 완료 + ANU-key runner pickup→fire
# ─────────────────────────────────────────────────────────────────────────────
EXECUTOR_COMPLETION_SIGNAL = "RESULT_JSON_WRITTEN"


@dataclass
class ExecutorResultWrite:
    """executor 완료 = result JSON 작성 (NOT callback schedule 생성)."""

    schema: str
    completion_signal: str
    task_id: str
    result_json_path: str
    schedule_created: bool      # 항상 False — executor 는 schedule 안 만든다.
    callback_fired: bool        # 항상 False — executor 는 callback 안 쏜다.
    reasons: List[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return (
            self.completion_signal == EXECUTOR_COMPLETION_SIGNAL
            and not self.schedule_created
            and not self.callback_fired
        )

    def to_json(self) -> dict:
        return {
            "schema": self.schema,
            "completion_signal": self.completion_signal,
            "task_id": self.task_id,
            "result_json_path": self.result_json_path,
            "schedule_created": self.schedule_created,
            "callback_fired": self.callback_fired,
            "reasons": list(self.reasons),
        }


def executor_write_result_json(
    *,
    task_id: str,
    payload: dict,
    result_dir: str = DEFAULT_RESULT_DIR,
    now: Optional[datetime] = None,
) -> ExecutorResultWrite:
    """executor 완료조건 = result JSON 작성 (NEED 제거).

    executor 는 callback schedule 을 만들지 않고 callback 을 쏘지도 않는다.
    오직 지정 result JSON 만 작성한다. 별도 ANU-key runner 가 이 파일을 pickup
    하여 ANU-owned callback 을 발사한다. (Layer A / NO-CRON — schedule register
    0, subprocess 0, callback fire 0.)

    payload 에 schedule_id/owner_key/collector_role 같은 소유권 주장이 들어와도
    executor 는 schedule 을 만들 권한이 없으므로 무의미하다 — 작성하는 것은
    결과 데이터(report_path/sha256/summary/expected_files 등)뿐이다.
    """
    now = now or _now()
    os.makedirs(result_dir, exist_ok=True)
    safe_task = _sanitize_filename_component(task_id, default="unknown")
    path = os.path.join(result_dir, f"{safe_task}.result.json")

    record = dict(payload)
    record.setdefault("task_id", str(task_id))
    record["completion_signal"] = EXECUTOR_COMPLETION_SIGNAL
    record["written_at"] = _iso(now)
    # executor 가 절대 하지 않는 것을 명시(자기-증명).
    record["schedule_created_by_executor"] = False
    record["callback_fired_by_executor"] = False

    tmp = f"{path}.{os.getpid()}.tmp"
    with open(tmp, "w", encoding="utf-8") as fh:
        json.dump(record, fh, ensure_ascii=False, indent=2)
    os.replace(tmp, path)

    return ExecutorResultWrite(
        schema=SCHEMA,
        completion_signal=EXECUTOR_COMPLETION_SIGNAL,
        task_id=str(task_id),
        result_json_path=path,
        schedule_created=False,
        callback_fired=False,
        reasons=[
            "executor 완료 = result JSON 작성 only. callback schedule 생성 0, "
            "callback 발사 0 (NEED 제거). ANU runner 가 pickup 하여 발사."
        ],
    )


@dataclass
class RunnerResult:
    """ANU-key runner 가 result JSON 을 pickup 하여 생성한 ANU-owned callback
    발사 request. argv 는 데이터일 뿐 — 실 발사는 권한 있는 ANU 세션이 한다
    (이 runner 가 직접 self-fire 하지 않는다)."""

    schema: str
    verdict: str             # PASS | FAIL
    task_id: str
    result_json_path: str
    owner_key: str
    pickup_ok: bool
    argv: Optional[List[str]]
    request: Optional[dict]
    reasons: List[str] = field(default_factory=list)
    # task-2717+1: pickup 상태 분류(예: NO_RESULT_JSON) + fire-path gating.
    classification: str = ""
    fire_path: str = FIRE_NOT_ACTIVATED   # Phase 1: argv dry-run 까지만, 실 fire 0
    dry_run: bool = True                  # ★ runner 는 절대 자가발사하지 않는다

    @property
    def ok(self) -> bool:
        return self.verdict == "PASS"

    def to_json(self) -> dict:
        return {
            "schema": self.schema,
            "verdict": self.verdict,
            "task_id": self.task_id,
            "result_json_path": self.result_json_path,
            "owner_key": self.owner_key,
            "pickup_ok": self.pickup_ok,
            "argv": list(self.argv) if self.argv is not None else None,
            "request": self.request,
            "classification": self.classification,
            "fire_path": self.fire_path,
            "dry_run": self.dry_run,
            "reasons": list(self.reasons),
        }


def _build_envelope_prompt(result: dict, *, anu_key: str, chat_id: str) -> str:
    """result JSON → envelope-only callback prompt (자유 지시문 금지)."""
    lines = [
        f"task_id={result.get('task_id', '')}",
        f"result_path={result.get('result_json_path', '')}",
        f"report_path={result.get('report_path', '')}",
        f"sha256={result.get('sha256', '')}",
        "collector_role=ANU",
        f"owner_key={anu_key}",
        f"chat_id={chat_id}",
        f"canonical_root={CANONICAL_ROOT}",
    ]
    return "\n".join(lines)


def anu_runner_pickup_and_fire(
    *,
    result_json_path: str,
    executor_key: str,
    anu_key: str = ANU_KEY,
    chat_id: str = ANU_CHAT_ID,
    prompt: Optional[str] = None,
    at: Optional[str] = None,
    dispatch_cron_id: str = "",
    normal_collector_cron_id: Optional[str] = None,
    fallback_callback_cron_id: Optional[str] = None,
    anu_keys: Sequence[str] = tuple(DEFAULT_ANU_KEYS),
    no_fallback: bool = True,
) -> RunnerResult:
    """ANU-key runner: result JSON pickup → **ANU-owned** callback 발사 request.

    fail-closed 핀:
      * ``anu_key`` 가 독립 ANU key 가 아니거나 executor self key 와 같으면
        REFUSE (argv=None) — runner 는 비-ANU key 로 발사하지 않는다.
      * owner-key 권위 게이트는 ``build_anu_owned_callback_request`` (=
        ``enforce_callback_owner`` 재사용)를 경유 — 중복 신설 0.

    이 함수는 실 cron 을 **직접 발사하지 않는다**(argv 데이터만 surface). 실
    발사는 권한 있는 ANU 세션이 ANU key 로 수행한다 — self-key 자가발사 금지
    (본 task 가 고치는 패턴).
    """
    # (0) result JSON pickup. 부재 = executor 가 result JSON 미작성 → NO_RESULT_JSON
    #     (별도 상태 — parse 실패/비-object 와도 구분).
    if not result_json_path or not os.path.isfile(result_json_path):
        return RunnerResult(
            schema=SCHEMA, verdict="FAIL", task_id="",
            result_json_path=str(result_json_path or ""), owner_key=anu_key,
            pickup_ok=False, argv=None, request=None,
            classification=CLS_NO_RESULT_JSON,
            reasons=[
                f"result JSON 부재(executor 미작성) — NO_RESULT_JSON, pickup "
                f"불가: {result_json_path!r}"
            ],
        )
    try:
        with open(result_json_path, "r", encoding="utf-8") as fh:
            result = json.load(fh)
    except (OSError, ValueError) as exc:
        return RunnerResult(
            schema=SCHEMA, verdict="FAIL", task_id="",
            result_json_path=str(result_json_path), owner_key=anu_key,
            pickup_ok=False, argv=None, request=None,
            reasons=[f"result JSON 파싱 실패: {exc}"],
        )
    if not isinstance(result, dict):
        return RunnerResult(
            schema=SCHEMA, verdict="FAIL", task_id="",
            result_json_path=str(result_json_path), owner_key=anu_key,
            pickup_ok=False, argv=None, request=None,
            reasons=["result JSON 이 object 아님."],
        )
    result.setdefault("result_json_path", result_json_path)
    task_id = str(result.get("task_id") or "").strip()

    # (0) fail-closed: result JSON 에 유효한 task_id 가 없으면 빈 task_id 로
    #     잘못된 callback argv 가 생성되는 것을 차단한다.
    if not task_id:
        return RunnerResult(
            schema=SCHEMA, verdict="FAIL", task_id="",
            result_json_path=str(result_json_path), owner_key=anu_key,
            pickup_ok=True, argv=None, request=None,
            reasons=[
                "result JSON 에 유효한 task_id 없음 — 잘못된 callback 발사 "
                "차단(fail-closed)."
            ],
        )

    # (1) fail-closed: runner 의 owner_key 는 반드시 독립 ANU key.
    if not is_anu_key(anu_key, anu_keys) or (anu_key == executor_key):
        return RunnerResult(
            schema=SCHEMA, verdict="FAIL", task_id=task_id,
            result_json_path=str(result_json_path), owner_key=anu_key,
            pickup_ok=True, argv=None, request=None,
            reasons=[
                f"runner owner_key={anu_key!r} 가 독립 ANU key 아님 또는 "
                f"executor self key 와 동일 → REFUSE ({SELF_COLLECTOR_FORBIDDEN}). "
                "runner 는 비-ANU key 로 callback 을 발사하지 않는다."
            ],
        )

    # (2) owner-key 권위 게이트 재사용 (중복 신설 0; lazy import 로 결선 견고).
    from dispatch.normal_fallback_callback_helper import (  # pyright: ignore[reportMissingImports]
        build_anu_owned_callback_request,
    )

    callback_prompt = prompt or _build_envelope_prompt(
        result, anu_key=anu_key, chat_id=str(chat_id)
    )
    req = build_anu_owned_callback_request(
        kind="normal",
        task_id=task_id,
        executor_key=executor_key,
        owner_key=anu_key,
        chat_id=str(chat_id),
        prompt=callback_prompt,
        at=at or "30s",
        dispatch_cron_id=dispatch_cron_id or f"{task_id}::dispatch",
        normal_collector_cron_id=(
            normal_collector_cron_id or f"{task_id}::normal"
        ),
        fallback_callback_cron_id=fallback_callback_cron_id,
        anu_keys=anu_keys,
        no_fallback=no_fallback,
    )

    if not req.ok:
        return RunnerResult(
            schema=SCHEMA, verdict="FAIL", task_id=task_id,
            result_json_path=str(result_json_path), owner_key=anu_key,
            pickup_ok=True, argv=None, request=req.to_json(),
            reasons=[
                "owner enforcement 미통과 → ANU-owned argv 미생성 (fail-closed)."
            ] + list(req.reasons),
        )

    return RunnerResult(
        schema=SCHEMA, verdict="PASS", task_id=task_id,
        result_json_path=str(result_json_path), owner_key=anu_key,
        pickup_ok=True, argv=req.argv, request=req.to_json(),
        reasons=[
            "result JSON pickup 정상 → owner=독립 ANU key 게이트 PASS → "
            "ANU-owned callback 발사 argv(데이터) 생성. 실 발사는 권한 있는 "
            "ANU 세션이 수행(자가발사 0) (regression 5)."
        ],
    )


# ─────────────────────────────────────────────────────────────────────────────
# fire-path gate (Phase 1) — 실 fire/closeout 차단, argv dry-run 까지만
# ─────────────────────────────────────────────────────────────────────────────
def fire_callback_request(
    runner_result: RunnerResult,
    *,
    activate: bool = False,
) -> dict:
    """Phase 1 fire-path gate.

    회장 doctrine(task-2717+1): Phase 1 에서 cokacdir cron 을 정상 fire/closeout
    실행 경로로 전제하지 않는다. 본 함수는 ANU-owned argv(데이터)를 **발사하지
    않고** 차단한다:

      * ``activate=False`` (기본) → ``NOT_ACTIVATED`` — argv dry-run 까지만.
      * ``activate=True``         → ``PHASE2_REQUIRED`` — 실 fire/closeout 은
        Phase 2 에서만. Phase 1 은 절대 실 cron 을 쏘지 않는다.

    ``fired`` 는 어느 경우에도 ``False`` 다(자가발사 0 — 본 task 가 고치는 패턴).
    실 발사는 권한 있는 ANU 세션이 Phase 2 경로로 수행한다.
    """
    argv = list(runner_result.argv) if runner_result.argv else None
    status = FIRE_PHASE2_REQUIRED if activate else FIRE_NOT_ACTIVATED
    return {
        "schema": SCHEMA,
        "status": status,
        "fired": False,        # ★ Phase 1 실 fire 0 — 항상 False
        "dry_run": True,
        "argv": argv,          # 데이터만 surface (발사 X)
        "task_id": runner_result.task_id,
        "owner_key": runner_result.owner_key,
        "reasons": [
            "Phase 1: cokacdir cron 정상 fire/closeout 경로 비활성 — argv "
            "dry-run/build 까지만 허용. 실 fire/closeout 은 PHASE2_REQUIRED. "
            "자가발사 0(self-collector 패턴 차단).",
        ],
    }


# ─────────────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────────────
def main(argv: Optional[List[str]] = None) -> int:
    import argparse

    ap = argparse.ArgumentParser(
        prog="dispatch.anu_owned_callback_enforcement"
    )
    sub = ap.add_subparsers(dest="cmd", required=True)

    # executor 완료 = result JSON 작성.
    ew = sub.add_parser("executor-write-result")
    ew.add_argument("--task-id", required=True)
    ew.add_argument("--result-dir", default=DEFAULT_RESULT_DIR)
    ew.add_argument("--report-path", default="")
    ew.add_argument("--sha256", default="")
    ew.add_argument("--summary", default="")

    # ANU-key runner: result JSON pickup → ANU-owned callback request.
    rn = sub.add_parser("anu-run")
    rn.add_argument("--result-json-path", required=True)
    rn.add_argument("--executor-key", required=True)
    rn.add_argument("--anu-key", default=ANU_KEY)
    rn.add_argument("--chat-id", default=ANU_CHAT_ID)
    rn.add_argument("--at", default=None)

    # collector authoritative verify (실 cokacdir cron-history 조회).
    cv = sub.add_parser("collector-verify")
    cv.add_argument("--task-id", required=True)
    cv.add_argument("--envelope-path", required=True)
    cv.add_argument("--schedule-id", default=None)
    cv.add_argument("--executor-key", default="")
    cv.add_argument("--anu-key", default=ANU_KEY)
    cv.add_argument("--chat-id", default=ANU_CHAT_ID)
    cv.add_argument("--quarantine-dir", default=DEFAULT_QUARANTINE_DIR)
    cv.add_argument("--no-write-quarantine", action="store_true")

    a = ap.parse_args(argv)

    if a.cmd == "executor-write-result":
        res = executor_write_result_json(
            task_id=a.task_id,
            payload={
                "report_path": a.report_path,
                "sha256": a.sha256,
                "summary": a.summary,
            },
            result_dir=a.result_dir,
        )
        print(json.dumps(res.to_json(), ensure_ascii=False))
        return 0 if res.ok else 2

    if a.cmd == "anu-run":
        res = anu_runner_pickup_and_fire(
            result_json_path=a.result_json_path,
            executor_key=a.executor_key,
            anu_key=a.anu_key,
            chat_id=a.chat_id,
            at=a.at,
        )
        print(json.dumps(res.to_json(), ensure_ascii=False))
        return 0 if res.ok else 2

    if a.cmd == "collector-verify":
        try:
            with open(a.envelope_path, "r", encoding="utf-8") as fh:
                envelope = json.load(fh)
        except (OSError, ValueError) as exc:
            print(json.dumps(
                {"verdict": VERDICT_NON_AUTHORITATIVE,
                 "error": f"envelope load failed: {exc}"},
                ensure_ascii=False))
            return 2
        probe = RealCokacdirCronHistoryProbe(
            anu_key=a.anu_key, chat_id=a.chat_id
        )
        res = verify_collector_authoritative(
            task_id=a.task_id,
            envelope=envelope if isinstance(envelope, dict) else {},
            probe=probe,
            executor_key=a.executor_key,
            schedule_id=a.schedule_id,
            quarantine_dir=a.quarantine_dir,
            write_quarantine=not a.no_write_quarantine,
        )
        print(json.dumps(res.to_json(), ensure_ascii=False))
        return 0 if res.ok else 2

    return 2


__all__ = [
    "SCHEMA",
    "ANU_KEY",
    "ANU_CHAT_ID",
    "DEFAULT_QUARANTINE_DIR",
    "DEFAULT_RESULT_DIR",
    "DEFAULT_MAX_ENVELOPE_AGE_SECONDS",
    "VERDICT_AUTHORITATIVE",
    "VERDICT_QUARANTINED",
    "VERDICT_NON_AUTHORITATIVE",
    "VERDICT_PENDING_OWNER_PROOF",
    "VERDICT_REJECTED",
    "CLS_ANU_OWNED_AUTHORITATIVE",
    "CLS_SELF_COLLECTOR_QUARANTINE",
    "CLS_OWNER_QUERY_FAILED",
    "CLS_STALE_ENVELOPE_REJECTED",
    "CLS_ENVELOPE_BINDING_MISMATCH",
    "CLS_SCHEDULE_ID_MISSING",
    "CLS_NO_RESULT_JSON",
    "CLS_PENDING_OWNER_PROOF",
    "OWNER_ANU",
    "OWNER_NOT_ANU",
    "OWNER_QUERY_FAILED",
    "OWNER_PENDING",
    "PROBE_EXEC_FAILED_KIND",
    "FIRE_NOT_ACTIVATED",
    "FIRE_PHASE2_REQUIRED",
    "fire_callback_request",
    "CronHistoryProbe",
    "RealCokacdirCronHistoryProbe",
    "OwnerResolution",
    "resolve_authoritative_owner",
    "CollectorVerification",
    "verify_collector_authoritative",
    "write_quarantine_record",
    "EXECUTOR_COMPLETION_SIGNAL",
    "ExecutorResultWrite",
    "executor_write_result_json",
    "RunnerResult",
    "anu_runner_pickup_and_fire",
    "main",
]


if __name__ == "__main__":
    raise SystemExit(main())
