"""task-2560 FUC-4 per-PR exception isolation 회귀 테스트.

회장 §명시 2026-05-12 Track A 1순위 본질 1:1:
  ExecutorScheduler._handle_single_diagnosis 에서 한 PR 예외가 전체 cycle 을
  중단하지 않게 한다. per-PR try/except isolation + non-critical exception 시
  cycle 유지.

회장 §명시 필수 테스트 6 (작업 지시서 1:1):
  1. PR A diagnosis raises exception
  2. PR B diagnosis still processed
  3. failed PR marker 생성
  4. next PR owner_trigger path 정상 유지
  5. whole cycle crash 0
  6. long polling 0

추가 critical 분류 회귀:
  7. PR C critical exception (PermissionError) → ESCALATED marker 박제
  8. exception summary 가 decision/audit JSONL 양쪽에 기록
  9. BotSessionExitRequired 는 isolation 으로 삼키지 않고 전파
"""

from __future__ import annotations

import json
import sys
from pathlib import Path


# Third ancestor of this file's resolved path (parents[0] is the containing
# directory). It is prepended to sys.path only when missing, presumably so the
# `anu_v2` package imported below resolves when the tests run from any cwd.
WORKSPACE_ROOT = Path(__file__).resolve().parents[2]
if str(WORKSPACE_ROOT) not in sys.path:
    sys.path.insert(0, str(WORKSPACE_ROOT))

from anu_v2.executor_scheduler import (  # noqa: E402
    ACTION_OWNER_TRIGGER_DISPATCHED,
    ACTION_PR_EXCEPTION_CRITICAL_ESCALATED,
    ACTION_PR_EXCEPTION_ISOLATED,
    ExecutorScheduler,
    PR_EXCEPTION_AUDIT_SCHEMA,
    SCHEDULER_AUDIT_REL_PATH,
    SchedulerCycleResult,
    _CRITICAL_EXCEPTIONS,
    _is_critical_exception,
    _summarize_exception,
)
from anu_v2.idle_pr_diagnoser import (  # noqa: E402
    GeminiReviewMeta,
    IdlePRSnapshot,
)
from anu_v2.merge_queue_executor import MergeQueueExecutor  # noqa: E402
from anu_v2.owner_trigger_audit import OwnerTriggerAudit  # noqa: E402
from anu_v2.owner_trigger_only import (  # noqa: E402
    OwnerTriggerOnly,
    TokenBoundaryViolation,
)
from anu_v2.polling_policy import (  # noqa: E402
    BotSessionExitRequired,
)


# JSON fixture shared by the tests in this module. It lives in a `fixtures`
# directory that is a sibling of the directory containing this file.
FIXTURE_PATH = (
    Path(__file__).resolve().parents[1]
    / "fixtures"
    / "executor_per_pr_exception_isolation.json"
)


def _load_fixture() -> dict:
    """Parse the UTF-8 JSON document at ``FIXTURE_PATH`` and return it."""
    with FIXTURE_PATH.open(encoding="utf-8") as handle:
        return json.load(handle)


def _build_snapshot(snapshot_dict: dict) -> IdlePRSnapshot:
    """Convert one fixture snapshot mapping into an ``IdlePRSnapshot``.

    ``gemini_reviews`` is optional (defaults to no reviews); ``state`` defaults
    to ``"OPEN"`` and ``author_is_bot`` to ``True``. Every other key read here
    is required and raises ``KeyError`` when absent.
    """
    collected_reviews = []
    for raw_review in snapshot_dict.get("gemini_reviews", []):
        collected_reviews.append(
            GeminiReviewMeta(
                commit_id=raw_review["commit_id"],
                submitted_at=raw_review["submitted_at"],
            )
        )

    # Keys are read in the same order as the constructor keywords so a missing
    # required key surfaces the same KeyError as before.
    snapshot_kwargs = {
        "number": snapshot_dict["number"],
        "head_sha": snapshot_dict["head_sha"],
        "head_ref": snapshot_dict["head_ref"],
        "created_at": snapshot_dict["created_at"],
        "gemini_reviews": tuple(collected_reviews),
        "ci_required_all_success": snapshot_dict["ci_required_all_success"],
        "state": snapshot_dict.get("state", "OPEN"),
        "author_is_bot": snapshot_dict.get("author_is_bot", True),
    }
    return IdlePRSnapshot(**snapshot_kwargs)


