# -*- coding: utf-8 -*-
"""utils.real_merge_artifact_schema — artifact writers + post_smoke schema-only.

task-2637 — real merge executor wiring 코드화 (activation false default · 실제 merge 실행 0).

Spec: memory/specs/system_real_merge_executor_wiring_spec_260523.md §7
sha256: bcaf654e981a43083af50879164021c918eeac9753cad3b3ad146209a1a62765

회장 verbatim (회장 10결정 #6 + spec §7):
    artifact 위치: memory/events/real_merge/<pr>/<head_sha>/
    canonical_root resolver (task-2636) 기준 atomic write
    dedupe (pr + head_sha 동일 → 재호출 0 부작용)

writers (구현):
    * write_merge_decision           → merge_decision.json (schema real_merge.decision.v1)
    * write_pre_merge_gate_snapshot  → pre_merge_gate_snapshot.json
    * write_merge_execution_result   → merge_execution_result.json

schema-only (writer 본 task 범위 외 — pilot 단계에서 구현, 회장 결정 task-2637 expected_files):
    * POST_SMOKE_SCHEMA + POST_SMOKE_REQUIRED_KEYS — 스키마 정의 + validate_post_smoke_payload
"""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any, Dict, List, Optional

from utils.atomic_write import atomic_json_write
from utils.callback_envelope_schema import CANONICAL_ROOT_DEFAULT

# ─────────────────────────────────────────────────────────────────────────────
# schema ids (spec §7.x)
# ─────────────────────────────────────────────────────────────────────────────
# task-2639 schema v1 → v2 bump: allow_reason + snapshot_crossref 필드 추가.
# v1 은 callers (legacy fixture) 호환을 위해 alias 유지.
MERGE_DECISION_SCHEMA = "real_merge.decision.v2"
MERGE_DECISION_SCHEMA_V1 = "real_merge.decision.v1"
MERGE_DECISION_SCHEMA_V2 = "real_merge.decision.v2"
PRE_GATE_SNAPSHOT_SCHEMA = "real_merge.pre_gate_snapshot.v1"
EXECUTION_RESULT_SCHEMA = "real_merge.execution_result.v1"
POST_SMOKE_SCHEMA = "real_merge.post_smoke.v1"  # schema-only (no writer)

# v2 신규 optional 필드 (spec §5).
MERGE_DECISION_V2_OPTIONAL_KEYS = (
    "allow_reason",
    "snapshot_crossref",
)

# Artifact directory layout (spec §7 / 회장 결정 #6).
REAL_MERGE_EVENTS_RELDIR = "memory/events/real_merge"

# Filenames inside <pr>/<head_sha>/.
MERGE_DECISION_FILENAME = "merge_decision.json"
PRE_GATE_SNAPSHOT_FILENAME = "pre_merge_gate_snapshot.json"
EXECUTION_RESULT_FILENAME = "merge_execution_result.json"
POST_SMOKE_FILENAME = "post_merge_smoke_result.json"  # schema-only reservation

# Schema-only required keys (used by validate_post_smoke_payload).
POST_SMOKE_REQUIRED_KEYS = (
    "schema",
    "ts_kst",
    "merge_commit_sha",
    "origin_main_before",
    "origin_main_after",
    "smoke_checks",
    "all_ok",
)


def _pr_dir(pr: Any, head_sha: str) -> str:
    """Build the ``<pr>/<head_sha>/`` segment with deterministic naming."""
    try:
        pr_int = int(pr)
    except (TypeError, ValueError):
        raise ValueError(f"pr must be int-convertible, got {pr!r}")
    if not isinstance(head_sha, str) or not head_sha:
        raise ValueError("head_sha must be a non-empty string")
    return f"pr_{pr_int}/{head_sha}"


def resolve_artifact_dir(pr: int, head_sha: str, canonical_root: Optional[str] = None) -> str:
    """Return the absolute directory path for a given (pr, head_sha)."""
    root = canonical_root or CANONICAL_ROOT_DEFAULT
    if not os.path.isabs(root):
        raise ValueError(f"canonical_root must be absolute: {root!r}")
    return os.path.normpath(os.path.join(root, REAL_MERGE_EVENTS_RELDIR, _pr_dir(pr, head_sha)))


