# -*- coding: utf-8 -*-
"""utils.callback_authority_4source_validator — 4-source callback authority gate.

task-2680 — CALLBACK_SELF_KEY_REGISTRATION_HARDENING_FIX_IMPLEMENTATION
chair_authorization_id: CHAIR-AUTH-CALLBACK-SELF-KEY-HARDENING-FIX-20260526-JJONGS-IMPLEMENT-001

★ 본 module 은 callback collector spawn 시점에 4-source 교차검증을 수행하는
   pure-function validator 이다. 코드 외부 (cokacdir CLI, schedule_history file)
   의 actual owner key 를 query 해 envelope text 의 owner_key 와 1:1 대조한다.

4 source (회장 verbatim · task-2677 4source_verify_doctrine §1.1):

   * S1 schedule_history — /home/jay/.cokacdir/schedule_history/<schedule_id>.log
     의 callback fire 시점 bot_key_verifier / workspace / prompt 채록.
   * S2 cron-history     — cokacdir --cron-history <schedule_id> --key <K>
     의 actual owner key 채록 (★ K = ANU key + executor self-key 2회 query).
   * S3 envelope         — S1.prompt 본문 (envelope text) 의 owner_key /
     self_key / schema 텍스트 채록.
   * S4 result artifact  — memory/events/<task_id>.*-result-*.json 의
     callback_cron_id / callback_registration_status / callback_role 채록.

분류 enum (회장 task-2680 수정 목표 4):
   * ANU_AUTHORITATIVE             — 4-source PASS, S2 owner == ANU key
   * NON_AUTHORITATIVE_SELF_COLLECTOR
                                   — S2 owner == executor self-key
                                     (★ self-key callback 자동 분류)
   * NON_AUTHORITATIVE_KEY_DRIFT   — S2 owner != ANU and != executor self-key
   * UNDETERMINED_HISTORY_GAP      — S1/S2 query 실패 (cron-history 부재)
   * PROMPT_DRIFT                  — S3 envelope text 의 owner_key 가 ANU 아님

Layer A · Read-only · NO-CRON (회장 9-R.1 1:1):
   본 module 은 cron register/remove, dispatch, subprocess cokacdir exec 0.
   external state query (file read, subprocess --cron-history) 만 수행.

★ ANU independent reverify trigger (회장 task-2680 수정 목표 5):
   classify_collector_authority 가 NON_AUTHORITATIVE_* / UNDETERMINED_* /
   PROMPT_DRIFT 를 반환하면 caller (collector helper integration) 가
   anu_independent_reverify_request 를 emit 해야 한다. 본 module 은 trigger
   결정만 노출 — 실 dispatch 는 별도 layer.
"""
from __future__ import annotations

import json
import re
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence

VALIDATOR_SCHEMA = "utils.callback_authority_4source_validator.v1"

ANU_KEY = "c119085addb0f8b7"
DEFAULT_CHAT_ID = "6937032012"
COKACDIR_CLI = "/usr/local/bin/cokacdir"
SCHEDULE_HISTORY_DIR = Path("/home/jay/.cokacdir/schedule_history")
RESULT_ARTIFACT_DIR = Path("/home/jay/workspace/memory/events")

# Classification enum (★ task-2680 수정 목표 4 — verbatim from task-2677 doctrine §2.1)
ANU_AUTHORITATIVE = "ANU_AUTHORITATIVE"
NON_AUTHORITATIVE_SELF_COLLECTOR = "NON_AUTHORITATIVE_SELF_COLLECTOR"
NON_AUTHORITATIVE_KEY_DRIFT = "NON_AUTHORITATIVE_KEY_DRIFT"
UNDETERMINED_HISTORY_GAP = "UNDETERMINED_HISTORY_GAP"
PROMPT_DRIFT = "PROMPT_DRIFT"

# trigger set — classifications that REQUIRE ANU independent reverify dispatch
REVERIFY_TRIGGER_CLASSIFICATIONS = frozenset({
    NON_AUTHORITATIVE_SELF_COLLECTOR,
    NON_AUTHORITATIVE_KEY_DRIFT,
    UNDETERMINED_HISTORY_GAP,
    PROMPT_DRIFT,
})


