# -*- coding: utf-8 -*-
"""anu_v3.codex_high_classifier — Codex HIGH/HOLD batch 분류기 (task-2611 Track B).

회장 verbatim §1/§2: 병렬 작업 중 Codex HIGH/HOLD 가 발생해도 batch 전체
context 를 모아 Critical7 인지 non-critical remediation 인지 자동 분류한다.
Codex HIGH 를 security/scope/credential 계열과 coverage/test/claim mismatch
계열로 분류 — Critical7 = CHAIR_HOLD, 아니면 AUTO_REMEDIATION_HOLD 후보.

본 분류기 출력 = Track A ``anu_v3/batch_hold_adjudicator.py``(task-2610) 입력.
출력 schema = ``anu.codex_high_classification.v1``.

분류 우선순위 (deterministic, 회장 §3/§5 약화 금지):
  1) shared invariant 파손  -> CHAIR_HOLD
  2) Critical7 매치          -> CHAIR_HOLD
  3) remediation_family      -> AUTO_REMEDIATION_HOLD
  4) 그 외 HIGH/HOLD          -> AUTO_REMEDIATION_HOLD (high_unmatched, §3 자동수렴)

단일 진실원: ``config/codex_high_classification_rules.yaml`` +
``config/critical7_rules.yaml`` (anu_v3.critical7_classifier 재사용 — 규칙
중복 금지).

Layer A / NO-CRON: 순수 결정. ZERO cron / dispatch / subprocess / cokacdir /
파일쓰기. CLI ``--selftest`` 는 실 entrypoint regression (mock-only FAIL).
"""
from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence

import yaml

# Directory two levels above this file (the parent of the directory that
# contains this module) — presumably the repository root; verify against layout.
_ROOT = Path(__file__).resolve().parent.parent
# Make sure the ``anu_v3`` package import below can resolve when this module
# is loaded from a location that is not already on sys.path.
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

# Critical7 rules are reused from the sibling classifier rather than being
# duplicated here. Try the package-qualified import first.
try:
    from anu_v3.critical7_classifier import (  # noqa: E402
        CHAIR_HOLD,
        Critical7Result,
        Critical7Ruleset,
    )
except ImportError:  # direct-script invocation fallback
    # Fall back to importing ``critical7_classifier`` as a top-level module.
    from critical7_classifier import (  # type: ignore  # noqa: E402
        CHAIR_HOLD,
        Critical7Result,
        Critical7Ruleset,
    )

# Schema identifier stamped on every batch classification output.
BATCH_SCHEMA = "anu.codex_high_classification.v1"

# Route constants local to this module (CHAIR_HOLD is imported above).
AUTO_REMEDIATION_HOLD = "AUTO_REMEDIATION_HOLD"
GO = "GO"

# Rules file used by CodexHighClassifier.load() when no path is given.
_DEFAULT_RULES = _ROOT / "config" / "codex_high_classification_rules.yaml"


@dataclass
class FindingClassification:
    """Classification outcome for one finding, serialisable via ``to_json``."""

    finding_id: str
    severity: str
    # Family label, e.g. security|credential|...|coverage|test_failure|...
    family: str
    # Route string: CHAIR_HOLD | AUTO_REMEDIATION_HOLD (GO for below-scope).
    route: str
    is_critical7: bool
    is_invariant_break: bool
    # JSON form of the Critical7 result (Critical7Result.to_json()).
    critical7: dict
    matched_terms: List[str]
    reasons: List[str] = field(default_factory=list)

    def to_json(self) -> dict:
        """Return a plain-dict view; list fields are shallow-copied."""
        payload: dict = {}
        # Scalar / pass-through fields, emitted in declaration order.
        for attr in (
            "finding_id",
            "severity",
            "family",
            "route",
            "is_critical7",
            "is_invariant_break",
            "critical7",
        ):
            payload[attr] = getattr(self, attr)
        # Copy the lists so callers cannot mutate this instance through the
        # returned payload.
        payload["matched_terms"] = [*self.matched_terms]
        payload["reasons"] = [*self.reasons]
        return payload


