# -*- coding: utf-8 -*-
"""ANU pickup P0-b audit-first decision-only 검증 하니스 (task-2729+5).

목적(read-only audit decision-only):
  pickup driver(dispatch/anu_pickup_driver.py) 및 pickup runner(dispatch/anu_result_pickup_runner.py)의
  결정 경로(decision path)를 isolated temp root 에서 재현·검증한다.
  런타임 코드 무수정(dispatch/** 절대 수정 금지), canonical root 무영향(절대 write 금지).
  실 발사 0건 — 모든 pickup_fn 은 fake 주입 또는 sealed_key_loader=lambda:None 으로 종단.

절대 제약:
  1. dispatch/** 런타임 코드 절대 수정/생성 금지 — import 만.
  2. /home/jay/workspace (CANONICAL_ROOT) memory/events, memory/state, ledger 절대 write 금지.
  3. ANU key raw 노출 0 — 실키·16hex literal·c119085 전체 키 절대 출력/코드 리터럴 금지.
  4. 실제 wake/cron 발사 0 — 모든 검증은 mock 주입 + isolated temp root 에서만 수행.
  5. CANONICAL_ROOT 는 snapshot 비교(read-only stat)에만 사용. 절대 write 금지.

사용법:
  python3 anu_pickup_p0b_audit_decision_check.py --json

반환값:
  exit 0 → all_pass=true, canonical_untouched=true, raw_key_exposure=0
  exit 1 → 위 조건 미충족
"""
from __future__ import annotations

import argparse
import importlib.util as _ilu
import json
import os
import re
import sys
import tempfile
import time
import types
from datetime import datetime
from pathlib import Path
from typing import Any, List, Optional, Tuple

# ── sys.path setup / forced load of the dispatch package ──────────────────────
# This file is expected to live under scripts/harness/v36/, so parents[3] of the
# resolved file path is the worktree root.
_ROOT = Path(__file__).resolve().parents[3]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

# Pin the worktree root's real dispatch package into sys.modules.
# (Guards against the same shadowing pattern seen with tests/dispatch under
# pytest's prepend import mode.)
_real_init = _ROOT / "dispatch" / "__init__.py"
_cached = sys.modules.get("dispatch")
# Reload only when no "dispatch" module is cached, or the cached one was loaded
# from a file other than <worktree>/dispatch/__init__.py.
if _cached is None or (getattr(_cached, "__file__", "") or "") != str(_real_init):
    # Drop the stale package and every already-imported submodule of it so that
    # later "from dispatch import ..." statements resolve against the new package.
    for _k in [k for k in list(sys.modules) if k == "dispatch" or k.startswith("dispatch.")]:
        del sys.modules[_k]
    _spec = _ilu.spec_from_file_location(
        "dispatch", str(_real_init),
        submodule_search_locations=[str(_ROOT / "dispatch")]
    )
    assert _spec is not None and _spec.loader is not None
    _pkg = _ilu.module_from_spec(_spec)
    # Register the module before executing it so imports performed while the
    # package initializes can find "dispatch" in sys.modules.
    sys.modules["dispatch"] = _pkg
    _spec.loader.exec_module(_pkg)  # type: ignore[union-attr]

from dispatch import anu_pickup_driver as drv  # noqa: E402  # pyright: ignore[reportMissingImports]
from dispatch import anu_result_pickup_runner as rnr  # noqa: E402  # pyright: ignore[reportMissingImports]
from dispatch.anu_owned_callback_enforcement import VERDICT_AUTHORITATIVE  # noqa: E402  # pyright: ignore[reportMissingImports]

# ── Constants ─────────────────────────────────────────────────────────────────
# Used only as an exposure-detection pattern. This string is the prefix of the
# real key; the full key must never appear anywhere in the code.
REAL_ANU_KEY_PREFIX = "c119085"
# Dummy non-ANU key for tests (not an ANU key, never a real key).
DUMMY_SEALED_KEY = "SEALED_KEY_DUMMY_NOT_ANU"


