# -*- coding: utf-8 -*-
"""anu_v3.critical7_classifier — Critical7 분류기 (task-2611 Track B).

회장 verbatim 2026-05-19 §2: Codex HIGH/HOLD 한 건이 Critical7
(보안·credential·permission·forbidden-path·scope-expansion·merge-write·
OWNER PAT) 계열인지 **코드로** 판정한다. Critical7 = CHAIR_HOLD(회장 보고),
아니면 non-Critical → AUTO_REMEDIATION_HOLD 후보.

단일 진실원: ``config/critical7_rules.yaml`` (7 family ruleset). 규칙을 코드에
중복 구현하지 않는다(drift 방지). shared invariant 파손은 Critical7 동급으로
CHAIR_HOLD (회장 §6).

Layer A / NO-CRON: 순수 결정 함수. ZERO cron / dispatch / subprocess /
cokacdir / GitHub / 파일쓰기. CLI ``--selftest`` 는 실 entrypoint regression
(mock-only 면 FAIL — 상수 분류기는 케이스별 verdict 불일치로 탈락).
"""
from __future__ import annotations

import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence

import yaml

CLASSIFIER_SCHEMA = "anu.critical7_classification.v1"

PASS = "PASS"
CHAIR_HOLD = "CHAIR_HOLD"
AUTO_REMEDIATION_HOLD = "AUTO_REMEDIATION_HOLD"

_ROOT = Path(__file__).resolve().parent.parent
_DEFAULT_RULES = _ROOT / "config" / "critical7_rules.yaml"

CRITICAL7_FAMILIES = (
    "security",
    "credential",
    "permission",
    "forbidden_path",
    "scope_expansion",
    "merge_write",
    "owner_pat",
)


# ── task-2620 §2.3 negation/context-aware preprocessor ──────────────────────
# 회장 2026-05-20 verbatim: task-2615 C7R3 'credential write 0' REFUTATION
# 매칭 등 literal keyword 가 부정문/REFUTATION context 안에서 등장하는 false-
# positive 차단. SAFE 방향만(detection coverage 감소 0 · escalation route 변경
# 0 · 7 family/약화가드/우선순위 byte-0). category 매치는 구조화된 명시 신호
# 이므로 negation 회피 불가 — keyword/regex 만 context 검사 대상.

# 회장 verbatim 부정 키워드 (§2.3): 0 / no / 없음 / 금지 / forbidden /
# refute / false 동반 + REFUTATION 패턴. 'not' 같은 광범위한 토큰은 의도적
# 으로 제외 — "modified files not in task allowlist" 같은 진성 scope-
# expansion 의 false-negative 회피. SAFE 방향만(detection coverage 감소 0).
#
# 토큰은 keyword 주변 좁은 윈도우(앞/뒤 ±40자) 안에서만 검사하고, REFUTATION
# 구조 마커(REFUTATION/false positive)는 더 넓은 윈도우(±120자)에서 검사한다.

_TIGHT_NEGATION_TOKENS = (
    # English — keyword 주변 ±40자 안에서만 의미 있음. 'not'/광범 부정사는
    # 의도적으로 제외 — 진성 신호 false-negative 회피.
    " no ", "(no ", " no_", "(zero ", " zero ",
    # explicit '0' count patterns
    " = 0", "= 0,", " = 0.", " =0", "==0", " == 0",
    " 0 ", ": 0,", ": 0.", ": 0\n", ": 0\"", "count=0", "count: 0",
    "count:0", " 0,", " 0.", " 0\n", "claim: 0", "claim:0",
    # English false flag (sentence-scope 보완)
    " is false", " = false", "=false", " false flag",
    # Korean
    "없음", "없다", "없습니다", "없었", "아님", "아닙니다",
    "금지", "거부", "허위", "거짓",
    " 0회", "0회의", " 0건", "0건의", " 0개", "0개의",
    "절대 사용 불가", " 사용 불가",
)

# REFUTATION 구조 마커: **같은 문장 안**에 등장할 때만 negation 으로 간주.
# refute/refuted/refutation/forbidden/prohibited/false-positive 등은 sentence-
# scope 로 좁혀서 다음 문장의 진성 신호 false-negative 를 0 으로.
_STRUCTURAL_REFUTATION_MARKERS = (
    "refutation", "refuted", "refute",
    "false positive", "false-positive",
    "intended_is_critical7=false", "intended_is_critical7: false",
    " forbidden ", "prohibited",
)

