# -*- coding: utf-8 -*-
"""anu_v3.auto_remediation_planner — task-2612 Track C (plan-only).

회장 verbatim (task-2612 §1/§2): "Critical7이 아니면 ANU-Codex loop로 자동
remediation 하여 all-settled 까지 진행하는 구조를 구현한다(remediation plan
생성부)."

본 모듈은 batch HOLD 분류 결과를 입력받아 **새 task ID remediation spec
골격**(목표/필수/금지/expected_files/9-R)으로 변환하는 *생성기* 다. 세 유형:

  - ``GLOBAL_LEDGER_SHA_FALSE_POSITIVE`` (2604 유형) — 전역 ledger SHA
    하드핀이 sanctioned cross-track append 와 자가모순(false-positive)
    → track-scoped invariant 교정 plan.
  - ``STAGE_CLAIM_TEST_MISMATCH`` (2605 유형) — claimed real-entrypoint
    stage 가 실증되지 않음(부분 stage 만 real·regression range 절단)
    → spy/monkeypatch 실호출 증명 보강 plan (mock-only FAIL 강제).
  - ``COVERAGE_GAP`` (2609 유형) — mandated fixture 가 특정 standalone
    조건을 한 번도 exercise 하지 않아 verdict logic 미망라
    → coverage 보강 plan (fail-safe 방향 실증).

분류 규칙 (회장 §3/§6 verbatim): Critical7 또는 shared invariant 파손 →
``HOLD_FOR_CHAIR``. 그 외 non-Critical 은 ``AUTO_REMEDIATION_HOLD`` 로
자동 수렴(회장 개별 확인 대기 없음). Critical7 판정 자체는 TrackB
critical7_classifier 소관 — 본 planner 는 그 분류 결과(``is_critical7``)를
**입력으로만 소비**한다(중복 소유 0·DISJOINT 보존).

**plan only**: 본 모듈은 plan 골격 *생성* 만 수행한다. 실제 dispatch /
코드수정 / subprocess / cokacdir / cron / merge / PR / credential 표면을
import 하거나 호출하지 않는다(§2/§5). ``PLAN_ONLY`` / ``DISPATCH_PERFORMED``
불변식 + ``assert_plan_only`` 정적 가드로 강제한다.

비-문서 산출: 본 모듈은 실 entrypoint(``build_plan_from_adjudication`` /
``main`` CLI)를 갖는다. ``run_self_check`` 는 mock 0 으로 실제 2604/2605/
2609 adjudication 증거 파일을 read-only consume → 실 plan 을 생성·스키마
검증한다(mock-only 경로는 본질적으로 plan 을 못 만들어 FAIL).
"""
from __future__ import annotations

import argparse
import ast
import json
import os
import stat
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, cast

PLAN_SCHEMA = "anu_v3.auto_remediation_plan.v1"
SCHEMA_PATH = "schemas/auto_remediation_plan.schema.json"
CANONICAL_ROOT = "/home/jay/workspace"
# H1/H2 hardening: 모든 입력 read-path / expected_files 경로는 이 canonical
# workspace root 하위 strict containment 만 허용한다(resolve 후 symlink·``..``
# traversal·임의 절대경로 fail-closed). 이는 planner 모듈의 *입력검증* 강화일
# 뿐 산출 성격(plan 골격 생성만·실 dispatch 0·실 코드수정 0)은 불변이다.
CANONICAL_WS_ROOT = Path(CANONICAL_ROOT).resolve()

# ── issue types (2604 / 2605 / 2609 유형) ───────────────────────────────
TYPE_GLOBAL_LEDGER_SHA_FALSE_POSITIVE = "GLOBAL_LEDGER_SHA_FALSE_POSITIVE"
TYPE_STAGE_CLAIM_TEST_MISMATCH = "STAGE_CLAIM_TEST_MISMATCH"
TYPE_COVERAGE_GAP = "COVERAGE_GAP"

ISSUE_TYPES = (
    TYPE_GLOBAL_LEDGER_SHA_FALSE_POSITIVE,
    TYPE_STAGE_CLAIM_TEST_MISMATCH,
    TYPE_COVERAGE_GAP,
)

# ── disposition ─────────────────────────────────────────────────────────
DISPOSITION_AUTO = "AUTO_REMEDIATION_HOLD"
DISPOSITION_CHAIR = "HOLD_FOR_CHAIR"

SEVERITIES = ("LOW", "MEDIUM", "HIGH", "CRITICAL")

# plan-only 불변식 — 본 모듈은 절대 dispatch 하지 않는다.
PLAN_ONLY = True
DISPATCH_PERFORMED = False

class PlanOnlyViolation(RuntimeError):
    """planner 가 dispatch/side-effect 표면을 건드리려 할 때."""