# ── 공개 함수 1: make_isolated_root ──────────────────────────────────────────
def make_isolated_root() -> str:
    """Create a fresh isolated temp root and return its path.

    A new directory is made with ``tempfile.mkdtemp`` (prefix ``p0b_audit_``)
    and the sub-directories the harness scenarios write into are created
    beneath it. Nothing outside the new temp directory is touched.

    Returns:
        The absolute path of the newly created temp root.
    """
    required_subdirs = (
        "memory/events",
        "memory/state",
        "memory/p0b_state/quarantine",
        "memory/p0b_state/processed",
    )
    sandbox = tempfile.mkdtemp(prefix="p0b_audit_")
    for relative in required_subdirs:
        target = os.path.join(sandbox, relative)
        os.makedirs(target, exist_ok=True)
    return sandbox


# ── 공개 함수 2: write_result_json ───────────────────────────────────────────
def write_result_json(
    root: str,
    task_id: str = "task-pilot-noop",
    *,
    payload: Optional[dict] = None,
    aged_seconds: float = 10,
    authoritative: bool = True,
) -> str:
    """Write ``{root}/memory/events/{task_id}.result.json`` and return its path.

    Args:
        root: Root directory under which ``memory/events`` is created if missing.
        task_id: Used for the file name and for the default payload's ``task_id``.
        payload: JSON-serializable dict to write. When ``None``, a minimal
            default payload (task id, completion signal, collector envelope)
            is written instead.
        aged_seconds: The file's atime/mtime are set to ``now - aged_seconds``
            so the file looks that old to anything inspecting its mtime.
        authoritative: Accepted for interface compatibility; this function
            does not read it.

    Returns:
        The path of the written result file.
    """
    target_dir = os.path.join(root, "memory", "events")
    os.makedirs(target_dir, exist_ok=True)
    result_path = os.path.join(target_dir, f"{task_id}.result.json")

    body = payload
    if body is None:
        envelope = {
            "schedule_id": "SCHED-DUMMY",
            "self_key_used": False,
            "collector_role": "ANU",
        }
        body = {
            "task_id": task_id,
            "completion_signal": "EXECUTOR_RESULT_WRITTEN",
            "collector_envelope": envelope,
        }

    with open(result_path, "w", encoding="utf-8") as out:
        json.dump(body, out, ensure_ascii=False)

    # Back-date the timestamps so the file appears `aged_seconds` old.
    backdated = time.time() - aged_seconds
    os.utime(result_path, (backdated, backdated))
    return result_path


# ── 공개 함수 3: fake_pickup_factory ─────────────────────────────────────────
def fake_pickup_factory() -> Tuple[List[dict], Any]:
    """Build a recording stand-in for a pickup function.

    Returns:
        A ``(calls, fake_pickup_fn)`` pair. Each invocation of
        ``fake_pickup_fn`` appends a dict with the keys ``path``,
        ``executor_key`` and ``ledger_path`` to ``calls`` and returns a
        ``SimpleNamespace`` whose ``verdict`` is ``"WAKE_BUILT"``, with a
        redacted placeholder ``argv`` and empty ``reasons``. The fake performs
        no other work.
    """
    recorded: List[dict] = []

    def fake_pickup_fn(path: str, *, executor_key: str = "", ledger_path: Optional[str] = None) -> Any:
        invocation = {
            "path": path,
            "executor_key": executor_key,
            "ledger_path": ledger_path,
        }
        recorded.append(invocation)
        stub = types.SimpleNamespace()
        stub.verdict = "WAKE_BUILT"
        stub.argv = ["<DRY_RUN_ARGV_REDACTED>"]
        stub.reasons = []
        return stub

    return recorded, fake_pickup_fn


# ── 공개 함수 4: fake_verify_factory ─────────────────────────────────────────
def fake_verify_factory(verdict: str) -> Any:
    """Build a verify function that always reports the given verdict.

    Args:
        verdict: The verdict string every call should report (for example the
            authoritative verdict to simulate acceptance, or any other value
            to simulate rejection).

    Returns:
        A callable accepting arbitrary keyword arguments (all ignored) that
        returns a ``SimpleNamespace`` with ``verdict`` set to the given value,
        an empty ``classification`` string and a fresh empty ``reasons`` list.
    """
    def verify_fn(**kwargs: Any) -> Any:
        canned = types.SimpleNamespace(verdict=verdict)
        canned.classification = ""
        canned.reasons = []
        return canned

    return verify_fn


