#!/usr/bin/env python3
"""
taskctl.py — main 진입 단일화 + 상태 enforcement layer (task-2449 MVP)

회장 절대 기준:
    "taskctl을 거치지 않고는 main을 절대 변경할 수 없다."

상태 모델 (11종):
    CREATED → DISPATCHED → ACKED → RUNNING → PR_OPEN → GUARD_PASS
        → HUMAN_APPROVED → MERGED → DONE
                                ↘ CANCELLED
                                ↘ FAILED

저장 위치:
    {WORKSPACE}/.tasks/state/<task-id>.json

bypass (회장 전용):
    TASKCTL_BYPASS=1 환경변수로 1~5단계 skip. evidence에 강제 기록.

종료 코드:
    0   PASS / 정상 전이
    1   FAIL (가드 차단, 잘못된 전이, evidence 미달, etc.)
    2   internal error (subprocess 실행 실패 등)
"""
from __future__ import annotations

import argparse
import getpass
import hashlib
import json
import os
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, NoReturn, overload

# ---------------------------------------------------------------------------
# 상수 / 경로
# ---------------------------------------------------------------------------

# Workspace root; overridable through the WORKSPACE_ROOT environment variable.
WORKSPACE = Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))
# Directory that holds one JSON state file per task (see _state_path).
STATE_DIR = WORKSPACE / ".tasks" / "state"

# External guard scripts that verify/merge execute as subprocesses.
GUARD_SH = WORKSPACE / "scripts" / "guard.sh"
QC_REPORT_GUARD = WORKSPACE / "scripts" / "qc_report_guard.py"

# 8 required CI checks (task-2440/2445).
# _collect_pr_evidence reports each of these names; a name absent from the
# PR's status rollup is recorded as "MISSING".
REQUIRED_CHECKS: tuple[str, ...] = (
    "cancel-kill-switch",
    "qc-check",
    "hidden-path-audit",
    "lock-in-check",
    "merge-safety-check",
    "gemini-review-gate",
    "ci/guard",
    "guard",
)

# State model: every value that a task's "current_state" field can take.
STATES: tuple[str, ...] = (
    "CREATED",
    "DISPATCHED",
    "ACKED",
    "RUNNING",
    "PR_OPEN",
    "GUARD_PASS",
    "HUMAN_APPROVED",
    "MERGED",
    "DONE",
    "CANCELLED",
    "FAILED",
)

# Regular transition table (forward-only). Transitions to CANCELLED/FAILED are
# not listed here; _transition handles them with a separate rule.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "CREATED": {"DISPATCHED"},
    "DISPATCHED": {"ACKED"},
    "ACKED": {"RUNNING"},
    "RUNNING": {"PR_OPEN"},
    "PR_OPEN": {"GUARD_PASS", "PR_OPEN"},  # verify may be retried
    "GUARD_PASS": {"HUMAN_APPROVED", "PR_OPEN"},  # demoted to PR_OPEN when a re-verify fails
    "HUMAN_APPROVED": {"MERGED"},
    "MERGED": {"DONE"},
    "DONE": set(),
    "CANCELLED": set(),
    "FAILED": set(),
}

# cancel/fail are allowed from any state that is not in this set.
# NOTE(review): the previous comment described four terminal states
# (MERGED/DONE/CANCELLED/FAILED), but the set contains only three, so a
# cancel/fail from MERGED is currently accepted — confirm which is intended.
TERMINAL_STATES: set[str] = {"DONE", "CANCELLED", "FAILED"}


# ---------------------------------------------------------------------------
# 시간 / 직렬화
# ---------------------------------------------------------------------------


def _now() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    utc_now = datetime.now(tz=timezone.utc)
    return f"{utc_now:%Y-%m-%dT%H:%M:%SZ}"


def _actor() -> str:
    """Describe who is running taskctl as ``"<user> <<email>>"``.

    The user name comes from ``getpass.getuser()`` (falling back to the
    ``USER`` environment variable, then ``"unknown"``). The e-mail is the
    output of ``git config user.email`` run inside WORKSPACE; an empty
    result or any failure yields ``"unknown@local"``.
    """
    try:
        login = getpass.getuser()
    except Exception:
        login = os.environ.get("USER", "unknown")

    address = "unknown@local"
    try:
        completed = subprocess.run(
            ["git", "config", "user.email"],
            cwd=str(WORKSPACE),
            timeout=5,
            text=True,
            capture_output=True,
        )
        configured = completed.stdout.strip()
        if configured:
            address = configured
    except Exception:
        # Best effort only: a missing git binary or a timeout must not block taskctl.
        address = "unknown@local"

    return f"{login} <{address}>"


# ---------------------------------------------------------------------------
# state 파일 입출력 + 무결성 검증
# ---------------------------------------------------------------------------


def _state_path(task_id: str) -> Path:
    """Return the path of the JSON state file for *task_id* inside STATE_DIR."""
    filename = f"{task_id}.json"
    return STATE_DIR / filename


def _canonical_json(state: dict[str, Any]) -> str:
    """Serialize *state* into the canonical text that the checksum is computed over.

    The ``_checksum`` entry is left out, keys are sorted, separators are
    compact and non-ASCII characters are emitted as-is. *state* itself is not
    modified.
    """
    body = dict(state)
    body.pop("_checksum", None)
    return json.dumps(
        body,
        separators=(",", ":"),
        sort_keys=True,
        ensure_ascii=False,
    )


def _compute_checksum(state: dict[str, Any]) -> str:
    """Return the SHA-256 hex digest of the canonical JSON form of *state*."""
    canonical_bytes = _canonical_json(state).encode("utf-8")
    digest = hashlib.sha256()
    digest.update(canonical_bytes)
    return digest.hexdigest()


