"""Silent corruption guard for task done preconditions.

Three hardcoded fail-closed checks:

1. ``check_pr_merged_at`` — ``gh pr view <PR> --json mergedAt`` not null.
2. ``check_pr_merge_commit_oid`` — ``gh pr view <PR> --json mergeCommit`` oid not null.
3. ``check_origin_main_ancestry`` — race-safe fetch plus two cross-checked
   reads of ``origin/<base>`` to confirm that the merge commit is an
   ancestor of ``origin/<base>``.

Each function returns a ``GuardResult``-like dict::

    {"ok": bool, "reason": str, "detail": dict}

Any failure of an external call (subprocess, gh, git) is always treated as
fail-closed (``ok=False``).
"""

from __future__ import annotations

import json
import subprocess
import time
from pathlib import Path
from typing import Optional

# Subprocess hard limits (taskctl-friendly).
# Timeout, in seconds, for each ``gh`` invocation (also the default for ``_run``).
_GH_TIMEOUT_SEC = 30
# Timeout, in seconds, for each ``git`` invocation.
_GIT_TIMEOUT_SEC = 30
# Pause, in seconds, between the paired ``origin/<base>`` rev-parse reads.
_FETCH_RETRY_SLEEP_SEC = 0.5

# Default argv prefix for the GitHub CLI, used when a caller passes no ``gh_cmd``.
DEFAULT_GH_CMD: list[str] = ["gh"]


def _run(
    cmd: list[str],
    *,
    cwd: Optional[Path] = None,
    timeout: int = _GH_TIMEOUT_SEC,
) -> tuple[int, str, str]:
    """Run ``cmd`` with ``shell=False`` and capture its output as text.

    Parameters
    ----------
    cmd
        Argument vector handed to ``subprocess.run``.
    cwd
        Working directory for the child; ``None`` inherits the current one.
    timeout
        Seconds allowed before ``subprocess.run`` gives up on the child.

    Returns
    -------
    tuple[int, str, str]
        ``(returncode, stdout, stderr)``. If the command cannot be started,
        times out, is given invalid arguments, or emits output that cannot be
        decoded, returns ``(-1, "", "<ExceptionType>: <message>")`` so callers
        can fail closed without try/except noise.
    """
    try:
        proc = subprocess.run(
            cmd,
            cwd=str(cwd) if cwd else None,
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=False,
            check=False,
        )
    except (subprocess.SubprocessError, OSError, ValueError) as exc:
        # SubprocessError covers TimeoutExpired; OSError covers a missing
        # executable or an unusable cwd; ValueError covers invalid arguments
        # (e.g. an embedded NUL byte) and UnicodeDecodeError raised while
        # decoding the captured output. All of them must fail closed.
        return -1, "", f"{type(exc).__name__}: {exc}"
    return proc.returncode, proc.stdout, proc.stderr


