from __future__ import annotations

"""knowledge_extractor_v2.py — 카카오톡 인사이트 추출 고도화 Phase 1

다층 LLM 파이프라인:
  Stage 1 (Haiku)  : 인사이트 여부 빠른 판별 + 유형 분류
  Stage 2 (Sonnet) : 심층 인사이트 추출
  Fallback         : LLM 없을 때 규칙 기반 처리
"""

import dataclasses
import gc
import json
import logging
import os
import re
import subprocess
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

from .knowledge_extractor import (_EXCLUDED_TYPES, _RE_QUESTION_PATTERN,
                                  _THREAD_GAP_MINUTES, _extract_keywords,
                                  _is_noise_message, _mask_phone)
from .models import ChatMessage
from .models_v2 import (ALL_CATEGORIES, BatchProgress, InsightType, InsightV2,
                        Stage1Result, ThreadV2)

logger = logging.getLogger(__name__)

_recent_logs: list[str] = []

# ---------------------------------------------------------------------------
# 카테고리 키워드 매핑 (규칙 기반용)
# ---------------------------------------------------------------------------

_CATEGORY_KEYWORDS_V2: dict[str, list[str]] = {
    "보상/자동차": ["자동차", "교통사고", "차량", "렌트", "대물", "대인", "자차"],
    "보상/일반": ["보상", "보험금", "청구", "지급", "실비", "입원", "통원"],
    "보상/장기": ["장기보험", "종신", "연금"],
    "고지의무": ["고지", "알릴의무", "고지의무", "계약전알릴의무"],
    "약관해석": ["약관", "조항", "해석", "특약", "면책", "보장"],
    "상품비교": ["상품", "비교", "추천", "어디", "어떤보험", "태아보험"],
    "언더라이팅": ["심사", "인수", "거절", "유보", "부담보"],
    "민원처리": ["민원", "분쟁", "금감원", "소비자보호"],
    "손해사정": ["손해사정", "손사", "사정", "조사"],
    "의학지식": ["진단", "수술", "질병코드", "의학", "병원", "치료"],
    "세금/절세": ["세금", "절세", "연말정산", "세액공제"],
    "법률/판례": ["판례", "법원", "소송", "법률", "판결"],
    "영업노하우": ["영업", "모집", "설계", "제안", "리크루팅"],
    "고객관리": ["고객", "CS", "유지", "계약관리"],
    "GA운영": ["GA", "대리점", "지점", "점장"],
    "수수료체계": ["수수료", "인센티브", "성과급", "FP"],
    "디지털/IT": ["앱", "디지털", "온라인", "플랫폼"],
    "기타": [],
}

# ---------------------------------------------------------------------------
# 인사이트 유형 감지 키워드 (규칙 기반용)
# ---------------------------------------------------------------------------

_INSIGHT_TYPE_PATTERNS: dict[str, list[str]] = {
    "qa": ["질문", "궁금", "문의", "여쭤", "확인부탁", "알고싶", "어떻게"],
    "expert_opinion": ["제 경험", "실무에서", "예전에", "제 생각에", "오래 해보니"],
    "case_analysis": ["사례", "사고", "보상받", "실제로", "경험담", "케이스"],
    "regulation_interpretation": ["약관", "조항", "해석", "적용", "적용범위"],
    "practical_tip": ["팁", "노하우", "방법", "이렇게 하면", "추천"],
    "regulation_change": ["개정", "변경", "시행", "신규", "적용일"],
    "warning": ["주의", "조심", "위험", "금지", "절대", "하지마"],
}

# ---------------------------------------------------------------------------
# Stage 1 Haiku 필터 프롬프트
# ---------------------------------------------------------------------------

_STAGE1_PROMPT_TEMPLATE = """당신은 보험 전문가 채팅 분석기입니다.
아래 대화 스레드를 분석하여 보험 실무 인사이트가 포함되어 있는지 판별하세요.

인사이트 유형 7가지:
1. qa — 질문 + 전문가 답변
2. expert_opinion — "제 경험상...", "실무에서는..." 같은 경험 공유
3. case_analysis — 실제 보험 사고/보상 사례 공유
4. regulation_interpretation — 약관 조항에 대한 해석/논쟁
5. practical_tip — 설계사 영업/업무 관련 노하우
6. regulation_change — 법/규정/고시 변경 정보
7. warning — "이런 건 주의하세요" 유형

JSON 응답:
{{
  "has_insight": true/false,
  "insight_types": ["qa", "expert_opinion"],
  "noise_reason": ""
}}

대화 스레드:
{thread_text}"""

# ---------------------------------------------------------------------------
# Stage 2 Sonnet 추출 프롬프트
# ---------------------------------------------------------------------------

_STAGE2_PROMPT_TEMPLATE = """당신은 보험 도메인 지식 정제 전문가입니다.
아래 대화 스레드에서 보험 실무 인사이트를 추출하세요.

## 인사이트 유형: {insight_types}

## 카테고리 목록
보상/자동차, 보상/일반, 보상/장기, 고지의무, 약관해석, 상품비교,
언더라이팅, 민원처리, 손해사정, 의학지식, 세금/절세, 법률/판례,
영업노하우, 고객관리, GA운영, 수수료체계, 디지털/IT, 기타

## 출력 형식 (JSON)
{{
  "title": "핵심 주제 50자 이내",
  "type": "qa|expert_opinion|case_analysis|regulation_interpretation|practical_tip|regulation_change|warning",
  "category": "카테고리 목록 중 하나",
  "summary": "핵심 내용 200자 이내 요약",
  "key_points": ["핵심 포인트 1", "핵심 포인트 2"],
  "expert": "가장 전문적 답변을 한 사용자 이름",
  "confidence": "high|medium|low",
  "related_topics": ["관련", "키워드"],
  "tags": ["#태그1", "#태그2"],
  "question": "Q&A 유형일 때 질문 요약",
  "answer": "Q&A 유형일 때 답변 요약"
}}

대화 스레드:
{thread_text}"""


# ---------------------------------------------------------------------------
# 내부 헬퍼
# ---------------------------------------------------------------------------


def _thread_to_dict(thread: ThreadV2) -> dict:  # type: ignore[type-arg]
    """ThreadV2(dataclass) → 직렬화 가능한 dict로 변환한다.

    messages 필드는 Pydantic ChatMessage이므로 model_dump()로 변환.
    나머지 필드는 dataclasses.asdict()의 기본 변환 사용.
    """
    d = dataclasses.asdict(thread)
    d["messages"] = [m.model_dump() for m in thread.messages]
    return d


