"""anu_v2.auto_gemini_triage — ANU v2 Gemini review evidence 자동 분류기 v0 (task-2538).

회장 §명시 (2026-05-10) 4 분류:
  1. false_positive            → dismiss (회귀 fixture 매칭)
  2. style_only                → dismiss (코드 동작 무관)
  3. minor_fix_in_scope        → expected_files 내부 자동 적용
  4. scope_expansion           → Critical 7종 #N 보고

설계 원칙:
  - one-way isolation: anu_v2/* 만 import. utils/dispatch/scripts/dashboard 의존성 0.
  - 외부 부수효과 (audit / file write) 는 주입 가능한 callable 로 추상화.
  - executor (task-2531) 인터페이스 contract: triage_batch → {"applied", "dismissed", "escalated"}.
  - chat_id tag + filter 로 다른 chat record 노출 차단 (default chat=6937032012).
  - token raw 0: classify/triage 결과/audit record 어디에도 BOT_GITHUB_TOKEN 등 raw 토큰 미노출.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Iterable, Mapping, Sequence


# ─── Action codes (4 분류) ───────────────────────────────────────────────────
ACTION_DISMISS_FALSE_POSITIVE = "dismiss_false_positive"
ACTION_DISMISS_STYLE_ONLY = "dismiss_style_only"
ACTION_AUTO_APPLY_MINOR_FIX = "auto_apply_minor_fix"
ACTION_ESCALATE_SCOPE_EXPANSION = "escalate_scope_expansion"

ACTIONS: frozenset[str] = frozenset({
    ACTION_DISMISS_FALSE_POSITIVE,
    ACTION_DISMISS_STYLE_ONLY,
    ACTION_AUTO_APPLY_MINOR_FIX,
    ACTION_ESCALATE_SCOPE_EXPANSION,
})


# ─── Critical 7종 (회장 §, task-2531 박제 코드 재참조) ───────────────────────
CRITICAL_GEMINI_SCOPE_EXPANSION = "GEMINI_REAL_BUG_SCOPE_EXPANSION"


# ─── Severity / Category 정규화 ──────────────────────────────────────────────
SEVERITY_HIGH = "high"
SEVERITY_MEDIUM = "medium"
SEVERITY_LOW = "low"

CATEGORY_SECURITY = "security"
CATEGORY_STYLE = "style"
CATEGORY_BUG = "bug"
CATEGORY_PERFORMANCE = "performance"
CATEGORY_DOCS = "docs"

# 코드 동작 무관 — style-only 로 dismiss 가능한 카테고리.
# `docs` 도 코드 동작 무관이므로 함께 포함. classify_evidence 의 ② Security-High
# 우선 분기가 high severity 케이스를 먼저 차단하므로, severity 가 high 인 docs 는
# style_only 분류에 도달하지 못한다 (low/medium docs 만 dismiss 됨).
STYLE_ONLY_CATEGORIES: frozenset[str] = frozenset({CATEGORY_STYLE, CATEGORY_DOCS})

# Token / secret 관련 키 (raw 노출 차단 대상). lowercase 비교.
TOKEN_KEY_HINTS: tuple[str, ...] = (
    "github_token", "bot_github_token", "gh_token", "owner_pat",
    "ghp_", "ghs_", "github_pat_",  # 토큰 prefix (Personal Access Token / App / Server)
    "x-api-key", "authorization", "secret", "password",
)

# 회장 §명시 default chat — 다른 chat record 노출 차단의 기준값.
DEFAULT_CHAT_ID = "6937032012"


# ─── Data types ──────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class FalsePositiveFixture:
    """false_positive 박제 fixture.

    - rule_id: Gemini rule code (예: `unused-import`, `redundant-cast`).
    - signature: optional finding 본문 signature 요약 (regex). None 이면 rule_id 만 매칭.
    - reason: dismiss 사유 기록 (audit 용).
    """
    rule_id: str
    signature: str | None = None
    reason: str = ""


@dataclass
class TriageResult:
    """triage_batch 결과 — executor 인터페이스 contract 의 raw container.

    serialize() 호출 시 회장 §명시 3 키 (applied / dismissed / escalated) 만 노출.
    """
    applied: list[dict[str, Any]] = field(default_factory=list)
    dismissed: list[dict[str, Any]] = field(default_factory=list)
    escalated: list[dict[str, Any]] = field(default_factory=list)

    def serialize(self) -> dict[str, list[dict[str, Any]]]:
        return {
            "applied": list(self.applied),
            "dismissed": list(self.dismissed),
            "escalated": list(self.escalated),
        }

    @property
    def has_escalation(self) -> bool:
        return bool(self.escalated)


# ─── 외부 부수효과 콜백 시그니처 ─────────────────────────────────────────────
AuditWriter = Callable[[Mapping[str, Any]], None]
FixApplier = Callable[[Mapping[str, Any], str], bool]   # (finding, target_file) → success?


def _default_fix_applier(finding: Mapping[str, Any], target_file: str) -> bool:
    """기본 fix_applier — 본 v0 는 분류만 보장하고 실제 patch 적용 자체는 호출부가 담당.

    분류 결과가 minor_fix_in_scope 인 finding 은 호출부가 별도 패치 적용 후 결과를
    fix_applier 로 리포트한다. default 는 항상 성공으로 가정 (테스트에서 실패 mock 주입).
    인자는 시그니처 유지를 위해 받기만 하고, 본 default 는 부수효과 없이 True 반환.
    """
    # 인자 미사용 — 시그니처 유지 목적. (분석기 노이즈 방지 명시 read.)
    _ = (finding, target_file)
    return True


# ─── Helpers ────────────────────────────────────────────────────────────────
def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _normalize_severity(value: Any) -> str:
    """severity 값을 lowercase 로 정규화. 미지정 = "" (low 와 구분)."""
    if value is None:
        return ""
    text = str(value).strip().lower()
    return text


def _normalize_category(value: Any) -> str:
    if value is None:
        return ""
    return str(value).strip().lower()


def _redact_tokens(value: Any) -> Any:
    """dict / list / str 안에 박혀있을 수 있는 raw 토큰 / secret 흔적 제거.

    - dict key 가 token hint 매치 → 값 `***REDACTED***`.
    - str 값에 `ghp_` / `ghs_` prefix 가 보이면 `***REDACTED***`.
    - 재귀 처리 (list/Mapping/tuple 중첩 허용).
    - dict 의 key 자체에도 토큰이 박혀 있을 수 있어 _redact_tokens 재귀 적용
      (Gemini 6차 Security-High 박제 — key 를 통한 token raw 누출 차단).
    - OrderedDict / TypedDict 등 dict 가 아닌 Mapping 도 동일하게 처리.
    """
    if isinstance(value, Mapping):
        out: dict[Any, Any] = {}
        for k, v in value.items():
            # key 자체에도 redact 적용 (예: key='ghp_TOKEN_NAME' 같은 누출 케이스).
            k_redacted = _redact_tokens(k)
            key_lower = str(k).lower() if k is not None else ""
            if any(hint in key_lower for hint in TOKEN_KEY_HINTS):
                out[k_redacted] = "***REDACTED***"
            else:
                out[k_redacted] = _redact_tokens(v)
        return out
    if isinstance(value, (list, set, frozenset)):
        # set/frozenset 도 list 로 평탄화하여 token raw 0 원칙 보장
        # (Gemini 7차 medium #1 박제 — JSON 직렬화 호환).
        return [_redact_tokens(v) for v in value]
    if isinstance(value, tuple):
        return tuple(_redact_tokens(v) for v in value)
    if isinstance(value, str):
        # raw token prefix 직접 노출되면 마스킹.
        # 컴파일된 IGNORECASE 정규식 1회 스캔으로 lower() 복사본 생성 + 다중 substring 루프 제거
        # (Gemini 5차 medium 박제 최적화).
        if _TOKEN_VALUE_RE.search(value):
            return "***REDACTED***"
        return value
    return value


# 문자열 값 안에서 직접 검출하는 토큰 prefix 셋. dict 키 검사용 `TOKEN_KEY_HINTS` 와 별개로,
# 값 마스킹 휴리스틱용 별도 상수 + 컴파일된 IGNORECASE 정규식.
_TOKEN_VALUE_PREFIXES: tuple[str, ...] = ("ghp_", "ghs_", "github_pat_")
_TOKEN_VALUE_RE = re.compile(
    "|".join(re.escape(p) for p in _TOKEN_VALUE_PREFIXES),
    re.IGNORECASE,
)


def _glob_match(pattern: str, path: str) -> bool:
    """forbidden_paths / expected_files glob 매칭 (task-2531 동일 규칙).

    `**/foo.py` 형태가 루트 파일 `foo.py` 도 매칭하도록 `**/` 를 우선 처리.
    """
    escaped = re.escape(pattern)
    regex = (
        "^"
        + escaped.replace(r"\*\*\/", r"(?:.*/)?")
                 .replace(r"\*\*/", r"(?:.*/)?")
                 .replace(r"\*\*", ".*")
                 .replace(r"\*", "[^/]*")
        + "$"
    )
    return re.match(regex, path) is not None


# ─── 본체 ────────────────────────────────────────────────────────────────────
class AutoGeminiTriage:
    """ANU v2 Gemini review evidence 자동 분류기 v0.

    회장 §명시 4 분류:
      - false_positive: 회귀 fixture 매칭 → dismiss
      - style_only: 코드 동작 무관 → dismiss
      - minor_fix_in_scope: expected_files 내부 → 자동 적용
      - scope_expansion: expected_files 밖 → Critical 7종 #N 보고

    executor (task-2531) 인터페이스 contract:
      input: `gemini_findings: list[dict]`, `expected_files: set`
      output: `{"applied": list, "dismissed": list, "escalated": list}`
    """

    def __init__(
        self,
        *,
        audit_writer: AuditWriter,
        fix_applier: FixApplier | None = None,
        false_positive_fixtures: Iterable[FalsePositiveFixture] = (),
        chat_id: str = DEFAULT_CHAT_ID,
        task_id: str = "unknown",
    ) -> None:
        self._audit = audit_writer
        # default fix_applier 는 항상 성공 (테스트에서 명시적으로 실패 mock 주입).
        self._fix_applier: FixApplier = fix_applier if fix_applier is not None else _default_fix_applier
        self._fixtures: tuple[FalsePositiveFixture, ...] = tuple(false_positive_fixtures)
        self._chat_id = str(chat_id)
        self._task_id = str(task_id)

    # ── ① classify_evidence ────────────────────────────────────────────────
    def classify_evidence(
        self,
        gemini_finding: Mapping[str, Any],
        expected_files: Iterable[str],
    ) -> tuple[str, dict[str, Any]]:
        """Gemini finding 1건 분류 → (action, details).

        action ∈ ACTIONS.

        분류 우선순위 (회장 §):
          1. false_positive fixture 매칭 → dismiss_false_positive (rule_id 일치 검사)
          2. severity=high & category=security → in-scope 면 minor_fix, 아니면 escalate
             (Security-High 는 style 분류와 무관하게 우선 적용, 단 scope 내부일 때만)
          3. style_only → dismiss_style_only
          4. in-scope minor_fix → auto_apply_minor_fix
          5. out-of-scope → escalate_scope_expansion
        """
        # 이미 set 인 경우 (예: triage_batch 에서 1회 정규화 후 핫패스 호출) 재정규화 생략.
        expected_set = (
            expected_files
            if isinstance(expected_files, set)
            else self._normalize_expected(expected_files)
        )
        rule_id = str(gemini_finding.get("rule_id", "")).strip()
        severity = _normalize_severity(gemini_finding.get("severity"))
        category = _normalize_category(gemini_finding.get("category"))
        finding_path = str(gemini_finding.get("path", "")).strip()

        # ── ① false_positive fixture 우선 ──────────────────────────────────
        fp = self._match_false_positive(rule_id, gemini_finding)
        if fp is not None:
            return (
                ACTION_DISMISS_FALSE_POSITIVE,
                {
                    "rule_id": rule_id,
                    "fixture_reason": fp.reason,
                    "path": finding_path,
                },
            )

        # ── ② Security-High 우선 처리 (scope 검사 후 분기) ─────────────────
        is_security_high = severity == SEVERITY_HIGH and category == CATEGORY_SECURITY
        if is_security_high:
            if self.is_in_scope(finding_path, expected_set):
                return (
                    ACTION_AUTO_APPLY_MINOR_FIX,
                    {
                        "rule_id": rule_id,
                        "severity": severity,
                        "category": category,
                        "path": finding_path,
                        "reason": "security_high_in_scope",
                    },
                )
            return (
                ACTION_ESCALATE_SCOPE_EXPANSION,
                {
                    "rule_id": rule_id,
                    "severity": severity,
                    "category": category,
                    "path": finding_path,
                    "reason": "security_high_out_of_scope",
                },
            )

        # ── ③ style_only dismiss (severity 가 high 가 아닐 때만) ─────────────
        if category in STYLE_ONLY_CATEGORIES and severity != SEVERITY_HIGH:
            return (
                ACTION_DISMISS_STYLE_ONLY,
                {
                    "rule_id": rule_id,
                    "category": category,
                    "path": finding_path,
                },
            )

        # ── ④ scope check ──────────────────────────────────────────────────
        if self.is_in_scope(finding_path, expected_set):
            return (
                ACTION_AUTO_APPLY_MINOR_FIX,
                {
                    "rule_id": rule_id,
                    "severity": severity,
                    "category": category,
                    "path": finding_path,
                    "reason": "minor_fix_in_scope",
                },
            )

        # ── ⑤ out-of-scope → escalate ──────────────────────────────────────
        return (
            ACTION_ESCALATE_SCOPE_EXPANSION,
            {
                "rule_id": rule_id,
                "severity": severity,
                "category": category,
                "path": finding_path,
                "reason": "scope_expansion_required",
            },
        )

    # ── ② is_in_scope ──────────────────────────────────────────────────────
    def is_in_scope(
        self,
        finding_path: str,
        expected_files: Iterable[str],
    ) -> bool:
        """finding 의 path 가 expected_files 집합 내부인지 검증.

        - exact match 와 glob match 모두 허용 (`**` / `*`).
        - finding_path 가 비어 있으면 False (path 미지정 → 자동 적용 불가).
        - expected_files 가 이미 정규화된 set 인 경우 재정규화 생략 (triage_batch
          루프 내부 호출 핫패스 — Gemini medium #3 박제 최적화).
        """
        if not finding_path:
            return False
        expected_set = (
            expected_files
            if isinstance(expected_files, set)
            else self._normalize_expected(expected_files)
        )
        if not expected_set:
            return False
        if finding_path in expected_set:
            return True
        for pattern in expected_set:
            if _glob_match(pattern, finding_path):
                return True
        return False

    # ── ③ apply_minor_fix ──────────────────────────────────────────────────
    def apply_minor_fix(
        self,
        finding: Mapping[str, Any],
        target_file: str,
    ) -> dict[str, Any]:
        """minor fix 자동 적용 — expected_files 내부만 (호출부에서 scope 보장 가정).

        - fix_applier callable 호출. 성공 시 applied=True, 실패 시 escalate=True 로 전환.
        - 결과 record 는 token raw 0 (redact) 보장.
        - audit 기록: `kind=auto_apply_minor_fix`.
        """
        ts = _now_iso()
        try:
            ok = bool(self._fix_applier(dict(finding), target_file))
        except Exception as exc:  # noqa: BLE001 — 어떤 예외든 escalate 로 안전 전환
            ok = False
            err_msg = f"{type(exc).__name__}: {exc}"
        else:
            err_msg = ""

        record = {
            "ts": ts,
            "kind": "auto_apply_minor_fix" if ok else "auto_apply_failed_escalated",
            "task_id": self._task_id,
            "chat_id": self._chat_id,
            "rule_id": str(finding.get("rule_id", "")),
            "target_file": target_file,
            "applied": ok,
            "escalated": not ok,
            "error": err_msg,
            "finding": _redact_tokens(dict(finding)),
        }
        self._audit(record)
        return record

    # ── ④ escalate_scope_expansion ─────────────────────────────────────────
    def escalate_scope_expansion(
        self,
        finding: Mapping[str, Any],
    ) -> dict[str, Any]:
        """scope 확장 요구 → Critical 7종 #N 보고 record 생성.

        - critical_code = `GEMINI_REAL_BUG_SCOPE_EXPANSION` (task-2531 정의 재참조).
        - audit 기록: `kind=scope_expansion_critical`.
        - 회장 §명시: 본 record 가 곧 회장 보고 source-of-truth.
        """
        ts = _now_iso()
        record = {
            "ts": ts,
            "kind": "scope_expansion_critical",
            "task_id": self._task_id,
            "chat_id": self._chat_id,
            "critical_code": CRITICAL_GEMINI_SCOPE_EXPANSION,
            "rule_id": str(finding.get("rule_id", "")),
            "severity": _normalize_severity(finding.get("severity")),
            "category": _normalize_category(finding.get("category")),
            "path": str(finding.get("path", "")),
            "finding": _redact_tokens(dict(finding)),
        }
        self._audit(record)
        return record

    # ── ⑤ triage_batch (executor 인터페이스 contract) ──────────────────────
    def triage_batch(
        self,
        gemini_findings: Sequence[Mapping[str, Any]],
        expected_files: Iterable[str],
    ) -> dict[str, list[dict[str, Any]]]:
        """task-2531 executor 인터페이스 contract.

        input  : `gemini_findings: list[dict]`, `expected_files: set`
        output : `{"applied": list, "dismissed": list, "escalated": list}`

        escalated 비어있으면 호출부 자동 진행, 비어있지 않으면 Critical 보고.
        """
        result = TriageResult()
        expected_set = self._normalize_expected(expected_files)

        for raw_finding in gemini_findings:
            finding = dict(raw_finding)  # 외부 mutation 방지
            action, details = self.classify_evidence(finding, expected_set)
            base = {
                "action": action,
                "rule_id": details.get("rule_id", ""),
                "path": details.get("path", ""),
                "details": _redact_tokens(details),
            }

            if action == ACTION_DISMISS_FALSE_POSITIVE:
                result.dismissed.append({**base, "reason": "false_positive"})
                self._audit({
                    "ts": _now_iso(),
                    "kind": "dismiss_false_positive",
                    "task_id": self._task_id,
                    "chat_id": self._chat_id,
                    "rule_id": base["rule_id"],
                })
            elif action == ACTION_DISMISS_STYLE_ONLY:
                result.dismissed.append({**base, "reason": "style_only"})
                self._audit({
                    "ts": _now_iso(),
                    "kind": "dismiss_style_only",
                    "task_id": self._task_id,
                    "chat_id": self._chat_id,
                    "rule_id": base["rule_id"],
                })
            elif action == ACTION_AUTO_APPLY_MINOR_FIX:
                applied = self.apply_minor_fix(finding, base["path"])
                if applied.get("applied"):
                    result.applied.append({**base, "target_file": base["path"]})
                else:
                    # apply 실패 → escalate 전환 (안전한 fallback).
                    escalation = self.escalate_scope_expansion({
                        **finding,
                        "auto_apply_failed": True,
                    })
                    result.escalated.append({
                        **base,
                        "action": ACTION_ESCALATE_SCOPE_EXPANSION,
                        "reason": "auto_apply_failed",
                        "critical_code": escalation["critical_code"],
                    })
            else:  # ACTION_ESCALATE_SCOPE_EXPANSION
                escalation = self.escalate_scope_expansion(finding)
                result.escalated.append({
                    **base,
                    "critical_code": escalation["critical_code"],
                    "reason": details.get("reason", "scope_expansion"),
                })

        return result.serialize()

    # ── chat-isolated audit 조회 헬퍼 (task-2531 격리 패턴) ────────────────
    def list_chat_audit(
        self,
        all_records: Iterable[Mapping[str, Any]],
    ) -> list[dict[str, Any]]:
        """주어진 audit record stream 에서 본 인스턴스 chat_id 의 record 만 반환.

        회장 §명시 chat=6937032012 격리 — 다른 chat record 노출 0.
        - 동일 chat_id 만 통과
        - chat_id 누락 record 는 차단 (보수적)
        - 추가로 token redact 한 번 더 적용 (이중 안전)
        """
        kept: list[dict[str, Any]] = []
        for rec in all_records:
            cid = str(rec.get("chat_id", ""))
            if cid != self._chat_id:
                continue
            kept.append(_redact_tokens(dict(rec)))
        return kept

    # ─── internal helpers ─────────────────────────────────────────────────
    def _normalize_expected(self, expected_files: Iterable[str]) -> set[str]:
        return {str(p).strip() for p in expected_files if str(p).strip()}

    def _match_false_positive(
        self,
        rule_id: str,
        finding: Mapping[str, Any],
    ) -> FalsePositiveFixture | None:
        """fixture 매칭 — rule_id 일치 + (signature 있으면) regex 매칭."""
        if not rule_id:
            return None
        # body 문자열은 signature 매칭이 실제 필요한 시점에만 생성 (Gemini 4차 medium 박제 lazy init).
        body: str | None = None
        for fp in self._fixtures:
            if fp.rule_id != rule_id:
                continue
            if fp.signature is None:
                return fp
            # 빈 문자열 signature 는 re.search 가 항상 매칭하므로 명시적 가드
            # (Gemini 3차 medium #2 박제 — None vs "" 분리).
            if not fp.signature:
                continue
            if body is None:
                body = str(finding.get("body", "")) + " " + str(finding.get("title", ""))
            try:
                if re.search(fp.signature, body):
                    return fp
            except re.error:
                # 잘못된 regex fixture 는 audit 후 skip
                # (Gemini 7차 medium #2 박제 — 설정 오류 가시화).
                self._audit({
                    "ts": _now_iso(),
                    "kind": "fixture_regex_error",
                    "rule_id": rule_id,
                    "fixture_signature": fp.signature,
                })
                continue
        return None
