"""utils/gemini_gate_validator.py — thread-aware Gemini gate 검증기.

task-2472 구현 4: body keyword grep만으로는 부족한 기존 Gemini gate 보강.
latestReviews + comments + threads + image badge 종합 판정.

fail-closed: 어느 gh API 호출이라도 실패 시 fetch_ok=False → 자동 FAIL.
"""
from __future__ import annotations

import hashlib
import json
import os
import re
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# --------------------------------------------------------------------------
# Constants / paths
# --------------------------------------------------------------------------

# Gemini bot account names (used to identify the author of a PR review).
GEMINI_BOT_AUTHORS = frozenset({"gemini-code-assist[bot]", "gemini-code-assist"})

# Audit JSONL path, relative to the workspace root.
AUDIT_JSONL_REL = Path("memory/orchestration-audit/gemini-gate-decision.jsonl")

# Severity pattern for image markdown, keyed on the alt text: ![high](...),
# ![medium](...), ![critical](...), ![low](...). Matching is case-insensitive.
_IMAGE_SEVERITY_PATTERN = re.compile(
    r"!\[\s*(high|medium|critical|low)\s*\]\(",
    re.IGNORECASE,
)

# GraphQL: fetch the review threads of a pull request.
# NOTE: only the first 50 threads (and the first 50 comments of each thread)
# are requested and no pageInfo is selected, so a PR with more threads is
# truncated without any signal to the caller.
_REVIEW_THREADS_QUERY = """
query ReviewThreads($owner: String!, $name: String!, $number: Int!) {
  repository(owner: $owner, name: $name) {
    pullRequest(number: $number) {
      reviewThreads(first: 50) {
        nodes {
          id
          isResolved
          comments(first: 50) {
            nodes {
              body
              author { login }
            }
          }
        }
      }
    }
  }
}
"""

# --------------------------------------------------------------------------
# Internal helpers
# --------------------------------------------------------------------------


def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    current_utc = datetime.now(tz=timezone.utc)
    stamp = current_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
    return stamp


def _evidence_hash_str(record: dict) -> str:
    """Return the SHA-256 hex digest of *record* serialized as JSON.

    Keys are sorted before hashing, so the digest does not depend on the
    insertion order of the dict. Non-ASCII text is kept verbatim and the
    serialized string is encoded as UTF-8.
    """
    canonical_bytes = json.dumps(
        record,
        sort_keys=True,
        ensure_ascii=False,
    ).encode("utf-8")
    hasher = hashlib.sha256()
    hasher.update(canonical_bytes)
    return hasher.hexdigest()


def _workspace_root(workspace: Optional[Path] = None) -> Path:
    """Resolve the workspace root directory.

    A truthy *workspace* argument is returned unchanged. Otherwise the
    ``WORKSPACE_ROOT`` environment variable is used, falling back to
    ``/home/jay/workspace`` when it is not set.
    """
    if workspace:
        return workspace
    configured_root = os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace")
    return Path(configured_root)


def _decode_json_documents(text: str) -> list:
    """Decode one or more back-to-back JSON documents contained in *text*.

    Raises ``json.JSONDecodeError`` when any part of *text* is not valid JSON.
    """
    decoder = json.JSONDecoder()
    documents: list = []
    position = 0
    end = len(text)
    while position < end:
        document, position = decoder.raw_decode(text, position)
        documents.append(document)
        # raw_decode does not skip leading whitespace, so step over any
        # separator between two documents ourselves.
        while position < end and text[position].isspace():
            position += 1
    return documents


