# -*- coding: utf-8 -*-
"""anu_v3.cli_output_path_guard — CLI output-path hardening guard (task-2617).

회장 직접 승인 (2026-05-19) BOUNDED REMEDIATION. task-2616 scan 이 확정한
Critical7 "arbitrary fs write" 3건(``batch_hold_adjudicator --output`` ·
``batch_dependency_classifier --out`` · ``pre_authorized_evidence_bundle_builder
--out``)의 출력 경로를 정책 허용 경로로만 제한하기 위한 **import-only 순수
모듈**.

설계 불변식 (회장 verbatim · task-2617 §4):

  * 본 모듈 자체에 argparse / main / CLI / ``__main__`` **절대 금지**
    (자기참조 결함 회피 — guard 가 또 다른 CLI sink 이 되면 안 됨).
  * 기본 출력은 stdout 유지 — 호출부의 ``else: stdout`` 분기는 불변.
  * 파일 출력은 policy 허용 경로로만 가능 (fail-closed).
  * CANONICAL_WS_ROOT(``/home/jay/workspace``) 기준 ``memory/events`` 또는
    ``memory/reports`` 직하위만 허용.
  * 파일명 task-id prefix 강제.
  * absolute path · ``../`` traversal · workspace 이탈 · symlink component ·
    hardlink · 기존파일 overwrite = **fail-closed**.
  * ``Path.resolve()`` strict-prefix 단독으로 충분하다고 가정하지 않는다.
    경로 component 별 ``O_NOFOLLOW`` open + ``lstat`` 으로 symlink component
    를 직접 거부한다.
  * ``O_NOFOLLOW``/``O_EXCL`` temp 생성 + ``os.link`` 기반 atomic no-overwrite
    + dir-fd 상대연산 으로 **TOCTOU 경계 차단**.
  * 검증 실패 시 **write 전 차단** (어떤 바이트도 디스크에 닿지 않는다).

NO-CRON / Layer A: 본 모듈은 cron register/remove · dispatch · merge ·
subprocess · credential 접근 ZERO. 경로 검증과 정책 허용 경로 내 atomic
파일 생성만 수행한다.
"""
from __future__ import annotations

import errno
import json
import os
import re
from dataclasses import dataclass, field
from pathlib import Path, PurePosixPath
from typing import FrozenSet, Optional, Sequence, Tuple, Union

GUARD_SCHEMA = "anu_v3.cli_output_path_guard.v1"

# ── canonical anchor (회장 §4 · CLAUDE.md §1) ────────────────────────────────
CANONICAL_WS_ROOT = Path("/home/jay/workspace")
DEFAULT_POLICY_PATH = CANONICAL_WS_ROOT / "config" / "cli_output_path_policy.yaml"

# task-id prefix 형태: task-2617 · task-2613+1 · task-2553+46 …
_TASK_ID_RE = re.compile(r"^task-\d+(?:\+\d+)?$")
# 파일명이 task-id prefix 로 시작하는지 (구분자: . _ -)
_FILENAME_TASK_PREFIX_RE = re.compile(r"^(task-\d+(?:\+\d+)?)[._-]")

PathLike = Union[str, "os.PathLike[str]"]


class GuardError(Exception):
    """fail-closed 위반. 어떤 write 도 수행되지 않았음을 의미한다."""


@dataclass(frozen=True)
class GuardPolicy:
    """CLI output-path 정책. yaml/json 에서 로드되거나 기본값으로 생성된다."""

    canonical_ws_root: Path = CANONICAL_WS_ROOT
    allowed_roots: Tuple[str, ...] = ("memory/events", "memory/reports")
    task_id_prefix_required: bool = True
    deny: FrozenSet[str] = field(
        default_factory=lambda: frozenset(
            {
                "absolute_outside_ws",
                "dotdot_traversal",
                "symlink_component",
                "hardlink",
                "ws_escape",
                "overwrite_existing",
            }
        )
    )
    single_segment_under_allowed_root: bool = True

    def normalized_roots(self) -> Tuple[PurePosixPath, ...]:
        return tuple(PurePosixPath(r) for r in self.allowed_roots)


