"""utils/auto_gemini_triage.py — task-2511 5 모듈 #3.

회장 명시: Gemini review thread 자동 분류/resolve.
PR #61류 unresolved-thread blocker를 사람 개입 없이 자동 해소 가능하게 만든다.
산출물은 정책 문서가 아니라 실행 가능한 코드 + 회귀 테스트 16건.

Critical 7종 (contracts freeze 기준):
  1. FORBIDDEN_PATH_INTRUSION
  2. REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF
  3. GEMINI_REAL_BUG_REQUIRES_SCOPE_EXPANSION
  4. BLOCK_OVERRIDE_REQUIRED_OR_REASON_INSUFFICIENT
  5. DEPENDENCY_CYCLE_OR_SERIAL_ONLY_COLLISION
  6. REPLACEMENT_PR_FAILED
  7. POST_MERGE_SMOKE_FAILED

7-state TriageVerdict:
  FALSE_POSITIVE / STYLE_ONLY / OUTDATED / CODE_ALREADY_FIXED /
  MINOR_FIX_ALLOWED / REAL_BUG_IN_SCOPE / REAL_BUG_SCOPE_EXPANSION

CLI:
  python3 utils/auto_gemini_triage.py --pr <N> [--dry-run | --apply]
      [--task-file <path>] [--fixture <path>]
  -> ReviewGateStatus JSON + exit code
  exit 0: all clean, exit 1: unresolved (or PR/fixture fetch failed), exit 2: blocking
"""

from __future__ import annotations

import argparse
import json
import logging
import re
import subprocess
import sys
from dataclasses import asdict, dataclass
from enum import Enum
from pathlib import Path
from typing import Optional

from utils.automation_contracts import (  # pyright: ignore[reportMissingImports]
    CriticalEscalationType,
    EscalationPacket,
    GeminiStatus,
    GeminiTriageResult,
    ReviewGateStatus,
)

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Regex pattern constants
# ---------------------------------------------------------------------------

# False-positive hints. The Korean alternatives read "already handled",
# "works normally" and "intentional".
FALSE_POSITIVE_PATTERN = re.compile(
    r"grep|regex|wrapper|helper\s*pattern|false[-\s]?positive|이미\s*처리|정상\s*동작|의도적",
    re.IGNORECASE,
)

# Style-only hints (used together with the "no bug word present" condition).
STYLE_HINT_PATTERN = re.compile(
    r"naming|formatting|readability|consistency|code\s*style|style\s*only|nit:|optional:|f-string|\.format\(\)",
    re.IGNORECASE,
)

# Bug-related words; a comment is STYLE_ONLY only when none of these occur.
# Each word is matched whole (\b...\b), so inflections such as "fails" or
# "errors" do not match.
BUG_WORD_PATTERN = re.compile(
    r"\bbug\b|\bcrash\b|\berror\b|\bfail\b|\bbroken\b|\bincorrect\b|\bwrong\b|\bnull\b|\bexception\b",
    re.IGNORECASE,
)

# File path extraction: directory-qualified paths (utils/foo.py, src/bar.py),
# the "**/*.ext" glob form, and bare file names such as dispatch.py.
# Note the bare-name alternative does not include the md/txt extensions.
FILE_PATH_PATTERN = re.compile(
    r"(?:[\w\-]+/)+[\w\-]+\.(?:py|js|ts|json|yaml|yml|sh|md|txt)|"
    r"\*\*/\*\.\w+|"
    r"\b[\w\-]+\.(?:py|js|ts|json|yaml|yml|sh)",
    re.IGNORECASE,
)

# Small-change hints (minor fix). The first alternative's Korean word means
# "variable name".
MINOR_FIX_PATTERN = re.compile(
    r"bool\s*flag|변수명|rename|is_\w+|enabled|disabled|flag",
    re.IGNORECASE,
)

# Hard-coded path hints (absolute paths under /home, /usr, /var, or C:\).
HARDCODED_PATH_PATTERN = re.compile(
    r"/home/[\w/\-\.]+|/usr/[\w/\-\.]+|/var/[\w/\-\.]+|C:\\[\w\\]+",
    re.IGNORECASE,
)

# Quoted code fragments (fenced blocks, inline backticks, def/class headers),
# intended for checking whether a commit changed the quoted code.
# NOTE(review): not referenced by any code visible in this file.
CODE_QUOTE_PATTERN = re.compile(
    r"```[\s\S]*?```|`[^`]+`|def\s+\w+\s*\(|class\s+\w+",
    re.DOTALL,
)

# The five verdicts that may be resolved automatically. Stored as plain
# strings because they are compared against `verdict.value`.
AUTO_RESOLVABLE = frozenset([
    "FALSE_POSITIVE",
    "STYLE_ONLY",
    "OUTDATED",
    "CODE_ALREADY_FIXED",
    "MINOR_FIX_ALLOWED",
])


# ---------------------------------------------------------------------------
# Inner Enum: 7-state verdict (thread-level)
# ---------------------------------------------------------------------------

class TriageVerdict(str, Enum):
    """Thread-level classification result (7 states).

    The first five members are the ones listed in ``AUTO_RESOLVABLE``; the two
    ``REAL_BUG_*`` members are never auto-resolved. Each member's value is its
    own name, and the ``str`` mixin makes members compare equal to that string.
    """

    FALSE_POSITIVE = "FALSE_POSITIVE"
    STYLE_ONLY = "STYLE_ONLY"
    OUTDATED = "OUTDATED"
    CODE_ALREADY_FIXED = "CODE_ALREADY_FIXED"
    MINOR_FIX_ALLOWED = "MINOR_FIX_ALLOWED"
    REAL_BUG_IN_SCOPE = "REAL_BUG_IN_SCOPE"
    REAL_BUG_SCOPE_EXPANSION = "REAL_BUG_SCOPE_EXPANSION"


