"""anu_v2.replacement_pr_runner — ANU v2 replacement PR runner v0 (task-2537).

회장 §명시 (2026-05-10) — ANU v2 자동화 5 모듈 시리즈 2번째.

본질 4축:
  1. effective diff contamination 감지 (expected_files vs PR diff 정확 일치 검증)
  2. original PR 보존 (close / abort / 재라벨 일체 X — OPEN 유지)
  3. clean replacement PR 생성 (expected_files 정확히 N개만 담은 새 branch + 새 PR)
  4. replacement PR 실패 시에만 Critical 7종 #N 보고 (성공 시 보고 0건)

설계 원칙:
  - one-way isolation: anu_v2/* 만 import. utils/dispatch/scripts/dashboard 의존성 0.
  - 외부 부수효과는 모두 주입 가능한 callable 로 추상화 (gh_runner, git_runner,
    audit_writer) — 테스트 시 mock 으로 대체 가능.
  - admin override / force / rebase / owner_pat fallback / manual .done 일체 사용 금지.
  - executor 인터페이스 contract: dict 키 `replacement_pr_required`,
    `replacement_pr_runner_input` 두 개만 약속. executor 코드 자체는 변경하지 않는다.
"""

from __future__ import annotations

import os
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, Mapping, Sequence

# anu_v2 내부 의존성만 사용 (one-way isolation 준수)
from anu_v2.merge_queue_executor import (
    CRITICAL_DIFF_REPLACEMENT_FAILED,
    CRITICAL_REPLACEMENT_FAILED,
    assert_no_forbidden_git_flags,
)


# ─── Decision codes ─────────────────────────────────────────────────────────
# String codes naming the outcome of each phase. Within this file only
# ORIGINAL_PR_PRESERVED and REPLACEMENT_PR_CREATED are used (as the "decision"
# field of audit records); the remaining codes are not referenced here and are
# presumably consumed by callers — verify before renaming or removing them.
CONTAMINATION_CLEAN = "CONTAMINATION_CLEAN"
CONTAMINATION_DETECTED = "CONTAMINATION_DETECTED"
ORIGINAL_PR_PRESERVED = "ORIGINAL_PR_PRESERVED"
REPLACEMENT_PR_CREATED = "REPLACEMENT_PR_CREATED"
REPLACEMENT_PR_FAILED = "REPLACEMENT_PR_FAILED"


# ─── Callback signatures for external side effects ──────────────────────────
# GhRunner / GitRunner: called as runner(args, env) with the argument list of
# the gh / git command and an environment mapping; the returned object is read
# through its `returncode`, `stdout` and `stderr` attributes.
# AuditWriter: called with a single mapping describing one audit record.
# NOTE(review): these aliases are ordinary runtime expressions (the
# `from __future__ import annotations` import only defers annotations), so the
# `Mapping[...] | None` spelling presumably requires Python 3.10+ — confirm the
# supported interpreter range.
GhRunner = Callable[[Sequence[str], Mapping[str, str] | None], subprocess.CompletedProcess]
GitRunner = Callable[[Sequence[str], Mapping[str, str] | None], subprocess.CompletedProcess]
AuditWriter = Callable[[Mapping[str, Any]], None]


