# -*- coding: utf-8 -*-
"""task-2729+7 P0-b pickup activation hardening — regression (10).

대상:
  - dispatch.anu_pickup_driver  : D1(MAX_FILES bounded scan) + D2(legacy cutoff)
  - dispatch.anu_result_pickup_runner : D3(ledger/marker write 실패 fail-closed/관측)

절대 제약 (test_anu_pickup_driver_2721 / test_anu_result_pickup_runner_2720 동일):
  - ANU key literal 'c119085...' 를 테스트 코드에 절대 쓰지 않는다 (완성 literal 0건).
    검증용으로만 분할 조합("c119085" + "addb0f8b7") 사용.
  - pickup_once 실호출은 mock(pickup_fn/verify_fn) 주입 또는 sealed_key_loader fake + gh_probe 미주입.
    실제 wake/cron/spawn 0.
  - canonical memory(/home/jay/workspace/memory/**) 쓰기 0 — tmp_path / tempfile 만.

회장 verbatim 검증 10항목 + 핵심 시나리오 커버.
"""
from __future__ import annotations

import glob
import importlib.util as _ilu
import json
import os
import sys
import tempfile
import time as _time
import types
from pathlib import Path

import pytest

# ── import bootstrap (same pattern as test_anu_pickup_driver_2721) ─────────────
# _ROOT is parents[2] of this file's resolved path; it must contain the real
# ``dispatch/`` package and is placed first on sys.path so that package wins.
_ROOT = Path(__file__).resolve().parents[2]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

# Prevent tests/dispatch (an empty, test-only package) from shadowing the real
# dispatch package: load the real package directly from its file location and
# pin it in sys.modules.
_real_init = _ROOT / "dispatch" / "__init__.py"
_cached = sys.modules.get("dispatch")
# Reload only when nothing is cached, or the cached module is not the real package.
if _cached is None or (getattr(_cached, "__file__", "") or "") != str(_real_init):
    # Drop the stale package and every cached submodule so later imports
    # resolve against the package loaded below.
    for _k in [k for k in list(sys.modules) if k == "dispatch" or k.startswith("dispatch.")]:
        del sys.modules[_k]
    _spec = _ilu.spec_from_file_location(
        "dispatch", _real_init, submodule_search_locations=[str(_ROOT / "dispatch")]
    )
    assert _spec is not None and _spec.loader is not None
    _pkg = _ilu.module_from_spec(_spec)
    # The package is registered in sys.modules before its __init__ is executed.
    sys.modules["dispatch"] = _pkg
    _spec.loader.exec_module(_pkg)

from dispatch import anu_pickup_driver as drv  # noqa: E402


def _load(modname: str, relpath: str):
    """Load the file at ``_ROOT / relpath`` as module ``modname`` and return it.

    Follows the ``_load`` pattern of the runner tests (2720): dependency
    modules are really loaded from their source files, not mocked.

    Args:
        modname: Fully qualified name to register in ``sys.modules``.
        relpath: Path of the source file, relative to ``_ROOT``.

    Returns:
        The module object. If ``modname`` is already in ``sys.modules`` that
        cached entry is returned unchanged and nothing is executed.

    Raises:
        AssertionError: If no import spec / loader can be built for the file.
        Exception: Whatever the module body raises while executing. In that
            case the partially initialised module is removed from
            ``sys.modules`` again, so a later ``_load`` (or another test
            module using the same cache check) cannot pick up a broken module.
    """
    if modname in sys.modules:
        return sys.modules[modname]
    spec = _ilu.spec_from_file_location(modname, _ROOT / relpath)
    assert spec is not None and spec.loader is not None
    mod = _ilu.module_from_spec(spec)
    # Register before executing so the module can be found by name while its
    # body runs (e.g. for circular imports).
    sys.modules[modname] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a half-initialised module cached; re-raise unchanged.
        sys.modules.pop(modname, None)
        raise
    return mod


# Modules needed by the runner D3 tests (same pattern as 2720). The first two
# are loaded only for their side effect of being registered in sys.modules;
# the last two are kept as M_enf / M for direct use by the tests.
_load("dispatch.callback_owner_enforcer", "dispatch/callback_owner_enforcer.py")
_load("dispatch.normal_fallback_callback_helper",
      "dispatch/normal_fallback_callback_helper.py")
