"""anu_v3.policy_profile_engine — POLICY_PROFILE_ENGINE core (task-2553+33 / Track C1).

회장 GO Track C(POLICY_PROFILE_ENGINE_GENERALIZATION) sub-track C1 core.

목표 (회장 verbatim §1):
  goal_type + policy_profile + boundary 만으로 ANU 가 profile 을 로딩하고,
  gate / HOLD / allowed_actions / forbidden_actions / evidence schema /
  completion packet schema 를 **자동 산출**한다. 회장이 매번 풀어 쓰지 않는다.

본 모듈 한정 책임 (순수, 부작용 0 — GitHub API 0, token 접근 0, 파일 쓰기 0):
  - goal_request parser (정적 meta-schema 검증)
  - policy_profile loader (read-only; per-profile concrete + 정적 meta-schema 검증)
  - goal_type resolver
  - boundary resolver
  - gate / HOLD / allowed_actions / forbidden_actions expansion
  - completion packet schema resolver (정적 meta-schema 참조 + per-profile concrete in-memory)
  - evidence schema resolver (정적 meta-schema 참조 + per-profile concrete in-memory)
  - decision/result JSON 산출 (PolicyResolution.to_decision_dict)
  - batch coordinator 연결 어댑터 (순수 dict, coordinator import 0 — one-way isolation)

설계 invariant:
  - 신규 별도 모듈 (additive). 기존 anu_v3 tracked 파일 import / mutation 0.
  - 기존 단일 profile(test_only_hardening_pr_merge_v1) = read-only 입력, 의미 보존.
  - 정적 meta-schema 4종 = 참조·검증 대상. per-profile concrete 산출물은 런타임
    in-memory / decision JSON 내 표현만 (동적 schema 파일 생성·공유 schema mutation 0).
  - jsonschema 등 외부 의존성 0 — draft-07 subset validator 내장 (offline 100%).
"""

from __future__ import annotations

import json
import re
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Final, Mapping, Sequence

# Identity of this engine; both values are copied into every PolicyResolution.
ENGINE_MODULE: Final[str] = "anu_v3.policy_profile_engine"
ENGINE_VERSION: Final[str] = "task-2553+33.C1.v1"

# The four static meta-schemas (resolvers reference and validate against them;
# nothing is generated dynamically). Refs: 9-R.3 / 9-R.5.
# File names are task-namespaced under the §4 authority (fixed so they cannot
# collide inside the repo) — the existing untracked
# schemas/goal_request.schema.json (task-2553+17) stays byte-for-byte
# untouched; additive only.
SCHEMA_DIR_DEFAULT: Final[str] = "schemas"
META_GOAL_REQUEST: Final[str] = "goal_request_2553plus33.schema.json"
META_POLICY_PROFILE: Final[str] = "policy_profile_2553plus33.schema.json"
META_COMPLETION_PACKET: Final[str] = "completion_packet_2553plus33.schema.json"
META_EVIDENCE: Final[str] = "evidence_2553plus33.schema.json"

# Locations of the read-only profile inputs (never mutated; +22 governance).
PROFILE_JSON_DIR_DEFAULT: Final[str] = "memory/policy_profiles"
PROFILE_SCHEMA_DIR_DEFAULT: Final[str] = "schemas/policy_profiles"

# Engine-wide forbidden actions (pinned per chair §7 D-SPEC-EXACTNESS —
# common to every profile). expand_forbidden_actions() always starts from
# this tuple.
UNIVERSAL_FORBIDDEN_ACTIONS: Final[tuple[str, ...]] = (
    "production_code_change",
    "pr_branch_main_write",
    "merge_without_gate_pass",
    "credential_or_owner_pat_op",
    "frozen_anchor_mutation",
    "callback_mandatory_rule_weaken",
    "runtime_checkpoint_as_primary_callback",
    "batch_coordinator_original_destroy",
    "doc_or_memory_only_completion",
)

# Engine-wide allowed actions (decision / evidence / packet actions, premised
# on the gate passing). expand_allowed_actions() starts from this tuple when a
# profile declares no explicit allowed_actions.
UNIVERSAL_ALLOWED_ACTIONS: Final[tuple[str, ...]] = (
    "gate_evaluate",
    "evidence_collect",
    "completion_packet_emit",
)


# ---------------------------------------------------------------------------
# 예외 — fail-closed (절대 활성화 진입 불가)
# ---------------------------------------------------------------------------
class PolicyEngineError(ValueError):
    """Raised when the policy engine fails to parse, load, or validate input.

    The exception text is rendered as ``"[<code>] <message>"``. Both parts are
    also kept on the instance: ``self.code`` is the failure-reason code and
    ``self.message`` the human-readable detail.
    """

    def __init__(self, code: str, message: str) -> None:
        # Keep the structured parts available to callers that branch on them.
        self.code = code
        self.message = message
        super().__init__(f"[{code}] {message}")


