#!/usr/bin/env python3
"""gemini_evidence_verify.py — Gemini App evidence-based gate verifier.

Phase 3 redesign (task-2465, 2026-05-06).
- Gemini API 호출 0건 — GitHub App이 박제한 evidence(review/comment/check-run)만 신뢰
- gh api (subprocess) 만 사용 — 직접 HTTP 호출 금지
- GEMINI_API_KEY 의존 0건
"""
from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

TIMEOUT_SECONDS = 300  # 5분

DEFAULT_REPO = "JonghyukJeon/dev_workspace"
GEMINI_BOT_LOGIN = "gemini-code-assist[bot]"

# audit log 경로
WORKSPACE = Path(os.environ.get("WORKSPACE", str(Path(__file__).resolve().parent.parent)))
AUDIT_DIR = WORKSPACE / "memory" / "audit"

# ── high severity 패턴 (code block 제외 후 적용) ─────────────────────────────
_HS_EMOJI = re.compile(r"[🔴❌]")
_HS_SEVERITY = re.compile(r"severity\s*:\s*(high|critical)", re.IGNORECASE)
_HS_KEYWORD = re.compile(r"\b(BLOCKING|CRITICAL|MUST FIX)\b")  # 정확히 대문자만
_HS_HEADER = re.compile(r"^\s*##\s+(High|Critical|Blocking)\b", re.MULTILINE)

# 보조 신호 (단독 차단 금지, audit 전용)
_SUPP_SECURITY = re.compile(r"\bsecurity\b", re.IGNORECASE)
_SUPP_DATA_LOSS = re.compile(r"\bdata loss\b", re.IGNORECASE)
_SUPP_REGRESSION = re.compile(r"\bregression\b", re.IGNORECASE)


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _now_ts() -> float:
    return datetime.now(timezone.utc).timestamp()


def strip_code_blocks(body: str) -> str:
    """fenced code block (``` ... ```) 과 inline code (`...`) 제거."""
    if not body:
        return body
    # fenced code blocks (multiline)
    cleaned = re.sub(r"```[\s\S]*?```", " ", body)
    # inline code (single backtick)
    cleaned = re.sub(r"`[^`\n]*`", " ", cleaned)
    return cleaned


def match_high_severity(body: str) -> list[str]:
    """body에서 high-severity 패턴 매칭 결과 반환 (code block 제외 후)."""
    if not body:
        return []
    stripped = strip_code_blocks(body)
    hits: list[str] = []
    if _HS_EMOJI.search(stripped):
        hits.append("emoji:🔴/❌")
    if _HS_SEVERITY.search(stripped):
        hits.append("severity:high/critical")
    if _HS_KEYWORD.search(stripped):
        hits.append("keyword:BLOCKING/CRITICAL/MUST_FIX")
    if _HS_HEADER.search(stripped):
        hits.append("header:##High/Critical/Blocking")
    return hits


def match_supplementary(body: str) -> list[str]:
    """보조 신호 매칭 (단독 차단 금지 — audit 전용)."""
    if not body:
        return []
    stripped = strip_code_blocks(body)
    sigs: list[str] = []
    if _SUPP_SECURITY.search(stripped):
        sigs.append("security")
    if _SUPP_DATA_LOSS.search(stripped):
        sigs.append("data_loss")
    if _SUPP_REGRESSION.search(stripped):
        sigs.append("regression")
    return sigs


def _gh_api(endpoint: str, timeout: int = 30) -> tuple[int, list | dict]:
    """gh api {endpoint} 호출 → (returncode, parsed_json_or_empty)."""
    try:
        proc = subprocess.run(
            ["gh", "api", endpoint],
            capture_output=True, text=True, timeout=timeout
        )
        rc = proc.returncode
        if rc != 0 or not proc.stdout.strip():
            return rc, []
        try:
            data = json.loads(proc.stdout)
        except json.JSONDecodeError:
            return rc, []
        return rc, data
    except subprocess.TimeoutExpired:
        return -1, []
    except Exception:
        return -1, []


def _fetch_reviews(repo: str, pr: int) -> list[dict]:
    """PR reviews — gemini-code-assist[bot] 발행분만."""
    _, data = _gh_api(f"repos/{repo}/pulls/{pr}/reviews")
    if not isinstance(data, list):
        return []
    return [r for r in data if r.get("user", {}).get("login") == GEMINI_BOT_LOGIN]


