# -*- coding: utf-8 -*-
"""dispatch.anu_pickup_wake_launcher — task-2729+8 P0-B real wake 결선 (W1).

단일 책임 (single responsibility): WAKE_BUILT argv(이미 sealed ANU key 포함)를
권한 경로로 **실 실행**하는 유일 spawn 책임 모듈. driver=decision-only,
launcher=실행. (real spawn 책임은 오직 이 모듈에만 존재.)

안전 doctrine:
  * raw key 0 — argv 는 stdout/print/log/audit/ledger 어디에도 절대 출력/기록하지
    않는다. argv 의 길이(``argv_len``)만 surface. ``LaunchRecord.to_json()`` 에
    argv/key literal 필드 부재. subprocess args 로만 전달.
  * dry_run 기본 True — 기본 호출은 실행 0(audit-neutral). production ledger/audit
    write 0. audit_path 가 명시 주입된 isolated temp 일 때만 1줄 기록(키/ argv
    미포함). (OWNER_TRIGGER_DRY_RUN_LEDGER_CONTAMINATION 교훈.)
  * fail-closed 우선 — argv None/malformed/non-ANU key/ledger 확인 실패 → launch 0.
"""
from __future__ import annotations

import json
import os
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, List, Optional

# CANONICAL_ROOT 재사용 (중복 신설 0).
from dispatch.anu_owned_callback_enforcement import (  # pyright: ignore[reportMissingImports]
    CANONICAL_ROOT,
)

SCHEMA_LAUNCH = "dispatch.anu_pickup_wake_launcher.launch.v1"
SCHEMA_AUDIT = "dispatch.anu_pickup_wake_launcher.audit.v1"

# launcher 자체 dedupe ledger(WAKE_LAUNCHED) — pickup 의 PICKUP_WAKE_BUILT ledger 와 분리.
DEFAULT_LAUNCH_LEDGER_REL = os.path.join(
    "memory", "p0b_state", "wake_launch_ledger.jsonl"
)
DEFAULT_AUDIT_REL = os.path.join("memory", "p0b_state", "wake_launch_audit.jsonl")

EVENT_WAKE_LAUNCHED = "WAKE_LAUNCHED"

# ── decision 상수 ─────────────────────────────────────────────────────────────
DECISION_FAIL_CLOSED_NO_ARGV = "FAIL_CLOSED_NO_ARGV"
DECISION_FAIL_CLOSED_MALFORMED = "FAIL_CLOSED_MALFORMED"
DECISION_FAIL_CLOSED_NON_ANU_KEY = "FAIL_CLOSED_NON_ANU_KEY"
DECISION_FAIL_CLOSED_LEDGER_ERROR = "FAIL_CLOSED_LEDGER_ERROR"
DECISION_SKIP_DEDUPE = "SKIP_DEDUPE"
DECISION_DRY_RUN = "DRY_RUN"
DECISION_LAUNCHED = "LAUNCHED"


def _now() -> datetime:
    return datetime.now(timezone.utc)


def _iso(dt: datetime) -> str:
    """timezone-aware datetime 은 UTC 로 변환한 뒤 Z suffix 부여.

    * dt.tzinfo is not None → dt.astimezone(timezone.utc) 로 UTC 정규화 후 포맷.
    * naive datetime(tzinfo is None) → 명시적 UTC 정책: 이미 UTC 인 것으로 간주하여
      그대로 Z suffix (기본 clock _now() 는 항상 tz-aware UTC 반환). 이 정책을 코드에 고정.
    """
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


