#!/usr/bin/env python3
"""run_pr128_merge_closeout.py — task-2553+12 orchestrator (fail-closed).

회장 merge GO. PR #128 BOT merge + post-merge smoke/reconcile + task-2553+1
closeout 를 단일 실행 흐름으로 처리. md-only 완료처리 금지 — 모든 산출 JSON/evidence.

흐름: preflight → §4 11-gate → (ALL_PASS only) 9-R.1 re-measure → SHA-pinned
BOT merge → post-merge smoke(isolated tarball) → reconcile(read-only) →
task-2553+1 evidence-based closeout → final packet 12. live workspace 불변.

안전:
  - fail-closed: gate.ALL_PASS=False → merge 함수 호출 정적·런타임 부재.
  - 9-R.1 atomicity: merge 직전 11-gate 전건 재측 + `gh api -X PUT .../merge
    -f sha=<pinned> -f merge_method=<policy>` (head 불일치=409→pre-merge HOLD).
  - 9-R.2 HOLD 2분기: pre-merge=merge 0 / post-merge=merge 비가역 유지·rollback 0.
  - 9-R.3 merge_method = 기존 green-path 정책(scripts/taskctl.py:1454 `--merge`
    = REST merge_method=merge), 근거 기록. squash fallback 미사용(정책 발견).
  - 9-R.4 auth = 기존 taskctl-bot GitHub App installation token 단일(.env.keys
    BOT_GITHUB_TOKEN, ghs_, 비-OWNER). OWNER/개인 PAT·신규 발급 0.
  - 9-R.6 idempotency: PR MERGED 이미면 no-op success 후 후속 계속.

금지: force/rebase/admin override / PR#102·F2·phase3·mqe·6파일밖 변경 /
  credential·OWNER PAT op / evidence 없는 closeout / manual .done echo /
  task-2553+2·+3 자동 dispatch / live workspace cleanup·reset·stash·rm.
"""
from __future__ import annotations

import hashlib
import io
import json
import os
import shlex
import shutil
import subprocess
import sys
import tarfile
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote

# ─── 불변 상수 (회장 sanctioned, task-2553+12.md §1/§4) ──────────────────────
REPO = "Jeon-Jonghyuk/dev_workspace"
PR = 128
PINNED_SHA = "0ea36fc9a724b1763be34710e283e088fae39a59"
PR102 = 102
PR102_HEAD = "bd5ad74f5d443b354319fc8b3cb006816b8a9025"
F2_EXPECT_SHA = "b02140738e372578a8f39af3d8ca3e13ce8ec099f86393a49e1e224a3f6a7560"
F2_LINES = (119, 156)  # anu_v2/owner_trigger_pat.py token transport block
EXPECTED_6 = sorted([
    "anu_v2/owner_trigger_pat.py",
    "tests/regression/test_owner_trigger_2553_plus1_high_fix.py",
    "memory/reports/task-2553+1.md",
    "memory/events/task-2553+1.result.json",
    "memory/events/task-2553+1.red-evidence.log",
    "memory/events/task-2553+1.green-evidence.log",
])
CI_REQUIRED_11 = sorted([
    "cancel-kill-switch", "ci/guard", "gemini-review-gate", "guard",
    "hidden-path-audit", "lock-in-check", "merge-safety-check",
    "phase3-merge-gate", "qc-check", "taskctl-state-guard", "taskctl-state-guard",
])
SMOKE_NODES = [
    "test_is_duplicate_streaming_equivalence_core",
    "test_is_duplicate_streaming_adversarial_parity",
    "test_is_duplicate_decode_error_propagates_not_false",
    "test_is_duplicate_no_read_text_streaming_static",
]
LIVE_WS = Path("/home/jay/workspace")
LIVE_WS_EXPECT_BRANCH = "task/task-2553p1-f1-clean-replacement"
LIVE_WS_EXPECT_HEAD = "20456b5f83fc039f2fd6f50f4b94095c29b41bfb"
# merge_method provenance — 9-R.3 discovery 결과 (preflight 에서 재기록)
MERGE_METHOD = "merge"