def _build_merge_executor(tmp_path: Path) -> MergeQueueExecutor:
    """Return a ``MergeQueueExecutor`` whose collaborators are all inert stubs.

    The gh runner returns an empty dict, the git runner an empty string, the
    pytest runner ``0``, and the audit writer discards its payload.
    ``task_md_root`` points at ``tmp_path``.
    """

    def _gh_stub(args, env):
        return {}

    def _git_stub(args):
        return ""

    def _pytest_stub(paths):
        return 0

    def _audit_stub(payload):
        return None

    return MergeQueueExecutor(
        gh_runner=_gh_stub,
        git_runner=_git_stub,
        pytest_runner=_pytest_stub,
        audit_writer=_audit_stub,
        task_md_root=tmp_path,
    )


def _build_owner_trigger(
    tmp_path: Path,
    *,
    http_calls: list,
    token: str = "ghp_owner_test_token_xxxxxxxxxxxxxxxx",
) -> OwnerTriggerOnly:
    """Return an ``OwnerTriggerOnly`` wired to a recording fake HTTP poster.

    Each call to the fake poster appends ``{"method", "path", "body"}`` to
    ``http_calls`` (headers are not recorded) and returns a canned comment
    payload. The token provider always yields ``token``; the audit object is
    rooted at ``tmp_path``.
    """

    def _recording_post(method, path, body, headers):
        entry = {"method": method, "path": path, "body": body}
        http_calls.append(entry)
        return {"id": 1, "html_url": "https://example/comments/1"}

    return OwnerTriggerOnly(
        workspace_root=tmp_path,
        http_post=_recording_post,
        token_provider=lambda: token,
        audit=OwnerTriggerAudit(tmp_path),
    )


# ─── 핵심 통합 테스트 — 회장 §명시 필수 6 ────────────────────────────────


def test_pr_a_exception_isolated_pr_b_proceeds(tmp_path):
    """Required cases 1-5: PR A raises, PR B is still processed, cycle survives.

    - PR A (9001): the owner-trigger dispatch step raises ``RuntimeError``.
      Only PR A is reported as ``PR_EXCEPTION_ISOLATED`` and gets a failed
      marker plus an exception summary file.
    - PR B (9002): follows the normal owner-trigger path (dispatched, POSTED).
    - The cycle as a whole does not crash and emits no chat notifications.
    """
    fixture = _load_fixture()
    raw_snapshots = fixture["snapshots"]
    snap_a = _build_snapshot(raw_snapshots[0])  # PR 9001
    snap_b = _build_snapshot(raw_snapshots[1])  # PR 9002

    recorded_posts: list = []
    merge_executor = _build_merge_executor(tmp_path)
    owner_trigger = _build_owner_trigger(tmp_path, http_calls=recorded_posts)
    events_dir = tmp_path / "memory" / "events"

    scheduler = ExecutorScheduler(
        workspace_root=tmp_path,
        decision_dir=events_dir,
        snapshot_provider=lambda: [snap_a, snap_b],
        owner_trigger=owner_trigger,
        merge_executor=merge_executor,
        owner="test-owner",
        repo="test-repo",
    )

    # Wrap the real dispatch so that only PR 9001 blows up; every other PR is
    # forwarded to the untouched implementation.
    real_dispatch = scheduler._dispatch_owner_trigger

    def faulty_dispatch(*, diag, snapshot):
        if diag.pr_number != 9001:
            return real_dispatch(diag=diag, snapshot=snapshot)
        raise RuntimeError("simulated PR A runner failure")

    scheduler._dispatch_owner_trigger = faulty_dispatch  # type: ignore[assignment]

    result = scheduler.run_one_cycle(
        env={"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_owner_test_token_xxxxxxxxxxxxxxxx"},
        now=fixture["now"],
    )

    def action_for(pr_number):
        return next(act for act in result.pr_actions if act.pr_number == pr_number)

    # Required 5: the cycle returns normally instead of crashing.
    assert isinstance(result, SchedulerCycleResult)
    assert result.cycle_crashed is False
    assert len(result.pr_actions) == 2

    # Required 1: PR A's exception was isolated.
    isolated = action_for(9001)
    assert isolated.action == ACTION_PR_EXCEPTION_ISOLATED
    assert "RuntimeError" in isolated.runner_result
    assert "simulated PR A runner failure" in isolated.reason

    # Required 3: failed marker and exception summary exist for PR A.
    marker_path = events_dir / "task-9001.pr-9001.failed"
    exception_json = events_dir / "task-9001.pr-9001.exception.json"
    assert marker_path.exists()
    assert exception_json.exists()
    recorded = json.loads(exception_json.read_text(encoding="utf-8"))
    assert recorded["schema"] == PR_EXCEPTION_AUDIT_SCHEMA
    assert recorded["pr_number"] == 9001
    assert recorded["critical"] is False
    assert recorded["exception_summary"]["type"] == "RuntimeError"
    assert recorded["chat_notifications"] == 0

    # Required 2+4: PR B still went through the normal owner-trigger path.
    dispatched = action_for(9002)
    assert dispatched.action == ACTION_OWNER_TRIGGER_DISPATCHED
    assert dispatched.runner_result == "POSTED"
    # Exactly one real HTTP POST happened, and it targeted PR B.
    assert len(recorded_posts) == 1
    only_post = recorded_posts[0]
    assert only_post["path"] == "/repos/test-owner/test-repo/issues/9002/comments"

    # Required 5: no chat notifications; exception counters are as expected.
    assert result.chat_notifications == 0
    assert result.pr_exceptions_isolated == 1
    assert result.pr_exceptions_critical_escalated == 0


