"""IDS Phase 5 — 모션 카드뉴스 (HTML→MP4) 테스트.

테스트 커버리지:
1. SNS 규격 상수 검증 (3종)
2. 5가지 모션 효과 상수 검증
3. 알 수 없는 효과 ValueError 검증
4. 실제 ffmpeg로 MP4 렌더링 (L1 스모크)
5. 키프레임 추출 (첫/중/끝 3개)
6. OCR 검증 — pytesseract 없이 폴백
7. 큐 enqueue + process 성공
8. 큐 동시성 제한 검증
9. 큐 재시도 동작 검증
10. 큐 타임아웃 → TIMEOUT 상태
11. 외부 네트워크 호출 금지 검증 (§0.5)
12. BGM 라이선스 검증 — 거부 케이스
"""
from __future__ import annotations

import importlib.util
import importlib.machinery
import os
import shutil
import subprocess
import sys
import time
import urllib.request
import warnings
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from unittest.mock import patch

import pytest
from PIL import Image

_SKILL_DIR = "/home/jay/workspace/skills/motion-cardnews-ko"
_SKILL_MODULE_NAME = "motion_cardnews_ko"


def _load_skill_package() -> object:
    """Load the skill package from ``_SKILL_DIR`` and return the package module.

    Each submodule file (sizes, effects, frames, render, ocr, bgm) is loaded
    and registered in ``sys.modules`` as ``<package>.<submodule>`` first; the
    package's ``__init__.py`` is loaded afterwards. Names already present in
    ``sys.modules`` are left untouched, so repeated calls are cheap.

    NOTE(review): the skill directory name contains hyphens, which presumably
    is why a regular ``import`` is not used here -- confirm.

    Returns:
        The module object registered under the package name.

    Raises:
        AssertionError: if an import spec (or its loader) cannot be created.
        Exception: whatever a module raises while executing; the partially
            initialised module is removed from ``sys.modules`` before the
            exception propagates.
    """
    package = _SKILL_MODULE_NAME
    root = Path(_SKILL_DIR)

    def _register_and_exec(qualified_name: str, spec: importlib.machinery.ModuleSpec) -> None:
        # Register before executing so the module is visible while its own
        # body runs; roll the registration back if execution fails.
        module = importlib.util.module_from_spec(spec)
        module.__package__ = package  # type: ignore[assignment]
        sys.modules[qualified_name] = module
        try:
            spec.loader.exec_module(module)  # type: ignore[union-attr]
        except Exception:
            del sys.modules[qualified_name]
            raise

    # Submodules go first, in this fixed order.
    for submod in ("sizes", "effects", "frames", "render", "ocr", "bgm"):
        qualified = f"{package}.{submod}"
        if qualified in sys.modules:
            continue
        sub_spec = importlib.util.spec_from_file_location(
            qualified,
            root / f"{submod}.py",
        )
        assert sub_spec is not None and sub_spec.loader is not None, f"failed to load {submod} from {root}"
        _register_and_exec(qualified, sub_spec)

    # Then the package itself, marked as a package via its search locations.
    if package not in sys.modules:
        pkg_spec = importlib.util.spec_from_file_location(
            package,
            root / "__init__.py",
            submodule_search_locations=[str(root)],
        )
        assert pkg_spec is not None and pkg_spec.loader is not None, f"failed to load {package} from __init__.py"
        _register_and_exec(package, pkg_spec)

    return sys.modules[package]


def _load_queue_module(module_alias: str = "motion_render_queue") -> object:
    """Dynamically load the render-queue script and register it as *module_alias*.

    If *module_alias* is already present in ``sys.modules`` that entry is
    returned as-is. Otherwise the script is executed as a new module; when
    execution fails the alias is removed from ``sys.modules`` again and the
    exception is re-raised.

    Args:
        module_alias: Name under which the module is registered.

    Returns:
        The cached or freshly loaded module object.
    """
    try:
        return sys.modules[module_alias]
    except KeyError:
        pass

    script = Path("/home/jay/workspace/scripts/motion_render_queue.py")
    spec = importlib.util.spec_from_file_location(module_alias, str(script))
    assert spec is not None and spec.loader is not None, f"failed to load {module_alias} from {script}"

    loaded = importlib.util.module_from_spec(spec)
    # Register first so the alias resolves while the script body runs.
    sys.modules[module_alias] = loaded
    try:
        spec.loader.exec_module(loaded)  # type: ignore[union-attr]
    except Exception:
        del sys.modules[module_alias]
        raise
    else:
        return loaded


