"""test_branch_ref_allocator_2553plus9.py — task-2553+9 regression.

§8 필수 16 + §9-R.5 추가 6 = **총 22**. branch-collision-safe activation
refinement: collision-safe allocator (read-only, loop-until-clean, 강제
reset 0) + runner 4-primitive isolated-worktree 재타겟 + fail-closed
preflight + live workspace 불변.

전부 부작용 0 — collision 시뮬레이션은 **tmp sandbox git repo** 에서만
수행하고 live workspace 는 정적(AST/grep) + read-only 로만 검사한다.
실 git mutation / PR open / live 변형 0.
"""

from __future__ import annotations

import ast
import json
import subprocess
import sys
from pathlib import Path
from unittest import mock

import pytest

WS = Path(__file__).resolve().parents[2]
if str(WS) not in sys.path:
    sys.path.insert(0, str(WS))

from anu_v3 import branch_ref_allocator as BRA  # noqa: E402
from anu_v3.branch_ref_allocator import (  # noqa: E402
    STATUS_ALLOCATED,
    STATUS_HOLD,
    allocate_branch,
)
from anu_v3 import pre_authorized_activation_runner as RUN  # noqa: E402
from anu_v3.pre_authorized_evidence_bundle_builder import (  # noqa: E402
    TASK_2553P1_EFFECTIVE_DIFF_6,
)

ALLOC_SRC = WS / "anu_v3" / "branch_ref_allocator.py"
RUNNER_SRC = WS / "anu_v3" / "pre_authorized_activation_runner.py"
CLI_SRC = WS / "scripts" / "run_branch_ref_allocator.py"
SCHEMA = WS / "schemas" / "branch_allocation_provenance.schema.json"
FIXDIR = WS / "memory" / "fixtures"
BASE = "task/task-2553p1-f1-clean-replacement"


def _git(repo, *args):
    return subprocess.run(
        ["git", "-C", str(repo), *args],
        capture_output=True, text=True, check=True,
    ).stdout.strip()


@pytest.fixture()
def sandbox(tmp_path):
    """격리 sandbox: main + 1 commit. live workspace 절대 무참조."""
    repo = tmp_path / "repo"
    repo.mkdir()
    _git(repo, "init", "-q", "-b", "main")
    _git(repo, "config", "user.email", "t@t")
    _git(repo, "config", "user.name", "t")
    (repo / "f.txt").write_text("x\n")
    _git(repo, "add", "-A")
    _git(repo, "commit", "-qm", "init")
    sha = _git(repo, "rev-parse", "HEAD")
    # bare 'origin' remote (remote-ref 시나리오용)
    origin = tmp_path / "origin.git"
    subprocess.run(
        ["git", "init", "-q", "--bare", str(origin)], check=True
    )
    _git(repo, "remote", "add", "origin", str(origin))
    return repo, sha, origin


# ─────────────────────────────────────────────────────────────────────────────
# §8 필수 regression 16
# ─────────────────────────────────────────────────────────────────────────────
def test_01_target_checked_out_live_allocator_new_branch_no_reset(sandbox):
    """1. target branch 가 live workspace 에 checked-out → allocator 가
    새 branch 할당, 기존 ref reset 0."""
    repo, sha, _ = sandbox
    _git(repo, "branch", BASE)
    _git(repo, "checkout", "-q", BASE)
    ref_before = _git(repo, "rev-parse", f"refs/heads/{BASE}")
    r = allocate_branch(repo, BASE, base_sha=sha)
    assert r["status"] == STATUS_ALLOCATED
    assert r["collision_detected"] is True
    assert r["sources_checked"]["worktree"] is True
    assert r["allocated_branch_name"] != BASE
    assert r["allocated_branch_name"].startswith(BASE + "-")
    assert r["chosen_strategy"] == "suffixed"
    # 기존 live-checked-out branch ref 불변 (reset 0)
    assert _git(repo, "rev-parse", f"refs/heads/{BASE}") == ref_before


