# -*- coding: utf-8 -*-
"""dispatch.anu_pickup_driver — task-2721 P0-b user-level systemd path driver.

default DISABLED. activation flag(memory/state/p0b_driver_enabled == "enabled") 부재 시 전면 no-op.
scan 한정: memory/events/task-*.result.json final 만. tmp/partial/다른 marker·jsonl·md → NOOP_NOT_TARGET.
6조건 전부 PASS 시에만 P0-a pickup_once(lock-free) 호출. pickup_once 가 ANU-owned wake argv(dry-run)를 빌드.
실제 cron 발사 0(P0-a dry_run=True/FIRE_NOT_ACTIVATED). ANU key literal 0 — .env.keys 런타임 로드만.
"""
from __future__ import annotations

import functools
import glob
import hmac
import json
import os
import shutil
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

# 재사용 모듈 (절대 수정 금지)
from dispatch.anu_result_pickup_runner import pickup_once, _default_sealed_key_loader  # pyright: ignore[reportMissingImports]
from dispatch.anu_pickup_wake_launcher import launch_wake  # pyright: ignore[reportMissingImports]
from dispatch.anu_owned_callback_enforcement import (  # pyright: ignore[reportMissingImports]
    CANONICAL_ROOT,
    VERDICT_AUTHORITATIVE,
    verify_collector_authoritative,
)

# ── 상수 ─────────────────────────────────────────────────────────────────────
DRIVER_NAME = "systemd-path"
ACTIVATION_FLAG_REL = "memory/state/p0b_driver_enabled"
EVENTS_DIR_REL = "memory/events"
QUARANTINE_DIR_REL = "memory/p0b_state/quarantine"
PROCESSED_DIR_REL = "memory/p0b_state/processed"
EVIDENCE_JSONL_REL = "memory/p0b_state/driver_runs.jsonl"
RESULT_GLOB = "task-*.result.json"
MAX_FILES = 50  # Cap on files handled by one scan_once cycle; the overflow waits for the next cycle.
ACTIVATION_EPOCH_REL = "memory/state/p0b_activation_epoch"  # Marker file; this module only ever reads it.

# ── readiness grace window (write-race defense) ──────────────────────────────
# Parameters that keep a partially written JSON from being quarantined before the writer has flushed.
# Tests can override them through process_one kwargs (stable_sec/readiness_retries/readiness_interval/sleep_fn).
STABLE_SEC = 2.0            # mtime newer than now - STABLE_SEC → possibly still in flight → DEFER
STABILITY_RETRIES = 3      # max stat samples used to re-check size/mtime stability
STABILITY_INTERVAL_SEC = 0.2  # seconds slept between stat samples

# Verdicts written into DriverRecord.verdict
VERDICT_WAKE_BUILT = "WAKE_BUILT"
VERDICT_PICKUP_SKIP = "PICKUP_SKIP"
VERDICT_QUARANTINE = "QUARANTINE"
VERDICT_FIRE_FAILED = "FIRE_FAILED"
VERDICT_NOOP_DISABLED = "NOOP_DISABLED"
VERDICT_NOOP_NOT_TARGET = "NOOP_NOT_TARGET"
VERDICT_NOOP_NOT_READY = "NOOP_NOT_READY"  # readiness not met → DEFER (no wake, no quarantine)
VERDICT_NOOP_LEGACY_SKIP = "NOOP_LEGACY_SKIP"   # result older than activation_epoch → no move, no wake, no quarantine
VERDICT_NOOP_MAX_FILES_DEFER = "NOOP_MAX_FILES_DEFER"  # candidates beyond MAX_FILES were deferred

# owner_key_class values (classification labels only — never key literals)
OKC_ANU = "ANU"
OKC_FOREIGN = "FOREIGN"
OKC_SELF = "SELF"

ACTIVATION_ENABLED = "enabled"
ACTIVATION_DISABLED = "disabled"

REAL_WAKE_FLAG_REL = "memory/state/p0b_real_wake_enabled"
REAL_WAKE_ENABLED = "enabled"

# Time zone used for record timestamps: KST (UTC+09:00)
KST = timezone(timedelta(hours=9))

# Verdict strings returned by pickup_once (P0-a contract)
_PICKUP_WAKE_BUILT = "WAKE_BUILT"
_PICKUP_SKIP_TERMINAL = "SKIP_TERMINAL"
_PICKUP_SKIP_DEDUPE = "SKIP_DEDUPE"
_PICKUP_QUARANTINE = "QUARANTINE"
_PICKUP_PENDING = "PENDING_OWNER_PROOF"