def _threads_from_dicts(data: list[dict]) -> list[ThreadV2]:  # type: ignore[type-arg]
    """dict 리스트 → ThreadV2 리스트로 역직렬화한다."""
    threads: list[ThreadV2] = []
    for item in data:
        msgs = [ChatMessage(**m) for m in item.get("messages", [])]
        threads.append(
            ThreadV2(
                messages=msgs,
                start_time=item.get("start_time", ""),
                topic_label=item.get("topic_label", ""),
                has_insight=item.get("has_insight"),
                insight_types=item.get("insight_types", []),
            )
        )
    return threads


def _parse_datetime_v2(date: str, time_str: str) -> Optional[datetime]:
    """날짜 + 시간 문자열을 datetime으로 변환한다."""
    if not date or not time_str:
        return None
    try:
        return datetime.strptime(f"{date} {time_str}", "%Y-%m-%d %H:%M")
    except ValueError:
        return None


def _thread_to_text(thread: ThreadV2) -> str:
    """ThreadV2 → 전화번호 마스킹된 텍스트 변환."""
    return "\n".join(f"[{m.user}] {_mask_phone(m.content)}" for m in thread.messages)


def _parse_json_response(raw_text: str) -> dict:  # type: ignore[type-arg]
    """LLM 응답에서 JSON을 파싱한다. 코드블록 제거 및 JSON 객체 추출."""
    text = raw_text.strip()
    # ```json ... ``` 또는 ``` ... ``` 블록 처리
    code_block = re.search(r"```(?:json)?\s*\n(.*?)\n\s*```", text, re.DOTALL)
    if code_block:
        text = code_block.group(1).strip()
    # 1차 시도: 전체 텍스트 파싱
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # 2차 시도: 첫 번째 JSON 객체 { ... } 추출
    brace_start = text.find("{")
    if brace_start >= 0:
        depth = 0
        for i in range(brace_start, len(text)):
            if text[i] == "{":
                depth += 1
            elif text[i] == "}":
                depth -= 1
                if depth == 0:
                    return json.loads(text[brace_start : i + 1])
    raise json.JSONDecodeError("JSON 객체를 찾을 수 없음", text, 0)


def _call_claude(prompt: str, model: str = "haiku", timeout: int = 120) -> str:
    """내부 Claude CLI로 LLM 호출."""
    model_map = {
        "haiku": "haiku",
        "sonnet": "sonnet",
    }
    model_arg = model_map.get(model, model)
    result = subprocess.run(
        ["/home/jay/.local/bin/claude", "-p", prompt, "--model", model_arg],
        capture_output=True,
        text=True,
        timeout=timeout,
        cwd="/tmp",
        env={**os.environ, "CLAUDE_CODE_DISABLE_NONESSENTIAL_TRAFFIC": "1"},
    )
    if result.returncode != 0:
        raise RuntimeError(f"Claude CLI 오류: {result.stderr[:200]}")
    return result.stdout.strip()


def _detect_category_v2(text: str) -> str:
    """텍스트에서 v2 카테고리를 감지한다 (키워드 매핑 기반)."""
    for category, keywords in _CATEGORY_KEYWORDS_V2.items():
        if category == "기타":
            continue
        for kw in keywords:
            if kw in text:
                return category
    return "기타"


def _detect_insight_types(text: str) -> list[str]:
    """텍스트에서 인사이트 유형을 감지한다 (키워드 기반, 매칭 수 기준 정렬)."""
    scored: dict[str, int] = {}
    for itype, keywords in _INSIGHT_TYPE_PATTERNS.items():
        match_count = sum(1 for kw in keywords if kw in text)
        if match_count > 0:
            scored[itype] = match_count
    if not scored:
        return ["qa"]
    # 매칭 키워드 수 내림차순 정렬, 동점이면 "qa"를 뒤로
    sorted_types = sorted(
        scored.keys(),
        key=lambda t: (scored[t], 0 if t == "qa" else 1),
        reverse=True,
    )
    return sorted_types


def _resolve_insight_type(type_str: str) -> InsightType:
    """문자열을 InsightType 열거형으로 변환. 실패 시 QA 반환."""
    try:
        return InsightType(type_str)
    except ValueError:
        return InsightType.QA


def _save_batch(output_dir: str, batch_num: int, results: list[dict]) -> None:  # type: ignore[type-arg]
    """중간 결과를 JSON 파일로 저장한다."""
    path = Path(output_dir) / f"batch_{batch_num:03d}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    logger.info("배치 %d 중간 저장 완료: %s (%d건)", batch_num, path, len(results))


def _cleanup_checkpoints(output_dir: str) -> None:
    """정제 완료 후 체크포인트 파일을 정리한다."""
    for name in ("checkpoint_threads.json", "checkpoint_refined_threads.json"):
        cp = Path(output_dir) / name
        if cp.exists():
            try:
                cp.unlink()
                _add_log(f"체크포인트 정리: {name} 삭제")
            except OSError as exc:
                logger.warning("체크포인트 삭제 실패 (%s): %s", name, exc)


# ---------------------------------------------------------------------------
# 1. 스레드 분리 (_split_threads_v2)
# ---------------------------------------------------------------------------


