"""scripts/safe_cron_dispatch.py — task-2526 §3 wrapper 본체.

회장 §본질 (2026-05-10 PR #74 재위임 오류):
  독립 bot task cron에 ``--session`` 옵션이 잘못 붙어 봇이 아니라 **아누 자기 세션이
  resume된 사고**가 발생함 (cron 5C9995CCB → PID 448820 = 아누 자기 세션 resume → kill).
  동일 사고가 다시 발생하면 안 된다.

  이 wrapper는 cokacdir cron 명령을 직접 조립하는 모든 호출자가 **반드시 거쳐야 하는**
  preflight gate다. 잘못된 조합 (independent_task + ``--session``, merge_task + ``--session``,
  bot_task + bot_key 누락, owner_pat fallback path) 을 **실행 전에** 차단한다.

회장 §명시 5 차단 조건:
  1. ``independent_task`` + ``--session`` 존재 → BLOCK
  2. ``merge_task`` + ``--session`` 존재 → BLOCK
  3. ``bot_task`` 인데 bot_key 없이 merge/PR 작업 → BLOCK
  4. target bot과 session owner 불일치 → BLOCK
  5. owner_pat fallback 가능성 (gh pr merge 명령에 GH_TOKEN=$BOT_GITHUB_TOKEN 미설정) → BLOCK

회장 §명시 1 ALLOW 예외:
  - ``followup_readonly`` + ``--session`` → 같은 chat 아누 session resume 허용 (오직 이 케이스만)

회장 §금지:
  - ❌ dispatch.py 본체 수정
  - ❌ cokacdir CLI 본체 수정
  - ❌ 직접 cokacdir cron 명령 조립 (외부 호출자도 본 wrapper만 사용)
  - ❌ admin override / owner_pat fallback 정당화
  - ❌ Critical 7종 외 회장 보고 (CRON_TARGETING_GUARD_BLOCKED는 Critical enum이 아니라
    audit JSONL용 짧은 운영 신호 — 별도 chairman 메시지는 호출자 책임)
  - ❌ token / raw key / session secret 원문 기록

API:
  result = safe_cron_dispatch(
      prompt="...",
      schedule="0 9 * * *",
      chat="6937032012",
      target_bot_key="0b94683120a691cf",
      task_kind="bot_task",
      session_id=None,
  )

  if result.status == DispatchStatus.BLOCKED:
      # blocked_reason / audit_record 활용
      ...
  else:
      # The caller runs result.command_argv via subprocess, or invokes this
      # script's CLI with --apply to have the command executed when ALLOWED.
      ...
"""
from __future__ import annotations

import argparse
import json
import re
import shlex
import subprocess
import sys
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Callable, List, Literal, Optional, Sequence, Tuple

# Put the worktree root (the parent of this file's directory) at the front of
# sys.path so the ``utils`` import below resolves.
_WORKTREE_ROOT = Path(__file__).resolve().parent.parent
if str(_WORKTREE_ROOT) not in sys.path:
    sys.path.insert(0, str(_WORKTREE_ROOT))

from utils.cron_targeting_audit import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    ACTOR_ANU_SESSION,
    ACTOR_BOT_SESSION,
    ALLOWED_TASK_KINDS,
    BlockedReason,
    CronTargetingAuditRecord,
    TASK_KIND_BOT,
    TASK_KIND_FOLLOWUP_RO,
    TASK_KIND_INDEPENDENT,
    TASK_KIND_MERGE,
    append_audit_jsonl,
    build_audit_record,
    hash_bot_key,
)

# Names exported by ``from <module> import *``; underscore-prefixed helpers
# (including the CLI entry point) are intentionally left out.
__all__ = [
    "DEFAULT_COKACDIR_CLI",
    "DispatchStatus",
    "CronDispatchResult",
    "safe_cron_dispatch",
    "format_chairman_block_notice",
]


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Default path of the cokacdir executable placed at argv[0] of built commands.
DEFAULT_COKACDIR_CLI = "/usr/local/bin/cokacdir"

