#!/usr/bin/env python3
"""run_test_only_hardening_pr_merge.py — profile-driven PR merge lifecycle runner.

task-2553+16 / Track 1. Profile-as-code orchestrator binding:

    profile load -> merge-readiness gate -> (BOT merge, head-pinned)
                  -> post-merge smoke -> reconcile -> closeout

Fail-closed by construction. The runner NEVER calls the GitHub merge endpoint
unless the merge_ready_predicate evaluates ALL_PASS *and* the runtime auth
invariant (9-R.5) holds *and* the caller passes --execute. Gate-only is the
default mode and is side-effect free.

9-R bindings enforced here:
  9-R.1  Gemini-thread resolve hard-bound: exactly-1 unresolved + unique
         blocking id + sole-remaining-blocker, else resolve 0 / PRE_MERGE_HOLD.
  9-R.2  review-state fail-closed allowlist {APPROVED, ""}; CLEAN is the only
         affirmative-safe mergeStateStatus.
  9-R.3  liveness budget; on exceed -> PRE_MERGE_HOLD (no spin).
  9-R.4  emits pre-merge-gate.json (pre-merge decision) + result.json (final).
  9-R.5  runtime auth invariant: BOT_GITHUB_TOKEN ghs_ App token only.
  9-R.6  deterministic merge_method: squash > merge ; rebase-only -> HOLD.

stdlib only.
"""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any, Optional

WORKSPACE = Path("/home/jay/workspace")
SCHEMA_PATH = WORKSPACE / "schemas/policy_profiles/test_only_hardening_pr_merge_v1.schema.json"
PROFILE_PATH = WORKSPACE / "memory/policy_profiles/test_only_hardening_pr_merge_v1.json"
GATE_PATH = WORKSPACE / "memory/events/task-2553+16.pre-merge-gate.json"
RESULT_PATH = WORKSPACE / "memory/events/task-2553+16.result.json"

PRE_MERGE_HOLD = "PRE_MERGE_HOLD"
POST_MERGE_HOLD = "POST_MERGE_HOLD"
MERGE_READY = "MERGE_READY"
MERGED = "MERGED"


# ---------------------------------------------------------------------------
# profile loader
# ---------------------------------------------------------------------------


def load_profile(profile_path: Path = PROFILE_PATH) -> dict[str, Any]:
    """Load the profile JSON. Fail-closed: missing/invalid -> raise."""
    if not profile_path.exists():
        raise FileNotFoundError(f"profile missing: {profile_path}")
    data = json.loads(profile_path.read_text(encoding="utf-8"))
    if data.get("profile_id") != "test_only_hardening_pr_merge_v1":
        raise ValueError("profile_id mismatch (fail-closed)")
    for key in ("merge_ready_predicate", "hold_conditions", "merge_method",
                "auth", "gemini_thread_resolve", "liveness_budget",
                "irreversibility_policy", "scope_invariants"):
        if key not in data:
            raise ValueError(f"profile missing mandatory section: {key}")
    return data


# ---------------------------------------------------------------------------
# 9-R.5 runtime auth invariant
# ---------------------------------------------------------------------------


def auth_ok(token: Optional[str], profile: dict[str, Any]) -> tuple[bool, str]:
    """True only if token is the pre-configured taskctl-bot App token.

    OWNER PAT / personal PAT (ghp_/gho_/pat) / new token / absent / empty
    -> fail-closed (False).
    """
    expected_prefix = profile["auth"]["token_prefix_expected"]
    if not token:
        return False, "auth_absent_or_empty"
    if token.startswith(("ghp_", "gho_", "github_pat_")):
        return False, "owner_or_personal_pat_detected"
    if not token.startswith(expected_prefix):
        return False, f"unexpected_token_prefix(expected {expected_prefix})"
    return True, "github_app_installation_token"


# ---------------------------------------------------------------------------
# 9-R.6 deterministic merge_method
# ---------------------------------------------------------------------------


