"""anu_v2.tests.test_owner_trigger_concurrency_2554plus1 — 동시 trigger race 차단 (task-2554+1).

회장 §명시 (2026-05-12 KST) 필수 fix §6 1:1 박제:
  - same PR/head 동시 2 process (or thread) → comment 정확히 1 회 생성 어셀션.
  - HIGH race condition (anu_v2/owner_trigger_only.py:352 baseline) 제거 회귀.

본 파일은 sidecar lock + transaction 으로 직렬화되는 trigger 흐름이 실제 동시 호출에서도
정확히 1회만 http_post 를 발생시키는지 검증한다. dry-run/mock 만 — actual OWNER token /
GitHub comment 작성 0.

implementation note:
  - thread 기반: threading.Thread + Barrier 로 두 worker 가 같은 순간에 trigger 시도.
  - process 기반: multiprocessing.Process — fcntl.flock 은 process 간에도 보호되므로
    별도 process 테스트도 포함.

본 회귀는 anu_v2/* 모듈만 import 한다 (one-way isolation).
"""

from __future__ import annotations

import json
import multiprocessing
import os
import sys
import threading
import time
from pathlib import Path

# Put the directory two levels above this file's own directory at the front of
# sys.path (unless already present) so the ``anu_v2`` imports below resolve.
WORKSPACE_ROOT = Path(__file__).resolve().parents[2]
if str(WORKSPACE_ROOT) not in sys.path:
    sys.path.insert(0, str(WORKSPACE_ROOT))

from anu_v2.owner_trigger_audit import (  # noqa: E402
    OwnerTriggerAudit,
    RESULT_DEDUPED,
    RESULT_POSTED,
)
from anu_v2.owner_trigger_only import OwnerTriggerOnly  # noqa: E402


# 40-character placeholder head value shared by the tests in this module.
_HEAD_A = 40 * "a"


def _write_decision(tmp_path: Path, *, pr: int = 103, head: str = _HEAD_A) -> Path:
    """Write a trigger-decision JSON fixture and return its path.

    Args:
        tmp_path: Directory that receives ``decision.json``; created (with
            parents) when it does not exist yet.
        pr: Value stored under the ``"pr"`` key.
        head: Value stored under the ``"current_head"`` key.

    Returns:
        Path of the written ``decision.json`` file.
    """
    target = tmp_path / "decision.json"
    # Only ``pr`` and ``current_head`` vary; every other field is fixed.
    payload = {
        "schema": "anu_v2.owner_trigger_decision.v1",
        "task_id": "task-2554+1",
        "pr": pr,
        "current_head": head,
        "queue_head": True,
        "current_head_confirmed": True,
        "gemini_evidence_fresh": False,
        "nudge_count_for_pr_head": 0,
        "allowed_action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT",
        "comment_body": "/gemini review",
        "allowed": True,
    }
    tmp_path.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle)
    return target


# ─── thread-based concurrency: same PR/head 동시 2 thread → comment 1 회 ────


def test_same_pr_head_concurrent_two_threads_one_post_only(tmp_path):
    """Two threads trigger the same (pr, head) at once; http_post must run exactly once.

    Both workers share one ``OwnerTriggerAudit`` and rendezvous on a barrier
    before calling ``trigger_gemini_review``. The test expects one POSTED
    result, one DEDUPED result and a single recorded ``http_post`` call.
    """
    decision_path = _write_decision(tmp_path)
    audit = OwnerTriggerAudit(tmp_path)

    posts: list[dict] = []
    posts_lock = threading.Lock()

    def http_post(method, path, body, headers):
        # Deliberate short sleep: keeps the first caller inside http_post long
        # enough for the second caller to overlap with it.
        time.sleep(0.02)
        with posts_lock:
            posts.append({"method": method, "path": path, "body": body})
        return {"status": 201}

    def token_provider() -> str:
        return "concurrency-test-tok-MUST-NOT-LEAK"

    def make_module():
        return OwnerTriggerOnly(
            workspace_root=tmp_path,
            http_post=http_post,
            token_provider=token_provider,
            audit=audit,  # same audit object for both workers
        )

    # The timeout turns "a peer never arrived" into BrokenBarrierError instead
    # of an indefinite block.
    barrier = threading.Barrier(2, timeout=5)
    results: list[str] = []
    errors: list[BaseException] = []
    results_lock = threading.Lock()

    def worker():
        try:
            # Construction happens inside the try so a failure here is
            # reported rather than silently stranding the other worker.
            mod = make_module()
            barrier.wait()
            r = mod.trigger_gemini_review(
                decision_path=decision_path,
                owner="o",
                repo="r",
                current_head_actual=_HEAD_A,
            )
            with results_lock:
                results.append(r.status)
        except BaseException as e:  # pragma: no cover — diagnostic only
            with results_lock:
                errors.append(e)

    # Daemon threads: a worker that hangs must not keep the interpreter alive.
    threads = [threading.Thread(target=worker, daemon=True) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=10)
    hung = [t.name for t in threads if t.is_alive()]
    assert not hung, f"worker threads still running after join timeout: {hung!r}"
    assert errors == [], f"unexpected errors: {errors!r}"
    # Exactly one POSTED and one DEDUPED outcome.
    assert results.count(RESULT_POSTED) == 1
    assert results.count(RESULT_DEDUPED) == 1
    # Core assertion of the race fix: http_post ran exactly once.
    assert len(posts) == 1, f"expected exactly 1 http_post call, got {len(posts)}"


