# -*- coding: utf-8 -*-
"""tests/regression/test_pr_convergence_pipeline_2729.py — task-2729 Phase 2 회귀.

PR convergence pipeline 결함4·5·6·7 회귀 보호:
  - 결함7: live_prune (collect_and_prune success gating + summarize_live_prune)
  - 결함6: owner_gemini_trigger (fire_owner_gemini_review + _validate_head_lock + dedupe)
  - 결함5: base_sync_before_merge (merge 전 base sync 파이프라인)
  - 결함4: authoritative_completion_marker (scope evidence 3개 기반 authoritative completion)

구현 파일(utils/*, scripts/*)은 절대 수정하지 않는다.
fake remover / mock 주입 / 임시 registry 사용. 실 gh/subprocess/cokacdir 호출 0.
"""
from __future__ import annotations

import importlib.util
import pathlib
import sys
import tempfile
import types

# ---------------------------------------------------------------------------
# worktree root sys.path 보강 (test_progress_watcher_gate_2729.py 동일 패턴)
# ---------------------------------------------------------------------------
_ROOT = pathlib.Path(__file__).resolve().parents[2]
_ROOT_STR = str(_ROOT)
while _ROOT_STR in sys.path:
    sys.path.remove(_ROOT_STR)
sys.path.insert(0, _ROOT_STR)

# worktree 외부에서 먼저 로드된 dispatch/utils 패키지 캐시를 제거
for _name in list(sys.modules):
    if _name in ("dispatch", "utils") or _name.startswith("dispatch.") or _name.startswith("utils."):
        _mod = sys.modules[_name]
        _f = getattr(_mod, "__file__", None) or ""
        if not _f.startswith(_ROOT_STR):
            del sys.modules[_name]

# ---------------------------------------------------------------------------
# scripts/ci_watch_handoff_runner.py import (패키지 아님 → importlib.util)
# test_progress_watcher_gate_2729.py 와 동일 패턴
# ---------------------------------------------------------------------------
_RUNNER_PATH = _ROOT / "scripts" / "ci_watch_handoff_runner.py"
_spec = importlib.util.spec_from_file_location("ci_watch_handoff_runner", _RUNNER_PATH)
assert _spec is not None and _spec.loader is not None
runner_mod = importlib.util.module_from_spec(_spec)
sys.modules["ci_watch_handoff_runner"] = runner_mod
_spec.loader.exec_module(runner_mod)

# ---------------------------------------------------------------------------
# 대상 모듈 import
# ---------------------------------------------------------------------------
from utils.fallback_schedule_registry import register_fallback  # noqa: E402
from utils.completion_callback_fallback_cancel import (  # noqa: E402
    RemoverResult,
    CancelClassification,
    summarize_live_prune,
    PruneOutcome,
    PruneCause,
)
from utils.normal_completion_callback_collector_entrypoint import (  # noqa: E402
    collect_and_prune,
)
from utils.owner_gemini_trigger import (  # noqa: E402
    fire_owner_gemini_review,
    _validate_head_lock,
    OWNER_GEMINI_REVIEW_BODY,
)

# runner_mod 에서 결함4·5·6 결선 함수 추출
base_sync_before_merge = runner_mod.base_sync_before_merge
authoritative_completion_marker = runner_mod.authoritative_completion_marker
maybe_fire_owner_gemini_on_new_head = runner_mod.maybe_fire_owner_gemini_on_new_head

# ---------------------------------------------------------------------------
# fake removers (test_callback_fallback_prune_2728.py 동일 패턴)
# ---------------------------------------------------------------------------


def _remover_removed(cron_id: str, *, dry_run: bool) -> RemoverResult:
    return RemoverResult(status="removed", detail=f"fake removed {cron_id}")


def _remover_already_gone(cron_id: str, *, dry_run: bool) -> RemoverResult:
    return RemoverResult(status="already_gone", detail=f"fake already_gone {cron_id}")


def _remover_failed(cron_id: str, *, dry_run: bool) -> RemoverResult:
    return RemoverResult(status="failed", detail=f"fake failed {cron_id}")


# ===========================================================================
# 결함7: live prune
# ===========================================================================