# ---------------------------------------------------------------------------
# draft-07 subset validator (외부 의존성 0, offline)
#   지원: type, required, properties, additionalProperties(bool),
#         const, enum, pattern, items, minItems, maxItems, minimum, minLength
# ---------------------------------------------------------------------------
# JSON type name -> Python types accepted for it. "integer", "number" and
# "boolean" get extra handling in _type_ok because bool is a subclass of int.
_TYPE_MAP: Final[dict[str, tuple[type, ...]]] = {
    "object": (dict,),
    "array": (list, tuple),
    "string": (str,),
    "integer": (int,),
    "number": (int, float),
    "boolean": (bool,),
    "null": (type(None),),
}


def _type_ok(value: Any, json_type: str) -> bool:
    """Return True when *value* matches the JSON Schema type name *json_type*.

    bool is never accepted as "integer"/"number". A type name that is not in
    ``_TYPE_MAP`` matches every value (falls back to ``object``).
    """
    if json_type == "integer":
        return isinstance(value, int) and not isinstance(value, bool)
    if json_type == "number":
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if json_type == "boolean":
        return isinstance(value, bool)
    return isinstance(value, _TYPE_MAP.get(json_type, (object,)))


def _json_equal(left: Any, right: Any) -> bool:
    """Equality for const/enum that keeps booleans distinct from numbers.

    Python treats ``True == 1`` and ``False == 0``; JSON does not. This is
    never more permissive than ``==``: it first requires ``left == right`` and
    then re-checks containers element by element for bool/number conflation.
    """
    if isinstance(left, bool) != isinstance(right, bool):
        return False
    if left != right:
        return False
    if isinstance(left, Mapping) and isinstance(right, Mapping):
        return all(_json_equal(item, right[key]) for key, item in left.items())
    if isinstance(left, (list, tuple)) and isinstance(right, (list, tuple)):
        return all(_json_equal(a, b) for a, b in zip(left, right))
    return True


def _validate(value: Any, schema: Mapping[str, Any], path: str, errs: list[str]) -> None:
    """Validate *value* against *schema*, appending one message per violation.

    Args:
        value: instance (or sub-instance) being checked.
        schema: draft-07 subset schema node for this position.
        path: location of *value*, used as the prefix of every message
            (``$`` is the root, then ``.key`` / ``[index]``).
        errs: output list; mutated in place.

    A type mismatch stops validation of this node (the remaining keywords
    would only produce noise). ``type`` may be a single name or a list of
    names (any match passes).
    """
    jt = schema.get("type")
    if isinstance(jt, str):
        type_names: tuple[str, ...] = (jt,)
    elif isinstance(jt, (list, tuple)):
        # draft-07 union form: ["string", "null"]
        type_names = tuple(t for t in jt if isinstance(t, str))
    else:
        type_names = ()
    if type_names and not any(_type_ok(value, t) for t in type_names):
        errs.append(f"{path}: type 불일치 (expected {jt}, got {type(value).__name__})")
        return
    # const / enum use JSON equality so that 1 does not satisfy `const: true`.
    if "const" in schema and not _json_equal(value, schema["const"]):
        errs.append(f"{path}: const 불일치 (expected {schema['const']!r}, got {value!r})")
    if "enum" in schema and not any(_json_equal(value, option) for option in schema["enum"]):
        errs.append(f"{path}: enum 밖 ({value!r} not in {schema['enum']!r})")
    if isinstance(value, str):
        pat = schema.get("pattern")
        # re.search: JSON Schema patterns are unanchored unless they say so.
        if pat and re.search(pat, value) is None:
            errs.append(f"{path}: pattern 불일치 ({value!r} !~ /{pat}/)")
        ml = schema.get("minLength")
        if isinstance(ml, int) and len(value) < ml:
            errs.append(f"{path}: minLength {ml} 미만")
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        mn = schema.get("minimum")
        if isinstance(mn, (int, float)) and value < mn:
            errs.append(f"{path}: minimum {mn} 미만")
    if isinstance(value, (list, tuple)):
        mi = schema.get("minItems")
        if isinstance(mi, int) and len(value) < mi:
            errs.append(f"{path}: minItems {mi} 미만")
        mx = schema.get("maxItems")
        if isinstance(mx, int) and len(value) > mx:
            errs.append(f"{path}: maxItems {mx} 초과")
        item_schema = schema.get("items")
        if isinstance(item_schema, Mapping):
            for i, item in enumerate(value):
                _validate(item, item_schema, f"{path}[{i}]", errs)
    if isinstance(value, dict):
        props: Mapping[str, Any] = schema.get("properties", {}) or {}
        for req in schema.get("required", []) or []:
            if req not in value:
                errs.append(f"{path}: 필수 key '{req}' 누락")
        # Only the boolean form of additionalProperties is supported.
        ap = schema.get("additionalProperties", True)
        for k, v in value.items():
            if k in props:
                _validate(v, props[k], f"{path}.{k}", errs)
            elif ap is False:
                errs.append(f"{path}: additionalProperties false 인데 '{k}' 존재")


