# -*- coding: utf-8 -*-
"""tests.regression.test_ci_watch_handoff_runner — task-2642.

회장 verbatim (2026-05-23 19:38 KST) — CI_WATCH_HANDOFF state machine + terminal
classification 1:1 회귀.

Layer A / NO-CRON: subprocess / cokacdir / merge / cron / live gh 호출 0.
모든 inject 는 in-memory mock (ci_status_fn / gemini_router_call_fn /
auto_remediation_fn / callback_send_fn).
"""
from __future__ import annotations

import json
from typing import Iterator

import pytest

from utils.ci_watch_handoff_audit import (
    CiWatchHandoffAudit,
    EVENT_AUTO_REMEDIATE,
    EVENT_CALLBACK_FIRED,
    EVENT_HANDOFF_RECEIVED,
    EVENT_OWNER_NUDGE,
    EVENT_POLL_TICK,
    EVENT_TERMINAL_REACHED,
)
from utils.ci_watch_handoff_runner import (
    ANU_COLLECTOR_KEY,
    CALLBACK_ENVELOPE_BYTE_LIMIT,
    CANONICAL_ROOT,
    CI_STATUS_FAIL,
    CI_STATUS_PASS,
    CI_STATUS_PENDING,
    CiWatchHandoffRunner,
    CIStatusSnapshot,
    REMEDIATE_APPLIED,
    REMEDIATE_FORBIDDEN_HIT,
    REMEDIATE_LOOP_BOUNDARY,
    REMEDIATE_NON_REMEDIABLE,
    ROUTER_CHAIR_UI_FALLBACK,
    ROUTER_FRESH,
    ROUTER_NUDGE_DEDUPED,
    ROUTER_NUDGE_FAILED,
    ROUTER_NUDGE_PERMISSION_DENIED,
    ROUTER_NUDGE_POSTED,
    ROUTER_NOT_GEMINI_TRIGGER,
    ROUTER_STALE,
    RouterCallResult,
    RunnerContractError,
)
from utils.ci_watch_handoff_schema import (
    TERMINAL_CHAIR_REQUIRED,
    TERMINAL_CI_FAILED_NON_REMEDIABLE,
    TERMINAL_GEMINI_EXTERNAL_TRIGGER_STALE,
    TERMINAL_LOOP_BOUNDARY,
    TERMINAL_MERGE_READY,
)


# Placeholder head SHA (forty "a" characters) used as the fixture's head_sha
# and as the lookup key when reading audit records back.
HEAD = "a" * 40


def _canonical_handoff(**overrides) -> dict:
    """Return a freshly built handoff payload with ``overrides`` applied on top.

    Every call creates new dict and list objects, so callers may mutate the
    result freely. An override replaces the matching top-level key wholesale;
    nested policy dicts are not merged.
    """
    gemini_nudge_policy = {
        "enabled": True,
        "max_nudges_per_pr_head": 1,
        "on_403": "report",
    }
    auto_remediation_policy = {
        "enabled": True,
        "allow_severities": ["medium", "style", "quality", "non-critical-high"],
    }
    terminal_states = [
        "MERGE_READY",
        "CHAIR_REQUIRED",
        "GEMINI_EXTERNAL_TRIGGER_STALE",
        "CI_FAILED_NON_REMEDIABLE",
        "LOOP_BOUNDARY",
    ]
    defaults = {
        "pr_number": 146,
        "head_sha": HEAD,
        "branch": "task/task-2642-runner",
        "expected_files": ["utils/ci_watch_handoff_runner.py"],
        "forbidden_paths": ["scripts/finish-task.sh", "cokacdir/**"],
        "watcher_owner": "dev6-cron-watcher",
        "max_watch_minutes": 120,
        "poll_interval_seconds": 60,
        "gemini_nudge_policy": gemini_nudge_policy,
        "auto_remediation_policy": auto_remediation_policy,
        "callback_on_terminal_state": True,
        "terminal_states": terminal_states,
    }
    # Defaults keep their position; unknown override keys are appended.
    return {**defaults, **overrides}


def _sequence_ci(values: list[CIStatusSnapshot]):
    it: Iterator[CIStatusSnapshot] = iter(values)

    def fn(_handoff: dict) -> CIStatusSnapshot:
        return next(it)

    return fn


def _sequence_router(values: list[RouterCallResult]):
    it: Iterator[RouterCallResult] = iter(values)

    def fn(_handoff: dict) -> RouterCallResult:
        return next(it)

    return fn