def test_02_target_local_not_checkedout_safe_allocation(sandbox):
    """2. target local 존재·미checkout → 정책상 안전 회피 allocation."""
    repo, sha, _ = sandbox
    _git(repo, "branch", BASE)  # local ref, but HEAD on main
    r = allocate_branch(repo, BASE, base_sha=sha)
    assert r["status"] == STATUS_ALLOCATED
    assert r["sources_checked"]["local"] is True
    assert r["sources_checked"]["worktree"] is False
    assert r["allocated_branch_name"] != BASE
    assert r["chosen_strategy"] == "suffixed"


def test_03_target_remote_collision_safe_suffix(sandbox):
    """3. target remote 존재 → collision-safe suffix allocation."""
    repo, sha, _ = sandbox
    _git(repo, "branch", BASE)
    _git(repo, "push", "-q", "origin", BASE)
    _git(repo, "branch", "-D", BASE)  # local 제거 → remote-only 충돌
    r = allocate_branch(repo, BASE, base_sha=sha)
    assert r["status"] == STATUS_ALLOCATED
    assert r["sources_checked"]["remote"] is True
    assert r["sources_checked"]["local"] is False
    assert r["chosen_strategy"] == "suffixed"
    assert r["allocated_branch_name"] != BASE


def test_04_no_collision_deterministic_name(sandbox):
    """4. 미충돌 → deterministic base name (suffix 0)."""
    repo, sha, _ = sandbox
    r = allocate_branch(repo, BASE, base_sha=sha)
    assert r["status"] == STATUS_ALLOCATED
    assert r["collision_detected"] is False
    assert r["allocated_branch_name"] == BASE
    assert r["chosen_strategy"] == "base"
    assert r["runid"] is None


def test_05_provenance_recorded_schema_1to1(sandbox):
    """5. branch allocation provenance 기록 (schema 7-field 1:1, 9-R.6)."""
    repo, sha, _ = sandbox
    r = allocate_branch(repo, BASE, base_sha=sha)
    sch = json.loads(SCHEMA.read_text())
    for f in sch["required"]:
        assert f in r, f"provenance 누락 필드: {f}"
    assert set(r["sources_checked"]) == {"worktree", "local", "remote"}
    assert r["base_sha"] == sha
    assert r["chosen_strategy"] in ("base", "suffixed")


def test_06_live_workspace_head_unchanged(sandbox):
    """6. allocate 전후 sandbox(=live 대역) HEAD 불변."""
    repo, sha, _ = sandbox
    h0 = _git(repo, "rev-parse", "HEAD")
    allocate_branch(repo, BASE, base_sha=sha)
    assert _git(repo, "rev-parse", "HEAD") == h0


def test_07_live_workspace_branch_ref_unchanged(sandbox):
    """7. allocate 전후 branch ref 집합 불변 (allocator 가 ref 생성 0)."""
    repo, sha, _ = sandbox
    _git(repo, "branch", BASE)
    refs0 = _git(repo, "for-each-ref", "--format=%(refname) %(objectname)")
    allocate_branch(repo, BASE, base_sha=sha)
    assert _git(
        repo, "for-each-ref", "--format=%(refname) %(objectname)"
    ) == refs0


def test_08_same_branch_push_impossible():
    """8. same/source-branch push 불가 — runner push primitive 가드 정적 존재."""
    src = RUNNER_SRC.read_text()
    assert "task/task-2553-dev5" in src and "push 거부" in src
    # allocator 는 push verb 자체 부재 (read-only 화이트리스트)
    assert "push" not in BRA.READ_ONLY_GIT_VERBS


def test_09_pr102_preserved_no_mutation():
    """9. PR #102 원본 mutation 경로 부재 (allocator/runner 정적)."""
    for p in (ALLOC_SRC, RUNNER_SRC):
        s = p.read_text().lower()
        assert "pr 102" not in s and "pull/102" not in s
        assert "gh pr edit" not in s and "gh pr close" not in s


def test_10_expected_files_six_exact():
    """10. effective_diff = 정확 6 파일 (constant 무결 + runner commit guard)."""
    assert len(set(TASK_2553P1_EFFECTIVE_DIFF_6)) == 6
    src = RUNNER_SRC.read_text()
    assert "6 effective-diff 파일과 정확 일치하지 않음" in src
    assert "TASK_2553P1_EFFECTIVE_DIFF_6" in src


