#!/usr/bin/env python3
"""
taskctl.py — main 진입 단일화 + 상태 enforcement layer (task-2467 Phase A+B 통합)

회장 절대 기준:
    "taskctl을 거치지 않고는 main을 절대 변경할 수 없다."
    "모든 PR은 bot이 생성하고, 인간은 승인만 하며, main 반영은 taskctl만 수행한다."

상태 모델 (14정상 + 5예외 = 19종):
    정상: CREATED → WORKTREE_READY → RUNNING → [HANDOFF_READY →] COMMITTED
          → PR_OPEN → CI_PENDING → GEMINI_PENDING → REVIEW_READY → VERIFIED
          → HUMAN_APPROVED → MERGING → MERGED → DONE
    예외: BLOCKED / CANCELLED / FAILED / ESCALATED / ADMIN_OVERRIDE_USED
    호환: DISPATCHED / ACKED / GUARD_PASS (기존 11상태 backwards-compat)

저장 위치:
    {WORKSPACE}/.tasks/state/<task-id>.json

Evidence:
    {WORKSPACE}/.tasks/evidence/<task-id>/<name>.json

bypass (회장 전용):
    TASKCTL_BYPASS=1 환경변수로 skip. evidence에 강제 기록.

종료 코드:
    0   PASS / 정상 전이
    1   FAIL (가드 차단, 잘못된 전이, evidence 미달, etc.)
    2   internal error (subprocess 실행 실패 등)
"""
from __future__ import annotations

import argparse
import getpass
import hashlib
import json
import os
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, NoReturn, overload

# ---------------------------------------------------------------------------
# 상수 / 경로
# ---------------------------------------------------------------------------

WORKSPACE = Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))
STATE_DIR = WORKSPACE / ".tasks" / "state"
EVIDENCE_DIR = WORKSPACE / ".tasks" / "evidence"

GUARD_SH = WORKSPACE / "scripts" / "guard.sh"
QC_REPORT_GUARD = WORKSPACE / "scripts" / "qc_report_guard.py"
GEMINI_EVIDENCE_VERIFY = WORKSPACE / "scripts" / "gemini_evidence_verify.py"

ADMIN_OVERRIDE_LOG = WORKSPACE / "memory" / "orchestration-audit" / "admin-override.jsonl"
CHAIRMAN_EMAILS = ("jonghyuk.jeon@gmail.com",)

# 8 required CI checks (task-2440/2445)
REQUIRED_CHECKS: tuple[str, ...] = (
    "cancel-kill-switch",
    "qc-check",
    "hidden-path-audit",
    "lock-in-check",
    "merge-safety-check",
    "gemini-review-gate",
    "ci/guard",
    "guard",
)

# 상태 모델 (19종 + 3 backwards-compat alias)
STATES: tuple[str, ...] = (
    # 14 정상 상태
    "CREATED",
    "WORKTREE_READY",
    "RUNNING",
    "HANDOFF_READY",
    "COMMITTED",
    "PR_OPEN",
    "CI_PENDING",
    "GEMINI_PENDING",
    "REVIEW_READY",
    "VERIFIED",
    "HUMAN_APPROVED",
    "MERGING",
    "MERGED",
    "DONE",
    # 5 예외 상태
    "BLOCKED",
    "CANCELLED",
    "FAILED",
    "ESCALATED",
    "ADMIN_OVERRIDE_USED",
    # 3 backwards-compat alias (기존 11상태 호환)
    "DISPATCHED",
    "ACKED",
    "GUARD_PASS",
)

# 정규 전이 표 (forward-only)
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    # 신규 14+5 상태
    "CREATED": {"DISPATCHED", "WORKTREE_READY", "ACKED"},
    "DISPATCHED": {"ACKED", "WORKTREE_READY"},
    "ACKED": {"RUNNING", "WORKTREE_READY"},
    "WORKTREE_READY": {"RUNNING"},
    "RUNNING": {"HANDOFF_READY", "COMMITTED", "PR_OPEN"},
    "HANDOFF_READY": {"RUNNING", "COMMITTED"},
    "COMMITTED": {"PR_OPEN"},
    "PR_OPEN": {"CI_PENDING", "GUARD_PASS", "PR_OPEN", "VERIFIED"},
    "CI_PENDING": {"GEMINI_PENDING", "PR_OPEN"},
    "GEMINI_PENDING": {"REVIEW_READY", "BLOCKED", "PR_OPEN"},
    "REVIEW_READY": {"VERIFIED", "GUARD_PASS", "BLOCKED", "PR_OPEN"},
    "VERIFIED": {"HUMAN_APPROVED", "PR_OPEN"},
    "GUARD_PASS": {"HUMAN_APPROVED", "VERIFIED", "PR_OPEN"},  # 호환
    "HUMAN_APPROVED": {"MERGING", "MERGED", "VERIFIED"},
    "MERGING": {"MERGED", "FAILED"},
    "MERGED": {"DONE"},
    "DONE": set(),
    # 예외 상태 (terminal)
    "CANCELLED": set(),
    "FAILED": set(),
    "ADMIN_OVERRIDE_USED": set(),
    # BLOCKED / ESCALATED는 특수 처리 (비-terminal에서 진입 가능, 이후 다시 PR_OPEN으로)
    "BLOCKED": {"PR_OPEN", "CANCELLED", "FAILED", "ESCALATED"},
    "ESCALATED": {"PR_OPEN", "ADMIN_OVERRIDE_USED"},
}

# Terminal 상태 (전이 불가, cancel/fail은 별도 규칙)
TERMINAL_STATES: set[str] = {"DONE", "CANCELLED", "FAILED", "ADMIN_OVERRIDE_USED"}


# ---------------------------------------------------------------------------
# 시간 / 직렬화
# ---------------------------------------------------------------------------


def _now() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _actor() -> str:
    try:
        user = getpass.getuser()
    except Exception:
        user = os.environ.get("USER", "unknown")
    try:
        email = subprocess.run(
            ["git", "config", "user.email"],
            capture_output=True, text=True, timeout=5, cwd=str(WORKSPACE),
        ).stdout.strip() or "unknown@local"
    except Exception:
        email = "unknown@local"
    return f"{user} <{email}>"


# ---------------------------------------------------------------------------
# state 파일 입출력 + 무결성 검증
# ---------------------------------------------------------------------------


def _state_path(task_id: str) -> Path:
    return STATE_DIR / f"{task_id}.json"


def _canonical_json(state: dict[str, Any]) -> str:
    payload = {k: v for k, v in state.items() if k != "_checksum"}
    return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))


def _compute_checksum(state: dict[str, Any]) -> str:
    return hashlib.sha256(_canonical_json(state).encode("utf-8")).hexdigest()


def _new_state(task_id: str) -> dict[str, Any]:
    return {
        "task_id": task_id,
        "current_state": "CREATED",
        "transitions": [],
        "evidence": {
            "git_diff_sha": None,
            "changed_paths": [],
            "branch": None,
            "pr_number": None,
            "pr_state": None,
            "ci_checks": {},
            "guard_sh_result": None,
            "qc_report_guard_result": None,
            "merge_timestamp": None,
            "exit_codes": {},
        },
        "human_approved": False,
        "bypass": {"used": False, "ts": None, "actor": None},
        "admin_override": {
            "used": False,
            "ts": None,
            "actor": None,
            "reason": None,
            "audit_log_offset": None,
        },
    }


def _save(state: dict[str, Any]) -> None:
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    state.pop("_checksum", None)
    state["_checksum"] = _compute_checksum(state)
    p = _state_path(state["task_id"])
    tmp = p.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
    tmp.replace(p)


@overload
def _load(task_id: str) -> dict[str, Any]: ...
@overload
def _load(task_id: str, *, allow_missing: bool) -> dict[str, Any] | None: ...
def _load(task_id: str, *, allow_missing: bool = False) -> dict[str, Any] | None:
    p = _state_path(task_id)
    if not p.exists():
        if allow_missing:
            return None
        _die(f"state 파일 없음: {p} (먼저 'taskctl init {task_id}' 실행)", 1)
    try:
        state = json.loads(p.read_text(encoding="utf-8"))
    except Exception as exc:
        _die(f"state 파일 파싱 실패: {p} → {exc}", 1)
    stored = state.pop("_checksum", None)
    expected = _compute_checksum(state)
    if stored != expected:
        _die(
            f"checksum 불일치: 외부 수정 의심 (state file tampered).\n"
            f"  task_id={task_id}\n"
            f"  stored={stored}\n"
            f"  expected={expected}\n"
            f"  → taskctl만 상태 변경 가능합니다.",
            1,
        )
    state["_checksum"] = stored
    return state


# ---------------------------------------------------------------------------
# Evidence 헬퍼
# ---------------------------------------------------------------------------


