# -*- coding: utf-8 -*-
"""task-2721 P0-b ANU pickup driver — regression (11).

대상 모듈: dispatch.anu_pickup_driver (불칸 동시 구현).

절대 제약:
- ANU key literal 'c119085...' 를 테스트 코드에 절대 쓰지 않는다 (0건).
- pickup_once 실호출 절대 금지 — 전부 mock/stub. pickup_fn / verify_fn 은
  호출 인자로 mock 주입한다. 실제 wake/cron 발사 0건.
- tmp_path fixture 만 사용. 실제 memory 디렉토리에 쓰지 않는다.
  activation flag 도 tmp dir 에만 둔다.

이 파일은 driver 의 공개 API 계약(task-2721 spec)에 맞춰 작성된 회귀 테스트다.
"""
from __future__ import annotations

import glob
import json
import os
import sys
import types
from pathlib import Path

import pytest

# regression/conftest.py 가 worktree root 를 sys.path[0] 에 보장하지만,
# 단독 실행/순서 변동 대비로 한 번 더 보강한다.
_ROOT = Path(__file__).resolve().parents[2]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

# pytest 기본 import 모드(prepend)에서, tests/ 에 __init__.py 가 없고
# tests/dispatch/__init__.py 가 존재하면 tests/ 가 sys.path 에 prepend 되어
# `import dispatch` 가 워크트리 root 의 실제 dispatch 패키지 대신 tests/dispatch
# (테스트용 빈 패키지)로 해석된다. 실제 dispatch 패키지를 파일 위치로 직접 로드해
# sys.modules 에 고정한 뒤 서브모듈을 import 한다.
import importlib.util as _ilu  # noqa: E402

_real_init = _ROOT / "dispatch" / "__init__.py"
_cached = sys.modules.get("dispatch")
if _cached is None or (getattr(_cached, "__file__", "") or "") != str(_real_init):
    for _k in [k for k in list(sys.modules) if k == "dispatch" or k.startswith("dispatch.")]:
        del sys.modules[_k]
    _spec = _ilu.spec_from_file_location(
        "dispatch", _real_init, submodule_search_locations=[str(_ROOT / "dispatch")]
    )
    assert _spec is not None and _spec.loader is not None
    _pkg = _ilu.module_from_spec(_spec)
    sys.modules["dispatch"] = _pkg
    _spec.loader.exec_module(_pkg)

from dispatch import anu_pickup_driver as drv  # noqa: E402


# ─────────────────────────────────────────────────────────────────────────────
# 헬퍼
# ─────────────────────────────────────────────────────────────────────────────
VALID_PAYLOAD = {
    "task_id": "task-999",
    "completion_signal": "EXECUTOR_RESULT_WRITTEN",
    "collector_envelope": {"task_id": "task-999", "schedule_id": "sch-1"},
    "report_path": "r.md",
    "sha256": "abc",
}


def _make_dirs(tmp_path: Path) -> Path:
    """tmp_path 안에 memory/events, memory/state 디렉토리를 만들고 root 를 반환."""
    (tmp_path / "memory" / "events").mkdir(parents=True, exist_ok=True)
    (tmp_path / "memory" / "state").mkdir(parents=True, exist_ok=True)
    return tmp_path


def _events_dir(root: Path) -> Path:
    return root / "memory" / "events"


def _write_result(root: Path, name: str = "task-999.result.json", payload=None) -> Path:
    """memory/events 아래에 result.json 을 쓰고 경로를 반환."""
    p = _events_dir(root) / name
    if payload is None:
        payload = VALID_PAYLOAD
    p.write_text(json.dumps(payload), encoding="utf-8")
    return p


def _enable_activation(root: Path) -> None:
    """tmp dir 의 activation flag 파일에 'enabled' 작성."""
    flag = root / "memory" / "state" / "p0b_driver_enabled"
    flag.parent.mkdir(parents=True, exist_ok=True)
    flag.write_text(drv.ACTIVATION_ENABLED + "\n", encoding="utf-8")


import time as _time


def _age(path, seconds: float = 10.0) -> None:
    """result 파일 mtime/atime 을 과거로 설정해 readiness 'aged' 게이트를 통과시킨다.
    (write race 방어 readiness window 회피용 — 테스트에서 writer flush 완료 상태 모사.)"""
    past = _time.time() - seconds
    os.utime(str(path), (past, past))


# 모든 process_one/scan_once 호출에서 readiness sleep 을 no-op 로 만들어 테스트 가속.
_NO_SLEEP = lambda *a, **k: None  # noqa: E731


