"""utils/replacement_pr_runner.py — task-2510 5 모듈 #2.
회장 명시: contaminated PR 자동 감지 + origin/main 기준 clean branch에서
expected_files만 이식하여 replacement PR 자동 생성.
원 PR은 close/delete 없이 보존.
"""
from __future__ import annotations

import argparse
import json
import logging
import os
import re
import subprocess
import sys
from dataclasses import asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Optional, TYPE_CHECKING, TypeAlias

WORKSPACE = Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))

# CLI 직접 실행 시 패키지 루트를 sys.path에 추가
_HERE = Path(__file__).resolve().parent.parent  # utils/ → worktree root
if str(_HERE) not in sys.path:
    sys.path.insert(0, str(_HERE))

from utils.automation_contracts import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    ReplacementResult,
    CriticalEscalationType,
    EscalationPacket,
)

# ─── circular import 회피 (task-2516) ────────────────────────────────────────
# merge_queue_executor.py가 top-level에서 ReplacementPRRunner를 import하기 때문에
# 본 모듈이 top-level에서 merge_queue_executor를 import하면 cycle이 발생한다.
# wrapper 함수로 lazy 위임하여 default wiring path(_WIRING_AVAILABLE=True)를 활성화한다.

TaskSpec: TypeAlias = Any  # runtime placeholder — 실제 클래스는 type hint 용도로만 참조됨
if TYPE_CHECKING:
    from utils.merge_queue_executor import TaskSpec  # type: ignore[no-redef]  # noqa: F401  # pyright: ignore[reportMissingImports]

def compare_effective_diff(*args, **kwargs):
    from utils import merge_queue_executor as _mqe  # pyright: ignore[reportMissingImports]
    return _mqe.compare_effective_diff(*args, **kwargs)


def detect_forbidden_paths(*args, **kwargs):
    from utils import merge_queue_executor as _mqe  # pyright: ignore[reportMissingImports]
    return _mqe.detect_forbidden_paths(*args, **kwargs)


def assert_no_forbidden_git_flags(*args, **kwargs):
    from utils import merge_queue_executor as _mqe  # pyright: ignore[reportMissingImports]
    return _mqe.assert_no_forbidden_git_flags(*args, **kwargs)


def load_task_spec(*args, **kwargs):
    from utils import merge_queue_executor as _mqe  # pyright: ignore[reportMissingImports]
    return _mqe.load_task_spec(*args, **kwargs)

logger = logging.getLogger(__name__)
RunnerType = Callable[..., subprocess.CompletedProcess]


def _default_runner(args, cwd=None, timeout=60):
    return subprocess.run(args, cwd=cwd or str(WORKSPACE), capture_output=True, text=True, timeout=timeout)


# ─── §1 cherry-pick 정적 차단 ────────────────────────────────────────────────
def assert_no_cherry_pick(args: list[str]) -> None:
    """args에 'cherry-pick' 토큰이 들어가면 RuntimeError(CHERRY_PICK_FORBIDDEN)."""
    if "cherry-pick" in args:
        raise RuntimeError("CHERRY_PICK_FORBIDDEN")


# ─── §2 PR metadata 수집 ────────────────────────────────────────────────────
def fetch_pr_metadata(pr_number: int, runner: RunnerType) -> dict:
    """gh pr view --json headRefName,headRefOid,baseRefName,files,title.
    return {"head_ref": ..., "head_sha": ..., "base_ref": ..., "files": [...], "task_id": ..., "title": ..., "number": pr_number}
    실패 시 RuntimeError.
    task_id는 title 또는 head_ref에서 [task-NNNN] 패턴 추출.
    """
    args = ["gh", "pr", "view", str(pr_number), "--json",
            "headRefName,headRefOid,baseRefName,files,title"]
    result = runner(args)
    if result.returncode != 0:
        raise RuntimeError(f"FETCH_PR_METADATA_FAILED: pr={pr_number} stderr={result.stderr!r}")
    data = json.loads(result.stdout or "{}")
    head_ref = data.get("headRefName", "")
    title = data.get("title", "")
    files = [f.get("path", "") for f in (data.get("files") or []) if f.get("path")]
    # task_id 추출
    m = re.search(r"task-\d+(?:\+\d+)?", title) or re.search(r"task-\d+(?:\+\d+)?", head_ref)
    task_id = m.group(0) if m else "unknown"
    return {
        "head_ref": head_ref,
        "head_sha": data.get("headRefOid", ""),
        "base_ref": data.get("baseRefName", "main"),
        "files": files,
        "task_id": task_id,
        "title": title,
        "number": pr_number,
    }