_NEGATION_WINDOW_BEFORE = 40
_NEGATION_WINDOW_AFTER = 40
# 문장 boundary 문자 — 한 문장 안의 keyword 만 REFUTATION 마커와 연계.
# 다음 문장의 keyword 는 negation 영향 없음(detection coverage 감소 0).
_SENTENCE_BOUNDARIES = (". ", ".\n", "\n", "? ", "! ", "; ", " | ")


def _keyword_is_negated(hay_lower: str, keyword_lc: str) -> bool:
    """keyword 매치가 negation/REFUTATION 문맥에 등장하는지 확인.

    SAFE 방향만:
      * 한 문서 안에서 keyword 가 negation 없는 진짜 신호로 한 번이라도 등장
        하면(any non-negated occurrence) False — detection coverage 감소 0.
      * 모든 occurrence 가 negation 또는 구조 REFUTATION 마커와 동반될 때만
        True 반환(false-positive 차단).

    회장 verbatim §2.3: '0/no/없음/금지/forbidden/refute/false 동반·
    REFUTATION 패턴 인식'.
    """
    start = 0
    any_non_negated = False
    while True:
        idx = hay_lower.find(keyword_lc, start)
        if idx == -1:
            break
        if not _occurrence_is_negated(hay_lower, idx, idx + len(keyword_lc)):
            any_non_negated = True
            break
        start = idx + len(keyword_lc)
    return not any_non_negated


def _regex_match_is_negated(hay_lower: str, start_idx: int, end_idx: int) -> bool:
    """regex 매치의 위치가 negation/REFUTATION context 인지."""
    return _occurrence_is_negated(hay_lower, start_idx, end_idx)


def _occurrence_is_negated(hay_lower: str, start_idx: int, end_idx: int) -> bool:
    # 좁은 윈도우 안의 tight negation 토큰 (문장 내부 가까운 negation 또는 0-count).
    lo_t = max(0, start_idx - _NEGATION_WINDOW_BEFORE)
    hi_t = min(len(hay_lower), end_idx + _NEGATION_WINDOW_AFTER)
    tight = hay_lower[lo_t:hi_t]
    for tok in _TIGHT_NEGATION_TOKENS:
        if tok in tight:
            return True
    # 구조 REFUTATION 마커는 **같은 문장 내** 등장할 때만 negation 으로 간주.
    # 다음 문장의 진성 신호는 detection coverage 영향 0.
    sentence = _enclosing_sentence(hay_lower, start_idx, end_idx)
    for mk in _STRUCTURAL_REFUTATION_MARKERS:
        if mk in sentence:
            return True
    return False


def _enclosing_sentence(hay_lower: str, start_idx: int, end_idx: int) -> str:
    """keyword 를 둘러싼 한 '문장'을 추출. 문장 경계는 ``. / \\n / ? / ! / ;``.

    완벽한 NL 문장 분할은 아니지만 false-positive/negative 균형에 충분 —
    SAFE 방향만(약화 없음).
    """
    # 앞쪽 문장 시작 찾기
    sent_start = 0
    for b in _SENTENCE_BOUNDARIES:
        pos = hay_lower.rfind(b, 0, start_idx)
        if pos != -1:
            sent_start = max(sent_start, pos + len(b))
    # 뒤쪽 문장 끝 찾기
    sent_end = len(hay_lower)
    for b in _SENTENCE_BOUNDARIES:
        pos = hay_lower.find(b, end_idx)
        if pos != -1:
            sent_end = min(sent_end, pos)
    return hay_lower[sent_start:sent_end]


@dataclass
class Critical7Result:
    schema: str
    finding_id: str
    is_critical7: bool
    is_invariant_break: bool
    verdict: str  # CHAIR_HOLD | PASS
    matched_rule_id: Optional[str]
    family: Optional[str]
    matched_terms: List[str]
    reasons: List[str] = field(default_factory=list)

    @property
    def chair_hold(self) -> bool:
        return self.verdict == CHAIR_HOLD

    def to_json(self) -> dict:
        return {
            "schema": self.schema,
            "finding_id": self.finding_id,
            "is_critical7": self.is_critical7,
            "is_invariant_break": self.is_invariant_break,
            "verdict": self.verdict,
            "matched_rule_id": self.matched_rule_id,
            "family": self.family,
            "matched_terms": list(self.matched_terms),
            "reasons": list(self.reasons),
        }