def test_same_pr_head_8_threads_only_one_posts(tmp_path):
    """Eight threads trigger the same (pr, head): 1 POSTED, 7 DEDUPED, 1 http_post call.

    Worker exceptions are collected and asserted on in the main thread,
    because an exception raised inside a ``threading.Thread`` target does not
    propagate to the test on its own.
    """
    decision_path = _write_decision(tmp_path)
    audit = OwnerTriggerAudit(tmp_path)

    worker_count = 8
    posts: list[dict] = []
    posts_lock = threading.Lock()

    def http_post(method, path, body, headers):
        # Short sleep so concurrent callers overlap inside http_post.
        time.sleep(0.01)
        with posts_lock:
            posts.append({"method": method, "path": path})
        return {"status": 201}

    # Timed barrier: a worker that never arrives breaks the barrier instead of
    # blocking the remaining workers forever.
    barrier = threading.Barrier(worker_count, timeout=5)
    results: list[str] = []
    errors: list[BaseException] = []
    results_lock = threading.Lock()

    def worker():
        try:
            mod = OwnerTriggerOnly(
                workspace_root=tmp_path,
                http_post=http_post,
                token_provider=lambda: "any-token",
                audit=audit,
            )
            barrier.wait()
            r = mod.trigger_gemini_review(
                decision_path=decision_path,
                owner="o",
                repo="r",
                current_head_actual=_HEAD_A,
            )
            with results_lock:
                results.append(r.status)
        except BaseException as e:  # pragma: no cover — diagnostic only
            with results_lock:
                errors.append(e)

    # Daemon threads: a hung worker must not block interpreter shutdown.
    threads = [threading.Thread(target=worker, daemon=True) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=15)
    hung = [t.name for t in threads if t.is_alive()]
    assert not hung, f"worker threads still running after join timeout: {hung!r}"
    assert errors == [], f"unexpected errors: {errors!r}"
    assert results.count(RESULT_POSTED) == 1
    assert results.count(RESULT_DEDUPED) == worker_count - 1
    assert len(posts) == 1, f"expected exactly 1 http_post call, got {len(posts)}"


def test_different_pr_concurrent_both_post(tmp_path):
    """Concurrent triggers for different (pr, head) pairs must both be POSTED.

    Each worker records ``(status, expected_pr, actual_pr)`` and every check is
    made in the main thread: an ``assert`` inside a thread target raises only
    inside that thread and would never fail the test.
    """
    audit = OwnerTriggerAudit(tmp_path)
    head_b = "b" * 40
    decision_a = _write_decision(tmp_path / "a", pr=103, head=_HEAD_A)
    # Same fixture writer as decision A, so the two payloads cannot drift apart.
    decision_b = _write_decision(tmp_path / "b", pr=200, head=head_b)

    posts: list[dict] = []
    posts_lock = threading.Lock()

    def http_post(method, path, body, headers):
        # Short sleep so the two callers overlap inside http_post.
        time.sleep(0.01)
        with posts_lock:
            posts.append({"path": path})
        return {"status": 201}

    # Timed barrier: a missing peer yields BrokenBarrierError, not a hang.
    barrier = threading.Barrier(2, timeout=5)
    results: list[tuple] = []
    errors: list[BaseException] = []
    results_lock = threading.Lock()

    def worker(decision_path, head, pr_num):
        try:
            mod = OwnerTriggerOnly(
                workspace_root=tmp_path,
                http_post=http_post,
                token_provider=lambda: "t",
                audit=audit,
            )
            barrier.wait()
            r = mod.trigger_gemini_review(
                decision_path=decision_path,
                owner="o",
                repo="r",
                current_head_actual=head,
            )
            with results_lock:
                results.append((r.status, pr_num, r.pr))
        except BaseException as e:  # pragma: no cover — diagnostic only
            with results_lock:
                errors.append(e)

    # Daemon threads: a hung worker must not block interpreter shutdown.
    threads = [
        threading.Thread(target=worker, args=(decision_a, _HEAD_A, 103), daemon=True),
        threading.Thread(target=worker, args=(decision_b, head_b, 200), daemon=True),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=10)
    hung = [t.name for t in threads if t.is_alive()]
    assert not hung, f"worker threads still running after join timeout: {hung!r}"
    assert errors == [], f"unexpected errors: {errors!r}"
    assert [status for status, _, _ in results].count(RESULT_POSTED) == 2
    # Each result must carry the PR number of the decision it was given.
    for _, expected_pr, actual_pr in results:
        assert actual_pr == expected_pr, f"expected pr {expected_pr}, got {actual_pr}"
    assert len(posts) == 2