def _new_state(task_id: str) -> dict[str, Any]:
    """Build the initial state document for *task_id*.

    The task starts in ``CREATED`` with an empty transition log, an evidence
    section whose fields are all unset/empty, no human approval and an unused
    bypass record. Every call returns fresh, independent containers.
    """
    blank_evidence: dict[str, Any] = {
        "git_diff_sha": None,
        "changed_paths": [],
        "branch": None,
        "pr_number": None,
        "pr_state": None,
        "ci_checks": {},
        "guard_sh_result": None,
        "qc_report_guard_result": None,
        "merge_timestamp": None,
        "exit_codes": {},
    }
    unused_bypass: dict[str, Any] = {"used": False, "ts": None, "actor": None}

    document: dict[str, Any] = {}
    document["task_id"] = task_id
    document["current_state"] = "CREATED"
    document["transitions"] = []
    document["evidence"] = blank_evidence
    document["human_approved"] = False
    document["bypass"] = unused_bypass
    return document


def _save(state: dict[str, Any]) -> None:
    """Persist *state* to its state file with a freshly computed checksum.

    The ``_checksum`` entry of *state* is replaced in place (and ends up as
    the last key). The document is written to a sibling ``.json.tmp`` file
    first and then renamed over the real file.
    """
    STATE_DIR.mkdir(parents=True, exist_ok=True)

    # Drop any stale checksum so the new one is appended as the final key.
    if "_checksum" in state:
        del state["_checksum"]
    state["_checksum"] = _compute_checksum(state)

    destination = _state_path(state["task_id"])
    scratch = destination.with_suffix(".json.tmp")
    serialized = json.dumps(state, ensure_ascii=False, indent=2)
    scratch.write_text(serialized, encoding="utf-8")
    scratch.replace(destination)


@overload
def _load(task_id: str) -> dict[str, Any]: ...
@overload
def _load(task_id: str, *, allow_missing: bool) -> dict[str, Any] | None: ...
def _load(task_id: str, *, allow_missing: bool = False) -> dict[str, Any] | None:
    """Read and integrity-check the state file of *task_id*.

    Args:
        task_id: Task whose state file (see ``_state_path``) is read.
        allow_missing: When true, a missing file yields ``None`` instead of
            terminating the process.

    Returns:
        The state document, including its stored ``_checksum`` entry, or
        ``None`` when the file is missing and *allow_missing* is true.

    Exits (via ``_die`` with code 1) when the file is missing and
    *allow_missing* is false, when it cannot be read or parsed, when its
    top-level JSON value is not an object, or when the stored checksum does
    not match the recomputed one.
    """
    p = _state_path(task_id)
    if not p.exists():
        if allow_missing:
            return None
        _die(f"state 파일 없음: {p} (먼저 'taskctl init {task_id}' 실행)", 1)
    try:
        state = json.loads(p.read_text(encoding="utf-8"))
    except Exception as exc:
        _die(f"state 파일 파싱 실패: {p} → {exc}", 1)
    # Valid JSON that is not an object (e.g. a list or a number) cannot be a
    # state document; report it as a parse failure rather than letting
    # ``state.pop`` below crash with an AttributeError.
    if not isinstance(state, dict):
        _die(
            f"state 파일 파싱 실패: {p} → JSON object가 아님 ({type(state).__name__})",
            1,
        )
    stored = state.pop("_checksum", None)
    expected = _compute_checksum(state)
    if stored != expected:
        _die(
            f"checksum 불일치: 외부 수정 의심 (state file tampered).\n"
            f"  task_id={task_id}\n"
            f"  stored={stored}\n"
            f"  expected={expected}\n"
            f"  → taskctl만 상태 변경 가능합니다.",
            1,
        )
    # Re-attach the verified checksum so callers get the document as stored.
    state["_checksum"] = stored
    return state


# ---------------------------------------------------------------------------
# 상태 전이
# ---------------------------------------------------------------------------


def _transition(state: dict[str, Any], target: str, *, actor: str | None = None,
                meta: dict[str, Any] | None = None,
                force: bool = False) -> None:
    """Move *state* to *target* and append a record to its transition log.

    Validation rules, applied unless *force* is true:
      * ``PR_OPEN`` → ``PR_OPEN`` is always accepted (idempotent re-verify).
      * ``CANCELLED`` / ``FAILED`` are accepted from any state that is not in
        ``TERMINAL_STATES``.
      * Every other move must be listed in ``ALLOWED_TRANSITIONS``.
    A rejected move terminates the process through ``_die`` (exit code 1).

    Args:
        state: State document; modified in place.
        target: Name of the state to enter.
        actor: Recorded in the log entry; defaults to ``"taskctl"``.
        meta: Optional extra data stored under the entry's ``"meta"`` key.
        force: Skip validation (bypass / chairman override); the entry is
            additionally marked with ``"forced": True``.
    """
    origin = state["current_state"]
    is_reverify = target == "PR_OPEN" and origin == "PR_OPEN"

    if not force and not is_reverify:
        if target in {"CANCELLED", "FAILED"}:
            if origin in TERMINAL_STATES:
                _die(f"비정상 전이: {origin} → {target} (terminal 상태에서 전이 불가)", 1)
        else:
            permitted = ALLOWED_TRANSITIONS.get(origin, set())
            if target not in permitted:
                _die(
                    f"비정상 전이: {origin} → {target} (허용: {sorted(permitted) or 'none'})",
                    1,
                )

    state["current_state"] = target

    record: dict[str, Any] = {
        "from": origin,
        "to": target,
        "ts": _now(),
        "actor": actor or "taskctl",
    }
    if force:
        record["forced"] = True
    if meta:
        record["meta"] = meta
    state["transitions"].append(record)


# ---------------------------------------------------------------------------
# 출력 / 종료
# ---------------------------------------------------------------------------


def _die(msg: str, code: int = 1) -> NoReturn:
    """Write ``[taskctl] <msg>`` to stderr and terminate with exit status *code*."""
    sys.stderr.write(f"[taskctl] {msg}\n")
    raise SystemExit(code)


def _print_ok(msg: str) -> None:
    """Write ``[taskctl] <msg>`` followed by a newline to stdout."""
    sys.stdout.write(f"[taskctl] {msg}\n")


# ---------------------------------------------------------------------------
# subprocess 헬퍼
# ---------------------------------------------------------------------------