# ── 공개 함수 5: fixed_clock ──────────────────────────────────────────────────
def fixed_clock(dt: Optional[datetime] = None) -> Any:
    """Return a zero-argument clock that always yields the same datetime.

    Args:
        dt: The instant to freeze on. When ``None``, the current time in
            ``drv.KST`` is captured once, at factory-call time.

    Returns:
        A callable ``() -> datetime`` that returns the frozen instant on every
        call.
    """
    frozen = datetime.now(drv.KST) if dt is None else dt

    def _clock() -> datetime:
        return frozen

    return _clock


# ── 공개 함수 7(순서 앞): snapshot_canonical / assert_canonical_untouched ────
def snapshot_canonical() -> dict:
    """Collect a stat-only snapshot of the canonical root.

    Only ``os.path`` checks, ``os.listdir``, ``os.stat`` and
    ``os.path.getsize`` are used; nothing is written.

    Returns:
        A dict with three keys:
          * ``events_files``: ``{file name: mtime}`` for regular files directly
            inside ``<CANONICAL_ROOT>/memory/events``.
          * ``state_files``: sorted entry names of
            ``<CANONICAL_ROOT>/memory/state``.
          * ``ledger_size``: byte size of
            ``memory/events/callback_4tuple_index.jsonl``, or ``None`` when it
            is not a regular file.
        Missing directories leave the corresponding default (empty dict /
        empty list / ``None``). An ``OSError`` while reading a section is
        swallowed and leaves whatever was gathered for that section so far.
    """
    base = drv.CANONICAL_ROOT
    events_dir = os.path.join(base, "memory", "events")
    state_dir = os.path.join(base, "memory", "state")
    ledger_file = os.path.join(events_dir, "callback_4tuple_index.jsonl")

    event_mtimes: dict = {}
    state_names: list = []
    ledger_bytes = None

    if os.path.isdir(events_dir):
        try:
            for entry in os.listdir(events_dir):
                candidate = os.path.join(events_dir, entry)
                if not os.path.isfile(candidate):
                    continue
                event_mtimes[entry] = os.stat(candidate).st_mtime
        except OSError:
            # Best-effort: keep the entries recorded before the failure.
            pass

    if os.path.isdir(state_dir):
        try:
            state_names = sorted(os.listdir(state_dir))
        except OSError:
            pass

    if os.path.isfile(ledger_file):
        try:
            ledger_bytes = os.path.getsize(ledger_file)
        except OSError:
            pass

    return {
        "events_files": event_mtimes,
        "state_files": state_names,
        "ledger_size": ledger_bytes,
    }


def assert_canonical_untouched(before: dict, after: dict) -> bool:
    """Compare two canonical snapshots.

    Args:
        before: Snapshot dict taken before the checks ran.
        after: Snapshot dict taken after the checks ran.

    Returns:
        ``True`` when ``events_files``, ``state_files`` and ``ledger_size``
        are all equal between the two snapshots (a key missing from a snapshot
        is treated as ``None``); ``False`` as soon as one of them differs.
    """
    compared_keys = ("events_files", "state_files", "ledger_size")
    return all(before.get(key) == after.get(key) for key in compared_keys)


