# -*- coding: utf-8 -*-
"""anu_pickup_preflight_check.py — ANU result-pickup 활성화 전 안전성 점검 (read-only).

절대 제약:
  - systemd service 실행/enable/restart 금지 (is-enabled/is-active 조회만)
  - ANU spawn / subprocess claude 호출 금지
  - 파일 수정/생성 금지 (점검 전용)
  - ANU key raw 값 출력 금지

사용:
  python3 anu_pickup_preflight_check.py [--candidate-root <PATH>]
"""
from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Dict, List, Tuple

# ── 기본 후보 경로 ────────────────────────────────────────────────────────────
DEFAULT_CANDIDATE_ROOT = "/home/jay/workspace/.worktrees/task-2729+2-dev2"

# ── 점검 대상 파일 (후보 루트 기준 상대경로) ─────────────────────────────────
CANDIDATE_FILES = [
    "dispatch/anu_result_pickup_runner.py",
    "dispatch/anu_pickup_driver.py",
    "deploy/systemd/anu-pickup.path",
    "deploy/systemd/anu-pickup.service",
    "scripts/anu_pickup_entrypoint.sh",
]

# ── 결선 키워드 (파일 타입별) ─────────────────────────────────────────────────
UNIT_KEYWORDS = {
    # PathExistsGlob 또는 PathChanged 중 하나면 충분 — OR 검사는 check_candidate_files 에서 처리
    "anu-pickup.path":    ["PathExistsGlob_OR_PathChanged", "Unit="],
    "anu-pickup.service": ["ExecStart", "Type=oneshot", "WorkingDirectory"],
}
RUNNER_KEYWORDS = ["pickup_once", "anu_runner_pickup_and_fire", "PICKUP_WAKE_BUILT"]
DRIVER_KEYWORDS = ["scan_once", "process_one", "ACTIVATION_FLAG_REL", "is_activated"]
ENTRYPOINT_KEYWORDS = ["flock", "XDG_RUNTIME_DIR", "p0b_driver_enabled", "enabled"]

# ── 안전속성 키워드 (runner/driver 소스에서 grep) ───────────────────────────
SAFETY_KEYWORDS = {
    "idempotency":     ["idempotent", "SKIP_DEDUPE", "SKIP_TERMINAL", "dedupe"],
    "lock":            ["flock", "lock"],
    "terminal_marker": ["pickup.done", "pickup.acked", "terminal"],
    "runaway_guard":   ["NOOP_DISABLED", "ACTIVATION_FLAG", "is_activated", "p0b_driver_enabled"],
    "key_sealing":     ["sealed_key_loader", "ENV_ANU_KEY", "COKACDIR_KEY_ANU", "env_loader"],
}

# ── ANU key raw 노출 패턴 (값 자체가 아닌 패턴 검출) ─────────────────────────
# 키 길이 16자 hex + 특수 패턴(환경변수 echo 등) 검출 — 실제 key 값 출력 금지
RAW_KEY_PATTERNS = [
    # 16자 이상 hex 리터럴이 코드에 직접 등장 (주석 제외 코드 라인에서)
    re.compile(r'(?<!["\w])[0-9a-f]{16,}(?!["\w])'),
    # export/echo KEY=<값> 형태
    re.compile(r'(?:export\s+|echo\s+)[A-Z_]*KEY\s*=\s*["\']?[0-9a-f]{8,}'),
    # --key <literal> 형태 (argv)
    re.compile(r'--key\s+[0-9a-f]{12,}'),
]

# ── PASS/FAIL/CAVEAT 상태 ──────────────────────────────────────────────────────
PASS = "PASS"
FAIL = "FAIL"
CAVEAT = "CAVEAT"