# ---------------------------------------------------------------------------
# Inner dataclass: per-thread outcome
# ---------------------------------------------------------------------------

@dataclass
class ThreadTriageOutcome:
    """Outcome of triaging a single review thread."""

    thread_id: str                  # value of the thread's "id" key ("" when absent)
    verdict: TriageVerdict
    auto_resolved: bool             # False for REAL_BUG_* verdicts and for failed --apply resolves
    dismiss_comment: str            # 1-3 sentences: classification reason + evidence
    escalation_type: Optional[CriticalEscalationType]  # set only on REAL_BUG_SCOPE_EXPANSION outcomes
    evidence: dict                  # {"matched_pattern": ..., "outdated": bool, "fix_commit_sha": ..., ...}


# ---------------------------------------------------------------------------
# PR-level dataclass
# ---------------------------------------------------------------------------

@dataclass
class TriageReport:
    """PR-level aggregate produced by ``triage_pr``."""

    pr_number: int
    threads: list[ThreadTriageOutcome]
    unresolved_count: int            # outcomes with auto_resolved == False
    auto_resolved_count: int         # outcomes with auto_resolved == True
    blocking_thread_count: int       # outcomes whose verdict is REAL_BUG_SCOPE_EXPANSION (includes forbidden-path intrusions)
    merge_readiness: bool            # blocking_thread_count == 0
    review_gate_status: ReviewGateStatus      # contracts freeze
    triage_summary: GeminiTriageResult        # contracts freeze


# ---------------------------------------------------------------------------
# 핵심 함수: classify_thread
# ---------------------------------------------------------------------------

