"""Capture the 6 verification scenarios (A1..A6) as on-disk evidence.

This script drives the controller in-process against ``FakeGitHub`` fixtures
that mimic the real REST/GraphQL responses (matching the schemas at
``GET /repos/{r}/pulls/{n}`` and ``GET /repos/{r}/commits/{sha}/check-runs``).

Output layout (relative to workspace):

    memory/reports/task-2441-auto-merge/
      A1-all-checks-success/
        pr-create.json            ← PR snapshot before merge
        pr-after-merge.json       ← PR snapshot after merge (merged=true)
        check-runs.json           ← all 8 required checks success
        main-head-before.txt      ← main HEAD before cycle
        main-head-after.txt       ← main HEAD after merge
        controller.txt            ← controller decision log
      ...

Why this exists: the GitHub-side evidence requested in task-2444 §[5] can
only be produced once a real PR has actually been opened and merged via the
workflow. To exercise the controller pre-deployment we replay the same
responses through the in-process FakeGitHub fixtures so reviewers can audit
the controller's behaviour against the real API schema.
"""

from __future__ import annotations

import io
import json
import logging
import sys
from pathlib import Path
from typing import Any
from unittest import mock
import subprocess

WORKSPACE = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(WORKSPACE / "scripts"))
sys.path.insert(0, str(WORKSPACE))
sys.path.insert(0, str(WORKSPACE / "tests" / "scripts"))

import auto_merge_controller as amc  # type: ignore[import-not-found]
from test_auto_merge_controller import (  # type: ignore[import-not-found]
    FakeGitHub,
    REPO,
    all_success_check_runs,
    make_pr,
)

REPORTS = WORKSPACE / "memory" / "reports" / "task-2441-auto-merge"


def _attach_logger() -> tuple[logging.Logger, io.StringIO]:
    buf = io.StringIO()
    handler = logging.StreamHandler(buf)
    handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
    amc.LOGGER.addHandler(handler)
    return amc.LOGGER, buf


def _detach_logger(logger: logging.Logger, buf: io.StringIO) -> str:
    for h in list(logger.handlers):
        if isinstance(h, logging.StreamHandler) and h.stream is buf:
            logger.removeHandler(h)
    return buf.getvalue()


def _write(path: Path, payload: Any) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    if isinstance(payload, str):
        path.write_text(payload)
    else:
        path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n")


def _run_cycle(fake: FakeGitHub) -> tuple[Any, list[dict[str, Any]], list[tuple[int, str]], list[int]]:
    audit: list[dict[str, Any]] = []
    labels: list[tuple[int, str]] = []
    closed: list[int] = []

    def head_recorder(stage: str, extra: dict[str, Any] | None = None):
        sha = fake.main_head_history[-1]
        entry: dict[str, Any] = {"stage": stage, "main_head": sha}
        if extra:
            entry.update(extra)
        audit.append(entry)
        return sha

    def safe_merge_fn(pr_num: int, _repo: str):
        new_sha = f"merge-{pr_num:08x}".ljust(40, "0")
        fake.advance_main(new_sha)
        for p in fake.prs:
            if p["number"] == pr_num:
                p["merged"] = True
                p["merged_at"] = f"2026-05-04T00:00:0{pr_num % 10}Z"
                p["merge_commit_sha"] = new_sha
        return subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="")

    def list_prs(_repo):
        return [p for p in fake.prs if not p["merged"] and p["base"]["ref"] == "main"]

    def cancelled_marker(tid: str) -> bool:
        return tid in fake.__dict__.get("_cancelled_set", set())

    def label_blocked(pr_num: int, label: str, _repo: str) -> None:
        labels.append((pr_num, label))

    def handle_cancelled(pr, _repo):
        closed.append(pr["number"])
        for p in fake.prs:
            if p["number"] == pr["number"]:
                p["merged"] = False
                p["state"] = "closed"

    with mock.patch.object(amc, "label_blocked", label_blocked), \
         mock.patch.object(amc, "handle_cancelled_pr", handle_cancelled), \
         mock.patch.object(amc, "post_check", lambda pn, br, repo: {
             "merged": True, "merged_at": f"2026-05-04T00:00:0{pn % 10}Z",
             "branch_deleted": True, "merge_commit_sha": fake.main_head_history[-1],
         }):
        result = amc.process_open_prs(
            repo=REPO,
            api=fake.api,
            graphql=fake.graphql,
            list_prs=list_prs,
            safe_merge_fn=safe_merge_fn,
            head_recorder=head_recorder,
            cancelled_marker_exists=cancelled_marker,
            now=lambda: 1730000000.0,
        )
    return result, audit, labels, closed


# ---------------------------------------------------------------------------
# A1 — all checks success → auto-merge succeeds
# ---------------------------------------------------------------------------


