"""utils/lifecycle_reconciliation_manager.py — task-2518 P0.

회장 명시 요구사항:
  - bot session ↔ task lifecycle 결합 제거
  - GitHub/CI/smoke evidence를 source-of-truth로 사용하는 idempotent reconcile state machine
  - manual .done 위장 차단 (evidence 없으면 RuntimeError)
  - dry-run 기본 (--apply 없이 side effect 0)

7 Lifecycle States:
  RUNNING                 — task-timer running, 작업 진행 중
  PR_OPEN                 — PR 생성됨, 머지 대기
  MERGED_PENDING_RECONCILE — PR merged, finalize 누락 (회장 §1 사례)
  RECONCILING             — reconcile 진행 중
  FINALIZED               — .done.acked + .merge-done + timer end 모두 정상
  STUCK_NEEDS_RECONCILE   — stuck 자동 감지
  ESCALATED               — Critical 7종 발동

8 Stuck Cases (회장 §1~7 + Telegram cut-off):
  TIMER_RUNNING_BUT_PR_MERGED          — 회장 §2
  PR_MERGED_BUT_DONE_MISSING           — 회장 §1
  MERGE_COMMIT_BUT_MERGE_DONE_MISSING  — 회장 §1
  CI_PASS_BUT_NOT_FINALIZED
  TELEGRAM_REPLY_CUT_OFF               — cron history truncation
  BOT_SESSION_ENDED_BUT_TASK_OK        — 회장 §7
  FINISH_TASK_INTERRUPTED              — 회장 §3
  STALE_ESCALATE_MARKER                — 회장 §4

task-2521 §3 — bot session cancelled 격상 4종 (BOT_CANCELLED_*)
task-2529 §5 — AUTO_FINALIZE_CHAIN_MISSING 4종:
  CODE_DONE_BUT_NO_COMMIT              — 자체 검증 PASS 후 commit 단계 미진입
  COMMIT_DONE_BUT_NO_PR                — commit push 후 PR 생성 단계 미진입
  PR_OPEN_BUT_NO_MERGE_ATTEMPT         — PR open + CI SUCCESS 후 merge 시도 미발생
  SELF_VERIFIED_BUT_NOT_FINALIZED      — 본 사건 fixture (task-2524+1, dev5 사라스와티)

Evidence priority (회장 명시):
  PR state > mergeCommit > origin/main 포함 > CI > smoke > timer > file marker
"""
from __future__ import annotations

import argparse
import json
import logging
import subprocess
import sys
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Optional

# When this file is executed directly as a CLI script, add the package root to
# sys.path so the absolute `utils.*` imports that follow can be resolved.
_HERE = Path(__file__).resolve().parent.parent  # utils/ → worktree root
if str(_HERE) not in sys.path:
    sys.path.insert(0, str(_HERE))

from utils.canonical_workspace_resolver import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    CanonicalWorkspace,
    resolve_canonical_workspace,
)
from utils.automation_contracts import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    AutomationDecision,
    CriticalEscalationType,
    EscalationPacket,
    SmokeResult,
)
# ★ task-2535 — schedule_id freshness validator (신호등 sync fix C, 회장 §명시 2026-05-10).
from utils.schedule_id_freshness import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    SCHEDULE_FRESHNESS_THRESHOLD_MIN as _SCHEDULE_FRESHNESS_THRESHOLD_MIN,
)

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Type aliases
# ---------------------------------------------------------------------------
# Signature of an injectable subprocess runner; helpers in this module call it
# as ``runner(args, cwd=...)`` and read ``returncode`` / ``stdout`` from the result.
RunnerType = Callable[..., subprocess.CompletedProcess]

# Fallback workspace root used when no workspace_root is supplied or resolved.
_DEFAULT_WORKSPACE = Path("/home/jay/workspace")
# Default directory scanned for cron schedule history ``*.log`` files.
_DEFAULT_CRON_HISTORY_DIR = Path("/home/jay/.cokacdir/schedule_history")
# Paths below are relative to the workspace root.
_EVENTS_DIR_NAME = "memory/events"
_TIMERS_FILE_NAME = "memory/task-timers.json"
_TIMERS_ARCHIVE_FILE_NAME = "memory/task-timers-archived.json"

# Telegram cut-off heuristic: a response of at least this many UTF-8 bytes is
# treated as suspicious (possibly truncated).
_TELEGRAM_TRUNCATION_THRESHOLD_BYTES = 4000


# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------

class LifecycleState(str, Enum):
    """The 7 lifecycle states (matching the chairman's specification exactly)."""

    RUNNING = "RUNNING"                                     # task timer running, work in progress
    PR_OPEN = "PR_OPEN"                                     # PR created, waiting for merge
    MERGED_PENDING_RECONCILE = "MERGED_PENDING_RECONCILE"   # PR merged but finalize was skipped
    RECONCILING = "RECONCILING"                             # reconcile in progress
    FINALIZED = "FINALIZED"                                 # .done.acked + .merge-done + timer end all present
    STUCK_NEEDS_RECONCILE = "STUCK_NEEDS_RECONCILE"         # stuck condition detected automatically
    ESCALATED = "ESCALATED"                                 # one of the critical escalation types fired


class StuckReason(str, Enum):
    """Stuck classifications.

    The original 8 cases, plus the 4 BOT_CANCELLED_* cases (task-2521 §3), the
    4 AUTO_FINALIZE_CHAIN_MISSING cases (task-2529 §5) and STALE_SCHEDULE_ID
    (task-2535).
    """

    TIMER_RUNNING_BUT_PR_MERGED = "TIMER_RUNNING_BUT_PR_MERGED"
    PR_MERGED_BUT_DONE_MISSING = "PR_MERGED_BUT_DONE_MISSING"
    MERGE_COMMIT_BUT_MERGE_DONE_MISSING = "MERGE_COMMIT_BUT_MERGE_DONE_MISSING"
    CI_PASS_BUT_NOT_FINALIZED = "CI_PASS_BUT_NOT_FINALIZED"
    TELEGRAM_REPLY_CUT_OFF = "TELEGRAM_REPLY_CUT_OFF"
    BOT_SESSION_ENDED_BUT_TASK_OK = "BOT_SESSION_ENDED_BUT_TASK_OK"
    FINISH_TASK_INTERRUPTED = "FINISH_TASK_INTERRUPTED"
    STALE_ESCALATE_MARKER = "STALE_ESCALATE_MARKER"
    # ★ task-2521 §3 — four cases for a cancelled bot session.
    # Seven signals are reviewed together: worktree mtime / process / PR / commit /
    # CI / report artefact / cron status.
    # A missing Telegram reply alone must never be treated as stuck
    # (consistent with feedback_bot_no_response_not_dead_260509).
    BOT_CANCELLED_TIMER_RUNNING_PR_MISSING = "BOT_CANCELLED_TIMER_RUNNING_PR_MISSING"
    BOT_CANCELLED_WITH_ACTIVE_WORKTREE = "BOT_CANCELLED_WITH_ACTIVE_WORKTREE"
    BOT_CANCELLED_AFTER_COMMIT_BEFORE_PR = "BOT_CANCELLED_AFTER_COMMIT_BEFORE_PR"
    BOT_CANCELLED_AFTER_PR_BEFORE_FINALIZE = "BOT_CANCELLED_AFTER_PR_BEFORE_FINALIZE"
    # ★ task-2529 §5 — four AUTO_FINALIZE_CHAIN_MISSING cases.
    # Chairman's directive (2026-05-10): detect, at the lifecycle level, the system
    # defect where the bot finished the work and passed verification but did not
    # automatically proceed to commit / PR / merge, and record it as stuck.
    # Reference fixture: task-2524+1 (dev5 Sarasvati).
    # Only cases older than the auto-finalize stuck-age threshold are treated as
    # stuck, to avoid false positives on in-progress work.
    CODE_DONE_BUT_NO_COMMIT = "CODE_DONE_BUT_NO_COMMIT"
    COMMIT_DONE_BUT_NO_PR = "COMMIT_DONE_BUT_NO_PR"
    PR_OPEN_BUT_NO_MERGE_ATTEMPT = "PR_OPEN_BUT_NO_MERGE_ATTEMPT"
    SELF_VERIFIED_BUT_NOT_FINALIZED = "SELF_VERIFIED_BUT_NOT_FINALIZED"
    # ★ task-2535 — schedule_id freshness validator, "traffic light" sync fix C
    # (chairman's directive 2026-05-10).
    # Stuck signal: the cron schedule_id has been silent for at least
    # SCHEDULE_FRESHNESS_THRESHOLD_MIN (60 minutes) while the task timer is still
    # 'running'. Prevents the auto-merge signal from judging a task active based on
    # a stale schedule_id. Kept consistent with utils/schedule_id_freshness.py.
    STALE_SCHEDULE_ID = "STALE_SCHEDULE_ID"


# task-2521 §3 — threshold for treating a worktree as active (mtime within 5 minutes).
_BOT_CANCELLED_ACTIVE_WORKTREE_MAX_AGE_SECONDS = 300.0

# task-2529 §5 — stuck threshold for the AUTO_FINALIZE_CHAIN_MISSING cases.
# These four cases are fundamentally time-based heuristics: if the next step is
# not entered within a given time after the report artefact was created or the
# commit was pushed, the task is judged stuck.
# Too short a threshold flags in-progress work (false positives); too long means
# the stuck signal is not classified before the chairman receives a report.
# For the task-2524+1 (Sarasvati) case the requirement was to classify a missing
# PR right after a passing self-verification as stuck, hence a conservative
# 5 minutes.
_AUTO_FINALIZE_STUCK_AGE_SECONDS = 300.0

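# ★ task-2535 — schedule_id freshness validator ("traffic light" sync fix C /
# chairman's directive 2026-05-10).
# No constant is defined here: the threshold is imported near the top of this file
# as _SCHEDULE_FRESHNESS_THRESHOLD_MIN (from utils.schedule_id_freshness). When the
# cron schedule has not responded for 60 minutes or more while the timer is still
# running, the task is recorded as STALE_SCHEDULE_ID.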


# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------

@dataclass
class LifecycleEvidence:
    """Unified envelope holding every source-of-truth piece of evidence for one task."""

    task_id: str
    # GitHub upstream
    pr_number: Optional[int]
    pr_state: Optional[str]              # OPEN / CLOSED / MERGED
    merge_commit: Optional[str]
    merged_into_main: bool               # whether origin/main contains the merge commit
    ci_status: Optional[str]             # SUCCESS / FAILURE / PENDING / null
    # follow-up evidence
    smoke_status: Optional[str]          # PASS / FAIL / SKIPPED / null
    # task timer
    timer_status: Optional[str]          # running / completed / null
    timer_end_time: Optional[str]
    # file markers (derived; lowest priority)
    has_done: bool
    has_done_acked: bool
    has_merge_done: bool
    has_qc_result: bool
    has_followup: bool
    has_escalate_marker: bool
    escalate_marker_age_minutes: Optional[float]
    # derived from cron history
    telegram_reply_truncated: bool       # last cron history record looks truncated
    bot_session_status: Optional[str]    # cron status: ok/cancelled/error/null
    # traces of finish-task
    worktree_exists: bool
    branch_pushed_to_remote: bool
    # ★ task-2521 §3 — extra signals used to classify BOT_CANCELLED_* cases
    worktree_mtime_seconds_ago: Optional[float] = None  # worktree activity signal (None = unknown)
    has_pushed_commits: bool = False                    # commits pushed on the task branch (even without a PR)
    report_artifact_present: bool = False               # an artefact such as memory/reports/<task>* exists
    process_alive: Optional[bool] = None                # process signal (None = detection not performed)
    # ★ task-2529 §5 — extra signals for AUTO_FINALIZE_CHAIN_MISSING
    # Elapsed time between the PR's createdAt and now. None when there is no PR or
    # createdAt was not collected.
    # Key signal for PR_OPEN_BUT_NO_MERGE_ATTEMPT (worktree mtime is unrelated to PR age).
    pr_open_age_seconds: Optional[float] = None
    # Elapsed time since the report artefact's mtime. None = no artefact, or mtime
    # could not be read.
    # Used for SELF_VERIFIED_BUT_NOT_FINALIZED / CODE_DONE_BUT_NO_COMMIT — checks whether
    # the finalize chain was entered within a given time after the self-verification
    # report was created.
    report_artifact_age_seconds: Optional[float] = None
    # ★ task-2535 — schedule_id freshness ("traffic light" sync fix C / chairman's
    # directive 2026-05-10).
    # The cron schedule_id (identifier in cokacdir schedule_history) and its freshness class.
    # When no schedule_id is mapped both are None — STALE_SCHEDULE_ID detection is deferred.
    # schedule_id_freshness ∈ {"FRESH", "STALE", "MISSING", None}.
    schedule_id: Optional[str] = None
    schedule_id_freshness: Optional[str] = None
    # Seconds elapsed since the last chairman record (shown in detail messages). None = no log.
    schedule_id_age_seconds: Optional[float] = None


@dataclass
class StuckCase:
    """One detected stuck condition: its classification plus a human-readable detail."""

    reason: StuckReason  # which stuck classification was detected
    detail: str          # free-form explanation text


@dataclass
class LifecycleReport:
    """Result envelope produced by a reconcile run."""

    task_id: str
    state: LifecycleState
    stuck_cases: list[StuckCase] = field(default_factory=list)
    evidence: Optional[LifecycleEvidence] = None
    actions_taken: list[str] = field(default_factory=list)
    actions_planned: list[str] = field(default_factory=list)
    dry_run: bool = True
    timestamp: str = ""
    reconcile_run_id: str = ""           # uuid4 hex
    backfill_metadata: dict[str, Any] = field(default_factory=dict)

    def evidence_to_dict(self) -> dict:
        """Return a shallow dict copy of the evidence attributes ({} when there is none).

        Used, for example, when handing the evidence to an EscalationPacket.
        """
        snapshot = self.evidence
        if snapshot is None:
            return {}
        return dict(vars(snapshot))

    def to_dict(self) -> dict:
        """Return the report as a plain dict (enum members reduced to their values)."""
        state_value = self.state.value
        serialized_cases = []
        for case in self.stuck_cases:
            serialized_cases.append(
                {"reason": case.reason.value, "detail": case.detail}
            )
        payload = {
            "task_id": self.task_id,
            "state": state_value,
            "stuck_cases": serialized_cases,
            "evidence": self.evidence_to_dict(),
            "actions_taken": self.actions_taken,
            "actions_planned": self.actions_planned,
            "dry_run": self.dry_run,
            "timestamp": self.timestamp,
            "reconcile_run_id": self.reconcile_run_id,
            "backfill_metadata": self.backfill_metadata,
        }
        return payload

    def to_json(self) -> str:
        """Return ``to_dict()`` rendered as indented JSON with non-ASCII text kept as-is."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, indent=2, ensure_ascii=False)


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _default_runner(
    args: list[str],
    *,
    cwd: Optional[str] = None,
) -> subprocess.CompletedProcess:
    return subprocess.run(
        args,
        cwd=cwd,
        capture_output=True,
        text=True,
        timeout=30,
    )


def _run(
    args: list[str],
    *,
    cwd: Optional[str | Path] = None,
    runner: Optional[RunnerType] = None,
) -> subprocess.CompletedProcess:
    fn = runner if runner is not None else _default_runner
    return fn(args, cwd=str(cwd) if cwd is not None else None)


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _events_dir(workspace_root: Path) -> Path:
    """Return the events directory located under ``workspace_root``."""
    return workspace_root.joinpath(_EVENTS_DIR_NAME)


def _marker_path(workspace_root: Path, task_id: str, suffix: str) -> Path:
    """Return the path of the ``<task_id>.<suffix>`` marker file in the events directory."""
    marker_name = f"{task_id}.{suffix}"
    return _events_dir(workspace_root) / marker_name


def _default_workspace_root() -> Path:
    """Return the fallback workspace root used when no explicit root is given."""
    fallback_root = _DEFAULT_WORKSPACE
    return fallback_root


def _resolve_workspace(task_id: str, *, runner: Optional[RunnerType] = None) -> CanonicalWorkspace:
    """Resolve the canonical workspace for ``task_id``.

    Delegates to ``resolve_canonical_workspace(task_id, fetch=False, runner=runner)``.
    If the resolver raises, the failure is logged and a minimal stub rooted at
    the default workspace is returned instead, so callers always receive a
    ``CanonicalWorkspace``.
    """
    try:
        return resolve_canonical_workspace(task_id, fetch=False, runner=runner)
    except Exception as exc:
        # Best-effort fallback, but leave a trace: evidence gathered from the
        # default workspace may differ from what the canonical one would show.
        logger.warning(
            "canonical workspace resolution failed for %s; falling back to %s: %s",
            task_id,
            _DEFAULT_WORKSPACE,
            exc,
        )
        # fallback: build minimal workspace stub pointing to default workspace
        return CanonicalWorkspace(
            task_id=task_id,
            workspace_root=_DEFAULT_WORKSPACE,
            worktree_path=_DEFAULT_WORKSPACE / ".worktrees" / task_id,
            branch_name=f"task/{task_id}",
            main_head_sha="",
            base_sha="",
            cwd=_DEFAULT_WORKSPACE,
            is_main=True,
            is_clean=True,
        )


# ---------------------------------------------------------------------------
# Evidence collection helpers
# ---------------------------------------------------------------------------

def _gather_pr_info(
    task_id: str,
    *,
    runner: Optional[RunnerType] = None,
    workspace_root: Optional[Path] = None,
    pr_lookup: Optional[Callable[[str], dict]] = None,
) -> dict:
    """Fetch PR info via gh CLI or injected pr_lookup."""
    if pr_lookup is not None:
        try:
            return pr_lookup(task_id)
        except Exception as exc:
            logger.warning("pr_lookup failed for %s: %s", task_id, exc)
            return {}

    cwd = str(workspace_root or _default_workspace_root())
    try:
        r = _run(
            [
                "gh", "pr", "list",
                "--search", f"head:task/{task_id}",
                "--state", "all",
                # task-2529: createdAt 필요 (PR_OPEN_BUT_NO_MERGE_ATTEMPT 판정)
                "--json", "number,state,mergeCommit,createdAt",
                "--limit", "5",
            ],
            cwd=cwd,
            runner=runner,
        )
        if r.returncode != 0 or not r.stdout.strip():
            return {}
        items = json.loads(r.stdout)
        if not items:
            return {}
        # Prefer MERGED, then OPEN, then first
        for item in items:
            if item.get("state") == "MERGED":
                return item
        return items[0]
    except Exception as exc:
        logger.warning("gh pr list failed for %s: %s", task_id, exc)
        return {}


def _check_merged_into_main(
    merge_commit: str,
    *,
    runner: Optional[RunnerType] = None,
    workspace_root: Optional[Path] = None,
) -> bool:
    """Check if merge_commit is an ancestor of origin/main."""
    if not merge_commit:
        return False
    cwd = str(workspace_root or _default_workspace_root())
    try:
        r = _run(
            ["git", "merge-base", "--is-ancestor", merge_commit, "origin/main"],
            cwd=cwd,
            runner=runner,
        )
        return r.returncode == 0
    except Exception as exc:
        logger.warning("merge-base check failed for %s: %s", merge_commit, exc)
        return False


def _gather_ci_status(
    pr_number: int,
    *,
    runner: Optional[RunnerType] = None,
    workspace_root: Optional[Path] = None,
) -> Optional[str]:
    """Fetch CI rollup status via gh pr view --json statusCheckRollup."""
    cwd = str(workspace_root or _default_workspace_root())
    try:
        r = _run(
            [
                "gh", "pr", "view", str(pr_number),
                "--json", "statusCheckRollup",
            ],
            cwd=cwd,
            runner=runner,
        )
        if r.returncode != 0 or not r.stdout.strip():
            return None
        data = json.loads(r.stdout)
        rollup = data.get("statusCheckRollup") or []
        if not rollup:
            return None
        # Aggregate: all SUCCESS → SUCCESS, any FAILURE → FAILURE, else PENDING
        states = {item.get("state") or item.get("conclusion") or "" for item in rollup}
        if "FAILURE" in states or "FAILED" in states:
            return "FAILURE"
        if all(s in {"SUCCESS", "COMPLETED"} for s in states if s):
            return "SUCCESS"
        return "PENDING"
    except Exception as exc:
        logger.warning("CI status fetch failed for PR %s: %s", pr_number, exc)
        return None


def _gather_timer_info(
    task_id: str,
    *,
    workspace_root: Optional[Path] = None,
    timer_loader: Optional[Callable[[str], dict]] = None,
) -> dict:
    """Load timer info from task-timers.json or injected timer_loader."""
    if timer_loader is not None:
        try:
            return timer_loader(task_id)
        except Exception as exc:
            logger.warning("timer_loader failed for %s: %s", task_id, exc)
            return {}

    timers_file = (workspace_root or _default_workspace_root()) / _TIMERS_FILE_NAME
    try:
        if not timers_file.exists():
            return {}
        raw = timers_file.read_text(encoding="utf-8")
        data = json.loads(raw)
        tasks = data.get("tasks", {})
        return tasks.get(task_id, {})
    except Exception as exc:
        logger.warning("task-timers.json read failed for %s: %s", task_id, exc)
        return {}


def _gather_file_markers(
    task_id: str,
    *,
    workspace_root: Optional[Path] = None,
) -> dict:
    """Report which ``<task_id>.<suffix>`` marker files exist in the events directory.

    Also returns the age in minutes of the ``done.escalated`` marker, taken
    from its mtime (None when the marker is absent or cannot be stat'ed).
    """
    events = _events_dir(workspace_root or _default_workspace_root())

    def marker(suffix: str) -> Path:
        return events / f"{task_id}.{suffix}"

    escalated_marker = marker("done.escalated")
    escalated_present = escalated_marker.exists()
    age_minutes: Optional[float] = None
    if escalated_present:
        try:
            now_ts = datetime.now(timezone.utc).timestamp()
            age_minutes = (now_ts - escalated_marker.stat().st_mtime) / 60.0
        except Exception:
            age_minutes = None

    return {
        "has_done": marker("done").exists(),
        "has_done_acked": marker("done.acked").exists(),
        "has_merge_done": marker("merge-done").exists(),
        "has_qc_result": marker("qc-result").exists(),
        "has_followup": marker("followup.txt").exists(),
        "has_escalate_marker": escalated_present,
        "escalate_marker_age_minutes": age_minutes,
    }


def _is_truncated_response(text: str) -> bool:
    """Heuristic: response is likely truncated if it ends abruptly."""
    if not text:
        return False
    stripped = text.rstrip()
    # Near max bytes and ends with incomplete sentence / word
    if len(text.encode("utf-8")) >= _TELEGRAM_TRUNCATION_THRESHOLD_BYTES:
        return True
    # Ends with Korean syllable mid-word (hangul range U+AC00–U+D7A3)
    if stripped and "가" <= stripped[-1] <= "힣":
        return True
    # Ends with opening code block marker without closing
    if "```" in stripped:
        count = stripped.count("```")
        if count % 2 != 0:
            return True
    return False


def _gather_cron_history(
    task_id: str,
    *,
    cron_history_dir: Optional[Path] = None,
) -> dict:
    """Scan cron history logs to detect Telegram cut-off and bot session status."""
    history_dir = cron_history_dir or _DEFAULT_CRON_HISTORY_DIR
    result = {
        "telegram_reply_truncated": False,
        "bot_session_status": None,
    }

    try:
        if not history_dir.exists():
            return result

        # Find log files whose last record has task_id in prompt
        matching_last_record: Optional[dict] = None
        for log_file in sorted(history_dir.glob("*.log"), key=lambda p: p.stat().st_mtime, reverse=True):
            try:
                lines = log_file.read_text(encoding="utf-8", errors="replace").splitlines()
                # Search from end for a line containing task_id
                for line in reversed(lines):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    prompt = record.get("prompt", "") or ""
                    if task_id in prompt:
                        matching_last_record = record
                        break
                if matching_last_record is not None:
                    break
            except Exception:
                continue

        if matching_last_record is None:
            return result

        # Check truncation
        response = matching_last_record.get("response", "") or ""
        result["telegram_reply_truncated"] = _is_truncated_response(response)

        # Bot session status
        status = matching_last_record.get("status", None)
        if status is not None:
            result["bot_session_status"] = str(status)

    except Exception as exc:
        logger.warning("cron history scan failed for %s: %s", task_id, exc)

    return result


def _newest_mtime_age_seconds(paths: list) -> Optional[float]:
    """Return seconds elapsed since the newest mtime among ``paths`` (None if none readable)."""
    mtimes = []
    for path in paths:
        try:
            mtimes.append(path.stat().st_mtime)
        except OSError:
            # The entry vanished between listing and stat; ignore just this one.
            continue
    if not mtimes:
        return None
    return datetime.now(timezone.utc).timestamp() - max(mtimes)


def _gather_worktree_info(
    task_id: str,
    *,
    runner: Optional[RunnerType] = None,
    workspace_root: Optional[Path] = None,
) -> dict:
    """Collect worktree, remote-branch and report-artefact signals for ``task_id``.

    Args:
        task_id: Task identifier used to build the glob / ls-remote patterns.
        runner: Optional subprocess runner forwarded to ``_run``.
        workspace_root: Workspace to inspect (default workspace if None).

    Returns:
        A dict that always contains these keys:
        ``worktree_exists``, ``branch_pushed_to_remote``,
        ``worktree_mtime_seconds_ago``, ``has_pushed_commits``,
        ``report_artifact_present`` and ``report_artifact_age_seconds``.
        Each probe is best-effort: a failing probe leaves its defaults in place.
    """
    wd = workspace_root or _default_workspace_root()
    result: dict[str, Any] = {
        "worktree_exists": False,
        "branch_pushed_to_remote": False,
        "worktree_mtime_seconds_ago": None,
        "has_pushed_commits": False,
        "report_artifact_present": False,
        # Always present so consumers see one stable schema.
        "report_artifact_age_seconds": None,
    }

    # Worktree existence + mtime (task-2521 §3 signal): .worktrees/<task_id>-*
    try:
        candidates = list((wd / ".worktrees").glob(f"{task_id}-*"))
        result["worktree_exists"] = bool(candidates)
        if candidates:
            result["worktree_mtime_seconds_ago"] = _newest_mtime_age_seconds(candidates)
    except Exception:
        # Best-effort probe; keep the defaults.
        pass

    # Check if a task/<task_id>* branch exists on the remote
    try:
        r = _run(
            ["git", "ls-remote", "--heads", "origin", f"task/{task_id}*"],
            cwd=str(wd),
            runner=runner,
        )
        if r.returncode == 0 and r.stdout.strip():
            result["branch_pushed_to_remote"] = True
            # A remote head implies at least one pushed commit on the branch.
            result["has_pushed_commits"] = True
    except Exception as exc:
        logger.warning("ls-remote failed for %s: %s", task_id, exc)

    # task-2521 §3 — report artefact (memory/reports/<task>*)
    # task-2529 §5 — newest artefact mtime, used for AUTO_FINALIZE_CHAIN_MISSING age checks
    try:
        reports_dir = wd / "memory" / "reports"
        if reports_dir.exists():
            matches = list(reports_dir.glob(f"{task_id}*"))
            if matches:
                result["report_artifact_present"] = True
                result["report_artifact_age_seconds"] = _newest_mtime_age_seconds(matches)
    except Exception:
        # Best-effort probe; keep the defaults.
        pass

    return result


# ---------------------------------------------------------------------------
# Public API: gather_evidence
# ---------------------------------------------------------------------------

def gather_evidence(
    task_id: str,
    *,
    workspace_root: Optional[Path] = None,
    runner: Optional[RunnerType] = None,
    cron_history_dir: Optional[Path] = None,
    pr_lookup: Optional[Callable[[str], dict]] = None,
    timer_loader: Optional[Callable[[str], dict]] = None,
) -> LifecycleEvidence:
    """Collect every evidence source — gh / git / file markers / timer / cron history.

    When ``workspace_root`` is None the workspace is determined through
    ``_resolve_workspace(task_id, runner=runner)`` (CanonicalWorkspace
    integration, §6); if that raises, ``_DEFAULT_WORKSPACE`` is used.

    Args:
        task_id: Task whose lifecycle evidence is collected.
        workspace_root: Explicit workspace override.
        runner: Optional subprocess runner passed to the gh/git helpers.
        cron_history_dir: Optional override of the cron history directory.
        pr_lookup: Optional replacement for the gh PR query.
        timer_loader: Optional replacement for reading the timers file.

    Returns:
        A populated ``LifecycleEvidence``. ``process_alive`` is always None here,
        and the ``schedule_id*`` fields are not passed, so they keep their
        dataclass defaults.
    """
    # CanonicalWorkspace integration (§6): use the canonical resolver unless a
    # workspace_root override was given.
    if workspace_root is None:
        try:
            cws: CanonicalWorkspace = _resolve_workspace(task_id, runner=runner)
            wd = cws.workspace_root
        except Exception:
            wd = _DEFAULT_WORKSPACE
    else:
        wd = workspace_root

    # 1. PR info
    pr_info = _gather_pr_info(
        task_id,
        runner=runner,
        workspace_root=wd,
        pr_lookup=pr_lookup,
    )
    pr_number: Optional[int] = pr_info.get("number")
    raw_state = pr_info.get("state")
    # Normalise the state to upper case; a missing/empty state becomes None.
    pr_state: Optional[str] = str(raw_state).upper() if raw_state else None

    # mergeCommit may be dict {"oid": "..."} or plain string
    raw_mc = pr_info.get("mergeCommit")
    if isinstance(raw_mc, dict):
        merge_commit: Optional[str] = raw_mc.get("oid") or raw_mc.get("sha")
    elif isinstance(raw_mc, str) and raw_mc:
        merge_commit = raw_mc
    else:
        merge_commit = None

    # task-2529 §5: PR createdAt → elapsed time (for PR_OPEN_BUT_NO_MERGE_ATTEMPT detection)
    pr_open_age_seconds: Optional[float] = None
    raw_created = pr_info.get("createdAt")
    if isinstance(raw_created, str) and raw_created:
        try:
            # ISO8601 (gh format ends with Z) → datetime → epoch
            iso = raw_created.replace("Z", "+00:00")
            created_dt = datetime.fromisoformat(iso)
            now_ts = datetime.now(timezone.utc).timestamp()
            # Clamp at 0 so a createdAt slightly in the future never yields a negative age.
            pr_open_age_seconds = max(0.0, now_ts - created_dt.timestamp())
        except Exception:
            pr_open_age_seconds = None

    # 2. merged_into_main — only checked when a merge commit is known
    merged_into_main = False
    if merge_commit:
        merged_into_main = _check_merged_into_main(
            merge_commit,
            runner=runner,
            workspace_root=wd,
        )

    # 3. CI status — only queried when a PR number is known
    ci_status: Optional[str] = None
    if pr_number is not None:
        ci_status = _gather_ci_status(pr_number, runner=runner, workspace_root=wd)

    # 4. smoke status — check qc-result file for PASS/FAIL marker
    # NOTE(review): "PASS" is tested before "FAIL", so a file containing both
    # words resolves to PASS — confirm this is intended.
    smoke_status: Optional[str] = None
    ev_dir = _events_dir(wd)
    qc_file = ev_dir / f"{task_id}.qc-result"
    if qc_file.exists():
        try:
            content = qc_file.read_text(encoding="utf-8").upper()
            if "PASS" in content:
                smoke_status = "PASS"
            elif "FAIL" in content:
                smoke_status = "FAIL"
            else:
                smoke_status = "SKIPPED"
        except Exception:
            smoke_status = None

    # 5. Timer
    timer_info = _gather_timer_info(task_id, workspace_root=wd, timer_loader=timer_loader)
    timer_status: Optional[str] = timer_info.get("status")
    timer_end_time: Optional[str] = timer_info.get("end_time")

    # 6. File markers
    markers = _gather_file_markers(task_id, workspace_root=wd)

    # 7. Cron history
    cron = _gather_cron_history(task_id, cron_history_dir=cron_history_dir)

    # 8. Worktree / branch
    wt_info = _gather_worktree_info(task_id, runner=runner, workspace_root=wd)

    return LifecycleEvidence(
        task_id=task_id,
        pr_number=pr_number,
        pr_state=pr_state,
        merge_commit=merge_commit,
        merged_into_main=merged_into_main,
        ci_status=ci_status,
        smoke_status=smoke_status,
        timer_status=timer_status,
        timer_end_time=timer_end_time,
        has_done=markers["has_done"],
        has_done_acked=markers["has_done_acked"],
        has_merge_done=markers["has_merge_done"],
        has_qc_result=markers["has_qc_result"],
        has_followup=markers["has_followup"],
        has_escalate_marker=markers["has_escalate_marker"],
        escalate_marker_age_minutes=markers["escalate_marker_age_minutes"],
        telegram_reply_truncated=cron["telegram_reply_truncated"],
        bot_session_status=cron["bot_session_status"],
        worktree_exists=wt_info["worktree_exists"],
        branch_pushed_to_remote=wt_info["branch_pushed_to_remote"],
        # task-2521 §3 — additional signals
        worktree_mtime_seconds_ago=wt_info.get("worktree_mtime_seconds_ago"),
        has_pushed_commits=bool(wt_info.get("has_pushed_commits", False)),
        report_artifact_present=bool(wt_info.get("report_artifact_present", False)),
        process_alive=None,  # reserved for external injection (no detection performed here)
        # task-2529 §5 — AUTO_FINALIZE_CHAIN_MISSING signals
        pr_open_age_seconds=pr_open_age_seconds,
        report_artifact_age_seconds=wt_info.get("report_artifact_age_seconds"),
    )


# ---------------------------------------------------------------------------
# Public API: detect_stuck_cases
# ---------------------------------------------------------------------------

def detect_stuck_cases(evidence: LifecycleEvidence) -> list[StuckCase]:
    """Detect every stuck condition that the gathered evidence matches.

    The checks are independent unless noted, so one task can yield several
    cases. Cases are appended in the order the checks appear below:

      * the numbered cases 1-7,
      * at most one BOT_CANCELLED_* case (an if/elif chain, first match wins),
      * up to four AUTO_FINALIZE_CHAIN_MISSING cases (only when the PR is not
        MERGED),
      * STALE_SCHEDULE_ID,
      * case 8, STALE_ESCALATE_MARKER.

    Args:
        evidence: Snapshot of PR / CI / timer / marker / worktree signals.

    Returns:
        A list of StuckCase objects (empty when nothing matches). This
        function only reads ``evidence``; it performs no I/O itself.
    """
    cases: list[StuckCase] = []

    # 1. TIMER_RUNNING_BUT_PR_MERGED
    if evidence.timer_status == "running" and evidence.pr_state == "MERGED":
        cases.append(StuckCase(
            reason=StuckReason.TIMER_RUNNING_BUT_PR_MERGED,
            detail=(
                f"timer_status=running but pr_state=MERGED "
                f"(pr_number={evidence.pr_number}, merge_commit={evidence.merge_commit})"
            ),
        ))

    # 2. PR_MERGED_BUT_DONE_MISSING
    if evidence.pr_state == "MERGED" and not evidence.has_done:
        cases.append(StuckCase(
            reason=StuckReason.PR_MERGED_BUT_DONE_MISSING,
            detail=(
                f"pr_state=MERGED but .done marker missing "
                f"(pr_number={evidence.pr_number})"
            ),
        ))

    # 3. MERGE_COMMIT_BUT_MERGE_DONE_MISSING
    # Keyed on merge_commit alone; pr_state is not consulted here.
    if evidence.merge_commit and not evidence.has_merge_done:
        cases.append(StuckCase(
            reason=StuckReason.MERGE_COMMIT_BUT_MERGE_DONE_MISSING,
            detail=(
                f"merge_commit={evidence.merge_commit} present but .merge-done missing"
            ),
        ))

    # 4. CI_PASS_BUT_NOT_FINALIZED
    # Requires CI *and* smoke to pass on a merged PR while either finalize
    # marker (.done.acked / .merge-done) is still missing.
    if (
        evidence.ci_status == "SUCCESS"
        and evidence.smoke_status == "PASS"
        and evidence.pr_state == "MERGED"
        and (not evidence.has_done_acked or not evidence.has_merge_done)
    ):
        cases.append(StuckCase(
            reason=StuckReason.CI_PASS_BUT_NOT_FINALIZED,
            detail=(
                f"ci=SUCCESS, smoke=PASS, pr=MERGED but "
                f"has_done_acked={evidence.has_done_acked}, has_merge_done={evidence.has_merge_done}"
            ),
        ))

    # 5. TELEGRAM_REPLY_CUT_OFF
    if evidence.telegram_reply_truncated:
        cases.append(StuckCase(
            reason=StuckReason.TELEGRAM_REPLY_CUT_OFF,
            detail=(
                f"cron history last response appears truncated "
                f"(bot_session_status={evidence.bot_session_status})"
            ),
        ))

    # 6. BOT_SESSION_ENDED_BUT_TASK_OK
    # The bot session died (cancelled/error) although the work is in main.
    if evidence.bot_session_status in {"cancelled", "error"} and evidence.merged_into_main:
        cases.append(StuckCase(
            reason=StuckReason.BOT_SESSION_ENDED_BUT_TASK_OK,
            detail=(
                f"bot_session_status={evidence.bot_session_status} but "
                f"merged_into_main=True (task evidence is OK)"
            ),
        ))

    # 7. FINISH_TASK_INTERRUPTED
    # `not evidence.pr_number` treats both None and 0 as "no PR".
    if (
        evidence.worktree_exists
        and evidence.branch_pushed_to_remote
        and not evidence.pr_number
    ):
        cases.append(StuckCase(
            reason=StuckReason.FINISH_TASK_INTERRUPTED,
            detail=(
                "worktree exists + branch pushed to remote but no PR found — "
                "finish-task likely interrupted before PR creation"
            ),
        ))

    # ------------------------------------------------------------------
    # task-2521 §3 — four BOT_CANCELLED_* escalations (reviewed together
    # with the 7 signals).
    # Policy: precisely classify the combination of
    #       bot_session_status=cancelled + (timer_running or worktree_active or
    #       commit_pushed or pr_open).
    # Hard rule: Telegram silence alone is never enough to call a task stuck —
    # another signal must be present as well.
    # ------------------------------------------------------------------
    if evidence.bot_session_status == "cancelled":
        # Signals named by the spec: worktree mtime / process / PR / commit /
        # CI / report artefact / cron status. The locals below are the ones
        # this branch actually reads.
        wt_active = (
            evidence.worktree_mtime_seconds_ago is not None
            and evidence.worktree_mtime_seconds_ago
            <= _BOT_CANCELLED_ACTIVE_WORKTREE_MAX_AGE_SECONDS
        )
        pr_open = evidence.pr_state == "OPEN"
        pr_missing = evidence.pr_number is None
        timer_running = evidence.timer_status == "running"

        # Classification priority (in the order given by chairman §3); the
        # if/elif chain means only the first matching case is recorded:
        #   (1) PR_OPEN + finalize incomplete → AFTER_PR_BEFORE_FINALIZE
        #   (2) commit pushed + PR missing → AFTER_COMMIT_BEFORE_PR
        #   (3) timer running + PR missing → TIMER_RUNNING_PR_MISSING
        #   (4) worktree active (mtime within
        #       _BOT_CANCELLED_ACTIVE_WORKTREE_MAX_AGE_SECONDS; the original
        #       note says "<5min") → ACTIVE_WORKTREE
        # PR open + finalize incomplete (.done.acked or .merge-done missing —
        # while the PR is still open, finalize has not run either).
        if pr_open and (
            not evidence.has_done_acked or not evidence.has_merge_done
        ):
            cases.append(StuckCase(
                reason=StuckReason.BOT_CANCELLED_AFTER_PR_BEFORE_FINALIZE,
                detail=(
                    f"bot_session=cancelled + pr_state=OPEN (pr={evidence.pr_number}) + "
                    f"finalize 미완 (has_done_acked={evidence.has_done_acked}, "
                    f"has_merge_done={evidence.has_merge_done})"
                ),
            ))
        elif pr_missing and evidence.has_pushed_commits:
            cases.append(StuckCase(
                reason=StuckReason.BOT_CANCELLED_AFTER_COMMIT_BEFORE_PR,
                detail=(
                    f"bot_session=cancelled + pr_missing + commits_pushed "
                    f"(branch_pushed_to_remote={evidence.branch_pushed_to_remote})"
                ),
            ))
        elif pr_missing and timer_running:
            cases.append(StuckCase(
                reason=StuckReason.BOT_CANCELLED_TIMER_RUNNING_PR_MISSING,
                detail=(
                    f"bot_session=cancelled + timer_status=running + pr_missing "
                    f"(worktree_exists={evidence.worktree_exists})"
                ),
            ))
        elif wt_active:
            # wt_active guarantees worktree_mtime_seconds_ago is not None, so
            # the ":.1f" format below cannot fail on None.
            cases.append(StuckCase(
                reason=StuckReason.BOT_CANCELLED_WITH_ACTIVE_WORKTREE,
                detail=(
                    f"bot_session=cancelled + worktree active "
                    f"(mtime≤{_BOT_CANCELLED_ACTIVE_WORKTREE_MAX_AGE_SECONDS:.0f}s ago: "
                    f"{evidence.worktree_mtime_seconds_ago:.1f}s); "
                    f"timer_status={evidence.timer_status}, pr_state={evidence.pr_state}"
                ),
            ))

    # ------------------------------------------------------------------
    # task-2529 §5 — four AUTO_FINALIZE_CHAIN_MISSING cases (chairman's
    # explicit request, 2026-05-10).
    # These record, at the lifecycle level, the system defect where the bot
    # passed its own implementation + verification but did not automatically
    # move on to the next step (commit / push / PR / merge).
    # Fixture for the originating incident: task-2524+1 (dev5 Saraswati) —
    # self-verification PASS but no PR was created.
    #
    # Hard rules:
    #   - Tasks whose PR is already MERGED are excluded (their finalize chain
    #     already succeeded).
    #   - _AUTO_FINALIZE_STUCK_AGE_SECONDS is used as a threshold to avoid
    #     false positives on in-progress work.
    #   - Threshold signal: report_artifact_age_seconds (mtime of the
    #     self-verification report) first, then worktree_mtime_seconds_ago;
    #     with neither available the stuck verdict is withheld (false
    #     negatives are accepted).
    # ------------------------------------------------------------------
    if evidence.pr_state != "MERGED":
        # Report mtime is the primary signal, worktree mtime the secondary one.
        report_age = evidence.report_artifact_age_seconds
        worktree_age = evidence.worktree_mtime_seconds_ago
        # "Stable enough to be considered stuck": True as soon as either signal
        # has been idle for at least the threshold (the original note says
        # 5 minutes — the constant's value is defined elsewhere).
        report_stale = (
            report_age is not None and report_age >= _AUTO_FINALIZE_STUCK_AGE_SECONDS
        )
        worktree_stale = (
            worktree_age is not None and worktree_age >= _AUTO_FINALIZE_STUCK_AGE_SECONDS
        )
        is_stale = report_stale or worktree_stale

        # 1) CODE_DONE_BUT_NO_COMMIT — a self-verification report exists but no
        #    commit has been pushed.
        if (
            evidence.report_artifact_present
            and not evidence.has_pushed_commits
            and not evidence.branch_pushed_to_remote
            and is_stale
        ):
            cases.append(StuckCase(
                reason=StuckReason.CODE_DONE_BUT_NO_COMMIT,
                detail=(
                    f"report_artifact_present=True (age={report_age}) but "
                    f"no commits pushed; "
                    f"worktree_age={worktree_age}; "
                    f"AUTO_FINALIZE_CHAIN_MISSING — commit 단계로 자동 진입하지 않음"
                ),
            ))

        # 2) COMMIT_DONE_BUT_NO_PR — the branch is pushed but there is no PR.
        if (
            evidence.has_pushed_commits
            and evidence.branch_pushed_to_remote
            and evidence.pr_number is None
            and is_stale
        ):
            cases.append(StuckCase(
                reason=StuckReason.COMMIT_DONE_BUT_NO_PR,
                detail=(
                    f"branch_pushed_to_remote=True but pr_number=None; "
                    f"worktree_age={worktree_age}, report_age={report_age}; "
                    f"AUTO_FINALIZE_CHAIN_MISSING — PR 생성 단계로 자동 진입하지 않음"
                ),
            ))

        # 3) PR_OPEN_BUT_NO_MERGE_ATTEMPT — PR open + CI SUCCESS but no trace of
        #    a merge attempt. Uses the PR's own open age (the worktree mtime is
        #    not a substitute — what matters is how long the PR has been
        #    sitting open).
        if evidence.pr_state == "OPEN":
            pr_age = evidence.pr_open_age_seconds
            pr_age_stale = (
                pr_age is not None and pr_age >= _AUTO_FINALIZE_STUCK_AGE_SECONDS
            )
            if (
                evidence.ci_status == "SUCCESS"
                and pr_age_stale
                and not evidence.merge_commit  # no merge-attempt result present
            ):
                # pr_age_stale guarantees pr_age is not None for ":.0f" below.
                cases.append(StuckCase(
                    reason=StuckReason.PR_OPEN_BUT_NO_MERGE_ATTEMPT,
                    detail=(
                        f"pr_state=OPEN + ci_status=SUCCESS (pr_number={evidence.pr_number}, "
                        f"pr_open_age={pr_age:.0f}s ≥ {_AUTO_FINALIZE_STUCK_AGE_SECONDS:.0f}s) but "
                        f"no merge_commit; AUTO_FINALIZE_CHAIN_MISSING — "
                        f"bot identity merge 단계로 자동 진입하지 않음"
                    ),
                ))

        # 4) SELF_VERIFIED_BUT_NOT_FINALIZED — fixture for the originating
        #    incident (task-2524+1). A report artefact is present while the PR
        #    is missing, OPEN or CLOSED. This overlaps with #1/#2/#3 to some
        #    extent, but is recorded separately to capture the most general
        #    signal: "a self-verified report exists yet finalize is incomplete".
        if (
            evidence.report_artifact_present
            and (
                evidence.pr_number is None
                or evidence.pr_state in ("OPEN", "CLOSED")
            )
            and is_stale
        ):
            cases.append(StuckCase(
                reason=StuckReason.SELF_VERIFIED_BUT_NOT_FINALIZED,
                detail=(
                    f"report_artifact_present=True (age={report_age}) + "
                    f"pr_state={evidence.pr_state} (pr_number={evidence.pr_number}); "
                    f"AUTO_FINALIZE_CHAIN_MISSING — 자체 검증 PASS 후 finalize 단계 미완"
                ),
            ))

    # ------------------------------------------------------------------
    # task-2535 — STALE_SCHEDULE_ID (traffic-light sync fix C / chairman's
    # explicit request, 2026-05-10).
    # Recorded only when the cron schedule_id has been unresponsive for at
    # least SCHEDULE_FRESHNESS_THRESHOLD_MIN (documented as 60 minutes) while
    # the task timer is still 'running'. This evidence-only stuck case stops
    # the auto-merge signal from mistaking a stale schedule_id for an active one.
    #
    # Hard rules:
    #   - schedule_id_freshness == "STALE"  (FRESH/MISSING/None are withheld)
    #   - timer_status == "running"
    #   - Tasks whose PR is already MERGED are excluded (other stuck reasons
    #     handle them).
    # ------------------------------------------------------------------
    if (
        evidence.schedule_id_freshness == "STALE"
        and evidence.timer_status == "running"
        and evidence.pr_state != "MERGED"
    ):
        age_secs = evidence.schedule_id_age_seconds
        # Age may be absent or non-numeric; fall back to "unknown" in the detail.
        age_repr = (
            f"{age_secs:.0f}s" if isinstance(age_secs, (int, float)) else "unknown"
        )
        cases.append(StuckCase(
            reason=StuckReason.STALE_SCHEDULE_ID,
            detail=(
                f"schedule_id={evidence.schedule_id} freshness=STALE "
                f"(age={age_repr} ≥ {_SCHEDULE_FRESHNESS_THRESHOLD_MIN}min) but "
                f"timer_status=running — cron 응답 없는 schedule_id 로 활성 판정 차단"
            ),
        ))

    # 8. STALE_ESCALATE_MARKER
    # A missing age is treated as 0, i.e. never stale.
    if evidence.has_escalate_marker and (evidence.escalate_marker_age_minutes or 0) > 30:
        # Check: does any Critical escalation evidence match?
        # If there's no active Critical signal (pr_state not stuck due to CI, etc.) → stale
        has_active_critical = (
            evidence.ci_status in {"FAILURE"} or
            (evidence.pr_state == "MERGED" and not evidence.merged_into_main)
        )
        if not has_active_critical:
            cases.append(StuckCase(
                reason=StuckReason.STALE_ESCALATE_MARKER,
                detail=(
                    f"escalate marker exists for "
                    f"{evidence.escalate_marker_age_minutes:.1f} min but no Critical evidence found"
                ),
            ))

    return cases


# ---------------------------------------------------------------------------
# Public API: determine_state
# ---------------------------------------------------------------------------

def determine_state(evidence: LifecycleEvidence) -> tuple[LifecycleState, list[StuckCase]]:
    """Resolve the lifecycle state and the stuck cases that accompany it.

    Evidence priority (as mandated by the chairman):
    PR state > mergeCommit > contained in origin/main > CI > smoke > timer > file marker

    Returns:
        ``(state, stuck_cases)``. The list is empty for FINALIZED, RUNNING and
        PR_OPEN; the other states carry whatever ``detect_stuck_cases`` found.
    """
    stuck = detect_stuck_cases(evidence)

    pr_state = evidence.pr_state
    timer = evidence.timer_status
    timer_running = timer == "running"
    markers_complete = bool(evidence.has_done_acked and evidence.has_merge_done)

    if pr_state == "MERGED":
        if evidence.merged_into_main:
            # FINALIZED: merged, confirmed in main, and every finalize artefact
            # is in place (CI may be absent for old PRs, the timer may be absent).
            if (
                evidence.ci_status in {"SUCCESS", None}
                and markers_complete
                and timer in {"completed", None}
            ):
                return LifecycleState.FINALIZED, []
            # MERGED_PENDING_RECONCILE: something finalize should have produced
            # (.done.acked, .merge-done, timer end) is still missing.
            if not markers_complete or timer_running:
                return LifecycleState.MERGED_PENDING_RECONCILE, stuck
        elif stuck:
            # Merged on the PR side but not yet confirmed in origin/main.
            return LifecycleState.STUCK_NEEDS_RECONCILE, stuck

    if pr_state == "OPEN":
        if stuck:
            return LifecycleState.STUCK_NEEDS_RECONCILE, stuck
        # Normal in-progress work: RUNNING while the timer ticks, else PR_OPEN.
        in_progress = LifecycleState.RUNNING if timer_running else LifecycleState.PR_OPEN
        return in_progress, []

    # ESCALATED: the escalate marker is still fresh (30 minutes or younger;
    # a missing age counts as 0).
    if evidence.has_escalate_marker and (evidence.escalate_marker_age_minutes or 0) <= 30:
        return LifecycleState.ESCALATED, stuck

    if stuck:
        return LifecycleState.STUCK_NEEDS_RECONCILE, stuck

    # Nothing stuck: a completed timer with both finalize markers is treated
    # as FINALIZED; every other combination (no PR and no timer, timer running
    # without a PR, anything else) conservatively resolves to RUNNING.
    if timer == "completed" and markers_complete:
        return LifecycleState.FINALIZED, []
    return LifecycleState.RUNNING, []


# ---------------------------------------------------------------------------
# Public API: assert_no_manual_done_forgery
# ---------------------------------------------------------------------------

def assert_no_manual_done_forgery(
    task_id: str,
    evidence: LifecycleEvidence,
    *,
    workspace_root: Optional[Path] = None,
) -> None:
    """Refuse to create a ``.done`` marker that the evidence does not back.

    The evidence is sufficient when either holds:
    - ``pr_state == "MERGED"`` and ``merged_into_main``; or
    - a ``merge_commit`` exists, ``merged_into_main`` is set, and CI is
      ``SUCCESS`` or smoke is ``PASS``.

    Args:
        task_id: Task whose ``.done`` marker is about to be written.
        evidence: Gathered lifecycle evidence for that task.
        workspace_root: Workspace used to resolve the marker path shown in the
            error message; falls back to the default workspace root.

    Raises:
        RuntimeError: When neither condition above is met.
    """
    root = workspace_root or _default_workspace_root()
    done_path = _marker_path(root, task_id, "done")

    in_main = evidence.merged_into_main

    # Condition A: the PR itself is merged and confirmed in main.
    if evidence.pr_state == "MERGED" and in_main:
        return

    # Condition B: a merge commit in main, backed by a passing CI or smoke run.
    checks_green = evidence.ci_status == "SUCCESS" or evidence.smoke_status == "PASS"
    if evidence.merge_commit and in_main and checks_green:
        return

    raise RuntimeError(
        f"MANUAL_DONE_FORGERY_BLOCKED: task={task_id} — insufficient evidence to create .done. "
        f"Target path would be: {done_path}. "
        f"Requires (pr_state=MERGED + merged_into_main) OR "
        f"(merge_commit + merged_into_main + ci/smoke PASS). "
        f"Got: pr_state={evidence.pr_state}, merged_into_main={evidence.merged_into_main}, "
        f"merge_commit={evidence.merge_commit}, ci_status={evidence.ci_status}, "
        f"smoke_status={evidence.smoke_status}"
    )


# ---------------------------------------------------------------------------
# Backfill helpers
# ---------------------------------------------------------------------------

def _build_backfill_metadata(
    task_id: str,
    reconcile_run_id: str,
    evidence: LifecycleEvidence,
) -> dict:
    """Assemble the JSON-serialisable body stored in backfilled marker files.

    ``evidence_source`` lists, as ``name=value`` strings, every truthy signal
    among pr_state, merge_commit, ci_status and smoke_status (in that order),
    followed by ``merged_into_main=True`` when that flag is set.
    """
    candidates = (
        ("pr_state", evidence.pr_state),
        ("merge_commit", evidence.merge_commit),
        ("ci_status", evidence.ci_status),
        ("smoke_status", evidence.smoke_status),
    )
    evidence_source: list[str] = [
        f"{label}={value}" for label, value in candidates if value
    ]
    if evidence.merged_into_main:
        evidence_source.append("merged_into_main=True")

    metadata = {
        "task_id": task_id,
        "reconciled_by": "lifecycle_reconciliation_manager",
        "reconcile_run_id": reconcile_run_id,
        "timestamp": _now_iso(),
        "evidence_source": evidence_source,
        "merge_commit": evidence.merge_commit,
        "note": "evidence-based backfill (not manual forgery)",
    }
    return metadata


def _default_file_writer(path: Path, content: str) -> None:
    """Write ``content`` to ``path`` as UTF-8, creating parent directories first."""
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(content)


def _default_timer_writer(task_id: str, timer_data: dict) -> None:
    """Merge ``timer_data`` into the task's entry in the default task-timers.json.

    The file lives under ``_default_workspace_root()``. Fields already stored
    for ``task_id`` (e.g. a start time) are kept unless ``timer_data``
    overrides them, so a caller may pass only the fields it wants to change.

    Args:
        task_id: Key of the entry under the top-level ``"tasks"`` object.
        timer_data: Fields to write; they take precedence over stored ones.

    An unreadable or malformed file is treated as empty (with a warning) so
    the write still goes through. A top-level value or ``"tasks"`` value that
    is not a JSON object is replaced by an empty one instead of raising.
    """
    timers_file = _default_workspace_root() / _TIMERS_FILE_NAME

    data: Any = {}
    if timers_file.exists():
        try:
            raw = timers_file.read_text(encoding="utf-8")
            if raw.strip():
                data = json.loads(raw)
        except (OSError, ValueError) as exc:
            # JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            logger.warning(
                "timers file unreadable, starting from empty (%s): %s", timers_file, exc
            )
            data = {}
    if not isinstance(data, dict):
        data = {}

    tasks = data.get("tasks")
    if not isinstance(tasks, dict):
        tasks = {}
        data["tasks"] = tasks

    # Merge rather than replace: callers such as the marker backfill pass only
    # the end-of-timer fields, and replacing would drop the rest of the entry.
    existing = tasks.get(task_id)
    if isinstance(existing, dict):
        tasks[task_id] = {**existing, **timer_data}
    else:
        tasks[task_id] = dict(timer_data)

    timers_file.parent.mkdir(parents=True, exist_ok=True)
    timers_file.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")


def _backfill_markers(
    task_id: str,
    reconcile_run_id: str,
    evidence: LifecycleEvidence,
    state: LifecycleState,
    *,
    workspace_root: Optional[Path] = None,
    apply: bool = False,
    file_writer: Optional[Callable[[Path, str], None]] = None,
    timer_writer: Optional[Callable[[str, dict], None]] = None,
    actions_taken: list[str],
    actions_planned: list[str],
) -> dict:
    """Backfill missing finalize markers from evidence and return the metadata.

    ``state`` is recorded in the metadata so reconcile audit trails preserve
    which LifecycleState authorised the backfill.

    Args:
        task_id: Task being reconciled.
        reconcile_run_id: Identifier of this reconcile run (stored in metadata).
        evidence: Gathered lifecycle evidence.
        state: Lifecycle state that triggered the backfill.
        workspace_root: Workspace to write into; defaults to the default root.
        apply: When False (dry-run) nothing is written; actions are only
            appended to ``actions_planned``.
        file_writer: Override for writing marker files.
        timer_writer: Override for ending the running timer.
        actions_taken: Mutated in place with the names of performed actions.
        actions_planned: Mutated in place with the names of dry-run actions.

    Returns:
        The metadata dict that is (or would be) written into each marker.
        When the forgery guard blocks the backfill, a ``BLOCKED`` entry is
        recorded in the action list and nothing else happens.
    """
    wd = workspace_root or _default_workspace_root()
    fw = file_writer if file_writer is not None else _default_file_writer

    def _scoped_timer_writer(tid: str, payload: dict, _wd: Path = wd) -> None:
        """End the timer in the task-timers.json of the workspace being reconciled."""
        timers_file = _wd / _TIMERS_FILE_NAME
        data = _read_timers_file(timers_file)
        tasks = data.get("tasks")
        if not isinstance(tasks, dict):
            tasks = {}
            data["tasks"] = tasks
        # Keep fields already stored for the task (e.g. its start time); the
        # payload only carries the end-of-timer fields.
        previous = tasks.get(tid)
        tasks[tid] = {**previous, **payload} if isinstance(previous, dict) else dict(payload)
        timers_file.parent.mkdir(parents=True, exist_ok=True)
        timers_file.write_text(
            json.dumps(data, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )

    # Timer writes must land in the same workspace as the marker files. The
    # module default writer always targets the default workspace, so it is
    # only used when no explicit workspace_root was given.
    if timer_writer is not None:
        tw = timer_writer
    elif workspace_root is None:
        tw = _default_timer_writer
    else:
        tw = _scoped_timer_writer

    meta = _build_backfill_metadata(task_id, reconcile_run_id, evidence)
    meta["authorising_state"] = state.value if isinstance(state, LifecycleState) else str(state)
    meta_json = json.dumps(meta, indent=2, ensure_ascii=False)

    def do_or_plan(action_name: str, fn: Callable[[], None]) -> None:
        """Run ``fn`` when applying (best effort), otherwise just record the plan."""
        if apply:
            try:
                fn()
                actions_taken.append(action_name)
            except Exception as exc:
                # Best effort: one failed write must not stop the other backfills.
                logger.error("backfill action %s failed: %s", action_name, exc)
        else:
            actions_planned.append(action_name)

    # Guard: must have sufficient evidence before any .done backfill
    try:
        assert_no_manual_done_forgery(task_id, evidence, workspace_root=wd)
    except RuntimeError as exc:
        logger.warning("Backfill blocked: %s", exc)
        if apply:
            actions_taken.append(f"BLOCKED:{exc}")
        else:
            actions_planned.append("BLOCKED (insufficient evidence)")
        return meta

    # Marker files, in the order they are checked: .done, .done.acked, .merge-done.
    # `p=path` binds the path at definition time (avoids late-binding closures).
    if not evidence.has_done:
        path = _marker_path(wd, task_id, "done")
        do_or_plan("created_done", lambda p=path: fw(p, meta_json))

    if not evidence.has_done_acked:
        path = _marker_path(wd, task_id, "done.acked")
        do_or_plan("created_done_acked", lambda p=path: fw(p, meta_json))

    if not evidence.has_merge_done:
        path = _marker_path(wd, task_id, "merge-done")
        do_or_plan("wrote_merge_done", lambda p=path: fw(p, meta_json))

    # Timer end: only a timer that is still running needs closing.
    if evidence.timer_status == "running":
        timer_end_iso = _now_iso()
        timer_payload = {
            "status": "completed",
            "end_time": timer_end_iso,
            "ended_by": "lifecycle_reconciliation_manager",
            "reconcile_run_id": reconcile_run_id,
        }
        do_or_plan("ended_timer", lambda tp=timer_payload: tw(task_id, tp))

    return meta


# ---------------------------------------------------------------------------
# task-2528: worktree completion → task-timers.json reconcile
# ---------------------------------------------------------------------------
#
# task-2527 audit (2026-05-10) root cause:
#   worktree에서 완료된 task가 task-timers.json (active/archived 양쪽)에 entry가
#   누락되면 dashboard helpers.py:450-454 mtime fallback이 발동하여 동일 mtime
#   클러스터(5/9 14:32 working-tree 재구성 케이스 등)에서 모든 record가 동일
#   timestamp로 표시되는 회귀가 발생한다.
#
# 본 helper는 root cause 권고 #1 한정 직접 fix:
#   - worktree 완료 evidence (.done/.merge-done/PR MERGED) 가 있는데
#     active+archived 양쪽 task-timers.json에 entry가 없는 경우만 처리
#   - .done JSON → .merge-done JSON → 파일 mtime 순으로 timer entry 재구성
#   - active task-timers.json에 insert (idempotent)
#   - archived에 이미 있으면 active 추가하지 않음 (collision 방지)
# ---------------------------------------------------------------------------


def _read_timers_file(path: Path) -> dict:
    """Safely load a timers JSON file (task-timers.json or its archive).

    Returns ``{}`` when the file is missing, blank, not a JSON object, or
    cannot be read or parsed (the last case is logged as a warning).
    """
    try:
        raw = path.read_text(encoding="utf-8") if path.exists() else ""
        parsed = json.loads(raw) if raw.strip() else {}
    except Exception as exc:
        logger.warning("timers file read failed (%s): %s", path, exc)
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _timer_entry_present(
    task_id: str,
    *,
    active_path: Path,
    archive_path: Path,
) -> tuple[bool, str]:
    """Report whether ``task_id`` already has a timer entry, and where.

    Returns ``(present, source)`` with source in {"active", "archive", ""}.
    The active file is checked first; the archive is only read when the
    active file has no entry for the task.
    """
    lookups = (("active", active_path), ("archive", archive_path))
    for source, timers_path in lookups:
        tasks = _read_timers_file(timers_path).get("tasks")
        if isinstance(tasks, dict) and task_id in tasks:
            return True, source
    return False, ""


def _read_done_json(workspace_root: Path, task_id: str) -> dict:
    """Load the task's ``.done`` marker as a JSON object.

    Returns ``{}`` when the marker is missing, blank, not a JSON object, or
    cannot be read or parsed (the last case is logged as a warning).
    """
    marker = _marker_path(workspace_root, task_id, "done")
    if not marker.exists():
        return {}
    try:
        text = marker.read_text(encoding="utf-8").strip()
        payload = json.loads(text) if text else {}
    except Exception as exc:
        logger.warning("done JSON read failed for %s: %s", task_id, exc)
        return {}
    return payload if isinstance(payload, dict) else {}


def _read_merge_done_json(workspace_root: Path, task_id: str) -> dict:
    """Load the task's ``.merge-done`` marker as a JSON object.

    Returns ``{}`` when the marker is missing, blank, not a JSON object, or
    cannot be read or parsed (the last case is logged as a warning).
    """
    marker = _marker_path(workspace_root, task_id, "merge-done")
    if not marker.exists():
        return {}
    try:
        text = marker.read_text(encoding="utf-8").strip()
        payload = json.loads(text) if text else {}
    except Exception as exc:
        logger.warning("merge-done JSON read failed for %s: %s", task_id, exc)
        return {}
    return payload if isinstance(payload, dict) else {}


def _build_reconciled_timer_entry(
    task_id: str,
    reconcile_run_id: str,
    evidence: LifecycleEvidence,
    *,
    workspace_root: Path,
) -> Optional[dict]:
    """Construct a completed-timer entry from ``.done`` / ``.merge-done`` evidence.

    ``end_time`` is taken from the first source that provides one:
      1. ``.done`` JSON (``end_time``, then ``completed_at``)
      2. ``.merge-done`` JSON (``end_time``, then ``completed_at``)
      3. mtime of the ``.done`` file, if ``evidence.has_done``
      4. mtime of the ``.merge-done`` file, if ``evidence.has_merge_done``

    ``reconciled_from`` names the source that actually supplied ``end_time``
    (``done_json`` / ``merge_done_json`` / ``done_mtime`` /
    ``merge_done_mtime``), so an mtime fallback is always visible in the
    entry, even when a marker JSON exists but carries no end time.

    ``team_id``, ``duration_seconds`` and ``qc_result`` are read from the
    ``.done`` JSON first, then the ``.merge-done`` JSON. ``start_time`` is
    ``end_time - duration_seconds`` when that can be computed, otherwise it
    equals ``end_time``.

    Returns:
        The timer entry dict, or ``None`` when no end time can be determined
        (the caller treats that as a no-op).
    """
    from datetime import timedelta  # module top only imports datetime/timezone

    done_data = _read_done_json(workspace_root, task_id)
    merge_done_data = _read_merge_done_json(workspace_root, task_id)

    # Pick end_time together with its provenance so the label cannot drift
    # from the value.
    end_time: Optional[str] = None
    source: Optional[str] = None
    for label, payload in (("done_json", done_data), ("merge_done_json", merge_done_data)):
        candidate = payload.get("end_time") or payload.get("completed_at")
        if candidate:
            end_time, source = candidate, label
            break

    team_id: Optional[str] = done_data.get("team_id") or merge_done_data.get("team_id")
    duration_seconds = (
        done_data.get("duration_seconds")
        or merge_done_data.get("duration_seconds")
    )
    qc_result = done_data.get("qc_result") or merge_done_data.get("qc_result")

    # Last resort: marker file mtimes (.done before .merge-done).
    if not end_time:
        mtime_fallbacks = (
            (evidence.has_done, "done", "done_mtime"),
            (evidence.has_merge_done, "merge-done", "merge_done_mtime"),
        )
        for has_marker, suffix, label in mtime_fallbacks:
            if not has_marker:
                continue
            try:
                marker = _marker_path(workspace_root, task_id, suffix)
                stamp = datetime.fromtimestamp(marker.stat().st_mtime, tz=timezone.utc)
            except Exception as exc:
                # Best effort: the marker may have vanished since evidence was gathered.
                logger.debug("mtime fallback (%s) failed for %s: %s", suffix, task_id, exc)
                continue
            end_time, source = stamp.isoformat(), label
            break

    if not end_time:
        # No usable evidence — caller should no-op
        return None

    # Compute start_time from end_time - duration_seconds when available;
    # otherwise the entry is zero-length (start == end).
    start_time = end_time
    if duration_seconds:
        try:
            end_dt = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
            start_time = (end_dt - timedelta(seconds=float(duration_seconds))).isoformat()
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Unparseable end_time or duration: keep start == end.
            start_time = end_time

    entry: dict[str, Any] = {
        "task_id": task_id,
        "team_id": team_id,
        "start_time": start_time,
        "end_time": end_time,
        "status": "completed",
        "ended_by": "lifecycle_reconciliation_manager",
        "reconcile_run_id": reconcile_run_id,
        "reconciled_from": source or "evidence",
        "reconciled_at": _now_iso(),
        "note": "task-2528: worktree completion → task-timers.json backfill",
    }
    if duration_seconds is not None:
        try:
            entry["duration_seconds"] = float(duration_seconds)
        except (TypeError, ValueError):
            # Non-numeric duration: omit the field rather than fail the reconcile.
            pass
    if qc_result:
        entry["qc_result"] = qc_result
    if evidence.merge_commit:
        entry["merge_commit"] = evidence.merge_commit
    if evidence.pr_number is not None:
        entry["pr_number"] = evidence.pr_number

    return entry


def _reconcile_worktree_completion_to_timers(
    task_id: str,
    reconcile_run_id: str,
    evidence: LifecycleEvidence,
    *,
    workspace_root: Optional[Path] = None,
    apply: bool = False,
    active_timer_inserter: Optional[Callable[[str, dict], None]] = None,
    actions_taken: list[str],
    actions_planned: list[str],
) -> dict:
    """Bridge worktree-completion evidence into task-timers.json (task-2528 fix).

    All of the following must hold for an entry to be proposed:
      - completion evidence exists: has_done OR has_merge_done OR pr_state == MERGED
      - the active timers file has no entry for ``task_id``
      - the archived timers file has no entry for ``task_id``
      - an entry can be reconstructed from the marker files

    With ``apply=True`` the entry is inserted into the active timers file
    (through ``active_timer_inserter`` when given, otherwise a writer scoped
    to the resolved workspace) and the action is appended to
    ``actions_taken``. With ``apply=False`` the action is only appended to
    ``actions_planned``. Later runs find the entry and skip, which keeps the
    operation idempotent.

    Returns:
        A metadata dict (``skipped``, ``reason``, ``entry_inserted``,
        ``source`` and, when built, ``proposed_entry``).
    """
    root = workspace_root or _default_workspace_root()
    active_path = root / _TIMERS_FILE_NAME
    archive_path = root / _TIMERS_ARCHIVE_FILE_NAME

    meta: dict[str, Any] = {
        "task_id": task_id,
        "reconcile_run_id": reconcile_run_id,
        "skipped": False,
        "reason": None,
        "entry_inserted": False,
        "source": None,
    }

    def _skip(reason: str) -> dict:
        """Mark the run as skipped for ``reason`` and hand back the metadata."""
        meta["skipped"] = True
        meta["reason"] = reason
        return meta

    def _insert_into_scoped_file(tid: str, ent: dict, _wd: Path = root) -> None:
        """Default inserter: writes under the resolved workspace, never the global default."""
        timers_file = _wd / _TIMERS_FILE_NAME
        data = _read_timers_file(timers_file)
        if not isinstance(data, dict):
            data = {}
        tasks = data.setdefault("tasks", {})
        if not isinstance(tasks, dict):
            tasks = {}
            data["tasks"] = tasks
        if tid in tasks:
            # Already present — leave the file untouched.
            return
        tasks[tid] = ent
        timers_file.parent.mkdir(parents=True, exist_ok=True)
        timers_file.write_text(
            json.dumps(data, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )

    if not (evidence.has_done or evidence.has_merge_done or evidence.pr_state == "MERGED"):
        return _skip("no_completion_evidence")

    found, where = _timer_entry_present(
        task_id,
        active_path=active_path,
        archive_path=archive_path,
    )
    if found:
        return _skip(f"timer_entry_present_in_{where}")

    entry = _build_reconciled_timer_entry(
        task_id,
        reconcile_run_id,
        evidence,
        workspace_root=root,
    )
    if entry is None:
        return _skip("no_usable_evidence_for_reconstruction")

    meta["source"] = entry.get("reconciled_from")
    meta["proposed_entry"] = entry

    action_label = "reconciled_worktree_timer"
    if not apply:
        # Dry-run: record the intent only.
        actions_planned.append(action_label)
        return meta

    insert = (
        active_timer_inserter
        if active_timer_inserter is not None
        else _insert_into_scoped_file
    )
    try:
        insert(task_id, entry)
        meta["entry_inserted"] = True
        actions_taken.append(action_label)
    except Exception as exc:
        logger.error("worktree timer reconcile insert failed for %s: %s", task_id, exc)
        meta["reason"] = f"insert_failed: {exc}"

    return meta


# ---------------------------------------------------------------------------
# Public API: reconcile
# ---------------------------------------------------------------------------

def reconcile(
    task_id: str,
    *,
    apply: bool = False,
    workspace_root: Optional[Path] = None,
    runner: Optional[RunnerType] = None,
    cron_history_dir: Optional[Path] = None,
    pr_lookup: Optional[Callable[[str], dict]] = None,
    timer_loader: Optional[Callable[[str], dict]] = None,
    timer_writer: Optional[Callable[[str, dict], None]] = None,
    file_writer: Optional[Callable[[Path, str], None]] = None,
) -> LifecycleReport:
    """Reconcile a single task's lifecycle against gathered evidence.

    Flow:
      1. ``gather_evidence`` + ``determine_state`` classify the task.
      2. MERGED_PENDING_RECONCILE / STUCK_NEEDS_RECONCILE are handed to
         ``_backfill_markers``; FINALIZED and every other state skip the
         marker backfill (only a log line is emitted).
      3. The worktree-completion → task-timers.json step runs for every
         state, including FINALIZED.

    ``apply`` is forwarded unchanged to both steps. With ``apply=False`` the
    helpers are expected to record into ``actions_planned`` only (dry run);
    the returned report carries ``dry_run=not apply``.

    A fresh ``reconcile_run_id`` (uuid4 hex) and a timestamp are generated on
    every call and included in the report.
    """
    run_id = uuid.uuid4().hex
    # Timestamp is captured before evidence gathering starts.
    started_at = _now_iso()

    evidence = gather_evidence(
        task_id,
        workspace_root=workspace_root,
        runner=runner,
        cron_history_dir=cron_history_dir,
        pr_lookup=pr_lookup,
        timer_loader=timer_loader,
    )
    state, stuck_cases = determine_state(evidence)

    # Shared accumulators: the helpers below append to these in place.
    taken: list[str] = []
    planned: list[str] = []
    metadata: dict[str, Any] = {}

    backfill_states = (
        LifecycleState.MERGED_PENDING_RECONCILE,
        LifecycleState.STUCK_NEEDS_RECONCILE,
    )
    if state == LifecycleState.FINALIZED:
        # Nothing to backfill for an already-finalized task.
        logger.info("task=%s already FINALIZED, no-op", task_id)
    elif state in backfill_states:
        metadata = _backfill_markers(
            task_id,
            run_id,
            evidence,
            state,
            workspace_root=workspace_root,
            apply=apply,
            file_writer=file_writer,
            timer_writer=timer_writer,
            actions_taken=taken,
            actions_planned=planned,
        )
    else:
        logger.debug("task=%s state=%s, no backfill needed", task_id, state.value)

    # task-2528: worktree completion → task-timers.json reconcile.
    # Deliberately independent of LifecycleState so that tasks which are
    # FINALIZED but have no timer entry (the 2026-05-09/14:32 fixture, 8 tasks)
    # still get one.
    timer_meta = _reconcile_worktree_completion_to_timers(
        task_id,
        run_id,
        evidence,
        workspace_root=workspace_root,
        apply=apply,
        actions_taken=taken,
        actions_planned=planned,
    )

    if not timer_meta.get("skipped"):
        keep_timer_meta = True
    else:
        # A skip caused by an already-present timer entry is an idempotent
        # no-op; it is still recorded so the audit shows where it was found.
        skip_reason = timer_meta.get("reason")
        keep_timer_meta = bool(skip_reason) and skip_reason.startswith(
            "timer_entry_present_in_"
        )
    if keep_timer_meta:
        metadata.setdefault("worktree_timer_reconcile", timer_meta)

    return LifecycleReport(
        task_id=task_id,
        state=state,
        stuck_cases=stuck_cases,
        evidence=evidence,
        actions_taken=taken,
        actions_planned=planned,
        dry_run=not apply,
        timestamp=started_at,
        reconcile_run_id=run_id,
        backfill_metadata=metadata,
    )


# ---------------------------------------------------------------------------
# Public API: scan_stuck
# ---------------------------------------------------------------------------

def scan_stuck(
    *,
    workspace_root: Optional[Path] = None,
    runner: Optional[RunnerType] = None,
    apply: bool = False,
    timer_loader: Optional[Callable[[], dict]] = None,
) -> list[LifecycleReport]:
    """Reconcile every task listed in task-timers.json; return the stuck ones.

    Args:
        workspace_root: Directory holding the timers file; falls back to
            ``_default_workspace_root()`` when omitted/falsy.
        runner: Forwarded unchanged to ``reconcile``.
        apply: Forwarded unchanged to ``reconcile`` for every task.
        timer_loader: Optional zero-argument callable returning the whole
            timers payload. It is only used for per-task lookups inside
            ``reconcile``; the list of task ids is always read from the
            timers file on disk.

    Returns:
        Reports whose state is STUCK_NEEDS_RECONCILE, MERGED_PENDING_RECONCILE
        or ESCALATED. An unreadable, malformed or empty timers file yields an
        empty list. A failure while reconciling one task is logged and does
        not stop the scan.
    """
    wd = workspace_root or _default_workspace_root()
    timers_file = wd / _TIMERS_FILE_NAME

    task_ids: list[str] = []
    try:
        if timers_file.exists():
            data = json.loads(timers_file.read_text(encoding="utf-8"))
            tasks = data.get("tasks", {}) if isinstance(data, dict) else None
            if isinstance(tasks, dict):
                task_ids = list(tasks)
            else:
                # Valid JSON but not the expected {"tasks": {...}} shape.
                logger.warning(
                    "scan_stuck: task-timers.json has unexpected structure "
                    "(no 'tasks' mapping)"
                )
    except Exception as exc:
        # Best-effort load: the scan must not crash on an unreadable or
        # corrupt timers file, so this boundary stays deliberately broad.
        logger.warning("scan_stuck: failed to load task-timers.json: %s", exc)

    if not task_ids:
        logger.info("scan_stuck: no tasks found in task-timers.json")
        return []

    # reconcile() expects Callable[[str], dict] while scan_stuck receives
    # Callable[[], dict]; the adapter is loop-invariant, so build it once.
    adapted_loader: Optional[Callable[[str], dict]] = None
    if timer_loader is not None:
        def _per_task_loader(task_id: str) -> dict:
            all_timers = timer_loader()
            return all_timers.get("tasks", {}).get(task_id, {})

        adapted_loader = _per_task_loader

    reportable_states = frozenset(
        {
            LifecycleState.STUCK_NEEDS_RECONCILE,
            LifecycleState.MERGED_PENDING_RECONCILE,
            LifecycleState.ESCALATED,
        }
    )

    stuck_reports: list[LifecycleReport] = []
    for tid in task_ids:
        try:
            report = reconcile(
                tid,
                apply=apply,
                workspace_root=wd,
                runner=runner,
                timer_loader=adapted_loader,
            )
            if report.state in reportable_states:
                stuck_reports.append(report)
        except Exception as exc:
            # One failing task must not abort the scan of the remaining ones.
            logger.error("scan_stuck: reconcile failed for %s: %s", tid, exc)

    return stuck_reports


# ---------------------------------------------------------------------------
# automation_contracts 연동 헬퍼 (회장 §6)
# ---------------------------------------------------------------------------

def smoke_result_to_status(sr: SmokeResult) -> str:
    """Map a smoke result onto the lifecycle ``smoke_status`` string.

    Only the ``passed`` attribute of ``sr`` is read: a truthy value yields
    ``"PASS"``, anything else yields ``"FAIL"``.
    """
    if sr.passed:
        return "PASS"
    return "FAIL"


def build_automation_decision(report: LifecycleReport) -> AutomationDecision:
    """Translate a LifecycleReport into an AutomationDecision.

    Mapping by ``report.state``:
      FINALIZED                → "NO_OP"
      ESCALATED                → "ESCALATE", chair required, critical type
                                 POST_MERGE_SMOKE_FAILED
      MERGED_PENDING_RECONCILE → "BACKFILL"
      any other state          → "MONITOR"

    ``reason_codes`` holds the reason value of every stuck case on the report
    and ``auto_handled`` is the negation of ``requires_chair``.
    """
    # Start from the catch-all outcome; the branches below override it.
    decision_str = "MONITOR"
    requires_chair = False
    critical: Optional[CriticalEscalationType] = None

    current = report.state
    if current == LifecycleState.FINALIZED:
        decision_str = "NO_OP"
    elif current == LifecycleState.ESCALATED:
        decision_str = "ESCALATE"
        requires_chair = True
        critical = CriticalEscalationType.POST_MERGE_SMOKE_FAILED
    elif current == LifecycleState.MERGED_PENDING_RECONCILE:
        decision_str = "BACKFILL"

    reason_codes = []
    for case in report.stuck_cases:
        reason_codes.append(case.reason.value)

    audit_trail = {
        "task_id": report.task_id,
        "reconcile_run_id": report.reconcile_run_id,
        "timestamp": report.timestamp,
    }

    return AutomationDecision(
        decision=decision_str,
        reason_codes=reason_codes,
        critical_escalation_type=critical,
        auto_handled=(not requires_chair),
        requires_chair=requires_chair,
        audit=audit_trail,
    )


def build_escalation_packet(
    report: LifecycleReport, *, pr_number: int = 0
) -> Optional[EscalationPacket]:
    """Build an EscalationPacket for a report in the ESCALATED state.

    Returns None for any other state. The packet's ``reason`` joins each
    stuck case as "<reason value>: <detail>" with "; ", falling back to a
    fixed sentence when the report has no stuck cases.

    Note (from the original author): the lifecycle does not normally raise
    ESCALATED by itself, so this helper is meant to be called explicitly.
    """
    if report.state != LifecycleState.ESCALATED:
        return None

    fragments = [f"{sc.reason.value}: {sc.detail}" for sc in report.stuck_cases]
    reason_text = "; ".join(fragments)
    if not reason_text:
        reason_text = "lifecycle ESCALATED state detected"

    # The recommended option is also offered as the first safe option.
    manual_review = "Review stuck_cases and resolve manually"
    options = [
        manual_review,
        "Run reconcile --apply after resolving root cause",
        "Escalate to chair if unresolvable",
    ]

    return EscalationPacket(
        task_id=report.task_id,
        pr_number=pr_number,
        escalation_type=CriticalEscalationType.POST_MERGE_SMOKE_FAILED,
        reason=reason_text,
        why_auto_cannot_continue=(
            "lifecycle state is ESCALATED — human review required before proceeding"
        ),
        safe_options=options,
        recommended_option=manual_review,
        evidence=report.evidence_to_dict(),
    )


# ---------------------------------------------------------------------------
# CLI entrypoint
# ---------------------------------------------------------------------------

def _build_cli_parser() -> argparse.ArgumentParser:
    """Create the argument parser for the module's command-line interface.

    Exactly one mode flag is required: ``--reconcile`` or ``--scan-stuck``.
    Further options: ``--task-id``, ``--apply`` (off unless given),
    ``--json`` (stored as ``output_json``, already True by default) and
    ``--workspace-root``.
    """
    summary = (
        "lifecycle_reconciliation_manager — idempotent task lifecycle "
        "reconcile (task-2518 P0)"
    )
    parser = argparse.ArgumentParser(description=summary)

    # The two operating modes cannot be combined and one must be chosen.
    mode = parser.add_mutually_exclusive_group(required=True)
    mode_flags = (
        ("--reconcile", "Reconcile a single task (requires --task-id)"),
        ("--scan-stuck", "Scan all tasks in task-timers.json and report stuck ones"),
    )
    for flag, help_text in mode_flags:
        mode.add_argument(flag, action="store_true", help=help_text)

    parser.add_argument(
        "--task-id",
        help="task-NNNN identifier (required for --reconcile)",
    )
    parser.add_argument(
        "--apply",
        default=False,
        action="store_true",
        help="Actually apply backfill (default: dry-run)",
    )
    parser.add_argument(
        "--json",
        dest="output_json",
        default=True,
        action="store_true",
        help="Output as JSON (default: True)",
    )
    parser.add_argument(
        "--workspace-root",
        default=None,
        help="Override workspace root path",
    )
    return parser


def _cli_main(argv: Optional[list[str]] = None) -> None:
    """Parse CLI arguments and run the selected mode.

    ``--reconcile`` prints the single report via ``report.to_json()``;
    ``--scan-stuck`` prints a JSON array of ``to_dict()`` results. Any
    exception raised while running a mode is printed to stderr as a JSON
    object with an "error" key and the process exits with status 1.

    Args:
        argv: Argument list handed to the parser; None lets argparse fall
            back to its default (the process arguments).
    """
    cli = _build_cli_parser()
    args = cli.parse_args(argv)

    workspace_root: Optional[Path] = (
        Path(args.workspace_root) if args.workspace_root else None
    )

    if args.reconcile:
        if not args.task_id:
            cli.error("--reconcile requires --task-id")
        try:
            single_report = reconcile(
                args.task_id,
                apply=args.apply,
                workspace_root=workspace_root,
            )
            print(single_report.to_json())
        except Exception as exc:
            failure = {"error": str(exc), "task_id": args.task_id}
            print(json.dumps(failure), file=sys.stderr)
            sys.exit(1)
        return

    if args.scan_stuck:
        try:
            stuck = scan_stuck(
                workspace_root=workspace_root,
                apply=args.apply,
            )
            payload = [item.to_dict() for item in stuck]
            print(json.dumps(payload, indent=2, ensure_ascii=False))
        except Exception as exc:
            failure = {"error": str(exc)}
            print(json.dumps(failure), file=sys.stderr)
            sys.exit(1)


if __name__ == "__main__":
    # Direct script execution: configure INFO-level logging, then run the CLI.
    logging.basicConfig(
        format="%(levelname)s %(name)s: %(message)s",
        level=logging.INFO,
    )
    _cli_main()
