# -*- coding: utf-8 -*-
"""Regression — task-2617 CLI output-path guard (회장 직접 승인 2026-05-19).

검증 범위 (task-2617 §5):
  * guard 모듈 import-only (argparse/main/CLI/__main__ 부재)
  * policy 허용 경로(memory/events|reports 직하위·task-id prefix)만 write
  * absolute · ../ · ws-escape · symlink-component · hardlink · overwrite
    = fail-closed (write 전 차단·부분파일 미잔존)
  * 3 CLI 호출부(batch_hold_adjudicator/batch_dependency_classifier/
    pre_authorized_evidence_bundle_builder) 가 guard 경유
  * stdout-only 3건(auto_remediation_planner/codex_high_classifier/
    critical7_classifier) byte-0 불변
  * 변경 3파일의 stdout 기본 동작 불변(--out 없을 때)
"""
from __future__ import annotations

import ast
import hashlib
import os
import subprocess
import sys
from pathlib import Path

import pytest

WS = Path("/home/jay/workspace")
GUARD_MOD = WS / "anu_v3" / "cli_output_path_guard.py"
POLICY = WS / "config" / "cli_output_path_policy.yaml"

sys.path.insert(0, str(WS))

from anu_v3.cli_output_path_guard import (  # noqa: E402
    GuardError,
    GuardPolicy,
    atomic_guarded_write,
    load_policy,
    validate_output_path,
)

# stdout-only 3건: 본 remediation 으로 절대 변경 금지 → byte-0 anchor.
STDOUT_ONLY_BASELINE = {
    "anu_v3/auto_remediation_planner.py":
        "150e79992dbbfe41a432abf6096a8ade0e897f0eef23c86e585103b1eaa5b364",
    "anu_v3/codex_high_classifier.py":
        "214af21eac48b184e37d4ed86403417636098c84f4e573cad949454ec3964006",
    "anu_v3/critical7_classifier.py":
        "6143a2d25fff1f15201feb461c9ce28e62d3400f5893264fe445d311fe4877f2",
}


def _sha(p: Path) -> str:
    return hashlib.sha256(p.read_bytes()).hexdigest()


@pytest.fixture()
def policy() -> GuardPolicy:
    return load_policy(POLICY)


# ── guard 모듈 import-only 검증 ──────────────────────────────────────────────
def test_guard_module_is_import_only_no_cli():
    src = GUARD_MOD.read_text(encoding="utf-8")
    tree = ast.parse(src)
    # 문자열/주석이 아닌 *코드* 차원에서만 검사 (docstring 의 금지 설명은 허용).
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            assert node.name != "main", "guard 에 main() 금지"
        if isinstance(node, ast.Import):
            for a in node.names:
                assert a.name != "argparse", "guard 에 argparse 금지"
        if isinstance(node, ast.ImportFrom):
            assert node.module != "argparse", "guard 에 argparse 금지"
        # `if __name__ == "__main__":` 엔트리포인트 블록 부재
        if isinstance(node, ast.If):
            cond = ast.dump(node.test)
            assert "__name__" not in cond or "__main__" not in cond, (
                "guard 에 __main__ 엔트리포인트 금지"
            )
    # 모듈 import 시 부작용(write/dispatch) 0 — 이미 import 됨이 곧 증거.
    assert callable(validate_output_path) and callable(atomic_guarded_write)


# ── 허용 경로 PASS ──────────────────────────────────────────────────────────
def test_allowed_event_path_pass(policy):
    p = validate_output_path(
        "memory/events/task-2617.unit.json", task_id=None, policy=policy
    )
    assert p == WS / "memory/events/task-2617.unit.json"


def test_allowed_report_path_pass(policy):
    p = validate_output_path(
        "memory/reports/task-2617.unit.md", task_id="task-2617", policy=policy
    )
    assert p == WS / "memory/reports/task-2617.unit.md"