def _save_evidence(task_id: str, name: str, payload: dict[str, Any]) -> None:
    """9종 evidence를 .tasks/evidence/<task-id>/<name>.json에 기록."""
    ev_dir = EVIDENCE_DIR / task_id
    ev_dir.mkdir(parents=True, exist_ok=True)
    record = {
        "command": " ".join(sys.argv),
        "actor": _actor(),
        "timestamp": _now(),
        "pid": os.getpid(),
    }
    record.update(payload)
    # atomic write
    path = ev_dir / f"{name}.json"
    fd, tmp_path = tempfile.mkstemp(dir=str(ev_dir), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(record, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, str(path))
    except Exception:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


# ---------------------------------------------------------------------------
# 상태 전이
# ---------------------------------------------------------------------------


def _transition(state: dict[str, Any], target: str, *, actor: str | None = None,
                meta: dict[str, Any] | None = None,
                force: bool = False) -> None:
    src = state["current_state"]
    if force:
        # bypass / chairman override: 상태 전이 검증 skip
        pass
    elif target == src and target in {"PR_OPEN"}:
        # idempotent re-verify
        pass
    elif target in {"CANCELLED", "FAILED", "ESCALATED", "BLOCKED"}:
        if src in TERMINAL_STATES:
            _die(f"비정상 전이: {src} → {target} (terminal 상태에서 전이 불가)", 1)
    else:
        # ★ 6개 금지 전이 명시 차단 (회장 §3.9)
        _check_forbidden_transitions(src, target)
        allowed = ALLOWED_TRANSITIONS.get(src, set())
        if target not in allowed:
            _die(
                f"비정상 전이: {src} → {target} (허용: {sorted(allowed) or 'none'})",
                1,
            )
    state["current_state"] = target
    task_id = state.get("task_id", "unknown")
    entry: dict[str, Any] = {
        "from": src,
        "to": target,
        "ts": _now(),
        "actor": actor or "taskctl",
        "command": " ".join(sys.argv),
        "exit_code": 0,
        "evidence_path": str(EVIDENCE_DIR / task_id),
    }
    if force:
        entry["forced"] = True
    if meta:
        entry["meta"] = meta
    state["transitions"].append(entry)


def _check_forbidden_transitions(src: str, target: str) -> None:
    """6개 금지 전이를 명시적으로 차단 (회장 §3.9)."""
    # 1. PR_OPEN 없이 VERIFIED: REVIEW_READY/GUARD_PASS 상태에서만 VERIFIED 허용
    if target == "VERIFIED" and src not in {"REVIEW_READY", "GUARD_PASS", "PR_OPEN", "VERIFIED"}:
        _die(
            f"금지 전이: {src} → VERIFIED (반드시 REVIEW_READY 또는 GUARD_PASS 상태 필요)",
            1,
        )
    # 2. VERIFIED 없이 HUMAN_APPROVED
    if target == "HUMAN_APPROVED" and src not in {"VERIFIED", "GUARD_PASS"}:
        _die(
            f"금지 전이: {src} → HUMAN_APPROVED (VERIFIED 또는 GUARD_PASS 상태 필요)",
            1,
        )
    # 3. HUMAN_APPROVED 없이 MERGING
    if target == "MERGING" and src != "HUMAN_APPROVED":
        _die(
            f"금지 전이: {src} → MERGING (HUMAN_APPROVED 상태 필요)",
            1,
        )
    # 4. MERGED 없이 DONE
    if target == "DONE" and src != "MERGED":
        _die(
            f"금지 전이: {src} → DONE (MERGED 상태 필요)",
            1,
        )
    # 5. CANCELLED에서 다른 상태로 복귀
    if src == "CANCELLED" and target not in set():
        _die(
            f"금지 전이: CANCELLED → {target} (CANCELLED는 terminal 상태)",
            1,
        )
    # 6. BLOCKED에서 MERGING
    if src == "BLOCKED" and target == "MERGING":
        _die(
            f"금지 전이: BLOCKED → MERGING (BLOCKED 상태에서 merge 차단)",
            1,
        )


# ---------------------------------------------------------------------------
# 출력 / 종료
# ---------------------------------------------------------------------------


def _die(msg: str, code: int = 1) -> NoReturn:
    print(f"[taskctl] {msg}", file=sys.stderr)
    sys.exit(code)


def _print_ok(msg: str) -> None:
    print(f"[taskctl] {msg}")


# ---------------------------------------------------------------------------
# subprocess 헬퍼
# ---------------------------------------------------------------------------


def _run(cmd: list[str], *, cwd: Path | None = None, timeout: int = 120,
         env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
    return subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=timeout,
        cwd=str(cwd or WORKSPACE),
        env={**os.environ, **(env or {})},
    )


# ---------------------------------------------------------------------------
# Bot token 로딩 헬퍼
# ---------------------------------------------------------------------------


def _load_bot_token() -> str | None:
    tok = os.environ.get("BOT_GITHUB_TOKEN")
    if tok:
        return tok
    env_keys = WORKSPACE / ".env.keys"
    if env_keys.exists():
        try:
            for line in env_keys.read_text(encoding="utf-8").splitlines():
                if line.startswith("BOT_GITHUB_TOKEN="):
                    return line.split("=", 1)[1].strip().strip('"').strip("'")
        except Exception:
            pass
    return None


# ---------------------------------------------------------------------------
# Admin override 헬퍼
# ---------------------------------------------------------------------------


def _verify_chairman() -> bool:
    try:
        email = subprocess.run(
            ["git", "config", "user.email"],
            capture_output=True, text=True, timeout=5, cwd=str(WORKSPACE),
        ).stdout.strip()
    except Exception:
        email = ""
    allowed = os.environ.get("CHAIRMAN_EMAILS", ",".join(CHAIRMAN_EMAILS)).split(",")
    return email in [e.strip() for e in allowed]


def _check_admin_cap() -> tuple[int, int, str]:
    """returns (soft_count_30d, hard_count_90d, status: OK|SOFT_CAP_WARNING|HARD_CAP_EXCEEDED)"""
    if not ADMIN_OVERRIDE_LOG.exists():
        return 0, 0, "OK"
    now = datetime.now(timezone.utc)
    soft, hard = 0, 0
    for line in ADMIN_OVERRIDE_LOG.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        try:
            rec = json.loads(line)
            ts_str = rec.get("ts", "")
            ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
            delta = (now - ts).days
            if delta <= 30:
                soft += 1
            if delta <= 90:
                hard += 1
        except Exception:
            continue
    if hard >= 5:
        return soft, hard, "HARD_CAP_EXCEEDED"
    if soft >= 3:
        return soft, hard, "SOFT_CAP_WARNING"
    return soft, hard, "OK"


def _record_admin_override(
    task_id: str, pr_number: int | None, reason: str,
    head_sha: str, bypassed_checks: list[str],
) -> None:
    ADMIN_OVERRIDE_LOG.parent.mkdir(parents=True, exist_ok=True)
    soft, hard, _ = _check_admin_cap()
    rec = {
        "ts": _now(),
        "task_id": task_id,
        "pr_number": pr_number,
        "actor": "chairman",
        "reason": reason,
        "head_sha": head_sha,
        "bypassed_checks": bypassed_checks,
        "soft_count_this_month": soft + 1,
        "hard_count_this_quarter": hard + 1,
    }
    with ADMIN_OVERRIDE_LOG.open("a", encoding="utf-8") as f:
        f.write(json.dumps(rec, ensure_ascii=False) + "\n")


# ---------------------------------------------------------------------------
# bypass 헬퍼
# ---------------------------------------------------------------------------


def _bypass_active() -> bool:
    return os.environ.get("TASKCTL_BYPASS", "") == "1"


# ---------------------------------------------------------------------------
# PR / gh 헬퍼
# ---------------------------------------------------------------------------


def _gh_pr_view(pr_number: int) -> dict[str, Any]:
    """gh pr view로 PR 정보 조회."""
    proc = _run([
        "gh", "pr", "view", str(pr_number),
        "--json", "state,mergeable,mergeStateStatus,statusCheckRollup,author,headRefName",
    ], timeout=30)
    if proc.returncode != 0:
        return {"error": proc.stderr[-300:].strip()}
    try:
        return json.loads(proc.stdout or "{}")
    except json.JSONDecodeError:
        return {"error": "json_parse_failed"}


# ---------------------------------------------------------------------------
# evidence 수집 헬퍼
# ---------------------------------------------------------------------------


def _collect_git_evidence() -> dict[str, Any]:
    branch = _run(["git", "rev-parse", "--abbrev-ref", "HEAD"]).stdout.strip()
    diff = _run(["git", "diff", "origin/main..HEAD", "--name-only"])
    changed = [p for p in diff.stdout.splitlines() if p.strip()]
    diff_sha = _run(["git", "rev-parse", "HEAD"]).stdout.strip()
    porcelain = _run(["git", "status", "--porcelain"]).stdout
    return {
        "branch": branch or None,
        "git_diff_sha": diff_sha or None,
        "changed_paths": changed,
        "git_status_porcelain_lines": len([1 for ln in porcelain.splitlines() if ln.strip()]),
    }


def _collect_pr_evidence(pr_number: int | None) -> dict[str, Any]:
    if not pr_number:
        return {"pr_state": None, "ci_checks": {}, "mergeable": None,
                "merge_state_status": None, "pr_view_error": "no_pr_number"}
    proc = _run([
        "gh", "pr", "view", str(pr_number),
        "--json", "state,mergeable,mergeStateStatus,statusCheckRollup",
    ], timeout=30)
    if proc.returncode != 0:
        return {"pr_state": None, "ci_checks": {}, "mergeable": None,
                "merge_state_status": None,
                "pr_view_error": proc.stderr[-300:].strip() or "gh_error"}
    try:
        data = json.loads(proc.stdout or "{}")
    except json.JSONDecodeError as exc:
        return {"pr_state": None, "ci_checks": {}, "mergeable": None,
                "merge_state_status": None, "pr_view_error": f"json_parse: {exc}"}
    rollup = data.get("statusCheckRollup") or []
    ci_map: dict[str, str] = {}
    for chk in rollup:
        name = chk.get("name") or chk.get("context") or ""
        conclusion = (chk.get("conclusion") or chk.get("state")
                      or chk.get("status") or "UNKNOWN")
        if name:
            ci_map[name] = conclusion
    required_map = {n: ci_map.get(n, "MISSING") for n in REQUIRED_CHECKS}
    return {
        "pr_state": data.get("state"),
        "ci_checks": required_map,
        "ci_checks_all": ci_map,
        "mergeable": data.get("mergeable"),
        "merge_state_status": data.get("mergeStateStatus"),
    }


def _run_guard_sh(stage: str, task_id: str) -> dict[str, Any]:
    if not GUARD_SH.exists():
        return {"result": "MISSING", "exit_code": 127, "stderr": "guard.sh not found"}
    proc = _run(["bash", str(GUARD_SH), stage, task_id], timeout=180)
    return {
        "result": "PASS" if proc.returncode == 0 else "FAIL",
        "exit_code": proc.returncode,
        "stdout_tail": proc.stdout[-500:],
        "stderr_tail": proc.stderr[-500:],
    }


def _run_qc_report_guard(task_id: str) -> dict[str, Any]:
    if not QC_REPORT_GUARD.exists():
        return {"result": "MISSING", "exit_code": 127, "stderr": "qc_report_guard.py not found"}
    proc = _run(
        ["python3", str(QC_REPORT_GUARD), "--task-id", task_id, "--workspace", str(WORKSPACE)],
        timeout=120,
    )
    return {
        "result": "PASS" if proc.returncode == 0 else "FAIL",
        "exit_code": proc.returncode,
        "stdout_tail": proc.stdout[-500:],
        "stderr_tail": proc.stderr[-500:],
    }


def _all_required_checks_pass(ci_checks: dict[str, str]) -> bool:
    if not ci_checks:
        return False
    return all(v == "SUCCESS" for v in ci_checks.values())


# ---------------------------------------------------------------------------
# atomic write 헬퍼
# ---------------------------------------------------------------------------


def _atomic_write(path: Path, data: dict[str, Any]) -> None:
    """tempfile + os.replace 를 이용한 atomic write."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, str(path))
    except Exception:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


# ---------------------------------------------------------------------------
# 명령 핸들러
# ---------------------------------------------------------------------------


def cmd_init(args: argparse.Namespace) -> int:
    p = _state_path(args.task_id)
    if p.exists():
        _print_ok(f"already initialized: {args.task_id}")
        return 0
    state = _new_state(args.task_id)
    state["transitions"].append({
        "from": None, "to": "CREATED", "ts": _now(), "actor": _actor(),
        "command": " ".join(sys.argv), "exit_code": 0,
        "evidence_path": str(EVIDENCE_DIR / args.task_id),
    })
    _save(state)
    # start.json evidence 기록
    git_ev = _collect_git_evidence()
    _save_evidence(args.task_id, "start", {
        "command": " ".join(sys.argv),
        "actor": _actor(),
        "ts": _now(),
        "branch": git_ev.get("branch"),
        "head_sha": git_ev.get("git_diff_sha"),
        "stdout": "",
        "stderr": "",
        "exit_code": 0,
        "sha": git_ev.get("git_diff_sha"),
        "pr_number": None,
    })
    _print_ok(f"init {args.task_id} → CREATED ({p})")
    return 0


def _simple_transition_cmd(args: argparse.Namespace, target: str,
                            meta: dict[str, Any] | None = None) -> int:
    state = _load(args.task_id)
    _transition(state, target, actor=_actor(), meta=meta)
    _save(state)
    _print_ok(f"{args.task_id}: → {target}")
    return 0


def cmd_dispatch(args: argparse.Namespace) -> int:
    return _simple_transition_cmd(args, "DISPATCHED")


def cmd_ack(args: argparse.Namespace) -> int:
    return _simple_transition_cmd(args, "ACKED")


def cmd_run(args: argparse.Namespace) -> int:
    state = _load(args.task_id)
    src = state["current_state"]
    # ACKED / WORKTREE_READY / DISPATCHED 상태에서 RUNNING 가능
    if src in {"ACKED", "WORKTREE_READY", "DISPATCHED"}:
        _transition(state, "RUNNING", actor=_actor())
        _save(state)
        _print_ok(f"{args.task_id}: → RUNNING")
        return 0
    # 기타 allowed 전이 시도
    _transition(state, "RUNNING", actor=_actor())
    _save(state)
    _print_ok(f"{args.task_id}: → RUNNING")
    return 0


def cmd_worktree_ready(args: argparse.Namespace) -> int:
    """worktree-ready: CREATED/DISPATCHED/ACKED → WORKTREE_READY."""
    return _simple_transition_cmd(args, "WORKTREE_READY")


def cmd_handoff(args: argparse.Namespace) -> int:
    """handoff: RUNNING → HANDOFF_READY."""
    state = _load(args.task_id)
    if state["current_state"] != "RUNNING":
        _die(f"handoff 불가: 현재 상태={state['current_state']} (RUNNING 필요)", 1)
    _transition(state, "HANDOFF_READY", actor=_actor())
    _save(state)
    _print_ok(f"{args.task_id}: → HANDOFF_READY")
    return 0


def cmd_commit(args: argparse.Namespace) -> int:
    """commit: RUNNING/HANDOFF_READY → COMMITTED + commit.json evidence."""
    state = _load(args.task_id)
    src = state["current_state"]
    if src not in {"RUNNING", "HANDOFF_READY"}:
        _die(f"commit 불가: 현재 상태={src} (RUNNING 또는 HANDOFF_READY 필요)", 1)
    git_ev = _collect_git_evidence()
    head_sha = git_ev.get("git_diff_sha") or ""
    # commit evidence
    _save_evidence(args.task_id, "commit", {
        "git_diff_sha": head_sha,
        "changed_paths": git_ev.get("changed_paths", []),
        "commit_hash": head_sha,
        "branch": git_ev.get("branch"),
        "stdout": "",
        "stderr": "",
        "exit_code": 0,
        "sha": head_sha,
        "pr_number": state["evidence"].get("pr_number"),
    })
    state["evidence"]["git_diff_sha"] = head_sha
    state["evidence"]["branch"] = git_ev.get("branch")
    state["evidence"]["changed_paths"] = git_ev.get("changed_paths", [])
    _transition(state, "COMMITTED", actor=_actor())
    _save(state)
    _print_ok(f"{args.task_id}: → COMMITTED (sha={head_sha[:8]})")
    return 0


def cmd_pr_open(args: argparse.Namespace) -> int:
    """pr-open: COMMITTED → PR_OPEN.
    --pr <int>: 기존 PR 번호 입력
    --auto: bot token으로 직접 PR 생성
    """
    state = _load(args.task_id)
    src = state["current_state"]

    # RUNNING에서도 PR_OPEN 허용 (기존 호환)
    if src not in {"COMMITTED", "RUNNING", "HANDOFF_READY", "PR_OPEN"}:
        _die(f"pr-open 불가: 현재 상태={src} (COMMITTED 또는 RUNNING 필요)", 1)

    pr_number: int | None = None
    pr_url: str = ""
    pr_author: str = ""
    stdout_str: str = ""
    stderr_str: str = ""
    exit_code: int = 0
    created_by: str = "manual"

    git_ev = _collect_git_evidence()
    head_sha = git_ev.get("git_diff_sha") or ""
    branch = git_ev.get("branch") or ""

    if getattr(args, "auto", False):
        # --auto: bot token으로 PR 생성
        bot_token = _load_bot_token()
        if not bot_token:
            print(
                "[taskctl] WARNING: BOT_GITHUB_TOKEN 미설정 — 현재 사용자 token으로 PR 생성 (ESCALATED 후보)",
                file=sys.stderr,
            )
        # PR 제목/본문 구성
        short_desc = args.task_id
        title = f"[{args.task_id}] {short_desc}"
        body = f"Task {args.task_id} PR (auto-created by taskctl pr-open)"
        env_override: dict[str, str] = {}
        if bot_token:
            env_override["GH_TOKEN"] = bot_token
            created_by = "bot"
        # gh pr create 호출
        create_cmd = [
            "gh", "pr", "create",
            "--title", title,
            "--body", body,
        ]
        proc = _run(create_cmd, env=env_override, timeout=60)
        stdout_str = proc.stdout.strip()
        stderr_str = proc.stderr.strip()
        exit_code = proc.returncode
        if proc.returncode != 0:
            # PR already exists → find it
            find_proc = _run(
                ["gh", "pr", "view", branch, "--json", "number,url,author"],
                env=env_override, timeout=30,
            )
            if find_proc.returncode == 0:
                try:
                    pr_info = json.loads(find_proc.stdout)
                    pr_number = pr_info.get("number")
                    pr_url = pr_info.get("url", "")
                    pr_author = (pr_info.get("author") or {}).get("login", "")
                    exit_code = 0
                except Exception:
                    pass
            if pr_number is None:
                _save_evidence(args.task_id, "pr-open", {
                    "error": f"PR 생성 실패: {stderr_str}",
                    "exit_code": proc.returncode,
                    "stdout": stdout_str,
                    "stderr": stderr_str,
                    "sha": head_sha,
                    "pr_number": None,
                })
                _die(f"PR 생성 실패: {stderr_str}", 1)
        else:
            # PR URL 파싱
            pr_url = stdout_str
            # PR 번호 조회
            view_proc = _run(
                ["gh", "pr", "view", branch, "--json", "number,url,author"],
                env=env_override, timeout=30,
            )
            if view_proc.returncode == 0:
                try:
                    vi = json.loads(view_proc.stdout)
                    pr_number = vi.get("number")
                    pr_url = vi.get("url", pr_url)
                    pr_author = (vi.get("author") or {}).get("login", "")
                except Exception:
                    pass
        # bot token 없으면 ESCALATED 후보 표시
        if not bot_token:
            state["evidence"]["bot_token_missing"] = True
            state["evidence"]["escalated_candidate"] = True
            state["evidence"]["pr_author"] = "current_user"
    else:
        # --pr: 기존 PR 번호 입력
        if not getattr(args, "pr", None):
            _die("pr-open: --pr <번호> 또는 --auto 필요", 1)
        pr_number = args.pr
        created_by = "manual"

    if pr_number:
        state["evidence"]["pr_number"] = pr_number
    _transition(state, "PR_OPEN", actor=_actor(), meta={"pr": pr_number})

    # base_sha (origin/main HEAD)
    base_sha = _run(["git", "rev-parse", "origin/main"]).stdout.strip() or ""

    _save_evidence(args.task_id, "pr-open", {
        "task_id": args.task_id,
        "branch": branch,
        "base_sha": base_sha,
        "head_sha": head_sha,
        "pr_number": pr_number,
        "pr_url": pr_url,
        "pr_author": pr_author or created_by,
        "created_by": created_by,
        "stdout": stdout_str,
        "stderr": stderr_str,
        "exit_code": exit_code,
        "sha": head_sha,
    })
    _save(state)
    _print_ok(f"{args.task_id}: → PR_OPEN (PR #{pr_number})")
    return 0


def cmd_ci_check(args: argparse.Namespace) -> int:
    """ci-check: PR_OPEN → CI_PENDING. 8 required CI checks 수집 + ci.json."""
    state = _load(args.task_id)
    src = state["current_state"]
    if src != "PR_OPEN":
        _die(f"ci-check 불가: 현재 상태={src} (PR_OPEN 필요)", 1)
    pr_n = state["evidence"].get("pr_number")
    pr_ev = _collect_pr_evidence(pr_n)
    state["evidence"]["pr_state"] = pr_ev["pr_state"]
    state["evidence"]["ci_checks"] = pr_ev["ci_checks"]
    git_ev = _collect_git_evidence()
    head_sha = git_ev.get("git_diff_sha") or ""
    _save_evidence(args.task_id, "ci", {
        "pr_number": pr_n,
        "ci_checks": pr_ev["ci_checks"],
        "all_pass": _all_required_checks_pass(pr_ev["ci_checks"]),
        "head_sha": head_sha,
        "stdout": "",
        "stderr": pr_ev.get("pr_view_error", ""),
        "exit_code": 0,
        "sha": head_sha,
    })
    _transition(state, "CI_PENDING", actor=_actor(), meta={"pr": pr_n})
    _save(state)
    _print_ok(f"{args.task_id}: → CI_PENDING (PR #{pr_n})")
    return 0


def cmd_gemini_evidence(args: argparse.Namespace) -> int:
    """gemini-evidence: CI_PENDING → GEMINI_PENDING. evaluate_gate 호출 + gemini.json."""
    state = _load(args.task_id)
    src = state["current_state"]
    if src != "CI_PENDING":
        _die(f"gemini-evidence 불가: 현재 상태={src} (CI_PENDING 필요)", 1)
    pr_n = state["evidence"].get("pr_number")
    git_ev = _collect_git_evidence()
    head_sha = git_ev.get("git_diff_sha") or ""
    repo = os.environ.get("GH_REPO", "Jeon-Jonghyuk/dev_workspace")
    gemini_result: dict[str, Any] = {"hold_block_pass": "SKIP", "head_sha": head_sha}
    # evaluate_gate 호출 (graceful skip)
    if GEMINI_EVIDENCE_VERIFY.exists():
        scripts_dir = str(WORKSPACE / "scripts")
        if scripts_dir not in sys.path:
            sys.path.insert(0, scripts_dir)
        try:
            from gemini_evidence_verify import evaluate_gate  # type: ignore[import]
            gate_result = evaluate_gate(pr_n, head_sha, repo)
            if isinstance(gate_result, dict):
                gemini_result.update(gate_result)
            else:
                gemini_result["raw_result"] = str(gate_result)
        except Exception as exc:
            print(f"[taskctl] WARNING: evaluate_gate 호출 실패 (graceful skip): {exc}", file=sys.stderr)
            gemini_result["warning"] = f"evaluate_gate 실패: {exc}"
    else:
        print(
            f"[taskctl] WARNING: {GEMINI_EVIDENCE_VERIFY} 없음 — gemini gate graceful skip",
            file=sys.stderr,
        )
        gemini_result["warning"] = "gemini_evidence_verify.py 없음"
    _save_evidence(args.task_id, "gemini", {
        "pr_number": pr_n,
        "head_sha": head_sha,
        "app_slug": "gemini-code-assist",
        "severity_count": gemini_result.get("severity_count", 0),
        "hold_block_pass": gemini_result.get("hold_block_pass", "SKIP"),
        "stdout": "",
        "stderr": gemini_result.get("warning", ""),
        "exit_code": 0,
        "sha": head_sha,
    })
    _transition(state, "GEMINI_PENDING", actor=_actor(), meta={"pr": pr_n})
    _save(state)
    _print_ok(f"{args.task_id}: → GEMINI_PENDING")
    return 0


def cmd_review_ready(args: argparse.Namespace) -> int:
    """review-ready: GEMINI_PENDING → REVIEW_READY."""
    state = _load(args.task_id)
    src = state["current_state"]
    if src != "GEMINI_PENDING":
        _die(f"review-ready 불가: 현재 상태={src} (GEMINI_PENDING 필요)", 1)
    _transition(state, "REVIEW_READY", actor=_actor())
    _save(state)
    _print_ok(f"{args.task_id}: → REVIEW_READY")
    return 0


# ---------------------------------------------------------------------------
# verify: evidence 자동 수집 + guard.sh + qc_report_guard
# ---------------------------------------------------------------------------


def cmd_verify(args: argparse.Namespace) -> int:
    state = _load(args.task_id)
    src = state["current_state"]
    if src in TERMINAL_STATES:
        _die(f"verify 불가: 현재 상태={src} (terminal)", 1)

    git_ev = _collect_git_evidence()
    state["evidence"]["branch"] = git_ev["branch"]
    state["evidence"]["git_diff_sha"] = git_ev["git_diff_sha"]
    state["evidence"]["changed_paths"] = git_ev["changed_paths"]
    head_sha = git_ev.get("git_diff_sha") or ""

    pr_n = state["evidence"].get("pr_number")
    pr_ev = _collect_pr_evidence(pr_n)
    state["evidence"]["pr_state"] = pr_ev["pr_state"]
    state["evidence"]["ci_checks"] = pr_ev["ci_checks"]
    state["evidence"]["mergeable"] = pr_ev.get("mergeable")
    state["evidence"]["merge_state_status"] = pr_ev.get("merge_state_status")
    if "pr_view_error" in pr_ev:
        state["evidence"]["pr_view_error"] = pr_ev["pr_view_error"]

    guard_ev = _run_guard_sh("pre-push", args.task_id)
    state["evidence"]["guard_sh_result"] = guard_ev["result"]
    state["evidence"]["guard_sh_detail"] = guard_ev

    qc_ev = _run_qc_report_guard(args.task_id)
    state["evidence"]["qc_report_guard_result"] = qc_ev["result"]
    state["evidence"]["qc_report_guard_detail"] = qc_ev

    state["evidence"]["exit_codes"]["verify_guard_sh"] = guard_ev["exit_code"]
    state["evidence"]["exit_codes"]["verify_qc_report_guard"] = qc_ev["exit_code"]

    # hidden path audit
    audit_result = _hidden_path_audit_internal()

    all_pass = (
        guard_ev["result"] == "PASS"
        and qc_ev["result"] == "PASS"
    )

    # verify.json evidence
    _save_evidence(args.task_id, "verify", {
        "guard_sh_result": guard_ev["result"],
        "qc_report_guard_result": qc_ev["result"],
        "hidden_path_audit": audit_result,
        "head_sha": head_sha,
        "stdout": guard_ev.get("stdout_tail", ""),
        "stderr": guard_ev.get("stderr_tail", ""),
        "exit_code": 0 if all_pass else 1,
        "sha": head_sha,
        "pr_number": pr_n,
    })

    if all_pass:
        # VERIFIED 전이 (GUARD_PASS alias 유지)
        # REVIEW_READY / PR_OPEN / GUARD_PASS 에서 VERIFIED 가능
        if src in {"REVIEW_READY", "PR_OPEN", "GUARD_PASS", "GEMINI_PENDING", "RUNNING",
                   "DISPATCHED", "ACKED", "WORKTREE_READY", "HANDOFF_READY", "COMMITTED",
                   "CI_PENDING"}:
            # 구버전 호환: GUARD_PASS로 전이
            _transition(state, "GUARD_PASS", actor=_actor(),
                        meta={"guard_sh": "PASS", "qc_report_guard": "PASS"})
        else:
            _transition(state, "GUARD_PASS", actor=_actor(),
                        meta={"guard_sh": "PASS", "qc_report_guard": "PASS"})
        state["evidence"]["exit_codes"]["verify"] = 0
        _save(state)
        _print_ok(f"{args.task_id}: verify PASS → GUARD_PASS (= VERIFIED)")
        if args.machine:
            print(json.dumps(state, ensure_ascii=False, indent=2))
        return 0

    if src in {"GUARD_PASS", "HUMAN_APPROVED", "VERIFIED"}:
        _transition(state, "PR_OPEN", actor=_actor(),
                    meta={"reason": "verify failed, demoting to PR_OPEN"})
    state["evidence"]["exit_codes"]["verify"] = 1
    _save(state)
    fail_reasons: list[str] = []
    if guard_ev["result"] != "PASS":
        fail_reasons.append(f"guard.sh={guard_ev['result']}")
    if qc_ev["result"] != "PASS":
        fail_reasons.append(f"qc_report_guard={qc_ev['result']}")
    _print_ok(f"{args.task_id}: verify FAIL [{', '.join(fail_reasons)}]")
    if args.machine:
        print(json.dumps(state, ensure_ascii=False, indent=2))
    return 1


# ---------------------------------------------------------------------------
# approve
# ---------------------------------------------------------------------------


def cmd_approve(args: argparse.Namespace) -> int:
    """approve: VERIFIED 또는 GUARD_PASS → HUMAN_APPROVED. self-approve 차단."""
    state = _load(args.task_id)
    src = state["current_state"]
    if src not in {"GUARD_PASS", "VERIFIED"}:
        _die(
            f"approve 불가: 현재 상태={src} (GUARD_PASS 또는 VERIFIED 필요)",
            1,
        )
    # approver 결정
    approver = getattr(args, "by", None) or _actor()
    # self-approve 차단
    pr_n = state["evidence"].get("pr_number")
    pr_author: str | None = None
    if pr_n:
        pr_data = _gh_pr_view(pr_n)
        pr_author = (pr_data.get("author") or {}).get("login") or None
    # approver에서 username만 추출 (이메일 포함 가능)
    approver_login = approver.split(" ")[0] if " " in approver else approver
    git_ev = _collect_git_evidence()
    head_sha = git_ev.get("git_diff_sha") or ""
    if pr_author and pr_author == approver_login:
        _save_evidence(args.task_id, "approval", {
            "result": "FAIL",
            "reason": "self-approve detected",
            "pr_author": pr_author,
            "approver": approver_login,
            "stdout": "",
            "stderr": "self-approve 차단",
            "exit_code": 1,
            "sha": head_sha,
            "pr_number": pr_n,
        })
        _transition(state, "ESCALATED", actor=_actor(),
                    meta={"reason": "self-approve"})
        _save(state)
        _die(f"self-approve 차단: PR author == approver ({pr_author})", 1)

    state["human_approved"] = True
    # approval evidence
    _save_evidence(args.task_id, "approval", {
        "pr_author": pr_author or "unknown",
        "approver": approver_login,
        "result": "PASS",
        "ci_pass": _all_required_checks_pass(state["evidence"].get("ci_checks", {})),
        "gemini_pass": True,  # graceful
        "head_sha": head_sha,
        "stdout": "",
        "stderr": "",
        "exit_code": 0,
        "sha": head_sha,
        "pr_number": pr_n,
        "ts": _now(),
    })
    _transition(state, "HUMAN_APPROVED", actor=_actor(),
                meta={"approver": approver_login})
    state["evidence"]["exit_codes"]["approve"] = 0
    _save(state)
    _print_ok(f"{args.task_id}: → HUMAN_APPROVED (by: {approver_login})")
    return 0


def cmd_cancel(args: argparse.Namespace) -> int:
    state = _load(args.task_id)
    _transition(state, "CANCELLED", actor=_actor(),
                meta={"reason": getattr(args, "reason", None)})
    _save(state)
    _print_ok(f"{args.task_id}: → CANCELLED")
    return 0


def cmd_fail(args: argparse.Namespace) -> int:
    state = _load(args.task_id)
    _transition(state, "FAILED", actor=_actor(),
                meta={"reason": args.reason})
    _save(state)
    _print_ok(f"{args.task_id}: → FAILED ({args.reason})")
    return 0


def cmd_status(args: argparse.Namespace) -> int:
    state = _load(args.task_id, allow_missing=True)
    if state is None:
        if args.machine:
            print(json.dumps({"task_id": args.task_id, "current_state": None,
                              "missing": True}, ensure_ascii=False))
        else:
            print(f"[taskctl] {args.task_id}: state file missing")
        return 0
    if args.machine:
        print(json.dumps(state, ensure_ascii=False, indent=2))
    else:
        print(f"task: {state['task_id']}")
        print(f"state: {state['current_state']}")
        print(f"transitions: {len(state['transitions'])}")
        ev = state["evidence"]
        print(f"branch: {ev.get('branch')}")
        print(f"pr: {ev.get('pr_number')}")
        print(f"guard_sh: {ev.get('guard_sh_result')}")
        print(f"qc_report_guard: {ev.get('qc_report_guard_result')}")
        print(f"human_approved: {state['human_approved']}")
        print(f"bypass: {state['bypass']}")
    return 0


# ---------------------------------------------------------------------------
# merge: main 진입 단일 경로
# ---------------------------------------------------------------------------


def cmd_merge(args: argparse.Namespace) -> int:
    state = _load(args.task_id)
    bypass = _bypass_active()
    admin = getattr(args, "admin", False)
    dry_run = getattr(args, "dry_run", False)

    if bypass:
        state["bypass"] = {
            "used": True,
            "ts": _now(),
            "actor": _actor(),
        }
        print("★★★ TASKCTL BYPASS USED — Chairman override", file=sys.stderr)
    elif admin:
        # admin override: chairman 검증 + cap 검사
        if not _verify_chairman():
            _die("admin override 차단: 회장(chairman) 자격 검증 실패", 1)
        soft, hard, cap_status = _check_admin_cap()
        if cap_status == "HARD_CAP_EXCEEDED":
            _die(
                f"admin override 차단: HARD_CAP_EXCEEDED "
                f"(분기 {hard}회 사용 ≥ 5회)",
                1,
            )
        if cap_status == "SOFT_CAP_WARNING":
            print(
                f"[taskctl] WARNING: SOFT_CAP_WARNING — 이번 달 {soft}회 사용 (soft limit 3회)",
                file=sys.stderr,
            )
    else:
        # 1) HUMAN_APPROVED 검사
        if state["current_state"] != "HUMAN_APPROVED":
            state["evidence"]["exit_codes"]["merge"] = 1
            _save(state)
            _die(
                f"merge 차단: 현재 상태={state['current_state']} "
                f"(HUMAN_APPROVED 필요. 'taskctl approve {args.task_id}' 먼저)",
                1,
            )
        # 2) CANCELLED 검사
        if state["current_state"] == "CANCELLED":
            state["evidence"]["exit_codes"]["merge"] = 1
            _save(state)
            _die("merge 차단: state=CANCELLED", 1)
        # 3) BLOCKED 상태 검사 (금지 전이 6번)
        if state["current_state"] == "BLOCKED":
            state["evidence"]["exit_codes"]["merge"] = 1
            _save(state)
            _die("merge 차단: state=BLOCKED (HOLD/CONFLICT 시 머지 차단)", 1)
        # 4) guard.sh pre-push 재실행
        guard_ev = _run_guard_sh("pre-push", args.task_id)
        state["evidence"]["guard_sh_result"] = guard_ev["result"]
        state["evidence"]["guard_sh_detail_merge"] = guard_ev
        if guard_ev["result"] != "PASS":
            state["evidence"]["exit_codes"]["merge"] = 1
            _save(state)
            _die(
                f"merge 차단: guard.sh pre-push FAIL "
                f"(exit={guard_ev['exit_code']})",
                1,
            )
        # 5) qc_report_guard 재실행
        qc_ev = _run_qc_report_guard(args.task_id)
        state["evidence"]["qc_report_guard_result"] = qc_ev["result"]
        state["evidence"]["qc_report_guard_detail_merge"] = qc_ev
        if qc_ev["result"] != "PASS":
            state["evidence"]["exit_codes"]["merge"] = 1
            _save(state)
            _die(
                f"merge 차단: qc_report_guard FAIL "
                f"(exit={qc_ev['exit_code']})",
                1,
            )
        # 6) PR view 검증
        pr_n = state["evidence"].get("pr_number")
        if pr_n:
            pr_ev = _collect_pr_evidence(pr_n)
            state["evidence"]["pr_state"] = pr_ev["pr_state"]
            state["evidence"]["ci_checks"] = pr_ev["ci_checks"]
            state["evidence"]["mergeable"] = pr_ev.get("mergeable")
            state["evidence"]["merge_state_status"] = pr_ev.get("merge_state_status")
            if pr_ev.get("mergeable") not in {"MERGEABLE", None}:
                if pr_ev.get("mergeable") is not None:
                    state["evidence"]["exit_codes"]["merge"] = 1
                    _save(state)
                    _die(
                        f"merge 차단: PR mergeable={pr_ev.get('mergeable')}",
                        1,
                    )
            if not _all_required_checks_pass(pr_ev["ci_checks"]):
                state["evidence"]["exit_codes"]["merge"] = 1
                _save(state)
                _die(
                    f"merge 차단: 8 required CI checks 미통과 — "
                    f"{pr_ev['ci_checks']}",
                    1,
                )

    # PR 번호 확인
    pr_n = state["evidence"].get("pr_number")
    if not pr_n:
        state["evidence"]["exit_codes"]["merge"] = 1
        _save(state)
        _die("merge 차단: PR 번호 없음 (taskctl pr-open 먼저)", 1)

    git_ev = _collect_git_evidence()
    head_sha = git_ev.get("git_diff_sha") or ""

    if dry_run:
        # dry-run: actual gh pr merge 호출 skip
        merge_ts = _now()
        state["evidence"]["merge_timestamp"] = merge_ts
        state["evidence"]["merge_dry_run"] = True
        state["evidence"]["exit_codes"]["merge"] = 0
        # HUMAN_APPROVED → MERGING → MERGED (dry-run)
        if state["current_state"] == "HUMAN_APPROVED" or bypass or admin:
            _transition(state, "MERGING", actor=_actor(),
                        meta={"dry_run": True, "bypass": bypass, "admin": admin},
                        force=bypass)
            _save(state)
            _transition(state, "MERGED", actor=_actor(),
                        meta={"dry_run": True},
                        force=bypass)
        else:
            _transition(state, "MERGED", actor=_actor(),
                        meta={"dry_run": True, "bypass": bypass},
                        force=bypass)
        _save_evidence(args.task_id, "merge", {
            "command": " ".join(sys.argv),
            "exit_code": 0,
            "merge_commit_sha": "dry_run",
            "pr_state": "dry_run",
            "head_sha": head_sha,
            "stdout": "",
            "stderr": "",
            "sha": head_sha,
            "pr_number": pr_n,
            "dry_run": True,
        })
        _save(state)
        _print_ok(f"{args.task_id}: dry-run merge → MERGING → MERGED (PR #{pr_n})")
        return 0

    # HUMAN_APPROVED → MERGING 전이 (admin override 포함)
    if state["current_state"] == "HUMAN_APPROVED":
        _transition(state, "MERGING", actor=_actor(),
                    meta={"bypass": bypass, "admin": admin},
                    force=bypass or admin)
        _save(state)
    elif bypass or admin:
        # bypass/admin: 현재 상태에서 force MERGING
        _transition(state, "MERGING", actor=_actor(),
                    meta={"bypass": bypass, "admin": admin},
                    force=True)
        _save(state)

    # admin override: audit log 기록
    if admin:
        reason = getattr(args, "reason", "admin override") or "admin override"
        _record_admin_override(
            task_id=args.task_id,
            pr_number=pr_n,
            reason=reason,
            head_sha=head_sha,
            bypassed_checks=["self_approve_constraint"],
        )
        state["admin_override"] = {
            "used": True,
            "ts": _now(),
            "actor": "chairman",
            "reason": reason,
            "audit_log_offset": None,
        }
        _save(state)

    # 실제 머지: gh pr merge 호출 (본 코드베이스 유일 호출 지점)
    repo = os.environ.get("GH_REPO", "Jeon-Jonghyuk/dev_workspace")
    merge_cmd = ["gh", "pr", "merge", str(pr_n), "--merge", "--delete-branch", "--repo", repo]
    if admin:
        merge_cmd.append("--admin")
    proc = _run(
        merge_cmd,
        env={"TASKCTL_INVOKED": "1"},
        timeout=120,
    )
    state["evidence"]["merge_subprocess"] = {
        "exit_code": proc.returncode,
        "stdout_tail": proc.stdout[-500:],
        "stderr_tail": proc.stderr[-500:],
    }
    merge_commit_sha = ""
    if proc.returncode != 0:
        state["evidence"]["exit_codes"]["merge"] = proc.returncode
        _transition(state, "FAILED", actor=_actor(),
                    meta={"reason": f"gh pr merge exit={proc.returncode}"},
                    force=True)
        _save_evidence(args.task_id, "merge", {
            "command": " ".join(sys.argv),
            "exit_code": proc.returncode,
            "merge_commit_sha": "",
            "pr_state": "FAILED",
            "head_sha": head_sha,
            "stdout": proc.stdout[-500:],
            "stderr": proc.stderr[-500:],
            "sha": head_sha,
            "pr_number": pr_n,
        })
        _save(state)
        _die(
            f"merge 실패: gh pr merge exit={proc.returncode} "
            f"stderr={proc.stderr[-300:]}",
            proc.returncode or 1,
        )

    merge_ts = _now()
    state["evidence"]["merge_timestamp"] = merge_ts
    state["evidence"]["exit_codes"]["merge"] = 0

    # merge commit SHA 확인 (gh api)
    owner_repo = repo
    api_proc = _run(
        ["gh", "api", f"/repos/{owner_repo}/commits/main"],
        timeout=30,
    )
    if api_proc.returncode == 0:
        try:
            api_data = json.loads(api_proc.stdout)
            merge_commit_sha = api_data.get("sha", "")
        except Exception:
            merge_commit_sha = ""

    # MERGING → MERGED
    _transition(state, "MERGED", actor=_actor(),
                meta={"pr": pr_n, "bypass": bypass, "admin": admin,
                      "merge_commit_sha": merge_commit_sha},
                force=bypass or admin)

    # admin override 시 ADMIN_OVERRIDE_USED로 별도 박제
    if admin:
        _transition(state, "ADMIN_OVERRIDE_USED", actor=_actor(),
                    meta={"reason": "admin override used"},
                    force=True)

    _save_evidence(args.task_id, "merge", {
        "command": " ".join(sys.argv),
        "exit_code": 0,
        "merge_commit_sha": merge_commit_sha,
        "pr_state": "MERGED",
        "head_sha": head_sha,
        "stdout": proc.stdout[-500:],
        "stderr": proc.stderr[-500:],
        "sha": head_sha,
        "pr_number": pr_n,
        "admin_override": admin,
    })
    _save(state)
    if admin:
        _print_ok(f"{args.task_id}: admin-override merge PR #{pr_n} → MERGING → MERGED → ADMIN_OVERRIDE_USED")
    else:
        _print_ok(f"{args.task_id}: merged PR #{pr_n} → MERGING → MERGED (taskctl done 필요)")
    return 0


# ---------------------------------------------------------------------------
# done: MERGED → DONE + .done 파일 생성
# ---------------------------------------------------------------------------


def cmd_done(args: argparse.Namespace) -> int:
    """done: state == MERGED 검증 + .done 생성 → DONE 전이."""
    state = _load(args.task_id)
    if state["current_state"] != "MERGED":
        _die(
            f"done 불가: 현재 상태={state['current_state']} (MERGED 필요)",
            1,
        )
    # origin/main에 merge commit 존재 확인
    repo = os.environ.get("GH_REPO", "Jeon-Jonghyuk/dev_workspace")
    merge_commit_sha = ""
    api_proc = _run(
        ["gh", "api", f"/repos/{repo}/commits/main"],
        timeout=30,
    )
    if api_proc.returncode == 0:
        try:
            api_data = json.loads(api_proc.stdout)
            merge_commit_sha = api_data.get("sha", "")
        except Exception:
            merge_commit_sha = ""

    # .done 파일 생성
    events_dir = WORKSPACE / "memory" / "events"
    events_dir.mkdir(parents=True, exist_ok=True)
    done_file = events_dir / f"{args.task_id}.done"
    done_file.write_text(
        json.dumps({"task_id": args.task_id, "ts": _now(),
                    "merge_commit_sha": merge_commit_sha}, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    # done.json evidence
    _save_evidence(args.task_id, "done", {
        "merge_commit_sha": merge_commit_sha,
        "done_path": str(done_file),
        "ts": _now(),
        "stdout": "",
        "stderr": "",
        "exit_code": 0,
        "sha": merge_commit_sha,
        "pr_number": state["evidence"].get("pr_number"),
    })

    _transition(state, "DONE", actor=_actor())
    _save(state)
    _print_ok(f"{args.task_id}: → DONE (.done: {done_file})")
    return 0


# ---------------------------------------------------------------------------
# audit-hidden-paths: repo 전체에서 우회 경로 grep
# ---------------------------------------------------------------------------


def _hidden_path_audit_internal() -> dict[str, Any]:
    """repo 전체 grep으로 금지 패턴 검색. PASS/FAIL 반환.

    회장 §3.8: 주석/docstring/문자열 단순 언급은 violation 아님. 실제 subprocess 호출만 차단.
    code_call_markers: subprocess.run / _run([ / 리스트 형태로 인자 전달 — 실제 호출 패턴.
    """
    forbidden_patterns = [
        "gh pr create",
        "gh pr merge",
        "git push origin main",
        "git push --force origin main",
        "worktree_manager finish --action merge",
    ]
    excludes = [".git", "tests", "fixtures", ".worktrees", "node_modules", "__pycache__"]
    # 실제 코드 호출 패턴 (주석/docstring/error message는 제외)
    code_call_markers = (
        'subprocess.', '_run(', '_run([',
        '"gh", "pr", "create"', "'gh', 'pr', 'create'",
        '"gh", "pr", "merge"', "'gh', 'pr', 'merge'",
        'check_call', 'check_output', 'Popen(',
    )
    violations: list[str] = []
    warnings: list[str] = []
    for pattern in forbidden_patterns:
        try:
            proc = subprocess.run(
                ["git", "grep", "-n", pattern],
                cwd=str(WORKSPACE),
                capture_output=True, text=True, timeout=30,
            )
            if proc.returncode == 0:
                for line in proc.stdout.splitlines():
                    parts = line.split(":", 2)
                    path = parts[0] if parts else line
                    content = parts[2] if len(parts) >= 3 else ""
                    # taskctl.py 내부 (유일한 머지 진입점)
                    if "scripts/taskctl.py" in path:
                        continue
                    # 제외 경로
                    if any(exc in path for exc in excludes):
                        continue
                    # .md/.txt/.json/.log/.yml 제외
                    if path.endswith((".md", ".txt", ".json", ".log", ".jsonl", ".yml", ".yaml")):
                        continue
                    # 주석/docstring/f-string error message 등은 violation 아님 (warning)
                    stripped = content.strip()
                    # 코멘트/리스트/docstring 시작 패턴 — 주석으로 간주
                    is_comment = (
                        stripped.startswith("#")
                        or stripped.startswith("- ")
                        or stripped.startswith("* ")
                        or stripped.startswith('"""')
                        or stripped.startswith("'''")
                    )
                    has_call_marker = any(m in content for m in code_call_markers)
                    if is_comment or not has_call_marker:
                        warnings.append(line)
                        continue
                    violations.append(line)
        except Exception as exc:
            violations.append(f"audit_error: {exc}")
    result = "PASS" if not violations else "FAIL"
    return {"result": result, "violations": violations, "warnings": warnings}


def cmd_audit_hidden_paths(args: argparse.Namespace) -> int:
    """audit-hidden-paths: repo 전체 grep으로 우회 경로 감지."""
    del args  # subcommand 인자 미사용
    result = _hidden_path_audit_internal()
    if result["result"] == "PASS":
        _print_ok("hidden-path-audit PASS: 우회 경로 0건")
        return 0
    else:
        print(f"[taskctl] hidden-path-audit FAIL: {len(result['violations'])}건 발견",
              file=sys.stderr)
        for v in result["violations"]:
            print(f"  VIOLATION: {v}", file=sys.stderr)
        return 1


# ---------------------------------------------------------------------------
# takeover: handoff evidence 기반 봇 인계
# ---------------------------------------------------------------------------


def cmd_takeover(args: argparse.Namespace) -> int:
    """takeover: handoff 검증 → 새 worktree+branch 생성 → start_task_guard 자동 호출 → evidence 저장."""
    task_id: str = args.task_id
    from_branch: str = args.from_branch
    new_bot: str = args.bot

    # validate_handoff 모듈 import (scripts/ 디렉토리 sys.path 추가)
    scripts_dir = str(WORKSPACE / "scripts")
    if scripts_dir not in sys.path:
        sys.path.insert(0, scripts_dir)
    try:
        from validate_handoff import validate_handoff  # type: ignore[import]
    except ImportError as exc:
        _die(
            f"validate_handoff 모듈 import 실패: {exc}\n"
            f"  {scripts_dir}/validate_handoff.py 파일이 있는지 확인하세요.",
            2,
        )

    evidence_dir_early = WORKSPACE / ".tasks" / "evidence" / task_id
    now_utc = datetime.now(timezone.utc)
    ts_filename = now_utc.strftime("%Y%m%dT%H%M%SZ")
    ts_iso = now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")

    def _save_early_failure(reason: str, failure_check: str,
                            handoff_data_partial: dict | None = None) -> None:
        try:
            evidence_dir_early.mkdir(parents=True, exist_ok=True)
            ev = {
                "task_id": task_id,
                "previous_bot": (handoff_data_partial or {}).get("previous_bot"),
                "new_bot": new_bot,
                "from_branch": from_branch,
                "new_branch": f"task/{task_id}-{new_bot}",
                "started_at": ts_iso,
                "ts_filename": ts_filename,
                "takeover_status": "failed",
                "failure_reason": reason,
                "failure_check": failure_check,
            }
            _atomic_write(evidence_dir_early / f"takeover-{ts_filename}.json", ev)
        except Exception:
            pass

    handoff_path = WORKSPACE / "memory" / "handoffs" / f"{task_id}.json"
    if not handoff_path.exists():
        _save_early_failure(f"handoff 파일 없음: {handoff_path}", "1_handoff_exists")
        _die(f"handoff 파일 없음: {handoff_path}", 1)

    ok, errors, handoff_data = validate_handoff(
        task_id=task_id,
        branch=from_branch,
        check_head_sha=True,
        workspace_root=WORKSPACE,
    )
    if not ok:
        err_lines = "\n  ".join(errors)
        _save_early_failure(f"handoff 검증 실패: {'; '.join(errors)}",
                            "2_validate_handoff",
                            handoff_data)
        _die(f"handoff 검증 실패:\n  {err_lines}", 1)

    proc = _run(["git", "rev-parse", "--verify", from_branch])
    if proc.returncode != 0:
        _save_early_failure(
            f"from-branch '{from_branch}' 존재하지 않음: {proc.stderr.strip()}",
            "3_from_branch_exists",
            handoff_data,
        )
        _die(
            f"from-branch '{from_branch}' 존재하지 않음: {proc.stderr.strip()}",
            1,
        )

    recorded_head = handoff_data.get("head_sha", "")
    new_worktree_path = WORKSPACE / ".worktrees" / f"{task_id}-{new_bot}"
    if new_worktree_path.exists():
        _save_early_failure(
            f"새 worktree 경로 이미 존재: {new_worktree_path}",
            "5_worktree_path_collision",
            handoff_data,
        )
        _die(
            f"새 worktree 경로가 이미 존재합니다: {new_worktree_path}",
            1,
        )

    new_branch = f"task/{task_id}-{new_bot}"
    proc_branch = _run(["git", "rev-parse", "--verify", new_branch])
    if proc_branch.returncode == 0:
        _save_early_failure(
            f"새 branch 이미 존재: {new_branch}",
            "6_branch_collision",
            handoff_data,
        )
        _die(
            f"새 branch '{new_branch}'가 이미 존재합니다. "
            f"다른 bot ID를 사용하거나 기존 branch를 삭제하세요.",
            1,
        )

    proc_main = _run(["git", "rev-parse", "origin/main"])
    if proc_main.returncode != 0:
        _die(f"origin/main HEAD 조회 실패: {proc_main.stderr.strip()}", 1)
    base_sha = proc_main.stdout.strip()

    evidence_dir = WORKSPACE / ".tasks" / "evidence" / task_id
    evidence_file = evidence_dir / f"takeover-{ts_filename}.json"

    def _save_failed_evidence(reason: str) -> None:
        try:
            ev = {
                "task_id": task_id,
                "previous_bot": handoff_data.get("previous_bot"),
                "new_bot": new_bot,
                "base_sha": base_sha,
                "head_sha": recorded_head,
                "handoff_path": str(handoff_path),
                "from_branch": from_branch,
                "new_branch": new_branch,
                "new_worktree_path": str(new_worktree_path),
                "started_at": ts_iso,
                "ts_filename": ts_filename,
                "takeover_status": "failed",
                "failure_reason": reason,
            }
            _atomic_write(evidence_file, ev)
        except Exception:
            pass

    proc_wt = _run([
        "git", "worktree", "add",
        str(new_worktree_path),
        "-b", new_branch,
        "origin/main",
    ])
    if proc_wt.returncode != 0:
        reason = f"git worktree add 실패: {proc_wt.stderr.strip()}"
        _save_failed_evidence(reason)
        _die(reason, 1)

    start_guard_script = new_worktree_path / "scripts" / "start_task_guard.py"
    if not start_guard_script.exists():
        start_guard_script = WORKSPACE / "scripts" / "start_task_guard.py"
    if not start_guard_script.exists():
        _run(["git", "worktree", "remove", "--force", str(new_worktree_path)])
        _run(["git", "branch", "-D", new_branch])
        reason = f"start_task_guard.py 없음: {start_guard_script}"
        _save_failed_evidence(reason)
        _die(reason, 1)

    proc_guard = subprocess.run(
        [sys.executable, str(start_guard_script), "--task", task_id, "--bot", new_bot],
        capture_output=True,
        text=True,
        timeout=120,
        cwd=str(new_worktree_path),
        env={**os.environ, "WORKSPACE_ROOT": str(WORKSPACE)},
    )
    if proc_guard.returncode != 0:
        _run(["git", "worktree", "remove", "--force", str(new_worktree_path)])
        _run(["git", "branch", "-D", new_branch])
        reason = (
            f"start_task_guard 실패 (exit={proc_guard.returncode}): "
            f"{proc_guard.stderr[-300:].strip()}"
        )
        _save_failed_evidence(reason)
        _die(reason, 1)

    evidence = {
        "task_id": task_id,
        "previous_bot": handoff_data.get("previous_bot"),
        "new_bot": new_bot,
        "base_sha": base_sha,
        "head_sha": recorded_head,
        "handoff_path": str(handoff_path),
        "from_branch": from_branch,
        "new_branch": new_branch,
        "new_worktree_path": str(new_worktree_path),
        "started_at": ts_iso,
        "ts_filename": ts_filename,
        "takeover_status": "success",
        "start_guard_stdout": proc_guard.stdout[-500:],
    }
    _atomic_write(evidence_file, evidence)

    _print_ok(
        f"takeover 성공: {task_id}\n"
        f"  previous_bot={handoff_data.get('previous_bot')} → new_bot={new_bot}\n"
        f"  new branch:   {new_branch}\n"
        f"  new worktree: {new_worktree_path}\n"
        f"  evidence:     {evidence_file}"
    )
    return 0


# ---------------------------------------------------------------------------
# argparse
# ---------------------------------------------------------------------------


def build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(
        prog="taskctl",
        description=(
            "task 상태 enforcement layer + main 진입 단일 경로 "
            "(task-2467: 14+5 state machine + PR lifecycle)"
        ),
    )
    sub = p.add_subparsers(dest="command", required=True)

    def _add_taskid(sp):
        sp.add_argument("task_id", help="task ID (예: task-2467)")

    # ── 기존 명령 (backwards-compat) ──
    sp = sub.add_parser("init", help="CREATED 상태 생성")
    _add_taskid(sp); sp.set_defaults(func=cmd_init)

    sp = sub.add_parser("dispatch", help="DISPATCHED 전환 (compat alias)")
    _add_taskid(sp); sp.set_defaults(func=cmd_dispatch)

    sp = sub.add_parser("ack", help="ACKED 전환 (compat alias)")
    _add_taskid(sp); sp.set_defaults(func=cmd_ack)

    sp = sub.add_parser("run", help="RUNNING 전환")
    _add_taskid(sp); sp.set_defaults(func=cmd_run)

    sp = sub.add_parser("pr-open", help="PR_OPEN 전환 (bot token PR 생성 또는 PR 번호 입력)")
    _add_taskid(sp)
    sp.add_argument("--pr", type=int, default=None, help="PR 번호 (수동 입력)")
    sp.add_argument("--auto", action="store_true", help="bot token으로 직접 PR 생성")
    sp.set_defaults(func=cmd_pr_open)

    sp = sub.add_parser("verify",
        help="evidence 자동 수집 + guard.sh + qc_report_guard, PASS 시 GUARD_PASS(=VERIFIED)")
    _add_taskid(sp)
    sp.add_argument("--machine", action="store_true", help="state JSON 출력")
    sp.set_defaults(func=cmd_verify)

    sp = sub.add_parser("approve", help="HUMAN_APPROVED 전환 (self-approve 차단)")
    _add_taskid(sp)
    sp.add_argument("--by", default=None, help="승인자 (human login). 미지정 시 _actor() 사용")
    sp.set_defaults(func=cmd_approve)

    sp = sub.add_parser("merge", help="main 진입 단일 경로 (MERGING → MERGED)")
    _add_taskid(sp)
    sp.add_argument("--dry-run", action="store_true",
                    help="actual gh pr merge skip — 상태만 전이")
    sp.add_argument("--admin", action="store_true",
                    help="chairman admin override (audit log 자동 기록)")
    sp.add_argument("--reason", default=None, help="admin override 사유")
    sp.set_defaults(func=cmd_merge)

    sp = sub.add_parser("cancel", help="CANCELLED 전환")
    _add_taskid(sp)
    sp.add_argument("--reason", default=None)
    sp.set_defaults(func=cmd_cancel)

    sp = sub.add_parser("fail", help="FAILED 전환")
    _add_taskid(sp)
    sp.add_argument("--reason", required=True)
    sp.set_defaults(func=cmd_fail)

    sp = sub.add_parser("status", help="현재 상태 + evidence 출력")
    _add_taskid(sp)
    sp.add_argument("--machine", action="store_true", help="JSON 출력")
    sp.set_defaults(func=cmd_status)

    sp = sub.add_parser(
        "takeover",
        help="handoff evidence 기반 봇 인계 — 새 worktree + branch + start_guard 자동",
    )
    _add_taskid(sp)
    sp.add_argument(
        "--from", dest="from_branch", required=True, metavar="BRANCH",
        help="이전 봇의 branch (예: task/task-2458-dev4)",
    )
    sp.add_argument("--bot", required=True, help="새 봇 ID (예: dev5)")
    sp.set_defaults(func=cmd_takeover)

    # ── 신규 명령 (task-2467) ──
    sp = sub.add_parser("worktree-ready", help="WORKTREE_READY 전환 (dispatch/ack alias)")
    _add_taskid(sp); sp.set_defaults(func=cmd_worktree_ready)

    sp = sub.add_parser("handoff", help="HANDOFF_READY 전환 (RUNNING → HANDOFF_READY)")
    _add_taskid(sp); sp.set_defaults(func=cmd_handoff)

    sp = sub.add_parser("commit", help="COMMITTED 전환 (RUNNING/HANDOFF_READY → COMMITTED)")
    _add_taskid(sp); sp.set_defaults(func=cmd_commit)

    sp = sub.add_parser("ci-check", help="CI_PENDING 전환 + 8 required CI checks 수집")
    _add_taskid(sp); sp.set_defaults(func=cmd_ci_check)

    sp = sub.add_parser("gemini-evidence", help="GEMINI_PENDING 전환 + evaluate_gate 호출")
    _add_taskid(sp); sp.set_defaults(func=cmd_gemini_evidence)

    sp = sub.add_parser("review-ready", help="REVIEW_READY 전환 (GEMINI_PENDING → REVIEW_READY)")
    _add_taskid(sp); sp.set_defaults(func=cmd_review_ready)

    sp = sub.add_parser("done", help="DONE 전환 (MERGED → DONE + .done 파일 생성)")
    _add_taskid(sp); sp.set_defaults(func=cmd_done)

    sp = sub.add_parser("audit-hidden-paths",
        help="repo 전체 grep으로 우회 경로 감지 (gh pr create/merge, git push origin main)")
    sp.set_defaults(func=cmd_audit_hidden_paths)

    return p


def main(argv: list[str] | None = None) -> int:
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        rc = args.func(args)
    except SystemExit:
        raise
    except subprocess.TimeoutExpired as exc:
        _die(f"subprocess timeout: {exc}", 2)
    except Exception as exc:
        _die(f"internal error: {type(exc).__name__}: {exc}", 2)
    return rc or 0


if __name__ == "__main__":
    sys.exit(main())