class PathContainmentError(ValueError):
    """입력 경로가 CANONICAL_WS_ROOT 하위 strict containment 를 벗어남.

    H1(``_read_json`` read-path)·H2(``expected_files`` module_rel/test_rel)
    공통 fail-closed 신호 — ``..`` traversal·임의 절대경로·symlink 이탈을
    거부한다(downstream out-of-scope 유도 차단).
    """


def _contained_resolved(path: str) -> Path:
    """입력 경로를 CANONICAL_WS_ROOT 기준 resolve 후 strict containment 검증.

    - 상대경로는 ``CANONICAL_WS_ROOT`` 기준 결합, 절대경로는 그대로 사용.
    - ``Path.resolve()`` 로 ``..`` 정규화 + symlink 실체화한 뒤,
      결과가 ``CANONICAL_WS_ROOT`` 의 진정한 하위(root 자기 자신 제외)인지
      검증한다. 이탈 시 ``PathContainmentError`` (fail-closed).
    """
    raw = Path(path)
    candidate = raw if raw.is_absolute() else (CANONICAL_WS_ROOT / raw)
    resolved = candidate.resolve()
    if resolved == CANONICAL_WS_ROOT or CANONICAL_WS_ROOT not in resolved.parents:
        raise PathContainmentError(
            f"path escapes CANONICAL_WS_ROOT (fail-closed): "
            f"{path!r} -> {resolved}"
        )
    return resolved


# 실 호출로 간주하는 (module, attr) / 전역 함수 — AST 로만 탐지(문자열/주석 무시).
_FORBIDDEN_ATTR_CALLS = {
    "subprocess": {
        "run",
        "Popen",
        "call",
        "check_call",
        "check_output",
        "getoutput",
        "getstatusoutput",
    },
    "os": {"system", "popen", "execv", "execvp", "spawnv", "posix_spawn"},
}
_FORBIDDEN_BARE_CALLS = {"system", "Popen"}


def assert_plan_only() -> None:
    """정적 자기검증: 본 소스에 dispatch/side-effect *실호출* 이 없음을 확인.

    문자열 리터럴/주석/docstring 의 토큰 언급은 허용하고, AST Call 노드
    (``subprocess.run(...)``, ``os.system(...)`` 등 실제 호출)만 탐지한다.
    위반 시 ``PlanOnlyViolation``.
    """
    if not (PLAN_ONLY and not DISPATCH_PERFORMED):
        raise PlanOnlyViolation("PLAN_ONLY/DISPATCH_PERFORMED invariant broken")
    src = Path(__file__).read_text(encoding="utf-8")
    tree = ast.parse(src)
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        fn = node.func
        if isinstance(fn, ast.Attribute) and isinstance(fn.value, ast.Name):
            mod, attr = fn.value.id, fn.attr
            if attr in _FORBIDDEN_ATTR_CALLS.get(mod, set()):
                raise PlanOnlyViolation(
                    f"dispatch surface call detected: {mod}.{attr}(...)"
                )
        elif isinstance(fn, ast.Name) and fn.id in _FORBIDDEN_BARE_CALLS:
            raise PlanOnlyViolation(
                f"dispatch surface call detected: {fn.id}(...)"
            )


# ── 입력 분류 모델 ──────────────────────────────────────────────────────
@dataclass
class IssueClassification:
    """batch HOLD 분류 결과(planner 입력).

    ``is_critical7`` 은 TrackB critical7_classifier 산출 — 본 planner 는
    소비만 한다. ``shared_invariant_broken`` 은 TrackA batch_hold_
    adjudicator 류 산출의 신호.
    """

    source_task: str
    issue_type: str
    severity: str = "HIGH"
    is_critical7: bool = False
    shared_invariant_broken: bool = False
    summary: str = ""
    evidence_refs: List[str] = field(default_factory=list)
    detail: Dict[str, object] = field(default_factory=dict)

    def __post_init__(self) -> None:
        if self.issue_type not in ISSUE_TYPES:
            raise ValueError(
                f"unknown issue_type {self.issue_type!r} "
                f"(expected one of {ISSUE_TYPES})"
            )
        if self.severity not in SEVERITIES:
            raise ValueError(f"unknown severity {self.severity!r}")


@dataclass
class RemediationPlan:
    schema: str
    plan_id: str
    remediation_of: str
    issue_type: str
    disposition: str
    severity: str
    is_critical7: bool
    plan_only: bool
    dispatch_performed: bool
    shared_invariant_preserved: bool
    hold_for_chair: bool
    reasons: List[str]
    source_evidence_refs: List[str]
    spec_skeleton: Dict[str, object]

    def to_dict(self) -> Dict[str, object]:
        return {
            "schema": self.schema,
            "plan_id": self.plan_id,
            "remediation_of": self.remediation_of,
            "issue_type": self.issue_type,
            "disposition": self.disposition,
            "severity": self.severity,
            "is_critical7": self.is_critical7,
            "plan_only": self.plan_only,
            "dispatch_performed": self.dispatch_performed,
            "shared_invariant_preserved": self.shared_invariant_preserved,
            "hold_for_chair": self.hold_for_chair,
            "reasons": list(self.reasons),
            "source_evidence_refs": list(self.source_evidence_refs),
            "spec_skeleton": self.spec_skeleton,
        }