def _split_threads_v2(
    messages: list[ChatMessage],
    use_llm: bool = False,
    progress_file: str | None = None,
    output_dir: str | None = None,
) -> list[ThreadV2]:
    """개선된 스레드 분리.

    기존 규칙 (15분 gap, 날짜 변경, #궁금증 태그, 질문 패턴) +
    use_llm=True 시 메시지 20개씩 Haiku로 주제 연속성 판별.
    """
    # 입퇴장/미디어 등 제외 + 노이즈 메시지 제외
    filtered = [
        m
        for m in messages
        if m.type not in _EXCLUDED_TYPES and not _is_noise_message(m)
    ]
    if not filtered:
        return []

    threads: list[ThreadV2] = []
    current: ThreadV2 = ThreadV2()

    msg_idx = 0
    for msg in filtered:
        msg_idx += 1
        if progress_file and msg_idx % 20 == 0:
            pct = int((msg_idx / len(filtered)) * 5)  # 0~5% 구간
            _add_log(f"스레드 분리 중... ({msg_idx}/{len(filtered)} 메시지)")
            _write_progress(
                progress_file,
                {
                    "status": "running",
                    "progress": pct,
                    "currentStep": f"스레드 분리 중 ({msg_idx}/{len(filtered)} 메시지)",
                    "processedThreads": 0,
                    "totalThreads": 0,
                    "insightsFound": 0,
                    "noiseFiltered": 0,
                    "errors": 0,
                },
            )

        if not current.messages:
            current.messages.append(msg)
            current.start_time = f"{msg.date} {msg.time}"
            continue

        prev_msg = current.messages[-1]
        should_split = False

        # 규칙 1: #궁금증 태그 → 무조건 새 스레드
        if "#궁금증" in msg.content:
            should_split = True
        # 규칙 2: 질문 패턴 → 새 스레드
        elif _RE_QUESTION_PATTERN.search(msg.content):
            should_split = True
        # 규칙 3: 날짜 변경 → 새 스레드
        elif msg.date != prev_msg.date:
            should_split = True
        # 규칙 4: 15분 gap → 새 스레드
        else:
            prev_dt = _parse_datetime_v2(prev_msg.date, prev_msg.time)
            curr_dt = _parse_datetime_v2(msg.date, msg.time)
            if prev_dt and curr_dt:
                gap_minutes = (curr_dt - prev_dt).total_seconds() / 60
                if gap_minutes >= _THREAD_GAP_MINUTES:
                    should_split = True

        if should_split:
            if current.messages:
                threads.append(current)
            current = ThreadV2()
            current.messages.append(msg)
            current.start_time = f"{msg.date} {msg.time}"
        else:
            current.messages.append(msg)

    if current.messages:
        threads.append(current)

    # 최소 2개 이상 실질적 메시지 있는 스레드만 유지
    threads = [t for t in threads if len(t.messages) >= 2]

    # LLM 강화: 메시지 20개씩 묶어서 주제 연속성 판별
    if use_llm:
        if progress_file:
            _add_log(f"LLM 스레드 정밀 분리 시작 ({len(threads)}개 스레드)")
            _write_progress(
                progress_file,
                {
                    "status": "running",
                    "progress": 5,
                    "currentStep": f"LLM 스레드 정밀 분리 중 ({len(threads)}개 스레드)",
                    "processedThreads": 0,
                    "totalThreads": 0,
                    "insightsFound": 0,
                    "noiseFiltered": 0,
                    "errors": 0,
                },
            )
        if output_dir:
            checkpoint_path = Path(output_dir) / "checkpoint_threads.json"
            checkpoint_data = [_thread_to_dict(t) for t in threads]
            checkpoint_path.write_text(
                json.dumps(checkpoint_data, ensure_ascii=False, indent=2)
            )
            _add_log(f"스레드 분리 체크포인트 저장: {len(threads)}개 스레드")
        threads = _llm_refine_thread_splits(threads, progress_file=progress_file)
        if output_dir:
            checkpoint_path = Path(output_dir) / "checkpoint_refined_threads.json"
            checkpoint_data = [_thread_to_dict(t) for t in threads]
            checkpoint_path.write_text(
                json.dumps(checkpoint_data, ensure_ascii=False, indent=2)
            )
            _add_log(f"LLM 정밀 분리 체크포인트 저장: {len(threads)}개 스레드")

    if progress_file:
        _add_log(f"스레드 분리 완료: {len(threads)}개 스레드")
        _write_progress(
            progress_file,
            {
                "status": "running",
                "progress": 10,
                "currentStep": f"스레드 분리 완료 ({len(threads)}개)",
                "processedThreads": 0,
                "totalThreads": len(threads),
                "insightsFound": 0,
                "noiseFiltered": 0,
                "errors": 0,
            },
        )

    logger.info("총 %d개 스레드 분리됨", len(threads))
    return threads


def _split_thread_at(thread: ThreadV2, split_indices: list[int]) -> list[ThreadV2]:
    """split_at 인덱스 배열에 따라 스레드를 분할한다."""
    sub_threads: list[ThreadV2] = []
    prev_idx = 0
    for idx in sorted(split_indices):
        if prev_idx < idx < len(thread.messages):
            sub = ThreadV2()
            sub.messages = thread.messages[prev_idx:idx]
            if prev_idx == 0:
                sub.start_time = thread.start_time
            elif sub.messages:
                sub.start_time = f"{sub.messages[0].date} {sub.messages[0].time}"
            else:
                sub.start_time = thread.start_time
            sub_threads.append(sub)
            prev_idx = idx
    sub = ThreadV2()
    sub.messages = thread.messages[prev_idx:]
    if sub.messages:
        if prev_idx == 0 and not sub_threads:
            sub.start_time = thread.start_time
        else:
            sub.start_time = f"{sub.messages[0].date} {sub.messages[0].time}"
        sub_threads.append(sub)
    return sub_threads


def _is_noise_thread(thread: ThreadV2) -> bool:
    """메시지 3개 미만이면서 보험 키워드가 없는 스레드를 노이즈로 판별한다."""
    if len(thread.messages) >= 3:
        return False
    full_text = " ".join(m.content for m in thread.messages)
    all_domain_keywords = [kw for kws in _CATEGORY_KEYWORDS_V2.values() for kw in kws]
    matched = [kw for kw in all_domain_keywords if kw in full_text]
    return len(matched) == 0


def _is_non_question_thread(thread: ThreadV2) -> bool:
    """인사/일기/감사 등 비질문만으로 구성된 스레드를 필터링한다.

    모든 메시지가 noise(_is_noise_message)이면 True.
    """
    if not thread.messages:
        return True
    return all(_is_noise_message(m) for m in thread.messages)


