"""tests/regression/test_ci_gemini_watcher_gh_adapter_2719.py — task-2719 regression 테스트.

시나리오 매핑 (§regression 검증 1:1):
  1.  test_op_five_dispatch_accuracy         — op 5종 argv 매핑 및 반환값 정확성
  2a. test_write_op_forbidden_unknown_ops    — write/unknown op → GhWriteForbiddenError
  2b. test_write_op_forbidden_post_method    — reviews POST method → GhWriteForbiddenError
  2c. test_full_cycle_no_write_argv          — 전체 사이클 argv GET-only + github_writes==0
  3.  test_stale_owner_allow_trigger         — STALE + owner proof + dedupe=False → ALLOW_OWNER_TRIGGER, writes==0
  4.  test_fresh_clean_ci_pass_merge_ready   — FRESH + 0 escalated + CI SUCCESS → MERGE_READY_CANDIDATE
  5.  test_expected_head_mismatch_stale_head — actual_head != expected_head → HOLD_STALE_HEAD
  6.  test_scope_mismatch_unclean            — diff에 out-of-scope 경로 → HOLD_SCOPE_UNCLEAN
  7.  test_ci_failure_non_remediable         — FAILURE state non-remediable → CI_FAILED_NON_REMEDIABLE
  8.  test_validate_decision_spy             — STALE+owner 경로에서 validate_decision spy
  9.  test_auto_gemini_triage_spy            — FRESH 경로에서 triage_batch 호출 검증
  10. test_default_gh_cli_write_guard        — default_gh_cli write argv → GhWriteForbiddenError (네트워크 0)
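  11. test_read_ops_constant                 — READ_OPS 정확히 5종 포함
  12. test_get_only_guard_blocks_all_bypass_paths — GET-only guard bypass paths (global flags, glued/equals method, implicit-POST data flags) → GhWriteForbiddenError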

all tests: 네트워크 호출 0, 전부 mock gh_cli.
"""

from __future__ import annotations

import json
import unittest.mock as mock
from typing import Any, Sequence

import pytest

from anu_v2.auto_gemini_triage import AutoGeminiTriage
from anu_v2.ci_gemini_watcher_gh_adapter import (
    GhWriteForbiddenError,
    READ_OPS,
    build_readonly_gh_runner,
    default_gh_cli,
    run_one_shot_dry_run,
)
from anu_v2.ci_gemini_watcher_runner import (
    TERMINAL_CI_FAILED_NON_REMEDIABLE,
    TERMINAL_GEMINI_EXTERNAL_TRIGGER_REQUIRED,
    TERMINAL_HOLD_SCOPE_UNCLEAN,
    TERMINAL_HOLD_STALE_HEAD,
    TERMINAL_MERGE_READY_CANDIDATE,
    TRIGGER_ALLOW_OWNER,
)

# ──────────────────────────────────────────────────────────────────────────────
# SHA 헬퍼 상수
# ──────────────────────────────────────────────────────────────────────────────

HEAD_A = "a" * 40  # expected head (= actual head for happy paths)
HEAD_B = "b" * 40  # alternate / stale head

# Default expected_files list handed to the runs in this module
EXPECTED_FILES = [
    "anu_v2/ci_gemini_watcher_gh_adapter.py",
    "anu_v2/auto_gemini_triage.py",
]

# PR number used by the calls in this module
PR_NUMBER = 42


# ──────────────────────────────────────────────────────────────────────────────
# mock gh_cli 빌더 헬퍼
# ──────────────────────────────────────────────────────────────────────────────

def make_mock_gh_cli(
    *,
    actual_head: str = HEAD_A,
    diff_files: list[str] | None = None,
    ci_conclusion: str | None = None,  # None → conclusion "SUCCESS" with ``ci_status``
    ci_status: str = "COMPLETED",
    reviews: list[dict] | None = None,
    findings: list[dict] | None = None,
    recorded_calls: list[list[str]] | None = None,
):
    """Build a fake ``gh_cli`` closure that maps an argv to canned JSON stdout.

    Args:
        actual_head: Value returned under ``headRefOid``.
        diff_files: Paths returned under ``files``; a copy of ``EXPECTED_FILES``
            when omitted.
        ci_conclusion: Conclusion of the single rollup item. When given, the
            item's status is always ``"COMPLETED"``; when ``None`` the item
            reports conclusion ``"SUCCESS"`` with status ``ci_status``.
        ci_status: Status of the rollup item, used only when ``ci_conclusion``
            is ``None``.
        reviews: JSON payload for argv containing ``/reviews`` (default ``[]``).
        findings: JSON payload for argv containing ``/comments`` (default ``[]``).
        recorded_calls: Optional list; every argv received is appended to it
            (as a list) so callers can inspect what was invoked.

    Returns:
        A callable taking ``Sequence[str]`` and returning a ``str``. An argv
        that matches none of the known shapes yields an empty string.
    """
    canned_files = list(EXPECTED_FILES) if diff_files is None else diff_files
    canned_reviews = [] if reviews is None else reviews
    canned_findings = [] if findings is None else findings

    def _gh_cli(argv: Sequence[str]) -> str:
        tokens = list(argv)
        if recorded_calls is not None:
            recorded_calls.append(tokens)

        # ``--json <field>`` style calls; checked in a fixed order:
        # headRefOid, then files, then statusCheckRollup.
        if "--json" in tokens:
            if "headRefOid" in tokens:
                return json.dumps({"headRefOid": actual_head})
            if "files" in tokens:
                return json.dumps({"files": [{"path": p} for p in canned_files]})
            if "statusCheckRollup" in tokens:
                if ci_conclusion is None:
                    check = {"conclusion": "SUCCESS", "status": ci_status}
                else:
                    check = {"conclusion": ci_conclusion, "status": "COMPLETED"}
                return json.dumps({"statusCheckRollup": [check]})

        # Path-based calls: ``/reviews`` takes precedence over ``/comments``.
        if any("/reviews" in t for t in tokens):
            return json.dumps(canned_reviews)
        if any("/comments" in t for t in tokens):
            return json.dumps(canned_findings)

        # Unrecognised argv: empty stdout.
        return ""

    return _gh_cli


def make_fresh_review(commit_id: str) -> dict:
    """Return a ``gemini-code-assist[bot]`` review fixture for FRESH scenarios.

    The review is in state ``COMMENTED`` with body ``"ok"`` and is pinned to
    the given ``commit_id``.
    """
    author = {"login": "gemini-code-assist[bot]"}
    return dict(user=author, commit_id=commit_id, state="COMMENTED", body="ok")


def make_stale_review(commit_id: str = HEAD_B) -> dict:
    """Return a Gemini review fixture for STALE scenarios.

    Callers are expected to pass a ``commit_id`` that differs from the PR's
    actual head; it defaults to ``HEAD_B``.
    """
    author = {"login": "gemini-code-assist[bot]"}
    return dict(user=author, commit_id=commit_id, state="COMMENTED", body="ok")


def make_finding(
    *,
    rule_id: str = "no-op",
    severity: str = "low",
    category: str = "style",
    path: str | None = None,
    body: str = "",
    title: str = "",
) -> dict:
    """Build a finding fixture dict.

    A falsy ``path`` (``None`` or ``""``) is replaced by the first entry of
    ``EXPECTED_FILES``; every other field is copied through unchanged.
    """
    finding_path = path if path else EXPECTED_FILES[0]
    return dict(
        rule_id=rule_id,
        severity=severity,
        category=category,
        path=finding_path,
        body=body,
        title=title,
    )


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 1: op 5종 매핑 정확성
# ──────────────────────────────────────────────────────────────────────────────

def test_op_five_dispatch_accuracy():
    """Invoke all five read ops through ``build_readonly_gh_runner``.

    Checks the value each op returns and that every op's argv was recorded
    by the mock ``gh_cli``.
    """
    recorded: list[list[str]] = []
    runner = build_readonly_gh_runner(
        gh_cli=make_mock_gh_cli(
            actual_head=HEAD_A,
            diff_files=EXPECTED_FILES,
            reviews=[make_fresh_review(HEAD_A)],
            findings=[make_finding()],
            recorded_calls=recorded,
        ),
        owner="myorg",
        repo="myrepo",
    )

    # op 1: actual_head
    head = runner("actual_head", pr_number=PR_NUMBER)
    assert head == HEAD_A, f"actual_head returned {head!r}"

    # op 2: diff_paths
    changed = runner("diff_paths", pr_number=PR_NUMBER)
    assert isinstance(changed, list)
    assert set(changed) == set(EXPECTED_FILES)

    # op 3: ci_rollup
    rollup = runner("ci_rollup", pr_number=PR_NUMBER)
    assert isinstance(rollup, dict)
    assert "state" in rollup
    assert "remediable" in rollup
    assert rollup["state"] == "SUCCESS"

    # op 4: reviews (explicit GET against the reviews path)
    review_list = runner(
        "reviews",
        method="GET",
        path=f"/repos/myorg/myrepo/pulls/{PR_NUMBER}/reviews",
    )
    assert isinstance(review_list, list)
    assert len(review_list) == 1
    assert review_list[0]["commit_id"] == HEAD_A

    # op 5: findings
    finding_list = runner("findings", pr_number=PR_NUMBER)
    assert isinstance(finding_list, list)
    assert len(finding_list) == 1

    # One argv record per op.
    assert len(recorded) == 5, f"expected 5 argv records, got {len(recorded)}"

    # Each ``--json`` field was requested at least once.
    assert any("headRefOid" in argv for argv in recorded)
    assert any("files" in argv for argv in recorded)
    assert any("statusCheckRollup" in argv for argv in recorded)

    # reviews + findings go through ``api``.
    api_calls = [argv for argv in recorded if argv and argv[0] == "api"]
    assert len(api_calls) >= 2, f"expected >=2 api calls, got {len(api_calls)}"


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 2a: write/unknown op → GhWriteForbiddenError
# ──────────────────────────────────────────────────────────────────────────────

def test_write_op_forbidden_unknown_ops():
    """Requesting a write or unknown op must raise ``GhWriteForbiddenError``."""
    runner = build_readonly_gh_runner(gh_cli=make_mock_gh_cli())

    # Checked in order; the first op that does not raise fails the test.
    for op_name in ("comment", "merge", "post_review", "unknown_op"):
        with pytest.raises(GhWriteForbiddenError):
            runner(op_name, pr_number=1)


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 2b: reviews POST method → GhWriteForbiddenError
# ──────────────────────────────────────────────────────────────────────────────

def test_write_op_forbidden_post_method():
    """The ``reviews`` op with a non-GET method must raise ``GhWriteForbiddenError``."""
    runner = build_readonly_gh_runner(gh_cli=make_mock_gh_cli())
    reviews_path = "/repos/o/r/pulls/1/reviews"

    for http_method in ("POST", "PATCH", "DELETE"):
        with pytest.raises(GhWriteForbiddenError):
            runner("reviews", method=http_method, path=reviews_path)


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 2c: full cycle — 모든 argv GET-only, github_writes == 0
# ──────────────────────────────────────────────────────────────────────────────

def test_full_cycle_no_write_argv():
    """Run ``run_one_shot_dry_run`` once and verify every recorded argv is read-only.

    Asserts that ``github_writes == 0``, that at least one argv was recorded
    (so the per-argv checks cannot pass vacuously), that every ``api`` call
    carries ``-X GET``, and that neither of the first two tokens of any argv
    is a known write subcommand.
    """
    calls: list[list[str]] = []
    reviews = [make_fresh_review(HEAD_A)]
    findings = [make_finding(severity="low", category="style")]

    gh_cli = make_mock_gh_cli(
        actual_head=HEAD_A,
        diff_files=EXPECTED_FILES,
        reviews=reviews,
        findings=findings,
        recorded_calls=calls,
    )

    result = run_one_shot_dry_run(
        PR_NUMBER,
        HEAD_A,
        EXPECTED_FILES,
        gh_cli=gh_cli,
    )

    assert result.github_writes == 0, "github_writes must be 0"

    # Without this guard the loop below would succeed on an empty list and
    # prove nothing about the argv actually sent to gh.
    assert calls, "no gh_cli argv was recorded; GET-only checks would be vacuous"

    write_subcmds = {"comment", "merge", "close", "edit", "review", "ready",
                     "create", "delete", "lock", "unlock", "reopen"}

    for argv in calls:
        # For api calls, the token following -X must be GET.
        if argv and argv[0] == "api":
            assert "-X" in argv, f"api call missing -X: {argv}"
            x_idx = argv.index("-X")
            assert x_idx + 1 < len(argv), f"-X has no following token: {argv}"
            method_token = argv[x_idx + 1].upper()
            assert method_token == "GET", f"non-GET method found: {method_token!r} in {argv}"

        # The command / subcommand positions must not name a write subcommand.
        for token in argv[:2]:
            assert token.lower() not in write_subcmds, \
                f"write subcommand {token!r} found in argv: {argv}"


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 3: STALE + owner proof OK + dedupe False → ALLOW_OWNER_TRIGGER
# ──────────────────────────────────────────────────────────────────────────────

def test_stale_owner_allow_trigger():
    """STALE review + admin owner proof + dedupe returning False.

    Expects terminal ``GEMINI_EXTERNAL_TRIGGER_REQUIRED``, trigger decision
    ``ALLOW_OWNER_TRIGGER``, a non-None ``decision_json`` and zero GitHub writes.
    """
    # STALE: the Gemini review is pinned to HEAD_B while the actual head is HEAD_A.
    stale_cli = make_mock_gh_cli(
        actual_head=HEAD_A,
        diff_files=EXPECTED_FILES,
        reviews=[make_stale_review(HEAD_B)],
        findings=[],
    )

    outcome = run_one_shot_dry_run(
        PR_NUMBER,
        HEAD_A,
        EXPECTED_FILES,
        gh_cli=stale_cli,
        owner_proof={"is_owner": True, "admin": True, "push": True, "self_key": False},
        dedupe_checker=lambda pr, sha: False,
    )

    assert outcome.terminal == TERMINAL_GEMINI_EXTERNAL_TRIGGER_REQUIRED, (
        f"expected GEMINI_EXTERNAL_TRIGGER_REQUIRED, got {outcome.terminal!r}"
    )
    assert outcome.trigger_decision == TRIGGER_ALLOW_OWNER, (
        f"expected ALLOW_OWNER_TRIGGER, got {outcome.trigger_decision!r}"
    )
    assert outcome.decision_json is not None, "decision_json must not be None"
    assert outcome.github_writes == 0


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 4: FRESH + CI PASS + scope clean → MERGE_READY_CANDIDATE
# ──────────────────────────────────────────────────────────────────────────────

def test_fresh_clean_ci_pass_merge_ready():
    """FRESH review, a single style/low finding, CI SUCCESS and in-scope diff.

    Expects terminal ``MERGE_READY_CANDIDATE`` with ``ci_state == "SUCCESS"``
    and zero GitHub writes.
    """
    # Single low-severity style finding on an expected file.
    style_finding = make_finding(severity="low", category="style", path=EXPECTED_FILES[0])
    fresh_cli = make_mock_gh_cli(
        actual_head=HEAD_A,
        diff_files=EXPECTED_FILES,
        reviews=[make_fresh_review(HEAD_A)],
        findings=[style_finding],
    )

    outcome = run_one_shot_dry_run(PR_NUMBER, HEAD_A, EXPECTED_FILES, gh_cli=fresh_cli)

    assert outcome.terminal == TERMINAL_MERGE_READY_CANDIDATE, (
        f"expected MERGE_READY_CANDIDATE, got {outcome.terminal!r}"
    )
    assert outcome.github_writes == 0
    assert outcome.ci_state == "SUCCESS"


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 5: expected_head mismatch → HOLD_STALE_HEAD
# ──────────────────────────────────────────────────────────────────────────────

def test_expected_head_mismatch_stale_head():
    """Actual head HEAD_B differs from expected head HEAD_A → ``HOLD_STALE_HEAD``."""
    # The mock reports HEAD_B while the run is told to expect HEAD_A.
    outcome = run_one_shot_dry_run(
        PR_NUMBER,
        HEAD_A,
        EXPECTED_FILES,
        gh_cli=make_mock_gh_cli(actual_head=HEAD_B),
    )

    assert outcome.terminal == TERMINAL_HOLD_STALE_HEAD, (
        f"expected HOLD_STALE_HEAD, got {outcome.terminal!r}"
    )
    assert outcome.actual_head == HEAD_B
    assert outcome.github_writes == 0


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 6: scope mismatch → HOLD_SCOPE_UNCLEAN
# ──────────────────────────────────────────────────────────────────────────────

def test_scope_mismatch_unclean():
    """A diff path outside ``expected_files`` → ``HOLD_SCOPE_UNCLEAN``."""
    # Expected files plus one path that is not in the expected set.
    out_of_scope_diff = [*EXPECTED_FILES, "scripts/intruder.sh"]
    dirty_cli = make_mock_gh_cli(actual_head=HEAD_A, diff_files=out_of_scope_diff)

    outcome = run_one_shot_dry_run(PR_NUMBER, HEAD_A, EXPECTED_FILES, gh_cli=dirty_cli)

    assert outcome.terminal == TERMINAL_HOLD_SCOPE_UNCLEAN, (
        f"expected HOLD_SCOPE_UNCLEAN, got {outcome.terminal!r}"
    )
    assert outcome.github_writes == 0


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 7: CI fail FAILURE non-remediable → CI_FAILED_NON_REMEDIABLE
# ──────────────────────────────────────────────────────────────────────────────

def test_ci_failure_non_remediable():
    """A FAILURE rollup conclusion → terminal ``CI_FAILED_NON_REMEDIABLE``."""
    # The single rollup item carries conclusion FAILURE.
    failing_cli = make_mock_gh_cli(
        actual_head=HEAD_A,
        diff_files=EXPECTED_FILES,
        ci_conclusion="FAILURE",
    )

    outcome = run_one_shot_dry_run(PR_NUMBER, HEAD_A, EXPECTED_FILES, gh_cli=failing_cli)

    assert outcome.terminal == TERMINAL_CI_FAILED_NON_REMEDIABLE, (
        f"expected CI_FAILED_NON_REMEDIABLE, got {outcome.terminal!r}"
    )
    assert outcome.ci_state == "FAILURE"
    assert outcome.github_writes == 0


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 8: validate_decision spy
# ──────────────────────────────────────────────────────────────────────────────

def test_validate_decision_spy():
    """Spy on ``validate_decision`` along the STALE + owner path.

    The name ``validate_decision`` inside ``anu_v2.ci_gemini_watcher_runner``
    is patched with a mock that wraps the real function from
    ``anu_v2.owner_trigger_decision``, so the real logic still runs while the
    call is observed. The test asserts the spy was called and that no GitHub
    writes were counted.
    """
    # Plain local import instead of a hand-written __import__(..., fromlist=...).
    from anu_v2.owner_trigger_decision import validate_decision

    reviews = [make_stale_review(HEAD_B)]

    gh_cli = make_mock_gh_cli(
        actual_head=HEAD_A,
        diff_files=EXPECTED_FILES,
        reviews=reviews,
        findings=[],
    )

    owner_proof = {"is_owner": True, "admin": True, "push": True, "self_key": False}

    with mock.patch(
        "anu_v2.ci_gemini_watcher_runner.validate_decision",
        wraps=validate_decision,
    ) as spy:
        result = run_one_shot_dry_run(
            PR_NUMBER,
            HEAD_A,
            EXPECTED_FILES,
            gh_cli=gh_cli,
            owner_proof=owner_proof,
            dedupe_checker=lambda pr, sha: False,
        )
        spy.assert_called()

    assert result.github_writes == 0


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 9: auto_gemini_triage 호출 spy
# ──────────────────────────────────────────────────────────────────────────────

def test_auto_gemini_triage_spy():
    """Spy on ``AutoGeminiTriage.triage_batch`` along the FRESH path.

    A real ``AutoGeminiTriage`` instance is passed to the run with its
    ``triage_batch`` wrapped by a mock; the test asserts it was called and
    that the run ends in ``MERGE_READY_CANDIDATE`` with zero GitHub writes.
    """
    fresh_cli = make_mock_gh_cli(
        actual_head=HEAD_A,
        diff_files=EXPECTED_FILES,
        reviews=[make_fresh_review(HEAD_A)],
        findings=[make_finding(severity="low", category="style", path=EXPECTED_FILES[0])],
    )

    # Real triage object; audit records are collected in memory.
    collected_audit: list[Any] = []
    triage = AutoGeminiTriage(
        audit_writer=lambda rec: collected_audit.append(rec),
        task_id="task-2719",
    )

    with mock.patch.object(triage, "triage_batch", wraps=triage.triage_batch) as batch_spy:
        outcome = run_one_shot_dry_run(
            PR_NUMBER,
            HEAD_A,
            EXPECTED_FILES,
            gh_cli=fresh_cli,
            triage=triage,
        )
        batch_spy.assert_called()

    assert outcome.terminal == TERMINAL_MERGE_READY_CANDIDATE
    assert outcome.github_writes == 0


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 10: default_gh_cli GET-only argv guard (subprocess 도달 전 차단)
# ──────────────────────────────────────────────────────────────────────────────

def test_default_gh_cli_write_guard():
    """Passing a write argv to ``default_gh_cli`` must raise ``GhWriteForbiddenError``.

    The rejection is expected to happen before any subprocess is started, so
    the test performs no network access.
    """
    forbidden_argvs = (
        ["pr", "comment", "1", "--body", "x"],   # write subcommand: pr comment
        ["api", "-X", "POST", "/foo"],           # api POST
        ["api", "--method", "PATCH", "/bar"],    # api PATCH
        ["pr", "merge", "1"],                    # pr merge
        ["pr", "close", "1"],                    # pr close
    )

    for write_argv in forbidden_argvs:
        with pytest.raises(GhWriteForbiddenError):
            default_gh_cli(write_argv)


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 11: READ_OPS 상수 검증 — 정확히 5종
# ──────────────────────────────────────────────────────────────────────────────

def test_read_ops_constant():
    """``READ_OPS`` must be a frozenset holding exactly the five read op names."""
    wanted = {"actual_head", "diff_paths", "ci_rollup", "reviews", "findings"}
    actual_size = len(READ_OPS)

    assert isinstance(READ_OPS, frozenset), "READ_OPS must be a frozenset"
    assert actual_size == 5, f"READ_OPS must have 5 entries, got {actual_size}: {READ_OPS}"
    assert READ_OPS == wanted, f"READ_OPS mismatch: {READ_OPS!r} != {wanted!r}"


# ──────────────────────────────────────────────────────────────────────────────
# 시나리오 12: security-critical 보정 — GET-only guard 우회 3경로 전부 차단
# ──────────────────────────────────────────────────────────────────────────────

def test_get_only_guard_blocks_all_bypass_paths():
    """PR #165 security-critical fix: ``_assert_readonly_argv`` blocks three bypass routes.

    (1) a write subcommand pushed back by a leading global flag,
    (2) method given as ``--method=POST`` or glued as ``-XPOST``,
    (3) implicit-POST data flags ``-f`` / ``-F`` / ``--field`` / ``--raw-field`` /
        ``--input``. A GraphQL mutation is covered as well.
    """
    from anu_v2.ci_gemini_watcher_gh_adapter import _assert_readonly_argv

    # Legitimate read argv must pass without raising (plural path segments
    # such as ``reviews`` do not equal the singular write subcommands).
    allowed_argvs = (
        ["pr", "view", "165", "--json", "headRefOid"],
        ["api", "repos/o/r/pulls/165/reviews"],
        ["api", "repos/o/r/pulls/165/comments"],
        ["pr", "checks", "165"],
        ["api", "repos/o/r/pulls/165", "-X", "GET"],
    )
    # The three bypass routes plus other writes; each must be rejected.
    blocked_argvs = (
        ["--repo", "o/r", "pr", "comment", "165"],         # (1) global flag bypass
        ["-R", "o/r", "pr", "merge", "165"],               # (1) global flag bypass (-R)
        ["api", "x", "--method=POST"],                     # (2) equals form
        ["api", "x", "-XPOST"],                            # (2) glued form
        ["api", "x", "-XPATCH"],                           # (2) glued form, PATCH
        ["api", "x", "-f", "body=hi"],                     # (3) implicit POST via -f
        ["api", "x", "-F", "k=@file"],                     # (3) -F
        ["api", "x", "--field=body=hi"],                   # (3) --field=
        ["api", "x", "--raw-field=q=x"],                   # (3) --raw-field=
        ["api", "x", "--input=-"],                         # (3) --input=
        ["api", "graphql", "-f", "query=mutation{ x }"],   # GraphQL mutation
        ["pr", "comment", "165", "--body", "x"],           # plain write subcommand
        ["api", "x", "-X", "DELETE"],                      # separated DELETE
    )

    for read_argv in allowed_argvs:
        _assert_readonly_argv(read_argv)  # must not raise

    for write_argv in blocked_argvs:
        with pytest.raises(GhWriteForbiddenError):
            _assert_readonly_argv(write_argv)
