"""tests.regression.test_owner_trigger_security_boundaries_2553 — fixture 5 + 보안 경계.

회장 §명시:
  - fixture 5: **non-queue-head blocked** — queue-head 아닌 PR 에 trigger 시 fail-fast
  - 15 금지 박제 (보안 경계 적대적 평가 핵심, Lv.4 security)

본 regression 의 7 테스트 케이스는 다음 doctrine 위반 시나리오를 차단한다:
  1. queue-head 아닌 PR 에 trigger (금지 #9)
  2. comment body strict equality 위반 (금지 #7)
  3. endpoint allowlist 위반 (금지 #1~5: merge/approve/close/reopen/push)
  4. token value log (금지 #12, #13)
  5. default GH_TOKEN fallback (금지 #14)
  6. owner_trigger_pat.py 소스 안에 forbidden endpoint 호출 표현식 0
  7. OWNER PAT env value leak (금지 #12, #13 — stdout/stderr/audit 어디에도 노출 0)

본 모듈은 `anu_v2.owner_trigger_pat` 를 **수정하지 않고** 정적 + behavioral 검증만 수행.
"""

from __future__ import annotations

import json
import subprocess
import sys
from pathlib import Path
from typing import Any, Mapping, Sequence

import pytest

# workspace root → sys.path
_WORKSPACE_ROOT = Path(__file__).resolve().parents[2]
if str(_WORKSPACE_ROOT) not in sys.path:
    sys.path.insert(0, str(_WORKSPACE_ROOT))

from anu_v2.owner_trigger_pat import (  # noqa: E402
    ALLOWED_COMMENT_BODY,
    DECISION_REJECT,
    ERR_BODY_NOT_ALLOWED,
    EVIDENCE_MISSING_FOR_CURRENT_HEAD,
    OUTCOME_OK,
    OUTCOME_REJECTED,
    OWNER_PAT_ENV_NAME,
    OwnerTriggerPat,
    assert_body_allowed,
    write_decision_json,
)


# ─── helpers ────────────────────────────────────────────────────────────────
def _cp(returncode: int = 0, stdout: str = "", stderr: str = "") -> subprocess.CompletedProcess:
    return subprocess.CompletedProcess(args=[], returncode=returncode, stdout=stdout, stderr=stderr)


_FAKE_OWNER_PAT = "ghp_SECRET12345"
_FIXED_TS = "2026-05-11T12:00:00+00:00"

# owner_trigger_pat.py 소스 위치 (정적 grep 대상)
_OWNER_TRIGGER_SRC = (
    _WORKSPACE_ROOT / "anu_v2" / "owner_trigger_pat.py"
)


def _make_trigger(
    gh_calls: list[dict[str, Any]],
    audit_calls: list[dict[str, Any]],
    decision_calls: list[dict[str, Any]],
    *,
    fail_returncode: int = 0,
    fail_stderr: str = "",
) -> OwnerTriggerPat:
    def gh_runner(args: Sequence[str], env: Mapping[str, str]) -> subprocess.CompletedProcess:
        gh_calls.append({"args": list(args), "env": dict(env or {})})
        return _cp(returncode=fail_returncode, stderr=fail_stderr)

    def audit_writer(record: Mapping[str, Any]) -> None:
        audit_calls.append(dict(record))

    def decision_writer(payload: Mapping[str, Any], path: Path) -> None:
        decision_calls.append({"payload": dict(payload), "path": str(path)})
        write_decision_json(payload, path)

    return OwnerTriggerPat(
        gh_runner=gh_runner,
        audit_writer=audit_writer,
        decision_writer=decision_writer,
        owner="jeon-jonghyuk",
        repo="taskctl-anu",
        clock=lambda: _FIXED_TS,
    )