def validate_against_meta(instance: Any, meta_schema: Mapping[str, Any]) -> list[str]:
    """Validate *instance* against a draft-07 subset meta-schema.

    Returns:
        The list of violation messages; an empty list means PASS.
    """
    errs: list[str] = []
    _validate(instance, meta_schema, "$", errs)
    return errs


# ---------------------------------------------------------------------------
# meta-schema 로더 (정적 — 참조만, 동적 생성 0)
# ---------------------------------------------------------------------------
def _read_json(p: Path, code: str) -> dict[str, Any]:
    """Read *p* as UTF-8 JSON and return its top-level object.

    Args:
        p: file to read.
        code: failure-reason code attached to any PolicyEngineError raised.

    Returns:
        The parsed top-level JSON object.

    Raises:
        PolicyEngineError: (with ``code``) when the file cannot be read or
            decoded, is not valid JSON, or its top-level value is not an
            object.
    """
    try:
        raw = p.read_text(encoding="utf-8")
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError (file is not UTF-8) and the
        # "embedded null byte" error for a bad path; neither is an OSError, so
        # without this they would escape without the caller's failure code.
        raise PolicyEngineError(code, f"파일 읽기 실패 {p}: {e}") from e
    try:
        obj = json.loads(raw)
    except json.JSONDecodeError as e:
        raise PolicyEngineError(code, f"JSON 파싱 실패 {p}: {e}") from e
    if not isinstance(obj, dict):
        raise PolicyEngineError(code, f"최상위 dict 아님 {p}")
    return obj


def load_meta_schema(name: str, *, schema_dir: str | Path = SCHEMA_DIR_DEFAULT) -> dict[str, Any]:
    """Load the static meta-schema file *name* from *schema_dir*.

    The file is only read (reference use; never mutated). Any read or parse
    failure surfaces from ``_read_json`` with the code
    ``meta_schema_load_fail``.
    """
    schema_path = Path(schema_dir).joinpath(name)
    return _read_json(schema_path, "meta_schema_load_fail")


# ---------------------------------------------------------------------------
# 1. goal_request parser
# ---------------------------------------------------------------------------
def parse_goal_request(
    obj: Any, *, schema_dir: str | Path = SCHEMA_DIR_DEFAULT
) -> dict[str, Any]:
    """Turn a chair-supplied goal_request into a validated dict (fail-closed).

    Expected input shape:
    ``{goal_id, goal_statement, boundary?, goal_type?, policy_profile{name}}``.

    Returns:
        A shallow copy of *obj* whose ``boundary`` entry is always a list
        (empty when absent or falsy).

    Raises:
        PolicyEngineError: ``goal_request_not_mapping`` when *obj* is not a
            Mapping; ``goal_request_schema_fail`` when it violates the static
            goal_request meta-schema.
    """
    if not isinstance(obj, Mapping):
        raise PolicyEngineError("goal_request_not_mapping", f"goal_request dict 아님: {type(obj).__name__}")

    meta_schema = load_meta_schema(META_GOAL_REQUEST, schema_dir=schema_dir)
    violations = validate_against_meta(dict(obj), meta_schema)
    if violations:
        raise PolicyEngineError("goal_request_schema_fail", "; ".join(violations))

    # Normalise boundary so downstream resolvers can iterate unconditionally.
    declared_boundary = obj.get("boundary")
    return {**obj, "boundary": list(declared_boundary) if declared_boundary else []}