def _sequence_remediate(values: list[str]):
    it = iter(values)

    def fn(_handoff: dict, _ci_snap: CIStatusSnapshot) -> str:
        return next(it)

    return fn


def _build_runner(
    tmp_path,
    *,
    ci_seq: list[CIStatusSnapshot],
    router_seq: list[RouterCallResult] | None = None,
    remediate_seq: list[str] | None = None,
    callback_capture: list | None = None,
    max_polls: int = 20,
    loop_boundary_attempts: int = 3,
) -> tuple[CiWatchHandoffRunner, CiWatchHandoffAudit]:
    """Assemble a runner wired entirely to in-memory stubs, plus its audit.

    Args:
        tmp_path: Directory used both as the workspace root and for the audit.
        ci_seq: Snapshots replayed by the CI status stub.
        router_seq: Results replayed by the router stub; a missing or empty
            value yields a stub with nothing to replay.
        remediate_seq: Outcomes replayed by the remediation stub; when missing
            or empty the runner receives ``None`` instead of a stub.
        callback_capture: Optional list that collects every envelope sent.
        max_polls: Forwarded to the runner unchanged.
        loop_boundary_attempts: Forwarded to the runner unchanged.

    Returns:
        The ``(runner, audit)`` pair.
    """
    audit = CiWatchHandoffAudit(tmp_path)

    def _record_and_measure(envelope: str):
        # Record first, then report the UTF-8 size of what was sent.
        if callback_capture is not None:
            callback_capture.append(envelope)
        return len(envelope.encode("utf-8"))

    ci_stub = _sequence_ci(ci_seq)
    router_stub = _sequence_router(router_seq or [])
    if remediate_seq:
        remediation_stub = _sequence_remediate(remediate_seq)
    else:
        remediation_stub = None

    runner_kwargs = {
        "workspace_root": tmp_path,
        "ci_status_fn": ci_stub,
        "gemini_router_call_fn": router_stub,
        "auto_remediation_fn": remediation_stub,
        "callback_send_fn": _record_and_measure,
        "audit": audit,
        "max_polls": max_polls,
        "loop_boundary_attempts": loop_boundary_attempts,
    }
    return CiWatchHandoffRunner(**runner_kwargs), audit


# ── ANCHOR-1: state machine 정상 경로 MERGE_READY ─────────────────────────────


def test_run_ci_pass_router_fresh_yields_merge_ready(tmp_path):
    """CI pass plus a FRESH router result ends in MERGE_READY with one callback.

    Also checks that the single captured envelope stays within the byte limit
    and carries the terminal marker, terminal state, watcher identity,
    canonical root and owner key.
    """
    capture: list = []
    # The audit handle is not inspected in this test, so it is discarded.
    runner, _ = _build_runner(
        tmp_path,
        ci_seq=[CIStatusSnapshot(status=CI_STATUS_PASS)],
        router_seq=[RouterCallResult(final_state=ROUTER_FRESH, reason="fresh")],
        callback_capture=capture,
    )
    result = runner.run(_canonical_handoff(), task_id="t-2642", watcher_schedule_id="sched-1")
    assert result.decision.terminal_state == TERMINAL_MERGE_READY
    assert result.decision.router_final_state == ROUTER_FRESH
    assert result.callback_fired is True
    assert result.callback_prompt_bytes > 0
    assert result.callback_prompt_bytes <= CALLBACK_ENVELOPE_BYTE_LIMIT
    assert len(capture) == 1
    # envelope contains required axes
    env = capture[0]
    assert "[CI_WATCH_HANDOFF_TERMINAL]" in env
    assert "terminal_state=MERGE_READY" in env
    assert "watcher_owner=dev6-cron-watcher" in env
    assert "watcher_schedule_id=sched-1" in env
    assert f"canonical_root={CANONICAL_ROOT}" in env
    assert f"owner_key={ANU_COLLECTOR_KEY}" in env


# ── ANCHOR-2: router → terminal 매핑 (state별 1:1) ────────────────────────────


def test_router_stale_maps_to_gemini_external_trigger_stale(tmp_path):
    """A STALE router result is expected to end as GEMINI_EXTERNAL_TRIGGER_STALE."""
    ci_snapshots = [CIStatusSnapshot(status=CI_STATUS_PASS)]
    router_results = [RouterCallResult(final_state=ROUTER_STALE)]
    runner, _audit = _build_runner(tmp_path, ci_seq=ci_snapshots, router_seq=router_results)

    decision = runner.run(_canonical_handoff()).decision

    assert decision.terminal_state == TERMINAL_GEMINI_EXTERNAL_TRIGGER_STALE
    assert "OWNER nudge 1회 hard limit" in decision.reason