def _gh_api_json(endpoint: str, timeout: int = 30) -> tuple[bool, object]:
    """Run ``gh api --paginate {endpoint}`` and return ``(ok, parsed_json)``.

    The request is paginated: without ``--paginate`` GitHub list endpoints
    return only the first page, so reviews or comments beyond it would be
    silently ignored by the gate.

    Depending on the gh version, paginated array responses arrive either as a
    single merged array or as one array per page written back to back; both
    forms are accepted and flattened into one list.

    Parameters
    ----------
    endpoint:
        REST path passed verbatim to ``gh api``.
    timeout:
        Seconds allowed for the whole ``gh`` invocation (all pages).

    Returns
    -------
    tuple[bool, object]
        ``(True, data)`` on success. ``data`` is the decoded document when the
        output holds exactly one, the concatenation of all pages when every
        page is a list, and otherwise the list of page documents.
        ``(False, None)`` when gh cannot be run, times out, exits non-zero,
        prints nothing, or prints invalid JSON.
    """
    try:
        proc = subprocess.run(
            ["gh", "api", "--paginate", endpoint],
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=False,
            check=False,
        )
        output = proc.stdout.strip()
        if proc.returncode != 0 or not output:
            return False, None
        try:
            pages = _decode_json_documents(output)
        except json.JSONDecodeError:
            return False, None
        if len(pages) == 1:
            return True, pages[0]
        if all(isinstance(page, list) for page in pages):
            return True, [item for page in pages for item in page]
        return True, pages
    except Exception:
        # Deliberate fail-closed boundary: callers rely on this helper never
        # raising (missing gh binary, timeout, ...) and treat False as a
        # failed fetch.
        return False, None


def _gh_graphql(query: str, variables: dict, timeout: int = 30) -> tuple[bool, object]:
    """Run a GraphQL query through ``gh api graphql`` and return ``(ok, data)``.

    ``gh api graphql`` interprets every field other than ``query`` and
    ``operationName`` as a GraphQL variable. Each entry of *variables* is
    therefore passed as its own field; sending one ``variables=<json>`` field
    would define a single variable literally named ``variables`` and leave the
    variables declared by the query unset.

    Strings are sent with ``-f`` (raw, never type-converted). Integers,
    booleans and ``None`` are sent with ``-F`` so that gh converts them to the
    matching JSON types.

    Parameters
    ----------
    query:
        GraphQL document.
    variables:
        Mapping of variable name to a ``str``, ``int``, ``bool`` or ``None``
        value.
    timeout:
        Seconds allowed for the ``gh`` invocation.

    Returns
    -------
    tuple[bool, object]
        ``(True, decoded_response)`` on success. ``(False, None)`` when a
        variable has an unsupported type, gh cannot be run, times out, exits
        non-zero, prints nothing, or prints invalid JSON.
    """
    try:
        command = ["gh", "api", "graphql", "-f", f"query={query}"]
        for key, value in variables.items():
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(value, bool):
                command += ["-F", f"{key}={'true' if value else 'false'}"]
            elif isinstance(value, int):
                command += ["-F", f"{key}={value}"]
            elif value is None:
                command += ["-F", f"{key}=null"]
            elif isinstance(value, str):
                command += ["-f", f"{key}={value}"]
            else:
                # No faithful field encoding for this type (float, list,
                # dict, ...): fail closed instead of sending a wrong value.
                return False, None

        proc = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=False,
            check=False,
        )
        if proc.returncode != 0 or not proc.stdout.strip():
            return False, None
        try:
            return True, json.loads(proc.stdout)
        except json.JSONDecodeError:
            return False, None
    except Exception:
        # Deliberate fail-closed boundary: callers rely on this helper never
        # raising and treat False as a failed fetch.
        return False, None


def _parse_repo(repo: str) -> tuple[str, str]:
    """Split an ``owner/name`` string at its first slash into ``(owner, name)``.

    When *repo* contains no slash, the whole string is returned for both
    elements.
    """
    owner, slash, name = repo.partition("/")
    if not slash:
        return repo, repo
    return owner, name


# --------------------------------------------------------------------------
# Public API
# --------------------------------------------------------------------------


def detect_image_severity(text: str) -> dict:
    """Count severity badges written as image markdown in *text*.

    A badge is an image whose alt text is a severity label, e.g.
    ``![high](...)``, ``![medium](...)``, ``![critical](...)`` or
    ``![low](...)``; labels are matched case-insensitively.

    Returns a dict ``{"high": N, "medium": N, "critical": N, "low": N}``.
    Empty or ``None`` input yields all zeros.
    """
    tally: dict[str, int] = dict.fromkeys(("high", "medium", "critical", "low"), 0)
    if text:
        labels = (hit.group(1).lower() for hit in _IMAGE_SEVERITY_PATTERN.finditer(text))
        for label in labels:
            if label in tally:
                tally[label] += 1
    return tally