def resolve_merge_method(repo_allow: dict[str, bool],
                         profile: dict[str, Any]) -> tuple[Optional[str], dict[str, Any]]:
    """squash > merge ; rebase-only -> HOLD (None)."""
    prio = profile["merge_method"]["resolution_priority"]
    provenance = {
        "repo_allow_squash_merge": bool(repo_allow.get("squash")),
        "repo_allow_merge_commit": bool(repo_allow.get("merge")),
        "repo_allow_rebase_merge": bool(repo_allow.get("rebase")),
        "resolution_priority": prio,
    }
    if repo_allow.get("squash"):
        provenance["selected"] = "squash"
        return "squash", provenance
    if repo_allow.get("merge"):
        provenance["selected"] = "merge"
        return "merge", provenance
    if repo_allow.get("rebase"):
        provenance["selected"] = None
        provenance["hold_reason"] = "rebase_only_forbidden"
        return None, provenance
    provenance["selected"] = None
    provenance["hold_reason"] = "no_merge_method_available"
    return None, provenance


# ---------------------------------------------------------------------------
# 9-R.1 Gemini-thread resolve eligibility
# ---------------------------------------------------------------------------


def thread_resolve_eligible(unresolved_count: int,
                            unique_blocking_identified: bool,
                            other_blockers_present: bool,
                            profile: dict[str, Any]) -> tuple[bool, str]:
    """Resolve permitted only when: unresolved == exactly 1, the single
    blocking thread is uniquely identified, it is the sole remaining
    blocker. Otherwise resolve 0.
    """
    cfg = profile["gemini_thread_resolve"]
    if unresolved_count != cfg["require_exact_unresolved_count"]:
        return False, f"unresolved_count={unresolved_count}_not_exactly_1"
    if not unique_blocking_identified:
        return False, "blocking_thread_not_uniquely_identified"
    if other_blockers_present:
        return False, "other_blockers_present"
    return True, "eligible_single_resolve"


# ---------------------------------------------------------------------------
# merge_ready_predicate evaluation
# ---------------------------------------------------------------------------


def evaluate_predicate(obs: dict[str, Any], profile: dict[str, Any]) -> dict[str, Any]:
    """Evaluate the AND-gate predicate against an observation dict.

    obs keys: mergeable, merge_state_status, review_decision, ci_all_success,
              unresolved_review_threads, effective_diff_test_only,
              production_byte0_sha256, head_sha
    """
    p = profile["merge_ready_predicate"]
    checks: dict[str, dict[str, Any]] = {}

    checks["mergeable_eq"] = {
        "expected": p["mergeable_eq"],
        "measured": obs.get("mergeable"),
        "pass": obs.get("mergeable") == p["mergeable_eq"],
    }
    checks["merge_state_status_clean"] = {
        "expected_in": p["merge_state_status_in"],
        "measured": obs.get("merge_state_status"),
        "pass": obs.get("merge_state_status") in p["merge_state_status_in"],
    }
    rd = obs.get("review_decision")
    rd_norm = "" if rd in (None, "") else rd
    checks["review_decision_allowlist"] = {
        "allowlist": p["review_decision_allowlist"],
        "measured": rd_norm,
        "pass": rd_norm in p["review_decision_allowlist"],
    }
    checks["ci_all_success"] = {
        "expected": True,
        "measured": obs.get("ci_all_success"),
        "pass": obs.get("ci_all_success") is True,
    }
    checks["unresolved_review_threads_eq_0"] = {
        "expected": 0,
        "measured": obs.get("unresolved_review_threads"),
        "pass": obs.get("unresolved_review_threads") == 0,
    }
    checks["effective_diff_test_only"] = {
        "expected": True,
        "measured": obs.get("effective_diff_test_only"),
        "pass": obs.get("effective_diff_test_only") is True,
    }
    checks["production_byte0"] = {
        "expected_sha256": p["production_byte0"]["baseline_sha256"],
        "measured": obs.get("production_byte0_sha256"),
        "pass": obs.get("production_byte0_sha256") == p["production_byte0"]["baseline_sha256"],
    }
    checks["head_sha_eq_sanctioned"] = {
        "expected": profile["target"]["sanctioned_head_sha"],
        "measured": obs.get("head_sha"),
        "pass": obs.get("head_sha") == profile["target"]["sanctioned_head_sha"],
    }

    all_pass = all(c["pass"] for c in checks.values())
    failed = [k for k, c in checks.items() if not c["pass"]]
    return {"checks": checks, "ALL_PASS": all_pass, "failed": failed}