def _fetch_review_comments(repo: str, pr: int) -> list[dict]:
    """PR review comments (line comments) — gemini-code-assist[bot] 발행분만."""
    _, data = _gh_api(f"repos/{repo}/pulls/{pr}/comments")
    if not isinstance(data, list):
        return []
    return [c for c in data if c.get("user", {}).get("login") == GEMINI_BOT_LOGIN]


def _fetch_issue_comments(repo: str, pr: int) -> list[dict]:
    """Issue (PR) comments — gemini-code-assist[bot] 발행분만."""
    _, data = _gh_api(f"repos/{repo}/issues/{pr}/comments")
    if not isinstance(data, list):
        return []
    return [c for c in data if c.get("user", {}).get("login") == GEMINI_BOT_LOGIN]


def _fetch_check_runs(repo: str, head_sha: str) -> list[dict]:
    """check-runs for head_sha — app.slug == 'gemini-code-assist' 발행분만 (엄격 매칭)."""
    _, data = _gh_api(f"repos/{repo}/commits/{head_sha}/check-runs")
    if not isinstance(data, dict):
        return []
    runs = data.get("check_runs", [])
    if not isinstance(runs, list):
        return []
    result = []
    for run in runs:
        app_slug = run.get("app", {}).get("slug", "")
        # ★ name prefix 허용 제거 — app.slug 정확 매칭만 인정 (다른 봇/수동 check 차단)
        if app_slug == "gemini-code-assist":
            result.append(run)
    return result


def _fetch_head_pushed_at(repo: str, pr_number: int, head_sha: str) -> str | None:
    """head SHA의 마지막 push 시각 추정 — PR updated_at, head.repo.pushed_at, committer.date 중 가장 늦은 시각.

    각 후보의 의미와 한계:
    - `pulls/{pr}.updated_at`: PR-level 마지막 활동 시각 (push/comment/review 모두). PR-specific으로 가장 좁음.
    - `pulls/{pr}.head.repo.pushed_at`: head 저장소 전체의 최근 push 시각 (다른 branch push도 반영) — 너무 광범위.
    - `commits/{sha}.commit.committer.date`: commit 작성 시각 (push 시각 아님). force-push 시 옛날 시각.

    세 후보의 max를 취하면 "실제 push 시각 이후의 가장 가까운 보수적 추정"이 됨.
    None인 경우는 무시.
    """
    candidates: list[float] = []
    raw_strs: list[str] = []

    if pr_number > 0:
        _, pr_data = _gh_api(f"repos/{repo}/pulls/{pr_number}")
        if isinstance(pr_data, dict):
            for key_path in (("updated_at",), ("head", "repo", "pushed_at")):
                node: dict | str | None = pr_data
                for k in key_path:
                    if isinstance(node, dict):
                        node = node.get(k)
                    else:
                        node = None
                        break
                if isinstance(node, str):
                    ts = _parse_iso(node)
                    if ts is not None:
                        candidates.append(ts)
                        raw_strs.append(node)

    _, data = _gh_api(f"repos/{repo}/commits/{head_sha}")
    if isinstance(data, dict):
        try:
            committer_date = data["commit"]["committer"]["date"]
        except (KeyError, TypeError):
            committer_date = None
        if isinstance(committer_date, str):
            ts = _parse_iso(committer_date)
            if ts is not None:
                candidates.append(ts)
                raw_strs.append(committer_date)

    if not candidates:
        return None
    # max ts에 대응하는 ISO 문자열 반환
    max_idx = candidates.index(max(candidates))
    return raw_strs[max_idx]


# 하위호환 alias — 기존 테스트/호출부가 _fetch_head_sha_date 이름으로 monkeypatch
# (런타임에 외부에서 참조 — pyright는 정적으로 미사용으로 표시되지만 실제로는 사용됨)
_fetch_head_sha_date = _fetch_head_pushed_at  # type: ignore[assignment]
__all__ = [
    "evaluate_gate",
    "match_high_severity",
    "match_supplementary",
    "strip_code_blocks",
    "_fetch_head_pushed_at",
    "_fetch_head_sha_date",
]