def _llm_refine_thread_splits(
    threads: list[ThreadV2],
    progress_file: str | None = None,
) -> list[ThreadV2]:
    """Haiku를 사용하여 스레드 경계를 정밀 조정한다.

    연속된 스레드를 20개씩 묶어서 각 스레드가 이전 대화의 연속인지
    새 주제인지 판별(merge_with_prev)하고, 스레드 내부 주제 전환 지점
    (split_at)도 함께 처리한다. 노이즈 스레드(메시지 3개 미만 + 키워드 없음)는
    최종 결과에서 제거한다. merge_with_prev 배열 형식도 하위 호환 지원한다.
    """
    if not threads:
        return threads

    refine_prompt = """아래는 규칙 기반으로 분리된 대화 스레드입니다.
각 스레드에 대해 두 가지를 판별하세요:

1. 이전 스레드와 같은 주제의 연속인가? → merge_with_prev
2. 이 스레드 내부에 주제 전환이 있는가? → split_at (메시지 인덱스 리스트)
   - 질문-답변이 끝난 후 새 질문이 시작되는 지점
   - 잡담/인사가 섞인 구간 이후 새 주제 시작 지점

스레드 목록:
{threads_text}

JSON 응답:
{{
  "threads": [
    {{"merge_with_prev": false, "split_at": []}},
    {{"merge_with_prev": true, "split_at": [5, 12]}},
    ...
  ]
}}
- 첫 번째 스레드는 merge_with_prev가 항상 false
- split_at이 [5, 12]이면 메시지 0~4, 5~11, 12~끝 으로 3개 스레드로 분리
"""

    chunk_size = 20
    refined: list[ThreadV2] = []

    for chunk_start in range(0, len(threads), chunk_size):
        chunk_end = min(chunk_start + chunk_size, len(threads))
        logger.info(
            "LLM 정밀 분리: 스레드 %d~%d / %d 처리 중",
            chunk_start + 1,
            chunk_end,
            len(threads),
        )
        chunk_idx = chunk_start // chunk_size + 1
        total_chunks = (len(threads) + chunk_size - 1) // chunk_size
        _add_log(
            f"LLM 정밀 분리 chunk {chunk_idx}/{total_chunks} 시작 ({chunk_start + 1}~{chunk_end}/{len(threads)} 스레드)"
        )
        if progress_file:
            pct = 5 + int((chunk_start / len(threads)) * 5)  # 5~10% 구간
            _write_progress(
                progress_file,
                {
                    "status": "running",
                    "progress": pct,
                    "currentStep": f"LLM 정밀 분리 중 ({chunk_start + 1}~{chunk_end}/{len(threads)} 스레드)",
                    "processedThreads": 0,
                    "totalThreads": len(threads),
                    "insightsFound": 0,
                    "noiseFiltered": 0,
                    "errors": 0,
                },
            )
        chunk = threads[chunk_start : chunk_start + chunk_size]

        # 각 스레드의 메시지를 요약하여 프롬프트 구성
        threads_text_parts: list[str] = []
        for i, t in enumerate(chunk):
            msgs = t.messages
            if len(msgs) <= 10:
                preview_msgs = msgs
            else:
                preview_msgs = list(msgs[:3]) + list(msgs[-3:])
            lines = []
            for m in preview_msgs:
                truncated = m.content[:100].replace("\n", " ")
                lines.append(f"  [{m.user}] {truncated}")
            if len(msgs) > 10:
                lines.insert(3, f"  ... ({len(msgs) - 6}개 생략)")
            preview = "\n".join(lines)
            threads_text_parts.append(f"[스레드 {i}] (메시지 {len(msgs)}개)\n{preview}")
        threads_text = "\n\n".join(threads_text_parts)

        prompt = refine_prompt.format(threads_text=threads_text)

        try:
            raw_text = _call_claude(prompt, model="haiku", timeout=120)
            parsed = _parse_json_response(raw_text)
            thread_decisions: list[dict] = parsed.get("threads", [])
            if not thread_decisions:
                merge_flags = parsed.get("merge_with_prev", [False] * len(chunk))
                thread_decisions = [
                    {"merge_with_prev": mf, "split_at": []} for mf in merge_flags
                ]
        except Exception as exc:
            logger.warning(
                "스레드 분리 LLM 정밀화 실패 (chunk %d~%d): %s(%s) — 규칙 기반 결과 사용",
                chunk_start + 1,
                chunk_end,
                type(exc).__name__,
                exc,
            )
            _add_log(
                f"⚠ LLM 분리 chunk {chunk_start // chunk_size + 1} 오류: {type(exc).__name__}: {exc}"
            )
            thread_decisions = [
                {"merge_with_prev": False, "split_at": []} for _ in chunk
            ]

        for i, thread in enumerate(chunk):
            decision = (
                thread_decisions[i]
                if i < len(thread_decisions)
                else {"merge_with_prev": False, "split_at": []}
            )
            should_merge = bool(decision.get("merge_with_prev", False))
            split_indices: list[int] = list(decision.get("split_at", []))

            if i == 0 or not should_merge or not refined:
                if split_indices:
                    sub_threads = _split_thread_at(thread, split_indices)
                    refined.extend(sub_threads)
                else:
                    refined.append(thread)
            else:
                prev = refined[-1]
                prev.messages.extend(thread.messages)
                if split_indices:
                    combined_msgs = prev.messages
                    refined.pop()
                    combined_thread = ThreadV2()
                    combined_thread.messages = combined_msgs
                    combined_thread.start_time = prev.start_time
                    sub_threads = _split_thread_at(combined_thread, split_indices)
                    refined.extend(sub_threads)
                logger.debug(
                    "스레드 병합: chunk[%d] → chunk[%d]",
                    chunk_start + i,
                    chunk_start + i - 1,
                )
        _add_log(
            f"LLM 정밀 분리 chunk {chunk_start // chunk_size + 1}/{(len(threads) + chunk_size - 1) // chunk_size} 완료"
        )

    _add_log(f"✅ LLM 정밀 분리 완료 ({len(threads)}개 스레드)")
    # 노이즈 스레드 필터링: 메시지 3개 미만 + 키워드 매칭 0
    refined = [t for t in refined if not _is_noise_thread(t)]

    return refined


# ---------------------------------------------------------------------------
# 2. Stage 1 Haiku 필터 (_stage1_filter)
# ---------------------------------------------------------------------------


def _stage1_filter(
    threads: list[ThreadV2],
) -> list[Stage1Result]:
    """Haiku로 인사이트 여부 판별 (배치 20개씩).

    Returns:
        Stage1Result 리스트 (has_insight=True인 항목만 Stage 2로 진행)
    """
    results: list[Stage1Result] = []

    batch_size = 20
    for batch_start in range(0, len(threads), batch_size):
        batch = threads[batch_start : batch_start + batch_size]
        logger.info(
            "Stage 1 배치 처리: %d~%d / %d",
            batch_start + 1,
            batch_start + len(batch),
            len(threads),
        )

        for idx_in_batch, thread in enumerate(batch, 1):
            result = _stage1_single(thread)
            results.append(result)
            # 5개마다 또는 마지막 스레드에서 진행 로그
            if idx_in_batch % 5 == 0 or idx_in_batch == len(batch):
                batch_num = batch_start // batch_size + 1
                _add_log(f"배치 {batch_num}: {idx_in_batch}/{len(batch)} 스레드 처리됨")

        # 배치 간 rate limit 완화 (마지막 배치 제외)
        if batch_start + batch_size < len(threads):
            time.sleep(0.5)

    return results