def classify_thread(
    thread: dict,
    pr_head_sha: str,
    fix_commits: list[dict],
    expected_files: list[str],
    forbidden_paths: list[str],
) -> ThreadTriageOutcome:
    """Classify one review thread into one of the seven ``TriageVerdict`` states.

    Classification uses regex hints, the thread's ``isOutdated`` flag, and
    matching against follow-up commits. Rules are tried top-down and the first
    hit wins:

    1. ``isOutdated`` is true -> OUTDATED
    2. a quoted path matches a forbidden path -> REAL_BUG_SCOPE_EXPANSION with
       FORBIDDEN_PATH_INTRUSION (security check, always first after OUTDATED)
    3. false-positive hint or hard-coded-path comment -> FALSE_POSITIVE
    4. style hint with no bug word -> STYLE_ONLY
    5. a follow-up commit matches -> CODE_ALREADY_FIXED
    6. a quoted path is outside ``expected_files`` -> REAL_BUG_SCOPE_EXPANSION
       with GEMINI_REAL_BUG_REQUIRES_SCOPE_EXPANSION
    7. minor-fix hint -> MINOR_FIX_ALLOWED
    8. otherwise -> REAL_BUG_IN_SCOPE

    The false-positive/style/already-fixed checks deliberately run before the
    ``expected_files`` check (Codex G1 recommendation) so that a benign comment
    is not escalated as Critical merely because it mentions a path.

    Args:
        thread: mapping with optional ``id``, ``isOutdated`` and ``comments``
            (a list of mappings with a ``body``) keys.
        pr_head_sha: PR head commit SHA; its 8-character prefix is recorded in
            the evidence of the default (REAL_BUG_IN_SCOPE) outcome.
        fix_commits: mappings with ``sha``, ``message`` and ``files`` keys.
        expected_files: paths considered in scope; an empty list disables the
            scope-expansion check.
        forbidden_paths: paths whose mention always blocks.

    Returns:
        The outcome for this thread. ``auto_resolved`` is True for the five
        auto-resolvable verdicts and False for the two ``REAL_BUG_*`` verdicts.
    """
    thread_id = thread.get("id", "")
    is_outdated = thread.get("isOutdated", False)
    comments = thread.get("comments", [])
    # A comment body may be missing or null; both become empty text so the
    # join cannot fail on a non-string.
    body = " ".join((c.get("body") or "") for c in comments) if comments else ""

    # Head SHA prefix for audit; only the default outcome records it.
    head_sha_prefix = pr_head_sha[:8] if pr_head_sha else ""

    # 1. OUTDATED
    if is_outdated:
        return ThreadTriageOutcome(
            thread_id=thread_id,
            verdict=TriageVerdict.OUTDATED,
            auto_resolved=True,
            dismiss_comment=(
                f"[AUTO-OUTDATED] 이 스레드는 PR head 변경 후 outdated 처리되었습니다. "
                f"Evidence: isOutdated=True."
            ),
            escalation_type=None,
            evidence={"outdated": True, "matched_pattern": "isOutdated"},
        )

    # 2. Forbidden path (security - always checked first).
    # Substring match in either direction, so a bare file name also hits a
    # forbidden entry that contains it.
    extracted_paths = FILE_PATH_PATTERN.findall(body)
    for path in extracted_paths:
        for fp in forbidden_paths:
            if fp in path or path in fp:
                return ThreadTriageOutcome(
                    thread_id=thread_id,
                    verdict=TriageVerdict.REAL_BUG_SCOPE_EXPANSION,
                    auto_resolved=False,
                    dismiss_comment=(
                        f"[AUTO-REAL_BUG_SCOPE_EXPANSION] forbidden path '{fp}' 수정 요구 감지. "
                        f"이 변경은 금지된 파일 경로에 영향을 미칩니다. "
                        f"Evidence: path={path}, forbidden={fp}."
                    ),
                    escalation_type=CriticalEscalationType.FORBIDDEN_PATH_INTRUSION,
                    evidence={
                        "matched_pattern": path,
                        "outdated": False,
                        "forbidden_path": fp,
                    },
                )

    # 3. FALSE_POSITIVE - standard hint, or a hard-coded path comment (PR #56 case).
    # The standard hint takes precedence when both match.
    fp_matched = FALSE_POSITIVE_PATTERN.search(body)
    hardcoded_matched = HARDCODED_PATH_PATTERN.search(body)
    false_positive_hit = fp_matched or hardcoded_matched
    if false_positive_hit is not None:
        matched_text = false_positive_hit.group()
        reason_kind = "regex/wrapper false-positive" if fp_matched else "hardcoded path 코멘트 (PR #56 패턴)"
        return ThreadTriageOutcome(
            thread_id=thread_id,
            verdict=TriageVerdict.FALSE_POSITIVE,
            auto_resolved=True,
            dismiss_comment=(
                f"[AUTO-FALSE_POSITIVE] {reason_kind}으로 판단됩니다. "
                f"Evidence: matched='{matched_text}'."
            ),
            escalation_type=None,
            evidence={
                "matched_pattern": matched_text,
                "matched_kind": "false_positive" if fp_matched else "hardcoded_path",
                "outdated": False,
            },
        )

    # 4. STYLE_ONLY - a style hint with no bug-related word anywhere in the body.
    style_hit = STYLE_HINT_PATTERN.search(body)
    if style_hit and not BUG_WORD_PATTERN.search(body):
        style_text = style_hit.group()
        return ThreadTriageOutcome(
            thread_id=thread_id,
            verdict=TriageVerdict.STYLE_ONLY,
            auto_resolved=True,
            dismiss_comment=(
                f"[AUTO-STYLE_ONLY] 코드 스타일 의견으로 semantic 동등 판단됩니다. "
                f"Evidence: matched='{style_text}'."
            ),
            escalation_type=None,
            evidence={
                "matched_pattern": style_text,
                "outdated": False,
            },
        )

    # 5. CODE_ALREADY_FIXED - match against follow-up commits.
    # Keywords depend only on the thread body, so build them once: the first
    # ten body words that are at least 3 characters long after stripping
    # surrounding punctuation (Korean words included), lower-cased.
    strip_chars = ":`'\",./()[]"
    stripped_words = (w.strip(strip_chars) for w in body.split())
    keywords = [w.lower() for w in stripped_words if len(w) >= 3][:10]

    for commit in fix_commits:
        # Null fields are treated like missing ones.
        commit_sha = commit.get("sha") or ""
        commit_msg = (commit.get("message") or "").lower()
        commit_files = commit.get("files") or []

        # Heuristic: any keyword appearing in the commit message counts.
        keyword_match = any(word in commit_msg for word in keywords)

        # Or the comment mentions the commit's short SHA directly.
        sha_mentioned = bool(commit_sha) and commit_sha[:8] in body

        # Or the commit touched one of the quoted paths.
        path_in_commit = any(
            cf in path or path in cf
            for path in extracted_paths
            for cf in commit_files
        )

        if sha_mentioned or keyword_match or path_in_commit:
            short_sha = commit_sha[:8] if commit_sha else "N/A"
            if keyword_match:
                matched_by = "commit_body_keyword"
            elif sha_mentioned:
                matched_by = "sha_mentioned"
            else:
                matched_by = "path_in_commit"
            return ThreadTriageOutcome(
                thread_id=thread_id,
                verdict=TriageVerdict.CODE_ALREADY_FIXED,
                auto_resolved=True,
                dismiss_comment=(
                    f"[AUTO-CODE_ALREADY_FIXED] 이미 후속 commit {short_sha}으로 반영되었습니다. "
                    f"Evidence: commit_sha={commit_sha}, sha_mentioned={sha_mentioned}."
                ),
                escalation_type=None,
                evidence={
                    "fix_commit_sha": commit_sha,
                    "outdated": False,
                    "matched_pattern": matched_by,
                },
            )

    # 6. Path outside expected_files -> REAL_BUG_SCOPE_EXPANSION.
    # Skipped entirely when expected_files is empty (no scope declared).
    if expected_files:
        for path in extracted_paths:
            in_expected = any(ef in path or path in ef for ef in expected_files)
            if not in_expected:
                return ThreadTriageOutcome(
                    thread_id=thread_id,
                    verdict=TriageVerdict.REAL_BUG_SCOPE_EXPANSION,
                    auto_resolved=False,
                    dismiss_comment=(
                        f"[AUTO-REAL_BUG_SCOPE_EXPANSION] expected_files 외 파일 '{path}' 수정 요구 감지. "
                        f"scope expansion이 필요한 변경입니다. "
                        f"Evidence: path={path}, expected_files={expected_files}."
                    ),
                    escalation_type=CriticalEscalationType.GEMINI_REAL_BUG_REQUIRES_SCOPE_EXPANSION,
                    evidence={
                        "matched_pattern": path,
                        "outdated": False,
                        "expected_files": expected_files,
                    },
                )

    # 7. MINOR_FIX_ALLOWED - small change inside expected_files.
    minor_hit = MINOR_FIX_PATTERN.search(body)
    if minor_hit:
        # Kept as an explicit guard even though step 6 has already rejected
        # out-of-scope paths whenever expected_files is non-empty.
        in_scope = not extracted_paths or any(
            ef in path or path in ef
            for path in extracted_paths
            for ef in expected_files
        )
        if in_scope or not expected_files:
            minor_text = minor_hit.group()
            return ThreadTriageOutcome(
                thread_id=thread_id,
                verdict=TriageVerdict.MINOR_FIX_ALLOWED,
                auto_resolved=True,
                dismiss_comment=(
                    f"[AUTO-MINOR_FIX_ALLOWED] expected_files 범위 내 작은 수정으로 자동 처리됩니다. "
                    f"Evidence: matched='{minor_text}'."
                ),
                escalation_type=None,
                evidence={
                    "matched_pattern": minor_text,
                    "outdated": False,
                },
            )

    # 8. Default: REAL_BUG_IN_SCOPE (needs manual review).
    return ThreadTriageOutcome(
        thread_id=thread_id,
        verdict=TriageVerdict.REAL_BUG_IN_SCOPE,
        auto_resolved=False,
        dismiss_comment=(
            f"[AUTO-REAL_BUG_IN_SCOPE] expected_files 범위 내 실제 버그로 판단됩니다. "
            f"수동 검토가 필요합니다."
        ),
        escalation_type=None,
        evidence={
            "outdated": False,
            "matched_pattern": "default",
            "pr_head_sha_prefix": head_sha_prefix,
        },
    )