def test_live_prune_collector_success_real_tombstone():
    """임시 registry에 pending fallback 1건 → collect_and_prune(success trigger, fake remover,
    dry_run=False) → live_prune.fired=True, real_tombstone=True, prune_outcomes에 pruned=True.
    """
    with tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False) as f:
        reg = f.name

    register_fallback(
        task_id="task-2729-lp",
        round=1,
        head_sha="a" * 40,
        cron_id="CRON_LP_REAL",
        owner_key="anu",
        registry_path=reg,
    )

    result = collect_and_prune(
        task_id="task-2729-lp",
        round=1,
        head_sha="a" * 40,
        trigger="normal_callback_collected",
        remover=_remover_removed,
        dry_run=False,
        normal_callback_collected=True,
        dispatch_fired=True,
        require_success=True,
        registry_path=reg,
    )

    lp = result["live_prune"]
    assert lp["wired"] is True, f"live_prune.wired must be True, got {lp}"
    assert lp["fired"] is True, f"live_prune.fired must be True, got {lp}"
    assert lp["real_tombstone"] is True, f"live_prune.real_tombstone must be True, got {lp}"

    # prune_outcomes에 pruned=True 건 존재
    outcomes = result["prune_outcomes"]
    assert len(outcomes) == 1, f"Expected 1 prune_outcome, got {len(outcomes)}"
    assert outcomes[0]["pruned"] is True, f"prune_outcomes[0].pruned must be True"


def test_live_prune_no_success_signal_gated():
    """normal_callback_collected=False + result 없음 + trigger 비success + require_success=True
    → live_prune.fired=False, reason=NO_SUCCESS_SIGNAL, prune 미수행.
    """
    with tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False) as f:
        reg = f.name

    # registry에 pending 등록 (prune이 수행되면 안 됨)
    register_fallback(
        task_id="task-2729-gate",
        round=1,
        head_sha="b" * 40,
        cron_id="CRON_GATE",
        owner_key="anu",
        registry_path=reg,
    )

    # result_json_path: 존재하지 않는 경로 (result.json 미존재 보장)
    missing_rj = "/tmp/does_not_exist_task_2729_gate_regression.result.json"
    import os; os.path.exists(missing_rj) and os.unlink(missing_rj)

    result = collect_and_prune(
        task_id="task-2729-gate",
        round=1,
        head_sha="b" * 40,
        trigger="some_unknown_trigger",     # 비success trigger
        remover=_remover_removed,
        dry_run=True,
        normal_callback_collected=False,    # success signal 없음
        dispatch_fired=True,
        require_success=True,
        result_json_path=missing_rj,        # result.json 미존재
        registry_path=reg,
    )

    lp = result["live_prune"]
    assert lp["wired"] is True, f"live_prune.wired must be True, got {lp}"
    assert lp["fired"] is False, f"live_prune.fired must be False (gated), got {lp}"
    assert lp.get("reason") == "NO_SUCCESS_SIGNAL", (
        f"live_prune.reason must be NO_SUCCESS_SIGNAL, got {lp.get('reason')!r}"
    )
    # prune 미수행 확인
    assert result["prune_outcomes"] == [], (
        f"prune_outcomes must be [] when gated, got {result['prune_outcomes']}"
    )


def test_summarize_live_prune_aggregates():
    """summarize_live_prune 직접 단위테스트: pruned=True+cron_remove_invoked=True 건이 있으면
    real_tombstone=True, pruned_count 집계, causes 리스트.
    """
    from utils.completion_callback_fallback_cancel import _now_utc

    outcomes = [
        PruneOutcome(
            task_id="t1",
            round=1,
            head_sha="c" * 40,
            cron_id="C1",
            classification=CancelClassification.CANCELLED,
            cause=PruneCause.NORMAL_CALLBACK_ALREADY_COLLECTED.value,
            cron_remove_invoked=True,
            pruned=True,
            detail="removed",
            ts_utc=_now_utc(),
        ),
        PruneOutcome(
            task_id="t1",
            round=1,
            head_sha="c" * 40,
            cron_id="C2",
            classification=CancelClassification.REMOVE_FAILED_WARNING,
            cause=PruneCause.CANCEL_FAILED.value,
            cron_remove_invoked=True,
            pruned=False,
            detail="failed",
            ts_utc=_now_utc(),
        ),
    ]

    summary = summarize_live_prune(outcomes)

    assert summary["real_tombstone"] is True, (
        f"real_tombstone must be True when pruned+cron_remove_invoked, got {summary}"
    )
    assert summary["pruned_count"] == 1, (
        f"pruned_count must be 1, got {summary['pruned_count']}"
    )
    assert len(summary["causes"]) == 2, (
        f"causes must have 2 entries, got {summary['causes']}"
    )

    # 모두 pruned=False 이면 real_tombstone=False
    outcomes_none_pruned = [
        PruneOutcome(
            task_id="t2",
            round=1,
            head_sha="d" * 40,
            cron_id="C3",
            classification=CancelClassification.REMOVE_FAILED_WARNING,
            cause=PruneCause.CANCEL_FAILED.value,
            cron_remove_invoked=True,
            pruned=False,
            detail="failed",
            ts_utc=_now_utc(),
        ),
    ]
    summary2 = summarize_live_prune(outcomes_none_pruned)
    assert summary2["real_tombstone"] is False, (
        f"real_tombstone must be False when no pruned+invoked, got {summary2}"
    )
    assert summary2["pruned_count"] == 0