def test_critical_exception_escalated_marker(tmp_path):
    """A critical exception on PR C (``PermissionError``) is escalated.

    PR 9003 must be reported as critical-escalated with its marker and summary
    files written, while PR 9002 in the same cycle is still dispatched and the
    cycle does not crash.
    """
    fixture = _load_fixture()
    raw_snapshots = fixture["snapshots"]
    snap_c = _build_snapshot(raw_snapshots[2])  # PR 9003
    snap_b = _build_snapshot(raw_snapshots[1])  # PR 9002 (healthy)

    recorded_posts: list = []
    merge_executor = _build_merge_executor(tmp_path)
    owner_trigger = _build_owner_trigger(tmp_path, http_calls=recorded_posts)
    events_dir = tmp_path / "memory" / "events"

    scheduler = ExecutorScheduler(
        workspace_root=tmp_path,
        decision_dir=events_dir,
        snapshot_provider=lambda: [snap_c, snap_b],
        owner_trigger=owner_trigger,
        merge_executor=merge_executor,
        owner="test-owner",
        repo="test-repo",
    )

    real_dispatch = scheduler._dispatch_owner_trigger

    def faulty_dispatch(*, diag, snapshot):
        if diag.pr_number != 9003:
            return real_dispatch(diag=diag, snapshot=snapshot)
        raise PermissionError("simulated critical disk permission")

    scheduler._dispatch_owner_trigger = faulty_dispatch  # type: ignore[assignment]

    result = scheduler.run_one_cycle(
        env={"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_owner_test_token_xxxxxxxxxxxxxxxx"},
        now=fixture["now"],
    )

    def action_for(pr_number):
        return next(act for act in result.pr_actions if act.pr_number == pr_number)

    escalated = action_for(9003)
    assert escalated.action == ACTION_PR_EXCEPTION_CRITICAL_ESCALATED
    assert escalated.runner_result == "PermissionError"

    marker_path = events_dir / "task-9003.pr-9003.critical-escalated"
    exception_json = events_dir / "task-9003.pr-9003.exception.json"
    assert marker_path.exists()
    assert exception_json.exists()
    recorded = json.loads(exception_json.read_text(encoding="utf-8"))
    assert recorded["critical"] is True
    assert recorded["action"] == ACTION_PR_EXCEPTION_CRITICAL_ESCALATED

    # The cycle keeps going: PR B is processed normally.
    assert result.cycle_crashed is False
    healthy = action_for(9002)
    assert healthy.action == ACTION_OWNER_TRIGGER_DISPATCHED
    assert result.pr_exceptions_critical_escalated == 1
    assert result.pr_exceptions_isolated == 0