@dataclass
class FourSourceEvidence:
    """Raw + parsed evidence from each source (audit trail)."""
    s1_schedule_history: Optional[Dict[str, Any]] = None
    s2_anu_cron_history: Optional[Dict[str, Any]] = None
    s2_self_cron_history: Optional[Dict[str, Any]] = None
    s3_envelope: Optional[Dict[str, Any]] = None
    s4_result_artifact: Optional[Dict[str, Any]] = None
    queried_paths: List[str] = field(default_factory=list)
    query_errors: List[str] = field(default_factory=list)

    def to_json(self) -> Dict[str, Any]:
        return {
            "s1_schedule_history": self.s1_schedule_history,
            "s2_anu_cron_history": self.s2_anu_cron_history,
            "s2_self_cron_history": self.s2_self_cron_history,
            "s3_envelope": self.s3_envelope,
            "s4_result_artifact": self.s4_result_artifact,
            "queried_paths": list(self.queried_paths),
            "query_errors": list(self.query_errors),
        }


@dataclass
class AuthorityClassification:
    """Final 4-source classification + escalation hint."""
    schema: str
    classification: str
    schedule_id: str
    task_id: str
    executor_key: str
    anu_key: str
    is_authoritative: bool
    requires_anu_reverify: bool
    reasons: List[str] = field(default_factory=list)
    evidence: Optional[FourSourceEvidence] = None
    escalation_hint: Optional[str] = None

    def to_json(self) -> Dict[str, Any]:
        return {
            "schema": self.schema,
            "classification": self.classification,
            "schedule_id": self.schedule_id,
            "task_id": self.task_id,
            "executor_key": self.executor_key,
            "anu_key": self.anu_key,
            "is_authoritative": self.is_authoritative,
            "requires_anu_reverify": self.requires_anu_reverify,
            "reasons": list(self.reasons),
            "evidence": self.evidence.to_json() if self.evidence else None,
            "escalation_hint": self.escalation_hint,
        }


# ── S1: schedule_history reader ─────────────────────────────────────────────

def _read_schedule_history(
    schedule_id: str,
    history_dir: Path = SCHEDULE_HISTORY_DIR,
) -> Optional[Dict[str, Any]]:
    """Read /home/jay/.cokacdir/schedule_history/<schedule_id>.log (JSONL).

    Returns the LAST line parsed as JSON (most recent execution) or None
    if the file is missing / unreadable / empty.
    """
    log_path = history_dir / f"{schedule_id}.log"
    if not log_path.exists():
        return None
    try:
        with log_path.open("r", encoding="utf-8") as f:
            lines = [ln.strip() for ln in f.readlines() if ln.strip()]
        if not lines:
            return None
        return json.loads(lines[-1])
    except (OSError, ValueError):
        return None


# ── S2: cron-history subprocess query ───────────────────────────────────────

def _default_cokacdir_runner(argv: Sequence[str], timeout: int = 15) -> Any:
    """Default subprocess runner — calls cokacdir CLI.

    Returns subprocess.CompletedProcess (or equivalent). Callers may inject a
    fake runner for tests.
    """
    return subprocess.run(
        list(argv), capture_output=True, text=True, timeout=timeout
    )


def _query_cron_history(
    schedule_id: str,
    key: str,
    chat_id: str = DEFAULT_CHAT_ID,
    runner: Optional[Callable[[Sequence[str], int], Any]] = None,
    cokacdir_path: str = COKACDIR_CLI,
) -> Dict[str, Any]:
    """Query cokacdir --cron-history <schedule_id> --key <key>.

    Returns dict with parsed JSON output (or {"status":"error", ...} on
    failure). Schema: {status, count, history, id, ...}.
    """
    runner = runner or _default_cokacdir_runner
    argv = [
        cokacdir_path,
        "--cron-history",
        schedule_id,
        "--chat",
        str(chat_id),
        "--key",
        key,
    ]
    try:
        proc = runner(argv, 15)
    except Exception as exc:  # noqa: BLE001
        return {"status": "error", "error": f"subprocess raised: {exc!r}", "count": None}
    rc = getattr(proc, "returncode", 1)
    stdout = (getattr(proc, "stdout", "") or "").strip()
    stderr = (getattr(proc, "stderr", "") or "").strip()
    if rc != 0:
        return {
            "status": "error",
            "error": f"cokacdir exit={rc} stderr={stderr[:256]}",
            "count": None,
        }
    if not stdout:
        return {"status": "error", "error": "empty stdout", "count": None}
    try:
        payload = json.loads(stdout.splitlines()[-1])
    except (ValueError, IndexError):
        return {"status": "error", "error": f"bad json: {stdout[:256]}", "count": None}
    if not isinstance(payload, dict):
        return {"status": "error", "error": "not a dict", "count": None}
    return payload


# ── S3: envelope text parser ────────────────────────────────────────────────