def _stage1_single(thread: ThreadV2) -> Stage1Result:
    """단일 스레드에 대해 Stage 1 필터를 수행한다."""
    thread_text = _thread_to_text(thread)
    prompt = _STAGE1_PROMPT_TEMPLATE.format(thread_text=thread_text)

    try:
        raw_text = _call_claude(prompt, model="haiku", timeout=120)
        parsed = _parse_json_response(raw_text)

        has_insight: bool = bool(parsed.get("has_insight", False))
        insight_types: list[str] = list(parsed.get("insight_types", []))
        noise_reason: str = str(parsed.get("noise_reason", ""))

        thread.has_insight = has_insight
        thread.insight_types = insight_types

        return Stage1Result(
            thread=thread,
            has_insight=has_insight,
            insight_types=insight_types,
            noise_reason=noise_reason,
        )

    except Exception as exc:
        logger.warning("Stage 1 LLM 실패: %s — 규칙 기반 필터로 전환", exc)
        # 규칙 기반 폴백
        rule_result = _rule_based_filter_single(thread)
        thread.has_insight = rule_result.has_insight
        thread.insight_types = rule_result.insight_types
        return rule_result


# ---------------------------------------------------------------------------
# 3. Stage 2 Sonnet 추출 (_stage2_extract)
# ---------------------------------------------------------------------------


def _stage2_extract(
    stage1_results: list[Stage1Result],
    source_chat: str,
    start_index: int = 1,
) -> list[dict]:  # type: ignore[type-arg]
    """Sonnet으로 심층 인사이트 추출.

    Stage 1에서 has_insight=True로 판정된 스레드만 처리.
    """
    insight_candidates = [r for r in stage1_results if r.has_insight]
    logger.info(
        "Stage 2 진입: %d개 스레드 (Stage 1 필터 통과)",
        len(insight_candidates),
    )

    extracted: list[dict] = []  # type: ignore[type-arg]

    for i, s1 in enumerate(insight_candidates, start=start_index):
        entry = _stage2_single(s1, i, source_chat)
        if entry is not None:
            extracted.append(entry)
        # 5개마다 또는 마지막에서 진행 로그
        done_count = i - start_index + 1
        if done_count % 5 == 0 or done_count == len(insight_candidates):
            _add_log(
                f"Stage 2: {done_count}/{len(insight_candidates)} 스레드 추출 완료"
            )

    return extracted


def _stage2_single(
    s1: Stage1Result,
    index: int,
    source_chat: str,
) -> Optional[dict]:  # type: ignore[type-arg]
    """단일 스레드에 대해 Stage 2 추출을 수행한다."""
    thread = s1.thread
    thread_text = _thread_to_text(thread)
    insight_types_str = ", ".join(s1.insight_types) if s1.insight_types else "qa"
    prompt = _STAGE2_PROMPT_TEMPLATE.format(
        thread_text=thread_text,
        insight_types=insight_types_str,
    )

    try:
        raw_text = _call_claude(prompt, model="sonnet", timeout=300)
        parsed = _parse_json_response(raw_text)

        return _build_insight_from_llm(
            parsed, thread, index, source_chat, s1.insight_types
        )

    except Exception as exc:
        logger.warning(
            "Stage 2 LLM 실패 (스레드 %d): %s — 규칙 기반 추출로 전환", index, exc
        )
        return _rule_based_extract_single(thread, index, source_chat, s1.insight_types)


def _build_insight_from_llm(
    parsed: dict,  # type: ignore[type-arg]
    thread: ThreadV2,
    index: int,
    source_chat: str,
    fallback_types: list[str],
) -> Optional[dict]:  # type: ignore[type-arg]
    """LLM 파싱 결과로 InsightV2 딕셔너리를 생성한다."""
    msgs = thread.messages

    # 전문가 식별: LLM 응답 우선, 없으면 답변 메시지 중 최다 발언자
    expert = str(parsed.get("expert", ""))
    if not expert and len(msgs) > 1:
        answer_msgs = msgs[1:]
        user_count: dict[str, int] = {}
        for m in answer_msgs:
            user_count[m.user] = user_count.get(m.user, 0) + 1
        if user_count:
            expert = max(user_count, key=lambda u: user_count[u])

    source_date = thread.start_time.split(" ")[0] if thread.start_time else ""
    raw_thread = [f"[{m.user}] {_mask_phone(m.content)}" for m in msgs]
    participants = list({m.user for m in msgs})

    # InsightType 검증
    type_str = str(parsed.get("type", fallback_types[0] if fallback_types else "qa"))
    insight_type = _resolve_insight_type(type_str)

    # 카테고리 검증: 목록에 없으면 텍스트 기반 감지
    category = str(parsed.get("category", "기타"))
    if category not in ALL_CATEGORIES:
        full_text = " ".join(m.content for m in msgs)
        category = _detect_category_v2(full_text)

    insight = InsightV2(
        id=f"insight-{index:03d}",
        title=str(parsed.get("title", msgs[0].content[:50])),
        type=insight_type,
        category=category,
        summary=str(parsed.get("summary", "")),
        key_points=list(parsed.get("key_points", [])),
        expert=expert,
        confidence=str(parsed.get("confidence", "medium")),
        related_topics=list(parsed.get("related_topics", [])),
        tags=list(parsed.get("tags", [])),
        source_date=source_date,
        source_chat=source_chat,
        raw_thread=raw_thread,
        participants=participants,
        question=str(parsed.get("question", "")),
        answer=str(parsed.get("answer", "")),
    )
    return insight.model_dump()


# ---------------------------------------------------------------------------
# 4. 규칙 기반 필터 (_rule_based_filter)
# ---------------------------------------------------------------------------


def _rule_based_filter(threads: list[ThreadV2]) -> list[Stage1Result]:
    """LLM 없을 때 규칙 기반 필터링."""
    return [_rule_based_filter_single(t) for t in threads]


