"""anu_v2.tests.test_owner_trigger_dedupe_2554 — audit append-only + atomic dedupe 회귀 (task-2554).

회장 §명시 14장 §8 1:1:
  - same pr + same head + action=POST_GEMINI_REVIEW_TRIGGER_COMMENT + result=POSTED → 차단
  - audit JSONL append-only (``open("a")`` 강제)
  - atomic fcntl.flock
  - head 변경 시 stale reset

system spec §10 tests #6, #12, #13 (3건) + 본 파일 추가 dedupe 회귀.
"""

from __future__ import annotations

import json
import sys
import threading
from pathlib import Path

import pytest

WORKSPACE_ROOT = Path(__file__).resolve().parents[2]
if str(WORKSPACE_ROOT) not in sys.path:
    sys.path.insert(0, str(WORKSPACE_ROOT))

from anu_v2.owner_trigger_audit import (  # noqa: E402
    AUDIT_REL_PATH,
    AUDIT_SCHEMA,
    AuditRedactionError,
    DedupeViolation,
    OwnerTriggerAudit,
    token_hash_prefix,
)


_HEAD_A = "a" * 40
_HEAD_B = "b" * 40


def _row(pr: int, head: str, *, result: str = "POSTED") -> dict:
    """Build a baseline audit row for the given PR number and head SHA.

    Only ``pr``, ``head``, ``result`` and the PR-specific ``endpoint`` vary;
    every other field is a fixed fixture value so that individual tests spell
    out just what they change.

    Args:
        pr: Pull-request number; also interpolated into ``endpoint``.
        head: Head commit SHA stored under ``"head"``.
        result: Outcome stored under ``"result"`` (keyword-only).

    Returns:
        A new dict on every call, so callers may mutate it freely.
    """
    comments_endpoint = f"/repos/o/r/issues/{pr}/comments"
    return dict(
        task_id="task-2554",
        pr=pr,
        head=head,
        action="POST_GEMINI_REVIEW_TRIGGER_COMMENT",
        result=result,
        comment_body="/gemini review",
        endpoint=comments_endpoint,
        decision_path="decision.json",
        token_present=True,
        token_hash_prefix="deadbeef",
    )


# ─── §10-12: audit JSONL append-only 확인 ────────────────────────────────────

def test_audit_path_under_workspace_root(tmp_path):
    """``OwnerTriggerAudit.path`` is ``AUDIT_REL_PATH`` joined onto the given root."""
    expected_path = tmp_path / AUDIT_REL_PATH
    assert OwnerTriggerAudit(tmp_path).path == expected_path


def test_append_writes_jsonl_line_with_required_fields(tmp_path):
    """One ``append`` produces exactly one JSON line carrying the expected fields."""
    audit = OwnerTriggerAudit(tmp_path)
    audit.append(_row(103, _HEAD_A))

    text = audit.path.read_text(encoding="utf-8")
    records = [json.loads(entry) for entry in text.splitlines() if entry]
    assert len(records) == 1

    rec = records[0]
    # Fields whose values are compared by equality.
    expected_values = (
        ("schema", AUDIT_SCHEMA),
        ("pr", 103),
        ("head", _HEAD_A),
        ("action", "POST_GEMINI_REVIEW_TRIGGER_COMMENT"),
        ("result", "POSTED"),
    )
    for key, wanted in expected_values:
        assert rec[key] == wanted
    # The input row does not carry these two keys; the written record must.
    assert rec["token_value_logged"] is False
    assert "ts" in rec


def test_append_only_no_truncate(tmp_path):
    """A second ``append`` adds a line instead of replacing the first one."""
    audit = OwnerTriggerAudit(tmp_path)
    for pr_number, head_sha in ((103, _HEAD_A), (104, _HEAD_B)):
        audit.append(_row(pr_number, head_sha))

    content = audit.path.read_text(encoding="utf-8")
    # Both records must survive: two newline terminators, two distinct PRs.
    assert content.count("\n") == 2
    seen_prs = {json.loads(entry)["pr"] for entry in content.splitlines() if entry}
    assert seen_prs == {103, 104}


def test_open_mode_is_append_only_static():
    """Static source check: ``self._path`` is opened with mode ``"a"`` only.

    This is a plain substring search over the audit module's source text: the
    ``"a"`` open must be present, and opens of ``self._path`` with ``"w"``,
    ``"r+"``, ``"a+"`` or ``"w+"`` must not appear at all.
    """
    source_file = WORKSPACE_ROOT / "anu_v2" / "owner_trigger_audit.py"
    audit_src = source_file.read_text(encoding="utf-8")

    assert 'open(self._path, "a"' in audit_src

    forbidden_snippets = (
        'open(self._path, "w"',
        'open(self._path, "r+"',
        'open(self._path, "a+"',
        'open(self._path, "w+"',
    )
    for snippet in forbidden_snippets:
        assert snippet not in audit_src, f"forbidden mode found in audit source: {snippet}"


# ─── §10-13: same head 중복 trigger 차단 ──────────────────────────────────────

def test_check_dedupe_blocks_same_pr_same_head(tmp_path):
    """After a POSTED row is recorded, ``check_dedupe`` raises for the same PR and head."""
    ledger = OwnerTriggerAudit(tmp_path)
    posted_row = _row(103, _HEAD_A)
    ledger.append(posted_row)

    with pytest.raises(DedupeViolation):
        ledger.check_dedupe(pr=103, head=_HEAD_A)


