"""anu_v2.merge_queue_executor — ANU v2 자동 머지 큐 실행기 v0 (task-2531).

회장 §명시 (2026-05-10) 9 기능 박제:
  1. queue head 자동 검증
  2. expected_files diff gate
  3. forbidden path gate
  4. CI / Gemini / mergeStateStatus CLEAN / HEAD SHA lock 확인
  5. BOT GITHUB_TOKEN process-local injection으로 squash merge
  6. post-merge smoke (회귀 재실행)
  7. downstream stale revalidation
  8. Critical 7종 분류 (회장 보고 대상)
  9. 비critical 자동 처리 (escalation 없이 self-resolve)

설계 원칙:
  - one-way isolation: anu_v2/* 만 import. utils/dispatch/scripts/dashboard 의존성 0.
  - 외부 부수효과는 모두 주입 가능한 callable 로 추상화 (gh_runner, git_runner,
    pytest_runner, audit_writer) — 테스트 시 mock 으로 대체 가능.
  - admin override / force / rebase / owner_pat fallback / manual .done 일체 사용 금지.
"""

from __future__ import annotations

import os
import re
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Iterable, Mapping, Sequence


# ─── Decision codes (gate 결과) ───────────────────────────────────────────────
AUTO_MERGE_ALLOWED = "AUTO_MERGE_ALLOWED"
AUTO_MERGE_SUCCESS = "AUTO_MERGE_SUCCESS"
WAITING_FOR_PREDECESSOR = "WAITING_FOR_PREDECESSOR"
BLOCKED_WITH_REASON = "BLOCKED_WITH_REASON"
HEAD_SHA_LOCK_BROKEN = "HEAD_SHA_LOCK_BROKEN"
CI_FAILURE_BLOCK = "CI_FAILURE_BLOCK"
GEMINI_UNRESOLVED_BLOCK = "GEMINI_UNRESOLVED_BLOCK"
MERGE_STATE_NOT_CLEAN = "MERGE_STATE_NOT_CLEAN"
DIFF_CONTAMINATION = "DIFF_CONTAMINATION"
FORBIDDEN_PATH_HIT = "FORBIDDEN_PATH_HIT"
NON_CRITICAL_AUTO_RESOLVED = "NON_CRITICAL_AUTO_RESOLVED"


# ─── Critical 7종 (회장 §) ───────────────────────────────────────────────────
CRITICAL_FORBIDDEN_PATH = "FORBIDDEN_PATH_INVASION"
CRITICAL_DIFF_REPLACEMENT_FAILED = "EFFECTIVE_DIFF_CONTAMINATION_REPLACEMENT_FAILED"
CRITICAL_GEMINI_SCOPE_EXPANSION = "GEMINI_REAL_BUG_SCOPE_EXPANSION"
CRITICAL_BLOCK_OVERRIDE = "BLOCK_OVERRIDE_REQUIRED_OR_INSUFFICIENT_REASON"
CRITICAL_DEPENDENCY_CYCLE = "DEPENDENCY_CYCLE_OR_SERIAL_ONLY_CONFLICT"
CRITICAL_REPLACEMENT_FAILED = "REPLACEMENT_PR_ALSO_FAILED"
CRITICAL_POST_MERGE_SMOKE = "POST_MERGE_SMOKE_FAILURE"

CRITICAL_CODES: frozenset[str] = frozenset({
    CRITICAL_FORBIDDEN_PATH,
    CRITICAL_DIFF_REPLACEMENT_FAILED,
    CRITICAL_GEMINI_SCOPE_EXPANSION,
    CRITICAL_BLOCK_OVERRIDE,
    CRITICAL_DEPENDENCY_CYCLE,
    CRITICAL_REPLACEMENT_FAILED,
    CRITICAL_POST_MERGE_SMOKE,
})

# 회장 §6 — 절대 금지 git/gh flag (정적 차단)
FORBIDDEN_GIT_FLAGS: frozenset[str] = frozenset({
    "--admin", "--force", "--force-with-lease", "-f", "--rebase",
})

# Gemini 상태값
GEMINI_COMPLETED = "GEMINI_COMPLETED"
GEMINI_UNRESOLVED = "GEMINI_UNRESOLVED"
GEMINI_REAL_BUG = "GEMINI_REAL_BUG"
GEMINI_SCOPE_EXPANSION = "GEMINI_SCOPE_EXPANSION"