def fetch_pr_review_data(
    pr_number: int,
    repo: str | None = None,
) -> dict:
    """Collect the review data of a pull request through the gh CLI.

    Sources, fetched in this order:

    - reviews: ``repos/{repo}/pulls/{pr}/reviews``
    - comments (inline review comments): ``repos/{repo}/pulls/{pr}/comments``
    - issue_comments: ``repos/{repo}/issues/{pr}/comments``
    - threads: GraphQL ``reviewThreads`` (id, isResolved, body of the first
      comment of each thread)

    Fail-closed: ``fetch_ok`` is True only when every source was fetched and
    had the expected shape. A REST call that succeeds but does not return a
    JSON array, and a GraphQL response whose thread nodes are malformed, are
    both recorded in ``errors`` rather than being treated as "no data".

    NOTE: ``_REVIEW_THREADS_QUERY`` requests only the first 50 threads and no
    pageInfo, so truncation of larger PRs cannot be detected here.

    Parameters
    ----------
    pr_number:
        Pull request number.
    repo:
        ``owner/name``. When None, the ``GH_REPO`` environment variable is
        used, falling back to ``Jeon-Jonghyuk/dev_workspace``.

    Returns
    -------
    dict
        {
            "reviews": [...],
            "comments": [...],
            "threads": [{"id": ..., "isResolved": ..., "body": ...}, ...],
            "issue_comments": [...],
            "fetch_ok": bool,
            "errors": [...]
        }
    """
    # Repo resolution intentionally sits after the docstring (Gemini review,
    # medium: it flagged the docstring position).
    if repo is None:
        repo = os.environ.get("GH_REPO", "Jeon-Jonghyuk/dev_workspace")
    errors: list[str] = []

    # 1-3. REST list endpoints.
    rest_sources = (
        ("reviews", f"repos/{repo}/pulls/{pr_number}/reviews"),
        ("comments", f"repos/{repo}/pulls/{pr_number}/comments"),
        ("issue_comments", f"repos/{repo}/issues/{pr_number}/comments"),
    )
    collected: dict[str, list] = {}
    for label, endpoint in rest_sources:
        ok, payload = _gh_api_json(endpoint)
        if not ok:
            errors.append(f"{label} fetch failed")
            payload = []
        elif not isinstance(payload, list):
            # A successful call with an unexpected shape must not look like
            # an empty result, otherwise the gate would pass on missing data.
            errors.append(f"{label} payload is not a list")
            payload = []
        collected[label] = payload

    # 4. reviewThreads (GraphQL)
    owner, name = _parse_repo(repo)
    threads_ok, gql_data = _gh_graphql(
        _REVIEW_THREADS_QUERY,
        {"owner": owner, "name": name, "number": pr_number},
    )
    threads: list[dict] = []
    if threads_ok and isinstance(gql_data, dict):
        try:
            nodes = (
                gql_data["data"]["repository"]["pullRequest"]["reviewThreads"]["nodes"]
            )
            parsed: list[dict] = []
            for node in nodes:
                body = ""
                if node.get("comments", {}).get("nodes"):
                    body = node["comments"]["nodes"][0].get("body", "") or ""
                parsed.append(
                    {
                        "id": node.get("id", ""),
                        # A missing flag counts as unresolved (fail-closed).
                        "isResolved": node.get("isResolved", False),
                        "body": body,
                    }
                )
            # Publish the threads only when every node was parsed.
            threads = parsed
        except (KeyError, TypeError, AttributeError, IndexError):
            # Covers a null repository/pullRequest as well as null or
            # non-dict thread and comment nodes.
            errors.append("reviewThreads GraphQL parse failed")
    else:
        errors.append("reviewThreads GraphQL fetch failed")

    # Fail-closed: every failure above appended an error, so any error
    # invalidates the whole fetch.
    fetch_ok = not errors

    return {
        "reviews": collected["reviews"],
        "comments": collected["comments"],
        "threads": threads,
        "issue_comments": collected["issue_comments"],
        "fetch_ok": fetch_ok,
        "errors": errors,
    }