def _rule_based_filter_single(thread: ThreadV2) -> Stage1Result:
    """단일 스레드에 대해 규칙 기반 필터를 적용한다.

    최소 2개 이상 실질적 메시지가 있고, 인사이트 유형 패턴이 감지된 경우만 통과.
    """
    msgs = thread.messages

    # 최소 메시지 수 조건
    if len(msgs) < 2:
        return Stage1Result(
            thread=thread,
            has_insight=False,
            insight_types=[],
            noise_reason="메시지 수 부족 (2개 미만)",
        )

    # 비질문 스레드 필터링 (인사/일기/감사만 있는 스레드)
    if _is_non_question_thread(thread):
        return Stage1Result(
            thread=thread,
            has_insight=False,
            insight_types=[],
            noise_reason="비질문 스레드 (인사/감사만 포함)",
        )

    full_text = " ".join(m.content for m in msgs)
    detected_types = _detect_insight_types(full_text)

    # #궁금증 태그 → qa 강제 맨 앞으로
    if any("#궁금증" in m.content for m in msgs):
        if "qa" in detected_types:
            detected_types.remove("qa")
        detected_types.insert(0, "qa")

    has_insight = len(detected_types) > 0
    noise_reason = "" if has_insight else "인사이트 패턴 미감지"

    thread.has_insight = has_insight
    thread.insight_types = detected_types

    return Stage1Result(
        thread=thread,
        has_insight=has_insight,
        insight_types=detected_types,
        noise_reason=noise_reason,
    )


# ---------------------------------------------------------------------------
# 5. 규칙 기반 추출 (_rule_based_extract)
# ---------------------------------------------------------------------------


def _rule_based_extract(
    stage1_results: list[Stage1Result],
    source_chat: str,
    start_index: int = 1,
) -> list[dict]:  # type: ignore[type-arg]
    """LLM 없을 때 규칙 기반 인사이트 추출."""
    candidates = [r for r in stage1_results if r.has_insight]
    logger.info("규칙 기반 추출: %d개 스레드", len(candidates))

    results: list[dict] = []  # type: ignore[type-arg]
    for i, s1 in enumerate(candidates, start=start_index):
        entry = _rule_based_extract_single(s1.thread, i, source_chat, s1.insight_types)
        if entry is not None:
            results.append(entry)
    return results


def _rule_based_extract_single(
    thread: ThreadV2,
    index: int,
    source_chat: str,
    insight_types: list[str],
) -> Optional[dict]:  # type: ignore[type-arg]
    """단일 스레드에 대해 규칙 기반 추출을 수행한다.

    기존 _build_wiki_entry_rule_based와 유사하지만 InsightV2 모델에 맞게 확장.
    """
    msgs = thread.messages
    if not msgs:
        return None

    # #궁금증 태그가 있는 메시지를 질문 시작점으로 사용
    question_idx = 0
    for i, m in enumerate(msgs):
        if "#궁금증" in m.content:
            question_idx = i
            break

    question_msg = msgs[question_idx]
    answer_msgs = msgs[question_idx + 1 :]

    # 전체 텍스트
    full_text = " ".join(m.content for m in msgs)

    # 카테고리 감지
    category = _detect_category_v2(full_text)

    # 제목: 첫 메시지 앞 50자
    title = question_msg.content[:50].replace("\n", " ").strip()

    # 답변 텍스트
    answer_text = "\n".join(_mask_phone(m.content) for m in answer_msgs)

    # 질문 텍스트 (마스킹)
    question_text = _mask_phone(question_msg.content)

    # expert: 답변 메시지에서 최다 발언자
    expert = ""
    if answer_msgs:
        user_count: dict[str, int] = {}
        for m in answer_msgs:
            user_count[m.user] = user_count.get(m.user, 0) + 1
        expert = max(user_count, key=lambda u: user_count[u])

    # 키워드 추출
    keywords = _extract_keywords(full_text)

    # 핵심 포인트: 답변 메시지 각각 첫 줄 (최대 5개)
    key_points: list[str] = []
    for m in answer_msgs[:5]:
        first_line = m.content.split("\n")[0].strip()
        if first_line and len(first_line) >= 10:
            key_points.append(_mask_phone(first_line))

    # 요약: 질문 + 답변 앞 200자
    summary_raw = (question_text + " " + answer_text).strip()
    summary = summary_raw[:200]

    # 참여자 목록
    participants = list({m.user for m in msgs})

    # raw_thread
    raw_thread = [f"[{m.user}] {_mask_phone(m.content)}" for m in msgs]

    source_date = thread.start_time.split(" ")[0] if thread.start_time else ""

    # InsightType 결정
    type_str = insight_types[0] if insight_types else "qa"
    insight_type = _resolve_insight_type(type_str)

    # 태그: #궁금증 포함 여부 + 카테고리
    tags: list[str] = []
    if any("#궁금증" in m.content for m in msgs):
        tags.append("#궁금증")
    tags.append(f"#{category.replace('/', '_')}")

    insight = InsightV2(
        id=f"insight-{index:03d}",
        title=title,
        type=insight_type,
        category=category,
        summary=summary,
        key_points=key_points if key_points else [title],
        expert=expert,
        confidence="medium",
        related_topics=keywords,
        tags=tags,
        source_date=source_date,
        source_chat=source_chat,
        raw_thread=raw_thread,
        participants=participants,
        question=question_text,
        answer=answer_text[:500],  # 500자 제한
    )
    return insight.model_dump()


# ---------------------------------------------------------------------------
# 6. 메인 엔트리 함수 (extract_knowledge_v2)
# ---------------------------------------------------------------------------


def _add_log(msg: str) -> None:
    """로그 메시지를 _recent_logs에 추가 (최근 15줄 유지)."""
    global _recent_logs
    ts = datetime.now().strftime("%H:%M:%S")
    _recent_logs.append(f"[{ts}] {msg}")
    _recent_logs = _recent_logs[-15:]


def _write_progress(progress_file: str, data: dict) -> None:
    """진행률 파일에 현재 상태를 기록한다."""
    try:
        p = Path(progress_file)
        if p.exists():
            try:
                current = json.loads(p.read_text(encoding="utf-8"))
                if current.get("status") == "cancelled":
                    return
                # startedAt 등 기존 필드를 보존하면서 새 데이터로 업데이트
                merged = current.copy()
                merged.update(data)
                data = merged
            except Exception:
                pass
        data["lastUpdated"] = datetime.now().isoformat()
        data["recentLogs"] = list(_recent_logs)
        p.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
    except Exception:
        logger.warning("progress file 쓰기 실패: %s", progress_file)