# ---------------------------------------------------------------------------
# build_resolve_thread_args
# ---------------------------------------------------------------------------

def build_resolve_thread_args(thread_id: str) -> list[str]:
    """Return the subprocess args for the ``resolveReviewThread`` GraphQL mutation.

    The mutation only needs the thread id; owner/repo are not part of the
    mutation input, so they are not parameters here.

    The id is embedded as a GraphQL string literal. It is escaped with
    ``json.dumps`` so that a quote, backslash or control character in the id
    cannot terminate the literal and alter the mutation. For ordinary ids the
    result is identical to plain double-quoting.

    Args:
        thread_id: node id of the review thread to resolve.

    Returns:
        ``["gh", "api", "graphql", "-f", "query=mutation { ... }"]``.
    """
    quoted_id = json.dumps(str(thread_id))
    mutation = (
        f"mutation {{ resolveReviewThread(input: {{threadId: {quoted_id}}}) "
        f"{{ thread {{ id isResolved }} }} }}"
    )
    return ["gh", "api", "graphql", "-f", f"query={mutation}"]


# ---------------------------------------------------------------------------
# build_dismiss_comment
# ---------------------------------------------------------------------------

def build_dismiss_comment(outcome: ThreadTriageOutcome) -> str:
    """Return the dismiss comment for an outcome.

    An outcome that already carries a non-empty ``dismiss_comment`` gets it
    back unchanged. Otherwise a generic comment is built in the form
    ``[AUTO-<verdict>] ... Evidence: k=v, k=v.`` from the evidence entries
    whose value is neither ``None`` nor the empty string.
    """
    existing = outcome.dismiss_comment
    if existing:
        return existing

    label = outcome.verdict.value
    pairs = []
    for key, value in outcome.evidence.items():
        if value is None or value == "":
            continue
        pairs.append(f"{key}={value}")
    summary = ", ".join(pairs)
    return f"[AUTO-{label}] 자동 분류 결과. Evidence: {summary}."


# ---------------------------------------------------------------------------
# auto_resolve_threads
# ---------------------------------------------------------------------------

def _run_resolve_with_retry(thread_id: str, args: list[str]) -> bool:
    """Run the resolve command up to twice (30s timeout each); True on exit code 0."""
    for attempt in (1, 2):
        try:
            proc = subprocess.run(
                args,
                capture_output=True,
                text=True,
                timeout=30,
            )
        except subprocess.TimeoutExpired:
            logger.error(
                "resolve thread %s timed out (attempt=%d)",
                thread_id,
                attempt,
            )
            continue
        except (OSError, subprocess.SubprocessError) as e:
            logger.error(
                "resolve thread %s subprocess error (attempt=%d): %s",
                thread_id,
                attempt,
                e,
            )
            continue

        if proc.returncode == 0:
            return True
        logger.warning(
            "resolve thread %s failed (attempt=%d, rc=%d): %s",
            thread_id,
            attempt,
            proc.returncode,
            proc.stderr,
        )

    logger.error(
        "resolve thread %s failed after retry — auto_resolved=False",
        thread_id,
    )
    return False


def auto_resolve_threads(
    outcomes: list[ThreadTriageOutcome],
    apply: bool = False,
) -> list[ThreadTriageOutcome]:
    """Handle the five auto-resolvable verdicts.

    Covers FALSE_POSITIVE / STYLE_ONLY / OUTDATED / CODE_ALREADY_FIXED /
    MINOR_FIX_ALLOWED. Outcomes with any other verdict are passed through
    unchanged.

    apply=False (dry-run): the subprocess args are only built and logged; the
        outcome is reported as auto_resolved=True (simulation, used by the
        regression tests) and ``evidence["apply_succeeded"]`` is None.
    apply=True: the command is executed with one retry and a 30s timeout;
        auto_resolved is True only when a call exits with code 0, which
        prevents reporting a false success.

    Returns:
        A new list in the same order, with rebuilt outcomes for the
        auto-resolvable verdicts.
    """
    processed = []
    for item in outcomes:
        if item.verdict.value not in AUTO_RESOLVABLE:
            processed.append(item)
            continue

        cmd = build_resolve_thread_args(item.thread_id)
        logger.debug("resolve args: %s", cmd)

        if apply:
            succeeded = _run_resolve_with_retry(item.thread_id, cmd)
            recorded = succeeded
        else:
            succeeded = True
            recorded = None

        processed.append(
            ThreadTriageOutcome(
                thread_id=item.thread_id,
                verdict=item.verdict,
                auto_resolved=succeeded,
                dismiss_comment=item.dismiss_comment or build_dismiss_comment(item),
                escalation_type=item.escalation_type,
                evidence={**item.evidence, "apply_succeeded": recorded},
            )
        )

    return processed