M_enf = _load("dispatch.anu_owned_callback_enforcement",
              "dispatch/anu_owned_callback_enforcement.py")
M = _load("dispatch.anu_result_pickup_runner",
          "dispatch/anu_result_pickup_runner.py")

# ANU key: obtained only from the module constant (zero literal exposure)
_ANU_KEY = M_enf.ANU_KEY
_DEV_KEY = "7943afbe12c12f7d"  # executor self-key (not the ANU key)


# ─────────────────────────────────────────────────────────────────────────────
# 헬퍼 (test_anu_pickup_driver_2721 동일 패턴)
# ─────────────────────────────────────────────────────────────────────────────
# Baseline result payload; tests copy it and override the task-specific fields.
VALID_PAYLOAD = dict(
    task_id="task-999",
    completion_signal="EXECUTOR_RESULT_WRITTEN",
    collector_envelope=dict(task_id="task-999", schedule_id="sch-1"),
    report_path="r.md",
    sha256="abc",
)


def _make_dirs(tmp_path: Path) -> Path:
    """Create ``memory/events`` and ``memory/state`` under ``tmp_path`` and return ``tmp_path``.

    Existing directories are left untouched, so the call is repeatable.
    """
    memory_root = tmp_path / "memory"
    for leaf in ("events", "state"):
        memory_root.joinpath(leaf).mkdir(parents=True, exist_ok=True)
    return tmp_path


def _events_dir(root: Path) -> Path:
    """Return the ``memory/events`` directory path under ``root`` (no filesystem access)."""
    return root.joinpath("memory", "events")


def _write_result(root: Path, name: str = "task-999.result.json", payload=None) -> Path:
    """Write a JSON result file into ``root``'s events directory and return its path.

    Args:
        root: Test root that already contains ``memory/events``.
        name: File name to create inside the events directory.
        payload: Object to serialise; ``VALID_PAYLOAD`` is used when ``None``.
    """
    body = VALID_PAYLOAD if payload is None else payload
    target = _events_dir(root) / name
    target.write_text(json.dumps(body), encoding="utf-8")
    return target


def _enable_activation(root: Path) -> None:
    """Write the driver activation flag file ``memory/state/p0b_driver_enabled`` under ``root``.

    The file content is ``drv.ACTIVATION_ENABLED`` followed by a newline; the
    state directory is created if it does not exist yet.
    """
    state_dir = root / "memory" / "state"
    state_dir.mkdir(parents=True, exist_ok=True)
    (state_dir / "p0b_driver_enabled").write_text(
        drv.ACTIVATION_ENABLED + "\n", encoding="utf-8"
    )


def _age(path, seconds: float = 10.0) -> None:
    """Set both atime and mtime of ``path`` to ``seconds`` before the current time.

    The tests use this to get files past the readiness "aged" gate and to
    imitate the mtime of legacy files.
    """
    stamp = _time.time() - seconds
    os.utime(str(path), times=(stamp, stamp))


def _NO_SLEEP(*a, **k):  # noqa: N802
    """Sleep stand-in passed as ``sleep_fn``: accepts any arguments, does nothing, returns None."""
    return None


def make_pickup_mock(verdict: str = "WAKE_BUILT"):
    """Build a recording stand-in for ``pickup_fn`` (no real wake/cron is triggered).

    Every call is appended to the returned callable's ``calls`` list as an
    ``(args, kwargs)`` tuple. Each call returns a fresh ``SimpleNamespace``
    carrying ``verdict``, ``ok`` (True only for ``"WAKE_BUILT"``), ``argv``,
    ``task_id`` and ``reasons``.
    """
    recorded: list = []
    is_ok = verdict == "WAKE_BUILT"

    def _pickup(*args, **kwargs):
        recorded.append((args, kwargs))
        fields = {
            "verdict": verdict,
            "ok": is_ok,
            "argv": ["x"],
            "task_id": "task-999",
            "reasons": [],
        }
        return types.SimpleNamespace(**fields)

    _pickup.calls = recorded  # type: ignore[attr-defined]
    return _pickup


