"""task-2556 §11 / §4 — head 변경 후 FAILED (race condition fail-closed) 회귀.

회장 §명시 2026-05-12:
  §4 scheduler 가 trigger 호출 직전에 PR head 가 변경되면 race condition.
  §11 token boundary + decision schema 검증 — current_head_actual 가 decision.current_head 와
       불일치하면 DecisionInvalidError(E_HEAD_MISMATCH) → owner trigger FAILED.

검증 포인트:
  1. snapshot 진단 후 trigger 호출 직전에 head 가 바뀌면 owner_trigger 가 fail-closed.
  2. scheduler 는 OWNER_TRIGGER_FAILED marker 생성.
  3. http_post 호출 0.
  4. audit JSONL 에 POSTED 기록 없음.
  5. fixture (executor_head_mismatch.json) 결과 어셀션.
  6. decision.json 은 생성됨 (스냅샷 head 기준).
"""

from __future__ import annotations

import json
import sys
from pathlib import Path


# Directory two levels above this file's own directory. It is put on sys.path
# so the `anu_v2` imports below resolve when the test file is run directly.
# NOTE(review): assumes this file sits two directories below the workspace
# root -- confirm against the repository layout.
WORKSPACE_ROOT = Path(__file__).resolve().parents[2]
# Insert only once, and at the front so this workspace wins over any other
# entry already on sys.path.
if str(WORKSPACE_ROOT) not in sys.path:
    sys.path.insert(0, str(WORKSPACE_ROOT))

from anu_v2.executor_scheduler import (  # noqa: E402
    ACTION_OWNER_TRIGGER_FAILED,
    ExecutorScheduler,
)
from anu_v2.idle_pr_diagnoser import (  # noqa: E402
    IdlePRDiagnoser,
    IdlePRSnapshot,
    STATE_FIRST_GEMINI_TRIGGER_MISSING,
)
from anu_v2.merge_queue_executor import MergeQueueExecutor  # noqa: E402
from anu_v2.owner_trigger_audit import (  # noqa: E402
    AUDIT_REL_PATH,
    OwnerTriggerAudit,
    RESULT_POSTED,
)
from anu_v2.owner_trigger_decision import DecisionInvalidError  # noqa: E402
from anu_v2.owner_trigger_only import (  # noqa: E402
    OwnerTriggerOnly,
    invoke_from_scheduler,
)


# 40-character placeholder head SHAs used to provoke a head mismatch.
# _HEAD_DECIDED is the head recorded in the snapshot / decision file.
_HEAD_DECIDED = "d" * 40
# _HEAD_ACTUAL_DIFFERENT is passed as `current_head_actual` so it disagrees
# with the decided head.
_HEAD_ACTUAL_DIFFERENT = "e" * 40


def _snapshot(number=111, head=_HEAD_DECIDED):
    """Return an IdlePRSnapshot for PR ``number`` whose head SHA is ``head``.

    The remaining fields are fixed: a task-2556 branch ref, a fixed creation
    timestamp, no gemini reviews, and required CI reported as successful.
    """
    snapshot_fields = dict(
        number=number,
        head_sha=head,
        head_ref="task/task-2556-dev5",
        created_at="2026-05-12T10:00:00+00:00",
        gemini_reviews=(),
        ci_required_all_success=True,
    )
    return IdlePRSnapshot(**snapshot_fields)


def _make_owner_trigger(tmp_path: Path, *, http_calls: list, token="ghp_owner_fake_xxxxxxxxxxxxxxxxxxxx"):
    """Build an OwnerTriggerOnly wired to a recording fake HTTP poster.

    Args:
        tmp_path: Directory used both as the workspace root and as the
            argument to OwnerTriggerAudit.
        http_calls: List that receives one ``{"path", "body"}`` dict per
            invocation of the fake poster, so tests can count posts.
        token: Value handed back by the token provider.

    Returns:
        The configured OwnerTriggerOnly instance.
    """

    def _record_post(method, path, body, headers):
        # Only path and body are recorded; method and headers are ignored.
        http_calls.append(dict(path=path, body=body))
        return {"id": 1}

    def _provide_token():
        return token

    audit_log = OwnerTriggerAudit(tmp_path)
    return OwnerTriggerOnly(
        workspace_root=tmp_path,
        http_post=_record_post,
        token_provider=_provide_token,
        audit=audit_log,
    )


