# -*- coding: utf-8 -*-
"""dispatch.anu_pickup_driver — task-2721 P0-b user-level systemd path driver.

default DISABLED. activation flag(memory/state/p0b_driver_enabled == "enabled") 부재 시 전면 no-op.
scan 한정: memory/events/task-*.result.json final 만. tmp/partial/다른 marker·jsonl·md → NOOP_NOT_TARGET.
6조건 전부 PASS 시에만 P0-a pickup_once(lock-free) 호출. pickup_once 가 ANU-owned wake argv(dry-run)를 빌드.
실제 cron 발사 0(P0-a dry_run=True/FIRE_NOT_ACTIVATED). ANU key literal 0 — .env.keys 런타임 로드만.
"""
from __future__ import annotations

import glob
import json
import os
import shutil
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

# 재사용 모듈 (절대 수정 금지)
from dispatch.anu_result_pickup_runner import pickup_once  # pyright: ignore[reportMissingImports]
from dispatch.anu_owned_callback_enforcement import (  # pyright: ignore[reportMissingImports]
    CANONICAL_ROOT,
    VERDICT_AUTHORITATIVE,
    verify_collector_authoritative,
)

# ── 상수 ─────────────────────────────────────────────────────────────────────
DRIVER_NAME = "systemd-path"
ACTIVATION_FLAG_REL = "memory/state/p0b_driver_enabled"
EVENTS_DIR_REL = "memory/events"
QUARANTINE_DIR_REL = "memory/p0b_state/quarantine"
PROCESSED_DIR_REL = "memory/p0b_state/processed"
EVIDENCE_JSONL_REL = "memory/p0b_state/driver_runs.jsonl"
RESULT_GLOB = "task-*.result.json"

# ── readiness grace window (write race 방어) ─────────────────────────────────
# writer flush 완료 전 부분 JSON 을 즉시 quarantine 하지 않기 위한 파라미터.
# 테스트에서 process_one kwargs(stable_sec/readiness_retries/readiness_interval/sleep_fn)로 주입 가능.
STABLE_SEC = 2.0            # mtime 이 now - STABLE_SEC 이내면 in-flight 가능성 → DEFER
STABILITY_RETRIES = 3      # size/mtime 안정성 재확인 stat 횟수 (총 window ≤ 2s)
STABILITY_INTERVAL_SEC = 0.2  # stat 샘플 간격(초)

# verdict
VERDICT_WAKE_BUILT = "WAKE_BUILT"
VERDICT_PICKUP_SKIP = "PICKUP_SKIP"
VERDICT_QUARANTINE = "QUARANTINE"
VERDICT_FIRE_FAILED = "FIRE_FAILED"
VERDICT_NOOP_DISABLED = "NOOP_DISABLED"
VERDICT_NOOP_NOT_TARGET = "NOOP_NOT_TARGET"
VERDICT_NOOP_NOT_READY = "NOOP_NOT_READY"  # readiness 미충족 → DEFER (wake 0/quarantine 0)

# owner_key_class (literal 키 아님 — 분류 라벨만)
OKC_ANU = "ANU"
OKC_FOREIGN = "FOREIGN"
OKC_SELF = "SELF"

ACTIVATION_ENABLED = "enabled"
ACTIVATION_DISABLED = "disabled"

# 시간/KST
KST = timezone(timedelta(hours=9))

# pickup_once verdict 문자열 (P0-a 계약)
_PICKUP_WAKE_BUILT = "WAKE_BUILT"
_PICKUP_SKIP_TERMINAL = "SKIP_TERMINAL"
_PICKUP_SKIP_DEDUPE = "SKIP_DEDUPE"
_PICKUP_QUARANTINE = "QUARANTINE"
_PICKUP_PENDING = "PENDING_OWNER_PROOF"