def test_check_dedupe_allows_after_head_change_stale_reset(tmp_path):
    """A new head on the same PR is not blocked by the row recorded for the old head."""
    ledger = OwnerTriggerAudit(tmp_path)
    posted_row = _row(103, _HEAD_A)
    ledger.append(posted_row)

    # Same PR, different head (stale reset): must return without raising.
    ledger.check_dedupe(pr=103, head=_HEAD_B)


def test_check_dedupe_allows_different_pr(tmp_path):
    """The same head on a different PR number is not treated as a duplicate."""
    ledger = OwnerTriggerAudit(tmp_path)
    posted_row = _row(103, _HEAD_A)
    ledger.append(posted_row)

    # Different PR, same head: must return without raising.
    ledger.check_dedupe(pr=104, head=_HEAD_A)


def test_check_dedupe_does_not_block_failed_attempts(tmp_path):
    """A row whose result is not POSTED does not block a later trigger.

    Only ``result == "POSTED"`` counts as a dedupe reason; a ``DEDUPED`` row
    for the same PR and head must leave ``check_dedupe`` silent.
    """
    ledger = OwnerTriggerAudit(tmp_path)
    non_posted_row = _row(103, _HEAD_A, result="DEDUPED")
    ledger.append(non_posted_row)

    # Must return without raising.
    ledger.check_dedupe(pr=103, head=_HEAD_A)


# ─── atomic re-check inside append (concurrency simulation) ──────────────────

def test_atomic_re_check_inside_append_blocks_race(tmp_path):
    """``append`` itself refuses a second POSTED row for the same (pr, head).

    The caller deliberately skips ``check_dedupe`` here, so the rejection has
    to come from the check that ``append`` performs on its own.
    """
    ledger = OwnerTriggerAudit(tmp_path)

    # Seed the file with one POSTED row.
    ledger.append(_row(103, _HEAD_A))

    # An identical second row must be rejected by append() itself.
    duplicate_row = _row(103, _HEAD_A)
    with pytest.raises(DedupeViolation):
        ledger.append(duplicate_row)


def test_concurrent_appends_only_one_succeeds(tmp_path):
    """Eight threads race to append the same POSTED (pr, head); exactly one wins.

    All workers wait on a barrier so their ``append`` calls are released
    together; without it each thread may finish before the next one is even
    started and no contention would be exercised.

    Exceptions other than ``DedupeViolation`` are collected and asserted on
    explicitly: an exception raised inside a thread never propagates to the
    test, so it would otherwise only show up as a confusing count mismatch.
    """
    audit = OwnerTriggerAudit(tmp_path)
    worker_count = 8

    outcomes = []
    unexpected = []
    guard = threading.Lock()
    # The timeout turns a stuck worker into BrokenBarrierError instead of a hang.
    start_gate = threading.Barrier(worker_count, timeout=30)

    def worker():
        try:
            start_gate.wait()
            audit.append(_row(103, _HEAD_A))
        except DedupeViolation:
            with guard:
                outcomes.append("DEDUPED")
        except Exception as exc:  # noqa: BLE001 - reported by the assertion below
            with guard:
                unexpected.append(exc)
        else:
            with guard:
                outcomes.append("OK")

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    assert not unexpected, f"unexpected exception(s) in worker threads: {unexpected!r}"
    # Exactly one append succeeds; every other attempt is deduped.
    assert outcomes.count("OK") == 1
    assert outcomes.count("DEDUPED") == worker_count - 1


# ─── token redaction in audit ────────────────────────────────────────────────

def test_audit_rejects_raw_token_sentinel(tmp_path):
    """A row whose ``token_hash_prefix`` holds a raw-token-looking value is rejected.

    The value contains a raw-token sentinel (presumably the ``ghp_`` prefix;
    confirm against the audit module).
    """
    ledger = OwnerTriggerAudit(tmp_path)
    tainted_row = {
        **_row(103, _HEAD_A),
        "token_hash_prefix": "ghp_realtoken_should_not_leak",
    }
    with pytest.raises(AuditRedactionError):
        ledger.append(tainted_row)


def test_audit_rejects_extra_keys(tmp_path):
    """A row carrying a key beyond the baseline fields is rejected."""
    ledger = OwnerTriggerAudit(tmp_path)
    tainted_row = {**_row(103, _HEAD_A), "leaked_secret": "anything"}
    with pytest.raises(AuditRedactionError):
        ledger.append(tainted_row)


def test_audit_rejects_token_value_logged_true(tmp_path):
    """A row that sets ``token_value_logged`` to ``True`` is rejected."""
    ledger = OwnerTriggerAudit(tmp_path)
    tainted_row = {**_row(103, _HEAD_A), "token_value_logged": True}
    with pytest.raises(AuditRedactionError):
        ledger.append(tainted_row)


def test_token_hash_prefix_is_sha256_first_8_chars():
    """``token_hash_prefix`` equals the first 8 hex characters of the token's SHA-256."""
    import hashlib

    secret = "any-secret"
    expected_prefix = hashlib.sha256(secret.encode()).hexdigest()[:8]

    prefix = token_hash_prefix(secret)
    assert prefix == expected_prefix
    assert len(prefix) == 8


def test_token_hash_prefix_empty_token_raises():
    """An empty token string makes ``token_hash_prefix`` raise ``ValueError``."""
    empty_token = ""
    with pytest.raises(ValueError):
        token_hash_prefix(empty_token)