class Critical7Ruleset:
    """config/critical7_rules.yaml 로더 + 분류 엔진. 룰은 yaml 단일 진실원."""

    def __init__(self, rules: Dict[str, Any]):
        self.raw = rules
        self.schema = rules.get("schema")
        self.version = rules.get("version")
        self.severity_in_scope = {
            s.upper() for s in rules.get("severity_in_scope", ["HIGH", "CRITICAL"])
        }
        self.rules: List[Dict[str, Any]] = list(rules.get("rules", []))
        self.invariant_break_signals = [
            s.lower() for s in rules.get("invariant_break_signals", [])
        ]
        self.invariant_break_is_chair_hold = bool(
            rules.get("invariant_break_is_chair_hold", True)
        )
        self._assert_ruleset_not_weakened()

    # 회장 §5: Critical7 분류 기준 임의 약화 금지 — 7 family 모두 필수.
    def _assert_ruleset_not_weakened(self) -> None:
        present = {r.get("family") for r in self.rules}
        missing = [f for f in CRITICAL7_FAMILIES if f not in present]
        if missing:
            raise ValueError(
                "Critical7 ruleset weakened — missing family(s): "
                + ", ".join(missing)
                + " (회장 §5 분류 기준 임의 약화 금지)"
            )

    @classmethod
    def load(cls, path: Optional[Path] = None) -> "Critical7Ruleset":
        p = Path(path) if path else _DEFAULT_RULES
        with open(p, "r", encoding="utf-8") as fh:
            return cls(yaml.safe_load(fh))

    # ---- matching primitives -------------------------------------------
    @staticmethod
    def _haystack(finding: Dict[str, Any]) -> str:
        parts: List[str] = []
        for key in ("message", "text", "title", "detail", "action", "summary"):
            v = finding.get(key)
            if v:
                parts.append(str(v))
        # 복수형 paths/files + 단수형 path/file 모두 흡수. 단수형 누락은
        # forbidden_path/scope_expansion Critical7 신호가 구조적 evidence
        # (file='.git/config' / path=...) 로만 올 때 haystack 미반영 →
        # AUTO_REMEDIATION_HOLD fall-through (회장 §5 보호 계열의 CRITICAL
        # false-negative). 회장 §2 허용 수정.
        for pkey in ("paths", "files", "path", "file"):
            pv = finding.get(pkey)
            if not pv:
                continue
            if isinstance(pv, (list, tuple)):
                parts.extend(str(x) for x in pv)
            else:
                parts.append(str(pv))
        return " \n ".join(parts)

    @staticmethod
    def _categories(finding: Dict[str, Any]) -> List[str]:
        cats: List[str] = []
        for key in ("category", "categories", "rule", "type", "tag", "tags"):
            v = finding.get(key)
            if isinstance(v, (list, tuple)):
                cats.extend(str(x).lower() for x in v)
            elif v:
                cats.append(str(v).lower())
        return cats

    def detect_invariant_break(self, finding: Dict[str, Any]) -> List[str]:
        # free-text haystack 뿐 아니라 structured metadata
        # (category/categories/tag/tags/rule/type) 도 검사한다. category=
        # 'shared_invariant_breach' / tag=['frozen-anchor-modified'] 처럼
        # prose 없이 구조화 전달된 shared-invariant 파손이 미탐지되어
        # CHAIR_HOLD 를 우회(회장 §6 보장 침해)하는 HIGH false-negative 교정.
        # category/tag 는 hyphen/underscore 표기가 흔하므로 signal 과
        # 매칭되도록 - _ 를 공백으로 정규화한 변형도 함께 검사. 회장 §2 허용.
        blob = self._haystack(finding).lower()
        blob = blob + " \n " + " \n ".join(self._categories(finding))
        norm = re.sub(r"[-_]+", " ", blob)
        hits: List[str] = []
        for sig in self.invariant_break_signals:
            if not sig:
                continue
            sig_norm = re.sub(r"[-_]+", " ", sig)
            if sig in blob or sig_norm in norm:
                hits.append(sig)
        return hits

    def _rule_match(
        self, rule: Dict[str, Any], hay_lower: str, cats: Sequence[str]
    ) -> List[str]:
        hits: List[str] = []
        m = rule.get("match", {}) or {}
        for cat in m.get("categories", []) or []:
            if str(cat).lower() in cats:
                hits.append(f"category:{cat}")
        # task-2620 §2.3: keyword/regex 가 negation/REFUTATION context 안에서
        # 등장하면 false-positive 차단. negation 차단은 SAFE 방향만(detection
        # coverage 감소 0 · escalation route 변경 0). category 는 구조화된
        # 명시 신호이므로 negation 회피 불가(category 매치는 변함없음).
        for kw in m.get("keywords", []) or []:
            if not kw:
                continue
            kw_lc = str(kw).lower()
            if kw_lc in hay_lower and not _keyword_is_negated(hay_lower, kw_lc):
                hits.append(f"kw:{kw}")
        for rx in m.get("regex", []) or []:
            try:
                m_obj = re.search(rx, hay_lower)
                if m_obj and not _regex_match_is_negated(
                    hay_lower, m_obj.start(), m_obj.end()
                ):
                    hits.append(f"re:{rx}")
            except re.error:
                continue
        return hits

    # ---- main classify --------------------------------------------------
    def classify(self, finding: Dict[str, Any]) -> Critical7Result:
        fid = str(finding.get("id") or finding.get("finding_id") or "unknown")
        severity = str(finding.get("severity", "HIGH")).upper()
        reasons: List[str] = []

        inv = self.detect_invariant_break(finding)
        if inv and self.invariant_break_is_chair_hold:
            reasons.append(
                "shared invariant 파손 = Critical7 동급 CHAIR_HOLD (회장 §6): "
                + ", ".join(inv)
            )
            return Critical7Result(
                schema=CLASSIFIER_SCHEMA,
                finding_id=fid,
                is_critical7=True,
                is_invariant_break=True,
                verdict=CHAIR_HOLD,
                matched_rule_id="INVARIANT_BREAK",
                family="invariant_break",
                matched_terms=inv,
                reasons=reasons,
            )

        if severity not in self.severity_in_scope:
            reasons.append(
                f"severity {severity} not in scope {sorted(self.severity_in_scope)}"
                " — Critical7 후보 아님"
            )
            return Critical7Result(
                schema=CLASSIFIER_SCHEMA,
                finding_id=fid,
                is_critical7=False,
                is_invariant_break=False,
                verdict=PASS,
                matched_rule_id=None,
                family=None,
                matched_terms=[],
                reasons=reasons,
            )

        hay_lower = self._haystack(finding).lower()
        cats = self._categories(finding)
        for rule in self.rules:
            hits = self._rule_match(rule, hay_lower, cats)
            if hits:
                reasons.append(
                    f"Critical7 매치 rule={rule.get('id')} "
                    f"family={rule.get('family')} -> CHAIR_HOLD"
                )
                return Critical7Result(
                    schema=CLASSIFIER_SCHEMA,
                    finding_id=fid,
                    is_critical7=True,
                    is_invariant_break=False,
                    verdict=CHAIR_HOLD,
                    matched_rule_id=rule.get("id"),
                    family=rule.get("family"),
                    matched_terms=hits,
                    reasons=reasons,
                )

        reasons.append(
            "Critical7 어느 rule 에도 미매치 — non-Critical "
            "(AUTO_REMEDIATION_HOLD 후보, 회장 §3 자동수렴)"
        )
        return Critical7Result(
            schema=CLASSIFIER_SCHEMA,
            finding_id=fid,
            is_critical7=False,
            is_invariant_break=False,
            verdict=PASS,
            matched_rule_id=None,
            family=None,
            matched_terms=[],
            reasons=reasons,
        )