def _parse_iso(s: str | None) -> float | None:
    """ISO8601 → POSIX timestamp."""
    if not s:
        return None
    try:
        dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
        return dt.timestamp()
    except Exception:
        return None


def _append_audit(record: dict) -> None:
    """memory/audit/gemini-gate-YYYYMMDD.jsonl 에 append."""
    date_str = datetime.now(timezone.utc).strftime("%Y%m%d")
    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    audit_path = AUDIT_DIR / f"gemini-gate-{date_str}.jsonl"
    with audit_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")


def evaluate_gate(pr_number: int, head_sha: str, repo: str = DEFAULT_REPO) -> dict:
    """
    Returns: {
        "state": "pass" | "hold" | "block",
        "reason": str,
        "evidence": {
            "primary": list of {type, id, body_snippet, commit_sha, stale: bool, severity_matches: list},
            "secondary": list (check_runs from gemini-code-assist app),
            "high_severity_hits": list of pattern matches,
            "supplementary_signals": list of supplementary signals
        },
        "head_pushed_at": ISO8601 str,
        "elapsed_seconds": int,
        "timeout_seconds": 300
    }
    """
    now_ts = _now_ts()

    # ── 1. head SHA push 시각 조회 ───────────────────────────────────────────
    # PR head.repo.pushed_at 우선 (force-push 정확 반영), fallback commit.committer.date
    head_pushed_at_str = _fetch_head_pushed_at(repo, pr_number, head_sha)
    head_pushed_at_ts = _parse_iso(head_pushed_at_str)
    elapsed = int(now_ts - head_pushed_at_ts) if head_pushed_at_ts else TIMEOUT_SECONDS + 1

    # ── 2. evidence 수집 ─────────────────────────────────────────────────────
    reviews = _fetch_reviews(repo, pr_number)
    review_comments = _fetch_review_comments(repo, pr_number)
    issue_comments = _fetch_issue_comments(repo, pr_number)
    check_runs = _fetch_check_runs(repo, head_sha)

    # ── 3. primary evidence 구성 (reviews + comments) ────────────────────────
    primary: list[dict] = []
    all_high_severity: list[str] = []
    all_supplementary: list[str] = []

    # PR reviews
    for r in reviews:
        body = r.get("body", "") or ""
        commit_id = r.get("commit_id", "")
        stale = bool(commit_id and commit_id != head_sha)
        hs = match_high_severity(body)
        supp = match_supplementary(body)
        entry = {
            "type": "review",
            "id": r.get("id"),
            "body_snippet": body[:200],
            "commit_sha": commit_id,
            "stale": stale,
            "severity_matches": hs,
        }
        primary.append(entry)
        if hs:
            all_high_severity.extend(hs)
        if supp:
            all_supplementary.extend(supp)

    # review comments (line comments)
    for c in review_comments:
        body = c.get("body", "") or ""
        commit_id = c.get("commit_id", "")
        stale = bool(commit_id and commit_id != head_sha)
        hs = match_high_severity(body)
        supp = match_supplementary(body)
        entry = {
            "type": "review_comment",
            "id": c.get("id"),
            "body_snippet": body[:200],
            "commit_sha": commit_id,
            "stale": stale,
            "severity_matches": hs,
        }
        primary.append(entry)
        if hs:
            all_high_severity.extend(hs)
        if supp:
            all_supplementary.extend(supp)

    # issue comments — commit_id가 없으므로 created_at >= head_pushed_at일 때만 valid
    # (force-push 이전 issue comment는 stale 처리 — 회피 경로 차단)
    for c in issue_comments:
        body = c.get("body", "") or ""
        hs = match_high_severity(body)
        supp = match_supplementary(body)
        created_at_str = c.get("created_at") or c.get("updated_at")
        created_ts = _parse_iso(created_at_str)
        # head_pushed_at 모르거나 comment 시각 모르면 보수적으로 stale=True
        if head_pushed_at_ts is None or created_ts is None:
            stale_ic = True
        else:
            # comment가 head push 이전에 생성됐으면 stale
            stale_ic = created_ts < head_pushed_at_ts
        entry = {
            "type": "issue_comment",
            "id": c.get("id"),
            "body_snippet": body[:200],
            "commit_sha": None,
            "created_at": created_at_str,
            "stale": stale_ic,
            "severity_matches": hs,
        }
        primary.append(entry)
        # high severity는 stale 여부와 무관하게 BLOCK 사유로 인정 (수상한 evidence는 보수적)
        if hs:
            all_high_severity.extend(hs)
        if supp:
            all_supplementary.extend(supp)

    # secondary evidence (check-runs) — stale 불필요 (head_sha 기반 조회)
    secondary: list[dict] = []
    for run in check_runs:
        secondary.append({
            "type": "check_run",
            "id": run.get("id"),
            "name": run.get("name"),
            "status": run.get("status"),
            "conclusion": run.get("conclusion"),
            "app_slug": run.get("app", {}).get("slug"),
        })

    # ── 4. 판정 ──────────────────────────────────────────────────────────────
    valid_primary = [e for e in primary if not e["stale"]]
    all_stale = len(primary) > 0 and all(e["stale"] for e in primary)

    # 중복 제거
    unique_hs = list(dict.fromkeys(all_high_severity))
    unique_supp = list(dict.fromkeys(all_supplementary))

    if unique_hs:
        state = "block"
        reason = f"high severity matched: {unique_hs}"
    elif all_stale:
        state = "block"
        reason = "all evidence stale (SHA mismatch)"
    elif valid_primary:
        state = "pass"
        reason = f"valid evidence found: {len(valid_primary)} item(s)"
    elif elapsed < TIMEOUT_SECONDS:
        state = "hold"
        reason = f"no evidence yet, elapsed {elapsed}s < {TIMEOUT_SECONDS}s timeout"
    else:
        state = "block"
        reason = f"evidence timeout ({TIMEOUT_SECONDS // 60}min exceeded), no valid evidence"

    result: dict = {
        "state": state,
        "reason": reason,
        "evidence": {
            "primary": primary,
            "secondary": secondary,
            "high_severity_hits": unique_hs,
            "supplementary_signals": unique_supp,
        },
        "head_pushed_at": head_pushed_at_str or "",
        "elapsed_seconds": elapsed,
        "timeout_seconds": TIMEOUT_SECONDS,
    }

    # ── 5. audit log ─────────────────────────────────────────────────────────
    audit_record = {
        "timestamp": _now_iso(),
        "pr": pr_number,
        "sha": head_sha,
        "repo": repo,
        "state": state,
        "reason": reason,
        "elapsed_seconds": elapsed,
        "primary_count": len(primary),
        "valid_primary_count": len(valid_primary),
        "secondary_count": len(secondary),
        "high_severity_hits": unique_hs,
        "supplementary_signals": unique_supp,
    }
    try:
        _append_audit(audit_record)
    except Exception:
        # audit 실패는 gate 판정에 영향 없음
        pass

    return result