@pytest.mark.parametrize(
    "router_state,expected_reason_fragment",
    [
        (ROUTER_NUDGE_PERMISSION_DENIED, "NUDGE_PERMISSION_DENIED"),
        (ROUTER_CHAIR_UI_FALLBACK, "CHAIR_UI_FALLBACK_REQUIRED"),
        (ROUTER_NUDGE_FAILED, "NUDGE_FAILED"),
        (ROUTER_NOT_GEMINI_TRIGGER, "NOT_GEMINI_TRIGGER"),
    ],
)
def test_router_critical_states_map_to_chair_required(tmp_path, router_state, expected_reason_fragment):
    """Each listed router state is expected to escalate to CHAIR_REQUIRED.

    The decision reason must mention the fragment paired with the state.
    """
    ci_snapshots = [CIStatusSnapshot(status=CI_STATUS_PASS)]
    router_results = [RouterCallResult(final_state=router_state)]
    runner, _audit = _build_runner(tmp_path, ci_seq=ci_snapshots, router_seq=router_results)

    decision = runner.run(_canonical_handoff()).decision

    assert decision.terminal_state == TERMINAL_CHAIR_REQUIRED
    assert expected_reason_fragment in decision.reason


def test_router_nudge_posted_continues_polling(tmp_path):
    """NUDGE_POSTED is not terminal: a second poll with FRESH gives MERGE_READY."""
    ci_snapshots = [CIStatusSnapshot(status=CI_STATUS_PASS) for _ in range(2)]
    router_results = [
        RouterCallResult(final_state=state)
        for state in (ROUTER_NUDGE_POSTED, ROUTER_FRESH)
    ]
    runner, _audit = _build_runner(tmp_path, ci_seq=ci_snapshots, router_seq=router_results)

    decision = runner.run(_canonical_handoff()).decision

    assert decision.terminal_state == TERMINAL_MERGE_READY
    assert decision.loop_iterations == 2


def test_router_nudge_deduped_continues_polling(tmp_path):
    """NUDGE_DEDUPED is not terminal: a second poll with FRESH gives MERGE_READY.

    Mirrors the NUDGE_POSTED test, including the check that reaching the
    terminal state took exactly two loop iterations.
    """
    runner, _ = _build_runner(
        tmp_path,
        ci_seq=[
            CIStatusSnapshot(status=CI_STATUS_PASS),
            CIStatusSnapshot(status=CI_STATUS_PASS),
        ],
        router_seq=[
            RouterCallResult(final_state=ROUTER_NUDGE_DEDUPED),
            RouterCallResult(final_state=ROUTER_FRESH),
        ],
    )
    result = runner.run(_canonical_handoff())
    assert result.decision.terminal_state == TERMINAL_MERGE_READY
    # Without this check the test would still pass if the runner reached
    # MERGE_READY without actually polling a second time.
    assert result.decision.loop_iterations == 2


# ── ANCHOR-3: auto_remediation outcome 분기 ──────────────────────────────────


def test_ci_fail_remediation_applied_recovers_to_merge_ready(tmp_path):
    """A failing poll followed by an applied remediation and a passing poll.

    Expected outcome: MERGE_READY with exactly one remediation attempt.
    """
    failing_then_passing = [
        CIStatusSnapshot(status=CI_STATUS_FAIL, severity="medium"),
        CIStatusSnapshot(status=CI_STATUS_PASS),
    ]
    router_results = [RouterCallResult(final_state=ROUTER_FRESH)]
    outcomes = [REMEDIATE_APPLIED]
    runner, _audit = _build_runner(
        tmp_path,
        ci_seq=failing_then_passing,
        router_seq=router_results,
        remediate_seq=outcomes,
    )

    decision = runner.run(_canonical_handoff()).decision

    assert decision.terminal_state == TERMINAL_MERGE_READY
    assert decision.auto_remediation_attempts == 1