# ── mock 헬퍼 (호출 횟수 기록) ───────────────────────────────────────────────
def make_pickup_mock(verdict: str = "WAKE_BUILT"):
    """pickup_fn mock. 호출 인자를 calls 리스트에 기록한다.

    mock.calls -> list[(args, kwargs)]
    호출 시 SimpleNamespace(verdict, ok, argv, task_id, reasons) 반환.
    실제 wake/cron 발사는 전혀 하지 않는다 (순수 stub).
    """
    calls: list = []

    def _pickup(*args, **kwargs):
        calls.append((args, kwargs))
        return types.SimpleNamespace(
            verdict=verdict,
            ok=(verdict == "WAKE_BUILT"),
            argv=["x"],
            task_id="task-999",
            reasons=[],
        )

    _pickup.calls = calls  # type: ignore[attr-defined]
    return _pickup


def make_verify_mock(verdict: str = "AUTHORITATIVE"):
    """verify_fn mock. 호출 인자를 calls 리스트에 기록한다."""
    calls: list = []

    def _verify(*args, **kwargs):
        calls.append((args, kwargs))
        return types.SimpleNamespace(
            verdict=verdict,
            ok=(verdict == "AUTHORITATIVE"),
            classification="",
            reasons=[],
        )

    _verify.calls = calls  # type: ignore[attr-defined]
    return _verify


# ─────────────────────────────────────────────────────────────────────────────
# 1. default disabled → NOOP, pickup 미호출
# ─────────────────────────────────────────────────────────────────────────────
def test_default_disabled_noop(tmp_path):
    root = _make_dirs(tmp_path)
    pickup = make_pickup_mock()
    # flag 파일 없음 + flag_reader 가 None 반환 → disabled
    records = drv.scan_once(
        root,
        pickup_fn=pickup,
        flag_reader=lambda: None,
        write_evidence=False,
    )
    assert len(records) == 1
    rec = records[0]
    assert rec.verdict == drv.VERDICT_NOOP_DISABLED
    assert rec.activation == drv.ACTIVATION_DISABLED
    # pickup_fn 은 절대 호출되지 않는다
    assert len(pickup.calls) == 0


# ─────────────────────────────────────────────────────────────────────────────
# 2. is_target / not-target → NOOP_NOT_TARGET, pickup 미호출
# ─────────────────────────────────────────────────────────────────────────────
def test_scan_limited_not_target(tmp_path):
    root = _make_dirs(tmp_path)
    ev = _events_dir(root)
    pickup = make_pickup_mock()

    p_md = ev / "foo.md"
    p_jsonl = ev / "bar.jsonl"
    p_tmp = ev / "task-1.result.json.tmp-9"
    for p in (p_md, p_jsonl, p_tmp):
        p.write_text("x", encoding="utf-8")

    # is_target 직접 단언: final 만 True
    final = ev / "task-7.result.json"
    final.write_text(json.dumps(VALID_PAYLOAD), encoding="utf-8")
    assert drv.is_target(str(final)) is True
    assert drv.is_target(str(p_md)) is False
    assert drv.is_target(str(p_jsonl)) is False
    assert drv.is_target(str(p_tmp)) is False

    # 비-target 경로들에 직접 process_one → 각 NOOP_NOT_TARGET
    for p in (p_md, p_jsonl, p_tmp):
        rec = drv.process_one(str(p), root=root, pickup_fn=pickup)
        assert rec.verdict == drv.VERDICT_NOOP_NOT_TARGET

    assert len(pickup.calls) == 0