# ─── Helpers ────────────────────────────────────────────────────────────────
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision.

    The value is timezone-aware, so the string carries a ``+00:00`` offset
    (for example ``2026-05-10T12:34:56+00:00``).
    """
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat(timespec="seconds")


def _coerce_stream(value: Any) -> str:
    """Normalise a subprocess stdout/stderr value to ``str``.

    * ``None``  -> empty string
    * ``bytes`` -> decoded as UTF-8, undecodable bytes replaced with U+FFFD
    * anything else -> ``str(value)``
    """
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return "" if value is None else str(value)


# ─── Data types ──────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class ContaminationReport:
    """Immutable result of comparing a PR's effective diff with the expected files."""
    # True when the two file sets differ in either direction.
    contaminated: bool
    # Paths present in the PR diff but not expected.
    extra_files: tuple[str, ...]
    # Paths expected but absent from the PR diff.
    missing_files: tuple[str, ...]

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of the report; the tuple fields become lists."""
        return dict(
            contaminated=self.contaminated,
            extra_files=[*self.extra_files],
            missing_files=[*self.missing_files],
        )


@dataclass(frozen=True)
class PreservationRecord:
    """Immutable record stating that the original PR was left untouched.

    Produced by ``ReplacementPRRunner.preserve_original_pr`` after it has
    handed an audit entry to the audit writer.
    """
    # Number of the original (contaminated) PR.
    original_pr: int
    # PR state that was kept; preserve_original_pr always stores "OPEN"
    # because it never closes or aborts the PR.
    preserved_state: str
    # Path of the audit file associated with this PR, as a string.
    audit_path: str
    # Timestamp string of the preservation record.
    ts: str


@dataclass(frozen=True)
class ReplacementResult:
    """Immutable description of a successfully created clean replacement PR."""
    # Number of the newly created replacement PR.
    replacement_pr: int
    # Commit SHA captured from the clean branch.
    clean_sha: str
    # Name of the clean branch backing the replacement PR.
    clean_branch: str
    # Flag telling the consumer that the PR may re-enter the merge queue.
    merge_queue_ready: bool

    def to_dict(self) -> dict[str, Any]:
        """Return the four fields as a plain dict, in declaration order."""
        field_names = ("replacement_pr", "clean_sha", "clean_branch", "merge_queue_ready")
        return {name: getattr(self, name) for name in field_names}


@dataclass
class ReplacementFailure:
    """Payload describing why a replacement-PR attempt failed (used for reporting)."""
    # Stage at which the attempt stopped. create_clean_replacement in this file
    # emits: "bot_token", "precondition", "branch", "fetch", "checkout",
    # "commit", "push", "rev_parse" and "pr_create".
    stage: str
    # Short machine-readable reason code, e.g. "git_push_failed".
    reason: str
    # Optional diagnostic details (truncated stderr/stdout, paths, ref).
    extra: dict[str, Any] = field(default_factory=dict)


# ─── 본체 ────────────────────────────────────────────────────────────────────
class ReplacementPRRunner:
    """ANU v2 replacement PR runner v0.

    effective diff contamination 감지 → original PR 보존 → clean replacement PR 생성.

    인터페이스 연동: anu_v2.merge_queue_executor (task-2531) — clean PR 생성 후
    queue 에 삽입 가능하도록 dict 형식 통일. executor 코드는 변경하지 않으며 dict
    키 contract (`replacement_pr_required`, `replacement_pr_runner_input`) 만 약속.
    """

    def __init__(
        self,
        *,
        gh_runner: GhRunner,
        git_runner: GitRunner,
        audit_writer: AuditWriter,
        audit_root: Path,
        bot_token_env: str = "BOT_GITHUB_TOKEN",
        bot_git_name: str = "jeon-jonghyuk-taskctl-bot",
        bot_git_email: str = "jeon-jonghyuk-taskctl-bot@users.noreply.github.com",
    ) -> None:
        self._gh = gh_runner
        self._git = git_runner
        self._audit = audit_writer
        self._audit_root = Path(audit_root)
        self._bot_token_env = bot_token_env
        self._bot_git_name = bot_git_name
        self._bot_git_email = bot_git_email

    # ── 1. contamination 감지 ─────────────────────────────────────────────────
    def detect_contamination(
        self,
        original_pr_diff: Iterable[str],
        expected_files: Iterable[str],
    ) -> ContaminationReport:
        """effective diff vs expected_files 비교 → contamination 판정.

        - extra_files: PR diff 에는 있는데 expected_files 에 없는 파일 (scope 확장).
        - missing_files: expected_files 에는 있는데 PR diff 에 없는 파일 (누락).
        - 둘 중 하나라도 있으면 contaminated=True.

        Returns:
            ContaminationReport(contaminated, extra_files, missing_files)
        """
        actual = set(original_pr_diff)
        expected = set(expected_files)
        extra = sorted(actual - expected)
        missing = sorted(expected - actual)
        contaminated = bool(extra) or bool(missing)
        return ContaminationReport(
            contaminated=contaminated,
            extra_files=tuple(extra),
            missing_files=tuple(missing),
        )

    # ── 2. original PR 보존 ──────────────────────────────────────────────────
    def preserve_original_pr(self, pr_number: int) -> PreservationRecord:
        """original PR 보존 — close/abort 일체 금지. OPEN 유지 + audit 만 박제.

        audit 파일: {audit_root}/replacement_pr_<pr>.jsonl (append-only).
        gh / git 호출 0 — 본 메서드는 어떤 PR 상태도 변경하지 않는다.
        """
        ts = _now_iso()
        audit_path = self._audit_root / f"replacement_pr_{pr_number}.jsonl"
        self._audit({
            "ts": ts,
            "kind": "original_pr_preserved",
            "pr": pr_number,
            "decision": ORIGINAL_PR_PRESERVED,
            "note": "OPEN 유지 — close/abort 미수행",
            "audit_path": str(audit_path),
        })
        return PreservationRecord(
            original_pr=pr_number,
            preserved_state="OPEN",
            audit_path=str(audit_path),
            ts=ts,
        )

    # ── 3. clean replacement PR 생성 ─────────────────────────────────────────
    def create_clean_replacement(
        self,
        *,
        original_pr: int,
        expected_files: Iterable[str],
        clean_branch_name: str,
        base_ref: str = "main",
        commit_message: str = "",
        pr_title: str = "",
        pr_body: str = "",
    ) -> ReplacementResult | ReplacementFailure:
        """clean branch + replacement PR 생성 (expected_files 정확히 N개만 포함).

        실행 순서 (실패 시 stage 명시 + Critical 보고 대상):
          1. git checkout -b <clean_branch_name> <base_ref>
          2. git checkout <original_pr branch> -- <expected_files...>  (일괄 stage)
          3. git commit (BOT identity)
          4. git push origin <clean_branch_name>
          5. gh pr create

        BOT_GITHUB_TOKEN process-local injection. owner_pat / force / rebase / admin 금지.

        Returns:
            성공: ReplacementResult(replacement_pr, clean_sha, clean_branch, merge_queue_ready=True)
            실패: ReplacementFailure(stage, reason, extra)
        """
        token = os.environ.get(self._bot_token_env, "").strip()
        if not token:
            return ReplacementFailure(
                stage="bot_token",
                reason="bot_token_unavailable",
            )

        env = os.environ.copy()
        env["GH_TOKEN"] = token
        env["GITHUB_TOKEN"] = token
        # ★ Gemini 2차 high #1 자동 수용: BOT identity git commit env 명시 주입.
        #    user.name/user.email 미설정 CI 환경에서도 commit 가능하도록 process-local env 강제.
        env["GIT_AUTHOR_NAME"] = self._bot_git_name
        env["GIT_AUTHOR_EMAIL"] = self._bot_git_email
        env["GIT_COMMITTER_NAME"] = self._bot_git_name
        env["GIT_COMMITTER_EMAIL"] = self._bot_git_email

        expected_list = list(expected_files)
        if not expected_list:
            return ReplacementFailure(
                stage="precondition",
                reason="expected_files_empty",
            )

        # 1. base 로부터 clean branch 생성 (force/rebase 사용 X)
        branch_args = ["checkout", "-b", clean_branch_name, base_ref]
        assert_no_forbidden_git_flags(branch_args)
        cp = self._git(branch_args, env)
        if cp.returncode != 0:
            return ReplacementFailure(
                stage="branch",
                reason="git_checkout_b_failed",
                extra={"stderr": _coerce_stream(getattr(cp, "stderr", None))[:512]},
            )

        # ★ Gemini 2차 high #2 자동 수용: refs/pull/{N}/head 선행 fetch.
        #    일반 git clone 은 pull refs 자동 fetch 하지 않으므로 명시 fetch 필요.
        fetch_args = [
            "fetch", "origin",
            f"refs/pull/{original_pr}/head:refs/pull/{original_pr}/head",
        ]
        assert_no_forbidden_git_flags(fetch_args)
        cp = self._git(fetch_args, env)
        if cp.returncode != 0:
            return ReplacementFailure(
                stage="fetch",
                reason="git_fetch_pull_head_failed",
                extra={
                    "ref": f"refs/pull/{original_pr}/head",
                    "stderr": _coerce_stream(getattr(cp, "stderr", None))[:512],
                },
            )

        # 2. expected_files 만 original PR head 에서 stage (회장 §효과적 diff = expected)
        # Gemini 1차 medium: 개별 호출 N회 → 일괄 호출 1회로 최적화 (subprocess overhead 절감).
        stage_args = [
            "checkout",
            f"refs/pull/{original_pr}/head",
            "--",
            *expected_list,
        ]
        assert_no_forbidden_git_flags(stage_args)
        cp = self._git(stage_args, env)
        if cp.returncode != 0:
            return ReplacementFailure(
                stage="checkout",
                reason="git_checkout_path_failed",
                extra={
                    "paths": list(expected_list),
                    "stderr": _coerce_stream(getattr(cp, "stderr", None))[:512],
                },
            )

        # 3. commit (BOT identity — owner_pat 절대 금지)
        message = commit_message or (
            f"[task-2537] clean replacement PR for #{original_pr} "
            f"(expected_files {len(expected_list)} only)"
        )
        commit_args = ["commit", "-m", message]
        assert_no_forbidden_git_flags(commit_args)
        cp = self._git(commit_args, env)
        if cp.returncode != 0:
            return ReplacementFailure(
                stage="commit",
                reason="git_commit_failed",
                extra={"stderr": _coerce_stream(getattr(cp, "stderr", None))[:512]},
            )

        # 4. push (force/rebase 일체 금지 — assert_no_forbidden_git_flags 가 정적 차단)
        push_args = ["push", "origin", clean_branch_name]
        assert_no_forbidden_git_flags(push_args)
        cp = self._git(push_args, env)
        if cp.returncode != 0:
            return ReplacementFailure(
                stage="push",
                reason="git_push_failed",
                extra={"stderr": _coerce_stream(getattr(cp, "stderr", None))[:512]},
            )

        # 5. clean SHA 캡처 (rev-parse HEAD)
        rev_args = ["rev-parse", "HEAD"]
        assert_no_forbidden_git_flags(rev_args)
        cp = self._git(rev_args, env)
        if cp.returncode != 0:
            return ReplacementFailure(
                stage="rev_parse",
                reason="git_rev_parse_failed",
                extra={"stderr": _coerce_stream(getattr(cp, "stderr", None))[:512]},
            )
        clean_sha = _coerce_stream(getattr(cp, "stdout", None)).strip()

        # 6. gh pr create (admin / force / rebase 사용 X)
        title = pr_title or f"[task-2537] replacement for #{original_pr} (clean diff)"
        body = pr_body or (
            f"effective diff contamination 감지로 clean replacement PR 생성. "
            f"원본 PR #{original_pr} 은 OPEN 유지 (close/abort 미수행)."
        )
        gh_args = [
            "pr", "create",
            "--base", base_ref,
            "--head", clean_branch_name,
            "--title", title,
            "--body", body,
        ]
        assert_no_forbidden_git_flags(gh_args)
        cp = self._gh(gh_args, env)
        if cp.returncode != 0:
            return ReplacementFailure(
                stage="pr_create",
                reason="gh_pr_create_failed",
                extra={"stderr": _coerce_stream(getattr(cp, "stderr", None))[:512]},
            )

        # 응답 stdout 마지막 줄에 PR URL 이 포함되는 gh 관행을 따라 PR 번호 추출
        pr_number = _extract_pr_number(_coerce_stream(getattr(cp, "stdout", None)))
        if pr_number <= 0:
            return ReplacementFailure(
                stage="pr_create",
                reason="gh_pr_number_unparsable",
                extra={"stdout": _coerce_stream(getattr(cp, "stdout", None))[:512]},
            )

        # audit 박제 — replacement 성공 (executor 가 큐 재진입 시 참조)
        self._audit({
            "ts": _now_iso(),
            "kind": "replacement_pr_created",
            "original_pr": original_pr,
            "replacement_pr": pr_number,
            "clean_branch": clean_branch_name,
            "clean_sha": clean_sha,
            "decision": REPLACEMENT_PR_CREATED,
        })
        return ReplacementResult(
            replacement_pr=pr_number,
            clean_sha=clean_sha,
            clean_branch=clean_branch_name,
            merge_queue_ready=True,
        )

    # ── 4. 실패 분류 (Critical 7종 매핑) ─────────────────────────────────────
    def classify_failure(self, failure: ReplacementFailure) -> tuple[str, bool]:
        """실패 단계를 Critical 7종 코드로 매핑.

        replacement PR 시도 자체가 실패한 경우만 Critical 7종 #N 회장 보고.
        - branch / checkout / commit / push / pr_create 단계 실패
            → CRITICAL_REPLACEMENT_FAILED ("REPLACEMENT_PR_ALSO_FAILED").
        - precondition / bot_token / rev_parse 등 사전 조건 실패
            → CRITICAL_DIFF_REPLACEMENT_FAILED
              ("EFFECTIVE_DIFF_CONTAMINATION_REPLACEMENT_FAILED").

        Returns: (classification_code, is_critical_7_bool)
        """
        downstream_stages = {"branch", "checkout", "commit", "push", "pr_create"}
        if failure.stage in downstream_stages:
            return (CRITICAL_REPLACEMENT_FAILED, True)
        # 사전 조건 / token / rev-parse 실패는 contamination 정정 자체가 불가능했음
        return (CRITICAL_DIFF_REPLACEMENT_FAILED, True)

    # ── executor 인터페이스 contract ─────────────────────────────────────────
    @staticmethod
    def build_executor_contract(
        *,
        contamination: ContaminationReport,
        original_pr: int,
        expected_files: Iterable[str],
        clean_branch_name: str,
    ) -> dict[str, Any]:
        """executor `evaluate*` 응답에 포함될 contamination contract dict.

        executor 코드는 변경하지 않고 호출부 (cron / dispatcher) 에서 본 dict 를
        합쳐 사용한다. 회장 §명시 합의 키:
          - replacement_pr_required: bool
          - replacement_pr_runner_input: dict
        """
        return {
            "replacement_pr_required": contamination.contaminated,
            "replacement_pr_runner_input": {
                "original_pr": original_pr,
                "expected_files": list(expected_files),
                "clean_branch_name": clean_branch_name,
                "contamination": contamination.to_dict(),
            },
        }


def _extract_pr_number(stdout: str) -> int:
    """Extract the PR number from ``gh pr create`` stdout.

    The last non-blank line of the output is taken, a trailing ``/`` is
    dropped, and the final path segment is parsed as the PR number, e.g.
    ``"https://github.com/Jeon-Jonghyuk/dev_workspace/pull/83"`` -> ``83``.

    Only a segment made of plain ASCII digits is accepted. Forms that
    ``int()`` alone would tolerate — a sign (``-3``, ``+83``) or digit
    grouping underscores (``1_0``) — are not valid PR numbers and yield 0,
    so the function never returns a negative or misparsed value.

    Args:
        stdout: text printed by ``gh pr create`` (may be empty or None-like).

    Returns:
        The PR number, or 0 when no number can be extracted.
    """
    lines = (stdout or "").strip().splitlines()
    if not lines:
        return 0
    # Last line only: the PR URL is expected at the end of the output.
    last_line = lines[-1].strip().rstrip("/")
    segment = last_line.rsplit("/", 1)[-1].strip()
    if not (segment.isascii() and segment.isdigit()):
        return 0
    try:
        return int(segment)
    except ValueError:
        # int() can still refuse extremely long digit strings.
        return 0