# ─────────────────────────────────────────────────────────────────────────────
# 1. systemd user unit 상태 조회 (read-only)
# ─────────────────────────────────────────────────────────────────────────────
def check_systemd_units() -> Tuple[str, List[str]]:
    """systemctl --user is-enabled/is-active 조회. 실 service 미실행."""
    units = ["anu-pickup.path", "anu-pickup.service"]
    lines: List[str] = []
    all_not_installed = True

    for unit in units:
        for verb in ("is-enabled", "is-active"):
            try:
                r = subprocess.run(
                    ["systemctl", "--user", verb, unit],
                    capture_output=True, text=True, timeout=5,
                )
                status = (r.stdout.strip() or r.stderr.strip() or "(empty)")
            except FileNotFoundError:
                status = "systemctl not found"
            except subprocess.TimeoutExpired:
                status = "timeout"
            except Exception as e:
                status = f"error: {e}"
            lines.append(f"  systemctl --user {verb} {unit} → {status}")
            if status not in ("not-found", "inactive", "disabled", "static",
                              "systemctl not found"):
                all_not_installed = False

    # list-unit-files grep
    try:
        r2 = subprocess.run(
            ["systemctl", "--user", "list-unit-files"],
            capture_output=True, text=True, timeout=5,
        )
        matches = [l for l in r2.stdout.splitlines() if "anu-pickup" in l.lower()]
        if matches:
            lines.append(f"  list-unit-files: {matches}")
            all_not_installed = False
        else:
            lines.append("  list-unit-files: NOT_INSTALLED (grep 0건)")
    except Exception as e:
        lines.append(f"  list-unit-files: error: {e}")

    # ~/.config/systemd/user/anu-pickup* 파일 존재 확인
    user_unit_glob = Path.home() / ".config" / "systemd" / "user"
    found_files = list(user_unit_glob.glob("anu-pickup*")) if user_unit_glob.exists() else []
    if found_files:
        lines.append(f"  ~/.config/systemd/user/: {[str(f) for f in found_files]}")
        all_not_installed = False
    else:
        lines.append("  ~/.config/systemd/user/anu-pickup*: NO_USER_UNIT_FILES")

    verdict = PASS if all_not_installed else CAVEAT
    return verdict, lines


# ─────────────────────────────────────────────────────────────────────────────
# 2. 후보 파일 존재 + 결선 키워드 grep
# ─────────────────────────────────────────────────────────────────────────────
def check_candidate_files(root: str) -> Tuple[str, List[str]]:
    """후보 5개 파일 존재 확인 + 결선 키워드 grep."""
    lines: List[str] = []
    missing: List[str] = []
    keyword_fails: List[str] = []

    for relpath in CANDIDATE_FILES:
        fpath = Path(root) / relpath
        basename = fpath.name
        if not fpath.exists():
            lines.append(f"  [{FAIL}] {relpath}: 파일 없음")
            missing.append(relpath)
            continue
        lines.append(f"  [FOUND] {relpath}")

        # 결선 키워드 선택
        kws: List[str] = []
        if basename == "anu_result_pickup_runner.py":
            kws = RUNNER_KEYWORDS
        elif basename == "anu_pickup_driver.py":
            kws = DRIVER_KEYWORDS
        elif basename in UNIT_KEYWORDS:
            kws = UNIT_KEYWORDS[basename]
        elif basename == "anu_pickup_entrypoint.sh":
            kws = ENTRYPOINT_KEYWORDS

        if kws:
            try:
                content = fpath.read_text(encoding="utf-8", errors="replace")
            except OSError as e:
                lines.append(f"    read error: {e}")
                continue
            for kw in kws:
                # OR 키워드: 'A_OR_B' 형태는 A 또는 B 중 하나 있으면 PASS
                if "_OR_" in kw:
                    parts = kw.split("_OR_")
                    found = any(p in content for p in parts)
                    found_kw = next((p for p in parts if p in content), None)
                    status = PASS if found else FAIL
                    lines.append(
                        f"    [{status}] keyword '{parts[0]} OR {parts[1]}': "
                        f"{('found (' + (found_kw or '') + ')') if found else 'MISSING (neither)'}"
                    )
                else:
                    found = kw in content
                    status = PASS if found else FAIL
                    lines.append(f"    [{status}] keyword '{kw}': {'found' if found else 'MISSING'}")
                if not found:
                    keyword_fails.append(f"{relpath}::{kw}")

    verdict = FAIL if missing or keyword_fails else PASS
    return verdict, lines