# ── disposition 분류 (회장 §3/§6) ───────────────────────────────────────
def classify_disposition(issue: IssueClassification) -> str:
    """Critical7 또는 shared invariant 파손 → HOLD_FOR_CHAIR.
    그 외 non-Critical → AUTO_REMEDIATION_HOLD 자동 수렴 (회장 §6 verbatim)."""
    if issue.is_critical7 or issue.shared_invariant_broken:
        return DISPOSITION_CHAIR
    return DISPOSITION_AUTO


# ── 새 remediation task ID 발급 ─────────────────────────────────────────
def next_plan_id(source_task: str, round_no: int = 1) -> str:
    """원 HOLD task ID → 새 remediation task ID 골격(plan 식별자).

    실제 dispatch 시 ANU 가 확정 — 여기서는 결정적 default 만 제시한다.
    """
    if round_no < 1:
        raise ValueError("round_no must be >= 1")
    return f"{source_task}-AR{round_no}"


# ── 공통 9-R 골격 (생성되는 remediation spec 의 GO_READY 기준) ──────────
_COMMON_NINE_R = [
    "ANU-Codex lint = GO_READY (HIGH/CRITICAL 0)",
    "실 entrypoint regression PASS · mock-only 경로는 반드시 FAIL "
    "(문서-only/mock-only 완료 금지)",
    "expected_files allowlist 외 write 0 · 타 track 과 DISJOINT",
    "shared invariant 보존 (기존 task-2553·task-2604 multitrack "
    "산출물·frozen anchor byte-0)",
    "git HEAD/branch 전후 EQUAL · PR/merge/branch/main write/credential 0",
    "executor self-* (callback/collector/adjudication/dispatch) 0 · "
    "independent ANU collector 만 authoritative",
]

_COMMON_FORBIDDEN = [
    "기존 산출물 변조 (task-2553·task-2604 multitrack·frozen anchor byte-0)",
    "문서-only / mock-only 완료 (실 entrypoint + regression 필수)",
    "executor self-callback/self-collector/self-adjudication/self-dispatch",
    "independent ANU authoritative 약화",
    "fallback/dead-man/fixed-time 진행 트리거",
    "PR/branch/main write·merge·credential",
]


def _normalize_expected_rel(rel: str) -> str:
    """H2 hardening: module_rel/test_rel 을 ``expected_files`` 유입 전 정규화.

    골격 placeholder(``<...>``)는 실경로가 아니므로 그대로 통과해 기존
    plan 골격(2604/2605/2609) 동작을 byte-0 보존한다. 실경로 형태는
    CANONICAL_WS_ROOT 하위 strict containment 로 정규화하며, ``..``
    traversal·절대경로 이탈·symlink 이탈 시 ``PathContainmentError``
    (fail-closed — downstream executor 의 out-of-scope 경로 유도 차단).
    """
    if rel.startswith("<") and rel.endswith(">"):
        return rel
    return _contained_resolved(rel).relative_to(CANONICAL_WS_ROOT).as_posix()


def _expected_files(plan_id: str, module_rel: str, test_rel: str) -> List[str]:
    return [
        _normalize_expected_rel(module_rel),
        _normalize_expected_rel(test_rel),
        f"memory/events/{plan_id}.decision.json",
        f"memory/events/{plan_id}.result.json",
        f"memory/reports/{plan_id}.md",
    ]