# ===========================================================================
# 결함6: owner_gemini_trigger
# ===========================================================================

_VALID_SHA = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2"


def test_owner_gemini_body_hardcoded():
    """반환 body == '/gemini review', active False."""
    result = fire_owner_gemini_review(
        pr_number=42,
        current_head_sha=_VALID_SHA,
        owner="testowner",
        repo="testrepo",
        task_id="task-2729",
    )
    assert result["body"] == OWNER_GEMINI_REVIEW_BODY == "/gemini review", (
        f"body must be '/gemini review', got {result['body']!r}"
    )
    assert result["active"] is False, (
        f"active must always be False, got {result['active']!r}"
    )


def test_owner_gemini_head_lock_invalid_raises():
    """비40hex head_sha → ValueError."""
    import pytest
    with pytest.raises(ValueError):
        fire_owner_gemini_review(
            pr_number=42,
            current_head_sha="tooshort",
            owner="testowner",
            repo="testrepo",
        )

    # 40자이지만 비-hex
    with pytest.raises(ValueError):
        fire_owner_gemini_review(
            pr_number=42,
            current_head_sha="z" * 40,  # z는 hex 아님
            owner="testowner",
            repo="testrepo",
        )

    # _validate_head_lock 직접 테스트
    with pytest.raises(ValueError):
        _validate_head_lock("not-a-sha")

    # 유효 SHA는 예외 없음
    _validate_head_lock(_VALID_SHA)  # 예외 없으면 OK


def test_owner_gemini_dedupe_skips():
    """already_fired_heads에 head 포함 → dedupe True, fired False, status DEDUPED."""
    result = fire_owner_gemini_review(
        pr_number=42,
        current_head_sha=_VALID_SHA,
        owner="testowner",
        repo="testrepo",
        task_id="task-2729",
        already_fired_heads={_VALID_SHA},
    )
    assert result["dedupe"] is True, (
        f"dedupe must be True when already in already_fired_heads, got {result}"
    )
    assert result["fired"] is False, (
        f"fired must be False on dedupe, got {result}"
    )
    assert result["status"] == "DEDUPED", (
        f"status must be DEDUPED, got {result['status']!r}"
    )


def test_owner_gemini_token_hash_only_no_raw():
    """token_provider 주입 → 반환에 raw token 미포함, token_hash_prefix는 raw와 다름."""
    raw_token = "ghp_SECRETTOKEN"

    result = fire_owner_gemini_review(
        pr_number=42,
        current_head_sha=_VALID_SHA,
        owner="testowner",
        repo="testrepo",
        task_id="task-2729",
        token_provider=lambda: raw_token,
    )

    # raw token이 반환값에 포함되지 않음
    result_str = str(result)
    assert raw_token not in result_str, (
        f"Raw token must NOT appear in result, got {result_str}"
    )

    # token_hash_prefix 존재하고 raw token과 다름
    prefix = result.get("token_hash_prefix")
    assert prefix is not None, "token_hash_prefix must not be None when token_provider is given"
    assert prefix != raw_token, (
        f"token_hash_prefix must differ from raw token, got {prefix!r}"
    )
    # sha256 앞 12자임을 검증
    import hashlib
    expected_prefix = hashlib.sha256(raw_token.encode("utf-8")).hexdigest()[:12]
    assert prefix == expected_prefix, (
        f"token_hash_prefix must be sha256[:12], got {prefix!r}, expected {expected_prefix!r}"
    )


def test_maybe_fire_no_new_head():
    """head_changed=False → NO_NEW_HEAD."""
    state = runner_mod.WatcherState()
    result = maybe_fire_owner_gemini_on_new_head(
        task_id="task-2729",
        pr_number=42,
        owner="testowner",
        repo="testrepo",
        current_head_sha=_VALID_SHA,
        head_changed=False,
        state=state,
    )
    assert result["status"] == "NO_NEW_HEAD", (
        f"status must be NO_NEW_HEAD when head_changed=False, got {result['status']!r}"
    )
    assert result["fired"] is False, (
        f"fired must be False when head_changed=False, got {result['fired']!r}"
    )


