#!/usr/bin/env python3
"""task-2609 Track F — AUTO_PR_AND_MERGE_READINESS_DRY_RUN entrypoint.

Verifies automated-PR-creation + automated-merge readiness purely via
read-only consumption + dry-run + fixture evaluation.

HARD CONSTRAINTS (회장 verbatim §2/§7):
  * read-only · dry-run · proposal-only
  * NO GitHub write · NO PR open · NO branch/commit/push · NO merge
  * NO credential raw exposure (BOT_GITHUB_TOKEN redacted preflight only)
  * NO OWNER PAT usage
  * additive-only: writes only task-2609.* allowlisted paths (§6)
  * the 10 consume modules are inspected byte-0 (sha256 + AST symbol probe,
    never executed) — no network, no subprocess, no mutation surface

This module is import-safe (no side effects on import) so the regression
suite can drive judge_case()/evaluate_conditions() with mock fixtures only.
"""
from __future__ import annotations

import argparse
import ast
import datetime as _dt
import hashlib
import json
import os
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[1]

# ── §3 read-only consume targets (byte-0) ───────────────────────────────────
CONSUME_MODULES = [
    "utils/merge_queue_executor.py",
    "utils/replacement_pr_runner.py",
    "utils/post_merge_smoke_runner.py",
    "utils/bot_merge_identity.py",
    "scripts/refresh_bot_token.py",
    "utils/critical_escalation_reporter.py",
    "utils/merge_topology_gate.py",
    "utils/automation_contracts.py",
    "scripts/run_post_merge_reconcile_closeout.py",
    "utils/gemini_gate_validator.py",
]

# Gating-API symbols expected to exist in each module (AST-probed, not run).
EXPECTED_SYMBOLS = {
    "utils/merge_queue_executor.py": ["evaluate_pr", "verify_head_lock_then_merge"],
    "utils/replacement_pr_runner.py": ["ReplacementPRRunner"],
    "utils/post_merge_smoke_runner.py": ["run_post_merge_smoke"],
    "utils/bot_merge_identity.py": ["classify_token_source"],
    "scripts/refresh_bot_token.py": ["request_installation_token"],
    "utils/critical_escalation_reporter.py": ["process_event"],
    "utils/merge_topology_gate.py": ["classify"],
    "utils/automation_contracts.py": ["CriticalEscalationType"],
    "scripts/run_post_merge_reconcile_closeout.py": ["reconcile"],
    "utils/gemini_gate_validator.py": ["evaluate_gate"],
}

# ── §4 the 14 automated-merge conditions (회장 verbatim, ordered) ────────────
CONDITION_KEYS = [
    "queue_head",                       # 1
    "expected_files_exact_match",       # 2
    "forbidden_path_zero",              # 3
    "effective_diff_contamination_zero",# 4
    "dependency_satisfied",             # 5
    "serial_only_collision_zero",       # 6
    "ci_all_success",                   # 7
    "gemini_unresolved_zero",           # 8
    "merge_state_clean",                # 9
    "head_sha_lock_match",              # 10
    "bot_token_is_ghs_app",             # 11
    "owner_pat_usage_zero",             # 12
    "post_merge_smoke_configured",      # 13
    "reconcile_evidence_configured",    # 14
]

# Critical7 escalation types (mirror of utils/automation_contracts.py).
C7_FORBIDDEN_PATH = "FORBIDDEN_PATH_INTRUSION"
C7_REPLACEMENT_CONTAMINATED = "REPLACEMENT_PR_AUTO_CREATION_FAILED_FOR_CONTAMINATED_DIFF"
C7_GEMINI_REAL_BUG = "GEMINI_REAL_BUG_REQUIRES_SCOPE_EXPANSION"
C7_BLOCK_OVERRIDE = "BLOCK_OVERRIDE_REQUIRED_OR_REASON_INSUFFICIENT"
C7_DEP_CYCLE = "DEPENDENCY_CYCLE_OR_SERIAL_ONLY_COLLISION"
C7_REPLACEMENT_FAILED = "REPLACEMENT_PR_FAILED"
C7_SMOKE_FAILED = "POST_MERGE_SMOKE_FAILED"

VERDICTS = {
    "MERGE_CANDIDATE_READY",
    "REPLACEMENT_REQUIRED",
    "HOLD",
    "WAIT",
    "FAIL_CLOSED",
    "CRITICAL7",
}