def _run(cmd: list[str], *, cwd: Path | None = None, timeout: int = 120,
         env: dict[str, str] | None = None) -> subprocess.CompletedProcess:
    """Run *cmd* with captured text output and return the completed process.

    Args:
        cmd: Argument vector passed to ``subprocess.run`` (no shell).
        cwd: Working directory; WORKSPACE is used when omitted.
        timeout: Seconds before ``subprocess.TimeoutExpired`` is raised.
        env: Extra variables layered on top of the current environment.

    A non-zero exit status is not raised; callers inspect ``returncode``.
    """
    merged_env = dict(os.environ)
    if env:
        merged_env.update(env)
    workdir = cwd if cwd else WORKSPACE
    return subprocess.run(
        cmd,
        cwd=str(workdir),
        env=merged_env,
        timeout=timeout,
        text=True,
        capture_output=True,
    )


# ---------------------------------------------------------------------------
# 명령 핸들러
# ---------------------------------------------------------------------------


def cmd_init(args: argparse.Namespace) -> int:
    """Create the state file for ``args.task_id`` in the CREATED state.

    If a state file already exists nothing is written. Returns 0 either way.
    """
    task_id = args.task_id
    state_file = _state_path(task_id)
    if state_file.exists():
        _print_ok(f"already initialized: {task_id}")
        return 0

    fresh = _new_state(task_id)
    # The creation itself is the first entry of the transition log.
    creation = {"from": None, "to": "CREATED", "ts": _now(), "actor": _actor()}
    fresh["transitions"].append(creation)
    _save(fresh)
    _print_ok(f"init {task_id} → CREATED ({state_file})")
    return 0


def _simple_transition_cmd(args: argparse.Namespace, target: str,
                           meta: dict[str, Any] | None = None) -> int:
    """Load the task named by ``args.task_id``, move it to *target*, save it.

    The transition is validated by ``_transition``; an illegal move ends the
    process there. Returns 0 on success.
    """
    task_id = args.task_id
    document = _load(task_id)
    operator = _actor()
    _transition(document, target, actor=operator, meta=meta)
    _save(document)
    _print_ok(f"{task_id}: → {target}")
    return 0


def cmd_dispatch(args: argparse.Namespace) -> int:
    """Handle ``taskctl dispatch``: move the task to DISPATCHED."""
    return _simple_transition_cmd(args, target="DISPATCHED")


def cmd_ack(args: argparse.Namespace) -> int:
    """Handle ``taskctl ack``: move the task to ACKED."""
    return _simple_transition_cmd(args, target="ACKED")


def cmd_run(args: argparse.Namespace) -> int:
    """Handle ``taskctl run``: move the task to RUNNING."""
    return _simple_transition_cmd(args, target="RUNNING")


def cmd_pr_open(args: argparse.Namespace) -> int:
    """Handle ``taskctl pr-open``: record the PR number and move to PR_OPEN.

    The PR number from ``args.pr`` is stored in the evidence section and in
    the transition's meta data. Returns 0 on success.
    """
    task_id, pr_number = args.task_id, args.pr
    document = _load(task_id)
    document["evidence"]["pr_number"] = pr_number
    _transition(document, "PR_OPEN", actor=_actor(), meta={"pr": pr_number})
    _save(document)
    _print_ok(f"{task_id}: → PR_OPEN (PR #{pr_number})")
    return 0


# ---------------------------------------------------------------------------
# verify: evidence 자동 수집 + guard.sh + qc_report_guard
# ---------------------------------------------------------------------------


def _collect_git_evidence() -> dict[str, Any]:
    """Gather git facts about the current checkout.

    Returns a dict with the current branch name, the HEAD commit hash (stored
    under ``git_diff_sha``), the paths that differ between ``origin/main`` and
    HEAD, and the number of non-blank ``git status --porcelain`` lines.
    Exit statuses of the git commands are not inspected; a failing command
    simply contributes empty output.
    """

    def _git_stdout(*git_args: str) -> str:
        return _run(["git", *git_args]).stdout

    branch_name = _git_stdout("rev-parse", "--abbrev-ref", "HEAD").strip()
    diff_listing = _git_stdout("diff", "origin/main..HEAD", "--name-only")
    head_sha = _git_stdout("rev-parse", "HEAD").strip()
    status_listing = _git_stdout("status", "--porcelain")

    changed_paths = [line for line in diff_listing.splitlines() if line.strip()]
    dirty_count = sum(1 for line in status_listing.splitlines() if line.strip())

    return {
        "branch": branch_name or None,
        "git_diff_sha": head_sha or None,
        "changed_paths": changed_paths,
        "git_status_porcelain_lines": dirty_count,
    }


def _collect_pr_evidence(pr_number: int | None) -> dict[str, Any]:
    """Query ``gh pr view`` for the PR and summarise its state and CI checks.

    Returns:
        On success a dict with ``pr_state``, ``ci_checks`` (one entry per name
        in REQUIRED_CHECKS, ``"MISSING"`` when the PR reports no such check),
        ``ci_checks_all`` (every reported check), ``mergeable`` and
        ``merge_state_status``.
        When there is no PR number, ``gh`` exits non-zero, or its output is
        not valid JSON, the same state/check fields are returned empty
        together with a ``pr_view_error`` description (and without
        ``ci_checks_all``).
    """

    def _unavailable(error: str) -> dict[str, Any]:
        return {"pr_state": None, "ci_checks": {}, "mergeable": None,
                "merge_state_status": None, "pr_view_error": error}

    if not pr_number:
        return _unavailable("no_pr_number")

    gh_cmd = [
        "gh", "pr", "view", str(pr_number),
        "--json", "state,mergeable,mergeStateStatus,statusCheckRollup",
    ]
    completed = _run(gh_cmd, timeout=30)
    if completed.returncode != 0:
        return _unavailable(completed.stderr[-300:].strip() or "gh_error")

    try:
        payload = json.loads(completed.stdout or "{}")
    except json.JSONDecodeError as exc:
        return _unavailable(f"json_parse: {exc}")

    # Map each reported check to the first non-empty of conclusion/state/status.
    observed: dict[str, str] = {}
    for check in payload.get("statusCheckRollup") or []:
        label = check.get("name") or check.get("context") or ""
        outcome = (check.get("conclusion") or check.get("state")
                   or check.get("status") or "UNKNOWN")
        if not label:
            continue
        observed[label] = outcome

    required = {}
    for check_name in REQUIRED_CHECKS:
        required[check_name] = observed.get(check_name, "MISSING")

    return {
        "pr_state": payload.get("state"),
        "ci_checks": required,
        "ci_checks_all": observed,
        "mergeable": payload.get("mergeable"),
        "merge_state_status": payload.get("mergeStateStatus"),
    }