# ── policy 로더 ──────────────────────────────────────────────────────────────
def load_policy(path: Optional[PathLike] = None) -> GuardPolicy:
    """yaml/json 정책 파일을 로드한다. 부재/파싱불가 = fail-closed (GuardError).

    인자 없으면 ``DEFAULT_POLICY_PATH`` 사용.
    """
    p = Path(path) if path is not None else DEFAULT_POLICY_PATH
    try:
        raw = p.read_text(encoding="utf-8")
    except OSError as e:  # noqa: PERF203
        raise GuardError(f"policy 로드 실패(fail-closed): {p}: {e}") from e

    data = None
    try:
        import yaml  # 지연 import — 순수 import-only 모듈 부담 최소화

        data = yaml.safe_load(raw)
    except Exception:  # pragma: no cover - yaml 부재/오류 시 json 폴백
        try:
            data = json.loads(raw)
        except Exception as e:
            raise GuardError(f"policy 파싱 실패(fail-closed): {p}: {e}") from e
    if not isinstance(data, dict):
        raise GuardError(f"policy 형식 오류(fail-closed): {p}")

    ws_root = Path(str(data.get("canonical_ws_root", str(CANONICAL_WS_ROOT))))
    allowed = data.get("allowed_roots") or ["memory/events", "memory/reports"]
    if not isinstance(allowed, (list, tuple)) or not allowed:
        raise GuardError("policy.allowed_roots 누락/형식오류(fail-closed)")
    deny = data.get("deny") or []
    filename_cfg = data.get("filename") or {}
    single = True
    if isinstance(filename_cfg, dict):
        single = bool(
            filename_cfg.get("single_segment_under_allowed_root", True)
        )
    return GuardPolicy(
        canonical_ws_root=ws_root,
        allowed_roots=tuple(str(x) for x in allowed),
        task_id_prefix_required=bool(data.get("task_id_prefix_required", True)),
        deny=frozenset(str(x) for x in deny) or GuardPolicy().deny,
        single_segment_under_allowed_root=single,
    )


# ── 경로 검증 (fail-closed · write 전) ───────────────────────────────────────
def _reject(msg: str) -> "GuardError":
    return GuardError(msg)


def _derive_task_id(filename: str) -> Optional[str]:
    m = _FILENAME_TASK_PREFIX_RE.match(filename)
    return m.group(1) if m else None