# ── LaunchRecord ──────────────────────────────────────────────────────────────
@dataclass
class LaunchRecord:
    """launcher 결정 결과. ★ argv/key literal 절대 미저장 — argv_len(길이)만."""

    ts: str
    task_id: str
    sha256: str
    decision: str            # 위 DECISION_* 상수 중 하나
    dry_run: bool
    reason: Optional[str] = None
    returncode: Optional[int] = None
    argv_len: Optional[int] = None   # ★ argv 길이만. argv/key literal 절대 미저장

    def to_json(self) -> dict:
        """JSON 직렬화. argv/key literal 미포함 보장 (argv_len 만 노출)."""
        return {
            "ts": self.ts,
            "task_id": self.task_id,
            "sha256": self.sha256,
            "decision": self.decision,
            "dry_run": self.dry_run,
            "reason": self.reason,
            "returncode": self.returncode,
            "argv_len": self.argv_len,
        }


# ── 안전 헬퍼 ─────────────────────────────────────────────────────────────────
def _redact_argv(argv) -> Optional[int]:
    """argv 를 redaction — 길이(정수)만 반환. argv None/비-list → None.
    ★ argv/key literal 은 어디에도 노출하지 않는다."""
    if isinstance(argv, list):
        return len(argv)
    return None


def _extract_key_value(argv: List[str]) -> str:
    """argv 에서 ``--key`` 인덱스 다음 원소(값) 반환. 없으면 ""."""
    try:
        idx = argv.index("--key")
    except (ValueError, AttributeError):
        return ""
    if idx + 1 < len(argv):
        return str(argv[idx + 1])
    return ""


def _argv_well_formed(argv) -> bool:
    """argv 구조 검증: list[str] AND ``--cron`` 포함 AND ``--key`` 포함 +
    그 값(--key 다음 원소)이 비어있지 않음."""
    if not isinstance(argv, list) or not argv:
        return False
    if not all(isinstance(x, str) for x in argv):
        return False
    if "--cron" not in argv:
        return False
    if "--key" not in argv:
        return False
    key_val = _extract_key_value(argv)
    if not key_val.strip():
        return False
    return True


def _append_jsonl(record: dict, path: str) -> None:
    """JSONL 1줄 append (ensure_ascii=False, flush+fsync atomic-ish).
    ★ argv/key literal 절대 미기록 (호출부가 redacted record 만 전달).
    OSError 는 비치명(fail-safe) — 단, dedupe 확인 실패는 fail-closed(별도 처리)."""
    try:
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(record, ensure_ascii=False) + "\n")
            fh.flush()
            os.fsync(fh.fileno())
    except OSError:
        pass  # 기록 실패는 비치명 (fail-safe)


