# -*- coding: utf-8 -*-
"""utils.real_merge_hooks — dryrun→real switch + real_merge_execute() 순수함수.

task-2637 — real merge executor wiring 코드화 (activation false default · 실제 merge 실행 0).

Spec: memory/specs/system_real_merge_executor_wiring_spec_260523.md §5 + §1
sha256: bcaf654e981a43083af50879164021c918eeac9753cad3b3ad146209a1a62765

회장 verbatim (회장 10결정 #8):
    dryrun→real switch 위치 = 신규 real_merge_hooks 모듈
    finalize_hooks 직접 확장 최소화

회장 verbatim (회장 10결정 #1):
    activation = env var + chair_authorization JSON 둘 다 필요.
    둘 중 하나라도 없으면 NO_OP_NO_AUTHORIZATION.

회장 verbatim (회장 10결정 #4):
    post-merge smoke 실패 시 자동 rollback 금지. 보고 only.

회장 verbatim (회장 10결정 #6):
    artifact 위치 = memory/events/real_merge/<pr>/<head_sha>/

ANCHOR-3 (task md): "non-admin merge 만 허용 · admin override 코드 경로 0 ·
gh pr merge 실호출 0 (subprocess injection mock)"

ANCHOR-6 (task md): "forbidden paths 11+종 정적 가드 (replacement_pr_runner/
finish-task.sh/merge_ready_classifier/merge_ready_dryrun_executor/
callback_envelope_schema/anu_callback_registrar/canonical_root_resolver/
anu_collector_action_trigger/dispatch_finalize_hooks/.tasks/tests/fixtures/
tests/regression)"
"""
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, List, Optional

from utils.activation_flag_validator import (
    ACTIVATION_FLAG_DEFAULT,
    assert_default_off,
    resolve_activation_flag,
)
from utils.chair_authorization_validator import (
    validate_chair_authorization,
)
from utils.gate_snapshot_validator import (
    GATE_STALE,
    validate_gate_snapshot,
)
from utils.merge_ready_dryrun_executor import (
    ACTION_WOULD_MERGE,
    SCHEMA_AUTO_MERGE_CANDIDATE,
)
from utils.merge_ready_states import PASS
from utils.real_merge_artifact_schema import (
    EXECUTION_RESULT_SCHEMA,
    MERGE_DECISION_SCHEMA,
    artifact_already_present,
    write_merge_decision,
    write_merge_execution_result,
    write_pre_merge_gate_snapshot,
)
from utils.snapshot_crossref_validator import (
    ALLOW_REASON_SNAPSHOT_CROSSREF,
    validate_snapshot_crossref,
)

# Module schema id (used by dispatch wiring as a static anchor).
# task-2639: v1 → v2 bump (Step 0 흐름 정정 · snapshot crossref · allow_reason).
REAL_MERGE_HOOKS_SCHEMA = "utils.real_merge_hooks.v2"

# ─────────────────────────────────────────────────────────────────────────────
# Result enums (spec §5.2)
# ─────────────────────────────────────────────────────────────────────────────
NO_OP_FLAG_DISABLED = "NO_OP_FLAG_DISABLED"
NO_OP_NO_AUTHORIZATION = "NO_OP_NO_AUTHORIZATION"
NO_OP_NOT_PASS = "NO_OP_NOT_PASS"
NO_OP_DRYRUN_MISMATCH = "NO_OP_DRYRUN_MISMATCH"
NO_OP_GATE_FAIL = "NO_OP_GATE_FAIL"
NO_OP_STALE_SNAPSHOT = "NO_OP_STALE_SNAPSHOT"
NO_OP_DUPLICATE = "NO_OP_DUPLICATE"
NO_OP_FORBIDDEN_PATH = "NO_OP_FORBIDDEN_PATH"
# task-2639 신규 enum (spec §4) — chair_authorization snapshot 교차검증 결선.
NO_OP_AUTH_MISMATCH = "NO_OP_AUTH_MISMATCH"
CHAIR_REQUIRED_PRODUCTION_IN_SNAPSHOT = "CHAIR_REQUIRED_PRODUCTION_IN_SNAPSHOT"
CHAIR_REQUIRED_BLOCKING_SECRET_IN_SNAPSHOT = "CHAIR_REQUIRED_BLOCKING_SECRET_IN_SNAPSHOT"
CHAIR_REQUIRED_ADMIN_OVERRIDE_REQUIRED = "CHAIR_REQUIRED_ADMIN_OVERRIDE_REQUIRED"
REAL_MERGE_DONE = "REAL_MERGE_DONE"
REAL_MERGE_FAILED = "REAL_MERGE_FAILED"
POST_MERGE_SMOKE_FAILED = "POST_MERGE_SMOKE_FAILED"