# ── 내부 헬퍼: 시나리오 실행기 ───────────────────────────────────────────────
def _run_scenario(
    name: str,
    expected: str,
    fn,  # () -> (actual_verdict, pickup_calls)
) -> dict:
    """Run one scenario callable and package its outcome as a result dict.

    Args:
        name: Human-readable scenario name, copied into the result.
        expected: The verdict the scenario is expected to produce.
        fn: Zero-argument callable returning ``(actual_verdict, pickup_calls)``.

    Returns:
        A dict with the keys ``name``, ``expected``, ``actual``, ``passed`` and
        ``pickup_calls``. ``passed`` is ``actual == expected``. If ``fn``
        raises (or its return value cannot be unpacked into two items),
        ``actual`` becomes ``"EXCEPTION: <message>"``, ``passed`` is ``False``
        and ``pickup_calls`` is ``0``.
    """
    outcome = {"name": name, "expected": expected}
    try:
        observed, call_count = fn()
        outcome.update(
            actual=observed,
            passed=(observed == expected),
            pickup_calls=call_count,
        )
    except Exception as exc:  # noqa: BLE001
        # A failing scenario must not abort the remaining scenarios.
        outcome.update(
            actual=f"EXCEPTION: {exc}",
            passed=False,
            pickup_calls=0,
        )
    return outcome