# ---------------------------------------------------------------------------
# 2. policy_profile loader (read-only)
# ---------------------------------------------------------------------------
def load_policy_profile(
    name: str,
    *,
    profile_json_dir: str | Path = PROFILE_JSON_DIR_DEFAULT,
    profile_schema_dir: str | Path = PROFILE_SCHEMA_DIR_DEFAULT,
    schema_dir: str | Path = SCHEMA_DIR_DEFAULT,
) -> dict[str, Any]:
    """Load a policy profile by name alone (read-only; nothing is mutated).

    Validation (fail-closed — any failure raises):
      (a) per-profile concrete schema
          ``<profile_schema_dir>/<name>.schema.json`` — applied only when that
          file exists; a profile without one is checked by (b) alone.
      (b) the engine-generic static meta-schema (``META_POLICY_PROFILE``) —
          always applied.

    Args:
        name: profile name; must be a single path component.
        profile_json_dir: directory holding ``<name>.json``.
        profile_schema_dir: directory holding the optional concrete schema.
        schema_dir: directory holding the static meta-schemas.

    Returns:
        The parsed profile dict.

    Raises:
        PolicyEngineError: ``profile_name_invalid``, ``profile_load_fail``,
            ``profile_concrete_schema_load_fail``,
            ``profile_concrete_schema_fail``, ``meta_schema_load_fail`` or
            ``profile_generic_meta_fail``.
    """
    if not name or not isinstance(name, str):
        raise PolicyEngineError("profile_name_invalid", f"profile name 무효: {name!r}")
    # The name is interpolated into file paths below, so it must be exactly
    # one path component: a separator, drive prefix or NUL would let a caller
    # make the loader read outside the configured profile directories.
    if "\0" in name or "\\" in name or Path(name).name != name:
        raise PolicyEngineError("profile_name_invalid", f"profile name 무효: {name!r}")
    profile = _read_json(Path(profile_json_dir) / f"{name}.json", "profile_load_fail")

    concrete_schema_path = Path(profile_schema_dir) / f"{name}.schema.json"
    if concrete_schema_path.exists():
        concrete = _read_json(concrete_schema_path, "profile_concrete_schema_load_fail")
        errs = validate_against_meta(profile, concrete)
        if errs:
            raise PolicyEngineError("profile_concrete_schema_fail", "; ".join(errs))

    generic = load_meta_schema(META_POLICY_PROFILE, schema_dir=schema_dir)
    gerrs = validate_against_meta(profile, generic)
    if gerrs:
        raise PolicyEngineError("profile_generic_meta_fail", "; ".join(gerrs))
    return profile


# ---------------------------------------------------------------------------
# 3. goal_type resolver
# ---------------------------------------------------------------------------
def resolve_goal_type(goal_request: Mapping[str, Any], profile: Mapping[str, Any]) -> str:
    """Derive the goal_type.

    Precedence: an explicit non-blank ``goal_type`` string in the request
    (returned stripped) > inference from the profile's ``profile_id`` >
    ``"generic_goal"``.
    """
    requested = goal_request.get("goal_type")
    if isinstance(requested, str):
        trimmed = requested.strip()
        if trimmed:
            return trimmed

    profile_id = str(profile.get("profile_id", ""))
    # Checked in order: an id containing "merge" (which includes every
    # "pr_merge" id) wins over one that only contains "hardening".
    inference_table = (
        ("merge", "pr_merge_lifecycle"),
        ("hardening", "hardening_lifecycle"),
    )
    for marker, inferred in inference_table:
        if marker in profile_id:
            return inferred
    return "generic_goal"


# ---------------------------------------------------------------------------
# 4. boundary resolver
# ---------------------------------------------------------------------------
def resolve_boundary(
    goal_request: Mapping[str, Any], profile: Mapping[str, Any]
) -> dict[str, Any]:
    """Merge the chair's ``boundary`` list with the profile's
    ``scope_invariants`` into one normalized boundary dict.

    Missing inputs yield empty lists / ``None`` values; the profile's
    ``merge_ops_isolation`` is exposed under the key ``ops_isolation``.
    """
    chair_tokens = [str(token) for token in (goal_request.get("boundary") or [])]
    invariants = profile.get("scope_invariants") or {}

    resolved: dict[str, Any] = {
        "explicit": chair_tokens,
        "forbidden_paths_regex": list(invariants.get("forbidden_paths_regex") or []),
    }
    # These three are passed through under their own names.
    for key in ("live_ws_path", "live_ws_sanctioned_commit", "live_ws_branch"):
        resolved[key] = invariants.get(key)
    resolved["ops_isolation"] = invariants.get("merge_ops_isolation")
    return resolved


# ---------------------------------------------------------------------------
# 5. gate / HOLD / allowed / forbidden expansion
# ---------------------------------------------------------------------------
@dataclass
class GateCondition:
    """One entry of a profile's gate predicate: a condition name and the
    value the profile declares for it."""

    name: str
    expected: Any


def expand_gate(profile: Mapping[str, Any]) -> list[GateCondition]:
    """Expand the profile's AND-gate predicate into an ordered condition list.

    ``gate_predicate`` is used when it is a mapping; otherwise
    ``merge_ready_predicate`` is tried (keeps the meaning of the existing
    single profile). ALL conditions must be true for the gate to PASS.
    Returns an empty list when neither key holds a mapping.
    """
    for source_key in ("gate_predicate", "merge_ready_predicate"):
        predicate = profile.get(source_key)
        if isinstance(predicate, Mapping):
            return [
                GateCondition(name=condition, expected=wanted)
                for condition, wanted in predicate.items()
            ]
    return []


