#!/usr/bin/env python3
"""utils/session_store.py 테스트 스위트 (TDD — RED → GREEN)"""

from __future__ import annotations

import sqlite3
import sys
import threading
import time
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from utils.session_store import SessionStore, sanitize_title

# ---------------------------------------------------------------------------
# 픽스처
# ---------------------------------------------------------------------------


@pytest.fixture
def store(tmp_path):
    """임시 DB 경로를 사용하는 SessionStore 인스턴스."""
    db_path = str(tmp_path / "test_sessions.db")
    s = SessionStore(db_path=db_path)
    yield s
    s.close()


# ---------------------------------------------------------------------------
# 1. 세션 CRUD
# ---------------------------------------------------------------------------


class TestSessionCRUD:
    """세션 생성 / 조회 / 종료 기본 동작"""

    def test_create_session(self, store):
        """create_session()이 예외 없이 완료된다."""
        store.create_session("sess-001", source="api")

    def test_get_session_returns_created(self, store):
        """create_session() 후 get_session()으로 조회할 수 있다."""
        store.create_session("sess-002", source="cli", model="gpt-4")
        row = store.get_session("sess-002")
        assert row is not None
        assert row["session_id"] == "sess-002"
        assert row["source"] == "cli"
        assert row["model"] == "gpt-4"

    def test_get_session_missing_returns_none(self, store):
        """존재하지 않는 session_id 조회 시 None 반환."""
        assert store.get_session("does-not-exist") is None

    def test_end_session(self, store):
        """end_session()으로 세션이 종료 상태로 변경된다."""
        store.create_session("sess-003", source="api")
        store.end_session("sess-003", end_reason="user_exit")
        row = store.get_session("sess-003")
        assert row["end_reason"] == "user_exit"
        assert row["ended_at"] is not None

    def test_list_sessions_returns_created(self, store):
        """list_sessions()가 생성된 세션을 반환한다."""
        store.create_session("sess-list-1", source="api")
        store.create_session("sess-list-2", source="cli")
        rows = store.list_sessions()
        ids = [r["session_id"] for r in rows]
        assert "sess-list-1" in ids
        assert "sess-list-2" in ids

    def test_list_sessions_filter_by_source(self, store):
        """source 필터가 올바르게 동작한다."""
        store.create_session("src-api-1", source="api")
        store.create_session("src-cli-1", source="cli")
        rows = store.list_sessions(source="api")
        ids = [r["session_id"] for r in rows]
        assert "src-api-1" in ids
        assert "src-cli-1" not in ids

    def test_list_sessions_limit(self, store):
        """limit 파라미터가 반환 개수를 제한한다."""
        for i in range(5):
            store.create_session(f"lim-{i}", source="test")
        rows = store.list_sessions(limit=3)
        assert len(rows) <= 3

    def test_parent_session_id_stored(self, store):
        """parent_session_id가 DB에 올바르게 저장된다."""
        store.create_session("parent-1", source="api")
        store.create_session("child-1", source="api", parent_session_id="parent-1")
        row = store.get_session("child-1")
        assert row["parent_session_id"] == "parent-1"


# ---------------------------------------------------------------------------
# 2. 메시지 추가 / 조회
# ---------------------------------------------------------------------------


class TestMessages:
    """메시지 append / get 동작"""

    def test_append_message(self, store):
        """append_message()가 예외 없이 완료된다."""
        store.create_session("msg-sess-1", source="api")
        store.append_message("msg-sess-1", role="user", content="hello")

    def test_get_messages_returns_appended(self, store):
        """get_messages()가 추가된 메시지를 반환한다."""
        store.create_session("msg-sess-2", source="api")
        store.append_message("msg-sess-2", role="user", content="hi")
        store.append_message("msg-sess-2", role="assistant", content="hello")
        msgs = store.get_messages("msg-sess-2")
        assert len(msgs) == 2
        assert msgs[0]["role"] == "user"
        assert msgs[1]["role"] == "assistant"

    def test_message_with_tool_calls(self, store):
        """tool_calls JSON이 올바르게 저장/조회된다."""
        store.create_session("msg-sess-3", source="api")
        store.append_message(
            "msg-sess-3",
            role="assistant",
            tool_calls=[{"id": "call_1", "type": "function", "function": {"name": "foo"}}],
        )
        msgs = store.get_messages("msg-sess-3")
        assert msgs[0]["tool_calls"] is not None

    def test_message_ordering_by_created_at(self, store):
        """메시지가 추가 순서(created_at)로 반환된다."""
        store.create_session("msg-sess-4", source="api")
        for i in range(3):
            store.append_message("msg-sess-4", role="user", content=str(i))
        msgs = store.get_messages("msg-sess-4")
        contents = [m["content"] for m in msgs]
        assert contents == ["0", "1", "2"]


