"""task-2556 §10 — duplicate same-head dedupe (fcntl atomic) 회귀.

회장 §명시 2026-05-12 §10:
  "duplicate same-head dedupe — fcntl.flock 기반 atomic, 이미 trigger 된 head 재호출 차단"

검증 포인트:
  1. audit 에 POSTED 가 이미 있는 (pr, head) 에 대해 scheduler 가 SAME_HEAD_DEDUPED 로 처리.
  2. http_post 호출 0 (재발사 차단).
  3. audit 에 PENDING (이전 crash) 가 있어도 dedupe (fail-closed).
  4. head 가 바뀌면 새 trigger 허용 (이전 record 무시).
  5. fixture (executor_duplicate_dedupe.json) 결과 어셀션.
  6. scheduler 가 fcntl lock 을 통해 동시 진입 차단 (lock file 존재 확인).
"""

from __future__ import annotations

import json
import sys
from pathlib import Path


# Directory two levels above this file's own directory. It is put at the front
# of sys.path (only if not already listed) — presumably so the ``anu_v2``
# imports below resolve regardless of where pytest is invoked from.
WORKSPACE_ROOT = Path(__file__).resolve().parents[2]
if str(WORKSPACE_ROOT) not in sys.path:
    sys.path.insert(0, str(WORKSPACE_ROOT))

from anu_v2.executor_scheduler import (  # noqa: E402
    ACTION_OWNER_TRIGGER_DISPATCHED,
    ACTION_SAME_HEAD_DEDUPED,
    ExecutorScheduler,
    SCHEDULER_LOCK_REL_PATH,
)
from anu_v2.idle_pr_diagnoser import (  # noqa: E402
    GeminiReviewMeta,
    IdlePRSnapshot,
)
from anu_v2.merge_queue_executor import MergeQueueExecutor  # noqa: E402
from anu_v2.owner_trigger_audit import (  # noqa: E402
    AUDIT_REL_PATH,
    OwnerTriggerAudit,
    RESULT_PENDING,
    RESULT_POSTED,
)
from anu_v2.owner_trigger_only import OwnerTriggerOnly  # noqa: E402


# 40-character placeholder head SHAs. _HEAD is the default snapshot head and the
# head used for seeded audit records; _HEAD_NEW is a different head for the same
# PR, used to check that a head change is not deduped.
_HEAD = "c" * 40
_HEAD_NEW = "d" * 40


def _snapshot(number=110, head=_HEAD):
    """Build an ``IdlePRSnapshot`` for PR *number* at commit *head*.

    The snapshot carries no Gemini reviews and reports all required CI
    checks as successful; branch name and creation time are fixed.
    """
    snapshot_fields = dict(
        number=number,
        head_sha=head,
        head_ref="task/task-2556-dev5",
        created_at="2026-05-12T10:00:00+00:00",
        gemini_reviews=(),
        ci_required_all_success=True,
    )
    return IdlePRSnapshot(**snapshot_fields)


def _make_scheduler(tmp_path: Path, snapshots, *, http_calls: list):
    """Assemble an ``ExecutorScheduler`` rooted at *tmp_path* with stubbed collaborators.

    Args:
        tmp_path: Directory used as workspace root, audit root and task-md root.
        snapshots: Value returned by the scheduler's snapshot provider.
        http_calls: List that receives one ``{"path", "body"}`` dict for every
            call made to the stub HTTP poster.

    Returns:
        The configured ``ExecutorScheduler``.
    """

    def http_post(method, path, body, headers):
        # Record the call instead of performing a request; reply with a fake id.
        http_calls.append({"path": path, "body": body})
        return {"id": 1}

    owner_trigger = OwnerTriggerOnly(
        workspace_root=tmp_path,
        http_post=http_post,
        token_provider=lambda: "ghp_owner_fake_xxxxxxxxxxxxxxxxxxxx",
        audit=OwnerTriggerAudit(tmp_path),
    )
    # Merge executor whose runners are all inert stubs.
    merge_executor = MergeQueueExecutor(
        gh_runner=lambda a, e: {},
        git_runner=lambda a: "",
        pytest_runner=lambda p: 0,
        audit_writer=lambda p: None,
        task_md_root=tmp_path,
    )
    events_dir = tmp_path / "memory" / "events"
    return ExecutorScheduler(
        workspace_root=tmp_path,
        decision_dir=events_dir,
        snapshot_provider=lambda: snapshots,
        owner_trigger=owner_trigger,
        merge_executor=merge_executor,
        owner="o",
        repo="r",
    )