# ── 공개 함수 6: run_checks ───────────────────────────────────────────────────
def run_checks() -> dict:
    """Run the 11 decision scenarios (DC1-DC11), each in its own isolated root.

    Returns a dict of the form:
    {
      "checks": [{"name", "expected", "actual", "passed": bool, "pickup_calls": int}, ...],
      "all_pass": bool,
      "canonical_untouched": bool,
      "raw_key_exposure": int,
      "temp_roots": [...],
      "findings": [str, ...],
    }

    How real launches are avoided:
      - every drv.scan_once scenario injects a fake pickup_fn;
      - the scenarios that call rnr.pickup_once directly pass
        sealed_key_loader=lambda: None (intended to fail closed).

    The temp roots created by the scenarios are not removed here; their paths
    are reported under "temp_roots".
    """
    # Snapshot of the canonical root taken before any scenario runs; compared
    # against a second snapshot at the end.
    before_snap = snapshot_canonical()
    checks = []
    temp_roots: List[str] = []
    findings: List[str] = []

    # ──────────────────────────────────────────────────────────────────────────
    # DC1: disabled_noop
    # ──────────────────────────────────────────────────────────────────────────
    def _dc1():
        root = make_isolated_root()
        temp_roots.append(root)
        calls, fake_fn = fake_pickup_factory()
        records = drv.scan_once(
            root,
            flag_reader=lambda: "disabled",
            pickup_fn=fake_fn,
            write_evidence=False,
        )
        verdict = records[0].verdict if records else "NO_RECORDS"
        return verdict, len(calls)

    checks.append(_run_scenario("DC1 disabled_noop", drv.VERDICT_NOOP_DISABLED, _dc1))

    # ──────────────────────────────────────────────────────────────────────────
    # DC2: enabled_nontarget — a non-target file such as foo.md
    # ──────────────────────────────────────────────────────────────────────────
    def _dc2():
        root = make_isolated_root()
        temp_roots.append(root)
        calls, fake_fn = fake_pickup_factory()
        nontarget_path = os.path.join(root, "memory", "events", "foo.md")
        with open(nontarget_path, "w") as fh:
            fh.write("not a result.json")
        records = drv.scan_once(
            root,
            flag_reader=lambda: "enabled",
            pickup_fn=fake_fn,
            paths=[nontarget_path],
            write_evidence=False,
        )
        verdict = records[0].verdict if records else "NO_RECORDS"
        return verdict, len(calls)

    checks.append(_run_scenario("DC2 enabled_nontarget", drv.VERDICT_NOOP_NOT_TARGET, _dc2))

    # ──────────────────────────────────────────────────────────────────────────
    # DC3: authoritative_wake — valid (aged) result.json, AUTHORITATIVE verdict,
    #      fake pickup
    # ──────────────────────────────────────────────────────────────────────────
    def _dc3():
        root = make_isolated_root()
        temp_roots.append(root)
        calls, fake_fn = fake_pickup_factory()
        path = write_result_json(root, "task-dc3", aged_seconds=10)
        clock = fixed_clock()
        records = drv.scan_once(
            root,
            flag_reader=lambda: "enabled",
            pickup_fn=fake_fn,
            verify_fn=fake_verify_factory(VERDICT_AUTHORITATIVE),
            clock=clock,
            paths=[path],
            stable_sec=2.0,
            readiness_retries=1,
            readiness_interval=0.0,
            sleep_fn=lambda s: None,
            write_evidence=False,
        )
        verdict = records[0].verdict if records else "NO_RECORDS"
        return verdict, len(calls)

    checks.append(_run_scenario("DC3 authoritative_wake", drv.VERDICT_WAKE_BUILT, _dc3))

    # ──────────────────────────────────────────────────────────────────────────
    # DC4: terminal_marker_skip — the done marker is created beforehand
    # ──────────────────────────────────────────────────────────────────────────
    def _dc4():
        root = make_isolated_root()
        temp_roots.append(root)
        calls, fake_fn = fake_pickup_factory()
        task_id = "task-dc4"
        path = write_result_json(root, task_id, aged_seconds=10)
        # Create the done marker in advance, next to the result file.
        done_marker = os.path.join(os.path.dirname(path), f"{task_id}.pickup.done")
        with open(done_marker, "w") as fh:
            fh.write("{}")
        clock = fixed_clock()
        records = drv.scan_once(
            root,
            flag_reader=lambda: "enabled",
            pickup_fn=fake_fn,
            verify_fn=fake_verify_factory(VERDICT_AUTHORITATIVE),
            clock=clock,
            paths=[path],
            stable_sec=2.0,
            readiness_retries=1,
            readiness_interval=0.0,
            sleep_fn=lambda s: None,
            write_evidence=False,
        )
        verdict = records[0].verdict if records else "NO_RECORDS"
        return verdict, len(calls)

    checks.append(_run_scenario("DC4 terminal_marker_skip", drv.VERDICT_PICKUP_SKIP, _dc4))

    # ──────────────────────────────────────────────────────────────────────────
    # DC5: dedupe_skip — a PICKUP_WAKE_BUILT entry for the task_id is written to
    #      the ledger beforehand
    # ──────────────────────────────────────────────────────────────────────────
    def _dc5():
        root = make_isolated_root()
        temp_roots.append(root)
        calls, fake_fn = fake_pickup_factory()
        task_id = "task-dc5"
        path = write_result_json(root, task_id, aged_seconds=10)
        # Pre-seed the (isolated) ledger with a dedupe entry.
        ledger_path = os.path.join(root, "memory", "events", "callback_4tuple_index.jsonl")
        with open(ledger_path, "w", encoding="utf-8") as fh:
            fh.write(json.dumps({
                "event": "PICKUP_WAKE_BUILT",
                "task_id": task_id,
            }) + "\n")
        clock = fixed_clock()
        records = drv.scan_once(
            root,
            flag_reader=lambda: "enabled",
            pickup_fn=fake_fn,
            verify_fn=fake_verify_factory(VERDICT_AUTHORITATIVE),
            clock=clock,
            ledger_path=ledger_path,
            paths=[path],
            stable_sec=2.0,
            readiness_retries=1,
            readiness_interval=0.0,
            sleep_fn=lambda s: None,
            write_evidence=False,
        )
        verdict = records[0].verdict if records else "NO_RECORDS"
        return verdict, len(calls)

    checks.append(_run_scenario("DC5 dedupe_skip", drv.VERDICT_PICKUP_SKIP, _dc5))

    # ──────────────────────────────────────────────────────────────────────────
    # DC6: owner_proof_fail_quarantine — verify_fn returns a non-AUTHORITATIVE
    #      verdict
    # ──────────────────────────────────────────────────────────────────────────
    def _dc6():
        root = make_isolated_root()
        temp_roots.append(root)
        calls, fake_fn = fake_pickup_factory()
        path = write_result_json(root, "task-dc6", aged_seconds=10)
        clock = fixed_clock()
        records = drv.scan_once(
            root,
            flag_reader=lambda: "enabled",
            pickup_fn=fake_fn,
            verify_fn=fake_verify_factory("NON_AUTHORITATIVE"),
            clock=clock,
            paths=[path],
            stable_sec=2.0,
            readiness_retries=1,
            readiness_interval=0.0,
            sleep_fn=lambda s: None,
            write_evidence=False,
        )
        verdict = records[0].verdict if records else "NO_RECORDS"
        return verdict, len(calls)

    checks.append(_run_scenario("DC6 owner_proof_fail_quarantine", drv.VERDICT_QUARANTINE, _dc6))

    # ──────────────────────────────────────────────────────────────────────────
    # DC7: readiness_defer — aged_seconds=0 (just written, recent mtime)
    # ──────────────────────────────────────────────────────────────────────────
    def _dc7():
        root = make_isolated_root()
        temp_roots.append(root)
        calls, fake_fn = fake_pickup_factory()
        # aged_seconds=0 → mtime = now → treated as a recent mtime → expected NOT_READY
        path = write_result_json(root, "task-dc7", aged_seconds=0)
        # The clock is also frozen at "now" (stable_sec=2 → age < 2 → expected defer).
        clock = fixed_clock()
        records = drv.scan_once(
            root,
            flag_reader=lambda: "enabled",
            pickup_fn=fake_fn,
            verify_fn=fake_verify_factory(VERDICT_AUTHORITATIVE),
            clock=clock,
            paths=[path],
            stable_sec=2.0,
            readiness_retries=1,
            readiness_interval=0.0,
            sleep_fn=lambda s: None,
            write_evidence=False,
        )
        verdict = records[0].verdict if records else "NO_RECORDS"
        return verdict, len(calls)

    checks.append(_run_scenario("DC7 readiness_defer", drv.VERDICT_NOOP_NOT_READY, _dc7))

    # ──────────────────────────────────────────────────────────────────────────
    # DC8: null_byte_quarantine — result.json contains a \x00 byte
    # ──────────────────────────────────────────────────────────────────────────
    def _dc8():
        root = make_isolated_root()
        temp_roots.append(root)
        calls, fake_fn = fake_pickup_factory()
        task_id = "task-dc8"
        events_dir = os.path.join(root, "memory", "events")
        path = os.path.join(events_dir, f"{task_id}.result.json")
        # Write otherwise-valid JSON followed by a trailing null byte.
        with open(path, "wb") as fh:
            content = json.dumps({
                "task_id": task_id,
                "completion_signal": "EXECUTOR_RESULT_WRITTEN",
                "collector_envelope": {"schedule_id": "SCHED-DUMMY"},
            }).encode("utf-8") + b"\x00"
            fh.write(content)
        # Back-date the file by 10 seconds so it is intended to pass readiness.
        past_ts = time.time() - 10
        os.utime(path, (past_ts, past_ts))
        clock = fixed_clock()
        records = drv.scan_once(
            root,
            flag_reader=lambda: "enabled",
            pickup_fn=fake_fn,
            verify_fn=fake_verify_factory(VERDICT_AUTHORITATIVE),
            clock=clock,
            paths=[path],
            stable_sec=2.0,
            readiness_retries=1,
            readiness_interval=0.0,
            sleep_fn=lambda s: None,
            write_evidence=False,
        )
        verdict = records[0].verdict if records else "NO_RECORDS"
        qr = records[0].quarantine_reason if records else None
        # A quarantine for any reason other than "null_byte" is only recorded as
        # a finding; it does not change the pass/fail result of this check.
        if verdict == drv.VERDICT_QUARANTINE and qr != "null_byte":
            findings.append(f"DC8: quarantine_reason={qr!r} (expected 'null_byte')")
        return verdict, len(calls)

    checks.append(_run_scenario("DC8 null_byte_quarantine", drv.VERDICT_QUARANTINE, _dc8))

    # ──────────────────────────────────────────────────────────────────────────
    # DC9: duplicate_prevention — scan_once twice in a row; the cumulative
    #      pickup_calls must equal 1
    # ──────────────────────────────────────────────────────────────────────────
    def _dc9():
        root = make_isolated_root()
        temp_roots.append(root)
        calls, fake_fn = fake_pickup_factory()
        path = write_result_json(root, "task-dc9", aged_seconds=10)
        clock = fixed_clock()
        common_kwargs = dict(
            flag_reader=lambda: "enabled",
            pickup_fn=fake_fn,
            verify_fn=fake_verify_factory(VERDICT_AUTHORITATIVE),
            clock=clock,
            stable_sec=2.0,
            readiness_retries=1,
            readiness_interval=0.0,
            sleep_fn=lambda s: None,
            write_evidence=False,
        )
        # First pass.
        records1 = drv.scan_once(root, paths=[path], **common_kwargs)
        v1 = records1[0].verdict if records1 else "NO_RECORDS"
        # Second pass: the author expects the file to have been moved to
        # "processed" by the first pass, leaving nothing for the glob to find
        # (run only to observe side effects) — NOTE(review): confirm against drv.
        drv.scan_once(root, **common_kwargs)  # paths=None → glob; must trigger no additional pickup
        # The second pass's records are not inspected; duplicate prevention is
        # verified through the total number of pickup calls.
        total_calls = len(calls)
        # DC9's reported verdict is the first pass's verdict.
        return v1, total_calls

    checks.append(_run_scenario("DC9 duplicate_prevention", drv.VERDICT_WAKE_BUILT, _dc9))
    # DC9 extra verification: pickup_calls must be exactly 1.
    dc9 = checks[-1]
    if dc9["passed"] and dc9["pickup_calls"] != 1:
        dc9["passed"] = False
        findings.append(f"DC9: pickup_calls={dc9['pickup_calls']} (expected 1)")

    # ──────────────────────────────────────────────────────────────────────────
    # DC10: pickup_once_sealed_key_missing — calls the real rnr.pickup_once directly
    #        sealed_key_loader=lambda: None → expected SEALED_KEY_MISSING, wake_built=False
    # ──────────────────────────────────────────────────────────────────────────
    def _dc10():
        root = make_isolated_root()
        temp_roots.append(root)
        path = write_result_json(root, "task-dc10", aged_seconds=10)
        clock = fixed_clock()
        ledger_path = os.path.join(root, "memory", "events", "callback_4tuple_index.jsonl")
        result = rnr.pickup_once(
            path,
            clock=clock,
            sealed_key_loader=lambda: None,  # fail-closed; no real key is loaded
            executor_key="",
            ledger_path=ledger_path,
        )
        verdict = result.verdict
        wake = result.wake_built
        # An unexpected wake is recorded as a finding only; the check's
        # pass/fail is still decided by the verdict comparison.
        if wake:
            findings.append("DC10: wake_built=True (실 발사 위험! sealed_key_loader=None 인데 wake)")
        return verdict, 0  # pickup_once is called directly (no fake pickup_fn), so calls is reported as 0

    checks.append(_run_scenario(
        "DC10 pickup_once_sealed_key_missing",
        rnr.PICKUP_SEALED_KEY_MISSING,
        _dc10,
    ))

    # ──────────────────────────────────────────────────────────────────────────
    # DC11: pickup_once_terminal_skip — the done marker is created beforehand
    # ──────────────────────────────────────────────────────────────────────────
    def _dc11():
        root = make_isolated_root()
        temp_roots.append(root)
        task_id = "task-dc11"
        path = write_result_json(root, task_id, aged_seconds=10)
        # Create the done marker in advance, next to the result file.
        done_marker = os.path.join(os.path.dirname(path), f"{task_id}.pickup.done")
        with open(done_marker, "w") as fh:
            fh.write("{}")
        clock = fixed_clock()
        ledger_path = os.path.join(root, "memory", "events", "callback_4tuple_index.jsonl")
        result = rnr.pickup_once(
            path,
            clock=clock,
            sealed_key_loader=lambda: None,
            executor_key="",
            ledger_path=ledger_path,
        )
        verdict = result.verdict
        wake = result.wake_built
        # As in DC10, an unexpected wake is only recorded as a finding.
        if wake:
            findings.append("DC11: wake_built=True (terminal marker 있는데 wake!)")
        return verdict, 0

    checks.append(_run_scenario(
        "DC11 pickup_once_terminal_skip",
        rnr.PICKUP_SKIP_TERMINAL,
        _dc11,
    ))

    # ── Aggregation ───────────────────────────────────────────────────────────
    after_snap = snapshot_canonical()
    canonical_untouched = assert_canonical_untouched(before_snap, after_snap)

    all_pass = all(c["passed"] for c in checks)

    summary = {
        "checks": checks,
        "all_pass": all_pass,
        "canonical_untouched": canonical_untouched,
        "raw_key_exposure": 0,  # placeholder; filled in just below
        "temp_roots": temp_roots,
        "findings": findings,
    }

    # ── Compute raw_key_exposure over the assembled summary ───────────────────
    summary["raw_key_exposure"] = _count_raw_key_exposure(summary)

    return summary