def main() -> int:
    ap = argparse.ArgumentParser(
        description="gemini_evidence_verify — Gemini App evidence 기반 PR 게이트 검증"
    )
    ap.add_argument("--pr-number", type=int, required=True, help="PR number")
    ap.add_argument("--head-sha", required=True, help="Head commit SHA")
    ap.add_argument("--repo", default=DEFAULT_REPO, help="OWNER/REPO (default: JonghyukJeon/dev_workspace)")
    ap.add_argument("--json", action="store_true", dest="json_out", help="JSON 출력")
    args = ap.parse_args()

    result = evaluate_gate(args.pr_number, args.head_sha, args.repo)
    state = result["state"]

    if args.json_out:
        print(json.dumps(result, ensure_ascii=False, indent=2))
    else:
        print(f"state={state}")
        print(f"reason={result['reason']}")
        print(f"elapsed={result['elapsed_seconds']}s / timeout={result['timeout_seconds']}s")
        print(f"primary={len(result['evidence']['primary'])} secondary={len(result['evidence']['secondary'])}")
        if result["evidence"]["high_severity_hits"]:
            print(f"HIGH SEVERITY: {result['evidence']['high_severity_hits']}")

    # exit code: pass=0, hold=1, block=2
    if state == "pass":
        return 0
    elif state == "hold":
        return 1
    else:
        return 2


if __name__ == "__main__":
    sys.exit(main())
