"""scripts/safe_cron_dispatch.py — task-2526 §3 wrapper 본체.

회장 §본질 (2026-05-10 PR #74 재위임 오류):
  독립 bot task cron에 ``--session`` 옵션이 잘못 붙어 봇이 아니라 **아누 자기 세션이
  resume된 사고**가 발생함 (cron 5C9995CCB → PID 448820 = 아누 자기 세션 resume → kill).
  동일 사고가 다시 발생하면 안 된다.

  이 wrapper는 cokacdir cron 명령을 직접 조립하는 모든 호출자가 **반드시 거쳐야 하는**
  preflight gate다. 잘못된 조합 (independent_task + ``--session``, merge_task + ``--session``,
  bot_task + bot_key 누락, owner_pat fallback path) 을 **실행 전에** 차단한다.

회장 §명시 5 차단 조건:
  1. ``independent_task`` + ``--session`` 존재 → BLOCK
  2. ``merge_task`` + ``--session`` 존재 → BLOCK
  3. ``bot_task`` 인데 bot_key 없이 merge/PR 작업 → BLOCK
  4. target bot과 session owner 불일치 → BLOCK
  5. owner_pat fallback 가능성 (gh pr merge 명령에 GH_TOKEN=$BOT_GITHUB_TOKEN 미설정) → BLOCK

회장 §명시 1 ALLOW 예외:
  - ``followup_readonly`` + ``--session`` → 같은 chat 아누 session resume 허용 (오직 이 케이스만)

회장 §금지:
  - ❌ dispatch.py 본체 수정
  - ❌ cokacdir CLI 본체 수정
  - ❌ 직접 cokacdir cron 명령 조립 (외부 호출자도 본 wrapper만 사용)
  - ❌ admin override / owner_pat fallback 정당화
  - ❌ Critical 7종 외 회장 보고 (CRON_TARGETING_GUARD_BLOCKED는 Critical enum이 아니라
    audit JSONL용 짧은 운영 신호 — 별도 chairman 메시지는 호출자 책임)
  - ❌ token / raw key / session secret 원문 기록

API:
  result = safe_cron_dispatch(
      prompt="...",
      schedule="0 9 * * *",
      chat="6937032012",
      target_bot_key="0b94683120a691cf",
      task_kind="bot_task",
      session_id=None,
  )

  if result.status == DispatchStatus.BLOCKED:
      # blocked_reason / audit_record 활용
      ...
  else:
      # result.command_argv 를 caller가 subprocess로 실행하거나
      # safe_cron_dispatch_run() 헬퍼로 실행 (선택)
      ...
"""
from __future__ import annotations

import argparse
import json
import re
import shlex
import subprocess
import sys
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Callable, List, Literal, Optional, Sequence, Tuple

# worktree root → sys.path (utils import용)
_WORKTREE_ROOT = Path(__file__).resolve().parent.parent
if str(_WORKTREE_ROOT) not in sys.path:
    sys.path.insert(0, str(_WORKTREE_ROOT))

from utils.cron_targeting_audit import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    ACTOR_ANU_SESSION,
    ACTOR_BOT_SESSION,
    ALLOWED_TASK_KINDS,
    BlockedReason,
    CronTargetingAuditRecord,
    TASK_KIND_BOT,
    TASK_KIND_FOLLOWUP_RO,
    TASK_KIND_INDEPENDENT,
    TASK_KIND_MERGE,
    append_audit_jsonl,
    build_audit_record,
    hash_bot_key,
)
from utils.cron_timers_upsert import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    DEFAULT_TIMERS_PATH,
    extract_task_id_from_prompt,
    extract_team_id_from_task_md,
    extract_team_id_from_text,
    upsert_cron_dispatch,
)

__all__ = [
    "DEFAULT_COKACDIR_CLI",
    "DispatchStatus",
    "CronDispatchResult",
    "safe_cron_dispatch",
    "format_chairman_block_notice",
    # task-2529 — auto finalize chain default
    "AUTO_FINALIZE_FOOTER_MARKER",
    "AUTO_FINALIZE_OPT_OUT_TOKENS",
    "auto_inject_finalize_footer",
    "should_auto_inject_finalize_footer",
    "is_finalize_opt_out",
    # task-2533 — cron --session timers upsert hook
    "parse_schedule_id_from_cokacdir_stdout",
    "run_cron_with_timers_upsert",
]


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