@dataclass
class BatchClassification:
    """Aggregated classification of a whole batch of findings."""

    schema: str
    batch_id: str
    total_findings: int
    in_scope_findings: int
    any_critical7: bool
    any_invariant_break: bool
    # Batch-level verdict: CHAIR_HOLD | AUTO_REMEDIATION_HOLD | GO
    batch_verdict: str
    critical7_rules_hit: List[str]
    chair_hold_findings: List[str]
    remediation_candidates: List[str]
    per_finding: List[FindingClassification]
    reasons: List[str] = field(default_factory=list)

    def to_json(self) -> dict:
        """Return the batch as a plain dict, including the adjudicator input.

        List fields are shallow-copied and each per-finding entry is
        serialised through its own ``to_json``.
        """
        payload: dict = {
            "schema": self.schema,
            "batch_id": self.batch_id,
            "total_findings": self.total_findings,
            "in_scope_findings": self.in_scope_findings,
            "any_critical7": self.any_critical7,
            "any_invariant_break": self.any_invariant_break,
            "batch_verdict": self.batch_verdict,
        }
        payload["critical7_rules_hit"] = [*self.critical7_rules_hit]
        payload["chair_hold_findings"] = [*self.chair_hold_findings]
        payload["remediation_candidates"] = [*self.remediation_candidates]
        payload["per_finding"] = [item.to_json() for item in self.per_finding]
        payload["reasons"] = [*self.reasons]

        # Explicit input contract for the Track A batch_hold_adjudicator.
        adjudicator_input = {
            "consumer": "anu_v3/batch_hold_adjudicator.py (task-2610 Track A)",
            "verdict": self.batch_verdict,
            "chair_hold": self.batch_verdict == CHAIR_HOLD,
            # Separate copy from the top-level "remediation_candidates" list.
            "auto_remediation_candidates": [*self.remediation_candidates],
        }
        payload["batch_hold_adjudicator_input"] = adjudicator_input
        return payload