# ---------------------------------------------------------------------------
# 3. WAL 모드
# ---------------------------------------------------------------------------


class TestWALMode:
    """SQLite WAL 저널 모드 확인"""

    def test_wal_mode_enabled(self, store):
        """DB의 journal_mode가 WAL임을 확인한다."""
        conn = sqlite3.connect(store.db_path)
        mode = conn.execute("PRAGMA journal_mode;").fetchone()[0]
        conn.close()
        assert mode == "wal"


# ---------------------------------------------------------------------------
# 4. FTS5 검색
# ---------------------------------------------------------------------------


class TestFTS5:
    """messages_fts 가상 테이블 기본 동작"""

    def test_fts5_table_exists(self, store):
        """messages_fts 가상 테이블이 생성되어 있다."""
        conn = sqlite3.connect(store.db_path)
        tables = [r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()]
        conn.close()
        assert "messages_fts" in tables

    def test_fts5_search_basic(self, store):
        """FTS5 검색이 삽입된 내용을 반환한다."""
        store.create_session("fts-sess-1", source="api")
        store.append_message("fts-sess-1", role="user", content="unique_keyword_xyz")
        conn = sqlite3.connect(store.db_path)
        rows = conn.execute("SELECT content FROM messages_fts WHERE messages_fts MATCH 'unique_keyword_xyz'").fetchall()
        conn.close()
        assert len(rows) >= 1


# ---------------------------------------------------------------------------
# 5. prune_sessions
# ---------------------------------------------------------------------------


class TestPrune:
    """오래된 세션 정리"""

    def test_prune_removes_old_sessions(self, store):
        """older_than_days=0으로 prune하면 ended_at이 있는 세션이 삭제된다."""
        store.create_session("prune-old", source="api")
        store.end_session("prune-old", end_reason="timeout")

        # ended_at을 과거로 강제 업데이트
        conn = sqlite3.connect(store.db_path)
        conn.execute("UPDATE sessions SET ended_at='2000-01-01T00:00:00' WHERE session_id='prune-old'")
        conn.commit()
        conn.close()

        store.prune_sessions(older_than_days=0)
        assert store.get_session("prune-old") is None

    def test_prune_keeps_active_sessions(self, store):
        """ended_at이 없는 (활성) 세션은 prune되지 않는다."""
        store.create_session("prune-active", source="api")
        store.prune_sessions(older_than_days=0)
        assert store.get_session("prune-active") is not None


# ---------------------------------------------------------------------------
# 6. 동시 접근 (threading)
# ---------------------------------------------------------------------------


class TestConcurrency:
    """threading.Lock 동시 쓰기 안전성"""

    def test_concurrent_writes_no_exception(self, store):
        """여러 스레드에서 동시에 세션을 생성해도 예외가 없다."""
        errors = []

        def worker(idx):
            try:
                store.create_session(f"conc-{idx}", source="thread")
                store.append_message(f"conc-{idx}", role="user", content=f"msg-{idx}")
            except Exception as e:
                errors.append(e)

        threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Errors in threads: {errors}"
        # 10개 세션이 모두 생성되어야 한다
        rows = store.list_sessions(limit=20)
        conc_ids = [r["session_id"] for r in rows if r["session_id"].startswith("conc-")]
        assert len(conc_ids) == 10


# ---------------------------------------------------------------------------
# 7. sanitize_title
# ---------------------------------------------------------------------------


class TestSanitizeTitle:
    """제어문자 / zero-width 문자 제거"""

    def test_sanitize_removes_control_chars(self):
        """ASCII 제어문자(\x00-\x1f, \x7f)가 제거된다."""
        result = sanitize_title("hello\x00world\x1f!")
        assert "\x00" not in result
        assert "\x1f" not in result
        assert "helloworld!" in result

    def test_sanitize_removes_zero_width_chars(self):
        """Zero-width 문자(U+200B 등)가 제거된다."""
        result = sanitize_title("he\u200bllo")
        assert "\u200b" not in result
        assert "hello" in result

    def test_sanitize_normal_text_unchanged(self):
        """일반 텍스트는 변경되지 않는다."""
        text = "Hello, World! 안녕하세요."
        assert sanitize_title(text) == text