# ── fail-closed 거부 ────────────────────────────────────────────────────────
@pytest.mark.parametrize(
    "bad",
    [
        "/etc/passwd",
        "/home/jay/workspace/memory/events/task-2617.x.json",  # absolute
        "memory/events/../../../etc/task-2617.x.json",
        "memory/events/../reports/task-2617.x.json",
        "anu_v3/task-2617.evil.py",
        "config/task-2617.evil.yaml",
        "memory/fixtures/task-2617.x.json",            # fixtures 비허용
        "memory/events/no_task_prefix.json",
        "memory/events/2617.json",                     # task- 누락
        "memory/events/sub/task-2617.x.json",          # 중첩 금지
        "",
        ".",
        "memory/events/",
    ],
)
def test_reject_paths_fail_closed(policy, bad):
    with pytest.raises(GuardError):
        validate_output_path(bad, task_id=None, policy=policy)


def test_taskid_mismatch_rejected(policy):
    with pytest.raises(GuardError):
        validate_output_path(
            "memory/events/task-2610.x.json",
            task_id="task-2617",
            policy=policy,
        )


def test_none_rejected(policy):
    with pytest.raises(GuardError):
        validate_output_path(None, task_id=None, policy=policy)  # type: ignore[arg-type]


# ── 물리 write: 허용 경로만, fail-closed 시 부분파일 미잔존 ─────────────────
def test_atomic_write_allowed_and_overwrite_blocked(policy):
    rel = "memory/events/task-2617._regr_probe.json"
    full = WS / rel
    if full.exists():
        full.unlink()
    try:
        ret = atomic_guarded_write(rel, '{"ok":1}\n', task_id=None, policy=policy)
        assert ret == full and full.read_text() == '{"ok":1}\n'
        before = _sha(full)
        # overwrite 시도 → fail-closed, 원본 불변
        with pytest.raises(GuardError):
            atomic_guarded_write(rel, "TAMPER", task_id=None, policy=policy)
        assert _sha(full) == before, "overwrite 차단 후 원본 byte 불변이어야"
    finally:
        if full.exists():
            full.unlink()


def test_rejected_write_leaves_no_partial_file(policy):
    rel = "anu_v3/task-2617.should_not_exist.py"
    with pytest.raises(GuardError):
        atomic_guarded_write(rel, "PARTIAL", task_id=None, policy=policy)
    assert not (WS / rel).exists(), "거부 시 어떤 파일도 생성되면 안 됨"
    # temp 잔존물도 없어야
    assert not list((WS / "anu_v3").glob(".*task-2617.tmp*"))


def test_symlink_component_fail_closed(tmp_path, policy):
    """memory/events 하위에 symlink 디렉터리를 만들어 통과 시도 → 거부.

    single-segment 정책상 events 직하위 파일만 허용되므로, symlink
    component 경유 경로는 문법(중첩) 또는 O_NOFOLLOW 단계에서 거부된다.
    """
    victim = tmp_path / "outside"
    victim.mkdir()
    link = WS / "memory/events/task-2617-symlinkdir"
    if link.is_symlink() or link.exists():
        link.unlink()
    os.symlink(victim, link)
    try:
        with pytest.raises(GuardError):
            atomic_guarded_write(
                "memory/events/task-2617-symlinkdir/task-2617.x.json",
                "X",
                task_id=None,
                policy=policy,
            )
        assert not (victim / "task-2617.x.json").exists()
    finally:
        if link.is_symlink():
            link.unlink()


# ── 3 CLI 호출부 guard 경유 검증 ────────────────────────────────────────────
TARGET_SINKS = [
    "anu_v3/batch_hold_adjudicator.py",
    "anu_v3/batch_dependency_classifier.py",
    "anu_v3/pre_authorized_evidence_bundle_builder.py",
]


@pytest.mark.parametrize("rel", TARGET_SINKS)
def test_cli_sinks_route_through_guard(rel):
    src = (WS / rel).read_text(encoding="utf-8")
    assert "from anu_v3.cli_output_path_guard import atomic_guarded_write" in src
    assert "atomic_guarded_write(" in src
    # 원시 arbitrary write 패턴이 제거됐는지 (해당 sink 한정)
    assert "Path(a.output).write_text" not in src
    assert "Path(args.out).write_text" not in src