def validate_output_path(
    requested: PathLike,
    *,
    task_id: Optional[str] = None,
    policy: GuardPolicy,
) -> Path:
    """요청 출력경로를 정책으로 검증한다. **write 는 수행하지 않는다.**

    PASS 시 검증된 절대 target 경로를 반환. 위반 시 ``GuardError``
    (fail-closed — 어떤 바이트도 디스크에 닿지 않음).

    Path.resolve() strict-prefix 만으로 충분하다고 가정하지 않는다:
    여기서는 *문법적* 거부(absolute/.. /ws-escape/task-id)만 수행하고,
    symlink-component/overwrite/hardlink 같은 *물리적* TOCTOU 거부는
    실제 open 시점(:func:`atomic_guarded_write`)에 ``O_NOFOLLOW``/
    ``O_EXCL``/``lstat`` 으로 재확인한다.
    """
    try:
        req = os.fspath(requested)  # type: ignore[arg-type]
    except TypeError as e:
        raise _reject(f"output 경로 형식오류(fail-closed): {e!r}") from e
    if not isinstance(req, str) or req.strip() == "":
        raise _reject("output 경로 공백/None(fail-closed)")

    pp = PurePosixPath(req)

    # 1) absolute 거부 (ws 안 절대경로라 해도 상대표기 강제 — 단순·안전)
    if pp.is_absolute():
        raise _reject(f"absolute path 금지(fail-closed): {req}")

    # 2) 모든 component 검사: '..' / '.' / 빈 component 거부
    parts = pp.parts
    if not parts:
        raise _reject("빈 경로(fail-closed)")
    for seg in parts:
        if seg in ("..",):
            raise _reject(f"'..' traversal 금지(fail-closed): {req}")
        if seg in ("", "."):
            raise _reject(f"비정상 component 금지(fail-closed): {req}")
        if "\x00" in seg:
            raise _reject("NUL byte 금지(fail-closed)")

    # 3) allowed_root 직하위(single-segment) 강제
    rel = PurePosixPath(*parts)
    matched_root: Optional[PurePosixPath] = None
    for root in policy.normalized_roots():
        rp = root.parts
        if parts[: len(rp)] == rp:
            matched_root = root
            break
    if matched_root is None:
        raise _reject(
            f"allowed_roots {list(policy.allowed_roots)} 밖 출력 금지"
            f"(fail-closed): {req}"
        )
    remainder = parts[len(matched_root.parts):]
    if policy.single_segment_under_allowed_root and len(remainder) != 1:
        raise _reject(
            f"allowed_root 직하위 단일파일만 허용(fail-closed): {req}"
        )
    if not remainder:
        raise _reject(f"파일명 누락(fail-closed): {req}")
    filename = remainder[-1]

    # 4) task-id prefix 강제
    if policy.task_id_prefix_required:
        derived = _derive_task_id(filename)
        if derived is None or not _TASK_ID_RE.match(derived):
            raise _reject(
                f"파일명 task-id prefix 누락/형식오류(fail-closed): {filename}"
            )
        if task_id is not None and derived != task_id:
            raise _reject(
                f"task-id prefix 불일치(fail-closed): {filename} != {task_id}"
            )

    # 5) ws-escape 최종 확인 (정규화 후에도 ws_root 밖이면 거부)
    target = (policy.canonical_ws_root / rel)
    norm = os.path.normpath(str(target))
    ws_norm = os.path.normpath(str(policy.canonical_ws_root)) + os.sep
    if not (norm + os.sep).startswith(ws_norm):
        raise _reject(f"workspace 이탈(fail-closed): {req}")

    return Path(norm)


# ── TOCTOU-safe atomic write ────────────────────────────────────────────────
def _open_dir_nofollow(ws_root: Path, components: Sequence[str]) -> int:
    """ws_root 부터 component 별 O_NOFOLLOW open 으로 부모 dir fd 를 얻는다.

    중간 component 중 하나라도 symlink 면 ELOOP 로 open 실패 →
    symlink-component fail-closed. ws_root 자체는 신뢰 절대경로(고정 anchor).
    """
    base_flags = os.O_RDONLY | os.O_DIRECTORY
    if hasattr(os, "O_NOFOLLOW"):
        base_flags |= os.O_NOFOLLOW
    # ws_root 자체: O_NOFOLLOW 로 열어 ws_root 가 symlink 면 거부
    dfd = os.open(str(ws_root), base_flags)
    try:
        for comp in components:
            # 진입 전 lstat: symlink component 직접 거부 (이중 방어)
            st = os.lstat(comp, dir_fd=dfd)
            import stat as _stat

            if _stat.S_ISLNK(st.st_mode):
                raise _reject(f"symlink component 금지(fail-closed): {comp}")
            if not _stat.S_ISDIR(st.st_mode):
                raise _reject(f"비-디렉터리 component(fail-closed): {comp}")
            nfd = os.open(comp, base_flags, dir_fd=dfd)
            os.close(dfd)
            dfd = nfd
        return dfd
    except BaseException:
        os.close(dfd)
        raise


