"""tests/regression/test_policy_profile_engine_2553plus33.py — task-2553+33 Track C1 회귀.

Spec: memory/tasks/task-2553+33.md
(sha256 8c120eb9f03429a9e03ba30f1e5e1a66a7919b430d31bef9988a17e6932244dd).

회장 §3 필수구현 10건 + §8 HOLD + §7 invariant 검증:

 1. goal_request parser (정적 meta-schema 검증, fail-closed)
 2. policy_profile loader (read-only, 이중 검증)
 3. goal_type resolver
 4. boundary resolver
 5. gate expansion (기존 단일 profile 의미 보존)
 6. HOLD expansion
 7. allowed / forbidden expansion (universal forbidden 박제)
 8. completion packet schema resolver (정적 meta-schema 참조)
 9. evidence schema resolver (정적 meta-schema 참조)
10. decision JSON 산출 + batch coordinator 연결 어댑터
11. §8 HOLD: gate 부재 / allowed∩forbidden 충돌
12. 기존 단일 profile(test_only_hardening_pr_merge_v1) mutation 0 (semantic preserve §8)

모든 테스트 100% offline — network / git / GitHub API 호출 0건, profile write 0건.
"""

from __future__ import annotations

import copy
import json
import sys
from pathlib import Path

import pytest

# Put the workspace root at the front of sys.path (same pattern as the
# existing regression tests). An existing occurrence is removed first so the
# entry ends up at index 0.
WORKSPACE = Path(__file__).resolve().parent.parent.parent
try:
    sys.path.remove(str(WORKSPACE))
except ValueError:
    # The workspace root was not on sys.path yet; nothing to remove.
    pass
sys.path.insert(0, str(WORKSPACE))

from anu_v3.policy_profile_engine import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    ENGINE_MODULE,
    META_COMPLETION_PACKET,
    META_EVIDENCE,
    UNIVERSAL_FORBIDDEN_ACTIONS,
    PolicyEngineError,
    build_completion_packet_skeleton,
    build_evidence_skeleton,
    evaluate_hold,
    expand_forbidden_actions,
    expand_gate,
    expand_hold,
    load_meta_schema,
    load_policy_profile,
    normalize_boundary_deny,
    parse_goal_request,
    resolve_boundary,
    resolve_goal_type,
    resolve_policy,
    validate_against_meta,
)

# Directories and profile name shared by every test in this module, all
# resolved relative to the workspace root.
SCHEMA_DIR = WORKSPACE.joinpath("schemas")
PROFILE_JSON_DIR = WORKSPACE.joinpath("memory", "policy_profiles")
PROFILE_SCHEMA_DIR = WORKSPACE.joinpath("schemas", "policy_profiles")
PROFILE_NAME = "test_only_hardening_pr_merge_v1"


def _goal_request(**over):
    """Build a goal_request mapping for the tests.

    Starts from a fixed set of default fields that reference ``PROFILE_NAME``
    and applies ``over`` on top: a keyword argument replaces the default with
    the same key, or adds a new key. A fresh dict is returned on every call.
    """
    defaults = {
        "goal_id": "task-2553+33-c1",
        "goal_statement": "auto-derive gate/HOLD/allowed/forbidden from profile",
        "boundary": ["forbid:production_code_change", "no:pr_branch_main_write"],
        "policy_profile": {"name": PROFILE_NAME},
    }
    return {**defaults, **over}


def _resolve(**over):
    """Run ``resolve_policy`` on a default goal_request.

    ``over`` is forwarded to ``_goal_request`` to override request fields; the
    profile and schema directories are always the module-level constants.
    Returns whatever ``resolve_policy`` returns.
    """
    request = _goal_request(**over)
    directories = {
        "profile_json_dir": PROFILE_JSON_DIR,
        "profile_schema_dir": PROFILE_SCHEMA_DIR,
        "schema_dir": SCHEMA_DIR,
    }
    return resolve_policy(request, **directories)


# ---------------------------------------------------------------------------
# 1. goal_request parser
# ---------------------------------------------------------------------------
def test_parse_goal_request_ok():
    """A default goal_request parses and keeps its goal_id and a list boundary."""
    parsed = parse_goal_request(_goal_request(), schema_dir=SCHEMA_DIR)
    goal_id = parsed["goal_id"]
    boundary = parsed["boundary"]
    assert goal_id == "task-2553+33-c1"
    assert isinstance(boundary, list)