DEFAULT_COKACDIR_CLI = "/usr/local/bin/cokacdir"

# Recognised "bot session owner" mapping — chair_chat → 아누(anu) 자기 세션
# 본 사건 fixture: chat=6937032012 (회장 chat) ↔ session=anu_session
_CHAIR_CHATS: Tuple[str, ...] = ("6937032012",)


# ---------------------------------------------------------------------------
# task-2529 — auto finalize chain default
# ---------------------------------------------------------------------------
#
# 회장 §명시 (2026-05-10):
#   "명시적 opt-out이 없는 모든 code task는 자동 finalize chain을 기본값으로 가진다."
#
# 본 wrapper는 cron 발사 직전에 prompt를 검사해서:
#   1) opt-out 토큰 (read_only / analysis_only / report_only / finalize_policy:no_pr) 미포함
#   2) task_kind in (independent_task, merge_task, bot_task)
#   3) prompt에 이미 12단계 footer가 박혀있지 않음
#   세 조건 모두 만족 시 prompt 끝에 자동 finalize 12단계 footer를 삽입한다.
#
# 핵심 원칙:
#   - dispatch.py는 절대 수정하지 않는다 (회장 §외과적 변경 only)
#   - opt-out이 명시된 prompt는 footer 삽입하지 않는다 (회장 §명시 4 필드)
#   - footer가 이미 있으면 중복 삽입하지 않는다 (멱등성)
#   - human_response / followup_readonly task는 자동 footer 삽입 대상이 아님

AUTO_FINALIZE_FOOTER_MARKER = "[AUTO_FINALIZE_CHAIN_FOOTER_v1]"

# 회장 §명시 4 opt-out 토큰. 어느 하나라도 prompt에 등장하면 footer 삽입 생략.
AUTO_FINALIZE_OPT_OUT_TOKENS: Tuple[str, ...] = (
    "finalize_policy: no_pr",
    "finalize_policy:no_pr",
    "read_only: true",
    "read_only:true",
    "analysis_only: true",
    "analysis_only:true",
    "report_only: true",
    "report_only:true",
)

# 자동 footer 삽입 대상 task_kind 집합.
_AUTO_FINALIZE_TASK_KINDS: Tuple[str, ...] = (
    TASK_KIND_INDEPENDENT,
    TASK_KIND_MERGE,
    TASK_KIND_BOT,
)

# 회장 §명시 12단계 자동 finalize chain. 본문은 한국어 — 봇 prompt에 그대로 노출.
_AUTO_FINALIZE_FOOTER_BODY = (
    "\n\n"
    f"{AUTO_FINALIZE_FOOTER_MARKER}\n"
    "## 자동 finalize chain (회장 §명시 — code task default, opt-out 없으면 자동 진입)\n"
    "본질 작업과 자체 검증이 PASS하면 별도 지시 없이 다음 12단계를 그대로 수행한다.\n"
    "read_only / analysis_only / report_only / finalize_policy:no_pr 명시가 없으면 이 chain은 default다.\n\n"
    "1. commit (관련 파일만 명시 add, 의미 있는 메시지)\n"
    "2. push (task 전용 브랜치 — main/master push 금지)\n"
    "3. PR 생성 (gh pr create — 본문에 expected_files / 회귀 / 영향 범위 포함)\n"
    "4. CI 통과 확인 (gh pr checks — FAILURE 시 재시도/수정 후 재push)\n"
    "5. Gemini 리뷰 통과 (high/critical 응답 → fix 후 push)\n"
    "6. auto_gemini_triage 응답 (gemini high/critical은 코드로 수용)\n"
    "7. bot identity merge (GH_TOKEN=$BOT_GITHUB_TOKEN gh pr merge — owner_pat fallback 금지)\n"
    "8. post_merge_smoke (utils/post_merge_smoke_runner — FAIL 시 Critical 7종 ESCALATE)\n"
    "9. reconcile (utils/lifecycle_reconciliation_manager --reconcile --apply)\n"
    "10. .done.acked + .merge-done 마커 검증 (manual forgery 금지 — evidence 기반만)\n"
    "11. timer end + audit JSONL 1줄 추가\n"
    "12. 한 줄 결과 보고 (성공: <task> AUTO_FINALIZE_PASS — mergeCommit <sha>, mergedBy=app/jeon-jonghyuk-taskctl-bot)\n"
)