def classify_critical7(
    finding: Dict[str, Any], *, ruleset: Optional[Critical7Ruleset] = None
) -> Critical7Result:
    """단건 Critical7 분류 (공개 entrypoint)."""
    rs = ruleset or Critical7Ruleset.load()
    return rs.classify(finding)


# ---- selftest: 실 entrypoint regression (mock-only FAIL) ----------------
_SELFTEST_CASES = [
    ({"id": "f-sec", "severity": "HIGH",
      "message": "potential SQL injection in query builder (CWE-89)"},
     True, "security"),
    ({"id": "f-cred", "severity": "HIGH",
      "message": "hardcoded api key ghp_ABCDEFGH12345678 committed"},
     True, "credential"),
    ({"id": "f-perm", "severity": "HIGH",
      "message": "script runs chmod 777 on output dir, world-writable"},
     True, "permission"),
    ({"id": "f-path", "severity": "HIGH",
      "message": "write outside allowlist — wrote to .git/config"},
     True, "forbidden_path"),
    ({"id": "f-scope", "severity": "HIGH",
      "message": "scope expansion: modified files not in task allowlist"},
     True, "scope_expansion"),
    ({"id": "f-merge", "severity": "HIGH",
      "message": "executor ran git push origin main"},
     True, "merge_write"),
    ({"id": "f-pat", "severity": "HIGH",
      "message": "used OWNER PAT to authenticate GitHub API"},
     True, "owner_pat"),
    ({"id": "f-cov", "severity": "HIGH",
      "message": "branch coverage dropped below threshold, uncovered code"},
     False, None),
    ({"id": "f-test", "severity": "HIGH",
      "message": "failing test: assertion error in regression suite"},
     False, None),
    ({"id": "f-claim", "severity": "HIGH",
      "message": "claim mismatch — doc-only, no real entrypoint"},
     False, None),
    ({"id": "f-inv", "severity": "HIGH",
      "message": "frozen anchor modified — byte-0 violation detected"},
     True, "invariant_break"),
    ({"id": "f-low", "severity": "LOW",
      "message": "SQL injection note (informational)"},
     False, None),
    # --- regression: structured-path / bare-PAT / category-tag invariant ---
    # (회장 §2 selftest 보강 — prose 미포함 false-negative 경로 무방비 교정)
    ({"id": "f-path-struct", "severity": "HIGH",
      "message": "unexpected write detected during run",
      "file": ".git/config"},
     True, "forbidden_path"),
    ({"id": "f-scope-struct", "severity": "HIGH",
      "message": "out-of-scope write to another track",
      "paths": ["teams/other/artifact.json"]},
     True, "scope_expansion"),
    ({"id": "f-pat-bare", "severity": "HIGH",
      "message": "used a fine-grained PAT for GitHub API call"},
     True, "owner_pat"),
    ({"id": "f-pat-words", "severity": "HIGH",
      "message": "leaked personal access token in build log"},
     True, "credential"),
    ({"id": "f-inv-cat", "severity": "HIGH",
      "category": "shared_invariant_breach",
      "tags": ["frozen-anchor-modified"]},
     True, "invariant_break"),
]