# Load the skill package eagerly at import time: fixtures and tests below look
# its submodules up directly in sys.modules.
_load_skill_package()

# ffmpeg counts as available when either the user-local binary exists or an
# "ffmpeg" executable is found on PATH.
_ffmpeg_paths = ["/home/jay/.local/bin/ffmpeg", "ffmpeg"]
_FFMPEG_AVAILABLE = (
    shutil.which("ffmpeg") is not None
    or Path("/home/jay/.local/bin/ffmpeg").exists()
)
# Marker for tests that cannot run without ffmpeg.
_FFMPEG_SKIP = pytest.mark.skipif(
    not _FFMPEG_AVAILABLE,
    reason="ffmpeg required but not found",
)


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def small_size() -> tuple[int, int]:
    """Return a small square (width, height) so test renders stay fast."""
    side = 160
    return (side, side)


@pytest.fixture
def three_frames(tmp_path: Path, small_size: tuple[int, int]) -> list[Path]:
    """Write three solid-colour frames into ``tmp_path`` and return their paths.

    Frames are produced by the skill's ``generate_solid_frame`` at the
    ``small_size`` resolution and are named ``frame_0.png`` .. ``frame_2.png``.
    """
    make_frame = sys.modules[f"{_SKILL_MODULE_NAME}.frames"].generate_solid_frame  # type: ignore[attr-defined]
    width, height = small_size
    # (RGB colour, caption) for each frame, in output order.
    frame_specs = (
        ((220, 50, 50), "첫번째"),
        ((50, 200, 50), "두번째"),
        ((50, 50, 220), "세번째"),
    )
    written: list[Path] = []
    for index, (rgb, caption) in enumerate(frame_specs):
        target = tmp_path / f"frame_{index}.png"
        make_frame(width, height, rgb, caption, target)
        written.append(target)
    return written


@pytest.fixture(scope="session")
def session_mp4(tmp_path_factory: pytest.TempPathFactory) -> Path:
    """Render one MP4 that is shared by every test in the session.

    Three 160x160 frames are generated and passed to ``render_motion`` with the
    "fade" effect at 10 fps and 1.67 s requested per frame (roughly 5 seconds
    in total). The fixture skips when ffmpeg is not available.

    Returns:
        Path of the rendered ``test_output.mp4``.
    """
    if not _FFMPEG_AVAILABLE:
        pytest.skip("ffmpeg not available")

    work_dir = tmp_path_factory.mktemp("session_mp4")
    make_frame = sys.modules[f"{_SKILL_MODULE_NAME}.frames"].generate_solid_frame  # type: ignore[attr-defined]
    render = sys.modules[f"{_SKILL_MODULE_NAME}.render"].render_motion  # type: ignore[attr-defined]

    width, height = 160, 160
    # (RGB colour, caption) for each frame, in playback order.
    frame_specs = (
        ((200, 100, 50), "시작"),
        ((50, 150, 200), "중간"),
        ((100, 200, 80), "끝"),
    )
    frame_paths: list[Path] = []
    for index, (rgb, caption) in enumerate(frame_specs):
        frame_file = work_dir / f"frame_{index}.png"
        make_frame(width, height, rgb, caption, frame_file)
        frame_paths.append(frame_file)

    output = work_dir / "test_output.mp4"
    render(
        frame_paths=frame_paths,
        output_path=output,
        size=(width, height),
        effect="fade",
        fps=10,
        duration_per_frame=1.67,
    )
    return output


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

