"""
tests/dev7/test_done_semantics.py

옵션 D 시맨틱 핵심 동작 검증 (task-2374)
Phase 마커 3개 (work-done / merged / merge-failed) × 시나리오 4개 매트릭스

카마소츠(개발7팀 테스터) 작성
"""

import importlib.util
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import MagicMock

import pytest

# ---------------------------------------------------------------------------
# 워크트리 경로 설정: importlib로 직접 로드 (main workspace 오염 방지)
# ---------------------------------------------------------------------------

# Absolute path of the git worktree whose scripts are loaded for these tests.
WORKTREE = "/home/jay/workspace/.worktrees/task-2374-dev7"

# auto_merge — loaded directly from the worktree file via importlib
_am_spec = importlib.util.spec_from_file_location(
    "auto_merge_wt",
    os.path.join(WORKTREE, "scripts", "auto_merge.py"),
)
auto_merge = importlib.util.module_from_spec(_am_spec)  # type: ignore[arg-type]
# Registered in sys.modules under the alias "auto_merge_wt" before the module body runs.
sys.modules["auto_merge_wt"] = auto_merge
_am_spec.loader.exec_module(auto_merge)  # type: ignore[union-attr]

# Short module-level aliases for auto_merge's private helpers.
_create_merged_marker = auto_merge._create_merged_marker
_create_merge_failed_marker = auto_merge._create_merge_failed_marker
_bump_retry_counter = auto_merge._bump_retry_counter
_update_state_json = auto_merge._update_state_json

# done-watcher import (importlib is used because of the hyphen in the file name)
# NOTE: unlike auto_merge above, this module is not registered in sys.modules.
_dw_spec = importlib.util.spec_from_file_location(
    "done_watcher",
    os.path.join(WORKTREE, "scripts", "done-watcher.py"),
)
done_watcher = importlib.util.module_from_spec(_dw_spec)  # type: ignore[arg-type]
_dw_spec.loader.exec_module(done_watcher)  # type: ignore[union-attr]


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture(autouse=True)
def _patch_state_dir(tmp_path, monkeypatch):
    """Point ``auto_merge._STATE_DIR`` at ``<tmp_path>/memory/state``.

    Declared ``autouse`` so it is applied without being requested by name.
    Returns the directory that was created and patched in.
    """
    redirected = tmp_path.joinpath("memory", "state")
    # exist_ok: the same directory may already have been created elsewhere.
    redirected.mkdir(parents=True, exist_ok=True)
    monkeypatch.setattr(auto_merge, "_STATE_DIR", redirected)
    return redirected


@pytest.fixture()
def events_dir(tmp_path) -> Path:
    """Create ``<tmp_path>/memory/events`` (parents included) and return it."""
    target = tmp_path.joinpath("memory", "events")
    target.mkdir(parents=True, exist_ok=True)
    return target


@pytest.fixture()
def state_dir(tmp_path) -> Path:
    """Return the ``<tmp_path>/memory/state`` path; this fixture does not create it."""
    memory_root = tmp_path / "memory"
    return memory_root / "state"


# ---------------------------------------------------------------------------
# auto_merge helpers 단위 테스트
# ---------------------------------------------------------------------------

class TestCreateMergedMarkerAtomic:
    """``_create_merged_marker``: marker file, marker payload, and state.json update."""

    def test_creates_merged_file(self, events_dir, tmp_path):
        """The returned path is ``<events_dir>/task-9999.merged`` and exists on disk."""
        expected_path = events_dir / "task-9999.merged"

        marker = _create_merged_marker(events_dir, "task-9999", merge_commit_sha="abc123")

        assert marker == expected_path
        assert marker.exists()

    def test_merged_file_has_correct_json(self, events_dir, tmp_path):
        """The marker JSON carries task_id, merge_commit_sha and a merged_at key."""
        _create_merged_marker(events_dir, "task-9999", merge_commit_sha="abc123")

        raw = (events_dir / "task-9999.merged").read_text(encoding="utf-8")
        payload = json.loads(raw)
        assert payload["task_id"] == "task-9999"
        assert payload["merge_commit_sha"] == "abc123"
        assert "merged_at" in payload

    def test_state_json_phase_merged(self, events_dir, tmp_path):
        """state.json for the task records phase "merged" and the merge commit sha."""
        _create_merged_marker(events_dir, "task-9999", merge_commit_sha="abc123")

        # _STATE_DIR is read at assertion time, after any test-time patching.
        state_path = auto_merge._STATE_DIR / "task-9999.json"
        assert state_path.exists()
        recorded = json.loads(state_path.read_text(encoding="utf-8"))
        assert recorded["phase"] == "merged"
        assert recorded["merge_commit_sha"] == "abc123"