def _seed_audit_record(tmp_path: Path, *, pr: int, head: str, result: str) -> None:
    """Append one owner-trigger audit record with the given *result* to the audit JSONL.

    Args:
        tmp_path: Workspace root; the audit file lives at ``tmp_path / AUDIT_REL_PATH``.
        pr: Pull-request number stored in the record and in its endpoint path.
        head: Head SHA stored in the record.
        result: Value written to the record's ``result`` field.
    """
    audit_path = tmp_path / AUDIT_REL_PATH
    audit_path.parent.mkdir(parents=True, exist_ok=True)
    rec = {
        "schema": "anu_v2.owner_trigger_audit.v1",
        "ts": "2026-05-12T10:40:00+00:00",
        "task_id": "task-2556",
        "pr": pr,
        "head": head,
        "action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT",
        "result": result,
        "comment_body": "/gemini review",
        "endpoint": f"/repos/o/r/issues/{pr}/comments",
        "decision_path": "/tmp/decision.json",
        "token_present": True,
        "token_hash_prefix": "deadbeef",
        "token_value_logged": False,
    }
    # Append mode: earlier seeded lines are kept, one JSON object per line.
    with open(audit_path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(rec, ensure_ascii=False, sort_keys=True) + "\n")


def _seed_audit_posted(tmp_path: Path, *, pr: int, head: str) -> None:
    """Seed one pre-existing POSTED line directly into the audit JSONL (race simulation)."""
    _seed_audit_record(tmp_path, pr=pr, head=head, result=RESULT_POSTED)


def _seed_audit_pending(tmp_path: Path, *, pr: int, head: str) -> None:
    """Seed one pre-existing PENDING line into the audit JSONL (crashed mid-flight)."""
    _seed_audit_record(tmp_path, pr=pr, head=head, result=RESULT_PENDING)


def test_audit_posted_same_head_blocks_rerun(tmp_path):
    """A seeded POSTED record for the same (pr, head) yields SAME_HEAD_DEDUPED and no HTTP post."""
    recorded_posts: list = []
    _seed_audit_posted(tmp_path, pr=110, head=_HEAD)
    scheduler = _make_scheduler(
        tmp_path,
        [_snapshot(110, _HEAD)],
        http_calls=recorded_posts,
    )
    cycle_env = {"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_xxxxxxxxxxxxxxxxxxxxxxxx"}
    outcome = scheduler.run_one_cycle(env=cycle_env, now="2026-05-12T11:00:00+00:00")
    first_action = outcome.pr_actions[0]
    assert first_action.action == ACTION_SAME_HEAD_DEDUPED
    assert not recorded_posts


def test_audit_pending_same_head_blocks_rerun_fail_closed(tmp_path):
    """A PENDING record (crashed mid-flight) is deduped as well — fail-closed."""
    recorded_posts: list = []
    _seed_audit_pending(tmp_path, pr=110, head=_HEAD)
    scheduler = _make_scheduler(
        tmp_path,
        [_snapshot(110, _HEAD)],
        http_calls=recorded_posts,
    )
    cycle_env = {"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_xxxxxxxxxxxxxxxxxxxxxxxx"}
    outcome = scheduler.run_one_cycle(env=cycle_env, now="2026-05-12T11:00:00+00:00")
    first_action = outcome.pr_actions[0]
    assert first_action.action == ACTION_SAME_HEAD_DEDUPED
    assert not recorded_posts