def _run_guard_sh(stage: str, task_id: str) -> dict[str, Any]:
    """Run ``bash guard.sh <stage> <task_id>`` and summarise the outcome.

    Returns a dict whose ``result`` is ``"PASS"`` for exit status 0 and
    ``"FAIL"`` otherwise, plus the exit code and the last 500 characters of
    stdout/stderr. When the script file does not exist, ``result`` is
    ``"MISSING"`` with exit code 127 and nothing is executed.
    """
    if not GUARD_SH.exists():
        return {"result": "MISSING", "exit_code": 127, "stderr": "guard.sh not found"}

    completed = _run(["bash", str(GUARD_SH), stage, task_id], timeout=180)
    status = completed.returncode
    verdict = "FAIL" if status != 0 else "PASS"
    return {
        "result": verdict,
        "exit_code": status,
        "stdout_tail": completed.stdout[-500:],
        "stderr_tail": completed.stderr[-500:],
    }


def _run_qc_report_guard(task_id: str) -> dict[str, Any]:
    """Run ``qc_report_guard.py`` for *task_id* and summarise the outcome.

    The script is started with ``python3`` and receives ``--task-id`` and
    ``--workspace``. The returned dict has the same shape as the one from
    ``_run_guard_sh``: ``result`` (``"PASS"``/``"FAIL"``, or ``"MISSING"``
    with exit code 127 when the script file does not exist), the exit code,
    and the last 500 characters of stdout/stderr.
    """
    if not QC_REPORT_GUARD.exists():
        return {"result": "MISSING", "exit_code": 127, "stderr": "qc_report_guard.py not found"}

    command = [
        "python3", str(QC_REPORT_GUARD),
        "--task-id", task_id,
        "--workspace", str(WORKSPACE),
    ]
    completed = _run(command, timeout=120)
    status = completed.returncode
    verdict = "FAIL" if status != 0 else "PASS"
    return {
        "result": verdict,
        "exit_code": status,
        "stdout_tail": completed.stdout[-500:],
        "stderr_tail": completed.stderr[-500:],
    }


def _all_required_checks_pass(ci_checks: dict[str, str]) -> bool:
    """Return True only when *ci_checks* is non-empty and every value is ``"SUCCESS"``."""
    if not ci_checks:
        # No data at all is treated as "not passed", never as vacuous success.
        return False
    for outcome in ci_checks.values():
        if outcome != "SUCCESS":
            return False
    return True


def cmd_verify(args: argparse.Namespace) -> int:
    """Handle ``taskctl verify``: collect evidence, run both guards, update state.

    Git and PR evidence is always recorded. The verdict depends only on
    guard.sh (``pre-push`` stage) and qc_report_guard both reporting PASS;
    the CI check results are stored as evidence but do not affect it.

    State handling:
      * PASS from PR_OPEN moves the task to GUARD_PASS.
      * PASS from GUARD_PASS or HUMAN_APPROVED keeps the current state. A
        transition to GUARD_PASS is not allowed from those states, so
        attempting one would abort the command and drop the new evidence.
      * PASS from any earlier state is rejected by ``_transition``.
      * FAIL from GUARD_PASS or HUMAN_APPROVED demotes the task to PR_OPEN.
        HUMAN_APPROVED → PR_OPEN is not in ALLOWED_TRANSITIONS, so that
        demotion is forced and the ``human_approved`` flag is cleared.
      * FAIL from any other state leaves the state unchanged.

    Returns:
        0 on PASS, 1 on FAIL. Terminal states exit through ``_die``.
    """
    state = _load(args.task_id)
    src = state["current_state"]
    if src in TERMINAL_STATES:
        _die(f"verify 불가: 현재 상태={src} (terminal)", 1)

    evidence = state["evidence"]

    git_ev = _collect_git_evidence()
    evidence["branch"] = git_ev["branch"]
    evidence["git_diff_sha"] = git_ev["git_diff_sha"]
    evidence["changed_paths"] = git_ev["changed_paths"]

    pr_n = evidence.get("pr_number")
    pr_ev = _collect_pr_evidence(pr_n)
    evidence["pr_state"] = pr_ev["pr_state"]
    evidence["ci_checks"] = pr_ev["ci_checks"]
    evidence["mergeable"] = pr_ev.get("mergeable")
    evidence["merge_state_status"] = pr_ev.get("merge_state_status")
    if "pr_view_error" in pr_ev:
        evidence["pr_view_error"] = pr_ev["pr_view_error"]

    guard_ev = _run_guard_sh("pre-push", args.task_id)
    evidence["guard_sh_result"] = guard_ev["result"]
    evidence["guard_sh_detail"] = guard_ev

    qc_ev = _run_qc_report_guard(args.task_id)
    evidence["qc_report_guard_result"] = qc_ev["result"]
    evidence["qc_report_guard_detail"] = qc_ev

    evidence["exit_codes"]["verify_guard_sh"] = guard_ev["exit_code"]
    evidence["exit_codes"]["verify_qc_report_guard"] = qc_ev["exit_code"]

    all_pass = guard_ev["result"] == "PASS" and qc_ev["result"] == "PASS"
    already_verified = src in {"GUARD_PASS", "HUMAN_APPROVED"}

    if all_pass:
        if already_verified:
            # Re-verify of a task that is at or beyond GUARD_PASS: there is
            # nothing to promote, so only the refreshed evidence is stored.
            outcome = f"verify PASS (state 유지: {src})"
        else:
            _transition(state, "GUARD_PASS", actor=_actor(),
                        meta={"guard_sh": "PASS", "qc_report_guard": "PASS"})
            outcome = "verify PASS → GUARD_PASS"
        evidence["exit_codes"]["verify"] = 0
        _save(state)
        _print_ok(f"{args.task_id}: {outcome}")
        if args.machine:
            print(json.dumps(state, ensure_ascii=False, indent=2))
        return 0

    if already_verified:
        revoke_approval = src == "HUMAN_APPROVED"
        if revoke_approval:
            # The approval was given for a state that no longer passes the guards.
            state["human_approved"] = False
        _transition(state, "PR_OPEN", actor=_actor(),
                    meta={"reason": "verify failed, demoting to PR_OPEN"},
                    force=revoke_approval)
    evidence["exit_codes"]["verify"] = 1
    _save(state)

    fail_reasons: list[str] = []
    if guard_ev["result"] != "PASS":
        fail_reasons.append(f"guard.sh={guard_ev['result']}")
    if qc_ev["result"] != "PASS":
        fail_reasons.append(f"qc_report_guard={qc_ev['result']}")
    _print_ok(f"{args.task_id}: verify FAIL [{', '.join(fail_reasons)}]")
    if args.machine:
        print(json.dumps(state, ensure_ascii=False, indent=2))
    return 1