_OWNER_KEY_RE = re.compile(r"owner_key[\"']?\s*[:=]\s*[\"']?([a-f0-9]+)", re.IGNORECASE)
_SELF_KEY_RE = re.compile(r"self_key[\"']?\s*[:=]\s*[\"']?([0-9a-f]+)", re.IGNORECASE)


def _parse_envelope_from_prompt(prompt: str) -> Dict[str, Any]:
    """Parse envelope text (the cron prompt body) for owner_key / self_key /
    schema. Tolerates JSON envelope or YAML-style envelope text.

    Returns dict with keys: owner_key, self_key, schema, task_id, parse_ok.
    """
    if not prompt:
        return {"owner_key": None, "self_key": None, "schema": None,
                "task_id": None, "parse_ok": False, "reason": "empty prompt"}
    # try JSON first
    try:
        payload = json.loads(prompt)
        if isinstance(payload, dict):
            return {
                "owner_key": payload.get("owner_key"),
                "self_key": payload.get("self_key"),
                "schema": payload.get("schema"),
                "task_id": payload.get("task_id"),
                "parse_ok": True,
                "format": "json",
            }
    except (ValueError, TypeError):
        pass
    # fallback regex (envelope-as-text)
    owner_m = _OWNER_KEY_RE.search(prompt)
    self_m = _SELF_KEY_RE.search(prompt)
    return {
        "owner_key": owner_m.group(1) if owner_m else None,
        "self_key": self_m.group(1) if self_m else None,
        "schema": None,
        "task_id": None,
        "parse_ok": bool(owner_m or self_m),
        "format": "regex",
    }


# ── S4: result artifact reader ──────────────────────────────────────────────