# ─── Data types ──────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class TaskSpec:
    """task md 에서 추출한 머지 게이트 메타데이터."""
    task_id: str
    expected_files: tuple[str, ...]
    forbidden_paths: tuple[str, ...] = ()
    cherry_pick_allowed: bool = False


@dataclass(frozen=True)
class PRMeta:
    """PR 메타 (gh pr view 결과 정규화)."""
    number: int
    head_sha: str
    head_ref: str
    base_ref: str
    changed_files: tuple[str, ...]
    ci_required_all_success: bool
    gemini_status: str
    merge_state_status: str          # CLEAN / DIRTY / BLOCKED / BEHIND / UNKNOWN
    queue_predecessors_open: int     # 선행 PR 중 OPEN 상태 개수


@dataclass
class GateOutcome:
    """gate 평가 결과."""
    decision: str
    reason: str = ""
    critical_code: str = ""
    extra: dict[str, Any] = field(default_factory=dict)

    @property
    def passed(self) -> bool:
        return self.decision in (AUTO_MERGE_ALLOWED, AUTO_MERGE_SUCCESS)

    @property
    def is_critical(self) -> bool:
        return bool(self.critical_code) and self.critical_code in CRITICAL_CODES


# ─── 외부 부수효과 콜백 시그니처 ─────────────────────────────────────────────
GhRunner = Callable[[Sequence[str], Mapping[str, str] | None], subprocess.CompletedProcess]
GitRunner = Callable[[Sequence[str]], subprocess.CompletedProcess]
PytestRunner = Callable[[Sequence[str]], int]            # 종료 코드 반환
AuditWriter = Callable[[Mapping[str, Any]], None]


# ─── Helpers ────────────────────────────────────────────────────────────────
def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _coerce_stream(value: Any) -> str:
    """subprocess stdout/stderr 정규화: None → "", bytes → utf-8 decode, str → 그대로.

    `text=True` 미설정 호출이 섞여 들어와도 직렬화 시점에 `bytes` 가 dict 에 박히지
    않게 한다.
    """
    if value is None:
        return ""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value)


def assert_no_forbidden_git_flags(args: Sequence[str]) -> None:
    """gh / git 호출 인자에 admin/force/rebase 금지 플래그가 섞이면 즉시 RuntimeError.

    회장 §6 정적 차단 — 호출부에서 우회를 막는 단방향 hard-stop.

    검사 규칙:
      - exact: 인자 토큰이 FORBIDDEN 셋과 정확히 일치 (`--admin`, `--force`, ...).
      - prefix: 값 형태 (`--admin=true`, `--force-with-lease=...`) 도 차단.
      - 단축 결합 (`-af`): `-`/`--` 접두어가 1개일 때 각 문자가 단축 금지에 포함되는지.
        본 모듈은 `-f` 만 단축 형태로 사용 가정 (`-af`, `-fa` 모두 매칭).
    """
    short_chars = {flag[1:] for flag in FORBIDDEN_GIT_FLAGS if len(flag) == 2 and flag.startswith("-")}
    bad: list[str] = []
    for token in args:
        if not isinstance(token, str):
            continue
        if token in FORBIDDEN_GIT_FLAGS:
            bad.append(token)
            continue
        # `--admin=...` 등 값 포함형
        if "=" in token:
            head = token.split("=", 1)[0]
            if head in FORBIDDEN_GIT_FLAGS:
                bad.append(token)
                continue
        # 단축 결합 `-af` 처리: `--` (long) 가 아닌 `-X..` 형태에서만
        if (
            short_chars
            and len(token) > 2
            and token.startswith("-")
            and not token.startswith("--")
        ):
            if any(ch in short_chars for ch in token[1:]):
                bad.append(token)
                continue
    if bad:
        # 중복 제거 + 정렬 (테스트 결정성)
        unique = sorted(dict.fromkeys(bad))
        raise RuntimeError(f"FORBIDDEN_GIT_FLAG_BLOCKED: {unique}")