# ── 유형별 spec 골격 빌더 ───────────────────────────────────────────────
def _skeleton_2604(plan_id: str, issue: IssueClassification) -> Dict[str, object]:
    """2604 유형: global ledger SHA false-positive → track-scoped invariant."""
    target = str(issue.detail.get("target_test", "<HOLD test/fixture>"))
    module_rel = str(issue.detail.get("module_rel", target))
    test_rel = str(issue.detail.get("test_rel", target))
    return {
        "task_id": plan_id,
        "title": f"{plan_id} — global ledger SHA false-positive 교정 "
        "(track-scoped invariant)",
        "executor_hint": "단일 executor 1회 한정 · ANU key callback",
        "goal": [
            f"{issue.source_task} 의 전역 ledger SHA 하드핀(full-file "
            "hardpin)을 track-scoped invariant 로 교정하여 sanctioned "
            "cross-track append 와의 자가모순(false-positive)을 제거한다.",
            "기능 로직은 무변경 — test-harness invariant 한정 교정.",
        ],
        "must": [
            "전역 SHA/full-file 하드핀 제거 → track-scoped invariant 로 대체: "
            "(a) 본 track row 의 shared ledger self-append 0, "
            "(b) pre-existing prefix byte 보존(변조 0), "
            "(c) append-only 성장만 허용(축소/재작성 = FAIL).",
            "sanctioned/sibling +53/+54 durable-success append 는 무FAIL "
            "(false-positive 제거 실증).",
            "append-only 위반(shrink/rewrite/prefix-tamper/self-row)은 "
            "여전히 FAIL — isolated /tmp ledger self-proof 로 양방향 증명.",
            "실 regression: 교정 전 FAIL(전역 하드핀이 sanctioned append "
            "에서 collection-time FAIL) → 교정 후 PASS 재현.",
            f"{target} 외 기능 본문 byte-0 (s1..sN 동작 무변경).",
        ],
        "forbidden": list(_COMMON_FORBIDDEN)
        + ["기능 로직 변경(test-harness invariant 한정)"],
        "expected_files": _expected_files(plan_id, module_rel, test_rel),
        "nine_r": {
            "go_ready_criteria": list(_COMMON_NINE_R)
            + [
                "track-scoped invariant 3요소(self-append 0·prefix 보존·"
                "append-only) 정합 · 전역 하드핀 완전 제거",
            ],
            "relint_required": False,
        },
    }


def _skeleton_2605(plan_id: str, issue: IssueClassification) -> Dict[str, object]:
    """2605 유형: stage claim/test mismatch → spy/monkeypatch 실호출 증명."""
    stage = str(issue.detail.get("claimed_stage", "stage9"))
    rng = issue.detail.get("regression_range", "range(1, N)")
    module_rel = str(issue.detail.get("module_rel", "<entrypoint module>"))
    test_rel = str(issue.detail.get("test_rel", "<regression test>"))
    return {
        "task_id": plan_id,
        "title": f"{plan_id} — {stage} claim/test mismatch 교정 "
        "(spy/monkeypatch 실호출 증명 보강)",
        "executor_hint": "단일 executor 1회 한정 · ANU key callback",
        "goal": [
            f"{issue.source_task} 의 claimed real-entrypoint {stage} 가 "
            "실증되지 않은 mismatch 를 해소한다 — spy/monkeypatch 로 "
            f"{stage} 실호출을 증명하고 regression range 를 전 stage 로 "
            "확장한다.",
        ],
        "must": [
            f"{stage} 의 실 entrypoint 호출을 spy/monkeypatch 로 포착 — "
            "호출 횟수·인자·반환을 단언(claim 과 test 일치 실증).",
            f"regression range 를 {rng} → 전 stage 포함으로 확장 "
            f"({stage} 누락 0).",
            "mock-only 테스트는 반드시 FAIL 하도록 가드: 실 호출이 "
            "spy 에 잡히지 않으면 assertion 실패(허위 PASS 차단).",
            "기존 stage 1..N-1 regression 무회귀(0 regression) + "
            f"{stage} additive 강화.",
            "실 entrypoint 직접 호출(REAL) · 인접 PASS track 산출물 byte-0.",
        ],
        "forbidden": list(_COMMON_FORBIDDEN)
        + ["claim 만 수정하고 test 미보강(역방향 mismatch 은폐)"],
        "expected_files": _expected_files(plan_id, module_rel, test_rel),
        "nine_r": {
            "go_ready_criteria": list(_COMMON_NINE_R)
            + [
                f"{stage} 실호출 spy/monkeypatch 단언 통과 · regression "
                "range 전 stage 망라 · mock-only 경로 FAIL 실증",
            ],
            "relint_required": False,
        },
    }


def _skeleton_2609(plan_id: str, issue: IssueClassification) -> Dict[str, object]:
    """2609 유형: coverage gap → coverage 보강 plan."""
    _uc: object = issue.detail.get(
        "uncovered_conditions",
        ["<standalone condition never exercised>"],
    )
    uncovered: List[str] = (
        [_uc] if isinstance(_uc, str) else [str(x) for x in cast(list, _uc)]
    )
    module_rel = str(issue.detail.get("module_rel", "<verdict module>"))
    test_rel = str(issue.detail.get("test_rel", "<regression test>"))
    return {
        "task_id": plan_id,
        "title": f"{plan_id} — coverage gap 보강 "
        "(미망라 standalone 조건 fixture 추가)",
        "executor_hint": "단일 executor 1회 한정 · ANU key callback",
        "goal": [
            f"{issue.source_task} 의 verdict logic 미망라 coverage gap 을 "
            "보강한다 — mandated fixture 가 한 번도 exercise 하지 않은 "
            "standalone 조건을 격리 fixture 로 추가한다.",
        ],
        "must": [
            "미망라 조건마다 해당 조건만 false/standalone 으로 격리한 "
            f"fixture 추가: {list(uncovered)}.",
            "각 추가 fixture 가 실 entrypoint(judge/verdict 함수) 위에서 "
            "fail-safe 방향(보수적 HOLD/non-merge)으로 결정적 수렴함을 "
            "실증 — 위험 방향(auto-merge/PASS)으로 새지 않음.",
            "기존 §-mandated regression 전부 무회귀(verdict + critical7 "
            "match 유지) + additive 보강만.",
            "spec-mandated regression 누락 0 · invariant 약화 0.",
            "실 entrypoint 직접 호출(REAL) · mock-only 는 coverage 미증명 "
            "으로 FAIL.",
        ],
        "forbidden": list(_COMMON_FORBIDDEN)
        + ["기존 mandated fixture 의 expected verdict 변조"],
        "expected_files": _expected_files(plan_id, module_rel, test_rel),
        "nine_r": {
            "go_ready_criteria": list(_COMMON_NINE_R)
            + [
                "미망라 조건 격리 fixture 전부 fail-safe 방향 실증 · "
                "기존 mandated regression 무회귀 · coverage gap 0",
            ],
            "relint_required": False,
        },
    }