# ---------------------------------------------------------------------------
# build_review_gate_status
# ---------------------------------------------------------------------------

def build_review_gate_status(
    outcomes: list[ThreadTriageOutcome],
) -> ReviewGateStatus:
    """Build the contracts-freeze ``ReviewGateStatus`` for a set of outcomes.

    gemini_status:
      - any blocking thread -> GEMINI_SCOPE_EXPANSION when one of them carries
        a forbidden-path or scope-expansion escalation, else GEMINI_REAL_BUG
      - otherwise any unresolved thread -> GEMINI_UNRESOLVED
      - otherwise -> GEMINI_COMPLETED
    unresolved_threads counts every outcome that is not auto_resolved, which
    includes the blocking ones.
    review_gate_passed is True only when there are no blocking and no
    unresolved threads, so a leftover REAL_BUG_IN_SCOPE keeps downstream
    wiring from merging. (``TriageReport.merge_readiness`` is computed
    separately and looks at blocking threads only.)
    """
    blocking = []
    resolved_total = 0
    unresolved_count = 0
    for o in outcomes:
        if o.verdict == TriageVerdict.REAL_BUG_SCOPE_EXPANSION:
            blocking.append(o)
        if o.auto_resolved:
            resolved_total += 1
        else:
            unresolved_count += 1
    blocking_count = len(blocking)

    if blocking_count > 0:
        scope_kinds = (
            CriticalEscalationType.FORBIDDEN_PATH_INTRUSION,
            CriticalEscalationType.GEMINI_REAL_BUG_REQUIRES_SCOPE_EXPANSION,
        )
        if any(o.escalation_type in scope_kinds for o in blocking):
            gemini_status = GeminiStatus.GEMINI_SCOPE_EXPANSION
        else:
            gemini_status = GeminiStatus.GEMINI_REAL_BUG
        reason = f"blocking_thread_count={blocking_count}, REAL_BUG_SCOPE_EXPANSION 존재"
    elif unresolved_count > 0:
        gemini_status = GeminiStatus.GEMINI_UNRESOLVED
        reason = f"unresolved_threads={unresolved_count}"
    else:
        gemini_status = GeminiStatus.GEMINI_COMPLETED
        reason = f"all auto_resolved={resolved_total}"

    return ReviewGateStatus(
        gemini_status=gemini_status,
        unresolved_threads=unresolved_count,
        fallback_review_used=False,
        fallback_review_passed=False,
        review_gate_passed=(blocking_count == 0 and unresolved_count == 0),
        reason=reason,
    )


# ---------------------------------------------------------------------------
# build_triage_summary
# ---------------------------------------------------------------------------

def build_triage_summary(outcomes: list[ThreadTriageOutcome]) -> GeminiTriageResult:
    """Aggregate outcomes into the contracts-freeze ``GeminiTriageResult``.

    false_positive_count / style_only_count: one per matching verdict
    real_bug_small_count: REAL_BUG_IN_SCOPE plus MINOR_FIX_ALLOWED
    scope_expansion_count: REAL_BUG_SCOPE_EXPANSION (forbidden-path included)
    unresolved_count: outcomes that are not auto_resolved
    actions_taken: ``"resolved:<thread_id>:<verdict>"`` per auto_resolved outcome
    """
    false_positives = 0
    style_only = 0
    small_bugs = 0
    scope_expansions = 0
    pending = 0
    actions: list[str] = []

    for o in outcomes:
        verdict = o.verdict
        if verdict == TriageVerdict.FALSE_POSITIVE:
            false_positives += 1
        elif verdict == TriageVerdict.STYLE_ONLY:
            style_only += 1
        elif verdict in (TriageVerdict.REAL_BUG_IN_SCOPE, TriageVerdict.MINOR_FIX_ALLOWED):
            small_bugs += 1
        elif verdict == TriageVerdict.REAL_BUG_SCOPE_EXPANSION:
            scope_expansions += 1

        if o.auto_resolved:
            actions.append(f"resolved:{o.thread_id}:{o.verdict.value}")
        else:
            pending += 1

    # Scope expansion outranks plain unresolved threads when picking the status.
    if scope_expansions > 0:
        status = GeminiStatus.GEMINI_SCOPE_EXPANSION
    elif pending > 0:
        status = GeminiStatus.GEMINI_UNRESOLVED
    else:
        status = GeminiStatus.GEMINI_COMPLETED

    return GeminiTriageResult(
        status=status,
        false_positive_count=false_positives,
        style_only_count=style_only,
        real_bug_small_count=small_bugs,
        scope_expansion_count=scope_expansions,
        unresolved_count=pending,
        actions_taken=actions,
    )


# ---------------------------------------------------------------------------
# detect_scope_expansion
# ---------------------------------------------------------------------------