def _parse_yaml_list(block: str) -> tuple[str, ...]:
    """간단한 inline YAML list 파서 (`expected_files:` 블록의 `- "..."`).

    외부 yaml 모듈 의존성을 피하기 위해 직접 구현. 본 모듈이 다루는
    task md format 만 지원하면 되므로 minimal grammar.
    """
    items: list[str] = []
    for line in block.splitlines():
        # 인용된 항목 (큰/작은 따옴표 모두) 은 `#` 을 그대로 허용 (예: `C#_file.cs`),
        # 비인용 항목에서만 `#` 이후를 주석으로 절단한다.
        m = re.match(
            r"^\s*-\s*(?:\"([^\"]+)\"|'([^']+)'|([^#]+?))\s*(?:#.*)?$",
            line,
        )
        if m:
            value = next((g for g in m.groups() if g is not None), "")
            items.append(value.strip())
    return tuple(items)


def load_task_spec_from_md(task_md_path: Path) -> TaskSpec:
    """task md 파일에서 expected_files / forbidden_paths / cherry_pick_allowed 추출."""
    text = task_md_path.read_text(encoding="utf-8")
    task_id = task_md_path.stem  # e.g. "task-2531"

    def _extract_block(key: str) -> str:
        # `key:` 다음 줄부터 들여쓰기된 라인을 모은다.
        # - 키 자체가 들여쓰기 (YAML 블록 / fenced code block 내부) 된 경우도 허용.
        # - 종료 조건: 키와 같거나 더 얕은 들여쓰기 깊이의 비빈 라인을 만나면 종료
        #   (sibling key 오인 방지).
        # - 빈 줄 / 주석 라인은 보존하여 _parse_yaml_list 에서 무시하도록 한다.
        head_re = re.compile(rf"^([ \t]*){re.escape(key)}\s*:\s*$", re.MULTILINE)
        head = head_re.search(text)
        if not head:
            return ""
        key_indent = len(head.group(1))
        lines = text[head.end():].splitlines()
        block: list[str] = []
        for ln in lines:
            stripped = ln.lstrip()
            if not stripped:
                block.append(ln)
                continue
            if stripped.startswith("#"):
                block.append(ln)
                continue
            # 들여쓰기 깊이 비교 — key 보다 깊은 라인만 블록에 포함
            current_indent = len(ln) - len(stripped)
            if current_indent <= key_indent:
                break
            block.append(ln)
        return "\n".join(block) + ("\n" if block else "")

    expected = _parse_yaml_list(_extract_block("expected_files"))
    forbidden = _parse_yaml_list(_extract_block("forbidden_paths"))

    # cherry_pick_allowed 는 YAML 블록 내부의 값만 신뢰한다. 본문 prose 에 같은
    # 키워드가 등장해도 영향을 주지 않게, ```yaml ... ``` 펜스 내부 텍스트와
    # expected_files / forbidden_paths 인접 영역을 우선 검색.
    cherry_search_targets: list[str] = []
    for fence in re.findall(r"```ya?ml\s*\n([\s\S]*?)```", text, re.IGNORECASE):
        cherry_search_targets.append(fence)
    cherry_search_targets.append(_extract_block("expected_files"))
    cherry_search_targets.append(_extract_block("forbidden_paths"))
    cherry = False
    cherry_re = re.compile(
        # 행 끝 주석 (`# ...`) 도 허용하여 다른 파서들과 일관성 유지.
        r"^[ \t]*cherry_pick_allowed\s*:\s*(true|false)\s*(?:#.*)?$",
        re.MULTILINE | re.IGNORECASE,
    )
    for chunk in cherry_search_targets:
        if not chunk:
            continue
        m = cherry_re.search(chunk)
        if m:
            cherry = m.group(1).lower() == "true"
            break

    return TaskSpec(
        task_id=task_id,
        expected_files=expected,
        forbidden_paths=forbidden,
        cherry_pick_allowed=cherry,
    )