def test_3_sns_sizes_defined() -> None:
    """Check that SIZES defines the three SNS presets with the expected dimensions.

    Expected: instagram_reels 1080x1920, twitter 1920x1080, threads 1080x1080.
    """
    SIZES = sys.modules[f"{_SKILL_MODULE_NAME}.sizes"].SIZES  # type: ignore[attr-defined]
    expected = {
        "instagram_reels": (1080, 1920),
        "twitter": (1920, 1080),
        "threads": (1080, 1080),
    }

    # Presence first, so a missing key is reported with a readable message
    # instead of surfacing as a KeyError in the comparison below.
    for platform in expected:
        assert platform in SIZES, f"{platform}가 SIZES에 없습니다."

    for platform, dimensions in expected.items():
        assert SIZES[platform] == dimensions


def test_5_motion_effects_defined() -> None:
    """Check that EFFECTS contains the fade, slide, zoom, dissolve and sequence keys."""
    required_effects = ("fade", "slide", "zoom", "dissolve", "sequence")
    EFFECTS = sys.modules[f"{_SKILL_MODULE_NAME}.effects"].EFFECTS  # type: ignore[attr-defined]

    for name in required_effects:
        assert name in EFFECTS, f"효과 '{name}'이 EFFECTS에 없습니다."


def test_get_effect_filter_unknown_raises() -> None:
    """Check that get_effect_filter raises ValueError for an unknown effect name."""
    lookup_filter = sys.modules[f"{_SKILL_MODULE_NAME}.effects"].get_effect_filter  # type: ignore[attr-defined]
    bogus_name = "unknown_effect_xyz"

    # The error message is expected to contain the Korean "unknown effect" text.
    with pytest.raises(ValueError, match="알 수 없는 효과"):
        lookup_filter(bogus_name, fps=30, duration=1.0)


@_FFMPEG_SKIP
def test_render_short_mp4_real_ffmpeg(session_mp4: Path) -> None:
    """Check the session MP4 exists, is at least 1 KiB and lasts 3-8 seconds.

    The duration check is best-effort: it is silently skipped when ffprobe
    cannot be found, exits non-zero, or prints nothing.
    """
    assert session_mp4.exists(), f"MP4 파일이 존재하지 않습니다: {session_mp4}"
    assert session_mp4.stat().st_size >= 1024, f"MP4 파일이 너무 작습니다: {session_mp4.stat().st_size} bytes"

    # Prefer the ffprobe next to the user-local ffmpeg, else fall back to PATH.
    local_probe = Path("/home/jay/.local/bin/ffmpeg").parent / "ffprobe"
    if local_probe.exists():
        probe = str(local_probe)
    else:
        probe = shutil.which("ffprobe") or ""
    if not probe:
        return

    command = [
        probe, "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(session_mp4),
    ]
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode != 0:
        return
    reported = completed.stdout.strip()
    if not reported:
        return

    duration = float(reported)
    assert 3.0 <= duration <= 8.0, f"MP4 지속 시간이 예상 범위를 벗어났습니다: {duration}초"


@_FFMPEG_SKIP
def test_extract_keyframes_returns_3_paths(session_mp4: Path, tmp_path: Path) -> None:
    """Check that extract_keyframes yields three non-empty PNG files.

    The three results are treated as the first, middle and last keyframes.
    """
    extract = sys.modules[f"{_SKILL_MODULE_NAME}.ocr"].extract_keyframes  # type: ignore[attr-defined]

    # Unpacking enforces that exactly three paths come back.
    first, middle, last = extract(session_mp4, tmp_path / "keyframes")
    labelled = {"first": first, "middle": middle, "last": last}

    for label, frame_path in labelled.items():
        assert frame_path.exists(), f"{label} 키프레임 파일이 없습니다: {frame_path}"
        assert frame_path.stat().st_size > 0, f"{label} 키프레임이 비어있습니다"
        assert frame_path.suffix == ".png"