def _read_result_artifact(
    task_id: str,
    artifact_dir: Path = RESULT_ARTIFACT_DIR,
) -> Optional[Dict[str, Any]]:
    """Scan memory/events/<task_id>.*-result-*.json (most recent first).

    Returns the parsed dict of the most recent match or None if absent.
    """
    if not artifact_dir.exists():
        return None
    pattern = f"{task_id}.*result*.json"
    matches = sorted(artifact_dir.glob(pattern), reverse=True)
    for path in matches:
        try:
            with path.open("r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            continue
    return None


# ── Main classifier ─────────────────────────────────────────────────────────

def classify_collector_authority(
    *,
    schedule_id: str,
    executor_key: str,
    task_id: str,
    anu_key: str = ANU_KEY,
    chat_id: str = DEFAULT_CHAT_ID,
    cokacdir_runner: Optional[Callable[[Sequence[str], int], Any]] = None,
    history_dir: Path = SCHEDULE_HISTORY_DIR,
    artifact_dir: Path = RESULT_ARTIFACT_DIR,
    cokacdir_path: str = COKACDIR_CLI,
) -> AuthorityClassification:
    """4-source cross-check at collector spawn time. Returns classification.

    ★ Transition table (task-2680 6 수정 목표 #3 + #4):

      | S2.anu_count | S2.self_count | S3.owner_key      | result               |
      | >=1          | 0             | == ANU            | ANU_AUTHORITATIVE     |
      | 0            | >=1           | *                 | NON_AUTHORITATIVE_SELF_COLLECTOR |
      | 0            | 0             | *                 | UNDETERMINED_HISTORY_GAP |
      | >=1          | >=1           | *                 | NON_AUTHORITATIVE_KEY_DRIFT (★ both registered = collision) |
      | *            | *             | != ANU (≠None)    | PROMPT_DRIFT (overrides PASS) |

    Read-only · NO-CRON · no mutation.
    """
    evidence = FourSourceEvidence()
    reasons: List[str] = []

    # S1
    s1 = _read_schedule_history(schedule_id, history_dir=history_dir)
    evidence.s1_schedule_history = s1
    evidence.queried_paths.append(str(history_dir / f"{schedule_id}.log"))
    if s1 is None:
        evidence.query_errors.append(f"S1: schedule_history/{schedule_id}.log missing")

    # S2 — ANU + self-key 2회 query
    s2_anu = _query_cron_history(
        schedule_id, anu_key, chat_id=chat_id,
        runner=cokacdir_runner, cokacdir_path=cokacdir_path,
    )
    s2_self = _query_cron_history(
        schedule_id, executor_key, chat_id=chat_id,
        runner=cokacdir_runner, cokacdir_path=cokacdir_path,
    )
    evidence.s2_anu_cron_history = s2_anu
    evidence.s2_self_cron_history = s2_self

    def _count(payload: Dict[str, Any]) -> int:
        c = payload.get("count")
        if isinstance(c, int):
            return c
        if isinstance(c, str) and c.isdigit():
            return int(c)
        return 0

    anu_count = _count(s2_anu) if s2_anu.get("status") != "error" else -1
    self_count = _count(s2_self) if s2_self.get("status") != "error" else -1

    # S3 — envelope text parse
    prompt = (s1 or {}).get("prompt", "") if s1 else ""
    s3 = _parse_envelope_from_prompt(prompt)
    evidence.s3_envelope = s3

    # S4 — result artifact
    s4 = _read_result_artifact(task_id, artifact_dir=artifact_dir)
    evidence.s4_result_artifact = s4

    # ── Transition table application ──
    # Both queries errored → UNDETERMINED
    if anu_count < 0 and self_count < 0:
        reasons.append(
            "S2 query failed for both ANU and executor self-key — "
            "cron-history unavailable (likely cokacdir CLI absent or auth)."
        )
        cls = UNDETERMINED_HISTORY_GAP

    # self-key callback registered (executor self-key) → SELF_COLLECTOR
    elif self_count >= 1 and anu_count <= 0:
        reasons.append(
            f"S2.cron-history with --key {executor_key} returned count="
            f"{self_count} (>=1) AND ANU key count={anu_count} — "
            "executor self-key callback detected (★ NON_AUTHORITATIVE_SELF_COLLECTOR)."
        )
        cls = NON_AUTHORITATIVE_SELF_COLLECTOR

    # both registered → key drift / collision
    elif anu_count >= 1 and self_count >= 1:
        reasons.append(
            f"S2: BOTH ANU (count={anu_count}) AND executor self-key "
            f"(count={self_count}) cron-history returned — registration "
            "drift / dual-owner collision (NON_AUTHORITATIVE_KEY_DRIFT)."
        )
        cls = NON_AUTHORITATIVE_KEY_DRIFT

    # neither registered → history gap
    elif anu_count == 0 and self_count == 0:
        reasons.append(
            "S2: neither ANU nor executor self-key cron-history returned a "
            "matching schedule — registration history gap "
            "(UNDETERMINED_HISTORY_GAP)."
        )
        cls = UNDETERMINED_HISTORY_GAP

    # ANU registered, self not → likely PASS (will verify S3 below)
    elif anu_count >= 1 and self_count <= 0:
        reasons.append(
            f"S2: ANU key cron-history count={anu_count} (>=1) and "
            f"executor self-key count={self_count} — actual owner is ANU."
        )
        cls = ANU_AUTHORITATIVE
    else:
        # Defensive: should be unreachable. Default to UNDETERMINED.
        reasons.append(
            f"S2: anu_count={anu_count} self_count={self_count} — "
            "unhandled state, defaulting to UNDETERMINED."
        )
        cls = UNDETERMINED_HISTORY_GAP

    # S3 envelope text drift detection (overrides ANU_AUTHORITATIVE only)
    if cls == ANU_AUTHORITATIVE and s3.get("owner_key") and s3["owner_key"] != anu_key:
        reasons.append(
            f"S3.envelope.owner_key={s3['owner_key']!r} != ANU key — "
            "envelope text drifted (PROMPT_DRIFT)."
        )
        cls = PROMPT_DRIFT

    # Final assembly
    is_auth = cls == ANU_AUTHORITATIVE
    needs_reverify = cls in REVERIFY_TRIGGER_CLASSIFICATIONS
    hint = None
    if cls == NON_AUTHORITATIVE_SELF_COLLECTOR:
        hint = (
            "★ self-key callback detected — emit ANU independent reverify "
            "request (utils.callback_collector_helper_integration."
            "emit_anu_independent_reverify_request) + 회장 chat 보고."
        )
    elif cls == NON_AUTHORITATIVE_KEY_DRIFT:
        hint = (
            "★ dual-owner cron registration — escalate to chair, halt "
            "collector self-attestation."
        )
    elif cls == UNDETERMINED_HISTORY_GAP:
        hint = (
            "retry S2 query after backoff; if persistent, escalate to chair."
        )
    elif cls == PROMPT_DRIFT:
        hint = (
            "envelope text owner_key drifted from ANU canonical — reject "
            "envelope, cancel collector spawn."
        )

    return AuthorityClassification(
        schema=VALIDATOR_SCHEMA,
        classification=cls,
        schedule_id=schedule_id,
        task_id=task_id,
        executor_key=executor_key,
        anu_key=anu_key,
        is_authoritative=is_auth,
        requires_anu_reverify=needs_reverify,
        reasons=reasons,
        evidence=evidence,
        escalation_hint=hint,
    )


# ── Reverify request builder (★ 수정 목표 5: ANU independent reverify flow) ──

@dataclass
class ReverifyRequest:
    """Descriptor for an ANU independent reverify task dispatch request.

    NO subprocess / NO dispatch — pure data object. The caller (collector
    helper integration) emits this to the ANU dispatch path or escalates
    to chair via extract_followup.send_anu_notify.
    """
    schema: str
    task_id: str
    original_schedule_id: str
    classification: str
    chair_authorization_id: str
    artifact_path_hint: str
    reason: str

    def to_json(self) -> Dict[str, Any]:
        return {
            "schema": self.schema,
            "task_id": self.task_id,
            "original_schedule_id": self.original_schedule_id,
            "classification": self.classification,
            "chair_authorization_id": self.chair_authorization_id,
            "artifact_path_hint": self.artifact_path_hint,
            "reason": self.reason,
        }


REVERIFY_SCHEMA = "utils.callback_authority_4source_validator.reverify_request.v1"


def build_anu_independent_reverify_request(
    *,
    classification: AuthorityClassification,
    chair_authorization_id: str,
) -> Optional[ReverifyRequest]:
    """Build a reverify request descriptor if classification triggers reverify.

    Returns None if classification == ANU_AUTHORITATIVE (no reverify needed).
    Otherwise returns a ReverifyRequest the caller can pass to ANU dispatch
    or to chair escalation.
    """
    if not classification.requires_anu_reverify:
        return None
    artifact_hint = (
        f"memory/events/{classification.task_id}."
        "independent_anu_reverify.result.json"
    )
    primary_reason = (
        classification.reasons[0] if classification.reasons
        else f"classification={classification.classification}"
    )
    return ReverifyRequest(
        schema=REVERIFY_SCHEMA,
        task_id=classification.task_id,
        original_schedule_id=classification.schedule_id,
        classification=classification.classification,
        chair_authorization_id=chair_authorization_id,
        artifact_path_hint=artifact_hint,
        reason=primary_reason,
    )


# ── helper utilities for callers ────────────────────────────────────────────

def is_self_key_callback(classification: AuthorityClassification) -> bool:
    """True if classification == NON_AUTHORITATIVE_SELF_COLLECTOR.

    Convenience predicate for callers that only care about the self-key
    case (the dominant Track A / Track J failure mode).
    """
    return classification.classification == NON_AUTHORITATIVE_SELF_COLLECTOR


def classify_from_observed(
    *,
    schedule_id: str,
    executor_key: str,
    task_id: str,
    observed_owner_key: Optional[str],
    envelope_owner_key: Optional[str] = None,
    anu_key: str = ANU_KEY,
) -> AuthorityClassification:
    """Test-friendly fast-path: classify from already-observed values without
    hitting the filesystem or subprocess.

    Used by regression suite when fixtures already capture the observed owner
    binding (e.g. parsed schedule_history fixtures).
    """
    reasons: List[str] = []
    if observed_owner_key is None:
        reasons.append("observed_owner_key is None — history gap")
        cls = UNDETERMINED_HISTORY_GAP
    elif observed_owner_key == executor_key:
        reasons.append(
            f"observed owner == executor self-key {executor_key!r} — "
            "NON_AUTHORITATIVE_SELF_COLLECTOR"
        )
        cls = NON_AUTHORITATIVE_SELF_COLLECTOR
    elif observed_owner_key == anu_key:
        if envelope_owner_key and envelope_owner_key != anu_key:
            reasons.append(
                f"envelope text drift: owner_key={envelope_owner_key!r}"
            )
            cls = PROMPT_DRIFT
        else:
            reasons.append("observed owner == ANU key — authoritative")
            cls = ANU_AUTHORITATIVE
    else:
        reasons.append(
            f"observed owner {observed_owner_key!r} != ANU and != self — "
            "NON_AUTHORITATIVE_KEY_DRIFT"
        )
        cls = NON_AUTHORITATIVE_KEY_DRIFT
    return AuthorityClassification(
        schema=VALIDATOR_SCHEMA,
        classification=cls,
        schedule_id=schedule_id,
        task_id=task_id,
        executor_key=executor_key,
        anu_key=anu_key,
        is_authoritative=cls == ANU_AUTHORITATIVE,
        requires_anu_reverify=cls in REVERIFY_TRIGGER_CLASSIFICATIONS,
        reasons=reasons,
        evidence=None,
        escalation_hint=None,
    )