# ── DriverRecord dataclass ────────────────────────────────────────────────────
@dataclass
class DriverRecord:
    """One evidence row: what the driver decided for a single result file.

    Only ``ts``, ``result_path`` and ``verdict`` are mandatory; the remaining
    fields default to "nothing happened" values.
    """

    ts: str                                   # ISO-8601 timestamp of the decision
    result_path: str                          # file the decision is about ("" for scan-level rows)
    verdict: str                              # one of the VERDICT_* labels
    owner_key_class: Optional[str] = None     # OKC_* label, never a key literal
    quarantined: bool = False
    quarantine_reason: Optional[str] = None   # also carries skip/defer reasons
    fire_cron_id: Optional[str] = None
    retry: int = 0
    error: Optional[str] = None
    driver: str = DRIVER_NAME
    activation: str = ACTIVATION_DISABLED

    def to_json(self) -> dict:
        """Return the record as a plain dict, keys in declaration order."""
        ordered_keys = (
            "ts",
            "result_path",
            "verdict",
            "owner_key_class",
            "quarantined",
            "quarantine_reason",
            "fire_cron_id",
            "retry",
            "error",
            "driver",
            "activation",
        )
        return {key: getattr(self, key) for key in ordered_keys}


# ── activation ────────────────────────────────────────────────────────────────
def read_activation(root: str = CANONICAL_ROOT, *, flag_reader=None) -> str:
    """Return the trimmed first line of the activation flag.

    A missing or unreadable flag yields "" (which callers treat as disabled).

    Args:
        root: Directory the flag path is resolved against.
        flag_reader: Optional injected ``callable() -> Optional[str]`` used
            instead of the file; an exception or ``None`` from it yields "".
    """
    if flag_reader is None:
        try:
            with open(os.path.join(root, ACTIVATION_FLAG_REL), "r", encoding="utf-8") as handle:
                return handle.readline().strip()
        except (OSError, ValueError):
            return ""

    try:
        supplied = flag_reader()
    except Exception:  # noqa: BLE001 — a failing reader means disabled (fail-closed)
        return ""
    if supplied is None:
        return ""
    text = str(supplied)
    if not text.strip():
        # Whitespace-only content: nothing to take a first line from.
        return ""
    return text.splitlines()[0].strip()


def is_activated(root: str = CANONICAL_ROOT, *, flag_reader=None) -> bool:
    """Tell whether the activation flag reads exactly ACTIVATION_ENABLED."""
    current = read_activation(root, flag_reader=flag_reader)
    return current == ACTIVATION_ENABLED


def read_activation_epoch(root: str = CANONICAL_ROOT, *, epoch_reader=None) -> Optional[float]:
    """Read the activation-epoch marker as a finite unix timestamp.

    The first line of ``memory/state/p0b_activation_epoch`` (or of the value
    returned by ``epoch_reader``) is parsed with ``float``.

    Args:
        root: Directory the marker path is resolved against.
        epoch_reader: Optional injected ``callable() -> Optional[str | float]``
            used instead of the file.

    Returns:
        The epoch as a float, or ``None`` when the marker is missing,
        unreadable, unparsable, or not a finite number. Callers are expected
        to treat ``None`` as fail-closed.
    """
    import math  # local import keeps this block self-contained

    if epoch_reader is not None:
        try:
            val = epoch_reader()
        except Exception:  # noqa: BLE001 — a failing reader means "no epoch"
            return None
        if val is None:
            return None
        try:
            first = str(val).splitlines()[0]
        except IndexError:  # empty string → no first line
            return None
    else:
        epoch_path = os.path.join(root, ACTIVATION_EPOCH_REL)
        try:
            with open(epoch_path, "r", encoding="utf-8") as fh:
                first = fh.readline()
        except (OSError, ValueError):
            return None

    try:
        epoch = float(first.strip())
    except ValueError:
        return None
    # float() happily parses "nan"/"inf"/"-inf". NaN makes every
    # `mtime < epoch` comparison False and -inf makes it never True, so either
    # would let all legacy files through the cutoff. Treat them as unparsable.
    if not math.isfinite(epoch):
        return None
    return epoch


# ── real-wake activation ──────────────────────────────────────────────────────
def read_real_wake_enabled(root: str = CANONICAL_ROOT, *, flag_reader=None) -> bool:
    """Tell whether the real-wake flag's trimmed first line is REAL_WAKE_ENABLED.

    Anything else — missing file, read error, other content — is False
    (fail-closed).

    Args:
        root: Directory the flag path is resolved against.
        flag_reader: Optional injected ``callable() -> Optional[str]`` used
            instead of the file.
    """
    if flag_reader is None:
        try:
            with open(os.path.join(root, REAL_WAKE_FLAG_REL), "r", encoding="utf-8") as handle:
                head = handle.readline()
        except (OSError, ValueError):
            return False
        return head.strip() == REAL_WAKE_ENABLED

    try:
        supplied = flag_reader()
    except Exception:  # noqa: BLE001 — a failing reader means "not enabled"
        return False
    if supplied is None:
        return False
    text = str(supplied)
    if not text.strip():
        return False
    return text.splitlines()[0].strip() == REAL_WAKE_ENABLED