def test_11_f2_unchanged_static():
    """11. F2(token transport) 파일 변경 경로 부재."""
    s = (ALLOC_SRC.read_text() + RUNNER_SRC.read_text()).lower()
    assert "token_transport" not in s and "owner_trigger_token" not in s


def test_12_phase3_mqe_unchanged_static():
    """12. phase3 / merge_queue_executor 변경 경로 부재."""
    for p in (ALLOC_SRC, RUNNER_SRC):
        s = p.read_text()
        assert "merge_queue_executor" not in s
        assert "phase3" not in s.lower()


def test_13_merge_path_unreachable():
    """13. merge 호출 경로 정적 부재 (allocator/runner AST)."""
    for p in (ALLOC_SRC, RUNNER_SRC):
        tree = ast.parse(p.read_text())
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                lits = " ".join(
                    a.value.lower()
                    for a in ast.walk(node)
                    if isinstance(a, ast.Constant) and isinstance(a.value, str)
                ).replace(" ", "")
                assert "ghprmerge" not in lits and "prmerge" not in lits
                assert "--auto" not in lits
                assert "merge_pull_request" not in lits


def test_14_credential_api_beyond_pr_open_unreachable():
    """14. PR-open 외 credential/API 호출 부재 (gh pr create 1종만)."""
    tree = ast.parse(RUNNER_SRC.read_text())
    gh_subcmds = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            lits = [
                a.value for a in ast.walk(node)
                if isinstance(a, ast.Constant) and isinstance(a.value, str)
            ]
            if "gh" in lits:
                gh_subcmds.add(tuple(lits[:3]))
    for t in gh_subcmds:
        assert t[:3] == ("gh", "pr", "create"), f"비인가 gh 호출: {t}"


def test_15_callback_registered_mechanism_present():
    """15. callback (a) 등록 메커니즘 — collector_handoff/adjudication 존재."""
    assert hasattr(RUN, "collector_handoff")
    assert hasattr(RUN, "adjudication_hook")
    h = RUN.collector_handoff({"task_id": "task-2553+9", "status": "X"})
    assert h["authority"] == "collector_only" and h["no_write"] is True


def test_16_codex_audit_adjudication_loop_completes():
    """16. Codex audit / ANU-Codex adjudication loop 종결 산출."""
    adj = RUN.adjudication_hook(
        {"task_id": "task-2553+9", "status": RUN.STATUS_DRY_RUN_VERIFIED},
        {"verdict": "PASS", "unresolved_high": 0},
    )
    assert adj["codex_audit_attached"] is True
    assert adj["auto_converge_eligible"] is True
    assert adj["critical_7_detected"] is False


# ─────────────────────────────────────────────────────────────────────────────
# §9-R.5 추가 regression 6 (17~22)
# ─────────────────────────────────────────────────────────────────────────────
def test_17_live_repo_destructive_ops_static_absent():
    """17. live-repo checkout -B / reset / clean / stash / rm / unlink /
    rmtree 정적 부재 (isolated wt 외). AST + grep."""
    import os
    import shutil  # noqa: F401 — 부재 증명용 (사용 0 이어야 함)

    runner = RUNNER_SRC.read_text()
    alloc = ALLOC_SRC.read_text()
    # rmtree/unlink 등 FS 파괴 호출 텍스트 부재
    for bad in ("shutil.rmtree", "os.unlink", "os.remove", "Path.unlink", ".rmdir("):
        assert bad not in runner, f"runner 파괴 op: {bad}"
        assert bad not in alloc, f"allocator 파괴 op: {bad}"
    # git 파괴 verb 가 isolated worktree(-C <iso_path>) 컨텍스트 밖에서 부재.
    # checkout -B / reset --hard / clean -fd / stash 는 live repo 대상 부재.
    for tree_src, name in ((ast.parse(runner), "runner"),):
        for node in ast.walk(tree_src):
            if isinstance(node, ast.Call):
                lits = [
                    a.value for a in ast.walk(node)
                    if isinstance(a, ast.Constant) and isinstance(a.value, str)
                ]
                joined = " ".join(lits)
                if "checkout" in lits and "-B" in lits:
                    pytest.fail(f"{name}: live checkout -B 호출 잔존: {lits}")
                if "reset" in lits and any("--hard" in x for x in lits):
                    pytest.fail(f"{name}: reset --hard 호출 잔존")
                if "stash" in lits or ("clean" in lits and "-fd" in joined):
                    pytest.fail(f"{name}: stash/clean 호출 잔존")
    assert os  # silence


