#!/usr/bin/env python3
"""
작업 위임 디스패처 (dispatch.py)

아누가 팀장에게 작업을 위임할 때 사용.
cokacdir --cron으로 독립 세션을 생성하여 현재 대화를 막지 않음.

Usage:
    # 권장: 지시 내용을 파일에 먼저 작성 후 --task-file로 전달
    python3 dispatch.py --team dev1-team --task-file /path/to/task-desc.md [--level normal|critical|security]
    # 짧은 한 줄 작업만 --task 직접 전달 (100자 이내 권장)
    python3 dispatch.py --team dev2-team --task "간단한 버그 수정" --level critical
"""

import argparse
import fcntl
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, List, Optional

# .env.keys 먼저 로드 (모듈 수준 os.environ.get()보다 먼저 실행되어야 함)
try:
    sys.path.insert(0, str(Path(__file__).parent))
    from utils.env_loader import load_env_keys

    load_env_keys()
except Exception:
    pass

# atomic write 유틸리티 로드 (task-timers.json 파손 방지)
try:
    from utils.atomic_write import atomic_json_write
except Exception:

    def atomic_json_write(path: Any, data: Any, **_kw: Any) -> None:  # type: ignore[misc]
        """fallback: utils.atomic_write 로드 실패 시 인라인 구현."""
        import tempfile as _tf

        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp_path = _tf.mkstemp(dir=str(target.parent), suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, str(target))
        except BaseException:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise


# config 참조 로드 (Phase 0 산출물)
try:
    from config.loader import ConfigManager

    _cfg = ConfigManager.get_instance()
    _CONFIG_AVAILABLE = True
except Exception:
    _cfg = None
    _CONFIG_AVAILABLE = False

# 프롬프트 모듈 임포트
try:
    from prompts.team_prompts import TEAM_INFO
    from prompts.team_prompts import build_prompt as _build_team_prompt
    from utils.logger import get_logger
except ImportError:
    from prompts.team_prompts import TEAM_INFO
    from prompts.team_prompts import build_prompt as _build_team_prompt
    from utils.logger import get_logger

from utils.composite_constants import (
    COMPOSITE_ALLOWED_TEAMS,
    MAX_COMPOSITE_TEAMS,
)

logger = get_logger(__name__)

# ── 선택적 유틸리티 모듈 임포트 ──────────────────────────────────────────
try:
    from utils.redact import redact_sensitive_text as _redact_text

    _REDACT_AVAILABLE = True
except ImportError:

    def _redact_text(text: str) -> str:  # type: ignore[misc]
        return text

    _REDACT_AVAILABLE = False

try:
    from utils.injection_guard import scan_content as _scan_content

    _INJECTION_GUARD_AVAILABLE = True
except ImportError:
    _scan_content = None  # type: ignore[assignment]
    _INJECTION_GUARD_AVAILABLE = False

try:
    from utils.approval import check_command as _check_command

    _APPROVAL_AVAILABLE = True
except ImportError:
    _check_command = None  # type: ignore[assignment]
    _APPROVAL_AVAILABLE = False

try:
    from utils.audit_logger import log_file_operation as _log_file_operation

    _AUDIT_LOGGER_AVAILABLE = True
except ImportError:
    _log_file_operation = None  # type: ignore[assignment]
    _AUDIT_LOGGER_AVAILABLE = False

try:
    from utils.model_router import route_model as _route_model

    _MODEL_ROUTER_AVAILABLE = True
except ImportError:
    _route_model = None  # type: ignore[assignment]
    _MODEL_ROUTER_AVAILABLE = False

try:
    from utils.sanitize_gate import should_sanitize as _should_sanitize

    _SANITIZE_GATE_AVAILABLE = True
except ImportError:
    _should_sanitize = None  # type: ignore[assignment]
    _SANITIZE_GATE_AVAILABLE = False

try:
    from utils.bot_status import BotStatusManager as _BotStatusManager

    _BOT_STATUS_AVAILABLE = True
except Exception:
    _BotStatusManager = None  # type: ignore[assignment,misc]
    _BOT_STATUS_AVAILABLE = False

try:
    from utils.session_resilience import SessionResilience as _SessionResilience

    _SESSION_RESILIENCE_AVAILABLE = True
except ImportError:
    _SessionResilience = None  # type: ignore[assignment,misc]
    _SESSION_RESILIENCE_AVAILABLE = False

try:
    import importlib.util as _ilu

    _isr_spec = _ilu.spec_from_file_location(
        "image_skill_router",
        str(Path(__file__).parent / "tools" / "image-skill-router.py"),
    )
    _isr_mod = _ilu.module_from_spec(_isr_spec)  # type: ignore[arg-type]
    _isr_spec.loader.exec_module(_isr_mod)  # type: ignore[union-attr]
    _get_skill_recommendation = _isr_mod.get_skill_recommendation
    _IMAGE_SKILL_ROUTER_AVAILABLE = True
except Exception:
    _get_skill_recommendation = None  # type: ignore[assignment]
    _IMAGE_SKILL_ROUTER_AVAILABLE = False


# 조직 구조 로드
_ws_fallback = _cfg.get_path("roots.workspace") if _CONFIG_AVAILABLE and _cfg else "/home/jay/workspace"
WORKSPACE = Path(os.environ.get("WORKSPACE_ROOT", _ws_fallback))
ORG_FILE = WORKSPACE / "memory" / "organization-structure.json"
TASK_TIMER = WORKSPACE / "memory" / "task-timer.py"

# ultimate fallback: config 로드 실패 시에만 하드코딩 값 사용
_chat_fallback = _cfg.get_constant("chat_id") if _CONFIG_AVAILABLE and _cfg else "6937032012"
CHAT_ID = os.environ.get("COKACDIR_CHAT_ID", _chat_fallback)

MAX_REF_FILE_TOTAL_BYTES = 50 * 1024  # 한 세션 참조 파일 합계 50KB 한도

# Memory 체크용 경로 상수
_MEMORY_BASE_PATH = Path.home() / ".claude" / "projects" / "-home-jay--cokacdir-workspace-autoset" / "memory"

# 봇별 키 (토큰 SHA256 앞 16자리)
BOT_KEYS = {
    "anu": os.environ.get("COKACDIR_KEY_ANU"),  # 봇A 아누
    "dev1": os.environ.get("COKACDIR_KEY_DEV1"),  # 봇B 헤르메스
    "dev2": os.environ.get("COKACDIR_KEY_DEV2"),  # 봇C 오딘
    "dev3": os.environ.get("COKACDIR_KEY_DEV3"),  # 봇D 다그다
    "dev4": os.environ.get("COKACDIR_KEY_DEV4"),  # 봇E 비슈누
    "dev5": os.environ.get("COKACDIR_KEY_DEV5"),  # 봇F 마르둑
    "dev6": os.environ.get("COKACDIR_KEY_DEV6"),  # 봇G 페룬
    "dev7": os.environ.get("COKACDIR_KEY_DEV7"),  # 봇H 이참나
    "dev8": os.environ.get("COKACDIR_KEY_DEV8"),  # 봇I 라
}

# 팀 → 봇 라우팅 (org_loader에서 동적 로드)
try:
    from utils.org_loader import build_bot_to_key_map, build_team_bot_map, build_team_to_bot_id_map

    TEAM_BOT = build_team_bot_map()
    BOT_TO_KEY = build_bot_to_key_map()
    TEAM_TO_BOT_ID = build_team_to_bot_id_map()
except ImportError:
    # fallback — config/constants.json에서 로딩
    if _CONFIG_AVAILABLE and _cfg:
        _teams_map = _cfg.get_constant("teams")
        _team_to_bot_map = _cfg.get_constant("team_to_bot")
        TEAM_BOT = dict(_teams_map)
        TEAM_TO_BOT_ID = dict(_team_to_bot_map)
        BOT_TO_KEY = {bot_id: _teams_map[team] for team, bot_id in _team_to_bot_map.items()}
    else:
        # ultimate fallback — config도 없을 때
        TEAM_BOT = {
            "dev1-team": "dev1",
            "dev2-team": "dev2",
            "dev3-team": "dev3",
            "dev4-team": "dev4",
            "dev5-team": "dev5",
            "dev6-team": "dev6",
            "dev7-team": "dev7",
            "dev8-team": "dev8",
        }
        BOT_TO_KEY = {
            "bot-b": "dev1",
            "bot-c": "dev2",
            "bot-d": "dev3",
            "bot-e": "dev4",
            "bot-f": "dev5",
            "bot-g": "dev6",
            "bot-h": "dev7",
            "bot-i": "dev8",
        }
        TEAM_TO_BOT_ID = {
            "dev1-team": "bot-b",
            "dev2-team": "bot-c",
            "dev3-team": "bot-d",
            "dev4-team": "bot-e",
            "dev5-team": "bot-f",
            "dev6-team": "bot-g",
            "dev7-team": "bot-h",
            "dev8-team": "bot-i",
        }

# 횡단 조직
CROSS_FUNCTIONAL = {
    "qc": {"name": "마아트 (Ma'at)", "role": "QC 매니저"},
    "redteam": {"name": "로키 (Loki)", "role": "레드팀 리더"},
    "design": {"name": "비너스 (Venus)", "role": "디자인 디렉터"},
    "devops": {"name": "야누스 (Janus)", "role": "DevOps"},
}

# 마케팅/컨설팅 전용: 가용 봇 자동 선택 대상
DYNAMIC_BOT_TEAMS = {"marketing", "consulting", "publishing", "design", "content"}

# 봇 → 기본 팀 역매핑 (TEAM_TO_BOT_ID의 역)
BOT_TO_DEFAULT_TEAM = {v: k for k, v in TEAM_TO_BOT_ID.items()}


