"""
test_atomic_timer_write.py

task-timers.json 동시 쓰기 손상 수정(atomic write fix) 회귀 테스트.

배경:
  dispatch.py의 _patch_timer_metadata()와 memory/task-timer.py의 _save_timers()가
  task-timers.json에 동시 쓰기하여 JSON이 깨지는 버그가 발생함.
  수정 후 두 쪽 모두 아래 패턴을 사용한다:
    1. flock(LOCK_EX)로 진입 직렬화
    2. temp 파일 → fsync → os.replace() 원자적 교체
    3. (또는 utils/atomic_write.py의 atomic_json_write() 활용)

테스트는 dispatch.py / task-timer.py를 직접 임포트하지 않고
동일한 패턴을 복제하여 패턴 자체의 올바름을 검증한다.

실패 기준:
  - 원자적 쓰기 없이 open("w") + json.dump 만 쓸 경우 TC1/TC2/TC5는 확률적으로 FAIL.
  - atomic_json_write + flock 사용 시 모두 PASS.

작성자: 아르고스 (Argos) — dev1-team tester
"""

import fcntl
import json
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any

# ---------------------------------------------------------------------------
# 모듈 경로 설정
# ---------------------------------------------------------------------------
sys.path.insert(0, str(Path(__file__).parent.parent))

from utils.atomic_write import atomic_json_write  # type: ignore[import-not-found]  # noqa: E402  # pyright: ignore[reportMissingImports]

# ---------------------------------------------------------------------------
# 샘플 데이터 팩토리
# ---------------------------------------------------------------------------

SAMPLE_TASK_ID = "task-100.1"


def _make_sample_data(counter: int = 100) -> dict:
    """테스트용 task-timers.json 초기 데이터."""
    return {
        "tasks": {
            SAMPLE_TASK_ID: {
                "task_id": SAMPLE_TASK_ID,
                "team_id": "dev1-team",
                "status": "running",
                "start_time": "2026-04-12T10:00:00",
            }
        },
        "counter": counter,
        "last_updated": "2026-04-12T10:00:00",
    }


# ---------------------------------------------------------------------------
# 헬퍼: _patch_timer_metadata 패턴 복제 (flock + atomic write)
# ---------------------------------------------------------------------------

def _atomic_patch_timer(timer_file: Path, lock_file: Path, task_id: str, **metadata: Any) -> None:
    """dispatch.py _patch_timer_metadata() 의 수정 후 패턴을 복제.

    flock(LOCK_EX) → 읽기 → 패치 → atomic_json_write → flock(LOCK_UN)
    """
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    with open(lock_file, "w") as lock_fd:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        try:
            if not timer_file.exists():
                return
            with open(timer_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            task_entry = data.get("tasks", {}).get(task_id)
            if task_entry is not None:
                task_entry.update(metadata)
                data["last_updated"] = "2026-04-12T10:00:01"
                atomic_json_write(timer_file, data)
        finally:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)