def is_finalize_opt_out(prompt: str) -> bool:
    """opt-out 토큰이 prompt에 포함되어 있는지 검사.

    회장 §명시 4 토큰 (대소문자 무시):
      - finalize_policy: no_pr
      - read_only: true
      - analysis_only: true
      - report_only: true

    공백 변형 (`field: value` / `field:value`)도 모두 인식.
    """
    if not prompt:
        return False
    lowered = prompt.lower()
    return any(tok.lower() in lowered for tok in AUTO_FINALIZE_OPT_OUT_TOKENS)


def should_auto_inject_finalize_footer(task_kind: str, prompt: str) -> bool:
    """자동 finalize footer 삽입 여부 결정.

    True 조건 (모두 만족):
      - task_kind ∈ {independent_task, merge_task, bot_task}
      - prompt에 opt-out 토큰 부재
      - prompt에 footer marker 부재 (중복 방지)
    """
    if task_kind not in _AUTO_FINALIZE_TASK_KINDS:
        return False
    if AUTO_FINALIZE_FOOTER_MARKER in (prompt or ""):
        return False
    if is_finalize_opt_out(prompt):
        return False
    return True


def auto_inject_finalize_footer(task_kind: str, prompt: str) -> str:
    """조건 충족 시 prompt 끝에 12단계 footer 추가, 그 외 그대로 반환.

    멱등적: 동일 prompt를 두 번 inject해도 marker 검사로 중복 삽입 안 됨.
    """
    if not should_auto_inject_finalize_footer(task_kind, prompt):
        return prompt
    return f"{prompt.rstrip()}{_AUTO_FINALIZE_FOOTER_BODY}"


# ---------------------------------------------------------------------------
# Result types
# ---------------------------------------------------------------------------

class DispatchStatus:
    ALLOWED = "ALLOWED"
    BLOCKED = "BLOCKED"


TaskKindLiteral = Literal[
    "independent_task",
    "merge_task",
    "bot_task",
    "followup_readonly",
    "human_response",
]


@dataclass(frozen=True)
class CronDispatchResult:
    """preflight 결과. caller는 status 검사 후 분기.

    BLOCKED 상태에서는 command_argv가 비어있고 blocked_reason이 채워진다.
    ALLOWED 상태에서는 command_argv (cokacdir cron CLI 인자)가 채워진다.
    audit_record는 항상 채워진다 (성공/차단 모두 audit JSONL에 기록).
    """

    status: str
    blocked_reason: Optional[str]
    audit_record: CronTargetingAuditRecord
    command_argv: Tuple[str, ...]
    chairman_notice: Optional[str]


# ---------------------------------------------------------------------------
# Static merge / owner_pat detection (회장 §2 #5 — owner_pat fallback path)
# ---------------------------------------------------------------------------

_GH_PR_MERGE_RE = re.compile(r"\bgh\s+pr\s+merge\b")
_BOT_GH_TOKEN_INJECTION_RE = re.compile(
    r"\bGH_TOKEN\s*=\s*\$?\{?\s*BOT_GITHUB_TOKEN\b"
)


def _prompt_uses_gh_pr_merge(prompt: str) -> bool:
    return bool(_GH_PR_MERGE_RE.search(prompt))


def _prompt_injects_bot_gh_token(prompt: str) -> bool:
    return bool(_BOT_GH_TOKEN_INJECTION_RE.search(prompt))


def _looks_like_owner_pat_fallback(task_kind: str, prompt: str) -> bool:
    """``gh pr merge`` 등장 + ``GH_TOKEN=$BOT_GITHUB_TOKEN`` 미설정 → True.

    merge_task / bot_task에만 적용. independent / followup은 무관.
    """
    if task_kind not in (TASK_KIND_MERGE, TASK_KIND_BOT):
        return False
    if not _prompt_uses_gh_pr_merge(prompt):
        return False
    return not _prompt_injects_bot_gh_token(prompt)