def atomic_guarded_write(
    requested: PathLike,
    data: Union[str, bytes],
    *,
    task_id: Optional[str] = None,
    policy: Optional[GuardPolicy] = None,
) -> Path:
    """정책 검증 후 TOCTOU-safe 하게 파일을 생성한다 (overwrite 금지).

    절차:
      1. :func:`validate_output_path` 로 문법 검증 (write 전 차단).
      2. ws_root 부터 부모 dir 까지 component 별 ``O_NOFOLLOW`` open
         (+lstat) — symlink component 거부.
      3. 부모 dir fd 상대로 ``O_CREAT|O_EXCL|O_NOFOLLOW`` temp 생성,
         write+fsync.
      4. ``os.link`` (src/dst dir_fd 상대) 로 final 생성 — final 이 이미
         존재하면 ``FileExistsError`` → overwrite/hardlink fail-closed.
      5. temp unlink + dir fsync. 모든 단계 dir-fd 상대 → 경로 재해석
         (TOCTOU) 차단.

    실패 시 부분 파일을 남기지 않고 ``GuardError`` 를 던진다.
    """
    pol = policy if policy is not None else load_policy()
    target = validate_output_path(requested, task_id=task_id, policy=pol)

    payload = data.encode("utf-8") if isinstance(data, str) else data

    rel = PurePosixPath(
        os.path.relpath(str(target), str(pol.canonical_ws_root))
    )
    *dir_parts, fname = rel.parts
    if ".." in dir_parts:  # 이중 방어 — validate 에서 이미 거부됨
        raise _reject("traversal(fail-closed)")

    dfd = _open_dir_nofollow(pol.canonical_ws_root, dir_parts)
    tmp_name = f".{fname}.task-2617.tmp.{os.getpid()}"
    tmp_fd = -1
    created_tmp = False
    try:
        # post-resolve revalidate: dir fd 의 실제 경로가 여전히 ws 안인지
        real_dir = os.path.realpath(f"/proc/self/fd/{dfd}")
        ws_real = os.path.realpath(str(pol.canonical_ws_root))
        if not (real_dir == ws_real or real_dir.startswith(ws_real + os.sep)):
            raise _reject(
                f"post-resolve workspace 이탈(fail-closed): {real_dir}"
            )

        # final 이 이미 존재하면(symlink 포함) overwrite fail-closed
        try:
            est = os.lstat(fname, dir_fd=dfd)
            raise _reject(
                f"기존 파일 overwrite 금지(fail-closed): {target} "
                f"(nlink={getattr(est, 'st_nlink', '?')})"
            )
        except FileNotFoundError:
            pass

        oflags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        if hasattr(os, "O_NOFOLLOW"):
            oflags |= os.O_NOFOLLOW
        try:
            tmp_fd = os.open(tmp_name, oflags, 0o600, dir_fd=dfd)
        except FileExistsError as e:
            raise _reject(
                f"temp 충돌(fail-closed): {tmp_name}: {e}"
            ) from e
        created_tmp = True
        with os.fdopen(tmp_fd, "wb", closefd=True) as fh:
            tmp_fd = -1  # fdopen 이 소유권 인수
            fh.write(payload)
            fh.flush()
            os.fsync(fh.fileno())
            # 사전 검증 containment 대상 inode 고정 (task-2617+2):
            # 이 temp 는 containment 검증된 부모 dir fd(dfd) 상대로
            # O_CREAT|O_EXCL|O_NOFOLLOW 로 생성됐다. ``os.link(tmp_name,
            # fname)`` 은 *이 inode* 로의 hardlink 를 만들므로, 정상 경로
            # 의 final inode 는 반드시 이 (st_dev, st_ino) 와 동일하다.
            # post-link 재검증의 inode-bound anchor — 재오픈된 fname 의
            # inode 가 여기서 벗어나면 substitution 으로 간주한다.
            _tmp_st = os.fstat(fh.fileno())
        trusted_dev_ino = (_tmp_st.st_dev, _tmp_st.st_ino)

        # atomic no-overwrite: link 는 dst 존재 시 EEXIST → overwrite 차단
        try:
            os.link(tmp_name, fname, src_dir_fd=dfd, dst_dir_fd=dfd)
        except FileExistsError as e:
            raise _reject(
                f"link 시점 기존파일 발견(fail-closed): {target}"
            ) from e
        except OSError as e:
            if e.errno == errno.EEXIST:
                raise _reject(
                    f"link 시점 기존파일 발견(fail-closed): {target}"
                ) from e
            raise

        # post-link inode-bound 재검증 (task-2617+2 — task-2617+1 의
        # reopen-by-name substitution race 폐쇄):
        # 과거 구현은 ``os.link()`` 후 fname 을 NAME 으로 재오픈해 그
        # realpath 만 검증했다 — link 가 생성한 *실제 inode* 에 bound 되지
        # 않아, link 와 재오픈 사이 fname 이 *다른 in-ws inode* 로 치환되면
        # realpath containment 만으로는 탐지하지 못했다(reopen-by-name
        # substitution race). 이제 재오픈 fd 를 hold 한 채 동일 fd 의
        # ``os.fstat`` (st_dev, st_ino) 가 사전 검증 containment 대상
        # (=link 대상 temp inode, ``trusted_dev_ino``) 과 *동일*한지
        # 확인한다(inode-bound). 더불어 같은 hold fd 의 ``/proc/self/fd``
        # realpath 가 ws_root/allowed_root 하위인지도 재확인해 dir-rename
        # TOCTOU(우리 inode 이나 부모 dir 가 ws 밖) 도 계속 차단한다.
        # 둘 중 하나라도 불일치면 final 을 즉시 제거 후 fail-closed
        # (SystemExit). 재오픈은 fd 획득 수단일 뿐 — 보안 판정은 inode
        # 동치 + 동일 hold-fd realpath 로 수행하므로 reopen-by-name 의
        # *판정 의존*은 제거됐다.
        ffd = -1
        try:
            f_oflags = os.O_RDONLY
            if hasattr(os, "O_NOFOLLOW"):
                f_oflags |= os.O_NOFOLLOW
            ffd = os.open(fname, f_oflags, dir_fd=dfd)
            final_st = os.fstat(ffd)
            final_real = os.path.realpath(f"/proc/self/fd/{ffd}")
        finally:
            if ffd != -1:
                try:
                    os.close(ffd)
                except OSError:
                    pass
        # (1) inode-bound: 재오픈 fd 가 link 가 만든(=사전 검증 temp)
        #     inode 와 동일해야 한다. 다른 inode 로 치환됐으면 즉시 실패.
        inode_bound_ok = (
            (final_st.st_dev, final_st.st_ino) == trusted_dev_ino
        )
        # (2) containment: 동일 hold fd 의 실제 경로가 여전히 ws 안.
        allowed_abs = tuple(
            os.path.normpath(os.path.join(ws_real, *r.parts))
            for r in pol.normalized_roots()
        )
        containment_ok = any(
            final_real == ar or final_real.startswith(ar + os.sep)
            for ar in allowed_abs
        )
        if not (inode_bound_ok and containment_ok):
            try:
                os.unlink(fname, dir_fd=dfd)
            except OSError:
                pass
            raise SystemExit(
                "post-link final-inode bound 이탈"
                "(fail-closed·reopen-by-name substitution/dir-rename "
                f"TOCTOU): inode_bound={inode_bound_ok} "
                f"containment={containment_ok} real={final_real}"
            )

        os.unlink(tmp_name, dir_fd=dfd)
        created_tmp = False
        try:
            os.fsync(dfd)
        except OSError:  # pragma: no cover - dir fsync 미지원 fs
            pass
        return target
    except GuardError:
        raise
    except Exception as e:  # 예기치 못한 OS 오류도 fail-closed 로 승격
        raise GuardError(f"guarded write 실패(fail-closed): {e}") from e
    finally:
        if tmp_fd != -1:
            try:
                os.close(tmp_fd)
            except OSError:
                pass
        if created_tmp:
            try:
                os.unlink(tmp_name, dir_fd=dfd)
            except OSError:
                pass
        try:
            os.close(dfd)
        except OSError:
            pass


__all__ = [
    "GUARD_SCHEMA",
    "CANONICAL_WS_ROOT",
    "DEFAULT_POLICY_PATH",
    "GuardError",
    "GuardPolicy",
    "load_policy",
    "validate_output_path",
    "atomic_guarded_write",
]