# ── DriverRecord dataclass ────────────────────────────────────────────────────
@dataclass
class DriverRecord:
    ts: str
    result_path: str
    verdict: str
    owner_key_class: Optional[str] = None
    quarantined: bool = False
    quarantine_reason: Optional[str] = None
    fire_cron_id: Optional[str] = None
    retry: int = 0
    error: Optional[str] = None
    driver: str = DRIVER_NAME
    activation: str = ACTIVATION_DISABLED

    def to_json(self) -> dict:
        return {
            "ts": self.ts,
            "result_path": self.result_path,
            "verdict": self.verdict,
            "owner_key_class": self.owner_key_class,
            "quarantined": self.quarantined,
            "quarantine_reason": self.quarantine_reason,
            "fire_cron_id": self.fire_cron_id,
            "retry": self.retry,
            "error": self.error,
            "driver": self.driver,
            "activation": self.activation,
        }


# ── activation ────────────────────────────────────────────────────────────────
def read_activation(root: str = CANONICAL_ROOT, *, flag_reader=None) -> str:
    """flag 파일 첫 줄 trim 값 반환. 부재/읽기실패 → "" (=> disabled).
    flag_reader: 테스트 주입용 callable() -> Optional[str] (None 이면 실제 파일 읽기)."""
    if flag_reader is not None:
        try:
            val = flag_reader()
        except Exception:  # noqa: BLE001 — reader 실패 → disabled(fail-closed)
            return ""
        if val is None:
            return ""
        return str(val).splitlines()[0].strip() if str(val).strip() else str(val).strip()
    flag_path = os.path.join(root, ACTIVATION_FLAG_REL)
    try:
        with open(flag_path, "r", encoding="utf-8") as fh:
            first = fh.readline()
    except (OSError, ValueError):
        return ""
    return first.strip()


def is_activated(root: str = CANONICAL_ROOT, *, flag_reader=None) -> bool:
    return read_activation(root, flag_reader=flag_reader) == ACTIVATION_ENABLED


# ── target 판정 ────────────────────────────────────────────────────────────────
def is_target(path: str) -> bool:
    """final task-*.result.json 만 True. basename 이 'task-' 로 시작 + '.result.json' 으로 끝.
    '.result.json.tmp-...'/partial/.md/.jsonl/다른 marker → False."""
    if not path:
        return False
    base = os.path.basename(str(path))
    return base.startswith("task-") and base.endswith(".result.json")


# ── 내부 helper ────────────────────────────────────────────────────────────────
def _now_kst(clock) -> str:
    return clock().isoformat()


def _envelope_claim_class(envelope: dict) -> str:
    """envelope claim 으로 SELF/FOREIGN 라벨 추정 (판정 아님, 라벨링만)."""
    try:
        if bool(envelope.get("self_key_used")):
            return OKC_SELF
    except AttributeError:
        return OKC_FOREIGN
    return OKC_FOREIGN


def _collision_safe_dest(dest_dir: str, basename: str) -> str:
    """dest_dir/basename. 이미 존재하면 .{ms타임스탬프}[-n] suffix 로 충돌 회피."""
    dest = os.path.join(dest_dir, basename)
    if not os.path.exists(dest):
        return dest
    ts = str(int(time.time() * 1000))
    cand = os.path.join(dest_dir, f"{basename}.{ts}")
    n = 0
    while os.path.exists(cand):
        n += 1
        cand = os.path.join(dest_dir, f"{basename}.{ts}-{n}")
    return cand


def _move_processed(path: str, root: str, processed_dir: Optional[str] = None) -> Optional[str]:
    """terminal(WAKE_BUILT/PICKUP_SKIP) result 파일을 watched 밖 processed 디렉토리로
    atomic 이동. 성공 시 None, 실패 시 에러 메시지(str) 반환(fail-safe: 크래시 0).
    os.replace(같은 fs atomic) 우선, 실패 시 shutil.move fallback."""
    pdir = processed_dir or os.path.join(root, PROCESSED_DIR_REL)
    try:
        os.makedirs(pdir, exist_ok=True)
        dest = _collision_safe_dest(pdir, os.path.basename(path))
    except OSError as exc:
        return f"processed move 실패: {exc}"
    try:
        os.replace(path, dest)
        return None
    except OSError:
        try:
            shutil.move(path, dest)
            return None
        except (OSError, shutil.Error) as exc:
            return f"processed move 실패: {exc}"