def test_new_head_after_old_posted_allows_new_trigger(tmp_path):
    """A changed head permits a new trigger (complements the chairman's §4 directive)."""
    recorded_posts: list = []
    _seed_audit_posted(tmp_path, pr=110, head=_HEAD)
    # Same PR 110, but the snapshot's head has moved to _HEAD_NEW.
    moved_head_snapshot = _snapshot(110, _HEAD_NEW)
    scheduler = _make_scheduler(
        tmp_path,
        [moved_head_snapshot],
        http_calls=recorded_posts,
    )
    cycle_env = {"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_xxxxxxxxxxxxxxxxxxxxxxxx"}
    outcome = scheduler.run_one_cycle(env=cycle_env, now="2026-05-12T11:00:00+00:00")
    first_action = outcome.pr_actions[0]
    assert first_action.action == ACTION_OWNER_TRIGGER_DISPATCHED
    assert first_action.head_sha == _HEAD_NEW
    assert len(recorded_posts) == 1


def test_dedupe_fixture_matches_expected_outcome(tmp_path):
    """Replay ``executor_duplicate_dedupe.json`` and compare against its expected outcome.

    The fixture's pre-existing audit records are appended to the audit JSONL,
    one cycle runs over the fixture's snapshot, and the first PR action and
    the number of HTTP posts are checked against the fixture's expectations.
    """
    fixtures_dir = Path(__file__).resolve().parents[1] / "fixtures"
    fixture_text = (fixtures_dir / "executor_duplicate_dedupe.json").read_text(
        encoding="utf-8"
    )
    fixture = json.loads(fixture_text)

    audit_path = tmp_path / AUDIT_REL_PATH
    audit_path.parent.mkdir(parents=True, exist_ok=True)
    for record in fixture["preexisting_audit_records"]:
        # Opened per record so no audit file appears when the list is empty.
        with audit_path.open("a", encoding="utf-8") as sink:
            sink.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n")

    raw_snapshot = fixture["snapshot"]
    copied_keys = (
        "number",
        "head_sha",
        "head_ref",
        "created_at",
        "ci_required_all_success",
    )
    snap = IdlePRSnapshot(
        gemini_reviews=(),
        **{key: raw_snapshot[key] for key in copied_keys},
    )

    recorded_posts: list = []
    scheduler = _make_scheduler(tmp_path, [snap], http_calls=recorded_posts)
    cycle_env = {"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_xxxxxxxxxxxxxxxxxxxxxxxx"}
    outcome = scheduler.run_one_cycle(env=cycle_env, now=fixture["now"])
    assert outcome.pr_actions[0].action == fixture["expected_scheduler_action"]
    assert len(recorded_posts) == fixture["expected_http_post_calls"]


def test_scheduler_creates_fcntl_lock_file(tmp_path):
    """Running a cycle leaves the fcntl-backed scheduler lock file on disk."""
    recorded_posts: list = []
    scheduler = _make_scheduler(tmp_path, [_snapshot()], http_calls=recorded_posts)
    cycle_env = {"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_xxxxxxxxxxxxxxxxxxxxxxxx"}
    # First cycle (dispatches the owner trigger — head is fresh, no reviews yet).
    scheduler.run_one_cycle(env=cycle_env, now="2026-05-12T11:00:00+00:00")
    assert (tmp_path / SCHEDULER_LOCK_REL_PATH).exists()


def test_same_cycle_same_pr_no_double_dispatch(tmp_path):
    """Two identical PR snapshots in one cycle: the first is DISPATCHED, the second DEDUPED."""
    recorded_posts: list = []
    duplicated = _snapshot(number=199, head="9" * 40)
    # Abnormal snapshot list containing the same PR twice (defensive case).
    scheduler = _make_scheduler(
        tmp_path,
        [duplicated, duplicated],
        http_calls=recorded_posts,
    )
    cycle_env = {"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_xxxxxxxxxxxxxxxxxxxxxxxx"}
    outcome = scheduler.run_one_cycle(env=cycle_env, now="2026-05-12T11:00:00+00:00")
    actions = outcome.pr_actions
    assert len(actions) == 2
    first, second = actions
    # The first is DISPATCHED; the second sees the PENDING or POSTED audit
    # record written for the first and becomes SAME_HEAD_DEDUPED.
    assert first.action == ACTION_OWNER_TRIGGER_DISPATCHED
    assert second.action == ACTION_SAME_HEAD_DEDUPED
    # http_post fires exactly once.
    assert len(recorded_posts) == 1
