"""Silent corruption guard for task done preconditions.

task-2472 보강: 기존 3 check에 3개 추가 → 총 6 check.

기존 3 check:
1. ``check_pr_merged_at`` — ``gh pr view <PR> --json mergedAt`` not null.
2. ``check_pr_merge_commit_oid`` — ``gh pr view <PR> --json mergeCommit`` oid not null.
3. ``check_origin_main_ancestry`` — race-safe fetch + 2회 교차 검증.

task-2472 신규 3 check:
4. ``check_done_escalated_coexistence`` — .done + .done.escalated 동시 존재 차단.
5. ``check_escalated_payload`` — .done.escalated 0-byte/비-JSON fail-closed.
6. ``check_state_file_present`` — .tasks/state/{task_id}.json 존재 + checksum 검증.

각 함수는 ``GuardResult-like`` dict 를 반환 한다::

    {"ok": bool, "reason": str, "detail": dict}

외부 호출 (subprocess, gh, git) 실패 시 항상 fail-closed (``ok=False``).
"""

from __future__ import annotations

import hashlib
import json
import os
import subprocess
import time
from pathlib import Path
from typing import Optional

# Subprocess hard limits (taskctl-friendly).
_GH_TIMEOUT_SEC = 30
_GIT_TIMEOUT_SEC = 30
_FETCH_RETRY_SLEEP_SEC = 0.5

DEFAULT_GH_CMD: list[str] = ["gh"]


def _run(
    cmd: list[str],
    *,
    cwd: Optional[Path] = None,
    timeout: int = _GH_TIMEOUT_SEC,
) -> tuple[int, str, str]:
    """Run subprocess with shell=False, capture_output, timeout.

    Returns ``(returncode, stdout, stderr)``. On any exception returns
    ``(-1, "", str(exc))`` so callers can fail-closed without try/except noise.
    """
    try:
        proc = subprocess.run(
            cmd,
            cwd=str(cwd) if cwd else None,
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=False,
            check=False,
        )
        return proc.returncode, proc.stdout, proc.stderr
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as exc:
        return -1, "", f"{type(exc).__name__}: {exc}"