def test_parse_goal_request_fail_closed():
    """A request carrying only ``goal_id`` is rejected with a schema-fail code."""
    incomplete_request = {"goal_id": "x"}
    with pytest.raises(PolicyEngineError) as exc_info:
        parse_goal_request(incomplete_request, schema_dir=SCHEMA_DIR)
    error = exc_info.value
    assert error.code == "goal_request_schema_fail"


def test_parse_goal_request_not_mapping():
    """Passing a plain string instead of a mapping raises ``goal_request_not_mapping``."""
    not_a_mapping = "not-a-dict"
    with pytest.raises(PolicyEngineError) as exc_info:
        parse_goal_request(not_a_mapping, schema_dir=SCHEMA_DIR)
    error = exc_info.value
    assert error.code == "goal_request_not_mapping"


# ---------------------------------------------------------------------------
# 2. policy_profile loader (read-only, 이중 검증)
# ---------------------------------------------------------------------------
def test_load_policy_profile_ok():
    """The existing profile loads and reports the expected id and version."""
    directories = {
        "profile_json_dir": PROFILE_JSON_DIR,
        "profile_schema_dir": PROFILE_SCHEMA_DIR,
        "schema_dir": SCHEMA_DIR,
    }
    profile = load_policy_profile(PROFILE_NAME, **directories)
    assert profile["profile_id"] == PROFILE_NAME
    assert profile["version"] == "v1"


def test_load_policy_profile_missing():
    """Loading an unknown profile name raises ``profile_load_fail``."""
    directories = {
        "profile_json_dir": PROFILE_JSON_DIR,
        "profile_schema_dir": PROFILE_SCHEMA_DIR,
        "schema_dir": SCHEMA_DIR,
    }
    with pytest.raises(PolicyEngineError) as exc_info:
        load_policy_profile("no_such_profile", **directories)
    error = exc_info.value
    assert error.code == "profile_load_fail"


# ---------------------------------------------------------------------------
# 3. goal_type resolver
# ---------------------------------------------------------------------------
def test_resolve_goal_type_explicit_wins():
    """An explicit ``goal_type`` in the request is returned as-is."""
    request = {"goal_type": "custom_lifecycle"}
    profile = {"profile_id": PROFILE_NAME}
    resolved = resolve_goal_type(request, profile)
    assert resolved == "custom_lifecycle"


def test_resolve_goal_type_inferred():
    """Without an explicit ``goal_type`` the result depends on the profile id."""
    known_profile = {"profile_id": PROFILE_NAME}
    unknown_profile = {"profile_id": "something_else"}
    inferred_for_known = resolve_goal_type({}, known_profile)
    assert inferred_for_known == "pr_merge_lifecycle"
    inferred_for_unknown = resolve_goal_type({}, unknown_profile)
    assert inferred_for_unknown == "generic_goal"


# ---------------------------------------------------------------------------
# 4. boundary resolver
# ---------------------------------------------------------------------------
def test_resolve_boundary_merges_profile_scope():
    """The resolved boundary carries both request entries and profile scope data."""
    directories = {
        "profile_json_dir": PROFILE_JSON_DIR,
        "profile_schema_dir": PROFILE_SCHEMA_DIR,
        "schema_dir": SCHEMA_DIR,
    }
    profile = load_policy_profile(PROFILE_NAME, **directories)
    boundary = resolve_boundary(_goal_request(), profile)

    # Entry taken from the goal_request itself.
    assert "forbid:production_code_change" in boundary["explicit"]

    # Values expected from the existing profile.
    assert boundary["live_ws_path"] == "/home/jay/workspace"
    assert boundary["live_ws_sanctioned_commit"] == "20456b5f83fc039f2fd6f50f4b94095c29b41bfb"
    matching_regexes = [rx for rx in boundary["forbidden_paths_regex"] if "owner_trigger_pat" in rx]
    assert matching_regexes


