#!/usr/bin/env python3
"""ANU PreToolUse Guard hook — task-2643 Track A (staged · live 미적용).

목적: ANU 본체가 위험 tool call (특히 Bash CI/Gemini polling) 을 실행하기 *전에*
deny 해서 session-bound polling 위반 (PR #145 bzaona6au 사건) 재발을 막는다.

본 스크립트는 **staged draft** 다:
- `~/.claude/settings.json` 의 `PreToolUse` 섹션에 등록되어야 활성된다.
- 본 task-2643 범위에서는 live 적용 0 (회장 verbatim).
- 활성 절차: `memory/specs/task_2643_final_activation_packet_template_260523.json`.

fail-closed 원칙:
- 입력 JSON parse 실패, config 누락, internal exception → **deny** (allow fallback 금지).
- 어떤 정상 tool call 도 차단되어선 안 되지만, 위반 시 차단이 더 안전한 trade-off.

핵심 검사:
- tool_name == "Bash" 인 경우만 검사한다 (다른 tool 은 allow).
- forbidden pattern 5 그룹 (spec §2.2) 정적 매칭.
- match 발생 시 deny + reason + allowed_alternative + next_steps 반환.

stdin/stdout 규약:
- claude-code `PreToolUse` hook 규약: stdin 으로 `{tool_name, tool_input, ...}` JSON 수신.
- stdout 으로 `{decision, reason, allowed_alternative?, next_steps?}` JSON 반환.
- exit code 0 = decision 적용, 비0 = error (claude-code 가 fail-closed 처리).

CLI 모드 (dry-run):
- `--mode=dry-run --fixture=<path>` 형태로 fixture JSON 을 입력받아 결과를 stdout 으로 dump.
- `--mode=stdin` (default) 으로 claude-code hook 처럼 stdin 처리.
"""

from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any

# ─────────────────────────────────────────────────────────────────────────────
# Forbidden pattern 5 그룹 (spec §2.2)
# ─────────────────────────────────────────────────────────────────────────────

# Group 1: run_in_background + gh pr view/checks/statusCheckRollup
GROUP_1_BACKGROUND_GH_POLL = "BACKGROUND_GH_PR_POLL"
GROUP_1_PATTERNS = [
    re.compile(r"\bgh\s+pr\s+(view|checks)\b"),
    re.compile(r"statusCheckRollup", re.IGNORECASE),
]

# Group 2: while/until + sleep + gh pr view/checks
GROUP_2_LOOP_POLL = "LOOP_SLEEP_GH_PR_POLL"
GROUP_2_LOOP_KEYWORDS = re.compile(r"\b(while|until)\b")
GROUP_2_SLEEP = re.compile(r"\bsleep\s+\d+")
GROUP_2_GH_POLL = re.compile(r"\bgh\s+(pr|run|api)\b")

# Group 3: gh run watch / gh run list 반복 대기
GROUP_3_RUN_WATCH = "GH_RUN_WATCH_POLL"
GROUP_3_PATTERNS = [
    re.compile(r"\bgh\s+run\s+watch\b"),
    re.compile(r"\bgh\s+run\s+list\b"),
]

# Group 4: CI/Gemini terminal wait 목적의 ANU 본체 polling (semantic intent)
# heuristic: command 안에 "wait/waiting/until/poll/polling/monitor" 같은 의도 시그널 + GH/API 호출 결합
GROUP_4_SEMANTIC_WAIT = "SEMANTIC_CI_GEMINI_WAIT"
GROUP_4_INTENT_KEYWORDS = re.compile(
    r"\b(wait\w*|until|poll\w*|monitor\w*|nudge_wait|gemini[\w\s.]*review\w*|ci[\w\s.]*complete\w*)\b",
    re.IGNORECASE,
)

# Group 5: 명시적 forbidden actions
GROUP_5_EXPLICIT_FORBIDDEN = "EXPLICIT_FORBIDDEN_ACTION"
GROUP_5_PATTERNS = [
    re.compile(r"admin[\s_-]*override", re.IGNORECASE),
    re.compile(r"BOT_GITHUB_TOKEN"),
    re.compile(r"BOT_APP_TOKEN"),
    re.compile(r"chair_authorization", re.IGNORECASE),
    re.compile(r"auto[\s_-]*merge.*activat", re.IGNORECASE),
    re.compile(r"\bPR\s*#?141\b.*pilot", re.IGNORECASE),
    # gh CLI 의 admin override flag — "gh pr merge --admin", "gh ... --admin"
    re.compile(r"\bgh\b[^|;&]*--admin\b", re.IGNORECASE),
    re.compile(r"\bpr\s+merge\b[^|;&]*--admin\b", re.IGNORECASE),
]

ALLOWED_ALTERNATIVE = "delegated_watcher_contract"
NEXT_STEPS = [
    "1. watcher contract 9 필드 생성 (schemas/watcher_contract_v1.json 참조)",
    "2. watcher dispatch (dev bot 또는 cron-watcher)",
    "3. ANU normal callback 대기 (owner_key=c119085addb0f8b7)",
]


# ─────────────────────────────────────────────────────────────────────────────
# 핵심 검사 함수
# ─────────────────────────────────────────────────────────────────────────────


def _has_background_gh_poll(command: str, run_in_background: bool) -> bool:
    """Group 1: run_in_background=True + gh pr view/checks/statusCheckRollup."""
    if not run_in_background:
        return False
    return any(p.search(command) for p in GROUP_1_PATTERNS)