def _utcnow() -> str:
    return _dt.datetime.now(_dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# ── read-only consume (byte-0): sha256 + AST symbol probe, never executed ────
def consume_modules_readonly() -> dict:
    out = {"mode": "read-only byte-0 (sha256 + ast.parse, no exec/import/net)",
           "modules": [], "all_present": True, "all_symbols_ok": True}
    for rel in CONSUME_MODULES:
        p = REPO_ROOT / rel
        rec = {"path": rel, "present": p.is_file()}
        if not p.is_file():
            out["all_present"] = False
            out["all_symbols_ok"] = False
            rec["sha256"] = None
            rec["symbols_found"] = []
            out["modules"].append(rec)
            continue
        raw = p.read_bytes()
        rec["sha256"] = hashlib.sha256(raw).hexdigest()
        rec["bytes"] = len(raw)
        found = []
        try:
            tree = ast.parse(raw.decode("utf-8", "replace"), filename=rel)
            names = {n.name for n in ast.walk(tree)
                     if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef))}
            wanted = EXPECTED_SYMBOLS.get(rel, [])
            found = [s for s in wanted if s in names]
            rec["symbols_expected"] = wanted
            rec["symbols_found"] = found
            if set(found) != set(wanted):
                out["all_symbols_ok"] = False
        except SyntaxError as e:  # pragma: no cover - defensive
            rec["ast_error"] = str(e)
            out["all_symbols_ok"] = False
        out["modules"].append(rec)
    return out


# ── §3.4/§3.5 redacted BOT token preflight (NEVER raw) ──────────────────────
def redacted_token_preflight(env=None) -> dict:
    env = dict(os.environ) if env is None else dict(env)
    tok = env.get("BOT_GITHUB_TOKEN", "") or ""
    present = bool(tok)
    if tok.startswith("ghs_"):
        source = "GITHUB_APP_INSTALLATION_TOKEN"
        prefix_class = "ghs_"
    elif tok.startswith(("ghp_", "github_pat_", "gho_")):
        source = "OWNER_PAT"
        prefix_class = "pat"
    elif tok:
        source = "UNKNOWN"
        prefix_class = "other"
    else:
        source = "ABSENT"
        prefix_class = "none"
    owner_pat_env_names = [k for k in ("OWNER_PAT", "PAT_TOKEN", "GITHUB_PAT")
                           if env.get(k)]
    return {
        "bot_token_present": present,
        "prefix_class": prefix_class,              # never the token itself
        "fingerprint_sha256_8": (hashlib.sha256(tok.encode()).hexdigest()[:8]
                                 if tok else None),
        "classified_source": source,
        "is_ghs_app_token": source == "GITHUB_APP_INSTALLATION_TOKEN",
        "owner_pat_detected": source == "OWNER_PAT" or bool(owner_pat_env_names),
        "owner_pat_env_names_set": owner_pat_env_names,
        "raw_exposed": False,                      # invariant: always False
    }


# ── §4 condition evaluation from a mock fixture case (dry-run, no live I/O) ──
def evaluate_conditions(inp: dict) -> dict:
    """Map a mock fixture's inputs onto the 14 boolean conditions."""
    c = {
        "queue_head": bool(inp.get("queue_head", True)),
        "expected_files_exact_match": bool(inp.get("expected_files_exact_match", True)),
        "forbidden_path_zero": int(inp.get("forbidden_path_count", 0)) == 0,
        "effective_diff_contamination_zero":
            not bool(inp.get("effective_diff_contamination", False)),
        "dependency_satisfied": (bool(inp.get("dependency_satisfied", True))
                                 and not bool(inp.get("dependency_cycle", False))),
        "serial_only_collision_zero":
            not bool(inp.get("serial_only_collision", False)),
        "ci_all_success": bool(inp.get("ci_all_success", True)),
        "gemini_unresolved_zero": int(inp.get("gemini_unresolved_count", 0)) == 0,
        "merge_state_clean": str(inp.get("merge_state_status", "CLEAN")) == "CLEAN",
        "head_sha_lock_match": bool(inp.get("head_sha_lock_match", True)),
        "bot_token_is_ghs_app":
            str(inp.get("bot_token_source", "")) == "GITHUB_APP_INSTALLATION_TOKEN",
        "owner_pat_usage_zero": not bool(inp.get("owner_pat_detected", False)),
        "post_merge_smoke_configured":
            bool(inp.get("post_merge_smoke_configured", True)),
        "reconcile_evidence_configured":
            bool(inp.get("reconcile_evidence_configured", True)),
    }
    return c