# ---------------------------------------------------------------------------
# 5. gate expansion — 기존 단일 profile 의미 보존
# ---------------------------------------------------------------------------
def test_expand_gate_preserves_existing_profile():
    """Gate expansion of the existing profile yields the known 8 predicates."""
    directories = {
        "profile_json_dir": PROFILE_JSON_DIR,
        "profile_schema_dir": PROFILE_SCHEMA_DIR,
        "schema_dir": SCHEMA_DIR,
    }
    profile = load_policy_profile(PROFILE_NAME, **directories)
    conditions = expand_gate(profile)

    # The 8 predicates that task-2553+16 expanded by hand must come out of the
    # engine identically.
    expected_names = {
        "mergeable_eq",
        "merge_state_status_in",
        "review_decision_allowlist",
        "ci_all_success",
        "unresolved_review_threads_eq",
        "effective_diff_test_only",
        "production_byte0",
        "head_sha_eq_sanctioned",
    }
    actual_names = {condition.name for condition in conditions}
    assert actual_names == expected_names

    expected_by_name = {condition.name: condition.expected for condition in conditions}
    assert expected_by_name["mergeable_eq"] == "MERGEABLE"
    assert expected_by_name["ci_all_success"] is True
    assert expected_by_name["unresolved_review_threads_eq"] == 0


def test_expand_gate_prefers_generic_gate_predicate():
    """With both predicate keys present, only ``gate_predicate`` entries are expanded."""
    profile = {
        "gate_predicate": {"x": 1},
        "merge_ready_predicate": {"y": 2},
    }
    condition_names = [condition.name for condition in expand_gate(profile)]
    assert condition_names == ["x"]


# ---------------------------------------------------------------------------
# 6. HOLD expansion
# ---------------------------------------------------------------------------
def test_expand_hold_any_true():
    """HOLD expansion of the existing profile contains the expected triggers."""
    directories = {
        "profile_json_dir": PROFILE_JSON_DIR,
        "profile_schema_dir": PROFILE_SCHEMA_DIR,
        "schema_dir": SCHEMA_DIR,
    }
    profile = load_policy_profile(PROFILE_NAME, **directories)
    hold_conditions = expand_hold(profile)
    assert "codex_high_or_critical" in hold_conditions
    assert "critical7" in hold_conditions
    assert "credential_or_permission_expansion" in hold_conditions


# ---------------------------------------------------------------------------
# 7. allowed / forbidden expansion — universal forbidden 박제 (§7)
# ---------------------------------------------------------------------------
def test_universal_forbidden_verbatim_present():
    """Every universal forbidden action appears in the resolved forbidden list."""
    resolved = _resolve()
    forbidden = resolved.forbidden_actions

    missing_universal = [action for action in UNIVERSAL_FORBIDDEN_ACTIONS if action not in forbidden]
    assert not missing_universal

    # Spot-check specific action names by their literal spelling.
    assert "production_code_change" in forbidden
    assert "merge_without_gate_pass" in forbidden
    assert "callback_mandatory_rule_weaken" in forbidden
    assert "batch_coordinator_original_destroy" in forbidden


def test_forbidden_includes_profile_scope_regex():
    """Expanded forbidden actions include an entry mentioning ``owner_trigger_pat``."""
    directories = {
        "profile_json_dir": PROFILE_JSON_DIR,
        "profile_schema_dir": PROFILE_SCHEMA_DIR,
        "schema_dir": SCHEMA_DIR,
    }
    profile = load_policy_profile(PROFILE_NAME, **directories)
    boundary = resolve_boundary(_goal_request(), profile)
    forbidden = expand_forbidden_actions(profile, boundary)
    matches = [entry for entry in forbidden if "owner_trigger_pat" in entry]
    assert matches


def test_allowed_actions_derived():
    """The resolved allowed actions contain the expected derived entries."""
    allowed = _resolve().allowed_actions
    assert "gate_evaluate" in allowed
    assert "completion_packet_emit" in allowed
    post_merge_actions = [action for action in allowed if action.startswith("post_merge:")]
    assert post_merge_actions
    assert "review_thread_resolve:bounded" in allowed