def test_ci_fail_remediation_non_remediable_terminal(tmp_path):
    """A NON_REMEDIABLE outcome is expected to end as CI_FAILED_NON_REMEDIABLE."""
    critical_failure = CIStatusSnapshot(
        status=CI_STATUS_FAIL,
        severity="critical",
        failing_checks=("test-x",),
    )
    runner, _audit = _build_runner(
        tmp_path,
        ci_seq=[critical_failure],
        remediate_seq=[REMEDIATE_NON_REMEDIABLE],
    )

    decision = runner.run(_canonical_handoff()).decision

    assert decision.terminal_state == TERMINAL_CI_FAILED_NON_REMEDIABLE
    assert "non-remediable" in decision.reason


def test_ci_fail_remediation_forbidden_hit_escalates_chair(tmp_path):
    """A FORBIDDEN_HIT remediation outcome is expected to yield CHAIR_REQUIRED."""
    medium_failure = CIStatusSnapshot(status=CI_STATUS_FAIL, severity="medium")
    runner, _audit = _build_runner(
        tmp_path,
        ci_seq=[medium_failure],
        remediate_seq=[REMEDIATE_FORBIDDEN_HIT],
    )

    outcome = runner.run(_canonical_handoff())

    assert outcome.decision.terminal_state == TERMINAL_CHAIR_REQUIRED


def test_ci_fail_remediation_loop_boundary_outcome(tmp_path):
    """A LOOP_BOUNDARY remediation outcome is expected to yield LOOP_BOUNDARY."""
    high_failure = CIStatusSnapshot(status=CI_STATUS_FAIL, severity="high")
    runner, _audit = _build_runner(
        tmp_path,
        ci_seq=[high_failure],
        remediate_seq=[REMEDIATE_LOOP_BOUNDARY],
    )

    outcome = runner.run(_canonical_handoff())

    assert outcome.decision.terminal_state == TERMINAL_LOOP_BOUNDARY


def test_loop_boundary_same_function_repeated(tmp_path):
    """Three attempts with same_function_high_repeated=True, then LOOP_BOUNDARY on the fourth poll."""
    # Four separate but identical failing snapshots, one per poll.
    repeated_failures = [
        CIStatusSnapshot(
            status=CI_STATUS_FAIL,
            severity="non-critical-high",
            same_function_high_repeated=True,
        )
        for _ in range(4)
    ]
    applied_outcomes = [REMEDIATE_APPLIED] * 3
    runner, _audit = _build_runner(
        tmp_path,
        ci_seq=repeated_failures,
        remediate_seq=applied_outcomes,
        loop_boundary_attempts=3,
    )

    decision = runner.run(_canonical_handoff()).decision

    assert decision.terminal_state == TERMINAL_LOOP_BOUNDARY
    assert decision.auto_remediation_attempts == 3
    assert decision.loop_iterations == 4


# ── ANCHOR: forbidden_path immediate escalation ──────────────────────────────


def test_forbidden_path_touched_immediate_chair_required(tmp_path):
    """A snapshot flagged forbidden_path_touched is expected to yield CHAIR_REQUIRED.

    No remediation stub is supplied, and the decision must report zero
    remediation attempts.
    """
    tainted_failure = CIStatusSnapshot(
        status=CI_STATUS_FAIL,
        severity="critical",
        forbidden_path_touched=True,
    )
    runner, _audit = _build_runner(tmp_path, ci_seq=[tainted_failure])

    decision = runner.run(_canonical_handoff()).decision

    assert decision.terminal_state == TERMINAL_CHAIR_REQUIRED
    assert "forbidden_paths" in decision.reason
    # Zero auto_remediation calls: the forbidden-path check takes priority.
    assert decision.auto_remediation_attempts == 0


# ── ANCHOR: pending → polling continue ───────────────────────────────────────


def test_pending_ci_continues_polling(tmp_path):
    """Two PENDING polls followed by PASS: MERGE_READY after three iterations."""
    poll_statuses = (CI_STATUS_PENDING, CI_STATUS_PENDING, CI_STATUS_PASS)
    ci_snapshots = [CIStatusSnapshot(status=status) for status in poll_statuses]
    router_results = [RouterCallResult(final_state=ROUTER_FRESH)]
    runner, _audit = _build_runner(tmp_path, ci_seq=ci_snapshots, router_seq=router_results)

    decision = runner.run(_canonical_handoff()).decision

    assert decision.terminal_state == TERMINAL_MERGE_READY
    assert decision.loop_iterations == 3


# ── ANCHOR: max_polls timeout → CHAIR_REQUIRED ───────────────────────────────