# ─── 본체 ────────────────────────────────────────────────────────────────────
class MergeQueueExecutor:
    """ANU v2 자동 머지 큐 실행기 v0.

    9 기능을 method 단위로 분리. 부수효과는 생성자 주입 callable 로 분리되어
    테스트는 mock callable 만 교체하면 충분하다.
    """

    def __init__(
        self,
        *,
        gh_runner: GhRunner,
        git_runner: GitRunner,
        pytest_runner: PytestRunner,
        audit_writer: AuditWriter,
        task_md_root: Path,
        bot_token_env: str = "BOT_GITHUB_TOKEN",
    ) -> None:
        self._gh = gh_runner
        self._git = git_runner
        self._pytest = pytest_runner
        self._audit = audit_writer
        self._task_md_root = Path(task_md_root)
        self._bot_token_env = bot_token_env

    # ── 1. queue head 자동 검증 ──────────────────────────────────────────────
    def evaluate_queue_head(self, pr: PRMeta) -> GateOutcome:
        """선행 PR 모두 머지 완료 (queue_predecessors_open == 0) 인지 검증."""
        if pr.queue_predecessors_open > 0:
            return GateOutcome(
                decision=WAITING_FOR_PREDECESSOR,
                reason=f"{pr.queue_predecessors_open} predecessor(s) still open",
            )
        return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="queue_head_confirmed")

    # ── 2. expected_files diff gate ──────────────────────────────────────────
    def check_expected_files_diff(
        self,
        spec: TaskSpec,
        pr: PRMeta,
    ) -> GateOutcome:
        """PR diff 가 task md expected_files 와 정확히 일치하는지 검증.

        불일치는 DIFF_CONTAMINATION (비critical) — 정정 트랙은 본 v0 범위 외.
        """
        expected = set(spec.expected_files)
        actual = set(pr.changed_files)
        if expected == actual:
            return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="expected_files_match")
        missing = expected - actual
        extra = actual - expected
        return GateOutcome(
            decision=DIFF_CONTAMINATION,
            reason="expected_files_mismatch",
            extra={"missing": sorted(missing), "extra": sorted(extra)},
        )

    # ── 3. forbidden path gate ───────────────────────────────────────────────
    def check_forbidden_paths(self, spec: TaskSpec, pr: PRMeta) -> GateOutcome:
        """PR diff 에 task md forbidden_paths 매칭 파일이 있으면 Critical 7종."""
        if not spec.forbidden_paths:
            return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="no_forbidden_paths")

        hits: list[str] = []
        for changed in pr.changed_files:
            for pattern in spec.forbidden_paths:
                if _glob_match(pattern, changed):
                    hits.append(changed)
                    break
        if hits:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason=FORBIDDEN_PATH_HIT,
                critical_code=CRITICAL_FORBIDDEN_PATH,
                extra={"hits": sorted(set(hits))},
            )
        return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="forbidden_paths_clean")

    # ── 4. CI / Gemini / CLEAN / HEAD SHA lock ──────────────────────────────
    def check_ci_gemini_clean_sha_lock(
        self,
        pr: PRMeta,
        *,
        head_sha_at_lock: str,
    ) -> GateOutcome:
        """CI required all SUCCESS + Gemini unresolved 0 + CLEAN + HEAD SHA 일치."""
        if not pr.ci_required_all_success:
            return GateOutcome(
                decision=CI_FAILURE_BLOCK,
                reason="ci_required_not_all_success",
            )
        if pr.gemini_status == GEMINI_SCOPE_EXPANSION:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="gemini_scope_expansion",
                critical_code=CRITICAL_GEMINI_SCOPE_EXPANSION,
            )
        if pr.gemini_status not in (GEMINI_COMPLETED,):
            return GateOutcome(
                decision=GEMINI_UNRESOLVED_BLOCK,
                reason=f"gemini_status={pr.gemini_status}",
            )
        if pr.merge_state_status != "CLEAN":
            decision = MERGE_STATE_NOT_CLEAN
            critical = ""
            if pr.merge_state_status == "BLOCKED":
                decision = BLOCKED_WITH_REASON
                critical = CRITICAL_BLOCK_OVERRIDE
            return GateOutcome(
                decision=decision,
                reason=f"mergeStateStatus={pr.merge_state_status}",
                critical_code=critical,
            )
        if pr.head_sha != head_sha_at_lock:
            return GateOutcome(
                decision=HEAD_SHA_LOCK_BROKEN,
                reason="head_sha_changed_during_evaluation",
                extra={"locked": head_sha_at_lock, "current": pr.head_sha},
            )
        return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="all_gates_clean")

    # ── 5. BOT identity squash merge ─────────────────────────────────────────
    def execute_bot_squash_merge(
        self,
        pr: PRMeta,
        *,
        head_sha_at_lock: str,
    ) -> GateOutcome:
        """`GH_TOKEN=$BOT_GITHUB_TOKEN` process-local injection 으로 squash merge.

        - 호스트 환경의 GH_TOKEN / GITHUB_TOKEN 은 절대 그대로 사용하지 않는다.
        - admin / force / rebase 플래그는 정적 차단 (assert_no_forbidden_git_flags).
        """
        token = os.environ.get(self._bot_token_env, "").strip()
        if not token:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="bot_token_unavailable",
                critical_code=CRITICAL_BLOCK_OVERRIDE,
            )

        args = [
            "pr", "merge", str(pr.number),
            "--squash",
            "--match-head-commit", head_sha_at_lock,
        ]
        assert_no_forbidden_git_flags(args)

        # process-local env: parent의 PATH 등 실행 환경은 유지하되, 토큰은 BOT 으로 덮어쓴다.
        # 빈 dict 로 시작하면 PATH 누락 → gh/git 미발견. owner PAT 등 외부 토큰은 그대로
        # 두지 않고 BOT 토큰으로 명시적 덮어써서 격리한다.
        env = os.environ.copy()
        env["GH_TOKEN"] = token
        env["GITHUB_TOKEN"] = token
        cp = self._gh(args, env)
        if cp.returncode != 0:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="gh_pr_merge_failed",
                critical_code=CRITICAL_BLOCK_OVERRIDE,
                # capture_output=False 이거나 텍스트 모드 비활성화 등으로 stdout/stderr 가
                # None / bytes 일 수 있다. 항상 str 로 정규화하여 직렬화 안전성을 보장.
                extra={"stderr": _coerce_stream(getattr(cp, "stderr", None))[:512]},
            )
        return GateOutcome(
            decision=AUTO_MERGE_SUCCESS,
            reason="bot_squash_merge_completed",
            extra={"stdout": _coerce_stream(getattr(cp, "stdout", None))[:512]},
        )

    # ── 6. post-merge smoke ──────────────────────────────────────────────────
    def run_post_merge_smoke(
        self,
        *,
        smoke_test_paths: Sequence[str],
    ) -> GateOutcome:
        """머지 후 회귀 (smoke) 재실행."""
        if not smoke_test_paths:
            return GateOutcome(decision=AUTO_MERGE_SUCCESS, reason="smoke_skipped_no_targets")
        rc = self._pytest(list(smoke_test_paths))
        if rc != 0:
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="post_merge_smoke_failed",
                critical_code=CRITICAL_POST_MERGE_SMOKE,
                extra={"pytest_exit_code": rc},
            )
        return GateOutcome(decision=AUTO_MERGE_SUCCESS, reason="post_merge_smoke_passed")

    # ── 7. downstream stale revalidation ─────────────────────────────────────
    def revalidate_downstream(
        self,
        *,
        merged_task_id: str,
        downstream_pr_numbers: Iterable[int],
    ) -> list[dict[str, Any]]:
        """머지로 stale 이 된 다른 OPEN PR 의 evidence 재검증 트리거.

        모든 gh 호출은 BOT 토큰 process-local injection 으로 격리한다 (회장 §6 — 봇
        identity 강제). 외부 owner_pat 누출 차단.
        """
        token = os.environ.get(self._bot_token_env, "").strip()
        if not token:
            # 토큰 없으면 머지 단계와 동일하게 실행 거부 (silent skip 금지).
            return [
                {
                    "pr_number": pr_no,
                    "ok": False,
                    "reason": "bot_token_unavailable",
                    "ts": _now_iso(),
                }
                for pr_no in downstream_pr_numbers
            ]
        env = os.environ.copy()
        env["GH_TOKEN"] = token
        env["GITHUB_TOKEN"] = token

        results: list[dict[str, Any]] = []
        for pr_no in downstream_pr_numbers:
            args = ["pr", "comment", str(pr_no), "-b",
                    f"[anu_v2.merge_queue_executor] base merged: {merged_task_id} → "
                    "evidence revalidation requested (CI rerun + Gemini retrigger)."]
            assert_no_forbidden_git_flags(args)
            cp = self._gh(args, env)
            results.append({
                "pr_number": pr_no,
                "ok": cp.returncode == 0,
                "ts": _now_iso(),
            })
        return results

    # ── 8. Critical 7종 분류 / 9. 비critical 자동 처리 ────────────────────────
    def classify_failure(self, outcome: GateOutcome) -> str:
        """Critical 7종 코드면 그대로 반환, 그 외는 NON_CRITICAL_AUTO_RESOLVED."""
        if outcome.is_critical:
            return outcome.critical_code
        return NON_CRITICAL_AUTO_RESOLVED

    def auto_handle_non_critical(self, outcome: GateOutcome) -> GateOutcome:
        """비critical 결과는 audit 에만 기록하고 self-resolve. 회장 보고 X.

        WAITING_FOR_PREDECESSOR 는 머지 큐의 정상적인 대기 상태라 짧은 주기로 반복
        실행 시 audit 로그가 과도하게 누적된다. 대기 상태는 로깅을 생략하고 그대로
        반환만 한다.
        """
        if outcome.decision != WAITING_FOR_PREDECESSOR:
            self._audit({
                "ts": _now_iso(),
                "kind": "non_critical_auto_resolved",
                "decision": outcome.decision,
                "reason": outcome.reason,
                "extra": dict(outcome.extra),
            })
        return GateOutcome(
            decision=NON_CRITICAL_AUTO_RESOLVED,
            reason=outcome.reason,
            extra=dict(outcome.extra),
        )

    # ── 통합 평가 (전체 파이프라인) ─────────────────────────────────────────
    def evaluate(
        self,
        *,
        pr: PRMeta,
        head_sha_at_lock: str,
    ) -> GateOutcome:
        """9 기능 중 게이트 1~4 를 순차 평가하고 최종 결과를 반환.

        실제 머지 (5) / 스모크 (6) / 다운스트림 재검증 (7) 은 호출부에서 명시 호출.
        """
        # head_ref 는 외부 입력값. branch 명에 `../` 등이 들어오면 task_md_root 외부의
        # 파일을 가리킬 수 있으므로 task_id 의 파일명 부분만 추출해 root 안으로 강제 한정.
        raw_task_id = _extract_task_id_from_branch(pr.head_ref)
        safe_task_id = Path(raw_task_id).name
        spec_path = self._task_md_root / f"{safe_task_id}.md"
        if not spec_path.exists():
            return GateOutcome(
                decision=BLOCKED_WITH_REASON,
                reason="task_md_missing",
                critical_code=CRITICAL_BLOCK_OVERRIDE,
                extra={"path": str(spec_path)},
            )
        spec = load_task_spec_from_md(spec_path)

        # 평가 순서: Critical-우선. forbidden_path 와 ci/gemini/scope_expansion 같은
        # Critical 7종 후보 게이트를 먼저 실행해서 보안 위반이 큐 대기/diff 불일치보다
        # 늦게 발견되는 일이 없게 한다. 모든 게이트는 부수효과가 없는 read-only 평가.
        for gate in (
            self.check_forbidden_paths(spec, pr),
            self.check_ci_gemini_clean_sha_lock(pr, head_sha_at_lock=head_sha_at_lock),
            self.evaluate_queue_head(pr),
            self.check_expected_files_diff(spec, pr),
        ):
            if not gate.passed:
                if not gate.is_critical:
                    return self.auto_handle_non_critical(gate)
                return gate

        return GateOutcome(decision=AUTO_MERGE_ALLOWED, reason="all_4_gates_pass")