def _dedupe_launched(launch_ledger_path: str, *, task_id: str, sha256: str) -> bool:
    """launch_ledger 에서 event==WAKE_LAUNCHED AND 동일 (task_id, sha256) 존재 여부.

    파일 부재(FileNotFoundError) → False (중복 없음, 정상 진행).
    OSError 외 예외 또는 명백한 손상으로 확인 불가 → LedgerError(fail-closed)로 raise.
    ★ try-block hygiene: 단일 ``with open(...)`` 으로 파일 핸들 수명 일원화.
    """
    try:
        with open(launch_ledger_path, "r", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                entry = json.loads(line)  # 손상 라인 → ValueError → fail-closed
                if (
                    isinstance(entry, dict)
                    and entry.get("event") == EVENT_WAKE_LAUNCHED
                    and entry.get("task_id") == task_id
                    and entry.get("sha256") == sha256
                ):
                    return True
    except FileNotFoundError:
        return False  # 파일 부재 = 중복 없음 (정상)
    except OSError as exc:
        # 파일은 있으나 읽기 불가(권한 등) → 확인 불가 → fail-closed.
        raise _LedgerError(f"launch ledger 읽기 불가: {exc}") from exc
    except Exception as exc:  # noqa: BLE001 — JSON 손상 등 → 확인 불가 → fail-closed
        raise _LedgerError(f"launch ledger 손상/파싱 실패: {exc}") from exc
    return False


class _LedgerError(Exception):
    """dedupe 확인 실패 신호 — FAIL_CLOSED_LEDGER_ERROR 로 매핑."""


def _default_subprocess_runner(argv: List[str]) -> int:
    """기본 subprocess_runner: argv 를 실행하고 returncode 반환.

    ★ capture_output 하지 않는다 (키 누출 방지 — 출력 미수집). 실제 호출은
    dry_run=False 일 때만 일어난다. 테스트는 mock runner 를 주입한다.
    """
    return subprocess.run(argv, check=False).returncode


def launch_wake(
    argv,
    *,
    task_id: str,
    sha256: str,
    dry_run: bool = True,
    root: str = CANONICAL_ROOT,
    launch_ledger_path: Optional[str] = None,
    audit_path: Optional[str] = None,
    anu_key_verifier: Optional[object] = None,  # 기대: Callable[[str], bool]. 런타임 callable 검증(fail-closed) 위해 object 로 완화.
    subprocess_runner: Optional[Callable[[List[str]], int]] = None,
    clock: Optional[Callable[[], datetime]] = None,
) -> LaunchRecord:
    """WAKE_BUILT argv 를 안전하게 실 실행(또는 dry-run).

    결정 순서 (fail-closed 우선, 설계 W1 6단계):
      (1) argv None/empty            → FAIL_CLOSED_NO_ARGV
      (2) argv 구조 검증 실패         → FAIL_CLOSED_MALFORMED
      (3) anu_key_verifier 검증 실패  → FAIL_CLOSED_NON_ANU_KEY
      (4) dedupe(WAKE_LAUNCHED) 존재  → SKIP_DEDUPE / 확인 실패 → FAIL_CLOSED_LEDGER_ERROR
      (5) dry_run=True               → DRY_RUN (production write 0)
      (6) dry_run=False              → subprocess 실행 → LAUNCHED

    ★ argv/key 는 어떤 기록에도 미포함 — argv_len(길이)만.
    """
    clock = clock or _now
    ts = _iso(clock())
    ledger = launch_ledger_path or os.path.join(root, DEFAULT_LAUNCH_LEDGER_REL)

    # (1) argv None/empty → FAIL_CLOSED_NO_ARGV.
    if not argv:
        return LaunchRecord(
            ts=ts, task_id=task_id, sha256=sha256,
            decision=DECISION_FAIL_CLOSED_NO_ARGV, dry_run=dry_run,
            reason="argv None/빈 리스트 — wake 0 (fail-closed).",
            argv_len=_redact_argv(argv),
        )

    # (2) argv 구조 검증 → 실패 시 FAIL_CLOSED_MALFORMED.
    if not _argv_well_formed(argv):
        return LaunchRecord(
            ts=ts, task_id=task_id, sha256=sha256,
            decision=DECISION_FAIL_CLOSED_MALFORMED, dry_run=dry_run,
            reason=(
                "argv 구조 이상 (list[str]/--cron/--key+값 검증 실패) — "
                "wake 0 (fail-closed)."
            ),
            argv_len=_redact_argv(argv),
        )

    argv_len = _redact_argv(argv)

    # (3) anu_key_verifier 주어지면 --key 값 검증. False → FAIL_CLOSED_NON_ANU_KEY.
    #     (미주입 시 스킵 — argv 는 이미 runner 의 self-key refuse 통과 산물.)
    if anu_key_verifier is not None:
        # ★ SECURITY: None 이 아닌데 callable 아니면 검증 불가 → fail-closed (스킵 금지).
        if not callable(anu_key_verifier):
            return LaunchRecord(
                ts=ts, task_id=task_id, sha256=sha256,
                decision=DECISION_FAIL_CLOSED_NON_ANU_KEY, dry_run=dry_run,
                reason="anu_key_verifier 가 callable 아님(검증 불가) — wake 0 (fail-closed).",
                argv_len=argv_len,
            )
        key_value = _extract_key_value(argv)
        try:
            verified = bool(anu_key_verifier(key_value))
        except Exception as exc:  # noqa: BLE001 — verifier 예외 → fail-closed
            return LaunchRecord(
                ts=ts, task_id=task_id, sha256=sha256,
                decision=DECISION_FAIL_CLOSED_NON_ANU_KEY, dry_run=dry_run,
                reason=f"anu_key_verifier 예외 → wake 0 (fail-closed): {exc}",
                argv_len=argv_len,
            )
        if not verified:
            return LaunchRecord(
                ts=ts, task_id=task_id, sha256=sha256,
                decision=DECISION_FAIL_CLOSED_NON_ANU_KEY, dry_run=dry_run,
                reason="--key 값이 ANU key 검증 실패 — wake 0 (fail-closed).",
                argv_len=argv_len,
            )

    # (4) dedupe 재확인 — WAKE_LAUNCHED 존재 시 SKIP. 확인 실패 → fail-closed.
    try:
        if _dedupe_launched(ledger, task_id=task_id, sha256=sha256):
            return LaunchRecord(
                ts=ts, task_id=task_id, sha256=sha256,
                decision=DECISION_SKIP_DEDUPE, dry_run=dry_run,
                reason=(
                    "동일 (task_id, sha256) WAKE_LAUNCHED 기록 존재 — "
                    "중복 wake 0 (SKIP_DEDUPE)."
                ),
                argv_len=argv_len,
            )
    except _LedgerError as exc:
        return LaunchRecord(
            ts=ts, task_id=task_id, sha256=sha256,
            decision=DECISION_FAIL_CLOSED_LEDGER_ERROR, dry_run=dry_run,
            reason=f"dedupe 확인 실패 → wake 0 (fail-closed): {exc}",
            argv_len=argv_len,
        )

    # (5) dry_run=True → DRY_RUN. ★ production ledger/audit write 0.
    #     audit_path 가 명시 주입된 경우(isolated temp)에만 audit 1줄 기록.
    if dry_run:
        record = LaunchRecord(
            ts=ts, task_id=task_id, sha256=sha256,
            decision=DECISION_DRY_RUN, dry_run=True,
            reason="dry_run=True — 실행 0, audit-neutral (production write 0).",
            argv_len=argv_len,
        )
        if audit_path is not None:
            # isolated temp audit 만 — argv/key 미포함(redacted record only).
            audit_line = {
                "schema": SCHEMA_AUDIT,
                "event": "WAKE_DRY_RUN",
                **record.to_json(),
            }
            _append_jsonl(audit_line, audit_path)
        return record

    # (6) dry_run=False → subprocess 실 실행 → LAUNCHED.
    runner = subprocess_runner or _default_subprocess_runner
    returncode = runner(argv)
    record = LaunchRecord(
        ts=ts, task_id=task_id, sha256=sha256,
        decision=DECISION_LAUNCHED, dry_run=False,
        reason="모든 게이트 통과 → subprocess 실 실행 (real wake).",
        returncode=returncode,
        argv_len=argv_len,
    )
    # launch_ledger 에 WAKE_LAUNCHED 1줄 append (★ argv/key 절대 미기록).
    _append_jsonl(
        {
            "schema": SCHEMA_LAUNCH,
            "event": EVENT_WAKE_LAUNCHED,
            "task_id": task_id,
            "sha256": sha256,
            "ts": ts,
            "returncode": returncode,
        },
        ledger,
    )
    # audit_path 명시 주입 시 isolated audit 도 기록(redacted).
    if audit_path is not None:
        _append_jsonl(
            {"schema": SCHEMA_AUDIT, "event": EVENT_WAKE_LAUNCHED, **record.to_json()},
            audit_path,
        )
    return record


__all__ = [
    "LaunchRecord",
    "launch_wake",
    "DECISION_FAIL_CLOSED_NO_ARGV",
    "DECISION_FAIL_CLOSED_MALFORMED",
    "DECISION_FAIL_CLOSED_NON_ANU_KEY",
    "DECISION_FAIL_CLOSED_LEDGER_ERROR",
    "DECISION_SKIP_DEDUPE",
    "DECISION_DRY_RUN",
    "DECISION_LAUNCHED",
]