def make_verify_mock(verdict: str = "AUTHORITATIVE"):
    """Build a recording stand-in for ``verify_fn``.

    Every call is appended to the returned callable's ``calls`` list as an
    ``(args, kwargs)`` tuple. Each call returns a fresh ``SimpleNamespace``
    carrying ``verdict``, ``ok`` (True only for ``"AUTHORITATIVE"``), an empty
    ``classification`` string and an empty ``reasons`` list.
    """
    recorded: list = []
    is_ok = verdict == "AUTHORITATIVE"

    def _verify(*args, **kwargs):
        recorded.append((args, kwargs))
        fields = {
            "verdict": verdict,
            "ok": is_ok,
            "classification": "",
            "reasons": [],
        }
        return types.SimpleNamespace(**fields)

    _verify.calls = recorded  # type: ignore[attr-defined]
    return _verify


# Runner D3 helpers (2720 pattern)
def _sealed_key_loader():
    """Fake ANU key loader: returns the module-level ``_ANU_KEY`` so no key literal is exposed."""
    return _ANU_KEY


def _write_runner_result(result_dir: str, task_id: str) -> str:
    """Write a well-formed runner ``<task_id>.result.json`` into ``result_dir`` and return its path.

    The payload deliberately has no ``collector_envelope`` (the original test
    notes state this makes the runner skip verification).
    """
    target = os.path.join(result_dir, f"{task_id}.result.json")
    body = {"task_id": task_id, "summary": "done", "sha256": "deadbeef"}
    with open(target, "w", encoding="utf-8") as out:
        out.write(json.dumps(body, ensure_ascii=False))
    return target


# ═════════════════════════════════════════════════════════════════════════════
# A. legacy cutoff (D2) — driver
# ═════════════════════════════════════════════════════════════════════════════
def test_legacy_skip_before_epoch(tmp_path):
    """A result older than the activation epoch is skipped as legacy.

    Activation is enabled and the epoch is "now", while the result's mtime is
    10 s in the past. Expected: NOOP_LEGACY_SKIP with reason
    "pre_activation_epoch", neither pickup nor verify is called, the file stays
    in place, and no quarantine/processed directory is created.
    """
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup_spy, verify_spy = make_pickup_mock(), make_verify_mock()

    result_file = _write_result(root)
    _age(result_file, seconds=10)   # mtime = now - 10s
    cutoff = _time.time()           # epoch = now, hence mtime < epoch

    record = drv.process_one(
        str(result_file),
        root=root,
        pickup_fn=pickup_spy,
        verify_fn=verify_spy,
        legacy_cutoff=True,
        activation_epoch=cutoff,
        sleep_fn=_NO_SLEEP,
    )

    assert record.verdict == drv.VERDICT_NOOP_LEGACY_SKIP
    assert record.quarantine_reason == "pre_activation_epoch"
    assert record.quarantined is False
    assert len(pickup_spy.calls) == 0
    assert len(verify_spy.calls) == 0
    # The file was not moved.
    assert result_file.exists()
    # Neither the quarantine nor the processed directory came into existence.
    state_root = root / "memory" / "p0b_state"
    assert not (state_root / "quarantine").exists()
    assert not (state_root / "processed").exists()


def test_post_epoch_proceeds_wake(tmp_path):
    """A result newer than the activation epoch takes the normal decision path.

    The epoch lies well in the past (now - 100 s) and the file is aged to
    now - 10 s, so mtime >= epoch. Even with ``legacy_cutoff=True`` the verdict
    is WAKE_BUILT and pickup is called exactly once.
    """
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup_spy = make_pickup_mock(verdict="WAKE_BUILT")
    verify_spy = make_verify_mock(verdict="AUTHORITATIVE")

    result_file = _write_result(root)
    _age(result_file, seconds=10)       # mtime = now - 10s
    cutoff = _time.time() - 100         # epoch = now - 100s, hence mtime > epoch

    options = dict(
        root=root,
        pickup_fn=pickup_spy,
        verify_fn=verify_spy,
        legacy_cutoff=True,
        activation_epoch=cutoff,
        sleep_fn=_NO_SLEEP,
    )
    record = drv.process_one(str(result_file), **options)

    assert record.verdict == drv.VERDICT_WAKE_BUILT
    assert len(pickup_spy.calls) == 1


