# -*- coding: utf-8 -*-
"""dispatch.anu_result_pickup_runner — task-2720 P0-a callback pickup runner.

wired 후보 / active=P0-b 대기, 자가발사 0.

이 모듈은 executor 가 작성한 result.json 을 pickup 하여 ANU-owned callback
발사 request(argv)를 생성한다. crontab/systemd/inotify 설치·감시 코드 0.
실 발사는 권한 있는 ANU 세션이 수행한다 (P0-b driver 가 pickup_once 를 호출).

재사용 (중복 신설 0):
  - executor_write_result_json  : dispatch.anu_owned_callback_enforcement
  - anu_runner_pickup_and_fire  : dispatch.anu_owned_callback_enforcement
  - verify_collector_authoritative: dispatch.anu_owned_callback_enforcement
  - 상수/verdicts               : 동일 모듈

Layer A / NO-CRON: cron register/remove 0, subprocess 0, merge 0.
"""
from __future__ import annotations

import hashlib
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, List, Optional

# 재사용 모듈 (절대 수정 금지)
from dispatch.anu_owned_callback_enforcement import (  # pyright: ignore[reportMissingImports]
    CANONICAL_ROOT,
    VERDICT_NON_AUTHORITATIVE,
    VERDICT_PENDING_OWNER_PROOF,
    VERDICT_QUARANTINED,
    VERDICT_REJECTED,
    anu_runner_pickup_and_fire,
    verify_collector_authoritative,
)

# ── Constants ────────────────────────────────────────────────────────────────
# Schema tag stamped on every PickupResult and on the done marker.
PICKUP_SCHEMA = "dispatch.anu_result_pickup_runner.v1"

# Verdict values carried in PickupResult.verdict
PICKUP_WAKE_BUILT = "WAKE_BUILT"
PICKUP_NO_RESULT_JSON = "NO_RESULT_JSON"
PICKUP_FAIL = "FAIL"
PICKUP_SKIP_TERMINAL = "SKIP_TERMINAL"
PICKUP_SKIP_DEDUPE = "SKIP_DEDUPE"
PICKUP_QUARANTINE = "QUARANTINE"
PICKUP_PENDING = "PENDING_OWNER_PROOF"
PICKUP_REJECT = "REJECT"
PICKUP_SEALED_KEY_MISSING = "SEALED_KEY_MISSING"

# Default path of the dedupe ledger (JSON Lines file under CANONICAL_ROOT)
DEFAULT_LEDGER_PATH = os.path.join(
    CANONICAL_ROOT, "memory", "events", "callback_4tuple_index.jsonl"
)

# Name of the environment variable holding the ANU key (read by the sealed-key loader)
ENV_ANU_KEY = "COKACDIR_KEY_ANU"


# ── sealed-key 로더 ───────────────────────────────────────────────────────────
def _default_sealed_key_loader() -> Optional[str]:
    """Return the ANU key from the ``COKACDIR_KEY_ANU`` environment variable.

    ``utils.env_loader.load_env_keys()`` is attempted first so that a keys
    file can populate the environment (the original note gives its default
    path as /home/jay/workspace/.env.keys). The key must never appear as a
    literal in code, argv, or prompts.

    Returns:
        The stripped key, or ``None`` when the variable is unset, empty, or
        whitespace-only.
    """
    try:
        from utils.env_loader import load_env_keys  # pyright: ignore[reportMissingImports]

        load_env_keys()
    except Exception:  # noqa: BLE001 — a failing loader is fail-closed (ends in None)
        # Import or load errors are ignored on purpose: the environment
        # lookup below then decides, and yields None when no key is present.
        pass

    key = os.environ.get(ENV_ANU_KEY)
    if key:
        key = key.strip()
    return key if key else None