def test_max_polls_timeout_yields_chair_required(tmp_path):
    """Five PENDING polls with max_polls=5 are expected to yield CHAIR_REQUIRED."""
    poll_budget = 5
    # One snapshot object is shared across every poll.
    still_pending = CIStatusSnapshot(status=CI_STATUS_PENDING)
    runner, _audit = _build_runner(
        tmp_path,
        ci_seq=[still_pending for _ in range(poll_budget)],
        max_polls=poll_budget,
    )

    decision = runner.run(_canonical_handoff()).decision

    assert decision.terminal_state == TERMINAL_CHAIR_REQUIRED
    assert "max_polls" in decision.reason


# ── ANCHOR-4: envelope ≤3900 bytes hard limit ────────────────────────────────


def test_envelope_byte_count_within_limit(tmp_path):
    """The reported callback size equals the envelope's UTF-8 byte length.

    The measured size must also stay within CALLBACK_ENVELOPE_BYTE_LIMIT.
    """
    capture: list = []
    runner, _ = _build_runner(
        tmp_path,
        ci_seq=[CIStatusSnapshot(status=CI_STATUS_PASS)],
        router_seq=[RouterCallResult(final_state=ROUTER_FRESH)],
        callback_capture=capture,
    )
    result = runner.run(_canonical_handoff())
    # Check the capture before indexing it, so a missing or duplicated
    # callback fails as an assertion rather than as an IndexError.
    assert len(capture) == 1
    # measure UTF-8 byte count (NOT character count)
    measured = len(capture[0].encode("utf-8"))
    assert measured == result.callback_prompt_bytes
    assert measured <= CALLBACK_ENVELOPE_BYTE_LIMIT


# ── ANCHOR-5: terminal_states subset 외부 → CHAIR_REQUIRED escalate ──────────


def test_terminal_outside_subset_escalates_to_chair_required(tmp_path):
    """Escalate when handoff.terminal_states does not include MERGE_READY."""
    allowed_states = ["CHAIR_REQUIRED", "LOOP_BOUNDARY"]
    handoff = _canonical_handoff(terminal_states=allowed_states)
    ci_snapshots = [CIStatusSnapshot(status=CI_STATUS_PASS)]
    router_results = [RouterCallResult(final_state=ROUTER_FRESH)]
    runner, _audit = _build_runner(tmp_path, ci_seq=ci_snapshots, router_seq=router_results)

    decision = runner.run(handoff).decision

    assert decision.terminal_state == TERMINAL_CHAIR_REQUIRED
    assert "subset" in decision.reason


# ── ANCHOR: callback_on_terminal_state=False → 발사 0 ────────────────────────


def test_callback_disabled_does_not_fire(tmp_path):
    """With callback_on_terminal_state=False no callback is sent or audited.

    The run must still reach MERGE_READY, report zero callback bytes, leave
    the capture list empty and write no CALLBACK_FIRED audit record.
    """
    capture: list = []
    handoff = _canonical_handoff(callback_on_terminal_state=False)
    runner, audit = _build_runner(
        tmp_path,
        ci_seq=[CIStatusSnapshot(status=CI_STATUS_PASS)],
        router_seq=[RouterCallResult(final_state=ROUTER_FRESH)],
        callback_capture=capture,
    )
    result = runner.run(handoff)
    assert result.decision.terminal_state == TERMINAL_MERGE_READY
    assert result.callback_fired is False
    assert result.callback_prompt_bytes == 0
    assert capture == []
    # The audit log must contain zero CALLBACK_FIRED records.
    lines = audit.path.read_text(encoding="utf-8").strip().splitlines()
    events = [json.loads(line)["event"] for line in lines]
    assert EVENT_CALLBACK_FIRED not in events


# ── ANCHOR: audit event sequence 검증 ────────────────────────────────────────


def test_audit_event_sequence_merge_ready(tmp_path):
    """The MERGE_READY path writes the five audit events in a fixed order.

    Each line of the audit file is parsed as JSON and its ``event`` field is
    compared against the expected sequence.
    """
    runner, audit = _build_runner(
        tmp_path,
        ci_seq=[CIStatusSnapshot(status=CI_STATUS_PASS)],
        router_seq=[RouterCallResult(final_state=ROUTER_FRESH)],
    )
    runner.run(_canonical_handoff(), task_id="t-2642", watcher_schedule_id="sched-1")
    lines = audit.path.read_text(encoding="utf-8").strip().splitlines()
    events = [json.loads(line)["event"] for line in lines]
    assert events == [
        EVENT_HANDOFF_RECEIVED,
        EVENT_POLL_TICK,
        EVENT_OWNER_NUDGE,
        EVENT_TERMINAL_REACHED,
        EVENT_CALLBACK_FIRED,
    ]