def _quarantine_move(path: str, root: str, quarantine_dir: Optional[str]) -> Optional[str]:
    """result 파일을 quarantine 디렉토리로 이동. 실패 시 예외 메시지 반환(None=성공)."""
    qdir = quarantine_dir or os.path.join(root, QUARANTINE_DIR_REL)
    try:
        os.makedirs(qdir, exist_ok=True)
        dest = _collision_safe_dest(qdir, os.path.basename(path))
        shutil.move(path, dest)
        return None
    except (OSError, shutil.Error) as exc:
        return f"quarantine move 실패: {exc}"


def _append_evidence(record: DriverRecord, root: str, evidence_path: Optional[str]) -> None:
    """DriverRecord.to_json() 한 줄 JSON append. ANU key literal 절대 미기록."""
    ev = evidence_path or os.path.join(root, EVIDENCE_JSONL_REL)
    try:
        os.makedirs(os.path.dirname(ev), exist_ok=True)
        with open(ev, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(record.to_json(), ensure_ascii=False) + "\n")
            fh.flush()
            os.fsync(fh.fileno())
    except OSError:
        pass  # evidence 기록 실패는 비치명 (fail-safe)


def _dedupe_hit(task_id: str, ledger_path: Optional[str], root: str) -> bool:
    """dedupe ledger 에 동일 task_id 의 PICKUP_WAKE_BUILT 항목 존재 여부."""
    ledger = ledger_path or os.path.join(
        root, "memory", "events", "callback_4tuple_index.jsonl"
    )
    if not os.path.isfile(ledger):
        return False
    try:
        with open(ledger, "r", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except (ValueError, TypeError):
                    continue
                if (
                    isinstance(entry, dict)
                    and entry.get("event") == "PICKUP_WAKE_BUILT"
                    and entry.get("task_id") == task_id
                ):
                    return True
    except (OSError, ValueError):
        return False
    return False


def _check_readiness(
    path: str,
    *,
    clock,
    stable_sec: float = STABLE_SEC,
    retries: int = STABILITY_RETRIES,
    interval: float = STABILITY_INTERVAL_SEC,
    sleep_fn=None,
    stat_fn=None,
) -> tuple:
    """write race 방어 readiness 판정. (ready: bool, reason: str) 반환.

    판정 규칙:
    - 파일 stat 실패(존재X 등) → (False, "stat_fail"): DEFER, 다음 트리거 재평가.
    - mtime 이 now - stable_sec 이내(최근 생성/수정) → (False, "recent_mtime"): writer 미완 가능 → DEFER.
    - size/mtime 안정성: 짧은 간격으로 최대 retries 회 stat 하여 (size, mtime) 불변 확인.
      마지막 두 샘플이 다르면 (False, "unstable"): 아직 쓰는 중 → DEFER.
    - 위 모두 통과(aged + stable) → (True, "ready").
    단순 무한 sleep 없음: 총 window = (retries-1) * interval ≤ 약 0.6s.
    """
    stat_fn = stat_fn or os.stat
    sleep_fn = sleep_fn or time.sleep

    samples = []
    for attempt in range(max(1, retries)):
        try:
            st = stat_fn(path)
        except OSError:
            return (False, "stat_fail")
        samples.append((st.st_size, st.st_mtime))
        # 직전 샘플과 동일하면 안정화 → 조기 종료
        if len(samples) >= 2 and samples[-1] == samples[-2]:
            break
        if attempt < retries - 1:
            sleep_fn(interval)

    mtime = samples[-1][1]
    age = clock().timestamp() - mtime
    if age < stable_sec:
        return (False, "recent_mtime")
    if len(samples) >= 2 and samples[-1] != samples[-2]:
        return (False, "unstable")
    return (True, "ready")


# ── process_one ────────────────────────────────────────────────────────────────
def process_one(
    path: str,
    *,
    root: str = CANONICAL_ROOT,
    pickup_fn=None,            # 기본 dispatch.anu_result_pickup_runner.pickup_once
    verify_fn=None,            # 기본 verify_collector_authoritative
    probe=None,                # cron-history probe (owner proof 용). 기본 None
    clock=None,                # 기본 lambda: datetime.now(KST)
    executor_key: str = "",
    ledger_path: Optional[str] = None,
    quarantine_dir: Optional[str] = None,
    processed_dir: Optional[str] = None,
    write_evidence: bool = False,   # process_one 단독 호출 시 evidence append 여부
    evidence_path: Optional[str] = None,
    stable_sec: Optional[float] = None,
    readiness_retries: Optional[int] = None,
    readiness_interval: Optional[float] = None,
    sleep_fn=None,
) -> DriverRecord:
    """단일 result.json 처리. activation 은 호출자(scan_once)가 이미 보장 — 여기선 target/6조건만."""
    pickup_fn = pickup_fn or pickup_once
    verify_fn = verify_fn or verify_collector_authoritative
    clock = clock or (lambda: datetime.now(KST))

    ts = _now_kst(clock)

    def _emit(rec: DriverRecord) -> DriverRecord:
        if write_evidence:
            _append_evidence(rec, root, evidence_path)
        return rec

    def _quarantine(reason: str, owner_class: Optional[str] = None,
                    extra_err: Optional[str] = None) -> DriverRecord:
        move_err = _quarantine_move(path, root, quarantine_dir)
        err = extra_err
        if move_err:
            err = (err + "; " + move_err) if err else move_err
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_QUARANTINE,
            owner_key_class=owner_class,
            quarantined=True,
            quarantine_reason=reason,
            error=err,
            activation=ACTIVATION_ENABLED,
        ))

    # ── target 아니면 즉시 반환 ───────────────────────────────────────────────
    if not is_target(path):
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_NOOP_NOT_TARGET,
            activation=ACTIVATION_ENABLED,
        ))

    # ── readiness grace window (write race 방어) ──────────────────────────────
    # ★ 부분 JSON 을 즉시 quarantine 하지 않는다. writer flush 미완 가능성을 먼저 배제.
    #   readiness 미충족(최근 mtime / size·mtime 불안정 / stat 실패) → NOOP_NOT_READY(DEFER):
    #   wake/quarantine 모두 금지. 다음 트리거/재시도에서 재평가.
    ready, ready_reason = _check_readiness(
        path,
        clock=clock,
        stable_sec=STABLE_SEC if stable_sec is None else stable_sec,
        retries=STABILITY_RETRIES if readiness_retries is None else readiness_retries,
        interval=STABILITY_INTERVAL_SEC if readiness_interval is None else readiness_interval,
        sleep_fn=sleep_fn,
    )
    if not ready:
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_NOOP_NOT_READY,
            quarantined=False,
            quarantine_reason=ready_reason,
            activation=ACTIVATION_ENABLED,
        ))

    # ── 조건 1: size > 0 ──────────────────────────────────────────────────────
    try:
        size = os.path.getsize(path)
    except OSError as exc:
        return _quarantine("size0", extra_err=f"getsize 실패: {exc}")
    if size <= 0:
        return _quarantine("size0")

    # ── 조건 2: JSON parse (+ null byte truncation 방어) ──────────────────────
    try:
        with open(path, "rb") as fh:
            raw = fh.read()
    except OSError as exc:
        return _quarantine("parse_fail", extra_err=f"read: {exc}")
    # ★ null byte(\x00) 는 부분 write/truncation 흔적. grace 후(readiness 통과)에도
    #   잔존하면 quarantine. (grace 내 최근 mtime 은 위 readiness 에서 이미 DEFER.)
    if b"\x00" in raw:
        return _quarantine("null_byte")
    try:
        result = json.loads(raw.decode("utf-8"))
    except (ValueError, UnicodeDecodeError) as exc:
        return _quarantine("parse_fail", extra_err=f"parse: {exc}")

    # ── 조건 3: schema ────────────────────────────────────────────────────────
    if not isinstance(result, dict):
        return _quarantine("schema_fail")
    task_id = result.get("task_id")
    completion_signal = result.get("completion_signal")
    if not (isinstance(task_id, str) and task_id.strip()):
        return _quarantine("schema_fail")
    if not (isinstance(completion_signal, str) and completion_signal.strip()):
        return _quarantine("schema_fail")
    task_id = task_id.strip()
    # ★ path traversal 방어 (owner proof 전, defense-in-depth):
    # task_id 가 done_path=os.path.join(result_dir, f"{task_id}.pickup.done") 에
    # 직접 쓰이므로, 경로 탐색 문자 포함 시 schema_fail quarantine.
    if (
        os.path.basename(task_id) != task_id
        or ".." in task_id
        or "/" in task_id
        or "\\" in task_id
    ):
        return _quarantine("schema_fail")

    # ── 조건 4&5: owner proof + self/foreign 아님 ────────────────────────────
    envelope = result.get("collector_envelope")
    if not isinstance(envelope, dict):
        # envelope 부재 → owner_unprovable. claim 추정 불가하므로 FOREIGN 보수 라벨.
        return _quarantine("owner_unprovable", owner_class=OKC_FOREIGN)

    schedule_id = envelope.get("schedule_id") or result.get("schedule_id")
    try:
        v = verify_fn(
            task_id=task_id,
            envelope=envelope,
            probe=probe,
            executor_key=executor_key,
            schedule_id=schedule_id,
            now=clock(),
        )
    except Exception as exc:  # noqa: BLE001 — verify 예외 → owner_proof_error 격리
        return _quarantine("owner_proof_error",
                           owner_class=_envelope_claim_class(envelope),
                           extra_err=f"verify_fn 예외: {exc}")

    if getattr(v, "verdict", None) != VERDICT_AUTHORITATIVE:
        return _quarantine("owner_proof_fail",
                           owner_class=_envelope_claim_class(envelope))

    owner_class = OKC_ANU

    # ── 조건 6: dedupe/done/acked ─────────────────────────────────────────────
    result_dir = os.path.dirname(path)
    done_path = os.path.join(result_dir, f"{task_id}.pickup.done")
    acked_path = os.path.join(result_dir, f"{task_id}.pickup.acked")
    if (
        os.path.exists(done_path)
        or os.path.exists(acked_path)
        or _dedupe_hit(task_id, ledger_path, root)
    ):
        move_err = _move_processed(path, root, processed_dir)
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_PICKUP_SKIP,
            owner_key_class=owner_class,
            quarantined=False,
            error=move_err,
            activation=ACTIVATION_ENABLED,
        ))

    # ── 6조건 전부 통과 → pickup_fn 호출 (P0-a, lock-free) ───────────────────
    try:
        res = pickup_fn(path, executor_key=executor_key, ledger_path=ledger_path)
    except Exception as exc:  # noqa: BLE001 — pickup 예외 → FIRE_FAILED (파일 미이동)
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_FIRE_FAILED,
            owner_key_class=owner_class,
            error=f"pickup_fn 예외: {exc}",
            activation=ACTIVATION_ENABLED,
        ))

    pv = getattr(res, "verdict", None)
    if pv == _PICKUP_WAKE_BUILT:
        # ★ driver 는 argv 를 실행하지 않음 (P0-a dry_run, 실제 wake 0).
        move_err = _move_processed(path, root, processed_dir)
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_WAKE_BUILT,
            owner_key_class=OKC_ANU,
            fire_cron_id=None,
            error=move_err,
            activation=ACTIVATION_ENABLED,
        ))
    if pv in (_PICKUP_SKIP_TERMINAL, _PICKUP_SKIP_DEDUPE):
        move_err = _move_processed(path, root, processed_dir)
        return _emit(DriverRecord(
            ts=ts,
            result_path=path,
            verdict=VERDICT_PICKUP_SKIP,
            owner_key_class=owner_class,
            error=move_err,
            activation=ACTIVATION_ENABLED,
        ))
    if pv in (_PICKUP_QUARANTINE, _PICKUP_PENDING):
        return _quarantine("pickup_" + str(pv).lower(), owner_class=owner_class)

    # 그 외 (FAIL / REJECT / SEALED_KEY_MISSING / NO_RESULT_JSON) → FIRE_FAILED.
    reasons = getattr(res, "reasons", None)
    err = "; ".join(reasons) if isinstance(reasons, list) and reasons else f"pickup verdict={pv}"
    return _emit(DriverRecord(
        ts=ts,
        result_path=path,
        verdict=VERDICT_FIRE_FAILED,
        owner_key_class=owner_class,
        error=err,
        activation=ACTIVATION_ENABLED,
    ))