def test_invoke_from_scheduler_head_mismatch_raises_decision_invalid(tmp_path):
    """invoke_from_scheduler raises DecisionInvalidError on a head mismatch.

    A decision file pinned to ``_HEAD_DECIDED`` is written by hand, then the
    call is made with ``_HEAD_ACTUAL_DIFFERENT`` as the actual head. The
    raised error must carry code ``E_HEAD_MISMATCH`` and the fake HTTP poster
    must never be invoked.
    """
    import pytest

    recorded_posts: list = []
    owner_trigger = _make_owner_trigger(tmp_path, http_calls=recorded_posts)

    # Write decision.json directly, with current_head = _HEAD_DECIDED.
    events_dir = tmp_path / "events"
    events_dir.mkdir(parents=True, exist_ok=True)
    decision_file = events_dir / "task-2556.owner_trigger_decision.json"
    payload = {
        "schema": "anu_v2.owner_trigger_decision.v1",
        "task_id": "task-2556",
        "pr": 111,
        "current_head": _HEAD_DECIDED,
        "queue_head": True,
        "current_head_confirmed": True,
        "gemini_evidence_fresh": False,
        "nudge_count_for_pr_head": 0,
        "allowed_action": "POST_GEMINI_REVIEW_TRIGGER_COMMENT",
        "comment_body": "/gemini review",
        "allowed": True,
    }
    serialized = json.dumps(payload, sort_keys=True)
    decision_file.write_text(serialized, encoding="utf-8")

    with pytest.raises(DecisionInvalidError) as raised:
        invoke_from_scheduler(
            owner_trigger,
            decision_path=decision_file,
            owner="o",
            repo="r",
            current_head_actual=_HEAD_ACTUAL_DIFFERENT,
        )

    assert raised.value.code == "E_HEAD_MISMATCH"
    # Fail-closed: nothing may have been posted.
    assert not recorded_posts