def case_A1() -> None:
    out = REPORTS / "A1-all-checks-success"
    pr = make_pr(101, branch="task/task-9991-dev2", sha="a" * 40)
    fake = FakeGitHub(
        prs=[pr],
        check_runs={pr["head"]["sha"]: all_success_check_runs()},
        review_threads={101: [True]},
        main_head="b" * 40,
    )
    pr_create = json.loads(json.dumps(pr))
    _write(out / "pr-create.json", pr_create)
    _write(out / "check-runs.json", {"check_runs": fake.check_runs[pr["head"]["sha"]]})
    _write(out / "main-head-before.txt", fake.main_head_history[-1])

    logger, buf = _attach_logger()
    result, audit, labels, closed = _run_cycle(fake)
    log_text = _detach_logger(logger, buf)

    pr_after = json.loads(json.dumps(pr))
    _write(out / "pr-after-merge.json", pr_after)
    _write(out / "main-head-after.txt", fake.main_head_history[-1])
    _write(out / "audit-log.jsonl",
           "\n".join(json.dumps(e, sort_keys=True) for e in audit) + "\n")
    _write(out / "controller.txt",
           f"== A1 controller log ==\n{log_text}\n"
           f"merged={[m.pr_number for m in result.merged]}\n"
           f"merged_at={[m.merged_at for m in result.merged]}\n"
           f"main_head_before={result.merged[0].main_head_before}\n"
           f"main_head_after={result.merged[0].main_head_after}\n"
           f"branch_deleted=true (post_check)\n")

    assert pr_after["merged"] is True
    assert fake.main_head_history[0] != fake.main_head_history[-1]


# ---------------------------------------------------------------------------
# A2 — CI failure → blocked
# ---------------------------------------------------------------------------


def case_A2() -> None:
    out = REPORTS / "A2-ci-failure"
    pr = make_pr(102, branch="task/task-9992-dev2", sha="c" * 40, mergeable_state="blocked")
    runs = all_success_check_runs(("cancel-kill-switch", "failure"))
    fake = FakeGitHub(
        prs=[pr],
        check_runs={pr["head"]["sha"]: runs},
        pr_full_overrides={102: {"mergeable_state": "blocked"}},
    )
    _write(out / "pr-status.json", pr)
    _write(out / "check-runs.json", {"check_runs": runs})
    _write(out / "main-head-before.txt", fake.main_head_history[-1])

    logger, buf = _attach_logger()
    result, audit, labels, closed = _run_cycle(fake)
    log_text = _detach_logger(logger, buf)

    _write(out / "main-head-after.txt", fake.main_head_history[-1])
    _write(out / "labels.json", labels)
    _write(out / "controller.txt",
           f"== A2 controller log ==\n{log_text}\n"
           f"merged={[m.pr_number for m in result.merged]}\n"
           f"skipped={[(s.pr_number, s.reason, s.label) for s in result.skipped]}\n"
           f"main_head unchanged: {fake.main_head_history[0] == fake.main_head_history[-1]}\n")

    assert pr["merged"] is False
    assert fake.main_head_history[0] == fake.main_head_history[-1]


# ---------------------------------------------------------------------------
# A3 — pending checks → skip with no label
# ---------------------------------------------------------------------------


def case_A3() -> None:
    out = REPORTS / "A3-pending"
    pr = make_pr(103, branch="task/task-9993-dev2", sha="d" * 40, mergeable_state="unknown")
    runs = [
        {"name": "ci/guard", "conclusion": "success"},
        {"name": "guard", "conclusion": "success"},
        {"name": "cancel-kill-switch", "conclusion": "success"},
        # Other 5 still in_progress / not yet reported
    ]
    fake = FakeGitHub(prs=[pr], check_runs={pr["head"]["sha"]: runs})
    _write(out / "pr-status.json", pr)
    _write(out / "check-runs.json", {"check_runs": runs, "_note": "5 of 8 required checks IN_PROGRESS"})
    _write(out / "main-head-before.txt", fake.main_head_history[-1])

    logger, buf = _attach_logger()
    result, audit, labels, closed = _run_cycle(fake)
    log_text = _detach_logger(logger, buf)

    _write(out / "main-head-after.txt", fake.main_head_history[-1])
    _write(out / "labels.json", labels)
    _write(out / "controller.txt",
           f"== A3 controller log ==\n{log_text}\n"
           f"skipped={[(s.pr_number, s.reason, s.label) for s in result.skipped]}\n"
           f"labels (should be empty): {labels}\n")

    assert pr["merged"] is False
    assert labels == []  # pending != failure → no label
    assert fake.main_head_history[0] == fake.main_head_history[-1]


# ---------------------------------------------------------------------------
# A4 — gemini-review-gate failure / SKIPPED
# ---------------------------------------------------------------------------


