"""ANU normal callback 4-source validator.

회장 verbatim (2026-05-27, task-2694+1):
    "actual cron 등록 + schedule_history + owner key + ANU inbound
     또는 authoritative collector receipt 가 있어야 PASS."

본 모듈은 finish-task.sh 등에서 envelope 작성만으로 .done 이 생성되던
우회 패턴을 차단하는 단일 진입점 검증기다.

검증 4-source (AND):
    1) schedule_id            — envelope 회수, placeholder/blocked schedule_type 거부
    2) schedule_history       — /home/jay/.cokacdir/schedule_history/<sid>.log status=ok
    3) owner_key              — envelope owner_key == ANU_KEY (executor self-key → NON_AUTHORITATIVE)
    4) inbound/receipt        — ANU inbound 파일 OR authoritative collector receipt 1건 이상

추가 방어 (Codex suggestion 1/3 반영):
    - envelope task_id ↔ 호출 task_id 결속 (stale receipt 재사용 차단)
    - chair_facing_sid 결속 (옵션)

verdict 정책:
    PASS              모든 4-source PASS
    FAIL              1개 이상 FAIL
    NON_AUTHORITATIVE owner_key 가 executor self-key → downstream 은 FAIL 로 처리
"""

from __future__ import annotations

import argparse
import glob
import json
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Sequence


# --------------------------------------------------------------------------- #
# Constants
# --------------------------------------------------------------------------- #

# Authoritative ANU key that an envelope's owner key is compared against.
ANU_KEY = "c119085addb0f8b7"
# Default directory holding the per-schedule "<schedule_id>.log" history files.
SCHEDULE_HISTORY_DIR = "/home/jay/.cokacdir/schedule_history"

# schedule_type values that make the schedule_id source fail outright.
BLOCKED_SCHEDULE_TYPES = frozenset(
    (
        "to_be_registered_by_finish_task_sh",
        "deferred",
        "pending",
    )
)

# schedule_id values that must be treated as placeholders -- the pattern that
# used to pass merely because an envelope had been written.
_PLACEHOLDER_SCHEDULE_IDS = frozenset(
    (
        "to_be_registered",
        "to_be_registered_by_finish_task_sh",
        "deferred",
        "pending",
        "none",
        "null",
        "",
    )
)

# Directories searched for inbound / receipt files when the caller gives none.
_DEFAULT_INBOUND_DIRS = (
    "/home/jay/workspace/memory/events/anu_callback",
    "/home/jay/workspace/memory/events",
)

# Schema tag stamped on every validation result.
VALIDATOR_SCHEMA = "utils.normal_callback_registration_validator.v1"

# Verdict labels.
PASS = "PASS"
FAIL = "FAIL"
NON_AUTHORITATIVE = "NON_AUTHORITATIVE"


# --------------------------------------------------------------------------- #
# Result dataclass
# --------------------------------------------------------------------------- #

@dataclass
class ValidationResult:
    """Outcome of one callback-registration validation run."""

    schema: str  # schema tag of the validator that produced this result
    verdict: str  # PASS | FAIL | NON_AUTHORITATIVE
    task_id: str  # task_id the caller asked to validate
    schedule_id: Optional[str]  # schedule id taken from the envelope, if any
    # Per-source verdicts, keyed by source name.
    sources_checked: Dict[str, str] = field(default_factory=dict)
    # Human-readable explanations collected from the checks.
    reasons: List[str] = field(default_factory=list)
    # Supporting details (paths, match kinds, ...) collected from the checks.
    evidence: Dict[str, Any] = field(default_factory=dict)

    @property
    def ok(self) -> bool:
        """True only when the overall verdict is PASS."""
        return self.verdict == PASS

    def to_json(self) -> dict:
        """Return a JSON-serialisable dict view of this result.

        The three container fields are shallow-copied so that mutating the
        returned dict's top-level containers does not touch this instance.
        """
        payload: Dict[str, Any] = {
            name: getattr(self, name)
            for name in ("schema", "verdict", "task_id", "schedule_id")
        }
        payload["sources_checked"] = dict(self.sources_checked)
        payload["reasons"] = list(self.reasons)
        payload["evidence"] = dict(self.evidence)
        return payload


# --------------------------------------------------------------------------- #
# Internal helpers
# --------------------------------------------------------------------------- #