# ── 공개 함수 8: raw_key_exposure 계산 ───────────────────────────────────────
def _count_raw_key_exposure(summary: dict) -> int:
    """Count ANU key exposures in the serialized form of ``summary``.

    ``summary`` is rendered with ``json.dumps`` (falling back to ``str`` when
    it is not JSON-serializable) and the rendered text is scanned for:

      (a) every occurrence of the known key prefix followed by at least nine
          more hex digits (16+ characters in total) — each match counts once;
      (b) the stripped value of the ``COKACDIR_KEY_ANU`` environment variable,
          if set and non-empty — counts once when it appears in the text.

    The environment value is only used for the containment test; this
    function never prints, logs or returns it.

    Returns:
        The total number of exposures found (0 when clean).
    """
    try:
        rendered = json.dumps(summary, ensure_ascii=False)
    except (TypeError, ValueError):
        rendered = str(summary)

    # (a) prefix (7 chars) + 9 or more hex digits = a 16+ character key literal.
    hits = sum(1 for _ in re.finditer(r"c119085[0-9a-fA-F]{9,}", rendered))

    # (b) the real key from the environment, compared but never emitted.
    env_key = (os.environ.get("COKACDIR_KEY_ANU") or "").strip()
    if env_key and env_key in rendered:
        hits += 1

    return hits