def _unsafe_patch_timer(timer_file: Path, _lock_file: Path, task_id: str, **metadata: Any) -> None:
    """수정 전 패턴: flock 없이 open("w") + json.dump 직접 쓰기.

    이 함수는 TC1/TC2가 원자적 패턴 없이 FAIL함을 보이기 위한 대조군이다.
    실제 프로덕션 코드에서는 절대 사용 금지.
    """
    if not timer_file.exists():
        return
    with open(timer_file, "r", encoding="utf-8") as f:
        data = json.load(f)
    task_entry = data.get("tasks", {}).get(task_id)
    if task_entry is not None:
        task_entry.update(metadata)
        data["last_updated"] = "2026-04-12T10:00:01"
    # 의도적으로 lock 없이, atomic write 없이 덮어쓴다 (버그 재현)
    with open(timer_file, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


# ---------------------------------------------------------------------------
# 헬퍼: task-timer.py _save_timers 패턴 복제 (flock + temp+replace)
# ---------------------------------------------------------------------------

# ---------------------------------------------------------------------------
# TC1 — 동시 _patch_timer_metadata 2개: JSON 무결성 검증
# ---------------------------------------------------------------------------

class TestConcurrentDispatchPatch:
    """TC1: 2개 이상의 _patch_timer_metadata 호출이 동시에 실행되어도
    task-timers.json이 유효한 JSON 상태를 유지해야 한다."""

    CONCURRENCY = 10  # 동시 스레드 수 (요구사항: 최소 10)
    ITERATIONS = 5    # 스레드당 반복 수

    def test_concurrent_dispatch_patch(self, tmp_path):
        """TC1: 10개 스레드 × 5회 = 50번 동시 패치 후 JSON 유효성 확인."""
        timer_file = tmp_path / "memory" / "task-timers.json"
        lock_file = tmp_path / "memory" / ".task-timers.lock"
        timer_file.parent.mkdir(parents=True, exist_ok=True)

        # 초기 데이터 기록
        atomic_json_write(timer_file, _make_sample_data())

        errors = []

        def worker(thread_idx: int) -> None:
            for i in range(self.ITERATIONS):
                _atomic_patch_timer(
                    timer_file,
                    lock_file,
                    SAMPLE_TASK_ID,
                    bot=f"bot-{thread_idx}",
                    iteration=i,
                )

        with ThreadPoolExecutor(max_workers=self.CONCURRENCY) as pool:
            futures = [pool.submit(worker, idx) for idx in range(self.CONCURRENCY)]
            for fut in as_completed(futures):
                exc = fut.exception()
                if exc is not None:
                    errors.append(exc)

        assert not errors, f"워커 예외 발생: {errors}"

        # (a) JSON 파싱 가능
        raw = timer_file.read_text(encoding="utf-8")
        data = json.loads(raw)  # JSONDecodeError 시 테스트 FAIL

        # (b) 핵심 키 존재
        assert "tasks" in data
        assert SAMPLE_TASK_ID in data["tasks"]

        # (c) 데이터 손실 없음: task 엔트리에 기본 필드 유지
        entry = data["tasks"][SAMPLE_TASK_ID]
        assert entry["task_id"] == SAMPLE_TASK_ID
        assert entry["team_id"] == "dev1-team"

    def test_concurrent_dispatch_patch_without_atomic_fails(self, tmp_path):
        """대조 실험: atomic write 없이 동시 패치하면 JSON이 깨질 수 있음을 시연.

        이 테스트는 _unsafe_patch_timer를 사용하며,
        손상이 발생하면 json.loads가 JSONDecodeError를 던진다.
        손상이 발생하지 않더라도 pytest.warns 등을 쓰지 않고,
        손상이 '발생할 수 있음'을 문서화하는 smoke test다.

        주의: 파일시스템/OS 버퍼 타이밍에 따라 항상 손상이 발생하진 않으므로
        이 테스트 자체는 손상 여부를 assert하지 않는다.
        대신 atomic 패턴이 정상 동작함을 TC1_main이 보증한다.
        """
        timer_file = tmp_path / "memory" / "task-timers.json"
        lock_file = tmp_path / "memory" / ".task-timers.lock"
        timer_file.parent.mkdir(parents=True, exist_ok=True)
        atomic_json_write(timer_file, _make_sample_data())

        def unsafe_worker(_: int) -> None:
            for i in range(self.ITERATIONS):
                try:
                    _unsafe_patch_timer(
                        timer_file,
                        lock_file,
                        SAMPLE_TASK_ID,
                        iteration=i,
                    )
                except Exception:
                    pass  # 손상된 JSON을 읽으면 예외 — 억제

        with ThreadPoolExecutor(max_workers=self.CONCURRENCY) as pool:
            futures = [pool.submit(unsafe_worker, idx) for idx in range(self.CONCURRENCY)]
            for fut in as_completed(futures):
                fut.exception()  # 예외 수집 (억제)

        # 손상 여부만 기록 — 이 경로는 구체적인 assert가 없음.
        # 핵심 보증은 test_concurrent_dispatch_patch에서 제공한다.
        try:
            json.loads(timer_file.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            pass  # 예상된 실패 — atomic write 없이 손상이 발생할 수 있다는 증거


# ---------------------------------------------------------------------------
# TC2 — dispatch._patch_timer_metadata + task-timer._save_timers 동시 실행
# ---------------------------------------------------------------------------

class TestConcurrentDispatchAndTimerEnd:
    """TC2: dispatch 패치와 task-timer 저장이 동시에 실행되어도
    JSON 무결성이 유지되어야 한다."""

    CONCURRENCY = 10

    def test_concurrent_dispatch_and_timer_end(self, tmp_path):
        """TC2: 패치 워커 5개 + 저장 워커 5개가 동시에 실행."""
        timer_file = tmp_path / "memory" / "task-timers.json"
        lock_file = tmp_path / "memory" / ".task-timers.lock"
        timer_file.parent.mkdir(parents=True, exist_ok=True)

        initial_data = _make_sample_data()
        atomic_json_write(timer_file, initial_data)

        errors = []

        def patch_worker(thread_idx: int) -> None:
            for _ in range(5):
                _atomic_patch_timer(
                    timer_file,
                    lock_file,
                    SAMPLE_TASK_ID,
                    role=f"dev{thread_idx % 8 + 1}",
                    model="claude-sonnet-4-6",
                )

        def save_worker(_thread_idx: int) -> None:
            for i in range(5):
                # _save_timers 패턴: flock(LOCK_EX) 내부에서 직접 atomic write
                # (_atomic_save_timers는 내부적으로 다시 flock을 시도하므로
                #  deadlock 방지를 위해 여기서는 직접 인라인 구현)
                with open(lock_file, "w") as lfd:
                    fcntl.flock(lfd, fcntl.LOCK_EX)
                    try:
                        with open(timer_file, "r", encoding="utf-8") as f:
                            data = json.load(f)
                        entry = data["tasks"].get(SAMPLE_TASK_ID, {})
                        entry["retry_count"] = i
                        data["tasks"][SAMPLE_TASK_ID] = entry
                        # flock을 이미 보유한 상태이므로 atomic_json_write 직접 호출
                        # (lock 재획득 없이 temp→fsync→replace 패턴만 수행)
                        atomic_json_write(timer_file, data)
                    finally:
                        fcntl.flock(lfd, fcntl.LOCK_UN)

        futures_list = []
        with ThreadPoolExecutor(max_workers=self.CONCURRENCY) as pool:
            for idx in range(self.CONCURRENCY // 2):
                futures_list.append(pool.submit(patch_worker, idx))
                futures_list.append(pool.submit(save_worker, idx))
            for fut in as_completed(futures_list):
                exc = fut.exception()
                if exc is not None:
                    errors.append(exc)

        assert not errors, f"워커 예외 발생: {errors}"

        # (a) JSON 파싱 가능
        raw = timer_file.read_text(encoding="utf-8")
        data = json.loads(raw)

        # (b) 데이터 손실 없음
        assert "tasks" in data
        assert SAMPLE_TASK_ID in data["tasks"]

        entry = data["tasks"][SAMPLE_TASK_ID]
        assert entry.get("task_id") == SAMPLE_TASK_ID

        # (c) 구조 유효 — tasks 는 dict 여야 한다
        assert isinstance(data["tasks"], dict)


# ---------------------------------------------------------------------------
# TC3 — 기본 회귀: 단일 패치 호출 후 메타데이터 정합성 확인
# ---------------------------------------------------------------------------

class TestDispatchRegressionBasic:
    """TC3: _patch_timer_metadata 단순 1회 호출 회귀 테스트."""

    def test_dispatch_regression_basic(self, tmp_path):
        """TC3: 패치 1회 후 해당 필드가 올바르게 기록되고 JSON이 유효해야 한다."""
        timer_file = tmp_path / "memory" / "task-timers.json"
        lock_file = tmp_path / "memory" / ".task-timers.lock"
        timer_file.parent.mkdir(parents=True, exist_ok=True)

        atomic_json_write(timer_file, _make_sample_data())

        _atomic_patch_timer(
            timer_file,
            lock_file,
            SAMPLE_TASK_ID,
            bot="bot-b",
            role="dev1",
            model="claude-sonnet-4-6",
            schedule_id="B8C44F05",
            retry_count=0,
            max_retry=2,
        )

        data = json.loads(timer_file.read_text(encoding="utf-8"))

        # (a) JSON 파싱 가능 (위 json.loads가 통과하면 확인됨)
        assert "tasks" in data

        entry = data["tasks"][SAMPLE_TASK_ID]

        # (b) 패치된 메타데이터 필드 확인
        assert entry.get("bot") == "bot-b"
        assert entry.get("role") == "dev1"
        assert entry.get("model") == "claude-sonnet-4-6"
        assert entry.get("schedule_id") == "B8C44F05"
        assert entry.get("retry_count") == 0
        assert entry.get("max_retry") == 2

        # (c) 기존 필드 보존 확인 (덮어쓰기 시 유실 방지)
        assert entry.get("task_id") == SAMPLE_TASK_ID
        assert entry.get("team_id") == "dev1-team"
        assert entry.get("status") == "running"

    def test_patch_nonexistent_task_id_is_noop(self, tmp_path):
        """존재하지 않는 task_id에 패치해도 파일이 유효 상태를 유지해야 한다."""
        timer_file = tmp_path / "memory" / "task-timers.json"
        lock_file = tmp_path / "memory" / ".task-timers.lock"
        timer_file.parent.mkdir(parents=True, exist_ok=True)

        atomic_json_write(timer_file, _make_sample_data())

        _atomic_patch_timer(timer_file, lock_file, "task-9999.9", bot="ghost")

        # 파일이 여전히 유효해야 한다
        data = json.loads(timer_file.read_text(encoding="utf-8"))
        assert SAMPLE_TASK_ID in data["tasks"]
        # 존재하지 않는 task_id는 생성되지 않아야 한다
        assert "task-9999.9" not in data["tasks"]

    def test_patch_preserves_other_tasks(self, tmp_path):
        """패치 시 다른 task 항목이 유실되지 않아야 한다."""
        timer_file = tmp_path / "memory" / "task-timers.json"
        lock_file = tmp_path / "memory" / ".task-timers.lock"
        timer_file.parent.mkdir(parents=True, exist_ok=True)

        data = _make_sample_data()
        data["tasks"]["task-200.1"] = {
            "task_id": "task-200.1",
            "team_id": "dev2-team",
            "status": "completed",
            "start_time": "2026-04-12T09:00:00",
        }
        atomic_json_write(timer_file, data)

        _atomic_patch_timer(timer_file, lock_file, SAMPLE_TASK_ID, bot="bot-a")

        result = json.loads(timer_file.read_text(encoding="utf-8"))
        assert "task-200.1" in result["tasks"], "다른 task 항목이 유실됨"
        assert result["tasks"]["task-200.1"]["team_id"] == "dev2-team"


# ---------------------------------------------------------------------------
# test_atomic_write_fsync — fsync 포함 원자적 쓰기 일관성 검증
# ---------------------------------------------------------------------------

class TestAtomicWriteFsync:
    """atomic_json_write()가 fsync를 포함하여 쓰기 후 데이터 일관성을 보장함을 검증."""

    def test_atomic_write_fsync(self, tmp_path):
        """쓰기 완료 후 파일을 다시 읽었을 때 동일 데이터를 반환해야 한다."""
        target = tmp_path / "task-timers.json"
        payload = _make_sample_data(counter=42)

        atomic_json_write(target, payload)

        # 파일이 존재해야 한다
        assert target.exists()

        # 파일 내용이 정확히 일치해야 한다 (fsync 없이는 OS 버퍼 미플러시 가능)
        with open(target, "r", encoding="utf-8") as f:
            loaded = json.load(f)

        assert loaded == payload
        assert loaded["counter"] == 42
        assert SAMPLE_TASK_ID in loaded["tasks"]

    def test_atomic_write_no_temp_file_left(self, tmp_path):
        """쓰기 성공 후 임시 파일(.tmp)이 남아있지 않아야 한다."""
        target = tmp_path / "task-timers.json"
        atomic_json_write(target, _make_sample_data())

        tmp_files = list(tmp_path.glob("*.tmp"))
        assert tmp_files == [], f"임시 파일이 남아있음: {tmp_files}"

    def test_atomic_write_overwrites_correctly(self, tmp_path):
        """기존 파일을 덮어쓸 때 새 데이터만 정확히 기록되어야 한다."""
        target = tmp_path / "task-timers.json"

        # 초기 데이터 기록
        atomic_json_write(target, _make_sample_data(counter=1))

        # 새 데이터로 덮어쓰기
        new_data = _make_sample_data(counter=999)
        new_data["tasks"]["task-200.1"] = {
            "task_id": "task-200.1",
            "team_id": "dev2-team",
            "status": "running",
            "start_time": "2026-04-12T11:00:00",
        }
        atomic_json_write(target, new_data)

        result = json.loads(target.read_text(encoding="utf-8"))
        assert result["counter"] == 999
        assert "task-200.1" in result["tasks"]

    def test_atomic_write_uses_replace_not_in_place(self, tmp_path):
        """atomic_json_write는 os.replace 기반으로 동작해야 한다 (in-place 쓰기 아님).

        검증 방법: 쓰기 전 원본 파일의 inode를 기록하고,
        쓰기 후 inode가 달라졌는지 확인한다.
        (os.replace는 새 inode를 갖는 파일로 교체하므로 inode가 바뀐다.)
        """
        target = tmp_path / "task-timers.json"
        atomic_json_write(target, _make_sample_data(counter=1))
        inode_before = target.stat().st_ino

        atomic_json_write(target, _make_sample_data(counter=2))
        inode_after = target.stat().st_ino

        # os.replace를 사용하면 inode가 달라진다
        assert inode_before != inode_after, (
            "inode가 같음 — os.replace 대신 in-place 쓰기가 사용되고 있을 수 있음"
        )


# ---------------------------------------------------------------------------
# test_flock_prevents_race — flock이 경쟁 조건을 직렬화함을 검증
# ---------------------------------------------------------------------------

class TestFlockPreventsRace:
    """flock(LOCK_EX)가 동시 쓰기를 직렬화하여 데이터 손실 없음을 검증."""

    CONCURRENCY = 10
    TASKS_PER_WORKER = 3

    def test_flock_prevents_race(self, tmp_path):
        """10개 스레드가 각각 다른 task_id를 추가할 때 모든 항목이 보존되어야 한다.

        flock 없이 동시에 쓰면 나중에 쓴 스레드가 이전 스레드의 쓰기를 덮어쓰므로
        일부 task_id가 유실된다. flock이 올바르게 동작하면 모두 보존된다.
        """
        timer_file = tmp_path / "memory" / "task-timers.json"
        lock_file = tmp_path / "memory" / ".task-timers.lock"
        timer_file.parent.mkdir(parents=True, exist_ok=True)

        # 빈 초기 데이터
        atomic_json_write(timer_file, {"tasks": {}, "counter": 0, "last_updated": ""})

        expected_task_ids = set()
        errors = []

        def add_tasks_worker(thread_idx: int) -> None:
            for i in range(self.TASKS_PER_WORKER):
                task_id = f"task-{thread_idx * 100 + i}.1"
                with open(lock_file, "w") as lfd:
                    fcntl.flock(lfd, fcntl.LOCK_EX)
                    try:
                        with open(timer_file, "r", encoding="utf-8") as f:
                            data = json.load(f)
                        data["tasks"][task_id] = {
                            "task_id": task_id,
                            "team_id": f"dev{thread_idx % 8 + 1}-team",
                            "status": "running",
                            "start_time": "2026-04-12T10:00:00",
                        }
                        data["counter"] += 1
                        atomic_json_write(timer_file, data)
                    finally:
                        fcntl.flock(lfd, fcntl.LOCK_UN)

        # 기대 task_id 목록 미리 계산
        for t_idx in range(self.CONCURRENCY):
            for i in range(self.TASKS_PER_WORKER):
                expected_task_ids.add(f"task-{t_idx * 100 + i}.1")

        with ThreadPoolExecutor(max_workers=self.CONCURRENCY) as pool:
            futures = [pool.submit(add_tasks_worker, idx) for idx in range(self.CONCURRENCY)]
            for fut in as_completed(futures):
                exc = fut.exception()
                if exc is not None:
                    errors.append(exc)

        assert not errors, f"워커 예외 발생: {errors}"

        # (a) JSON 파싱 가능
        raw = timer_file.read_text(encoding="utf-8")
        final_data = json.loads(raw)

        # (b) 모든 task_id가 보존되어야 한다 (데이터 손실 없음)
        actual_task_ids = set(final_data["tasks"].keys())
        missing = expected_task_ids - actual_task_ids
        assert not missing, (
            f"flock 미적용 시 손실되는 task_id {len(missing)}개: {sorted(missing)[:5]}..."
        )

        # (c) counter 값이 정확해야 한다
        total_expected = self.CONCURRENCY * self.TASKS_PER_WORKER
        assert final_data["counter"] == total_expected, (
            f"counter 불일치: 예상={total_expected}, 실제={final_data['counter']}"
        )

    def test_flock_serializes_write_order(self, tmp_path):
        """flock이 쓰기를 직렬화하므로 마지막 writer의 데이터가 완전히 기록되어야 한다."""
        timer_file = tmp_path / "memory" / "task-timers.json"
        lock_file = tmp_path / "memory" / ".task-timers.lock"
        timer_file.parent.mkdir(parents=True, exist_ok=True)

        atomic_json_write(timer_file, _make_sample_data())

        write_log = []  # 쓰기 완료 순서 기록

        def sequential_writer(value: int) -> None:
            with open(lock_file, "w") as lfd:
                fcntl.flock(lfd, fcntl.LOCK_EX)
                try:
                    with open(timer_file, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    data["counter"] = value
                    data["last_updated"] = f"2026-04-12T10:00:{value:02d}"
                    atomic_json_write(timer_file, data)
                    write_log.append(value)
                finally:
                    fcntl.flock(lfd, fcntl.LOCK_UN)

        with ThreadPoolExecutor(max_workers=self.CONCURRENCY) as pool:
            futures = [pool.submit(sequential_writer, v) for v in range(self.CONCURRENCY)]
            for fut in as_completed(futures):
                fut.result()  # 예외 재발생

        # 최종 파일은 유효한 JSON 이어야 한다
        final = json.loads(timer_file.read_text(encoding="utf-8"))
        assert isinstance(final["counter"], int)
        assert 0 <= final["counter"] < self.CONCURRENCY

        # write_log에 중복이 없어야 한다 (각 값은 정확히 1번 기록됨)
        assert len(write_log) == self.CONCURRENCY
        assert len(set(write_log)) == self.CONCURRENCY, "flock 미적용 시 중복 쓰기 발생"