def _has_loop_sleep_gh_poll(command: str) -> bool:
    """Group 2: while/until + sleep + gh pr/run/api 결합."""
    has_loop = bool(GROUP_2_LOOP_KEYWORDS.search(command))
    has_sleep = bool(GROUP_2_SLEEP.search(command))
    has_gh = bool(GROUP_2_GH_POLL.search(command))
    # 3개 모두 있어야 polling intent 로 판단
    return has_loop and has_sleep and has_gh


def _has_gh_run_watch(command: str) -> bool:
    """Group 3: gh run watch / gh run list 반복."""
    return any(p.search(command) for p in GROUP_3_PATTERNS)


def _has_semantic_wait(command: str) -> bool:
    """Group 4: CI/Gemini wait 의도 + GH API 호출 결합."""
    if not GROUP_4_INTENT_KEYWORDS.search(command):
        return False
    # GH API 호출이 같이 있을 때만 trigger
    return bool(
        re.search(r"\bgh\s+", command)
        or "api.github.com" in command
        or "statusCheckRollup" in command
    )


def _has_explicit_forbidden(command: str) -> bool:
    """Group 5: 명시적 forbidden action."""
    return any(p.search(command) for p in GROUP_5_PATTERNS)


def evaluate_bash_command(command: str, run_in_background: bool = False) -> dict[str, Any]:
    """Bash command 를 5 그룹 정적 검사한 뒤 decision dict 를 반환한다.

    Args:
        command: Bash tool 의 `command` 인자.
        run_in_background: Bash tool 의 `run_in_background` 인자 (기본 False).

    Returns:
        {"decision": "allow"} 혹은
        {"decision": "deny", "reason": "...", "allowed_alternative": "...", "next_steps": [...], "match_group": "..."}
    """
    if not isinstance(command, str) or not command.strip():
        # 빈 command 는 allow (다른 검사가 처리)
        return {"decision": "allow"}

    matched_groups: list[str] = []

    if _has_background_gh_poll(command, run_in_background):
        matched_groups.append(GROUP_1_BACKGROUND_GH_POLL)
    if _has_loop_sleep_gh_poll(command):
        matched_groups.append(GROUP_2_LOOP_POLL)
    if _has_gh_run_watch(command):
        matched_groups.append(GROUP_3_RUN_WATCH)
    if _has_semantic_wait(command):
        matched_groups.append(GROUP_4_SEMANTIC_WAIT)
    if _has_explicit_forbidden(command):
        matched_groups.append(GROUP_5_EXPLICIT_FORBIDDEN)

    if not matched_groups:
        return {"decision": "allow"}

    primary_reason = matched_groups[0]
    return {
        "decision": "deny",
        "reason": "ANU_DIRECT_CI_POLLING_FORBIDDEN",
        "match_group": primary_reason,
        "all_match_groups": matched_groups,
        "allowed_alternative": ALLOWED_ALTERNATIVE,
        "next_steps": NEXT_STEPS,
    }


def evaluate_tool_call(payload: dict[str, Any]) -> dict[str, Any]:
    """전체 tool call payload 에서 decision 을 산출한다.

    Bash 가 아니면 allow.
    Bash 면 command/run_in_background 추출 후 evaluate_bash_command 위임.
    """
    tool_name = payload.get("tool_name") or payload.get("toolName")
    if tool_name != "Bash":
        return {"decision": "allow", "reason": "NOT_BASH_TOOL"}

    tool_input = payload.get("tool_input") or payload.get("toolInput") or {}
    if not isinstance(tool_input, dict):
        # fail-closed: 알 수 없는 형태는 deny
        return {
            "decision": "deny",
            "reason": "INVALID_TOOL_INPUT_TYPE",
            "allowed_alternative": ALLOWED_ALTERNATIVE,
            "next_steps": ["Bash tool_input 형식 오류. payload 재검토."],
        }

    command = tool_input.get("command", "")
    run_in_background = bool(tool_input.get("run_in_background", False))
    return evaluate_bash_command(command, run_in_background)


# ─────────────────────────────────────────────────────────────────────────────
# CLI / stdin 진입점
# ─────────────────────────────────────────────────────────────────────────────


def _read_stdin_payload() -> dict[str, Any]:
    raw = sys.stdin.read()
    if not raw.strip():
        raise ValueError("empty stdin payload")
    return json.loads(raw)


def _load_fixture(path: str) -> dict[str, Any]:
    return json.loads(Path(path).read_text(encoding="utf-8"))


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(description="ANU PreToolUse Guard (staged · live 미적용)")
    parser.add_argument("--mode", choices=["stdin", "dry-run"], default="stdin")
    parser.add_argument("--fixture", help="dry-run 모드일 때 fixture JSON 경로")
    args = parser.parse_args(argv)

    try:
        if args.mode == "dry-run":
            if not args.fixture:
                raise ValueError("--fixture 가 dry-run 모드에 필수입니다")
            payload = _load_fixture(args.fixture)
        else:
            payload = _read_stdin_payload()
        decision = evaluate_tool_call(payload)
    except Exception as exc:  # fail-closed
        decision = {
            "decision": "deny",
            "reason": "HOOK_FAIL_CLOSED",
            "error": f"{type(exc).__name__}: {exc}",
            "allowed_alternative": ALLOWED_ALTERNATIVE,
            "next_steps": [
                "hook 입력/내부 오류 → fail-closed deny.",
                "정상 동작 확인 후 재시도 (rollback plan 참조).",
            ],
        }

    print(json.dumps(decision, ensure_ascii=False, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