def extract_knowledge_v2(
    messages: list[ChatMessage],
    use_llm: bool = False,
    source_chat: str = "",
    output_dir: str | None = None,
    batch_size: int = 50,
    progress_file: str | None = None,
    month: str = "",
    skip_threads: int = 0,
) -> list[dict]:  # type: ignore[type-arg]
    """Phase 1 다층 LLM 파이프라인으로 인사이트 추출.

    Parameters
    ----------
    messages:
        parse_kakao_chat()으로부터 얻은 ChatMessage 리스트
    use_llm:
        True이면 Claude CLI를 통한 Haiku+Sonnet 2단계, False이면 규칙 기반
    source_chat:
        채팅방 이름
    output_dir:
        중간 결과 저장 디렉토리. 지정 시 배치마다 batch_NNN.json 저장
    batch_size:
        배치 크기 (기본 50 스레드씩)
    progress_file:
        진행 상태를 기록할 JSON 파일 경로. None이면 기록하지 않음.
        배치마다 갱신되며 대시보드 서버가 이 파일을 읽어 진행률을 표시한다.
    month:
        필터링할 월 (YYYY-MM 형식, 예: "2026-03"). 빈 문자열이면 전체 기간 처리.
    skip_threads:
        처음 N개 스레드를 건너뜁니다. 중단된 정제를 이어서 실행할 때 사용.

    Returns
    -------
    InsightV2.model_dump() 딕셔너리 리스트
    """
    # ---- 월 필터링 ----
    if month:
        original_count = len(messages)
        if month.endswith("-H1"):
            base = month[:-3]  # "2026-03"
            messages = [
                m
                for m in messages
                if m.date.startswith(base) and int(m.date[8:10]) <= 15
            ]
        elif month.endswith("-H2"):
            base = month[:-3]
            messages = [
                m
                for m in messages
                if m.date.startswith(base) and int(m.date[8:10]) >= 16
            ]
        else:
            messages = [m for m in messages if m.date.startswith(month)]
        logger.info(
            "월 필터: %s → %d개 메시지 (원본 %d개)",
            month,
            len(messages),
            original_count,
        )
        _add_log(
            f"월 필터링 적용: {month} → {len(messages)}개 메시지 (원본 {original_count}개)"
        )

    # ---- 규칙 기반 경로 ----
    if not use_llm:
        logger.info("규칙 기반 모드로 실행")
        _add_log("규칙 기반 모드로 실행")
        # 규칙 기반 경로에서도 체크포인트 복원
        threads: list[ThreadV2] = []
        restored = False
        if output_dir:
            threads_cp = Path(output_dir) / "checkpoint_threads.json"
            if threads_cp.exists():
                threads = _threads_from_dicts(json.loads(threads_cp.read_text()))
                _add_log(f"체크포인트 복원: 스레드 분리 결과 {len(threads)}개 스레드")
                restored = True

        if not restored:
            threads = _split_threads_v2(
                messages,
                use_llm=False,
                progress_file=progress_file,
                output_dir=output_dir,
            )
        if skip_threads > 0:
            threads = threads[skip_threads:]
        stage1_results = _rule_based_filter(threads)
        all_results = _rule_based_extract(stage1_results, source_chat)
        logger.info("규칙 기반 추출 완료: %d건", len(all_results))
        _add_log(f"규칙 기반 추출 완료: {len(all_results)}건")
        # 규칙 기반 경로도 체크포인트 정리
        if output_dir:
            _cleanup_checkpoints(output_dir)
        if progress_file:
            _write_progress(
                progress_file,
                {
                    "status": "completed",
                    "progress": 100,
                    "currentStep": "완료",
                    "processedThreads": len(threads),
                    "totalThreads": len(threads),
                    "insightsFound": len(all_results),
                    "noiseFiltered": 0,
                    "errors": 0,
                    "startedAt": datetime.now().isoformat(),
                },
            )
        return all_results

    # ---- LLM 경로 ----
    logger.info("LLM 모드로 실행 (Claude CLI)")
    _recent_logs.clear()  # 새 정제 시작 시 로그 초기화
    _add_log("정제 프로세스 시작")

    start_time = datetime.now().isoformat()

    # 체크포인트 존재하면 스레드 분리 건너뛰기
    threads: list[ThreadV2] = []
    skip_split = False
    if output_dir:
        refined_cp = Path(output_dir) / "checkpoint_refined_threads.json"
        threads_cp = Path(output_dir) / "checkpoint_threads.json"
        if refined_cp.exists():
            threads = _threads_from_dicts(json.loads(refined_cp.read_text()))
            _add_log(f"체크포인트 복원: LLM 정밀 분리 결과 {len(threads)}개 스레드")
            skip_split = True
        elif threads_cp.exists():
            threads = _threads_from_dicts(json.loads(threads_cp.read_text()))
            _add_log(f"체크포인트 복원: 스레드 분리 결과 {len(threads)}개 스레드")
            skip_split = True

    if not skip_split:
        threads = _split_threads_v2(
            messages, use_llm=True, progress_file=progress_file, output_dir=output_dir
        )

    original_total = len(threads)
    if skip_threads > 0:
        logger.info(
            "이어서 정제: 처음 %d개 스레드 건너뜀 (전체 %d개)",
            skip_threads,
            original_total,
        )
        _add_log(
            f"이어서 정제: {skip_threads}개 스레드 건너뜀 (전체 {original_total}개)"
        )
        threads = threads[skip_threads:]

    progress = BatchProgress(total_threads=original_total)
    progress.processed_threads = skip_threads
    all_results: list[dict] = []  # type: ignore[type-arg]

    if progress_file:
        _write_progress(
            progress_file,
            {
                "status": "running",
                "progress": max(
                    10, int(skip_threads / max(original_total, 1) * 90) + 10
                ),
                "currentStep": (
                    f"배치 처리 준비 (스레드 {original_total}개, {skip_threads}개 건너뜀)"
                    if skip_threads > 0
                    else f"배치 처리 준비 (스레드 {original_total}개)"
                ),
                "processedThreads": skip_threads,
                "totalThreads": original_total,
                "insightsFound": 0,
                "noiseFiltered": 0,
                "errors": 0,
                "startedAt": start_time,
            },
        )

    try:
        # 배치 처리
        for batch_num, batch_start in enumerate(
            range(0, len(threads), batch_size), start=1
        ):
            batch_threads = threads[batch_start : batch_start + batch_size]
            logger.info(
                "배치 %d 처리 시작: 스레드 %d~%d / %d",
                batch_num,
                batch_start + 1,
                batch_start + len(batch_threads),
                len(threads),
            )
            _add_log(
                f"배치 {batch_num} 처리 시작 (스레드 #{batch_start+1}~#{batch_start+len(batch_threads)})"
            )

            batch_results: list[dict] = []  # type: ignore[type-arg]

            try:
                # LLM 호출 전 로그 + progress 갱신 (stuck 경고 방지)
                _add_log(
                    f"배치 {batch_num} LLM 분석 요청 중 (스레드 #{batch_start+1}~#{batch_start+len(batch_threads)}, {len(batch_threads)}개 스레드)..."
                )
                if progress_file:
                    total_batches = (len(threads) + batch_size - 1) // batch_size
                    pct = 10 + int(
                        progress.processed_threads / max(progress.total_threads, 1) * 90
                    )
                    _write_progress(
                        progress_file,
                        {
                            "status": "running",
                            "progress": pct,
                            "currentStep": f"배치 {batch_num}/{total_batches} — LLM 분석 중 (스레드 #{batch_start+1}~#{batch_start+len(batch_threads)})",
                            "processedThreads": progress.processed_threads,
                            "totalThreads": progress.total_threads,
                            "insightsFound": progress.insights_found,
                            "noiseFiltered": progress.noise_filtered,
                            "errors": progress.errors,
                            "startedAt": start_time,
                        },
                    )
                # Stage 1: Haiku 필터링
                stage1_results = _stage1_filter(batch_threads)
                progress.noise_filtered += sum(
                    1 for r in stage1_results if not r.has_insight
                )

                # Stage 2: Sonnet 심층 추출
                batch_results = _stage2_extract(
                    stage1_results,
                    source_chat,
                    start_index=len(all_results) + 1,
                )

                progress.insights_found += len(batch_results)
                all_results.extend(batch_results)

            except Exception as exc:
                logger.error(
                    "배치 %d 처리 오류: %s — 해당 배치 규칙 기반으로 전환",
                    batch_num,
                    exc,
                )
                _add_log(f"⚠ 배치 {batch_num} 오류 - 규칙 기반으로 전환")
                progress.errors += 1

                # 배치 전체 실패 시 규칙 기반 폴백
                try:
                    fallback_stage1 = _rule_based_filter(batch_threads)
                    fallback_results = _rule_based_extract(
                        fallback_stage1,
                        source_chat,
                        start_index=len(all_results) + 1,
                    )
                    progress.insights_found += len(fallback_results)
                    all_results.extend(fallback_results)
                    batch_results = fallback_results
                except Exception as fallback_exc:
                    logger.error("규칙 기반 폴백도 실패: %s — 배치 스킵", fallback_exc)

            finally:
                progress.processed_threads += len(batch_threads)

            # 중간 결과 저장
            if output_dir and batch_results:
                _save_batch(output_dir, batch_num, batch_results)

            # 메모리 관리
            del batch_threads
            del batch_results
            gc.collect()

            # 배치 간 rate limit 완화 (마지막 배치 제외)
            if batch_start + batch_size < len(threads):
                time.sleep(1)

            logger.info(
                "배치 %d 완료 | 처리: %d/%d | 인사이트: %d | 노이즈: %d | 오류: %d",
                batch_num,
                progress.processed_threads,
                progress.total_threads,
                progress.insights_found,
                progress.noise_filtered,
                progress.errors,
            )
            _add_log(
                f"배치 {batch_num} 완료: 인사이트 {progress.insights_found}건, 노이즈 {progress.noise_filtered}건 필터"
            )

            if progress_file:
                total_batches = (len(threads) + batch_size - 1) // batch_size
                pct = 10 + int(
                    progress.processed_threads / max(progress.total_threads, 1) * 90
                )
                # currentPreview: 현재 배치 첫 스레드의 메시지 미리보기
                preview_lines = []
                next_start = batch_start + batch_size
                if next_start < len(threads):
                    next_batch = threads[next_start : next_start + batch_size]
                    if next_batch and next_batch[0].messages:
                        for msg in next_batch[0].messages[:3]:
                            text = msg.content[:30]
                            if len(msg.content) > 30:
                                text += "..."
                            preview_lines.append(f"{msg.user}: {text}")
                current_preview = " | ".join(preview_lines) if preview_lines else None
                _write_progress(
                    progress_file,
                    {
                        "status": "running",
                        "progress": pct,
                        "currentStep": f"배치 {batch_num}/{total_batches} 완료",
                        "processedThreads": progress.processed_threads,
                        "totalThreads": progress.total_threads,
                        "insightsFound": progress.insights_found,
                        "noiseFiltered": progress.noise_filtered,
                        "errors": progress.errors,
                        "startedAt": start_time,
                        "currentPreview": current_preview,
                    },
                )

    except Exception as fatal_exc:
        logger.error("extract_knowledge_v2 치명적 오류: %s", fatal_exc)
        _add_log(f"❌ 치명적 오류: {fatal_exc}")
        if progress_file:
            _write_progress(
                progress_file,
                {
                    "status": "failed",
                    "progress": int(
                        progress.processed_threads
                        / max(progress.total_threads, 1)
                        * 100
                    ),
                    "currentStep": f"오류 발생: {fatal_exc}",
                    "processedThreads": progress.processed_threads,
                    "totalThreads": progress.total_threads,
                    "insightsFound": progress.insights_found,
                    "noiseFiltered": progress.noise_filtered,
                    "errors": progress.errors + 1,
                    "startedAt": start_time,
                },
            )
        raise

    logger.info(
        "extract_knowledge_v2 완료 | 총 인사이트: %d건 | 노이즈 필터: %d건 | 오류: %d건",
        progress.insights_found,
        progress.noise_filtered,
        progress.errors,
    )
    _add_log(
        f"✅ 정제 완료! 총 인사이트: {progress.insights_found}건, 노이즈 필터: {progress.noise_filtered}건"
    )
    # 정제 완료 시 체크포인트 정리
    if output_dir:
        _cleanup_checkpoints(output_dir)
    if progress_file:
        _write_progress(
            progress_file,
            {
                "status": "completed",
                "progress": 100,
                "currentStep": "완료",
                "processedThreads": progress.processed_threads,
                "totalThreads": progress.total_threads,
                "insightsFound": progress.insights_found,
                "noiseFiltered": progress.noise_filtered,
                "errors": progress.errors,
                "startedAt": start_time,
            },
        )
    return all_results