def _gh_view_field(
    pr_number: int,
    repo: str,
    field: str,
    *,
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> tuple[bool, object, str]:
    """Run ``gh pr view <PR> --repo <repo> --json mergedAt,mergeCommit -q '<expr>'``.

    Returns ``(ok, value, raw_stderr_or_msg)``. ``value`` is whatever JSON the
    ``-q`` selector emits (string for mergedAt, object/None for mergeCommit).
    """
    base = list(gh_cmd) if gh_cmd else list(DEFAULT_GH_CMD)
    cmd = base + [
        "pr",
        "view",
        str(pr_number),
        "--repo",
        repo,
        "--json",
        "mergedAt,mergeCommit",
    ]
    rc, stdout, stderr = _run(cmd, cwd=cwd, timeout=_GH_TIMEOUT_SEC)
    if rc != 0:
        return False, None, stderr.strip() or f"gh exited rc={rc}"

    try:
        payload = json.loads(stdout)
    except json.JSONDecodeError as exc:
        return False, None, f"gh JSON decode failed: {exc}"

    if field not in payload:
        return False, None, f"missing field {field}"

    return True, payload[field], ""


def check_pr_merged_at(
    pr_number: int,
    repo: str,
    *,
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> dict:
    """``gh pr view --json mergedAt`` not null 검증."""
    ok, value, msg = _gh_view_field(
        pr_number, repo, "mergedAt", gh_cmd=gh_cmd, cwd=cwd
    )
    if not ok:
        return {
            "ok": False,
            "reason": f"gh pr view failed: {msg}",
            "detail": {"pr_number": pr_number, "repo": repo, "field": "mergedAt"},
        }

    if value is None or value == "":
        return {
            "ok": False,
            "reason": "mergedAt is null (PR not merged)",
            "detail": {"pr_number": pr_number, "repo": repo, "mergedAt": value},
        }

    return {
        "ok": True,
        "reason": "mergedAt present",
        "detail": {"pr_number": pr_number, "repo": repo, "mergedAt": value},
    }


def check_pr_merge_commit_oid(
    pr_number: int,
    repo: str,
    *,
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
) -> dict:
    """``gh pr view --json mergeCommit`` oid not null 검증."""
    ok, value, msg = _gh_view_field(
        pr_number, repo, "mergeCommit", gh_cmd=gh_cmd, cwd=cwd
    )
    if not ok:
        return {
            "ok": False,
            "reason": f"gh pr view failed: {msg}",
            "detail": {"pr_number": pr_number, "repo": repo, "field": "mergeCommit"},
        }

    if not isinstance(value, dict):
        return {
            "ok": False,
            "reason": "mergeCommit is null/not-object",
            "detail": {"pr_number": pr_number, "repo": repo, "mergeCommit": value},
        }

    oid = value.get("oid")
    if not oid or not isinstance(oid, str):
        return {
            "ok": False,
            "reason": "mergeCommit.oid missing",
            "detail": {"pr_number": pr_number, "repo": repo, "mergeCommit": value},
        }

    return {
        "ok": True,
        "reason": "mergeCommit.oid present",
        "detail": {"pr_number": pr_number, "repo": repo, "merge_commit_sha": oid},
    }


def _git_fetch_base(base_branch: str, *, cwd: Optional[Path] = None) -> tuple[bool, str]:
    """``git fetch origin --no-tags +refs/heads/<base>:refs/remotes/origin/<base>``."""
    refspec = f"+refs/heads/{base_branch}:refs/remotes/origin/{base_branch}"
    rc, _, stderr = _run(
        ["git", "fetch", "origin", "--no-tags", refspec],
        cwd=cwd,
        timeout=_GIT_TIMEOUT_SEC,
    )
    if rc != 0:
        return False, stderr.strip() or f"git fetch rc={rc}"
    return True, ""


def _git_rev_parse_remote(base_branch: str, *, cwd: Optional[Path] = None) -> Optional[str]:
    """``git rev-parse --verify origin/<base>``."""
    rc, stdout, _ = _run(
        ["git", "rev-parse", "--verify", f"refs/remotes/origin/{base_branch}"],
        cwd=cwd,
        timeout=_GIT_TIMEOUT_SEC,
    )
    if rc != 0:
        return None
    sha = stdout.strip()
    return sha or None


def _git_is_ancestor(
    merge_commit_sha: str, base_branch: str, *, cwd: Optional[Path] = None
) -> bool:
    rc, _, _ = _run(
        [
            "git",
            "merge-base",
            "--is-ancestor",
            merge_commit_sha,
            f"refs/remotes/origin/{base_branch}",
        ],
        cwd=cwd,
        timeout=_GIT_TIMEOUT_SEC,
    )
    return rc == 0


def check_origin_main_ancestry(
    merge_commit_sha: str,
    base_branch: str = "main",
    *,
    cwd: Optional[Path] = None,
) -> dict:
    """Race-safe ancestry verification (fetch + 2회 교차 검증).

    Sequence
    --------
    1. ``git fetch origin --no-tags +refs/heads/<base>:refs/remotes/origin/<base>``
    2. 1차 ``git rev-parse origin/<base>`` -> ``sha_a``
    3. ``time.sleep(0.5)``
    4. 2차 ``git rev-parse origin/<base>`` -> ``sha_b``
    5. ``sha_a == sha_b`` -> ``git merge-base --is-ancestor`` 통과 검증
       그렇지 않으면 1회 fetch+재조회 후 그래도 불일치면 fail-closed.
    """
    if not isinstance(merge_commit_sha, str) or not merge_commit_sha.strip():
        return {
            "ok": False,
            "reason": "merge_commit_sha empty",
            "detail": {"merge_commit_sha": merge_commit_sha, "base_branch": base_branch},
        }

    fetched, msg = _git_fetch_base(base_branch, cwd=cwd)
    if not fetched:
        return {
            "ok": False,
            "reason": f"git fetch failed: {msg}",
            "detail": {"merge_commit_sha": merge_commit_sha, "base_branch": base_branch},
        }

    sha_a = _git_rev_parse_remote(base_branch, cwd=cwd)
    if not sha_a:
        return {
            "ok": False,
            "reason": "rev-parse origin/<base> failed (1st)",
            "detail": {"merge_commit_sha": merge_commit_sha, "base_branch": base_branch},
        }

    time.sleep(_FETCH_RETRY_SLEEP_SEC)

    sha_b = _git_rev_parse_remote(base_branch, cwd=cwd)
    if not sha_b:
        return {
            "ok": False,
            "reason": "rev-parse origin/<base> failed (2nd)",
            "detail": {"merge_commit_sha": merge_commit_sha, "base_branch": base_branch},
        }

    if sha_a != sha_b:
        # 1회 재시도: fetch + 재조회.
        _git_fetch_base(base_branch, cwd=cwd)
        sha_c = _git_rev_parse_remote(base_branch, cwd=cwd)
        time.sleep(_FETCH_RETRY_SLEEP_SEC)
        sha_d = _git_rev_parse_remote(base_branch, cwd=cwd)
        if not sha_c or not sha_d or sha_c != sha_d:
            return {
                "ok": False,
                "reason": "origin/<base> SHA unstable across 2 fetches (race detected)",
                "detail": {
                    "merge_commit_sha": merge_commit_sha,
                    "base_branch": base_branch,
                    "sha_a": sha_a,
                    "sha_b": sha_b,
                    "sha_c": sha_c,
                    "sha_d": sha_d,
                },
            }
        sha_a = sha_c

    if not _git_is_ancestor(merge_commit_sha, base_branch, cwd=cwd):
        return {
            "ok": False,
            "reason": "merge_commit not ancestor of origin/<base>",
            "detail": {
                "merge_commit_sha": merge_commit_sha,
                "base_branch": base_branch,
                "origin_sha": sha_a,
            },
        }

    return {
        "ok": True,
        "reason": "ancestry verified",
        "detail": {
            "merge_commit_sha": merge_commit_sha,
            "base_branch": base_branch,
            "origin_sha": sha_a,
        },
    }


# ---------------------------------------------------------------------------
# task-2472 신규 3 check (추가 보강)
# ---------------------------------------------------------------------------

# memory/events 기본 디렉토리 (workspace root 상대)
_EVENTS_DIR_REL = "memory/events"

# .tasks/state 기본 디렉토리 (workspace root 상대)
_STATE_DIR_REL = ".tasks/state"

# .done.escalated 필수 JSON 필드
_ESCALATED_REQUIRED_FIELDS = frozenset(
    {"reason", "ts", "task_id", "source", "blocking_condition", "evidence_path"}
)


def _workspace_root_path(workspace: Optional[Path] = None) -> Path:
    """workspace root Path 반환."""
    return workspace or Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))


