"""
lifecycle_guards.py — task-2467+3 silent corruption 4대 결함 차단 공통 Guard 모듈 (task-2468)

회장 명시: "task-2467+3 사례 재현 후 차단됨 증명 필수".

P0-1  ~ P0-10 가드 구현:
  P0-1  : .g3-fail 마커 존재 시 .done 발행 금지
  P0-2  : done 직전 G3 PASS evidence 재검증
  P0-3  : Gemini High severity 판정 표준화
  P0-4  : auto-approve / auto-merge 7조건 통합 차단
  P0-5  : approver identity 시스템화
  P0-6  : merge commit SHA 검증
  P0-7  : state transition 엄격화
  P0-8  : bypass / override audit 강제
  P0-9  : bot author allowlist 외부 config화
  P0-10 : TASKCTL_CWD 제거 / --worktree 정식화
"""
from __future__ import annotations

import importlib.util
import json
import os
import subprocess
import sys
import warnings
from datetime import datetime, timezone
from pathlib import Path

# ---------------------------------------------------------------------------
# Path constants
# ---------------------------------------------------------------------------

# Workspace root; taken from the WORKSPACE_ROOT environment variable when set.
WORKSPACE = Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))
EVENTS_DIR = WORKSPACE / "memory" / "events"
EVIDENCE_DIR = WORKSPACE / ".tasks" / "evidence"
SPECS_DIR = WORKSPACE / "memory" / "specs"
AUDIT_DIR = WORKSPACE / "memory" / "orchestration-audit"
ADMIN_OVERRIDE_LOG = AUDIT_DIR / "admin-override.jsonl"

# Default locations of the allowlist config files read by the P0-5 / P0-9 guards.
DEFAULT_ALLOWED_BOT_PATH = SPECS_DIR / "allowed_bot_accounts.json"
DEFAULT_ALLOWED_APPROVERS_PATH = SPECS_DIR / "allowed_approvers.json"

# G3 fail marker variants: suffixes looked up as "<task_id>.<suffix>" in the events dir.
G3_FAIL_MARKER_NAMES = ("g3-fail", "g3-failed", "g3_fail", "g3_failed")
# G3 PASS evidence file names looked up under "<evidence_dir>/<task_id>/";
# the tuple order is the priority order used when several exist.
G3_PASS_EVIDENCE_NAMES = ("g3-pass.json", "g3.json", "g3_verifier.json")


# ---------------------------------------------------------------------------
# GuardResult
# ---------------------------------------------------------------------------


class GuardResult:
    """Uniform outcome object returned by the guards in this module.

    Attributes set by ``__init__``:
        ok: ``True`` when the guard passed, ``False`` when it blocked.
        reason: human-readable explanation of the outcome.
        detail: structured context; an empty dict when none (or a falsy
            value) was supplied.
        blocking: identifiers of the blocking causes; an empty list when
            none (or a falsy value) was supplied.
    """

    def __init__(
        self,
        *,
        ok: bool,
        reason: str,
        detail: dict | None = None,
        blocking: list[str] | None = None,
    ):
        self.ok = ok
        self.reason = reason
        # Falsy inputs (None, {}, []) are replaced by fresh empty containers.
        self.detail = detail if detail else {}
        self.blocking = blocking if blocking else []

    def as_dict(self) -> dict:
        """Return the result as a plain dict with a "PASS"/"FAIL" verdict."""
        verdict = "PASS" if self.ok else "FAIL"
        return dict(
            result=verdict,
            ok=self.ok,
            reason=self.reason,
            detail=self.detail,
            blocking=self.blocking,
        )

    def __repr__(self) -> str:
        verdict = "PASS" if self.ok else "FAIL"
        return "GuardResult({}: {!r})".format(verdict, self.reason)


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()


def _run_cmd(
    args: list[str],
    timeout: int = 30,
    cwd: Path | None = None,
) -> tuple[int, str, str]:
    """Run a command without a shell and never raise.

    Args:
        args: argv list handed to ``subprocess.run``.
        timeout: seconds before the run is abandoned.
        cwd: working directory; the current directory is used when falsy.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams stripped. Any
        failure to run (timeout, missing executable, other exception) is
        reported as return code ``-1`` with the message in ``stderr``.
    """
    workdir = None if not cwd else str(cwd)
    try:
        completed = subprocess.run(
            args,
            cwd=workdir,
            timeout=timeout,
            text=True,
            capture_output=True,
        )
        out_text = completed.stdout.strip()
        err_text = completed.stderr.strip()
        return completed.returncode, out_text, err_text
    except subprocess.TimeoutExpired:
        return -1, "", f"timeout after {timeout}s"
    except FileNotFoundError as exc:
        return -1, "", f"command not found: {exc}"
    except Exception as exc:
        # Deliberate catch-all: callers rely on a tuple, never an exception.
        return -1, "", str(exc)


def _g3_fail_marker_paths(task_id: str, events_dir: Path) -> list[Path]:
    """List the existing G3 fail marker files for ``task_id`` in ``events_dir``.

    Each suffix in ``G3_FAIL_MARKER_NAMES`` is probed as
    ``<events_dir>/<task_id>.<suffix>``; results keep the tuple's order.
    """
    candidates = (
        events_dir / f"{task_id}.{suffix}" for suffix in G3_FAIL_MARKER_NAMES
    )
    return [candidate for candidate in candidates if candidate.exists()]