# ---------------------------------------------------------------------------
# Actor inference (회장 §2 — actor_expected vs session owner)
# ---------------------------------------------------------------------------

def _expected_actor(task_kind: str) -> str:
    """task_kind로 expected actor 추론.

    - ``followup_readonly`` → ``anu_session`` (예외 케이스, 같은 chat resume 허용)
    - ``human_response`` → ``anu_session``
    - 그 외 (independent / merge / bot) → ``bot_session``
    """
    if task_kind in (TASK_KIND_FOLLOWUP_RO, "human_response"):
        return ACTOR_ANU_SESSION
    return ACTOR_BOT_SESSION


def _session_owner_is_anu(session_id: Optional[str], chat: str) -> bool:
    """현재는 chat이 회장 chat이고 session_id가 채워졌으면 아누 self-session으로 추정."""
    if not session_id:
        return False
    return chat in _CHAIR_CHATS


# ---------------------------------------------------------------------------
# Preflight (회장 §2)
# ---------------------------------------------------------------------------

def _run_preflight(
    *,
    prompt: str,
    chat: str,
    target_bot_key: Optional[str],
    task_kind: str,
    session_id: Optional[str],
) -> Tuple[Optional[str], bool]:
    """preflight 검사 — (blocked_reason, session_id_allowed) 반환.

    blocked_reason가 None이면 ALLOW.
    """
    if task_kind not in ALLOWED_TASK_KINDS:
        raise ValueError(
            f"task_kind must be one of {ALLOWED_TASK_KINDS}, got {task_kind!r}"
        )

    has_session = bool(session_id)
    session_allowed = False

    # 1. independent_task + --session → BLOCK
    if task_kind == TASK_KIND_INDEPENDENT and has_session:
        return BlockedReason.INDEPENDENT_TASK_WITH_SESSION, False

    # 2. merge_task + --session → BLOCK
    if task_kind == TASK_KIND_MERGE and has_session:
        return BlockedReason.MERGE_TASK_WITH_SESSION, False

    # 3. followup_readonly + --session → ALLOW (같은 chat 가정)
    if task_kind == TASK_KIND_FOLLOWUP_RO and has_session:
        # 회장 §followup cron 정책 — read-only이고 session owner = 회장 chat이면 ALLOW
        if _session_owner_is_anu(session_id, chat):
            session_allowed = True
        else:
            return BlockedReason.TARGET_SESSION_OWNER_MISMATCH, False

    # 4. bot_task / merge_task without bot_key → BLOCK
    if task_kind in (TASK_KIND_BOT, TASK_KIND_MERGE):
        if not target_bot_key:
            return BlockedReason.BOT_KEY_MISSING_FOR_BOT_TASK, False

    # 5. owner_pat fallback path (gh pr merge 동반 + bot token injection 누락) → BLOCK
    if _looks_like_owner_pat_fallback(task_kind, prompt):
        return BlockedReason.OWNER_PAT_FALLBACK_DETECTED, False

    # 6. (보강) bot_session owner mismatch — bot_task 인데 session_id가 회장 chat 아누 세션
    if task_kind in (TASK_KIND_INDEPENDENT, TASK_KIND_BOT, TASK_KIND_MERGE):
        if has_session and _session_owner_is_anu(session_id, chat):
            # 이미 1/2 BLOCK이 트리거되지만, bot_task에서 #4 통과했어도 여기서 추가 차단.
            return BlockedReason.TARGET_SESSION_OWNER_MISMATCH, False

    return None, session_allowed


# ---------------------------------------------------------------------------
# Command builder
# ---------------------------------------------------------------------------

def _build_command_argv(
    *,
    prompt: str,
    schedule: str,
    chat: str,
    target_bot_key: Optional[str],
    session_id: Optional[str],
    session_allowed: bool,
    cli: str,
    once: bool,
) -> Tuple[str, ...]:
    """ALLOWED 시 cokacdir cron 인자 리스트 조립.

    BLOCKED인 경우 호출자가 이 함수를 부르지 않는다.
    """
    argv: List[str] = [cli, "--cron", prompt, "--at", schedule, "--chat", chat]
    if target_bot_key:
        argv.extend(["--key", target_bot_key])
    if session_id and session_allowed:
        argv.extend(["--session", session_id])
    if once:
        argv.append("--once")
    return tuple(argv)