def check_done_escalated_coexistence(
    task_id: str,
    *,
    workspace: Optional[Path] = None,
) -> dict:
    """.done + .done.escalated 동시 존재 시 fail.

    task-2472 추가 11: DONE 인정 금지 조건.
    두 마커가 동시 존재하면 silent corruption 가능성 → fail-closed.

    Returns
    -------
    dict
        {"ok": bool, "reason": str, "detail": {"done_exists": bool, "escalated_exists": bool}}
    """
    work_root = _workspace_root_path(workspace)
    events_dir = work_root / _EVENTS_DIR_REL

    done_path = events_dir / f"{task_id}.done"
    escalated_path = events_dir / f"{task_id}.done.escalated"

    done_exists = done_path.exists()
    escalated_exists = escalated_path.exists()

    detail = {
        "done_exists": done_exists,
        "escalated_exists": escalated_exists,
        "done_path": str(done_path),
        "escalated_path": str(escalated_path),
    }

    if done_exists and escalated_exists:
        return {
            "ok": False,
            "reason": (
                f".done + .done.escalated 동시 존재 → DONE 인정 불가 (silent corruption 의심). "
                f"task_id={task_id}"
            ),
            "detail": detail,
        }

    return {
        "ok": True,
        "reason": ".done + .done.escalated 동시 존재 없음 — 정상",
        "detail": detail,
    }


def check_escalated_payload(escalated_path: Path) -> dict:
    """.done.escalated 파일이 0-byte이거나 비-JSON이면 fail-closed.

    task-2472 추가 10: 0-byte escalation marker 차단.
    필수 JSON 필드: reason, ts, task_id, source, blocking_condition, evidence_path

    Returns
    -------
    dict
        {"ok": bool, "reason": str, "detail": {"size": N, "json_valid": bool, "missing_fields": [...]}}
    """
    detail: dict = {"size": 0, "json_valid": False, "missing_fields": []}

    if not escalated_path.exists():
        return {
            "ok": False,
            "reason": f".done.escalated 파일 없음: {escalated_path}",
            "detail": detail,
        }

    try:
        size = escalated_path.stat().st_size
    except Exception as exc:
        return {
            "ok": False,
            "reason": f"stat 실패: {exc}",
            "detail": detail,
        }

    detail["size"] = size

    if size == 0:
        return {
            "ok": False,
            "reason": f".done.escalated 파일이 0-byte → fail-closed (raw shell emit 의심). path={escalated_path}",
            "detail": detail,
        }

    try:
        content = escalated_path.read_text(encoding="utf-8")
        payload = json.loads(content)
        detail["json_valid"] = True
    except json.JSONDecodeError as exc:
        return {
            "ok": False,
            "reason": f".done.escalated 비-JSON → fail-closed: {exc}",
            "detail": detail,
        }
    except Exception as exc:
        return {
            "ok": False,
            "reason": f".done.escalated 읽기 실패: {exc}",
            "detail": detail,
        }

    # 필수 필드 검증
    missing = sorted(_ESCALATED_REQUIRED_FIELDS - set(payload.keys()))
    detail["missing_fields"] = missing

    if missing:
        return {
            "ok": False,
            "reason": f".done.escalated 필수 필드 누락: {missing}",
            "detail": detail,
        }

    return {
        "ok": True,
        "reason": ".done.escalated payload 검증 통과 (JSON valid, 필수 필드 OK)",
        "detail": detail,
    }