def _g3_pass_evidence_paths(task_id: str, evidence_dir: Path) -> list[Path]:
    """List the existing G3 PASS evidence files under ``evidence_dir/<task_id>/``.

    File names come from ``G3_PASS_EVIDENCE_NAMES`` and the result keeps
    that order. A missing task directory yields an empty list.
    """
    task_dir = evidence_dir / task_id
    if not task_dir.exists():
        return []
    return [
        task_dir / file_name
        for file_name in G3_PASS_EVIDENCE_NAMES
        if (task_dir / file_name).exists()
    ]


# ---------------------------------------------------------------------------
# P0-1, P0-7 helpers
# ---------------------------------------------------------------------------


def find_g3_fail_markers(task_id: str, events_dir: Path | None = None) -> list[Path]:
    """Detect every G3 fail marker belonging to ``task_id``.

    All variants are recognised: .g3-fail / .g3-failed / .g3_fail / .g3_failed.
    ``EVENTS_DIR`` is searched when ``events_dir`` is not given.
    """
    search_dir = events_dir if events_dir else EVENTS_DIR
    return _g3_fail_marker_paths(task_id, search_dir)


def check_g3_fail_blocks_done(
    task_id: str,
    events_dir: Path | None = None,
) -> GuardResult:
    """P0-1: block issuing ``.done`` while any G3 fail marker exists.

    On FAIL the ``blocking`` list holds ``"g3_fail_marker"`` followed by the
    file name of every marker found.
    """
    found = find_g3_fail_markers(task_id, events_dir or EVENTS_DIR)

    if not found:
        return GuardResult(
            ok=True,
            reason="G3 fail marker 없음 — .done 발행 허용",
            detail={"checked_names": list(G3_FAIL_MARKER_NAMES)},
        )

    marker_names = [marker.name for marker in found]
    return GuardResult(
        ok=False,
        reason=f"G3 fail marker 존재: {marker_names} — .done 발행 차단 (P0-1)",
        detail={"markers": [str(marker) for marker in found]},
        # Marker file names are included so callers can search for them.
        blocking=["g3_fail_marker", *marker_names],
    )


def check_done_fail_conflict(
    task_id: str,
    events_dir: Path | None = None,
) -> GuardResult:
    """P0-7 helper: report a conflict when ``.done`` and a G3 fail marker coexist.

    Only the combination "done file present AND at least one fail marker
    present" fails; the three other combinations pass.
    """
    base_dir = events_dir or EVENTS_DIR
    done_path = base_dir / f"{task_id}.done"
    fail_markers = find_g3_fail_markers(task_id, base_dir)

    has_done = done_path.exists()
    has_fail = bool(fail_markers)

    if has_done and has_fail:
        marker_names = [marker.name for marker in fail_markers]
        return GuardResult(
            ok=False,
            reason=(
                f"CONFLICT: .done과 .g3-fail 마커가 동시에 존재 — "
                f"task={task_id} done={done_path.name} fail_markers={marker_names}"
            ),
            detail={
                "done_file": str(done_path),
                "fail_markers": [str(marker) for marker in fail_markers],
                "conflict": True,
            },
            blocking=["done_fail_conflict"],
        )

    if has_done:
        return GuardResult(
            ok=True,
            reason=".done 존재, g3-fail 마커 없음 — 정상",
            detail={"done_file": str(done_path)},
        )

    if has_fail:
        return GuardResult(
            ok=True,
            reason=".g3-fail 마커 존재, .done 없음 — 정상 (fail 상태)",
            detail={"fail_markers": [str(marker) for marker in fail_markers]},
        )

    return GuardResult(
        ok=True,
        reason=".done과 .g3-fail 마커 모두 없음 — 정상",
        detail={},
    )


# ---------------------------------------------------------------------------
# P0-2
# ---------------------------------------------------------------------------