def test_scheduler_dispatches_owner_trigger_failed_when_decision_invalid(tmp_path):
    """A head mismatch during one scheduler cycle yields OWNER_TRIGGER_FAILED.

    Scenario: the snapshot provider reports head ``_HEAD_DECIDED``, and the
    test expects the decision file written during the cycle to carry that
    head. To imitate the PR head changing right before the trigger call, the
    owner trigger is replaced by a subclass that swaps ``current_head_actual``
    for ``_HEAD_ACTUAL_DIFFERENT`` before delegating to the real method.

    Checked afterwards, in order: the first PR action and its state, the
    decision file and its head, the failed marker, zero HTTP posts, and no
    POSTED record in the audit JSONL.
    """

    class HeadMutatingRunner(OwnerTriggerOnly):
        """OwnerTriggerOnly that always reports a different actual head.

        Whatever ``current_head_actual`` the caller passes is discarded and
        replaced by ``_HEAD_ACTUAL_DIFFERENT``; every other argument is
        forwarded unchanged.
        """

        def trigger_gemini_review(self, *, decision_path, owner, repo,
                                   current_head_actual, env_override=None,
                                   comment_body=None):
            forwarded = dict(
                decision_path=decision_path,
                owner=owner,
                repo=repo,
                current_head_actual=_HEAD_ACTUAL_DIFFERENT,  # simulated race
                env_override=env_override,
                comment_body=comment_body,
            )
            return super().trigger_gemini_review(**forwarded)

    recorded_posts: list = []
    base_runner = _make_owner_trigger(tmp_path, http_calls=recorded_posts)

    # Reuse the collaborators of the plain runner for the mutating subclass.
    mutating_runner = HeadMutatingRunner(
        workspace_root=tmp_path,
        http_post=base_runner._http_post,
        token_provider=base_runner._token_provider,
        audit=base_runner._audit,
    )

    events_dir = tmp_path / "memory" / "events"
    stub_merge_executor = MergeQueueExecutor(
        gh_runner=lambda a, e: {},
        git_runner=lambda a: "",
        pytest_runner=lambda p: 0,
        audit_writer=lambda p: None,
        task_md_root=tmp_path,
    )
    scheduler = ExecutorScheduler(
        workspace_root=tmp_path,
        decision_dir=events_dir,
        snapshot_provider=lambda: [_snapshot(number=111, head=_HEAD_DECIDED)],
        owner_trigger=mutating_runner,
        merge_executor=stub_merge_executor,
        owner="o",
        repo="r",
    )

    cycle = scheduler.run_one_cycle(
        env={"OWNER_GEMINI_TRIGGER_TOKEN": "ghp_xxxxxxxxxxxxxxxxxxxxxxxx"},
        now="2026-05-12T11:00:00+00:00",
    )

    first_action = cycle.pr_actions[0]
    assert first_action.action == ACTION_OWNER_TRIGGER_FAILED
    assert first_action.state == STATE_FIRST_GEMINI_TRIGGER_MISSING

    # decision.json exists and carries the snapshot head.
    decision_file = events_dir / "task-2556.owner_trigger_decision.json"
    assert decision_file.exists()
    decision_payload = json.loads(decision_file.read_text(encoding="utf-8"))
    assert decision_payload["current_head"] == _HEAD_DECIDED

    # The FAILED marker was created.
    assert (events_dir / "task-2556.owner-trigger.failed").exists()

    # No HTTP post was attempted.
    assert not recorded_posts

    # The audit JSONL, if present, holds no POSTED record.
    audit_file = tmp_path / AUDIT_REL_PATH
    if audit_file.exists():
        audit_lines = audit_file.read_text(encoding="utf-8").splitlines()
    else:
        audit_lines = []
    # Lazy parsing keeps the original parse-then-assert order per line.
    for rec in (json.loads(raw) for raw in audit_lines if raw):
        assert rec.get("result") != RESULT_POSTED, (
            f"audit unexpectedly contains POSTED record: {rec}"
        )


def test_head_mismatch_fixture_matches_expected(tmp_path):
    """The head-mismatch fixture declares the expected fail-closed outcome.

    Loads ``fixtures/executor_head_mismatch.json`` from the directory above
    this file's directory, checks its two expected-outcome fields, then runs
    the diagnoser on the fixture snapshot to confirm that the snapshot alone
    is diagnosed as FIRST_GEMINI_TRIGGER_MISSING.
    """
    fixtures_dir = Path(__file__).resolve().parents[1] / "fixtures"
    raw_fixture = (fixtures_dir / "executor_head_mismatch.json").read_text(
        encoding="utf-8"
    )
    fixture = json.loads(raw_fixture)

    assert fixture["expected_owner_trigger_outcome"] == "DECISION_INVALID_E_HEAD_MISMATCH"
    assert fixture["expected_scheduler_action"] == ACTION_OWNER_TRIGGER_FAILED

    # Quick diagnoser check on the fixture snapshot.
    snapshot_data = fixture["snapshot"]
    copied_keys = (
        "number",
        "head_sha",
        "head_ref",
        "created_at",
        "ci_required_all_success",
    )
    snapshot_kwargs = {key: snapshot_data[key] for key in copied_keys}
    snapshot = IdlePRSnapshot(gemini_reviews=(), **snapshot_kwargs)

    diagnosis = IdlePRDiagnoser().diagnose(snapshot, now="2026-05-12T12:00:00+00:00")
    # At snapshot time the state is FIRST_GEMINI_TRIGGER_MISSING; the mismatch
    # only shows up when the trigger is invoked.
    assert diagnosis.state == STATE_FIRST_GEMINI_TRIGGER_MISSING