def check_state_file_present(
    task_id: str,
    *,
    workspace: Optional[Path] = None,
) -> dict:
    """.tasks/state/{task_id}.json 존재 + JSON parse + _checksum field 검증.

    task-2472 추가 12: state file missing 상태에서 done/merge 차단.

    Returns
    -------
    dict
        {"ok": bool, "reason": str, "detail": {...}}
    """
    work_root = _workspace_root_path(workspace)
    state_path = work_root / _STATE_DIR_REL / f"{task_id}.json"

    detail: dict = {
        "state_path": str(state_path),
        "exists": False,
        "json_valid": False,
        "checksum_present": False,
        "checksum_match": False,
    }

    if not state_path.exists():
        return {
            "ok": False,
            "reason": f"state 파일 없음: {state_path} → done/merge 차단 (fail-closed)",
            "detail": detail,
        }

    detail["exists"] = True

    try:
        content = state_path.read_text(encoding="utf-8")
        state = json.loads(content)
        detail["json_valid"] = True
    except json.JSONDecodeError as exc:
        return {
            "ok": False,
            "reason": f"state 파일 JSON 파싱 실패: {exc}",
            "detail": detail,
        }
    except Exception as exc:
        return {
            "ok": False,
            "reason": f"state 파일 읽기 실패: {exc}",
            "detail": detail,
        }

    stored_checksum = state.get("_checksum")
    if not stored_checksum:
        return {
            "ok": False,
            "reason": "state 파일 _checksum 필드 없음 → 수동 편집 의심 (fail-closed)",
            "detail": detail,
        }

    detail["checksum_present"] = True

    # checksum 재계산
    clean = {k: v for k, v in state.items() if k != "_checksum"}
    computed = hashlib.sha256(
        json.dumps(clean, sort_keys=True, ensure_ascii=False).encode("utf-8")
    ).hexdigest()

    if computed != stored_checksum:
        detail["stored_checksum"] = stored_checksum[:16] + "..."
        detail["computed_checksum"] = computed[:16] + "..."
        return {
            "ok": False,
            "reason": (
                "state 파일 checksum mismatch → 수동 편집 감지 (fail-closed). "
                "taskctl state repair 명령으로만 복구 가능"
            ),
            "detail": detail,
        }

    detail["checksum_match"] = True

    # Gemini 리뷰 medium: .verify-pending-{task_id} 마커 존재 시 차단.
    # state_repair() 후 verify_consistency() 미실행을 의미 → done/merge 금지.
    verify_pending_marker = work_root / _STATE_DIR_REL / f".verify-pending-{task_id}"
    if verify_pending_marker.exists():
        detail["verify_pending_marker"] = str(verify_pending_marker)
        return {
            "ok": False,
            "reason": (
                f"state repair 후 verify_consistency 미실행 → done/merge 차단 "
                f"(.verify-pending 마커 잔존: {verify_pending_marker})"
            ),
            "detail": detail,
        }

    return {
        "ok": True,
        "reason": "state 파일 존재 + JSON valid + checksum 일치 + verify-pending 마커 없음",
        "detail": detail,
    }