# Chats treated as the chairman's chat. A session id supplied together with
# one of these chats is assumed to be Anu's own session (see
# ``_session_owner_is_anu``).
# Fixture from the original incident: chat=6937032012 (chairman chat) <-> anu_session.
_CHAIR_CHATS: Tuple[str, ...] = ("6937032012",)


# ---------------------------------------------------------------------------
# Result types
# ---------------------------------------------------------------------------

class DispatchStatus:
    """Plain string constants naming the two possible preflight outcomes."""

    # Preflight passed; the result carries a cokacdir argv.
    ALLOWED: str = "ALLOWED"
    # Preflight rejected the request; the result carries a blocked_reason.
    BLOCKED: str = "BLOCKED"


# Static type for the ``task_kind`` argument of ``safe_cron_dispatch``.
# NOTE(review): presumably mirrors ALLOWED_TASK_KINDS from
# utils.cron_targeting_audit — keep the two in sync.
TaskKindLiteral = Literal[
    "independent_task",
    "merge_task",
    "bot_task",
    "followup_readonly",
    "human_response",
]


@dataclass(frozen=True)
class CronDispatchResult:
    """Immutable outcome of a preflight check; callers branch on ``status``.

    As populated by ``safe_cron_dispatch``:
      * BLOCKED — ``command_argv`` is empty, ``blocked_reason`` and
        ``chairman_notice`` are set.
      * ALLOWED — ``command_argv`` holds the cokacdir cron argv,
        ``blocked_reason`` and ``chairman_notice`` are ``None``.
      * ``audit_record`` is set in both cases (every decision is audited).
    """

    # One of the DispatchStatus constants.
    status: str
    # Why the request was rejected; None when allowed.
    blocked_reason: Optional[str]
    # The record that was appended to the audit JSONL for this decision.
    audit_record: CronTargetingAuditRecord
    # argv to execute (program path first); empty tuple when blocked.
    command_argv: Tuple[str, ...]
    # Short text suitable for reporting a block; None when allowed.
    chairman_notice: Optional[str]


# ---------------------------------------------------------------------------
# Static merge / owner_pat detection (회장 §2 #5 — owner_pat fallback path)
# ---------------------------------------------------------------------------

_GH_PR_MERGE_RE = re.compile(r"\bgh\s+pr\s+merge\b")
_BOT_GH_TOKEN_INJECTION_RE = re.compile(
    r"\bGH_TOKEN\s*=\s*\$?\{?\s*BOT_GITHUB_TOKEN\b"
)


def _prompt_uses_gh_pr_merge(prompt: str) -> bool:
    return bool(_GH_PR_MERGE_RE.search(prompt))


def _prompt_injects_bot_gh_token(prompt: str) -> bool:
    return bool(_BOT_GH_TOKEN_INJECTION_RE.search(prompt))


def _looks_like_owner_pat_fallback(task_kind: str, prompt: str) -> bool:
    """Detect a ``gh pr merge`` that would run without the bot's token.

    True only when all three hold:
      * ``task_kind`` is merge_task or bot_task (other kinds are never flagged),
      * the prompt contains ``gh pr merge``,
      * the prompt does not set ``GH_TOKEN`` from ``BOT_GITHUB_TOKEN``.
    """
    kind_is_checked = task_kind in (TASK_KIND_MERGE, TASK_KIND_BOT)
    return (
        kind_is_checked
        and _prompt_uses_gh_pr_merge(prompt)
        and not _prompt_injects_bot_gh_token(prompt)
    )


# ---------------------------------------------------------------------------
# Actor inference (회장 §2 — actor_expected vs session owner)
# ---------------------------------------------------------------------------

def _expected_actor(task_kind: str) -> str:
    """Map a task kind to the actor that is expected to run it.

    ``followup_readonly`` and ``human_response`` map to the Anu session;
    every other kind (independent / merge / bot) maps to the bot session.
    """
    anu_kinds = (TASK_KIND_FOLLOWUP_RO, "human_response")
    return ACTOR_ANU_SESSION if task_kind in anu_kinds else ACTOR_BOT_SESSION