# ── scan_once ───────────────────────────────────────────────────────────────────
def scan_once(
    root: str = CANONICAL_ROOT,
    *,
    pickup_fn=None,
    verify_fn=None,
    probe=None,
    clock=None,
    executor_key: str = "",
    ledger_path: Optional[str] = None,
    quarantine_dir: Optional[str] = None,
    processed_dir: Optional[str] = None,
    paths: Optional[list] = None,   # None 이면 glob(root/memory/events/task-*.result.json), 주어지면 그 목록
    flag_reader=None,
    write_evidence: bool = True,
    evidence_path: Optional[str] = None,
    stable_sec: Optional[float] = None,
    readiness_retries: Optional[int] = None,
    readiness_interval: Optional[float] = None,
    sleep_fn=None,
) -> list:
    """진입점. (1) activation 재확인: disabled → [NOOP_DISABLED] 1건 + evidence + pickup 미호출.
    (2) enabled → 후보 경로 결정 → 각 path process_one → record 목록 반환. 각 record evidence append."""
    clock = clock or (lambda: datetime.now(KST))

    # (1) activation 재확인
    if not is_activated(root, flag_reader=flag_reader):
        rec = DriverRecord(
            ts=_now_kst(clock),
            result_path="",
            verdict=VERDICT_NOOP_DISABLED,
            activation=ACTIVATION_DISABLED,
        )
        if write_evidence:
            _append_evidence(rec, root, evidence_path)
        return [rec]

    # (2) enabled → 후보 경로 결정
    if paths is None:
        pattern = os.path.join(root, EVENTS_DIR_REL, RESULT_GLOB)
        candidates = sorted(glob.glob(pattern))
    else:
        candidates = list(paths)

    records = []
    for p in candidates:
        rec = process_one(
            p,
            root=root,
            pickup_fn=pickup_fn,
            verify_fn=verify_fn,
            probe=probe,
            clock=clock,
            executor_key=executor_key,
            ledger_path=ledger_path,
            quarantine_dir=quarantine_dir,
            processed_dir=processed_dir,
            write_evidence=write_evidence,
            evidence_path=evidence_path,
            stable_sec=stable_sec,
            readiness_retries=readiness_retries,
            readiness_interval=readiness_interval,
            sleep_fn=sleep_fn,
        )
        records.append(rec)
    return records


