"""anu_v2.tests.test_owner_trigger_race_fix_2554plus1 — HIGH race condition fix 회귀 (task-2554+1).

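Mandatory fixes §1–§6 explicitly specified by the chairman (2026-05-12 KST), plus medium #2/#3/#4/#5/#6:
  §1 remove the TOCTOU window between the dedupe check and the API call
  §2 duplicate triggers for the same PR/head are impossible even under concurrent execution
  §3 apply an fcntl.flock sidecar lock atomically
  §4 keep the check → record → call → update order inside the lock scope
  §5 fail closed when the audit write fails after a successful API call
  §6 two concurrent processes on the same PR/head → exactly one comment (verified in the concurrency test file)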

  medium #3: streaming improvement of _read_all — _iter_rows
  medium #4: record audit FAILED when http_post raises
  medium #6: _read_all tolerates corrupted lines (json.JSONDecodeError is skipped)
  medium #2/#5: static check that the dead code _resolve_owner_repo was removed

These regression tests import only anu_v2/* modules (one-way isolation).
"""

from __future__ import annotations

import json
import sys
from pathlib import Path

import pytest

# Directory two levels above the directory containing this file; it is put at
# the front of sys.path (once) so the ``anu_v2`` imports below can resolve.
# NOTE(review): presumably needed when the package is not installed — confirm.
WORKSPACE_ROOT = Path(__file__).resolve().parents[2]
if str(WORKSPACE_ROOT) not in sys.path:
    sys.path.insert(0, str(WORKSPACE_ROOT))

from anu_v2.owner_trigger_audit import (  # noqa: E402
    AUDIT_REL_PATH,
    ALLOWED_ACTION,
    DedupeViolation,
    OwnerTriggerAudit,
    RESULT_DEDUPED,
    RESULT_FAILED,
    RESULT_PENDING,
    RESULT_POSTED,
)
from anu_v2.owner_trigger_only import OwnerTriggerOnly  # noqa: E402


# Two distinct 40-character strings ("a" * 40 and "b" * 40) used as stand-in
# head values for the (pr, head) pairs exercised by the tests below.
_HEAD_A = "a" * 40
_HEAD_B = "b" * 40


def _write_decision(tmp_path: Path, *, pr: int = 103, head: str = _HEAD_A) -> Path:
    """Write an allowed owner-trigger decision file and return its path.

    The decision is serialised as JSON to ``decision.json`` directly under
    ``tmp_path``.

    Args:
        tmp_path: Directory that receives ``decision.json``.
        pr: Value stored under the ``pr`` key.
        head: Value stored under the ``current_head`` key.

    Returns:
        Path of the written decision file.
    """
    decision_file = tmp_path / "decision.json"
    payload = {
        "schema": "anu_v2.owner_trigger_decision.v1",
        "task_id": "task-2554+1",
        "pr": pr,
        "current_head": head,
        "queue_head": True,
        "current_head_confirmed": True,
        "gemini_evidence_fresh": False,
        "nudge_count_for_pr_head": 0,
        "allowed_action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT",
        "comment_body": "/gemini review",
        "allowed": True,
    }
    serialised = json.dumps(payload)
    decision_file.write_text(serialised, encoding="utf-8")
    return decision_file


def _build_module(
    tmp_path: Path,
    *,
    http_post=None,
    token: str = "tok-RACE-FIX-MUST-NOT-LEAK-2554plus1",
    posts: list | None = None,
):
    """Assemble an ``OwnerTriggerOnly`` wired to a fresh audit under ``tmp_path``.

    Args:
        tmp_path: Workspace root handed to both the audit and the module.
        http_post: Callable used for posting. When omitted (or falsy) a
            recording stub is used that appends each call to ``posts`` and
            returns ``{"status": 201, "id": 9999}``.
        token: Value returned by the module's ``token_provider``.
        posts: List that the recording stub appends to; a new list is created
            when ``None``.

    Returns:
        Tuple ``(module, posts, audit)``.
    """
    recorded = [] if posts is None else posts

    def _recording_http_post(method, path, body, headers):
        call = {"method": method, "path": path, "body": body, "headers": headers}
        recorded.append(call)
        return {"status": 201, "id": 9999}

    sender = http_post if http_post else _recording_http_post
    audit = OwnerTriggerAudit(tmp_path)
    module = OwnerTriggerOnly(
        workspace_root=tmp_path,
        http_post=sender,
        token_provider=lambda: token,
        audit=audit,
    )
    return module, recorded, audit


# ─── §1~§4: transaction atomic — check_dedupe + http_post + record 단일 lock ─


def test_transaction_context_manager_exists_on_audit(tmp_path):
    """``OwnerTriggerAudit.transaction`` exists and yields the race-fix entry points."""
    audit = OwnerTriggerAudit(tmp_path)
    assert hasattr(audit, "transaction"), "OwnerTriggerAudit.transaction must exist (race fix)"
    # The context manager must be enterable and exitable without recording
    # anything; the yielded object has to expose both operations.
    with audit.transaction() as txn:
        for method_name in ("check_dedupe", "record"):
            assert hasattr(txn, method_name)


def test_transaction_sidecar_lock_path_separated_from_audit_path(tmp_path):
    """The sidecar lock file lives at a different path than the audit JSONL file."""
    audit = OwnerTriggerAudit(tmp_path)
    expected_audit_path = tmp_path / AUDIT_REL_PATH
    lock_path_text = str(audit.lock_path)
    assert audit.path == expected_audit_path
    assert audit.lock_path != audit.path
    assert lock_path_text.endswith(".jsonl.lock")


def test_transaction_record_blocks_posted_under_lock(tmp_path):
    """After a POSTED row is recorded, ``check_dedupe`` for the same (pr, head) raises."""
    audit = OwnerTriggerAudit(tmp_path)
    posted_row = {
        "task_id": "task-2554+1",
        "pr": 103,
        "head": _HEAD_A,
        "action": ALLOWED_ACTION,
        "result": RESULT_POSTED,
        "comment_body": "/gemini review",
        "endpoint": "/repos/o/r/issues/103/comments",
        "decision_path": "decision.json",
        "token_present": True,
        "token_hash_prefix": "deadbeef",
    }
    with audit.transaction() as writer:
        writer.record(posted_row)
    # A fresh transaction must see the POSTED row and refuse the duplicate.
    with audit.transaction() as checker, pytest.raises(DedupeViolation):
        checker.check_dedupe(pr=103, head=_HEAD_A)


def test_transaction_record_blocks_pending_under_lock(tmp_path):
    """A PENDING sentinel row also makes ``check_dedupe`` raise (fail-closed crash safety)."""
    audit = OwnerTriggerAudit(tmp_path)
    pending_row = {
        "task_id": "task-2554+1",
        "pr": 103,
        "head": _HEAD_A,
        "action": ALLOWED_ACTION,
        "result": RESULT_PENDING,
        "comment_body": "/gemini review",
        "endpoint": "/repos/o/r/issues/103/comments",
        "decision_path": "decision.json",
        "token_present": True,
        "token_hash_prefix": "deadbeef",
    }
    with audit.transaction() as writer:
        writer.record(pending_row)
    # A later transaction treats the leftover PENDING row as a duplicate.
    with audit.transaction() as checker, pytest.raises(DedupeViolation):
        checker.check_dedupe(pr=103, head=_HEAD_A)


def test_transaction_failed_does_not_block_retry(tmp_path):
    """A FAILED row is not a dedupe reason, so the next attempt is allowed."""
    audit = OwnerTriggerAudit(tmp_path)
    failed_row = {
        "task_id": "task-2554+1",
        "pr": 103,
        "head": _HEAD_A,
        "action": ALLOWED_ACTION,
        "result": RESULT_FAILED,
        "comment_body": "/gemini review",
        "endpoint": "/repos/o/r/issues/103/comments",
        "decision_path": "decision.json",
        "token_present": True,
        "token_hash_prefix": "deadbeef",
        "error_code": "HTTP_POST_FAIL",
    }
    with audit.transaction() as writer:
        writer.record(failed_row)
    # The follow-up check must complete without raising DedupeViolation.
    with audit.transaction() as checker:
        checker.check_dedupe(pr=103, head=_HEAD_A)


# ─── §5: API call 성공 후 audit fail 시 fail-closed ──────────────────────────


def test_http_post_called_inside_lock_and_audit_posted_recorded(tmp_path):
    """A successful trigger posts exactly once and audits PENDING followed by POSTED.

    task-2554+2 §1: a PENDING row is written right before ``http_post`` and a
    POSTED row after it succeeds, so the audit holds two rows.
    """
    decision_path = _write_decision(tmp_path)
    mod, posts, audit = _build_module(tmp_path)

    result = mod.trigger_gemini_review(
        decision_path=decision_path,
        owner="o",
        repo="r",
        current_head_actual=_HEAD_A,
    )
    assert result.status == RESULT_POSTED
    assert len(posts) == 1
    rows = audit._read_all()
    assert len(rows) == 2
    # Compare against the imported constants rather than string literals so
    # the expectation cannot drift from the audit module's result values.
    assert rows[0]["result"] == RESULT_PENDING
    assert rows[1]["result"] == RESULT_POSTED


def test_http_post_exception_records_failed_in_audit(tmp_path):
    """medium #4: when ``http_post`` raises, a FAILED row is written to the audit.

    task-2554+2 §1: a PENDING row is written right before ``http_post`` and a
    FAILED row after the exception, so the audit holds two rows. The original
    exception must still propagate to the caller.
    """
    decision_path = _write_decision(tmp_path)

    def failing_http(method, path, body, headers):
        raise RuntimeError("network timeout simulated")

    mod, _, audit = _build_module(tmp_path, http_post=failing_http, posts=[])
    with pytest.raises(RuntimeError, match="network timeout"):
        mod.trigger_gemini_review(
            decision_path=decision_path,
            owner="o",
            repo="r",
            current_head_actual=_HEAD_A,
        )
    rows = audit._read_all()
    assert len(rows) == 2
    pending_row, failed_row = rows
    # Use the imported constant instead of a "PENDING" literal, consistent
    # with how the other result values are checked.
    assert pending_row["result"] == RESULT_PENDING
    assert failed_row["result"] == RESULT_FAILED
    assert failed_row["error_code"] == "HTTP_POST_FAIL"
    assert failed_row["token_value_logged"] is False


def test_http_post_exception_does_not_leak_token_to_audit(tmp_path):
    """When ``http_post`` raises, the audit file contains no trace of the raw token."""
    decision_path = _write_decision(tmp_path)
    secret = "ghp_RACE_FIX_SECRET_TOKEN_MUST_NEVER_LEAK_xyzz"

    def failing_http(method, path, body, headers):
        # Echo the Authorization header into the exception message so that a
        # naive "log the error text" implementation would leak the token.
        raise RuntimeError(f"include header: {headers.get('Authorization', '')}")

    mod, _, audit = _build_module(tmp_path, http_post=failing_http, token=secret)
    with pytest.raises(RuntimeError):
        mod.trigger_gemini_review(
            decision_path=decision_path,
            owner="o",
            repo="r",
            current_head_actual=_HEAD_A,
        )
    audit_text = audit.path.read_text(encoding="utf-8")
    assert secret not in audit_text
    token_markers = ("Bearer ", "ghp_", "github_pat_")
    assert not any(marker in audit_text for marker in token_markers)


def test_dedupe_failed_attempts_allows_retry_via_full_flow(tmp_path):
    """After a FAILED attempt, triggering again for the same (pr, head) is allowed.

    task-2554+2 §1: the first attempt audits PENDING + FAILED and the second
    PENDING + POSTED, four rows in total, with ``http_post`` invoked once per
    attempt.
    """
    decision_path = _write_decision(tmp_path)
    state = {"calls": 0}

    def http_post(method, path, body, headers):
        state["calls"] += 1
        if state["calls"] == 1:
            raise RuntimeError("first try fails")
        return {"status": 201}

    mod, _, audit = _build_module(tmp_path, http_post=http_post, posts=[])
    with pytest.raises(RuntimeError):
        mod.trigger_gemini_review(
            decision_path=decision_path,
            owner="o",
            repo="r",
            current_head_actual=_HEAD_A,
        )
    # Second attempt goes through because FAILED does not block a retry.
    r2 = mod.trigger_gemini_review(
        decision_path=decision_path,
        owner="o",
        repo="r",
        current_head_actual=_HEAD_A,
    )
    assert r2.status == RESULT_POSTED
    # Exactly one http_post call per attempt: the retry was not skipped and
    # nothing was posted twice.
    assert state["calls"] == 2
    rows = audit._read_all()
    assert len(rows) == 4
    # Imported constants are used for every result, including PENDING.
    assert rows[0]["result"] == RESULT_PENDING
    assert rows[1]["result"] == RESULT_FAILED
    assert rows[2]["result"] == RESULT_PENDING
    assert rows[3]["result"] == RESULT_POSTED


def test_dedupe_blocks_after_posted_in_full_flow(tmp_path):
    """A second trigger for the same (pr, head) is DEDUPED and ``http_post`` runs only once.

    task-2554+2 §1: the first call audits PENDING + POSTED; on the second call
    the dedupe check detects the POSTED row and a DEDUPED row is written,
    three rows in total.
    """
    decision_path = _write_decision(tmp_path)
    mod, posts, audit = _build_module(tmp_path)

    r1 = mod.trigger_gemini_review(
        decision_path=decision_path, owner="o", repo="r", current_head_actual=_HEAD_A
    )
    r2 = mod.trigger_gemini_review(
        decision_path=decision_path, owner="o", repo="r", current_head_actual=_HEAD_A
    )
    assert r1.status == RESULT_POSTED
    assert r2.status == RESULT_DEDUPED
    assert len(posts) == 1
    rows = audit._read_all()
    assert len(rows) == 3
    # Imported constants are used for every result, including PENDING.
    assert rows[0]["result"] == RESULT_PENDING
    assert rows[1]["result"] == RESULT_POSTED
    assert rows[2]["result"] == RESULT_DEDUPED


# ─── medium #3, #6: _iter_rows 스트리밍 + JSONDecodeError 관용 ────────────────


def test_iter_rows_is_streaming_generator(tmp_path):
    """``_iter_rows`` returns a generator object rather than a materialised list."""
    from types import GeneratorType

    row_iter = OwnerTriggerAudit(tmp_path)._iter_rows()
    assert isinstance(row_iter, GeneratorType)


def test_iter_rows_tolerates_corrupted_lines(tmp_path):
    """A corrupted JSON line in the audit file is skipped; valid rows are still yielded."""
    audit = OwnerTriggerAudit(tmp_path)
    audit._ensure_parent()

    first_row = {
        "task_id": "task-2554+1",
        "pr": 103,
        "head": _HEAD_A,
        "action": ALLOWED_ACTION,
        "result": "DEDUPED",
        "comment_body": "/gemini review",
        "endpoint": "/repos/o/r/issues/103/comments",
        "decision_path": "decision.json",
        "token_present": False,
        "token_hash_prefix": "",
        "error_code": "DEDUPE",
    }
    second_row = {
        "task_id": "task-2554+1",
        "pr": 104,
        "head": _HEAD_B,
        "action": ALLOWED_ACTION,
        "result": "DEDUPED",
        "comment_body": "/gemini review",
        "endpoint": "/repos/o/r/issues/104/comments",
        "decision_path": "decision.json",
        "token_present": False,
        "token_hash_prefix": "",
        "error_code": "DEDUPE",
    }

    # File layout: one valid row, one corrupted line, one valid row.
    audit.append(first_row)
    with open(audit.path, "a", encoding="utf-8") as raw_file:
        # Written directly to the file, bypassing the audit API.
        raw_file.write("{this-is-not-json}\n")
    audit.append(second_row)

    # Only the two valid rows come back, in file order.
    yielded_prs = [row["pr"] for row in audit._iter_rows()]
    assert yielded_prs == [103, 104]


def test_iter_rows_empty_when_no_file(tmp_path):
    """With no audit file on disk, ``_iter_rows`` yields nothing."""
    audit = OwnerTriggerAudit(tmp_path)
    assert list(audit._iter_rows()) == []


# ─── medium #2, #5: dead code _resolve_owner_repo 제거 정적 검사 ─────────────


def test_resolve_owner_repo_method_removed_from_module():
    """``OwnerTriggerOnly`` no longer has a ``_resolve_owner_repo`` attribute (vs. the PR #104 baseline)."""
    from anu_v2 import owner_trigger_only

    trigger_cls = owner_trigger_only.OwnerTriggerOnly
    still_present = hasattr(trigger_cls, "_resolve_owner_repo")
    assert not still_present, (
        "task-2554+1 medium #2/#5: _resolve_owner_repo dead code must be removed"
    )


def test_module_source_no_resolve_owner_repo_definition():
    """Static grep: ``owner_trigger_only.py`` contains zero ``_resolve_owner_repo`` definitions."""
    module_file = WORKSPACE_ROOT.joinpath("anu_v2", "owner_trigger_only.py")
    source_text = module_file.read_text(encoding="utf-8")
    assert source_text.count("def _resolve_owner_repo") == 0


# ─── module source / lock target separation static check ─────────────────────


def test_audit_source_uses_sidecar_lock_for_transaction():
    """Static check: the audit module defines ``transaction`` with a sidecar lock and opens the audit file in "a" mode only."""
    audit_file = WORKSPACE_ROOT.joinpath("anu_v2", "owner_trigger_audit.py")
    source_text = audit_file.read_text(encoding="utf-8")

    # Traces of the transaction context, the sidecar lock and the exclusive
    # flock, plus the append-mode open of the audit JSONL file itself.
    required_snippets = (
        "def transaction",
        "lock_path",
        "fcntl.LOCK_EX",
        'open(self._path, "a"',
    )
    for snippet in required_snippets:
        assert snippet in source_text

    # The audit file must never be opened in a mode other than plain append.
    forbidden_snippets = (
        'open(self._path, "w"',
        'open(self._path, "r+"',
        'open(self._path, "a+"',
    )
    assert not any(snippet in source_text for snippet in forbidden_snippets)


def test_owner_trigger_only_uses_audit_transaction():
    """Static check: ``owner_trigger_only.py`` goes through ``audit.transaction()`` (race fix)."""
    module_file = WORKSPACE_ROOT.joinpath("anu_v2", "owner_trigger_only.py")
    source_text = module_file.read_text(encoding="utf-8")
    expected_snippets = (
        # The transaction context is entered ...
        "self._audit.transaction()",
        # ... the dedupe check is called on the transaction object ...
        "txn.check_dedupe",
        # ... and all three outcome constants are referenced in the module.
        "RESULT_POSTED",
        "RESULT_FAILED",
        "RESULT_DEDUPED",
    )
    for snippet in expected_snippets:
        assert snippet in source_text


# ─── PR #105 Gemini follow-up — record() 의 2 단 lock + atomic re-check ──────


def test_transaction_record_double_lock_blocks_legacy_append_race(tmp_path):
    """``transaction.record(POSTED)`` is rejected when a legacy ``append(POSTED)`` got there first.

    Regression scenario: ``OwnerTriggerAudit.append`` is called directly,
    outside the sidecar-lock transaction, and writes a POSTED row. A later
    ``record(POSTED)`` for the same (pr, head) inside a transaction must raise
    ``DedupeViolation``.
    """

    def _posted_row():
        # Build a fresh dict per call so the two writes never share an object.
        return {
            "task_id": "task-2554+1",
            "pr": 103,
            "head": _HEAD_A,
            "action": ALLOWED_ACTION,
            "result": RESULT_POSTED,
            "comment_body": "/gemini review",
            "endpoint": "/repos/o/r/issues/103/comments",
            "decision_path": "decision.json",
            "token_present": True,
            "token_hash_prefix": "deadbeef",
        }

    audit = OwnerTriggerAudit(tmp_path)
    # 1) The external (legacy) append writes the POSTED row first.
    audit.append(_posted_row())
    # 2) Recording POSTED again for the same (pr, head) must be blocked.
    with audit.transaction() as txn, pytest.raises(DedupeViolation):
        txn.record(_posted_row())


def test_audit_source_record_uses_audit_file_flock_ex():
    """Static check: the audit module source shows ``record`` taking an exclusive flock."""
    audit_file = WORKSPACE_ROOT.joinpath("anu_v2", "owner_trigger_audit.py")
    source_text = audit_file.read_text(encoding="utf-8")
    # Message expected to appear inside the transaction's record method.
    assert "transaction atomic dedupe blocked" in source_text
    # fcntl.LOCK_EX is expected at least three times: the sidecar lock in
    # transaction(), plus append() and record().
    lock_ex_occurrences = source_text.count("fcntl.LOCK_EX")
    assert lock_ex_occurrences >= 3


# ─── PR #105 Gemini follow-up — _normalise_head 40-char hex 검증 ─────────────


def test_normalise_head_rejects_non_hex_40_char_string():
    """A 40-character string containing non-hex characters raises ``ValueError``."""
    from anu_v2.owner_trigger_audit import _normalise_head

    non_hex_head = "z" * 40
    with pytest.raises(ValueError):
        _normalise_head(non_hex_head)


def test_normalise_head_rejects_short_or_long_strings():
    """Any head whose length is not exactly 40 raises ``ValueError``."""
    from anu_v2.owner_trigger_audit import _normalise_head

    wrong_length_heads = ("abc", "a" * 39, "a" * 41)
    for bad_head in wrong_length_heads:
        with pytest.raises(ValueError):
            _normalise_head(bad_head)


def test_normalise_head_accepts_valid_hex_and_lowercases():
    """A valid 40-character hex head is accepted and returned in lower case."""
    from anu_v2.owner_trigger_audit import _normalise_head

    zero_tail = "0" * 34
    upper_head = "ABCDEF" + zero_tail
    expected = "abcdef" + zero_tail
    assert _normalise_head(upper_head) == expected