def evaluate_gate(
    pr_data: dict,
    *,
    block_unresolved_medium: bool = True,
) -> dict:
    """Evaluate the Gemini gate for the data returned by the PR fetch step.

    A severity marker is either ``severity: X`` / ``priority = X`` or an image
    badge ``![X](...)`` with X in high / medium / low / critical
    (case-insensitive).

    Rules, applied in this order:

    1. ``fetch_ok`` is falsy                          -> FAIL (fail-closed)
    2. no review whose ``user.login`` is a Gemini bot -> FAIL
    3. any high/critical marker in the bodies of reviews, inline comments or
       issue comments                                 -> FAIL
    4. an unresolved thread whose body carries a high/critical marker -> FAIL
    5. an unresolved thread whose highest marker is medium
       -> FAIL when ``block_unresolved_medium`` is true,
          RECOVERABLE_BLOCKED otherwise
    6. otherwise                                      -> PASS

    A thread dict without an ``isResolved`` key is treated as resolved, and a
    thread without any marker is treated as low severity.

    Returns
    -------
    dict
        {
            "verdict": "PASS" | "FAIL" | "RECOVERABLE_BLOCKED",
            "reason": str,
            "severity_counts": {"high": N, "medium": N, "low": N, "critical": N, "image_high": N},
            "unresolved_threads": [{"id":..., "severity":..., "body":...}, ...],
            "gemini_review_present": bool,
        }
    """

    def _outcome(verdict, reason, counts, unresolved, review_present):
        # Single place that fixes the shape and key order of the result.
        return {
            "verdict": verdict,
            "reason": reason,
            "severity_counts": counts,
            "unresolved_threads": unresolved,
            "gemini_review_present": review_present,
        }

    def _zero_counts():
        return {"high": 0, "medium": 0, "low": 0, "critical": 0, "image_high": 0}

    # Rule 1: a failed fetch can never pass (fail-closed).
    if not pr_data.get("fetch_ok", False):
        return _outcome(
            "FAIL",
            f"PR 데이터 fetch 실패: {pr_data.get('errors', [])}",
            _zero_counts(),
            [],
            False,
        )

    # Rule 2: at least one review must come from a Gemini bot account.
    reviews = pr_data.get("reviews", [])
    gemini_review_present = False
    for review in reviews:
        if not isinstance(review, dict):
            continue
        author = (review.get("user", {}) or {}).get("login", "")
        if author in GEMINI_BOT_AUTHORS:
            gemini_review_present = True
            break

    if not gemini_review_present:
        return _outcome(
            "FAIL",
            "Gemini review 부재: gemini-code-assist[bot] 리뷰 없음",
            _zero_counts(),
            [],
            False,
        )

    # Merge every body: reviews, then inline comments, then issue comments.
    body_groups = (
        reviews,
        pr_data.get("comments", []),
        pr_data.get("issue_comments", []),
    )
    all_texts: list[str] = [
        entry.get("body", "") or ""
        for group in body_groups
        for entry in group
        if isinstance(entry, dict)
    ]
    combined = "\n".join(all_texts)

    # Text-based severity counts (simple pattern; image badges included).
    text_marker = re.compile(
        r"(?:severity|priority)\s*[:=]\s*(high|medium|low|critical)|"
        r"!\[\s*(high|medium|low|critical)\s*\]\(",
        re.IGNORECASE,
    )
    sev_counts: dict[str, int] = {"high": 0, "medium": 0, "low": 0, "critical": 0}
    for hit in text_marker.finditer(combined):
        label = (hit.group(1) or hit.group(2) or "").lower()
        if label in sev_counts:
            sev_counts[label] += 1

    # The text pattern already counts image badges, so the badge detector only
    # feeds the image_high summary metric (no double counting).
    badge_counts = detect_image_severity(combined)
    severity_counts = dict(sev_counts)
    severity_counts["image_high"] = badge_counts["high"] + badge_counts["critical"]

    # Rule 3: any high/critical marker in the merged text fails the gate.
    if sev_counts["high"] or sev_counts["critical"]:
        return _outcome(
            "FAIL",
            (
                f"high/critical severity 발견: high={sev_counts['high']}, "
                f"critical={sev_counts['critical']}"
            ),
            severity_counts,
            [],
            True,
        )

    # Classify unresolved threads by the highest severity found in the body;
    # every marker is inspected, not just the first one.
    thread_marker = re.compile(
        r"(?:severity|priority)\s*[:=]\s*(high|medium|critical|low)|"
        r"!\[\s*(high|medium|critical|low)\s*\]\(",
        re.IGNORECASE,
    )
    severity_rank = {"low": 0, "medium": 1, "high": 2, "critical": 3}
    unresolved_threads: list[dict] = []
    for thread in pr_data.get("threads", []):
        if not isinstance(thread, dict) or thread.get("isResolved", True):
            continue
        body = thread.get("body", "") or ""
        found = [
            (hit.group(1) or hit.group(2) or "low").lower()
            for hit in thread_marker.finditer(body)
        ]
        ranked = [label for label in found if label in severity_rank]
        thread_sev = max(ranked, key=severity_rank.__getitem__, default="low")
        # Only medium and above are reported as blocking candidates.
        if thread_sev in ("medium", "high", "critical"):
            unresolved_threads.append(
                {"id": thread.get("id", ""), "severity": thread_sev, "body": body[:200]}
            )

    # Rule 4: unresolved high/critical threads.
    high_crit_unresolved = [
        item for item in unresolved_threads if item["severity"] in ("high", "critical")
    ]
    if high_crit_unresolved:
        return _outcome(
            "FAIL",
            f"unresolved high/critical thread {len(high_crit_unresolved)}건",
            severity_counts,
            unresolved_threads,
            True,
        )

    # Rule 5: unresolved medium threads.
    medium_unresolved = [
        item for item in unresolved_threads if item["severity"] == "medium"
    ]
    if medium_unresolved and block_unresolved_medium:
        return _outcome(
            "FAIL",
            f"unresolved medium thread {len(medium_unresolved)}건 (block_unresolved_medium=True)",
            severity_counts,
            unresolved_threads,
            True,
        )
    if medium_unresolved:
        return _outcome(
            "RECOVERABLE_BLOCKED",
            f"unresolved medium thread {len(medium_unresolved)}건 (block_unresolved_medium=False → RECOVERABLE_BLOCKED)",
            severity_counts,
            unresolved_threads,
            True,
        )

    # Rule 6: nothing blocks the gate.
    return _outcome(
        "PASS",
        "Gemini gate 통과: review 존재, unresolved medium+ thread 없음, high/critical 없음",
        severity_counts,
        [],
        True,
    )