@_FFMPEG_SKIP
def test_ocr_validate_korean_frames_fallback(session_mp4: Path, tmp_path: Path) -> None:
    """Check the shape of validate_korean_frames' result for the session MP4.

    Only structure is asserted (three labelled entries with the required
    fields and boolean flags), so the test holds whether or not OCR falls
    back because pytesseract is unavailable.
    """
    validate = sys.modules[f"{_SKILL_MODULE_NAME}.ocr"].validate_korean_frames  # type: ignore[attr-defined]
    labels = ("first", "middle", "last")
    required_fields = {"frame", "ocr_text", "has_expected", "fallback"}

    result = validate(
        session_mp4,
        expected_korean_chars=["시작", "중간", "끝"],
        output_dir=tmp_path / "ocr_out",
    )

    assert isinstance(result, dict), "결과가 dict여야 합니다"
    assert set(result.keys()) == set(labels)

    for label in labels:
        entry = result[label]
        assert isinstance(entry, dict)
        # Extra keys are allowed; the required ones must all be present.
        assert set(entry.keys()) >= required_fields
        for flag in ("has_expected", "fallback"):
            assert isinstance(entry[flag], bool)


@_FFMPEG_SKIP
def test_queue_enqueue_and_process_success(tmp_path: Path, three_frames: list[Path]) -> None:
    """Enqueue two jobs, process the queue and expect both to succeed.

    ``process_queue`` runs unmocked here, so the jobs are presumably rendered
    with the real ffmpeg; the test is therefore skipped when ffmpeg is not
    available instead of failing.

    ``MOTION_QUEUE_DIR`` is pointed at a per-test directory through
    ``patch.dict`` so that any value set before the test is restored afterwards.
    """
    mq = _load_queue_module("motion_render_queue_t7")
    queue_dir = str(tmp_path / "queue_test")

    def make_job(output_name: str, effect: str) -> dict[str, object]:
        # Both jobs share everything except the output file and the effect.
        return {
            "frame_paths": [str(p) for p in three_frames],
            "output_path": str(tmp_path / output_name),
            "size": [160, 160],
            "effect": effect,
            "fps": 10,
            "duration_per_frame": 0.5,
            "bgm_path": None,
        }

    with patch.dict(os.environ, {"MOTION_QUEUE_DIR": queue_dir}):
        mq.enqueue(make_job("job1_output.mp4", "fade"))  # type: ignore[attr-defined]
        mq.enqueue(make_job("job2_output.mp4", "sequence"))  # type: ignore[attr-defined]
        summary = mq.process_queue(max_concurrent=2, timeout_per_job=60, max_retries=1)  # type: ignore[attr-defined]

    assert summary["total"] == 2
    assert summary["success"] == 2
    assert summary["failed"] == 0