def decide(obs: dict[str, Any], profile: dict[str, Any],
           token: Optional[str], repo_allow: dict[str, bool]) -> dict[str, Any]:
    """Full pre-merge decision. Returns a gate-decision dict (no side effects)."""
    pred = evaluate_predicate(obs, profile)
    ok_auth, auth_reason = auth_ok(token, profile)
    method, method_prov = resolve_merge_method(repo_allow, profile)

    decision = MERGE_READY if pred["ALL_PASS"] else PRE_MERGE_HOLD
    reasons: list[str] = []

    if not pred["ALL_PASS"]:
        reasons.append("predicate_fail:" + ",".join(pred["failed"]))
    if not ok_auth:
        decision = PRE_MERGE_HOLD
        reasons.append("auth_fail_closed:" + auth_reason)
    if method is None:
        decision = PRE_MERGE_HOLD
        reasons.append("merge_method:" + method_prov.get("hold_reason", "unresolved"))

    # 9-R.1 thread-resolve gating note (advisory; does not auto-merge)
    urc = obs.get("unresolved_review_threads")
    resolve_elig = None
    if urc not in (0, None):
        resolve_elig, resolve_reason = thread_resolve_eligible(
            urc,
            obs.get("unique_blocking_identified", False),
            obs.get("other_blockers_present", True),
            profile,
        )
        reasons.append(f"thread_resolve:{resolve_reason}")

    return {
        "decision": decision,
        "ALL_PASS": pred["ALL_PASS"],
        "predicate": pred,
        "auth": {"ok": ok_auth, "reason": auth_reason},
        "merge_method_provenance": method_prov,
        "merge_method_selected": method,
        "thread_resolve_eligible": resolve_elig,
        "reasons": reasons,
    }


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def main(argv: Optional[list[str]] = None) -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("--observation", help="path to observation JSON")
    ap.add_argument("--token-env", default="BOT_GITHUB_TOKEN")
    ap.add_argument("--execute", action="store_true",
                    help="permit BOT merge if and only if MERGE_READY (default: gate-only)")
    ap.add_argument("--emit-gate", default=str(GATE_PATH))
    args = ap.parse_args(argv)

    import os

    profile = load_profile()
    if not args.observation:
        print(json.dumps({"error": "no_observation_supplied", "decision": PRE_MERGE_HOLD}))
        return 2
    obs = json.loads(Path(args.observation).read_text(encoding="utf-8"))
    token = os.environ.get(args.token_env)
    repo_allow = obs.get("repo_allow", {})

    gate = decide(obs, profile, token, repo_allow)
    Path(args.emit_gate).write_text(
        json.dumps(gate, indent=2, ensure_ascii=False), encoding="utf-8")

    if gate["decision"] != MERGE_READY:
        # pre-merge HOLD: merge call 0 (irreversibility doctrine).
        print(json.dumps({"decision": gate["decision"], "reasons": gate["reasons"]}))
        return 0

    if not args.execute:
        print(json.dumps({"decision": MERGE_READY, "note": "gate-only; --execute withheld"}))
        return 0

    # Execution path is intentionally gated and not auto-driven here; the
    # orchestrating task performs the head-pinned `gh api PUT .../merge`
    # call with the sanctioned token only after re-measuring the predicate
    # (TOCTOU removal). Fail-closed default.
    print(json.dumps({"decision": MERGE_READY, "note": "execute requested; defer to orchestrator head-pinned merge"}))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