def record_gate_decision(
    *,
    task_id: str,
    pr_number: int,
    verdict: str,
    severity_counts: dict,
    unresolved_threads: list,
    gemini_review_present: bool,
    actor: str,
    reason: str,
    workspace: Optional[Path] = None,
) -> Path:
    """Append one Gemini gate decision to the audit log and return its path.

    The record is written as a single JSON line to
    ``memory/orchestration-audit/gemini-gate-decision.jsonl`` under the
    workspace root; missing parent directories are created.

    Each record has ten fields: task_id, pr_number, verdict, severity_counts,
    unresolved_threads, gemini_review_present, actor, reason, timestamp and
    evidence_hash. ``evidence_hash`` is computed over the other nine fields.
    """
    entry: dict = {
        "task_id": task_id,
        "pr_number": pr_number,
        "verdict": verdict,
        "severity_counts": severity_counts,
        "unresolved_threads": unresolved_threads,
        "gemini_review_present": gemini_review_present,
        "actor": actor,
        "reason": reason,
        "timestamp": _now_iso(),
    }
    # The right-hand side is evaluated first, so the hash covers the record
    # as it is before the evidence_hash key is added.
    entry["evidence_hash"] = _evidence_hash_str(entry)

    audit_path = _workspace_root(workspace) / AUDIT_JSONL_REL
    audit_path.parent.mkdir(parents=True, exist_ok=True)

    payload = (json.dumps(entry, ensure_ascii=False) + "\n").encode("utf-8")
    # Open in append mode at the OS level and emit the line with one write.
    descriptor = os.open(
        str(audit_path),
        os.O_WRONLY | os.O_APPEND | os.O_CREAT,
        0o644,
    )
    try:
        os.write(descriptor, payload)
    finally:
        os.close(descriptor)

    return audit_path