# ── 공개 함수 9: main ─────────────────────────────────────────────────────────
def main(argv=None) -> int:
    """Command-line entry point.

    Parses ``--json``, runs ``run_checks`` and prints the summary either as
    indented JSON (``--json``) or as a plain-text report.

    Args:
        argv: Argument list handed to ``argparse``; ``None`` lets argparse
            read ``sys.argv``.

    Returns:
        ``0`` when ``all_pass`` and ``canonical_untouched`` are truthy and
        ``raw_key_exposure`` equals 0; otherwise ``1``.
    """
    cli = argparse.ArgumentParser(
        prog="anu_pickup_p0b_audit_decision_check",
        description="ANU pickup P0-b audit-first decision-only 검증 하니스",
    )
    cli.add_argument(
        "--json",
        action="store_true",
        default=False,
        help="결과를 JSON 형식으로 출력",
    )
    options = cli.parse_args(argv)

    report = run_checks()

    if not options.json:
        # Plain-text report: headline values, one line per check, then findings.
        print(f"all_pass: {report['all_pass']}")
        print(f"canonical_untouched: {report['canonical_untouched']}")
        print(f"raw_key_exposure: {report['raw_key_exposure']}")
        for check in report["checks"]:
            label = "PASS" if check["passed"] else "FAIL"
            print(f"  [{label}] {check['name']} expected={check['expected']!r} actual={check['actual']!r} pickup_calls={check['pickup_calls']}")
        if report["findings"]:
            print("findings:")
            for note in report["findings"]:
                print(f"  - {note}")
    else:
        # temp_roots carries directory paths only, not file contents.
        print(json.dumps(report, ensure_ascii=False, indent=2))

    if not (report["all_pass"] and report["canonical_untouched"]):
        return 1
    return 0 if report["raw_key_exposure"] == 0 else 1


if __name__ == "__main__":
    raise SystemExit(main())