# ─── process-based concurrency: fcntl.flock cross-process ────────────────────


def _process_worker(barrier_path: str, decision_path: str, audit_root: str, output_queue):
    """Run ``trigger_gemini_review`` in a child process and report the outcome.

    Exactly one tuple is put on ``output_queue``:
    ``("ok", status, pid)`` on success, or ``("err", repr(exc), pid)`` when
    setup or the trigger call raises an ``Exception``.

    Args:
        barrier_path: File this worker touches to announce it is ready. The
            worker then waits (up to 5 seconds, best effort) for a ``ready``
            file to appear next to it before triggering.
        decision_path: Path of the decision JSON passed to the trigger.
        audit_root: Directory used for the audit object, as ``workspace_root``
            and as the parent of the ``_post_markers`` directory.
        output_queue: Queue shared with the parent test.
    """
    # The module-level imports of OwnerTriggerAudit / OwnerTriggerOnly are used
    # here; ``audit_root`` is a scratch directory and is deliberately not added
    # to sys.path.
    try:
        audit_obj = OwnerTriggerAudit(audit_root)

        # Marker files are the on-disk evidence that http_post actually ran.
        marker_dir = Path(audit_root) / "_post_markers"
        marker_dir.mkdir(parents=True, exist_ok=True)

        def http_post(method, path, body, headers):
            # One marker per posting process, named after its PID.
            marker = marker_dir / f"post-pid{os.getpid()}.marker"
            marker.write_text("posted", encoding="utf-8")
            # Short sleep so the other process can overlap with this call.
            time.sleep(0.02)
            return {"status": 201}

        # File-based barrier: announce readiness, then poll for the
        # coordinator's ``ready`` file. Monotonic time keeps the deadline
        # immune to wall-clock adjustments.
        Path(barrier_path).touch()
        ready_file = Path(barrier_path).parent / "ready"
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline and not ready_file.exists():
            time.sleep(0.005)

        mod = OwnerTriggerOnly(
            workspace_root=audit_root,
            http_post=http_post,
            token_provider=lambda: "proc-token",
            audit=audit_obj,
        )
        r = mod.trigger_gemini_review(
            decision_path=decision_path,
            owner="o",
            repo="r",
            current_head_actual=_HEAD_A,
        )
        output_queue.put(("ok", r.status, os.getpid()))
    except Exception as e:  # pragma: no cover — diagnostic only
        # repr() keeps the exception type visible even when str(e) is empty.
        output_queue.put(("err", repr(e), os.getpid()))