# ─── §3 effective diff 산출 ─────────────────────────────────────────────────
def compute_effective_diff(pr_meta: dict, runner: RunnerType) -> list[str]:
    """gh pr view에서 받은 files 우선 사용, 비어있으면 git diff origin/main...PR_HEAD --name-only.
    """
    if pr_meta.get("files"):
        return list(pr_meta["files"])
    head_sha = pr_meta.get("head_sha", "")
    if not head_sha:
        return []
    args = ["git", "diff", "--name-only", f"origin/{pr_meta.get('base_ref', 'main')}...{head_sha}"]
    assert_no_forbidden_git_flags(args)
    result = runner(args)
    if result.returncode != 0:
        return []
    return [line.strip() for line in (result.stdout or "").splitlines() if line.strip()]


# ─── §4 contaminated 판정 ──────────────────────────────────────────────────
def detect_contamination(
    effective_files: list[str],
    expected_files: list[str],
    extra_forbidden_patterns: Optional[list] = None,
) -> dict:
    """반환 dict: {"contaminated": bool, "forbidden_paths": [...], "extra": [...], "missing": [...]}"""
    extra_pats = None
    if extra_forbidden_patterns:
        extra_pats = [p if hasattr(p, "search") else re.compile(p) for p in extra_forbidden_patterns]
    forbidden = detect_forbidden_paths(effective_files, expected_files, extra_patterns=extra_pats)
    equal, extra, missing = compare_effective_diff(effective_files, expected_files)
    contaminated = bool(forbidden) or not equal
    return {"contaminated": contaminated, "forbidden_paths": forbidden, "extra": extra, "missing": missing}


# ─── §5 clean replacement branch 생성 ───────────────────────────────────────
def create_clean_replacement_branch(
    task_id: str,
    runner: RunnerType,
    *,
    timestamp: Optional[str] = None,
    repo_dir: Optional[str] = None,
) -> str:
    """origin/main 기준 신규 brace `task/<task_id>-replacement-<timestamp>` 생성.
    fetch origin main으로 stale base 회피. force/rebase/cherry-pick 금지.
    """
    if timestamp is None:
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    branch = f"task/{task_id}-replacement-{timestamp}"

    # ★ Codex high #1: stale origin/main 회피를 위해 fetch 먼저 수행
    fetch_args = ["git", "fetch", "origin", "main", "--quiet"]
    assert_no_forbidden_git_flags(fetch_args)
    assert_no_cherry_pick(fetch_args)
    fetch_result = runner(fetch_args, cwd=repo_dir)
    if fetch_result.returncode != 0:
        raise RuntimeError(
            f"FETCH_ORIGIN_MAIN_FAILED: stderr={fetch_result.stderr!r}"
        )

    args = ["git", "checkout", "-b", branch, "origin/main"]
    assert_no_forbidden_git_flags(args)
    assert_no_cherry_pick(args)
    result = runner(args, cwd=repo_dir)
    if result.returncode != 0:
        raise RuntimeError(f"CREATE_CLEAN_BRANCH_FAILED: branch={branch} stderr={result.stderr!r}")
    return branch


# ─── §6 expected_files 이식 (cherry-pick 금지) ─────────────────────────────
def transplant_expected_files(
    expected_files: list[str],
    source_head: str,
    runner: RunnerType,
    *,
    repo_dir: Optional[str] = None,
) -> list[str]:
    """각 파일 git show <source_head>:<path> → write → git add. cherry-pick 정적 차단.
    파일 시스템에 실제로 write하므로 호출자는 임시 작업 dir(tmp_path 등)을 repo_dir로 줘야 source 손상 방지.
    """
    transplanted: list[str] = []
    cwd = repo_dir or str(WORKSPACE)
    for filepath in expected_files:
        show_args = ["git", "show", f"{source_head}:{filepath}"]
        assert_no_forbidden_git_flags(show_args)
        assert_no_cherry_pick(show_args)
        sr = runner(show_args, cwd=cwd)
        if sr.returncode != 0:
            raise RuntimeError(f"GIT_SHOW_FAILED: file={filepath} sha={source_head} stderr={sr.stderr!r}")
        target = Path(cwd) / filepath
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(sr.stdout, encoding="utf-8")
        add_args = ["git", "add", filepath]
        assert_no_forbidden_git_flags(add_args)
        assert_no_cherry_pick(add_args)
        ar = runner(add_args, cwd=cwd)
        if ar.returncode != 0:
            raise RuntimeError(f"GIT_ADD_FAILED: file={filepath} stderr={ar.stderr!r}")
        transplanted.append(filepath)
    return transplanted


