"""Silent corruption guard for task done preconditions.

Three hardcoded fail-closed checks:

1. ``check_pr_merged_at`` — ``gh pr view <PR> --json mergedAt`` not null.
2. ``check_pr_merge_commit_oid`` — ``gh pr view <PR> --json mergeCommit`` oid not null.
3. ``check_origin_main_ancestry`` — race-safe fetch + 2회 교차 검증으로
   ``origin/<base>`` 가 merge commit 의 ancestor 인지 확인.

각 함수는 ``GuardResult-like`` dict 를 반환 한다::

    {"ok": bool, "reason": str, "detail": dict}

외부 호출 (subprocess, gh, git) 실패 시 항상 fail-closed (``ok=False``).
"""

from __future__ import annotations

import json
import subprocess
import time
from pathlib import Path
from typing import Optional

# Subprocess hard limits (taskctl-friendly).
_GH_TIMEOUT_SEC = 30
_GIT_TIMEOUT_SEC = 30
_FETCH_RETRY_SLEEP_SEC = 0.5

DEFAULT_GH_CMD: list[str] = ["gh"]


def _run(
    cmd: list[str],
    *,
    cwd: Optional[Path] = None,
    timeout: int = _GH_TIMEOUT_SEC,
) -> tuple[int, str, str]:
    """Run subprocess with shell=False, capture_output, timeout.

    Returns ``(returncode, stdout, stderr)``. On any exception returns
    ``(-1, "", str(exc))`` so callers can fail-closed without try/except noise.
    """
    try:
        proc = subprocess.run(
            cmd,
            cwd=str(cwd) if cwd else None,
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=False,
            check=False,
        )
        return proc.returncode, proc.stdout, proc.stderr
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as exc:
        return -1, "", f"{type(exc).__name__}: {exc}"