def judge_case(case: dict) -> dict:
    """Resolve a mock fixture case to a dry-run verdict by safety precedence.

    Precedence (highest first): FAIL_CLOSED > CRITICAL7 > REPLACEMENT_REQUIRED
    > HOLD > WAIT > MERGE_CANDIDATE_READY.
    """
    inp = case.get("inputs", {})
    cond = evaluate_conditions(inp)
    reasons: list[str] = []
    verdict = None
    critical7 = None

    # 1) FAIL_CLOSED — security/safety hard stops (reg 8, 10)
    if not cond["head_sha_lock_match"]:
        verdict, reasons = "FAIL_CLOSED", reasons + ["HEAD SHA lock mismatch"]
    elif not cond["owner_pat_usage_zero"]:
        verdict, reasons = "FAIL_CLOSED", reasons + ["OWNER PAT detected"]

    # 2) CRITICAL7 (reg 3, 13, 14, 15, gemini real bug)
    if verdict is None:
        if not cond["forbidden_path_zero"]:
            verdict, critical7 = "CRITICAL7", C7_FORBIDDEN_PATH
            reasons.append("forbidden path > 0")
        elif inp.get("dependency_cycle") or inp.get("serial_only_collision"):
            verdict, critical7 = "CRITICAL7", C7_DEP_CYCLE
            reasons.append("dependency cycle / serial_only collision")
        elif inp.get("replacement_pr_outcome") == "failure":
            verdict, critical7 = "CRITICAL7", C7_REPLACEMENT_FAILED
            reasons.append("replacement PR failure")
        elif inp.get("post_merge_smoke_outcome") == "fail":
            verdict, critical7 = "CRITICAL7", C7_SMOKE_FAILED
            reasons.append("post-merge smoke failure")
        elif inp.get("gemini_severity") == "real_bug":
            verdict, critical7 = "CRITICAL7", C7_GEMINI_REAL_BUG
            reasons.append("Gemini real bug requires scope expansion")

    # 3) REPLACEMENT_REQUIRED — contaminated diff routes to replacement PR path
    if verdict is None and not cond["effective_diff_contamination_zero"]:
        verdict = "REPLACEMENT_REQUIRED"
        reasons.append("effective diff contamination → replacement PR path")

    # 4) HOLD — expected_files mismatch / token absent / smoke|reconcile missing
    if verdict is None:
        if not cond["expected_files_exact_match"]:
            verdict = "REPLACEMENT_REQUIRED" if inp.get(
                "replacement_pr_outcome") == "success" else "HOLD"
            reasons.append("expected_files mismatch → HOLD/REPLACEMENT_REQUIRED")
        elif not cond["bot_token_is_ghs_app"] and inp.get(
                "bot_token_source", "") in ("", "ABSENT"):
            verdict = "HOLD"
            reasons.append("BOT token absent → HOLD")
        elif not cond["post_merge_smoke_configured"]:
            verdict = "HOLD"
            reasons.append("post-merge smoke missing → HOLD")
        elif not cond["reconcile_evidence_configured"]:
            verdict = "HOLD"
            reasons.append("reconcile evidence missing → HOLD")
        elif not cond["gemini_unresolved_zero"] and inp.get(
                "gemini_severity") in ("high", "critical"):
            verdict = "HOLD"
            reasons.append("Gemini unresolved high/critical → HOLD")

    # 5) WAIT — recoverable transient gates
    if verdict is None:
        if not cond["ci_all_success"]:
            verdict, reasons = "WAIT", reasons + ["CI not green → WAIT"]
        elif not cond["merge_state_clean"]:
            verdict, reasons = "WAIT", reasons + ["mergeStateStatus≠CLEAN → WAIT"]
        elif not cond["gemini_unresolved_zero"] and inp.get(
                "gemini_severity") in ("low", "medium", None):
            verdict, reasons = "WAIT", reasons + ["Gemini unresolved low/medium → WAIT"]

    # 6) MERGE_CANDIDATE_READY — all 14 true
    if verdict is None:
        if all(cond.values()):
            verdict = "MERGE_CANDIDATE_READY"
            reasons.append("all 14 conditions true")
        else:  # defensive: never auto-ready if any condition false
            verdict = "HOLD"
            failed = [k for k, v in cond.items() if not v]
            reasons.append(f"residual failed conditions: {failed}")

    return {
        "id": case.get("id"),
        "name": case.get("name"),
        "conditions": cond,
        "conditions_all_true": all(cond.values()),
        "verdict": verdict,
        "critical7_type": critical7,
        "reasons": reasons,
        "expected_verdict": case.get("expected_verdict"),
        "expected_critical7": case.get("expected_critical7"),
        "match": (verdict == case.get("expected_verdict")
                  and critical7 == case.get("expected_critical7")),
    }