def test_batch_hold_adjudicator_stdout_default_unchanged(tmp_path):
    """--output 없으면 stdout 으로 그대로 — 동작 불변."""
    payload = tmp_path / "p.json"
    payload.write_text("{}", encoding="utf-8")
    r = subprocess.run(
        [sys.executable, "-m", "anu_v3.batch_hold_adjudicator",
         "--input", str(payload)],
        cwd=str(WS), capture_output=True, text=True,
    )
    assert r.returncode == 0, r.stderr
    assert r.stdout.strip(), "stdout 출력이 있어야(동작 불변)"
    # 디스크 부작용 0
    assert not (WS / "p.json").exists()


def test_cli_out_to_disallowed_path_fail_closed(tmp_path):
    """--output 이 비허용 경로면 fail-closed(비정상 종료·파일 미생성)."""
    payload = tmp_path / "p.json"
    payload.write_text("{}", encoding="utf-8")
    evil = tmp_path / "evil_out.json"
    r = subprocess.run(
        [sys.executable, "-m", "anu_v3.batch_hold_adjudicator",
         "--input", str(payload), "--output", str(evil)],
        cwd=str(WS), capture_output=True, text=True,
    )
    assert r.returncode != 0, "비허용 경로 write 는 실패해야"
    assert not evil.exists(), "fail-closed: 파일 미생성"
    assert "fail-closed" in (r.stderr + r.stdout)


# ── stdout-only 3건 byte-0 anchor ───────────────────────────────────────────
@pytest.mark.parametrize("rel,sha", list(STDOUT_ONLY_BASELINE.items()))
def test_stdout_only_modules_byte0_unchanged(rel, sha):
    assert _sha(WS / rel) == sha, f"{rel} 는 본 remediation 으로 불변이어야"


# ── task-2617+1: post-link final-inode bound 재검증 (dir-rename TOCTOU) ──────
def _mk_sandbox_ws(tmp_path) -> Path:
    ws = tmp_path / "ws"
    (ws / "memory" / "events").mkdir(parents=True)
    return ws


def test_dir_rename_after_check_toctou_blocked(tmp_path, monkeypatch):
    """containment 체크 ~ os.link 사이 부모 dir 가 ws 밖으로 rename 되면
    post-link final-inode bound 재검증이 이를 탐지해 final 을 즉시 unlink
    하고 fail-closed(SystemExit). (잔여 HIGH 재현·차단)"""
    import anu_v3.cli_output_path_guard as g

    ws = _mk_sandbox_ws(tmp_path)
    events = ws / "memory" / "events"
    outside = tmp_path / "evil_moved"      # ws 밖
    pol = GuardPolicy(canonical_ws_root=ws)

    real_link = os.link

    def toctou_link(src, dst, *, src_dir_fd=None, dst_dir_fd=None):
        # check 통과 후 link 직전: 부모 dir 를 ws 밖으로 rename (TOCTOU 주입)
        if events.exists():
            os.rename(str(events), str(outside))
        return real_link(
            src, dst, src_dir_fd=src_dir_fd, dst_dir_fd=dst_dir_fd
        )

    monkeypatch.setattr(g.os, "link", toctou_link)

    with pytest.raises(SystemExit) as ei:
        atomic_guarded_write(
            "memory/events/task-2617.toctou.json",
            '{"x":1}\n',
            task_id=None,
            policy=pol,
        )
    assert "post-link final-inode bound 이탈" in str(ei.value)
    # TOCTOU 로 ws 밖에 안착한 final 은 즉시 unlink 됐어야 (fail-closed)
    assert not (outside / "task-2617.toctou.json").exists()
    # 원래 위치에도 없어야 · temp 잔존물 0
    assert not (ws / "memory/events/task-2617.toctou.json").exists()
    assert not list(outside.glob(".*tmp*"))