# ── PickupResult dataclass ───────────────────────────────────────────────────
@dataclass
class PickupResult:
    """Outcome record returned by ``pickup_once``."""

    # Schema tag of this record.
    schema: str
    # One of the PICKUP_* verdict strings.
    verdict: str
    # task_id read from result.json ("" when it could not be determined).
    task_id: str
    # Path of the result.json that was inspected.
    result_json_path: str
    # Hex SHA-256 of the raw result.json bytes ("" when not computed).
    sha256: str
    # True only when a wake argv was produced.
    wake_built: bool
    # The wake argv, or None for no-op results.
    argv: Optional[List[str]]
    # Classification string passed through from the verifier / fire helper.
    classification: str
    # Human-readable explanations for the verdict.
    reasons: List[str] = field(default_factory=list)
    # Path of the done marker when one was written, else None.
    marker_path: Optional[str] = None

    @property
    def ok(self) -> bool:
        """True exactly when the verdict is ``PICKUP_WAKE_BUILT``."""
        built = self.verdict == PICKUP_WAKE_BUILT
        return built

    def to_json(self) -> dict:
        """Return a plain dict of all fields, with list fields shallow-copied."""
        argv_copy = None if self.argv is None else list(self.argv)
        reasons_copy = list(self.reasons)
        payload = {
            "schema": self.schema,
            "verdict": self.verdict,
            "task_id": self.task_id,
            "result_json_path": self.result_json_path,
            "sha256": self.sha256,
            "wake_built": self.wake_built,
            "argv": argv_copy,
            "classification": self.classification,
            "reasons": reasons_copy,
            "marker_path": self.marker_path,
        }
        return payload


def _fail(verdict: str, *, task_id: str = "", result_json_path: str = "",
          sha256: str = "", classification: str = "",
          reasons: Optional[List[str]] = None) -> PickupResult:
    """Build a no-op ``PickupResult`` (no wake, no argv, no marker).

    Args:
        verdict: The PICKUP_* verdict string to report.
        task_id: Task identifier, when already known.
        result_json_path: Path of the inspected result.json, when known.
        sha256: Hex digest of result.json, when already computed.
        classification: Classification string to pass through.
        reasons: Explanations for the verdict; copied, so the caller's list
            is never shared. ``None`` or an empty list yields ``[]``.

    Returns:
        A ``PickupResult`` with ``wake_built=False``, ``argv=None`` and
        ``marker_path=None``.
    """
    reason_list: List[str] = list(reasons) if reasons else []
    return PickupResult(
        schema=PICKUP_SCHEMA,
        verdict=verdict,
        # Fixed "nothing was built" portion of the record.
        wake_built=False,
        argv=None,
        marker_path=None,
        # Caller-provided context.
        task_id=task_id,
        result_json_path=result_json_path,
        sha256=sha256,
        classification=classification,
        reasons=reason_list,
    )