def _build_command_preview_for_audit(argv: Tuple[str, ...]) -> str:
    """argv를 shell-quote된 단일 문자열로 결합 (sanitize 직전 형태)."""
    return " ".join(shlex.quote(a) for a in argv)


# ---------------------------------------------------------------------------
# Public API (회장 §3)
# ---------------------------------------------------------------------------

def safe_cron_dispatch(
    prompt: str,
    schedule: str,
    chat: str,
    target_bot_key: Optional[str],
    task_kind: TaskKindLiteral,
    session_id: Optional[str] = None,
    *,
    once: bool = False,
    audit_path: Optional[Path] = None,
    cli_path: str = DEFAULT_COKACDIR_CLI,
) -> CronDispatchResult:
    """preflight → BLOCK 또는 ALLOW.

    이 함수는 외부 부수효과 없음:
      - subprocess 실행 X (caller가 결정)
      - chairman 메시지 송신 X (caller 책임)
      - audit JSONL append O (성공/차단 모두 1건 기록)

    raw bot_key / raw session_id는 audit에 절대 기록되지 않는다 (해시/플래그만).
    """
    if not isinstance(prompt, str) or not prompt:
        raise ValueError("prompt must be a non-empty str")
    if not isinstance(schedule, str) or not schedule:
        raise ValueError("schedule must be a non-empty str")
    if not isinstance(chat, str) or not chat:
        raise ValueError("chat must be a non-empty str")
    if task_kind not in ALLOWED_TASK_KINDS:
        raise ValueError(
            f"task_kind must be one of {ALLOWED_TASK_KINDS}, got {task_kind!r}"
        )

    # task-2529 — auto finalize chain default.
    # 핵심 원칙: preflight는 사용자 원본 prompt에 대해 실행한다. 그 이유는 owner_pat
    # fallback detector(_looks_like_owner_pat_fallback)가 prompt 내 정적 패턴을 검사하는데,
    # footer에 `GH_TOKEN=$BOT_GITHUB_TOKEN gh pr merge` 같은 모범 패턴이 포함되어 있으면
    # 사용자가 원본에서 owner_pat path로 호출하려는 의도를 footer가 위장(masking)할 수 있기
    # 때문이다. 그래서 1) preflight는 원본에 대해 → 2) ALLOWED 시에만 footer 주입 → 3) 주입된
    # prompt로 cokacdir argv 조립이라는 순서를 강제한다.
    blocked_reason, session_allowed = _run_preflight(
        prompt=prompt,
        chat=chat,
        target_bot_key=target_bot_key,
        task_kind=task_kind,
        session_id=session_id,
    )

    # ALLOWED인 경우에만 footer 주입. BLOCKED는 dispatch 자체가 안 일어나므로 footer 의미 X.
    if blocked_reason is None:
        prompt = auto_inject_finalize_footer(task_kind, prompt)

    if blocked_reason is None:
        argv = _build_command_argv(
            prompt=prompt,
            schedule=schedule,
            chat=chat,
            target_bot_key=target_bot_key,
            session_id=session_id,
            session_allowed=session_allowed,
            cli=cli_path,
            once=once,
        )
    else:
        # BLOCKED 시에도 사용자가 시도한 입력 그대로 sanitized preview에 남긴다.
        # session_id가 있었다면 audit preview에 --session이 포함되어야
        # "어떤 위반이 차단되었는지" 추적 가능하다 (값은 sanitize 단계에서 redact).
        argv = _build_command_argv(
            prompt=prompt,
            schedule=schedule,
            chat=chat,
            target_bot_key=target_bot_key,
            session_id=session_id,
            session_allowed=True,
            cli=cli_path,
            once=once,
        )

    command_preview = _build_command_preview_for_audit(argv)
    actor_expected = _expected_actor(task_kind)
    _key_hash = hash_bot_key(target_bot_key)
    target_bot = (
        f"chat={chat}/key=<hash:{_key_hash[:8] if _key_hash else ''}…>"
        if target_bot_key
        else f"chat={chat}/no-key"
    )

    record = build_audit_record(
        cron_id=None,  # cron은 아직 발사되지 않았음
        target_bot=target_bot,
        bot_key=target_bot_key,
        session_id=session_id,
        session_id_allowed=session_allowed,
        task_kind=task_kind,
        actor_expected=actor_expected,
        actor_actual_if_known=None,
        command_preview=command_preview,
        blocked_reason=blocked_reason,
    )
    append_audit_jsonl(record, audit_path=audit_path)

    if blocked_reason is not None:
        return CronDispatchResult(
            status=DispatchStatus.BLOCKED,
            blocked_reason=blocked_reason,
            audit_record=record,
            command_argv=tuple(),
            chairman_notice=format_chairman_block_notice(
                task_kind=task_kind,
                blocked_reason=blocked_reason,
                target_bot=target_bot,
            ),
        )

    return CronDispatchResult(
        status=DispatchStatus.ALLOWED,
        blocked_reason=None,
        audit_record=record,
        command_argv=argv,
        chairman_notice=None,
    )