def build_launcher_fn(
    root: str = CANONICAL_ROOT,
    *,
    real_wake_reader=None,
    sealed_key_loader=None,
    launch_wake_fn=None,
):
    """Build the real-wake launcher, or return None when real wake must stay off.

    A launcher is produced only when the real-wake flag is enabled AND the
    sealed-key loader yields a non-empty key. Fail-closed rules:
      - flag not enabled → None, and the key loader is never called;
      - loader raises, or returns None/empty → None.

    Args:
        root: Directory the real-wake flag is resolved against.
        real_wake_reader: Optional injected reader for the real-wake flag.
        sealed_key_loader: Optional ``callable()`` returning the sealed key;
            defaults to ``_default_sealed_key_loader``.
        launch_wake_fn: Optional replacement for ``launch_wake``.

    Returns:
        ``functools.partial(launch, dry_run=False, anu_key_verifier=verifier)``
        where ``verifier`` compares a candidate against the sealed key with
        ``hmac.compare_digest``, or None.
    """
    wake_allowed = read_real_wake_enabled(root, flag_reader=real_wake_reader)
    if not wake_allowed:
        return None

    load_key = sealed_key_loader if sealed_key_loader else _default_sealed_key_loader
    try:
        secret = load_key()
    except Exception:  # noqa: BLE001 — loader failure → fail-closed
        return None
    if not secret:
        return None
    expected = str(secret)

    def _constant_time_match(candidate) -> bool:
        # compare_digest can raise (e.g. on non-ASCII str); treat that as a mismatch.
        try:
            matched = hmac.compare_digest(str(candidate), expected)
        except Exception:  # noqa: BLE001
            matched = False
        return matched

    wake = launch_wake_fn or launch_wake
    return functools.partial(wake, dry_run=False, anu_key_verifier=_constant_time_match)


# ── target 판정 ────────────────────────────────────────────────────────────────
def is_target(path: str) -> bool:
    """Accept only final result files: basename ``task-….result.json``.

    Temp files such as ``….result.json.tmp-…``, partial files, ``.md``,
    ``.jsonl`` and other markers are rejected, as is an empty/None path.
    """
    if path:
        name = os.path.basename(str(path))
        if name.startswith("task-"):
            return name.endswith(".result.json")
    return False


# ── 내부 helper ────────────────────────────────────────────────────────────────
def _now_kst(clock) -> str:
    return clock().isoformat()


def _envelope_claim_class(envelope: dict) -> str:
    """Label an envelope SELF or FOREIGN from its own claim.

    This is labelling only, not a verdict: a truthy ``self_key_used`` gives
    OKC_SELF; anything else, including an object without ``.get``, gives
    OKC_FOREIGN.
    """
    try:
        claims_self = bool(envelope.get("self_key_used"))
    except AttributeError:
        claims_self = False
    return OKC_SELF if claims_self else OKC_FOREIGN


def _collision_safe_dest(dest_dir: str, basename: str) -> str:
    """dest_dir/basename. 이미 존재하면 .{ms타임스탬프}[-n] suffix 로 충돌 회피."""
    dest = os.path.join(dest_dir, basename)
    if not os.path.exists(dest):
        return dest
    ts = str(int(time.time() * 1000))
    cand = os.path.join(dest_dir, f"{basename}.{ts}")
    n = 0
    while os.path.exists(cand):
        n += 1
        cand = os.path.join(dest_dir, f"{basename}.{ts}-{n}")
    return cand


def _move_processed(path: str, root: str, processed_dir: Optional[str] = None) -> Optional[str]:
    """Move a finished result file into the processed directory.

    ``os.replace`` is tried first; if it raises, ``shutil.move`` is used as a
    fallback. The function never raises for filesystem errors.

    Args:
        path: File to move.
        root: Base used for the default ``PROCESSED_DIR_REL`` location.
        processed_dir: Explicit destination directory overriding the default.

    Returns:
        None on success, otherwise an error message string.
    """
    target_dir = processed_dir if processed_dir else os.path.join(root, PROCESSED_DIR_REL)
    try:
        os.makedirs(target_dir, exist_ok=True)
        destination = _collision_safe_dest(target_dir, os.path.basename(path))
    except OSError as exc:
        return f"processed move 실패: {exc}"

    failure = None
    # Try the plain rename first, then the copying fallback; report the last error.
    for relocate in (os.replace, shutil.move):
        try:
            relocate(path, destination)
        except (OSError, shutil.Error) as exc:
            failure = exc
        else:
            return None
    return f"processed move 실패: {failure}"


def _quarantine_move(path: str, root: str, quarantine_dir: Optional[str]) -> Optional[str]:
    """Move a result file into the quarantine directory.

    Args:
        path: File to move.
        root: Base used for the default ``QUARANTINE_DIR_REL`` location.
        quarantine_dir: Explicit destination directory overriding the default.

    Returns:
        None on success, otherwise an error message string.
    """
    target_dir = quarantine_dir if quarantine_dir else os.path.join(root, QUARANTINE_DIR_REL)
    try:
        os.makedirs(target_dir, exist_ok=True)
        shutil.move(path, _collision_safe_dest(target_dir, os.path.basename(path)))
    except (OSError, shutil.Error) as exc:
        return f"quarantine move 실패: {exc}"
    return None