def test_epoch_absent_fail_closed(tmp_path):
    """A missing epoch fails closed when the legacy cutoff is on.

    With ``legacy_cutoff=True`` and ``activation_epoch=None`` the verdict must
    be NOOP_LEGACY_SKIP with reason "epoch_absent". This proves there is no
    fail-open: pickup and verify are never called and the file stays in place.
    """
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup_spy, verify_spy = make_pickup_mock(), make_verify_mock()

    result_file = _write_result(root)
    _age(result_file, seconds=10)

    record = drv.process_one(
        str(result_file),
        root=root,
        pickup_fn=pickup_spy,
        verify_fn=verify_spy,
        legacy_cutoff=True,
        activation_epoch=None,
        sleep_fn=_NO_SLEEP,
    )

    assert record.verdict == drv.VERDICT_NOOP_LEGACY_SKIP
    assert record.quarantine_reason == "epoch_absent"
    assert len(pickup_spy.calls) == 0
    assert len(verify_spy.calls) == 0
    assert result_file.exists()


def test_read_activation_epoch_parse(tmp_path):
    """``read_activation_epoch`` parses the marker at memory/state/p0b_activation_epoch.

    Covered cases: a ``str(float)`` marker yields that float, an invalid
    marker ("abc") yields None, a missing marker yields None, and an injected
    ``epoch_reader`` has its returned text parsed.
    """
    root = _make_dirs(tmp_path)
    marker = root.joinpath("memory", "state", "p0b_activation_epoch")
    root_arg = str(root)

    # Well-formed float
    expected = 1234567.5
    marker.write_text(str(expected), encoding="utf-8")
    assert drv.read_activation_epoch(root_arg) == pytest.approx(expected)

    # Invalid content -> None
    marker.write_text("abc", encoding="utf-8")
    assert drv.read_activation_epoch(root_arg) is None

    # Marker absent -> None
    marker.unlink()
    assert drv.read_activation_epoch(root_arg) is None

    # Injected epoch_reader -> parsed value
    injected = drv.read_activation_epoch(root_arg, epoch_reader=lambda: "123.5")
    assert injected == pytest.approx(123.5)


def test_legacy_cutoff_off_unaffected(tmp_path):
    """Without ``legacy_cutoff`` (left at its default) existing behaviour is unchanged.

    The aged result still produces WAKE_BUILT with exactly one pickup call.
    """
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup_spy = make_pickup_mock(verdict="WAKE_BUILT")
    verify_spy = make_verify_mock(verdict="AUTHORITATIVE")

    result_file = _write_result(root)
    _age(result_file, seconds=10)

    record = drv.process_one(
        str(result_file),
        root=root,
        pickup_fn=pickup_spy,
        verify_fn=verify_spy,
        sleep_fn=_NO_SLEEP,
    )

    assert record.verdict == drv.VERDICT_WAKE_BUILT
    assert len(pickup_spy.calls) == 1


# ═════════════════════════════════════════════════════════════════════════════
# B. MAX_FILES (D1) — driver scan_once
# ═════════════════════════════════════════════════════════════════════════════
def test_max_files_bounded(tmp_path):
    """60 results with ``max_files=50``: 50 are processed and exactly one defer record is emitted.

    The defer record must carry ``quarantine_reason == "max_files_defer:10"``.
    ``legacy_cutoff`` is not passed, so files follow the normal path.
    """
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup_spy = make_pickup_mock(verdict="WAKE_BUILT")
    verify_spy = make_verify_mock(verdict="AUTHORITATIVE")

    result_paths = []
    for idx in range(60):
        task_id = f"task-{idx}"
        body = {
            **VALID_PAYLOAD,
            "task_id": task_id,
            "collector_envelope": {"task_id": task_id, "schedule_id": "sch-1"},
        }
        written = _write_result(root, name=f"task-{idx}.result.json", payload=body)
        _age(written, seconds=10)
        result_paths.append(str(written))

    records = drv.scan_once(
        root,
        paths=result_paths,
        max_files=50,
        pickup_fn=pickup_spy,
        verify_fn=verify_spy,
        write_evidence=False,
        sleep_fn=_NO_SLEEP,
    )

    # Split the records into the defer marker(s) and the really processed ones.
    deferred, handled = [], []
    for rec in records:
        bucket = deferred if rec.verdict == drv.VERDICT_NOOP_MAX_FILES_DEFER else handled
        bucket.append(rec)

    assert len(deferred) == 1
    assert deferred[0].quarantine_reason == "max_files_defer:10"
    assert len(handled) == 50


