# -*- coding: utf-8 -*-
"""task-2720 P0-a callback pickup runner — regression (9).

pickup_once 의 모든 분기를 mock/fixture 로 검증.
네트워크 0, 전부 mock/fixture, tmpdir 사용.
ANU key literal 절대 노출 금지 — 모듈 상수 / sealed_key_loader fake 주입으로만.
"""
from __future__ import annotations

import importlib.util
import json
import os
import shutil
import sys
import tempfile
import unittest
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))


def _load(modname: str, relpath: str):
    if modname in sys.modules:
        return sys.modules[modname]
    spec = importlib.util.spec_from_file_location(modname, _ROOT / relpath)
    assert spec is not None and spec.loader is not None
    mod = importlib.util.module_from_spec(spec)
    sys.modules[modname] = mod
    spec.loader.exec_module(mod)
    return mod


# 의존 모듈 실 로드 (2717 패턴 동일)
_load("dispatch.callback_owner_enforcer",
      "dispatch/callback_owner_enforcer.py")
_load("dispatch.normal_fallback_callback_helper",
      "dispatch/normal_fallback_callback_helper.py")
M_enf = _load("dispatch.anu_owned_callback_enforcement",
              "dispatch/anu_owned_callback_enforcement.py")
M = _load("dispatch.anu_result_pickup_runner",
          "dispatch/anu_result_pickup_runner.py")

# ANU key: 모듈 상수에서 가져옴 (literal 금지)
_ANU_KEY = M_enf.ANU_KEY
# executor self-key (dev self, ANU 아님)
_DEV_KEY = "7943afbe12c12f7d"

# 고정 시각
_NOW = datetime(2026, 5, 31, 3, 0, 0, tzinfo=timezone.utc)


def _clock():
    return _NOW


def _fresh_ts():
    return (_NOW - timedelta(minutes=5)).strftime("%Y-%m-%dT%H:%M:%SZ")


def _sealed_key_loader():
    """테스트용 ANU key fake 로더 — literal 노출 없이 모듈 상수 반환."""
    return _ANU_KEY


def _no_key_loader():
    """키 미설정 모사 — SEALED_KEY_MISSING 경로 테스트용."""
    return None


# ── fake cron-history probe (2717 make_probe 스타일) ─────────────────────────
def _make_probe(owned_schedules):
    owned = set(owned_schedules)

    def _probe(schedule_id: str) -> dict:
        if schedule_id in owned:
            return {
                "status": "ok",
                "id": schedule_id,
                "count": 1,
                "history": [{"status": "ok", "ts": "2026-05-31 03:00:00"}],
            }
        return {
            "status": "error",
            "message": f"schedule not found or access denied: {schedule_id}",
        }

    return _probe