# ─────────────────────────────────────────────────────────────────────────────
# 3. size0 → QUARANTINE("size0"), 파일 이동, pickup 미호출
# ─────────────────────────────────────────────────────────────────────────────
def test_quarantine_size0(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()

    p = _events_dir(root) / "task-999.result.json"
    p.write_text("", encoding="utf-8")  # 0 byte
    _age(p)
    assert p.stat().st_size == 0

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_QUARANTINE
    assert rec.quarantine_reason == "size0"
    assert rec.quarantined is True
    assert len(pickup.calls) == 0

    # 원본 경로 부재 + quarantine 디렉토리로 이동
    assert not p.exists()
    qdir = root / "memory" / "p0b_state" / "quarantine"
    assert (qdir / "task-999.result.json").exists()
    # ★ 출력물(quarantine)이 watched 디렉터리(memory/events) 밖에 있어야 함
    assert not (root / "memory" / "events" / "quarantine" / "task-999.result.json").exists()


# ─────────────────────────────────────────────────────────────────────────────
# 4. parse fail → QUARANTINE("parse_fail"), pickup 미호출
# ─────────────────────────────────────────────────────────────────────────────
def test_quarantine_parse_fail(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()

    p = _events_dir(root) / "task-999.result.json"
    p.write_text('"{invalid json', encoding="utf-8")
    _age(p)

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_QUARANTINE
    assert rec.quarantine_reason == "parse_fail"
    assert len(pickup.calls) == 0


# ─────────────────────────────────────────────────────────────────────────────
# 5. schema fail (task_id 누락) → QUARANTINE("schema_fail"), pickup 미호출
# ─────────────────────────────────────────────────────────────────────────────
def test_quarantine_schema_fail(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()

    p = _events_dir(root) / "task-999.result.json"
    p.write_text(json.dumps({"foo": 1}), encoding="utf-8")
    _age(p)

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_QUARANTINE
    assert rec.quarantine_reason == "schema_fail"
    assert len(pickup.calls) == 0


# ─────────────────────────────────────────────────────────────────────────────
# 6. owner proof fail → QUARANTINE("owner_proof_fail"), pickup 미호출,
#    owner_key_class 가 OKC_ANU 아님
# ─────────────────────────────────────────────────────────────────────────────
def test_quarantine_owner_proof_fail(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()
    verify = make_verify_mock(verdict="QUARANTINED")  # AUTHORITATIVE 아님

    p = _write_result(root)  # valid schema + envelope 있음
    _age(p)

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_QUARANTINE
    assert rec.quarantine_reason == "owner_proof_fail"
    assert len(pickup.calls) == 0
    assert rec.owner_key_class != drv.OKC_ANU
    assert rec.owner_key_class in (drv.OKC_FOREIGN, drv.OKC_SELF)


# ─────────────────────────────────────────────────────────────────────────────
# 7. owner unprovable (collector_envelope 없음) → QUARANTINE("owner_unprovable")
# ─────────────────────────────────────────────────────────────────────────────
def test_quarantine_owner_unprovable(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()
    verify = make_verify_mock()  # AUTHORITATIVE 지만 envelope 없어 호출 전 차단

    payload = dict(VALID_PAYLOAD)
    payload.pop("collector_envelope")
    p = _write_result(root, payload=payload)
    _age(p)

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_QUARANTINE
    assert rec.quarantine_reason == "owner_unprovable"
    assert len(pickup.calls) == 0


# ─────────────────────────────────────────────────────────────────────────────
# 8. dedupe done marker → PICKUP_SKIP, pickup 미호출
# ─────────────────────────────────────────────────────────────────────────────
def test_dedupe_done_marker_noop(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)
    _age(p)
    # done 마커 생성: {task_id}.pickup.done
    marker = _events_dir(root) / "task-999.pickup.done"
    marker.write_text("done", encoding="utf-8")

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_PICKUP_SKIP
    assert rec.quarantined is False
    assert len(pickup.calls) == 0


# ─────────────────────────────────────────────────────────────────────────────
# 9. 6조건 통과 → WAKE_BUILT, owner_key_class==OKC_ANU, pickup 정확히 1회
# ─────────────────────────────────────────────────────────────────────────────
def test_six_conditions_pass_wake_built(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock(verdict="WAKE_BUILT")
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)
    _age(p)

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_WAKE_BUILT
    assert rec.owner_key_class == drv.OKC_ANU

    # pickup_fn 정확히 1회 호출 + 첫 positional 이 result 경로
    assert len(pickup.calls) == 1
    args, kwargs = pickup.calls[0]
    assert args[0] == str(p)


# ─────────────────────────────────────────────────────────────────────────────
# 10. scan_once evidence jsonl 생성 + key literal 0건
# ─────────────────────────────────────────────────────────────────────────────
def test_scan_once_evidence_jsonl(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock(verdict="WAKE_BUILT")
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)
    _age(p)

    records = drv.scan_once(
        root,
        pickup_fn=pickup,
        verify_fn=verify,
        write_evidence=True,
        sleep_fn=_NO_SLEEP,
    )
    assert len(records) >= 1

    evidence = root / "memory" / "p0b_state" / "driver_runs.jsonl"
    assert evidence.exists()
    # ★ evidence jsonl 이 watched 디렉터리 밖(memory/p0b_state)에 기록되는지 확인
    assert not (root / "memory" / "events" / "p0b_driver_runs.jsonl").exists()
    content = evidence.read_text(encoding="utf-8")

    lines = [ln for ln in content.splitlines() if ln.strip()]
    assert len(lines) >= 1
    for ln in lines:
        obj = json.loads(ln)  # 각 줄 JSON 파싱 가능
        assert obj.get("driver") == drv.DRIVER_NAME

    # "driver":"systemd-path" 포함
    assert '"driver"' in content
    assert drv.DRIVER_NAME in content

    # ★ ANU key literal 0건 (분할 문자열 조합으로 literal 직접 기재 회피)
    forbidden = "c119085" + "addb0f8b7"
    assert forbidden not in content


# ─────────────────────────────────────────────────────────────────────────────
# 11. record.to_json() 에 key literal 미포함
# ─────────────────────────────────────────────────────────────────────────────
def test_no_key_literal_in_records(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock(verdict="WAKE_BUILT")
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)
    _age(p)
    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)

    dumped = json.dumps(rec.to_json())
    forbidden = "c119085" + "addb0f8b7"
    assert forbidden not in dumped


# ─────────────────────────────────────────────────────────────────────────────
# 12. HIGH-1: entrypoint lock 경로에 UID 포함 + /tmp 고정 경로 미사용
# ─────────────────────────────────────────────────────────────────────────────
def test_entrypoint_lock_path_uid_isolated():
    """HIGH-1: entrypoint lock 경로에 UID 포함 + /tmp 고정 경로 미사용."""
    root = Path(__file__).resolve().parents[2]
    sh = (root / "scripts" / "anu_pickup_entrypoint.sh").read_text(encoding="utf-8")
    # UID 분리 흔적(변수/경로에 UID 포함)
    assert "UID" in sh
    # /tmp/anu-pickup.driver.lock 고정 문자열 미사용 (XDG_RUNTIME_DIR 치환 후에도)
    assert "/tmp/anu-pickup.driver.lock" not in sh.replace("XDG_RUNTIME_DIR", "")
    # driver-level flock single-flight 유지
    assert "flock" in sh


# ─────────────────────────────────────────────────────────────────────────────
# 13. HIGH-2: quarantine/evidence 출력 경로가 watched 디렉터리 밖
# ─────────────────────────────────────────────────────────────────────────────
def test_driver_outputs_outside_watched_dir():
    """HIGH-2: quarantine/evidence 출력 경로가 watched 디렉터리(memory/events) 밖."""
    assert drv.QUARANTINE_DIR_REL.startswith("memory/p0b_state")
    assert drv.EVIDENCE_JSONL_REL.startswith("memory/p0b_state")
    # watched 입력 디렉터리(memory/events) 밖이어야 함
    assert not drv.QUARANTINE_DIR_REL.startswith("memory/events")
    assert not drv.EVIDENCE_JSONL_REL.startswith("memory/events")
    # 입력 스캔 디렉터리는 여전히 memory/events
    assert drv.EVENTS_DIR_REL == "memory/events"


# ─────────────────────────────────────────────────────────────────────────────
# 14. path traversal: task_id 에 ../, /, \ 포함 → schema_fail quarantine
#     + pickup 미호출 + done_path 가 result_dir 밖에 생성되지 않음
# ─────────────────────────────────────────────────────────────────────────────
@pytest.mark.parametrize("evil_task_id", [
    "../../../etc/task-pwn",
    "../task-escape",
    "sub/dir/task-x",
    "..\\task-win",
    "/abs/task-abs",
])
def test_path_traversal_task_id_quarantine(tmp_path, evil_task_id):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock(verdict="WAKE_BUILT")
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    payload = dict(VALID_PAYLOAD)
    payload["task_id"] = evil_task_id
    payload["collector_envelope"] = {"task_id": evil_task_id, "schedule_id": "sch-1"}
    p = _write_result(root, payload=payload)
    _age(p)

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)

    # owner proof 전(schema 단계)에서 차단되어야 함
    assert rec.verdict == drv.VERDICT_QUARANTINE
    assert rec.quarantine_reason == "schema_fail"
    # pickup_fn / verify_fn 절대 미호출 (신뢰경계 통과 전 차단)
    assert len(pickup.calls) == 0
    assert len(verify.calls) == 0

    # done_path 가 result_dir(memory/events) 밖에 생성되지 않음
    result_dir = _events_dir(root)
    leaked = list(result_dir.parent.parent.rglob("*.pickup.done"))
    assert leaked == []


# ─────────────────────────────────────────────────────────────────────────────
# 15. 정상 task_id(basename==task_id, separator 없음)는 path traversal 검증
#     통과 → WAKE_BUILT (정상 경로 회귀 0)
# ─────────────────────────────────────────────────────────────────────────────
def test_normal_task_id_passes_traversal_guard(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock(verdict="WAKE_BUILT")
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)  # task_id="task-999"
    _age(p)
    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_WAKE_BUILT
    assert rec.owner_key_class == drv.OKC_ANU
    assert len(pickup.calls) == 1


# ─────────────────────────────────────────────────────────────────────────────
# 16. fail-safe: activation flag 가 invalid UTF-8 → read_activation 크래시 0,
#     disabled 처리(no-op). is_activated False.
# ─────────────────────────────────────────────────────────────────────────────
def test_failsafe_activation_invalid_utf8(tmp_path):
    root = _make_dirs(tmp_path)
    flag = root / "memory" / "state" / "p0b_driver_enabled"
    flag.parent.mkdir(parents=True, exist_ok=True)
    flag.write_bytes(b"\xff\xfe enabled")  # invalid UTF-8

    # 크래시 없이 "" 반환 → disabled
    assert drv.read_activation(str(root)) == ""
    assert drv.is_activated(str(root)) is False

    pickup = make_pickup_mock()
    records = drv.scan_once(str(root), pickup_fn=pickup, write_evidence=False)
    assert len(records) == 1
    assert records[0].verdict == drv.VERDICT_NOOP_DISABLED
    assert len(pickup.calls) == 0


# ─────────────────────────────────────────────────────────────────────────────
# 17. fail-safe: dedupe ledger 가 invalid UTF-8 → _dedupe_hit 크래시 0,
#     False(보수 처리: dedupe 미확인) 반환. 정상 경로 동작 유지.
# ─────────────────────────────────────────────────────────────────────────────
def test_failsafe_dedupe_ledger_invalid_utf8(tmp_path):
    root = _make_dirs(tmp_path)
    ledger = root / "memory" / "events" / "callback_4tuple_index.jsonl"
    ledger.parent.mkdir(parents=True, exist_ok=True)
    ledger.write_bytes(b"\xff\xfe not utf8")  # invalid UTF-8

    # 크래시 없이 False 반환 (보수 처리)
    assert drv._dedupe_hit("task-999", str(ledger), str(root)) is False


# ─────────────────────────────────────────────────────────────────────────────
# 18. fail-safe: quarantine move 중 shutil.Error(OSError 미상속) → 크래시 0,
#     에러 메시지 반환(None 아님). process_one 도 크래시 없이 QUARANTINE record.
# ─────────────────────────────────────────────────────────────────────────────
def test_failsafe_quarantine_shutil_error(tmp_path, monkeypatch):
    import shutil as _sh
    root = _make_dirs(tmp_path)
    _enable_activation(root)

    p = _events_dir(root) / "task-999.result.json"
    p.write_text("", encoding="utf-8")  # 0 byte → size0 quarantine 경로 유도
    _age(p)

    def _boom(src, dst):
        raise _sh.Error("simulated shutil.Error")
    monkeypatch.setattr(drv.shutil, "move", _boom)

    # _quarantine_move 직접: 크래시 없이 에러 메시지(None 아님)
    msg = drv._quarantine_move(str(p), str(root), None)
    assert msg is not None
    assert "shutil" in msg.lower() or "실패" in msg

    # process_one 전체도 크래시 없이 QUARANTINE record
    pickup = make_pickup_mock()
    rec = drv.process_one(str(p), root=str(root), pickup_fn=pickup, sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_QUARANTINE
    assert len(pickup.calls) == 0


# ─────────────────────────────────────────────────────────────────────────────
# 19. write race: 부분 JSON + 최근 mtime → NOOP_NOT_READY(DEFER),
#     즉시 quarantine 안 됨 + pickup 미호출 + quarantine 디렉토리 미생성
# ─────────────────────────────────────────────────────────────────────────────
def test_partial_json_recent_mtime_defers(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()
    verify = make_verify_mock()

    p = _events_dir(root) / "task-999.result.json"
    p.write_text('{"task_id": "task-999", "completion_sig', encoding="utf-8")  # 잘린 부분 JSON
    # ★ 일부러 aged 처리 하지 않음 → 최근 mtime → DEFER 되어야 함

    rec = drv.process_one(str(p), root=str(root), pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_NOOP_NOT_READY
    assert rec.quarantined is False
    # pickup/verify 절대 미호출
    assert len(pickup.calls) == 0
    assert len(verify.calls) == 0
    # 원본 파일 그대로 존재 (이동 안 됨)
    assert p.exists()
    # quarantine 디렉토리에 파일 없음
    qfile = root / "memory" / "p0b_state" / "quarantine" / "task-999.result.json"
    assert not qfile.exists()


# ─────────────────────────────────────────────────────────────────────────────
# 20. 오래된 invalid JSON (mtime 충분히 과거) → grace 후 quarantine("parse_fail")
# ─────────────────────────────────────────────────────────────────────────────
def test_old_invalid_json_quarantines_after_grace(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()

    p = _events_dir(root) / "task-999.result.json"
    p.write_text('{"task_id": "task-999", "completion_sig', encoding="utf-8")  # invalid
    _age(p, seconds=10)  # mtime 과거 → readiness 통과

    rec = drv.process_one(str(p), root=str(root), pickup_fn=pickup, sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_QUARANTINE
    assert rec.quarantine_reason == "parse_fail"
    assert len(pickup.calls) == 0
    # quarantine 디렉토리로 이동
    assert not p.exists()
    assert (root / "memory" / "p0b_state" / "quarantine" / "task-999.result.json").exists()


# ─────────────────────────────────────────────────────────────────────────────
# 21. null byte 부분 write: 최근 mtime → DEFER, aged → quarantine("null_byte")
# ─────────────────────────────────────────────────────────────────────────────
def test_null_byte_defer_then_quarantine(tmp_path):
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()

    # (a) 최근 mtime + null byte → DEFER (grace 내)
    p = _events_dir(root) / "task-999.result.json"
    p.write_bytes(b'{"task_id": "task-999"\x00\x00 truncated')
    rec_defer = drv.process_one(str(p), root=str(root), pickup_fn=pickup, sleep_fn=_NO_SLEEP)
    assert rec_defer.verdict == drv.VERDICT_NOOP_NOT_READY
    assert rec_defer.quarantined is False
    assert p.exists()

    # (b) aged + null byte 잔존 → quarantine("null_byte")
    _age(p, seconds=10)
    rec_q = drv.process_one(str(p), root=str(root), pickup_fn=pickup, sleep_fn=_NO_SLEEP)
    assert rec_q.verdict == drv.VERDICT_QUARANTINE
    assert rec_q.quarantine_reason == "null_byte"
    assert len(pickup.calls) == 0
    assert not p.exists()
    assert (root / "memory" / "p0b_state" / "quarantine" / "task-999.result.json").exists()


# ─────────────────────────────────────────────────────────────────────────────
# 22. _check_readiness 단위: aged+stable → ready, recent → not ready,
#     stat 실패 → not ready
# ─────────────────────────────────────────────────────────────────────────────
def test_check_readiness_unit(tmp_path):
    from datetime import datetime
    root = _make_dirs(tmp_path)
    p = _events_dir(root) / "task-999.result.json"
    p.write_text(json.dumps(VALID_PAYLOAD), encoding="utf-8")
    clock = lambda: datetime.now(drv.KST)

    # 최근 mtime → not ready (recent_mtime)
    ready, reason = drv._check_readiness(str(p), clock=clock, sleep_fn=_NO_SLEEP)
    assert ready is False
    assert reason == "recent_mtime"

    # aged → ready
    _age(p, seconds=10)
    ready2, reason2 = drv._check_readiness(str(p), clock=clock, sleep_fn=_NO_SLEEP)
    assert ready2 is True
    assert reason2 == "ready"

    # 존재하지 않는 파일 → stat_fail (not ready)
    ready3, reason3 = drv._check_readiness(str(_events_dir(root) / "nope.result.json"),
                                           clock=clock, sleep_fn=_NO_SLEEP)
    assert ready3 is False
    assert reason3 == "stat_fail"


# ─────────────────────────────────────────────────────────────────────────────
# 23. WAKE_BUILT terminal → result 파일 watched 밖 processed 로 이동
# ─────────────────────────────────────────────────────────────────────────────
def test_wake_built_moves_result_out_of_watched(tmp_path):
    """WAKE_BUILT 도달 시 원본 memory/events 에서 삭제 + processed 에 존재.
    glob 재매칭 0 입증 (무한 재트리거 소멸 확인)."""
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock(verdict="WAKE_BUILT")
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)
    _age(p)

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_WAKE_BUILT

    # 원본 파일 삭제됨
    assert p.exists() is False

    # processed 디렉토리에 파일 이동됨
    processed = root / "memory" / "p0b_state" / "processed"
    assert (processed / "task-999.result.json").exists() is True

    # ★ glob 재매칭 0 입증: watched 디렉터리에 result.json 없음
    remaining = glob.glob(str(_events_dir(root) / "task-*.result.json"))
    assert remaining == []


# ─────────────────────────────────────────────────────────────────────────────
# 24. PICKUP_SKIP (done marker) → processed 로 이동, glob 재매칭 0
# ─────────────────────────────────────────────────────────────────────────────
def test_pickup_skip_done_marker_moves_out(tmp_path):
    """dedupe done marker 존재 시 PICKUP_SKIP → 원본 processed 로 이동."""
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)
    _age(p)
    # done 마커 생성: {task_id}.pickup.done
    marker = _events_dir(root) / "task-999.pickup.done"
    marker.write_text("done", encoding="utf-8")

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_PICKUP_SKIP

    # 원본 파일 삭제됨
    assert p.exists() is False

    # processed 디렉토리에 파일 이동됨
    processed = root / "memory" / "p0b_state" / "processed"
    assert (processed / "task-999.result.json").exists() is True

    # ★ glob 재매칭 0 입증
    remaining = glob.glob(str(_events_dir(root) / "task-*.result.json"))
    assert remaining == []


# ─────────────────────────────────────────────────────────────────────────────
# 25. PICKUP_SKIP (pickup_fn SKIP_TERMINAL) → processed 로 이동
# ─────────────────────────────────────────────────────────────────────────────
def test_pickup_skip_terminal_from_pickup_fn_moves_out(tmp_path):
    """pickup_fn 이 SKIP_TERMINAL 반환 시 PICKUP_SKIP → processed 이동."""
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock(verdict="SKIP_TERMINAL")
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)
    _age(p)
    # done 마커 없음 — pickup_fn 이 SKIP_TERMINAL 반환하는 케이스

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_PICKUP_SKIP

    # 원본 파일 삭제됨
    assert p.exists() is False

    # processed 디렉토리에 파일 이동됨
    processed = root / "memory" / "p0b_state" / "processed"
    assert (processed / "task-999.result.json").exists() is True

    # pickup_fn 정확히 1회 호출 (dedupe skip 이 아닌 pickup_fn 에 의한 skip)
    assert len(pickup.calls) == 1


# ─────────────────────────────────────────────────────────────────────────────
# 26. NOOP_NOT_READY(DEFER) → 파일 잔류, glob 길이 1 (재평가 대상)
# ─────────────────────────────────────────────────────────────────────────────
def test_defer_stays_for_retrigger(tmp_path):
    """최근 mtime(부분 JSON) → NOOP_NOT_READY. 파일 잔류 + glob 길이 1 입증.
    DEFER 는 terminal 아님 — 의도적 잔류."""
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock()
    verify = make_verify_mock()

    p = _events_dir(root) / "task-999.result.json"
    p.write_text('{"task_id": "task-999", "completion_sig', encoding="utf-8")
    # ★ _age 호출 안 함 → 최근 mtime → DEFER

    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)
    assert rec.verdict == drv.VERDICT_NOOP_NOT_READY

    # 원본 파일 잔류 (이동 안 됨)
    assert p.exists() is True

    # processed 디렉토리에 파일 없음
    processed = root / "memory" / "p0b_state" / "processed"
    assert not (processed / "task-999.result.json").exists()

    # ★ glob 길이 1 — 재평가 대상으로 남음 (무한 재트리거가 아닌 의도적 잔류)
    remaining = glob.glob(str(_events_dir(root) / "task-*.result.json"))
    assert len(remaining) == 1


# ─────────────────────────────────────────────────────────────────────────────
# 27. scan_once 후 watched 에 result 0 (무한 재트리거 소멸 통합)
# ─────────────────────────────────────────────────────────────────────────────
def test_scan_once_no_glob_rematch_after_wake(tmp_path):
    """scan_once WAKE_BUILT 완료 후 watched(memory/events) 에 result.json 0건.
    무한 재트리거 소멸 입증."""
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock(verdict="WAKE_BUILT")
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)
    _age(p)

    records = drv.scan_once(
        root,
        pickup_fn=pickup,
        verify_fn=verify,
        write_evidence=False,
        sleep_fn=_NO_SLEEP,
    )

    # watched 에 result.json 0건
    remaining = glob.glob(str(_events_dir(root) / "task-*.result.json"))
    assert remaining == []

    # records 에 WAKE_BUILT 1건 존재
    wake_records = [r for r in records if r.verdict == drv.VERDICT_WAKE_BUILT]
    assert len(wake_records) == 1


# ─────────────────────────────────────────────────────────────────────────────
# 28. terminal 이동 실패 fail-safe: 크래시 0 + rec.error 기록
# ─────────────────────────────────────────────────────────────────────────────
def test_terminal_move_failsafe(tmp_path, monkeypatch):
    """os.replace / shutil.move 둘 다 실패해도 process_one 크래시 없이 반환.
    rec.verdict==WAKE_BUILT 유지, rec.error 에 이동 실패 메시지 기록."""
    import shutil as _sh

    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup = make_pickup_mock(verdict="WAKE_BUILT")
    verify = make_verify_mock(verdict="AUTHORITATIVE")

    p = _write_result(root)
    _age(p)

    def boom_oserror(src, dst):
        raise OSError("simulated os.replace failure")

    def boom_shutilerror(src, dst):
        raise _sh.Error("simulated shutil.move failure")

    monkeypatch.setattr(drv.os, "replace", boom_oserror)
    monkeypatch.setattr(drv.shutil, "move", boom_shutilerror)

    # 크래시 없이 반환되어야 함
    rec = drv.process_one(str(p), root=root, pickup_fn=pickup, verify_fn=verify,
                          sleep_fn=_NO_SLEEP)

    # verdict 는 WAKE_BUILT 유지
    assert rec.verdict == drv.VERDICT_WAKE_BUILT

    # 이동 실패 메시지가 error 필드에 기록됨
    assert rec.error is not None


# ─────────────────────────────────────────────────────────────────────────────
# 29. quarantine overwrite protection: 같은 basename 두 번 이동 시 suffix 보호
# ─────────────────────────────────────────────────────────────────────────────
def test_quarantine_overwrite_protection(tmp_path):
    """동일 basename 두 번 _quarantine_move: 첫 파일 보존 + 두 번째는 suffix 별도 파일.
    덮어쓰기 0 검증."""
    root = _make_dirs(tmp_path)

    # 첫 번째 result 파일 생성 및 quarantine 이동
    p1 = _events_dir(root) / "task-999.result.json"
    p1.write_text(json.dumps(VALID_PAYLOAD), encoding="utf-8")
    err1 = drv._quarantine_move(str(p1), str(root), None)
    assert err1 is None  # 첫 이동 성공

    qdir = root / "memory" / "p0b_state" / "quarantine"
    assert (qdir / "task-999.result.json").exists()

    # 두 번째 동일 basename 파일 생성 및 재이동
    p2 = _events_dir(root) / "task-999.result.json"
    payload2 = dict(VALID_PAYLOAD)
    payload2["retry"] = 2
    p2.write_text(json.dumps(payload2), encoding="utf-8")
    err2 = drv._quarantine_move(str(p2), str(root), None)
    assert err2 is None  # 두 번째 이동도 성공

    # quarantine 디렉토리 내 task-999.result.json 으로 시작하는 파일 2개 이상 존재
    all_files = os.listdir(str(qdir))
    matching = [f for f in all_files if f.startswith("task-999.result.json")]
    assert len(matching) >= 2, (
        f"overwrite 방지 실패: quarantine 에 {matching!r} ({len(matching)}개만 존재)"
    )


# ─────────────────────────────────────────────────────────────────────────────
# 30. entrypoint lock 정적 검사: /tmp 고정 없음 + XDG + mkdir-p + symlink guard + flock
# ─────────────────────────────────────────────────────────────────────────────
def test_entrypoint_lock_no_tmp_and_symlink_guard():
    """scripts/anu_pickup_entrypoint.sh 정적 분석:
    /tmp 고정 경로 미사용, XDG_RUNTIME_DIR 우선, mkdir -p 보장,
    symlink 거부 가드(-L), flock single-flight 유지."""
    script_root = Path(__file__).resolve().parents[2]
    sh = (script_root / "scripts" / "anu_pickup_entrypoint.sh").read_text(encoding="utf-8")

    # /tmp 고정 경로 없음 (XDG fallback 형태는 허용)
    assert "/tmp" not in sh, "/tmp 고정 경로가 entrypoint 에 있어서는 안 됩니다"

    # XDG_RUNTIME_DIR 우선 사용
    assert "XDG_RUNTIME_DIR" in sh, "XDG_RUNTIME_DIR 이 entrypoint 에 있어야 합니다"

    # parent dir 보장 (mkdir -p)
    assert "mkdir -p" in sh, "mkdir -p 가 entrypoint 에 있어야 합니다"

    # symlink 거부 가드 ([[ -L ... ]])
    assert "-L" in sh, "symlink 거부 가드(-L 검사)가 entrypoint 에 있어야 합니다"

    # single-flight flock 유지
    assert "flock" in sh, "flock single-flight 가 entrypoint 에 있어야 합니다"