# ─── §7 commit (push X) ────────────────────────────────────────────────────
def commit_local(
    task_id: str,
    runner: RunnerType,
    *,
    repo_dir: Optional[str] = None,
):
    """git commit -m only (push 하지 않음). force 금지. 실패 시 RuntimeError."""
    cwd = repo_dir or str(WORKSPACE)
    commit_msg = f"[{task_id}] replacement: expected_files only (auto-generated)"
    cargs = ["git", "commit", "-m", commit_msg]
    assert_no_forbidden_git_flags(cargs)
    assert_no_cherry_pick(cargs)
    cr = runner(cargs, cwd=cwd)
    if cr.returncode != 0:
        raise RuntimeError(f"COMMIT_FAILED: stderr={cr.stderr!r}")
    return cr  # CompletedProcess of commit


# ─── §7b push branch ───────────────────────────────────────────────────────
def push_branch(
    branch: str,
    runner: RunnerType,
    *,
    repo_dir: Optional[str] = None,
):
    """git push origin <branch>. force 금지. 실패 시 RuntimeError."""
    cwd = repo_dir or str(WORKSPACE)
    pargs = ["git", "push", "origin", branch]
    assert_no_forbidden_git_flags(pargs)
    assert_no_cherry_pick(pargs)
    pr = runner(pargs, cwd=cwd)
    if pr.returncode != 0:
        raise RuntimeError(f"PUSH_FAILED: stderr={pr.stderr!r}")
    return pr


# ─── §7c commit_and_push (하위 호환) ──────────────────────────────────────
def commit_and_push(
    task_id: str,
    branch: str,
    runner: RunnerType,
    *,
    repo_dir: Optional[str] = None,
    push: bool = True,
):
    """git commit -m + (선택적) git push. force 금지. 실패 시 RuntimeError."""
    cr = commit_local(task_id, runner, repo_dir=repo_dir)
    if push:
        push_branch(branch, runner, repo_dir=repo_dir)
    return cr  # CompletedProcess of commit


# ─── §8 로컬 diff 사전 검증 (push 전) ────────────────────────────────────
def precheck_local_replacement_diff(
    branch: str,
    expected_files: list[str],
    runner: RunnerType,
    *,
    repo_dir: Optional[str] = None,
) -> tuple[bool, list[str], list[str]]:
    """commit 후 push 전, 로컬 git diff 기반 사전 검증.

    git show --stat HEAD 또는 git diff --name-only origin/main...HEAD 호출.
    return (valid, extra, missing).
    valid=True이면 expected_files와 일치하고 forbidden path 없음.
    """
    cwd = repo_dir or str(WORKSPACE)
    # git diff --name-only origin/main...HEAD
    diff_args = ["git", "diff", "--name-only", "origin/main...HEAD"]
    assert_no_forbidden_git_flags(diff_args)
    assert_no_cherry_pick(diff_args)
    result = runner(diff_args, cwd=cwd)
    if result.returncode != 0:
        # fallback: git show --stat HEAD
        show_args = ["git", "show", "--stat", "--name-only", "--format=", "HEAD"]
        assert_no_forbidden_git_flags(show_args)
        assert_no_cherry_pick(show_args)
        result = runner(show_args, cwd=cwd)
        if result.returncode != 0:
            return False, [], list(expected_files)
    local_files = [line.strip() for line in (result.stdout or "").splitlines() if line.strip()]
    equal, extra, missing = compare_effective_diff(local_files, expected_files)
    # forbidden path 검사
    forbidden = detect_forbidden_paths(local_files, expected_files)
    if forbidden:
        return False, extra + forbidden, missing
    return equal, extra, missing