def artifact_already_present(
    pr: int,
    head_sha: str,
    canonical_root: Optional[str] = None,
    *,
    filename: str = EXECUTION_RESULT_FILENAME,
    fs_exists: Optional[Any] = None,
) -> bool:
    """Dedupe check (spec §12.1).

    True if the named artifact already exists for this (pr, head_sha) — caller
    must transition to NO_OP_DUPLICATE without writing or invoking merge.
    """
    exists = fs_exists if fs_exists is not None else os.path.exists
    target = os.path.join(
        resolve_artifact_dir(pr, head_sha, canonical_root), filename
    )
    return bool(exists(target))


def _ensure_dir(path: str) -> None:
    Path(path).mkdir(parents=True, exist_ok=True)


def write_merge_decision(
    pr: int,
    head_sha: str,
    payload: Dict[str, Any],
    canonical_root: Optional[str] = None,
) -> str:
    """Atomic write ``merge_decision.json`` (spec §7.1)."""
    out_dir = resolve_artifact_dir(pr, head_sha, canonical_root)
    _ensure_dir(out_dir)
    target = os.path.join(out_dir, MERGE_DECISION_FILENAME)
    canonical_payload = dict(payload)
    canonical_payload.setdefault("schema", MERGE_DECISION_SCHEMA)
    atomic_json_write(target, canonical_payload)
    return target


def write_pre_merge_gate_snapshot(
    pr: int,
    head_sha: str,
    payload: Dict[str, Any],
    canonical_root: Optional[str] = None,
) -> str:
    """Atomic write ``pre_merge_gate_snapshot.json`` (spec §7.2)."""
    out_dir = resolve_artifact_dir(pr, head_sha, canonical_root)
    _ensure_dir(out_dir)
    target = os.path.join(out_dir, PRE_GATE_SNAPSHOT_FILENAME)
    canonical_payload = dict(payload)
    canonical_payload.setdefault("schema", PRE_GATE_SNAPSHOT_SCHEMA)
    atomic_json_write(target, canonical_payload)
    return target


def write_merge_execution_result(
    pr: int,
    head_sha: str,
    payload: Dict[str, Any],
    canonical_root: Optional[str] = None,
) -> str:
    """Atomic write ``merge_execution_result.json`` (spec §7.4)."""
    out_dir = resolve_artifact_dir(pr, head_sha, canonical_root)
    _ensure_dir(out_dir)
    target = os.path.join(out_dir, EXECUTION_RESULT_FILENAME)
    canonical_payload = dict(payload)
    canonical_payload.setdefault("schema", EXECUTION_RESULT_SCHEMA)
    atomic_json_write(target, canonical_payload)
    return target


def validate_post_smoke_payload(payload: Any) -> List[str]:
    """Schema-only validator for post_merge_smoke_result.json (spec §7.3).

    Writer is intentionally out of scope for task-2637 (pilot 단계에서 구현).
    This validator exists so the schema contract is fixed *now* — callers can
    author payloads with confidence and the pilot writer will reuse the check.
    """
    errors: List[str] = []
    if not isinstance(payload, dict):
        return [f"payload must be dict, got {type(payload).__name__}"]
    if payload.get("schema") != POST_SMOKE_SCHEMA:
        errors.append(
            f"schema must be {POST_SMOKE_SCHEMA!r}, got {payload.get('schema')!r}"
        )
    for key in POST_SMOKE_REQUIRED_KEYS:
        if key not in payload:
            errors.append(f"required key missing: {key}")
    smoke = payload.get("smoke_checks")
    if smoke is not None and not isinstance(smoke, list):
        errors.append(f"smoke_checks must be a list, got {type(smoke).__name__}")
    all_ok = payload.get("all_ok")
    if all_ok is not None and not isinstance(all_ok, bool):
        errors.append(f"all_ok must be bool, got {type(all_ok).__name__}")
    return errors