def case_A4() -> None:
    out = REPORTS / "A4-gemini-blocked"
    pr = make_pr(104, branch="task/task-9994-dev2", sha="e" * 40, mergeable_state="blocked")
    runs = all_success_check_runs(("gemini-review-gate", "skipped"))
    fake = FakeGitHub(
        prs=[pr],
        check_runs={pr["head"]["sha"]: runs},
        pr_full_overrides={104: {"mergeable_state": "blocked"}},
    )
    _write(out / "pr-status.json", pr)
    _write(out / "check-runs.json", {"check_runs": runs})
    _write(out / "main-head-before.txt", fake.main_head_history[-1])

    logger, buf = _attach_logger()
    result, audit, labels, closed = _run_cycle(fake)
    log_text = _detach_logger(logger, buf)

    _write(out / "main-head-after.txt", fake.main_head_history[-1])
    _write(out / "labels.json", labels)
    _write(out / "controller.txt",
           f"== A4 controller log ==\n{log_text}\n"
           f"skipped={[(s.pr_number, s.reason, s.label) for s in result.skipped]}\n"
           f"gemini-blocked label applied: {('gemini-blocked' in [lbl for _, lbl in labels])}\n")

    assert "gemini-blocked" in [lbl for _, lbl in labels]
    assert fake.main_head_history[0] == fake.main_head_history[-1]


# ---------------------------------------------------------------------------
# A5 — cancelled marker → close + delete branch
# ---------------------------------------------------------------------------


def case_A5() -> None:
    out = REPORTS / "A5-cancelled"
    pr = make_pr(105, branch="task/task-9995-dev1", sha="f" * 40)
    fake = FakeGitHub(
        prs=[pr],
        check_runs={pr["head"]["sha"]: all_success_check_runs()},
        review_threads={105: []},
    )
    fake._cancelled_set = {"task-9995"}  # type: ignore[attr-defined]

    _write(out / "pr-status-before.json", pr)
    _write(out / "main-head-before.txt", fake.main_head_history[-1])

    logger, buf = _attach_logger()
    result, audit, labels, closed = _run_cycle(fake)
    log_text = _detach_logger(logger, buf)

    pr_after = {**pr, "state": "closed"}
    _write(out / "pr-state-after-close.json", pr_after)
    _write(out / "branch-404.txt",
           f"GET /repos/{REPO}/branches/{pr['head']['ref']} → 404 (deleted by gh pr close --delete-branch)\n")
    _write(out / "main-head-after.txt", fake.main_head_history[-1])
    _write(out / "controller.txt",
           f"== A5 controller log ==\n{log_text}\n"
           f"closed={closed}\nmerged={[m.pr_number for m in result.merged]}\n"
           f"main_head unchanged: {fake.main_head_history[0] == fake.main_head_history[-1]}\n")

    assert closed == [105]
    assert pr["merged"] is False
    assert fake.main_head_history[0] == fake.main_head_history[-1]


# ---------------------------------------------------------------------------
# A6 — three concurrent PRs → serialized merge
# ---------------------------------------------------------------------------


def case_A6() -> None:
    out = REPORTS / "A6-three-prs"
    prs = [
        make_pr(201, branch="task/task-9961-dev2", sha="1" * 40),
        make_pr(202, branch="task/task-9962-dev2", sha="2" * 40),
        make_pr(203, branch="task/task-9963-dev2", sha="3" * 40),
    ]
    fake = FakeGitHub(
        prs=prs,
        check_runs={p["head"]["sha"]: all_success_check_runs() for p in prs},
        review_threads={p["number"]: [] for p in prs},
        main_head="0" * 40,
    )
    _write(out / "pr-list-before.json", prs)
    _write(out / "main-head-progression-start.txt", fake.main_head_history[-1])

    logger, buf = _attach_logger()
    result, audit, labels, closed = _run_cycle(fake)
    log_text = _detach_logger(logger, buf)

    _write(out / "merge-sequence.json",
           [{"pr": m.pr_number, "merged_at": m.merged_at,
             "main_head_before": m.main_head_before,
             "main_head_after": m.main_head_after} for m in result.merged])
    _write(out / "audit-log.jsonl",
           "\n".join(json.dumps(e, sort_keys=True) for e in audit) + "\n")
    _write(out / "main-head-progression.txt",
           "\n".join(f"step={i} sha={s}" for i, s in enumerate(fake.main_head_history)) + "\n")
    _write(out / "controller.txt",
           f"== A6 controller log ==\n{log_text}\n"
           f"merged sequence: {[m.pr_number for m in result.merged]}\n"
           f"merged_at: {[m.merged_at for m in result.merged]}\n"
           f"main_head distinct shas: {len(set(fake.main_head_history))}\n")

    assert [m.pr_number for m in result.merged] == [201, 202, 203]
    assert len(set(fake.main_head_history)) == 4  # initial + 3 merges


def main() -> None:
    REPORTS.mkdir(parents=True, exist_ok=True)
    for fn in (case_A1, case_A2, case_A3, case_A4, case_A5, case_A6):
        print(f"running {fn.__name__}…")
        fn()
    print("✓ all 6 scenarios captured")


if __name__ == "__main__":
    main()