def test_post_link_check_passes_for_legitimate_write(tmp_path):
    """rename 주입 없을 때(정상 경로): 변경 후에도 write 성공·final 이
    ws 안에 안착 — 기존 task-2617 통과 경로 동작 불변 입증."""
    ws = _mk_sandbox_ws(tmp_path)
    pol = GuardPolicy(canonical_ws_root=ws)
    ret = atomic_guarded_write(
        "memory/events/task-2617.ok.json",
        '{"ok":1}\n',
        task_id=None,
        policy=pol,
    )
    final = ws / "memory/events/task-2617.ok.json"
    assert ret == final and final.read_text() == '{"ok":1}\n'
    assert not list((ws / "memory/events").glob(".*tmp*"))


# ── task-2617+2: reopen-by-name substitution race (inode-bound 차단) ─────────
def test_reopen_by_name_substitution_race_blocked(tmp_path, monkeypatch):
    """잔여 HIGH 재현: ``os.link()`` 직후·재오픈 직전 fname 이 *다른
    in-ws inode* 로 치환되는 reopen-by-name substitution race.

    치환된 파일은 ws 안(events 직하위)이라 realpath containment 만으로는
    통과한다 — 그러나 post-link 재검증이 inode-bound(link 가 만든 temp
    inode 의 st_dev/st_ino 와 동치) 이므로 inode 불일치를 탐지해 final
    을 즉시 unlink 하고 fail-closed(SystemExit). containment=True 인데도
    inode_bound=False 로 차단됨을 입증한다."""
    import anu_v3.cli_output_path_guard as g

    ws = _mk_sandbox_ws(tmp_path)
    events = ws / "memory" / "events"
    pol = GuardPolicy(canonical_ws_root=ws)

    real_link = os.link

    def substituting_link(src, dst, *, src_dir_fd=None, dst_dir_fd=None):
        # 정상 link 수행(final = temp inode 의 hardlink) 직후, link 와
        # 재오픈 사이를 모사: dst 를 제거하고 *같은 in-ws dir* 에 전혀
        # 다른 inode 의 동명 파일을 심는다(치환). 경로는 ws 안이므로
        # realpath containment 는 통과 — inode-bound 만이 이를 탐지.
        rv = real_link(
            src, dst, src_dir_fd=src_dir_fd, dst_dir_fd=dst_dir_fd
        )
        os.unlink(dst, dir_fd=dst_dir_fd)
        dflags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        if hasattr(os, "O_NOFOLLOW"):
            dflags |= os.O_NOFOLLOW
        decoy_fd = os.open(dst, dflags, 0o600, dir_fd=dst_dir_fd)
        os.write(decoy_fd, b"DECOY-DIFFERENT-INODE\n")
        os.close(decoy_fd)
        return rv

    monkeypatch.setattr(g.os, "link", substituting_link)

    with pytest.raises(SystemExit) as ei:
        atomic_guarded_write(
            "memory/events/task-2617.subst.json",
            '{"x":1}\n',
            task_id=None,
            policy=pol,
        )
    msg = str(ei.value)
    assert "post-link final-inode bound 이탈" in msg
    assert "inode_bound=False" in msg, msg
    # 치환 파일은 ws 안 → realpath containment 는 통과했어야
    assert "containment=True" in msg, msg
    # inode 불일치로 치환된 decoy 는 즉시 unlink 됐어야 (fail-closed)
    assert not (events / "task-2617.subst.json").exists()
    # temp 잔존물 0
    assert not list(events.glob(".*tmp*"))


def test_guard_policy_file_present_and_valid():
    pol = load_policy(POLICY)
    assert str(pol.canonical_ws_root) == "/home/jay/workspace"
    assert set(pol.allowed_roots) == {"memory/events", "memory/reports"}
    assert pol.task_id_prefix_required is True
    assert pol.single_segment_under_allowed_root is True