def test_queue_concurrency_limit(tmp_path: Path, three_frames: list[Path], monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that process_queue builds its executor with ``max_workers=3``.

    The queue module's ``ThreadPoolExecutor`` is replaced by a spy that records
    the ``max_workers`` it is constructed with and delegates to a real
    executor. ``_execute_job`` is replaced by an immediate success so no
    rendering happens.

    ``MOTION_QUEUE_DIR`` is set through ``monkeypatch`` so that pytest restores
    the previous environment at teardown.
    """
    mq = _load_queue_module("motion_render_queue_t8")
    monkeypatch.setenv("MOTION_QUEUE_DIR", str(tmp_path / "queue_concurrency"))

    # Every max_workers value the spy is constructed with, in call order.
    observed_workers: list[int] = []

    class SpyExecutor:
        """Records ``max_workers`` and forwards the calls used here to a real pool."""

        def __init__(self, max_workers: int = 1, **kwargs: object) -> None:
            observed_workers.append(max_workers)
            self._real = ThreadPoolExecutor(max_workers=max_workers)

        def __enter__(self) -> "SpyExecutor":
            self._real.__enter__()
            return self

        def __exit__(self, *args: object) -> None:
            self._real.__exit__(*args)

        def submit(self, fn: object, *args: object, **kwargs: object) -> object:  # type: ignore[override]
            return self._real.submit(fn, *args, **kwargs)  # type: ignore[arg-type]

    def fast_execute(job_data: dict) -> dict:  # type: ignore[type-arg]
        # Stand-in for _execute_job: mark the job done without rendering.
        job_data["status"] = "SUCCESS"
        job_data["completed_at"] = time.time()
        return job_data

    mq.enqueue({  # type: ignore[attr-defined]
        "frame_paths": [str(p) for p in three_frames],
        "output_path": str(tmp_path / "concurrency_dummy.mp4"),
        "size": [160, 160],
        "effect": "fade",
        "fps": 10,
        "duration_per_frame": 0.5,
        "bgm_path": None,
    })

    with patch.object(mq, "ThreadPoolExecutor", SpyExecutor):
        with patch.object(mq, "_execute_job", side_effect=fast_execute):
            mq.process_queue(max_concurrent=3, timeout_per_job=30, max_retries=0)  # type: ignore[attr-defined]

    assert 3 in observed_workers, f"max_workers=3으로 호출되지 않았습니다. 실제: {observed_workers}"


def test_queue_retry_on_failure(tmp_path: Path, three_frames: list[Path]) -> None:
    """Check that a failing job is retried and the summary's ``retry_total`` grows.

    ``_execute_job`` is replaced by a fake that raises on its first call and
    reports success on later calls.

    ``MOTION_QUEUE_DIR`` is pointed at a per-test directory through
    ``patch.dict`` so that any value set before the test is restored afterwards.
    """
    mq = _load_queue_module("motion_render_queue_t9")
    queue_dir = str(tmp_path / "queue_retry")

    attempts = 0

    def patched_execute_job(job_data: dict) -> dict:  # type: ignore[type-arg]
        nonlocal attempts
        attempts += 1
        if attempts <= 1:
            # Only the first attempt fails, which should trigger one retry.
            raise RuntimeError(f"의도적 실패 (시도 {attempts})")
        job_data["status"] = "SUCCESS"
        job_data["completed_at"] = time.time()
        return job_data

    with patch.dict(os.environ, {"MOTION_QUEUE_DIR": queue_dir}):
        mq.enqueue({  # type: ignore[attr-defined]
            "frame_paths": [str(p) for p in three_frames],
            "output_path": str(tmp_path / "retry_output.mp4"),
            "size": [160, 160],
            "effect": "fade",
            "fps": 10,
            "duration_per_frame": 0.5,
            "bgm_path": None,
        })

        with patch.object(mq, "_execute_job", side_effect=patched_execute_job):
            summary = mq.process_queue(max_concurrent=1, timeout_per_job=60, max_retries=2)  # type: ignore[attr-defined]

    assert summary.get("retry_total", 0) > 0, f"재시도가 기록되지 않았습니다: {summary}"


def test_queue_timeout_marks_failed(tmp_path: Path, three_frames: list[Path]) -> None:
    """Check that a job exceeding the per-job timeout is counted as timeout or failed.

    ``_execute_job`` is replaced by a fake that sleeps for 10 seconds while the
    queue is run with ``timeout_per_job=2`` and no retries.

    ``MOTION_QUEUE_DIR`` is pointed at a per-test directory through
    ``patch.dict`` so that any value set before the test is restored afterwards.
    """
    mq = _load_queue_module("motion_render_queue_t10")
    queue_dir = str(tmp_path / "queue_timeout")

    def slow_execute(job_data: dict) -> dict:  # type: ignore[type-arg]
        # Deliberately far longer than the 2 second timeout used below.
        time.sleep(10)
        return job_data

    with patch.dict(os.environ, {"MOTION_QUEUE_DIR": queue_dir}):
        mq.enqueue({  # type: ignore[attr-defined]
            "frame_paths": [str(p) for p in three_frames],
            "output_path": str(tmp_path / "timeout_output.mp4"),
            "size": [160, 160],
            "effect": "fade",
            "fps": 10,
            "duration_per_frame": 0.5,
            "bgm_path": None,
        })

        with patch.object(mq, "_execute_job", side_effect=slow_execute):
            summary = mq.process_queue(max_concurrent=1, timeout_per_job=2, max_retries=0)  # type: ignore[attr-defined]

    # The queue may report the outcome under either counter.
    assert (summary.get("timeout", 0) + summary.get("failed", 0)) > 0, \
        f"타임아웃/실패가 기록되지 않았습니다: {summary}"


@_FFMPEG_SKIP
def test_no_external_api_direct_calls(tmp_path: Path, three_frames: list[Path]) -> None:
    """Check that rendering and queue processing make no external HTTP calls (section 0.5).

    ``urllib.request.urlopen`` and, when the package is installed,
    ``requests.get`` are replaced by a mock that records the call and raises.
    ``socket.create_connection`` is intentionally left alone to avoid
    interfering with the ffmpeg subprocess.

    Because the queue is processed for real and is expected to succeed, the
    test is skipped when ffmpeg is not available. ``MOTION_QUEUE_DIR`` is set
    through ``patch.dict`` so that any earlier value is restored afterwards.
    """
    external_calls: list[bool] = []

    def mock_urlopen(*args: object, **kwargs: object) -> None:  # type: ignore[return]
        # Record first: the raised error may be swallowed by the queue's own
        # error handling, the recorded flag cannot be.
        external_calls.append(True)
        raise AssertionError("외부 네트워크 호출이 금지되어 있습니다 (urllib.request.urlopen)")

    # Keep the ImportError handling limited to the optional import itself, so
    # an ImportError raised while processing the queue is not mistaken for
    # "requests is missing" and the queue is never processed twice.
    try:
        import requests  # type: ignore[import-not-found]
    except ImportError:
        requests = None

    mq = _load_queue_module("motion_render_queue_t11")
    queue_dir = str(tmp_path / "queue_no_net")

    with patch.dict(os.environ, {"MOTION_QUEUE_DIR": queue_dir}):
        mq.enqueue({  # type: ignore[attr-defined]
            "frame_paths": [str(p) for p in three_frames],
            "output_path": str(tmp_path / "no_net_output.mp4"),
            "size": [160, 160],
            "effect": "fade",
            "fps": 10,
            "duration_per_frame": 0.5,
            "bgm_path": None,
        })

        with patch("urllib.request.urlopen", side_effect=mock_urlopen):
            if requests is None:
                summary = mq.process_queue(max_concurrent=1, timeout_per_job=60, max_retries=0)  # type: ignore[attr-defined]
            else:
                with patch.object(requests, "get", side_effect=mock_urlopen):
                    summary = mq.process_queue(max_concurrent=1, timeout_per_job=60, max_retries=0)  # type: ignore[attr-defined]

    assert not external_calls, "외부 네트워크 호출이 감지되었습니다!"
    assert summary.get("success", 0) > 0, f"렌더링 성공이 기대되었으나 실패했습니다: {summary}"


def test_bgm_license_validation_rejects_unknown() -> None:
    """Check that validate_license raises ValueError for bad input.

    Two cases are covered: a track id that is not in ``BGM_LIBRARY``, and a
    track whose license ("PROPRIETARY") is expected to be rejected. The second
    case inserts a throwaway entry into ``BGM_LIBRARY`` and always removes it
    again.
    """
    bgm_mod = sys.modules[f"{_SKILL_MODULE_NAME}.bgm"]
    validate_license = bgm_mod.validate_license  # type: ignore[attr-defined]
    BGM_LIBRARY = bgm_mod.BGM_LIBRARY  # type: ignore[attr-defined]

    # Unknown track id.
    with pytest.raises(ValueError, match="알 수 없는 트랙"):
        validate_license("nonexistent_track_xyz")

    # Known track id whose license is not allowed.
    bad_track_id = "test_bad_license_track"
    BGM_LIBRARY[bad_track_id] = {
        "name": "Bad License Track",
        "source": "unknown",
        "license": "PROPRIETARY",
        "url": "https://placeholder.example/bad",
        "filename": "bad_license.mp3",
    }

    try:
        with pytest.raises(ValueError, match="허용되지 않은 라이선스"):
            validate_license(bad_track_id)
    finally:
        # Remove the temporary entry; tolerate it having been removed already.
        BGM_LIBRARY.pop(bad_track_id, None)