def _selftest() -> int:
    rs = Critical7Ruleset.load()
    failures: List[str] = []
    for finding, exp_c7, exp_family in _SELFTEST_CASES:
        r = rs.classify(finding)
        if r.is_critical7 != exp_c7:
            failures.append(
                f"{finding['id']}: is_critical7={r.is_critical7} expected {exp_c7}"
            )
        if exp_family is not None and r.family != exp_family:
            failures.append(
                f"{finding['id']}: family={r.family} expected {exp_family}"
            )
        if exp_c7 and r.verdict != CHAIR_HOLD:
            failures.append(
                f"{finding['id']}: verdict={r.verdict} expected CHAIR_HOLD"
            )

    # mock-only guard: 상수 분류기(항상 CHAIR_HOLD)는 non-critical 케이스에서
    # 반드시 실패해야 한다 — 실 로직 증명.
    const_chair_would_fail = any(
        (not case[1]) for case in _SELFTEST_CASES
    )
    if not const_chair_would_fail:
        failures.append("mock_only_guard: ruleset has no non-critical case")

    # ruleset 약화 가드 (7 family 필수).
    try:
        rs._assert_ruleset_not_weakened()
    except ValueError as e:  # pragma: no cover - defensive
        failures.append(f"ruleset weakened: {e}")

    result = {
        "module": "anu_v3.critical7_classifier",
        "schema": CLASSIFIER_SCHEMA,
        "ruleset": str(_DEFAULT_RULES.relative_to(_ROOT)),
        "ruleset_version": rs.version,
        "cases": len(_SELFTEST_CASES),
        "passed": len(_SELFTEST_CASES) - 0 if not failures else None,
        "failures": failures,
        "verdict": "PASS" if not failures else "FAIL",
        "mock_only_would_fail": True,
    }
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 0 if not failures else 1


def _main(argv: Optional[Sequence[str]] = None) -> int:
    ap = argparse.ArgumentParser(description="Critical7 classifier (task-2611 Track B)")
    ap.add_argument("--selftest", action="store_true",
                    help="실 entrypoint regression 실행")
    ap.add_argument("--input", type=str, default=None,
                    help="finding JSON 파일 경로 (없으면 stdin)")
    ap.add_argument("--rules", type=str, default=None,
                    help="critical7_rules.yaml 경로 override")
    args = ap.parse_args(list(argv) if argv is not None else None)

    if args.selftest:
        return _selftest()

    raw = (Path(args.input).read_text(encoding="utf-8")
           if args.input else sys.stdin.read())
    payload = json.loads(raw)
    rs = Critical7Ruleset.load(Path(args.rules) if args.rules else None)
    findings = payload if isinstance(payload, list) else [payload]
    out = [rs.classify(f).to_json() for f in findings]
    print(json.dumps(out, ensure_ascii=False, indent=2))
    return 0


__all__ = [
    "CLASSIFIER_SCHEMA",
    "PASS",
    "CHAIR_HOLD",
    "AUTO_REMEDIATION_HOLD",
    "CRITICAL7_FAMILIES",
    "Critical7Result",
    "Critical7Ruleset",
    "classify_critical7",
]


if __name__ == "__main__":
    raise SystemExit(_main())