# ===========================================================================
# 결함5: base_sync_before_merge
# ===========================================================================


def test_base_sync_not_approved():
    """merge_approved=False → NOT_APPROVED."""
    result = base_sync_before_merge(
        pr_number=42,
        merge_state_status="BEHIND",
        pr_workdir="/tmp",
        runner=None,
        merge_approved=False,
    )
    assert result["status"] == "NOT_APPROVED", (
        f"Expected NOT_APPROVED, got {result['status']!r}"
    )
    assert result["merge_ok"] is False


def test_base_sync_admin_force_required():
    """merge_state_status 'DIRTY' → ABORT_ADMIN_OR_FORCE_REQUIRED."""
    result = base_sync_before_merge(
        pr_number=42,
        merge_state_status="DIRTY",
        pr_workdir="/tmp",
        runner=None,
        merge_approved=True,
    )
    assert result["status"] == "ABORT_ADMIN_OR_FORCE_REQUIRED", (
        f"Expected ABORT_ADMIN_OR_FORCE_REQUIRED, got {result['status']!r}"
    )
    assert result["merge_ok"] is False


def test_base_sync_up_to_date():
    """merge_state_status 'OPEN' 또는 'CLEAN' → UP_TO_DATE (sync 불필요)."""
    for status in ("OPEN", "CLEAN"):
        result = base_sync_before_merge(
            pr_number=42,
            merge_state_status=status,
            pr_workdir="/tmp",
            runner=None,
            merge_approved=True,
        )
        assert result["status"] == "UP_TO_DATE", (
            f"Expected UP_TO_DATE for merge_state={status!r}, got {result['status']!r}"
        )
        assert result["merge_ok"] is True
        assert result.get("base_synced") is False


def test_base_sync_behind_revalidated():
    """sync mock 수행(conflict False), fetch mock returns CLEAN → BASE_SYNCED_REVALIDATED."""
    sync_called = []

    def fake_sync_fn(pr_number, runner):
        sync_called.append(pr_number)
        return {"conflict": False}

    def fake_fetch_fn(pr_number, runner):
        return {"mergeStateStatus": "CLEAN"}

    result = base_sync_before_merge(
        pr_number=42,
        merge_state_status="BEHIND",
        pr_workdir="/tmp",
        runner=None,
        merge_approved=True,
        sync_fn=fake_sync_fn,
        fetch_state_fn=fake_fetch_fn,
    )
    assert result["status"] == "BASE_SYNCED_REVALIDATED", (
        f"Expected BASE_SYNCED_REVALIDATED, got {result['status']!r}"
    )
    assert result["merge_ok"] is True
    assert result.get("base_synced") is True
    assert sync_called == [42], f"sync_fn must be called once with pr_number, got {sync_called}"


def test_base_sync_conflict_abort():
    """sync mock conflict True → ABORT_CONFLICT."""
    def fake_sync_fn(pr_number, runner):
        return {"conflict": True}

    result = base_sync_before_merge(
        pr_number=42,
        merge_state_status="BEHIND",
        pr_workdir="/tmp",
        runner=None,
        merge_approved=True,
        sync_fn=fake_sync_fn,
    )
    assert result["status"] == "ABORT_CONFLICT", (
        f"Expected ABORT_CONFLICT, got {result['status']!r}"
    )
    assert result["merge_ok"] is False


def test_base_sync_diff_over_limit():
    """observed_diff_lines > max_diff_lines → ABORT_DIFF_OVER_LIMIT."""
    def fake_sync_fn(pr_number, runner):
        return {"conflict": False}

    result = base_sync_before_merge(
        pr_number=42,
        merge_state_status="BEHIND",
        pr_workdir="/tmp",
        runner=None,
        merge_approved=True,
        sync_fn=fake_sync_fn,
        max_diff_lines=100,
        observed_diff_lines=101,
    )
    assert result["status"] == "ABORT_DIFF_OVER_LIMIT", (
        f"Expected ABORT_DIFF_OVER_LIMIT, got {result['status']!r}"
    )
    assert result["merge_ok"] is False


def test_base_sync_still_behind():
    """sync 성공 후 fetch_state_fn이 여전히 BEHIND → ABORT_STILL_BEHIND."""
    def fake_sync_fn(pr_number, runner):
        return {"conflict": False}

    def fake_fetch_fn(pr_number, runner):
        return {"mergeStateStatus": "BEHIND"}

    result = base_sync_before_merge(
        pr_number=42,
        merge_state_status="BEHIND",
        pr_workdir="/tmp",
        runner=None,
        merge_approved=True,
        sync_fn=fake_sync_fn,
        fetch_state_fn=fake_fetch_fn,
    )
    assert result["status"] == "ABORT_STILL_BEHIND", (
        f"Expected ABORT_STILL_BEHIND, got {result['status']!r}"
    )
    assert result["merge_ok"] is False