# ── pickup_once (공개 API) ────────────────────────────────────────────────────
def pickup_once(
    result_json_path: str,
    *,
    gh_probe: Optional[Callable[[str], dict]] = None,
    clock: Optional[Callable[[], datetime]] = None,
    sealed_key_loader: Optional[Callable[[], Optional[str]]] = None,
    executor_key: str = "",
    ledger_path: Optional[str] = None,
    anu_keys: Optional[object] = None,
) -> PickupResult:
    """Pick up an executor-written result.json and build the callback argv.

    Every failed check returns a no-op result (``wake_built=False``,
    ``argv=None``). Steps, in order:

      1. read and parse result.json (must be a JSON object)
      2. validate ``task_id`` and hash the raw file bytes with SHA-256
      3. no-op when a ``<task_id>.pickup.done`` / ``.pickup.acked`` marker
         already exists next to result.json
      4. no-op when the dedupe ledger already holds (task_id, sha256)
      5. collector verification, only when result.json carries a dict
         ``collector_envelope`` AND ``gh_probe`` is supplied
      6. load the ANU key through ``sealed_key_loader``
      7. delegate argv construction to ``anu_runner_pickup_and_fire``
      8. append a dedupe ledger entry (best effort)
      9. write the done marker via temp file + rename (best effort)
      10. return ``WAKE_BUILT`` together with the argv

    Args:
        result_json_path: Path of the result.json to pick up.
        gh_probe: Probe callable forwarded to
            ``verify_collector_authoritative``; step 5 is skipped when None.
        clock: Callable returning the current time; defaults to
            timezone-aware UTC ``datetime.now``.
        sealed_key_loader: Callable returning the ANU key or None; defaults
            to ``_default_sealed_key_loader``.
        executor_key: Forwarded to the verifier and to
            ``anu_runner_pickup_and_fire``.
        ledger_path: Dedupe ledger (JSON Lines) path; defaults to
            ``DEFAULT_LEDGER_PATH``.
        anu_keys: Forwarded to ``anu_runner_pickup_and_fire`` only when it
            is not None.

    Returns:
        A ``PickupResult``. ``marker_path`` is set only when the done marker
        was actually written. Ledger/marker write failures do not change the
        verdict but are appended to ``reasons``.
    """
    _clock = clock or (lambda: datetime.now(timezone.utc))
    _ledger = ledger_path or DEFAULT_LEDGER_PATH

    # ── 1. read result.json ───────────────────────────────────────────────────
    if not result_json_path or not os.path.isfile(result_json_path):
        return _fail(PICKUP_NO_RESULT_JSON,
                     result_json_path=str(result_json_path or ""),
                     reasons=["result.json 부재 — executor 미작성 또는 경로 오류."])
    try:
        with open(result_json_path, "rb") as fh:
            raw_bytes = fh.read()
        # UnicodeDecodeError and JSONDecodeError are both ValueError subclasses.
        result: object = json.loads(raw_bytes.decode("utf-8"))
    except (OSError, ValueError) as exc:
        return _fail(PICKUP_FAIL,
                     result_json_path=result_json_path,
                     reasons=[f"result.json 읽기/파싱 실패: {exc}"])
    if not isinstance(result, dict):
        return _fail(PICKUP_FAIL,
                     result_json_path=result_json_path,
                     reasons=["result.json 이 object(dict) 아님."])

    # ── 2. validate task_id + compute sha256 ──────────────────────────────────
    task_id = str(result.get("task_id") or "").strip()
    if not task_id:
        return _fail(PICKUP_REJECT,
                     result_json_path=result_json_path,
                     reasons=["result.json task_id 없음/빈값 — 잘못된 argv 차단(fail-closed)."])
    sha256 = hashlib.sha256(raw_bytes).hexdigest()

    # task_id comes from the executor-written file and is embedded in marker
    # file names below; a separator would move the marker outside the result
    # directory and a NUL byte would make open() raise ValueError.
    if not _task_id_is_path_safe(task_id):
        return _fail(PICKUP_REJECT,
                     task_id=task_id,
                     result_json_path=result_json_path,
                     sha256=sha256,
                     reasons=["result.json task_id 에 경로 구분자/NUL 포함 — "
                              "marker 경로 이탈 차단(fail-closed)."])

    result_dir = os.path.dirname(result_json_path)

    # ── 3. terminal-marker no-op ──────────────────────────────────────────────
    done_path = os.path.join(result_dir, f"{task_id}.pickup.done")
    acked_path = os.path.join(result_dir, f"{task_id}.pickup.acked")
    if os.path.exists(done_path) or os.path.exists(acked_path):
        return _fail(PICKUP_SKIP_TERMINAL,
                     task_id=task_id, result_json_path=result_json_path,
                     sha256=sha256,
                     reasons=["terminal marker 존재 — 이미 처리됨 (no-op)."])

    # ── 4. dedupe ledger lookup ───────────────────────────────────────────────
    if _ledger_has_wake_entry(_ledger, task_id, sha256):
        return _fail(PICKUP_SKIP_DEDUPE,
                     task_id=task_id,
                     result_json_path=result_json_path,
                     sha256=sha256,
                     reasons=["dedupe: (task_id, sha256) 이미 기록됨 — wake 0."])

    # ── 5. collector verify (conditional) ─────────────────────────────────────
    envelope = result.get("collector_envelope")
    if isinstance(envelope, dict) and gh_probe is not None:
        schedule_id = (
            envelope.get("schedule_id")
            or result.get("schedule_id")
            or None
        )
        v = verify_collector_authoritative(
            task_id=task_id,
            envelope=envelope,
            probe=gh_probe,
            executor_key=executor_key,
            schedule_id=schedule_id,
            now=_clock(),
        )
        if v.verdict == VERDICT_QUARANTINED:
            return _fail(PICKUP_QUARANTINE,
                         task_id=task_id,
                         result_json_path=result_json_path,
                         sha256=sha256,
                         classification=v.classification,
                         reasons=[
                             "collector verify: QUARANTINED (self-collector) — 작업물 보존, wake 0."
                         ] + list(v.reasons))
        if v.verdict == VERDICT_PENDING_OWNER_PROOF:
            return _fail(PICKUP_PENDING,
                         task_id=task_id,
                         result_json_path=result_json_path,
                         sha256=sha256,
                         classification=v.classification,
                         reasons=[
                             "collector verify: PENDING_OWNER_PROOF (재시도 가능) — wake 0."
                         ] + list(v.reasons))
        if v.verdict in (VERDICT_NON_AUTHORITATIVE, VERDICT_REJECTED):
            return _fail(PICKUP_FAIL,
                         task_id=task_id,
                         result_json_path=result_json_path,
                         sha256=sha256,
                         classification=v.classification,
                         reasons=[
                             f"collector verify: {v.verdict} — fail-closed, wake 0."
                         ] + list(v.reasons))
        # Any other verdict (expected: authoritative) passes through.
        # NOTE(review): an unrecognised verdict would also continue from
        # here — confirm the verifier can only return the verdicts handled
        # above plus the authoritative one.

    # ── 6. sealed-key proof ───────────────────────────────────────────────────
    _loader = sealed_key_loader if sealed_key_loader is not None else _default_sealed_key_loader
    anu_key = _loader()
    if not anu_key:
        return _fail(PICKUP_SEALED_KEY_MISSING,
                     task_id=task_id,
                     result_json_path=result_json_path,
                     sha256=sha256,
                     reasons=[
                         "ANU sealed key 미로드 — COKACDIR_KEY_ANU 환경변수 미설정 또는 "
                         ".env.keys 부재. fail-closed (wake 0)."
                     ])

    # ── 7. build wake argv (delegated to anu_runner_pickup_and_fire) ──────────
    fire_kwargs: dict = dict(
        result_json_path=result_json_path,
        executor_key=executor_key,
        anu_key=anu_key,
    )
    if anu_keys is not None:
        # Only forwarded when supplied, so the delegate's own default applies otherwise.
        fire_kwargs["anu_keys"] = anu_keys
    r = anu_runner_pickup_and_fire(**fire_kwargs)

    if not r.ok:
        return _fail(PICKUP_FAIL,
                     task_id=task_id,
                     result_json_path=result_json_path,
                     sha256=sha256,
                     classification=getattr(r, "classification", ""),
                     reasons=[
                         "anu_runner_pickup_and_fire FAIL (executor self-key refuse 포함)."
                     ] + list(r.reasons))

    # NOTE(review): a successful delegate result with argv=None yields an
    # empty argv here — confirm that combination cannot occur.
    argv = list(r.argv) if r.argv is not None else []

    # Best-effort bookkeeping problems are reported here instead of vanishing.
    bookkeeping_notes: List[str] = []

    # ── 8. append dedupe ledger entry ─────────────────────────────────────────
    ledger_entry = json.dumps({
        "schema": "dispatch.anu_result_pickup_runner.dedupe.v1",
        "event": "PICKUP_WAKE_BUILT",
        "task_id": task_id,
        "sha256": sha256,
        "ts": _clock().isoformat(),
    }, ensure_ascii=False)
    ledger_error = _append_ledger_line(_ledger, ledger_entry)
    if ledger_error is not None:
        # Do not abort: the argv is already built.
        bookkeeping_notes.append(
            f"dedupe ledger 기록 실패 (best-effort, 진행): {ledger_error}"
        )

    # ── 9. write done marker ──────────────────────────────────────────────────
    done_content = json.dumps({
        "schema": PICKUP_SCHEMA,
        "event": "PICKUP_WAKE_BUILT",
        "task_id": task_id,
        "sha256": sha256,
        "ts": _clock().isoformat(),
    }, ensure_ascii=False)
    marker_error = _write_done_marker(done_path, done_content)
    done_path_out: Optional[str] = done_path if marker_error is None else None
    if marker_error is not None:
        bookkeeping_notes.append(
            f"done marker 작성 실패 (best-effort, 진행): {marker_error}"
        )

    # ── 10. return WAKE_BUILT ─────────────────────────────────────────────────
    return PickupResult(
        schema=PICKUP_SCHEMA,
        verdict=PICKUP_WAKE_BUILT,
        task_id=task_id,
        result_json_path=result_json_path,
        sha256=sha256,
        wake_built=True,
        argv=argv,
        classification="",
        reasons=[
            "result.json pickup 정상 → ANU-owned callback 발사 argv 생성. "
            "실 발사는 권한 있는 ANU 세션이 수행 (자가발사 0)."
        ] + bookkeeping_notes,
        marker_path=done_path_out,
    )