def expand_hold(profile: Mapping[str, Any]) -> list[str]:
    """Return the names of the profile's ``hold_conditions`` whose value is
    truthy — the HOLD triggers (any one true => HOLD).

    A missing, falsy, or non-mapping ``hold_conditions`` yields an empty list.
    """
    conditions = profile.get("hold_conditions") or {}
    if not isinstance(conditions, Mapping):
        return []
    triggers: list[str] = []
    for trigger, enabled in conditions.items():
        if enabled:
            triggers.append(str(trigger))
    return triggers


def expand_allowed_actions(
    profile: Mapping[str, Any], goal_type: str
) -> list[str]:
    """Compute allowed_actions for a profile.

    An explicit ``allowed_actions`` list/tuple wins and is returned as
    strings, unchanged otherwise. Without one, the result is derived from the
    universal allowed actions plus the recognised profile blocks
    (``post_merge_steps``, ``merge_method``, ``gemini_thread_resolve``) and a
    ``goal_type:<goal_type>`` entry, de-duplicated with order preserved.
    """
    declared = profile.get("allowed_actions")
    if isinstance(declared, (list, tuple)):
        return list(map(str, declared))

    derived: list[str] = [*UNIVERSAL_ALLOWED_ACTIONS]
    derived.extend(f"post_merge:{step}" for step in (profile.get("post_merge_steps") or []))

    merge_method = profile.get("merge_method") or {}
    if isinstance(merge_method, Mapping):
        preference = merge_method.get("default_preference")
        if preference:
            derived.append(f"merge_method:{preference}")

    thread_resolve = profile.get("gemini_thread_resolve") or {}
    if isinstance(thread_resolve, Mapping):
        # Thread resolution is allowed only when a positive call budget is set.
        call_budget = int(thread_resolve.get("max_resolve_calls", 0) or 0)
        if call_budget > 0:
            derived.append("review_thread_resolve:bounded")

    derived.append(f"goal_type:{goal_type}")
    # dict keys keep first-seen order, giving a stable de-duplication.
    return list(dict.fromkeys(derived))


# Prefixes (matched case-insensitively) that mark a boundary token as a deny.
_BOUNDARY_DENY_PREFIXES: Final[tuple[str, ...]] = ("forbid:", "no:", "deny:")


def normalize_boundary_deny(token: str) -> str | None:
    """Normalize a boundary deny token to its bare action id.

    ``'forbid:completion_packet_emit'`` -> ``'completion_packet_emit'``.
    Returns ``None`` when the token carries no deny prefix (i.e. it is not a
    deny) or when nothing is left after the prefix. Codex CRITICAL fix: keeps
    a boundary from being bypassed through a raw-spelling mismatch.
    """
    stripped = token.strip()
    lowered = stripped.lower()
    matched = next(
        (prefix for prefix in _BOUNDARY_DENY_PREFIXES if lowered.startswith(prefix)),
        None,
    )
    if matched is None:
        return None
    # Slice the original-case text so the action id keeps its spelling.
    remainder = stripped[len(matched):].strip()
    return remainder if remainder else None


def expand_forbidden_actions(
    profile: Mapping[str, Any], boundary: Mapping[str, Any]
) -> list[str]:
    """Universal forbidden actions plus those derived from profile/boundary.

    For every boundary deny token BOTH the raw token and its normalized bare
    action id are listed, so the intersection check / pruning against
    allowed_actions cannot be sidestepped by a spelling mismatch
    (Codex CRITICAL fix). The result is de-duplicated with order preserved.
    """
    collected: list[str] = [*UNIVERSAL_FORBIDDEN_ACTIONS]

    declared = profile.get("forbidden_actions")
    if isinstance(declared, (list, tuple)):
        collected += [str(action) for action in declared]

    collected += [
        f"write_path:{rgx}" for rgx in (boundary.get("forbidden_paths_regex") or [])
    ]

    for raw in boundary.get("explicit") or []:
        token = str(raw)
        action_id = normalize_boundary_deny(token)
        if action_id is None:
            continue  # not a deny token
        # raw is kept for audit traceability; the bare id is what binds.
        collected += [token, action_id]

    return list(dict.fromkeys(collected))


# ---------------------------------------------------------------------------
# 6 / 7. completion packet & evidence schema resolver (정적 meta-schema 참조)
# ---------------------------------------------------------------------------
def resolve_completion_packet_schema(
    profile: Mapping[str, Any],
    *,
    schema_dir: str | Path = SCHEMA_DIR_DEFAULT,
) -> dict[str, Any]:
    """Resolve the completion-packet schema for a profile.

    Returns:
        ``{"meta_schema_ref": <path of the static meta-schema as str>,
        "concrete": <per-profile in-memory spec>}``. The concrete spec
        carries the meta-schema's ``required`` list and the profile's
        ``completion_packet`` block when that block is a mapping.

    No schema file is generated and no shared schema is mutated (9-R.3).
    """
    meta_schema = load_meta_schema(META_COMPLETION_PACKET, schema_dir=schema_dir)

    profile_spec = profile.get("completion_packet")
    if not isinstance(profile_spec, Mapping):
        profile_spec = None

    return {
        "meta_schema_ref": str(Path(schema_dir) / META_COMPLETION_PACKET),
        "concrete": {
            "schema_id": "anu_v3.policy_profile_engine.completion_packet.v1",
            "profile_id": profile.get("profile_id"),
            "required_fields": list(meta_schema.get("required") or []),
            "profile_declared": profile_spec,
        },
    }