def test_max_files_under_limit_no_defer(tmp_path):
    """3 results with ``max_files=50``: no defer record is emitted."""
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup_spy = make_pickup_mock(verdict="WAKE_BUILT")
    verify_spy = make_verify_mock(verdict="AUTHORITATIVE")

    result_paths = []
    for idx in range(3):
        task_id = f"task-{idx}"
        body = {
            **VALID_PAYLOAD,
            "task_id": task_id,
            "collector_envelope": {"task_id": task_id, "schedule_id": "sch-1"},
        }
        written = _write_result(root, name=f"task-{idx}.result.json", payload=body)
        _age(written, seconds=10)
        result_paths.append(str(written))

    records = drv.scan_once(
        root,
        paths=result_paths,
        max_files=50,
        pickup_fn=pickup_spy,
        verify_fn=verify_spy,
        write_evidence=False,
        sleep_fn=_NO_SLEEP,
    )

    deferred = [rec for rec in records if rec.verdict == drv.VERDICT_NOOP_MAX_FILES_DEFER]
    assert len(deferred) == 0


# ═════════════════════════════════════════════════════════════════════════════
# C. ledger/marker failure (D3) — runner
# ═════════════════════════════════════════════════════════════════════════════
def test_ledger_write_failed_fail_closed(tmp_path, monkeypatch):
    """A ledger write failure must fail closed.

    ``os.makedirs`` (as seen through the runner module's ``os``) is patched to
    raise ``OSError``; per the original test notes this is the call the ledger
    write (Step 8) relies on. Expected: PICKUP_LEDGER_WRITE_FAILED,
    ``wake_built`` False, ``argv`` None, and a reason containing
    "LEDGER_WRITE_FAILED".
    """
    result_dir = tempfile.mkdtemp(prefix="t2729p7-led-", dir=str(tmp_path))
    ledger_file = os.path.join(str(tmp_path), "ledger_led.jsonl")
    result_json = _write_runner_result(result_dir, "task-2729led")

    def _fail_makedirs(*a, **k):
        raise OSError("simulated ledger makedirs failure")

    monkeypatch.setattr(M.os, "makedirs", _fail_makedirs)

    outcome = M.pickup_once(
        result_json,
        executor_key=_DEV_KEY,
        sealed_key_loader=lambda: M_enf.ANU_KEY,   # module constant, no literal
        ledger_path=ledger_file,
    )

    assert outcome.verdict == M.PICKUP_LEDGER_WRITE_FAILED
    assert outcome.wake_built is False
    assert outcome.argv is None
    assert any("LEDGER_WRITE_FAILED" in reason for reason in outcome.reasons)


def test_marker_write_failed_wake_kept(tmp_path, monkeypatch):
    """A marker-only failure keeps the wake, and the ledger still dedupes a second run.

    First run: the ledger write works, but ``os.replace`` (as seen through the
    runner module's ``os``; per the original test notes this is what the done
    marker write relies on) raises ``OSError``. Expected: PICKUP_WAKE_BUILT,
    ``wake_built`` True, ``marker_path`` None, and a reason containing
    "MARKER_WRITE_FAILED".

    Second run on the same result.json with ``os.replace`` restored: because
    the ledger entry was recorded, the verdict is PICKUP_SKIP_DEDUPE and
    ``wake_built`` is False (duplicate wake blocked).
    """
    rdir = tempfile.mkdtemp(prefix="t2729p7-mrk-", dir=str(tmp_path))
    ledger = os.path.join(str(tmp_path), "ledger_mrk.jsonl")
    path = _write_runner_result(rdir, "task-2729mrk")

    def boom_oserror(*a, **k):
        raise OSError("simulated marker os.replace failure")

    # Scope the patch with monkeypatch.context(): only this patch is reverted
    # when the block exits (also if pickup_once raises), whereas
    # monkeypatch.undo() reverts every change made through the shared fixture,
    # including ones made by other fixtures.
    with monkeypatch.context() as scoped:
        scoped.setattr(M.os, "replace", boom_oserror)
        res = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            sealed_key_loader=lambda: M_enf.ANU_KEY,
            ledger_path=ledger,
        )
    assert res.verdict == M.PICKUP_WAKE_BUILT
    assert res.wake_built is True
    assert res.marker_path is None
    assert any("MARKER_WRITE_FAILED" in r for r in res.reasons)

    # Ledger was recorded -> duplicate wake is blocked: the second run on the
    # same result.json (os.replace restored, so the marker works too) dedupes.
    res2 = M.pickup_once(
        path,
        executor_key=_DEV_KEY,
        sealed_key_loader=lambda: M_enf.ANU_KEY,
        ledger_path=ledger,
    )
    assert res2.verdict == M.PICKUP_SKIP_DEDUPE
    assert res2.wake_built is False