_BUILDERS = {
    TYPE_GLOBAL_LEDGER_SHA_FALSE_POSITIVE: _skeleton_2604,
    TYPE_STAGE_CLAIM_TEST_MISMATCH: _skeleton_2605,
    TYPE_COVERAGE_GAP: _skeleton_2609,
}


# ── plan 빌더 ───────────────────────────────────────────────────────────
def build_remediation_plan(
    issue: IssueClassification, round_no: int = 1
) -> RemediationPlan:
    """IssueClassification → RemediationPlan (plan only, dispatch 0).

    fail-closed: HOLD_FOR_CHAIR 인 경우에도 spec 골격은 *생성*(회장 보고용)
    하되 disposition/hold_for_chair 로 명확히 표기한다. dispatch 는 결코
    수행하지 않는다.
    """
    assert_plan_only()
    disposition = classify_disposition(issue)
    hold_for_chair = disposition == DISPOSITION_CHAIR
    plan_id = next_plan_id(issue.source_task, round_no)

    builder = _BUILDERS[issue.issue_type]
    skeleton = builder(plan_id, issue)

    reasons: List[str] = []
    if issue.is_critical7:
        reasons.append(
            "Critical7 분류 → HOLD_FOR_CHAIR (회장 §6, 자동 수렴 금지)"
        )
    if issue.shared_invariant_broken:
        reasons.append(
            "shared invariant 파손 → HOLD_FOR_CHAIR (회장 §6)"
        )
    if not hold_for_chair:
        reasons.append(
            f"non-Critical {issue.severity} ({issue.issue_type}) → "
            "AUTO_REMEDIATION_HOLD 자동 수렴 (회장 §3/§6, 개별 확인 대기 "
            "없음)"
        )
    reasons.append(
        "plan only — 실제 dispatch/코드수정은 본 산출 아님 (회장 §2/§5)"
    )

    return RemediationPlan(
        schema=PLAN_SCHEMA,
        plan_id=plan_id,
        remediation_of=issue.source_task,
        issue_type=issue.issue_type,
        disposition=disposition,
        severity=issue.severity,
        is_critical7=issue.is_critical7,
        plan_only=PLAN_ONLY,
        dispatch_performed=DISPATCH_PERFORMED,
        shared_invariant_preserved=not issue.shared_invariant_broken,
        hold_for_chair=hold_for_chair,
        reasons=reasons,
        source_evidence_refs=list(issue.evidence_refs),
        spec_skeleton=skeleton,
    )


# ── 실 adjudication 증거(read-only) → IssueClassification 추출 ──────────
def _fd_realpath(fd: int) -> Path:
    """열린 fd 가 *실제* 가리키는 커널-정규 경로를 ``/proc/self/fd`` 로 회수.

    ``os.readlink('/proc/self/fd/<fd>')`` 는 open 시점에 커널이 이미 모든
    구성요소(중간 디렉터리 symlink-swap 포함)를 해소한 실경로를 돌려준다 —
    파일시스템을 다시 walk 하지 않으므로 추가 TOCTOU 가 없다. ``/proc``
    미가용·삭제된 inode (`" (deleted)"` 접미) 등은 fail-closed 로 거부한다.
    """
    try:
        link = os.readlink(f"/proc/self/fd/{fd}")
    except OSError as exc:
        raise PathContainmentError(
            f"cannot resolve opened fd realpath (fail-closed): {exc}"
        ) from exc
    if link.endswith(" (deleted)") or "\x00" in link:
        raise PathContainmentError(
            f"opened fd realpath is unstable/deleted (fail-closed): {link!r}"
        )
    return Path(link)