def resolve_evidence_schema(
    profile: Mapping[str, Any],
    *,
    schema_dir: str | Path = SCHEMA_DIR_DEFAULT,
) -> dict[str, Any]:
    """Resolve the evidence schema for a profile.

    Returns ``{"meta_schema_ref": ..., "concrete": ...}``: a reference to the
    static evidence meta-schema plus a per-profile in-memory spec holding the
    meta-schema's ``required`` list and the profile's ``evidence`` block
    (only when that block is a mapping).
    """
    meta_schema = load_meta_schema(META_EVIDENCE, schema_dir=schema_dir)

    profile_spec = profile.get("evidence")
    if not isinstance(profile_spec, Mapping):
        profile_spec = None

    return {
        "meta_schema_ref": str(Path(schema_dir) / META_EVIDENCE),
        "concrete": {
            "schema_id": "anu_v3.policy_profile_engine.evidence.v1",
            "profile_id": profile.get("profile_id"),
            "required_fields": list(meta_schema.get("required") or []),
            "profile_declared": profile_spec,
        },
    }


# ---------------------------------------------------------------------------
# 8. PolicyResolution + decision/result JSON
# ---------------------------------------------------------------------------
def evaluate_hold(
    hold_trigger_conditions: Sequence[str],
    runtime_signals: Mapping[str, Any],
) -> tuple[bool, list[str]]:
    """Pure runtime HOLD evaluator; makes the HOLD semantics explicit.

    The truthy entries of ``profile.hold_conditions`` are the names of the
    **enabled HOLD triggers** (definition-time enablement, schema
    ``const: true``). An actual HOLD happens only when such a trigger is
    *observed* at runtime; the resolve-time status reflects contract
    derivation only and does not presume runtime occurrence (Codex HIGH fix —
    definition-time enablement kept separate from runtime occurrence).

    Returns:
        ``(hold, fired)`` — ``fired`` lists the triggers whose runtime signal
        is truthy, in input order; ``hold`` is True when any fired.
    """
    fired: list[str] = []
    for trigger in hold_trigger_conditions:
        if runtime_signals.get(trigger):
            fired.append(trigger)
    return (len(fired) > 0, fired)


@dataclass
class PolicyResolution:
    """Result of resolving one goal_request against one policy profile.

    Holds the derived contract (boundary, gate, HOLD triggers, allowed and
    forbidden actions, schema descriptors) and can render itself as a
    decision dict or as a flat binding for a batch coordinator.
    """

    goal_id: str
    goal_type: str
    profile_id: str
    profile_version: str
    boundary: dict[str, Any]
    gate: list[GateCondition]
    hold_conditions: list[str]  # enabled HOLD trigger names (definition-time)
    allowed_actions: list[str]
    forbidden_actions: list[str]
    completion_packet_schema: dict[str, Any]
    evidence_schema: dict[str, Any]
    status: str = "RESOLVED"  # RESOLVED | HOLD_FOR_CHAIR
    hold_reason: str | None = None
    engine: str = ENGINE_MODULE
    engine_version: str = ENGINE_VERSION
    notes: list[str] = field(default_factory=list)

    def to_decision_dict(self) -> dict[str, Any]:
        """Render the resolution as the decision/result dict.

        List-valued fields are copied; ``boundary`` and the two schema
        descriptors are passed through by reference.
        """
        hold_semantics = (
            "enabled HOLD triggers (definition-time enablement). ANY trigger "
            "OBSERVED at runtime => HOLD, action 0. resolve-time status "
            "reflects contract derivation only, not runtime occurrence — "
            "use evaluate_hold(hold_trigger_conditions, runtime_signals)."
        )
        gate_rows = [asdict(condition) for condition in self.gate]

        decision: dict[str, Any] = {"schema": "anu_v3.policy_profile_engine.decision.v1"}
        # Fields emitted under their own attribute name, in this order.
        for attr in (
            "engine",
            "engine_version",
            "goal_id",
            "goal_type",
            "profile_id",
            "profile_version",
            "status",
            "hold_reason",
            "boundary",
        ):
            decision[attr] = getattr(self, attr)
        decision["gate"] = gate_rows
        decision["gate_semantics"] = "AND — ALL conditions must hold for PASS"
        decision["hold_trigger_conditions"] = list(self.hold_conditions)
        decision["hold_semantics"] = hold_semantics
        decision["allowed_actions"] = list(self.allowed_actions)
        decision["forbidden_actions"] = list(self.forbidden_actions)
        decision["completion_packet_schema"] = self.completion_packet_schema
        decision["evidence_schema"] = self.evidence_schema
        decision["notes"] = list(self.notes)
        return decision

    def to_coordinator_binding(self) -> dict[str, Any]:
        """Pure-dict adapter for wiring into a batch coordinator (imports no
        coordinator code).

        §2: must be connectable to the batch coordinator — a flat
        representation whose gate/hold/allowed/forbidden entries a
        coordinator can consume per track as-is.
        """
        gate_names = [condition.name for condition in self.gate]
        packet_ref = self.completion_packet_schema.get("meta_schema_ref")
        evidence_ref = self.evidence_schema.get("meta_schema_ref")

        binding: dict[str, Any] = {}
        binding["goal_id"] = self.goal_id
        binding["goal_type"] = self.goal_type
        binding["profile_id"] = self.profile_id
        binding["status"] = self.status
        binding["gate_condition_names"] = gate_names
        binding["hold_trigger_conditions"] = list(self.hold_conditions)
        binding["allowed_actions"] = list(self.allowed_actions)
        binding["forbidden_actions"] = list(self.forbidden_actions)
        binding["completion_packet_meta_ref"] = packet_ref
        binding["evidence_meta_ref"] = evidence_ref
        return binding