def _session_owner_is_anu(session_id: Optional[str], chat: str) -> bool:
    """현재는 chat이 회장 chat이고 session_id가 채워졌으면 아누 self-session으로 추정."""
    if not session_id:
        return False
    return chat in _CHAIR_CHATS


# ---------------------------------------------------------------------------
# Preflight (회장 §2)
# ---------------------------------------------------------------------------

def _run_preflight(
    *,
    prompt: str,
    chat: str,
    target_bot_key: Optional[str],
    task_kind: str,
    session_id: Optional[str],
) -> Tuple[Optional[str], bool]:
    """Apply the guard rules in a fixed order and report the first violation.

    Returns:
        ``(blocked_reason, session_id_allowed)``. ``blocked_reason`` is
        ``None`` when the request is allowed. ``session_id_allowed`` is True
        only for ``followup_readonly`` with a session id in a chairman chat;
        for every other allowed combination it is False, even if a session id
        was supplied.

    Raises:
        ValueError: if ``task_kind`` is not in ``ALLOWED_TASK_KINDS``.
    """
    if task_kind not in ALLOWED_TASK_KINDS:
        raise ValueError(
            f"task_kind must be one of {ALLOWED_TASK_KINDS}, got {task_kind!r}"
        )

    session_given = bool(session_id)
    anu_owned = session_given and _session_owner_is_anu(session_id, chat)

    # Rules 1 & 2: independent_task / merge_task must never carry a session.
    session_forbidden = {
        TASK_KIND_INDEPENDENT: BlockedReason.INDEPENDENT_TASK_WITH_SESSION,
        TASK_KIND_MERGE: BlockedReason.MERGE_TASK_WITH_SESSION,
    }
    if session_given and task_kind in session_forbidden:
        return session_forbidden[task_kind], False

    # Rule 3: followup_readonly may resume a session, but only when that
    # session is attributed to Anu (i.e. the chat is a chairman chat).
    resume_permitted = False
    if task_kind == TASK_KIND_FOLLOWUP_RO and session_given:
        if not anu_owned:
            return BlockedReason.TARGET_SESSION_OWNER_MISMATCH, False
        resume_permitted = True

    # Rule 4: bot_task / merge_task require a bot key.
    needs_bot_key = task_kind in (TASK_KIND_BOT, TASK_KIND_MERGE)
    if needs_bot_key and not target_bot_key:
        return BlockedReason.BOT_KEY_MISSING_FOR_BOT_TASK, False

    # Rule 5: `gh pr merge` without the bot token injected.
    if _looks_like_owner_pat_fallback(task_kind, prompt):
        return BlockedReason.OWNER_PAT_FALLBACK_DETECTED, False

    # Rule 6: a bot-side task must not point at Anu's session. Rules 1/2
    # already catch independent/merge, so in practice this fires for bot_task.
    bot_side = task_kind in (
        TASK_KIND_INDEPENDENT,
        TASK_KIND_BOT,
        TASK_KIND_MERGE,
    )
    if bot_side and anu_owned:
        return BlockedReason.TARGET_SESSION_OWNER_MISMATCH, False

    return None, resume_permitted


# ---------------------------------------------------------------------------
# Command builder
# ---------------------------------------------------------------------------

def _build_command_argv(
    *,
    prompt: str,
    schedule: str,
    chat: str,
    target_bot_key: Optional[str],
    session_id: Optional[str],
    session_allowed: bool,
    cli: str,
    once: bool,
) -> Tuple[str, ...]:
    """ALLOWED 시 cokacdir cron 인자 리스트 조립.

    BLOCKED인 경우 호출자가 이 함수를 부르지 않는다.
    """
    argv: List[str] = [cli, "--cron", prompt, "--at", schedule, "--chat", chat]
    if target_bot_key:
        argv.extend(["--key", target_bot_key])
    if session_id and session_allowed:
        argv.extend(["--session", session_id])
    if once:
        argv.append("--once")
    return tuple(argv)