def check_done_g3_pass_evidence(
    task_id: str,
    pr_number: int | str | None,
    head_sha: str | None,
    evidence_dir: Path | None = None,
) -> GuardResult:
    """P0-2: re-validate the G3 PASS evidence right before ``.done`` is issued.

    The first existing evidence file (priority order of
    ``G3_PASS_EVIDENCE_NAMES``) is parsed and checked:

    - no evidence file -> FAIL
    - file unreadable, not valid JSON, or not a JSON object -> FAIL
    - ``evidence['result'] != 'PASS'`` -> FAIL
    - ``evidence['task_id']`` present and != ``task_id`` -> FAIL
      (PASS of another task reused)
    - ``evidence['pr_number']`` present and != ``pr_number`` -> FAIL
      (PASS of an earlier PR reused)
    - evidence SHA (``sha`` / ``head_sha`` / ``merge_commit_sha``) present
      and != ``head_sha`` -> FAIL (stale PASS)

    A field that is absent from the evidence is not treated as a mismatch,
    and the PR / SHA comparisons are skipped when ``pr_number`` is ``None``
    or ``head_sha`` is falsy.

    Args:
        task_id: task whose evidence directory is inspected.
        pr_number: expected PR number (int or numeric string), or ``None``.
        head_sha: expected commit SHA, or ``None``.
        evidence_dir: evidence root; ``EVIDENCE_DIR`` when not given.

    Returns:
        ``GuardResult`` — on success ``detail`` carries the parsed evidence.
    """
    _ev_dir = evidence_dir or EVIDENCE_DIR
    paths = _g3_pass_evidence_paths(task_id, _ev_dir)

    if not paths:
        return GuardResult(
            ok=False,
            reason=f"G3 PASS evidence 파일 없음 ({G3_PASS_EVIDENCE_NAMES}) — .done 차단 (P0-2)",
            detail={"evidence_dir": str(_ev_dir / task_id)},
            blocking=["no_g3_pass_evidence"],
        )

    # Use the first file (name priority: g3-pass.json > g3.json > g3_verifier.json).
    ev_path = paths[0]
    try:
        ev = json.loads(ev_path.read_text(encoding="utf-8"))
    except Exception as e:
        return GuardResult(
            ok=False,
            reason=f"G3 PASS evidence 파일 파싱 실패: {e}",
            detail={"path": str(ev_path)},
            blocking=["evidence_parse_error"],
        )

    # Valid JSON that is not an object (list, string, number, null) cannot be
    # evidence; fail closed here instead of crashing on ``ev.get`` below.
    if not isinstance(ev, dict):
        return GuardResult(
            ok=False,
            reason=(
                f"G3 PASS evidence 파일 파싱 실패: "
                f"JSON object가 아님 (type={type(ev).__name__})"
            ),
            detail={"path": str(ev_path)},
            blocking=["evidence_parse_error"],
        )

    # Verdict check.
    ev_result = ev.get("result", "")
    if ev_result != "PASS":
        return GuardResult(
            ok=False,
            reason=f"G3 evidence result={ev_result!r} (PASS 필요) — .done 차단 (P0-2)",
            detail={"path": str(ev_path), "evidence_result": ev_result},
            blocking=["g3_not_pass"],
        )

    # task_id match (only when the evidence records one).
    ev_task = ev.get("task_id", "")
    if ev_task and ev_task != task_id:
        return GuardResult(
            ok=False,
            reason=f"G3 PASS evidence task_id={ev_task!r} != {task_id!r} — 다른 task PASS 재사용 차단 (P0-2)",
            detail={"path": str(ev_path), "evidence_task_id": ev_task, "expected": task_id},
            blocking=["evidence_task_id_mismatch"],
        )

    # pr_number match: both sides are normalised to int; a side that cannot
    # be converted becomes None and the comparison is skipped.
    if pr_number is not None:
        ev_pr = ev.get("pr_number")
        try:
            pr_n_cmp = int(pr_number) if pr_number else None
        except (ValueError, TypeError):
            pr_n_cmp = None
        try:
            ev_pr_cmp = int(ev_pr) if ev_pr is not None else None
        except (ValueError, TypeError):
            ev_pr_cmp = None
        if ev_pr_cmp is not None and pr_n_cmp is not None and ev_pr_cmp != pr_n_cmp:
            return GuardResult(
                ok=False,
                reason=(
                    f"G3 PASS evidence pr_number={ev_pr_cmp} != {pr_n_cmp} "
                    f"— 이전 PR PASS 재사용 차단 (P0-2)"
                ),
                detail={
                    "path": str(ev_path),
                    "evidence_pr": ev_pr_cmp,
                    "expected_pr": pr_n_cmp,
                },
                blocking=["evidence_pr_mismatch"],
            )

    # SHA match: the first non-empty of sha / head_sha / merge_commit_sha is used.
    if head_sha:
        ev_sha = ev.get("sha") or ev.get("head_sha") or ev.get("merge_commit_sha") or ""
        if ev_sha and ev_sha != head_sha:
            return GuardResult(
                ok=False,
                reason=(
                    f"G3 PASS evidence sha={ev_sha!r} != {head_sha!r} "
                    f"— stale PASS 차단 (P0-2)"
                ),
                detail={
                    "path": str(ev_path),
                    "evidence_sha": ev_sha,
                    "expected_sha": head_sha,
                },
                blocking=["evidence_sha_mismatch"],
            )

    return GuardResult(
        ok=True,
        reason=f"G3 PASS evidence 유효 (path={ev_path.name}, result=PASS)",
        detail={"path": str(ev_path), "evidence": ev},
    )


# ---------------------------------------------------------------------------
# P0-3
# ---------------------------------------------------------------------------


def check_gemini_severity(
    text: str | None = None,
    *,
    pr_number: int | None = None,
    repo: str | None = None,
) -> GuardResult:
    """P0-3: delegate severity counting to ``gemini_severity_parser``; FAIL when high >= 1.

    ``text`` takes precedence; without it the PR review is parsed through
    ``parse_pr_review_severities(repo, pr_number)``. When neither input is
    usable the guard fails with ``missing_args``.
    """
    # The parser module is imported from the directory containing this file.
    module_dir = str(Path(__file__).parent)
    if module_dir not in sys.path:
        sys.path.insert(0, module_dir)
    try:
        import gemini_severity_parser as severity_parser  # type: ignore[import]
    except ImportError as exc:
        return GuardResult(
            ok=False,
            reason=f"gemini_severity_parser import 실패: {exc}",
            blocking=["import_error"],
        )

    if text is not None:
        counts = severity_parser.count_severities(text)
    elif pr_number is not None and repo:
        counts = severity_parser.parse_pr_review_severities(repo, pr_number)
    else:
        return GuardResult(
            ok=False,
            reason="check_gemini_severity: text 또는 pr_number+repo 필요",
            blocking=["missing_args"],
        )

    high_count = counts.get("high", 0)
    if high_count >= 1:
        return GuardResult(
            ok=False,
            reason=f"Gemini High severity {high_count}건 검출 — auto-approve/merge 차단 (P0-3)",
            detail=counts,
            blocking=["gemini_high_severity"],
        )

    medium_count = counts.get("medium", 0)
    low_count = counts.get("low", 0)
    return GuardResult(
        ok=True,
        reason=f"Gemini High severity 0건 — 통과 (medium={medium_count}, low={low_count})",
        detail=counts,
    )