class TestCreateMergedMarkerIdempotent:
    """``_create_merged_marker`` must leave an already-present marker untouched."""

    def test_no_op_when_already_exists(self, events_dir, tmp_path):
        """A pre-existing ``.merged`` file keeps its content and its path is returned."""
        existing = events_dir / "task-9999.merged"
        seeded = {"task_id": "task-9999", "original": True}
        existing.write_text(json.dumps(seeded), encoding="utf-8")

        returned = _create_merged_marker(events_dir, "task-9999", merge_commit_sha="new_sha")

        on_disk = json.loads(existing.read_text(encoding="utf-8"))
        # The sentinel key survives only if the file was not rewritten.
        assert on_disk.get("original") is True
        assert returned == existing


class TestBumpRetryCounter:
    """``_bump_retry_counter``: returned count, ``.retry_count`` file, and last_error."""

    def test_increments_from_zero(self, events_dir, tmp_path):
        """Three consecutive calls for the same task return 1, then 2, then 3."""
        for expected in (1, 2, 3):
            observed = _bump_retry_counter(events_dir, "task-9999")
            assert observed == expected

    def test_retry_count_file_synced(self, events_dir, tmp_path):
        """After one bump the ``.retry_count`` file exists and contains "1"."""
        _bump_retry_counter(events_dir, "task-9999")

        counter_path = events_dir / "task-9999.retry_count"
        assert counter_path.exists()
        contents = counter_path.read_text(encoding="utf-8")
        assert contents.strip() == "1"

    def test_last_error_stored(self, events_dir, tmp_path):
        """The ``last_error`` argument is written into the task's state.json."""
        _bump_retry_counter(events_dir, "task-9999", last_error="merge conflict")

        state_path = auto_merge._STATE_DIR / "task-9999.json"
        recorded = json.loads(state_path.read_text(encoding="utf-8"))
        assert recorded["last_error"] == "merge conflict"


class TestCreateMergeFailedMarker:
    """``_create_merge_failed_marker``: marker file, marker payload, and state.json update."""

    def test_creates_merge_failed_file(self, events_dir, tmp_path):
        """The returned path is ``<events_dir>/task-9999.merge-failed`` and exists."""
        expected_path = events_dir / "task-9999.merge-failed"

        marker = _create_merge_failed_marker(
            events_dir,
            "task-9999",
            retry_count=5,
            last_error="branch conflict",
        )

        assert marker == expected_path
        assert marker.exists()

    def test_merge_failed_json_content(self, events_dir, tmp_path):
        """The marker JSON carries task_id, retry_count and a failed_at key."""
        _create_merge_failed_marker(events_dir, "task-9999", retry_count=5, last_error="err")

        raw = (events_dir / "task-9999.merge-failed").read_text(encoding="utf-8")
        payload = json.loads(raw)
        assert payload["task_id"] == "task-9999"
        assert payload["retry_count"] == 5
        assert "failed_at" in payload

    def test_state_json_phase_merge_failed(self, events_dir, tmp_path):
        """state.json for the task records phase "merge-failed"."""
        _create_merge_failed_marker(events_dir, "task-9999", retry_count=5, last_error="err")

        state_path = auto_merge._STATE_DIR / "task-9999.json"
        recorded = json.loads(state_path.read_text(encoding="utf-8"))
        assert recorded["phase"] == "merge-failed"


# ---------------------------------------------------------------------------
# done-watcher reconcile 시나리오
# ---------------------------------------------------------------------------

@pytest.fixture()
def patched_dw_dirs(tmp_path, monkeypatch):
    """Redirect done-watcher's ``EVENTS_DIR`` / ``STATE_DIR`` into ``tmp_path``.

    Returns:
        ``(events, state)`` — the two directories, created before patching.
    """
    memory_root = tmp_path / "memory"
    events_path = memory_root / "events"
    state_path = memory_root / "state"
    for directory in (events_path, state_path):
        directory.mkdir(parents=True, exist_ok=True)
    monkeypatch.setattr(done_watcher, "EVENTS_DIR", events_path)
    monkeypatch.setattr(done_watcher, "STATE_DIR", state_path)
    return events_path, state_path