def _build_command_preview_for_audit(argv: Tuple[str, ...]) -> str:
    """argv를 shell-quote된 단일 문자열로 결합 (sanitize 직전 형태)."""
    return " ".join(shlex.quote(a) for a in argv)


# ---------------------------------------------------------------------------
# Public API (회장 §3)
# ---------------------------------------------------------------------------

def safe_cron_dispatch(
    prompt: str,
    schedule: str,
    chat: str,
    target_bot_key: Optional[str],
    task_kind: TaskKindLiteral,
    session_id: Optional[str] = None,
    *,
    once: bool = False,
    audit_path: Optional[Path] = None,
    cli_path: str = DEFAULT_COKACDIR_CLI,
) -> CronDispatchResult:
    """Run the preflight and return an ALLOWED or BLOCKED result.

    What this function does and does not do:
      * it never executes a subprocess — running the command is up to the caller;
      * it never sends a chairman message — that is also up to the caller;
      * it appends exactly one audit record (allowed or blocked) via
        ``append_audit_jsonl``.

    The raw bot key and session id are handed to ``build_audit_record``.
    NOTE(review): that helper is expected to store only hashes/flags and to
    redact the command preview — confirm in ``utils.cron_targeting_audit``.

    Raises:
        ValueError: if ``prompt``, ``schedule`` or ``chat`` is not a non-empty
            ``str``, or if ``task_kind`` is not in ``ALLOWED_TASK_KINDS``.
    """
    required_text = (("prompt", prompt), ("schedule", schedule), ("chat", chat))
    for label, value in required_text:
        if not (isinstance(value, str) and value):
            raise ValueError(f"{label} must be a non-empty str")
    if task_kind not in ALLOWED_TASK_KINDS:
        raise ValueError(
            f"task_kind must be one of {ALLOWED_TASK_KINDS}, got {task_kind!r}"
        )

    blocked_reason, session_allowed = _run_preflight(
        prompt=prompt,
        chat=chat,
        target_bot_key=target_bot_key,
        task_kind=task_kind,
        session_id=session_id,
    )

    # When blocked, the argv is built only for the audit preview, so it mirrors
    # the attempted input: a supplied session id is always included so the
    # record shows which violation was stopped. When allowed, the argv is the
    # real command and honours the preflight's session decision.
    argv = _build_command_argv(
        prompt=prompt,
        schedule=schedule,
        chat=chat,
        target_bot_key=target_bot_key,
        session_id=session_id,
        session_allowed=session_allowed if blocked_reason is None else True,
        cli=cli_path,
        once=once,
    )

    command_preview = _build_command_preview_for_audit(argv)
    actor_expected = _expected_actor(task_kind)

    # Human-readable target label: shows only a short hash prefix of the key.
    key_hash = hash_bot_key(target_bot_key)
    if target_bot_key:
        hash_prefix = key_hash[:8] if key_hash else ""
        target_bot = f"chat={chat}/key=<hash:{hash_prefix}…>"
    else:
        target_bot = f"chat={chat}/no-key"

    record = build_audit_record(
        cron_id=None,  # no cron has been created yet at preflight time
        target_bot=target_bot,
        bot_key=target_bot_key,
        session_id=session_id,
        session_id_allowed=session_allowed,
        task_kind=task_kind,
        actor_expected=actor_expected,
        actor_actual_if_known=None,
        command_preview=command_preview,
        blocked_reason=blocked_reason,
    )
    append_audit_jsonl(record, audit_path=audit_path)

    if blocked_reason is None:
        return CronDispatchResult(
            status=DispatchStatus.ALLOWED,
            blocked_reason=None,
            audit_record=record,
            command_argv=argv,
            chairman_notice=None,
        )

    notice = format_chairman_block_notice(
        task_kind=task_kind,
        blocked_reason=blocked_reason,
        target_bot=target_bot,
    )
    return CronDispatchResult(
        status=DispatchStatus.BLOCKED,
        blocked_reason=blocked_reason,
        audit_record=record,
        command_argv=(),
        chairman_notice=notice,
    )