# ---------------------------------------------------------------------------
# P0-4
# ---------------------------------------------------------------------------


def check_auto_block_conditions(
    *,
    task_id: str,
    gemini_high: int = 0,
    g3_fail: bool = False,
    ci_pass: bool = True,
    hidden_path_pass: bool = True,
    merge_sha_ok: bool = True,
    author_ok: bool = True,
    approver_ok: bool = True,
) -> GuardResult:
    """P0-4: block when any of the 7 conditions fails.

    Every violated condition contributes one message to ``blocking``, so the
    caller sees all reasons at once. ``detail["conditions"]`` echoes the
    seven input values on both PASS and FAIL.
    """
    conditions = {
        "gemini_high": gemini_high,
        "g3_fail": g3_fail,
        "ci_pass": ci_pass,
        "hidden_path_pass": hidden_path_pass,
        "merge_sha_ok": merge_sha_ok,
        "author_ok": author_ok,
        "approver_ok": approver_ok,
    }

    # (violated?, message) pairs, in the order the reasons are reported.
    rules = (
        (gemini_high > 0, f"gemini_high={gemini_high} (High severity 검출)"),
        (g3_fail, "g3_fail=True (G3 verifier fail marker 존재)"),
        (not ci_pass, "ci_pass=False (required CI checks 미통과)"),
        (not hidden_path_pass, "hidden_path_pass=False (hidden path audit fail)"),
        (not merge_sha_ok, "merge_sha_ok=False (merge commit SHA 검증 실패)"),
        (not author_ok, "author_ok=False (PR author allowlist 불일치)"),
        (not approver_ok, "approver_ok=False (approver identity 불일치)"),
    )
    blocking: list[str] = [message for violated, message in rules if violated]

    detail = {"task_id": task_id, "conditions": conditions}

    if not blocking:
        return GuardResult(
            ok=True,
            reason=f"auto-block 7조건 모두 통과 — task={task_id}",
            detail=detail,
        )

    return GuardResult(
        ok=False,
        reason=f"auto-block 7조건 위반 {len(blocking)}건 — task={task_id}",
        detail=detail,
        blocking=blocking,
    )


# ---------------------------------------------------------------------------
# P0-5
# ---------------------------------------------------------------------------


def load_allowed_approvers(path: Path | None = None) -> list[str]:
    """Load the system approver logins from ``allowed_approvers.json``.

    The ``"approvers"`` list may hold plain strings or objects with a
    ``"login"`` key; empty values are dropped. A missing file, a malformed
    file or any other error yields an empty list (fail-closed).
    """
    source = path or DEFAULT_ALLOWED_APPROVERS_PATH
    if not source.exists():
        return []
    try:
        payload = json.loads(source.read_text(encoding="utf-8"))
        entries = payload.get("approvers", [])
        if not isinstance(entries, list):
            return []

        collected: list[str] = []
        for item in entries:
            if isinstance(item, dict):
                candidate = item.get("login", "")
            elif isinstance(item, str):
                candidate = item
            else:
                # Neither a string nor an object: ignored.
                continue
            if candidate:
                collected.append(candidate)
        return collected
    except Exception:
        return []


def check_approver_identity(
    approver: str | None,
    *,
    allowlist_path: Path | None = None,
) -> GuardResult:
    """P0-5: decide whether ``approver`` is a system approver.

    Logins listed under ``"manual_logins"`` in the allowlist file are human
    approvers: they are classified as manual approval and never accepted as
    auto evidence. Otherwise the login must appear in the system approver
    list returned by ``load_allowed_approvers``; an empty list fails closed.
    Only the part of ``approver`` before the first space is compared.
    """
    if not approver:
        return GuardResult(
            ok=False,
            reason="approver 미지정 — auto approve 차단 (P0-5)",
            blocking=["no_approver"],
        )

    system_logins = load_allowed_approvers(allowlist_path)

    # Human (manual) logins come from the same config file; read errors are
    # tolerated and simply leave the list empty.
    config_path = allowlist_path or DEFAULT_ALLOWED_APPROVERS_PATH
    human_logins: list[str] = []
    try:
        if config_path.exists():
            config = json.loads(config_path.read_text(encoding="utf-8"))
            raw_manual = config.get("manual_logins", [])
            if isinstance(raw_manual, list):
                human_logins = [value for value in raw_manual if isinstance(value, str)]
    except Exception:
        pass

    login = approver if " " not in approver else approver.split(" ")[0]

    # Human accounts are rejected before the system allowlist is consulted.
    if login in human_logins:
        return GuardResult(
            ok=False,
            reason=(
                f"approver={login!r} 는 human(manual) 승인자 — "
                f"auto evidence 인정 불가 (P0-5). manual approval로 분류."
            ),
            detail={"approver": login, "type": "manual", "manual_logins": human_logins},
            blocking=["human_approver_not_allowed_for_auto"],
        )

    if not system_logins:
        return GuardResult(
            ok=False,
            reason="allowed_approvers.json 미설정/비어있음 — fail-closed (P0-5)",
            blocking=["empty_approver_allowlist"],
        )

    if login not in system_logins:
        return GuardResult(
            ok=False,
            reason=(
                f"approver={login!r} 가 시스템 승인자 allowlist에 없음 — "
                f"auto evidence 인정 불가 (P0-5)"
            ),
            detail={"approver": login, "allowed": system_logins},
            blocking=["approver_not_in_allowlist"],
        )

    return GuardResult(
        ok=True,
        reason=f"approver={login!r} 시스템 승인자 확인 — auto evidence 인정 (P0-5)",
        detail={"approver": login, "type": "system"},
    )