class TestReconcileMetaTaskImmediateMerged:
    """``reconcile_work_done`` for tasks whose state says ``merge_required=False``."""

    def test_meta_kind_creates_merged_immediately(self, patched_dw_dirs, tmp_path):
        """kind=meta with merge_required=False yields "system-done" plus a ``.merged`` file."""
        events, state = patched_dw_dirs

        # Seed state.json describing a meta task that needs no merge.
        seeded_state = json.dumps(
            {
                "task_id": "task-9001",
                "phase": "work-done",
                "kind": "meta",
                "merge_required": False,
            }
        )
        state.joinpath("task-9001.json").write_text(seeded_state, encoding="utf-8")

        # Write the .work-done marker that is handed to reconcile_work_done.
        work_done = events / "task-9001.work-done"
        work_done.write_text(json.dumps({"task_id": "task-9001"}), encoding="utf-8")

        outcome = done_watcher.reconcile_work_done(work_done)

        assert outcome == "system-done"
        assert events.joinpath("task-9001.merged").exists()

    def test_merge_required_false_creates_merged_immediately(self, patched_dw_dirs, tmp_path):
        """kind=code with merge_required=False also yields "system-done" plus ``.merged``."""
        events, state = patched_dw_dirs

        seeded_state = json.dumps(
            {
                "task_id": "task-9002",
                "kind": "code",
                "merge_required": False,
            }
        )
        state.joinpath("task-9002.json").write_text(seeded_state, encoding="utf-8")

        work_done = events / "task-9002.work-done"
        work_done.write_text(json.dumps({"task_id": "task-9002"}), encoding="utf-8")

        outcome = done_watcher.reconcile_work_done(work_done)

        assert outcome == "system-done"
        assert events.joinpath("task-9002.merged").exists()


class TestReconcilePollingNoMergedYet:
    """``reconcile_work_done`` when the stubbed PR query reports no merge yet."""

    def test_gh_pr_no_merged_at_returns_polling(self, patched_dw_dirs, monkeypatch, tmp_path):
        """A ``mergedAt: null`` reply yields "polling" and leaves no ``.merged`` file."""
        events, state = patched_dw_dirs

        task_state = {
            "task_id": "task-9003",
            "kind": "code",
            "merge_required": True,
            "pr_url": "https://github.com/test/repo/pull/42",
        }
        state.joinpath("task-9003.json").write_text(json.dumps(task_state), encoding="utf-8")

        marker_payload = {"task_id": "task-9003", "pr_url": "https://github.com/test/repo/pull/42"}
        work_done = events / "task-9003.work-done"
        work_done.write_text(json.dumps(marker_payload), encoding="utf-8")

        # Stub for subprocess.run (standing in for `gh pr view`): success, mergedAt null.
        completed = MagicMock(
            returncode=0,
            stdout=json.dumps({"mergedAt": None, "mergeCommit": None}),
            stderr="",
        )

        def fake_run(*_args, **_kwargs):
            return completed

        monkeypatch.setattr(subprocess, "run", fake_run)

        outcome = done_watcher.reconcile_work_done(work_done)

        assert outcome == "polling"
        assert not events.joinpath("task-9003.merged").exists()


class TestReconcileMergedObserved:
    """``reconcile_work_done`` when the stubbed PR query reports a completed merge."""

    def test_gh_pr_merged_at_creates_merged_marker(self, patched_dw_dirs, monkeypatch, tmp_path):
        """A non-null ``mergedAt`` yields "merged" and a marker holding the commit oid."""
        events, state = patched_dw_dirs

        task_state = {
            "task_id": "task-9004",
            "kind": "code",
            "merge_required": True,
            "pr_url": "https://github.com/test/repo/pull/43",
        }
        state.joinpath("task-9004.json").write_text(json.dumps(task_state), encoding="utf-8")

        marker_payload = {"task_id": "task-9004", "pr_url": "https://github.com/test/repo/pull/43"}
        work_done = events / "task-9004.work-done"
        work_done.write_text(json.dumps(marker_payload), encoding="utf-8")

        # Stub for subprocess.run (standing in for `gh pr view`): merged, with a commit oid.
        pr_reply = {
            "mergedAt": "2026-05-02T10:00:00Z",
            "mergeCommit": {"oid": "deadbeef123"},
        }
        completed = MagicMock(returncode=0, stdout=json.dumps(pr_reply), stderr="")

        def fake_run(*_args, **_kwargs):
            return completed

        monkeypatch.setattr(subprocess, "run", fake_run)

        outcome = done_watcher.reconcile_work_done(work_done)

        assert outcome == "merged"
        merged_path = events / "task-9004.merged"
        assert merged_path.exists()
        merged_payload = json.loads(merged_path.read_text(encoding="utf-8"))
        assert merged_payload["merge_commit_sha"] == "deadbeef123"


