"""task-2554+2 §5 신규 fixture #2: PR #105 fresh medium 3건 §1~§2 충족 어셀션.

회장 §명시 (2026-05-12) Gemini fresh medium 3건 (false-positive 아님, §1~§2 직접 일치):
  1. ``_iter_rows`` O(N) full scan → §2 bounded/reverse scan 으로 해결
  2. ``RESULT_PENDING`` import 누락 → §1 import 추가
  3. ``http_post`` 직전 PENDING 미기록 → §1 fail-closed crash-safety 추가

본 fixture 는 위 3건이 모두 충족됨을 정적/동적 양쪽으로 어셀션.
"""

from __future__ import annotations

import json
from pathlib import Path

import pytest

from anu_v2 import owner_trigger_audit, owner_trigger_only
from anu_v2.owner_trigger_audit import (
    DEDUPE_SCAN_MAX_ROWS,
    OwnerTriggerAudit,
    RESULT_PENDING,
    RESULT_POSTED,
)
from anu_v2.owner_trigger_only import OwnerTriggerOnly


_HEAD_A = "a" * 40


def _write_decision(tmp_path: Path, *, pr: int = 105, head: str = _HEAD_A) -> Path:
    decision = {
        "schema": "anu_v2.owner_trigger_decision.v1",
        "task_id": "task-2554+2-test",
        "pr": pr,
        "current_head": head,
        "queue_head": True,
        "current_head_confirmed": True,
        "gemini_evidence_fresh": False,
        "nudge_count_for_pr_head": 0,
        "allowed_action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT",
        "comment_body": "/gemini review",
        "allowed": True,
    }
    p = tmp_path / "decision.json"
    p.write_text(json.dumps(decision), encoding="utf-8")
    return p


# ─── fix 1: bounded reverse scan ────────────────────────────────────────────


def test_iter_rows_reverse_method_exists_and_bounded():
    """``_iter_rows_reverse`` 가 audit 모듈에 존재하고 max_rows 기본값이 DEDUPE_SCAN_MAX_ROWS."""
    assert hasattr(owner_trigger_audit.OwnerTriggerAudit, "_iter_rows_reverse")
    # 기본 max_rows 가 bounded 인지 (회장 §2)
    assert DEDUPE_SCAN_MAX_ROWS == 512


def test_iter_rows_reverse_yields_newest_first(tmp_path):
    """audit JSONL 끝 행부터 역순으로 yield."""
    audit = OwnerTriggerAudit(tmp_path)
    audit._ensure_parent()
    rows = [
        {"action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT", "pr": 1, "head": _HEAD_A,
         "result": "POSTED", "token_value_logged": False},
        {"action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT", "pr": 2, "head": "b" * 40,
         "result": "POSTED", "token_value_logged": False},
        {"action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT", "pr": 3, "head": "c" * 40,
         "result": "POSTED", "token_value_logged": False},
    ]
    for r in rows:
        audit.append(r)
    reversed_rows = list(audit._iter_rows_reverse())
    assert len(reversed_rows) == 3
    # 가장 최근 (마지막 append) 가 첫 번째
    assert reversed_rows[0]["pr"] == 3
    assert reversed_rows[1]["pr"] == 2
    assert reversed_rows[2]["pr"] == 1


def test_iter_rows_reverse_respects_max_rows(tmp_path):
    """max_rows 한정 시 그 이상은 yield 안 함."""
    audit = OwnerTriggerAudit(tmp_path)
    audit._ensure_parent()
    for i in range(10):
        audit.append({
            "action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT",
            "pr": i + 1,
            "head": f"{i:040x}",
            "result": "POSTED",
            "token_value_logged": False,
        })
    sample = list(audit._iter_rows_reverse(max_rows=3))
    assert len(sample) == 3
    # 마지막 3개 (pr=10, 9, 8) 가 newest-first 순으로
    assert [r["pr"] for r in sample] == [10, 9, 8]


def test_dedupe_uses_bounded_scan_not_full_file(tmp_path):
    """``_has_posted`` / ``_has_active_trigger`` 가 ``_iter_rows`` 가 아닌 reverse scan 을 사용.

    동작 어셀션: 작은 audit 에서도 결과가 같고, 파일이 거대해질 때 dedupe 비용이 일정.
    검증: 정적 (소스 코드 grep) + 동적 (큰 파일에서 마지막 1행이 dedupe 결정).
    """
    audit = OwnerTriggerAudit(tmp_path)
    # 200 행 추가 (max_rows=512 안 함)
    audit._ensure_parent()
    for i in range(200):
        audit.append({
            "action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT",
            "pr": 999,
            "head": f"{i:040x}",
            "result": "POSTED",
            "token_value_logged": False,
        })
    # 마지막 head 에 대해 _has_posted 가 True 인지 (가장 최근 행이 검출되어야)
    last_head = f"{199:040x}"
    assert audit._has_posted(pr=999, head=last_head) is True
    # 정적 grep: _has_posted/_has_active_trigger 는 _iter_rows_reverse 호출
    import inspect

    source = inspect.getsource(owner_trigger_audit.OwnerTriggerAudit._has_posted)
    assert "_iter_rows_reverse" in source, "_has_posted must use bounded reverse scan"
    source_active = inspect.getsource(owner_trigger_audit.OwnerTriggerAudit._has_active_trigger)
    assert "_iter_rows_reverse" in source_active, "_has_active_trigger must use bounded reverse scan"