# ---------------------------------------------------------------------------
# P0-6
# ---------------------------------------------------------------------------


def fetch_pr_merge_sha(pr_number: int, repo: str) -> dict:
    """Query ``gh api repos/<repo>/pulls/<pr_number>`` for merge SHA and base ref.

    Returns:
        ``{"merge_commit_sha": str, "base_ref": str, "ok": bool, "raw": dict}``.
        When the call or the JSON parsing fails, ``ok`` is ``False``, the
        string fields are empty and an ``"error"`` message is added.
    """

    def _failure(message: str) -> dict:
        # Shared shape for every failure path.
        return {
            "merge_commit_sha": "",
            "base_ref": "",
            "ok": False,
            "raw": {},
            "error": message,
        }

    endpoint = f"repos/{repo}/pulls/{pr_number}"
    code, out, err = _run_cmd(["gh", "api", endpoint], timeout=30)
    if code != 0 or not out:
        return _failure(err or f"gh api 호출 실패 (rc={code})")

    try:
        payload = json.loads(out)
    except json.JSONDecodeError as exc:
        return _failure(f"JSON 파싱 실패: {exc}")

    base_info = payload.get("base") or {}
    return {
        "merge_commit_sha": payload.get("merge_commit_sha") or "",
        "base_ref": base_info.get("ref") or "",
        "ok": True,
        "raw": payload,
    }


def _safe_git_fetch(base_ref: str, cwd: Path) -> bool:
    """Run ``git fetch origin --no-tags +refs/heads/<base>:refs/remotes/origin/<base>``.

    Executed through ``_run_cmd`` (no shell) with a 30 second timeout.
    Returns ``True`` only when the command exits with code 0.
    """
    remote_ref = f"refs/remotes/origin/{base_ref}"
    refspec = f"+refs/heads/{base_ref}:{remote_ref}"
    command = ["git", "fetch", "origin", "--no-tags", refspec]
    exit_code, _out, _err = _run_cmd(command, timeout=30, cwd=cwd)
    return exit_code == 0


def _rev_parse_origin(base_ref: str, cwd: Path) -> str | None:
    """Resolve ``refs/remotes/origin/<base>`` with ``git rev-parse --verify``.

    Returns the SHA string, or ``None`` when the command fails or prints
    nothing.
    """
    full_ref = f"refs/remotes/origin/{base_ref}"
    exit_code, out, _err = _run_cmd(
        ["git", "rev-parse", "--verify", full_ref],
        timeout=15,
        cwd=cwd,
    )
    if exit_code == 0:
        sha = out.strip()
        return sha if sha else None
    return None


def fetch_origin_head_sha(
    base_ref: str,
    *,
    cwd: Path | None = None,
    force_fetch: bool = True,
) -> str | None:
    """Read the SHA of ``origin/<base_ref>`` with a stability check (P0-6).

    Up to two attempts are made. Each attempt:

    1. runs ``git fetch`` for the base ref when ``force_fetch`` is true;
    2. resolves the remote-tracking ref;
    3. sleeps 0.5 seconds;
    4. resolves it again.

    When both reads of an attempt agree, that SHA is returned. A failed read
    returns ``None`` immediately; two disagreeing attempts return ``None``.

    Parameters
    ----------
    base_ref:
        Base branch name (usually ``main``).
    cwd:
        Git working directory; ``WORKSPACE`` when not given.
    force_fetch:
        ``True`` (default) runs ``git fetch`` before each attempt. Tests can
        pass ``False`` and mock the ref lookup.
    """
    import time

    repo_dir = cwd or WORKSPACE

    for _attempt in range(2):
        if force_fetch:
            # NOTE(review): the fetch result is not inspected, so a failed
            # fetch falls through to the locally cached ref — confirm this
            # best-effort behaviour is intended.
            _safe_git_fetch(base_ref, repo_dir)

        first_read = _rev_parse_origin(base_ref, repo_dir)
        if first_read is None:
            return None

        time.sleep(0.5)

        second_read = _rev_parse_origin(base_ref, repo_dir)
        if second_read is None:
            return None

        if first_read == second_read:
            return first_read

    # The ref changed between reads on both attempts.
    return None