def test_three_prs_one_each_outcome_cycle_completes(tmp_path):
    """Required cases 1-6 combined: three outcomes coexist in a single cycle.

    PR A (9001) raises a non-critical ``RuntimeError``, PR B (9002) is clean,
    and PR C (9003) raises ``OSError``. Results are compared against the
    fixture's ``expected_invariants`` and against the audit JSONL file.
    """
    fixture = _load_fixture()
    snapshots = []
    for raw in fixture["snapshots"]:
        snapshots.append(_build_snapshot(raw))

    recorded_posts: list = []
    merge_executor = _build_merge_executor(tmp_path)
    owner_trigger = _build_owner_trigger(tmp_path, http_calls=recorded_posts)
    events_dir = tmp_path / "memory" / "events"

    scheduler = ExecutorScheduler(
        workspace_root=tmp_path,
        decision_dir=events_dir,
        snapshot_provider=lambda: snapshots,
        owner_trigger=owner_trigger,
        merge_executor=merge_executor,
        owner="test-owner",
        repo="test-repo",
    )

    real_dispatch = scheduler._dispatch_owner_trigger

    def faulty_dispatch(*, diag, snapshot):
        pr_number = diag.pr_number
        if pr_number == 9001:
            raise RuntimeError("simulated non-critical")
        if pr_number == 9003:
            raise OSError("simulated disk full")
        return real_dispatch(diag=diag, snapshot=snapshot)

    scheduler._dispatch_owner_trigger = faulty_dispatch  # type: ignore[assignment]

    result = scheduler.run_one_cycle(
        env={"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_owner_test_token_xxxxxxxxxxxxxxxx"},
        now=fixture["now"],
    )

    invariants = fixture["expected_invariants"]
    assert result.cycle_crashed is invariants["cycle_crashed"]
    for counter_name in (
        "chat_notifications",
        "pr_exceptions_isolated",
        "pr_exceptions_critical_escalated",
    ):
        assert getattr(result, counter_name) == invariants[counter_name]
    assert len(result.pr_actions) == invariants["pr_actions_count"]

    # Required 6: no long polling. The cycle runs once and returns; a later
    # cycle is the responsibility of whatever invokes the scheduler next.
    assert result.rechecks_done == 0

    # The audit JSONL must hold one record per PR (blank lines are skipped).
    audit_file = tmp_path / SCHEDULER_AUDIT_REL_PATH
    audit_records = []
    for line in audit_file.read_text(encoding="utf-8").splitlines():
        if not line:
            continue
        audit_records.append(json.loads(line))
    assert len(audit_records) == 3

    action_by_pr = {}
    for record in audit_records:
        action_by_pr[record["pr_number"]] = record["action"]
    assert action_by_pr[9001] == ACTION_PR_EXCEPTION_ISOLATED
    assert action_by_pr[9002] == ACTION_OWNER_TRIGGER_DISPATCHED
    assert action_by_pr[9003] == ACTION_PR_EXCEPTION_CRITICAL_ESCALATED

    # Every audit record also reports zero chat notifications.
    for record in audit_records:
        assert record["chat_notifications"] == 0


# ─── 부수 어셀션 — long polling 0 / cycle exit 신호 ──────────────────────