def _assert_fd_inode_contained(fd: int, resolved: Path) -> None:
    """H1h hardening (3차·additive defense-in-depth — hard-link 동일-inode 봉합).

    H1r 2차 방어(``O_NOFOLLOW`` + ``/proc/self/fd`` realpath + ``fstat``)는
    *경로* 기준 이탈만 차단한다. ``CANONICAL_WS_ROOT`` 내부 쓰기권한 공격자가
    동일 파일시스템·동일 uid 의 ws-밖 inode 에 대한 *hard link* 를 ws 내부에
    심으면 그 in-root pathname 은 정적·H1r 검증을 모두 통과하지만 열린 fd 는
    ws-밖 inode 를 가리킨다(race 없는 out-of-scope read). hard link 은 별도
    realpath 가 없으므로 inode 메타데이터로만 탐지 가능하다:

    - 열린 fd 의 ``os.fstat`` st_dev/st_ino 를 ``_contained_resolved()`` 가
      산출한 정적 경로의 ``os.lstat``(symlink 미추종) st_dev/st_ino 와
      ``os.path.samestat`` 동일성 대조 — 불일치 = check→open 사이 swap/우회
      → fail-closed (H1r realpath 검증과 직교한 additive 보강).
    - 정규파일이면서 ``st_nlink > 1`` 이면 해당 inode 가 ws-밖에서도 도달
      가능한 hard link 일 수 있다. 본 read-path 가 소비하는 정본
      JSON(schema·adjudication·plan)은 단일-link 정규파일이므로, 다중-link
      정규파일은 ws-내부 정합을 별도 보장할 수 없는 한 보수적으로
      fail-closed (out-of-scope hard-link inode read 차단).

    어떤 이탈도 ``PathContainmentError`` (fail-closed). 본 함수는 read-only
    inode 메타데이터만 조회하며 planner 산출 성격(plan 골격만)과 무관하다.
    """
    fst = os.fstat(fd)
    try:
        lst = os.lstat(resolved)
    except OSError as exc:
        raise PathContainmentError(
            f"cannot lstat static resolved path (fail-closed): {exc}"
        ) from exc
    if not os.path.samestat(fst, lst):
        raise PathContainmentError(
            "opened fd inode != static path inode "
            f"(check->open swap fail-closed): fd=({fst.st_dev},{fst.st_ino}) "
            f"static=({lst.st_dev},{lst.st_ino})"
        )
    if stat.S_ISREG(fst.st_mode) and fst.st_nlink > 1:
        raise PathContainmentError(
            "opened regular file has st_nlink>1 — possible out-of-scope "
            f"hard link (fail-closed): nlink={fst.st_nlink}"
        )


def _read_json(path: str) -> Dict[str, Any]:
    # H1 hardening (1차·정적 방어): 절대경로 무검 허용 / 상대 ``..``
    # traversal 금지. resolve 후 CANONICAL_WS_ROOT 하위 strict containment
    # 만 통과(symlink 이탈·임의 절대 JSON 경로·prefix-confusion fail-closed).
    resolved = _contained_resolved(path)
    # H1r hardening (2차·열린 fd 기준 재검증 — validate→open TOCTOU 봉합):
    # 정적 검증과 open 사이 공격자가 경로 구성요소를 ws-밖 symlink 로
    # 교체하는 race 를 제거한다. (i) ``O_NOFOLLOW`` 로 최종 구성요소
    # symlink 추종을 차단하고, (ii) 실제 열린 fd 가 가리키는 커널-정규
    # 실경로(/proc/self/fd)가 CANONICAL_WS_ROOT strict 하위인지 재확인하며
    # (중간 구성요소 symlink-swap 까지 반영), (iii) 정규파일 여부를
    # fstat 으로 확인한다. 어떤 이탈도 PathContainmentError fail-closed.
    flags = os.O_RDONLY | os.O_NOFOLLOW | getattr(os, "O_CLOEXEC", 0)
    try:
        fd = os.open(resolved, flags)
    except OSError as exc:
        # ELOOP 등: 검증 후 최종 구성요소가 symlink 로 교체됨 → fail-closed.
        raise PathContainmentError(
            f"path open blocked (O_NOFOLLOW fail-closed): {path!r} -> "
            f"{resolved} ({type(exc).__name__}: {exc})"
        ) from exc
    try:
        real = _fd_realpath(fd)
        if real == CANONICAL_WS_ROOT or CANONICAL_WS_ROOT not in real.parents:
            raise PathContainmentError(
                f"opened fd escapes CANONICAL_WS_ROOT (TOCTOU fail-closed): "
                f"{path!r} -> {real}"
            )
        st = os.fstat(fd)
        if not stat.S_ISREG(st.st_mode):
            raise PathContainmentError(
                f"opened fd is not a regular file (fail-closed): {path!r}"
            )
        # H1h hardening (3차·additive — hard-link 동일-inode 우회 봉합):
        # 정적 1차(_contained_resolved)·H1r 2차(O_NOFOLLOW+/proc/self/fd
        # realpath+fstat)는 보존한 채, 열린 fd 의 inode 가 정적 경로 inode 와
        # 동일한지 + 다중-link 정규파일(ws-밖 hard link 가능) 여부를
        # fail-closed 로 추가 검증한다(defense-in-depth).
        _assert_fd_inode_contained(fd, resolved)
        with os.fdopen(fd, "r", encoding="utf-8", closefd=True) as fh:
            fd = -1  # fdopen 이 fd 소유권 인수 — 이중 close 방지.
            text = fh.read()
    finally:
        if fd >= 0:
            os.close(fd)
    return cast(Dict[str, Any], json.loads(text))