# ===========================================================================
# 결함4: authoritative_completion_marker
# ===========================================================================


def test_authoritative_completion_full_evidence():
    """전부 True → AUTHORITATIVE_COMPLETION_BY_SCOPE_EVIDENCE, manual_done_forgery False."""
    result = authoritative_completion_marker(
        task_id="task-2729",
        external_dirty=True,
        merge_base_clean=True,
        ci_passed=True,
        local_fix_verified=True,
    )
    assert result["status"] == "AUTHORITATIVE_COMPLETION_BY_SCOPE_EVIDENCE", (
        f"Expected AUTHORITATIVE_COMPLETION_BY_SCOPE_EVIDENCE, got {result['status']!r}"
    )
    assert result["authoritative_completion"] is True
    assert result["manual_done_forgery"] is False, (
        f"manual_done_forgery must always be False, got {result['manual_done_forgery']!r}"
    )
    # evidence 키 존재
    ev = result.get("evidence", {})
    assert ev.get("merge_base_clean") is True
    assert ev.get("ci") is True
    assert ev.get("local_fix_verified") is True


def test_authoritative_completion_missing_evidence():
    """ci_passed=False → EXTERNAL_DIRTY_BLOCK_UNRESOLVED."""
    result = authoritative_completion_marker(
        task_id="task-2729",
        external_dirty=True,
        merge_base_clean=True,
        ci_passed=False,   # 하나라도 빠지면
        local_fix_verified=True,
    )
    assert result["status"] == "EXTERNAL_DIRTY_BLOCK_UNRESOLVED", (
        f"Expected EXTERNAL_DIRTY_BLOCK_UNRESOLVED, got {result['status']!r}"
    )
    assert result["authoritative_completion"] is False
    assert result["manual_done_forgery"] is False


def test_authoritative_completion_no_external_dirty():
    """external_dirty=False → NO_EXTERNAL_DIRTY_BLOCK."""
    result = authoritative_completion_marker(
        task_id="task-2729",
        external_dirty=False,
        merge_base_clean=True,
        ci_passed=True,
        local_fix_verified=True,
    )
    assert result["status"] == "NO_EXTERNAL_DIRTY_BLOCK", (
        f"Expected NO_EXTERNAL_DIRTY_BLOCK, got {result['status']!r}"
    )
    assert result["authoritative_completion"] is False
    assert result["manual_done_forgery"] is False


def test_no_manual_done_forgery():
    """marker_writer 주입해도 .done 파일 생성 안 함을 확인.

    marker_writer는 marker dict를 인자로 받아 호출되고, 본 함수는 .done 파일을
    직접 생성하지 않는다. marker_writer 호출 결과를 캡처하여 .done 경로가
    기록되지 않음을 확인한다.
    """
    import tempfile, os

    captured_markers = []

    def fake_marker_writer(marker_dict):
        captured_markers.append(marker_dict)

    result = authoritative_completion_marker(
        task_id="task-2729",
        external_dirty=True,
        merge_base_clean=True,
        ci_passed=True,
        local_fix_verified=True,
        marker_writer=fake_marker_writer,
    )

    # marker_writer가 호출됨
    assert len(captured_markers) == 1, (
        f"marker_writer should be called once, got {len(captured_markers)}"
    )

    # manual_done_forgery 항상 False
    assert result["manual_done_forgery"] is False

    # .done 파일이 직접 생성되지 않았음을 확인 (marker_writer에 전달된 dict에 .done 파일 경로 없음)
    marker_dict = captured_markers[0]
    marker_str = str(marker_dict)
    # marker dict에는 .done 파일 경로가 포함되지 않음
    assert ".done" not in marker_str or "manual_done_forgery" in marker_str, (
        "marker_writer dict must not encode direct .done file creation intent"
    )
    # 실제로 .done 파일이 시스템에 생성되지 않았는지 확인 (tmpdir 기반)
    # authoritative_completion_marker는 .done 파일을 직접 생성하는 코드가 없음
    # → marker_writer만 호출될 뿐, 함수 자체에서 open/write 없음
    assert result["status"] == "AUTHORITATIVE_COMPLETION_BY_SCOPE_EVIDENCE"