def _gh_view_field(
    pr_number: int,
    repo: str,
    field: str,
    *,
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> tuple[bool, object, str]:
    """Fetch one top-level field of a PR via ``gh pr view``.

    Runs ``gh pr view <PR> --repo <repo> --json mergedAt,mergeCommit`` and
    picks ``field`` out of the decoded JSON object (no ``-q`` selector is
    passed to ``gh``; the selection happens here).

    Parameters
    ----------
    pr_number
        Pull request number.
    repo
        Repository passed to ``--repo``.
    field
        Key to extract from the JSON object printed by ``gh``.
    gh_cmd
        Argv prefix for the GitHub CLI; falls back to ``DEFAULT_GH_CMD`` when
        ``None`` or empty.
    cwd
        Working directory for the ``gh`` process.

    Returns
    -------
    tuple[bool, object, str]
        ``(ok, value, message)``. On success ``value`` is the decoded JSON
        value stored under ``field`` and ``message`` is empty. On any failure
        (non-zero exit, undecodable JSON, payload that is not a JSON object,
        missing field) ``ok`` is ``False``, ``value`` is ``None`` and
        ``message`` describes the problem.
    """
    base = list(gh_cmd) if gh_cmd else list(DEFAULT_GH_CMD)
    cmd = [
        *base,
        "pr",
        "view",
        str(pr_number),
        "--repo",
        repo,
        "--json",
        "mergedAt,mergeCommit",
    ]
    rc, stdout, stderr = _run(cmd, cwd=cwd, timeout=_GH_TIMEOUT_SEC)
    if rc != 0:
        return False, None, stderr.strip() or f"gh exited rc={rc}"

    try:
        payload = json.loads(stdout)
    except json.JSONDecodeError as exc:
        return False, None, f"gh JSON decode failed: {exc}"

    # Valid JSON is not necessarily an object: ``null``, a number, a list or a
    # string would make the membership test / subscript below raise or
    # misbehave. Reject those explicitly so the guard stays fail-closed.
    if not isinstance(payload, dict):
        return (
            False,
            None,
            f"gh JSON payload is not an object: {type(payload).__name__}",
        )

    if field not in payload:
        return False, None, f"missing field {field}"

    return True, payload[field], ""


def check_pr_merged_at(
    pr_number: int,
    repo: str,
    *,
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> dict:
    """Verify that ``gh pr view --json mergedAt`` reports a non-null timestamp.

    Parameters
    ----------
    pr_number
        Pull request number.
    repo
        Repository passed to ``gh --repo``.
    gh_cmd
        Optional argv prefix for the GitHub CLI.
    cwd
        Working directory for the ``gh`` process.

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "detail": dict}``. ``ok`` is ``True``
        only when ``mergedAt`` is a non-empty string; a failed ``gh`` call, a
        null/empty value, or a value of any other type yields ``ok=False``.
    """
    fetched, merged_at, err = _gh_view_field(
        pr_number, repo, "mergedAt", gh_cmd=gh_cmd, cwd=cwd
    )
    if not fetched:
        return {
            "ok": False,
            "reason": f"gh pr view failed: {err}",
            "detail": {"pr_number": pr_number, "repo": repo, "field": "mergedAt"},
        }

    if merged_at is None or merged_at == "":
        return {
            "ok": False,
            "reason": "mergedAt is null (PR not merged)",
            "detail": {"pr_number": pr_number, "repo": repo, "mergedAt": merged_at},
        }

    # Fail closed on unexpected JSON types (0, false, {}, ...), mirroring the
    # ``isinstance(oid, str)`` requirement of the merge-commit check.
    if not isinstance(merged_at, str):
        return {
            "ok": False,
            "reason": "mergedAt is not a string",
            "detail": {"pr_number": pr_number, "repo": repo, "mergedAt": merged_at},
        }

    return {
        "ok": True,
        "reason": "mergedAt present",
        "detail": {"pr_number": pr_number, "repo": repo, "mergedAt": merged_at},
    }


def check_pr_merge_commit_oid(
    pr_number: int,
    repo: str,
    *,
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> dict:
    """Verify that ``gh pr view --json mergeCommit`` carries a non-null ``oid``.

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "detail": dict}``. On success
        ``detail["merge_commit_sha"]`` holds the oid string. ``ok`` is
        ``False`` when the ``gh`` call fails, when ``mergeCommit`` is not a
        JSON object, or when its ``oid`` is missing, empty or not a string.
    """
    fetched, merge_commit, err = _gh_view_field(
        pr_number, repo, "mergeCommit", gh_cmd=gh_cmd, cwd=cwd
    )
    # Keys shared by every result's ``detail``; copied into each return value.
    where = {"pr_number": pr_number, "repo": repo}

    if not fetched:
        return {
            "ok": False,
            "reason": f"gh pr view failed: {err}",
            "detail": {**where, "field": "mergeCommit"},
        }

    if not isinstance(merge_commit, dict):
        return {
            "ok": False,
            "reason": "mergeCommit is null/not-object",
            "detail": {**where, "mergeCommit": merge_commit},
        }

    sha = merge_commit.get("oid")
    if isinstance(sha, str) and sha:
        return {
            "ok": True,
            "reason": "mergeCommit.oid present",
            "detail": {**where, "merge_commit_sha": sha},
        }

    return {
        "ok": False,
        "reason": "mergeCommit.oid missing",
        "detail": {**where, "mergeCommit": merge_commit},
    }


def _git_fetch_base(base_branch: str, *, cwd: Optional[Path] = None) -> tuple[bool, str]:
    """Force-update ``refs/remotes/origin/<base>`` from ``origin``.

    Runs ``git fetch origin --no-tags +refs/heads/<base>:refs/remotes/origin/<base>``.

    Returns
    -------
    tuple[bool, str]
        ``(True, "")`` when git exits 0; otherwise ``(False, message)`` where
        ``message`` is git's stripped stderr or, if that is empty, a note
        carrying the return code.
    """
    tracking_ref = f"refs/remotes/origin/{base_branch}"
    argv = [
        "git",
        "fetch",
        "origin",
        "--no-tags",
        f"+refs/heads/{base_branch}:{tracking_ref}",
    ]
    rc, _out, err = _run(argv, cwd=cwd, timeout=_GIT_TIMEOUT_SEC)
    if rc == 0:
        return True, ""
    return False, err.strip() or f"git fetch rc={rc}"


def _git_rev_parse_remote(base_branch: str, *, cwd: Optional[Path] = None) -> Optional[str]:
    """Resolve ``refs/remotes/origin/<base>`` with ``git rev-parse --verify``.

    Returns
    -------
    Optional[str]
        The stripped stdout of git when it exits 0 and printed something;
        ``None`` when git fails or prints nothing.
    """
    argv = ["git", "rev-parse", "--verify", f"refs/remotes/origin/{base_branch}"]
    rc, out, _err = _run(argv, cwd=cwd, timeout=_GIT_TIMEOUT_SEC)
    resolved = out.strip() if rc == 0 else ""
    return resolved if resolved else None


def _git_is_ancestor(
    merge_commit_sha: str, base_branch: str, *, cwd: Optional[Path] = None
) -> bool:
    """Report whether ``merge_commit_sha`` is an ancestor of ``origin/<base>``.

    Runs ``git merge-base --is-ancestor <sha> refs/remotes/origin/<base>`` and
    returns ``True`` only for exit status 0. Every other status, including a
    subprocess failure, yields ``False``.
    """
    argv = [
        "git",
        "merge-base",
        "--is-ancestor",
        merge_commit_sha,
        f"refs/remotes/origin/{base_branch}",
    ]
    status = _run(argv, cwd=cwd, timeout=_GIT_TIMEOUT_SEC)[0]
    return status == 0


def check_origin_main_ancestry(
    merge_commit_sha: str,
    base_branch: str = "main",
    *,
    cwd: Optional[Path] = None,
) -> dict:
    """Race-safe ancestry verification (fetch + two cross-checked reads).

    Sequence
    --------
    1. ``git fetch origin --no-tags +refs/heads/<base>:refs/remotes/origin/<base>``
    2. First ``git rev-parse origin/<base>`` -> ``sha_a``
    3. ``time.sleep(_FETCH_RETRY_SLEEP_SEC)``
    4. Second ``git rev-parse origin/<base>`` -> ``sha_b``
    5. If ``sha_a == sha_b``, run ``git merge-base --is-ancestor``. Otherwise
       fetch once more and repeat the paired read; a failed re-fetch or a
       still-unstable SHA fails closed.

    Parameters
    ----------
    merge_commit_sha
        Commit expected to be contained in ``origin/<base>``.
    base_branch
        Branch name under ``origin`` to check against.
    cwd
        Working directory (repository) for the git commands.

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "detail": dict}``. ``detail`` always
        carries ``merge_commit_sha`` and ``base_branch``; once the ref has
        been read stably it also carries ``origin_sha``.
    """
    base_detail = {"merge_commit_sha": merge_commit_sha, "base_branch": base_branch}

    def _fail(reason: str, **extra: object) -> dict:
        # Build a fail-closed result with the shared detail keys first.
        return {"ok": False, "reason": reason, "detail": {**base_detail, **extra}}

    if not isinstance(merge_commit_sha, str) or not merge_commit_sha.strip():
        return _fail("merge_commit_sha empty")

    fetched, msg = _git_fetch_base(base_branch, cwd=cwd)
    if not fetched:
        return _fail(f"git fetch failed: {msg}")

    sha_a = _git_rev_parse_remote(base_branch, cwd=cwd)
    if not sha_a:
        return _fail("rev-parse origin/<base> failed (1st)")

    time.sleep(_FETCH_RETRY_SLEEP_SEC)

    sha_b = _git_rev_parse_remote(base_branch, cwd=cwd)
    if not sha_b:
        return _fail("rev-parse origin/<base> failed (2nd)")

    if sha_a != sha_b:
        # Single retry: fetch again, then repeat the paired read.
        refetched, retry_msg = _git_fetch_base(base_branch, cwd=cwd)
        if not refetched:
            # A failed re-fetch must not be ignored: continuing would validate
            # against a ref that is known to be in flux.
            return _fail(
                f"git fetch failed (retry): {retry_msg}",
                sha_a=sha_a,
                sha_b=sha_b,
            )
        sha_c = _git_rev_parse_remote(base_branch, cwd=cwd)
        time.sleep(_FETCH_RETRY_SLEEP_SEC)
        sha_d = _git_rev_parse_remote(base_branch, cwd=cwd)
        if not sha_c or not sha_d or sha_c != sha_d:
            return _fail(
                "origin/<base> SHA unstable across 2 fetches (race detected)",
                sha_a=sha_a,
                sha_b=sha_b,
                sha_c=sha_c,
                sha_d=sha_d,
            )
        sha_a = sha_c

    if not _git_is_ancestor(merge_commit_sha, base_branch, cwd=cwd):
        return _fail("merge_commit not ancestor of origin/<base>", origin_sha=sha_a)

    return {
        "ok": True,
        "reason": "ancestry verified",
        "detail": {**base_detail, "origin_sha": sha_a},
    }


def verify_done_preconditions(
    pr_number: int,
    repo: str,
    *,
    base_branch: str = "main",
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
    task_id: Optional[str] = None,
    events_dir: Optional[str] = None,
) -> dict:
    """Run the guard checks in order and stop at the first failure.

    Order: ``merged_at`` -> ``merge_commit_oid`` -> ``ancestry``, then, only
    when ``task_id`` is truthy, ``done_escalated_conflict`` ->
    ``escalation_marker_payload``.

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "detail": {...}}``.

        On success ``detail`` contains ``merge_commit_sha``, ``merged_at``,
        ``pr_number``, ``repo``, ``base_branch`` and ``checks`` (the result
        of every check that ran, keyed by check name).
        On failure ``detail["failed_check"]`` names the check that failed and
        ``detail["checks"]`` holds the results collected up to that point.

    Notes
    -----
    The two marker-file checks (task-2471+1 F2) are skipped when ``task_id``
    is ``None`` or empty, for backward compatibility with older callers.
    ``events_dir`` is forwarded to those checks unchanged. The success reason
    string is fixed and does not depend on how many checks ran.
    """
    checks: dict[str, dict] = {}

    def _rejected(name: str, label: str, result: dict, **context: object) -> dict:
        # Key order is deliberate: identity first, optional merge context,
        # and the accumulated ``checks`` mapping last.
        detail: dict = {"failed_check": name, "pr_number": pr_number, "repo": repo}
        detail.update(context)
        detail["checks"] = checks
        return {
            "ok": False,
            "reason": f"{label} failed: {result['reason']}",
            "detail": detail,
        }

    merged_at_result = check_pr_merged_at(pr_number, repo, gh_cmd=gh_cmd, cwd=cwd)
    checks["merged_at"] = merged_at_result
    if not merged_at_result["ok"]:
        return _rejected("merged_at", "merged_at check", merged_at_result)

    oid_result = check_pr_merge_commit_oid(pr_number, repo, gh_cmd=gh_cmd, cwd=cwd)
    checks["merge_commit_oid"] = oid_result
    if not oid_result["ok"]:
        return _rejected("merge_commit_oid", "merge_commit_oid check", oid_result)

    merge_commit_sha = oid_result["detail"]["merge_commit_sha"]
    merged_at = merged_at_result["detail"]["mergedAt"]
    # Extra context attached to every failure reported from here on.
    merge_context = {
        "merge_commit_sha": merge_commit_sha,
        "merged_at": merged_at,
        "base_branch": base_branch,
    }

    ancestry_result = check_origin_main_ancestry(
        merge_commit_sha, base_branch=base_branch, cwd=cwd
    )
    checks["ancestry"] = ancestry_result
    if not ancestry_result["ok"]:
        return _rejected("ancestry", "ancestry check", ancestry_result, **merge_context)

    # task-2471+1 F2: done/escalated conflict + empty marker checks.
    # Skipped when task_id is falsy (backward compatibility for old callers).
    if task_id:
        file_checks = (
            ("done_escalated_conflict", check_done_escalated_conflict),
            ("escalation_marker_payload", check_escalation_marker_payload),
        )
        for check_name, check_fn in file_checks:
            outcome = check_fn(task_id, events_dir=events_dir)
            checks[check_name] = outcome
            if not outcome["ok"]:
                return _rejected(check_name, check_name, outcome, **merge_context)

    return {
        "ok": True,
        "reason": "all 3 silent_corruption checks passed",
        "detail": {
            "merge_commit_sha": merge_commit_sha,
            "merged_at": merged_at,
            "pr_number": pr_number,
            "repo": repo,
            "base_branch": base_branch,
            "checks": checks,
        },
    }


# ---------------------------------------------------------------------------
# task-2471+1 F2: .done + .done.escalated conflict + empty marker checks
# ---------------------------------------------------------------------------


def check_done_escalated_conflict(
    task_id: str,
    *,
    events_dir: Optional[str] = None,
) -> dict:
    """Reject a task whose ``.done`` and ``.done.escalated`` markers coexist.

    Parameters
    ----------
    task_id
        Task identifier used as the marker file stem.
    events_dir
        Directory holding the marker files; a falsy value selects the
        built-in default directory.

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "detail": dict}``. ``ok`` is ``False``
        only when both marker files exist. ``detail`` reports both paths and
        whether each exists.
    """
    root = Path(events_dir if events_dir else "/home/jay/workspace/memory/events")
    done_marker = root / f"{task_id}.done"
    escalated_marker = root / f"{task_id}.done.escalated"
    has_done = done_marker.exists()
    has_escalated = escalated_marker.exists()

    detail: dict = {
        "task_id": task_id,
        "done_path": str(done_marker),
        "escalated_path": str(escalated_marker),
        "done_exists": has_done,
        "escalated_exists": has_escalated,
    }

    if not (has_done and has_escalated):
        return {"ok": True, "reason": "no done/escalated conflict", "detail": detail}

    return {
        "ok": False,
        "reason": (
            f".done and .done.escalated coexist for {task_id} — silent corruption: "
            "either rollback escalation (resolve + archive) or block .done emission"
        ),
        "detail": detail,
    }


def check_escalation_marker_payload(
    task_id: str,
    *,
    events_dir: Optional[str] = None,
) -> dict:
    """Reject a ``.done.escalated`` marker that is empty or not usable JSON.

    Blocks the empty-marker defect (a marker published with no content) and
    forces the escalation reason to be recorded.

    Parameters
    ----------
    task_id
        Task identifier used as the marker file stem.
    events_dir
        Directory holding the marker file; a falsy value selects the built-in
        default directory.

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "detail": dict}``. ``ok`` is ``True``
        when no marker exists, or when the marker is a JSON object containing
        both ``trigger`` and ``reason`` keys. It is ``False`` when the file
        cannot be stat'ed or read, is 0 bytes, is not valid UTF-8 JSON, or
        lacks the required keys.
    """
    if not events_dir:
        events_dir = "/home/jay/workspace/memory/events"
    esc_path = Path(events_dir) / f"{task_id}.done.escalated"
    detail: dict = {"task_id": task_id, "escalated_path": str(esc_path)}
    if not esc_path.exists():
        return {"ok": True, "reason": "no escalation marker", "detail": detail}

    try:
        size = esc_path.stat().st_size
    except OSError as exc:
        return {"ok": False, "reason": f"stat failed: {exc}", "detail": detail}
    detail["size"] = size
    if size == 0:
        return {
            "ok": False,
            "reason": f"empty escalation marker (0 bytes) for {task_id} — reason payload missing (defect)",
            "detail": detail,
        }

    try:
        text = esc_path.read_text(encoding="utf-8")
        payload = json.loads(text)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for a marker that is not valid UTF-8;
        # either way the marker is unusable and the check must fail closed.
        return {
            "ok": False,
            "reason": f"escalation marker payload not valid JSON: {exc}",
            "detail": detail,
        }

    is_object = isinstance(payload, dict)
    detail["payload_keys"] = sorted(payload.keys()) if is_object else []
    if not is_object or "trigger" not in payload or "reason" not in payload:
        return {
            "ok": False,
            "reason": "escalation marker missing required keys: trigger, reason",
            "detail": detail,
        }
    return {"ok": True, "reason": "escalation marker payload OK", "detail": detail}