def format_chairman_block_notice(
    *,
    task_kind: str,
    blocked_reason: str,
    target_bot: str,
) -> str:
    """회장 chat 짧은 보고용 텍스트 (Critical 7종 X — 별도 운영 신호)."""
    return (
        f"CRON_TARGETING_GUARD_BLOCKED — "
        f"task_kind={task_kind} reason={blocked_reason} target={target_bot}"
    )


# ---------------------------------------------------------------------------
# task-2533 — cron 발사 hook (timers upsert / 신호등 sync fix A)
# ---------------------------------------------------------------------------
#
# 회장 §본질 (2026-05-10 신호등 sync fix A):
#   cron ``--session`` 발사 직후 ``memory/task-timers.json`` entry 가 자동 갱신되지 않아
#   대시보드 활성 봇 표시(신호등) 100% gap 발생. 본 hook 은 cron 발사가 **성공한 직후** 만
#   발동하며 (atomic), 동일 task 재발사 시에도 entry 중복 X (idempotent).
#
# 핵심 원칙:
#   - AUTO_FINALIZE chain footer 삽입과 별개. footer 삽입은 prompt 변형, 본 hook 은 timers 갱신.
#   - cron 발사 실패 (rc != 0) 시 hook 호출 X (atomic).
#   - schedule_id 는 cokacdir CLI 의 stdout JSON ``id`` 필드 에서 파싱 (raw token 아님).
#   - upsert 자체가 실패해도 dispatch 결과(rc)는 영향 X (운영 신호 stderr 1줄).

# cokacdir cron 등록 stdout 의 schedule_id 파싱.
# 정상 응답: ``{"status":"ok","id":"<schedule_id>","prompt":"...","schedule":"..."}``.
_COKACDIR_OK_STATUS = "ok"


def _try_parse_cokacdir_payload(candidate: str) -> Optional[str]:
    """JSON 후보 문자열에서 ``status=='ok'`` 인 schedule id 추출. 실패 시 None."""
    try:
        payload = json.loads(candidate)
    except (json.JSONDecodeError, ValueError):
        return None
    if not isinstance(payload, dict):
        return None
    if payload.get("status") != _COKACDIR_OK_STATUS:
        return None
    schedule_id = payload.get("id")
    if not isinstance(schedule_id, str) or not schedule_id:
        return None
    return schedule_id


def parse_schedule_id_from_cokacdir_stdout(stdout: str) -> Optional[str]:
    """cokacdir cron 등록 stdout 에서 ``id`` 필드를 파싱한다 (회귀 8 견고화).

    파싱 실패 / status != ``ok`` / id 누락 → None 반환 (caller 가 hook skip 결정).

    파싱 전략 (Gemini PR #82 review L577 medium 수용 — 견고화):
      1. 줄 단위 스캔 — 각 줄을 trim 후 ``{`` 시작 / ``}`` 종료 면 ``json.loads`` 시도
      2. 줄 합계 시도 (single-line) — ``"status": "ok"`` substring 포함 줄들만 후보
      3. 마지막 fallback — 첫 ``{`` ~ 마지막 ``}`` 한 덩어리 시도
    어떤 단계에서든 ``status='ok' + id`` 가 추출되면 즉시 반환.
    """
    if not stdout:
        return None
    text = stdout.strip()
    if not text:
        return None

    # 1) 줄 단위 — cokacdir CLI 는 보통 한 줄 JSON 으로 응답.
    for line in text.splitlines():
        line_s = line.strip()
        if not line_s.startswith("{") or not line_s.endswith("}"):
            continue
        if "\"status\"" not in line_s and "'status'" not in line_s:
            continue
        result = _try_parse_cokacdir_payload(line_s)
        if result:
            return result

    # 2) 마지막 fallback — 첫 ``{`` ~ 마지막 ``}`` (멀티라인 JSON 가능성).
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end != -1 and end > start:
        candidate = text[start : end + 1]
        result = _try_parse_cokacdir_payload(candidate)
        if result:
            return result

    return None