# ---------------------------------------------------------------------------
# 8 / 9. completion packet & evidence schema resolver (정적 meta-schema 참조)
# ---------------------------------------------------------------------------
def test_completion_packet_resolver_static_ref():
    """The completion packet schema references an existing static meta-schema file."""
    packet_schema = _resolve().completion_packet_schema
    meta_ref = packet_schema["meta_schema_ref"]
    assert meta_ref.endswith(META_COMPLETION_PACKET)
    assert Path(meta_ref).exists()
    concrete = packet_schema["concrete"]
    assert concrete["profile_id"] == PROFILE_NAME


def test_evidence_resolver_static_ref():
    """The evidence schema references an existing static meta-schema file."""
    evidence_schema = _resolve().evidence_schema
    meta_ref = evidence_schema["meta_schema_ref"]
    assert meta_ref.endswith(META_EVIDENCE)
    assert Path(meta_ref).exists()
    concrete = evidence_schema["concrete"]
    assert concrete["profile_id"] == PROFILE_NAME


# ---------------------------------------------------------------------------
# 10. decision JSON + batch coordinator 연결 어댑터
# ---------------------------------------------------------------------------
def test_decision_dict_shape_and_json_serializable():
    """The decision dict has the expected fields and can be dumped as JSON."""
    decision = _resolve().to_decision_dict()

    # Fields compared for exact equality.
    assert decision["schema"] == "anu_v3.policy_profile_engine.decision.v1"
    assert decision["engine"] == ENGINE_MODULE
    assert decision["status"] == "RESOLVED"
    assert decision["profile_id"] == PROFILE_NAME

    # Semantics labels and the HOLD trigger key.
    assert decision["gate_semantics"].startswith("AND")
    assert "hold_trigger_conditions" in decision
    assert "HOLD" in decision["hold_semantics"]

    # Must be fully JSON-serializable (decision/result JSON output, §3).
    json.dumps(decision)


def test_coordinator_binding_pure_dict():
    """The coordinator binding exposes plain lists and can be dumped as JSON."""
    binding = _resolve().to_coordinator_binding()
    assert binding["profile_id"] == PROFILE_NAME
    assert binding["status"] == "RESOLVED"

    gate_names = binding["gate_condition_names"]
    assert isinstance(gate_names, list)
    assert gate_names  # must not be empty

    assert isinstance(binding["forbidden_actions"], list)
    json.dumps(binding)


# ---------------------------------------------------------------------------
# 11. §8 HOLD paths
# ---------------------------------------------------------------------------
def test_hold_when_no_gate(tmp_path):
    """A profile that defines no gate resolves to HOLD_FOR_CHAIR (fail-closed)."""
    # Write a throwaway profile without any gate into the pytest tmp directory.
    profile_dir = tmp_path / "memory" / "policy_profiles"
    profile_dir.mkdir(parents=True)
    gateless_profile = {"profile_id": "no_gate_v1", "version": "v1"}
    profile_file = profile_dir / "no_gate_v1.json"
    profile_file.write_text(json.dumps(gateless_profile), encoding="utf-8")

    request = _goal_request(policy_profile={"name": "no_gate_v1"})
    resolved = resolve_policy(
        request,
        profile_json_dir=profile_dir,
        profile_schema_dir=tmp_path / "noschemas",
        schema_dir=SCHEMA_DIR,
    )

    assert resolved.status == "HOLD_FOR_CHAIR"
    reason = resolved.hold_reason or ""
    assert "gate predicate" in reason


def test_hold_when_allowed_forbidden_conflict(tmp_path):
    """A profile listing one action as both allowed and forbidden resolves to HOLD."""
    profile_dir = tmp_path / "memory" / "policy_profiles"
    profile_dir.mkdir(parents=True)

    # The same action appears in both lists on purpose.
    conflicting_profile = {
        "profile_id": "conflict_v1",
        "version": "v1",
        "gate_predicate": {"ok": True},
        "allowed_actions": ["production_code_change"],
        "forbidden_actions": ["production_code_change"],
    }
    profile_file = profile_dir / "conflict_v1.json"
    profile_file.write_text(json.dumps(conflicting_profile), encoding="utf-8")

    request = _goal_request(policy_profile={"name": "conflict_v1"})
    resolved = resolve_policy(
        request,
        profile_json_dir=profile_dir,
        profile_schema_dir=tmp_path / "noschemas",
        schema_dir=SCHEMA_DIR,
    )

    assert resolved.status == "HOLD_FOR_CHAIR"
    reason = resolved.hold_reason or ""
    assert "충돌" in reason