def verify_done_preconditions(
    pr_number: int,
    repo: str,
    *,
    task_id: Optional[str] = None,
    base_branch: str = "main",
    gh_cmd: Optional[list[str]] = None,
    cwd: Optional[Path] = None,
    workspace: Optional[Path] = None,
) -> dict:
    """6 check 통합 (task-2472 보강: 기존 3 → 6). 하나라도 FAIL 시 ``ok=False``.

    기존 3 check (순서 유지):
    1. merged_at
    2. merge_commit_oid
    3. ancestry

    task-2472 신규 3 check (task_id 필요):
    4. done_escalated_coexistence
    5. escalated_payload (escalated 존재 시에만)
    6. state_file_present

    Backward-compatible: task_id=None 시 신규 3 check 생략 (기존 동작 유지).

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "detail": {...}}``.

        성공 시 ``detail`` 은 ``merge_commit_sha``, ``merged_at``, ``checks`` 키를 포함.
        실패 시 어느 check 가 실패 했는지 ``detail["failed_check"]`` 에 명시.
    """
    checks: dict[str, dict] = {}

    # --- check 1: merged_at ---
    merged_at_result = check_pr_merged_at(pr_number, repo, gh_cmd=gh_cmd, cwd=cwd)
    checks["merged_at"] = merged_at_result
    if not merged_at_result["ok"]:
        return {
            "ok": False,
            "reason": f"merged_at check failed: {merged_at_result['reason']}",
            "detail": {
                "failed_check": "merged_at",
                "pr_number": pr_number,
                "repo": repo,
                "checks": checks,
            },
        }

    # --- check 2: merge_commit_oid ---
    oid_result = check_pr_merge_commit_oid(pr_number, repo, gh_cmd=gh_cmd, cwd=cwd)
    checks["merge_commit_oid"] = oid_result
    if not oid_result["ok"]:
        return {
            "ok": False,
            "reason": f"merge_commit_oid check failed: {oid_result['reason']}",
            "detail": {
                "failed_check": "merge_commit_oid",
                "pr_number": pr_number,
                "repo": repo,
                "checks": checks,
            },
        }

    merge_commit_sha = oid_result["detail"]["merge_commit_sha"]
    merged_at = merged_at_result["detail"]["mergedAt"]

    # --- check 3: ancestry ---
    ancestry_result = check_origin_main_ancestry(
        merge_commit_sha, base_branch=base_branch, cwd=cwd
    )
    checks["ancestry"] = ancestry_result
    if not ancestry_result["ok"]:
        return {
            "ok": False,
            "reason": f"ancestry check failed: {ancestry_result['reason']}",
            "detail": {
                "failed_check": "ancestry",
                "pr_number": pr_number,
                "repo": repo,
                "merge_commit_sha": merge_commit_sha,
                "merged_at": merged_at,
                "base_branch": base_branch,
                "checks": checks,
            },
        }

    # --- task-2472 신규 check 4~6 (task_id 필요) ---
    if task_id:
        # check 4: .done + .done.escalated 동시 존재 차단
        coexistence_result = check_done_escalated_coexistence(
            task_id, workspace=workspace
        )
        checks["done_escalated_coexistence"] = coexistence_result
        if not coexistence_result["ok"]:
            return {
                "ok": False,
                "reason": f"done_escalated_coexistence check failed: {coexistence_result['reason']}",
                "detail": {
                    "failed_check": "done_escalated_coexistence",
                    "pr_number": pr_number,
                    "repo": repo,
                    "task_id": task_id,
                    "checks": checks,
                },
            }

        # check 5: .done.escalated payload 검증 (파일 존재 시)
        work_root = _workspace_root_path(workspace)
        escalated_path = work_root / _EVENTS_DIR_REL / f"{task_id}.done.escalated"
        if escalated_path.exists():
            payload_result = check_escalated_payload(escalated_path)
            checks["escalated_payload"] = payload_result
            if not payload_result["ok"]:
                return {
                    "ok": False,
                    "reason": f"escalated_payload check failed: {payload_result['reason']}",
                    "detail": {
                        "failed_check": "escalated_payload",
                        "pr_number": pr_number,
                        "repo": repo,
                        "task_id": task_id,
                        "checks": checks,
                    },
                }
        else:
            checks["escalated_payload"] = {
                "ok": True,
                "reason": ".done.escalated 파일 없음 — payload 검증 생략",
                "detail": {},
            }

        # check 6: state 파일 존재 + checksum 검증
        state_result = check_state_file_present(task_id, workspace=workspace)
        checks["state_file_present"] = state_result
        if not state_result["ok"]:
            return {
                "ok": False,
                "reason": f"state_file_present check failed: {state_result['reason']}",
                "detail": {
                    "failed_check": "state_file_present",
                    "pr_number": pr_number,
                    "repo": repo,
                    "task_id": task_id,
                    "checks": checks,
                },
            }

        return {
            "ok": True,
            "reason": "all 6 silent_corruption checks passed (task-2472 보강)",
            "detail": {
                "merge_commit_sha": merge_commit_sha,
                "merged_at": merged_at,
                "pr_number": pr_number,
                "repo": repo,
                "base_branch": base_branch,
                "task_id": task_id,
                "checks": checks,
            },
        }

    return {
        "ok": True,
        "reason": "all 3 silent_corruption checks passed",
        "detail": {
            "merge_commit_sha": merge_commit_sha,
            "merged_at": merged_at,
            "pr_number": pr_number,
            "repo": repo,
            "base_branch": base_branch,
            "checks": checks,
        },
    }