# ---------------------------------------------------------------------------
# approve / cancel / fail / status
# ---------------------------------------------------------------------------


def cmd_approve(args: argparse.Namespace) -> int:
    """Handle ``taskctl approve``: record human approval for a GUARD_PASS task.

    Exits through ``_die`` (code 1) unless the task is currently in
    GUARD_PASS. Otherwise sets ``human_approved``, moves the task to
    HUMAN_APPROVED and returns 0.
    """
    task_id = args.task_id
    document = _load(task_id)
    current = document["current_state"]
    if current != "GUARD_PASS":
        _die(
            f"approve 불가: 현재 상태={current} (GUARD_PASS 필요)",
            1,
        )

    document["human_approved"] = True
    _transition(document, "HUMAN_APPROVED", actor=_actor())
    document["evidence"]["exit_codes"]["approve"] = 0
    _save(document)
    _print_ok(f"{task_id}: → HUMAN_APPROVED")
    return 0


def cmd_cancel(args: argparse.Namespace) -> int:
    """Handle ``taskctl cancel``: move the task to CANCELLED.

    The optional ``args.reason`` (``None`` when absent) is stored in the
    transition's meta data. Returns 0 on success.
    """
    task_id = args.task_id
    why = getattr(args, "reason", None)
    document = _load(task_id)
    _transition(document, "CANCELLED", actor=_actor(), meta={"reason": why})
    _save(document)
    _print_ok(f"{task_id}: → CANCELLED")
    return 0


def cmd_fail(args: argparse.Namespace) -> int:
    """Handle ``taskctl fail``: move the task to FAILED with ``args.reason``.

    The reason is stored in the transition's meta data and echoed in the
    confirmation message. Returns 0 on success.
    """
    task_id, why = args.task_id, args.reason
    document = _load(task_id)
    _transition(document, "FAILED", actor=_actor(), meta={"reason": why})
    _save(document)
    _print_ok(f"{task_id}: → FAILED ({why})")
    return 0


def cmd_status(args: argparse.Namespace) -> int:
    """Handle ``taskctl status``: print the task's state and key evidence.

    With ``args.machine`` the output is JSON (the full state document, or a
    small ``missing`` marker when no state file exists); otherwise it is a
    short human-readable summary. Always returns 0 — in this MVP an
    uninitialised task is reported, not treated as an error.
    """
    document = _load(args.task_id, allow_missing=True)

    if document is None:
        if not args.machine:
            print(f"[taskctl] {args.task_id}: state file missing")
        else:
            marker = {"task_id": args.task_id, "current_state": None,
                      "missing": True}
            print(json.dumps(marker, ensure_ascii=False))
        return 0  # MVP: an uninitialised task does not block anything

    if args.machine:
        print(json.dumps(document, ensure_ascii=False, indent=2))
        return 0

    print(f"task: {document['task_id']}")
    print(f"state: {document['current_state']}")
    print(f"transitions: {len(document['transitions'])}")
    collected = document["evidence"]
    print(f"branch: {collected.get('branch')}")
    print(f"pr: {collected.get('pr_number')}")
    print(f"guard_sh: {collected.get('guard_sh_result')}")
    print(f"qc_report_guard: {collected.get('qc_report_guard_result')}")
    print(f"human_approved: {document['human_approved']}")
    print(f"bypass: {document['bypass']}")
    return 0


# ---------------------------------------------------------------------------
# atomic write 헬퍼
# ---------------------------------------------------------------------------