class CodexHighClassifier:
    """Classification engine over the Codex-HIGH rules and the Critical7 ruleset.

    Per-finding routing order (see ``classify_finding``):
      1/2) the Critical7 ruleset returns CHAIR_HOLD (invariant break or
           Critical7 match)                      -> CHAIR_HOLD
      -)   severity outside ``severity_in_scope`` -> GO (family "below_scope")
      3)   a remediation-family keyword matches  -> AUTO_REMEDIATION_HOLD
      4)   any other in-scope finding            -> configured default route
    """

    def __init__(
        self,
        rules: Dict[str, Any],
        critical7: Optional[Critical7Ruleset] = None,
    ):
        """Build a classifier from an already-parsed rules mapping.

        Args:
            rules: Parsed rules mapping. ``None`` (what an empty YAML document
                parses to) is treated as an empty mapping.
            critical7: Critical7 ruleset to delegate to. When ``None`` the
                default ruleset is loaded via ``Critical7Ruleset.load()``.

        Raises:
            TypeError: If ``rules`` is neither ``None`` nor a dict.
        """
        if rules is None:
            rules = {}
        if not isinstance(rules, dict):
            raise TypeError(
                f"rules must be a mapping, got {type(rules).__name__}"
            )
        self.raw = rules
        self.schema = rules.get("schema")
        self.version = rules.get("version")

        # A key that is present but null must behave like a missing key, and a
        # scalar string must not be iterated character by character.
        scope = rules.get("severity_in_scope")
        if scope is None:
            scope = ["HIGH", "CRITICAL"]
        elif isinstance(scope, str):
            scope = [scope]
        self.severity_in_scope = {str(s).strip().upper() for s in scope}

        self.remediation_families: Dict[str, Any] = (
            rules.get("remediation_families") or {}
        )
        dflt = rules.get("default_when_high_unmatched") or {}
        # ``or`` (not a .get default) so that an explicit null falls back too;
        # a None route would otherwise be ignored by the batch verdict.
        self.default_route = dflt.get("route") or AUTO_REMEDIATION_HOLD
        self.default_family = dflt.get("family") or "high_unmatched"
        # Identity check: a caller-supplied ruleset is honoured even if the
        # object happens to be falsy.
        self.critical7 = (
            critical7 if critical7 is not None else Critical7Ruleset.load()
        )

    @classmethod
    def load(
        cls,
        path: Optional[Path] = None,
        critical7_path: Optional[Path] = None,
    ) -> "CodexHighClassifier":
        """Load rules from YAML and build a classifier.

        Args:
            path: Rules file; defaults to ``_DEFAULT_RULES`` when falsy.
            critical7_path: Passed through to ``Critical7Ruleset.load``.

        Returns:
            A classifier built from the parsed rules. An empty rules file
            yields a classifier with built-in defaults.
        """
        p = Path(path) if path else _DEFAULT_RULES
        with open(p, "r", encoding="utf-8") as fh:
            rules = yaml.safe_load(fh)
        c7 = Critical7Ruleset.load(critical7_path)
        return cls(rules, c7)

    @staticmethod
    def _haystack(finding: Dict[str, Any]) -> str:
        """Join the finding's free-text fields into one lowercase string.

        Only the keys message/text/title/detail/action/summary are read, in
        that order; falsy values are skipped.
        """
        parts: List[str] = []
        for key in ("message", "text", "title", "detail", "action", "summary"):
            v = finding.get(key)
            if v:
                parts.append(str(v))
        return " \n ".join(parts).lower()

    def _remediation_family(self, finding: Dict[str, Any]) -> Optional[str]:
        """Return the first remediation family with a keyword in the finding.

        Matching is a case-insensitive substring test against ``_haystack``.
        Families are tried in mapping order; ``None`` means no keyword matched.
        """
        hay = self._haystack(finding)
        for fam, spec in self.remediation_families.items():
            keywords = (spec or {}).get("keywords") or []
            # A scalar ``keywords: foo`` must be one keyword, not a sequence
            # of single characters that would match almost any text.
            if isinstance(keywords, str):
                keywords = [keywords]
            if any(kw and str(kw).lower() in hay for kw in keywords):
                return fam
        return None

    @staticmethod
    def _non_critical(
        fid: str,
        severity: str,
        family: str,
        route: str,
        c7: "Critical7Result",
        reason: str,
    ) -> FindingClassification:
        """Build the result for a finding that did not reach CHAIR_HOLD."""
        return FindingClassification(
            finding_id=fid,
            severity=severity,
            family=family,
            route=route,
            is_critical7=False,
            is_invariant_break=False,
            critical7=c7.to_json(),
            matched_terms=[],
            reasons=[reason],
        )

    def classify_finding(self, finding: Dict[str, Any]) -> FindingClassification:
        """Classify one finding dict into a route and family.

        Args:
            finding: Finding mapping. ``id`` (or ``finding_id``) and
                ``severity`` are read here; the whole mapping is also handed
                to the Critical7 ruleset.

        Returns:
            The classification, following the routing order in the class
            docstring.
        """
        fid = str(finding.get("id") or finding.get("finding_id") or "unknown")
        # A missing, null, empty or whitespace-only severity is treated as
        # HIGH, so that a finding with unusable severity is never released as
        # below-scope.
        raw_severity = finding.get("severity")
        severity = (
            str("" if raw_severity is None else raw_severity).strip().upper()
            or "HIGH"
        )

        c7: Critical7Result = self.critical7.classify(finding)

        # 1) invariant break / 2) Critical7 -> CHAIR_HOLD. Checked before the
        # severity filter, so severity cannot downgrade a CHAIR_HOLD.
        if c7.verdict == CHAIR_HOLD:
            return FindingClassification(
                finding_id=fid,
                severity=severity,
                family=c7.family or "critical7",
                route=CHAIR_HOLD,
                is_critical7=c7.is_critical7,
                is_invariant_break=c7.is_invariant_break,
                critical7=c7.to_json(),
                matched_terms=list(c7.matched_terms),
                reasons=list(c7.reasons),
            )

        # Severity out of scope -> non-actionable (route GO, not held).
        if severity not in self.severity_in_scope:
            return self._non_critical(
                fid, severity, "below_scope", GO, c7,
                f"severity {severity} not in scope — not held",
            )

        # 3) remediation family -> AUTO_REMEDIATION_HOLD
        fam = self._remediation_family(finding)
        if fam:
            return self._non_critical(
                fid, severity, fam, AUTO_REMEDIATION_HOLD, c7,
                f"non-Critical remediation 계열={fam} -> "
                "AUTO_REMEDIATION_HOLD (회장 §3 자동수렴)",
            )

        # 4) In scope but unmatched -> configured default route
        # (AUTO_REMEDIATION_HOLD unless the rules override it).
        return self._non_critical(
            fid, severity, self.default_family, self.default_route, c7,
            "Critical7 게이트 탈락 + remediation 계열 미매치 — "
            "non-Critical 자동수렴 (회장 §3, Critical7 기준 약화 아님)",
        )

    def classify_batch(
        self,
        findings: Sequence[Dict[str, Any]],
        *,
        batch_id: str = "batch",
    ) -> BatchClassification:
        """Classify every finding and derive a single batch verdict.

        The verdict is CHAIR_HOLD if any finding routes to CHAIR_HOLD, else
        AUTO_REMEDIATION_HOLD if any finding routes there, else GO.

        Args:
            findings: Finding mappings to classify.
            batch_id: Identifier copied into the result.

        Returns:
            The aggregated batch classification.
        """
        per: List[FindingClassification] = [
            self.classify_finding(f) for f in findings
        ]
        in_scope = [p for p in per if p.family != "below_scope"]
        chair = [p for p in per if p.route == CHAIR_HOLD]
        # NOTE(review): only the exact AUTO_REMEDIATION_HOLD route counts here;
        # a custom ``default_when_high_unmatched.route`` value outside the
        # three known routes would not influence the verdict — confirm intent.
        remed = [p for p in per if p.route == AUTO_REMEDIATION_HOLD]
        any_c7 = any(p.is_critical7 for p in per)
        any_inv = any(p.is_invariant_break for p in per)
        # Distinct rule ids reported by CHAIR_HOLD findings, sorted.
        c7_rules = sorted({
            str(p.critical7.get("matched_rule_id"))
            for p in chair
            if p.critical7.get("matched_rule_id")
        })

        if chair:
            verdict = CHAIR_HOLD
            why = (
                f"{len(chair)} CHAIR_HOLD finding(s) — Critical7/invariant. "
                "회장 보고. 자동수렴 금지 (회장 §6)."
            )
        elif remed:
            verdict = AUTO_REMEDIATION_HOLD
            why = (
                f"{len(remed)} non-Critical HOLD — AUTO_REMEDIATION_HOLD "
                "자동수렴 (Track C planner 입력, 회장 §3)."
            )
        else:
            verdict = GO
            why = "in-scope HIGH/HOLD 0 — GO."

        return BatchClassification(
            schema=BATCH_SCHEMA,
            batch_id=batch_id,
            total_findings=len(per),
            in_scope_findings=len(in_scope),
            any_critical7=any_c7,
            any_invariant_break=any_inv,
            batch_verdict=verdict,
            critical7_rules_hit=list(c7_rules),
            chair_hold_findings=[p.finding_id for p in chair],
            remediation_candidates=[p.finding_id for p in remed],
            per_finding=per,
            reasons=[why],
        )