def _resolve_team_id(
    *,
    explicit_team: Optional[str],
    task_id: Optional[str],
    prompt: str,
    tasks_dir: Optional[Path] = None,
) -> str:
    """team_id 결정 우선순위.

    1. ``--team`` 인자 (caller 명시) — 비어있지 않으면 즉시 채택
    2. task md 파일 (memory/tasks/<task_id>.md) 에서 ``devN-team`` 추출
    3. cron prompt 본문에서 ``devN-team`` 추출
    4. ``"unknown-team"``
    """
    if explicit_team:
        return explicit_team
    # _WORKTREE_ROOT 는 본 모듈 line 62 에서 정의됨 (sys.path insertion 용도).
    base_dir = tasks_dir if tasks_dir is not None else (_WORKTREE_ROOT / "memory" / "tasks")
    if task_id:
        candidate_md = base_dir / f"{task_id}.md"
        team_from_md = extract_team_id_from_task_md(candidate_md)
        if team_from_md != "unknown-team":
            return team_from_md
    # prompt body fallback — public helper 사용 (Gemini PR #82 L622 medium 수용).
    if prompt:
        team_from_prompt = extract_team_id_from_text(prompt)
        if team_from_prompt != "unknown-team":
            return team_from_prompt
    return "unknown-team"


def run_cron_with_timers_upsert(
    *,
    argv: Sequence[str],
    prompt: str,
    chat: str,
    explicit_team: Optional[str] = None,
    timers_path: Optional[Path] = None,
    tasks_dir: Optional[Path] = None,
    runner: Optional[Callable[[Sequence[str]], Tuple[int, str]]] = None,
) -> Tuple[int, str, Optional[dict]]:
    """ALLOWED command argv 를 실행하고, 성공 시 timers entry 를 upsert.

    Returns:
        ``(returncode, captured_stdout, upserted_entry_or_None)``

    동작:
      - rc != 0 → hook skip (atomic)
      - schedule_id 파싱 실패 → hook skip + stderr 1줄 운영 신호
      - upsert 자체 실패 → rc 보존 + stderr 1줄 (caller dispatch 결과 영향 X)
    """
    actual_runner = runner if runner is not None else _capture_runner
    rc, stdout = actual_runner(argv)

    if rc != 0:
        return rc, stdout, None

    schedule_id = parse_schedule_id_from_cokacdir_stdout(stdout)
    if not schedule_id:
        sys.stderr.write(
            "WARN: timers upsert skipped — schedule_id parse failed (cron rc=0 but no JSON id)\n"
        )
        return rc, stdout, None

    task_id = extract_task_id_from_prompt(prompt)
    if not task_id:
        sys.stderr.write(
            "WARN: timers upsert skipped — task_id not found in prompt\n"
        )
        return rc, stdout, None

    team_id = _resolve_team_id(
        explicit_team=explicit_team,
        task_id=task_id,
        prompt=prompt,
        tasks_dir=tasks_dir,
    )

    try:
        entry = upsert_cron_dispatch(
            task_id=task_id,
            team_id=team_id,
            schedule_id=schedule_id,
            cron_prompt=prompt,
            chat_id=chat,
            timers_path=timers_path or DEFAULT_TIMERS_PATH,
        )
    except (OSError, ValueError, json.JSONDecodeError) as exc:
        sys.stderr.write(f"WARN: timers upsert failed: {exc}\n")
        return rc, stdout, None

    return rc, stdout, entry