def _gh_view_field(
    pr_number: int,
    repo: str,
    field: str,
    *,
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> tuple[bool, object, str]:
    """Run ``gh pr view <PR> --repo <repo> --json mergedAt,mergeCommit -q '<expr>'``.

    Returns ``(ok, value, raw_stderr_or_msg)``. ``value`` is whatever JSON the
    ``-q`` selector emits (string for mergedAt, object/None for mergeCommit).
    """
    base = list(gh_cmd) if gh_cmd else list(DEFAULT_GH_CMD)
    cmd = base + [
        "pr",
        "view",
        str(pr_number),
        "--repo",
        repo,
        "--json",
        "mergedAt,mergeCommit",
    ]
    rc, stdout, stderr = _run(cmd, cwd=cwd, timeout=_GH_TIMEOUT_SEC)
    if rc != 0:
        return False, None, stderr.strip() or f"gh exited rc={rc}"

    try:
        payload = json.loads(stdout)
    except json.JSONDecodeError as exc:
        return False, None, f"gh JSON decode failed: {exc}"

    if field not in payload:
        return False, None, f"missing field {field}"

    return True, payload[field], ""


def check_pr_merged_at(
    pr_number: int,
    repo: str,
    *,
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> dict:
    """``gh pr view --json mergedAt`` not null 검증."""
    ok, value, msg = _gh_view_field(
        pr_number, repo, "mergedAt", gh_cmd=gh_cmd, cwd=cwd
    )
    if not ok:
        return {
            "ok": False,
            "reason": f"gh pr view failed: {msg}",
            "detail": {"pr_number": pr_number, "repo": repo, "field": "mergedAt"},
        }

    if value is None or value == "":
        return {
            "ok": False,
            "reason": "mergedAt is null (PR not merged)",
            "detail": {"pr_number": pr_number, "repo": repo, "mergedAt": value},
        }

    return {
        "ok": True,
        "reason": "mergedAt present",
        "detail": {"pr_number": pr_number, "repo": repo, "mergedAt": value},
    }


def check_pr_merge_commit_oid(
    pr_number: int,
    repo: str,
    *,
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> dict:
    """``gh pr view --json mergeCommit`` oid not null 검증."""
    ok, value, msg = _gh_view_field(
        pr_number, repo, "mergeCommit", gh_cmd=gh_cmd, cwd=cwd
    )
    if not ok:
        return {
            "ok": False,
            "reason": f"gh pr view failed: {msg}",
            "detail": {"pr_number": pr_number, "repo": repo, "field": "mergeCommit"},
        }

    if not isinstance(value, dict):
        return {
            "ok": False,
            "reason": "mergeCommit is null/not-object",
            "detail": {"pr_number": pr_number, "repo": repo, "mergeCommit": value},
        }

    oid = value.get("oid")
    if not oid or not isinstance(oid, str):
        return {
            "ok": False,
            "reason": "mergeCommit.oid missing",
            "detail": {"pr_number": pr_number, "repo": repo, "mergeCommit": value},
        }

    return {
        "ok": True,
        "reason": "mergeCommit.oid present",
        "detail": {"pr_number": pr_number, "repo": repo, "merge_commit_sha": oid},
    }


def _git_fetch_base(base_branch: str, *, cwd: Optional[Path] = None) -> tuple[bool, str]:
    """``git fetch origin --no-tags +refs/heads/<base>:refs/remotes/origin/<base>``."""
    refspec = f"+refs/heads/{base_branch}:refs/remotes/origin/{base_branch}"
    rc, _, stderr = _run(
        ["git", "fetch", "origin", "--no-tags", refspec],
        cwd=cwd,
        timeout=_GIT_TIMEOUT_SEC,
    )
    if rc != 0:
        return False, stderr.strip() or f"git fetch rc={rc}"
    return True, ""


def _git_rev_parse_remote(base_branch: str, *, cwd: Optional[Path] = None) -> Optional[str]:
    """``git rev-parse --verify origin/<base>``."""
    rc, stdout, _ = _run(
        ["git", "rev-parse", "--verify", f"refs/remotes/origin/{base_branch}"],
        cwd=cwd,
        timeout=_GIT_TIMEOUT_SEC,
    )
    if rc != 0:
        return None
    sha = stdout.strip()
    return sha or None


def _git_is_ancestor(
    merge_commit_sha: str, base_branch: str, *, cwd: Optional[Path] = None
) -> bool:
    rc, _, _ = _run(
        [
            "git",
            "merge-base",
            "--is-ancestor",
            merge_commit_sha,
            f"refs/remotes/origin/{base_branch}",
        ],
        cwd=cwd,
        timeout=_GIT_TIMEOUT_SEC,
    )
    return rc == 0


def check_origin_main_ancestry(
    merge_commit_sha: str,
    base_branch: str = "main",
    *,
    cwd: Optional[Path] = None,
) -> dict:
    """Race-safe ancestry verification (fetch + 2회 교차 검증).

    Sequence
    --------
    1. ``git fetch origin --no-tags +refs/heads/<base>:refs/remotes/origin/<base>``
    2. 1차 ``git rev-parse origin/<base>`` -> ``sha_a``
    3. ``time.sleep(0.5)``
    4. 2차 ``git rev-parse origin/<base>`` -> ``sha_b``
    5. ``sha_a == sha_b`` -> ``git merge-base --is-ancestor`` 통과 검증
       그렇지 않으면 1회 fetch+재조회 후 그래도 불일치면 fail-closed.
    """
    if not isinstance(merge_commit_sha, str) or not merge_commit_sha.strip():
        return {
            "ok": False,
            "reason": "merge_commit_sha empty",
            "detail": {"merge_commit_sha": merge_commit_sha, "base_branch": base_branch},
        }

    fetched, msg = _git_fetch_base(base_branch, cwd=cwd)
    if not fetched:
        return {
            "ok": False,
            "reason": f"git fetch failed: {msg}",
            "detail": {"merge_commit_sha": merge_commit_sha, "base_branch": base_branch},
        }

    sha_a = _git_rev_parse_remote(base_branch, cwd=cwd)
    if not sha_a:
        return {
            "ok": False,
            "reason": "rev-parse origin/<base> failed (1st)",
            "detail": {"merge_commit_sha": merge_commit_sha, "base_branch": base_branch},
        }

    time.sleep(_FETCH_RETRY_SLEEP_SEC)

    sha_b = _git_rev_parse_remote(base_branch, cwd=cwd)
    if not sha_b:
        return {
            "ok": False,
            "reason": "rev-parse origin/<base> failed (2nd)",
            "detail": {"merge_commit_sha": merge_commit_sha, "base_branch": base_branch},
        }

    if sha_a != sha_b:
        # 1회 재시도: fetch + 재조회.
        _git_fetch_base(base_branch, cwd=cwd)
        sha_c = _git_rev_parse_remote(base_branch, cwd=cwd)
        time.sleep(_FETCH_RETRY_SLEEP_SEC)
        sha_d = _git_rev_parse_remote(base_branch, cwd=cwd)
        if not sha_c or not sha_d or sha_c != sha_d:
            return {
                "ok": False,
                "reason": "origin/<base> SHA unstable across 2 fetches (race detected)",
                "detail": {
                    "merge_commit_sha": merge_commit_sha,
                    "base_branch": base_branch,
                    "sha_a": sha_a,
                    "sha_b": sha_b,
                    "sha_c": sha_c,
                    "sha_d": sha_d,
                },
            }
        sha_a = sha_c

    if not _git_is_ancestor(merge_commit_sha, base_branch, cwd=cwd):
        return {
            "ok": False,
            "reason": "merge_commit not ancestor of origin/<base>",
            "detail": {
                "merge_commit_sha": merge_commit_sha,
                "base_branch": base_branch,
                "origin_sha": sha_a,
            },
        }

    return {
        "ok": True,
        "reason": "ancestry verified",
        "detail": {
            "merge_commit_sha": merge_commit_sha,
            "base_branch": base_branch,
            "origin_sha": sha_a,
        },
    }


def verify_done_preconditions(
    pr_number: int,
    repo: str,
    *,
    base_branch: str = "main",
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> dict:
    """3 check 통합. 하나라도 FAIL 시 ``ok=False``.

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "detail": {...}}``.

        성공 시 ``detail`` 은 ``merge_commit_sha``, ``merged_at``, ``checks`` 키를 포함.
        실패 시 어느 check 가 실패 했는지 ``detail["failed_check"]`` 에 명시.
    """
    checks: dict[str, dict] = {}

    merged_at_result = check_pr_merged_at(pr_number, repo, gh_cmd=gh_cmd, cwd=cwd)
    checks["merged_at"] = merged_at_result
    if not merged_at_result["ok"]:
        return {
            "ok": False,
            "reason": f"merged_at check failed: {merged_at_result['reason']}",
            "detail": {
                "failed_check": "merged_at",
                "pr_number": pr_number,
                "repo": repo,
                "checks": checks,
            },
        }

    oid_result = check_pr_merge_commit_oid(pr_number, repo, gh_cmd=gh_cmd, cwd=cwd)
    checks["merge_commit_oid"] = oid_result
    if not oid_result["ok"]:
        return {
            "ok": False,
            "reason": f"merge_commit_oid check failed: {oid_result['reason']}",
            "detail": {
                "failed_check": "merge_commit_oid",
                "pr_number": pr_number,
                "repo": repo,
                "checks": checks,
            },
        }

    merge_commit_sha = oid_result["detail"]["merge_commit_sha"]
    merged_at = merged_at_result["detail"]["mergedAt"]

    ancestry_result = check_origin_main_ancestry(
        merge_commit_sha, base_branch=base_branch, cwd=cwd
    )
    checks["ancestry"] = ancestry_result
    if not ancestry_result["ok"]:
        return {
            "ok": False,
            "reason": f"ancestry check failed: {ancestry_result['reason']}",
            "detail": {
                "failed_check": "ancestry",
                "pr_number": pr_number,
                "repo": repo,
                "merge_commit_sha": merge_commit_sha,
                "merged_at": merged_at,
                "base_branch": base_branch,
                "checks": checks,
            },
        }

    return {
        "ok": True,
        "reason": "all 3 silent_corruption checks passed",
        "detail": {
            "merge_commit_sha": merge_commit_sha,
            "merged_at": merged_at,
            "pr_number": pr_number,
            "repo": repo,
            "base_branch": base_branch,
            "checks": checks,
        },
    }