# ── live-write guard (proves the engine has zero mutation surface) ──────────
def assert_no_live_write_surface() -> dict:
    """Static self-audit: this module must not reference any GitHub/git write.

    Scans its own source for forbidden write tokens. Any hit = the engine
    would be capable of a live mutation → fail closed.
    """
    src = Path(__file__).read_text(encoding="utf-8")
    forbidden = ["gh pr merge", "gh pr create", "git push", "git commit",
                 "git merge", "git branch -", "--no-dry-run", "subprocess.run",
                 "subprocess.Popen", "os.system"]
    # allow the literal list above (this very definition) by checking usage,
    # not the audit table itself: count occurrences outside this function body.
    hits = []
    for tok in forbidden:
        # the only legitimate appearance is inside this audit's `forbidden`
        # list; >1 occurrence implies real usage elsewhere.
        if src.count(tok) > 1:
            hits.append(tok)
    return {"forbidden_tokens_scanned": forbidden,
            "violations": hits,
            "no_live_write_surface": not hits}


def run(emit: bool) -> dict:
    fixture_path = REPO_ROOT / "memory/fixtures/task-2609.critical7-cases.json"
    cases = json.loads(fixture_path.read_text(encoding="utf-8"))["cases"]
    results = [judge_case(c) for c in cases]
    consume = consume_modules_readonly()
    token_pf = redacted_token_preflight()
    write_guard = assert_no_live_write_surface()

    all_match = all(r["match"] for r in results)
    ready_case = next((r for r in results
                       if r["name"] == "all14_true"), None)
    auto_pr_ready = bool(ready_case and ready_case["verdict"]
                         == "MERGE_CANDIDATE_READY")
    consume_ok = consume["all_present"] and consume["all_symbols_ok"]
    no_owner_pat = not token_pf["owner_pat_detected"]

    track_pass = (all_match and consume_ok and write_guard["no_live_write_surface"]
                  and no_owner_pat and not token_pf["raw_exposed"])

    hold_for_chair = (not track_pass) and any(
        r["verdict"] in ("FAIL_CLOSED", "CRITICAL7") and not r["match"]
        for r in results)

    summary = {
        "task": "task-2609",
        "track": "F",
        "title": "AUTO_PR_AND_MERGE_READINESS_DRY_RUN",
        "generated_at": _utcnow(),
        "mode": "read-only · dry-run · proposal-only · mock-only",
        "track_pass": track_pass,
        "hold_for_chair": hold_for_chair,
        "auto_pr_creation_readiness": auto_pr_ready,
        "auto_merge_readiness": auto_pr_ready,
        "conditions_count": len(CONDITION_KEYS),
        "regression_count": len(cases),
        "regression_all_match": all_match,
        "consume_byte0_ok": consume_ok,
        "no_live_write_surface": write_guard["no_live_write_surface"],
        "owner_pat_used": False,
        "real_write_or_merge_attempts": 0,
        "next_step_limited_live_pr_open_pilot_possible": (
            track_pass and auto_pr_ready and not hold_for_chair),
        "authority_note": ("readiness/proposal-only — Track F PASS confers "
                           "NO automatic real PR/merge permission (§8/§10)"),
    }

    if emit:
        _emit_artifacts(summary, results, consume, token_pf, write_guard)
    return {"summary": summary, "results": results,
            "consume": consume, "token_preflight": token_pf,
            "write_guard": write_guard}


def _w(rel: str, obj) -> None:
    p = REPO_ROOT / rel
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(obj, ensure_ascii=False, indent=2) + "\n",
                 encoding="utf-8")