# ─── §9 dirty tree 사전 검사 (High #2) ────────────────────────────────────
def assert_clean_working_tree(runner: RunnerType, *, repo_dir: Optional[str] = None) -> None:
    """git status --porcelain 출력이 비어있지 않으면 RuntimeError(DIRTY_WORKING_TREE)."""
    cwd = repo_dir or str(WORKSPACE)
    args = ["git", "status", "--porcelain"]
    assert_no_forbidden_git_flags(args)
    result = runner(args, cwd=cwd)
    if result.returncode != 0:
        raise RuntimeError(f"DIRTY_WORKING_TREE: git status failed stderr={result.stderr!r}")
    if (result.stdout or "").strip():
        raise RuntimeError("DIRTY_WORKING_TREE: cannot proceed with replacement")


# ─── §10 replacement PR open ───────────────────────────────────────────────
def open_replacement_pr(
    task_id: str,
    branch: str,
    source_pr: int,
    runner: RunnerType,
    *,
    repo_dir: Optional[str] = None,
) -> int:
    """gh pr create. base=main, head=branch. body에 source PR 링크. return PR number.
    repo_dir은 다른 git/gh 호출과 일관되게 worktree/temp repo에 라우팅하기 위해 필요.
    """
    title = f"[{task_id}] replacement (auto for #{source_pr})"
    body = f"Auto-generated replacement for #{source_pr}. Original PR preserved (no close/delete)."
    args = ["gh", "pr", "create", "--base", "main", "--head", branch, "--title", title, "--body", body]
    assert_no_forbidden_git_flags(args)
    assert_no_cherry_pick(args)
    result = runner(args, cwd=repo_dir)
    if result.returncode != 0:
        raise RuntimeError(f"OPEN_REPLACEMENT_PR_FAILED: stderr={result.stderr!r}")
    # gh pr create는 PR URL을 stdout에 출력. 끝에서 숫자 추출.
    out = (result.stdout or "").strip()
    # gh pr create stdout은 https://github.com/<owner>/<repo>/pull/<N> 형식 — /pull/<N> 만 신뢰.
    # last-digits fallback은 task id/version과 충돌 위험이 있어 제거 (Gemini high #1).
    m = re.search(r"/pull/(\d+)\b", out)
    if not m:
        raise RuntimeError(f"OPEN_REPLACEMENT_PR_NO_PR_NUMBER: stdout={out!r}")
    return int(m.group(1))


# ─── §11 원 PR 보존 (close 절대 금지) ─────────────────────────────────────
def post_replaced_comment(source_pr: int, replacement_pr: int, runner: RunnerType):
    """gh pr comment <source_pr> -b "[REPLACED] by #<replacement_pr>". close/delete 호출 금지.
    args에 'close', 'delete', 'edit --state closed' 들어가면 즉시 RuntimeError.
    """
    body = f"[REPLACED] by #{replacement_pr} — automated by replacement_pr_runner (task-2510). Original PR preserved."
    args = ["gh", "pr", "comment", str(source_pr), "-b", body]
    # 정적 검증: close/delete/edit 토큰이 args에 들어가지 않도록
    forbidden_tokens = {"close", "delete", "delete-branch"}
    if any(t in args for t in forbidden_tokens):
        raise RuntimeError(f"ORIGINAL_PR_PRESERVE_FORBIDDEN_OP: args={args}")
    assert_no_forbidden_git_flags(args)
    assert_no_cherry_pick(args)
    result = runner(args)
    if result.returncode != 0:
        raise RuntimeError(f"POST_REPLACED_COMMENT_FAILED: stderr={result.stderr!r}")
    return result


# ─── §12 replacement diff 사후 검증 ──────────────────────────────────────
def validate_replacement_diff(
    replacement_pr: int,
    expected_files: list[str],
    runner: RunnerType,
) -> tuple[bool, list[str], list[str]]:
    """gh pr view <replacement_pr> --json files → compare_effective_diff.
    return (valid, extra, missing).
    """
    args = ["gh", "pr", "view", str(replacement_pr), "--json", "files"]
    result = runner(args)
    if result.returncode != 0:
        raise RuntimeError(f"VALIDATE_REPLACEMENT_DIFF_FAILED: pr={replacement_pr} stderr={result.stderr!r}")
    data = json.loads(result.stdout or "{}")
    effective = [f["path"] for f in (data.get("files") or []) if f.get("path")]
    return compare_effective_diff(effective, expected_files)