def test_18_allocator_loop_until_clean_regenerates(sandbox):
    """18. 1차 후보 충돌 → 재생성 → 전 source absent 확인 (확률 방어 금지)."""
    repo, sha, _ = sandbox
    _git(repo, "branch", BASE)  # base 충돌
    seq = iter(["dup0000dup000", "dup0000dup000", "cleanrunid999"])
    # 1차/2차 runid 로 만든 후보를 미리 local ref 로 점유시켜 충돌 강제
    _git(repo, "branch", f"{BASE}-dup0000dup000")
    r = allocate_branch(
        repo, BASE, base_sha=sha, runid_factory=lambda: next(seq)
    )
    assert r["status"] == STATUS_ALLOCATED
    assert r["allocated_branch_name"] == f"{BASE}-cleanrunid999"
    assert r["attempts"] >= 2  # 최소 1회 재생성
    # 9-R.6: sources_checked = base name 충돌 출처 (local 점유) 기록
    assert r["sources_checked"]["local"] is True
    # 최종 allocated name 은 loop 보장상 3-source 전부 absent (clean)
    from anu_v3.branch_ref_allocator import (
        list_checked_out_branches, local_ref_exists, remote_ref_exists,
    )
    alloc_name = r["allocated_branch_name"]
    assert alloc_name not in list_checked_out_branches(repo)
    assert local_ref_exists(repo, alloc_name) is False
    assert remote_ref_exists(repo, alloc_name) is False


def test_18b_loop_exhausted_holds(sandbox):
    """18b. max_attempts 내 clean 미발견 → HOLD (강제 reset 0)."""
    repo, sha, _ = sandbox
    _git(repo, "branch", BASE)
    _git(repo, "branch", f"{BASE}-x")  # 모든 후보가 -x 로 충돌하게
    r = allocate_branch(
        repo, BASE, base_sha=sha, max_attempts=3, runid_factory=lambda: "x"
    )
    assert r["status"] == STATUS_HOLD
    assert r["allocated_branch_name"] == ""
    assert any("clean unique branch" in x for x in r["hold_reasons"])


def test_19_pre_post_live_head_assert_equal(sandbox):
    """19. pre/post live workspace HEAD sha assertEqual (직접 불변 증명)."""
    repo, sha, _ = sandbox
    _git(repo, "branch", BASE)
    _git(repo, "checkout", "-q", BASE)
    pre = _git(repo, "rev-parse", "HEAD")
    for _ in range(3):
        allocate_branch(repo, BASE, base_sha=sha)
    post = _git(repo, "rev-parse", "HEAD")
    assert pre == post
    assert _git(repo, "branch", "--show-current") == BASE


def test_20_isolated_worktree_finally_removed_on_exception(sandbox):
    """20. isolated worktree 실패·예외 시에도 finally 로 제거 (live 무영향)."""
    repo, sha, _ = sandbox
    calls = {"remove": []}

    def fake_create(repo_, iso_path, base_sha, allocated):
        Path(iso_path).mkdir(parents=True, exist_ok=True)
        return allocated

    def fake_replay(repo_, iso_path):
        raise RUN.RunnerError("simulated replay 실패")

    def fake_remove(repo_, iso_path):
        calls["remove"].append(str(iso_path))

    with mock.patch.object(RUN, "_create_isolated_worktree", fake_create), \
         mock.patch.object(RUN, "_replay_six_file_delta", fake_replay), \
         mock.patch.object(RUN, "_remove_isolated_worktree", fake_remove), \
         mock.patch.object(
             RUN, "_allocate_branch",
             lambda r, n, base_sha: {
                 "status": STATUS_ALLOCATED,
                 "allocated_branch_name": f"{n}-deadbeef0000",
                 "collision_detected": True, "runid": "deadbeef0000",
                 "hold_reasons": [],
             },
         ), \
         mock.patch.object(RUN, "_list_checked_out_branches", lambda r: {}):
        out = RUN._activate_real_write(  # type: ignore[attr-defined]
            repo=Path(repo), base={"task_id": "task-2553+9"},
            task_id="task-2553+9", new_branch=BASE, base_sha=sha,
            hold_out_path=None,
        )
    assert calls["remove"], "finally 에서 worktree 제거 미호출"
    assert out["status"] == RUN.STATUS_HOLD