def classify_codex_high_batch(
    findings: Sequence[Dict[str, Any]],
    *,
    batch_id: str = "batch",
    classifier: Optional[CodexHighClassifier] = None,
) -> BatchClassification:
    """Classify a batch of Codex HIGH/HOLD findings (public entry point).

    Uses ``classifier`` when one is supplied; otherwise loads a classifier
    from the default rule files. The result feeds the Track A adjudicator.
    """
    if classifier:
        engine = classifier
    else:
        engine = CodexHighClassifier.load()
    result = engine.classify_batch(findings, batch_id=batch_id)
    return result


# ---- selftest: real-entrypoint regression (a mock-only run must FAIL) ----
# Fixture findings fed through the real classifier by _selftest().
_SELFTEST_BATCH = [
    {"id": "b-sec", "severity": "HIGH",
     "message": "SSRF in webhook fetcher — server-side request forgery"},
    {"id": "b-cred", "severity": "HIGH",
     "message": "AWS secret AKIAABCDEFGH12345678 hardcoded in config"},
    {"id": "b-scope", "severity": "HIGH",
     "message": "out-of-scope write: touched another track artifact"},
    {"id": "b-merge", "severity": "HIGH",
     "message": "force push to main branch attempted"},
    {"id": "b-cov", "severity": "HIGH",
     "message": "coverage regression — uncovered branch in new code"},
    {"id": "b-test", "severity": "HIGH",
     "message": "failing test in regression suite, assertionerror"},
    {"id": "b-claim", "severity": "HIGH",
     "message": "claim mismatch: result does not match, doc-only delivery"},
    {"id": "b-misc", "severity": "HIGH",
     "message": "unexpected behavior in retry loop (no known category)"},
    # Canonical entrypoint regression: structural evidence carried in a
    # singular ``file=`` key must reach critical7.classify() without any
    # normalization barrier and be routed to CHAIR_HOLD (end-to-end proof of
    # the §2 fix for the singular path/file CRITICAL false negative).
    {"id": "b-path-struct", "severity": "HIGH",
     "message": "unexpected write detected", "file": ".git/config"},
    {"id": "b-pat-bare", "severity": "HIGH",
     "message": "GitHub PAT with elevated privileges used"},
    {"id": "b-low", "severity": "LOW",
     "message": "minor style nit"},
]