# ─── glob 매칭 (forbidden path 용) ───────────────────────────────────────────
def _glob_match(pattern: str, path: str) -> bool:
    """간단한 glob 매칭.

    지원 규칙:
      - `**/`  → `(?:.*/)?` (0개 이상의 디렉토리. 루트 파일도 매칭).
      - `**`   → `.*`       (`/` 포함 임의 문자열).
      - `*`    → `[^/]*`    (디렉토리 경계를 넘지 않는 임의 문자열).

    `**/` 처리를 `**` 보다 먼저 두어 `**/foo.py` 패턴이 `foo.py` (루트 파일) 도
    매칭할 수 있게 한다 (re.escape 가 `/` 를 이스케이프하지 않는 환경 가정 외에도
    이스케이프된 `\\/` 형태도 함께 처리).
    """
    escaped = re.escape(pattern)
    regex = (
        "^"
        + escaped.replace(r"\*\*\/", r"(?:.*/)?")
                 .replace(r"\*\*/", r"(?:.*/)?")
                 .replace(r"\*\*", ".*")
                 .replace(r"\*", "[^/]*")
        + "$"
    )
    return re.match(regex, path) is not None


def _extract_task_id_from_branch(branch: str) -> str:
    """`task/task-2531-dev4` → `task-2531`."""
    m = re.match(r"^task/(?P<id>task-\d+(?:\+\d+)?)", branch)
    if not m:
        return branch
    return m.group("id")
