# -*- coding: utf-8 -*-
"""task-2717 ANU_OWNED_CALLBACK_ENFORCEMENT — 필수 regression (6).

회장 verbatim: "문서/메모리 박제로 끝내지 말고 callback 생성 권한을 executor
에게 주지 않는 실행 구조로 바꾼다." — 본 테스트는 실행 코드(runner/helper +
owner-key authority + quarantine path)를 직접 호출하여 입증한다. mock-only 금지:
실 cron-history 응답 형태(원시 cokacdir json)를 fake probe 로 주입하고, 실 spoof
envelope(텍스트 ANU, 실제 self-key)를 입력 → 자동 격리 출력을 실행으로 증명한다.

regression
  1. dev self-key spoof envelope (텍스트 ANU, 실제 self-key) → QUARANTINE
  2. access denied schedule (ANU key 조회 거부) → 비-ANU 판정
  3. stale envelope 재사용 → reject
  4. ANU-owned 정상 callback → PASS (authoritative)
  5. result JSON pickup 정상 경로 → ANU runner 가 발사 request 생성
  6. self-collector quarantine 경로 → 격리 + 비차단(작업물 보존) + 비권위
"""
from __future__ import annotations

import importlib.util
import inspect
import json
import os
import shutil
import sys
import tempfile
import unittest
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest import mock

_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))


def _load(modname: str, relpath: str):
    if modname in sys.modules:
        return sys.modules[modname]
    spec = importlib.util.spec_from_file_location(modname, _ROOT / relpath)
    assert spec is not None and spec.loader is not None
    mod = importlib.util.module_from_spec(spec)
    sys.modules[modname] = mod
    spec.loader.exec_module(mod)
    return mod


# 실 runtime entrypoint 직접 호출 (재사용 의존 enforcer/helper 도 실모듈 로드).
_load("dispatch.callback_owner_enforcer",
      "dispatch/callback_owner_enforcer.py")
_load("dispatch.normal_fallback_callback_helper",
      "dispatch/normal_fallback_callback_helper.py")
M = _load("dispatch.anu_owned_callback_enforcement",
          "dispatch/anu_owned_callback_enforcement.py")

ANU_KEY = M.ANU_KEY                       # 독립 ANU key (import 상수 — 실 literal 비노출)
DEV4_SELF_KEY = "7943afbe12c12f7d"        # incident: dev4 self-collector key
ANU_SCHEDULE = "ANUOWNED1"                # ANU 가 소유한 schedule (조회 성공)
SELF_SCHEDULE = "A372DD6B"                # incident: self-key 가 소유한 schedule
# task-2717+1 (A): 실 ANU key literal 비노출 — spoof envelope 텍스트 주장에는
# placeholder 만 쓴다(어차피 기록만, 판정 미사용).
ANU_KEY_PLACEHOLDER = "<ANU_KEY_REDACTED>"


# ── fake cron-history probe (실 cokacdir json 형태 그대로 재현) ──────────────
def make_probe(owned_schedules):
    """owned_schedules: ANU key 로 조회 성공하는 sid 집합.

    cokacdir --cron-history 의 실제 응답을 재현한다:
      * ANU 소유      → {"status":"ok","id":sid,"count":1,"history":[...]}
      * 비-ANU/부재   → {"status":"error","message":"schedule not found or
                          access denied: <sid>"}  (실측 메시지)
    """
    owned = set(owned_schedules)

    def _probe(schedule_id: str) -> dict:
        if schedule_id in owned:
            return {
                "status": "ok",
                "id": schedule_id,
                "count": 1,
                "history": [{"status": "ok", "ts": "2026-05-31 01:10:00"}],
            }
        return {
            "status": "error",
            "message": f"schedule not found or access denied: {schedule_id}",
        }

    return _probe


def _now():
    return datetime(2026, 5, 31, 2, 0, 0, tzinfo=timezone.utc)


def _fresh_ts():
    return (_now() - timedelta(minutes=5)).strftime("%Y-%m-%dT%H:%M:%SZ")