def detect_scope_expansion(
    outcomes: list[ThreadTriageOutcome],
    task_id: str,
    pr_number: int,
) -> Optional[EscalationPacket]:
    """Build the Critical #1/#3 escalation packet, if any thread is blocking.

    Blocking means verdict REAL_BUG_SCOPE_EXPANSION. The packet describes the
    first blocking thread; a missing escalation type on that thread falls back
    to GEMINI_REAL_BUG_REQUIRES_SCOPE_EXPANSION. Returns None when nothing
    is blocking.
    """
    blocking = []
    for o in outcomes:
        if o.verdict == TriageVerdict.REAL_BUG_SCOPE_EXPANSION:
            blocking.append(o)
    if len(blocking) == 0:
        return None

    lead = blocking[0]
    escalation_type = lead.escalation_type
    if not escalation_type:
        escalation_type = CriticalEscalationType.GEMINI_REAL_BUG_REQUIRES_SCOPE_EXPANSION

    reason = (
        f"PR #{pr_number}에서 scope expansion 감지: "
        f"thread_id={lead.thread_id}, verdict={lead.verdict.value}"
    )
    why = (
        "REAL_BUG_SCOPE_EXPANSION 또는 FORBIDDEN_PATH_INTRUSION이 존재하여 "
        "자동 머지를 진행할 수 없습니다."
    )
    options = [
        "thread를 수동 검토 후 close",
        "scope 조정 후 PR 재제출",
        "회장 직접 승인",
    ]
    evidence = {
        "blocking_thread_count": len(blocking),
        "first_thread_id": lead.thread_id,
        "first_verdict": lead.verdict.value,
        "first_escalation": escalation_type.value if escalation_type else None,
    }

    return EscalationPacket(
        task_id=task_id,
        pr_number=pr_number,
        escalation_type=escalation_type,
        reason=reason,
        why_auto_cannot_continue=why,
        safe_options=options,
        recommended_option="thread를 수동 검토 후 close",
        evidence=evidence,
    )


# ---------------------------------------------------------------------------
# triage_pr
# ---------------------------------------------------------------------------

def triage_pr(
    pr_number: int,
    threads: list[dict],
    pr_head_sha: str,
    fix_commits: list[dict],
    expected_files: list[str],
    forbidden_paths: list[str],
    apply: bool = False,
    task_id: str = "",
) -> TriageReport:
    """Orchestrate triage for one PR.

    Classifies every thread that is not already resolved, runs the
    auto-resolve step (dry-run unless ``apply`` is true), builds the
    ReviewGateStatus and GeminiTriageResult, logs a warning when an
    escalation packet is produced, and returns the TriageReport.
    """
    # Threads flagged isResolved=True are skipped.
    outcomes = [
        classify_thread(
            thread=t,
            pr_head_sha=pr_head_sha,
            fix_commits=fix_commits,
            expected_files=expected_files,
            forbidden_paths=forbidden_paths,
        )
        for t in threads
        if not t.get("isResolved", False)
    ]

    outcomes = auto_resolve_threads(outcomes, apply=apply)

    # Tallies for the report.
    resolved_total = 0
    pending_total = 0
    blocking_total = 0
    for o in outcomes:
        if o.auto_resolved:
            resolved_total += 1
        else:
            pending_total += 1
        if o.verdict == TriageVerdict.REAL_BUG_SCOPE_EXPANSION:
            blocking_total += 1

    gate = build_review_gate_status(outcomes)
    summary = build_triage_summary(outcomes)

    # The packet is only used for the audit log here; it is not part of the report.
    packet = detect_scope_expansion(outcomes, task_id=task_id, pr_number=pr_number)
    if packet is not None:
        logger.warning(
            "Critical escalation detected: pr=%s task=%s type=%s reason=%s",
            pr_number,
            task_id,
            packet.escalation_type.value,
            packet.reason,
        )

    return TriageReport(
        pr_number=pr_number,
        threads=outcomes,
        unresolved_count=pending_total,
        auto_resolved_count=resolved_total,
        blocking_thread_count=blocking_total,
        merge_readiness=(blocking_total == 0),
        review_gate_status=gate,
        triage_summary=summary,
    )


# ---------------------------------------------------------------------------
# to_legacy_gemini_state — task-2514 wiring 호환
# ---------------------------------------------------------------------------

def to_legacy_gemini_state(report: TriageReport) -> dict:
    """Convert a report to the dict interface merge_queue_executor already calls.

    {
      "status": "critical_scope_expansion" | "auto_triage_candidate" | "completed",
      "unresolved": [thread_id, ...],
      "auto_resolved_count": int,
      "blocking_thread_count": int,
      "merge_readiness": bool,
    }

    Blocking threads take precedence over unresolved ones when choosing
    ``status``; ``unresolved`` lists every thread that is not auto_resolved.
    """
    status = "completed"
    if report.unresolved_count > 0:
        status = "auto_triage_candidate"
    if report.blocking_thread_count > 0:
        status = "critical_scope_expansion"

    pending_ids = []
    for outcome in report.threads:
        if outcome.auto_resolved:
            continue
        pending_ids.append(outcome.thread_id)

    legacy = {}
    legacy["status"] = status
    legacy["unresolved"] = pending_ids
    legacy["auto_resolved_count"] = report.auto_resolved_count
    legacy["blocking_thread_count"] = report.blocking_thread_count
    legacy["merge_readiness"] = report.merge_readiness
    return legacy


# ---------------------------------------------------------------------------
# CLI helpers
# ---------------------------------------------------------------------------