def _task_id_is_path_safe(task_id: str) -> bool:
    """Return True when ``task_id`` holds no path separator or NUL byte."""
    return not any(token in task_id for token in ("/", "\\", "\x00"))


def _ledger_has_wake_entry(ledger: str, task_id: str, sha256: str) -> bool:
    """Return True when the ledger already records a wake for (task_id, sha256).

    A missing or unreadable ledger counts as "no entry" so that pickup can
    proceed; blank lines and lines that are not valid JSON are skipped.
    """
    if not os.path.isfile(ledger):
        return False
    try:
        # errors="replace": one corrupt byte must not raise UnicodeDecodeError
        # (a ValueError, which the OSError handler below would not catch).
        with open(ledger, "r", encoding="utf-8", errors="replace") as lfh:
            for raw_line in lfh:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except (ValueError, TypeError):
                    continue  # unparsable line: skip
                if (
                    isinstance(entry, dict)
                    and entry.get("event") == "PICKUP_WAKE_BUILT"
                    and entry.get("task_id") == task_id
                    and entry.get("sha256") == sha256
                ):
                    return True
    except OSError:
        return False
    return False


def _append_ledger_line(ledger: str, entry_line: str) -> Optional[str]:
    """Append one line to the ledger; return None on success, else the error text."""
    try:
        parent = os.path.dirname(ledger)
        # os.makedirs("") raises FileNotFoundError, so a bare-filename ledger
        # path must skip directory creation instead of losing the entry.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(ledger, "a", encoding="utf-8") as lfh:
            lfh.write(entry_line + "\n")
            lfh.flush()
            os.fsync(lfh.fileno())
    except OSError as exc:
        return str(exc)
    return None