class Regression2717(unittest.TestCase):

    def setUp(self):
        # quarantine artifact 는 격리 tmp dir 에만 — 실 운영 evidence 비접촉.
        self._tmp = tempfile.mkdtemp(prefix="task2717-q-")
        # tmpdir 누수 방지 — 테스트 종료 시 자동 정리(ignore_errors).
        self.addCleanup(shutil.rmtree, self._tmp, ignore_errors=True)

    # ── regression 1 — dev self-key spoof envelope → QUARANTINE ──────────────
    def test_1_dev_self_key_spoof_envelope_quarantine(self):
        """★ 실 spoof envelope 입력 → 자동격리 출력 증명.

        envelope 텍스트는 incident 처럼 ANU 를 주장(self_key_used=false,
        collector_role=ANU, owner_key=ANU). 그러나 실제 schedule(SELF_SCHEDULE)
        은 ANU key 로 조회 거부 → 텍스트 무시하고 SELF_COLLECTOR_QUARANTINE.
        """
        spoof_envelope = {
            "task_id": "task-2717",
            "schedule_id": SELF_SCHEDULE,
            "self_key_used": False,               # 거짓 주장
            "collector_role": "ANU",              # 거짓 주장
            "owner_key": ANU_KEY,                 # 거짓 주장 (텍스트만)
            "recorded_at": _fresh_ts(),
            "note": f"ANU key {ANU_KEY_PLACEHOLDER} 사용",  # 텍스트 주장(기록만)
        }
        res = M.verify_collector_authoritative(
            task_id="task-2717",
            envelope=spoof_envelope,
            probe=make_probe({ANU_SCHEDULE}),     # SELF_SCHEDULE 은 비-소유
            executor_key=DEV4_SELF_KEY,
            now=_now(),
            quarantine_dir=self._tmp,
        )
        # ★ C 강화: 반드시 SELF_COLLECTOR_QUARANTINE · usable_for_auto_merge=False
        #   · work_preserved=True 3종을 동시에 고정.
        self.assertEqual(res.verdict, M.VERDICT_QUARANTINED)
        self.assertEqual(res.classification, M.CLS_SELF_COLLECTOR_QUARANTINE)
        self.assertFalse(res.usable_for_auto_merge)
        self.assertTrue(res.work_preserved)
        # ★ envelope claim 은 '기록만, 판정 미사용' — claim 은 ANU 를 주장하지만
        #   판정의 근거는 actual owner proof(access denied) 다.
        self.assertEqual(res.envelope_claims["claimed_collector_role"], "ANU")
        self.assertEqual(res.envelope_claims["claimed_self_key_used"], False)
        self.assertEqual(res.envelope_claims["claimed_owner_key"], ANU_KEY)
        self.assertEqual(res.owner_resolution["outcome"], M.OWNER_NOT_ANU)
        self.assertFalse(res.owner_resolution["owner_is_anu"])
        self.assertIn("access denied",
                      res.owner_resolution["raw_message"].lower())
        # 자동 격리 증거 artifact 가 실제로 기록됨.
        self.assertIsNotNone(res.quarantine_record_path)
        self.assertTrue(Path(res.quarantine_record_path).is_file())
        rec = json.loads(Path(res.quarantine_record_path).read_text("utf-8"))
        self.assertEqual(rec["record_type"], "SELF_COLLECTOR_QUARANTINE")
        self.assertFalse(rec["usable_for_auto_merge"])
        self.assertTrue(rec["work_preserved"])

    # ── regression 2 — access denied schedule → 비-ANU 판정 ──────────────────
    def test_2_access_denied_schedule_non_anu(self):
        res = M.resolve_authoritative_owner(
            SELF_SCHEDULE, probe=make_probe(set()),  # 아무것도 소유 안 함
        )
        self.assertFalse(res.owner_is_anu)
        self.assertEqual(res.outcome, M.OWNER_NOT_ANU)
        self.assertTrue(res.query_ok)  # access-denied 는 결정적 신호
        self.assertIn("access denied", res.raw_message.lower())

    # ── regression 3 — stale envelope 재사용 → reject ────────────────────────
    def test_3_stale_envelope_reject(self):
        # (a) 너무 오래된 envelope → STALE_ENVELOPE_REJECTED
        stale = {
            "task_id": "task-2717",
            "schedule_id": ANU_SCHEDULE,
            "owner_key": ANU_KEY,
            "recorded_at": (_now() - timedelta(hours=48)).strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            ),
        }
        res = M.verify_collector_authoritative(
            task_id="task-2717", envelope=stale,
            probe=make_probe({ANU_SCHEDULE}), now=_now(),
            quarantine_dir=self._tmp,
        )
        self.assertEqual(res.verdict, M.VERDICT_REJECTED)
        self.assertEqual(res.classification, M.CLS_STALE_ENVELOPE_REJECTED)
        self.assertFalse(res.usable_for_auto_merge)

        # (b) 다른 task 의 receipt 재사용 (task_id mismatch) → reject
        foreign = {
            "task_id": "task-9999",
            "schedule_id": ANU_SCHEDULE,
            "owner_key": ANU_KEY,
            "recorded_at": _fresh_ts(),
        }
        res2 = M.verify_collector_authoritative(
            task_id="task-2717", envelope=foreign,
            probe=make_probe({ANU_SCHEDULE}), now=_now(),
            quarantine_dir=self._tmp,
        )
        self.assertEqual(res2.verdict, M.VERDICT_REJECTED)
        self.assertEqual(res2.classification, M.CLS_ENVELOPE_BINDING_MISMATCH)

    # ── regression 4 — ANU-owned 정상 callback → PASS (authoritative) ────────
    def test_4_anu_owned_authoritative_pass(self):
        envelope = {
            "task_id": "task-2717",
            "schedule_id": ANU_SCHEDULE,
            "owner_key": ANU_KEY,
            "collector_role": "ANU",
            "recorded_at": _fresh_ts(),
        }
        res = M.verify_collector_authoritative(
            task_id="task-2717", envelope=envelope,
            probe=make_probe({ANU_SCHEDULE}),  # ANU 가 실제 소유
            executor_key=DEV4_SELF_KEY, now=_now(),
            quarantine_dir=self._tmp,
        )
        self.assertEqual(res.verdict, M.VERDICT_AUTHORITATIVE)
        self.assertEqual(res.classification, M.CLS_ANU_OWNED_AUTHORITATIVE)
        self.assertTrue(res.ok)
        self.assertTrue(res.usable_for_auto_merge)  # 권위 → 자동 판단 사용 가능
        self.assertIsNone(res.quarantine_record_path)  # 격리 아님

    # ── regression 5 — result JSON pickup 정상 → ANU runner 발사 ─────────────
    def test_5_result_json_pickup_anu_runner_fires(self):
        import tempfile
        rdir = tempfile.mkdtemp(prefix="task2717-r-")
        self.addCleanup(shutil.rmtree, rdir, ignore_errors=True)
        # (executor) 완료 = result JSON 작성 only — schedule 생성/발사 0.
        ew = M.executor_write_result_json(
            task_id="task-2717",
            payload={"report_path": "memory/reports/task-2717.md",
                     "sha256": "deadbeef", "summary": "done"},
            result_dir=rdir, now=_now(),
        )
        self.assertTrue(ew.ok)
        self.assertFalse(ew.schedule_created)
        self.assertFalse(ew.callback_fired)
        self.assertTrue(Path(ew.result_json_path).is_file())
        written = json.loads(Path(ew.result_json_path).read_text("utf-8"))
        self.assertEqual(written["completion_signal"], "RESULT_JSON_WRITTEN")
        self.assertFalse(written["schedule_created_by_executor"])

        # (ANU runner) result JSON pickup → ANU-owned 발사 request(argv) 생성.
        rr = M.anu_runner_pickup_and_fire(
            result_json_path=ew.result_json_path,
            executor_key=DEV4_SELF_KEY,
            anu_key=ANU_KEY,
            chat_id="6937032012",
            at="30s",
        )
        self.assertTrue(rr.ok)
        self.assertEqual(rr.owner_key, ANU_KEY)
        self.assertIsNotNone(rr.argv)
        # argv 가 ANU key 로 --cron 등록하는 데이터인지 — owner != executor.
        self.assertIn("--key", rr.argv)
        self.assertEqual(rr.argv[rr.argv.index("--key") + 1], ANU_KEY)
        self.assertNotIn(DEV4_SELF_KEY, rr.argv)

    def test_5b_runner_refuses_non_anu_key(self):
        """runner 가 self-key/비-ANU key 로는 발사 request 를 만들지 않음."""
        import tempfile
        rdir = tempfile.mkdtemp(prefix="task2717-r2-")
        self.addCleanup(shutil.rmtree, rdir, ignore_errors=True)
        ew = M.executor_write_result_json(
            task_id="task-2717", payload={"summary": "x"},
            result_dir=rdir, now=_now(),
        )
        # anu_key 자리에 executor self-key 를 넣어도 fail-closed (argv=None).
        rr = M.anu_runner_pickup_and_fire(
            result_json_path=ew.result_json_path,
            executor_key=DEV4_SELF_KEY,
            anu_key=DEV4_SELF_KEY,  # ← 비-ANU / self key
        )
        self.assertFalse(rr.ok)
        self.assertIsNone(rr.argv)

    # ── regression 6 — self-collector quarantine 경로 (격리+비차단+비권위) ────
    def test_6_self_collector_quarantine_non_blocking(self):
        envelope = {
            "task_id": "task-2717",
            "schedule_id": SELF_SCHEDULE,
            "owner_key": ANU_KEY,            # 텍스트 주장 (불신)
            "collector_role": "ANU",
            "recorded_at": _fresh_ts(),
        }
        res = M.verify_collector_authoritative(
            task_id="task-2717", envelope=envelope,
            probe=make_probe({ANU_SCHEDULE}),  # SELF_SCHEDULE 비-소유
            executor_key=DEV4_SELF_KEY, now=_now(),
            quarantine_dir=self._tmp,
        )
        # 격리됨
        self.assertEqual(res.verdict, M.VERDICT_QUARANTINED)
        # 비권위 — 자동 push/merge 판단 사용 0
        self.assertFalse(res.usable_for_auto_merge)
        # 비차단 — 작업물 보존(파기 0)
        self.assertTrue(res.work_preserved)
        # 격리 증거는 보존됨
        self.assertTrue(Path(res.quarantine_record_path).is_file())

    # ── regression 7 (C 강화) — actual owner proof = self-key ⇒ QUARANTINE ────
    def test_7_envelope_claims_anu_but_actual_proof_self_key_quarantine(self):
        """★ task-2717+1 (C): envelope 이 collector_role=ANU / self_key_used=
        false / ANU owner 를 '주장' 하지만 actual owner proof 가 self-key(=
        access denied)인 케이스. 반드시 SELF_COLLECTOR_QUARANTINE ·
        usable_for_auto_merge=False · work_preserved=True. envelope claim 은
        기록만 되고 판정에는 쓰이지 않음을 명시한다.
        """
        envelope = {
            "task_id": "task-2717",
            "schedule_id": SELF_SCHEDULE,        # dev4 self-key 소유 schedule
            "collector_role": "ANU",             # 주장 (거짓)
            "self_key_used": False,              # 주장 (거짓)
            "owner_key": ANU_KEY,                # 주장 (거짓, 텍스트만)
            "recorded_at": _fresh_ts(),
            "note": f"ANU key {ANU_KEY_PLACEHOLDER} 사용",
        }
        # SELF_SCHEDULE 은 ANU key 로 조회 거부(access denied) = 실제 self 소유.
        res = M.verify_collector_authoritative(
            task_id="task-2717", envelope=envelope,
            probe=make_probe({ANU_SCHEDULE}), executor_key=DEV4_SELF_KEY,
            now=_now(), quarantine_dir=self._tmp,
        )
        # 3종 필수 출력 동시 고정.
        self.assertEqual(res.classification, M.CLS_SELF_COLLECTOR_QUARANTINE)
        self.assertFalse(res.usable_for_auto_merge)
        self.assertTrue(res.work_preserved)
        # claim 은 그대로 기록(불변)되어 있다 — 그러나 판정은 proof 가 결정.
        self.assertEqual(res.envelope_claims["claimed_collector_role"], "ANU")
        self.assertEqual(res.envelope_claims["claimed_self_key_used"], False)
        self.assertEqual(res.owner_resolution["outcome"], M.OWNER_NOT_ANU)

    # ── regression 8 (B) — NO_RESULT_JSON: executor 미작성 → 별도 상태 ────────
    def test_8_no_result_json_distinct_state(self):
        """runner pickup 대상 result JSON 이 없으면 NO_RESULT_JSON 분류(별도 상태).
        PENDING_OWNER_PROOF 와 명시적으로 다른 상태임을 고정한다."""
        rr = M.anu_runner_pickup_and_fire(
            result_json_path="/nonexistent/path/task-2717.result.json",
            executor_key=DEV4_SELF_KEY,
            anu_key=ANU_KEY,
        )
        self.assertFalse(rr.ok)
        self.assertIsNone(rr.argv)
        self.assertEqual(rr.classification, M.CLS_NO_RESULT_JSON)
        self.assertFalse(rr.pickup_ok)
        # NO_RESULT_JSON ≠ PENDING_OWNER_PROOF (서로 다른 상태).
        self.assertNotEqual(M.CLS_NO_RESULT_JSON, M.CLS_PENDING_OWNER_PROOF)

    # ── regression 9 (B) — PENDING_OWNER_PROOF: owner proof 미확정(재시도 가능) ─
    def test_9_pending_owner_proof_distinct_from_fail_closed(self):
        """owner proof 미확정(조회 보류/재시도 가능)은 PENDING_OWNER_PROOF —
        fail-closed(NON_AUTHORITATIVE)와 구분되는 별도 상태."""
        # (a) probe 예외 → resolve outcome=PENDING (query_ok=False, 재시도 가능).
        def raising_probe(_sid):
            raise RuntimeError("transient cron-history outage")

        res_resolve = M.resolve_authoritative_owner(
            ANU_SCHEDULE, probe=raising_probe
        )
        self.assertEqual(res_resolve.outcome, M.OWNER_PENDING)
        self.assertFalse(res_resolve.owner_is_anu)
        self.assertFalse(res_resolve.query_ok)

        # (b) verify 레벨 → verdict=PENDING_OWNER_PROOF, 자동 사용 0, 작업물 보존.
        envelope = {
            "task_id": "task-2717", "schedule_id": ANU_SCHEDULE,
            "owner_key": ANU_KEY, "recorded_at": _fresh_ts(),
        }
        res = M.verify_collector_authoritative(
            task_id="task-2717", envelope=envelope, probe=raising_probe,
            now=_now(), quarantine_dir=self._tmp,
        )
        self.assertEqual(res.verdict, M.VERDICT_PENDING_OWNER_PROOF)
        self.assertEqual(res.classification, M.CLS_PENDING_OWNER_PROOF)
        self.assertFalse(res.usable_for_auto_merge)
        self.assertTrue(res.work_preserved)
        self.assertIsNone(res.quarantine_record_path)  # 격리 아님(미확정일 뿐)

        # (c) 대비: 명시적 비-access-denied error → fail-closed NON_AUTHORITATIVE
        #     (PENDING 과 다른 상태임을 동시 고정).
        def hard_error_probe(_sid):
            return {"status": "error", "message": "internal server error 500"}

        res2 = M.verify_collector_authoritative(
            task_id="task-2717", envelope=envelope, probe=hard_error_probe,
            now=_now(), quarantine_dir=self._tmp,
        )
        self.assertEqual(res2.verdict, M.VERDICT_NON_AUTHORITATIVE)
        self.assertEqual(res2.classification, M.CLS_OWNER_QUERY_FAILED)
        self.assertNotEqual(res.verdict, res2.verdict)  # PENDING ≠ fail-closed

    # ── regression 10 — fire-path gating: 실 fire/closeout 차단(Phase 1) ──────
    def test_10_fire_path_gated_no_self_fire(self):
        """Phase 1: ANU-owned argv 는 build/dry-run 까지만. 실 fire/closeout 은
        NOT_ACTIVATED / PHASE2_REQUIRED 로 차단(자가발사 0)."""
        import tempfile
        rdir = tempfile.mkdtemp(prefix="task2717-fire-")
        self.addCleanup(shutil.rmtree, rdir, ignore_errors=True)
        ew = M.executor_write_result_json(
            task_id="task-2717", payload={"summary": "x"},
            result_dir=rdir, now=_now(),
        )
        rr = M.anu_runner_pickup_and_fire(
            result_json_path=ew.result_json_path,
            executor_key=DEV4_SELF_KEY, anu_key=ANU_KEY, at="30s",
        )
        self.assertTrue(rr.ok)
        # runner 자체가 dry-run/비활성 — 자가발사 안 함.
        self.assertTrue(rr.dry_run)
        self.assertEqual(rr.fire_path, M.FIRE_NOT_ACTIVATED)
        # 기본 gate → NOT_ACTIVATED, 절대 fire 안 됨.
        gate = M.fire_callback_request(rr)
        self.assertEqual(gate["status"], M.FIRE_NOT_ACTIVATED)
        self.assertFalse(gate["fired"])
        self.assertTrue(gate["dry_run"])
        self.assertIsNotNone(gate["argv"])  # argv 는 surface(데이터)
        # activate=True 라도 Phase 1 은 실 fire 0 → PHASE2_REQUIRED.
        gate2 = M.fire_callback_request(rr, activate=True)
        self.assertEqual(gate2["status"], M.FIRE_PHASE2_REQUIRED)
        self.assertFalse(gate2["fired"])

    # ── regression 11 — result JSON schema + 멱등 lock(atomic write) ──────────
    def test_11_result_json_schema_and_idempotent_write(self):
        """result JSON schema 필드 고정 + 같은 경로 멱등(atomic os.replace) write."""
        import tempfile
        rdir = tempfile.mkdtemp(prefix="task2717-schema-")
        self.addCleanup(shutil.rmtree, rdir, ignore_errors=True)
        ew1 = M.executor_write_result_json(
            task_id="task-2717",
            payload={"report_path": "memory/reports/task-2717.md",
                     "sha256": "deadbeef"},
            result_dir=rdir, now=_now(),
        )
        rec = json.loads(Path(ew1.result_json_path).read_text("utf-8"))
        # schema 필수 필드.
        for key in ("task_id", "completion_signal", "written_at",
                    "schedule_created_by_executor", "callback_fired_by_executor"):
            self.assertIn(key, rec)
        self.assertEqual(rec["completion_signal"], "RESULT_JSON_WRITTEN")
        self.assertFalse(rec["schedule_created_by_executor"])
        self.assertFalse(rec["callback_fired_by_executor"])
        # 멱등 lock: 재작성 시 같은 경로(중복 schedule/파일 생성 0).
        ew2 = M.executor_write_result_json(
            task_id="task-2717", payload={"summary": "again"},
            result_dir=rdir, now=_now(),
        )
        self.assertEqual(ew1.result_json_path, ew2.result_json_path)
        self.assertEqual(
            len(list(Path(rdir).glob("task-2717.result.json"))), 1
        )

    # ── regression 12 — 인프라 probe 실패 → OWNER_PENDING (false quarantine 0) ──
    def test_12_infra_probe_failure_pending_not_quarantine(self):
        """인프라 장애(probe 실행 실패)는 message 에 'not found' 가 섞여도 실
        cokacdir access-denied 가 아니다. distinct error_kind 신호 →
        OWNER_PENDING(재시도 가능) 매핑으로 OWNER_NOT_ANU(false quarantine) 오분류
        차단."""
        def infra_failed_probe(_sid):
            # __call__ 의 인프라 실패 반환 형태 재현: message 에 'not found' 가
            # 섞여 있어도 error_kind 가 우선 → PENDING(NOT access-denied).
            return {
                "status": "error",
                "error_kind": M.PROBE_EXEC_FAILED_KIND,
                "message": "probe exec failed: cokacdir: command not found",
            }
        res = M.resolve_authoritative_owner(
            SELF_SCHEDULE, probe=infra_failed_probe
        )
        self.assertEqual(res.outcome, M.OWNER_PENDING)
        self.assertFalse(res.owner_is_anu)
        self.assertFalse(res.query_ok)
        # 대비: error_kind 없는 실 access-denied 응답은 OWNER_NOT_ANU(결정적).
        res2 = M.resolve_authoritative_owner(
            SELF_SCHEDULE, probe=make_probe(set())
        )
        self.assertEqual(res2.outcome, M.OWNER_NOT_ANU)
        self.assertNotEqual(res.outcome, res2.outcome)

        # verify 레벨: 인프라 장애 → PENDING_OWNER_PROOF, 격리 artifact 미생성.
        envelope = {
            "task_id": "task-2717", "schedule_id": SELF_SCHEDULE,
            "owner_key": ANU_KEY, "recorded_at": _fresh_ts(),
        }
        vres = M.verify_collector_authoritative(
            task_id="task-2717", envelope=envelope, probe=infra_failed_probe,
            executor_key=DEV4_SELF_KEY, now=_now(), quarantine_dir=self._tmp,
        )
        self.assertEqual(vres.verdict, M.VERDICT_PENDING_OWNER_PROOF)
        self.assertFalse(vres.usable_for_auto_merge)
        self.assertTrue(vres.work_preserved)
        self.assertIsNone(vres.quarantine_record_path)  # 격리 아님(인프라 장애)

    def test_12b_real_probe_infra_failure_distinct_signal(self):
        """RealCokacdirCronHistoryProbe 가 바이너리 부재(인프라 실패)를
        message-비의존 distinct 신호(error_kind=probe_exec_failed)로 반환 →
        resolve 가 OWNER_PENDING 으로 매핑(실 cokacdir 미호출)."""
        probe = M.RealCokacdirCronHistoryProbe(anu_key="x", chat_id="0")
        probe.binary = "/nonexistent/cokacdir-binary-task2717"
        out = probe(ANU_SCHEDULE)
        self.assertEqual(out.get("status"), "error")
        self.assertEqual(out.get("error_kind"), M.PROBE_EXEC_FAILED_KIND)
        res = M.resolve_authoritative_owner(ANU_SCHEDULE, probe=probe)
        self.assertEqual(res.outcome, M.OWNER_PENDING)
        self.assertFalse(res.owner_is_anu)

    # ── regression 13 — 숫자형 epoch staleness + silent bypass 0 ──────────────
    def test_13_numeric_epoch_staleness_no_silent_bypass(self):
        """숫자형 epoch timestamp 처리(strptime TypeError 미발생) + 파싱 실패가
        staleness 를 silent bypass 하지 않음."""
        now = _now()
        # (a) fresh numeric epoch(int, 5분 전) → stale 아님 → owner resolution 진행.
        fresh_epoch = int((now - timedelta(minutes=5)).timestamp())
        env_fresh = {
            "task_id": "task-2717", "schedule_id": ANU_SCHEDULE,
            "owner_key": ANU_KEY, "recorded_at": fresh_epoch,
        }
        res = M.verify_collector_authoritative(
            task_id="task-2717", envelope=env_fresh,
            probe=make_probe({ANU_SCHEDULE}), now=now, quarantine_dir=self._tmp,
        )
        self.assertEqual(res.verdict, M.VERDICT_AUTHORITATIVE)  # stale reject 0

        # (b) old numeric epoch(float, 48h 전) → STALE_ENVELOPE_REJECTED.
        old_epoch = (now - timedelta(hours=48)).timestamp()  # float
        env_old = {
            "task_id": "task-2717", "schedule_id": ANU_SCHEDULE,
            "owner_key": ANU_KEY, "recorded_at": old_epoch,
        }
        res_old = M.verify_collector_authoritative(
            task_id="task-2717", envelope=env_old,
            probe=make_probe({ANU_SCHEDULE}), now=now, quarantine_dir=self._tmp,
        )
        self.assertEqual(res_old.verdict, M.VERDICT_REJECTED)
        self.assertEqual(res_old.classification, M.CLS_STALE_ENVELOPE_REJECTED)

        # (c) timestamp 값은 있으나 파싱 불가 → silent bypass 0 (stale reject).
        env_bad = {
            "task_id": "task-2717", "schedule_id": ANU_SCHEDULE,
            "owner_key": ANU_KEY, "recorded_at": "not-a-timestamp",
        }
        res_bad = M.verify_collector_authoritative(
            task_id="task-2717", envelope=env_bad,
            probe=make_probe({ANU_SCHEDULE}), now=now, quarantine_dir=self._tmp,
        )
        self.assertEqual(res_bad.verdict, M.VERDICT_REJECTED)
        self.assertEqual(res_bad.classification, M.CLS_STALE_ENVELOPE_REJECTED)

        # (d) 함수 단위: numeric epoch age 계산 정확 + TypeError 미발생.
        age = M._envelope_age_seconds({"recorded_at": fresh_epoch}, now=now)
        self.assertIsNotNone(age)
        self.assertAlmostEqual(age, 300, delta=2)
        # 값 없음 → None(미적용), 값 있고 파싱 불가 → sentinel(>임계).
        self.assertIsNone(M._envelope_age_seconds({}, now=now))
        self.assertGreater(
            M._envelope_age_seconds({"recorded_at": "garbage"}, now=now),
            M.DEFAULT_MAX_ENVELOPE_AGE_SECONDS,
        )

    # ── regression 14 — 파일명 sanitize: '/' 와 '\\' 양쪽 치환(cross-platform) ─
    def test_14_filename_sanitize_both_separators(self):
        """write_quarantine_record / executor_write_result_json 의 파일명
        sanitize 가 '/' 와 '\\' 양쪽을 '_' 로 치환(Linux 에서 '\\' 미치환 차단)."""
        # (a) write_quarantine_record — task_id/schedule_id 에 양쪽 separator.
        verification = M.CollectorVerification(
            schema=M.SCHEMA, verdict=M.VERDICT_QUARANTINED,
            classification=M.CLS_SELF_COLLECTOR_QUARANTINE,
            task_id="task\\2717/evil", schedule_id="A\\B/C",
            owner_resolution=None, envelope_claims={}, work_preserved=True,
        )
        path = M.write_quarantine_record(
            verification, quarantine_dir=self._tmp, now=_now(),
        )
        name = Path(path).name
        self.assertNotIn("\\", name)
        self.assertNotIn("/", name)
        self.assertTrue(Path(path).is_file())

        # (b) executor_write_result_json — task_id 에 '\\' 포함 → 파일명 미존재.
        rdir = tempfile.mkdtemp(prefix="task2717-san-")
        self.addCleanup(shutil.rmtree, rdir, ignore_errors=True)
        ew = M.executor_write_result_json(
            task_id="task\\2717/x", payload={"summary": "x"},
            result_dir=rdir, now=_now(),
        )
        fname = Path(ew.result_json_path).name
        self.assertNotIn("\\", fname)
        self.assertNotIn("/", fname)
        self.assertTrue(Path(ew.result_json_path).is_file())

    # ── regression 15 — setUp tmpdir cleanup 등록(누수 0) ─────────────────────
    def test_15_setup_tmpdir_cleanup_registered(self):
        """setUp 의 mkdtemp tmpdir 가 addCleanup(shutil.rmtree) 로 등록되어
        테스트 종료 시 자동 정리됨(tmpdir 누수 0)."""
        self.assertTrue(Path(self._tmp).is_dir())
        cleanups = getattr(self, "_cleanups", [])
        registered = any(
            func is shutil.rmtree and args and args[0] == self._tmp
            for func, args, _kwargs in cleanups
        )
        self.assertTrue(
            registered, "setUp tmpdir 가 addCleanup 으로 정리 등록되어야 함"
        )

    # ── regression 16 — Gemini MEDIUM 보정: 방어 가드(None/non-callable/non-dict) ─
    def test_16_defensive_guards_none_noncallable_nondict(self):
        """task-2717 Phase1 Gemini MEDIUM 보정 4건의 방어 동작 고정:
        (a) _sanitize_filename_component(None/빈/공백) → default (silent 'None' 0)
        (b) resolve_authoritative_owner(probe=비-callable) → OWNER_QUERY_FAILED
            (설정오류는 PENDING 재시도 아니라 fail-closed)
        (c) _parse_timestamp_value(None) → None (불필요 파싱 루프 0)
        (d) verify_collector_authoritative(envelope=non-dict) → REJECTED
            (AttributeError crash 0, 작업물 보존)."""
        # (a) filename component None/빈/공백 → default.
        self.assertEqual(M._sanitize_filename_component(None, default="d"), "d")
        self.assertEqual(M._sanitize_filename_component("   ", default="d"), "d")
        self.assertEqual(
            M._sanitize_filename_component("a/b\\c", default="d"), "a_b_c"
        )
        # (b) probe 비-callable(None) → 설정오류 fail-closed(QUERY_FAILED).
        r = M.resolve_authoritative_owner("ANUOWNED1", probe=None)
        self.assertEqual(r.outcome, M.OWNER_QUERY_FAILED)
        self.assertFalse(r.owner_is_anu)
        self.assertFalse(r.query_ok)
        # (c) None timestamp → None.
        self.assertIsNone(M._parse_timestamp_value(None))
        # (d) envelope 가 dict 아님 → crash 없이 REJECTED + 작업물 보존.
        res = M.verify_collector_authoritative(
            task_id="task-2717",
            envelope=None,
            probe=make_probe({ANU_SCHEDULE}),
            executor_key=DEV4_SELF_KEY,
            now=_now(),
            quarantine_dir=self._tmp,
        )
        self.assertEqual(res.verdict, M.VERDICT_REJECTED)
        self.assertEqual(res.classification, M.CLS_ENVELOPE_BINDING_MISMATCH)
        self.assertTrue(res.work_preserved)

    # ── regression 17 — result JSON task_id 부재 → runner fail-closed ──────────
    def test_17_runner_fail_closed_on_missing_task_id(self):
        """result JSON 에 유효한 task_id 가 없으면 runner 는 빈 task_id 로 잘못된
        callback argv 를 만들지 않고 fail-closed(FAIL, argv=None)."""
        rdir = tempfile.mkdtemp(prefix="task2717-noid-")
        self.addCleanup(shutil.rmtree, rdir, ignore_errors=True)
        rpath = Path(rdir) / "result.json"
        rpath.write_text(json.dumps({"summary": "x"}), encoding="utf-8")  # task_id 없음
        rr = M.anu_runner_pickup_and_fire(
            result_json_path=str(rpath),
            executor_key=DEV4_SELF_KEY,
            anu_key=ANU_KEY,
            chat_id="6937032012",
            at="30s",
        )
        self.assertFalse(rr.ok)
        self.assertIsNone(rr.argv)
        self.assertEqual(rr.verdict, "FAIL")


    # ── regression 18 (High #466) — timezone offset ISO 파싱 ────────────────────
    def test_18_timezone_offset_iso_parsing(self):
        """_parse_timestamp_value 가 timezone offset ISO("+09:00") 를 정상 파싱.
        stale 오인 0, age 계산 일관성 확인."""
        now = datetime(2026, 6, 1, 4, 0, 0, tzinfo=timezone.utc)

        # (a) "+09:00" offset ISO (KST = UTC+9) → tz-aware, 정상 age 계산.
        # "2026-06-01T13:00:00+09:00" = UTC 04:00:00 → age ≈ 0초(now 와 동일).
        kst_ts = "2026-06-01T13:00:00+09:00"
        dt = M._parse_timestamp_value(kst_ts)
        self.assertIsNotNone(dt, "timezone offset ISO 파싱 실패")
        self.assertIsNotNone(dt.tzinfo, "tz-aware 이어야 함")
        # UTC 로 정규화 후 now 와 비교
        dt_utc = dt.astimezone(timezone.utc)
        self.assertEqual(dt_utc.year, 2026)
        self.assertEqual(dt_utc.month, 6)
        self.assertEqual(dt_utc.day, 1)
        self.assertEqual(dt_utc.hour, 4)
        self.assertEqual(dt_utc.minute, 0)
        age = M._envelope_age_seconds({"recorded_at": kst_ts}, now=now)
        self.assertIsNotNone(age)
        # age ≈ 0초 (±5초 허용), stale 오인 0 (기본 임계 6h = 21600s 보다 훨씬 작음)
        self.assertAlmostEqual(age, 0, delta=5,
                               msg="timezone offset ISO age 계산이 stale 오인(age 과대)")
        self.assertLess(age, M.DEFAULT_MAX_ENVELOPE_AGE_SECONDS,
                        "fresh +09:00 timestamp 가 stale 로 잘못 판정됨")

        # (b) "Z" suffix (UTC) — 기존 경로 회귀 0.
        z_ts = "2026-06-01T03:55:00Z"  # 5분 전
        dt_z = M._parse_timestamp_value(z_ts)
        self.assertIsNotNone(dt_z, "Z suffix ISO 파싱 회귀")
        self.assertIsNotNone(dt_z.tzinfo, "Z suffix: tz-aware 이어야 함")
        age_z = M._envelope_age_seconds({"recorded_at": z_ts}, now=now)
        self.assertIsNotNone(age_z)
        self.assertAlmostEqual(age_z, 300, delta=2,
                               msg="Z suffix age 계산 회귀")

        # (c) 실 verify 경로: "+09:00" fresh envelope → reject 아님(AUTHORITATIVE).
        envelope = {
            "task_id": "task-2717",
            "schedule_id": ANU_SCHEDULE,
            "owner_key": ANU_KEY,
            "recorded_at": kst_ts,
        }
        res = M.verify_collector_authoritative(
            task_id="task-2717", envelope=envelope,
            probe=make_probe({ANU_SCHEDULE}), now=now,
            quarantine_dir=self._tmp,
        )
        self.assertEqual(res.verdict, M.VERDICT_AUTHORITATIVE,
                         f"timezone offset ISO fresh envelope 이 stale reject 됨: "
                         f"verdict={res.verdict} reasons={res.reasons}")

        # (d) naive datetime 은 UTC 로 간주 — tz-aware 비교 오류(TypeError) 없음.
        dt_naive_input = "2026-06-01T04:00:00"  # timezone 없는 naive ISO
        dt_naive = M._parse_timestamp_value(dt_naive_input)
        self.assertIsNotNone(dt_naive)
        self.assertIsNotNone(dt_naive.tzinfo,
                             "naive ISO → UTC tz-aware 정규화 실패")
        # age 비교가 TypeError 없이 정상 수행됨
        age_naive = M._envelope_age_seconds({"recorded_at": dt_naive_input}, now=now)
        self.assertIsNotNone(age_naive)
        self.assertAlmostEqual(age_naive, 0, delta=2)

        # (e) "-05:00" offset (UTC-5) → UTC 정규화 정확.
        # "2026-05-31T23:00:00-05:00" = UTC 2026-06-01T04:00:00 → age ≈ 0초.
        minus5_ts = "2026-05-31T23:00:00-05:00"
        dt_m5 = M._parse_timestamp_value(minus5_ts)
        self.assertIsNotNone(dt_m5)
        dt_m5_utc = dt_m5.astimezone(timezone.utc)
        self.assertEqual(dt_m5_utc.hour, 4)
        age_m5 = M._envelope_age_seconds({"recorded_at": minus5_ts}, now=now)
        self.assertAlmostEqual(age_m5, 0, delta=5)

    # ── regression 19 — fsync(file fd) spy ≥1 ───────────────────────────────
    def test_19_fsync_file_fd_called(self):
        """_atomic_write_json 내부에서 file fd 에 대한 os.fsync 가 최소 1회 호출됨.
        검증조건 2: fsync(file fd) spy ≥1."""
        with mock.patch.object(M.os, "fsync") as mock_fsync:
            M.executor_write_result_json(
                task_id="t-19",
                payload={"x": 1},
                result_dir=self._tmp,
                now=_now(),
            )
        self.assertGreaterEqual(
            mock_fsync.call_count, 1,
            "os.fsync 가 최소 1회 이상 호출되어야 합니다(file fd fsync).",
        )

    # ── regression 20 — fsync(parent dir fd) spy ≥1 ─────────────────────────
    def test_20_fsync_parent_dir_fd_called(self):
        """_atomic_write_json 에서 file fd fsync + parent dir fd fsync = 총 2회 이상.
        검증조건 3: fsync(parent dir fd) spy ≥1 (file + dir 합산)."""
        with mock.patch.object(M.os, "fsync") as mock_fsync, \
             mock.patch.object(M.os, "open", wraps=M.os.open) as mock_os_open:
            M.executor_write_result_json(
                task_id="t-20",
                payload={"y": 2},
                result_dir=self._tmp,
                now=_now(),
            )
        # file fd fsync(1회) + parent dir fd fsync(1회) = 총 2회 이상
        self.assertGreaterEqual(
            mock_fsync.call_count, 2,
            "os.fsync 가 file fd + parent dir fd 합산 2회 이상 호출되어야 합니다.",
        )
        # parent dir 를 O_RDONLY 로 open 한 호출이 1회 이상 있음 (dir fd 확인)
        dir_open_calls = [
            call for call in mock_os_open.call_args_list
            if call.args and call.args[0] == self._tmp
        ]
        self.assertGreaterEqual(
            len(dir_open_calls), 1,
            "os.open(result_dir, O_RDONLY) 로 parent dir fd 를 여는 호출이 "
            "최소 1회 있어야 합니다.",
        )

    # ── regression 21 — 크래시 시뮬 → final 파일 미오염(이전값 유지) ───────────
    def test_21_atomicity_crash_during_write_no_partial(self):
        """json.dump 크래시 시뮬레이션 → os.replace 미호출 → final 파일 이전값 유지.
        검증조건 4: crash 시 기존 final path 는 오염되지 않는다."""
        # 사전에 기존 result.json 에 이전값 기록
        final_path = os.path.join(self._tmp, "t-21.result.json")
        with open(final_path, "w", encoding="utf-8") as fh:
            json.dump({"prev": True}, fh, ensure_ascii=False)

        # json.dump 도중 크래시 시뮬
        with mock.patch.object(M.json, "dump", side_effect=RuntimeError("simulated crash")):
            with self.assertRaises(RuntimeError):
                M.executor_write_result_json(
                    task_id="t-21",
                    payload={"new": 1},
                    result_dir=self._tmp,
                    now=_now(),
                )

        # final 파일은 이전값 그대로 (os.replace 미호출 → 이전 내용 보존)
        self.assertTrue(
            Path(final_path).is_file(),
            "크래시 후에도 기존 final 파일이 존재해야 합니다.",
        )
        content = json.loads(Path(final_path).read_text("utf-8"))
        self.assertEqual(
            content, {"prev": True},
            "크래시 후 final 파일 내용이 이전값 그대로여야 합니다(부분 result 오염 0).",
        )
        # 크래시 시 생성 중이던 tmp 파일도 누수 없이 cleanup 되어야 한다.
        # (tmp 파일명에 uuid suffix 가 붙으므로 고정 경로 대신 디렉토리 .tmp 스캔으로 검증)
        leftover_tmps = list(Path(self._tmp).glob("*.tmp"))
        self.assertEqual(
            leftover_tmps, [],
            f"크래시 발생 시 생성 중이던 임시 파일(.tmp)이 삭제되어야 합니다: {leftover_tmps!r}",
        )

    # ── regression 22 — os.replace atomic rename 경로 보존 ──────────────────
    def test_22_os_replace_atomic_rename_preserved(self):
        """_atomic_write_json 이 os.replace(tmp, final) 를 정확히 1회 호출함.
        검증조건 5: atomic rename 경로가 보존됨을 spy 로 확인."""
        final_path = os.path.join(self._tmp, "t-22.result.json")

        with mock.patch.object(M.os, "replace", wraps=M.os.replace) as mock_replace:
            M.executor_write_result_json(
                task_id="t-22",
                payload={"z": 3},
                result_dir=self._tmp,
                now=_now(),
            )

        self.assertEqual(
            mock_replace.call_count, 1,
            "os.replace 가 정확히 1회 호출되어야 합니다.",
        )
        call_args = mock_replace.call_args
        # 첫 번째 인자(tmp)는 .tmp 로 끝남, 두 번째 인자(final)는 result.json 으로 끝남
        tmp_arg = call_args.args[0]
        final_arg = call_args.args[1]
        self.assertTrue(
            tmp_arg.endswith(".tmp"),
            f"os.replace 의 첫 인자(tmp 경로)가 .tmp 로 끝나야 합니다: {tmp_arg!r}",
        )
        self.assertEqual(
            final_arg, final_path,
            f"os.replace 의 두 번째 인자가 final path 와 같아야 합니다: "
            f"expected={final_path!r}, got={final_arg!r}",
        )

    # ── regression 23 — schema/signature 불변 ────────────────────────────────
    def test_23_schema_and_signature_invariant(self):
        """검증조건 6,7: result JSON schema 필드/값 불변 + 함수 시그니처 불변."""
        rdir = tempfile.mkdtemp(prefix="task2722-schema-")
        self.addCleanup(shutil.rmtree, rdir, ignore_errors=True)

        ew = M.executor_write_result_json(
            task_id="t-23",
            payload={"report_path": "memory/reports/t-23.md", "sha256": "abc123"},
            result_dir=rdir,
            now=_now(),
        )
        rec = json.loads(Path(ew.result_json_path).read_text("utf-8"))

        # 기존 schema 필드 전부 존재
        for key in ("task_id", "completion_signal", "written_at",
                    "schedule_created_by_executor", "callback_fired_by_executor"):
            self.assertIn(key, rec, f"schema 필드 {key!r} 가 result JSON 에 없습니다.")

        # 값 불변 검증
        self.assertEqual(rec["task_id"], "t-23")
        self.assertEqual(rec["completion_signal"], M.EXECUTOR_COMPLETION_SIGNAL)
        self.assertFalse(rec["schedule_created_by_executor"])
        self.assertFalse(rec["callback_fired_by_executor"])

        # executor_write_result_json 시그니처: task_id, payload, result_dir, now
        sig_ew = inspect.signature(M.executor_write_result_json)
        params_ew = list(sig_ew.parameters.keys())
        for expected_param in ("task_id", "payload", "result_dir", "now"):
            self.assertIn(
                expected_param, params_ew,
                f"executor_write_result_json 시그니처에 {expected_param!r} 가 없습니다.",
            )

        # write_quarantine_record 시그니처 불변 확인
        sig_wq = inspect.signature(M.write_quarantine_record)
        params_wq = list(sig_wq.parameters.keys())
        for expected_param in ("verification", "quarantine_dir", "now", "executor_key"):
            self.assertIn(
                expected_param, params_wq,
                f"write_quarantine_record 시그니처에 {expected_param!r} 가 없습니다.",
            )


    # ── regression 24 — tmp 파일명 충돌 방지(uuid suffix) ─────────────────────
    def test_24_tmp_filename_collision_prevention(self):
        """검증조건 2: 동일 target path 에 2회 write 시 tmp 경로가 서로 달라야 한다
        (uuid suffix 로 동일 프로세스 내 동시 write tmp 충돌 방지)."""
        captured_tmps = []

        real_replace = M.os.replace

        def _capture(src, dst):
            captured_tmps.append(src)
            return real_replace(src, dst)

        with mock.patch.object(M.os, "replace", side_effect=_capture):
            M.executor_write_result_json(
                task_id="t-24",
                payload={"n": 1},
                result_dir=self._tmp,
                now=_now(),
            )
            M.executor_write_result_json(
                task_id="t-24",
                payload={"n": 2},
                result_dir=self._tmp,
                now=_now(),
            )

        self.assertEqual(len(captured_tmps), 2, "os.replace 가 2회 호출되어야 합니다.")
        # 두 tmp 경로 모두 .tmp 로 끝나고, 서로 달라야 한다(uuid suffix 충돌 방지).
        for t in captured_tmps:
            self.assertTrue(t.endswith(".tmp"), f"tmp 경로가 .tmp 로 끝나야 합니다: {t!r}")
        self.assertNotEqual(
            captured_tmps[0], captured_tmps[1],
            f"동일 target 2회 write 의 tmp 경로가 달라야 합니다(uuid 충돌 방지): {captured_tmps!r}",
        )
        # 최종 result 파일은 정상 기록되고 tmp 누수 0.
        self.assertEqual(list(Path(self._tmp).glob("*.tmp")), [],
                         "write 완료 후 tmp 파일 누수가 없어야 합니다.")


if __name__ == "__main__":
    unittest.main()
