"""anu_v2.worktree_cleanup — 6대 안전조건 기반 worktree cleanup helper (task-2550).

회장 §명시 (2026-05-11 C방안 승인):
  - 81개 누적 .worktrees 정리 — dry-run 우선, --apply는 별도 승인
  - 6대 안전조건 AND 게이트 — 어느 하나라도 FAIL 시 skip + log
  - main worktree 절대 보호 (workspace_root path 차단)
  - one-way isolation: anu_v2 외부 import 금지

6대 안전조건:
  1. task .done.acked 마커 존재
  2. PR state = MERGED (gh API)
  3. task .merge-done 마커 존재
  4. branch가 main에 ancestor (git merge-base --is-ancestor)
  5. worktree 사용 중 X (pgrep + git worktree list lock)
  6. dry-run default; --apply 명시 시에만 실제 삭제

skip log (dirty / main worktree / safety 1-5 FAIL): memory/events/worktree-cleanup-skipped-<ts>-<path-hash>.json
"""

from __future__ import annotations

import json
import re
import subprocess
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Callable

# Constants
# Value written under the "chat_id" key of every skip-log payload.
DEFAULT_CHAT_ID: int = 6937032012
# Fixed UTC+9 offset (KST); the default clock produces timestamps in this zone.
_KST = timezone(timedelta(hours=9))

# Token-masking patterns. They are defined locally instead of being imported
# because anu_v2 must not import from outside its own package (the original
# note says they were copied from post_merge_smoke_runner).
#
# The prefix pattern covers GitHub's token prefixes: ghp_ (personal access),
# gho_ (OAuth -- the kind the gh CLI holds), ghu_ / ghs_ (app user / server),
# ghr_ (refresh) and fine-grained github_pat_ tokens.
_TOKEN_PREFIX_RE = re.compile(
    r"(?:gh[pousr]_|github_pat_)[A-Za-z0-9_\-]+",
    re.IGNORECASE,
)
TOKEN_KEY_HINTS = (
    "github_token", "bot_github_token", "gh_token", "owner_pat",
    "x-api-key", "authorization", "secret", "password",
)
# "<hint><separator(s)><value>". An optional auth scheme word (Bearer / Basic /
# token) is swallowed together with the credential that follows it, so
# "Authorization: Bearer <tok>" does not leave <tok> behind. The value is not
# length-capped: a cap would leave the tail of a long secret unmasked.
_KEY_VALUE_RE = re.compile(
    r"(?i)(" + "|".join(re.escape(h) for h in TOKEN_KEY_HINTS) + r")"
    r"([=:\s]+(?:(?:bearer|basic|token)\s+)?[^\s,;\"']+)",
)


def _sanitize_text(text: object) -> str:
    """Return ``text`` as a string with credential-like content masked.

    Two passes are applied in order:

    1. Anything that looks like a GitHub token (by prefix) becomes
       ``***MASKED***``.
    2. For every ``TOKEN_KEY_HINTS`` key followed by ``=``, ``:`` or
       whitespace and a value, the key and the first separator character are
       kept and the rest (including an auth scheme word, if present) is
       replaced by ``***MASKED***``.

    Args:
        text: Any object; non-strings are converted with ``str()`` first.

    Returns:
        The masked string.
    """
    s = text if isinstance(text, str) else str(text)
    s = _TOKEN_PREFIX_RE.sub("***MASKED***", s)
    # Keep group(2)[0] (the first separator char) so the key/value shape of the
    # original message stays recognisable in logs.
    return _KEY_VALUE_RE.sub(lambda m: m.group(1) + m.group(2)[0] + "***MASKED***", s)


@dataclass
class WorktreeCandidate:
    """One worktree that is being considered for cleanup.

    Instances are built by ``WorktreeCleanup.enumerate_worktrees`` from the
    output of ``git worktree list --porcelain``.
    """
    path: str           # worktree path as reported by git (absolute path)
    branch: str         # branch ref from the porcelain "branch" line (e.g. .../task/task-2474-dev2); "" when absent
    task_id: str | None # task id extracted from the branch (or from the path when branch is empty); None if not found
    head_sha: str       # commit SHA from the porcelain "HEAD" line


@dataclass
class SafetyConditionResult:
    """Outcome of evaluating one of the six safety conditions."""
    condition: int      # condition number, 1-6
    name: str           # short identifier such as "done_acked" or "pr_merged"
    passed: bool        # True when the condition is satisfied
    detail: str         # human-readable explanation; carries the failure reason when passed is False


@dataclass
class CleanupResult:
    """Result of a cleanup attempt for a single worktree."""
    worktree_path: str                          # path of the worktree that was examined
    task_id: str | None                         # task id of the candidate, or None when it could not be extracted
    safety_results: list[SafetyConditionResult] # per-condition outcomes; empty when skipped before the checks ran
    all_safe: bool                              # True only if every safety condition passed
    dirty: bool                                 # True when the worktree was skipped for uncommitted changes
    is_main: bool                               # True when the worktree was recognised as the main worktree
    applied: bool       # whether the worktree was actually removed (always False in dry-run)
    skipped: bool       # whether the worktree was skipped instead of removed
    skip_reason: str | None                     # why it was skipped; None when it was removed
    ts: str             # ISO 8601 timestamp taken from the injected clock (KST with the default clock)


class WorktreeCleanup:
    """6대 안전조건 기반 worktree cleanup helper.

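    Subprocess execution and the clock are injectable through constructor
    callables, and the workspace root is configurable; marker lookups and
    skip-log writes go straight to the filesystem under that root.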
    """

    def __init__(
        self,
        *,
        subprocess_runner: Callable[..., subprocess.CompletedProcess] | None = None,
        clock: Callable[[], datetime] | None = None,
        workspace_root: Path | None = None,
    ) -> None:
        self._subprocess_runner = subprocess_runner if subprocess_runner is not None else subprocess.run
        self._clock = clock if clock is not None else (lambda: datetime.now(tz=_KST))
        self._workspace_root = workspace_root if workspace_root is not None else Path("/home/jay/workspace")

    # ─── 6대 안전조건 ────────────────────────────────────────────────────

    def check_safety_1_done_acked(self, task_id: str) -> SafetyConditionResult:
        """안전조건 1: memory/events/<task_id>.done.acked 존재."""
        marker = self._workspace_root / "memory" / "events" / f"{task_id}.done.acked"
        passed = marker.exists()
        return SafetyConditionResult(
            condition=1, name="done_acked", passed=passed,
            detail=("ok" if passed else f"missing: {marker}"),
        )

    def check_safety_2_pr_merged(self, task_id: str, branch: str | None = None) -> SafetyConditionResult:
        """안전조건 2: gh API로 task의 PR state == MERGED 확인.

        gh API 호출 전략:
          1. branch가 주어지면 `--head <branch suffix>` 정확 매칭 시도
          2. branch가 None이거나 1번 실패 시 head 없이 전체 검색 후 headRefName에서 task_id 매칭
        gh API 실패 시 FAIL (보수적). 어느 dev 팀(dev1~dev7)이든 매칭 가능.
        """
        # branch에서 head_ref 추출 (refs/heads/ prefix 제거)
        head_ref: str | None = None
        if branch:
            head_ref = branch[len("refs/heads/"):] if branch.startswith("refs/heads/") else branch
        try:
            cmd: list[str] = ["gh", "pr", "list", "--state", "all", "--json", "number,state,headRefName"]
            if head_ref:
                cmd.extend(["--head", head_ref])
            else:
                cmd.extend(["--search", task_id])
            proc = self._subprocess_runner(cmd, capture_output=True, text=True, check=False, cwd=str(self._workspace_root), timeout=30)
            if proc.returncode != 0:
                return SafetyConditionResult(
                    condition=2, name="pr_merged", passed=False,
                    detail=f"gh api failed: {_sanitize_text(proc.stderr)[:200]}",
                )
            prs = json.loads(proc.stdout or "[]")
            if not prs:
                return SafetyConditionResult(condition=2, name="pr_merged", passed=False, detail="no PR found")
            # 보안: task_id가 headRefName에 정확히 포함되는 PR만 인정.
            # 불일치 시 unrelated PR 재사용 금지 → FAIL.
            candidates = [pr for pr in prs if task_id in str(pr.get("headRefName", ""))]
            if not candidates:
                return SafetyConditionResult(
                    condition=2, name="pr_merged", passed=False,
                    detail=f"no PR with task_id={task_id} in headRefName (unrelated PR rejected)",
                )
            for pr in candidates:
                if pr.get("state") == "MERGED":
                    return SafetyConditionResult(condition=2, name="pr_merged", passed=True, detail=f"PR #{pr.get('number')} MERGED")
            return SafetyConditionResult(condition=2, name="pr_merged", passed=False, detail=f"PR state: {[p.get('state') for p in candidates]}")
        except (subprocess.TimeoutExpired, json.JSONDecodeError, OSError) as e:
            return SafetyConditionResult(condition=2, name="pr_merged", passed=False, detail=f"error: {_sanitize_text(str(e))[:100]}")

    def check_safety_3_merge_done(self, task_id: str) -> SafetyConditionResult:
        """안전조건 3: memory/events/<task_id>.merge-done 존재."""
        marker = self._workspace_root / "memory" / "events" / f"{task_id}.merge-done"
        passed = marker.exists()
        return SafetyConditionResult(
            condition=3, name="merge_done", passed=passed,
            detail=("ok" if passed else f"missing: {marker}"),
        )

    def check_safety_4_branch_in_main(self, branch: str) -> SafetyConditionResult:
        """안전조건 4: git merge-base --is-ancestor <branch> origin/main → 0 (PASS).

        branch가 main에 머지되었으면 ancestor.
        """
        try:
            cmd = ["git", "merge-base", "--is-ancestor", branch, "origin/main"]
            proc = self._subprocess_runner(cmd, capture_output=True, text=True, check=False, cwd=str(self._workspace_root), timeout=15)
            passed = (proc.returncode == 0)
            return SafetyConditionResult(
                condition=4, name="branch_in_main", passed=passed,
                detail=("ancestor of main" if passed else f"NOT ancestor (rc={proc.returncode})"),
            )
        except (subprocess.TimeoutExpired, OSError) as e:
            return SafetyConditionResult(condition=4, name="branch_in_main", passed=False, detail=f"error: {str(e)[:100]}")

    def check_safety_5_not_in_use(self, worktree_path: str) -> SafetyConditionResult:
        """안전조건 5: worktree 사용 중 X.

        이중 안전장치:
          (a) git worktree list --porcelain: 해당 path가 locked / prunable이 아닌지 확인
          (b) pgrep -f <worktree_path>: 다른 봇 process가 path를 사용 중이 아닌지 확인
        둘 다 PASS여야 안전 (AND 게이트). 한쪽이라도 FAIL이면 사용 중으로 판정.
        """
        # (a) git worktree list --porcelain 검사
        try:
            cmd_a = ["git", "worktree", "list", "--porcelain"]
            proc_a = self._subprocess_runner(
                cmd_a, capture_output=True, text=True, check=False,
                cwd=str(self._workspace_root), timeout=15,
            )
            if proc_a.returncode != 0:
                return SafetyConditionResult(
                    condition=5, name="not_in_use", passed=False,
                    detail=f"git worktree list failed: rc={proc_a.returncode}",
                )
            # porcelain output에서 path block 찾기
            locked_or_prunable = False
            in_target_block = False
            for line in proc_a.stdout.splitlines():
                if line.startswith("worktree "):
                    in_target_block = line[len("worktree "):].strip() == worktree_path
                elif in_target_block and line.strip() in ("locked", "prunable"):
                    locked_or_prunable = True
                    break
                elif in_target_block and line.startswith(("locked ", "prunable ")):
                    locked_or_prunable = True
                    break
            if locked_or_prunable:
                return SafetyConditionResult(
                    condition=5, name="not_in_use", passed=False,
                    detail="worktree is locked or prunable per git worktree list",
                )
        except (subprocess.TimeoutExpired, OSError) as e:
            return SafetyConditionResult(condition=5, name="not_in_use", passed=False, detail=f"git worktree list error: {str(e)[:100]}")

        # (b) pgrep 검사
        try:
            cmd_b = ["pgrep", "-f", worktree_path]
            proc_b = self._subprocess_runner(cmd_b, capture_output=True, text=True, check=False, timeout=15)
            # pgrep returncode 0 = 매치 있음 (사용 중) → FAIL
            # returncode 1 = 매치 없음 (안전) → PASS
            if proc_b.returncode == 1:
                return SafetyConditionResult(condition=5, name="not_in_use", passed=True, detail="git list OK + no process using path")
            elif proc_b.returncode == 0:
                pid_count = len(proc_b.stdout.strip().splitlines())
                return SafetyConditionResult(condition=5, name="not_in_use", passed=False, detail=f"{pid_count} process(es) using path")
            else:
                return SafetyConditionResult(condition=5, name="not_in_use", passed=False, detail=f"pgrep rc={proc_b.returncode}")
        except (subprocess.TimeoutExpired, OSError) as e:
            return SafetyConditionResult(condition=5, name="not_in_use", passed=False, detail=f"pgrep error: {str(e)[:100]}")

    def check_safety_6_apply_explicit(self, apply_flag: bool) -> SafetyConditionResult:
        """안전조건 6: dry-run default; --apply 명시 시에만 PASS."""
        # 본 조건은 cleanup_worktree 호출 시점의 apply 파라미터 명시성 검증.
        # PASS 의미: "실제 삭제를 허용함". dry-run 모드에서는 본 조건 PASS X (실제 삭제 X).
        return SafetyConditionResult(
            condition=6, name="apply_explicit", passed=apply_flag,
            detail=("--apply specified" if apply_flag else "dry-run (no actual delete)"),
        )

    # ─── 추가 안전 ──────────────────────────────────────────────────────

    def is_dirty_worktree(self, worktree_path: str) -> bool:
        """uncommitted changes 존재 여부.

        `git status --porcelain` 결과가 비어있지 않으면 dirty.
        """
        try:
            cmd = ["git", "-C", worktree_path, "status", "--porcelain"]
            proc = self._subprocess_runner(cmd, capture_output=True, text=True, check=False, timeout=15)
            return bool(proc.stdout.strip())
        except (subprocess.TimeoutExpired, OSError):
            # 오류 시 보수적으로 dirty 취급 → skip
            return True

    def is_main_worktree(self, worktree_path: str) -> bool:
        """★ workspace_root와 동일 경로면 main worktree → 절대 삭제 X."""
        try:
            wp = Path(worktree_path).resolve()
            wr = self._workspace_root.resolve()
            return wp == wr
        except OSError:
            return True  # 보수적으로 main 취급

    # ─── enumerate ──────────────────────────────────────────────────────

    def enumerate_worktrees(self) -> list[WorktreeCandidate]:
        """git worktree list로 모든 worktree 열거.

        main worktree는 결과에 포함되지만, cleanup_worktree에서 차단.
        """
        try:
            cmd = ["git", "worktree", "list", "--porcelain"]
            proc = self._subprocess_runner(cmd, capture_output=True, text=True, check=False, cwd=str(self._workspace_root), timeout=15)
            if proc.returncode != 0:
                return []
            candidates: list[WorktreeCandidate] = []
            blocks = proc.stdout.strip().split("\n\n")
            for blk in blocks:
                lines = blk.strip().splitlines()
                if not lines:
                    continue
                path = ""
                head = ""
                branch = ""
                for line in lines:
                    if line.startswith("worktree "):
                        path = line[len("worktree "):].strip()
                    elif line.startswith("HEAD "):
                        head = line[len("HEAD "):].strip()
                    elif line.startswith("branch "):
                        branch = line[len("branch "):].strip()
                if path:
                    task_id = self._extract_task_id(branch or path)
                    candidates.append(WorktreeCandidate(path=path, branch=branch, task_id=task_id, head_sha=head))
            return candidates
        except (subprocess.TimeoutExpired, OSError):
            return []

    @staticmethod
    def _extract_task_id(s: str) -> str | None:
        """branch / path에서 task_id 추출 (예: task/task-2474-dev2 → task-2474)."""
        m = re.search(r"task-(\d+(?:\.\d+)?)", s)
        return f"task-{m.group(1)}" if m else None

    # ─── cleanup ───────────────────────────────────────────────────────

    def cleanup_worktree(self, candidate: WorktreeCandidate, apply: bool = False) -> CleanupResult:
        """단일 worktree cleanup 시도. 6대 안전조건 AND 검증 후 실행."""
        ts = self._clock().isoformat()

        # ★ main worktree 절대 보호 — 회장 가시성 확보 위해 log 박제
        if self.is_main_worktree(candidate.path):
            self._log_skipped(candidate, "main_worktree_protected", ts)
            return CleanupResult(
                worktree_path=candidate.path, task_id=candidate.task_id,
                safety_results=[], all_safe=False, dirty=False, is_main=True,
                applied=False, skipped=True, skip_reason="main worktree (workspace_root) — never deleted",
                ts=ts,
            )

        # dirty 검사
        dirty = self.is_dirty_worktree(candidate.path)
        if dirty:
            self._log_skipped(candidate, "dirty", ts)
            return CleanupResult(
                worktree_path=candidate.path, task_id=candidate.task_id,
                safety_results=[], all_safe=False, dirty=True, is_main=False,
                applied=False, skipped=True, skip_reason="dirty worktree (uncommitted changes)",
                ts=ts,
            )

        # task_id 추출 실패 시 skip
        if not candidate.task_id:
            return CleanupResult(
                worktree_path=candidate.path, task_id=None,
                safety_results=[], all_safe=False, dirty=False, is_main=False,
                applied=False, skipped=True, skip_reason="task_id cannot be extracted",
                ts=ts,
            )

        # 6대 안전조건 검증
        results = [
            self.check_safety_1_done_acked(candidate.task_id),
            self.check_safety_2_pr_merged(candidate.task_id, branch=candidate.branch),
            self.check_safety_3_merge_done(candidate.task_id),
            self.check_safety_4_branch_in_main(candidate.branch),
            self.check_safety_5_not_in_use(candidate.path),
            self.check_safety_6_apply_explicit(apply),
        ]
        all_safe = all(r.passed for r in results)

        applied = False
        skipped = False
        skip_reason: str | None = None

        if not all_safe:
            skipped = True
            failed = [r.name for r in results if not r.passed]
            skip_reason = f"safety failed: {failed}"
            # 운영 감사성: safety 1~5 FAIL 시에도 log 박제 (apply_explicit 단독 FAIL은 dry-run 정상 동작이므로 제외)
            failed_set = set(failed)
            non_apply_failures = failed_set - {"apply_explicit"}
            if non_apply_failures:
                self._log_skipped(candidate, f"safety_failed:{sorted(non_apply_failures)}", ts)
        elif not apply:
            # 모든 safety PASS이지만 dry-run (조건 6 PASS는 apply=True 의미이므로 여기는 실질적으로 도달 X)
            skipped = True
            skip_reason = "dry-run mode (apply=False)"
        else:
            # 실제 삭제: git worktree remove
            try:
                cmd = ["git", "worktree", "remove", candidate.path]
                proc = self._subprocess_runner(cmd, capture_output=True, text=True, check=False, cwd=str(self._workspace_root), timeout=30)
                if proc.returncode == 0:
                    applied = True
                else:
                    skipped = True
                    skip_reason = f"git worktree remove failed: {_sanitize_text(proc.stderr)[:200]}"
            except (subprocess.TimeoutExpired, OSError) as e:
                skipped = True
                skip_reason = f"error: {_sanitize_text(str(e))[:100]}"

        return CleanupResult(
            worktree_path=candidate.path, task_id=candidate.task_id,
            safety_results=results, all_safe=all_safe, dirty=False, is_main=False,
            applied=applied, skipped=skipped, skip_reason=skip_reason,
            ts=ts,
        )

    def cleanup_all_dry_run(self, apply: bool = False) -> list[CleanupResult]:
        """모든 worktree 후보 검사. dry-run default."""
        results: list[CleanupResult] = []
        for cand in self.enumerate_worktrees():
            results.append(self.cleanup_worktree(cand, apply=apply))
        return results

    # ─── log ───────────────────────────────────────────────────────────

    def _log_skipped(self, candidate: WorktreeCandidate, reason: str, ts: str) -> None:
        """skip log 박제 → memory/events/worktree-cleanup-skipped-<ts_compact>-<hash>.json.

        ts + path hash로 파일명 유일성 확보 (동시 실행 시 덮어쓰기 방지).
        branch / worktree_path 필드는 _sanitize_text로 token-like 노출 방어.
        """
        try:
            ts_compact = ts.replace(":", "-").replace(".", "-")
            # path 기반 짧은 hash로 파일명 충돌 회피
            path_hash = abs(hash(candidate.path)) % (10**8)
            log_path = self._workspace_root / "memory" / "events" / f"worktree-cleanup-skipped-{ts_compact}-{path_hash}.json"
            log_path.parent.mkdir(parents=True, exist_ok=True)
            payload = {
                "ts": ts,
                "task_id": _sanitize_text(candidate.task_id) if candidate.task_id else None,
                "worktree_path": _sanitize_text(candidate.path),
                "branch": _sanitize_text(candidate.branch),
                "reason": reason,
                "chat_id": DEFAULT_CHAT_ID,
            }
            with open(log_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
        except OSError:
            pass