# ─────────────────────────────────────────────────────────────────────────────
# 3. ANU key raw 노출 정적 스캔
# ─────────────────────────────────────────────────────────────────────────────
def check_raw_key_exposure(root: str) -> Tuple[str, List[str]]:
    """소스에서 평문 key 패턴 검출. 실제 key 값 출력 금지 — 위치만 기록."""
    lines: List[str] = []
    suspects: List[str] = []

    py_files = list(Path(root).rglob("*.py"))
    sh_files = list(Path(root).rglob("*.sh"))
    all_files = py_files + sh_files

    # 너무 많으면 핵심 파일만 (dispatch/, scripts/ 아래)
    target_files = [
        f for f in all_files
        if ("dispatch" in str(f) or "scripts" in str(f))
        and ".git" not in str(f)
        and "__pycache__" not in str(f)
    ]

    for fpath in target_files:
        try:
            content = fpath.read_text(encoding="utf-8", errors="replace")
        except OSError:
            continue

        for lineno, line in enumerate(content.splitlines(), 1):
            # 주석 라인은 건너뜀 (python # 또는 bash #)
            stripped = line.strip()
            if stripped.startswith("#"):
                continue
            for pat in RAW_KEY_PATTERNS:
                m = pat.search(line)
                if m:
                    rel = str(fpath.relative_to(root))
                    suspects.append(f"RAW_KEY_EXPOSURE_SUSPECTED at {rel}:{lineno}")
                    break  # 라인당 1건만

    if suspects:
        for s in suspects[:20]:  # 최대 20건만 출력
            lines.append(f"  [WARN] {s}")
        if len(suspects) > 20:
            lines.append(f"  ... 외 {len(suspects) - 20}건 더")
        verdict = CAVEAT
    else:
        lines.append("  raw key 패턴 검출 0건 (PASS)")
        verdict = PASS

    return verdict, lines


# ─────────────────────────────────────────────────────────────────────────────
# 4. 안전속성 키워드 grep (idempotency/lock/terminal-marker/runaway/key-sealing)
# ─────────────────────────────────────────────────────────────────────────────
def check_safety_keywords(root: str) -> Tuple[str, List[str]]:
    """runner/driver/entrypoint 소스에서 안전속성 키워드 존재 여부 확인."""
    lines: List[str] = []
    fails: List[str] = []

    check_files = [
        "dispatch/anu_result_pickup_runner.py",
        "dispatch/anu_pickup_driver.py",
        "scripts/anu_pickup_entrypoint.sh",
    ]

    # 파일 내용 합산 (안전속성은 전체 소스에서 1건 이상이면 PASS)
    combined = ""
    for relpath in check_files:
        fpath = Path(root) / relpath
        if fpath.exists():
            try:
                combined += fpath.read_text(encoding="utf-8", errors="replace")
            except OSError:
                pass

    for prop, kws in SAFETY_KEYWORDS.items():
        found_any = any(kw in combined for kw in kws)
        status = PASS if found_any else FAIL
        found_kws = [kw for kw in kws if kw in combined]
        lines.append(
            f"  [{status}] {prop}: "
            + (f"found {found_kws}" if found_any else f"MISSING — expected one of {kws}")
        )
        if not found_any:
            fails.append(prop)

    verdict = FAIL if fails else PASS
    return verdict, lines