def _atomic_write(path: Path, data: dict[str, Any]) -> None:
    """Write *data* as indented JSON to *path* via a temp file and ``os.replace``.

    The parent directory is created when needed. The temp file lives in the
    same directory as *path*; if writing or renaming raises, the temp file is
    removed (best effort) and the exception is re-raised.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    handle, scratch_name = tempfile.mkstemp(suffix=".tmp", dir=str(target_dir))
    try:
        with os.fdopen(handle, "w", encoding="utf-8") as out:
            json.dump(data, out, indent=2, ensure_ascii=False)
        os.replace(scratch_name, str(path))
    except Exception:
        # Do not leave a half-written temp file behind.
        try:
            os.unlink(scratch_name)
        except OSError:
            pass
        raise


# ---------------------------------------------------------------------------
# takeover: handoff evidence 기반 봇 인계
# ---------------------------------------------------------------------------


def cmd_takeover(args: argparse.Namespace) -> int:
    """Handle ``taskctl takeover``: hand a task over to a new bot.

    Steps: validate the handoff record, create a new worktree and branch from
    ``origin/main``, run ``start_task_guard.py`` inside it, and write a
    takeover evidence file under ``.tasks/evidence/<task-id>/``.

    Every failure path writes a best-effort evidence file with
    ``takeover_status: "failed"`` before exiting. Once the worktree exists,
    failures (including a timeout of the start guard) also remove the new
    worktree and branch again.

    Args:
        args: Parsed CLI arguments providing ``task_id``, ``from_branch`` and
            ``bot``.

    Returns:
        0 on success. Failures exit through ``_die`` with code 1 (code 2 when
        ``validate_handoff`` cannot be imported); a start-guard timeout is
        re-raised as ``subprocess.TimeoutExpired`` after cleanup.
    """
    task_id: str = args.task_id
    from_branch: str = args.from_branch
    new_bot: str = args.bot

    # Import validate_handoff from the workspace's scripts/ directory.
    scripts_dir = str(WORKSPACE / "scripts")
    if scripts_dir not in sys.path:
        sys.path.insert(0, scripts_dir)
    try:
        from validate_handoff import validate_handoff  # type: ignore[import]
    except ImportError as exc:
        _die(
            f"validate_handoff 모듈 import 실패: {exc}\n"
            f"  {scripts_dir}/validate_handoff.py 파일이 있는지 확인하세요.",
            2,
        )

    # Timestamps and evidence location are fixed up front so that every
    # failure path below can write evidence.
    evidence_dir = WORKSPACE / ".tasks" / "evidence" / task_id
    now_utc = datetime.now(timezone.utc)
    ts_filename = now_utc.strftime("%Y%m%dT%H%M%SZ")  # filename-safe
    ts_iso = now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")  # ISO 8601
    evidence_file = evidence_dir / f"takeover-{ts_filename}.json"

    new_branch = f"task/{task_id}-{new_bot}"
    new_worktree_path = WORKSPACE / ".worktrees" / f"{task_id}-{new_bot}"

    def _save_early_failure(reason: str, failure_check: str,
                            handoff_data_partial: dict | None = None) -> None:
        """Record a failure that happened before the worktree was created."""
        try:
            evidence_dir.mkdir(parents=True, exist_ok=True)
            ev = {
                "task_id": task_id,
                "previous_bot": (handoff_data_partial or {}).get("previous_bot"),
                "new_bot": new_bot,
                "from_branch": from_branch,
                "new_branch": new_branch,
                "started_at": ts_iso,
                "ts_filename": ts_filename,
                "takeover_status": "failed",
                "failure_reason": reason,
                "failure_check": failure_check,
            }
            _atomic_write(evidence_file, ev)
        except Exception:
            pass  # evidence is best effort; the original failure is reported anyway

    # 1. The handoff file must exist.
    handoff_path = WORKSPACE / "memory" / "handoffs" / f"{task_id}.json"
    if not handoff_path.exists():
        _save_early_failure(f"handoff 파일 없음: {handoff_path}", "1_handoff_exists")
        _die(f"handoff 파일 없음: {handoff_path}", 1)

    # 2. validate_handoff is the single source of validation; it is asked to
    #    check the branch and (check_head_sha=True) the recorded head SHA.
    ok, errors, handoff_data = validate_handoff(
        task_id=task_id,
        branch=from_branch,
        check_head_sha=True,
        workspace_root=WORKSPACE,
    )
    if not ok:
        err_lines = "\n  ".join(errors)
        _save_early_failure(f"handoff 검증 실패: {'; '.join(errors)}",
                            "2_validate_handoff",
                            handoff_data)
        _die(f"handoff 검증 실패:\n  {err_lines}", 1)

    # 3. Explicit from-branch existence check, kept so that this failure gets
    #    its own evidence entry.
    proc = _run(["git", "rev-parse", "--verify", from_branch])
    if proc.returncode != 0:
        _save_early_failure(
            f"from-branch '{from_branch}' 존재하지 않음: {proc.stderr.strip()}",
            "3_from_branch_exists",
            handoff_data,
        )
        _die(
            f"from-branch '{from_branch}' 존재하지 않음: {proc.stderr.strip()}",
            1,
        )

    # 4. The head SHA was already checked in step 2; only record it here.
    recorded_head = handoff_data.get("head_sha", "")

    # 5. The new worktree path must not exist yet.
    if new_worktree_path.exists():
        _save_early_failure(
            f"새 worktree 경로 이미 존재: {new_worktree_path}",
            "5_worktree_path_collision",
            handoff_data,
        )
        _die(
            f"새 worktree 경로가 이미 존재합니다: {new_worktree_path}",
            1,
        )

    # 6. The new branch must not exist yet.
    proc_branch = _run(["git", "rev-parse", "--verify", new_branch])
    if proc_branch.returncode == 0:
        _save_early_failure(
            f"새 branch 이미 존재: {new_branch}",
            "6_branch_collision",
            handoff_data,
        )
        _die(
            f"새 branch '{new_branch}'가 이미 존재합니다. "
            f"다른 bot ID를 사용하거나 기존 branch를 삭제하세요.",
            1,
        )

    # origin/main HEAD, recorded as base_sha in the evidence.
    proc_main = _run(["git", "rev-parse", "origin/main"])
    if proc_main.returncode != 0:
        main_error = f"origin/main HEAD 조회 실패: {proc_main.stderr.strip()}"
        # Record evidence here too, like every other pre-worktree failure.
        _save_early_failure(main_error, "origin_main_head", handoff_data)
        _die(main_error, 1)
    base_sha = proc_main.stdout.strip()

    def _save_failed_evidence(reason: str) -> None:
        """Record a failure that happened at or after worktree creation."""
        try:
            ev = {
                "task_id": task_id,
                "previous_bot": handoff_data.get("previous_bot"),
                "new_bot": new_bot,
                "base_sha": base_sha,
                "head_sha": recorded_head,
                "handoff_path": str(handoff_path),
                "from_branch": from_branch,
                "new_branch": new_branch,
                "new_worktree_path": str(new_worktree_path),
                "started_at": ts_iso,
                "ts_filename": ts_filename,
                "takeover_status": "failed",
                "failure_reason": reason,
            }
            _atomic_write(evidence_file, ev)
        except Exception:
            pass  # a failure to store evidence is ignored

    def _cleanup_worktree() -> None:
        """Try to remove the worktree and branch created in step 7."""
        _run(["git", "worktree", "remove", "--force", str(new_worktree_path)])
        _run(["git", "branch", "-D", new_branch])

    # 7. Create the new worktree and branch, based on origin/main.
    proc_wt = _run([
        "git", "worktree", "add",
        str(new_worktree_path),
        "-b", new_branch,
        "origin/main",
    ])
    if proc_wt.returncode != 0:
        reason = f"git worktree add 실패: {proc_wt.stderr.strip()}"
        _save_failed_evidence(reason)
        _die(reason, 1)

    # 9. Run start_task_guard, preferring the copy inside the new worktree
    #    and falling back to the one in the main workspace.
    start_guard_script = new_worktree_path / "scripts" / "start_task_guard.py"
    if not start_guard_script.exists():
        start_guard_script = WORKSPACE / "scripts" / "start_task_guard.py"
    if not start_guard_script.exists():
        _cleanup_worktree()
        reason = f"start_task_guard.py 없음: {start_guard_script}"
        _save_failed_evidence(reason)
        _die(reason, 1)

    try:
        proc_guard = subprocess.run(
            [sys.executable, str(start_guard_script), "--task", task_id, "--bot", new_bot],
            capture_output=True,
            text=True,
            timeout=120,
            cwd=str(new_worktree_path),
            env={**os.environ, "WORKSPACE_ROOT": str(WORKSPACE)},
        )
    except subprocess.TimeoutExpired as exc:
        # Without this, a hung guard would leave the worktree and branch
        # behind and no evidence would be written. The exception is re-raised
        # so the caller still sees the timeout.
        _cleanup_worktree()
        _save_failed_evidence(f"start_task_guard timeout ({exc.timeout}s)")
        raise
    if proc_guard.returncode != 0:
        _cleanup_worktree()
        reason = (
            f"start_task_guard 실패 (exit={proc_guard.returncode}): "
            f"{proc_guard.stderr[-300:].strip()}"
        )
        _save_failed_evidence(reason)
        _die(reason, 1)

    # 10. Store the success evidence (atomic write).
    evidence = {
        "task_id": task_id,
        "previous_bot": handoff_data.get("previous_bot"),
        "new_bot": new_bot,
        "base_sha": base_sha,
        "head_sha": recorded_head,
        "handoff_path": str(handoff_path),
        "from_branch": from_branch,
        "new_branch": new_branch,
        "new_worktree_path": str(new_worktree_path),
        "started_at": ts_iso,
        "ts_filename": ts_filename,
        "takeover_status": "success",
        "start_guard_stdout": proc_guard.stdout[-500:],
    }
    _atomic_write(evidence_file, evidence)

    # 11. Report success.
    _print_ok(
        f"takeover 성공: {task_id}\n"
        f"  previous_bot={handoff_data.get('previous_bot')} → new_bot={new_bot}\n"
        f"  new branch:   {new_branch}\n"
        f"  new worktree: {new_worktree_path}\n"
        f"  evidence:     {evidence_file}"
    )
    return 0


# ---------------------------------------------------------------------------
# merge: main 진입 단일 경로
# ---------------------------------------------------------------------------


def _bypass_active() -> bool:
    """Return True only when the ``TASKCTL_BYPASS`` environment variable is exactly ``"1"``."""
    flag = os.getenv("TASKCTL_BYPASS", "")
    return flag == "1"


def cmd_merge(args: argparse.Namespace) -> int:
    """Handle ``taskctl merge``: the single path through which a PR enters main.

    Without bypass, these gates run in order and the first failure blocks the
    merge (exit code 1, recorded under ``exit_codes.merge``):
      1. the task must be in HUMAN_APPROVED,
      2. guard.sh ``pre-push`` must PASS,
      3. qc_report_guard must PASS,
      4. if a PR number is recorded: the PR must not report a mergeable
         value other than ``MERGEABLE`` (an unknown/None value is tolerated),
         and every required CI check must be ``SUCCESS``.
    With ``TASKCTL_BYPASS=1`` these gates are skipped, the bypass is recorded
    in the state document, and the state transitions are forced.

    Finally a PR number is required; then ``gh pr merge`` is invoked (skipped
    with ``--dry-run``) and the task moves MERGED → DONE.

    Returns:
        0 on success. Blocked merges exit through ``_die`` with code 1; a
        failing ``gh pr merge`` exits with that command's exit status.
    """
    state = _load(args.task_id)
    evidence = state["evidence"]
    bypass = _bypass_active()

    def _block(msg: str, code: int = 1) -> NoReturn:
        """Record the merge exit code, persist the state, and terminate."""
        evidence["exit_codes"]["merge"] = code
        _save(state)
        _die(msg, code)

    if bypass:
        state["bypass"] = {
            "used": True,
            "ts": _now(),
            "actor": _actor(),
        }
        print("★★★ TASKCTL BYPASS USED — Chairman override", file=sys.stderr)
        # Bypass skips the gates below and goes straight to the merge step.
    else:
        # 1) Must be HUMAN_APPROVED. This also excludes CANCELLED, so no
        #    separate CANCELLED check is needed (it could never trigger).
        if state["current_state"] != "HUMAN_APPROVED":
            _block(
                f"merge 차단: 현재 상태={state['current_state']} "
                f"(HUMAN_APPROVED 필요. 'taskctl approve {args.task_id}' 먼저)"
            )

        # 2) Re-run guard.sh pre-push.
        guard_ev = _run_guard_sh("pre-push", args.task_id)
        evidence["guard_sh_result"] = guard_ev["result"]
        evidence["guard_sh_detail_merge"] = guard_ev
        if guard_ev["result"] != "PASS":
            _block(
                f"merge 차단: guard.sh pre-push FAIL "
                f"(exit={guard_ev['exit_code']})"
            )

        # 3) Re-run qc_report_guard.
        qc_ev = _run_qc_report_guard(args.task_id)
        evidence["qc_report_guard_result"] = qc_ev["result"]
        evidence["qc_report_guard_detail_merge"] = qc_ev
        if qc_ev["result"] != "PASS":
            _block(
                f"merge 차단: qc_report_guard FAIL "
                f"(exit={qc_ev['exit_code']})"
            )

        # 4) PR view checks (only when a PR number is recorded; a missing
        #    number is rejected further below).
        pr_n = evidence.get("pr_number")
        if pr_n:
            pr_ev = _collect_pr_evidence(pr_n)
            evidence["pr_state"] = pr_ev["pr_state"]
            evidence["ci_checks"] = pr_ev["ci_checks"]
            evidence["mergeable"] = pr_ev.get("mergeable")
            evidence["merge_state_status"] = pr_ev.get("merge_state_status")

            # None means the value could not be obtained (gh unavailable,
            # network down, ...); the MVP does not block on that.
            mergeable = pr_ev.get("mergeable")
            if mergeable is not None and mergeable != "MERGEABLE":
                _block(f"merge 차단: PR mergeable={mergeable}")

            # An empty check map (e.g. gh failed) counts as "not passed".
            if not _all_required_checks_pass(pr_ev["ci_checks"]):
                _block(
                    f"merge 차단: 8 required CI checks 미통과 — "
                    f"{pr_ev['ci_checks']}"
                )

    # Actual entry into main (gh pr merge — the only call site in this file).
    pr_n = evidence.get("pr_number")
    if not pr_n:
        _block("merge 차단: PR 번호 없음 (taskctl pr-open 먼저)")

    dry_run = args.dry_run
    if not dry_run:
        # Real merge: gh pr merge runs with TASKCTL_INVOKED=1 in its environment.
        repo = os.environ.get("GH_REPO", "Jeon-Jonghyuk/dev_workspace")
        proc = _run(
            ["gh", "pr", "merge", str(pr_n), "--merge", "--delete-branch", "--repo", repo],
            env={"TASKCTL_INVOKED": "1"},
            timeout=120,
        )
        evidence["merge_subprocess"] = {
            "exit_code": proc.returncode,
            "stdout_tail": proc.stdout[-500:],
            "stderr_tail": proc.stderr[-500:],
        }
        if proc.returncode != 0:
            _block(
                f"merge 실패: gh pr merge exit={proc.returncode} "
                f"stderr={proc.stderr[-300:]}",
                proc.returncode,
            )

    evidence["merge_timestamp"] = _now()
    if dry_run:
        evidence["merge_dry_run"] = True
        merged_meta = {"dry_run": True, "bypass": bypass}
        summary = f"{args.task_id}: dry-run merge → MERGED → DONE (PR #{pr_n})"
    else:
        merged_meta = {"pr": pr_n, "bypass": bypass}
        summary = f"{args.task_id}: merged PR #{pr_n} → MERGED → DONE"
    evidence["exit_codes"]["merge"] = 0

    _transition(state, "MERGED", actor=_actor(), meta=merged_meta, force=bypass)
    _transition(state, "DONE", actor=_actor(), force=bypass)
    _save(state)
    _print_ok(summary)
    return 0


# ---------------------------------------------------------------------------
# argparse
# ---------------------------------------------------------------------------


def build_parser() -> argparse.ArgumentParser:
    """Build the ``taskctl`` argument parser with one sub-command per handler.

    Every sub-command takes a positional ``task_id`` and stores its handler
    function under ``func``; a sub-command is mandatory.
    """
    parser = argparse.ArgumentParser(
        prog="taskctl",
        description="task 상태 enforcement layer + main 진입 단일 경로 (task-2449 MVP)",
    )
    commands = parser.add_subparsers(dest="command", required=True)

    def _command(name: str, handler, help_text: str) -> argparse.ArgumentParser:
        """Register sub-command *name* with the shared ``task_id`` argument."""
        sub = commands.add_parser(name, help=help_text)
        sub.add_argument("task_id", help="task ID (예: task-2449)")
        sub.set_defaults(func=handler)
        return sub

    _command("init", cmd_init, "CREATED 상태 생성")
    _command("dispatch", cmd_dispatch, "DISPATCHED 전환")
    _command("ack", cmd_ack, "ACKED 전환 (봇 수신)")
    _command("run", cmd_run, "RUNNING 전환")

    pr_open = _command("pr-open", cmd_pr_open, "PR_OPEN 전환")
    pr_open.add_argument("--pr", type=int, required=True, help="PR 번호")

    verify = _command(
        "verify", cmd_verify,
        "evidence 자동 수집 + guard.sh + qc_report_guard 실행, PASS 시 GUARD_PASS",
    )
    verify.add_argument("--machine", action="store_true", help="state JSON 출력")

    _command("approve", cmd_approve, "HUMAN_APPROVED 전환 (회장 승인)")

    merge = _command("merge", cmd_merge, "main 진입 단일 경로")
    merge.add_argument("--dry-run", action="store_true",
                       help="actual gh pr merge skip — 상태만 전이")

    cancel = _command("cancel", cmd_cancel, "CANCELLED 전환")
    cancel.add_argument("--reason", default=None)

    fail = _command("fail", cmd_fail, "FAILED 전환")
    fail.add_argument("--reason", required=True)

    status = _command("status", cmd_status, "현재 상태 + evidence 출력")
    status.add_argument("--machine", action="store_true", help="JSON 출력")

    takeover = _command(
        "takeover", cmd_takeover,
        "handoff evidence 기반 봇 인계 — 새 worktree + branch + start_guard 자동",
    )
    takeover.add_argument(
        "--from", dest="from_branch", required=True, metavar="BRANCH",
        help="이전 봇의 branch (예: task/task-2458-dev4)",
    )
    takeover.add_argument("--bot", required=True, help="새 봇 ID (예: dev5)")

    return parser


def main(argv: list[str] | None = None) -> int:
    """Parse *argv* (``sys.argv`` when None), run the chosen handler, return its status.

    A ``SystemExit`` raised by a handler propagates unchanged. A subprocess
    timeout or any other exception is reported through ``_die`` with exit
    code 2. A handler result of ``None`` is returned as 0.
    """
    namespace = build_parser().parse_args(argv)
    try:
        status = namespace.func(namespace)
    # SystemExit is not an Exception subclass, so it passes through untouched.
    except subprocess.TimeoutExpired as exc:
        _die(f"subprocess timeout: {exc}", 2)
    except Exception as exc:
        _die(f"internal error: {type(exc).__name__}: {exc}", 2)
    return status if status else 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