# ─── §13 EscalationPacket helper ────────────────────────────────────────
def build_escalation_packet(
    task_id: str,
    pr_number: int,
    escalation_type: CriticalEscalationType,
    reason: str,
    evidence: dict,
) -> EscalationPacket:
    """본 task에서는 EscalationPacket dataclass 인스턴스만 생성. 실제 보고는 task-2513 영역."""
    return EscalationPacket(
        task_id=task_id,
        pr_number=pr_number,
        escalation_type=escalation_type,
        reason=reason,
        why_auto_cannot_continue=f"replacement_pr_runner cannot continue without chair: {reason}",
        safe_options=[
            "회장 수동 검토 후 재개",
            "원 PR 보존 + clean branch 수동 생성",
            "expected_files 재정의",
        ],
        recommended_option="회장 수동 검토 후 재개",
        evidence=evidence,
    )


# ─── §14 main runner 클래스 ────────────────────────────────────────────────
class ReplacementPRRunner:
    """contaminated PR을 받아 replacement PR을 자동 생성하는 main entry."""

    def __init__(
        self,
        runner: Optional[RunnerType] = None,
        *,
        dry_run: bool = False,
        repo_dir: Optional[str] = None,
        extra_forbidden_patterns: Optional[list] = None,
        timestamp_provider: Optional[Callable[[], str]] = None,
    ):
        self.runner = runner or _default_runner
        self.dry_run = dry_run
        self.repo_dir = repo_dir
        self.extra_forbidden_patterns = extra_forbidden_patterns
        self._ts_provider = timestamp_provider or (
            lambda: datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
        )
        self.last_escalation_packet: Optional[EscalationPacket] = None

    def _record_escalation(
        self,
        task_id: str,
        pr_number: int,
        escalation_type: CriticalEscalationType,
        reason: str,
        evidence: dict,
    ) -> None:
        """실패 경로에서 EscalationPacket을 생성하여 last_escalation_packet에 보관."""
        try:
            self.last_escalation_packet = build_escalation_packet(
                task_id=task_id,
                pr_number=pr_number,
                escalation_type=escalation_type,
                reason=reason,
                evidence=evidence,
            )
        except Exception as e:
            logger.warning("build_escalation_packet failed: %s", e)

    def execute(self, pr_number: int, task_spec: Optional[TaskSpec] = None) -> ReplacementResult:
        self.last_escalation_packet = None  # 매 실행마다 초기화

        # Step 1: PR metadata
        if self.dry_run:
            pr_meta = self._dry_run_pr_meta(pr_number, task_spec)
        else:
            try:
                pr_meta = fetch_pr_metadata(pr_number, self.runner)
            except Exception:
                task_id = (task_spec.task_id if task_spec else "unknown") or "unknown"
                self._record_escalation(
                    task_id=task_id,
                    pr_number=pr_number,
                    escalation_type=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF,
                    reason="FETCH_PR_METADATA_FAILED",
                    evidence={"pr_number": pr_number},
                )
                # Gemini medium: 원 PR에 어떤 변경도 가하지 않았으므로 preserved=True
                return ReplacementResult(
                    source_pr=pr_number, replacement_pr=None, original_pr_preserved=True,
                    expected_files=[], effective_diff_files=[], forbidden_paths=[],
                    success=False,
                    failure_reason=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF.value,
                )
        # Step 2: task_spec / expected_files
        if task_spec is not None:
            expected_files = list(task_spec.expected_files)
            task_id = task_spec.task_id
        else:
            task_id = pr_meta.get("task_id") or "unknown"
            expected_files = []
        # Step 3: effective diff
        if self.dry_run:
            effective_files = list(pr_meta.get("files", []))
        else:
            try:
                effective_files = compute_effective_diff(pr_meta, self.runner)
            except Exception:
                self._record_escalation(
                    task_id=task_id,
                    pr_number=pr_number,
                    escalation_type=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF,
                    reason="COMPUTE_EFFECTIVE_DIFF_FAILED",
                    evidence={"pr_number": pr_number, "task_id": task_id},
                )
                # Gemini medium: 원 PR에 어떤 변경도 가하지 않았으므로 preserved=True
                return ReplacementResult(
                    source_pr=pr_number, replacement_pr=None, original_pr_preserved=True,
                    expected_files=expected_files, effective_diff_files=[], forbidden_paths=[],
                    success=False,
                    failure_reason=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF.value,
                )
        # Step 4: contamination
        c = detect_contamination(effective_files, expected_files, extra_forbidden_patterns=self.extra_forbidden_patterns)
        # Step 5: forbidden path
        if c["forbidden_paths"]:
            self._record_escalation(
                task_id=task_id,
                pr_number=pr_number,
                escalation_type=CriticalEscalationType.FORBIDDEN_PATH_INTRUSION,
                reason="FORBIDDEN_PATH_DETECTED",
                evidence={"forbidden_paths": c["forbidden_paths"], "task_id": task_id},
            )
            return ReplacementResult(
                source_pr=pr_number, replacement_pr=None, original_pr_preserved=True,
                expected_files=expected_files, effective_diff_files=effective_files,
                forbidden_paths=c["forbidden_paths"],
                success=False,
                failure_reason=CriticalEscalationType.FORBIDDEN_PATH_INTRUSION.value,
            )
        # Step 6: clean → no-op
        if not c["contaminated"]:
            return ReplacementResult(
                source_pr=pr_number, replacement_pr=None, original_pr_preserved=True,
                expected_files=expected_files, effective_diff_files=effective_files,
                forbidden_paths=[],
                success=True, failure_reason=None,
            )
        # Step 7: contaminated → replacement
        if self.dry_run:
            return self._dry_run_replacement(
                pr_number=pr_number, task_id=task_id,
                expected_files=expected_files, effective_files=effective_files,
                forbidden_paths=c["forbidden_paths"],
            )

        # 실제 모드 — High #2: pre-flight dirty tree check
        try:
            assert_clean_working_tree(self.runner, repo_dir=self.repo_dir)
        except RuntimeError as e:
            self._record_escalation(
                task_id=task_id,
                pr_number=pr_number,
                escalation_type=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF,
                reason=str(e),
                evidence={"dirty_tree": True, "task_id": task_id},
            )
            return ReplacementResult(
                source_pr=pr_number, replacement_pr=None, original_pr_preserved=True,
                expected_files=expected_files, effective_diff_files=effective_files,
                forbidden_paths=[],
                success=False,
                failure_reason=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF.value,
            )

        # High #1: commit 후 push 전 사전 검증 흐름
        branch = None
        try:
            branch = create_clean_replacement_branch(task_id, self.runner, timestamp=self._ts_provider(), repo_dir=self.repo_dir)
            transplant_expected_files(expected_files, pr_meta["head_sha"], self.runner, repo_dir=self.repo_dir)
            # commit (push X)
            commit_local(task_id, self.runner, repo_dir=self.repo_dir)
        except Exception as e:
            self._record_escalation(
                task_id=task_id,
                pr_number=pr_number,
                escalation_type=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF,
                reason=str(e),
                evidence={"branch": branch, "task_id": task_id},
            )
            return ReplacementResult(
                source_pr=pr_number, replacement_pr=None, original_pr_preserved=True,
                expected_files=expected_files, effective_diff_files=effective_files,
                forbidden_paths=[],
                success=False,
                failure_reason=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF.value,
            )

        # ★ NEW: precheck_local_replacement_diff — push 전 사전 검증
        try:
            precheck_valid, precheck_extra, precheck_missing = precheck_local_replacement_diff(
                branch, expected_files, self.runner, repo_dir=self.repo_dir
            )
        except Exception:
            precheck_valid, precheck_extra, precheck_missing = False, [], []

        if not precheck_valid:
            # push 하지 않았으므로 local branch만 삭제
            try:
                del_args = ["git", "branch", "-d", branch]
                assert_no_forbidden_git_flags(del_args)
                self.runner(del_args, cwd=self.repo_dir)
            except Exception:
                pass
            self._record_escalation(
                task_id=task_id,
                pr_number=pr_number,
                escalation_type=CriticalEscalationType.REPLACEMENT_PR_FAILED,
                reason="PRECHECK_LOCAL_DIFF_MISMATCH",
                evidence={
                    "branch": branch,
                    "extra": precheck_extra,
                    "missing": precheck_missing,
                    "task_id": task_id,
                },
            )
            return ReplacementResult(
                source_pr=pr_number, replacement_pr=None, original_pr_preserved=True,
                expected_files=expected_files, effective_diff_files=effective_files,
                forbidden_paths=[],
                success=False,
                failure_reason=CriticalEscalationType.REPLACEMENT_PR_FAILED.value,
            )

        # 사전 검증 통과 → push + PR open
        try:
            push_branch(branch, self.runner, repo_dir=self.repo_dir)
            replacement_pr = open_replacement_pr(task_id, branch, pr_number, self.runner, repo_dir=self.repo_dir)
        except Exception as e:
            self._record_escalation(
                task_id=task_id,
                pr_number=pr_number,
                escalation_type=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF,
                reason=str(e),
                evidence={"branch": branch, "task_id": task_id},
            )
            return ReplacementResult(
                source_pr=pr_number, replacement_pr=None, original_pr_preserved=True,
                expected_files=expected_files, effective_diff_files=effective_files,
                forbidden_paths=[],
                success=False,
                failure_reason=CriticalEscalationType.REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF.value,
            )

        # Step 8: validate replacement diff (사후 검증)
        try:
            valid_result = validate_replacement_diff(replacement_pr, expected_files, self.runner)
            valid = valid_result if isinstance(valid_result, bool) else valid_result[0]
        except Exception:
            valid = False
        if not valid:
            self._record_escalation(
                task_id=task_id,
                pr_number=pr_number,
                escalation_type=CriticalEscalationType.REPLACEMENT_PR_FAILED,
                reason="VALIDATE_REPLACEMENT_DIFF_FAILED",
                evidence={"replacement_pr": replacement_pr, "task_id": task_id},
            )
            return ReplacementResult(
                source_pr=pr_number, replacement_pr=replacement_pr, original_pr_preserved=True,
                expected_files=expected_files, effective_diff_files=effective_files,
                forbidden_paths=[],
                success=False,
                failure_reason=CriticalEscalationType.REPLACEMENT_PR_FAILED.value,
            )
        # Step 9: 원 PR 보존 코멘트
        try:
            post_replaced_comment(pr_number, replacement_pr, self.runner)
        except Exception as e:
            self._record_escalation(
                task_id=task_id,
                pr_number=pr_number,
                escalation_type=CriticalEscalationType.REPLACEMENT_PR_FAILED,
                reason=str(e),
                evidence={"replacement_pr": replacement_pr, "task_id": task_id},
            )
            # Gemini medium: post_replaced_comment 실패해도 원 PR은 close/delete되지 않음 → preserved=True
            return ReplacementResult(
                source_pr=pr_number, replacement_pr=replacement_pr, original_pr_preserved=True,
                expected_files=expected_files, effective_diff_files=effective_files,
                forbidden_paths=[],
                success=False,
                failure_reason=CriticalEscalationType.REPLACEMENT_PR_FAILED.value,
            )
        return ReplacementResult(
            source_pr=pr_number, replacement_pr=replacement_pr, original_pr_preserved=True,
            expected_files=expected_files, effective_diff_files=effective_files,
            forbidden_paths=[],
            success=True, failure_reason=None,
        )

    def _dry_run_pr_meta(self, pr_number, task_spec):
        task_id = task_spec.task_id if task_spec else "unknown"
        return {
            "head_ref": f"task/{task_id}-dry-run",
            "head_sha": "dry-run-sha-0000000",
            "base_ref": "main",
            "files": list(task_spec.expected_files) if task_spec else [],
            "task_id": task_id,
            "title": f"[DRY-RUN] {task_id}",
            "number": pr_number,
        }

    def _dry_run_replacement(self, *, pr_number, task_id, expected_files, effective_files, forbidden_paths):
        timestamp = self._ts_provider()
        simulated_branch = f"task/{task_id}-replacement-{timestamp}"
        logger.info("[DRY-RUN] Would create branch=%r and replacement PR for PR #%d", simulated_branch, pr_number)
        return ReplacementResult(
            source_pr=pr_number, replacement_pr=None, original_pr_preserved=True,
            expected_files=expected_files, effective_diff_files=effective_files,
            forbidden_paths=forbidden_paths,
            success=True, failure_reason=None,
        )


# ─── §15 CLI ──────────────────────────────────────────────────────────────
def main(argv: Optional[list[str]] = None) -> int:
    parser = argparse.ArgumentParser(description="task-2510 replacement_pr_runner CLI")
    parser.add_argument("--pr", type=int, required=True)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--task-file", type=str, default=None)
    args = parser.parse_args(argv)
    runner = ReplacementPRRunner(dry_run=args.dry_run)
    spec = None
    if args.task_file:
        spec = load_task_spec(Path(args.task_file))
    result = runner.execute(args.pr, task_spec=spec)
    print(json.dumps(asdict(result), default=str, ensure_ascii=False, indent=2))
    if runner.last_escalation_packet:
        print(json.dumps({"escalation_packet": asdict(runner.last_escalation_packet)}, default=str), file=sys.stderr)
    return 0 if result.success else 2


if __name__ == "__main__":
    sys.exit(main())