def test_21_runid_uniqueness_idempotency(sandbox):
    """21. runid uniqueness — 동일 입력 재실행 시 매번 신규 suffix
    (동일 branch 재사용 → 중복 PR open 0 보장의 토대)."""
    repo, sha, _ = sandbox
    _git(repo, "branch", BASE)
    names = {
        allocate_branch(repo, BASE, base_sha=sha)["allocated_branch_name"]
        for _ in range(5)
    }
    assert len(names) == 5  # 전부 상이 (uuid4 hex12)
    assert all(n.startswith(BASE + "-") for n in names)


def test_22_allocator_read_only_no_git_write(sandbox):
    """22. allocator read-only 증명 — git-write verb 호출 0 (detection만)."""
    repo, sha, _ = sandbox
    _git(repo, "branch", BASE)
    seen = []
    real = subprocess.run

    def spy(cmd, *a, **k):
        if isinstance(cmd, (list, tuple)) and cmd and cmd[0] == "git":
            seen.append(list(cmd))
        return real(cmd, *a, **k)

    with mock.patch("anu_v3.branch_ref_allocator.subprocess.run", spy):
        allocate_branch(repo, BASE, base_sha=sha)
    write_verbs = {
        "commit", "checkout", "branch", "push", "reset", "merge",
        "clean", "stash", "rm", "add", "worktree", "tag", "fetch", "pull",
    }
    for cmd in seen:
        # cmd = ['git','-C',<path>, <verb>, ...]
        verb = cmd[3] if len(cmd) > 3 else ""
        assert verb in BRA.READ_ONLY_GIT_VERBS, f"비read-only git: {cmd}"
        assert verb not in write_verbs or verb == "worktree", (
            f"git-write verb 호출: {verb}"
        )
        if verb == "worktree":
            assert cmd[4] == "list", f"worktree 비-list 호출: {cmd}"


# ── fixture 4-시나리오 직접 구동 ───────────────────────────────────────────
@pytest.mark.parametrize("fname", [
    "branch_alloc_live_checkedout_collision",
    "branch_alloc_local_not_checkedout",
    "branch_alloc_remote_conflict",
    "branch_alloc_no_collision",
])
def test_fixture_scenarios(sandbox, fname):
    """4 fixture 시나리오 → allocator 결과가 expected 와 일치, reset 0."""
    repo, sha, _ = sandbox
    spec = json.loads((FIXDIR / f"{fname}.json").read_text())
    occ = spec["occupancy"]
    if occ["local_ref"] or occ["worktree_checked_out"] or occ["remote_ref"]:
        _git(repo, "branch", BASE)
    if occ["remote_ref"]:
        _git(repo, "push", "-q", "origin", BASE)
    if occ["worktree_checked_out"]:
        _git(repo, "checkout", "-q", BASE)
    elif not occ["local_ref"] and not occ["remote_ref"]:
        pass
    if occ["remote_ref"] and not occ["local_ref"]:
        _git(repo, "checkout", "-q", "main")
        _git(repo, "branch", "-D", BASE)
    ref_before = None
    if occ["worktree_checked_out"]:
        ref_before = _git(repo, "rev-parse", f"refs/heads/{BASE}")

    r = allocate_branch(repo, BASE, base_sha=sha)
    exp = spec["expected"]
    assert r["status"] == STATUS_ALLOCATED
    assert r["collision_detected"] == exp["collision_detected"]
    assert r["chosen_strategy"] == exp["chosen_strategy"]
    if exp["allocated_differs_from_live_checkedout"] and occ[
        "worktree_checked_out"
    ]:
        assert r["allocated_branch_name"] != BASE
        # 강제 reset 0 — 기존 live-checked-out ref 불변
        assert _git(repo, "rev-parse", f"refs/heads/{BASE}") == ref_before