# ─────────────────────────────────────────────────────────────────────────────
# 5. dry-run / decision 가드 정적 확인
# ─────────────────────────────────────────────────────────────────────────────
def check_dry_run_guard(root: str) -> Tuple[str, List[str]]:
    """driver/entrypoint 의 실 ANU spawn 0 가드 확인.
    - activation flag 가드 (default DISABLED)
    - pickup_fn 주입 가능 (mock 교체 가능)
    - entrypoint FLAG_VALUE != 'enabled' → exit 0 가드
    - driver 는 argv 를 실행하지 않음 (P0-a dry_run 주석)
    """
    lines: List[str] = []
    checks: List[bool] = []

    # 1. activation flag 가드 (entrypoint)
    ep = Path(root) / "scripts" / "anu_pickup_entrypoint.sh"
    if ep.exists():
        ep_text = ep.read_text(encoding="utf-8", errors="replace")
        has_flag_guard = ('FLAG_VALUE" != "enabled"' in ep_text
                          or "FLAG_VALUE" in ep_text and "exit 0" in ep_text)
        lines.append(f"  [{PASS if has_flag_guard else FAIL}] entrypoint: activation flag guard (exit 0 when not enabled)")
        checks.append(has_flag_guard)
    else:
        lines.append(f"  [{CAVEAT}] anu_pickup_entrypoint.sh 파일 없음 — 점검 스킵")
        checks.append(True)  # 파일 없음은 CAVEAT, fail 아님

    # 2. driver scan_once: activation 미활성 시 NOOP_DISABLED 반환
    driver = Path(root) / "dispatch" / "anu_pickup_driver.py"
    if driver.exists():
        drv_text = driver.read_text(encoding="utf-8", errors="replace")
        has_noop = "VERDICT_NOOP_DISABLED" in drv_text and "is_activated" in drv_text
        lines.append(f"  [{PASS if has_noop else FAIL}] driver scan_once: NOOP_DISABLED when not activated")
        checks.append(has_noop)

        # 3. driver 는 argv 실행 안 함 — import subprocess / subprocess. 실호출 검사
        has_no_subprocess = (
            "import subprocess" not in drv_text
            and "subprocess." not in drv_text
        )
        lines.append(f"  [{PASS if has_no_subprocess else FAIL}] driver: subprocess import/호출 0건 (실 spawn 없음)")
        checks.append(has_no_subprocess)

        # 4. pickup_fn 주입 가능 (mock 교체 가능)
        has_injection = "pickup_fn=None" in drv_text or "pickup_fn =" in drv_text
        lines.append(f"  [{PASS if has_injection else FAIL}] driver: pickup_fn 주입 가능 (테스트/mock 교체)")
        checks.append(has_injection)
    else:
        lines.append(f"  [{CAVEAT}] anu_pickup_driver.py 없음 — 점검 스킵")

    # 5. runner pickup_once: subprocess 0 (주석 제외 코드 라인 기준)
    runner = Path(root) / "dispatch" / "anu_result_pickup_runner.py"
    if runner.exists():
        run_text = runner.read_text(encoding="utf-8", errors="replace")
        # import subprocess / subprocess. 실호출 검사 (docstring 내 언급은 무시)
        has_no_subprocess_runner = (
            "import subprocess" not in run_text
            and "subprocess." not in run_text
        )
        lines.append(f"  [{PASS if has_no_subprocess_runner else FAIL}] runner: subprocess import/호출 0건 (실 spawn 없음)")
        checks.append(has_no_subprocess_runner)

        # 6. runner WAKE_BUILT 는 argv 반환 (실행 아님)
        has_argv_only = (
            "argv" in run_text
            and "wake_built=True" in run_text
            and "import subprocess" not in run_text
        )
        lines.append(
            f"  [{PASS if has_argv_only else CAVEAT}] runner: WAKE_BUILT 는 argv dict 반환만 "
            f"(실 실행 없음 — '실 발사는 권한 있는 ANU 세션이 수행' 주석 확인)"
        )
        checks.append(True)  # 코드 존재 확인이므로 CAVEAT 수준

    verdict = PASS if all(checks) else FAIL
    return verdict, lines