def extract_issue_from_adjudication(
    path: str, issue_type: str, *, severity: str = "HIGH"
) -> IssueClassification:
    """원 HOLD adjudication JSON 을 **read-only consume** 하여
    IssueClassification 을 추출한다. (기존 산출물 byte-0 — 읽기만.)

    is_critical7 은 본 adjudication 의 codex high/critical + Critical7
    표식에서 *보수적으로* 유도하되, 권위 분류는 TrackB 소관임을 명시.
    """
    data = _read_json(path)
    raw_task = str(
        data.get("remediation_of")
        or data.get("task_id")
        or data.get("schema_task")
        or data.get("schema_id")
        or "task-unknown"
    )
    # 원 HOLD task 로 정규화: '+N' / '-ARn' / '.suffix' 제거.
    source_task = raw_task.split("+")[0].split("-AR")[0].split(".")[0]
    _codex_raw = data.get("codex_adjudication") or data.get("codex") or {}
    codex: Dict[str, Any] = (
        cast(Dict[str, Any], _codex_raw) if isinstance(_codex_raw, dict) else {}
    )
    critical = int(codex.get("critical", 0) or 0)
    # Critical7 권위 판정은 TrackB — 여기서는 codex critical>0 만 보수적 hint.
    is_critical7 = critical > 0
    # shared invariant: git before==after 미보존이면 파손 신호.
    _inv_raw = data.get("independent_anu_verification", {})
    inv: Dict[str, Any] = (
        cast(Dict[str, Any], _inv_raw) if isinstance(_inv_raw, dict) else {}
    )
    shared_broken = inv.get("git_equal_before_after") is False
    summary = str(
        codex.get("summary")
        or codex.get("high_finding")
        or codex.get("high_resolution")
        or ""
    )
    return IssueClassification(
        source_task=source_task,
        issue_type=issue_type,
        severity=severity,
        is_critical7=is_critical7,
        shared_invariant_broken=shared_broken,
        summary=summary[:600],
        evidence_refs=[path],
        detail=cast(Dict[str, object], data.get("_planner_detail", {})) or {},
    )


def build_plan_from_adjudication(
    path: str, issue_type: str, *, severity: str = "HIGH", round_no: int = 1
) -> RemediationPlan:
    """실 entrypoint: adjudication 증거 경로 → RemediationPlan."""
    issue = extract_issue_from_adjudication(path, issue_type, severity=severity)
    return build_remediation_plan(issue, round_no=round_no)


# ── 스키마 검증 ─────────────────────────────────────────────────────────
def validate_plan(plan: Dict[str, object]) -> List[str]:
    """plan dict 를 schemas/auto_remediation_plan.schema.json 으로 검증.

    jsonschema 미가용 시 최소 required 키 검사로 fail-closed.
    """
    schema = _read_json(SCHEMA_PATH)
    try:
        import jsonschema  # type: ignore

        validator = jsonschema.Draft7Validator(schema)
        return [
            f"{list(e.path)}: {e.message}"
            for e in validator.iter_errors(cast(Any, plan))
        ]
    except ImportError:
        errs: List[str] = []
        for key in cast(List[str], schema.get("required", [])):
            if key not in plan:
                errs.append(f"missing required key: {key}")
        return errs


# ── self-check (mock 0 · 실 증거 read-only) ─────────────────────────────
_SELF_CHECK_CASES = (
    (
        "memory/events/task-2604+1.independent-collector-adjudication.json",
        TYPE_GLOBAL_LEDGER_SHA_FALSE_POSITIVE,
    ),
    (
        "memory/events/task-2605+2.independent-anu-collector.adjudication.json",
        TYPE_STAGE_CLAIM_TEST_MISMATCH,
    ),
    (
        "memory/events/task-2609.independent-collector-adjudication.json",
        TYPE_COVERAGE_GAP,
    ),
)