def _write_done_marker(done_path: str, content: str) -> Optional[str]:
    """Write the done marker via temp file + os.replace; return None or the error text."""
    tmp_path = f"{done_path}.tmp-{os.getpid()}"
    try:
        with open(tmp_path, "w", encoding="utf-8") as dfh:
            dfh.write(content)
            dfh.flush()
            os.fsync(dfh.fileno())
        os.replace(tmp_path, done_path)
    except OSError as exc:
        # Remove the temp file if it was left behind; ignore a missing one.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        return str(exc)
    return None


# ── CLI ───────────────────────────────────────────────────────────────────────
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point for the ``pickup`` sub-command.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        0 when the pickup produced a wake argv, otherwise 2. The pickup
        result is printed to stdout as a single JSON document.
    """
    from argparse import ArgumentParser

    parser = ArgumentParser(prog="dispatch.anu_result_pickup_runner")
    commands = parser.add_subparsers(dest="cmd", required=True)

    pickup_cmd = commands.add_parser("pickup")
    pickup_cmd.add_argument("--result-json-path", required=True)
    pickup_cmd.add_argument("--executor-key", default="")

    opts = parser.parse_args(argv)

    # Guard for any command other than "pickup".
    if opts.cmd != "pickup":
        return 2

    outcome = pickup_once(
        opts.result_json_path,
        executor_key=opts.executor_key,
    )
    print(json.dumps(outcome.to_json(), ensure_ascii=False))
    if outcome.ok:
        return 0
    return 2


# Public names exported by a star-import of this module.
__all__ = [
    "PICKUP_SCHEMA",
    "PICKUP_WAKE_BUILT",
    "PICKUP_NO_RESULT_JSON",
    "PICKUP_FAIL",
    "PICKUP_SKIP_TERMINAL",
    "PICKUP_SKIP_DEDUPE",
    "PICKUP_QUARANTINE",
    "PICKUP_PENDING",
    "PICKUP_REJECT",
    "PICKUP_SEALED_KEY_MISSING",
    "DEFAULT_LEDGER_PATH",
    "ENV_ANU_KEY",
    "PickupResult",
    "pickup_once",
    "main",
]

# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