def _load_envelope(envelope_path: str) -> Optional[dict]:
    """Load the envelope JSON object stored at ``envelope_path``.

    Args:
        envelope_path: Path of the envelope JSON file.

    Returns:
        The parsed JSON object as a dict, or None when the path is empty, is
        not a regular file, cannot be read or decoded, does not contain valid
        JSON, or its top-level value is not a JSON object.
    """
    if not envelope_path or not os.path.isfile(envelope_path):
        return None
    try:
        with open(envelope_path, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
    except (OSError, ValueError, RecursionError):
        # ValueError already covers json.JSONDecodeError and UnicodeDecodeError;
        # RecursionError is raised for pathologically deep nesting.
        return None
    # Callers access the envelope with .get(); a syntactically valid document
    # that is not an object (list, string, number, ...) would otherwise crash
    # them with AttributeError instead of producing a FAIL verdict.
    if not isinstance(payload, dict):
        return None
    return payload


def _normalize(val: Any) -> str:
    """Return ``val`` as a whitespace-stripped string; None becomes ""."""
    return "" if val is None else str(val).strip()


def _check_schedule_id(envelope: dict) -> tuple[str, Optional[str], List[str]]:
    """Source 1: validate the envelope's schedule_id and schedule_type.

    Args:
        envelope: Parsed envelope object.

    Returns:
        A ``(verdict, schedule_id, reasons)`` tuple. ``verdict`` is FAIL when
        the schedule_type is blocked, or when the schedule_id is missing or a
        placeholder; otherwise PASS. ``schedule_id`` is the recovered id, or
        None when nothing was recovered.
    """
    reasons: List[str] = []

    schedule_id = _normalize(envelope.get("schedule_id"))
    # task-2729+2 reconciliation: the registrar (anu_callback_registrar) writes
    # the real schedule id into 'cron_schedule_id'. That alias is consulted only
    # when the canonical 'schedule_id' is absent or blank (canonical wins), and
    # the recovered value goes through exactly the same checks.
    if not schedule_id:
        schedule_id = _normalize(envelope.get("cron_schedule_id"))
    schedule_type = _normalize(envelope.get("schedule_type"))

    # A blocked schedule_type fails immediately, whatever the schedule_id is.
    # The comparison is case-insensitive, like the placeholder check below, so
    # that "Pending" / "DEFERRED" cannot slip past the all-lowercase block list.
    if schedule_type.lower() in BLOCKED_SCHEDULE_TYPES:
        reasons.append(
            f"blocked schedule_type detected: schedule_type='{schedule_type}'"
        )
        return FAIL, schedule_id or None, reasons

    if not schedule_id:
        reasons.append("schedule_id missing or empty in envelope")
        return FAIL, None, reasons

    if schedule_id.lower() in _PLACEHOLDER_SCHEDULE_IDS:
        reasons.append(
            f"schedule_id is placeholder: schedule_id='{schedule_id}'"
        )
        return FAIL, schedule_id, reasons

    return PASS, schedule_id, reasons


def _check_schedule_history(
    schedule_id: Optional[str],
    schedule_history_dir: str,
) -> tuple[str, List[str], Dict[str, Any]]:
    """Source 2: look for a status=ok entry in the schedule's history log.

    The log is ``<schedule_history_dir>/<schedule_id>.log``. Each non-blank
    line is tried as JSON first; a JSON object whose ``status`` normalizes to
    ``"ok"`` counts as a hit. Lines that are not valid JSON fall back to a
    plain substring search.

    Returns:
        A ``(verdict, reasons, evidence)`` tuple. ``evidence`` records the log
        path once it has been computed and, on PASS, how the match was found.
    """
    reasons: List[str] = []
    evidence: Dict[str, Any] = {}

    def _fail(message: str) -> tuple:
        """Record ``message`` and build the FAIL triple."""
        reasons.append(message)
        return FAIL, reasons, evidence

    def _ok_marker(text: str) -> Optional[str]:
        """Return how ``text`` reports status ok, or None when it does not."""
        try:
            record = json.loads(text)
            if isinstance(record, dict) and _normalize(record.get("status")) == "ok":
                return "jsonl"
        except (ValueError, json.JSONDecodeError):
            if '"status": "ok"' in text or "status=ok" in text:
                return "plain_grep"
        return None

    if not schedule_id:
        return _fail("schedule_history check skipped: schedule_id missing")

    # Path traversal defence: schedule_id comes from the envelope (external
    # input), so it must be a bare file name with no path separators.
    has_separator = any(sep in schedule_id for sep in ("/", "\\"))
    is_bare_name = os.path.basename(schedule_id) == schedule_id
    if not is_bare_name or has_separator or schedule_id in (".", ".."):
        return _fail(
            f"invalid schedule_id (path traversal attempt detected): {schedule_id!r}"
        )

    path = os.path.join(schedule_history_dir, f"{schedule_id}.log")
    evidence["schedule_history_path"] = path

    if not os.path.isfile(path):
        return _fail(f"schedule_history file missing: {path}")

    # Stream the file line by line and stop at the first hit.
    matched_via: Optional[str] = None
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as handle:
            for text in filter(None, map(str.strip, handle)):
                matched_via = _ok_marker(text)
                if matched_via is not None:
                    break
    except OSError as exc:
        return _fail(f"schedule_history read error: {exc}")

    if matched_via is None:
        return _fail(f"schedule_history status=ok line missing in {path}")

    evidence["schedule_history_match"] = matched_via
    return PASS, reasons, evidence


def _check_owner_key(
    envelope: dict,
    anu_key: str,
    executor_key: str,
) -> tuple[str, List[str], Dict[str, Any]]:
    """Source 3: check that the envelope's owner key is the ANU key.

    Keys are compared after stripping and lower-casing, so mixed-case hex input
    still matches.

    Returns:
        A ``(verdict, reasons, evidence)`` tuple. The verdict is PASS when the
        owner key equals ``anu_key``, NON_AUTHORITATIVE when it instead equals
        a non-empty ``executor_key``, and FAIL otherwise, including when no
        owner key is present.
    """
    reasons: List[str] = []

    # task-2729+2 reconciliation: the registrar writes the ANU key into
    # 'anu_key'. That alias is consulted only when the canonical 'owner_key' is
    # absent or blank (canonical wins); the same policy applies to either.
    candidate = (
        _normalize(envelope.get("owner_key")).lower()
        or _normalize(envelope.get("anu_key")).lower()
    )
    evidence: Dict[str, Any] = {"owner_key_present": bool(candidate)}

    if not candidate:
        reasons.append("owner_key missing in envelope")
        return FAIL, reasons, evidence

    expected = _normalize(anu_key).lower()
    executor = _normalize(executor_key).lower()

    if candidate == expected:
        return PASS, reasons, evidence

    # From here on the key is not the ANU key. If it is the executor's own
    # key, the callback came through the self-key channel.
    if executor and candidate == executor:
        reasons.append(
            "self-key channel hit: owner_key matches executor_key (non-authoritative)"
        )
        return NON_AUTHORITATIVE, reasons, evidence

    reasons.append(
        f"owner_key mismatch: expected ANU key '{expected}', got '{candidate}'"
    )
    return FAIL, reasons, evidence


def _check_inbound_receipt(
    task_id: str,
    envelope: dict,
    inbound_search_dirs: Sequence[str],
) -> tuple[str, List[str], Dict[str, Any]]:
    """Source 4: require an ANU inbound file OR an authoritative collector receipt.

    Two lookups are tried in order:

    1. A path named by the envelope under ``inbound_evidence`` or
       ``collector_receipt``. It is accepted only when it lies inside one of
       ``inbound_search_dirs`` and is an existing regular file.
    2. Files in ``inbound_search_dirs`` whose name starts with ``task_id``.

    Args:
        task_id: Task id used as the file-name prefix for the directory search.
        envelope: Parsed envelope object.
        inbound_search_dirs: Directories that may hold inbound/receipt files.

    Returns:
        A ``(verdict, reasons, evidence)`` tuple; the verdict is PASS as soon
        as either lookup finds a file, FAIL otherwise.
    """
    reasons: List[str] = []
    evidence: Dict[str, Any] = {}

    # 1) Evidence explicitly named by the envelope takes priority.
    # Arbitrary-file-existence bypass defence: an envelope pointing at, say,
    # /etc/passwd must not pass just because that file exists, so only paths
    # under inbound_search_dirs are accepted.
    allowed_roots = [
        os.path.abspath(d) for d in inbound_search_dirs if d and os.path.isdir(d)
    ]
    for key in ("inbound_evidence", "collector_receipt"):
        path = _normalize(envelope.get(key))
        if not path:
            continue
        abs_path = os.path.abspath(path)
        contained = any(
            abs_path == root or abs_path.startswith(root + os.sep)
            for root in allowed_roots
        )
        if not contained:
            reasons.append(
                f"envelope.{key} path is outside allowed inbound dirs: {path}"
            )
            continue
        if os.path.isfile(abs_path):
            evidence["receipt_via_envelope_field"] = {key: abs_path}
            return PASS, reasons, evidence
        reasons.append(
            f"envelope.{key} path does not exist on disk: {path}"
        )

    # 2) Search the inbound directories for files prefixed with the task id.
    hits: List[str] = []
    # Use the normalized id, matching the task_id binding check performed by
    # the caller, and strip path separators so the prefix stays inside ``d``.
    safe_task_id = _normalize(task_id).replace(os.sep, "")
    if safe_task_id:
        # glob.escape neutralises wildcard characters in the id (glob injection).
        escaped_task_id = glob.escape(safe_task_id)
        for d in inbound_search_dirs:
            if not d or not os.path.isdir(d):
                continue
            pattern = os.path.join(d, f"{escaped_task_id}*")
            # glob order is arbitrary; sort so the recorded evidence is stable.
            hits.extend(p for p in sorted(glob.glob(pattern)) if os.path.isfile(p))
    else:
        # An empty prefix would turn the pattern into "<dir>/*" and let any
        # file in an inbound directory count as a receipt.
        reasons.append("inbound glob search skipped: task_id is empty")

    if hits:
        evidence["inbound_hits"] = hits[:5]  # record at most five paths
        evidence["inbound_hit_count"] = len(hits)
        return PASS, reasons, evidence

    reasons.append(
        "inbound/receipt evidence missing: no envelope.inbound_evidence / "
        f"collector_receipt and no glob hit for task_id='{task_id}' in "
        f"{inbound_search_dirs}"
    )
    return FAIL, reasons, evidence


# --------------------------------------------------------------------------- #
# Public API
# --------------------------------------------------------------------------- #

def validate_callback_registration(
    *,
    task_id: str,
    envelope_path: str,
    executor_key: str = "",
    anu_key: str = ANU_KEY,
    schedule_history_dir: str = SCHEDULE_HISTORY_DIR,
    inbound_search_dirs: Optional[List[str]] = None,
    require_chair_facing_sid_match: bool = False,
    expected_chair_facing_sid: Optional[str] = None,
) -> ValidationResult:
    """Run the 4-source validation for one task's callback envelope.

    The envelope is loaded and bound to ``task_id`` (and optionally to a
    chair-facing sid) first; a failure there returns immediately. Otherwise all
    four sources are evaluated and combined.

    Args:
        task_id: Task id being validated; must equal the envelope's task_id.
        envelope_path: Path of the ANU callback envelope JSON file.
        executor_key: Key of the executing party, used to detect a self-key
            channel hit (optional).
        anu_key: Authoritative key (defaults to ANU_KEY).
        schedule_history_dir: Directory holding schedule history logs.
        inbound_search_dirs: Directories searched for ANU inbound files; the
            module defaults are used when this is None or empty.
        require_chair_facing_sid_match: Whether to enforce the
            chair_facing_sid binding.
        expected_chair_facing_sid: Expected value when the binding is enforced.

    Returns:
        ValidationResult whose verdict is NON_AUTHORITATIVE if any source
        reported that, PASS if every source passed, and FAIL otherwise.
    """
    evidence: Dict[str, Any] = {"envelope_path": envelope_path}

    # ----------------------------------------------------------------- #
    # 0. Envelope load + binding checks (each failure returns right away)
    # ----------------------------------------------------------------- #
    envelope = _load_envelope(envelope_path)
    if envelope is None:
        return ValidationResult(
            schema=VALIDATOR_SCHEMA,
            verdict=FAIL,
            task_id=task_id,
            schedule_id=None,
            sources_checked={"envelope_load": FAIL},
            reasons=["envelope file missing or invalid JSON"],
            evidence=evidence,
        )

    def _reject(source: str, reason: str) -> ValidationResult:
        """Build the early FAIL result for a binding check named ``source``."""
        return ValidationResult(
            schema=VALIDATOR_SCHEMA,
            verdict=FAIL,
            task_id=task_id,
            schedule_id=_normalize(envelope.get("schedule_id")) or None,
            sources_checked={source: FAIL},
            reasons=[reason],
            evidence=evidence,
        )

    bound_task_id = _normalize(envelope.get("task_id"))
    if bound_task_id != _normalize(task_id):
        return _reject(
            "task_id_binding",
            "task_id mismatch (stale receipt reuse blocked): "
            f"envelope.task_id='{bound_task_id}' vs caller='{task_id}'",
        )

    # Optional chair_facing_sid binding.
    if require_chair_facing_sid_match:
        env_cf = _normalize(envelope.get("chair_facing_sid"))
        expected_cf = _normalize(expected_chair_facing_sid)
        if not expected_cf or env_cf != expected_cf:
            return _reject(
                "chair_facing_sid_binding",
                "chair_facing_sid mismatch: "
                f"envelope='{env_cf}' vs expected='{expected_cf}'",
            )

    sources_checked: Dict[str, str] = {}
    reasons: List[str] = []

    # ----------------------------------------------------------------- #
    # Source 1 — schedule_id
    # ----------------------------------------------------------------- #
    sources_checked["schedule_id"], schedule_id, sid_reasons = _check_schedule_id(
        envelope
    )
    reasons += sid_reasons

    # ----------------------------------------------------------------- #
    # Source 2 — schedule_history
    # ----------------------------------------------------------------- #
    history = _check_schedule_history(schedule_id, schedule_history_dir)
    sources_checked["schedule_history"] = history[0]
    reasons += history[1]
    evidence.update(history[2])

    # ----------------------------------------------------------------- #
    # Source 3 — owner_key
    # ----------------------------------------------------------------- #
    owner = _check_owner_key(envelope, anu_key=anu_key, executor_key=executor_key)
    sources_checked["owner_key"] = owner[0]
    reasons += owner[1]
    evidence.update(owner[2])

    # ----------------------------------------------------------------- #
    # Source 4 — inbound/receipt
    # ----------------------------------------------------------------- #
    search_dirs = list(inbound_search_dirs or _DEFAULT_INBOUND_DIRS)
    inbound = _check_inbound_receipt(
        task_id=task_id, envelope=envelope, inbound_search_dirs=search_dirs
    )
    sources_checked["inbound_receipt"] = inbound[0]
    reasons += inbound[1]
    evidence.update(inbound[2])

    # ----------------------------------------------------------------- #
    # Overall verdict
    # ----------------------------------------------------------------- #
    outcomes = list(sources_checked.values())
    if NON_AUTHORITATIVE in outcomes:
        verdict = NON_AUTHORITATIVE
    elif set(outcomes) <= {PASS}:
        verdict = PASS
    else:
        verdict = FAIL

    return ValidationResult(
        schema=VALIDATOR_SCHEMA,
        verdict=verdict,
        task_id=task_id,
        schedule_id=schedule_id,
        sources_checked=sources_checked,
        reasons=reasons,
        evidence=evidence,
    )


# --------------------------------------------------------------------------- #
# CLI
# --------------------------------------------------------------------------- #

def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: validate one envelope and print the result as JSON.

    Args:
        argv: Argument list to parse; None lets argparse use the process
            arguments.

    Returns:
        0 when the verdict is PASS, 2 for FAIL or NON_AUTHORITATIVE.
    """
    parser = argparse.ArgumentParser(prog="normal_callback_registration_validator")
    option_table = (
        ("--task-id", {"required": True}),
        ("--envelope-path", {"required": True}),
        ("--executor-key", {"default": ""}),
        ("--anu-key", {"default": ANU_KEY}),
        ("--expected-chair-facing-sid", {"default": None}),
        ("--require-chair-facing-sid-match", {"action": "store_true"}),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
    opts = parser.parse_args(argv)

    outcome = validate_callback_registration(
        task_id=opts.task_id,
        envelope_path=opts.envelope_path,
        executor_key=opts.executor_key,
        anu_key=opts.anu_key,
        expected_chair_facing_sid=opts.expected_chair_facing_sid,
        require_chair_facing_sid_match=opts.require_chair_facing_sid_match,
    )
    print(json.dumps(outcome.to_json(), ensure_ascii=False, indent=2))
    if outcome.ok:
        return 0
    return 2


# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