NO_OP_RESULTS = frozenset({
    NO_OP_FLAG_DISABLED,
    NO_OP_NO_AUTHORIZATION,
    NO_OP_NOT_PASS,
    NO_OP_DRYRUN_MISMATCH,
    NO_OP_GATE_FAIL,
    NO_OP_STALE_SNAPSHOT,
    NO_OP_DUPLICATE,
    NO_OP_FORBIDDEN_PATH,
    NO_OP_AUTH_MISMATCH,
    CHAIR_REQUIRED_PRODUCTION_IN_SNAPSHOT,
    CHAIR_REQUIRED_BLOCKING_SECRET_IN_SNAPSHOT,
    CHAIR_REQUIRED_ADMIN_OVERRIDE_REQUIRED,
})

# ─────────────────────────────────────────────────────────────────────────────
# Forbidden paths static guard (spec §14 + task md ANCHOR-6 정본 11+종)
# ─────────────────────────────────────────────────────────────────────────────
FORBIDDEN_PATHS = (
    "utils/replacement_pr_runner.py",
    "scripts/finish-task.sh",
    "utils/merge_ready_classifier.py",
    "utils/merge_ready_dryrun_executor.py",
    "utils/callback_envelope_schema.py",
    "utils/anu_callback_registrar.py",
    "utils/canonical_root_resolver.py",
    "utils/anu_collector_action_trigger.py",
    "dispatch/finalize_hooks.py",
    # Gemini medium 대응: real merge wiring 결선 위치 자체도 forbidden — unauthorized 수정 차단.
    "dispatch/__init__.py",
)
FORBIDDEN_DIR_PREFIXES = (
    ".tasks/",
    "tests/fixtures/",
    "tests/regression/",
)

# Admin-override CLI tokens that must NEVER appear in any subprocess invocation
# this module composes (ANCHOR-3 static guard, spec §8).
_FORBIDDEN_CLI_TOKENS = frozenset({"--admin", "--force"})

# Gemini medium 대응: subprocess 호출 timeout magic-number 제거 → 명명 상수.
# 60 = pilot 권장값 (gh pr merge 통상 5~30s · 여유 60s).
_SUBPROCESS_TIMEOUT_SECONDS = 60


def detect_forbidden_paths(changed_files: Optional[List[str]]) -> List[str]:
    """Return the subset of changed_files that hit the forbidden list.

    Used by real_merge_execute before any other check so a contaminated PR
    short-circuits to NO_OP_FORBIDDEN_PATH (+ CHAIR_REPORT) without ever
    reaching the gate snapshot stage.
    """
    # Gemini security-HIGH 대응: fail-closed — None/누락 입력은 검증 불가 신호.
    # 빈 리스트 반환 시 forbidden 검사를 우회한 채 후속 단계 진입 위험.
    if changed_files is None:
        return ["__INPUT_NONE_FAIL_CLOSED__"]
    if not changed_files:
        # 빈 리스트는 명시적 "변경 파일 0" 의도 — forbidden 0 정상.
        return []
    hits: List[str] = []
    for path in changed_files:
        if not isinstance(path, str):
            continue
        if path in FORBIDDEN_PATHS:
            hits.append(path)
            continue
        for prefix in FORBIDDEN_DIR_PREFIXES:
            if path.startswith(prefix):
                hits.append(path)
                break
    return hits


def _now_kst() -> str:
    return (
        datetime.now(timezone.utc)
        .astimezone(timezone(timedelta(hours=9)))
        .strftime("%Y-%m-%dT%H:%M:%S+09:00")
    )


def _identity_or_default(pr_identity: Any) -> Dict[str, Any]:
    pi = pr_identity if isinstance(pr_identity, dict) else {}
    # Gemini medium 대응: pr / head_sha None-safe (TypeError 방지).
    raw_pr = pi.get("pr")
    try:
        safe_pr = int(raw_pr) if raw_pr is not None else 0
    except (TypeError, ValueError):
        safe_pr = 0
    raw_head = pi.get("head_sha", "")
    safe_head = raw_head if isinstance(raw_head, str) else ""
    return {
        "pr": safe_pr,
        "head_sha": safe_head,
        "task_id": pi.get("task_id", ""),
        "branch": pi.get("branch", ""),
        "base_sha": pi.get("base_sha", ""),
    }