def _append_evidence(record: DriverRecord, root: str, evidence_path: Optional[str]) -> None:
    """DriverRecord.to_json() 한 줄 JSON append. ANU key literal 절대 미기록."""
    ev = evidence_path or os.path.join(root, EVIDENCE_JSONL_REL)
    try:
        os.makedirs(os.path.dirname(ev), exist_ok=True)
        with open(ev, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(record.to_json(), ensure_ascii=False) + "\n")
            fh.flush()
            os.fsync(fh.fileno())
    except OSError:
        pass  # evidence 기록 실패는 비치명 (fail-safe)


def _dedupe_hit(task_id: str, ledger_path: Optional[str], root: str) -> bool:
    """dedupe ledger 에 동일 task_id 의 PICKUP_WAKE_BUILT 항목 존재 여부."""
    ledger = ledger_path or os.path.join(
        root, "memory", "events", "callback_4tuple_index.jsonl"
    )
    if not os.path.isfile(ledger):
        return False
    try:
        with open(ledger, "r", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except (ValueError, TypeError):
                    continue
                if (
                    isinstance(entry, dict)
                    and entry.get("event") == "PICKUP_WAKE_BUILT"
                    and entry.get("task_id") == task_id
                ):
                    return True
    except (OSError, ValueError):
        return False
    return False


def _legacy_cutoff_check(path, *, activation_epoch, stat_fn=None):
    """legacy cutoff 판정. skip 해야 하면 reason(str) 반환, 정상 진행이면 None.
    - activation_epoch is None (부재/불명확) → fail-open 금지 → skip("epoch_absent").
    - stat 실패 → skip("legacy_stat_fail") (보수적 skip — canonical 무수정).
    - result mtime < activation_epoch → skip("pre_activation_epoch") (legacy).
    - mtime >= activation_epoch → None (post-activation, 정상 decision path)."""
    if activation_epoch is None:
        return "epoch_absent"
    stat_fn = stat_fn or os.stat
    try:
        st = stat_fn(path)
    except OSError:
        return "legacy_stat_fail"
    if st.st_mtime < activation_epoch:
        return "pre_activation_epoch"
    return None


def _check_readiness(
    path: str,
    *,
    clock,
    stable_sec: float = STABLE_SEC,
    retries: int = STABILITY_RETRIES,
    interval: float = STABILITY_INTERVAL_SEC,
    sleep_fn=None,
    stat_fn=None,
) -> tuple:
    """Write-race guard: decide whether a result file has settled.

    The file is stat-ed up to ``max(1, retries)`` times, sleeping ``interval``
    between samples and stopping early once two consecutive (size, mtime)
    samples match. Sleeping is bounded: at most ``retries - 1`` sleeps.

    Args:
        path: File to inspect.
        clock: ``callable() -> datetime`` giving "now".
        stable_sec: Minimum age (seconds since mtime) for a settled file.
        retries: Maximum number of stat samples.
        interval: Seconds slept between samples.
        sleep_fn: Optional replacement for ``time.sleep``.
        stat_fn: Optional replacement for ``os.stat``.

    Returns:
        ``(ready, reason)``:
          - ``(False, "stat_fail")``: stat raised OSError → defer.
          - ``(False, "recent_mtime")``: last mtime is younger than
            ``stable_sec`` → writer may still be active → defer.
          - ``(False, "unstable")``: the last two samples differ → defer.
          - ``(True, "ready")``: aged and stable.
    """
    probe = stat_fn or os.stat
    pause = sleep_fn or time.sleep

    previous = None
    current = None
    for round_no in range(max(1, retries)):
        try:
            info = probe(path)
        except OSError:
            return (False, "stat_fail")
        previous, current = current, (info.st_size, info.st_mtime)
        # Two identical consecutive samples → settled, stop sampling early.
        if previous is not None and current == previous:
            break
        if round_no + 1 < retries:
            pause(interval)

    # The age check comes first, so a young file reports "recent_mtime" even if it is also unstable.
    seconds_old = clock().timestamp() - current[1]
    if seconds_old < stable_sec:
        return (False, "recent_mtime")
    if previous is not None and current != previous:
        return (False, "unstable")
    return (True, "ready")


# ── process_one ────────────────────────────────────────────────────────────────
def process_one(
    path: str,
    *,
    root: str = CANONICAL_ROOT,
    pickup_fn=None,            # defaults to dispatch.anu_result_pickup_runner.pickup_once
    launcher_fn=None,          # None (default) = surface only, nothing is launched; when injected it receives the wake argv
    verify_fn=None,            # defaults to verify_collector_authoritative
    probe=None,                # cron-history probe forwarded to verify_fn (owner proof); default None
    clock=None,                # defaults to lambda: datetime.now(KST)
    executor_key: str = "",
    ledger_path: Optional[str] = None,
    quarantine_dir: Optional[str] = None,
    processed_dir: Optional[str] = None,
    write_evidence: bool = False,   # whether a standalone process_one call appends evidence
    evidence_path: Optional[str] = None,
    stable_sec: Optional[float] = None,
    readiness_retries: Optional[int] = None,
    readiness_interval: Optional[float] = None,
    sleep_fn=None,
    activation_epoch: Optional[float] = None,
    legacy_cutoff: bool = False,
    stat_fn=None,
) -> DriverRecord:
    """Run one result file through the gate sequence and return its DriverRecord.

    Activation is NOT checked here; the caller (scan_once) is expected to have
    done that. Gates run in this order and the first failing one decides the
    verdict:

      - not a target file                      → NOOP_NOT_TARGET
      - legacy cutoff (only if legacy_cutoff)  → NOOP_LEGACY_SKIP (file untouched)
      - readiness (recent/unstable/stat fail)  → NOOP_NOT_READY (file untouched)
      1. size > 0                              → else QUARANTINE "size0"
      2. readable, no NUL byte, valid JSON     → else QUARANTINE "parse_fail"/"null_byte"
      3. schema: dict with non-blank string task_id and completion_signal,
         task_id free of path components      → else QUARANTINE "schema_fail"
      4&5. owner proof via verify_fn           → else QUARANTINE "owner_unprovable"/
                                                 "owner_proof_error"/"owner_proof_fail"
      6. no .pickup.done/.pickup.acked marker and no ledger hit
                                               → else PICKUP_SKIP (file moved to processed)

    When every gate passes, pickup_fn is called and its verdict is mapped to
    WAKE_BUILT / PICKUP_SKIP / QUARANTINE / FIRE_FAILED.

    Args:
        path: Result file to process.
        root: Base directory for default quarantine/processed/evidence/ledger paths.
        pickup_fn, launcher_fn, verify_fn, probe, clock: Injection points, see
            the inline comments in the signature.
        executor_key: Forwarded to verify_fn and pickup_fn.
        ledger_path: Dedupe ledger override; also forwarded to pickup_fn.
        quarantine_dir, processed_dir: Destination overrides.
        write_evidence, evidence_path: Whether/where each returned record is
            also appended to the evidence log.
        stable_sec, readiness_retries, readiness_interval, sleep_fn: Readiness
            overrides; None falls back to the module constants.
        activation_epoch, legacy_cutoff: Legacy cutoff inputs.
        stat_fn: Replacement for os.stat in the cutoff and readiness checks.

    Returns:
        The DriverRecord describing the decision.
    """
    pickup_fn = pickup_fn or pickup_once
    verify_fn = verify_fn or verify_collector_authoritative
    clock = clock or (lambda: datetime.now(KST))

    # One timestamp is taken up front and reused by every record built below.
    ts = _now_kst(clock)

    def _emit(rec: DriverRecord) -> DriverRecord:
        # Single exit funnel: optionally log the record, then hand it back.
        if write_evidence:
            _append_evidence(rec, root, evidence_path)
        return rec

    def _quarantine(reason: str, owner_class: Optional[str] = None,
                    extra_err: Optional[str] = None) -> DriverRecord:
        # Move the file to quarantine and build the QUARANTINE record.
        # A failed move does not change the verdict; its message is appended to `error`.
        move_err = _quarantine_move(path, root, quarantine_dir)
        err = extra_err
        if move_err:
            err = (err + "; " + move_err) if err else move_err
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_QUARANTINE,
            owner_key_class=owner_class,
            quarantined=True,
            quarantine_reason=reason,
            error=err,
            activation=ACTIVATION_ENABLED,
        ))

    # ── not a target → return immediately ────────────────────────────────────
    if not is_target(path):
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_NOOP_NOT_TARGET,
            activation=ACTIVATION_ENABLED,
        ))

    # ── D2 legacy cutoff (relative to activation_epoch) ──────────────────────
    # With legacy_cutoff on, results created before activation_epoch are skipped without being touched.
    #   Absent/unclear epoch → never fail open (epoch_absent skip). No move, no wake, no quarantine.
    if legacy_cutoff:
        skip_reason = _legacy_cutoff_check(path, activation_epoch=activation_epoch, stat_fn=stat_fn)
        if skip_reason is not None:
            return _emit(DriverRecord(
                ts=ts,
                result_path=path,
                verdict=VERDICT_NOOP_LEGACY_SKIP,
                quarantined=False,
                quarantine_reason=skip_reason,
                activation=ACTIVATION_ENABLED,
            ))

    # ── readiness grace window (write-race defense) ──────────────────────────
    # ★ A partial JSON is not quarantined right away; first rule out an unfinished writer flush.
    #   Readiness not met (recent mtime / unstable size·mtime / stat failure) → NOOP_NOT_READY (DEFER):
    #   neither wake nor quarantine. The file is re-evaluated on the next trigger/retry.
    ready, ready_reason = _check_readiness(
        path,
        clock=clock,
        stable_sec=STABLE_SEC if stable_sec is None else stable_sec,
        retries=STABILITY_RETRIES if readiness_retries is None else readiness_retries,
        interval=STABILITY_INTERVAL_SEC if readiness_interval is None else readiness_interval,
        sleep_fn=sleep_fn,
        stat_fn=stat_fn,
    )
    if not ready:
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_NOOP_NOT_READY,
            quarantined=False,
            quarantine_reason=ready_reason,
            activation=ACTIVATION_ENABLED,
        ))

    # ── condition 1: size > 0 ────────────────────────────────────────────────
    try:
        size = os.path.getsize(path)
    except OSError as exc:
        return _quarantine("size0", extra_err=f"getsize 실패: {exc}")
    if size <= 0:
        return _quarantine("size0")

    # ── condition 2: JSON parse (+ null-byte truncation defense) ─────────────
    try:
        with open(path, "rb") as fh:
            raw = fh.read()
    except OSError as exc:
        return _quarantine("parse_fail", extra_err=f"read: {exc}")
    # ★ A null byte (\x00) is a trace of a partial write/truncation. If one is still present
    #   after the grace window (readiness passed), quarantine. (A recent mtime inside the
    #   grace window was already deferred by the readiness check above.)
    if b"\x00" in raw:
        return _quarantine("null_byte")
    try:
        result = json.loads(raw.decode("utf-8"))
    except (ValueError, UnicodeDecodeError) as exc:
        return _quarantine("parse_fail", extra_err=f"parse: {exc}")

    # ── condition 3: schema ──────────────────────────────────────────────────
    if not isinstance(result, dict):
        return _quarantine("schema_fail")
    task_id = result.get("task_id")
    completion_signal = result.get("completion_signal")
    if not (isinstance(task_id, str) and task_id.strip()):
        return _quarantine("schema_fail")
    if not (isinstance(completion_signal, str) and completion_signal.strip()):
        return _quarantine("schema_fail")
    task_id = task_id.strip()
    # ★ Path-traversal defense (before owner proof, defense in depth):
    # task_id is used directly in done_path=os.path.join(result_dir, f"{task_id}.pickup.done"),
    # so any path-navigation character in it means schema_fail quarantine.
    if (
        os.path.basename(task_id) != task_id
        or ".." in task_id
        or "/" in task_id
        or "\\" in task_id
    ):
        return _quarantine("schema_fail")

    # ── conditions 4 & 5: owner proof + not self/foreign ─────────────────────
    envelope = result.get("collector_envelope")
    if not isinstance(envelope, dict):
        # No envelope → owner_unprovable. The claim cannot be estimated, so label conservatively as FOREIGN.
        return _quarantine("owner_unprovable", owner_class=OKC_FOREIGN)

    # The envelope's schedule_id wins; the top-level one is the fallback.
    schedule_id = envelope.get("schedule_id") or result.get("schedule_id")
    try:
        v = verify_fn(
            task_id=task_id,
            envelope=envelope,
            probe=probe,
            executor_key=executor_key,
            schedule_id=schedule_id,
            now=clock(),
        )
    except Exception as exc:  # noqa: BLE001 — verify exception → isolate as owner_proof_error
        return _quarantine("owner_proof_error",
                           owner_class=_envelope_claim_class(envelope),
                           extra_err=f"verify_fn 예외: {exc}")

    if getattr(v, "verdict", None) != VERDICT_AUTHORITATIVE:
        return _quarantine("owner_proof_fail",
                           owner_class=_envelope_claim_class(envelope))

    owner_class = OKC_ANU

    # ── condition 6: dedupe/done/acked ───────────────────────────────────────
    # Marker files are looked up next to the result file itself.
    result_dir = os.path.dirname(path)
    done_path = os.path.join(result_dir, f"{task_id}.pickup.done")
    acked_path = os.path.join(result_dir, f"{task_id}.pickup.acked")
    if (
        os.path.exists(done_path)
        or os.path.exists(acked_path)
        or _dedupe_hit(task_id, ledger_path, root)
    ):
        # Already handled: move the file to processed; a failed move only shows up in `error`.
        move_err = _move_processed(path, root, processed_dir)
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_PICKUP_SKIP,
            owner_key_class=owner_class,
            quarantined=False,
            error=move_err,
            activation=ACTIVATION_ENABLED,
        ))

    # ── all six conditions passed → call pickup_fn (P0-a, lock-free) ─────────
    try:
        res = pickup_fn(path, executor_key=executor_key, ledger_path=ledger_path)
    except Exception as exc:  # noqa: BLE001 — pickup exception → FIRE_FAILED (file is not moved)
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_FIRE_FAILED,
            owner_key_class=owner_class,
            error=f"pickup_fn 예외: {exc}",
            activation=ACTIVATION_ENABLED,
        ))

    pv = getattr(res, "verdict", None)
    if pv == _PICKUP_WAKE_BUILT:
        # ★ launcher_fn not injected (default None) → current behavior preserved (surface only, nothing executed).
        #   Only when injected is res.argv handed to the launcher (real-wake wiring). The driver itself only decides.
        fire_cron_id = None
        launch_err = None
        if launcher_fn is not None:
            try:
                lr = launcher_fn(
                    getattr(res, "argv", None),
                    task_id=task_id,
                    sha256=getattr(res, "sha256", ""),
                )
                fire_cron_id = getattr(lr, "decision", None)  # label only (no argv, no key)
            except Exception as exc:  # noqa: BLE001 — launcher exception → fail-safe (no crash)
                launch_err = f"launcher 예외: {exc}"
        # The file is moved to processed whether or not the launcher raised.
        move_err = _move_processed(path, root, processed_dir)
        err = "; ".join(x for x in (launch_err, move_err) if x) or None
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_WAKE_BUILT,
            owner_key_class=OKC_ANU,
            fire_cron_id=fire_cron_id,
            error=err,
            activation=ACTIVATION_ENABLED,
        ))
    if pv in (_PICKUP_SKIP_TERMINAL, _PICKUP_SKIP_DEDUPE):
        move_err = _move_processed(path, root, processed_dir)
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_PICKUP_SKIP,
            owner_key_class=owner_class,
            error=move_err,
            activation=ACTIVATION_ENABLED,
        ))
    if pv in (_PICKUP_QUARANTINE, _PICKUP_PENDING):
        # Quarantine reason is "pickup_" + the lower-cased pickup verdict.
        return _quarantine("pickup_" + str(pv).lower(), owner_class=owner_class)

    # Anything else (FAIL / REJECT / SEALED_KEY_MISSING / NO_RESULT_JSON) → FIRE_FAILED.
    reasons = getattr(res, "reasons", None)
    err = "; ".join(reasons) if isinstance(reasons, list) and reasons else f"pickup verdict={pv}"
    return _emit(DriverRecord(
        ts=ts,
        result_path=path,
        verdict=VERDICT_FIRE_FAILED,
        owner_key_class=owner_class,
        error=err,
        activation=ACTIVATION_ENABLED,
    ))