# Expected outcome per fixture:
# finding_id -> (expected_route, expected_is_critical7)
_EXPECT = {
    "b-sec": (CHAIR_HOLD, True),
    "b-cred": (CHAIR_HOLD, True),
    "b-scope": (CHAIR_HOLD, True),
    "b-merge": (CHAIR_HOLD, True),
    "b-cov": (AUTO_REMEDIATION_HOLD, False),
    "b-test": (AUTO_REMEDIATION_HOLD, False),
    "b-claim": (AUTO_REMEDIATION_HOLD, False),
    "b-misc": (AUTO_REMEDIATION_HOLD, False),
    "b-path-struct": (CHAIR_HOLD, True),
    "b-pat-bare": (CHAIR_HOLD, True),
    "b-low": (GO, False),
}


def _selftest() -> int:
    """Run the built-in regression against the real rule files.

    Loads the classifier from the default rule paths, checks per-finding
    routes against ``_EXPECT``, checks batch verdicts for several sub-batches
    and the output schema contract, prints a JSON report to stdout, and
    returns 0 on PASS or 1 on FAIL.
    """
    engine = CodexHighClassifier.load()
    problems: List[str] = []

    # Per-finding expectations on the full fixture batch.
    full = engine.classify_batch(_SELFTEST_BATCH, batch_id="selftest")
    outcomes = {item.finding_id: item for item in full.per_finding}
    for finding_id, expected in _EXPECT.items():
        want_route, want_c7 = expected
        got = outcomes.get(finding_id)
        if got is None:
            problems.append(f"{finding_id}: missing from output")
            continue
        if got.route != want_route:
            problems.append(
                f"{finding_id}: route={got.route} expected {want_route}"
            )
        if got.is_critical7 != want_c7:
            problems.append(
                f"{finding_id}: is_critical7={got.is_critical7} expected {want_c7}"
            )

    # Batch verdict: a single Critical7 finding forces CHAIR_HOLD.
    if full.batch_verdict != CHAIR_HOLD:
        problems.append(
            f"batch_verdict={full.batch_verdict} expected CHAIR_HOLD "
            "(Critical7 finding 존재)"
        )

    # A batch of remediation-only findings -> AUTO_REMEDIATION_HOLD.
    remediation_ids = ("b-cov", "b-test", "b-claim")
    remediation_subset = [
        item for item in _SELFTEST_BATCH if item["id"] in remediation_ids
    ]
    remediation_batch = engine.classify_batch(
        remediation_subset, batch_id="remed-only"
    )
    if remediation_batch.batch_verdict != AUTO_REMEDIATION_HOLD:
        problems.append(
            f"remed-only batch_verdict={remediation_batch.batch_verdict} "
            "expected AUTO_REMEDIATION_HOLD"
        )

    # A batch without any HIGH finding -> GO.
    low_only = [{"id": "x", "severity": "LOW", "message": "nit"}]
    low_batch = engine.classify_batch(low_only, batch_id="go")
    if low_batch.batch_verdict != GO:
        problems.append(
            f"low-only batch_verdict={low_batch.batch_verdict} expected GO"
        )

    # Invariant break -> CHAIR_HOLD even without a Critical7 keyword.
    invariant_finding = {
        "id": "inv",
        "severity": "HIGH",
        "message": "executor self-callback detected; byte-0 violation",
    }
    invariant_batch = engine.classify_batch([invariant_finding], batch_id="inv")
    held_for_invariant = (
        invariant_batch.batch_verdict == CHAIR_HOLD
        and invariant_batch.any_invariant_break
    )
    if not held_for_invariant:
        problems.append(
            f"invariant batch verdict={invariant_batch.batch_verdict} "
            f"any_invariant_break={invariant_batch.any_invariant_break} expected CHAIR_HOLD/True"
        )

    # Inline, machine-checked output schema contract: the schema id plus the
    # keys the Track A batch_hold_adjudicator input is expected to carry.
    serialised = full.to_json()
    schema_seen = serialised.get("schema")
    if schema_seen != BATCH_SCHEMA:
        problems.append(
            f"schema_contract: output schema={schema_seen} "
            f"expected {BATCH_SCHEMA}"
        )
    adjudicator_input = serialised.get("batch_hold_adjudicator_input", {})
    required_keys = (
        "consumer", "verdict", "chair_hold", "auto_remediation_candidates"
    )
    for key in required_keys:
        if key not in adjudicator_input:
            problems.append(
                f"schema_contract: batch_hold_adjudicator_input missing '{key}'"
            )

    # Mock-only guard: the expectations must span at least three distinct
    # routes so that a constant classifier cannot satisfy every case.
    routes_covered = {route for route, _ in _EXPECT.values()}
    if len(routes_covered) < 3:
        problems.append(
            "mock_only_guard: selftest must span >=3 distinct routes "
            "(constant classifier must fail)"
        )

    passed = not problems
    report = {
        "module": "anu_v3.codex_high_classifier",
        "schema": BATCH_SCHEMA,
        "rules": str(_DEFAULT_RULES.relative_to(_ROOT)),
        "rules_version": engine.version,
        "batch_findings": len(_SELFTEST_BATCH),
        "batch_verdict": full.batch_verdict,
        "critical7_rules_hit": full.critical7_rules_hit,
        "remediation_candidates": full.remediation_candidates,
        "chair_hold_findings": full.chair_hold_findings,
        "failures": problems,
        "verdict": "PASS" if passed else "FAIL",
        "mock_only_would_fail": True,
        "track_a_input_schema": BATCH_SCHEMA,
    }
    print(json.dumps(report, ensure_ascii=False, indent=2))
    return 0 if passed else 1