def _list_items_under(content: str, key: str) -> list[str]:
    """Return the cleaned ``- item`` entries listed directly under ``key:``."""
    block = re.search(rf"{re.escape(key)}:\s*\n((?:\s+-\s+.+\n?)+)", content)
    if block is None:
        return []
    items = []
    for raw in block.group(1).splitlines():
        # Drop the list dash and any surrounding single or double quotes, so
        # quoted entries compare equal to the bare path.
        item = raw.strip().lstrip("-").strip().strip("\"'")
        if item:
            items.append(item)
    return items


def _parse_expected_files_from_task(task_file: str) -> tuple[list[str], list[str]]:
    """Parse expected files and forbidden paths from a task markdown file.

    Reads the ``expected_files:`` and ``forbidden_actions:`` list blocks.
    Every ``expected_files`` entry is returned; a ``forbidden_actions`` entry
    is kept only when it looks like a file path (contains ``/`` or ends with
    ``.py``).

    Parsing is best-effort: if the file cannot be read, a warning is logged
    and two empty lists are returned. With an empty forbidden list,
    ``classify_thread`` never flags a forbidden path.

    Returns:
        ``(expected_files, forbidden_paths)``.
    """
    try:
        content = Path(task_file).read_text(encoding="utf-8")
    except (OSError, ValueError, TypeError) as exc:
        # ValueError covers undecodable content; TypeError covers a non-path argument.
        logger.warning("task file parse failed: %s", exc)
        return [], []

    expected = _list_items_under(content, "expected_files")
    forbidden = [
        entry
        for entry in _list_items_under(content, "forbidden_actions")
        if "/" in entry or entry.endswith(".py")
    ]
    return expected, forbidden


# GraphQL query used by _fetch_pr_threads: head SHA, review threads (with
# their comment bodies) and recent commits of one pull request.
# The page sizes are fixed and there is no pagination: at most the first 100
# threads, the first 50 comments per thread and the last 30 commits are
# returned, so anything beyond those limits is not seen by the triage.
_FETCH_QUERY = """
query($pr: Int!, $owner: String!, $repo: String!) {
  repository(owner: $owner, name: $repo) {
    pullRequest(number: $pr) {
      headRefOid
      reviewThreads(first: 100) {
        nodes {
          id
          isOutdated
          isResolved
          comments(first: 50) { nodes { body } }
        }
      }
      commits(last: 30) {
        nodes { commit { oid message } }
      }
    }
  }
}
"""


def _detect_owner_repo() -> tuple[str, str]:
    """Return ``(owner_login, repo_name)`` as reported by ``gh repo view``.

    Raises:
        RuntimeError: if the command cannot be run, times out (15s), exits
            non-zero, or prints JSON that lacks the expected
            ``owner.login`` / ``name`` fields.
    """
    try:
        proc = subprocess.run(
            ["gh", "repo", "view", "--json", "owner,name"],
            capture_output=True,
            text=True,
            timeout=15,
        )
    except (OSError, subprocess.SubprocessError) as exc:
        raise RuntimeError(f"gh repo view 실패: {exc}") from exc
    if proc.returncode != 0:
        raise RuntimeError(
            f"gh repo view failed (rc={proc.returncode}): {proc.stderr.strip()}"
        )
    try:
        info = json.loads(proc.stdout)
        return info["owner"]["login"], info["name"]
    except (KeyError, TypeError, ValueError) as exc:
        # TypeError: the JSON parsed but is not the expected nested object
        # (e.g. null or a list). Callers only handle RuntimeError.
        raise RuntimeError(f"gh repo view 응답 파싱 실패: {exc}") from exc


def _fetch_pr_threads(pr_number: int) -> tuple[list[dict], list[dict], str]:
    """Fetch review threads, recent commits and the head SHA of a PR via GraphQL.

    Owner and repo are detected with ``_detect_owner_repo``. Results are
    limited to what a single ``_FETCH_QUERY`` page returns (no pagination).

    Returns:
        ``(threads, fix_commits, head_sha)`` where each thread has ``id``,
        ``isOutdated``, ``isResolved`` and ``comments`` (list of ``{"body"}``),
        and each commit has ``sha``, ``message`` and an always-empty ``files``
        list (the query does not request changed files).

    Raises:
        RuntimeError: on any failure to call ``gh`` or to parse its response.
            Callers must treat this conservatively so that a failed fetch is
            never mistaken for a clean PR.
    """
    owner, repo = _detect_owner_repo()
    args = [
        "gh", "api", "graphql",
        # -F converts the value to a JSON number, which $pr (Int!) needs.
        "-F", f"pr={pr_number}",
        # -f sends raw strings: owner/repo names that look like numbers or
        # booleans must still reach the String! variables as strings.
        "-f", f"owner={owner}",
        "-f", f"repo={repo}",
        "-f", f"query={_FETCH_QUERY}",
    ]
    try:
        proc = subprocess.run(args, capture_output=True, text=True, timeout=30)
    except (OSError, subprocess.SubprocessError) as exc:
        raise RuntimeError(f"gh api graphql 호출 실패: {exc}") from exc

    if proc.returncode != 0:
        raise RuntimeError(
            f"gh api graphql failed (rc={proc.returncode}): {proc.stderr.strip()}"
        )

    try:
        payload = json.loads(proc.stdout)
        pr_data = payload["data"]["repository"]["pullRequest"]
        head_sha = pr_data.get("headRefOid", "") or ""
        thread_nodes = pr_data["reviewThreads"]["nodes"]
        threads = [
            {
                "id": t["id"],
                "isOutdated": t.get("isOutdated", False),
                "isResolved": t.get("isResolved", False),
                "comments": [
                    {"body": c.get("body") or ""}
                    for c in ((t.get("comments") or {}).get("nodes") or [])
                ],
            }
            for t in thread_nodes
        ]
        commit_nodes = pr_data["commits"]["nodes"]
        fix_commits = [
            {
                "sha": c["commit"]["oid"],
                "message": c["commit"]["message"],
                "files": [],
            }
            for c in commit_nodes
        ]
        return threads, fix_commits, head_sha
    except (KeyError, TypeError, AttributeError, ValueError) as exc:
        # TypeError/AttributeError: a null where an object was expected
        # (e.g. repository or pullRequest not found).
        raise RuntimeError(f"gh api graphql 응답 파싱 실패: {exc}") from exc