def _build_decision(
    pr_identity: Dict[str, Any],
    *,
    result_enum: str,
    activation_flag: bool,
    chair_authorization_present: bool,
    dryrun_action: Optional[str],
    verdict: Optional[str],
    reasons: List[str],
    ts_kst: Optional[str],
    actually_executed: bool,
    allow_reason: Optional[str] = None,
    snapshot_crossref: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a merge_decision payload (schema v2).

    task-2639 (spec §5): v1 → v2 bump. ``allow_reason`` + ``snapshot_crossref``
    필드 추가. 기존 callers 는 두 필드를 명시하지 않으면 ``None`` 기본값으로
    누락 (json 직렬화 시 ``null``) — v1 fixture 와 차이는 두 추가 필드뿐이며
    legacy 검증 키는 그대로 유지된다.
    """
    payload: Dict[str, Any] = {
        "schema": MERGE_DECISION_SCHEMA,
        "actually_executed": actually_executed,
        "ts_kst": ts_kst or _now_kst(),
        "pr_identity": pr_identity,
        "verdict_input": verdict or "",
        "dryrun_action_input": dryrun_action or "",
        "activation_flag": bool(activation_flag),
        "chair_authorization_present": bool(chair_authorization_present),
        "result_enum": result_enum,
        "reasons": list(reasons),
        # v2 신규 필드 — spec §5.
        "allow_reason": allow_reason,
        "snapshot_crossref": snapshot_crossref,
    }
    return payload


def _persist_decision(
    pr_identity: Dict[str, Any],
    decision: Dict[str, Any],
    *,
    canonical_root: Optional[str],
    write_artifacts: bool,
) -> Optional[str]:
    if not write_artifacts:
        return None
    pr = pr_identity.get("pr")
    head_sha = pr_identity.get("head_sha")
    if pr is None or not head_sha:
        return None
    return write_merge_decision(pr, head_sha, decision, canonical_root)


def _validate_runner_no_admin(argv: List[str]) -> List[str]:
    """Return any forbidden CLI tokens observed in argv.

    The injected subprocess_runner must receive argv composed by this module
    only — so the static guard is over our composition, not the runner.
    """
    return [tok for tok in argv if tok in _FORBIDDEN_CLI_TOKENS]


def _compose_merge_argv(pr: Any) -> List[str]:
    """Compose the ONLY allowed gh pr merge argv (non-admin, no force).

    Returns the argv but does NOT execute it. Callers MUST pass it through an
    injected subprocess_runner; the default runner is None which means "do
    not call subprocess at all" — the regression contract.
    """
    return [
        "gh", "pr", "merge", str(int(pr)),
        "--merge",
        "--delete-branch=false",
    ]


def real_merge_execute(
    merge_ready_result: Optional[Dict[str, Any]],
    pr_identity: Dict[str, Any],
    gate_snapshot: Any,
    dryrun_artifact: Any,
    callback_envelope: Any,
    chair_authorization: Any,
    *,
    activation_flag: bool = ACTIVATION_FLAG_DEFAULT,
    subprocess_runner: Optional[Callable[[List[str], int], Any]] = None,
    canonical_root: Optional[str] = None,
    changed_files: Optional[List[str]] = None,
    write_artifacts: bool = True,
    ts_kst: Optional[str] = None,
    clock: Optional[Any] = None,
) -> Dict[str, Any]:
    """Decide and (when fully authorized) execute a real PR merge.

    Defaults are fail-closed:
      * ``activation_flag=False`` (spec §6.3 hardcoded default).
      * ``chair_authorization=None`` → NO_OP_NO_AUTHORIZATION.
      * ``subprocess_runner=None`` → never invokes ``gh pr merge`` even when
        every gate passes (regression contract — task md "subprocess injection
        mock").

    The function ALWAYS returns a dict with ``result_enum`` ∈ NO_OP_* /
    REAL_MERGE_* + ``decision`` payload + ``execution_result`` (when merge
    fired) + ``merge_decision_path`` / ``execution_result_path`` (when artifact
    writers ran).
    """
    # Defensive re-assert the hardcoded default — spec ANCHOR-2.
    assert_default_off()

    pid = _identity_or_default(pr_identity)
    pr = pid["pr"]
    head_sha = pid["head_sha"]

    verdict = merge_ready_result.get("verdict") if isinstance(merge_ready_result, dict) else None
    dryrun_action = (
        dryrun_artifact.get("executor_action") if isinstance(dryrun_artifact, dict) else None
    )

    # ── Step 0a — 입력 검증 (None 입력 fail-closed) ──────────────────────────
    # 기존 detect_forbidden_paths 의 sentinel pattern 을 살리되, 새 흐름에서는
    # validator 도 동일하게 ``__INPUT_NONE_FAIL_CLOSED__`` 를
    # unauthorized_forbidden_hits 로 누적한다.
    crossref = validate_snapshot_crossref(
        pid,
        chair_authorization,
        changed_files,
        forbidden_paths=list(FORBIDDEN_PATHS),
        forbidden_dir_prefixes=list(FORBIDDEN_DIR_PREFIXES),
    )

    # ── Step 0b — chair_authorization pr/head_sha 정확 일치 (spec §3) ────────
    # chair_authorization 가 부재하면 Step 0b 건너뜀 — 기존 NO_OP_NO_AUTHORIZATION
    # (Step 2) 가 후속에서 동작. 부재가 아닌 경우에만 pr/sha exact match 강제.
    if chair_authorization is not None and not (
        crossref["pr_match"] and crossref["sha_match"]
    ):
        decision = _build_decision(
            pid,
            result_enum=NO_OP_AUTH_MISMATCH,
            activation_flag=bool(activation_flag),
            chair_authorization_present=True,
            dryrun_action=dryrun_action,
            verdict=verdict,
            reasons=[
                "chair_authorization pr/head_sha mismatch — "
                f"pr_match={crossref['pr_match']} sha_match={crossref['sha_match']}"
            ],
            ts_kst=ts_kst,
            actually_executed=False,
            allow_reason=None,
            snapshot_crossref=crossref,
        )
        path = _persist_decision(
            pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts
        )
        return {
            "result_enum": NO_OP_AUTH_MISMATCH,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "chair_report_required": True,
            "subprocess_invocations": [],
        }

    # ── Step 0c — snapshot 외부 forbidden hit 차단 ───────────────────────────
    # snapshot 정합 forbidden 파일은 allow_reason 기록 후 통과
    # (Step 0d sanctioned split 은 crossref.classification 안에 이미 분리됨).
    unauthorized_forbidden_hits = crossref["classification"]["unauthorized_forbidden_hits"]
    authorized_forbidden_hits = crossref["classification"]["authorized_forbidden_hits"]
    if unauthorized_forbidden_hits:
        decision = _build_decision(
            pid,
            result_enum=NO_OP_FORBIDDEN_PATH,
            activation_flag=bool(activation_flag),
            chair_authorization_present=bool(chair_authorization is not None),
            dryrun_action=dryrun_action,
            verdict=verdict,
            reasons=[f"forbidden path hit: {p}" for p in unauthorized_forbidden_hits],
            ts_kst=ts_kst,
            actually_executed=False,
            allow_reason=None,
            snapshot_crossref=crossref,
        )
        path = _persist_decision(
            pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts
        )
        return {
            "result_enum": NO_OP_FORBIDDEN_PATH,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "chair_report_required": True,
            "subprocess_invocations": [],
        }

    # ── Step 0e — snapshot 내 production / blocking_secret 검사 ─────────────
    # chair_authorization 에 expected_files_snapshot 가 명시되어 있을 때만
    # 검사 (snapshot_present=False → 기존 동작 유지 · ANCHOR-4 / spec §3 단서).
    if crossref["snapshot_present"]:
        if crossref["production_in_snapshot"]:
            decision = _build_decision(
                pid,
                result_enum=CHAIR_REQUIRED_PRODUCTION_IN_SNAPSHOT,
                activation_flag=bool(activation_flag),
                chair_authorization_present=True,
                dryrun_action=dryrun_action,
                verdict=verdict,
                reasons=[
                    "snapshot contains production-area paths: "
                    + ", ".join(crossref["production_in_snapshot"])
                ],
                ts_kst=ts_kst,
                actually_executed=False,
                allow_reason=None,
                snapshot_crossref=crossref,
            )
            path = _persist_decision(
                pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts
            )
            return {
                "result_enum": CHAIR_REQUIRED_PRODUCTION_IN_SNAPSHOT,
                "decision": decision,
                "merge_decision_path": path,
                "execution_result_path": None,
                "execution_result": None,
                "chair_report_required": True,
                "subprocess_invocations": [],
            }
        if crossref["blocking_secret_in_snapshot"]:
            decision = _build_decision(
                pid,
                result_enum=CHAIR_REQUIRED_BLOCKING_SECRET_IN_SNAPSHOT,
                activation_flag=bool(activation_flag),
                chair_authorization_present=True,
                dryrun_action=dryrun_action,
                verdict=verdict,
                reasons=[
                    "snapshot contains blocking-secret token paths: "
                    + ", ".join(crossref["blocking_secret_in_snapshot"])
                ],
                ts_kst=ts_kst,
                actually_executed=False,
                allow_reason=None,
                snapshot_crossref=crossref,
            )
            path = _persist_decision(
                pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts
            )
            return {
                "result_enum": CHAIR_REQUIRED_BLOCKING_SECRET_IN_SNAPSHOT,
                "decision": decision,
                "merge_decision_path": path,
                "execution_result_path": None,
                "execution_result": None,
                "chair_report_required": True,
                "subprocess_invocations": [],
            }

    # ── allow_reason — forbidden 파일이 snapshot exact match 로 통과한 경우 ──
    allow_reason = (
        ALLOW_REASON_SNAPSHOT_CROSSREF if authorized_forbidden_hits else None
    )

    # ── Step 1 — activation flag (default OFF) ───────────────────────────────
    if not bool(activation_flag):
        decision = _build_decision(
            pid,
            result_enum=NO_OP_FLAG_DISABLED,
            activation_flag=False,
            chair_authorization_present=bool(chair_authorization is not None),
            dryrun_action=dryrun_action,
            verdict=verdict,
            reasons=["activation_flag=False — default OFF (spec §6.3)"],
            ts_kst=ts_kst,
            actually_executed=False,
            allow_reason=allow_reason,
            snapshot_crossref=crossref,
        )
        path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        return {
            "result_enum": NO_OP_FLAG_DISABLED,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "chair_report_required": False,
            "subprocess_invocations": [],
        }

    # ── Step 2 — chair authorization ─────────────────────────────────────────
    auth_ok, auth_code, auth_reasons = validate_chair_authorization(
        chair_authorization, pid, clock=clock
    )
    if not auth_ok:
        decision = _build_decision(
            pid,
            result_enum=NO_OP_NO_AUTHORIZATION,
            activation_flag=True,
            chair_authorization_present=bool(chair_authorization is not None),
            dryrun_action=dryrun_action,
            verdict=verdict,
            reasons=[f"chair_authorization {auth_code}: {r}" for r in auth_reasons] or [auth_code],
            ts_kst=ts_kst,
            actually_executed=False,
            allow_reason=allow_reason,
            snapshot_crossref=crossref,
        )
        path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        return {
            "result_enum": NO_OP_NO_AUTHORIZATION,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "chair_report_required": False,
            "subprocess_invocations": [],
        }

    # ── Step 3 — verdict + dryrun action match ───────────────────────────────
    if verdict != PASS:
        decision = _build_decision(
            pid,
            result_enum=NO_OP_NOT_PASS,
            activation_flag=True,
            chair_authorization_present=True,
            dryrun_action=dryrun_action,
            verdict=verdict,
            reasons=[f"merge_ready_result.verdict={verdict!r} ≠ PASS"],
            ts_kst=ts_kst,
            actually_executed=False,
            allow_reason=allow_reason,
            snapshot_crossref=crossref,
        )
        path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        return {
            "result_enum": NO_OP_NOT_PASS,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "chair_report_required": False,
            "subprocess_invocations": [],
        }

    if dryrun_action != ACTION_WOULD_MERGE:
        decision = _build_decision(
            pid,
            result_enum=NO_OP_DRYRUN_MISMATCH,
            activation_flag=True,
            chair_authorization_present=True,
            dryrun_action=dryrun_action,
            verdict=verdict,
            reasons=[
                f"dryrun_artifact.executor_action={dryrun_action!r} ≠ {ACTION_WOULD_MERGE!r}"
            ],
            ts_kst=ts_kst,
            actually_executed=False,
            allow_reason=allow_reason,
            snapshot_crossref=crossref,
        )
        path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        return {
            "result_enum": NO_OP_DRYRUN_MISMATCH,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "chair_report_required": False,
            "subprocess_invocations": [],
        }

    # Schema sanity on dryrun (defensive — should be guaranteed by classifier→dryrun chain).
    if isinstance(dryrun_artifact, dict) and dryrun_artifact.get("schema") != SCHEMA_AUTO_MERGE_CANDIDATE:
        decision = _build_decision(
            pid,
            result_enum=NO_OP_DRYRUN_MISMATCH,
            activation_flag=True,
            chair_authorization_present=True,
            dryrun_action=dryrun_action,
            verdict=verdict,
            reasons=[
                f"dryrun_artifact.schema={dryrun_artifact.get('schema')!r} ≠ "
                f"{SCHEMA_AUTO_MERGE_CANDIDATE!r}"
            ],
            ts_kst=ts_kst,
            actually_executed=False,
            allow_reason=allow_reason,
            snapshot_crossref=crossref,
        )
        path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        return {
            "result_enum": NO_OP_DRYRUN_MISMATCH,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "chair_report_required": False,
            "subprocess_invocations": [],
        }

    # ── Step 4 — gate snapshot (TTL + 11 gates) ──────────────────────────────
    gate_ok, gate_code, gate_reasons = validate_gate_snapshot(
        gate_snapshot, clock=clock
    )
    if not gate_ok:
        result_enum = NO_OP_STALE_SNAPSHOT if gate_code == GATE_STALE else NO_OP_GATE_FAIL
        # task-2639: chair_authorization 가 expected_files_snapshot 를 명시한
        # 상황에서 admin_override_required gate=true 면 별도 enum
        # CHAIR_REQUIRED_ADMIN_OVERRIDE_REQUIRED 로 격상 (spec §4).
        # snapshot_present=False (기존 fixture) 면 기존 NO_OP_GATE_FAIL 유지.
        admin_override_hit = False
        if isinstance(gate_snapshot, dict):
            for gate in gate_snapshot.get("gates", []) or []:
                if (
                    isinstance(gate, dict)
                    and gate.get("name") == "admin_override_required"
                    and bool(gate.get("value"))
                ):
                    admin_override_hit = True
                    break
        if (
            result_enum == NO_OP_GATE_FAIL
            and admin_override_hit
            and crossref["snapshot_present"]
        ):
            result_enum = CHAIR_REQUIRED_ADMIN_OVERRIDE_REQUIRED
        # Critical7 7종 + admin_override + blocking_secret 전부 CHAIR_REPORT 트리거 (spec §10).
        # Gemini medium 대응: 기존 2종 → 7종 전체 커버로 확대.
        chair_report = False
        snap_gates = (
            gate_snapshot.get("gates") if isinstance(gate_snapshot, dict) else []
        )
        # Critical7 카테고리 게이트 이름 매핑 (spec §10 / feedback_critical_escalation_only_260508).
        _C7_GATE_NAMES = {
            "admin_override_required",   # admin override 필요
            "blocking_secret",           # BLOCKING_SECRET / NET_NEW_IDENTIFIER_EXPOSURE
            "forbidden_path",            # forbidden path 수정
            "expected_files_exact",      # scope expansion (false 면 트리거)
            "critical7_hits",            # classifier 7 카테고리 hits > 0
            "replacement_pr_fail",       # replacement_pr_runner 실패
            "post_merge_smoke",          # post-merge smoke 실패 (사후 평가)
            "dep_cycle_or_serial",       # dep_cycle / serial collision
        }
        if isinstance(snap_gates, list):
            for gate in snap_gates:
                if not isinstance(gate, dict):
                    continue
                name = gate.get("name")
                if name not in _C7_GATE_NAMES:
                    continue
                if name == "admin_override_required" and bool(gate.get("value")):
                    chair_report = True
                elif name == "blocking_secret" and (
                    gate.get("value") != 0
                    or gate.get("net_new_identifier_exposure") not in (0, None)
                ):
                    chair_report = True
                elif name == "expected_files_exact" and gate.get("value") is False:
                    chair_report = True
                elif name == "forbidden_path" and (gate.get("value") or 0) != 0:
                    chair_report = True
                elif name == "critical7_hits" and (gate.get("value") or 0) != 0:
                    chair_report = True
                elif name == "replacement_pr_fail" and bool(gate.get("value")):
                    chair_report = True
                elif name == "post_merge_smoke" and gate.get("value") in (False, "FAIL"):
                    chair_report = True
                elif name == "dep_cycle_or_serial" and bool(gate.get("value")):
                    chair_report = True
        # CHAIR_REQUIRED_ADMIN_OVERRIDE_REQUIRED 격상 시 chair_report 강제.
        if result_enum == CHAIR_REQUIRED_ADMIN_OVERRIDE_REQUIRED:
            chair_report = True
        decision = _build_decision(
            pid,
            result_enum=result_enum,
            activation_flag=True,
            chair_authorization_present=True,
            dryrun_action=dryrun_action,
            verdict=verdict,
            reasons=[f"gate_snapshot {gate_code}: {r}" for r in gate_reasons] or [gate_code],
            ts_kst=ts_kst,
            actually_executed=False,
            allow_reason=allow_reason,
            snapshot_crossref=crossref,
        )
        path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        snapshot_path: Optional[str] = None
        if write_artifacts and isinstance(gate_snapshot, dict):
            snapshot_path = write_pre_merge_gate_snapshot(
                pr, head_sha, gate_snapshot, canonical_root
            )
        return {
            "result_enum": result_enum,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "snapshot_path": snapshot_path,
            "chair_report_required": chair_report,
            "subprocess_invocations": [],
        }

    # ── Step 5 — dedupe (same pr/head_sha already merged) ────────────────────
    if artifact_already_present(pr, head_sha, canonical_root):
        decision = _build_decision(
            pid,
            result_enum=NO_OP_DUPLICATE,
            activation_flag=True,
            chair_authorization_present=True,
            dryrun_action=dryrun_action,
            verdict=verdict,
            reasons=["merge_execution_result.json already present for this (pr, head_sha)"],
            ts_kst=ts_kst,
            actually_executed=False,
            allow_reason=allow_reason,
            snapshot_crossref=crossref,
        )
        path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        return {
            "result_enum": NO_OP_DUPLICATE,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "chair_report_required": False,
            "subprocess_invocations": [],
        }

    # ── Step 6 — real merge gate satisfied · author snapshot + decision ──────
    snapshot_path = None
    if write_artifacts and isinstance(gate_snapshot, dict):
        snapshot_path = write_pre_merge_gate_snapshot(pr, head_sha, gate_snapshot, canonical_root)

    decision = _build_decision(
        pid,
        result_enum=REAL_MERGE_DONE,  # provisional — flipped to FAILED below on exception
        activation_flag=True,
        chair_authorization_present=True,
        dryrun_action=dryrun_action,
        verdict=verdict,
        reasons=["all gates pass + chair_authorization OK + activation_flag=True"],
        ts_kst=ts_kst,
        actually_executed=True,
        allow_reason=allow_reason,
        snapshot_crossref=crossref,
    )
    # Compose the non-admin merge argv. Static guard re-checks that no admin
    # override token slipped in.
    argv = _compose_merge_argv(pr)
    leaks = _validate_runner_no_admin(argv)
    if leaks:
        # This is a hard programming error — no subprocess call attempted.
        decision["actually_executed"] = False
        decision["result_enum"] = NO_OP_GATE_FAIL
        decision["reasons"] = decision["reasons"] + [
            f"INTERNAL: admin-override token leak in argv: {leaks}"
        ]
        path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        return {
            "result_enum": NO_OP_GATE_FAIL,
            "decision": decision,
            "merge_decision_path": path,
            "execution_result_path": None,
            "execution_result": None,
            "snapshot_path": snapshot_path,
            "chair_report_required": True,
            "subprocess_invocations": [],
        }

    # ── Step 7 — invoke injected runner ──────────────────────────────────────
    subprocess_invocations: List[Dict[str, Any]] = []
    if subprocess_runner is None:
        # ★ pass_path_inert_artifact_only doctrine — runner absent means we record
        # the decision (actually_executed=True per spec for the pass branch when
        # gates pass) but NEVER invoke gh pr merge. Artifact 4종 정합 단언.
        execution_result = {
            "schema": EXECUTION_RESULT_SCHEMA,
            "ts_kst": ts_kst or _now_kst(),
            "pr_identity": pid,
            "result_enum": REAL_MERGE_DONE,
            "merge_commit_sha": "",
            "mergedAt": "",
            "mergedBy": "",
            "non_admin": True,
            "admin_override_used": False,
            "subprocess_invocations": [],
            "post_smoke_ok": None,
            "callback_handoff_event_id": "",
            "note": "subprocess_runner=None — inert pass path (no gh pr merge call)",
        }
        decision_path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        execution_path = None
        if write_artifacts:
            execution_path = write_merge_execution_result(pr, head_sha, execution_result, canonical_root)
        return {
            "result_enum": REAL_MERGE_DONE,
            "decision": decision,
            "merge_decision_path": decision_path,
            "execution_result_path": execution_path,
            "execution_result": execution_result,
            "snapshot_path": snapshot_path,
            "chair_report_required": False,
            "subprocess_invocations": [],
        }

    try:
        proc = subprocess_runner(argv, _SUBPROCESS_TIMEOUT_SECONDS)
    except Exception as exc:  # noqa: BLE001 — fail-closed wrapping is intentional
        decision["actually_executed"] = False
        decision["result_enum"] = REAL_MERGE_FAILED
        decision["reasons"] = decision["reasons"] + [f"subprocess raised: {exc!r}"]
        decision_path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        execution_result = {
            "schema": EXECUTION_RESULT_SCHEMA,
            "ts_kst": ts_kst or _now_kst(),
            "pr_identity": pid,
            "result_enum": REAL_MERGE_FAILED,
            "merge_commit_sha": "",
            "mergedAt": "",
            "mergedBy": "",
            "non_admin": True,
            "admin_override_used": False,
            "subprocess_invocations": [
                {"cmd": " ".join(argv), "exit": -1, "ts": ts_kst or _now_kst()}
            ],
            "post_smoke_ok": None,
            "callback_handoff_event_id": "",
            "error": f"{type(exc).__name__}: {exc}",
        }
        execution_path = None
        if write_artifacts:
            execution_path = write_merge_execution_result(pr, head_sha, execution_result, canonical_root)
        return {
            "result_enum": REAL_MERGE_FAILED,
            "decision": decision,
            "merge_decision_path": decision_path,
            "execution_result_path": execution_path,
            "execution_result": execution_result,
            "snapshot_path": snapshot_path,
            "chair_report_required": True,
            "subprocess_invocations": [
                {"cmd": " ".join(argv), "exit": -1, "ts": ts_kst or _now_kst()}
            ],
        }

    rc = getattr(proc, "returncode", 1)
    stdout = getattr(proc, "stdout", "") or ""
    stderr = getattr(proc, "stderr", "") or ""
    invocation = {
        "cmd": " ".join(argv),
        "exit": int(rc),
        "ts": ts_kst or _now_kst(),
        "stdout_tail": stdout[-200:],
        "stderr_tail": stderr[-200:],
    }
    subprocess_invocations.append(invocation)

    if rc != 0:
        decision["actually_executed"] = False
        decision["result_enum"] = REAL_MERGE_FAILED
        decision["reasons"] = decision["reasons"] + [
            f"subprocess exit={rc} stderr={stderr.strip()[:200]}"
        ]
        decision_path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
        execution_result = {
            "schema": EXECUTION_RESULT_SCHEMA,
            "ts_kst": ts_kst or _now_kst(),
            "pr_identity": pid,
            "result_enum": REAL_MERGE_FAILED,
            "merge_commit_sha": "",
            "mergedAt": "",
            "mergedBy": "",
            "non_admin": True,
            "admin_override_used": False,
            "subprocess_invocations": subprocess_invocations,
            "post_smoke_ok": None,
            "callback_handoff_event_id": "",
        }
        execution_path = None
        if write_artifacts:
            execution_path = write_merge_execution_result(pr, head_sha, execution_result, canonical_root)
        return {
            "result_enum": REAL_MERGE_FAILED,
            "decision": decision,
            "merge_decision_path": decision_path,
            "execution_result_path": execution_path,
            "execution_result": execution_result,
            "snapshot_path": snapshot_path,
            "chair_report_required": True,
            "subprocess_invocations": subprocess_invocations,
        }

    decision_path = _persist_decision(pid, decision, canonical_root=canonical_root, write_artifacts=write_artifacts)
    execution_result = {
        "schema": EXECUTION_RESULT_SCHEMA,
        "ts_kst": ts_kst or _now_kst(),
        "pr_identity": pid,
        "result_enum": REAL_MERGE_DONE,
        "merge_commit_sha": getattr(proc, "merge_commit_sha", "") or "",
        "mergedAt": getattr(proc, "mergedAt", "") or "",
        "mergedBy": getattr(proc, "mergedBy", "") or "",
        "non_admin": True,
        "admin_override_used": False,
        "subprocess_invocations": subprocess_invocations,
        "post_smoke_ok": None,
        "callback_handoff_event_id": "",
    }
    execution_path = None
    if write_artifacts:
        execution_path = write_merge_execution_result(pr, head_sha, execution_result, canonical_root)
    return {
        "result_enum": REAL_MERGE_DONE,
        "decision": decision,
        "merge_decision_path": decision_path,
        "execution_result_path": execution_path,
        "execution_result": execution_result,
        "snapshot_path": snapshot_path,
        "chair_report_required": False,
        "subprocess_invocations": subprocess_invocations,
    }


def dryrun_to_real_switch(
    classifier_result: Dict[str, Any],
    dryrun_artifact: Dict[str, Any],
    pr_identity: Dict[str, Any],
    gate_snapshot: Any,
    callback_envelope: Any,
    chair_authorization: Any,
    *,
    activation_flag: Optional[bool] = None,
    subprocess_runner: Optional[Callable[[List[str], int], Any]] = None,
    canonical_root: Optional[str] = None,
    changed_files: Optional[List[str]] = None,
    write_artifacts: bool = True,
    env_lookup: Optional[Callable[[str], Optional[str]]] = None,
    clock: Optional[Any] = None,
) -> Dict[str, Any]:
    """Single entry-point invoked from dispatch/__init__.py (회장 결정 #8).

    Resolves the activation flag (env var) when not pinned by the caller and
    delegates to real_merge_execute. This indirection lets dispatch keep its
    import minimal (1 line) while the switch itself stays here, per the
    finalize_hooks-minimal directive.
    """
    if activation_flag is None:
        activation_flag = resolve_activation_flag(env_lookup)
    return real_merge_execute(
        merge_ready_result=classifier_result,
        pr_identity=pr_identity,
        gate_snapshot=gate_snapshot,
        dryrun_artifact=dryrun_artifact,
        callback_envelope=callback_envelope,
        chair_authorization=chair_authorization,
        activation_flag=activation_flag,
        subprocess_runner=subprocess_runner,
        canonical_root=canonical_root,
        changed_files=changed_files,
        write_artifacts=write_artifacts,
        clock=clock,
    )


# Compatibility re-exports (callers can refer to enums via this module).
__all__ = [
    "REAL_MERGE_HOOKS_SCHEMA",
    "NO_OP_FLAG_DISABLED",
    "NO_OP_NO_AUTHORIZATION",
    "NO_OP_NOT_PASS",
    "NO_OP_DRYRUN_MISMATCH",
    "NO_OP_GATE_FAIL",
    "NO_OP_STALE_SNAPSHOT",
    "NO_OP_DUPLICATE",
    "NO_OP_FORBIDDEN_PATH",
    # task-2639 신규 enum
    "NO_OP_AUTH_MISMATCH",
    "CHAIR_REQUIRED_PRODUCTION_IN_SNAPSHOT",
    "CHAIR_REQUIRED_BLOCKING_SECRET_IN_SNAPSHOT",
    "CHAIR_REQUIRED_ADMIN_OVERRIDE_REQUIRED",
    "REAL_MERGE_DONE",
    "REAL_MERGE_FAILED",
    "POST_MERGE_SMOKE_FAILED",
    "NO_OP_RESULTS",
    "FORBIDDEN_PATHS",
    "FORBIDDEN_DIR_PREFIXES",
    "detect_forbidden_paths",
    "real_merge_execute",
    "dryrun_to_real_switch",
]