# ── scan_once ───────────────────────────────────────────────────────────────────
def scan_once(
    root: str = CANONICAL_ROOT,
    *,
    pickup_fn=None,
    launcher_fn=None,          # None (default) = surface only; forwarded unchanged to process_one
    verify_fn=None,
    probe=None,
    clock=None,
    executor_key: str = "",
    ledger_path: Optional[str] = None,
    quarantine_dir: Optional[str] = None,
    processed_dir: Optional[str] = None,
    paths: Optional[list] = None,   # None → glob(root/memory/events/task-*.result.json); otherwise this list is used
    flag_reader=None,
    write_evidence: bool = True,
    evidence_path: Optional[str] = None,
    stable_sec: Optional[float] = None,
    readiness_retries: Optional[int] = None,
    readiness_interval: Optional[float] = None,
    sleep_fn=None,
    max_files: Optional[int] = None,
    legacy_cutoff: bool = False,
    activation_epoch: Optional[float] = None,
    epoch_reader=None,
    stat_fn=None,
) -> list:
    """Driver entry point: one bounded scan cycle.

    1. Activation is re-checked. When disabled, a single NOOP_DISABLED record
       is returned (and logged if ``write_evidence``); nothing else runs.
    2. When enabled, candidates are either ``paths`` or the sorted glob of
       ``root/memory/events/task-*.result.json``. At most ``max_files``
       (default ``MAX_FILES``) candidates are passed to ``process_one``; if
       any were left over, one NOOP_MAX_FILES_DEFER record is appended whose
       ``quarantine_reason`` is ``"max_files_defer:<count>"``.

    Args:
        root: Base directory for flags, events and state.
        paths: Explicit candidate list replacing the glob.
        flag_reader: Injected reader for the activation flag.
        max_files: Per-cycle cap. Negative values are treated as 0 (everything
            is deferred) instead of being used as a negative slice index.
        legacy_cutoff, activation_epoch, epoch_reader: With ``legacy_cutoff``
            on and no explicit ``activation_epoch``, the epoch is read through
            ``read_activation_epoch``.
        Remaining keyword arguments are forwarded unchanged to ``process_one``.

    Returns:
        List of DriverRecord, one per processed candidate, plus the optional
        defer record.

    NOTE(review): candidates are taken from the front of the sorted list, and
    files that process_one leaves in place (NOOP_LEGACY_SKIP, NOOP_NOT_READY,
    FIRE_FAILED) are seen again next cycle. If at least ``max_files`` such
    files sort before newer results, the newer results are never reached.
    Confirm whether the events directory can accumulate that many.
    """
    clock = clock or (lambda: datetime.now(KST))

    # (1) re-check activation
    if not is_activated(root, flag_reader=flag_reader):
        rec = DriverRecord(
            ts=_now_kst(clock),
            result_path="",
            verdict=VERDICT_NOOP_DISABLED,
            activation=ACTIVATION_DISABLED,
        )
        if write_evidence:
            _append_evidence(rec, root, evidence_path)
        return [rec]

    # (2) enabled → decide the candidate paths
    if paths is None:
        pattern = os.path.join(root, EVENTS_DIR_REL, RESULT_GLOB)
        candidates = sorted(glob.glob(pattern))
    else:
        candidates = list(paths)

    # D2: resolve activation_epoch when legacy_cutoff is on (argument wins, marker file otherwise)
    epoch = activation_epoch
    if legacy_cutoff and epoch is None:
        epoch = read_activation_epoch(root, epoch_reader=epoch_reader)

    # D1: MAX_FILES cap — the overflow waits for the next cycle (bounded work per cycle)
    limit = MAX_FILES if max_files is None else max_files
    deferred = []
    if limit is not None:
        # A negative cap would otherwise slice from the end of the list
        # (e.g. -1 → "all but the last"); clamp it to "process nothing".
        limit = max(0, limit)
        if len(candidates) > limit:
            deferred = candidates[limit:]
            candidates = candidates[:limit]

    records = []
    for p in candidates:
        rec = process_one(
            p,
            root=root,
            pickup_fn=pickup_fn,
            launcher_fn=launcher_fn,
            verify_fn=verify_fn,
            probe=probe,
            clock=clock,
            executor_key=executor_key,
            ledger_path=ledger_path,
            quarantine_dir=quarantine_dir,
            processed_dir=processed_dir,
            write_evidence=write_evidence,
            evidence_path=evidence_path,
            stable_sec=stable_sec,
            readiness_retries=readiness_retries,
            readiness_interval=readiness_interval,
            sleep_fn=sleep_fn,
            activation_epoch=epoch,
            legacy_cutoff=legacy_cutoff,
            stat_fn=stat_fn,
        )
        records.append(rec)

    if deferred:
        defer_rec = DriverRecord(
            ts=_now_kst(clock),
            result_path="",
            verdict=VERDICT_NOOP_MAX_FILES_DEFER,
            quarantined=False,
            quarantine_reason=f"max_files_defer:{len(deferred)}",
            activation=ACTIVATION_ENABLED,
        )
        if write_evidence:
            _append_evidence(defer_rec, root, evidence_path)
        records.append(defer_rec)
    return records