def _write_result_json(result_dir: str, task_id: str, extra: Optional[dict] = None) -> str:
    """헬퍼: 정상 result.json 작성 후 경로 반환."""
    payload = {"task_id": task_id, "summary": "done", "sha256": "deadbeef"}
    if extra:
        payload.update(extra)
    path = os.path.join(result_dir, f"{task_id}.result.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh, ensure_ascii=False)
    return path


class TestPickupRunner2720(unittest.TestCase):

    def setUp(self):
        self._tmp = tempfile.mkdtemp(prefix="task2720-")
        self.addCleanup(shutil.rmtree, self._tmp, ignore_errors=True)

    # ── 1. 정상 result.json → WAKE_BUILT ──────────────────────────────────────
    def test_fx_result_json_valid(self):
        """정상 result.json (collector_envelope 없음) → WAKE_BUILT, argv 비어있지 않음."""
        rdir = tempfile.mkdtemp(prefix="t2720-1-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger1.jsonl")
        path = _write_result_json(rdir, "task-2720")
        res = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res.verdict, M.PICKUP_WAKE_BUILT)
        self.assertTrue(res.ok)
        self.assertTrue(res.wake_built)
        self.assertIsNotNone(res.argv)
        self.assertGreater(len(res.argv), 0)
        self.assertEqual(res.task_id, "task-2720")
        # argv 에 ANU key 가 들어있는지 (--key 위치)
        self.assertIn("--key", res.argv)
        idx = res.argv.index("--key")
        self.assertEqual(res.argv[idx + 1], _ANU_KEY)
        # done marker 기록
        self.assertIsNotNone(res.marker_path)
        self.assertTrue(os.path.isfile(res.marker_path))

    # ── 2. self-collector key → QUARANTINE ────────────────────────────────────
    def test_fx_self_collector_key(self):
        """collector_envelope(self_key 소유 schedule_id) + access-denied → QUARANTINE."""
        rdir = tempfile.mkdtemp(prefix="t2720-2-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger2.jsonl")
        SELF_SID = "A372DD6B"
        ANU_SID = "ANUOWNED1"
        envelope = {
            "task_id": "task-2720",
            "schedule_id": SELF_SID,   # self-key 소유 (ANU 조회 거부됨)
            "collector_role": "ANU",   # 텍스트 주장 (불신)
            "self_key_used": False,
            "recorded_at": _fresh_ts(),
        }
        path = _write_result_json(rdir, "task-2720",
                                  extra={"collector_envelope": envelope})
        res = M.pickup_once(
            path,
            gh_probe=_make_probe({ANU_SID}),   # SELF_SID 는 access denied
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res.verdict, M.PICKUP_QUARANTINE)
        self.assertFalse(res.wake_built)
        self.assertIsNone(res.argv)

    # ── 3. owner proof pending → PICKUP_PENDING ────────────────────────────────
    def test_fx_owner_proof_pending(self):
        """gh_probe 가 unknown status 반환 → PENDING_OWNER_PROOF, retryable, wake 0."""
        rdir = tempfile.mkdtemp(prefix="t2720-3-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger3.jsonl")
        SID = "WEIRDSCHEDULE"
        envelope = {
            "task_id": "task-2720",
            "schedule_id": SID,
            "recorded_at": _fresh_ts(),
        }
        path = _write_result_json(rdir, "task-2720",
                                  extra={"collector_envelope": envelope})

        def weird_probe(_sid):
            return {"status": "weird"}

        res = M.pickup_once(
            path,
            gh_probe=weird_probe,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res.verdict, M.PICKUP_PENDING)
        self.assertFalse(res.wake_built)
        self.assertIsNone(res.argv)

    # ── 4. 같은 result.json 2회 → 1회차 WAKE_BUILT, 2회차 SKIP_DEDUPE/TERMINAL ─
    def test_fx_duplicate_result_json(self):
        """같은 result.json 2회 pickup → 1회차 WAKE_BUILT, 2회차 SKIP (wake 1회만)."""
        rdir = tempfile.mkdtemp(prefix="t2720-4-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger4.jsonl")
        path = _write_result_json(rdir, "task-2720dup")
        # 1회차
        res1 = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res1.verdict, M.PICKUP_WAKE_BUILT)
        self.assertTrue(res1.wake_built)
        # 2회차: done marker 또는 dedupe hit
        res2 = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertIn(res2.verdict,
                      (M.PICKUP_SKIP_DEDUPE, M.PICKUP_SKIP_TERMINAL))
        self.assertFalse(res2.wake_built)
        self.assertIsNone(res2.argv)

    # ── 5. terminal marker 미리 생성 → SKIP_TERMINAL ─────────────────────────
    def test_fx_terminal_marker_present(self):
        """<task>.pickup.done 미리 생성 → SKIP_TERMINAL, wake 0."""
        rdir = tempfile.mkdtemp(prefix="t2720-5-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger5.jsonl")
        path = _write_result_json(rdir, "task-2720mrk")
        # done marker 미리 생성
        done = os.path.join(rdir, "task-2720mrk.pickup.done")
        with open(done, "w") as fh:
            fh.write("{}")
        res = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res.verdict, M.PICKUP_SKIP_TERMINAL)
        self.assertFalse(res.wake_built)

    # ── 6. invalid task_id → REJECT ──────────────────────────────────────────
    def test_fx_invalid_task_id(self):
        """result.json task_id 빈값/공백 → REJECT."""
        rdir = tempfile.mkdtemp(prefix="t2720-6-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger6.jsonl")
        # task_id 빈값
        path = os.path.join(rdir, "empty.result.json")
        with open(path, "w", encoding="utf-8") as fh:
            json.dump({"task_id": "   ", "summary": "x"}, fh)
        res = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res.verdict, M.PICKUP_REJECT)
        self.assertFalse(res.wake_built)

        # task_id 완전 부재
        path2 = os.path.join(rdir, "notaskid.result.json")
        with open(path2, "w", encoding="utf-8") as fh:
            json.dump({"summary": "y"}, fh)
        res2 = M.pickup_once(
            path2,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res2.verdict, M.PICKUP_REJECT)

    # ── 7. finish-task.sh 정적 검사 ──────────────────────────────────────────
    def test_fx_finish_task_emits_result_json_only(self):
        """finish-task.sh 치환 후:
        - T2626_ANU_KEY literal 0건
        - callback-launch.json 문자열 0건
        - result.json 작성 경로 존재 확인
        """
        finish_sh = _ROOT / "scripts" / "finish-task.sh"
        self.assertTrue(finish_sh.is_file(),
                        "scripts/finish-task.sh 가 존재해야 함.")
        content = finish_sh.read_text("utf-8")

        # ANU key literal 노출 0건
        anu_literal_count = content.count("T2626_ANU_KEY")
        self.assertEqual(
            anu_literal_count, 0,
            f"finish-task.sh 에 T2626_ANU_KEY 가 {anu_literal_count}건 남아 있음 "
            "— task-2720 치환 후 0건 이어야 함.",
        )

        # callback-launch.json 0건
        cbl_count = content.count("callback-launch.json")
        self.assertEqual(
            cbl_count, 0,
            f"finish-task.sh 에 callback-launch.json 이 {cbl_count}건 남아 있음 "
            "— task-2720 치환 후 0건 이어야 함.",
        )

        # result.json 작성 경로 / anu_owned_callback_enforcement 존재 확인
        has_result_path = (
            "result.json" in content
            or "anu_owned_callback_enforcement" in content
            or "executor-write-result" in content
        )
        self.assertTrue(
            has_result_path,
            "finish-task.sh 에 result.json 작성 경로 (anu_owned_callback_enforcement "
            "또는 executor-write-result 또는 .result.json) 가 없음.",
        )

    # ── 8. ANU key sealed only — executor key 미포함 검증 ────────────────────
    def test_fx_anu_key_sealed_only(self):
        """wake argv 에 executor self-key 미포함 + sealed_key_loader 경유만.
        sealed_key_loader=None + env 미설정 → SEALED_KEY_MISSING."""
        rdir = tempfile.mkdtemp(prefix="t2720-8-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger8.jsonl")
        path = _write_result_json(rdir, "task-2720seal")
        res = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res.verdict, M.PICKUP_WAKE_BUILT)
        self.assertIsNotNone(res.argv)
        argv_str = " ".join(res.argv)
        # argv 에 executor self-key 문자열 없어야 함
        self.assertNotIn(_DEV_KEY, argv_str,
                         "argv 에 executor self-key 가 포함되면 안 됨.")
        # ANU key 는 포함(--key 위치)
        self.assertIn("--key", res.argv)
        idx = res.argv.index("--key")
        self.assertEqual(res.argv[idx + 1], _ANU_KEY)

        # sealed_key_loader=None + env 미설정 → SEALED_KEY_MISSING
        orig = os.environ.pop(M.ENV_ANU_KEY, None)
        try:
            rdir2 = tempfile.mkdtemp(prefix="t2720-8b-", dir=self._tmp)
            ledger2 = os.path.join(self._tmp, "ledger8b.jsonl")
            path2 = _write_result_json(rdir2, "task-2720seal2")
            res2 = M.pickup_once(
                path2,
                executor_key=_DEV_KEY,
                clock=_clock,
                sealed_key_loader=_no_key_loader,  # 빈 키 반환
                ledger_path=ledger2,
            )
            self.assertEqual(res2.verdict, M.PICKUP_SEALED_KEY_MISSING)
            self.assertFalse(res2.wake_built)
        finally:
            if orig is not None:
                os.environ[M.ENV_ANU_KEY] = orig

    # ── 9. lock 파일이 존재해도 pickup 차단 안 됨 (파일락 완전 제거 검증) ─────────
    def test_fx_no_lock_file_created(self):
        """<task>.pickup.lock 파일이 미리 존재해도 pickup 을 차단하지 않음.
        (SKIP_LOCK 상수 제거 — 파일락 완전 제거) → 정상 WAKE_BUILT 반환.
        pickup 완료 후 *.pickup.lock 파일이 생성되지 않았음을 fs 검증."""
        import glob as _glob

        # (1) lock 파일이 전혀 없는 fresh dir → pickup 후에도 *.pickup.lock 미생성 검증
        rdir = tempfile.mkdtemp(prefix="t2720-9-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger9.jsonl")
        path = _write_result_json(rdir, "task-2720lck")
        res = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res.verdict, M.PICKUP_WAKE_BUILT)
        self.assertTrue(res.wake_built)
        # 핵심 fs 검증: pickup 은 lock 파일을 생성하지 않는다 (파일락 완전 제거).
        new_locks = _glob.glob(os.path.join(rdir, "*.pickup.lock"))
        self.assertEqual(
            new_locks, [],
            f"pickup 이 *.pickup.lock 파일을 생성함 (파일락 미제거): {new_locks}",
        )

        # (2) lock 파일이 미리 존재해도 pickup 을 차단하지 않음 (SKIP_LOCK 없음)
        rdir2 = tempfile.mkdtemp(prefix="t2720-9b-", dir=self._tmp)
        ledger2 = os.path.join(self._tmp, "ledger9-2.jsonl")
        path2 = _write_result_json(rdir2, "task-2720stale")
        stale_lock = os.path.join(rdir2, "task-2720stale.pickup.lock")
        with open(stale_lock, "w") as fh:
            fh.write("stale-lock")
        res2 = M.pickup_once(
            path2,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger2,
        )
        self.assertEqual(res2.verdict, M.PICKUP_WAKE_BUILT,
                         "lock 파일이 있어도 pickup 이 차단되면 안 됨 (파일락 제거 회귀).")
        self.assertTrue(res2.wake_built)
        self.assertIsNotNone(res2.argv)

        # SKIP_LOCK 상수 부재 확인
        self.assertFalse(
            hasattr(M, "PICKUP_SKIP_LOCK"),
            "PICKUP_SKIP_LOCK 상수가 모듈에 존재 — 파일락 완전 제거 실패.",
        )

    # ── 9b. 파일락 완전 제거 입증 — lock 심볼 코드 라인 0건 ─────────────────────
    def test_fx_no_lock_symbols_at_all(self):
        """파일락 완전 제거(task-2720 3차 fix) 입증:
        모듈 코드(비-주석) 라인에 lock 관련 심볼이 0건임을 assert."""
        import inspect

        # 모듈 레벨 속성 부재 확인
        self.assertFalse(
            hasattr(M, "PICKUP_SKIP_LOCK"),
            "PICKUP_SKIP_LOCK 상수가 모듈에 존재 — 파일락 완전 제거 실패.",
        )
        self.assertFalse(
            hasattr(M, "_STALE_LOCK_SECONDS"),
            "_STALE_LOCK_SECONDS 가 모듈에 존재 — stale 회수 로직 미제거.",
        )

        src = inspect.getsource(M)
        # 코드(비-주석) 라인: strip() 후 '#' 로 시작하지 않는 라인
        code_lines = [
            line for line in src.splitlines()
            if not line.strip().startswith("#")
        ]

        # 코드 라인에서 lock 심볼 0건 검증
        lock_symbols = [
            "pickup.lock",
            "_lock_acquired",
            "_STALE_LOCK",
            "O_EXCL",
            "O_CREAT",
            "lock_path",
            "SKIP_LOCK",
            "lock-steal",
            "steal",
        ]
        for sym in lock_symbols:
            matching = [ln for ln in code_lines if sym in ln]
            self.assertEqual(
                matching, [],
                f"lock 심볼 '{sym}' 이 코드(비-주석) 라인에 존재: {matching}",
            )

    # ── 9c. 동시 pickup 2회 → wake 1회만 (dedupe/done-marker 보장) ────────────
    def test_fx_concurrent_pickup_dedupe_wake_once(self):
        """동시 pickup 2회 호출(serial 모사) → 1회차만 WAKE_BUILT, 2회차는
        dedupe/done-marker 로 no-op. lock 단순화 후에도 wake 중복 0 보장."""
        rdir = tempfile.mkdtemp(prefix="t2720-9c-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger9c.jsonl")
        path = _write_result_json(rdir, "task-2720con")

        # 1회차
        res1 = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res1.verdict, M.PICKUP_WAKE_BUILT)
        self.assertTrue(res1.wake_built)

        # 2회차 (동시 호출 serial 모사): done marker or dedupe → no-op
        res2 = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertIn(res2.verdict,
                      (M.PICKUP_SKIP_DEDUPE, M.PICKUP_SKIP_TERMINAL),
                      "2회차 pickup 이 WAKE_BUILT — 중복 wake 발생 (dedupe/done-marker 결함)")
        self.assertFalse(res2.wake_built)
        self.assertIsNone(res2.argv)

    # ── 9d. terminal done-marker 존재 → no-op ─────────────────────────────────
    def test_fx_done_marker_noop(self):
        """terminal done-marker(<task>.pickup.done) 존재 시 → SKIP_TERMINAL no-op.
        lock 단순화 후에도 terminal marker 우선 처리 회귀 없음."""
        rdir = tempfile.mkdtemp(prefix="t2720-9d-", dir=self._tmp)
        ledger = os.path.join(self._tmp, "ledger9d.jsonl")
        path = _write_result_json(rdir, "task-2720dmrk")
        done = os.path.join(rdir, "task-2720dmrk.pickup.done")
        with open(done, "w", encoding="utf-8") as fh:
            import json as _json
            _json.dump({"event": "PICKUP_WAKE_BUILT", "task_id": "task-2720dmrk"}, fh)

        res = M.pickup_once(
            path,
            executor_key=_DEV_KEY,
            clock=_clock,
            sealed_key_loader=_sealed_key_loader,
            ledger_path=ledger,
        )
        self.assertEqual(res.verdict, M.PICKUP_SKIP_TERMINAL,
                         "done-marker 존재 시 SKIP_TERMINAL 이어야 함 (no-op)")
        self.assertFalse(res.wake_built)
        self.assertIsNone(res.argv)


if __name__ == "__main__":
    unittest.main()