def test_long_polling_zero_cycle_returns_immediately(tmp_path):
    """Required case 6: ``run_one_cycle`` scans once and returns promptly.

    Checks behaviourally that the cycle does not sleep or loop: it must finish
    well inside a conservative 5 second budget with ``rechecks_done == 0``.
    """
    import time

    fixture = _load_fixture()
    healthy_snapshot = _build_snapshot(fixture["snapshots"][1])
    merge_executor = _build_merge_executor(tmp_path)
    owner_trigger = _build_owner_trigger(tmp_path, http_calls=[])
    scheduler = ExecutorScheduler(
        workspace_root=tmp_path,
        decision_dir=tmp_path / "memory" / "events",
        snapshot_provider=lambda: [healthy_snapshot],
        owner_trigger=owner_trigger,
        merge_executor=merge_executor,
        owner="test-owner",
        repo="test-repo",
    )

    t_start = time.monotonic()
    result = scheduler.run_one_cycle(
        env={"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_owner_test_token_xxxxxxxxxxxxxxxx"},
        now=fixture["now"],
    )
    t_end = time.monotonic()

    # No sleeping allowed: the cycle has to come back quickly.
    assert t_end - t_start < 5.0
    assert result.rechecks_done == 0
    # Only the type of bot_should_exit is pinned here, not its value.
    assert isinstance(result.bot_should_exit, bool)


def test_bot_session_exit_required_not_swallowed(tmp_path):
    """``BotSessionExitRequired`` must propagate out of ``run_one_cycle``.

    The exit signal is how the polling policy expresses a normal end of the
    cycle. If per-PR isolation caught it, the bot would never terminate, which
    would violate that policy.
    """
    import pytest

    fixture = _load_fixture()
    healthy_snapshot = _build_snapshot(fixture["snapshots"][1])
    merge_executor = _build_merge_executor(tmp_path)
    owner_trigger = _build_owner_trigger(tmp_path, http_calls=[])
    scheduler = ExecutorScheduler(
        workspace_root=tmp_path,
        decision_dir=tmp_path / "memory" / "events",
        snapshot_provider=lambda: [healthy_snapshot],
        owner_trigger=owner_trigger,
        merge_executor=merge_executor,
        owner="test-owner",
        repo="test-repo",
    )

    def exit_dispatch(*, diag, snapshot):
        raise BotSessionExitRequired("polling MAX_RECHECKS exceeded")

    scheduler._dispatch_owner_trigger = exit_dispatch  # type: ignore[assignment]

    cycle_env = {"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_owner_test_token_xxxxxxxxxxxxxxxx"}
    with pytest.raises(BotSessionExitRequired):
        scheduler.run_one_cycle(env=cycle_env, now=fixture["now"])


# ─── 분류 / summary 단위 테스트 ──────────────────────────────────────────


def test_critical_exception_classification_matches_7():
    """The critical-exception table holds exactly the seven expected types."""
    expected_critical = (
        TokenBoundaryViolation,
        PermissionError,
        OSError,
        MemoryError,
        NotImplementedError,
        TypeError,
        AttributeError,
    )
    for exc_type in expected_critical:
        assert exc_type in _CRITICAL_EXCEPTIONS
    assert len(_CRITICAL_EXCEPTIONS) == 7

    # Spot-check the classifier on instances: two critical, two non-critical.
    classification_cases = (
        (PermissionError, True),
        (OSError, True),
        (RuntimeError, False),
        (ValueError, False),
    )
    for exc_type, is_critical in classification_cases:
        assert _is_critical_exception(exc_type("x")) is is_critical


def test_exception_summary_origin_recorded():
    """``_summarize_exception`` records type, message and origin (no full path).

    The summary must name the raising function and carry only the basename of
    the originating file, so that directory layout is not leaked into audit
    records.
    """
    def _raiser():
        raise RuntimeError("simulated for summary test")

    summary: dict = {}
    try:
        _raiser()
    except RuntimeError as exc:
        summary = _summarize_exception(exc)
    assert summary["type"] == "RuntimeError"
    assert summary["message"] == "simulated for summary test"
    assert summary["origin_function"] == "_raiser"

    origin_filename = summary["origin_filename"]
    assert origin_filename.endswith(".py")
    # Basename only: reject both POSIX and Windows separators. Checking "/"
    # alone would let a leaked Windows-style path (C:\...\x.py) pass.
    assert "/" not in origin_filename
    assert "\\" not in origin_filename