def _capture_runner(argv: Sequence[str]) -> Tuple[int, str]:
    """기본 subprocess runner — stdout 을 **실시간 스트리밍** 하면서 동시에 캡처.

    Gemini PR #82 review L694 medium 수용:
      ``capture_output=True`` 는 모든 stdout 을 끝까지 버퍼링하므로 사용자가 cron 발사
      진행 상황을 실시간으로 못 본다 (UX 퇴보). 본 함수는 ``Popen`` + 줄 단위 read 로
      tee 패턴을 구현해 사용자에게는 즉시 보여주면서 schedule_id 파싱용 buffer 도 채운다.

    stderr 는 그대로 부모 stderr 로 inherit (별도 캡처 X — 디버깅 정합).
    """
    proc = subprocess.Popen(
        list(argv),
        stdout=subprocess.PIPE,
        stderr=None,  # 부모 stderr inherit
        text=True,
        bufsize=1,  # line-buffered
    )
    captured: List[str] = []
    if proc.stdout is not None:
        for line in proc.stdout:
            sys.stdout.write(line)
            sys.stdout.flush()
            captured.append(line)
        proc.stdout.close()
    rc = proc.wait()
    return rc, "".join(captured)


# ---------------------------------------------------------------------------
# CLI surface — 직접 호출자도 본 wrapper만 사용 (cokacdir 직접 조립 금지 정합)
# ---------------------------------------------------------------------------

def _build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="safe_cron_dispatch",
        description="task-2526 cron --session safety guard wrapper",
    )
    parser.add_argument("--prompt", required=True)
    parser.add_argument("--at", dest="schedule", required=True)
    parser.add_argument("--chat", required=True)
    parser.add_argument("--key", dest="target_bot_key", default=None)
    parser.add_argument(
        "--task-kind",
        required=True,
        choices=list(ALLOWED_TASK_KINDS),
    )
    parser.add_argument("--session", dest="session_id", default=None)
    parser.add_argument("--once", action="store_true")
    parser.add_argument(
        "--apply",
        action="store_true",
        help="ALLOWED 시 cokacdir 실행 (기본 dry-run: argv만 출력)",
    )
    parser.add_argument("--cli", default=DEFAULT_COKACDIR_CLI)
    parser.add_argument("--audit-path", default=None)
    # task-2533 — cron 발사 hook 옵션
    parser.add_argument(
        "--team",
        dest="team_id",
        default=None,
        help="(task-2533) timers upsert team_id 명시 (생략 시 task md 자동 추출)",
    )
    parser.add_argument(
        "--timers-path",
        default=None,
        help="(task-2533) timers JSON 경로 (기본: memory/task-timers.json)",
    )
    parser.add_argument(
        "--no-timers-upsert",
        action="store_true",
        help="(task-2533) cron 발사 후 timers upsert hook 비활성화",
    )
    return parser


def _cli_main(
    argv: Optional[Sequence[str]] = None,
    *,
    runner: Callable[[Sequence[str]], Tuple[int, str]] = _capture_runner,
) -> int:
    parser = _build_arg_parser()
    args = parser.parse_args(argv)

    result = safe_cron_dispatch(
        prompt=args.prompt,
        schedule=args.schedule,
        chat=args.chat,
        target_bot_key=args.target_bot_key,
        task_kind=args.task_kind,
        session_id=args.session_id,
        once=args.once,
        audit_path=Path(args.audit_path) if args.audit_path else None,
        cli_path=args.cli,
    )

    print(json.dumps({
        "status": result.status,
        "blocked_reason": result.blocked_reason,
        "command_argv": list(result.command_argv),
        "chairman_notice": result.chairman_notice,
        "audit": {
            **asdict(result.audit_record),
        },
    }, ensure_ascii=False))

    if result.status == DispatchStatus.BLOCKED:
        return 1

    if args.apply:
        if args.no_timers_upsert:
            rc, _ = runner(result.command_argv)
            return int(rc or 0)
        rc, _, _ = run_cron_with_timers_upsert(
            argv=result.command_argv,
            prompt=args.prompt,
            chat=args.chat,
            explicit_team=args.team_id,
            timers_path=Path(args.timers_path) if args.timers_path else None,
            runner=runner,
        )
        return int(rc or 0)

    return 0


if __name__ == "__main__":
    sys.exit(_cli_main())