def format_chairman_block_notice(
    *,
    task_kind: str,
    blocked_reason: str,
    target_bot: str,
) -> str:
    """Build the one-line text used to report a blocked dispatch.

    This is a short operational signal (``CRON_TARGETING_GUARD_BLOCKED``),
    not one of the Critical report categories.
    """
    details = f"task_kind={task_kind} reason={blocked_reason} target={target_bot}"
    return "CRON_TARGETING_GUARD_BLOCKED — " + details


# ---------------------------------------------------------------------------
# CLI surface — 직접 호출자도 본 wrapper만 사용 (cokacdir 직접 조립 금지 정합)
# ---------------------------------------------------------------------------

def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the argparse parser for the command-line interface.

    ``--prompt``, ``--at``, ``--chat`` and ``--task-kind`` are required;
    ``--at`` is stored as ``schedule``, ``--key`` as ``target_bot_key`` and
    ``--session`` as ``session_id``.
    """
    cli_parser = argparse.ArgumentParser(
        prog="safe_cron_dispatch",
        description="task-2526 cron --session safety guard wrapper",
    )
    add = cli_parser.add_argument

    # Inputs forwarded to safe_cron_dispatch().
    add("--prompt", required=True)
    add("--at", dest="schedule", required=True)
    add("--chat", required=True)
    add("--key", dest="target_bot_key", default=None)
    add("--task-kind", required=True, choices=list(ALLOWED_TASK_KINDS))
    add("--session", dest="session_id", default=None)
    add("--once", action="store_true")

    # Execution / environment switches.
    add(
        "--apply",
        action="store_true",
        help="ALLOWED 시 cokacdir 실행 (기본 dry-run: argv만 출력)",
    )
    add("--cli", default=DEFAULT_COKACDIR_CLI)
    add("--audit-path", default=None)
    return cli_parser


def _cli_main(
    argv: Optional[Sequence[str]] = None,
    *,
    runner: Callable[[Sequence[str]], int] = lambda a: subprocess.call(list(a)),
) -> int:
    """Command-line entry point: run the preflight, print it, optionally execute.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.
        runner: Callable that executes the allowed argv and returns its exit
            status. Defaults to ``subprocess.call``; injectable for tests.

    Returns:
        ``1`` when the dispatch is BLOCKED. When ALLOWED: the runner's exit
        status if ``--apply`` was given (a falsy status counts as ``0``),
        otherwise ``0`` (dry run — the argv is only printed).

    Raises:
        SystemExit: with status 2 for argparse usage errors, including input
            that ``safe_cron_dispatch`` rejects with ``ValueError``.
    """
    parser = _build_arg_parser()
    args = parser.parse_args(argv)

    try:
        result = safe_cron_dispatch(
            prompt=args.prompt,
            schedule=args.schedule,
            chat=args.chat,
            target_bot_key=args.target_bot_key,
            task_kind=args.task_kind,
            session_id=args.session_id,
            once=args.once,
            audit_path=Path(args.audit_path) if args.audit_path else None,
            cli_path=args.cli,
        )
    except ValueError as exc:
        # Exit status 1 means BLOCKED. An uncaught exception would also exit
        # with 1 (and print no JSON), so invalid input such as an empty
        # --prompt is reported as a usage error (status 2) instead.
        parser.error(str(exc))

    # One JSON object on stdout describes the decision for both outcomes.
    print(json.dumps({
        "status": result.status,
        "blocked_reason": result.blocked_reason,
        "command_argv": list(result.command_argv),
        "chairman_notice": result.chairman_notice,
        "audit": asdict(result.audit_record),
    }, ensure_ascii=False))

    if result.status == DispatchStatus.BLOCKED:
        return 1

    if args.apply:
        # A runner returning None (or 0) is treated as success.
        return int(runner(result.command_argv) or 0)

    return 0


# Script entry point: the process exit status is whatever _cli_main returns.
if __name__ == "__main__":
    sys.exit(_cli_main())