# ─── fix 2: RESULT_PENDING import ────────────────────────────────────────────


def test_result_pending_imported_in_owner_trigger_only():
    """``RESULT_PENDING`` 가 owner_trigger_only 모듈에 import 되어 있어야 함."""
    assert hasattr(owner_trigger_only, "RESULT_PENDING")
    assert owner_trigger_only.RESULT_PENDING == "PENDING"


def test_result_pending_constant_value_matches_audit_module():
    """audit 모듈의 RESULT_PENDING 과 동일 상수."""
    assert RESULT_PENDING == "PENDING"
    assert owner_trigger_only.RESULT_PENDING is RESULT_PENDING or owner_trigger_only.RESULT_PENDING == RESULT_PENDING


# ─── fix 3: http_post 직전 PENDING 기록 (crash-safety) ───────────────────────


def test_pending_record_written_before_http_post(tmp_path):
    """http_post 호출 시점에 audit 에 PENDING 이 이미 기록되어 있어야 함 (crash-safety)."""
    decision_path = _write_decision(tmp_path)
    audit = OwnerTriggerAudit(tmp_path)
    recorded_at_http_post: list[list[dict]] = []

    def http_post(method, path, body, headers):
        # http_post 호출 직전 audit 에 PENDING 이 있어야 함
        recorded_at_http_post.append(list(audit._iter_rows()))
        return {"status": 201}

    mod = OwnerTriggerOnly(
        workspace_root=tmp_path,
        http_post=http_post,
        token_provider=lambda: "owner-token",
        audit=audit,
    )
    result = mod.trigger_gemini_review(
        decision_path=decision_path, owner="o", repo="r", current_head_actual=_HEAD_A,
    )
    assert result.status == RESULT_POSTED
    # http_post 호출 시점에 audit 에 PENDING 한 행이 있었어야 함
    assert len(recorded_at_http_post) == 1
    snapshot = recorded_at_http_post[0]
    assert len(snapshot) == 1
    assert snapshot[0]["result"] == "PENDING"
    assert snapshot[0]["pr"] == 105
    # 최종 상태: PENDING + POSTED = 2 행
    final_rows = list(audit._iter_rows())
    assert len(final_rows) == 2
    assert final_rows[0]["result"] == "PENDING"
    assert final_rows[1]["result"] == "POSTED"


def test_pending_record_persists_after_simulated_crash(tmp_path):
    """http_post 직후 process crash 시뮬레이션 — PENDING 만 audit 에 남아있어야 함."""
    decision_path = _write_decision(tmp_path)
    audit = OwnerTriggerAudit(tmp_path)

    class _SimulatedCrash(SystemExit):
        pass

    def http_post(method, path, body, headers):
        # 일반 예외 아닌 SystemExit subclass — try/except 가 잡지 않도록 (실제 process crash 흉내)
        raise _SimulatedCrash("crash after http_post invocation")

    mod = OwnerTriggerOnly(
        workspace_root=tmp_path,
        http_post=http_post,
        token_provider=lambda: "owner-token",
        audit=audit,
    )
    # task-2554+2 §1: 일반 Exception 만 잡고 BaseException 은 통과시키므로 SystemExit 가 전파됨.
    with pytest.raises(_SimulatedCrash):
        mod.trigger_gemini_review(
            decision_path=decision_path, owner="o", repo="r", current_head_actual=_HEAD_A,
        )
    # crash 후에도 PENDING 은 audit 에 살아있어야 함 (crash-safety fail-closed)
    rows = list(audit._iter_rows())
    # owner_trigger_only 가 일반 Exception 만 catch 하므로 SystemExit 전파.
    # BUT _SimulatedCrash 는 SystemExit 이고 generic `except Exception` 으로는 잡히지 않음.
    # 그래도 PENDING 은 transaction.record 로 fsync 되어 disk 에 남아있음.
    assert len(rows) >= 1
    assert rows[0]["result"] == "PENDING"


def test_pending_then_failed_allows_retry(tmp_path):
    """PENDING → FAILED 후 retry 가 DEDUPED 가 아닌 새 trigger 로 진행."""
    decision_path = _write_decision(tmp_path)
    audit = OwnerTriggerAudit(tmp_path)
    state = {"calls": 0}

    def http_post(method, path, body, headers):
        state["calls"] += 1
        if state["calls"] == 1:
            raise RuntimeError("transient network error")
        return {"status": 201}

    mod = OwnerTriggerOnly(
        workspace_root=tmp_path,
        http_post=http_post,
        token_provider=lambda: "owner-token",
        audit=audit,
    )
    with pytest.raises(RuntimeError):
        mod.trigger_gemini_review(
            decision_path=decision_path, owner="o", repo="r", current_head_actual=_HEAD_A,
        )
    # 1st: PENDING + FAILED
    after_first = list(audit._iter_rows())
    assert len(after_first) == 2
    assert after_first[0]["result"] == "PENDING"
    assert after_first[1]["result"] == "FAILED"
    # retry — 새 PENDING + POSTED
    r2 = mod.trigger_gemini_review(
        decision_path=decision_path, owner="o", repo="r", current_head_actual=_HEAD_A,
    )
    assert r2.status == RESULT_POSTED
    final = list(audit._iter_rows())
    assert final[-1]["result"] == "POSTED"
    assert final[-2]["result"] == "PENDING"