def test_same_pr_head_concurrent_two_processes_one_post_only(tmp_path):
    """Two separate processes trigger the same (pr, head); exactly one http_post marker.

    Spec §6: same PR/head, two concurrent processes -> one comment.

    The parent releases both workers through a file-based barrier, drains the
    result queue *before* joining (the multiprocessing docs warn against
    joining a process that still has queued items), and terminates any child
    that is still alive so a hang cannot outlive the test.
    """
    from queue import Empty  # local import: only this test needs it

    decision_path = _write_decision(tmp_path)

    barrier_root = tmp_path / "_barrier"
    barrier_root.mkdir(parents=True, exist_ok=True)
    barrier_files = [barrier_root / "a.barrier", barrier_root / "b.barrier"]
    ready_file = barrier_root / "ready"

    ctx = multiprocessing.get_context("fork")
    result_queue = ctx.Queue()

    procs = [
        ctx.Process(
            target=_process_worker,
            args=(str(barrier_file), str(decision_path), str(tmp_path), result_queue),
        )
        for barrier_file in barrier_files
    ]
    for proc in procs:
        proc.start()

    results = []
    released = False
    try:
        # Wait until both workers have announced themselves, then release them.
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline:
            if all(barrier_file.exists() for barrier_file in barrier_files):
                ready_file.touch()
                released = True
                break
            time.sleep(0.005)

        # Drain first, join second; get(timeout=...) replaces Queue.empty(),
        # which is documented as unreliable.
        for _ in procs:
            try:
                results.append(result_queue.get(timeout=15))
            except Empty:
                break
        for proc in procs:
            proc.join(timeout=15)
    finally:
        # Never leave a hung child behind.
        for proc in procs:
            if proc.is_alive():
                proc.terminate()
                proc.join(timeout=5)

    assert released, "workers did not reach the file barrier within 5s"
    for index, proc in enumerate(procs, start=1):
        assert proc.exitcode == 0, f"process {index} exit {proc.exitcode}"

    assert len(results) == 2, f"expected 2 worker results, got {results!r}"
    failures = [r for r in results if r[0] != "ok"]
    assert not failures, f"worker errors: {failures!r}"
    statuses = sorted(r[1] for r in results)
    # Sort both sides so the check does not depend on how the constants order.
    assert statuses == sorted([RESULT_DEDUPED, RESULT_POSTED]), f"got {statuses!r}"

    # On-disk markers: exactly one process actually called http_post.
    marker_dir = tmp_path / "_post_markers"
    markers = sorted(marker_dir.glob("post-pid*.marker"))
    assert len(markers) == 1, f"expected exactly 1 http_post marker (one process posted), got {len(markers)}"


# ─── 보안 sanity: concurrent 호출에서도 token leak 0 ─────────────────────────


def test_concurrent_trigger_no_token_leak_to_audit(tmp_path):
    """After 8 concurrent triggers the audit file contains no token material.

    The outcome of each trigger call is irrelevant here, so exceptions from
    ``trigger_gemini_review`` do not fail the test by themselves; they are
    recorded and checked for the secret. Setup or barrier failures do fail the
    test, because without them the run is no longer concurrent.
    """
    decision_path = _write_decision(tmp_path)
    audit = OwnerTriggerAudit(tmp_path)
    secret = "ghp_CONCURRENCY_RACE_SECRET_MUST_NEVER_LEAK_qwerty"
    worker_count = 8

    def http_post(method, path, body, headers):
        # Short sleep so concurrent callers overlap inside http_post.
        time.sleep(0.005)
        return {"status": 201}

    # Timed barrier: a worker that never arrives breaks the barrier instead of
    # blocking the remaining workers forever.
    barrier = threading.Barrier(worker_count, timeout=5)
    setup_errors: list[BaseException] = []
    trigger_errors: list[Exception] = []
    errors_lock = threading.Lock()

    def worker():
        try:
            mod = OwnerTriggerOnly(
                workspace_root=tmp_path,
                http_post=http_post,
                token_provider=lambda: secret,
                audit=audit,
            )
            barrier.wait()
        except BaseException as e:  # pragma: no cover — diagnostic only
            with errors_lock:
                setup_errors.append(e)
            return
        try:
            mod.trigger_gemini_review(
                decision_path=decision_path,
                owner="o",
                repo="r",
                current_head_actual=_HEAD_A,
            )
        except Exception as e:
            # Best effort: keep the exception for the leak check below rather
            # than discarding it.
            with errors_lock:
                trigger_errors.append(e)

    # Daemon threads: a hung worker must not block interpreter shutdown.
    threads = [threading.Thread(target=worker, daemon=True) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=15)
    hung = [t.name for t in threads if t.is_alive()]
    assert not hung, f"worker threads still running after join timeout: {hung!r}"
    assert setup_errors == [], f"worker setup failed: {setup_errors!r}"

    # Exception text is another place a token could surface.
    for exc in trigger_errors:
        assert secret not in str(exc), f"{type(exc).__name__} message contains the secret"
        assert secret not in repr(exc), f"{type(exc).__name__} repr contains the secret"

    raw = audit.path.read_text(encoding="utf-8")
    assert secret not in raw
    for sent in ("Bearer ", "ghp_", "github_pat_"):
        assert sent not in raw, f"audit contains sentinel {sent!r}"