def build_completion_packet_skeleton(
    resolution: "PolicyResolution",
    *,
    task_id: str,
    final_status: str,
    generated_at_utc: str,
    evidence_ref: str,
    gate_result: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
    """Build a completion-packet instance skeleton from a resolution.

    Produces a runtime packet instance intended to **satisfy** the static
    meta-schema completion_packet_2553plus33.schema.json. The resolver's
    ``concrete`` entry is a per-profile spec descriptor (not an instance);
    this function emits the actual packet shape the meta-schema validates
    (Codex HIGH fix — demonstrates the contract is satisfiable).

    ``gate_result`` is shallow-copied; ``None`` or an empty mapping becomes
    ``{}``.
    """
    schema_id = resolution.completion_packet_schema["concrete"]["schema_id"]
    gate_snapshot = dict(gate_result) if gate_result else {}

    packet: dict[str, Any] = {
        "schema": schema_id,
        "task_id": task_id,
        "profile_id": resolution.profile_id,
        "goal_id": resolution.goal_id,
        "final_status": final_status,
        "gate_result": gate_snapshot,
    }
    packet["hold_evaluated"] = list(resolution.hold_conditions)
    packet["evidence_ref"] = evidence_ref
    packet["generated_at_utc"] = generated_at_utc
    return packet


def build_evidence_skeleton(
    resolution: "PolicyResolution",
    *,
    collected_at_utc: str,
    fields: Sequence[Mapping[str, Any]] | None = None,
) -> dict[str, Any]:
    """Build an evidence instance skeleton from a resolution (intended to
    satisfy the evidence meta-schema).

    When ``fields`` is not given, one required evidence field is derived per
    gate condition name (the minimum evidence needed to show the gate
    passed); with no gate conditions a single ``resolution_status`` field is
    used instead. (Codex HIGH fix — demonstrates the contract is satisfiable.)
    """
    if fields is None:
        gate_fields: list[dict[str, Any]] = []
        for condition in resolution.gate:
            gate_fields.append(
                {"name": condition.name, "kind": "gate_observation", "required": True}
            )
        if not gate_fields:
            gate_fields.append(
                {"name": "resolution_status", "kind": "engine_state", "required": True}
            )
        fields = gate_fields

    evidence: dict[str, Any] = {
        "schema": resolution.evidence_schema["concrete"]["schema_id"],
        "profile_id": resolution.profile_id,
        "goal_id": resolution.goal_id,
        "collected_at_utc": collected_at_utc,
    }
    # Each field is copied so the caller's mappings are not shared.
    evidence["fields"] = [dict(entry) for entry in fields]
    return evidence


def resolve_policy(
    goal_request: Any,
    *,
    profile_json_dir: str | Path = PROFILE_JSON_DIR_DEFAULT,
    profile_schema_dir: str | Path = PROFILE_SCHEMA_DIR_DEFAULT,
    schema_dir: str | Path = SCHEMA_DIR_DEFAULT,
) -> PolicyResolution:
    """END-TO-END core: from goal_type + policy_profile + boundary alone,
    derive gate / HOLD / allowed / forbidden / evidence schema / completion
    packet schema.

    Args:
        goal_request: chair-supplied request mapping (validated here).
        profile_json_dir: directory holding the profile JSON files.
        profile_schema_dir: directory holding per-profile concrete schemas.
        schema_dir: directory holding the static meta-schemas.

    Returns:
        A PolicyResolution. ``status`` is ``HOLD_FOR_CHAIR`` when the profile
        has no gate predicate or when allowed and forbidden actions overlap;
        otherwise ``RESOLVED``. When both HOLD causes apply, ``hold_reason``
        carries the overlap and the gate-absence reason is kept in ``notes``.

    Raises:
        PolicyEngineError: on any parse / load / validation failure, or when
            ``policy_profile.name`` is missing.
    """
    gr = parse_goal_request(goal_request, schema_dir=schema_dir)
    pp = gr.get("policy_profile") or {}
    profile_name = pp.get("name") if isinstance(pp, Mapping) else None
    if not profile_name:
        raise PolicyEngineError("policy_profile_name_missing", "policy_profile.name 누락")

    profile = load_policy_profile(
        profile_name,
        profile_json_dir=profile_json_dir,
        profile_schema_dir=profile_schema_dir,
        schema_dir=schema_dir,
    )

    goal_type = resolve_goal_type(gr, profile)
    boundary = resolve_boundary(gr, profile)
    gate = expand_gate(profile)
    hold = expand_hold(profile)
    allowed = expand_allowed_actions(profile, goal_type)
    forbidden = expand_forbidden_actions(profile, boundary)

    # Codex CRITICAL fix: block boundary bypass via spelling mismatch.
    # Allowed entries that also appear in forbidden (normalized ids included)
    # are removed, and any such overlap escalates as a chair-boundary vs
    # profile-contract conflict (fail-closed).
    forbidden_set = set(forbidden)
    contradiction = sorted(set(allowed) & forbidden_set)
    allowed = [a for a in allowed if a not in forbidden_set]

    cps = resolve_completion_packet_schema(profile, schema_dir=schema_dir)
    evs = resolve_evidence_schema(profile, schema_dir=schema_dir)

    notes: list[str] = []
    # Every HOLD cause is collected, so none is lost when several apply.
    hold_reasons: list[str] = []

    # A profile without a gate is an incomplete contract -> cannot decide,
    # HOLD (fail-closed §8).
    if not gate:
        hold_reasons.append(
            "profile 에 gate predicate(gate_predicate/merge_ready_predicate) 부재 — 자동 결정 불가"
        )

    # allowed ∩ forbidden overlap (before pruning) = chair-boundary / profile
    # contract contradiction -> HOLD (engine integrity, fail-closed §8).
    if contradiction:
        hold_reasons.append(
            f"allowed_actions ∩ forbidden_actions 충돌(boundary/profile contract "
            f"모순, 정규화 후): {contradiction}"
        )
        notes.append(f"pruned from allowed (forbidden 우선): {contradiction}")

    status = "HOLD_FOR_CHAIR" if hold_reasons else "RESOLVED"
    # The last cause stays the headline reason (same value as before when
    # both apply); earlier causes are preserved in notes instead of dropped.
    hold_reason: str | None = hold_reasons[-1] if hold_reasons else None
    notes.extend(f"additional hold reason: {earlier}" for earlier in hold_reasons[:-1])

    return PolicyResolution(
        goal_id=str(gr.get("goal_id", "")),
        goal_type=goal_type,
        profile_id=str(profile.get("profile_id", "")),
        profile_version=str(profile.get("version", "")),
        boundary=boundary,
        gate=gate,
        hold_conditions=hold,
        allowed_actions=allowed,
        forbidden_actions=forbidden,
        completion_packet_schema=cps,
        evidence_schema=evs,
        status=status,
        hold_reason=hold_reason,
        notes=notes,
    )


# Explicit public API: the names a star-import of this module exports.
# Underscore-prefixed helpers and the *_DIR_DEFAULT constants are not listed.
__all__ = [
    "ENGINE_MODULE",
    "ENGINE_VERSION",
    "META_GOAL_REQUEST",
    "META_POLICY_PROFILE",
    "META_COMPLETION_PACKET",
    "META_EVIDENCE",
    "UNIVERSAL_FORBIDDEN_ACTIONS",
    "UNIVERSAL_ALLOWED_ACTIONS",
    "PolicyEngineError",
    "GateCondition",
    "PolicyResolution",
    "validate_against_meta",
    "load_meta_schema",
    "parse_goal_request",
    "load_policy_profile",
    "resolve_goal_type",
    "resolve_boundary",
    "expand_gate",
    "expand_hold",
    "expand_allowed_actions",
    "expand_forbidden_actions",
    "normalize_boundary_deny",
    "evaluate_hold",
    "resolve_completion_packet_schema",
    "resolve_evidence_schema",
    "build_completion_packet_skeleton",
    "build_evidence_skeleton",
    "resolve_policy",
]