# ── CLI ───────────────────────────────────────────────────────────────────────
def main(argv=None) -> int:
    """CLI entry: run one scan over CANONICAL_ROOT with the legacy cutoff on.

    The launcher comes from ``build_launcher_fn`` (None unless real wake is
    enabled). ``argv`` is accepted but not used. Always returns 0.
    """
    scan_once(
        CANONICAL_ROOT,
        legacy_cutoff=True,
        launcher_fn=build_launcher_fn(CANONICAL_ROOT),
    )
    return 0


# Names exported by a star-import of this module. Two underscore-prefixed
# helpers are listed explicitly — presumably so tests can reach them; confirm
# before removing.
__all__ = [
    "DRIVER_NAME",
    "ACTIVATION_FLAG_REL",
    "EVENTS_DIR_REL",
    "QUARANTINE_DIR_REL",
    "PROCESSED_DIR_REL",
    "EVIDENCE_JSONL_REL",
    "RESULT_GLOB",
    "MAX_FILES",
    "ACTIVATION_EPOCH_REL",
    "STABLE_SEC",
    "STABILITY_RETRIES",
    "STABILITY_INTERVAL_SEC",
    "VERDICT_WAKE_BUILT",
    "VERDICT_PICKUP_SKIP",
    "VERDICT_QUARANTINE",
    "VERDICT_FIRE_FAILED",
    "VERDICT_NOOP_DISABLED",
    "VERDICT_NOOP_NOT_TARGET",
    "VERDICT_NOOP_NOT_READY",
    "VERDICT_NOOP_LEGACY_SKIP",
    "VERDICT_NOOP_MAX_FILES_DEFER",
    "OKC_ANU",
    "OKC_FOREIGN",
    "OKC_SELF",
    "ACTIVATION_ENABLED",
    "ACTIVATION_DISABLED",
    "KST",
    "DriverRecord",
    "REAL_WAKE_FLAG_REL",
    "REAL_WAKE_ENABLED",
    "read_activation",
    "is_activated",
    "read_activation_epoch",
    "read_real_wake_enabled",
    "build_launcher_fn",
    "is_target",
    "_legacy_cutoff_check",
    "_check_readiness",
    "process_one",
    "scan_once",
    "main",
]

# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())