# ═════════════════════════════════════════════════════════════════════════════
# D. 126 legacy isolated sim (회장 verbatim #1,#2,#9)
# ═════════════════════════════════════════════════════════════════════════════
def test_126_legacy_isolated_sim_canonical_delta_zero(tmp_path):
    """Simulate 126 legacy results in an isolated tmp root and expect zero side effects.

    Setup: 126 result.json files with unique task ids and no terminal marker,
    all aged into the past; activation enabled; epoch = now, so every mtime is
    below the epoch.

    ``scan_once(legacy_cutoff=True, max_files=200)`` must then give:
      - every processed record with verdict NOOP_LEGACY_SKIP (126/126)
      - zero pickup_fn / verify_fn calls
      - zero filesystem delta: all 126 event files remain, and neither the
        quarantine nor the processed directory is created
      - no defer record, since max_files=200 exceeds the file count
    """
    root = _make_dirs(tmp_path)
    _enable_activation(root)
    pickup_spy = make_pickup_mock(verdict="WAKE_BUILT")
    verify_spy = make_verify_mock(verdict="AUTHORITATIVE")

    total = 126
    for idx in range(total):
        task_id = f"task-legacy-{idx}"
        body = {
            **VALID_PAYLOAD,
            "task_id": task_id,
            "collector_envelope": {"task_id": task_id, "schedule_id": "sch-1"},
        }
        written = _write_result(root, name=f"task-legacy-{idx}.result.json", payload=body)
        _age(written, seconds=30)   # mtime in the past

    cutoff = _time.time()           # epoch = now, hence every mtime < epoch

    records = drv.scan_once(
        root,
        legacy_cutoff=True,
        activation_epoch=cutoff,
        max_files=200,
        pickup_fn=pickup_spy,
        verify_fn=verify_spy,
        write_evidence=False,
        sleep_fn=_NO_SLEEP,
    )

    deferred, handled = [], []
    for rec in records:
        bucket = deferred if rec.verdict == drv.VERDICT_NOOP_MAX_FILES_DEFER else handled
        bucket.append(rec)

    # No defer record
    assert len(deferred) == 0

    # Every processed record is a legacy skip (126/126)
    assert len(handled) == total
    assert all(rec.verdict == drv.VERDICT_NOOP_LEGACY_SKIP for rec in handled)
    assert all(rec.quarantine_reason == "pre_activation_epoch" for rec in handled)

    # pickup / verify were never called
    assert len(pickup_spy.calls) == 0
    assert len(verify_spy.calls) == 0

    # Zero filesystem delta
    pattern = str(_events_dir(root) / "task-*.result.json")
    assert len(glob.glob(pattern)) == total
    state_root = root / "memory" / "p0b_state"
    assert not (state_root / "quarantine").exists()
    assert not (state_root / "processed").exists()


# ═════════════════════════════════════════════════════════════════════════════
# key literal 0 검증 (분할 조합으로만 — 완성 literal 미노출)
# ═════════════════════════════════════════════════════════════════════════════
def test_no_anu_key_literal_in_this_file():
    """The source of this test file must contain zero occurrences of the complete ANU key literal.

    The forbidden value is assembled from two fragments so that the complete
    literal never appears in this file itself.
    """
    fragments = ("c119085", "addb0f8b7")
    forbidden = "".join(fragments)
    source_text = Path(__file__).read_text(encoding="utf-8")
    assert forbidden not in source_text