def check_merge_commit_sha(
    pr_number: int | None,
    *,
    repo: str | None = None,
    base_branch: str | None = None,
    cwd: Path | None = None,
) -> GuardResult:
    """P0-6: verify that the PR's ``merge_commit_sha`` equals the ``origin/<base>`` HEAD.

    - missing ``pr_number`` / ``repo`` -> FAIL
    - PR lookup failure -> FAIL
    - empty / null ``merge_commit_sha`` -> FAIL
    - ``base_branch`` not given -> taken from the PR's ``base.ref``
      (no hard-coded ``main``); still empty -> FAIL
    - ``origin/<base>`` HEAD unavailable -> FAIL
    - HEAD != ``merge_commit_sha`` -> one more fetch-and-compare; FAIL when
      it still differs
    """
    if not pr_number or not repo:
        return GuardResult(
            ok=False,
            reason="check_merge_commit_sha: pr_number와 repo 필요 (P0-6)",
            blocking=["missing_args"],
        )

    pr_info = fetch_pr_merge_sha(pr_number, repo)
    if not pr_info["ok"]:
        return GuardResult(
            ok=False,
            reason=f"PR 정보 조회 실패: {pr_info.get('error', 'unknown')} (P0-6)",
            detail=pr_info,
            blocking=["pr_fetch_failed"],
        )

    merge_sha = pr_info["merge_commit_sha"]
    if not merge_sha:
        return GuardResult(
            ok=False,
            reason="merge_commit_sha empty/null — done 차단 (P0-6)",
            detail=pr_info,
            blocking=["empty_merge_sha"],
        )

    # An explicit base_branch wins; otherwise use what the PR reports.
    branch = base_branch or pr_info["base_ref"]
    if not branch:
        return GuardResult(
            ok=False,
            reason="PR base branch 조회 실패 — hardcoded 'main' 사용 금지 (P0-6)",
            detail=pr_info,
            blocking=["no_base_branch"],
        )

    first_sha = fetch_origin_head_sha(branch, cwd=cwd, force_fetch=True)
    if first_sha is None:
        return GuardResult(
            ok=False,
            reason=f"origin/{branch} HEAD SHA 조회 실패 (P0-6)",
            detail={"base_branch": branch, "merge_sha": merge_sha},
            blocking=["origin_sha_fetch_failed"],
        )

    if first_sha == merge_sha:
        verified_sha = first_sha
    else:
        # One more fetch-and-compare before declaring a mismatch.
        second_sha = fetch_origin_head_sha(branch, cwd=cwd, force_fetch=True)
        if second_sha is None or second_sha != merge_sha:
            return GuardResult(
                ok=False,
                reason=(
                    f"SHA mismatch: merge_commit_sha({merge_sha[:12]}...) "
                    f"≠ origin/{branch} HEAD({first_sha[:12]}...) "
                    f"— done 차단 (P0-6)"
                ),
                detail={
                    "merge_commit_sha": merge_sha,
                    "origin_head_sha": first_sha,
                    "retry_origin_sha": second_sha,
                    "base_branch": branch,
                },
                blocking=["sha_mismatch"],
            )
        # The retry matched: report the refreshed SHA.
        verified_sha = second_sha

    return GuardResult(
        ok=True,
        reason=f"merge_commit_sha == origin/{branch} HEAD ({merge_sha[:12]}...) — 검증 통과 (P0-6)",
        detail={
            "merge_commit_sha": merge_sha,
            "origin_head_sha": verified_sha,
            "base_branch": branch,
        },
    )


# ---------------------------------------------------------------------------
# P0-7
# ---------------------------------------------------------------------------


def _load_allowed_transitions() -> dict[str, set[str]]:
    """Load ``ALLOWED_TRANSITIONS`` from the ``taskctl.py`` next to this file.

    The module is loaded by file path (avoiding a circular import). A missing
    file, an unusable import spec, any exception while executing the module,
    or a module without the attribute all yield an empty dict.
    """
    source_file = Path(__file__).parent / "taskctl.py"
    if not source_file.exists():
        return {}
    try:
        module_spec = importlib.util.spec_from_file_location(
            "_taskctl_module", str(source_file)
        )
        if module_spec is None or module_spec.loader is None:
            return {}
        loaded = importlib.util.module_from_spec(module_spec)
        # Executes taskctl.py's top-level code in the fresh module object.
        module_spec.loader.exec_module(loaded)  # type: ignore[union-attr]
        return getattr(loaded, "ALLOWED_TRANSITIONS", {})
    except Exception:
        return {}


def check_state_transition(src: str, target: str) -> GuardResult:
    """P0-7: strict validation of ``src -> target`` against ``ALLOWED_TRANSITIONS``.

    The table is loaded dynamically from ``taskctl.py`` (avoids a circular
    import). An empty or unloadable table fails closed. ``DONE``,
    ``CANCELLED`` and ``ADMIN_OVERRIDE_USED`` never allow a further
    transition; ``FAILED`` is checked against the table like any other state.
    """
    table = _load_allowed_transitions()
    if not table:
        return GuardResult(
            ok=False,
            reason="ALLOWED_TRANSITIONS 로드 실패 — fail-closed (P0-7)",
            blocking=["transitions_load_failed"],
        )

    permitted = table.get(src, set())

    # Terminal states that block every transition (FAILED is excluded).
    locked_states = {"DONE", "CANCELLED", "ADMIN_OVERRIDE_USED"}
    if src in locked_states:
        return GuardResult(
            ok=False,
            reason=f"terminal 상태 {src!r}에서 전이 불가 — {target!r} 차단 (P0-7)",
            detail={"src": src, "target": target, "terminal": True},
            blocking=["terminal_state_no_transition"],
        )

    if target in permitted:
        return GuardResult(
            ok=True,
            reason=f"상태 전이 허용: {src!r} -> {target!r} (P0-7)",
            detail={"src": src, "target": target},
        )

    permitted_sorted = sorted(permitted)
    return GuardResult(
        ok=False,
        reason=(
            f"금지된 상태 전이: {src!r} -> {target!r} "
            f"(허용: {permitted_sorted}) (P0-7)"
        ),
        detail={"src": src, "target": target, "allowed": permitted_sorted},
        blocking=["forbidden_transition"],
    )