# ─────────────────────────────────────────────────────────────────────────────
# main
# ─────────────────────────────────────────────────────────────────────────────
def main() -> int:
    ap = argparse.ArgumentParser(
        prog="anu_pickup_preflight_check",
        description="ANU result-pickup 활성화 전 안전성 점검 (read-only, 실 service 미실행)",
    )
    ap.add_argument(
        "--candidate-root",
        default=DEFAULT_CANDIDATE_ROOT,
        help=f"점검 대상 worktree 루트 (기본: {DEFAULT_CANDIDATE_ROOT})",
    )
    args = ap.parse_args()
    root = args.candidate_root

    print(f"[preflight] candidate-root: {root}")
    print(f"[preflight] root exists: {os.path.isdir(root)}")
    print()

    results: Dict[str, dict] = {}

    # ── 1. systemd 상태 ───────────────────────────────────────────────────────
    print("=== [1] systemd user unit 상태 (read-only) ===")
    v1, l1 = check_systemd_units()
    for line in l1:
        print(line)
    print(f"  => {v1}")
    results["systemd_units"] = {"verdict": v1, "detail": l1}
    print()

    # ── 2. 후보 파일 존재 + 결선 키워드 ──────────────────────────────────────
    print("=== [2] 후보 파일 존재 + 결선 키워드 grep ===")
    v2, l2 = check_candidate_files(root)
    for line in l2:
        print(line)
    print(f"  => {v2}")
    results["candidate_files"] = {"verdict": v2, "detail": l2}
    print()

    # ── 3. ANU key raw 노출 스캔 ──────────────────────────────────────────────
    print("=== [3] ANU key raw 노출 정적 스캔 ===")
    v3, l3 = check_raw_key_exposure(root)
    for line in l3:
        print(line)
    print(f"  => {v3}")
    results["raw_key_exposure"] = {"verdict": v3, "detail": l3}
    print()

    # ── 4. 안전속성 키워드 ────────────────────────────────────────────────────
    print("=== [4] 안전속성 키워드 (idempotency/lock/terminal/runaway/key-sealing) ===")
    v4, l4 = check_safety_keywords(root)
    for line in l4:
        print(line)
    print(f"  => {v4}")
    results["safety_keywords"] = {"verdict": v4, "detail": l4}
    print()

    # ── 5. dry-run / decision 가드 ────────────────────────────────────────────
    print("=== [5] dry-run / 실 ANU spawn 0 가드 확인 ===")
    v5, l5 = check_dry_run_guard(root)
    for line in l5:
        print(line)
    print(f"  => {v5}")
    results["dry_run_guard"] = {"verdict": v5, "detail": l5}
    print()

    # ── 종합 결과 ─────────────────────────────────────────────────────────────
    print("=== 종합 preflight 결과 ===")
    summary = {
        "candidate_root": root,
        "checks": {k: v["verdict"] for k, v in results.items()},
        "overall": PASS,
    }

    any_fail = any(v["verdict"] == FAIL for v in results.values())
    any_caveat = any(v["verdict"] == CAVEAT for v in results.values())
    if any_fail:
        summary["overall"] = FAIL
    elif any_caveat:
        summary["overall"] = CAVEAT

    print(json.dumps(summary, ensure_ascii=False, indent=2))
    print()

    overall = summary["overall"]
    if overall == FAIL:
        print("[preflight] OVERALL: FAIL — 활성화 전 필수 항목 해결 필요")
        return 1
    elif overall == CAVEAT:
        print("[preflight] OVERALL: CAVEAT — 경고 항목 확인 후 활성화 검토")
        return 0
    else:
        print("[preflight] OVERALL: PASS — 모든 항목 통과")
        return 0


if __name__ == "__main__":
    raise SystemExit(main())