# ── CLI ───────────────────────────────────────────────────────────────────────
def main(argv=None) -> int:
    """scan_once(CANONICAL_ROOT) 실행 후 0 반환. CLI 인자 처리 최소."""
    scan_once(CANONICAL_ROOT)
    return 0


__all__ = [
    "DRIVER_NAME",
    "ACTIVATION_FLAG_REL",
    "EVENTS_DIR_REL",
    "QUARANTINE_DIR_REL",
    "PROCESSED_DIR_REL",
    "EVIDENCE_JSONL_REL",
    "RESULT_GLOB",
    "STABLE_SEC",
    "STABILITY_RETRIES",
    "STABILITY_INTERVAL_SEC",
    "VERDICT_WAKE_BUILT",
    "VERDICT_PICKUP_SKIP",
    "VERDICT_QUARANTINE",
    "VERDICT_FIRE_FAILED",
    "VERDICT_NOOP_DISABLED",
    "VERDICT_NOOP_NOT_TARGET",
    "VERDICT_NOOP_NOT_READY",
    "OKC_ANU",
    "OKC_FOREIGN",
    "OKC_SELF",
    "ACTIVATION_ENABLED",
    "ACTIVATION_DISABLED",
    "KST",
    "DriverRecord",
    "read_activation",
    "is_activated",
    "is_target",
    "_check_readiness",
    "process_one",
    "scan_once",
    "main",
]

if __name__ == "__main__":
    raise SystemExit(main())