def _get_busy_bots_info(exclude_task_id: str | None = None) -> dict[str, dict[str, str]]:
    """task-timers.json에서 running 상태 봇의 점유 정보 반환.

    Args:
        exclude_task_id: 이 task_id의 엔트리는 결과에서 제외 (자기 자신 제외용)

    Returns:
        {bot_id: {"task_id": ..., "team_id": ...}} 매핑
    """
    if _BOT_STATUS_AVAILABLE and _BotStatusManager is not None:
        return _BotStatusManager(workspace_root=WORKSPACE).get_busy_bots(exclude_task_id=exclude_task_id)

    # fallback: BotStatusManager 미사용 시 기존 로직
    timer_file = WORKSPACE / "memory" / "task-timers.json"
    busy: dict[str, dict[str, str]] = {}

    if not timer_file.exists():
        return busy

    try:
        with open(timer_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        for task_id, task_entry in data.get("tasks", {}).items():
            if task_entry.get("status") != "running":
                continue
            if exclude_task_id and task_id == exclude_task_id:
                continue
            team_id = task_entry.get("team_id", "")
            # dev team → TEAM_TO_BOT_ID mapping
            if team_id in TEAM_TO_BOT_ID:
                bot = TEAM_TO_BOT_ID[team_id]
                busy[bot] = {"task_id": task_id, "team_id": team_id}
            # composite/marketing/design 등 → bot 필드 직접 활용
            bot_field = task_entry.get("bot")
            if bot_field:
                busy[bot_field] = {"task_id": task_id, "team_id": team_id}
    except (json.JSONDecodeError, OSError) as e:
        logger.warning(f"task-timers.json 읽기 실패 (봇 점유 정보): {e}")

    return busy


def _find_available_bot(required_model: str | None = None) -> str:
    """task-timers.json에서 running 상태 확인 후 가용 봇 반환.

    우선순위: bot-b > bot-c > bot-d > bot-e > bot-f > bot-g > bot-h > bot-i
    dev team running → 해당 봇 사용 중
    composite/marketing/design running → bot 필드로 판별

    Args:
        required_model: 필요한 모델명 (예: "claude-opus-4-6"). 지정 시 해당 모델의 봇만 선택.

    Returns:
        "bot-b" | "bot-c" | ... | "bot-i"
    Raises:
        RuntimeError: 조건에 맞는 가용 봇이 없을 때
    """
    busy_bots = set(_get_busy_bots_info().keys())

    # 모델 필터링이 필요한 경우 봇별 모델 맵 구축
    bot_model_map: dict[str, str] = {}
    if required_model:
        key_models = _read_bot_models()
        for bot_id in ["bot-b", "bot-c", "bot-d", "bot-e", "bot-f", "bot-g", "bot-h", "bot-i"]:
            default_team = BOT_TO_DEFAULT_TEAM.get(bot_id, "")
            key_name = TEAM_BOT.get(default_team, "")
            key_hash = BOT_KEYS.get(key_name, "")
            if key_hash and key_hash in key_models:
                bot_model_map[bot_id] = key_models[key_hash]

    for bot in ["bot-b", "bot-c", "bot-d", "bot-e", "bot-f", "bot-g", "bot-h", "bot-i"]:
        if bot in busy_bots:
            continue
        if required_model and bot_model_map.get(bot, "") != required_model:
            logger.info(f"[model-filter] {bot} 스킵: model={bot_model_map.get(bot, 'unknown')} ≠ {required_model}")
            continue
        return bot

    if required_model:
        raise RuntimeError(
            f"모델 조건({required_model})에 맞는 가용 봇이 없습니다. "
            f"모든 opus 봇이 작업 중이거나 모델 미설정입니다."
        )
    raise RuntimeError("모든 봇이 작업 중입니다. 잠시 후 다시 시도하세요.")


def _get_available_bots_with_teams(busy_bots: dict[str, dict[str, str]]) -> list[dict[str, str]]:
    """전체 봇 중 busy가 아닌 가용 봇 목록과 기본 팀 매핑을 반환.

    Args:
        busy_bots: _get_busy_bots_info()의 반환값 {bot_id: {"task_id": ..., "team_id": ...}}

    Returns:
        [{"bot_id": "bot-c", "default_team": "dev2-team"}, ...]
    """
    all_bots = ["bot-b", "bot-c", "bot-d", "bot-e", "bot-f", "bot-g", "bot-h", "bot-i"]
    available = []
    for bot in all_bots:
        if bot not in busy_bots:
            available.append(
                {
                    "bot_id": bot,
                    "default_team": BOT_TO_DEFAULT_TEAM.get(bot, "unknown"),
                }
            )
    return available


def _select_and_reserve_bot(
    timer_task_id: str,
    required_model: str | None = None,
) -> str:
    """봇 선택과 타이머 기록을 원자적으로 수행.

    파일 락을 잡은 상태에서:
    1. busy bots 확인
    2. 가용 봇 선택
    3. task-timers.json에 bot 필드 즉시 기록

    이렇게 해야 동시 dispatch 간 봇 충돌을 방지할 수 있음.

    Args:
        timer_task_id: 봇을 예약할 태스크 ID
        required_model: 필요한 모델명 (예: "claude-opus-4-6")

    Returns:
        선택된 봇 ID (예: "bot-b")
    Raises:
        RuntimeError: 조건에 맞는 가용 봇이 없을 때
    """
    timer_file = WORKSPACE / "memory" / "task-timers.json"
    lock_file_path = WORKSPACE / "memory" / ".task-timers.lock"
    lock_file_path.parent.mkdir(parents=True, exist_ok=True)

    lock_fd = None
    try:
        lock_fd = open(lock_file_path, "w")
        fcntl.flock(lock_fd, fcntl.LOCK_EX)

        # busy bots 수집 (락 안에서 읽기)
        busy_bots: set[str] = set()
        if timer_file.exists():
            try:
                with open(timer_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                for tid, entry in data.get("tasks", {}).items():
                    if entry.get("status") != "running":
                        continue
                    if tid == timer_task_id:
                        continue
                    team = entry.get("team_id", "")
                    if team in TEAM_TO_BOT_ID:
                        busy_bots.add(TEAM_TO_BOT_ID[team])
                    bot_field = entry.get("bot")
                    if bot_field:
                        busy_bots.add(bot_field)
            except (json.JSONDecodeError, OSError) as e:
                logger.warning(f"task-timers.json 읽기 실패 (봇 예약): {e}")

        # 모델 필터링
        bot_model_map: dict[str, str] = {}
        if required_model:
            key_models = _read_bot_models()
            for bot_id in ["bot-b", "bot-c", "bot-d", "bot-e", "bot-f", "bot-g", "bot-h", "bot-i"]:
                default_team = BOT_TO_DEFAULT_TEAM.get(bot_id, "")
                key_name = TEAM_BOT.get(default_team, "")
                key_hash = BOT_KEYS.get(key_name, "")
                if key_hash and key_hash in key_models:
                    bot_model_map[bot_id] = key_models[key_hash]

        # 가용 봇 선택
        selected_bot: str | None = None
        for bot in ["bot-b", "bot-c", "bot-d", "bot-e", "bot-f", "bot-g", "bot-h", "bot-i"]:
            if bot in busy_bots:
                continue
            if required_model and bot_model_map.get(bot, "") != required_model:
                logger.info(f"[model-filter] {bot} 스킵: model={bot_model_map.get(bot, 'unknown')} ≠ {required_model}")
                continue
            selected_bot = bot
            break

        if selected_bot is None:
            if required_model:
                raise RuntimeError(
                    f"모델 조건({required_model})에 맞는 가용 봇이 없습니다. "
                    f"모든 opus 봇이 작업 중이거나 모델 미설정입니다."
                )
            raise RuntimeError("모든 봇이 작업 중입니다. 잠시 후 다시 시도하세요.")

        # 즉시 bot 필드 기록 (원자적 예약)
        if timer_file.exists():
            try:
                with open(timer_file, "r", encoding="utf-8") as f:
                    data = json.load(f)
                task_entry = data.get("tasks", {}).get(timer_task_id)
                if task_entry:
                    task_entry["bot"] = selected_bot
                    with open(timer_file, "w", encoding="utf-8") as f:
                        json.dump(data, f, ensure_ascii=False, indent=2)
                    logger.info(f"[봇 예약] {timer_task_id} → {selected_bot} (원자적 예약 완료)")
            except (json.JSONDecodeError, OSError) as e:
                logger.warning(f"봇 예약 기록 실패 ({timer_task_id}): {e}")

        return selected_bot
    finally:
        if lock_fd is not None:
            try:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
                lock_fd.close()
            except Exception:
                pass


def _read_bot_models(chat_id: str = CHAT_ID) -> dict[str, str]:
    """bot_settings.json에서 봇 키 해시별 모델 설정을 읽어 반환.

    Returns:
        {key_hash: model_name} 예: {"c38fb9955616e24d": "claude-opus-4-6"}
    """
    settings_path = Path.home() / ".cokacdir" / "bot_settings.json"
    if not settings_path.exists():
        return {}
    try:
        with open(settings_path, "r", encoding="utf-8") as f:
            settings = json.load(f)
        result = {}
        for key_hash, cfg in settings.items():
            models = cfg.get("models", {})
            model = models.get(str(chat_id), "")
            if model:
                result[key_hash] = model
        return result
    except Exception as e:
        logger.debug(f"[bot_models] bot_settings.json 읽기 실패: {e}")
        return {}


def _set_bot_model(key_hash: str, model: str, chat_id: str = CHAT_ID) -> str | None:
    """bot_settings.json에서 특정 봇의 모델을 변경한다.

    Args:
        key_hash: 봇 키 해시 (예: "c38fb9955616e24d")
        model: 설정할 모델명 (예: "claude-opus-4-6")
        chat_id: 채팅 ID

    Returns:
        이전 모델명 또는 None (변경 실패)
    """
    settings_path = Path.home() / ".cokacdir" / "bot_settings.json"
    if not settings_path.exists():
        logger.warning("[model-override] bot_settings.json 없음")
        return None
    try:
        with open(settings_path, "r", encoding="utf-8") as f:
            settings = json.load(f)
        bot_cfg = settings.get(key_hash)
        if not bot_cfg:
            logger.warning(f"[model-override] 봇 설정 없음: key={key_hash}")
            return None
        models = bot_cfg.get("models", {})
        prev_model = models.get(str(chat_id))
        models[str(chat_id)] = model
        bot_cfg["models"] = models
        settings[key_hash] = bot_cfg
        with open(settings_path, "w", encoding="utf-8") as f:
            json.dump(settings, f, ensure_ascii=False, indent=2)
        logger.info(f"[model-override] 봇 모델 변경: key={key_hash}, {prev_model} → {model}")
        return prev_model
    except Exception as e:
        logger.error(f"[model-override] 봇 모델 변경 실패: {e}")
        return None


def _schedule_model_restore(key_hash: str, model: str, delay: int = 30) -> None:
    """delay초 후 봇 모델을 복원하는 백그라운드 프로세스를 포크한다.

    세션이 시작된 후 모델을 원래대로 복원하여 다음 일반 작업에서 Sonnet이 사용되도록 한다.
    """
    restore_script = (
        "import json, pathlib; "
        f"p=pathlib.Path.home()/'.cokacdir'/'bot_settings.json'; "
        "d=json.loads(p.read_text(encoding='utf-8')); "
        f"d['{key_hash}']['models']['{CHAT_ID}']='{model}'; "
        f"p.write_text(json.dumps(d,ensure_ascii=False,indent=2),encoding='utf-8')"
    )
    subprocess.Popen(
        ["python3", "-c", f"import time; time.sleep({delay}); {restore_script}"],
        start_new_session=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    logger.info(f"[model-override] {delay}초 후 모델 복원 예약: key={key_hash} → {model}")


def _validate_model_consistency(team_id: str) -> dict:
    """org-structure.json과 bot_settings.json의 모델 설정을 비교 검증한다.

    org-structure.json에서 해당 team_id의 lead.model 값과
    bot_settings.json에서 해당 봇의 models[chat_id] 값을 비교하여
    불일치 시 WARNING 로그를 출력한다.
    디스패치는 bot_settings.json 기준을 유지한다.

    Args:
        team_id: 검증할 팀 ID (예: "dev8-team")

    Returns:
        dict: {"consistent": bool, "org_model": str, "bot_model": str, "team_id": str}
    """
    result: dict = {"consistent": True, "org_model": "", "bot_model": "", "team_id": team_id}
    try:
        # org-structure.json에서 team_id의 lead.model 찾기
        org_model = ""
        if ORG_FILE.exists():
            with open(ORG_FILE, "r", encoding="utf-8") as f:
                org_data = json.load(f)
            teams = org_data.get("structure", {}).get("columns", {}).get("teams", [])
            for team in teams:
                # 직접 team_id 매칭 (논리팀)
                if team.get("team_id") == team_id:
                    lead = team.get("lead", {})
                    org_model = lead.get("model", "")
                    break
                # sub_teams 탐색 (개발팀 등 하위 구조)
                for sub in team.get("sub_teams", []):
                    if sub.get("sub_team_id") == team_id:
                        lead = sub.get("lead", {})
                        org_model = lead.get("model", "")
                        break
                if org_model:
                    break

        # bot_settings.json에서 해당 팀의 봇 모델 읽기
        bot_model = ""
        key_name = TEAM_BOT.get(team_id)
        if key_name:
            key_hash = BOT_KEYS.get(key_name)
            if key_hash:
                settings_path = Path.home() / ".cokacdir" / "bot_settings.json"
                if settings_path.exists():
                    with open(settings_path, "r", encoding="utf-8") as f:
                        settings = json.load(f)
                    bot_cfg = settings.get(key_hash, {})
                    bot_model = bot_cfg.get("models", {}).get(str(CHAT_ID), "")

        result["org_model"] = org_model
        result["bot_model"] = bot_model

        if org_model and bot_model and org_model != bot_model:
            result["consistent"] = False
            logger.warning(
                f"[모델 불일치] {team_id}: org-structure={org_model}, "
                f"bot_settings={bot_model} — 디스패치는 bot_settings 기준 유지"
            )
        else:
            logger.info(
                f"[모델 일관성] {team_id}: org={org_model or 'N/A'}, " f"bot={bot_model or 'N/A'} — 일치 또는 확인 불가"
            )
    except Exception as e:
        logger.warning(f"[모델 검증] 검증 실패 (무시): {e}")

    return result


def _sync_bot_settings() -> None:
    """bot_settings.json의 민감 정보(token)를 마스킹한 사본을 workspace에 저장한다.

    원본의 "token" 키 값을 "***REDACTED***"로 교체한 사본을
    {WORKSPACE}/memory/bot_settings_sync.json에 저장한다.
    기타 필드는 그대로 유지된다.
    실패해도 디스패치 자체는 중단되지 않는다.
    """
    import copy

    try:
        settings_path = Path.home() / ".cokacdir" / "bot_settings.json"
        if not settings_path.exists():
            logger.debug("[bot_settings_sync] bot_settings.json 없음, 동기화 스킵")
            return

        with open(settings_path, "r", encoding="utf-8") as f:
            settings = json.load(f)

        # 깊은 복사 후 token 마스킹
        masked = copy.deepcopy(settings)
        for key_hash, cfg in masked.items():
            if "token" in cfg:
                cfg["token"] = "***REDACTED***"

        sync_path = WORKSPACE / "memory" / "bot_settings_sync.json"
        sync_path.parent.mkdir(parents=True, exist_ok=True)
        with open(sync_path, "w", encoding="utf-8") as f:
            json.dump(masked, f, ensure_ascii=False, indent=2)

        logger.info(f"[bot_settings_sync] 동기화 완료: {sync_path}")
    except Exception as e:
        logger.warning(f"[bot_settings_sync] 동기화 실패 (무시): {e}")

    # constants.json에 bot 정보 반영 (token 제외)
    try:
        settings_path = Path.home() / ".cokacdir" / "bot_settings.json"
        if not settings_path.exists():
            return

        with open(settings_path, "r", encoding="utf-8") as f:
            settings = json.load(f)

        constants_path = WORKSPACE / "config" / "constants.json"
        with open(constants_path, "r", encoding="utf-8") as f:
            constants = json.load(f)

        chat_id_key = constants.get("chat_id", "")

        for _key_hash, cfg in settings.items():
            display_name: str = cfg.get("display_name", "")
            username: str = cfg.get("username", "")
            models: dict = cfg.get("models", {})
            last_sessions: dict = cfg.get("last_sessions", {})

            if not display_name or not username:
                continue

            # bot_short_id 추출
            parts = display_name.split("_")
            if len(parts) >= 3 and parts[0].startswith("dev") and parts[0][3:].isdigit():
                bot_short_id = parts[0]  # e.g. "dev3"
            else:
                # anu 봇 등 devN 패턴이 아닌 경우
                bot_short_id = "anu"

            # model 및 team_dir 추출 (chat_id 우선, 없으면 첫 번째 키)
            if chat_id_key and chat_id_key in models:
                model: str = models[chat_id_key]
            elif models:
                model = next(iter(models.values()))
            else:
                model = ""

            if chat_id_key and chat_id_key in last_sessions:
                team_dir: str = last_sessions[chat_id_key]
            elif last_sessions:
                team_dir = next(iter(last_sessions.values()))
            else:
                team_dir = ""

            # bots 섹션 추가/갱신 (기존 항목도 최신 정보로 업데이트)
            constants.setdefault("bots", {})[bot_short_id] = {
                "display_name": display_name,
                "username": username,
                "team_dir": team_dir,
                "model": model,
            }

            # teams 섹션: devN 봇만 추가 (기존 항목 유지)
            if bot_short_id != "anu":
                team_id = f"{bot_short_id}-team"
                constants.setdefault("teams", {}).setdefault(team_id, bot_short_id)

        # meta.last_updated 갱신
        constants.setdefault("meta", {})["last_updated"] = datetime.now().strftime("%Y-%m-%d")

        with open(constants_path, "w", encoding="utf-8") as f:
            json.dump(constants, f, ensure_ascii=False, indent=2)

        logger.info("[bot_settings_sync] constants.json 봇 정보 반영 완료")
    except Exception as e:
        logger.warning(f"[bot_settings_sync] constants.json 업데이트 실패 (무시): {e}")


def _patch_timer_metadata(task_id: str, **metadata: Any) -> None:
    """task-timers.json에서 지정된 task_id 항목에 메타데이터 추가"""
    timer_file = WORKSPACE / "memory" / "task-timers.json"
    lock_file_path = WORKSPACE / "memory" / ".task-timers.lock"
    lock_file_path.parent.mkdir(parents=True, exist_ok=True)

    lock_fd = None
    try:
        lock_fd = open(lock_file_path, "w")
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        if not timer_file.exists():
            return
        with open(timer_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        task_entry = data.get("tasks", {}).get(task_id)
        if task_entry:
            task_entry.update(metadata)
            atomic_json_write(timer_file, data)
            logger.info(f"메타데이터 패치 완료: {task_id} → {metadata}")
    except Exception as e:
        logger.warning(f"메타데이터 패치 실패 ({task_id}): {e}")
    finally:
        if lock_fd is not None:
            try:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
                lock_fd.close()
            except Exception:
                pass


# ---------------------------------------------------------------------------
# affected_files / batch_id / level 추정 헬퍼 함수 (task-1858)
# ---------------------------------------------------------------------------


def _parse_affected_files(task_desc: str) -> list:
    """task_desc 문자열에서 affected_files 정보를 찾아 파일명 리스트를 반환한다.

    세 가지 형식을 지원한다:
    - 인라인: 'affected_files: server.py, app.js'
    - 섹션 목록: '## affected_files' 헤더 이후 '- path' 줄들
    - 섹션 인라인: '## affected_files' 헤더 이후 쉼표 구분 값 (괄호 코멘트 자동 제거)

    인라인 → 섹션 목록 → 섹션 인라인 순서로 시도한다.
    없으면 빈 리스트를 반환한다.
    """
    lines = task_desc.splitlines()

    # 1단계: 인라인 형식 시도 (하위 호환)
    for line in lines:
        stripped = line.strip()
        if stripped.startswith("affected_files:"):
            value = stripped[len("affected_files:") :].strip()
            if not value:
                return []
            return [f.strip() for f in value.split(",") if f.strip()]

    # 2단계: 섹션 형식 시도 (## affected_files 헤더)
    collecting = False
    result = []
    for line in lines:
        stripped = line.strip()
        if collecting:
            if stripped.startswith("##"):
                break
            if not stripped:
                continue
            if stripped.startswith("- "):
                result.append(stripped[2:].strip())
            elif not result:
                # 첫 비어있지 않은 줄이 - 목록이 아니면 쉼표 구분 인라인으로 파싱
                items = [re.sub(r"\s*\(.*?\)\s*$", "", f.strip()) for f in stripped.split(",") if f.strip()]
                return [item for item in items if item]
            else:
                break
        elif stripped == "## affected_files":
            collecting = True

    return result


def _enrich_affected_files_with_ast(affected_files: list, workspace_root: str) -> list:
    """AST 의존성 스크립트를 호출하여 blast radius를 계산하고 affected_files를 보강한다.

    scripts/ast_dependency_map.py를 subprocess로 호출하여 직접 임포터(direct_importers)와
    테스트 파일(test_files)을 추출한 뒤 기존 목록에 중복 없이 병합한다.
    스크립트 실패·타임아웃·JSON 파싱 오류 시 원래 목록을 그대로 반환한다 (graceful fallback).
    """
    if not affected_files:
        return affected_files

    ast_script = Path(workspace_root) / "scripts" / "ast_dependency_map.py"
    if not ast_script.exists():
        logger.warning(f"[ast-blast-radius] AST 스크립트를 찾을 수 없음: {ast_script}")
        return affected_files

    try:
        cmd = [
            "python3",
            str(ast_script),
            "--root",
            workspace_root,
            "--files",
            *affected_files,
            "--json",
        ]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if result.returncode != 0:
            logger.warning(
                f"[ast-blast-radius] 스크립트 비정상 종료 (rc={result.returncode}): " f"{result.stderr.strip()[:200]}"
            )
            return affected_files

        data = json.loads(result.stdout)
        existing = set(affected_files)
        extra: list = []
        for key in ("direct_importers", "test_files"):
            for f in data.get(key, []):
                if f and f not in existing:
                    existing.add(f)
                    extra.append(f)

        if extra:
            logger.info(f"[ast-blast-radius] blast radius 보강: +{len(extra)}개 파일 추가")
            return affected_files + extra

        return affected_files

    except subprocess.TimeoutExpired:
        logger.warning("[ast-blast-radius] 타임아웃(30s) — 원래 affected_files 사용")
        return affected_files
    except json.JSONDecodeError as e:
        logger.warning(f"[ast-blast-radius] JSON 파싱 실패: {e} — 원래 affected_files 사용")
        return affected_files
    except Exception as e:
        logger.warning(f"[ast-blast-radius] 예외 발생: {e} — 원래 affected_files 사용")
        return affected_files


def _check_affected_files_overlap(affected_files: list, current_task_id: str) -> list:
    """task-timers.json의 running 상태 task들과 affected_files 교집합을 확인한다.

    current_task_id는 제외(자기 자신).
    겹침 발견 시 경고 메시지 문자열 리스트를 반환한다.
    파일이 없으면 빈 리스트를 반환한다.
    """
    timer_file = WORKSPACE / "memory" / "task-timers.json"
    if not timer_file.exists():
        return []

    try:
        with open(timer_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        logger.warning(f"[_check_affected_files_overlap] task-timers.json 읽기 실패: {e}")
        return []

    warnings = []
    tasks = data.get("tasks", {})
    affected_set = set(affected_files)

    for task_id, task_info in tasks.items():
        if task_id == current_task_id:
            continue
        if task_info.get("status") != "running":
            continue
        other_files = set(task_info.get("affected_files", []))
        overlap = affected_set & other_files
        if overlap:
            overlap_list = ", ".join(sorted(overlap))
            warnings.append(f"[파일 충돌 경고] {task_id}(running)와 파일 겹침: {overlap_list}")

    return warnings


def _send_overlap_telegram_warning(warnings: list) -> None:
    """겹침 경고를 Telegram으로 발송한다. 실패해도 dispatch를 중단하지 않는다."""
    if not warnings:
        return
    bot_token = os.environ.get("ANU_BOT_TOKEN", "")
    if not bot_token:
        logger.warning("[overlap-telegram] ANU_BOT_TOKEN 미설정, Telegram 경고 스킵")
        return
    message = "⚠️ affected_files 겹침 감지:\n" + "\n".join(warnings)
    try:
        import urllib.request

        url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        data = json.dumps({"chat_id": CHAT_ID, "text": message}).encode("utf-8")
        req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
        urllib.request.urlopen(req, timeout=10)
        logger.info(f"[overlap-telegram] 겹침 경고 Telegram 발송 완료 ({len(warnings)}건)")
    except Exception as e:
        logger.warning(f"[overlap-telegram] Telegram 발송 실패: {e}")


def _warn_missing_affected_files(task_desc: str, task_level: int):
    """task_level >= 2이고 affected_files가 없으면 경고 문자열을 반환한다.

    그 외에는 None을 반환한다.
    """
    if task_level < 2:
        return None
    files = _parse_affected_files(task_desc)
    if not files:
        return (
            f"[경고] Lv.{task_level} 작업에 affected_files가 기재되지 않았습니다. "
            "영향받는 파일 목록을 추가해 주세요."
        )
    return None


# ---------------------------------------------------------------------------
# affected_files / goal_assertions 자동 주입 (task-2147)
# ---------------------------------------------------------------------------

ALLOWED_COMMANDS = {"grep", "curl", "pytest", "python3", "tsc", "cat", "jq", "npx", "npm"}


def _auto_inject_affected_files(task_desc: str, workspace_root: str) -> str:
    """task_desc에 ## affected_files 미기재 시, 백틱 코드 토큰 기반 자동 탐지"""
    if "## affected_files" in task_desc:
        return task_desc  # 이미 기재됨 → SKIP

    # 1. 백틱 코드 토큰 추출
    tokens = re.findall(r'`([A-Za-z_]\w*(?:\(\))?)`', task_desc)

    # 2. COMMON_FILTER 제외
    COMMON_FILTER = {"data", "result", "config", "props", "state", "error",
                     "value", "item", "items", "list", "name", "path",
                     "type", "id", "key", "index", "event", "options"}
    tokens = [t.rstrip("()") for t in tokens if t.rstrip("()").lower() not in COMMON_FILTER]
    tokens = list(dict.fromkeys(tokens))[:10]  # 중복 제거, 최대 10개

    # 3. grep -rl 실행
    affected = set()
    for token in tokens:
        try:
            result = subprocess.run(
                ["grep", "-rl", "--include=*.py", "--include=*.ts", "--include=*.tsx",
                 "--exclude-dir=node_modules", "--exclude-dir=.git",
                 token, workspace_root],
                capture_output=True, text=True, timeout=10
            )
            for line in result.stdout.strip().split("\n"):
                if line:
                    affected.add(os.path.relpath(line, workspace_root))
        except subprocess.TimeoutExpired:
            logger.warning(f"[auto-affected] grep 타임아웃: {token}")
            continue

    # 4. 20파일 초과 시 주입 안 함 (정밀도 부족)
    if len(affected) > 20:
        logger.warning(f"[auto-affected] {len(affected)}개 파일 감지 (>20) — 주입 안 함")
        return task_desc

    # 5. 섹션 추가
    if affected:
        section = "\n\n## affected_files (auto-detected)\n" + "\n".join(f"- {f}" for f in sorted(affected))
        task_desc += section

    return task_desc


def _auto_generate_goal_assertions(task_desc: str, workspace_root: str) -> str:
    """검증 시나리오에서 실행 가능한 goal_assertions 자동 생성"""
    if "## goal_assertions" in task_desc:
        return task_desc

    # 검증 시나리오 섹션에서 백틱 command 추출
    commands = re.findall(r'`((?:grep|curl|pytest|python3|tsc|cat|jq|npx|npm)\s[^`]+)`', task_desc)

    if not commands:
        return task_desc

    # 보안: ALLOWED_COMMANDS 화이트리스트 체크
    safe_commands = []
    for cmd in commands:
        first_word = cmd.split()[0]
        if first_word in ALLOWED_COMMANDS:
            safe_commands.append(cmd)

    if safe_commands:
        section = "\n\n## goal_assertions (auto-generated)\n"
        for cmd in safe_commands[:5]:  # 최대 5개
            section += f"- `{cmd}`\n"
        task_desc += section

    return task_desc


def _get_ast_blast_radius(affected_files: list, workspace_root) -> dict:
    """affected_files의 .py 파일을 AST 분석하여 blast radius(영향 범위)를 반환한다.

    각 .py 파일에 대해 ast_dependency_map.py 스크립트를 호출하여
    direct_importers와 test_files를 수집한다.

    Args:
        affected_files: 영향받는 파일 경로 리스트
        workspace_root: 워크스페이스 루트 경로 (Path 또는 str)

    Returns:
        {
            "direct_importers": [...],  # 직접 임포터 목록 (중복 제거)
            "test_files": [...],        # 관련 테스트 파일 목록 (중복 제거)
            "total_affected": int,      # 총 영향 파일 수
        }
    """
    ast_script = "/home/jay/workspace/scripts/ast_dependency_map.py"
    direct_importers: list = []
    test_files: list = []

    py_files = [f for f in affected_files if str(f).endswith(".py")]
    if not py_files:
        return {"direct_importers": [], "test_files": [], "total_affected": 0}

    for fpath in py_files:
        fpath_str = str(fpath)
        root_dir = str(Path(fpath_str).parent)
        filename = Path(fpath_str).name
        try:
            result = subprocess.run(
                ["python3", ast_script, "--root", root_dir, "--files", filename],
                capture_output=True,
                text=True,
                timeout=30,
            )
            if result.returncode != 0:
                logger.warning(
                    f"[ast-blast-radius] {filename} 분석 실패 (returncode={result.returncode}): {result.stderr[:200]}"
                )
                continue
            data = json.loads(result.stdout)
            blast = data.get("blast_radius", {})
            direct_importers.extend(blast.get("direct_importers", []))
            test_files.extend(blast.get("test_files", []))
        except subprocess.TimeoutExpired:
            logger.warning(f"[ast-blast-radius] {filename} 분석 timeout(30s) 초과 — 건너뜀")
        except json.JSONDecodeError as e:
            logger.warning(f"[ast-blast-radius] {filename} JSON 파싱 실패: {e}")
        except Exception as e:
            logger.warning(f"[ast-blast-radius] {filename} 예외 발생: {e}")

    # 중복 제거
    direct_importers = list(dict.fromkeys(direct_importers))
    test_files = list(dict.fromkeys(test_files))

    return {
        "direct_importers": direct_importers,
        "test_files": test_files,
        "total_affected": len(direct_importers) + len(test_files),
    }


def check_batch_completion(batch_id: str) -> dict:
    """task-timers.json에서 batch_id가 일치하는 모든 task의 완료 여부를 반환한다.

    Returns:
        {
            "complete": bool,   # total > 0이고 모두 done/completed인 경우 True
            "total": int,       # 해당 batch_id task 수
            "done": int,        # done 또는 completed 상태 수
            "pending": list[str],  # 미완료 task id 목록
        }
    """
    timer_file = WORKSPACE / "memory" / "task-timers.json"
    if not timer_file.exists():
        return {"complete": False, "total": 0, "done": 0, "pending": []}

    try:
        with open(timer_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        logger.warning(f"[check_batch_completion] task-timers.json 읽기 실패: {e}")
        return {"complete": False, "total": 0, "done": 0, "pending": []}

    tasks = data.get("tasks", {})
    matched = {task_id: info for task_id, info in tasks.items() if info.get("batch_id") == batch_id}

    total = len(matched)
    if total == 0:
        return {"complete": False, "total": 0, "done": 0, "pending": []}

    done_count = 0
    pending = []
    for task_id, info in matched.items():
        status = info.get("status", "")
        if status in ("done", "completed"):
            done_count += 1
        else:
            pending.append(task_id)

    complete = done_count == total
    return {"complete": complete, "total": total, "done": done_count, "pending": pending}


# 핵심 파일 목록: 이 파일들이 affected_files에 포함되면 Lv.2 이상 권장
_CORE_FILES = {
    "server.py",
    "dispatch.py",
    "app.py",
    "main.py",
    "wsgi.py",
    "asgi.py",
    "settings.py",
    "config.py",
    "manage.py",
    "celery.py",
}

# 고레벨 키워드: task_desc에 포함되면 Lv.3 이상 권장
_HIGH_LEVEL_KEYWORDS = [
    "아키텍처",
    "리아키텍처",
    "전체 구조",
    "architecture",
]


def _estimate_task_level(task_desc: str, affected_files: list) -> tuple:
    """task_desc와 affected_files를 분석하여 권장 레벨과 이유를 반환한다.

    우선순위:
    1. 고레벨 키워드('아키텍처' 등) → (3, 이유)
    2. 핵심 파일 포함(server.py 등) → (2, 이유)
    3. affected_files >= 5 → (3, 이유)
    4. affected_files >= 3 → (2, 이유)
    5. 그 외 → (1, "")

    Returns:
        (level: int, reason: str)
    """
    # 1. 고레벨 키워드 체크 (최우선)
    for keyword in _HIGH_LEVEL_KEYWORDS:
        if keyword in task_desc:
            return (3, f"'{keyword}' 키워드 감지")

    # 2. 핵심 파일 포함 체크
    for f in affected_files:
        filename = Path(f).name
        if filename in _CORE_FILES:
            return (2, f"핵심 파일 포함: {filename}")

    # 3. affected_files 개수 기반
    n = len(affected_files)
    if n >= 5:
        return (3, f"affected_files {n}개로 Lv.3 이상 권장")
    if n >= 3:
        return (2, f"affected_files {n}개로 Lv.2 이상 권장")

    return (1, "")


def _parse_task_level(task_desc: str) -> int:
    """task_desc에서 Lv.{숫자} 패턴을 찾아 정수로 반환한다.

    없으면 기본값 1을 반환한다.
    """
    match = re.search(r"Lv\.(\d+)", task_desc)
    if match:
        return int(match.group(1))
    return 1


def _check_agent_meeting(task_id: str, task_desc: str, level: str, skip_meeting: bool = False) -> Optional[str]:
    """Lv.4 작업의 Agent 미팅 만장일치 결과 파일 존재 여부를 검증한다.

    Lv.4 판정: level이 'critical' 또는 'security'이고, task_desc에 Lv.4 패턴이 포함.
    미팅 파일 경로: memory/meetings/agent-meeting-{task_id}.md
    파일이 있으면 '만장일치' 또는 'unanimous' 키워드 포함 여부도 체크.
    부정 패턴('만장일치 실패', 'unanimous not', 'not unanimous')은 제외.

    Returns:
        WARNING 메시지 문자열, 또는 None (체크 통과/대상 아님)
    """
    # Lv.4 여부 판정
    _level_to_int_meeting = {"normal": 2, "critical": 3, "security": 4}
    dispatch_level = _level_to_int_meeting.get(level, 2)
    if dispatch_level < 3:
        return None

    # task_desc에서 Lv.4 패턴 탐지
    lv4_pattern = re.compile(r"(?:Lv\.4|레벨:\s*4|레벨:\s*Lv\.4)", re.IGNORECASE)
    if not lv4_pattern.search(task_desc):
        return None

    # skip_meeting 플래그 체크
    if skip_meeting:
        logger.info(f"[agent-meeting] --skip-meeting 플래그로 미팅 체크 스킵: {task_id}")
        return None

    # 미팅 파일 존재 체크
    meeting_file = WORKSPACE / "memory" / "meetings" / f"agent-meeting-{task_id}.md"
    if not meeting_file.exists():
        msg = (
            f"⚠️ Lv.4 작업({task_id})에 Agent 미팅 결과 파일이 없습니다: {meeting_file.name}. "
            f"--skip-meeting 플래그로 스킵 가능."
        )
        logger.warning(f"[agent-meeting] {msg}")
        return msg

    # 파일이 있으면 만장일치 키워드 체크
    try:
        content = meeting_file.read_text(encoding="utf-8")
        # 부정 패턴 먼저 체크
        negative_patterns = ["만장일치 실패", "unanimous not", "not unanimous", "만장일치하지"]
        has_negative = any(neg in content for neg in negative_patterns)
        has_unanimous = "만장일치" in content or "unanimous" in content.lower()

        if has_negative or not has_unanimous:
            msg = (
                f"⚠️ Lv.4 작업({task_id}) 미팅 파일에 만장일치 합의가 확인되지 않습니다: {meeting_file.name}"
            )
            logger.warning(f"[agent-meeting] {msg}")
            return msg
    except OSError as e:
        logger.warning(f"[agent-meeting] 미팅 파일 읽기 실패: {e}")

    return None



def _create_task_docs(task_id: str, level: int) -> "Optional[Path]":
    """작업 3문서(계획서/맥락노트/체크리스트)를 자동 생성한다.

    템플릿 파일을 복사하고 YAML의 {task_id}, {date} 플레이스홀더를 치환한다.
    파일이 이미 존재하면 개별 파일 단위로 스킵(덮어쓰기 금지).

    Args:
        task_id: 작업 ID (예: task-1872, task-1872.1, task-1872_6.2)
        level:   작업 레벨 정수 (3 이상일 때만 호출됨)

    Returns:
        생성된 디렉토리 Path, 또는 검증 실패/에러 시 None
    """
    import re as _re
    import shutil as _shutil
    from datetime import date as _date

    # task_id 정규식 검증
    if not _re.match(r"^task-[\d._]+$", task_id):
        logger.warning(f"[3docs] task_id 검증 실패 (허용 패턴: ^task-[\\d._]+$): {task_id!r}")
        return None

    # 대상 디렉토리 경로 및 path traversal 방어
    tasks_root = WORKSPACE / "memory" / "plans" / "tasks"
    target_dir = tasks_root / task_id
    real_target = Path(os.path.realpath(str(target_dir)))
    real_root = Path(os.path.realpath(str(tasks_root)))
    try:
        real_target.relative_to(real_root)
    except ValueError:
        logger.warning(f"[3docs] path traversal 감지: {real_target} 는 {real_root} 하위가 아님")
        return None

    # 템플릿 디렉토리
    template_dir = WORKSPACE / "prompts" / "templates" / "task-docs"
    templates = [
        ("plan.template.md", "plan.md"),
        ("context-notes.template.md", "context-notes.md"),
        ("checklist.template.md", "checklist.md"),
    ]

    today = _date.today().isoformat()

    try:
        real_target.mkdir(parents=True, exist_ok=True, mode=0o755)
    except OSError as exc:
        logger.error(f"[3docs] 디렉토리 생성 실패: {real_target}: {exc}")
        return None

    for tmpl_name, out_name in templates:
        out_path = real_target / out_name
        if out_path.exists():
            logger.debug(f"[3docs] 이미 존재, 스킵: {out_path}")
            continue
        tmpl_path = template_dir / tmpl_name
        if not tmpl_path.exists():
            logger.warning(f"[3docs] 템플릿 파일 없음: {tmpl_path}")
            continue
        try:
            content = tmpl_path.read_text(encoding="utf-8")
            content = content.replace("{task_id}", task_id).replace("{date}", today)
            out_path.write_text(content, encoding="utf-8")
            logger.debug(f"[3docs] 생성: {out_path}")
        except OSError as exc:
            logger.error(f"[3docs] 파일 쓰기 실패: {out_path}: {exc}")

    return real_target


def check_sessions() -> dict:
    """모든 running 세션의 토큰 사용량 체크 및 자동 대응.

    SessionResilience를 사용하여:
    - 70% 이상: WARNING 이벤트 기록
    - 85% 이상: CRITICAL 이벤트 + 세션 요약 저장 + resume 트리거

    Returns:
        check_all_sessions()의 결과 dict
    """
    if not _SESSION_RESILIENCE_AVAILABLE or _SessionResilience is None:
        return {
            "status": "unavailable",
            "message": "SessionResilience 모듈을 사용할 수 없습니다.",
            "checked": 0,
            "warnings": [],
            "criticals": [],
            "normals": 0,
        }

    resilience = _SessionResilience(workspace_root=str(WORKSPACE))
    result = resilience.check_all_sessions()
    logger.info(
        "세션 체크 완료: checked=%d, warnings=%d, criticals=%d",
        result.get("checked", 0),
        len(result.get("warnings", [])),
        len(result.get("criticals", [])),
    )
    return result


def _check_bot_process(bot_key_hash: str) -> bool:
    """봇의 cokacdir 프로세스가 실행 중인지 확인.

    pgrep -f로 'cokacdir.*{key_hash}' 패턴을 검색하여
    해당 봇의 cokacdir 프로세스 존재 여부를 판별한다.

    Args:
        bot_key_hash: 봇의 키 해시 (BOT_KEYS 값)

    Returns:
        프로세스 존재 시 True, 미존재 시 False
    """
    if not bot_key_hash:
        return False
    try:
        result = subprocess.run(
            ["pgrep", "-f", f"cokacdir.*{bot_key_hash}"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        # pgrep: exit 0 = match found, exit 1 = no match
        return result.returncode == 0
    except Exception:
        return False



def get_dispatch_time(delay_seconds: int = 10) -> str:
    """현재 시간 + delay_seconds 후의 절대 시간 반환"""
    t = datetime.now() + timedelta(seconds=delay_seconds)
    return t.strftime("%Y-%m-%d %H:%M:%S")


def _compute_next_id_from_timers(timer_file: Path) -> int:
    """task-timers.json에서 다음 ID 계산 (이상치 필터링 적용)"""
    if not timer_file.exists():
        return 1
    try:
        with open(timer_file, "r") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        raise RuntimeError(f"task-timers.json이 손상됨. 수동 복구 필요: {e}")

    existing = list(data.get("tasks", {}).keys())
    if not existing:
        return 1

    nums = []
    for t in existing:
        base = t.replace("task-", "")
        # Phase/병렬/재시도 접미사 제거: task-1845_2.2 → 1845
        m = re.match(r"^(\d+)", base)
        if m:
            nums.append(int(m.group(1)))
        else:
            logger.warning(f"task ID 파싱 실패 ({t})")

    if not nums:
        return 1

    # 이상치 필터링: 연속 ID 간 갭이 1000 이상이면 그 위는 이상치로 간주
    sorted_nums = sorted(set(nums))
    filtered_max = sorted_nums[0]
    for i in range(1, len(sorted_nums)):
        if sorted_nums[i] - sorted_nums[i - 1] >= 1000:
            break  # 이 지점부터는 이상치
        filtered_max = sorted_nums[i]

    return filtered_max + 1


def generate_task_id() -> str:
    """자동 태스크 ID 생성 (카운터 파일 기반, 파일 락으로 중복 방지)"""
    timer_file = WORKSPACE / "memory" / "task-timers.json"
    counter_file = WORKSPACE / "memory" / ".task-counter"
    lock_file_path = WORKSPACE / "memory" / ".task-timers.lock"
    lock_file_path.parent.mkdir(parents=True, exist_ok=True)

    lock_file = open(lock_file_path, "w")
    try:
        fcntl.flock(lock_file, fcntl.LOCK_EX)

        # 1. 카운터 파일에서 다음 번호 읽기
        from_counter = False
        if counter_file.exists():
            try:
                next_num = int(counter_file.read_text().strip())
                if next_num <= 0:
                    logger.warning("카운터 파일 값 비정상 (%d), task-timers.json에서 복구", next_num)
                    next_num = _compute_next_id_from_timers(timer_file)
                else:
                    from_counter = True
            except (ValueError, OSError):
                logger.warning("카운터 파일 손상, task-timers.json에서 복구")
                next_num = _compute_next_id_from_timers(timer_file)
        else:
            # 카운터 파일 없으면 task-timers.json에서 계산 (마이그레이션)
            next_num = _compute_next_id_from_timers(timer_file)

        # 카운터 값과 timers.json 최대값 비교 (카운터 파일에서 읽은 경우만)
        if from_counter:
            timers_max = _compute_next_id_from_timers(timer_file)
            if next_num < timers_max:
                logger.warning("카운터(%d) < timers 최대(%d), 보정합니다", next_num, timers_max)
                next_num = timers_max
            elif next_num - timers_max >= 1000:
                logger.warning(
                    "카운터(%d)가 timers 최대(%d) 대비 1000 이상 큼, 이상치로 보정합니다", next_num, timers_max
                )
                next_num = timers_max

        next_id = f"task-{next_num}"

        # 2. 카운터 증가 저장
        counter_file.write_text(str(next_num + 1))

        # 3. 계산한 ID를 즉시 placeholder로 기록 (다른 프로세스가 같은 ID 생성 방지)
        if timer_file.exists():
            try:
                with open(timer_file, "r") as f:
                    timer_data = json.load(f)
            except (json.JSONDecodeError, OSError) as e:
                logger.warning(f"task-timers.json 재읽기 실패, 초기화: {e}")
                timer_data = {"tasks": {}}
        else:
            timer_data = {"tasks": {}}
        if "tasks" not in timer_data:
            timer_data["tasks"] = {}
        timer_data["tasks"][next_id] = {"status": "reserved", "reserved_at": datetime.now().isoformat()}
        timer_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            with open(timer_file, "w") as f:
                json.dump(timer_data, f, ensure_ascii=False, indent=2)
        except OSError as e:
            logger.error(f"task-timers.json 쓰기 실패: {e}")

        return next_id
    finally:
        fcntl.flock(lock_file, fcntl.LOCK_UN)
        lock_file.close()


def _sync_counter_if_needed(task_id: str) -> None:
    """외부 지정된 task_id가 카운터보다 크면 카운터를 업데이트한다."""
    counter_file = WORKSPACE / "memory" / ".task-counter"
    lock_file_path = WORKSPACE / "memory" / ".task-timers.lock"

    base = task_id.replace("task-", "")
    # Phase/병렬/재시도 접미사 제거: task-1845_2.2 → 1845
    match = re.match(r"^(\d+)", base)
    if not match:
        return
    given_num = int(match.group(1))

    lock_file = open(lock_file_path, "w")
    try:
        fcntl.flock(lock_file, fcntl.LOCK_EX)
        current = 0
        if counter_file.exists():
            try:
                current = int(counter_file.read_text().strip())
            except (ValueError, OSError):
                current = 0
        if given_num >= current:
            counter_file.write_text(str(given_num + 1))
            logger.info("카운터 동기화: %d → %d (외부 task_id=%s)", current, given_num + 1, task_id)
    finally:
        fcntl.flock(lock_file, fcntl.LOCK_UN)
        lock_file.close()


def _resolve_resume(base_task_id: str, team_id: str, task_desc: str, force: bool = False) -> dict:
    """--resume 옵션 처리: base task 상태 확인 + 자동 채번 + task 파일 복사.

    Returns:
        성공: {"status": "ok", "new_task_id": "task-XXXX+N", "retry_count": N}
        실패: {"status": "error", "message": "..."}
    """
    # 1. base_task_id 유효성 확인 (task- 접두사 확인)
    if not base_task_id.startswith("task-"):
        return {"status": "error", "message": f"잘못된 task ID 형식입니다: {base_task_id} (task- 접두사 필요)"}

    # 2. task 파일 존재 확인: memory/tasks/{base_task_id}.md
    task_file = Path(WORKSPACE) / "memory" / "tasks" / f"{base_task_id}.md"
    if not task_file.exists():
        return {"status": "error", "message": f"task 파일이 존재하지 않습니다: {task_file}"}

    # 3. .done 파일 존재 확인: memory/events/{base_task_id}.done
    done_file = Path(WORKSPACE) / "memory" / "events" / f"{base_task_id}.done"
    if done_file.exists():
        return {"status": "error", "message": "이미 완료된 작업입니다. 새 task로 위임하세요"}

    # 4. retry_count 확인/증가
    # base에서 기존 +N 제거
    base_without_plus = re.sub(r'\+\d+$', '', base_task_id)

    # retry_count 파일 읽기 (없으면 0)
    retry_count_file = Path(WORKSPACE) / "memory" / "events" / f"{base_task_id}.retry_count"
    file_retry_count = 0
    if retry_count_file.exists():
        try:
            file_retry_count = int(retry_count_file.read_text(encoding="utf-8").strip())
        except (ValueError, OSError):
            file_retry_count = 0

    # 기존 +N 형제 task들 확인해서 최대 N 값 사용
    tasks_dir = Path(WORKSPACE) / "memory" / "tasks"
    max_sibling_n = 0
    if tasks_dir.exists():
        sibling_pattern = re.compile(rf'^{re.escape(base_without_plus)}\+(\d+)\.md$')
        for f in tasks_dir.iterdir():
            m = sibling_pattern.match(f.name)
            if m:
                n = int(m.group(1))
                if n > max_sibling_n:
                    max_sibling_n = n

    # 새 retry_count = max(retry_count파일값, 최대형제N) + 1
    new_retry_count = max(file_retry_count, max_sibling_n) + 1

    # 5. 3회 이상 재시도 경고
    if new_retry_count >= 3 and not force:
        return {"status": "error", "message": f"3회 이상 재시도. 계속하려면 --force 추가 (현재 retry #{new_retry_count})"}

    # 6. 새 task ID 결정
    new_task_id = f"{base_without_plus}+{new_retry_count}"

    # 7. task 파일 복사
    # 원본: memory/tasks/{base_without_plus}.md (base에서 +N 제거한 원본 task 파일)
    origin_task_file = Path(WORKSPACE) / "memory" / "tasks" / f"{base_without_plus}.md"
    if not origin_task_file.exists():
        # base가 이미 +N 형태인 경우 해당 파일 사용
        origin_task_file = task_file
    new_task_file = Path(WORKSPACE) / "memory" / "tasks" / f"{new_task_id}.md"

    try:
        original_content = origin_task_file.read_text(encoding="utf-8")
        retry_meta = (
            f"> **재시도**: {base_task_id} → {new_task_id} (retry #{new_retry_count})\n"
            f"> **재시도 사유**: 이전 세션 실패/중단\n\n"
        )
        new_task_file.write_text(retry_meta + original_content, encoding="utf-8")
    except OSError as e:
        return {"status": "error", "message": f"task 파일 복사 실패: {e}"}

    # 8. retry_count 파일 업데이트
    new_retry_count_file = Path(WORKSPACE) / "memory" / "events" / f"{new_task_id}.retry_count"
    try:
        new_retry_count_file.parent.mkdir(parents=True, exist_ok=True)
        new_retry_count_file.write_text(str(new_retry_count), encoding="utf-8")
    except OSError as e:
        logger.warning(f"retry_count 파일 쓰기 실패: {e}")

    # 9. 결과 반환
    return {"status": "ok", "new_task_id": new_task_id, "retry_count": new_retry_count}


def build_prompt(
    team_id: str,
    task_desc: str,
    task_id: str,
    level: str = "normal",
    project_id: Optional[str] = None,
    chain_id: Optional[str] = None,
    task_type: str = "coding",
) -> str:
    """팀장에게 전달할 프롬프트 생성 (공통 모듈 위임)"""
    if team_id not in TEAM_INFO:
        print(f"Error: 알 수 없는 팀 ID: {team_id}")
        sys.exit(1)
    return _build_team_prompt(
        team_id, task_id, task_desc, level, project_id=project_id, chain_id=chain_id, task_type=task_type
    )


def _update_chain_task(chain_id: str, team_id: str, task_id: str) -> None:
    """chain.json 파일에서 해당 팀의 pending task에 task_id, dispatched_at, status=in_progress 기록"""
    chain_file = WORKSPACE / "memory" / "chains" / f"{chain_id}.json"
    if not chain_file.exists():
        logger.warning(f"chain 파일 없음: {chain_file}")
        return
    lock_path = chain_file.with_suffix(".lock")
    lock_fd = open(lock_path, "w")
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        with open(chain_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        # 현재 Phase에서 해당 팀의 pending task를 찾아 업데이트
        current_idx = data.get("current_phase_idx", 0)
        if current_idx < len(data.get("phases", [])):
            phase = data["phases"][current_idx]
            for task in phase["tasks"]:
                if (
                    task["team"] == team_id
                    and task["status"] in ("pending", "in_progress")
                    and task.get("task_id") is None
                ):
                    task["task_id"] = task_id
                    task["dispatched_at"] = datetime.now().isoformat()
                    task["status"] = "in_progress"
                    break
        with open(chain_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        logger.info(f"chain 업데이트: chain={chain_id}, team={team_id}, task_id={task_id}")
    except Exception as e:
        logger.error(f"chain 업데이트 실패: {e}")
    finally:
        fcntl.flock(lock_fd, fcntl.LOCK_UN)
        lock_fd.close()


def _cleanup_task(task_id: str) -> None:
    """task-timers.json에서 해당 task_id의 reserved 또는 running 엔트리 삭제.

    dispatch() 실패 시 호출하여 orphan 항목을 정리한다.
    task-timer 자동 시작 도입으로 reserved뿐 아니라 running 상태도 정리 대상.
    에러 발생 시 로깅만 하고 무시 (본래 에러 반환이 우선).
    """
    timer_file = WORKSPACE / "memory" / "task-timers.json"
    lock_file_path = WORKSPACE / "memory" / ".task-timers.lock"
    lock_file_path.parent.mkdir(parents=True, exist_ok=True)

    lock_fd = None
    try:
        lock_fd = open(lock_file_path, "w")
        fcntl.flock(lock_fd, fcntl.LOCK_EX)

        if not timer_file.exists():
            return

        with open(timer_file, "r", encoding="utf-8") as f:
            timer_data = json.load(f)

        tasks = timer_data.get("tasks", {})
        task_entry = tasks.get(task_id)

        if task_entry is None:
            logger.debug(f"_cleanup_task: task_id {task_id} 없음, 정리 불필요")
            return

        status = task_entry.get("status")
        if status not in ("reserved", "running"):
            logger.debug(f"_cleanup_task: {task_id} 상태가 정리 대상이 아님 ({status}), 건너뜀")
            return

        del tasks[task_id]
        timer_data["tasks"] = tasks

        with open(timer_file, "w", encoding="utf-8") as f:
            json.dump(timer_data, f, ensure_ascii=False, indent=2)

        logger.info(f"_cleanup_task: {task_id} {status} 엔트리 삭제 완료")

    except Exception as e:
        logger.error(f"_cleanup_task 실패 (task_id={task_id}): {e}")
    finally:
        if lock_fd is not None:
            try:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
                lock_fd.close()
            except Exception:
                pass


def _warn_large_task_desc(task_desc: str) -> None:
    """지시서 크기가 3000자 이상이면 Phase 분리 권고 WARNING 로그 출력"""
    if len(task_desc) > 3000:
        logger.warning(
            f"[large-task-desc] 지시서 크기 {len(task_desc)}자 (3000자 초과). "
            "Phase 분리를 권장합니다. 한 세션에서 모든 작업을 처리하면 성능이 저하됩니다."
        )


def _warn_phase_without_task_id(task_desc: str, task_id: str, generated_id: str) -> None:
    """Phase 작업 감지 시 --task-id 미지정 경고.

    task_desc에서 Phase 패턴(Phase N, phase N, Phase N.N)을 감지하고,
    task_id가 자동 생성(generated_id와 동일)이면 WARNING 로그를 출력한다.
    작업을 차단하지 않는다 (경고만).
    """
    if task_id != generated_id:
        return
    if re.search(r"(?<![a-zA-Z])[Pp]hase\s+\d", task_desc):
        logger.warning(
            "⚠️ Phase 작업 감지됨. --task-id를 수동 지정하세요. "
            f"(예: --task-id {task_id}_N.N) "
            f"자동 생성 ID: {task_id}"
        )


def _check_referenced_file_sizes(task_desc: str) -> Optional[str]:
    """task_desc에서 참조 파일을 추출하여 대용량 파일 경고 + 총 크기 한도 경고 반환."""
    pattern = re.escape(str(WORKSPACE)) + r"[a-zA-Z0-9/_.\-]+"
    paths = re.findall(pattern, task_desc)
    warnings: list = []
    total_size = 0
    large_files: list = []

    for path in paths:
        if not os.path.isfile(path):
            continue
        try:
            size = os.path.getsize(path)
        except OSError:
            continue
        total_size += size
        if size > 25000:
            filename = os.path.basename(path)
            size_kb = round(size / 1024, 1)
            estimated_tokens = round(size / 2.5)
            large_files.append((filename, size_kb, estimated_tokens))
            warnings.append(
                f"⚠️ 대용량 참조 파일: {filename} ({size_kb}KB, ~{estimated_tokens}tok) → offset/limit 사용 필수"
            )

    # 총 크기 한도 경고
    total_kb = round(total_size / 1024, 1)
    if total_size > MAX_REF_FILE_TOTAL_BYTES:
        warnings.insert(
            0,
            f"🚨 참조 파일 총 크기 {total_kb}KB (한도: {MAX_REF_FILE_TOTAL_BYTES // 1024}KB 초과!) "
            f"— 전체 읽기 금지. 반드시 offset/limit 분할 읽기 또는 요약 파일 참조.",
        )
    elif total_size > MAX_REF_FILE_TOTAL_BYTES * 0.7:
        warnings.insert(
            0,
            f"⚠️ 참조 파일 총 크기 {total_kb}KB (한도: {MAX_REF_FILE_TOTAL_BYTES // 1024}KB의 70% 초과) "
            f"— offset/limit 사용을 강력 권장합니다.",
        )

    return "\n".join(warnings) if warnings else None


def _check_team_qc_env(team_id: str) -> Optional[str]:
    """팀의 QC 환경(verifiers symlink) 사전 검증.

    dev8은 독립 구조 허용으로 예외.
    비정상이면 WARNING 메시지 반환 (블로킹 아닌 경고).
    2026-04-17 에이전트 미팅 합의 항목 3.
    """
    # dev8은 독립 구조 허용
    if team_id == "dev8-team":
        return None

    # dev팀만 검사 대상
    if not team_id.startswith("dev"):
        return None

    team_short = team_id.replace("-team", "")
    verifiers_path = WORKSPACE / "teams" / team_short / "qc" / "verifiers"

    if not verifiers_path.exists():
        return f"⚠️ QC 환경 경고: {team_id}의 verifiers 디렉토리가 없습니다"

    if not os.path.islink(str(verifiers_path)):
        return (
            f"⚠️ QC 환경 경고: {team_id}의 verifiers가 symlink가 아닙니다. " f"shared/verifiers로의 symlink여야 합니다."
        )

    real_path = os.path.realpath(str(verifiers_path))
    expected_suffix = os.path.join("teams", "shared", "verifiers")
    if not real_path.endswith(expected_suffix):
        return (
            f"⚠️ QC 환경 경고: {team_id}의 verifiers symlink가 "
            f"shared/verifiers를 가리키지 않습니다 (실제: {real_path})"
        )

    return None


def _get_large_files(affected_files: list, threshold: int = 2000) -> list:
    """affected_files 중 threshold줄을 초과하는 대형 파일 목록 반환.

    Returns:
        list of dict: [{"path": "...", "lines": 3036}, ...]
    """
    large = []
    for filepath in affected_files:
        resolved = filepath
        if not os.path.isabs(filepath):
            resolved = os.path.join(str(WORKSPACE), filepath)
        if not os.path.isfile(resolved):
            continue
        try:
            with open(resolved, "r", encoding="utf-8", errors="ignore") as f:
                line_count = sum(1 for _ in f)
            if line_count > threshold:
                large.append({"path": filepath, "lines": line_count})
        except OSError:
            continue
    return large


def _inject_platform_rules(task_desc: str) -> Optional[str]:
    """지시서 키워드 기반 플랫폼 규칙 자동 주입.

    네이버 블로그 관련 키워드가 감지되면 config/naver-blog-rules.md 내용을 반환.
    향후 티스토리 등 다른 플랫폼 규칙도 동일 패턴으로 확장 가능.

    Returns:
        주입할 규칙 텍스트 또는 None
    """
    task_lower = task_desc.lower()

    # Step 2: 제외 키워드 — 비-블로그작성 작업 차단
    _EXCLUDE_KEYWORDS = [
        "검증",
        "폐기",
        "리팩토링",
        "마이그레이션",
        "삭제",
        "deprecated",
        "코드 검토",
        "셀프체크",
        "일원화",
        "verify",
        "refactor",
        "테스트 실행",
        "코드 분석",
        "버그 수정",
        "디버깅",
        "debug",
        "코드 리뷰",
    ]

    # Step 1: 긍정 키워드 — 발행/작성 의도가 명확한 경우만 감지
    _WRITE_INTENT = r"(작성|발행|글\s*쓰기|글쓰기|포스팅|올리기|publish|write|post)"
    naver_blog_detected = (
        # blog-publish-naver 스킬 명시: 항상 주입
        "blog-publish-naver" in task_lower
        # 네이버 블로그 + 작성/발행 의도 (순방향)
        or bool(re.search(rf"네이버\s*블로그.{{0,20}}{_WRITE_INTENT}", task_desc))
        # 작성/발행 의도 + 네이버 블로그 (역방향)
        or bool(re.search(rf"{_WRITE_INTENT}.{{0,20}}네이버\s*블로그", task_desc))
        # 네이버 + (블로그)? 발행 — 발행 의도 명확
        or bool(re.search(r"네이버.{0,10}(블로그\s*)?발행", task_desc))
        # 영문 조합: naver blog + 발행/작성 동사
        or bool(re.search(rf"naver.{{0,10}}blog.{{0,20}}{_WRITE_INTENT}", task_lower))
    )

    # 제외 키워드가 있으면 긍정 판정 취소
    if naver_blog_detected and any(kw in task_desc or kw in task_lower for kw in _EXCLUDE_KEYWORDS):
        naver_blog_detected = False

    if not naver_blog_detected:
        return None

    # 규칙 파일 읽기
    rules_path = WORKSPACE / "config" / "naver-blog-rules.md"
    if not rules_path.exists():
        logger.warning("[naver-blog-rules] 규칙 파일 없음: %s", rules_path)
        return None

    try:
        rules_content = rules_path.read_text(encoding="utf-8")
        logger.info("[naver-blog-rules] 네이버 블로그 규칙 자동 주입 완료")
        return rules_content
    except OSError as e:
        logger.warning("[naver-blog-rules] 규칙 파일 읽기 실패: %s", e)
        return None


def _warn_research_impl_mix(task_desc: str, task_type: str) -> None:
    """리서치+구현 혼합 지시서 감지 시 WARNING 로그 출력"""
    research_keywords = [
        "리서치",
        "조사",
        "파악",
        "현재 상태",
        "가능한지",
        "방법 조사",
        "API 문서",
        "HTML 구조",
        "셀렉터",
        "외부 서비스",
        "인증 방식",
        "2FA",
        "OTP",
        "research",
        "investigate",
        "feasibility",
    ]
    impl_keywords = [
        "구현",
        "구축",
        "코딩",
        "코드 작성",
        "테스트 작성",
        "파이프라인",
        "implement",
        "build",
        "Publisher",
        "Pipeline",
    ]

    has_research = any(kw in task_desc for kw in research_keywords)
    has_impl = any(kw in task_desc for kw in impl_keywords)

    if has_research and has_impl and task_type != "research":
        logger.warning(
            "[research-impl-mix] 지시서에 리서치와 구현이 혼합되어 있습니다. "
            "Phase 분리를 권장합니다. (specs/research-impl-separation.md 참조) "
            "세션 경량화: 리서치 후 /compact 실행 후 구현에 진입하세요."
        )


def _check_memory_before_dispatch(team_id: str, task_desc: str) -> None:
    """MEMORY.md ★ 항목 확인 및 디자인 작업 dev팀 위임 위반 감지.

    파일 I/O 실패 시 예외를 전파하지 않고 debug 로그만 남긴다.
    """
    memory_file = _MEMORY_BASE_PATH / "MEMORY.md"
    if not memory_file.exists():
        logger.debug("[memory_check] MEMORY.md 파일 없음, 체크 스킵")
        return

    try:
        memory_md = memory_file.read_text(encoding="utf-8")
    except OSError as e:
        logger.debug(f"[memory_check] MEMORY.md 읽기 실패: {e}")
        return

    # ★ 항목 추출
    star_items = [line.strip() for line in memory_md.split("\n") if "★" in line]

    # ★ 항목에 링크된 .md 파일 읽기
    loaded_files: set = set()
    matched_count = 0
    for item in star_items:
        for fname in re.findall(r"\[.*?\]\((.*?\.md)\)", item):
            if fname in loaded_files:
                continue
            ref_path = _MEMORY_BASE_PATH / fname
            if ref_path.exists():
                try:
                    ref_path.read_text(encoding="utf-8")
                    loaded_files.add(fname)
                    matched_count += 1
                except OSError as e:
                    logger.debug(f"[memory_check] 링크 파일 읽기 실패: {fname} – {e}")

    # 디자인 키워드 매칭
    design_keywords = ["디자인", "배너", "이미지", "포스터", "일러스트", "banner", "image", "poster", "design"]
    task_lower = task_desc.lower()
    detected = [kw for kw in design_keywords if kw in task_lower]
    design_keywords_found = bool(detected)

    # 디자인 키워드 발견 시 피드백 파일 추가 로드 (중복 방지)
    if design_keywords_found:
        extra_fname = "feedback_design_team_routing_v2.md"
        if extra_fname not in loaded_files:
            extra_path = _MEMORY_BASE_PATH / extra_fname
            if extra_path.exists():
                try:
                    extra_path.read_text(encoding="utf-8")
                    loaded_files.add(extra_fname)
                    matched_count += 1
                except OSError as e:
                    logger.debug(f"[memory_check] 피드백 파일 읽기 실패: {extra_fname} – {e}")

    # FTS5 Layer 1 인덱스 검색으로 관련 메모리 빠르게 확인
    fts_count = 0
    try:
        from utils.memory_indexer import MemoryIndexer

        _indexer = MemoryIndexer()
        try:
            fts_results = _indexer.search(task_desc, limit=3, layer="index")
            fts_count = len(fts_results)
        finally:
            _indexer.close()
    except Exception as e:
        logger.debug(f"[memory_check] FTS5 Layer 1 검색 실패 (무시): {e}")

    logger.info(
        f"[memory_check] ★ {len(star_items)}개 확인, 관련 피드백 {matched_count}개 확인"
        f", FTS5 관련 메모리 {fts_count}건"
    )

    # 위반 감지: dev팀에 디자인 작업 위임
    if team_id.startswith("dev") and design_keywords_found:
        logger.warning(
            "⚠️ [memory_check] 디자인 작업을 dev팀에 위임하려고 합니다! " f"team={team_id}, 감지 키워드: {detected}"
        )


def _check_brainstorming_gate(task_id: str, task_desc: str, level: str, skip_brainstorming: bool = False) -> None:
    """Lv.3+ UX 작업에 brainstorming 사전 실행 여부를 확인한다.

    skip_brainstorming=True이면 즉시 반환한다.
    level이 3 미만이면 즉시 반환한다.
    UX 관련 키워드가 없으면 즉시 반환한다.
    brainstorming 파일이 없으면 warning을 남긴다.
    """
    if skip_brainstorming:
        logger.debug("[brainstorming-gate] --skip-brainstorming으로 스킵")
        return

    level_map = {"normal": 2, "critical": 3, "security": 4}
    level_int = level_map.get(level, 2)
    if level_int < 3:
        return

    ux_keywords = [
        "UX", "UI", "디자인", "사용자 경험", "인터페이스", "레이아웃",
        "화면", "페이지", "컴포넌트", "프론트", "frontend",
    ]
    task_lower = task_desc.lower()
    detected_keywords = [kw for kw in ux_keywords if kw.lower() in task_lower]
    if not detected_keywords:
        return

    path = _MEMORY_BASE_PATH / "meetings" / f"brainstorming-{task_id}.md"
    if path.exists():
        logger.info(f"[brainstorming-gate] brainstorming 파일 확인됨: {path}")
        return

    logger.warning(
        f"⚠️ [brainstorming-gate] Lv.{level_int} UX 작업에 brainstorming 미실행. "
        f"감지 키워드: {detected_keywords}. "
        f"/brainstorming 스킬 실행 후 재위임하세요. --skip-brainstorming 플래그로 스킵 가능"
    )


def _load_logical_teams() -> dict:
    """config/constants.json에서 logical_teams 도메인 매핑을 로드."""
    # ConfigManager가 가용한 경우 우선 사용 (BotStatusManager 내부도 동일 경로)
    if _CONFIG_AVAILABLE and _cfg:
        try:
            result = _cfg.get_constant("logical_teams")
            if result:
                return dict(result)
        except Exception:
            pass

    # BotStatusManager가 가용한 경우 위임 (파일 직접 읽기 포함)
    if _BOT_STATUS_AVAILABLE and _BotStatusManager is not None:
        mgr = _BotStatusManager(workspace_root=WORKSPACE)
        # ConfigManager 없이 파일 직접 읽기만 수행
        constants = mgr._load_constants()
        result = constants.get("logical_teams", {})
        if result:
            return result

    # fallback: 파일 직접 읽기
    constants_path = WORKSPACE / "config" / "constants.json"
    if constants_path.exists():
        try:
            with open(constants_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            return data.get("logical_teams", {})
        except (json.JSONDecodeError, OSError):
            pass
    return {}


def _suggest_team(task_desc: str) -> Optional[str]:
    """task 설명에서 키워드 매칭 → 적합한 논리적 팀 추천.

    Returns:
        추천 팀 ID (예: "design") 또는 None (매칭 없음)
    """
    if _BOT_STATUS_AVAILABLE and _BotStatusManager is not None:
        return _BotStatusManager(workspace_root=WORKSPACE).suggest_team(task_desc)

    # fallback: BotStatusManager 미사용 시 기존 로직
    logical_teams = _load_logical_teams()
    if not logical_teams:
        return None

    best_team: Optional[str] = None
    best_score: int = 0

    for team_id, config in logical_teams.items():
        if team_id == "composite":
            continue  # composite은 추천 대상이 아님

        keywords: list = config.get("keywords", [])
        anti_keywords: list = config.get("anti_keywords", [])

        # anti-keyword가 있으면 해당 팀 제외
        if any(ak in task_desc for ak in anti_keywords):
            continue

        # keyword 매칭 점수 계산
        score = sum(1 for kw in keywords if kw in task_desc)

        if score > best_score:
            best_score = score
            best_team = team_id

    return best_team if best_score > 0 else None


def _validate_team_routing(team_id: str, task_desc: str, override_routing: bool = False) -> Optional[str]:
    """dev팀에 논리적 팀 소관 작업이 위임되면 WARNING 반환.

    Args:
        team_id: 위임 대상 팀 ID
        task_desc: 작업 설명
        override_routing: True이면 라우팅 경고를 무시

    Returns:
        경고 메시지 또는 None (문제 없음)
    """
    if _BOT_STATUS_AVAILABLE and _BotStatusManager is not None:
        msg = _BotStatusManager(workspace_root=WORKSPACE).validate_routing(team_id, task_desc, override_routing)
        if msg is None:
            return None
        # BotStatusManager 메시지를 CLI 스타일로 변환 (호환성 유지)
        msg = msg.replace("override_routing=True를 사용하세요", "--override-routing을 추가하세요")
        if not msg.startswith("⚠️"):
            msg = "⚠️ " + msg
        return msg

    # fallback: BotStatusManager 미사용 시 기존 로직
    # 논리적 팀이면 검증 불필요
    if team_id in DYNAMIC_BOT_TEAMS:
        return None

    # dev팀에 논리적 팀 소관 작업이 위임되는지 확인
    suggested = _suggest_team(task_desc)
    if suggested is None:
        return None

    if override_routing:
        logger.warning(f"[routing-override] {team_id}에 {suggested} 소관 작업 위임 (--override-routing 적용)")
        return None

    logical_teams = _load_logical_teams()
    description = logical_teams.get(suggested, {}).get("description", "")
    return (
        f"⚠️ 이 작업은 --team {suggested}이 적합합니다 ({description}). " f"계속하려면 --override-routing을 추가하세요."
    )


def _validate_composite_teams(teams_str: str) -> List[str]:
    """composite teams 문자열 파싱 + 유효성 검증.

    Args:
        teams_str: 쉼표 구분 팀 ID 문자열 (예: "marketing,design")

    Returns:
        검증된 팀 ID 리스트

    Raises:
        ValueError: 유효하지 않은 입력
    """
    teams = [t.strip() for t in teams_str.split(",") if t.strip()]
    if len(teams) < 2:
        raise ValueError(f"composite는 2개 이상의 팀이 필요합니다 (입력: {teams_str!r})")
    if len(teams) > MAX_COMPOSITE_TEAMS:
        raise ValueError(f"최대 {MAX_COMPOSITE_TEAMS}개 팀까지 허용 (입력: {len(teams)}개)")
    unknown = [t for t in teams if t not in COMPOSITE_ALLOWED_TEAMS]
    if unknown:
        raise ValueError(f"알 수 없는 팀 ID: {unknown}. 허용 목록(소문자): {sorted(COMPOSITE_ALLOWED_TEAMS)}")
    if len(teams) != len(set(teams)):
        raise ValueError(f"중복 팀 ID: {teams}")
    return teams


def _dispatch_composite(
    composite_teams: List[str],
    task_desc: str,
    level: str = "normal",
    task_id: Optional[str] = None,
    force: bool = False,
    model: Optional[str] = None,
) -> dict:
    """복합업무 임시팀 디스패치.

    하나의 봇 세션에 여러 논리적 팀을 조합하여 Phase별로 순차 실행.
    임시팀장(Opus)이 Phase 관리, 핸드오프, 품질 검수를 일괄 수행.
    """
    generated_id = None
    if task_id is None:
        task_id = generate_task_id()
        generated_id = task_id
    else:
        # 외부 지정된 task_id로 카운터 동기화
        _sync_counter_if_needed(task_id)

    # Phase 작업 감지 경고
    if generated_id is not None:
        _warn_phase_without_task_id(task_desc, task_id, generated_id)

    # 재검증 (Defense in Depth)
    _validate_composite_teams(",".join(composite_teams))

    # injection_guard: composite는 high/critical 즉시 차단
    if _INJECTION_GUARD_AVAILABLE and _scan_content is not None:
        try:
            _scan_result = _scan_content(task_desc)
            if not _scan_result.is_safe:
                _high_threats = [t for t in _scan_result.threats if t.severity in ("high", "critical")]
                if _high_threats:
                    _cleanup_task(task_id)
                    return {
                        "status": "error",
                        "message": f"보안 위협 감지 (composite 차단): {[t.pattern_name for t in _high_threats]}",
                    }
        except Exception as _e:
            logger.debug(f"[injection_guard] 스캔 실패 (무시): {_e}")

    # approval 검사
    if _APPROVAL_AVAILABLE and _check_command is not None:
        try:
            _approval_input = f"composite {task_desc}"
            _approval_result = _check_command(_approval_input)
            logger.info(
                f"[approval] risk_level={_approval_result.risk_level}, "
                f"composite_teams={composite_teams}, "
                f"patterns={_approval_result.matched_patterns}"
            )
        except Exception as _e:
            logger.debug(f"[approval] 검사 실패 (무시): {_e}")

    # model_router 검사
    _recommended_model: str | None = model  # 명시적 model 파라미터 우선
    if _recommended_model is None and _MODEL_ROUTER_AVAILABLE and _route_model is not None:
        try:
            _recommended_model = _route_model(task_desc)
            logger.info(f"[model_router] 권장 모델: {_recommended_model}, composite_teams={composite_teams}")
        except Exception as _e:
            logger.debug(f"[model_router] 라우팅 실패 (무시): {_e}")
    elif _recommended_model:
        logger.info(f"[model_router] 명시적 모델 지정: {_recommended_model}, composite_teams={composite_teams}")

    logger.info(f"복합업무 위임 시작: composite_teams={composite_teams}, task_id={task_id}")

    # task-timer 자동 시작
    short_desc = task_desc[:60] + ("..." if len(task_desc) > 60 else "")
    timer_cmd = [
        "python3",
        str(TASK_TIMER),
        "start",
        task_id,
        "--team",
        "composite",
        "--desc",
        short_desc,
    ]
    timer_result = subprocess.run(timer_cmd, capture_output=True, text=True, timeout=30)
    if timer_result.returncode != 0:
        logger.warning(f"task-timer start 실패 (task_id={task_id}): {timer_result.stderr.strip()}")

    # 같은 팀에 이미 running 태스크가 있는지 확인 (composite 포함 팀 각각 순회)
    timer_file = WORKSPACE / "memory" / "task-timers.json"
    if timer_file.exists():
        try:
            with open(timer_file, "r", encoding="utf-8") as f:
                existing_data = json.load(f)
            for existing_tid, existing_task in existing_data.get("tasks", {}).items():
                if existing_task.get("status") != "running" or existing_tid == task_id:
                    continue
                existing_team = existing_task.get("team_id", "")
                existing_composite = existing_task.get("composite_teams", [])
                # 기존 running 태스크가 사용 중인 팀 집합
                existing_team_set = set(existing_composite) if existing_composite else {existing_team}
                # 새 composite 팀과 겹치는 팀
                overlap = existing_team_set & set(composite_teams)
                if overlap:
                    if not force:
                        _cleanup_task(task_id)
                        return {
                            "status": "error",
                            "message": (
                                f"겹치는 팀({overlap})에 이미 running 태스크가 있습니다: "
                                f"{existing_tid}. 완료 후 재시도하거나 --force로 강제 실행하세요."
                            ),
                        }
                    logger.warning(
                        f"[force] 겹치는 팀({overlap})에 이미 running 태스크 있음: "
                        f"{existing_tid}. --force로 강제 진행."
                    )
                    break
        except (json.JSONDecodeError, OSError) as e:
            logger.warning(f"running 태스크 확인 실패: {e}")

    # task_desc 첫 줄의 task-id를 실제 task_id로 교체
    task_desc = re.sub(r"^# task-\d+\.\d+:", f"# {task_id}:", task_desc, count=1)

    # task_desc를 파일에 저장
    task_file = WORKSPACE / "memory" / "tasks" / f"{task_id}.md"
    task_file.parent.mkdir(parents=True, exist_ok=True)
    task_file.write_text(task_desc, encoding="utf-8")

    # audit_logger
    if _AUDIT_LOGGER_AVAILABLE and _log_file_operation is not None:
        try:
            _log_file_operation(task_id=task_id, filepath=str(task_file), tool="dispatch", operation="write")
        except Exception as _e:
            logger.debug(f"[audit_logger] 감사 기록 실패 (무시): {_e}")

    # build_composite_prompt 호출
    try:
        from prompts.team_prompts import build_composite_prompt

        prompt = build_composite_prompt(composite_teams, task_id, task_desc, level)
    except Exception as e:
        logger.error(f"build_composite_prompt 실패: {e}")
        _cleanup_task(task_id)
        return {"status": "error", "message": f"프롬프트 생성 실패: {e}"}

    # ── sanitize 게이트 경고 삽입 (Lv.3+, composite 경로) ──────────────
    _composite_level_to_int = {"normal": 2, "critical": 3, "security": 4}
    _composite_dispatch_level = _composite_level_to_int.get(level, 2)
    if _SANITIZE_GATE_AVAILABLE and _should_sanitize is not None and _should_sanitize(_composite_dispatch_level):
        prompt += (
            "\n\n## ⚠️ Sanitize 게이트 (Lv.3+)\n"
            "외부 AI(Codex/Gemini) 호출 전 반드시 sanitize 검사를 수행하세요:\n"
            "- 대상: affected_files의 코드 + 설계 문서\n"
            "- 마스킹 항목: 주민등록번호, 연락처, API 키, 계좌번호, 보험 증권번호\n"
            "- 방법: `from utils.sanitize_gate import sanitize_text` 사용\n"
            "- PII가 마스킹된 코드만 외부 AI에 전달할 것\n"
        )
        logger.info(f"[sanitize-gate] Lv.{_composite_dispatch_level} composite 작업에 sanitize 게이트 지시 삽입")

    # 가용 봇 원자적 선택 + 즉시 예약 (봇 충돌 방지)
    try:
        selected_bot = _select_and_reserve_bot(task_id, required_model=model)
    except RuntimeError as e:
        _cleanup_task(task_id)
        return {"status": "error", "message": str(e)}
    key_name = BOT_TO_KEY[selected_bot]
    key = BOT_KEYS.get(key_name)
    if key is None:
        logger.warning(f"환경변수 COKACDIR_KEY_{key_name.upper()} 미설정")
        _cleanup_task(task_id)
        return {"status": "error", "message": f"봇 키가 설정되지 않았습니다: COKACDIR_KEY_{key_name.upper()}"}

    # timer metadata 패치 (role, bot, composite_teams)
    _patch_timer_metadata(task_id, role="composite", bot=selected_bot, composite_teams=composite_teams)

    _dispatch_delay = 10  # cron이 자체적으로 세션을 시작하므로 고정 딜레이

    # cokacdir --cron으로 독립 세션 예약
    cmd = [
        "cokacdir",
        "--cron",
        prompt,
        "--at",
        get_dispatch_time(_dispatch_delay),
        "--chat",
        CHAT_ID,
        "--key",
        key,
        "--once",
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except subprocess.TimeoutExpired:
        logger.error("cokacdir 호출 타임아웃 (60초 초과)")
        _cleanup_task(task_id)
        return {"status": "error", "message": "cokacdir 호출 타임아웃 (60초 초과)"}

    if result.returncode == 0:
        try:
            response = json.loads(result.stdout)
        except json.JSONDecodeError:
            response = {"raw": result.stdout.strip()}

        # 봇 상태를 "processing"으로 설정
        from utils.bot_activity import set_bot_status

        set_bot_status("composite", "processing")
        _sync_bot_settings()

        # daily 로그 기록
        short_desc = task_desc[:60] + ("..." if len(task_desc) > 60 else "")
        if _REDACT_AVAILABLE:
            try:
                short_desc = _redact_text(short_desc)
            except Exception:
                pass
        teams_label = "+".join(composite_teams)
        log_msg = f"[{task_id}] 복합업무 임시팀({teams_label})에게 위임: {short_desc}"
        subprocess.run(
            ["python3", str(TASK_TIMER), "log", log_msg, "--type", "dispatch"],
            capture_output=True,
            text=True,
            timeout=30,
        )

        logger.info(f"복합업무 위임 완료: {task_id} → {teams_label}")
        _result: dict = {
            "status": "dispatched",
            "task_id": task_id,
            "team": "composite",
            "composite_teams": composite_teams,
            "lead": "복합업무 임시팀장",
            "level": level,
            "description": task_desc,
            "message": f"복합업무 임시팀({teams_label})에게 위임 완료. 즉시 독립 세션에서 작업 시작.",
            "cron_response": response,
        }

        # watchdog용 메타데이터 패치: schedule_id, retry_count, max_retry, task_file
        _schedule_id = response.get("id") if isinstance(response, dict) else None
        if _schedule_id:
            _watchdog_meta = {
                "schedule_id": _schedule_id,
                "retry_count": 0,
                "max_retry": 2,
                "task_file": f"memory/tasks/{task_id}.md",
            }
            _patch_timer_metadata(task_id, **_watchdog_meta)

        # Memory Check 발급 (MC)
        try:
            from utils.memory_check import issue_mc

            mc_result = issue_mc(task_id, task_desc)
            if mc_result:
                logger.info(f"[memory-check] MC 발급: {mc_result.get('mc_id')} for {task_id}")
                _result["mc_id"] = mc_result.get("mc_id")
        except Exception as _mc_err:
            logger.warning(f"[memory-check] MC 발급 실패 (무시): {_mc_err}")

        return _result
    else:
        logger.error(f"복합업무 위임 실패: {result.stderr.strip()}")
        _cleanup_task(task_id)
        return {"status": "error", "message": result.stderr.strip(), "command": " ".join(cmd[:4] + ["...(생략)"])}


# ---------------------------------------------------------------------------
# PRD 자동 분해
# ---------------------------------------------------------------------------


def _parse_prd_regex(prd_content: str) -> list[dict]:
    """PRD 문서에서 Phase/Sprint 섹션을 정규식으로 추출한다.

    Args:
        prd_content: PRD 파일 전체 텍스트

    Returns:
        phase 딕셔너리 리스트. 각 항목은 phase_number, phase_type, title,
        duration, body, features, dod 키를 가진다.
    """
    header_pattern = re.compile(
        r"^###\s+(Sprint|Phase)\s+(\d+)\s*\(([^)]+)\)\s*[-—–]\s*(.+)$",
        re.MULTILINE,
    )
    next_section_pattern = re.compile(r"^###\s+(Sprint|Phase)\s+|^##\s+", re.MULTILINE)

    phases: list[dict] = []
    matches = list(header_pattern.finditer(prd_content))

    for i, m in enumerate(matches):
        phase_type = m.group(1)  # "Sprint" or "Phase"
        phase_number = int(m.group(2))
        duration = m.group(3).strip()
        title = m.group(4).strip()

        # 본문: 이 헤더 다음 줄부터 다음 섹션 헤더 직전까지
        body_start = m.end()
        # 다음 Phase/Sprint 또는 ## 헤더 찾기
        next_m = next_section_pattern.search(prd_content, body_start)
        body_end = next_m.start() if next_m else len(prd_content)
        body = prd_content[body_start:body_end].strip()

        # DoD 추출
        dod: Optional[str] = None
        dod_pattern = re.compile(r"\*\*DoD\*\*:(.*?)(?=\n##|\Z)", re.DOTALL)
        dod_m = dod_pattern.search(body)
        if dod_m:
            dod = dod_m.group(1).strip()

        # Features 추출: title 안의 [F1, F2, ...] 패턴
        features: list[str] = []
        feat_m = re.search(r"\[([^\]]+)\]", title)
        if feat_m:
            features = [f.strip() for f in feat_m.group(1).split(",")]

        phases.append(
            {
                "phase_number": phase_number,
                "phase_type": phase_type,
                "title": title,
                "duration": duration,
                "body": body,
                "features": features,
                "dod": dod,
            }
        )

    return phases


def _parse_prd_claude(prd_path: str, prd_content: str) -> list[dict]:
    """claude CLI를 호출해 PRD에서 Phase/Sprint 정보를 JSON으로 추출한다.

    Args:
        prd_path: PRD 파일 경로 (프롬프트 참조용)
        prd_content: PRD 파일 전체 텍스트

    Returns:
        phase 딕셔너리 리스트. 실패 시 빈 리스트.
    """
    prompt = (
        "다음 PRD 문서에서 구현 로드맵 섹션의 Phase/Sprint별 태스크를 JSON 배열로 추출해줘.\n\n"
        "각 항목 형식:\n"
        '{"phase_number": 0, "phase_type": "Sprint" 또는 "Phase", "title": "제목", '
        '"duration": "소요시간", "body": "항목 내용 전체", "features": ["F1", "F2"], '
        '"dod": "완료조건 또는 null"}\n\n'
        "JSON 배열만 출력하세요. 다른 텍스트 없이.\n\n"
        "--- PRD 문서 시작 ---\n"
        f"{prd_content}\n"
        "--- PRD 문서 끝 ---"
    )
    claude_bin = "/home/jay/.local/bin/claude"
    try:
        result = subprocess.run(
            [claude_bin, "-p", prompt, "--output-format", "json"],
            capture_output=True,
            text=True,
            timeout=120,
            cwd="/tmp",
        )
        if result.returncode != 0:
            logger.error(f"[handle_prd] claude CLI 오류: {result.stderr.strip()}")
            return []
        raw = result.stdout.strip()
        # claude --output-format json 은 {"result": "..."} 형태로 감싸서 반환할 수 있음
        try:
            outer = json.loads(raw)
            if isinstance(outer, list):
                return outer
            # {"result": "[...]"} 형태인 경우
            if isinstance(outer, dict) and "result" in outer:
                inner = outer["result"]
                if isinstance(inner, str):
                    return json.loads(inner)
                if isinstance(inner, list):
                    return inner
        except json.JSONDecodeError:
            pass
        # 마지막 수단: raw 텍스트에서 JSON 배열 탐색
        arr_m = re.search(r"\[.*\]", raw, re.DOTALL)
        if arr_m:
            return json.loads(arr_m.group(0))
        logger.error("[handle_prd] claude 출력에서 JSON 배열을 찾지 못했습니다.")
        return []
    except subprocess.TimeoutExpired:
        logger.error("[handle_prd] claude CLI timeout (120s)")
        return []
    except Exception as e:
        logger.error(f"[handle_prd] claude CLI 호출 실패: {e}")
        return []


def handle_prd(prd_path: str, team_id: str) -> dict:
    """PRD 파일을 읽어 Phase별 task 파일을 생성한다. 위임은 하지 않는다.

    Args:
        prd_path: PRD 파일 경로 (절대경로 또는 WORKSPACE 기준 상대경로)
        team_id:  담당 팀 ID

    Returns:
        처리 결과 딕셔너리
    """
    # step 1: PRD 파일 읽기
    prd_file = Path(prd_path)
    if not prd_file.exists():
        prd_file = WORKSPACE / prd_path
    if not prd_file.exists():
        return {"status": "error", "message": f"PRD 파일을 찾을 수 없습니다: {prd_path}"}

    prd_content = prd_file.read_text(encoding="utf-8")
    prd_abs = str(prd_file.resolve())

    # step 2: 정규식 파싱 시도 → 실패 시 claude CLI 폴백
    phases = _parse_prd_regex(prd_content)
    method = "regex"
    if not phases:
        logger.info("[handle_prd] 정규식 파싱 결과 없음, claude CLI 폴백 시도")
        phases = _parse_prd_claude(prd_abs, prd_content)
        method = "claude"

    # step 3: 각 Phase를 task 파일로 생성
    stem = prd_file.stem  # 예: prd-fireauto-integration
    prd_stem = stem[4:] if stem.startswith("prd-") else stem  # 예: fireauto-integration

    tasks_dir = WORKSPACE / "memory" / "tasks"
    tasks_dir.mkdir(parents=True, exist_ok=True)

    created: list[str] = []
    skipped: list[str] = []

    for idx, phase in enumerate(phases):
        phase_num = phase.get("phase_number", idx)
        phase_type = phase.get("phase_type", "Phase")
        title = phase.get("title", "")
        duration = phase.get("duration", "")
        body = phase.get("body", "")
        features: list[str] = phase.get("features") or []
        dod = phase.get("dod")

        task_id = f"dispatch-{prd_stem}-phase{phase_num}"
        file_path = tasks_dir / f"{task_id}.md"

        if file_path.exists():
            skipped.append(str(file_path))
            continue

        # depends_on: 이전 phase의 task_id (phase0이면 빈 리스트)
        depends_on: list[str] = []
        if idx > 0:
            prev_phase_num = phases[idx - 1].get("phase_number", idx - 1)
            depends_on = [f"dispatch-{prd_stem}-phase{prev_phase_num}"]

        created_at = datetime.now().isoformat(timespec="seconds")
        features_str = ", ".join(features) if features else ""
        prd_ref_line = f"`{prd_abs}`"
        if features_str:
            prd_ref_line += f" — {features_str}"

        depends_on_yaml = repr(depends_on)  # [] 또는 ['id']

        dod_section = ""
        if dod:
            dod_section = f"\n## 완료 조건 (DoD)\n{dod}\n"

        content = (
            f"---\n"
            f'task_id: "{task_id}"\n'
            f'team: "{team_id}"\n'
            f"level: 2\n"
            f"priority: P2\n"
            f"depends_on: {depends_on_yaml}\n"
            f'created_at: "{created_at}"\n'
            f"deadline: null\n"
            f"---\n\n"
            f"# {prd_stem} {phase_type} {phase_num}: {title}\n\n"
            f"## PRD 참조 (필수)\n"
            f"{prd_ref_line}\n\n"
            f"---\n\n"
            f"{body}\n"
            f"{dod_section}"
        )

        file_path.write_text(content, encoding="utf-8")
        created.append(str(file_path))
        logger.info(f"[handle_prd] 파일 생성: {file_path}")

    # step 4: 결과 반환
    return {
        "status": "ok",
        "prd": prd_path,
        "team": team_id,
        "method": method,
        "created": created,
        "skipped": skipped,
        "total_phases": len(phases),
    }


def dispatch(
    team_id: Optional[str] = None,
    task_desc: str = "",
    level: str = "normal",
    composite_teams: Optional[List[str]] = None,
    session_id: Optional[str] = None,
    project_id: Optional[str] = None,
    chain_id: Optional[str] = None,
    refresh_map: bool = True,
    task_type: str = "coding",
    task_id: Optional[str] = None,
    phases: Optional[int] = None,
    force: bool = False,
    resume_from: Optional[str] = None,
    agent_type: str = "write",
    model: Optional[str] = None,
    override_routing: bool = False,
    batch_id: Optional[str] = None,
    skip_meeting: bool = False,
    skip_brainstorming: bool = False,
) -> dict:
    """작업을 독립 세션으로 디스패치

    목차→요약→상세 원칙:
    - task_desc를 memory/tasks/<task_id>.md에 저장 (요약 파일)
    - 프롬프트 본문에는 파일 경로만 포함 (build_prompt 내에서 처리)
    - session_id: 아누의 현재 세션 ID. 전달 시 followup이 동일 대화에서 실행됨.
    - task_type: 작업 유형 (coding/research/check). coding일 때만 QC 검증 포함.
    """

    # resume_from 처리: 세션 요약 파일이 지정된 경우 task_desc 앞에 요약 prepend
    if resume_from is not None:
        resume_path = Path(resume_from)
        if not resume_path.exists():
            return {
                "status": "error",
                "message": f"resume_from 파일이 존재하지 않습니다: {resume_from}",
            }
        summary_content = resume_path.read_text(encoding="utf-8")
        task_desc = (
            f"## 이전 세션 요약\n"
            f"아래 요약을 읽고 이어서 작업하세요.\n\n"
            f"{summary_content}\n\n"
            f"---\n\n"
            f"{task_desc}"
        )

    # XOR 검증: team_id 또는 composite_teams 중 하나 필수
    if not team_id and not composite_teams:
        return {"status": "error", "message": "team_id 또는 composite_teams 중 하나 필수"}
    if team_id and composite_teams:
        return {"status": "error", "message": "team_id와 composite_teams는 동시 사용 불가"}

    # composite 분기
    if composite_teams:
        # composite 경로에서도 memory check 수행 (첫 번째 팀 기준)
        _check_memory_before_dispatch(composite_teams[0], task_desc)
        _check_brainstorming_gate(task_id or "unknown", task_desc, level, skip_brainstorming)
        return _dispatch_composite(composite_teams, task_desc, level, task_id=task_id, force=force, model=model)

    # 이 시점에서 team_id는 반드시 str (위 XOR 검증 + composite 분기에서 보장)
    assert team_id is not None

    # ── 논리적 팀 라우팅 검증 ──────────────────────────────────────────
    routing_warning = _validate_team_routing(team_id, task_desc, override_routing)
    if routing_warning:
        return {"status": "error", "message": routing_warning}

    generated_id = None
    if task_id is None:
        task_id = generate_task_id()
        generated_id = task_id
    else:
        # 외부 지정된 task_id로 카운터 동기화
        _sync_counter_if_needed(task_id)

    # Phase 작업 감지 경고
    if generated_id is not None:
        _warn_phase_without_task_id(task_desc, task_id, generated_id)

    # ── injection_guard: task_desc 인젝션 검사 ──────────────────────────
    if _INJECTION_GUARD_AVAILABLE and _scan_content is not None:
        try:
            _scan_result = _scan_content(task_desc)
            if not _scan_result.is_safe:
                _high_threats = [t for t in _scan_result.threats if t.severity in ("high", "critical")]
                if _high_threats:
                    logger.warning(
                        f"[injection_guard] task_desc에서 위험 패턴 감지 (차단 없음, 워크플로우 호환): "
                        f"threats={[t.pattern_name for t in _high_threats]}"
                    )
                else:
                    logger.info(
                        f"[injection_guard] task_desc에서 낮은 위험 패턴 감지: "
                        f"threats={[t.pattern_name for t in _scan_result.threats]}"
                    )
        except Exception as _e:
            logger.debug(f"[injection_guard] 스캔 실패 (무시): {_e}")

    # ── approval: task_type + task_desc 기반 risk level 로깅 ────────────
    if _APPROVAL_AVAILABLE and _check_command is not None:
        try:
            _approval_input = f"{task_type} {task_desc}"
            _approval_result = _check_command(_approval_input)
            logger.info(
                f"[approval] risk_level={_approval_result.risk_level}, "
                f"team={team_id}, task_type={task_type}, "
                f"patterns={_approval_result.matched_patterns}"
            )
        except Exception as _e:
            logger.debug(f"[approval] 검사 실패 (무시): {_e}")

    # ── model_router: task_desc 분석 후 권장 모델 결정 ─────────────────
    _recommended_model: str | None = model  # 명시적 model 파라미터 우선
    if _recommended_model is None and _MODEL_ROUTER_AVAILABLE and _route_model is not None:
        try:
            _recommended_model = _route_model(task_desc)
            logger.info(f"[model_router] 권장 모델: {_recommended_model}, team={team_id}")
        except Exception as _e:
            logger.debug(f"[model_router] 라우팅 실패 (무시): {_e}")
    elif _recommended_model:
        logger.info(f"[model_router] 명시적 모델 지정: {_recommended_model}, team={team_id}")

    logger.info(f"위임 시작: team={team_id}, task_id={task_id}")

    # ── 세션 건강 경량 체크: token-ledger.json에서 대상 팀의 critical 세션만 확인 ──
    if _SESSION_RESILIENCE_AVAILABLE and _SessionResilience is not None:
        try:
            _sr = _SessionResilience(workspace_root=str(WORKSPACE))
            _running = _sr._load_running_tasks()
            _ledger = _sr._load_ledger_tasks()
            for _rtid, _rtinfo in _running.items():
                if _rtinfo.get("team_id") != team_id:
                    continue
                _linfo = _ledger.get(_rtid, {})
                _status = _sr.check_session(_rtid, _rtinfo, _linfo)
                if _status["level"] == "critical":
                    logger.warning(
                        "[session-health] CRITICAL 세션 감지: task=%s, usage=%.1f%%",
                        _rtid,
                        _status.get("usage_pct", 0.0),
                    )
        except Exception as _e:
            logger.debug("[session-health] 세션 체크 실패 (무시): %s", _e)

    # P1-2: RW Isolation - 플래그 연동
    from utils.feature_flags import FeatureFlagLoader

    _rw_flags = FeatureFlagLoader()
    if _rw_flags.is_enabled("rw_isolation_enabled"):
        effective_agent_type = agent_type
    else:
        effective_agent_type = "write"  # 플래그 비활성화 시 항상 write

    # phases가 지정된 경우 chain_id 자동 생성 (chain_manager.py create 호출은 dispatch 성공 후)
    # task_id에서 base 번호 추출: "task-566.1" → 566
    _phases_chain_id: Optional[str] = None
    _phases_base_num: Optional[str] = None
    if phases is not None and task_id:
        _base_match = re.search(r"task-(\d+)\.", task_id)
        if _base_match:
            _phases_base_num = _base_match.group(1)
            _phases_chain_id = f"scoped-{_phases_base_num}"

    # 논리적 팀은 --force 자동 적용 (가용 봇 동적 선택이므로 충돌 위험 낮음)
    effective_force = force or (team_id in DYNAMIC_BOT_TEAMS)

    # 같은 팀에 이미 running 태스크가 있는지 확인
    # effective_force=False(기본): 거부(error 반환) / effective_force=True: 경고 로깅 후 진행
    timer_file = WORKSPACE / "memory" / "task-timers.json"
    if timer_file.exists():
        try:
            with open(timer_file, "r", encoding="utf-8") as f:
                existing_data = json.load(f)
            for existing_tid, existing_task in existing_data.get("tasks", {}).items():
                if (
                    existing_task.get("status") == "running"
                    and existing_task.get("team_id") == team_id
                    and existing_tid != task_id
                ):
                    if not effective_force:
                        _cleanup_task(task_id)
                        return {
                            "status": "error",
                            "message": (
                                f"같은 팀({team_id})에 이미 running 태스크가 있습니다: "
                                f"{existing_tid} ('{existing_task.get('description', '')[:30]}'). "
                                f"완료 후 재시도하거나 --force 플래그로 강제 실행하세요."
                            ),
                        }
                    logger.warning(
                        f"[force] 같은 팀({team_id})에 이미 running 태스크 있음: "
                        f"{existing_tid} ('{existing_task.get('description', '')[:30]}'). "
                        f"{'논리적 팀 자동 --force' if team_id in DYNAMIC_BOT_TEAMS else '--force 플래그로'} 강제 진행."
                    )
                    break
        except (json.JSONDecodeError, OSError) as e:
            logger.warning(f"running 태스크 확인 실패: {e}")

    # task-timer 자동 시작 (아누가 수동으로 할 필요 없음)
    short_desc = task_desc[:60] + ("..." if len(task_desc) > 60 else "")
    # timer_task_id는 task_id와 동일 (team_prompts.py의 timer_end와 일치시킴)
    timer_task_id = task_id
    timer_cmd = ["python3", str(TASK_TIMER), "start", timer_task_id, "--team", team_id, "--desc", short_desc]
    if project_id:
        timer_cmd.extend(["--project", project_id])
    if level:
        timer_cmd.extend(["--work-level", level])
    timer_result = subprocess.run(timer_cmd, capture_output=True, text=True, timeout=30)
    if timer_result.returncode != 0:
        logger.warning(f"task-timer start 실패 (task_id={timer_task_id}): {timer_result.stderr.strip()}")

    # 프로젝트 디렉토리 존재 확인 (project_id가 지정된 경우에만)
    if project_id is not None:
        project_dir = WORKSPACE / "projects" / project_id
        if not project_dir.exists():
            _cleanup_task(timer_task_id)
            return {"status": "error", "message": f"프로젝트 디렉토리가 존재하지 않습니다: {project_dir}"}

    # project-map 자동 갱신 (project_id가 있고 refresh_map이 True인 경우)
    if project_id and refresh_map:
        map_path = WORKSPACE / "memory" / "project-maps" / f"{project_id}.md"
        needs_refresh = False
        if not map_path.exists():
            needs_refresh = True
        else:
            # 파일이 24시간 이상 오래된 경우
            mtime = map_path.stat().st_mtime
            age_hours = (datetime.now().timestamp() - mtime) / 3600
            if age_hours > 24:
                needs_refresh = True

        if needs_refresh:
            project_dir = WORKSPACE / "projects" / project_id
            if project_dir.exists():
                script = WORKSPACE / "scripts" / "project-map.py"
                if script.exists():
                    subprocess.run(
                        ["python3", str(script), str(project_dir), "--output", str(map_path)],
                        capture_output=True,
                        text=True,
                        timeout=30,
                    )

    # task_desc 첫 줄의 task-id를 실제 task_id로 교체
    task_desc = re.sub(r"^# task-\d+\.\d+:", f"# {task_id}:", task_desc, count=1)

    # ── memory_check: MEMORY.md ★ 항목 + 디자인 위임 위반 감지 ─────────────
    _check_memory_before_dispatch(team_id, task_desc)
    _check_brainstorming_gate(task_id, task_desc, level, skip_brainstorming)

    # 리서치-구현 혼합 감지 가드
    _warn_research_impl_mix(task_desc, task_type)
    _warn_large_task_desc(task_desc)
    file_size_warnings = _check_referenced_file_sizes(task_desc)
    if file_size_warnings:
        logger.warning(f"[file-size-check] {file_size_warnings}")

    # ── auto_inject: affected_files + goal_assertions 자동 주입 ──────────
    task_desc = _auto_inject_affected_files(task_desc, str(WORKSPACE))
    if task_type == "coding":
        task_desc = _auto_generate_goal_assertions(task_desc, str(WORKSPACE))

    # task_desc를 파일에 저장 (목차→요약→상세 원칙의 "요약" 단계)
    task_file = WORKSPACE / "memory" / "tasks" / f"{task_id}.md"
    task_file.parent.mkdir(parents=True, exist_ok=True)
    task_file.write_text(task_desc, encoding="utf-8")

    # ── audit_logger: task 파일 생성 감사 기록 ──────────────────────────
    if _AUDIT_LOGGER_AVAILABLE and _log_file_operation is not None:
        try:
            _log_file_operation(
                task_id=task_id,
                filepath=str(task_file),
                tool="dispatch",
                operation="write",
            )
        except Exception as _e:
            logger.debug(f"[audit_logger] 감사 기록 실패 (무시): {_e}")

    # ── QC 환경 사전 체크 (2026-04-17 미팅 합의 항목 3) ──
    qc_env_warning = _check_team_qc_env(team_id)
    if qc_env_warning:
        logger.warning(f"[qc-env] {qc_env_warning}")

    prompt = build_prompt(
        team_id, task_desc, task_id, level, project_id=project_id, chain_id=chain_id, task_type=task_type
    )

    # ── sanitize 게이트 경고 삽입 (Lv.3+) ──────────────────────────────
    _level_to_int = {"normal": 2, "critical": 3, "security": 4}
    _dispatch_level = _level_to_int.get(level, 2)
    if _SANITIZE_GATE_AVAILABLE and _should_sanitize is not None and _should_sanitize(_dispatch_level):
        prompt += (
            "\n\n## ⚠️ Sanitize 게이트 (Lv.3+)\n"
            "외부 AI(Codex/Gemini) 호출 전 반드시 sanitize 검사를 수행하세요:\n"
            "- 대상: affected_files의 코드 + 설계 문서\n"
            "- 마스킹 항목: 주민등록번호, 연락처, API 키, 계좌번호, 보험 증권번호\n"
            "- 방법: `from utils.sanitize_gate import sanitize_text` 사용\n"
            "- PII가 마스킹된 코드만 외부 AI에 전달할 것\n"
        )
        logger.info(f"[sanitize-gate] Lv.{_dispatch_level} 작업에 sanitize 게이트 지시 삽입")

    # ── affected_files 겹침 감지 + 레벨 추정 경고 (task-1858) ──────────
    _af = _parse_affected_files(task_desc)
    # AST blast radius 자동 보강 (task-1869_2.2+1)
    _af = _enrich_affected_files_with_ast(_af, str(WORKSPACE))
    _task_lv = _parse_task_level(task_desc)
    if _af:
        _overlap_warnings = _check_affected_files_overlap(_af, task_id)
        for _ow in _overlap_warnings:
            logger.warning(f"[affected_files] {_ow}")
        _send_overlap_telegram_warning(_overlap_warnings)
        # task-timers.json에 affected_files 저장
        _patch_timer_metadata(timer_task_id, affected_files=_af)
    # Lv.2+ affected_files 미기재 경고
    _af_missing_warn = _warn_missing_affected_files(task_desc, _task_lv)
    if _af_missing_warn:
        logger.warning(f"[affected_files] {_af_missing_warn}")
    # 레벨 자동 추정 vs 수동 지정 비교
    _est_level, _est_reason = _estimate_task_level(task_desc, _af)
    _level_to_int = {"normal": 2, "critical": 3, "security": 4}
    _manual_level = _level_to_int.get(level, 2)
    if _manual_level < _est_level and _est_reason:
        logger.warning(
            f"[level-estimate] ⚠️ Lv.{_manual_level}({level}) 지정했지만 {_est_reason} — " f"Lv.{_est_level} 이상 권장"
        )
    # batch_id 저장
    if batch_id:
        _patch_timer_metadata(timer_task_id, batch_id=batch_id)

    if file_size_warnings:
        prompt += (
            f"\n\n## ⚠️ 참조 파일 크기 주의\n{file_size_warnings}"
            f"\n- 위 파일은 반드시 offset/limit 파라미터를 사용하여 분할 읽기하세요."
            f"\n- 요약 파일(*.summary.md)이 있으면 원본 대신 요약 파일을 먼저 읽으세요."
            f"\n- 한 번에 200줄 이상 읽지 마세요."
        )

    # ── Large-File Protocol (task-1882) ──────────────────────
    if _af:
        _large_files = _get_large_files(_af)
        if _large_files:
            _lf_list = "\n".join(f"  - {lf['path']} ({lf['lines']}줄)" for lf in _large_files)
            prompt += (
                f"\n\n## ⚠️ 대형 파일 프로토콜 (2000줄 초과)\n"
                f"대상:\n{_lf_list}\n"
                f"1. 전체 읽기 금지 → offset/limit으로 수정 대상 ±200줄만 읽기\n"
                f"2. Edit 전 수정 위치의 정확한 라인 번호 확인\n"
                f"3. Edit 후 반드시 grep -n으로 변경 반영 확인\n"
                f"4. 한 번에 50줄 이상 삽입 금지 → 여러 Edit으로 분할\n"
            )

    # ── 플랫폼 규칙 자동 주입 (네이버 블로그 등) ──────────────────────
    platform_rules = _inject_platform_rules(task_desc)
    if platform_rules:
        prompt += f"\n\n{platform_rules}"

    # design 팀 위임 시 image-skill-router 추천 스킬 삽입
    if team_id == "design" and _IMAGE_SKILL_ROUTER_AVAILABLE and _get_skill_recommendation is not None:
        try:
            recommended_skill = _get_skill_recommendation(task_desc)
            prompt += (
                f"\n\n## 🎨 이미지 스킬 라우터 추천\n"
                f"- 추천 스킬: **{recommended_skill}**\n"
                f"- 이 추천은 image-skill-router.py의 `get_skill_recommendation()` 결과입니다.\n"
                f"- 추천 스킬을 우선 사용하되, 작업 특성상 다른 스킬이 적합하면 사유를 명시하세요.\n"
            )
            logger.info(f"[image-skill-router] design 팀 추천 스킬: {recommended_skill}")
        except Exception as e:
            logger.warning(f"[image-skill-router] 추천 실패 (무시): {e}")

    lead = TEAM_INFO[team_id]

    # 팀별 봇 키 결정
    if team_id in DYNAMIC_BOT_TEAMS:
        # 논리적팀: 가용 봇 원자적 선택 + 즉시 예약 (봇 충돌 방지)
        try:
            selected_bot = _select_and_reserve_bot(timer_task_id, required_model=model)
        except RuntimeError as e:
            _cleanup_task(timer_task_id)
            return {"status": "error", "message": str(e)}
        key_name = BOT_TO_KEY[selected_bot]
        key = BOT_KEYS.get(key_name)
        if key is None:
            logger.warning(f"환경변수 COKACDIR_KEY_{key_name.upper()} 미설정")
            _cleanup_task(timer_task_id)
            return {"status": "error", "message": f"봇 키가 설정되지 않았습니다: COKACDIR_KEY_{key_name.upper()}"}
        role = team_id
        bot_id_meta = selected_bot
    else:
        # 기존 dev팀 로직
        bot_id = TEAM_BOT.get(team_id)
        if not bot_id or bot_id not in BOT_KEYS:
            _cleanup_task(timer_task_id)
            return {"status": "error", "message": f"팀 {team_id}에 할당된 봇이 없습니다. 봇 토큰을 먼저 등록하세요."}
        key = BOT_KEYS[bot_id]
        if key is None:
            logger.warning(f"환경변수 COKACDIR_KEY_{bot_id.upper()} 미설정")
            _cleanup_task(timer_task_id)
            return {"status": "error", "message": f"봇 키가 설정되지 않았습니다: COKACDIR_KEY_{bot_id.upper()}"}
        role = team_id.replace("-team", "")
        bot_id_meta = TEAM_TO_BOT_ID.get(team_id, "")
        # dev팀도 bot 필드를 조기 기록 (논리적팀 dispatch와의 충돌 감지용)
        if bot_id_meta:
            _patch_timer_metadata(timer_task_id, bot=bot_id_meta)
        # 봇 충돌 검사: 해당 봇이 다른 작업(composite/dynamic)에 점유되어 있는지 확인
        if bot_id_meta:
            busy_bots_info = _get_busy_bots_info(exclude_task_id=task_id)
            logger.info(
                f"[봇 충돌 검사] bot={bot_id_meta}, team={team_id}, " f"busy_bots={list(busy_bots_info.keys())}"
            )
            if bot_id_meta in busy_bots_info:
                conflict = busy_bots_info[bot_id_meta]
                conflict_task_id = conflict["task_id"]
                conflict_team_id = conflict["team_id"]
                # 동일 팀의 동일 태스크가 아닌 경우에만 충돌로 판단
                if conflict_task_id != task_id and conflict_team_id != team_id:
                    if not force:
                        _cleanup_task(timer_task_id)
                        available = _get_available_bots_with_teams(busy_bots_info)
                        available_str = ", ".join(f"{a['bot_id']}({a['default_team']})" for a in available)
                        return {
                            "status": "error",
                            "message": (
                                f"봇 {bot_id_meta}가 {conflict_team_id} 작업({conflict_task_id})에 "
                                f"점유 중입니다. "
                                + (f"가용 대안: {available_str}" if available else "모든 봇이 작업 중입니다.")
                            ),
                            "available_bots": available,
                        }
                    logger.warning(
                        f"[force] 봇 {bot_id_meta}가 {conflict_team_id} 작업({conflict_task_id})에 "
                        f"점유 중이지만 --force로 강제 진행."
                    )
            else:
                logger.info(f"[봇 충돌 검사] 통과: bot={bot_id_meta} 가용")

    # role/bot 메타데이터 패치
    if bot_id_meta:
        _patch_timer_metadata(
            timer_task_id,
            role=role,
            bot=bot_id_meta,
            model=_recommended_model or "default",
            task_file=f"memory/tasks/{task_id}.md",
        )

    # ── 작업 3문서 자동 생성 (Lv.3+) ──────────────────────────────
    _level_to_int_docs = {"normal": 2, "critical": 3, "security": 4}
    _docs_level = _level_to_int_docs.get(level, 2)
    if _docs_level >= 3:
        _docs_path = _create_task_docs(task_id, _docs_level)
        if _docs_path:
            logger.info(f"[3docs] 작업 3문서 생성 완료: {_docs_path}")

    # ── Lv.4 Agent 미팅 만장일치 검증 (task-2082) ──────────────────
    _meeting_warning = _check_agent_meeting(task_id, task_desc, level, skip_meeting)
    if _meeting_warning:
        prompt += f"\n\n## ⚠️ Agent 미팅 경고\n{_meeting_warning}\n"

    # ── Lv.3+ brainstorming 검증 (task-2083) ──────────────────────
    _check_brainstorming_gate(task_id, task_desc, level, skip_brainstorming)

    # ── critical/security 레벨: Opus 자동 승격 ──────────────────
    _opus_upgraded = False
    _opus_original_model = None
    _opus_key_hash = key  # 이 시점에서 key는 봇의 key_hash
    if level in ("critical", "security") and _opus_key_hash:
        _opus_original_model = _set_bot_model(_opus_key_hash, "claude-opus-4-6")
        if _opus_original_model and _opus_original_model != "claude-opus-4-6":
            _opus_upgraded = True
            logger.info(
                f"[opus-upgrade] level={level}, 봇 모델 Opus 승격: " f"{_opus_original_model} → claude-opus-4-6"
            )

    _dispatch_delay = 10  # cron이 자체적으로 세션을 시작하므로 고정 딜레이

    # cokacdir --cron으로 독립 세션 예약
    cmd = [
        "cokacdir",
        "--cron",
        prompt,
        "--at",
        get_dispatch_time(_dispatch_delay),
        "--chat",
        CHAT_ID,
        "--key",
        key,
        "--once",
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except subprocess.TimeoutExpired:
        logger.error(f"cokacdir 호출 타임아웃 (60초 초과)")
        _cleanup_task(timer_task_id)
        return {"status": "error", "message": "cokacdir 호출 타임아웃 (60초 초과)"}

    if result.returncode == 0:
        try:
            response = json.loads(result.stdout)
        except json.JSONDecodeError:
            response = {"raw": result.stdout.strip()}

        # 봇 상태를 "processing"으로 설정
        from utils.bot_activity import set_bot_status

        set_bot_status(team_id, "processing")
        consistency = _validate_model_consistency(team_id)
        logger.info(
            f"[모델 검증 결과] {team_id}: consistent={consistency['consistent']}, "
            f"org={consistency['org_model'] or 'N/A'}, bot={consistency['bot_model'] or 'N/A'}"
        )
        _sync_bot_settings()

        # Opus 승격 후 모델 복원 예약
        if _opus_upgraded and _opus_original_model:
            _schedule_model_restore(_opus_key_hash, _opus_original_model, delay=30)

        # 완료 감지: 3-Layer Defense로 대체 (L1:finish-task.sh, L2:Hook, L3:systemd Timer)

        # daily 로그에 위임 기록 자동 추가 (redact 적용: 로깅용 민감 정보 마스킹)
        short_desc = task_desc[:60] + ("..." if len(task_desc) > 60 else "")
        if _REDACT_AVAILABLE:
            try:
                short_desc = _redact_text(short_desc)
            except Exception as _e:
                logger.debug(f"[redact] 마스킹 실패 (무시): {_e}")
        log_msg = f"[{task_id}] {lead['leader']}({team_id})에게 위임: {short_desc}"
        subprocess.run(
            ["python3", str(TASK_TIMER), "log", log_msg, "--type", "dispatch"],
            capture_output=True,
            text=True,
            timeout=30,
        )

        # chain_id가 있으면 chain.json에 task_id, dispatched_at, status 기록
        if chain_id:
            _update_chain_task(chain_id, team_id, task_id)

        # phases가 지정된 경우 chain_manager.py create 호출
        if phases is not None and _phases_chain_id is not None and _phases_base_num is not None:
            _tasks_json: list = []
            for _i in range(1, phases + 1):
                _t_id = f"task-{_phases_base_num}.{_i}"
                _task_file = f"memory/tasks/{_t_id}.md"  # noqa: F821
                _tasks_json.append(
                    {
                        "order": _i,
                        "task_file": _task_file,
                        "team": team_id,
                        "task_id": _t_id,
                    }
                )
            _chain_create_cmd = [
                "python3",
                str(WORKSPACE / "chain_manager.py"),
                "create",
                "--chain-id",
                _phases_chain_id,
                "--tasks",
                json.dumps(_tasks_json, ensure_ascii=False),
                "--original-task-file",
                task_file,  # 원본 지시서 파일 경로
            ]
            _chain_result = subprocess.run(_chain_create_cmd, capture_output=True, text=True, timeout=30)
            if _chain_result.returncode != 0:
                logger.warning(
                    f"chain_manager.py create 실패 (chain_id={_phases_chain_id}): " f"{_chain_result.stderr.strip()}"
                )
            else:
                logger.info(f"chain_manager.py create 완료: chain_id={_phases_chain_id}")

        _log_leader = lead["leader"]
        if _REDACT_AVAILABLE:
            try:
                _log_leader = _redact_text(_log_leader)
            except Exception:
                _log_leader = lead["leader"]
        logger.info(f"위임 완료: {task_id} → {_log_leader}")
        _result: dict = {
            "status": "dispatched",
            "task_id": task_id,
            "team": team_id,
            "lead": lead["leader"],
            "level": level,
            "description": task_desc,
            "message": f"{lead['leader']}에게 위임 완료. 즉시 독립 세션에서 작업 시작.",
            "cron_response": response,
        }
        if _phases_chain_id is not None:
            _result["chain_id"] = _phases_chain_id

        # watchdog용 메타데이터 패치: schedule_id, retry_count, max_retry, task_file
        _schedule_id = response.get("id") if isinstance(response, dict) else None
        if _schedule_id:
            _watchdog_meta = {
                "schedule_id": _schedule_id,
                "retry_count": 0,
                "max_retry": 2,
            }
            if task_file:
                _watchdog_meta["task_file"] = task_file
            _patch_timer_metadata(task_id, **_watchdog_meta)

        # Memory Check 발급 (MC)
        try:
            from utils.memory_check import issue_mc

            mc_result = issue_mc(task_id, task_desc)
            if mc_result:
                logger.info(f"[memory-check] MC 발급: {mc_result.get('mc_id')} for {task_id}")
                _result["mc_id"] = mc_result.get("mc_id")
        except Exception as _mc_err:
            logger.warning(f"[memory-check] MC 발급 실패 (무시): {_mc_err}")

        return _result
    else:
        logger.error(f"위임 실패: {result.stderr.strip()}")
        # Opus 승격 후 실패 시 즉시 복원
        if _opus_upgraded and _opus_original_model:
            _set_bot_model(_opus_key_hash, _opus_original_model)
            logger.info(f"[opus-upgrade] 디스패치 실패로 모델 즉시 복원: {_opus_key_hash}")
        # reserved/running 엔트리 정리 (cokacdir --cron 실패 시 orphan 방지)
        _cleanup_task(timer_task_id)
        return {"status": "error", "message": result.stderr.strip(), "command": " ".join(cmd[:4] + ["...(생략)"])}


def main() -> None:
    # .env.keys 자동 로드 (환경변수 누락 방지)
    from utils.env_loader import load_env_keys

    load_env_keys()

    parser = argparse.ArgumentParser(description="작업 위임 디스패처")

    # --check-sessions: 독립 명령 (다른 인자 없이 단독 실행)
    parser.add_argument(
        "--check-sessions",
        action="store_true",
        default=False,
        help="모든 running 세션의 토큰 사용량 체크",
    )

    team_or_composite = parser.add_mutually_exclusive_group(required=False)
    team_or_composite.add_argument(
        "--team",
        choices=[
            "dev1-team",
            "dev2-team",
            "dev3-team",
            "dev4-team",
            "dev5-team",
            "dev6-team",
            "dev7-team",
            "dev8-team",
            "marketing",
            "consulting",
            "publishing",
            "design",
            "content",
        ],
        help="위임할 팀",
    )
    team_or_composite.add_argument(
        "--composite",
        help="쉼표 구분 논리적 팀 ID 목록 (2~3개, 예: marketing,design)",
    )

    # --task와 --task-file 중 하나 필수 (상호 배타)
    task_group = parser.add_mutually_exclusive_group(required=False)
    task_group.add_argument("--task", help="작업 설명 (짧은 한 줄 용도)")
    task_group.add_argument("--task-file", help="작업 설명 파일 경로 (권장: 긴 내용은 반드시 파일로)")

    parser.add_argument(
        "--level",
        default="normal",
        choices=["normal", "critical", "security"],
        help="검증 레벨 (normal/critical/security)",
    )
    parser.add_argument(
        "--type",
        default="coding",
        choices=["coding", "research", "check"],
        help="작업 유형 - coding: QC 검증 포함, research/check: QC 검증 제외 (기본: coding)",
    )
    parser.add_argument("--session", default=None, help="아누 세션 ID (followup을 현재 대화에서 실행)")
    parser.add_argument("--project", default=None, help="프로젝트 ID (projects/ 하위 디렉토리명)")
    parser.add_argument("--chain", default=None, help="체인 ID (chain.py 연동)")
    parser.add_argument(
        "--refresh-map",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="프로젝트 맵 자동 갱신 (24시간 이상 오래된 경우, 기본: 자동)",
    )
    parser.add_argument("--task-id", default=None, help="태스크 ID 직접 지정 (미지정 시 자동 생성)")
    parser.add_argument("--resume", default=None, help="재시도할 base task ID (예: task-2133). 자동 채번 후 기존 task 파일 재활용.")
    parser.add_argument(
        "--phases",
        type=int,
        default=None,
        help="한정승인(scoped delegation) Phase 수. 지정 시 chain_manager.py create로 체인 자동 생성.",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        default=False,
        help="동일 팀에 running 태스크가 있어도 강제로 dispatch 허용 (기본: 거부)",
    )
    parser.add_argument(
        "--resume-from",
        default=None,
        dest="resume_from",
        help="이전 세션 요약 파일 경로. 지정 시 task_desc 앞에 요약을 prepend하여 새 세션 시작.",
    )
    parser.add_argument(
        "--workflow",
        choices=["image-qc-gate"],
        default=None,
        help="워크플로우 적용 (image-qc-gate: 이미지 QC 게이트 5Phase 자동 적용)",
    )
    parser.add_argument(
        "--skip-qc-gate",
        action="store_true",
        default=False,
        help="이미지/광고 작업의 QC 게이트를 의도적으로 스킵 (제이회장님 승인 필수)",
    )
    parser.add_argument(
        "--skip-meeting",
        action="store_true",
        default=False,
        help="Lv.4 Agent 미팅 체크를 의도적으로 스킵 (로그에 기록됨)",
    )
    parser.add_argument(
        "--agent-type",
        choices=["read", "write"],
        default="write",
        help="에이전트 유형: read(읽기 전용, worktree 미생성) | write(쓰기, worktree 생성). 기본값: write",
    )
    parser.add_argument(
        "--model",
        default=None,
        help="모델 강제 지정 (예: claude-opus-4-6). 지정 시 해당 모델의 봇만 선택.",
    )
    parser.add_argument(
        "--override-routing",
        action="store_true",
        default=False,
        dest="override_routing",
        help="라우팅 경고를 무시하고 지정 팀으로 강제 위임",
    )
    parser.add_argument(
        "--batch-id",
        default=None,
        dest="batch_id",
        help="배치 ID. 병렬 위임 시 동일 batch_id를 부여하여 전팀 완료 추적 가능.",
    )
    parser.add_argument(
        "--skip-brainstorming",
        action="store_true",
        default=False,
        dest="skip_brainstorming",
        help="Lv.3+ UX 작업의 brainstorming 사전 실행 체크를 스킵",
    )
    parser.add_argument(
        "--prd",
        default=None,
        help="PRD 파일 경로. Phase별 task 파일 자동 생성 (위임 없음, 파일 생성만)",
    )

    args = parser.parse_args()

    # --check-sessions: 독립 명령 (다른 인자 없이 단독 실행)
    if args.check_sessions:
        result = check_sessions()
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return

    # --check-sessions이 없으면 --team 또는 --composite가 필수
    if not args.team and not args.composite:
        parser.error("--check-sessions이 없으면 --team 또는 --composite이 필수입니다.")

    # --prd: PRD 자동 분해 (독립 명령)
    if getattr(args, "prd", None):
        if not args.team:
            parser.error("--prd 사용 시 --team이 필수입니다.")
        result = handle_prd(args.prd, args.team)
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return

    # --check-sessions이 없으면 --task 또는 --task-file이 필수 (--resume은 예외)
    if not args.task and not args.task_file and not getattr(args, "prd", None) and not args.resume:
        parser.error("--check-sessions이 없으면 --task, --task-file, --prd, 또는 --resume이 필수입니다.")

    # Lv.0 가드: 마이크로 수정은 dispatch 불필요 (Task tool haiku로 직접 실행)
    task_content = ""
    if args.task:
        task_content = args.task
    elif args.task_file:
        try:
            task_content = Path(args.task_file).read_text(encoding="utf-8")
        except Exception:
            pass
    if "레벨: Lv.0" in task_content or "## 레벨: Lv.0" in task_content:
        logger.warning("⚠️ Lv.0 마이크로 수정은 dispatch 불필요! 아누가 Task tool (haiku)로 직접 실행하세요.")
        print(
            json.dumps(
                {"status": "warning", "message": "Lv.0은 dispatch 불필요. Task tool (haiku)로 직접 실행 권장."},
                ensure_ascii=False,
            )
        )

    # task_desc 결정: --task-file이면 파일 읽기, --task면 직접 사용
    if args.task_file:
        task_file_path = Path(args.task_file)
        if not task_file_path.exists():
            print(
                json.dumps(
                    {"status": "error", "message": f"작업 설명 파일이 존재하지 않습니다: {args.task_file}"},
                    ensure_ascii=False,
                )
            )
            sys.exit(1)
        task_desc = task_file_path.read_text(encoding="utf-8").strip()
        if not task_desc:
            print(
                json.dumps(
                    {"status": "error", "message": f"작업 설명 파일이 비어있습니다: {args.task_file}"},
                    ensure_ascii=False,
                )
            )
            sys.exit(1)
    else:
        task_desc = args.task or ""
        # 긴 --task 직접 전달 시 경고 (100자 초과)
        if len(task_desc) > 100:
            logger.warning(f"--task 직접 전달 {len(task_desc)}자 — 긴 내용은 --task-file 사용을 권장합니다")

    # --workflow 처리: 워크플로우 프롬프트를 task_desc 앞에 prepend
    if args.workflow == "image-qc-gate":
        try:
            from prompts.image_workflow import build_workflow_overview_prompt

            workflow_prompt = build_workflow_overview_prompt(
                task_id=args.task_id or "auto",
            )
            task_desc = f"{workflow_prompt}\n\n---\n\n{task_desc}"
            # image-qc-gate 워크플로우는 opus 강제
            if not args.model:
                args.model = "claude-opus-4-6"
                logger.info("[workflow] image-qc-gate: opus 모델 강제 적용")
        except ImportError as e:
            print(
                json.dumps(
                    {"status": "error", "message": f"워크플로우 모듈 import 실패: {e}"},
                    ensure_ascii=False,
                )
            )
            sys.exit(1)

    # 이미지/광고 작업인데 --workflow 미적용 시 차단 (강제 적용)
    if args.workflow is None:
        _image_keywords = ["이미지", "광고", "배너", "디자인", "banner", "image"]
        if any(kw in task_desc for kw in _image_keywords):
            if getattr(args, "skip_qc_gate", False):
                logger.warning("⚠️ --skip-qc-gate로 이미지 QC 게이트 우회. 제이회장님 승인 필수.")
            else:
                logger.error(
                    "❌ 이미지/광고 작업에 --workflow image-qc-gate가 필수입니다. "
                    "의도적 스킵 시 --skip-qc-gate 플래그를 추가하세요."
                )
                sys.exit(1)

    # --resume-from 파일 존재 검증 (CLI 조기 에러 출력)
    if args.resume_from is not None:
        resume_path = Path(args.resume_from)
        if not resume_path.exists():
            print(
                json.dumps(
                    {
                        "status": "error",
                        "message": f"resume_from 파일이 존재하지 않습니다: {args.resume_from}",
                    },
                    ensure_ascii=False,
                )
            )
            sys.exit(1)

    # composite 모드 처리
    composite_teams_list = None
    if args.composite:
        try:
            composite_teams_list = _validate_composite_teams(args.composite)
        except ValueError as e:
            print(json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False))
            sys.exit(1)

    # --resume 처리: 재시도 자동 채번
    if args.resume:
        # --resume과 --task-id 동시 사용 금지
        if args.task_id:
            print(json.dumps({"status": "error", "message": "--resume과 --task-id는 동시 사용 불가"}, ensure_ascii=False))
            sys.exit(1)
        # --team 필수
        if not args.team:
            print(json.dumps({"status": "error", "message": "--resume 사용 시 --team이 필수입니다."}, ensure_ascii=False))
            sys.exit(1)
        # task_desc가 없으면 빈 문자열 사용 (task 파일에서 가져옴)
        if not args.task and not args.task_file:
            # 기존 task 파일의 내용을 task_desc로 사용
            base_clean = re.sub(r'\+\d+$', '', args.resume)
            base_task_file = Path(WORKSPACE) / "memory" / "tasks" / f"{base_clean}.md"
            if base_task_file.exists():
                task_desc = base_task_file.read_text(encoding="utf-8").strip()
            else:
                print(json.dumps({"status": "error", "message": f"base task 파일이 없습니다: {base_task_file}"}, ensure_ascii=False))
                sys.exit(1)

        resolve_result = _resolve_resume(args.resume, args.team, task_desc if 'task_desc' in dir() else "", force=args.force)
        if resolve_result["status"] == "error":
            print(json.dumps(resolve_result, ensure_ascii=False))
            sys.exit(1)

        args.task_id = resolve_result["new_task_id"]
        logger.info(f"[resume] {args.resume} → {args.task_id} (retry #{resolve_result['retry_count']})")

    if args.task_id:
        TASK_ID_V2_PATTERN = re.compile(r"^task-\d+(_\d+\.\d+)?(_[a-z])?(\+\d+)?$")
        if not TASK_ID_V2_PATTERN.match(args.task_id):
            logger.warning(
                "[task-id-format] --task-id '%s'가 포맷 v2 규칙에 맞지 않습니다. 패턴: %s",
                args.task_id,
                TASK_ID_V2_PATTERN.pattern,
            )

    result = dispatch(
        team_id=args.team,
        task_desc=task_desc,
        level=args.level,
        composite_teams=composite_teams_list,
        session_id=args.session,
        project_id=args.project,
        chain_id=args.chain,
        refresh_map=args.refresh_map,
        task_type=args.type,
        task_id=args.task_id,
        phases=args.phases,
        force=args.force,
        resume_from=args.resume_from,
        agent_type=args.agent_type,
        model=args.model,
        override_routing=args.override_routing,
        batch_id=args.batch_id,
        skip_meeting=args.skip_meeting,
        skip_brainstorming=args.skip_brainstorming,
    )
    print(json.dumps(result, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