def test_audit_records_watcher_schedule_id(tmp_path):
    """Every audit record for the PR/head carries the watcher identity.

    Both ``watcher_schedule_id`` and ``watcher_owner`` are checked on each
    record returned by ``records_for_pr_head``.
    """
    runner, audit = _build_runner(
        tmp_path,
        ci_seq=[CIStatusSnapshot(status=CI_STATUS_PASS)],
        router_seq=[RouterCallResult(final_state=ROUTER_FRESH)],
    )
    runner.run(_canonical_handoff(), task_id="t-2642", watcher_schedule_id="sched-xyz")
    rows = audit.records_for_pr_head(pr_number=146, head=HEAD)
    # all() over an empty sequence is True, so without this guard the checks
    # below would pass even when no records are returned at all.
    assert rows, "expected at least one audit record for the PR/head"
    assert all(r.get("watcher_schedule_id") == "sched-xyz" for r in rows)
    assert all(r.get("watcher_owner") == "dev6-cron-watcher" for r in rows)
    assert all(r.get("watcher_owner") == "dev6-cron-watcher" for r in rows)


# ── ANCHOR: contract violations ──────────────────────────────────────────────


def test_runner_rejects_missing_ci_status_fn(tmp_path):
    """Constructing the runner with ci_status_fn=None must raise RunnerContractError."""

    def router_stub(h):
        return RouterCallResult(final_state=ROUTER_FRESH)

    constructor_kwargs = {
        "workspace_root": tmp_path,
        "ci_status_fn": None,
        "gemini_router_call_fn": router_stub,
    }
    with pytest.raises(RunnerContractError, match="ci_status_fn"):
        CiWatchHandoffRunner(**constructor_kwargs)


def test_runner_rejects_missing_gemini_router_call_fn(tmp_path):
    """Constructing the runner with gemini_router_call_fn=None must raise RunnerContractError."""

    def ci_stub(h):
        return CIStatusSnapshot(status=CI_STATUS_PASS)

    constructor_kwargs = {
        "workspace_root": tmp_path,
        "ci_status_fn": ci_stub,
        "gemini_router_call_fn": None,
    }
    with pytest.raises(RunnerContractError, match="gemini_router_call_fn"):
        CiWatchHandoffRunner(**constructor_kwargs)


def test_runner_rejects_invalid_max_polls(tmp_path):
    """Constructing the runner with max_polls=0 must raise RunnerContractError."""

    def ci_stub(h):
        return CIStatusSnapshot(status=CI_STATUS_PASS)

    def router_stub(h):
        return RouterCallResult(final_state=ROUTER_FRESH)

    constructor_kwargs = {
        "workspace_root": tmp_path,
        "ci_status_fn": ci_stub,
        "gemini_router_call_fn": router_stub,
        "max_polls": 0,
    }
    with pytest.raises(RunnerContractError, match="max_polls"):
        CiWatchHandoffRunner(**constructor_kwargs)


def test_runner_rejects_invalid_ci_status_return(tmp_path):
    """run() must raise RunnerContractError when the CI stub reports "UNKNOWN"."""
    # This sequence is only a placeholder; the stub below replaces it.
    placeholder_seq = [CIStatusSnapshot(status=CI_STATUS_PASS)]
    runner, _audit = _build_runner(tmp_path, ci_seq=placeholder_seq)

    def unknown_status_ci_fn(_h: dict):
        # The snapshot is built lazily, i.e. only once run() calls the stub.
        return CIStatusSnapshot(status="UNKNOWN")

    runner._ci_status_fn = unknown_status_ci_fn  # type: ignore[attr-defined]
    handoff = _canonical_handoff()
    with pytest.raises(RunnerContractError, match="invalid status"):
        runner.run(handoff)


def test_runner_rejects_invalid_remediation_outcome(tmp_path):
    """run() must raise RunnerContractError for an unrecognised remediation outcome."""
    medium_failure = CIStatusSnapshot(status=CI_STATUS_FAIL, severity="medium")
    runner, _audit = _build_runner(
        tmp_path,
        ci_seq=[medium_failure],
        remediate_seq=["BOGUS_OUTCOME"],
    )
    handoff = _canonical_handoff()
    with pytest.raises(RunnerContractError, match="invalid outcome"):
        runner.run(handoff)