def _emit_artifacts(summary, results, consume, token_pf, write_guard):
    matrix = {"task": "task-2609", "generated_at": summary["generated_at"],
              "conditions": CONDITION_KEYS,
              "per_case": [{"id": r["id"], "name": r["name"],
                            "conditions": r["conditions"],
                            "verdict": r["verdict"],
                            "critical7_type": r["critical7_type"]}
                           for r in results]}
    queue_head = {"task": "task-2609",
                  "dry_run": True,
                  "queue_head_evaluation": [
                      {"id": r["id"], "name": r["name"],
                       "queue_head": r["conditions"]["queue_head"],
                       "verdict": r["verdict"]} for r in results],
                  "note": "no gh/git call — fixture-driven dry-run only"}
    smoke_recon = {"task": "task-2609", "dry_run": True,
                   "post_merge_smoke": [
                       {"id": r["id"], "name": r["name"],
                        "smoke_configured":
                            r["conditions"]["post_merge_smoke_configured"],
                        "reconcile_configured":
                            r["conditions"]["reconcile_evidence_configured"],
                        "verdict": r["verdict"]} for r in results],
                   "note": "post_merge_smoke_runner / "
                           "run_post_merge_reconcile_closeout consumed byte-0; "
                           "no execution"}
    c7 = [r for r in results if r["verdict"] in ("CRITICAL7", "FAIL_CLOSED")]
    c7_fixture = {"task": "task-2609",
                  "critical7_and_fail_closed_cases": [
                      {"id": r["id"], "name": r["name"],
                       "verdict": r["verdict"],
                       "critical7_type": r["critical7_type"],
                       "expected": r["expected_verdict"],
                       "match": r["match"]} for r in c7]}
    decision = {
        "task": "task-2609", "track": "F",
        "decision": ("TRACK_F_PASS" if summary["track_pass"]
                     else ("HOLD_FOR_CHAIR" if summary["hold_for_chair"]
                           else "TRACK_F_FAIL")),
        "generated_at": summary["generated_at"],
        "auto_pr_readiness": summary["auto_pr_creation_readiness"],
        "auto_merge_readiness": summary["auto_merge_readiness"],
        "proposal_only": True,
        "confers_real_pr_merge_permission": False,
        "next_step_limited_live_pr_open_pilot_possible":
            summary["next_step_limited_live_pr_open_pilot_possible"],
        "callback_owner": "independent ANU key c119085addb0f8b7",
        "executor_self_callback_forbidden": True,
    }
    result_doc = {
        "task": "task-2609", "track": "F",
        "status": "PASS" if summary["track_pass"] else (
            "HOLD_FOR_CHAIR" if summary["hold_for_chair"] else "FAIL"),
        "summary": summary,
        "regression_results": results,
        "consume_byte0": consume,
        "bot_token_redacted_preflight": token_pf,
        "no_live_write_surface": write_guard,
    }

    _w("memory/events/task-2609.merge_gate_matrix.json", matrix)
    _w("memory/events/task-2609.queue_head_dry_run.json", queue_head)
    _w("memory/events/task-2609.post_merge_smoke_reconcile_dry_run.json",
       smoke_recon)
    _w("memory/events/task-2609.critical7_failure_fixture_results.json",
       c7_fixture)
    _w("memory/events/task-2609.bot_token_redacted_preflight.json", token_pf)
    _w("memory/events/task-2609.auto_pr_merge_readiness.result.json",
       result_doc)
    _w("memory/events/task-2609.auto_pr_merge_readiness.decision.json",
       decision)
    _w("memory/events/task-2609.result.json", result_doc)
    _w("memory/events/task-2609.decision.json", decision)


def main(argv=None) -> int:
    ap = argparse.ArgumentParser(description="task-2609 Track F dry-run")
    ap.add_argument("--emit", action="store_true",
                    help="write task-2609.* allowlisted artifacts")
    ap.add_argument("--json", action="store_true",
                    help="print summary json to stdout")
    args = ap.parse_args(argv)
    out = run(emit=args.emit)
    if args.json or not args.emit:
        print(json.dumps(out["summary"], ensure_ascii=False, indent=2))
    return 0 if out["summary"]["track_pass"] else 1


if __name__ == "__main__":
    raise SystemExit(main())