class TestReconcile30MinTimeoutToFailed:
    """``reconcile_work_done`` for an old ``.work-done`` marker whose PR is still unmerged."""

    def test_work_done_mtime_31min_ago_creates_merge_failed(self, patched_dw_dirs, monkeypatch, tmp_path):
        """Marker mtime 31 minutes old + ``mergedAt: null`` yields a ``.merge-failed`` file.

        The failure marker's ``last_error`` is expected to mention "timeout"
        (case-insensitive).
        """
        events, state = patched_dw_dirs

        task_state = {
            "task_id": "task-9005",
            "kind": "code",
            "merge_required": True,
            "pr_url": "https://github.com/test/repo/pull/44",
            "retry_count": 0,
        }
        state.joinpath("task-9005.json").write_text(json.dumps(task_state), encoding="utf-8")

        marker_payload = {"task_id": "task-9005", "pr_url": "https://github.com/test/repo/pull/44"}
        work_done = events / "task-9005.work-done"
        work_done.write_text(json.dumps(marker_payload), encoding="utf-8")

        # Backdate both atime and mtime by 31 minutes
        # (presumably just past the watcher's 30-minute limit, per the class name).
        stale = time.time() - (31 * 60)
        os.utime(work_done, times=(stale, stale))

        # Stub for subprocess.run (standing in for `gh pr view`): success, mergedAt null.
        completed = MagicMock(
            returncode=0,
            stdout=json.dumps({"mergedAt": None, "mergeCommit": None}),
            stderr="",
        )

        def fake_run(*_args, **_kwargs):
            return completed

        monkeypatch.setattr(subprocess, "run", fake_run)

        outcome = done_watcher.reconcile_work_done(work_done)

        assert outcome == "merge-failed"
        failed_path = events / "task-9005.merge-failed"
        assert failed_path.exists()
        failure = json.loads(failed_path.read_text(encoding="utf-8"))
        assert "timeout" in failure.get("last_error", "").lower()


# ---------------------------------------------------------------------------
# 호환 shim 검증: .done symlink 시뮬레이션
# ---------------------------------------------------------------------------

class TestFinishTaskCreatesDoneSymlink:
    """Compatibility shim check: a ``.done`` symlink that targets the ``.work-done`` file."""

    def test_done_symlink_points_to_work_done(self, tmp_path):
        """Simulate ``.done`` -> ``.work-done`` and read the payload through the link."""
        events = tmp_path / "memory" / "events"
        events.mkdir(parents=True, exist_ok=True)

        # Build the .work-done payload with a +09:00 timestamp.
        utc_plus_nine = timezone(timedelta(hours=9))
        created_at = datetime.now(utc_plus_nine).strftime("%Y-%m-%dT%H:%M:%S+09:00")
        work_done = events / "task-9999.work-done"
        work_done.write_text(
            json.dumps(
                {
                    "task_id": "task-9999",
                    "status": "work_done",
                    "created_at": created_at,
                }
            ),
            encoding="utf-8",
        )

        # Stand-in for finish-task.sh: create the .done symlink by hand.
        done_link = events / "task-9999.done"
        done_link.symlink_to(work_done)

        # Both entries exist and the link resolves to the marker itself.
        assert work_done.exists()
        assert done_link.is_symlink()
        assert done_link.resolve() == work_done.resolve()

        # Reading via .done returns the same content as .work-done.
        via_link = json.loads(done_link.read_text(encoding="utf-8"))
        assert via_link["task_id"] == "task-9999"
        assert via_link["status"] == "work_done"


# ---------------------------------------------------------------------------
# atomicity: tempfile + rename 패턴
# ---------------------------------------------------------------------------

class TestAtomicWriteNoPartialFile:
    """Checks around the tempfile + rename write pattern and ``_atomic_write_json``."""

    def test_atomic_write_no_partial_file_exposed(self, tmp_path):
        """The target does not exist until the rename, then holds the complete JSON."""
        import tempfile

        target = tmp_path / "atomic_test.json"
        expected = {"key": "value", "number": 42}

        # Re-implement the pattern inline: write to a hidden sibling temp file first.
        handle, scratch_name = tempfile.mkstemp(
            suffix=".tmp",
            prefix=f".{target.name}.",
            dir=str(target.parent),
        )
        scratch = Path(scratch_name)

        try:
            with os.fdopen(handle, "w", encoding="utf-8") as stream:
                json.dump(expected, stream, ensure_ascii=False, indent=2)

            # Nothing may be visible at the target path before the rename.
            assert not target.exists()

            os.rename(scratch_name, target)
        except Exception:
            # Do not leave the temp file behind when the write or a check fails.
            scratch.unlink(missing_ok=True)
            raise

        # After the rename: target present, temp file gone, content round-trips.
        assert target.exists()
        assert not scratch.exists()
        round_tripped = json.loads(target.read_text(encoding="utf-8"))
        assert round_tripped == expected

    def test_auto_merge_atomic_write_json(self, tmp_path):
        """``auto_merge._atomic_write_json`` writes a payload that parses back as JSON."""
        destination = tmp_path / "test_output.json"

        auto_merge._atomic_write_json(destination, {"task_id": "task-9999", "phase": "merged"})

        assert destination.exists()
        written = json.loads(destination.read_text(encoding="utf-8"))
        assert written["task_id"] == "task-9999"
        assert written["phase"] == "merged"