# ─── 1. non_queue_head_blocked (fixture 5) ──────────────────────────────────
@pytest.mark.parametrize("bad_queue_position", [1, 2, 5, 10])
def test_non_queue_head_blocked(
    tmp_path: Path,
    bad_queue_position: int,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """queue_position != 0 인 PR trigger 시도 → REJECT + gh 호출 0.

    회장 §명시 fixture 5 + 15 금지 #9.
    """
    monkeypatch.setenv(OWNER_PAT_ENV_NAME, _FAKE_OWNER_PAT)
    gh_calls: list[dict[str, Any]] = []
    audit_calls: list[dict[str, Any]] = []
    decision_calls: list[dict[str, Any]] = []
    trigger = _make_trigger(gh_calls, audit_calls, decision_calls)

    out = trigger.trigger_gemini_review(
        pr_number=42,
        head_sha="nqh",
        queue_position=bad_queue_position,
        gemini_evidence_state=EVIDENCE_MISSING_FOR_CURRENT_HEAD,
        audit_log_path=tmp_path / "audit.jsonl",
        decision_json_path=tmp_path / "decision.json",
    )
    assert out.outcome == OUTCOME_REJECTED
    reason_upper = out.reason.upper()
    assert ("QUEUE_HEAD" in reason_upper) or ("NOT_QUEUE_HEAD" in reason_upper)
    # gh 호출 0
    assert len(gh_calls) == 0
    # decision REJECT 박제
    decision_data = json.loads((tmp_path / "decision.json").read_text(encoding="utf-8"))
    assert decision_data["decision"] == DECISION_REJECT


# ─── 2. comment_body_strict_equality ────────────────────────────────────────
def test_comment_body_strict_equality_rejects_variants() -> None:
    """`/gemini review` 외 body 입력 시 RuntimeError("BODY_NOT_ALLOWED") raise.

    회장 §명시 15 금지 #7 + 6 허용 #1: comment body 정확히 `/gemini review` only.
    strict equality (대소문자/공백/접미사 등 모두 차단).
    """
    # OK case — 박제된 상수와 동일
    assert_body_allowed(ALLOWED_COMMENT_BODY)

    # NG cases — 다양한 변형 시도
    bad_bodies = [
        "/gemini review please",
        " /gemini review",
        "/gemini review ",
        "/gemini Review",
        "/Gemini review",
        "/gemini-review",
        "/gemini  review",   # double space
        "/gemini review\n",
        "/gemini review;",
        "",
        "gemini review",     # missing slash
        "/gemini review!",
    ]
    for body in bad_bodies:
        with pytest.raises(RuntimeError) as exc:
            assert_body_allowed(body)
        assert ERR_BODY_NOT_ALLOWED in str(exc.value)

    # ALLOWED_COMMENT_BODY 자체가 strict equality string 으로 박제되어 있는지
    # 정적 어설션 (regex 가 아니라 string constant).
    assert ALLOWED_COMMENT_BODY == "/gemini review"
    assert isinstance(ALLOWED_COMMENT_BODY, str)


# ─── 3. endpoint_allowlist_enforced ─────────────────────────────────────────
def test_endpoint_allowlist_enforced_exact_args(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """gh_runner 에 전달되는 args 가 정확히 issue comments POST 1 endpoint 형식.

    회장 §명시 15 금지 #1~5: merge / approve / close / reopen / push 호출 0.
    """
    monkeypatch.setenv(OWNER_PAT_ENV_NAME, _FAKE_OWNER_PAT)
    gh_calls: list[dict[str, Any]] = []
    audit_calls: list[dict[str, Any]] = []
    decision_calls: list[dict[str, Any]] = []
    trigger = _make_trigger(gh_calls, audit_calls, decision_calls)

    pr = 81
    out = trigger.trigger_gemini_review(
        pr_number=pr,
        head_sha="ep_test",
        queue_position=0,
        gemini_evidence_state=EVIDENCE_MISSING_FOR_CURRENT_HEAD,
        audit_log_path=tmp_path / "audit.jsonl",
        decision_json_path=tmp_path / "decision.json",
    )
    assert out.outcome == OUTCOME_OK
    assert len(gh_calls) == 1
    args = gh_calls[0]["args"]

    # 정확히 일치
    expected = [
        "api", "-X", "POST",
        f"/repos/jeon-jonghyuk/taskctl-anu/issues/{pr}/comments",
        "-f", f"body={ALLOWED_COMMENT_BODY}",
    ]
    assert args == expected

    # merge / approve / close / reopen / push 키워드 0 (args + 평탄화 후)
    flat = " ".join(args)
    forbidden_fragments = (
        "/merges", "/merge",
        "/approvals", "/approve", "/reviews",
        "state=closed", "state=open",
        "pr merge", "pr approve", "pr close", "pr reopen",
        "git push", "/git/refs", "/branches/",
        "/pulls/",
    )
    for frag in forbidden_fragments:
        assert frag not in flat, f"forbidden fragment {frag!r} found in args: {args}"


# ─── 4. token_value_never_logged ────────────────────────────────────────────
def test_token_value_never_logged_in_audit(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """token=`ghp_SECRET12345` 환경에서 trigger 후 audit dict 직렬화에 token raw 0 + token_hash 12자.

    회장 §명시 15 금지 #12, #13. audit 에는 token_present (bool) 또는 token_hash
    (sha256 prefix 12자 hex) 만 등장.
    """
    monkeypatch.setenv(OWNER_PAT_ENV_NAME, _FAKE_OWNER_PAT)
    gh_calls: list[dict[str, Any]] = []
    audit_calls: list[dict[str, Any]] = []
    decision_calls: list[dict[str, Any]] = []
    trigger = _make_trigger(gh_calls, audit_calls, decision_calls)

    out = trigger.trigger_gemini_review(
        pr_number=81,
        head_sha="logtest",
        queue_position=0,
        gemini_evidence_state=EVIDENCE_MISSING_FOR_CURRENT_HEAD,
        audit_log_path=tmp_path / "audit.jsonl",
        decision_json_path=tmp_path / "decision.json",
    )
    assert out.outcome == OUTCOME_OK

    # audit dict 직렬화 결과에 token raw 0
    record = audit_calls[-1]
    serialized = json.dumps(record, sort_keys=True)
    assert _FAKE_OWNER_PAT not in serialized
    # token_present (bool) 존재
    assert record["token_present"] is True
    # token_hash 12자 hex
    assert "token_hash" in record
    token_hash = record["token_hash"]
    assert isinstance(token_hash, str)
    assert len(token_hash) == 12
    assert all(c in "0123456789abcdef" for c in token_hash)

    # decision JSON 에도 token raw 0
    decision_text = (tmp_path / "decision.json").read_text(encoding="utf-8")
    assert _FAKE_OWNER_PAT not in decision_text

    # outcome.reason 도 token raw 0
    assert _FAKE_OWNER_PAT not in out.reason


def test_token_value_redacted_on_gh_runner_failure(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """gh stderr 에 token 이 우연히 박혀도 redact 처리되어 reason/audit 어디에도 token raw 0."""
    monkeypatch.setenv(OWNER_PAT_ENV_NAME, _FAKE_OWNER_PAT)
    gh_calls: list[dict[str, Any]] = []
    audit_calls: list[dict[str, Any]] = []
    decision_calls: list[dict[str, Any]] = []
    trigger = _make_trigger(
        gh_calls, audit_calls, decision_calls,
        fail_returncode=1,
        fail_stderr=f"401 unauthorized: token={_FAKE_OWNER_PAT} expired",
    )

    out = trigger.trigger_gemini_review(
        pr_number=81,
        head_sha="fail",
        queue_position=0,
        gemini_evidence_state=EVIDENCE_MISSING_FOR_CURRENT_HEAD,
        audit_log_path=tmp_path / "audit.jsonl",
        decision_json_path=tmp_path / "decision.json",
    )
    assert out.outcome == "failed"
    # reason / audit 모두 redact
    assert _FAKE_OWNER_PAT not in out.reason
    record = audit_calls[-1]
    assert _FAKE_OWNER_PAT not in json.dumps(record)


# ─── 5. default_gh_token_fallback_blocked ───────────────────────────────────
def test_default_gh_token_fallback_blocked(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """OWNER_GEMINI_TRIGGER_PAT 누락 + GH_TOKEN 존재 시 fallback X — REJECT.

    회장 §명시 15 금지 #14: default GH_TOKEN fallback. 누락 시 즉시 REJECT
    (silent skip 금지).
    """
    monkeypatch.delenv(OWNER_PAT_ENV_NAME, raising=False)
    monkeypatch.setenv("GH_TOKEN", "ghp_FALLBACK_FAKE")
    monkeypatch.setenv("GITHUB_TOKEN", "ghp_FALLBACK_FAKE")

    gh_calls: list[dict[str, Any]] = []
    audit_calls: list[dict[str, Any]] = []
    decision_calls: list[dict[str, Any]] = []
    trigger = _make_trigger(gh_calls, audit_calls, decision_calls)

    out = trigger.trigger_gemini_review(
        pr_number=81,
        head_sha="nofallback",
        queue_position=0,
        gemini_evidence_state=EVIDENCE_MISSING_FOR_CURRENT_HEAD,
        audit_log_path=tmp_path / "audit.jsonl",
        decision_json_path=tmp_path / "decision.json",
    )
    assert out.outcome == OUTCOME_REJECTED
    assert "token_missing" in out.reason
    # gh 호출 0
    assert len(gh_calls) == 0
    # fallback token 이 audit 어디에도 leak 안 됨
    assert "ghp_FALLBACK_FAKE" not in json.dumps(audit_calls[-1])
    assert "ghp_FALLBACK_FAKE" not in out.reason


# ─── 6. merge_approve_close_reopen_push — source grep ───────────────────────
def test_merge_approve_close_reopen_push_no_call_in_source() -> None:
    """owner_trigger_pat.py 소스에 merge/approve/close/reopen/push 호출 표현식 0.

    회장 §명시 15 금지 #1~5 박제. 본 테스트는 모듈 소스를 파일 read 후 정적 grep.
    docstring / 주석 / forbidden fragment 상수 (의도된 negative list) 는 검사 대상에서
    제외한다. 실제 gh api 호출 표현식이 존재하면 fail.
    """
    src = _OWNER_TRIGGER_SRC.read_text(encoding="utf-8")

    # 실제 코드에 등장하면 안 되는 "호출 표현식 패턴".
    forbidden_call_patterns = (
        # gh CLI 직접 명령 (gh pr merge / approve / close / reopen / push)
        '"pr", "merge"',
        '"pr", "approve"',
        '"pr", "close"',
        '"pr", "reopen"',
        "subprocess.run([\"git\", \"push\"",
        # API endpoint hardcoded 호출 (string literal)
        '"/repos/{owner}/{repo}/pulls/{pr}/merge"',
        '"/repos/{owner}/{repo}/pulls/{pr}/reviews"',
        # 직접 함수 호출 명
        ".merge_pr(",
        ".approve_pr(",
        ".close_pr(",
        ".reopen_pr(",
        ".push_commit(",
    )
    for pat in forbidden_call_patterns:
        assert pat not in src, f"forbidden call pattern found in source: {pat!r}"

    # gh runner 호출은 단 1군데 (self._gh(args, env)) 임을 추가 박제 — 호출
    # 빈도 검증 (단일 책임 박제).
    gh_call_count = src.count("self._gh(args, env)")
    assert gh_call_count == 1, f"expected 1 self._gh(...) call, found {gh_call_count}"


# ─── 7. owner_pat_env_isolated — capsys 로 stdout/stderr leak 검증 ──────────
def test_owner_pat_env_isolated_no_leak_in_stdout_stderr_or_audit(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """trigger 후 OWNER PAT 값이 stdout/stderr/audit 어디에도 leak 0.

    회장 §명시 15 금지 #12, #13: token value log 0.
    """
    monkeypatch.setenv(OWNER_PAT_ENV_NAME, _FAKE_OWNER_PAT)
    gh_calls: list[dict[str, Any]] = []
    audit_calls: list[dict[str, Any]] = []
    decision_calls: list[dict[str, Any]] = []
    trigger = _make_trigger(gh_calls, audit_calls, decision_calls)

    out = trigger.trigger_gemini_review(
        pr_number=81,
        head_sha="isolation",
        queue_position=0,
        gemini_evidence_state=EVIDENCE_MISSING_FOR_CURRENT_HEAD,
        audit_log_path=tmp_path / "audit.jsonl",
        decision_json_path=tmp_path / "decision.json",
    )
    assert out.outcome == OUTCOME_OK

    captured = capsys.readouterr()
    # stdout / stderr 모두 token value leak 0
    assert _FAKE_OWNER_PAT not in captured.out
    assert _FAKE_OWNER_PAT not in captured.err

    # audit + decision 도 leak 0
    record_text = json.dumps(audit_calls[-1])
    assert _FAKE_OWNER_PAT not in record_text
    decision_text = (tmp_path / "decision.json").read_text(encoding="utf-8")
    assert _FAKE_OWNER_PAT not in decision_text

    # outcome 객체 자체에도 token raw 0
    assert _FAKE_OWNER_PAT not in out.reason
    assert _FAKE_OWNER_PAT not in out.decision_path

    # gh_calls 의 env dict 에는 GH_TOKEN/GITHUB_TOKEN 값으로 token 이 들어가지만,
    # 이는 subprocess 호출 인자 (외부 leak 경로 X). 본 fixture 의 gh_runner 가
    # capsys 영역으로 출력하지 않음을 추가 확인.
    assert gh_calls[0]["env"]["GH_TOKEN"] == _FAKE_OWNER_PAT  # subprocess env only


# ─── 8. 추가 정적 어설션 — 모듈 외 OWNER PAT env 이름 등장 0 ─────────────────
def test_owner_pat_env_name_isolated_in_module() -> None:
    """OWNER_GEMINI_TRIGGER_PAT env 이름이 owner_trigger_pat.py 외 다른 anu_v2 모듈에
    등장하지 않는지 박제 (one-way isolation, 회장 §명시 secret 별도 저장).

    본 테스트는 anu_v2 디렉토리 내 .py 파일을 스캔하여 OWNER_GEMINI_TRIGGER_PAT
    문자열이 owner_trigger_pat.py 외 다른 곳에 hardcoded 되어 있지 않은지 검증.
    test 파일은 정상적으로 import 하므로 제외.
    """
    anu_v2_dir = _WORKSPACE_ROOT / "anu_v2"
    offenders: list[str] = []
    for py in anu_v2_dir.rglob("*.py"):
        rel = py.relative_to(_WORKSPACE_ROOT).as_posix()
        # 자기 자신 + 테스트 파일은 제외 (테스트는 import 함)
        if "owner_trigger_pat" in rel:
            continue
        if "/tests/" in rel:
            continue
        try:
            text = py.read_text(encoding="utf-8")
        except OSError:
            continue
        if "OWNER_GEMINI_TRIGGER_PAT" in text:
            offenders.append(rel)
    # 위반 0 — OWNER PAT env 이름은 owner_trigger_pat.py 에서만 정의/사용.
    assert offenders == [], (
        f"OWNER_GEMINI_TRIGGER_PAT env name leaked to: {offenders}"
    )