# ---------------------------------------------------------------------------
# 12. 기존 단일 profile mutation 0 (semantic preserve §8) + meta-schema 정합
# ---------------------------------------------------------------------------
def test_existing_profile_byte0_after_resolve():
    """Running the engine leaves the profile JSON and its schema byte-identical."""
    profile_json_path = PROFILE_JSON_DIR / f"{PROFILE_NAME}.json"
    profile_schema_path = PROFILE_SCHEMA_DIR / f"{PROFILE_NAME}.schema.json"

    # Snapshot both files before the engine runs.
    json_bytes_before = profile_json_path.read_bytes()
    schema_bytes_before = profile_schema_path.read_bytes()

    _resolve()  # run the whole engine

    json_bytes_after = profile_json_path.read_bytes()
    assert json_bytes_after == json_bytes_before, "profile JSON mutation 발생 — §7 위반"
    schema_bytes_after = profile_schema_path.read_bytes()
    assert schema_bytes_after == schema_bytes_before, "profile schema mutation 발생 — §7 위반"


def test_generic_meta_validates_existing_profile():
    """The generic policy_profile meta-schema accepts the existing profile.

    The profile JSON is read directly from disk and validated against the
    meta-schema loaded from ``SCHEMA_DIR``; an empty violation list is
    required.
    """
    profile_path = PROFILE_JSON_DIR / f"{PROFILE_NAME}.json"
    # Decode explicitly as UTF-8: without an encoding argument read_text()
    # uses the locale's default encoding, which can differ between machines.
    # Other JSON files in this module are written with encoding="utf-8".
    profile = json.loads(profile_path.read_text(encoding="utf-8"))
    meta = load_meta_schema("policy_profile_2553plus33.schema.json", schema_dir=SCHEMA_DIR)
    assert validate_against_meta(profile, meta) == [], "generic meta 가 기존 profile 거부 — 의미 파괴"


def test_subset_validator_catches_violation():
    """The validator accepts a conforming object and reports non-conforming ones."""
    string_const_x = {"type": "string", "const": "x"}
    meta = {
        "type": "object",
        "required": ["a"],
        "additionalProperties": False,
        "properties": {"a": string_const_x},
    }

    conforming = validate_against_meta({"a": "x"}, meta)
    assert conforming == []

    wrong_const = validate_against_meta({"a": "y"}, meta)
    assert wrong_const  # const violation

    wrong_keys = validate_against_meta({"b": 1}, meta)
    assert wrong_keys  # missing required key + additional property


def test_resolve_does_not_mutate_input_request():
    """``resolve_policy`` leaves the goal_request passed to it unchanged."""
    request = _goal_request()
    # Deep copy taken before the call, so nested changes are detected too.
    pristine = copy.deepcopy(request)
    directories = {
        "profile_json_dir": PROFILE_JSON_DIR,
        "profile_schema_dir": PROFILE_SCHEMA_DIR,
        "schema_dir": SCHEMA_DIR,
    }
    resolve_policy(request, **directories)
    assert request == pristine, "resolve_policy 가 입력 goal_request mutate — 순수성 위반"


# ---------------------------------------------------------------------------
# 13. Codex CRITICAL fix — boundary deny 표현 mismatch 우회 차단
# ---------------------------------------------------------------------------
def test_normalize_boundary_deny():
    """Deny-prefixed boundary entries normalize to the bare action; others to None."""
    # Input entry -> expected normalized action name.
    normalized_cases = [
        ("forbid:completion_packet_emit", "completion_packet_emit"),
        ("NO: gate_evaluate", "gate_evaluate"),
        ("deny:x", "x"),
    ]
    for entry, expected_action in normalized_cases:
        assert normalize_boundary_deny(entry) == expected_action

    # An entry with no prefix, and a prefix with nothing after it, yield None.
    for entry in ("plain_action", "forbid:"):
        assert normalize_boundary_deny(entry) is None