def run_self_check() -> Dict[str, object]:
    """실 2604/2605/2609 adjudication 을 read-only consume → 실 plan 생성·
    스키마 검증. mock 0 — 증거 파일이 없거나 실 entrypoint 가 plan 을
    못 만들면 FAIL (mock-only 경로 자동 FAIL)."""
    assert_plan_only()
    results: List[Dict[str, object]] = []
    ok = True
    for rel, itype in _SELF_CHECK_CASES:
        rec: Dict[str, object] = {"evidence": rel, "issue_type": itype}
        try:
            plan = build_plan_from_adjudication(rel, itype)
            pd = plan.to_dict()
            errs = validate_plan(pd)
            sk = cast(Dict[str, Any], pd["spec_skeleton"])
            nine_r = cast(Dict[str, Any], sk.get("nine_r", {}))
            structural_ok = (
                bool(sk.get("goal"))
                and bool(sk.get("must"))
                and bool(sk.get("forbidden"))
                and bool(sk.get("expected_files"))
                and bool(nine_r.get("go_ready_criteria"))
            )
            rec.update(
                plan_id=pd["plan_id"],
                disposition=pd["disposition"],
                hold_for_chair=pd["hold_for_chair"],
                schema_errors=errs,
                structural_ok=structural_ok,
                passed=(not errs and structural_ok),
            )
            ok = ok and not errs and structural_ok
        except Exception as exc:  # noqa: BLE001 — self-check must capture all
            rec.update(passed=False, error=f"{type(exc).__name__}: {exc}")
            ok = False
        results.append(rec)
    return {
        "schema": "anu_v3.auto_remediation_planner.self_check.v1",
        "all_passed": ok,
        "plan_only": PLAN_ONLY,
        "dispatch_performed": DISPATCH_PERFORMED,
        "cases": results,
    }


# ── CLI 실 entrypoint ───────────────────────────────────────────────────
def _build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(
        prog="auto_remediation_planner",
        description="task-2612 Track C — auto remediation plan 생성기 "
        "(plan only · dispatch 0).",
    )
    p.add_argument(
        "--from-adjudication",
        help="원 HOLD adjudication JSON 경로 (read-only consume).",
    )
    p.add_argument(
        "--issue-type",
        choices=list(ISSUE_TYPES),
        help="2604/2605/2609 유형.",
    )
    p.add_argument("--severity", default="HIGH", choices=list(SEVERITIES))
    p.add_argument("--round", type=int, default=1, dest="round_no")
    p.add_argument(
        "--issue-json",
        help="IssueClassification dict JSON 경로(직접 입력).",
    )
    p.add_argument(
        "--self-check",
        action="store_true",
        help="실 2604/2605/2609 증거로 self-check (mock 0).",
    )
    p.add_argument(
        "--validate",
        action="store_true",
        help="생성 plan 을 스키마로 검증하고 결과를 함께 출력.",
    )
    return p


def main(argv: Optional[Sequence[str]] = None) -> int:
    assert_plan_only()
    args = _build_parser().parse_args(argv)

    if args.self_check:
        out = run_self_check()
        print(json.dumps(out, ensure_ascii=False, indent=2))
        return 0 if out["all_passed"] else 1

    if args.issue_json:
        raw = _read_json(args.issue_json)
        issue = IssueClassification(
            source_task=str(raw["source_task"]),
            issue_type=str(raw["issue_type"]),
            severity=str(raw.get("severity", "HIGH")),
            is_critical7=bool(raw.get("is_critical7", False)),
            shared_invariant_broken=bool(
                raw.get("shared_invariant_broken", False)
            ),
            summary=str(raw.get("summary", "")),
            evidence_refs=list(raw.get("evidence_refs", [])),
            detail=dict(raw.get("detail", {})),
        )
        plan = build_remediation_plan(issue, round_no=args.round_no)
    elif args.from_adjudication and args.issue_type:
        plan = build_plan_from_adjudication(
            args.from_adjudication,
            args.issue_type,
            severity=args.severity,
            round_no=args.round_no,
        )
    else:
        _build_parser().print_help(sys.stderr)
        return 2

    pd = plan.to_dict()
    if args.validate:
        pd = {"plan": pd, "schema_errors": validate_plan(pd)}
    print(json.dumps(pd, ensure_ascii=False, indent=2))
    return 0


__all__ = [
    "PLAN_SCHEMA",
    "PLAN_ONLY",
    "DISPATCH_PERFORMED",
    "TYPE_GLOBAL_LEDGER_SHA_FALSE_POSITIVE",
    "TYPE_STAGE_CLAIM_TEST_MISMATCH",
    "TYPE_COVERAGE_GAP",
    "ISSUE_TYPES",
    "DISPOSITION_AUTO",
    "DISPOSITION_CHAIR",
    "PlanOnlyViolation",
    "PathContainmentError",
    "assert_plan_only",
    "IssueClassification",
    "RemediationPlan",
    "classify_disposition",
    "next_plan_id",
    "build_remediation_plan",
    "extract_issue_from_adjudication",
    "build_plan_from_adjudication",
    "validate_plan",
    "run_self_check",
    "main",
]


if __name__ == "__main__":
    raise SystemExit(main())