def _main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point.

    With ``--selftest`` the built-in regression runs and its status is
    returned. Otherwise findings are read as JSON from ``--input`` (or stdin
    when absent), classified, and the batch result is printed as JSON.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The selftest status for ``--selftest``; otherwise 0 once the batch
        result has been printed.

    Raises:
        SystemExit: With status 2 (via ``ArgumentParser.error``) when the
            input is not valid JSON or contains an entry that is not a JSON
            object.
    """
    ap = argparse.ArgumentParser(
        description="Codex HIGH/HOLD batch classifier (task-2611 Track B)"
    )
    ap.add_argument("--selftest", action="store_true",
                    help="실 entrypoint regression 실행")
    ap.add_argument("--input", type=str, default=None,
                    help="findings JSON 배열 파일 (없으면 stdin)")
    ap.add_argument("--batch-id", type=str, default="batch")
    ap.add_argument("--rules", type=str, default=None)
    ap.add_argument("--critical7-rules", type=str, default=None)
    args = ap.parse_args(list(argv) if argv is not None else None)

    if args.selftest:
        return _selftest()

    raw = (Path(args.input).read_text(encoding="utf-8")
           if args.input else sys.stdin.read())
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        # Report malformed input as a usage error instead of a traceback.
        ap.error(f"input is not valid JSON: {exc}")

    # A single object is accepted as a one-element batch.
    findings = payload if isinstance(payload, list) else [payload]
    # The classifier reads each finding with dict accessors, so reject other
    # JSON value types here with a message that points at the offending index.
    bad_indexes = [
        idx for idx, item in enumerate(findings) if not isinstance(item, dict)
    ]
    if bad_indexes:
        ap.error(
            "each finding must be a JSON object; "
            f"non-object entries at index {bad_indexes}"
        )

    clf = CodexHighClassifier.load(
        Path(args.rules) if args.rules else None,
        Path(args.critical7_rules) if args.critical7_rules else None,
    )
    batch = clf.classify_batch(findings, batch_id=args.batch_id)
    print(json.dumps(batch.to_json(), ensure_ascii=False, indent=2))
    return 0


# Public API of this module. CHAIR_HOLD is re-exported from the Critical7
# classifier so consumers can import all three route constants from here.
__all__ = [
    "BATCH_SCHEMA",
    "CHAIR_HOLD",
    "AUTO_REMEDIATION_HOLD",
    "GO",
    "FindingClassification",
    "BatchClassification",
    "CodexHighClassifier",
    "classify_codex_high_batch",
]


# Script entry: the process exit status is whatever _main() returns.
if __name__ == "__main__":
    raise SystemExit(_main())