EV = LIVE_WS / "memory/events"
RP = LIVE_WS / "memory/reports"
TASK = "task-2553+12"
NOW = lambda: datetime.now(timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")


# ─── util ────────────────────────────────────────────────────────────────────
def sh(cmd, env_extra=None, timeout=120, check=False, text=True):
    env = os.environ.copy()
    if env_extra:
        env.update(env_extra)
    p = subprocess.run(cmd, capture_output=True, text=text, env=env, timeout=timeout)
    if check and p.returncode != 0:
        raise RuntimeError(f"cmd failed rc={p.returncode}: {shlex.join(cmd)}\n{p.stderr}")
    return p


def atomic_write_json(path: Path, obj):
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + f".tmp.{os.getpid()}")
    tmp.write_text(json.dumps(obj, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    os.replace(tmp, path)


def atomic_write_text(path: Path, txt: str):
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + f".tmp.{os.getpid()}")
    tmp.write_text(txt, encoding="utf-8")
    os.replace(tmp, path)


def gh_json(args, env_extra=None):
    p = sh(["gh"] + args, env_extra=env_extra)
    if p.returncode != 0:
        return None, p.stderr.strip()
    try:
        return json.loads(p.stdout), None
    except Exception as e:  # noqa: BLE001
        return None, f"json-parse-error: {e}: {p.stdout[:200]}"


def live_ws_state():
    h = sh(["git", "-C", str(LIVE_WS), "rev-parse", "HEAD"]).stdout.strip()
    b = sh(["git", "-C", str(LIVE_WS), "rev-parse", "--abbrev-ref", "HEAD"]).stdout.strip()
    return {"head": h, "branch": b}


def load_bot_token():
    """9-R.4: 기존 taskctl-bot GitHub App installation token 단일."""
    tok = os.environ.get("BOT_GITHUB_TOKEN") or os.environ.get("GH_BOT_TOKEN")
    src = "env"
    if not tok:
        ek = LIVE_WS / ".env.keys"
        if ek.exists():
            for line in ek.read_text(encoding="utf-8").splitlines():
                for pre in ("BOT_GITHUB_TOKEN=", "GH_BOT_TOKEN="):
                    if line.startswith(pre):
                        tok = line.split("=", 1)[1].strip().strip('"').strip("'")
                        src = ".env.keys"
                        break
                if tok:
                    break
    return tok, src


# ─── HOLD ────────────────────────────────────────────────────────────────────
class Hold(Exception):
    def __init__(self, stage, reason, detail=None, merge_commit=None):
        self.stage = stage          # PRE_MERGE | POST_MERGE
        self.reason = reason
        self.detail = detail or {}
        self.merge_commit = merge_commit
        super().__init__(f"[{stage}] {reason}")


def write_hold(stage, reason, detail, merge_commit=None):
    obj = {
        "task": TASK, "ts": NOW(), "status": "HOLD_FOR_CHAIR",
        "stage": stage, "reason": reason, "detail": detail,
        "merge_commit": merge_commit,
        "rollback_performed": False,
        "note": ("pre-merge: merge 호출 0, 상태 복구 불요"
                 if stage == "PRE_MERGE" else
                 "post-merge: merge 비가역 완료 유지 — rollback/revert/force 0, "
                 "후속 step 중단, 회장 보고"),
    }
    atomic_write_json(EV / f"{TASK}.hold-for-chair.json", obj)
    return obj


# ─── §4 11-gate 측정 ─────────────────────────────────────────────────────────
def measure_gate(phase: str) -> dict:
    g = {"phase": phase, "ts": NOW(), "conditions": {}}

    core, err = gh_json([
        "pr", "view", str(PR), "--repo", REPO, "--json",
        "number,state,isDraft,headRefOid,mergeStateStatus,mergeable,"
        "reviewDecision,baseRefName"])
    if core is None:
        raise Hold("PRE_MERGE", "gh pr view 실패", {"err": err})

    head = core["headRefOid"]
    g["measured"] = {
        "head": head, "state": core["state"], "isDraft": core["isDraft"],
        "mergeStateStatus": core["mergeStateStatus"],
        "mergeable": core["mergeable"], "reviewDecision": core["reviewDecision"],
        "base": core["baseRefName"],
    }

    # g1 head SHA pinned
    g["conditions"]["g1_head_sha"] = {
        "pass": head == PINNED_SHA, "expected": PINNED_SHA, "actual": head}

    # g2 CI 11/11 SUCCESS
    roll, err2 = gh_json(["pr", "view", str(PR), "--repo", REPO,
                          "--json", "statusCheckRollup"])
    ci_rows, ci_pass = [], False
    if roll is not None:
        for c in roll["statusCheckRollup"]:
            nm = c.get("name") or c.get("context")
            st = c.get("conclusion") or c.get("state") or c.get("status")
            ci_rows.append((nm, st))
        names = sorted(n for n, _ in ci_rows)
        all_ok = all(s in ("SUCCESS", "NEUTRAL", "SKIPPED") for _, s in ci_rows)
        ci_pass = (len(ci_rows) == 11 and names == CI_REQUIRED_11 and all_ok)
    g["conditions"]["g2_ci_11_success"] = {
        "pass": ci_pass, "count": len(ci_rows),
        "rows": sorted(f"{n}:{s}" for n, s in ci_rows), "err": err2}

    # g3 unresolved reviewThreads == 0
    q = ('query{repository(owner:"Jeon-Jonghyuk",name:"dev_workspace")'
         '{pullRequest(number:%d){reviewThreads(first:100)'
         '{totalCount nodes{isResolved}}}}}' % PR)
    rt, err3 = gh_json(["api", "graphql", "-f", f"query={q}"])
    unresolved = None
    if rt is not None:
        nodes = rt["data"]["repository"]["pullRequest"]["reviewThreads"]["nodes"]
        unresolved = sum(1 for n in nodes if not n["isResolved"])
    g["conditions"]["g3_unresolved_threads_zero"] = {
        "pass": unresolved == 0, "unresolved": unresolved, "err": err3}

    # g4 mergeStateStatus CLEAN
    g["conditions"]["g4_mergestate_clean"] = {
        "pass": core["mergeStateStatus"] == "CLEAN",
        "actual": core["mergeStateStatus"]}

    # g5 mergeable MERGEABLE
    g["conditions"]["g5_mergeable"] = {
        "pass": core["mergeable"] == "MERGEABLE", "actual": core["mergeable"]}

    # g6 effective diff == 6 files / g7 forbidden 0
    files, err6 = gh_json(["api", "--paginate",
                           f"repos/{REPO}/pulls/{PR}/files"])
    flist = sorted(f["filename"] for f in files) if files else []
    g["conditions"]["g6_effective_diff_6"] = {
        "pass": flist == EXPECTED_6, "expected": EXPECTED_6,
        "actual": flist, "err": err6}
    forbidden = [f for f in flist if f not in EXPECTED_6]
    g["conditions"]["g7_forbidden_path_zero"] = {
        "pass": len(forbidden) == 0, "forbidden": forbidden}

    # g8 PR#102 보존
    p102, err8 = gh_json(["pr", "view", str(PR102), "--repo", REPO,
                          "--json", "state,headRefOid,headRefName"])
    p102_head = p102["headRefOid"] if p102 else None
    g["conditions"]["g8_pr102_preserved"] = {
        "pass": p102_head == PR102_HEAD,
        "expected": PR102_HEAD, "actual": p102_head,
        "state": (p102 or {}).get("state"), "err": err8}

    # g9 F2 token transport block byte-identical
    raw = sh(["gh", "api",
              f"repos/{REPO}/contents/anu_v2/owner_trigger_pat.py?ref={PINNED_SHA}",
              "-H", "Accept: application/vnd.github.raw"], timeout=60)
    f2_sha = None
    if raw.returncode == 0:
        lines = raw.stdout.splitlines(keepends=True)
        block = "".join(lines[F2_LINES[0] - 1:F2_LINES[1]])
        f2_sha = hashlib.sha256(block.encode("utf-8")).hexdigest()
    g["conditions"]["g9_f2_byte_identical"] = {
        "pass": f2_sha == F2_EXPECT_SHA, "expected": F2_EXPECT_SHA,
        "actual": f2_sha, "lines": list(F2_LINES)}

    # g10 phase3/mqe 무변 (diff 미포함)
    p3 = [f for f in flist if "phase3" in f or "/mqe" in f or f.startswith("mqe")]
    g["conditions"]["g10_phase3_mqe_untouched"] = {
        "pass": len(p3) == 0, "matched": p3}

    # g11 no admin/force/rebase 필요
    need_admin = core["mergeStateStatus"] in ("BLOCKED", "DIRTY", "BEHIND") \
        or core["mergeable"] != "MERGEABLE"
    g["conditions"]["g11_no_admin_force_rebase"] = {
        "pass": not need_admin, "mergeStateStatus": core["mergeStateStatus"]}

    g["ALL_PASS"] = all(c["pass"] for c in g["conditions"].values())
    g["failed"] = [k for k, c in g["conditions"].items() if not c["pass"]]
    return g


# ─── SHA-pinned BOT merge (9-R.1) — ALL_PASS 분기 내부에서만 호출 ─────────────
def execute_merge(bot_token: str, method: str) -> dict:
    """fail-closed: 호출자가 ALL_PASS 확인 후에만 진입. 직전 11-gate 재측."""
    regate = measure_gate("pre-merge-immediate-recheck")
    atomic_write_json(EV / f"{TASK}.pre-merge-gate.recheck.json", regate)
    if not regate["ALL_PASS"]:
        raise Hold("PRE_MERGE", "9-R.1 직전 재측 gate 불충족 — merge 0",
                   {"failed": regate["failed"]})

    env = {"GH_TOKEN": bot_token}
    # idempotency (9-R.6)
    st, _ = gh_json(["pr", "view", str(PR), "--repo", REPO,
                     "--json", "state,mergeCommit,mergedAt,mergedBy"], env_extra=env)
    if st and st.get("state") == "MERGED":
        mc = (st.get("mergeCommit") or {}).get("oid")
        return {"task": TASK, "ts": NOW(), "idempotent_noop": True,
                "merged": True, "mergeCommit": mc,
                "mergedAt": st.get("mergedAt"),
                "mergedBy": (st.get("mergedBy") or {}).get("login"),
                "method": method, "note": "PR 이미 MERGED — no-op success"}

    # 9-R.1 SHA-pinned REST merge (head 불일치 시 GitHub 409 거부)
    p = sh(["gh", "api", "-X", "PUT",
            f"/repos/{REPO}/pulls/{PR}/merge",
            "-f", f"sha={PINNED_SHA}",
            "-f", f"merge_method={method}"],
           env_extra=env, timeout=120)
    out = p.stdout.strip()
    if p.returncode != 0:
        low = (p.stderr + out).lower()
        if "409" in low or "422" in low or "head branch was modified" in low \
                or "not mergeable" in low or "blocked" in low:
            raise Hold("PRE_MERGE",
                       "merge 거부(409/422 — race/blocked) — merge 미발생",
                       {"rc": p.returncode, "stderr": p.stderr[:500],
                        "stdout": out[:500]})
        raise Hold("PRE_MERGE", "merge 호출 실패 — merge 미발생",
                   {"rc": p.returncode, "stderr": p.stderr[:500]})
    try:
        body = json.loads(out)
    except Exception:  # noqa: BLE001
        body = {"raw": out[:500]}
    if not body.get("merged", False):
        raise Hold("PRE_MERGE", "merge API merged=false — merge 미발생",
                   {"body": body})

    mc = body.get("sha")
    info, _ = gh_json(["pr", "view", str(PR), "--repo", REPO,
                       "--json", "mergedAt,mergedBy,mergeCommit,state"],
                      env_extra=env)
    return {"task": TASK, "ts": NOW(), "idempotent_noop": False,
            "merged": True, "mergeCommit": mc,
            "merge_api_message": body.get("message"),
            "mergedAt": (info or {}).get("mergedAt"),
            "mergedBy": ((info or {}).get("mergedBy") or {}).get("login"),
            "state": (info or {}).get("state"),
            "method": method, "pinned_sha": PINNED_SHA}


# ─── post-merge smoke (isolated tarball — live workspace 미사용) ──────────────
def post_merge_smoke(merge_commit: str) -> dict:
    log = io.StringIO()

    def L(m):
        log.write(m + "\n")

    L(f"[smoke] ts={NOW()} merge_commit={merge_commit}")
    work = Path(tempfile.mkdtemp(prefix="pr128_smoke_"))
    try:
        tb = work / "main.tar.gz"
        p = sh(["gh", "api", f"repos/{REPO}/tarball/{merge_commit}"],
               timeout=120, text=False)
        if p.returncode != 0:
            raise Hold("POST_MERGE", "merged tarball fetch 실패",
                       {"rc": p.returncode}, merge_commit)
        tb.write_bytes(p.stdout)
        with tarfile.open(tb, "r:gz") as t:
            t.extractall(work)
        roots = [d for d in work.iterdir() if d.is_dir()]
        root = roots[0]
        L(f"[smoke] extracted origin/main @ {merge_commit} -> {root.name}")

        otp = root / "anu_v2/owner_trigger_pat.py"
        tst = root / "tests/regression/test_owner_trigger_2553_plus1_high_fix.py"
        chk = {"otp_present": otp.exists(), "test_present": tst.exists()}
        L(f"[smoke] file presence: {chk}")
        if not all(chk.values()):
            raise Hold("POST_MERGE", "merged tree 6-file 부재",
                       chk, merge_commit)

        # ① import OK (isolated subprocess, root sys.path)
        imp = sh(["python3", "-c",
                  "import importlib.util,sys;"
                  f"s=importlib.util.spec_from_file_location('otp_smoke',r'{otp}');"
                  "m=importlib.util.module_from_spec(s);s.loader.exec_module(m);"
                  "print('IMPORT_OK',hasattr(m,'is_duplicate_trigger'),"
                  "getattr(m,'ALLOWED_COMMENT_BODY',None))"],
                 timeout=60)
        L(f"[smoke] import rc={imp.returncode} out={imp.stdout.strip()} "
          f"err={imp.stderr.strip()[:300]}")
        import_ok = imp.returncode == 0 and "IMPORT_OK True '/gemini review'" \
            in imp.stdout

        # ② 신규 스트리밍 regression 4 case (+10 추가분)
        kexpr = " or ".join(SMOKE_NODES)
        pt = sh(["python3", "-m", "pytest", str(tst), "-k", kexpr,
                 "-q", "--no-header", "-p", "no:cacheprovider"],
                timeout=180)
        L("[smoke] pytest streaming-4 rc=%d\n%s" % (
            pt.returncode, (pt.stdout or "")[-2500:]))
        if pt.stderr.strip():
            L("[smoke] pytest stderr:\n" + pt.stderr[-800:])
        # collected count sanity: 4 selected
        pass_line = [ln for ln in (pt.stdout or "").splitlines()
                     if "passed" in ln]
        stream_ok = pt.returncode == 0 and "4 passed" in (pt.stdout or "")

        # ③ is_duplicate_trigger 기본 동작 sanity (full F1+stream suite)
        full = sh(["python3", "-m", "pytest", str(tst), "-q",
                   "--no-header", "-p", "no:cacheprovider"], timeout=180)
        L("[smoke] pytest full-suite rc=%d\n%s" % (
            full.returncode, (full.stdout or "")[-1500:]))
        sanity_ok = full.returncode == 0

        ok = import_ok and stream_ok and sanity_ok
        result = {
            "task": TASK, "ts": NOW(), "merge_commit": merge_commit,
            "isolated_dir": str(work), "live_workspace_used": False,
            "checks": {
                "import_ok": import_ok,
                "streaming_4_pass": stream_ok,
                "is_duplicate_sanity_full_suite": sanity_ok,
            },
            "pytest_stream_tail": (pt.stdout or "")[-600:],
            "pytest_full_tail": (full.stdout or "")[-400:],
            "passed_lines": pass_line[-3:],
            "smoke_pass": ok,
        }
        atomic_write_text(EV / f"{TASK}.smoke.log", log.getvalue())
        atomic_write_json(EV / f"{TASK}.smoke.json", result)
        if not ok:
            raise Hold("POST_MERGE", "post-merge smoke 실패",
                       result["checks"], merge_commit)
        return result
    finally:
        shutil.rmtree(work, ignore_errors=True)  # temp 격리 dir 만 정리


# ─── reconcile (read-only) ──────────────────────────────────────────────────
def reconcile(merge_commit: str) -> dict:
    r = {"task": TASK, "ts": NOW(), "merge_commit": merge_commit, "checks": {}}

    pr, _ = gh_json(["pr", "view", str(PR), "--repo", REPO,
                     "--json", "state,mergeCommit,mergedAt"])
    r["checks"]["pr128_merged"] = {
        "pass": bool(pr) and pr["state"] == "MERGED",
        "state": (pr or {}).get("state")}
    mc = (pr or {}).get("mergeCommit", {}).get("oid")
    r["checks"]["merge_commit_exists"] = {
        "pass": bool(mc), "mergeCommit": mc}

    # origin/main 6-file 반영 — git tree API (exact 매치) + 인코딩 contents fallback
    tree, terr = gh_json(["api",
                          f"repos/{REPO}/git/trees/main?recursive=1"])
    tpaths = {n["path"] for n in (tree or {}).get("tree", [])}
    truncated = bool((tree or {}).get("truncated"))
    miss = []
    for f in EXPECTED_6:
        if f in tpaths:
            continue
        # tree 누락/truncated → 경로 URL-encode 후 contents API 로 존재 재확인
        enc = quote(f, safe="/")
        c = sh(["gh", "api", f"repos/{REPO}/contents/{enc}?ref=main",
                "--jq", ".sha"], timeout=60)
        if c.returncode != 0 or not c.stdout.strip():
            miss.append(f)
    r["checks"]["origin_main_6file_present"] = {
        "pass": len(miss) == 0, "missing": miss,
        "tree_truncated": truncated, "err": terr}

    p102, _ = gh_json(["pr", "view", str(PR102), "--repo", REPO,
                       "--json", "state,headRefOid"])
    p102_head = (p102 or {}).get("headRefOid")
    r["checks"]["pr102_unchanged"] = {
        "pass": p102_head == PR102_HEAD,
        "expected": PR102_HEAD, "actual": p102_head}

    r["reconcile_pass"] = all(c["pass"] for c in r["checks"].values())
    atomic_write_json(EV / f"{TASK}.reconcile.json", r)
    if not r["reconcile_pass"]:
        raise Hold("POST_MERGE", "reconcile evidence 불일치",
                   {"failed": [k for k, c in r["checks"].items()
                               if not c["pass"]]}, merge_commit)
    return r


# ─── task-2553+1 closeout (evidence-based, manual .done echo 금지) ───────────
def closeout_2553p1(merge_exec: dict, gate: dict, smoke: dict,
                     reconcile_r: dict) -> dict:
    mc = merge_exec.get("mergeCommit")
    evidence = {
        "merge_commit": mc,
        "merged_at": merge_exec.get("mergedAt"),
        "merged_by": merge_exec.get("mergedBy"),
        "effective_diff_6_verified":
            gate["conditions"]["g6_effective_diff_6"]["pass"],
        "ci_11_pass": gate["conditions"]["g2_ci_11_success"]["pass"],
        "gemini_resolved":
            gate["conditions"]["g3_unresolved_threads_zero"]["pass"],
        "f2_byte_identical":
            gate["conditions"]["g9_f2_byte_identical"]["pass"],
        "pr102_preserved":
            gate["conditions"]["g8_pr102_preserved"]["pass"],
        "smoke_pass": smoke.get("smoke_pass"),
        "reconcile_pass": reconcile_r.get("reconcile_pass"),
    }
    # 본질 완료기준 매핑: task-2553+1 F1-solo 는 PR#128 merge 로 충족
    completion_map = {
        "F1_clean_replacement_merged_to_main": bool(mc),
        "effective_diff_6file_only": evidence["effective_diff_6_verified"],
        "ci_11_green": evidence["ci_11_pass"],
        "gemini_thread_resolved": evidence["gemini_resolved"],
        "f2_token_transport_unchanged": evidence["f2_byte_identical"],
        "pr102_original_preserved": evidence["pr102_preserved"],
        "post_merge_behavior_verified": evidence["smoke_pass"],
        "origin_main_state_reconciled": evidence["reconcile_pass"],
    }
    evidence_consistent = all(completion_map.values())
    obj = {
        "task": "task-2553+1", "closeout_by": TASK, "ts": NOW(),
        "status": "CLOSED_BY_PR128_MERGE" if evidence_consistent
                  else "CLOSEOUT_EVIDENCE_INCOMPLETE",
        "evidence": evidence,
        "completion_criteria_mapping": completion_map,
        "evidence_consistent": evidence_consistent,
        "closeout_method": "evidence-based runner (manual .done echo 금지)",
        "note": ("task-2553+1(F1-solo) 의 IRRECONCILABLE finding 은 "
                 "clean-replacement PR#128 로 해소·merge 완료 — 본질 충족."),
    }
    atomic_write_json(EV / "task-2553+1.closeout.json", obj)

    if not evidence_consistent:
        raise Hold("POST_MERGE", "closeout evidence 불정합 — .done 미발행",
                   {"completion_map": completion_map}, mc)

    # task-2553+1.result.json / report 에 closeout 섹션 append (원본 보존)
    rj = EV / "task-2553+1.result.json"
    try:
        cur = json.loads(rj.read_text(encoding="utf-8"))
        cur.setdefault("closeout_2553p12", {})
        cur["closeout_2553p12"] = {
            "closed_by": TASK, "ts": NOW(),
            "status": obj["status"], "merge_commit": mc,
            "evidence_ref": "memory/events/task-2553+1.closeout.json",
        }
        atomic_write_json(rj, cur)
    except Exception as e:  # noqa: BLE001
        obj["result_json_append_warn"] = str(e)

    rmd = RP / "task-2553+1.md"
    try:
        if rmd.exists():
            sec = (f"\n\n---\n\n## CLOSEOUT (task-2553+12, {NOW()})\n\n"
                   f"- task-2553+1(F1-solo) **{obj['status']}** — "
                   f"clean-replacement PR #128 merge 로 본질 충족.\n"
                   f"- merge_commit: `{mc}`  merged_at: {evidence['merged_at']}\n"
                   f"- effective diff 6-file verified / CI 11 green / "
                   f"Gemini resolved / F2 byte-identical / PR#102 preserved.\n"
                   f"- post-merge smoke PASS / reconcile PASS.\n"
                   f"- evidence: `memory/events/task-2553+1.closeout.json`\n")
            with open(rmd, "a", encoding="utf-8") as fh:
                fh.write(sec)
    except OSError as e:  # cosmetic append 실패는 비치명 (closeout evidence 무관)
        obj["report_md_append_warn"] = str(e)

    # .done 마커 — runner 가 evidence 정합 확인 후에만 생성 (echo/수동 금지)
    done = EV / "task-2553+1.done"
    if not done.exists():
        atomic_write_text(done, json.dumps({
            "task": "task-2553+1", "marker": "done",
            "created_by": f"{TASK} runner (evidence-verified)",
            "ts": NOW(), "merge_commit": mc,
            "evidence": "memory/events/task-2553+1.closeout.json",
            "lifecycle": ".done -> .done.acked (기존 프로토콜)",
        }, ensure_ascii=False) + "\n")
    obj["done_marker"] = str(done)
    obj["done_created"] = True
    atomic_write_json(EV / "task-2553+1.closeout.json", obj)
    return obj


# ─── main orchestrate ────────────────────────────────────────────────────────
def _main_impl():
    ws_before = live_ws_state()
    started = NOW()
    bot_token, tok_src = load_bot_token()

    # 9-R.4 auth 검증: ghs_ GitHub App installation token (비-OWNER)
    auth_ok = bool(bot_token) and bot_token.startswith("ghs_")
    merge_method_provenance = {
        "discovery_order": [
            "① gh api repos/Jeon-Jonghyuk/dev_workspace allow_* + branch protection",
            "② dev_workspace 기존 ANU/taskctl green-path merge policy 모듈 탐색",
            "③ 단일 authoritative method 발견 시 사용(근거 기록)",
            "④ policy 부재 증거 후에만 squash fallback",
        ],
        "repo_allow_flags": {
            "allow_squash_merge": True, "allow_merge_commit": True,
            "allow_rebase_merge": True, "allow_auto_merge": False,
            "source": "gh api repos/Jeon-Jonghyuk/dev_workspace (실측 2026-05-17)",
        },
        "authoritative_policy": {
            "found": True,
            "module": "scripts/taskctl.py:1454",
            "policy": "gh pr merge <pr> --merge --delete-branch "
                      "(본 코드베이스 유일 merge 호출 지점, green-path taskctl)",
            "mapped_rest_merge_method": "merge",
        },
        "squash_fallback_used": False,
        "selected_merge_method": MERGE_METHOD,
        "rebase_blocked_by_§8": True,
    }
    activation = {
        "task": TASK, "ts": NOW(), "started": started,
        "live_ws_before": ws_before,
        "live_ws_expected": {"branch": LIVE_WS_EXPECT_BRANCH,
                             "head": LIVE_WS_EXPECT_HEAD},
        "bot_auth": {"present": bool(bot_token), "source": tok_src,
                     "is_app_installation_token_ghs": auth_ok,
                     "non_owner": auth_ok, "owner_pat_used": False,
                     "new_token_issued": False},
        "merge_method_provenance": merge_method_provenance,
    }
    atomic_write_json(EV / f"{TASK}.activation-decision.json", activation)

    hold_obj = None
    merge_exec = smoke = reconcile_r = closeout = None
    gate = None
    try:
        if not auth_ok:
            raise Hold("PRE_MERGE",
                       "BOT auth 부재·비-App-token (credential·permission "
                       "expansion 필요) — 9-R.4 HOLD",
                       {"token_source": tok_src, "present": bool(bot_token)})

        # ── §4 11-gate (fail-closed) ──
        gate = measure_gate("pre-merge")
        gate["merge_method"] = MERGE_METHOD
        gate["merge_method_provenance"] = merge_method_provenance
        atomic_write_json(EV / f"{TASK}.pre-merge-gate.json", gate)

        if not gate["ALL_PASS"]:
            raise Hold("PRE_MERGE",
                       "§4 11-gate 불충족 — merge 호출 0 (fail-closed)",
                       {"failed": gate["failed"]})

        # ── merge (ALL_PASS 분기 내부에서만 — 정적 fail-closed) ──
        assert bot_token and auth_ok  # 9-R.4 unreachable unless BOT app token
        merge_exec = execute_merge(bot_token, MERGE_METHOD)
        atomic_write_json(EV / f"{TASK}.merge-exec.json", merge_exec)
        mc = merge_exec["mergeCommit"]

        # ── post-merge: smoke → reconcile → closeout (HOLD=POST_MERGE) ──
        smoke = post_merge_smoke(mc)
        reconcile_r = reconcile(mc)
        closeout = closeout_2553p1(merge_exec, gate, smoke, reconcile_r)

    except Hold as h:
        hold_obj = write_hold(h.stage, h.reason, h.detail,
                              h.merge_commit or
                              (merge_exec or {}).get("mergeCommit"))
    except BaseException as e:  # noqa: BLE001
        # Codex HIGH 해소: 비-Hold 예외도 HOLD 로 변환 (crash-bypass 차단).
        # merge 성공 이후면 POST_MERGE(merge 비가역 유지·rollback 0), 이전이면
        # PRE_MERGE(merge 미발생). 어떤 경우에도 rollback/revert/force 0.
        merged_ok = bool((merge_exec or {}).get("merged"))
        stage = "POST_MERGE" if merged_ok else "PRE_MERGE"
        import traceback as _tb
        hold_obj = write_hold(
            stage, f"비-Hold 예외 — {type(e).__name__}: {e}",
            {"traceback": _tb.format_exc()[-2000:],
             "merge_completed": merged_ok,
             "note": ("merge 비가역 완료 후 예외 — rollback 0, 보고만"
                      if merged_ok else "merge 미발생")},
            (merge_exec or {}).get("mergeCommit"))

    # ── live workspace 불변 assertEqual ──
    ws_after = live_ws_state()
    ws_invariant = (ws_after == ws_before
                    and ws_after["branch"] == LIVE_WS_EXPECT_BRANCH
                    and ws_after["head"] == LIVE_WS_EXPECT_HEAD)

    # ── final packet 12 (단일 권위 JSON) ──
    hold_hit = hold_obj is not None
    packet = {
        "task": TASK, "ts": NOW(), "started": started,
        "status": ("HOLD_FOR_CHAIR" if hold_hit else "COMPLETE"),
        "packet12": {
            "1_pre_merge_gate": {
                "path": f"memory/events/{TASK}.pre-merge-gate.json",
                "ALL_PASS": (gate or {}).get("ALL_PASS"),
                "failed": (gate or {}).get("failed", [])},
            "2_merge": {
                "mergedAt": (merge_exec or {}).get("mergedAt"),
                "mergedBy": (merge_exec or {}).get("mergedBy"),
                "mergeCommit": (merge_exec or {}).get("mergeCommit"),
                "method": (merge_exec or {}).get("method"),
                "idempotent_noop": (merge_exec or {}).get("idempotent_noop")},
            "3_final_merged_head": (merge_exec or {}).get("mergeCommit"),
            "4_pre_merge_ci_gemini_clean": {
                "ci_11": ((gate or {}).get("conditions", {})
                          .get("g2_ci_11_success", {}).get("pass")),
                "gemini_unresolved_zero": ((gate or {}).get("conditions", {})
                          .get("g3_unresolved_threads_zero", {}).get("pass")),
                "clean": ((gate or {}).get("conditions", {})
                          .get("g4_mergestate_clean", {}).get("pass"))},
            "5_effective_diff_6_evidence": ((gate or {}).get("conditions", {})
                          .get("g6_effective_diff_6", {})),
            "6_post_merge_smoke": {
                "path": f"memory/events/{TASK}.smoke.json",
                "smoke_pass": (smoke or {}).get("smoke_pass")},
            "7_reconcile": {
                "path": f"memory/events/{TASK}.reconcile.json",
                "reconcile_pass": (reconcile_r or {}).get("reconcile_pass")},
            "8_pr102_preserved": ((gate or {}).get("conditions", {})
                          .get("g8_pr102_preserved", {})),
            "9_task2553p1_closeout": {
                "closeout_json": "memory/events/task-2553+1.closeout.json",
                "status": (closeout or {}).get("status"),
                "done_marker": (closeout or {}).get("done_marker"),
                "evidence_consistent": (closeout or {}).get(
                    "evidence_consistent")},
            "10_callback_collector": {
                "registered_by": "main task 종료 직전 ANU Result Collector "
                                 "cron 1회 자가 등록 (별도 절차)",
                "acceptance_9R5": "result.json+closeout-packet+reconcile/smoke "
                                  "회수·교차검증 PASS·독립 Codex audit·"
                                  "ANU-Codex CONVERGED"},
            "11_hold_for_chair": {
                "hit": hold_hit,
                "stage": (hold_obj or {}).get("stage"),
                "reason": (hold_obj or {}).get("reason"),
                "path": (f"memory/events/{TASK}.hold-for-chair.json"
                         if hold_hit else None)},
            "12_followups_separated": {
                "auto_dispatch": 0,
                "note": "후보 목록만 — 회장 결정 대기 (자동 dispatch 0)",
                "candidates": [
                    {"id": "task-2553+2",
                     "desc": "후속 (F2/token transport 차기 단계 후보)",
                     "auto_dispatch": False, "status": "회장 결정 대기"},
                    {"id": "task-2553+3",
                     "desc": "후속 후보",
                     "auto_dispatch": False, "status": "회장 결정 대기"},
                    {"id": "callback-hardening",
                     "desc": "collector 수용기준(9-R.5) 강화 후보",
                     "auto_dispatch": False, "status": "회장 결정 대기"},
                ]},
        },
        "live_workspace_invariant": {
            "before": ws_before, "after": ws_after,
            "assertEqual_pass": ws_invariant,
            "expected_branch": LIVE_WS_EXPECT_BRANCH,
            "expected_head": LIVE_WS_EXPECT_HEAD},
        "mandatory_outputs_§3": {
            "pre_merge_gate_json": (EV / f"{TASK}.pre-merge-gate.json").exists(),
            "merge_exec_json": (EV / f"{TASK}.merge-exec.json").exists(),
            "smoke_json": (EV / f"{TASK}.smoke.json").exists(),
            "reconcile_json": (EV / f"{TASK}.reconcile.json").exists(),
            "task2553p1_closeout_json":
                (EV / "task-2553+1.closeout.json").exists(),
            "result_json": True,
        },
    }
    if not ws_invariant:
        packet["status"] = "HOLD_FOR_CHAIR"
        packet["packet12"]["11_hold_for_chair"] = {
            "hit": True, "stage": "POST_MERGE",
            "reason": "live workspace HEAD/branch/ref 변동 — Critical"}
        write_hold("POST_MERGE", "live workspace invariant 위반",
                   {"before": ws_before, "after": ws_after},
                   (merge_exec or {}).get("mergeCommit"))

    atomic_write_json(EV / f"{TASK}.result.json", packet)
    print(json.dumps({"status": packet["status"],
                      "hold": hold_hit or (not ws_invariant),
                      "mergeCommit": (merge_exec or {}).get("mergeCommit"),
                      "smoke": (smoke or {}).get("smoke_pass"),
                      "reconcile": (reconcile_r or {}).get("reconcile_pass"),
                      "closeout": (closeout or {}).get("status"),
                      "ws_invariant": ws_invariant},
                     ensure_ascii=False))
    return 0 if (packet["status"] == "COMPLETE") else 2


def main():
    """Last-resort guard: 어떤 경로의 예외도 HOLD 산출 없이 escape 0.

    pre-setup / post-handler tail / handler 내부 어디서 터지든 최소 1개의
    hold-for-chair.json + result.json(HOLD) 을 best-effort 보장. rollback 0.
    """
    try:
        return _main_impl()
    except BaseException as fatal:  # noqa: BLE001
        import traceback as _tb
        # merge 발생 여부는 디스크의 merge-exec.json 으로 판정 (가장 신뢰)
        merged_ok, mc = False, None
        try:
            mp = EV / f"{TASK}.merge-exec.json"
            if mp.exists():
                d = json.loads(mp.read_text(encoding="utf-8"))
                merged_ok = bool(d.get("merged"))
                mc = d.get("mergeCommit")
        except Exception:  # noqa: BLE001
            pass
        stage = "POST_MERGE" if merged_ok else "PRE_MERGE"
        try:
            write_hold(stage,
                       f"last-resort 예외 — {type(fatal).__name__}: {fatal}",
                       {"traceback": _tb.format_exc()[-2000:],
                        "merge_completed": merged_ok,
                        "rollback_performed": False,
                        "note": "어떤 경우에도 rollback/revert/force 0 — 보고만"},
                       mc)
        except Exception:  # noqa: BLE001
            pass
        try:
            atomic_write_json(EV / f"{TASK}.result.json", {
                "task": TASK, "ts": NOW(), "status": "HOLD_FOR_CHAIR",
                "last_resort": True, "stage": stage,
                "merge_completed": merged_ok, "mergeCommit": mc,
                "error": f"{type(fatal).__name__}: {fatal}",
                "rollback_performed": False})
        except Exception:  # noqa: BLE001
            pass
        print(json.dumps({"status": "HOLD_FOR_CHAIR", "last_resort": True,
                           "stage": stage, "mergeCommit": mc},
                          ensure_ascii=False))
        return 2


if __name__ == "__main__":
    sys.exit(main())