def main() -> None:
    """CLI entry point: triage one PR, print a JSON report, exit with a status code.

    Threads come from ``--fixture`` (a JSON object with ``threads``,
    ``fix_commits`` and ``pr_head_sha``) or, without it, from a live
    ``gh api graphql`` fetch. ``--dry-run`` (the default) and ``--apply`` are
    mutually exclusive.

    Exit codes: 0 all clean, 1 unresolved threads or fetch/fixture failure,
    2 blocking threads. (argparse itself also exits with 2 on a usage error.)

    When loading threads fails, the triage still runs on an empty thread list
    to produce the report structure, but the printed report is forced to a
    not-passed / not-merge-ready state so that consumers reading the JSON do
    not mistake the failure for a clean PR.
    """
    parser = argparse.ArgumentParser(
        description="auto_gemini_triage — Gemini review thread 자동 분류/resolve"
    )
    parser.add_argument("--pr", type=int, required=True, help="PR number")
    # Passing both flags is contradictory; reject it instead of letting
    # --apply silently win over an explicit --dry-run.
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--dry-run", action="store_true", default=True, help="dry-run mode (default)")
    mode.add_argument("--apply", action="store_true", default=False, help="실제 GraphQL 호출")
    parser.add_argument("--task-file", type=str, default="", help="task md 파일 경로")
    parser.add_argument("--fixture", type=str, default="", help="JSON fixture 파일 경로")
    args = parser.parse_args()

    apply = args.apply

    # expected_files / forbidden paths come from the optional task file.
    expected_files: list[str] = []
    forbidden_paths: list[str] = []
    if args.task_file:
        expected_files, forbidden_paths = _parse_expected_files_from_task(args.task_file)

    # threads / fix_commits / pr_head_sha
    fetch_failed = False
    fetch_error_reason = ""
    if args.fixture:
        try:
            fixture_data = json.loads(Path(args.fixture).read_text(encoding="utf-8"))
            if not isinstance(fixture_data, dict):
                raise ValueError("fixture root must be a JSON object")
            threads = fixture_data.get("threads", [])
            fix_commits = fixture_data.get("fix_commits", [])
            pr_head_sha = fixture_data.get("pr_head_sha", "")
        except (OSError, ValueError) as exc:
            logger.error("fixture load failed: %s", exc)
            fetch_failed = True
            fetch_error_reason = f"fixture load failed: {exc}"
            threads, fix_commits, pr_head_sha = [], [], ""
    else:
        # No fixture: call the real gh API. A failure is reported
        # conservatively as an unresolved signal.
        try:
            threads, fix_commits, pr_head_sha = _fetch_pr_threads(args.pr)
        except RuntimeError as exc:
            logger.error("PR fetch failed: %s", exc)
            fetch_failed = True
            fetch_error_reason = str(exc)
            threads, fix_commits, pr_head_sha = [], [], ""

    report = triage_pr(
        pr_number=args.pr,
        threads=threads,
        pr_head_sha=pr_head_sha,
        fix_commits=fix_commits,
        expected_files=expected_files,
        forbidden_paths=forbidden_paths,
        apply=apply,
        task_id="",
    )

    review_gate = asdict(report.review_gate_status)
    triage_summary = asdict(report.triage_summary)
    legacy = to_legacy_gemini_state(report)
    merge_readiness = report.merge_readiness

    if fetch_failed:
        # The report was computed from zero threads and would otherwise read
        # as "completed / passed"; align the JSON with the exit code 1 below.
        review_gate["gemini_status"] = GeminiStatus.GEMINI_UNRESOLVED
        review_gate["review_gate_passed"] = False
        review_gate["reason"] = f"fetch_failed: {fetch_error_reason}"
        triage_summary["status"] = GeminiStatus.GEMINI_UNRESOLVED
        merge_readiness = False
        legacy["status"] = "auto_triage_candidate"
        legacy["merge_readiness"] = False

    # Output: ReviewGateStatus JSON plus a TriageReport summary.
    output = {
        "pr_number": report.pr_number,
        "fetch_failed": fetch_failed,
        "fetch_error_reason": fetch_error_reason,
        "review_gate_status": review_gate,
        "triage_summary": triage_summary,
        "auto_resolved_count": report.auto_resolved_count,
        "unresolved_count": report.unresolved_count,
        "blocking_thread_count": report.blocking_thread_count,
        "merge_readiness": merge_readiness,
        "legacy": legacy,
    }
    print(json.dumps(output, indent=2, ensure_ascii=False))

    # Exit code. A fetch failure is conservatively treated as an unresolved
    # signal so it is never mistaken for a clean PR.
    if fetch_failed:
        sys.exit(1)
    if report.blocking_thread_count > 0:
        sys.exit(2)
    if report.unresolved_count > 0:
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()