# ---------------------------------------------------------------------------
# P0-8
# ---------------------------------------------------------------------------


def check_bypass_audit(
    task_id: str,
    *,
    env: dict | None = None,
    audit_path: Path | None = None,
    production: bool | None = None,
) -> GuardResult:
    """P0-8: require an audit log entry whenever a bypass / override is active.

    A bypass is active when ``TASKCTL_BYPASS`` equals ``"1"``; an override is
    active when ``TASKCTL_PR_AUTHOR_OVERRIDE`` is non-empty.

    - neither active -> PASS (no audit needed)
    - production (``production=True`` or, when ``production`` is ``None``,
      env ``PRODUCTION == "1"``) -> FAIL, bypass/override is forbidden
    - audit file missing or unreadable -> FAIL
    - no JSONL entry whose ``task_id`` equals ``task_id`` -> FAIL

    Lines that are blank, not valid JSON, or valid JSON but not an object are
    skipped, so one malformed line cannot invalidate the rest of the log.

    Args:
        task_id: task whose audit entries are required.
        env: environment mapping; a copy of ``os.environ`` when ``None``.
        audit_path: JSONL audit file; ``ADMIN_OVERRIDE_LOG`` when not given.
        production: explicit production flag; derived from ``env`` when ``None``.
    """
    _env = env if env is not None else dict(os.environ)
    _audit_path = audit_path or ADMIN_OVERRIDE_LOG

    bypass_active = _env.get("TASKCTL_BYPASS", "") == "1"
    override_active = bool(_env.get("TASKCTL_PR_AUTHOR_OVERRIDE", ""))

    if not bypass_active and not override_active:
        return GuardResult(
            ok=True,
            reason="bypass/override 미사용 — audit 불필요 (P0-8)",
        )

    _production = production
    if _production is None:
        _production = _env.get("PRODUCTION", "") == "1"

    if _production:
        active_vars: list[str] = []
        if bypass_active:
            active_vars.append("TASKCTL_BYPASS=1")
        if override_active:
            active_vars.append("TASKCTL_PR_AUTHOR_OVERRIDE")
        return GuardResult(
            ok=False,
            reason=(
                f"production 환경에서 bypass/override 사용 금지: {active_vars} "
                f"— fail-fast (P0-8)"
            ),
            detail={"env_vars": active_vars, "production": True},
            blocking=["production_bypass_forbidden"],
        )

    if not _audit_path.exists():
        return GuardResult(
            ok=False,
            reason=(
                f"bypass/override 사용 중이나 audit log 없음: {_audit_path} "
                f"— fail-closed (P0-8)"
            ),
            detail={"audit_path": str(_audit_path), "bypass": bypass_active, "override": override_active},
            blocking=["missing_audit_log"],
        )

    try:
        lines = _audit_path.read_text(encoding="utf-8").splitlines()
        task_entries = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A JSON value that is not an object (list, string, number, null)
            # cannot be an audit entry; skip it like any other malformed line
            # instead of letting ``.get`` raise and fail the whole log.
            if not isinstance(entry, dict):
                continue
            if entry.get("task_id") == task_id:
                task_entries.append(entry)
    except Exception as e:
        return GuardResult(
            ok=False,
            reason=f"audit log 읽기 실패: {e} (P0-8)",
            blocking=["audit_read_error"],
        )

    if not task_entries:
        return GuardResult(
            ok=False,
            reason=(
                f"audit log에 task_id={task_id!r} 항목 없음 — "
                f"bypass/override 사유 기록 필수 (P0-8)"
            ),
            detail={"audit_path": str(_audit_path), "task_id": task_id},
            blocking=["missing_audit_entry"],
        )

    return GuardResult(
        ok=True,
        reason=f"bypass/override audit log 확인 — {len(task_entries)}건 기록됨 (P0-8)",
        detail={"task_id": task_id, "audit_entries": task_entries},
    )


def append_admin_override_audit(
    task_id: str,
    reason: str,
    *,
    pr_number: int | str | None = None,
    audit_path: Path | None = None,
) -> Path:
    """Append one JSON line to the admin-override audit log and return its path.

    The entry records ``task_id``, ``reason``, a UTC timestamp, ``pr_number``
    and the ``USER`` environment variable (``"unknown"`` when unset). The
    parent directory is created when missing. ``ADMIN_OVERRIDE_LOG`` is the
    default target.
    """
    target = audit_path or ADMIN_OVERRIDE_LOG
    target.parent.mkdir(parents=True, exist_ok=True)

    record = {
        "task_id": task_id,
        "reason": reason,
        "ts": _now_iso(),
        "pr_number": pr_number,
        "actor": os.environ.get("USER", "unknown"),
    }

    with target.open("a", encoding="utf-8") as handle:
        serialized = json.dumps(record, ensure_ascii=False)
        handle.write(serialized + "\n")
    return target