def test_boundary_deny_not_bypassed():
    """A boundary that forbids ``completion_packet_emit`` removes it and escalates."""
    # The boundary forbids a universally-allowed action: it must be dropped
    # from allowed, listed as forbidden, and the result escalated to HOLD.
    resolved = _resolve(boundary=["forbid:completion_packet_emit"])
    action = "completion_packet_emit"
    assert action not in resolved.allowed_actions, "boundary 우회됨 — CRITICAL 회귀"
    assert action in resolved.forbidden_actions
    assert resolved.status == "HOLD_FOR_CHAIR"
    reason = resolved.hold_reason or ""
    assert "충돌" in reason


def test_boundary_tightening_no_false_hold():
    """Forbidding actions the profile does not grant keeps the result RESOLVED."""
    # Pre-existing case: the boundary forbids actions the profile never
    # granted, so the status must stay RESOLVED.
    tightening_boundary = ["forbid:production_code_change", "no:pr_branch_main_write"]
    resolved = _resolve(boundary=tightening_boundary)
    assert resolved.status == "RESOLVED"
    assert "production_code_change" in resolved.forbidden_actions


# ---------------------------------------------------------------------------
# 14. Codex HIGH fix — 런타임 HOLD 평가기 (정의-시점 vs 런타임 발생 분리)
# ---------------------------------------------------------------------------
def test_evaluate_hold_runtime_signal_triggers():
    """``evaluate_hold`` fires on a runtime signal and stays quiet without one."""
    resolved = _resolve()
    # Resolve time only derives the contract, so the status is RESOLVED
    # (no runtime occurrence is assumed).
    assert resolved.status == "RESOLVED"

    # A runtime signal is present -> HOLD.
    signalled = evaluate_hold(resolved.hold_conditions, {"critical7": True})
    is_hold, fired_conditions = signalled
    assert is_hold is True
    assert "critical7" in fired_conditions

    # No signals at all -> no HOLD.
    quiet = evaluate_hold(resolved.hold_conditions, {})
    is_hold_quiet, fired_quiet = quiet
    assert is_hold_quiet is False
    assert fired_quiet == []


def test_decision_dict_hold_labeling_precise():
    """The decision dict's HOLD label mentions definition-time and ``evaluate_hold``."""
    decision = _resolve().to_decision_dict()
    assert "hold_trigger_conditions" in decision
    hold_semantics = decision["hold_semantics"]
    assert "definition-time" in hold_semantics
    assert "evaluate_hold" in hold_semantics


# ---------------------------------------------------------------------------
# 15. Codex HIGH fix — packet/evidence skeleton 이 정적 meta-schema 충족 입증
# ---------------------------------------------------------------------------
def test_completion_packet_skeleton_satisfies_meta():
    """A completion packet skeleton validates against its meta-schema and dumps as JSON."""
    resolved = _resolve()
    skeleton_fields = {
        "task_id": "task-2553+33",
        "final_status": "RESOLVED",
        "generated_at_utc": "2026-05-18T00:00Z",
        "evidence_ref": "memory/events/task-2553+33.evidence.json",
        "gate_result": {"ALL_PASS": True},
    }
    packet = build_completion_packet_skeleton(resolved, **skeleton_fields)
    meta = load_meta_schema("completion_packet_2553plus33.schema.json", schema_dir=SCHEMA_DIR)
    violations = validate_against_meta(packet, meta)
    assert violations == [], "completion packet skeleton 이 meta 미충족"
    json.dumps(packet)


def test_evidence_skeleton_satisfies_meta():
    """An evidence skeleton validates against its meta-schema and dumps as JSON."""
    resolved = _resolve()
    evidence = build_evidence_skeleton(resolved, collected_at_utc="2026-05-18T00:00Z")
    meta = load_meta_schema("evidence_2553plus33.schema.json", schema_dir=SCHEMA_DIR)
    violations = validate_against_meta(evidence, meta)
    assert violations == [], "evidence skeleton 이 meta 미충족"
    field_count = len(evidence["fields"])
    assert field_count >= 1
    json.dumps(evidence)


if __name__ == "__main__":
    # Direct execution: run only this file through pytest in quiet mode and
    # exit with pytest's return code.
    exit_code = pytest.main([__file__, "-q"])
    raise SystemExit(exit_code)