# ---------------------------------------------------------------------------
# P0-9
# ---------------------------------------------------------------------------


def load_allowed_bot_accounts(path: Path | None = None) -> list[str]:
    """Load the bot allowlist from ``allowed_bot_accounts.json``.

    Non-empty strings from the ``"patterns"`` list and then the ``"exact"``
    list are merged, with duplicates removed and first-seen order kept.
    A missing file or any read/parse error yields ``[]``; callers are
    responsible for failing closed on an empty list.
    """
    source = path or DEFAULT_ALLOWED_BOT_PATH
    if not source.exists():
        return []
    try:
        payload = json.loads(source.read_text(encoding="utf-8"))
        sections = (payload.get("patterns", []), payload.get("exact", []))

        # A dict keeps insertion order and drops duplicates in one pass.
        ordered: dict[str, None] = {}
        for section in sections:
            if not isinstance(section, list):
                continue
            for value in section:
                if isinstance(value, str) and value:
                    ordered.setdefault(value, None)
        return list(ordered)
    except Exception:
        return []


def _match_bot_author(author: str, allowlist: list[str]) -> bool:
    """Return ``True`` when ``author`` matches any allowlist entry.

    Supported entry forms:
    - exact match (including a concrete ``app/<name>`` entry);
    - ``*[bot]``: any author ending in ``[bot]``;
    - an entry ending in ``/`` (e.g. ``app/``): prefix match;
    - ``app/*`` or the literal placeholder ``app/<slug>``: any author that
      starts with ``app/``.

    A concrete entry such as ``app/trusted-bot`` matches only that author.
    Previously every entry starting with ``app/`` matched every ``app/``
    author, so one exact app entry allowlisted all apps.

    An empty ``author`` or an empty ``allowlist`` never matches.
    """
    if not author or not allowlist:
        return False

    # Entries that explicitly mean "any GitHub App author".
    app_wildcards = ("app/*", "app/<slug>")
    author_is_app = author.startswith("app/")

    for pattern in allowlist:
        if author == pattern:
            return True
        if pattern == "*[bot]" and author.endswith("[bot]"):
            return True
        # Covers "app/" as well as any other "<prefix>/" entry.
        if pattern.endswith("/") and author.startswith(pattern):
            return True
        if pattern in app_wildcards and author_is_app:
            return True

    return False


def check_bot_author_allowlist(
    author: str | None,
    *,
    allowlist_path: Path | None = None,
) -> GuardResult:
    """P0-9: check that the PR author matches the bot allowlist.

    - no author -> FAIL
    - allowlist missing or empty -> FAIL (fail-closed)
    - otherwise the verdict is whatever ``_match_bot_author`` decides for
      the loaded allowlist
    """
    if not author:
        return GuardResult(
            ok=False,
            reason="PR author 미지정 — bot allowlist 검증 불가 (P0-9)",
            blocking=["no_author"],
        )

    permitted = load_allowed_bot_accounts(allowlist_path)
    if not permitted:
        return GuardResult(
            ok=False,
            reason=(
                "allowed_bot_accounts.json 미설정/비어있음 — fail-closed (P0-9). "
                "허용 bot 계정 없음."
            ),
            detail={"author": author},
            blocking=["empty_bot_allowlist"],
        )

    if not _match_bot_author(author, permitted):
        return GuardResult(
            ok=False,
            reason=(
                f"PR author={author!r} 가 bot allowlist에 없음 — merge 차단 (P0-9). "
                f"허용 패턴: {permitted}"
            ),
            detail={"author": author, "allowlist": permitted},
            blocking=["author_not_in_allowlist"],
        )

    return GuardResult(
        ok=True,
        reason=f"PR author={author!r} bot allowlist 매칭 — 허용 (P0-9)",
        detail={"author": author},
    )


# ---------------------------------------------------------------------------
# P0-10
# ---------------------------------------------------------------------------


def resolve_worktree_path(
    args: object | None = None,
    *,
    env: dict | None = None,
    default: Path | None = None,
) -> Path:
    """P0-10: resolve the working tree path, preferring the ``--worktree`` argument.

    Resolution order:
    1. ``worktree`` from ``args`` (a dict key or an attribute, so both a
       plain dict and an argparse Namespace work);
    2. the ``TASKCTL_CWD`` environment variable, accepted with a
       ``DeprecationWarning``;
    3. ``default`` when given, otherwise ``WORKSPACE``.
    """
    environment = dict(os.environ) if env is None else env

    requested: str | None = None
    if args is not None:
        requested = (
            args.get("worktree")
            if isinstance(args, dict)
            else getattr(args, "worktree", None)
        )
    if requested:
        return Path(requested)

    legacy_cwd = environment.get("TASKCTL_CWD", "")
    if not legacy_cwd:
        return default or WORKSPACE

    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(
        "TASKCTL_CWD is deprecated. Use --worktree <path> instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return Path(legacy_cwd)
