"""utils/state_repair.py — checksum mismatch 복구 코드 경로 + chairman approval evidence.

task-2472 구현 5: state 파일 수리는 반드시 이 모듈을 통해서만.
manual 편집 금지 — evidence + sha256 백업 + audit 없으면 fail-closed.

repair_action 종류:
  - "recompute_checksum": 기존 state 그대로 유지하고 _checksum 재계산
  - "rollback_to_backup": 가장 최신 backup 파일로 복원
  - "manual_fixup": new_state로 덮어쓰기 (위험, evidence 강력 검증)
"""
from __future__ import annotations

import getpass
import hashlib
import json
import os
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

# task-2472+1: utils/workspace 경로 sys.path 보장 (reconcile 모듈에서 사용).
_UTILS_DIR = str(Path(__file__).parent)
_WORKSPACE_ROOT = str(Path(__file__).parent.parent)
for _p in (_UTILS_DIR, _WORKSPACE_ROOT):
    if _p not in sys.path:
        sys.path.insert(0, _p)

# --------------------------------------------------------------------------
# 상수 / 경로
# --------------------------------------------------------------------------

# state 파일 경로 (workspace root 상대)
STATE_DIR_REL = Path(".tasks/state")
BACKUP_DIR_REL = STATE_DIR_REL / ".backups"
VERIFY_PENDING_PREFIX = ".verify-pending-"

# audit jsonl 경로
AUDIT_JSONL_REL = Path("memory/orchestration-audit/checksum-repair.jsonl")

# --------------------------------------------------------------------------
# 내부 헬퍼
# --------------------------------------------------------------------------


def _now_iso() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _now_ts_compact() -> str:
    """파일명용 타임스탬프 (YYYYMMDDTHHMMSSZ)."""
    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")


def _workspace_root(workspace: Optional[Path] = None) -> Path:
    return workspace or Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))


def _sha256_file(path: Path) -> str:
    """파일의 sha256 hex digest 반환."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def _sha256_bytes(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def _evidence_hash_str(record: dict) -> str:
    serialized = json.dumps(record, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()


def _compute_checksum(state_dict: dict) -> str:
    """state dict에서 _checksum 제외한 내용의 sha256."""
    clean = {k: v for k, v in state_dict.items() if k != "_checksum"}
    serialized = json.dumps(clean, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()


def _atomic_write_json(path: Path, data: dict) -> None:
    """tempfile + os.replace 방식 atomic write.

    Gemini 리뷰 medium: tempfile.mkstemp 기본 권한이 0o600이라 os.replace 후
    state 파일 권한이 제한됨. 0o644로 명시 chmod 후 replace.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    content = json.dumps(data, ensure_ascii=False, indent=2)
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(content)
        os.chmod(tmp, 0o644)
        os.replace(tmp, path)
    except Exception:
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


# --------------------------------------------------------------------------
# 공개 API
# --------------------------------------------------------------------------


def inspect_state(task_id: str, *, workspace: Optional[Path] = None) -> dict:
    """.tasks/state/{task_id}.json 검사.

    Returns
    -------
    dict
        {
            "exists": bool,
            "readable": bool,
            "json_valid": bool,
            "checksum_present": bool,
            "checksum_match": bool,
            "current_state": str | None,
            "issues": [str, ...]
        }
    """
    work_root = _workspace_root(workspace)
    state_path = work_root / STATE_DIR_REL / f"{task_id}.json"
    issues: list[str] = []
    result: dict = {
        "exists": False,
        "readable": False,
        "json_valid": False,
        "checksum_present": False,
        "checksum_match": False,
        "current_state": None,
        "issues": issues,
    }

    if not state_path.exists():
        issues.append(f"state 파일 없음: {state_path}")
        return result

    result["exists"] = True

    try:
        content = state_path.read_text(encoding="utf-8")
        result["readable"] = True
    except Exception as exc:
        issues.append(f"state 파일 읽기 실패: {exc}")
        return result

    try:
        state = json.loads(content)
        result["json_valid"] = True
    except json.JSONDecodeError as exc:
        issues.append(f"state 파일 JSON 파싱 실패: {exc}")
        return result

    # current_state 추출
    result["current_state"] = state.get("state") or state.get("status")

    # checksum 검증
    stored_checksum = state.get("_checksum")
    if not stored_checksum:
        issues.append("_checksum 필드 없음")
        return result

    result["checksum_present"] = True
    computed = _compute_checksum(state)

    if computed != stored_checksum:
        issues.append(
            f"checksum mismatch: stored={stored_checksum[:12]}..., computed={computed[:12]}..."
        )
    else:
        result["checksum_match"] = True

    return result


def backup_state_file(task_id: str, *, workspace: Optional[Path] = None) -> dict:
    """.tasks/state/{task_id}.json의 sha256 + 원본 백업 보존.

    백업 위치: .tasks/state/.backups/{task_id}-{timestamp}.json

    Returns
    -------
    dict
        {"ok": bool, "backup_path": str, "sha256": str, "ts": str}
    """
    work_root = _workspace_root(workspace)
    state_path = work_root / STATE_DIR_REL / f"{task_id}.json"
    ts = _now_ts_compact()

    if not state_path.exists():
        return {
            "ok": False,
            "backup_path": "",
            "sha256": "",
            "ts": ts,
            "reason": f"state 파일 없음: {state_path}",
        }

    try:
        content = state_path.read_bytes()
    except Exception as exc:
        return {
            "ok": False,
            "backup_path": "",
            "sha256": "",
            "ts": ts,
            "reason": f"state 파일 읽기 실패: {exc}",
        }

    sha256 = _sha256_bytes(content)
    backup_dir = work_root / BACKUP_DIR_REL
    backup_dir.mkdir(parents=True, exist_ok=True)
    backup_path = backup_dir / f"{task_id}-{ts}.json"

    try:
        backup_path.write_bytes(content)
    except Exception as exc:
        return {
            "ok": False,
            "backup_path": str(backup_path),
            "sha256": sha256,
            "ts": ts,
            "reason": f"백업 파일 쓰기 실패: {exc}",
        }

    return {
        "ok": True,
        "backup_path": str(backup_path),
        "sha256": sha256,
        "ts": ts,
    }


def repair_state(
    task_id: str,
    *,
    approved_by_chairman: str,
    evidence_path: str,
    actor: str,
    repair_action: str,
    new_state: Optional[dict] = None,
    workspace: Optional[Path] = None,
) -> dict:
    """state 파일 수리 (chairman approval evidence 필수).

    fail-closed:
    - evidence_path 파일 부재 → reject
    - sha256 백업 실패 → reject
    - audit 기록 실패 → reject
    - repair 후 .verify-pending 마커 생성 (verify_consistency 호출 강제)

    Parameters
    ----------
    approved_by_chairman:
        chairman 승인자 식별자.
    evidence_path:
        approval evidence 파일 경로 (반드시 실제 파일 존재).
    actor:
        요청자.
    repair_action:
        "recompute_checksum" | "rollback_to_backup" | "manual_fixup"
    new_state:
        manual_fixup 시 새 state dict.

    Returns
    -------
    dict
        {"ok": bool, "reason": str, "audit_path": str, "backup_path": str}
    """
    work_root = _workspace_root(workspace)
    state_path = work_root / STATE_DIR_REL / f"{task_id}.json"

    # 1. evidence_path 파일 존재 확인 (fail-closed)
    ev_path = Path(evidence_path)
    if not ev_path.is_absolute():
        ev_path = work_root / ev_path
    if not ev_path.exists():
        return {
            "ok": False,
            "reason": f"evidence_path 파일 없음: {ev_path} — repair 거부 (fail-closed)",
            "audit_path": "",
            "backup_path": "",
        }

    # 2. sha256 백업 (fail-closed)
    backup_result = backup_state_file(task_id, workspace=workspace)
    if not backup_result["ok"]:
        return {
            "ok": False,
            "reason": f"sha256 백업 실패: {backup_result.get('reason', '')} — repair 거부 (fail-closed)",
            "audit_path": "",
            "backup_path": "",
        }

    input_sha256 = backup_result["sha256"]
    backup_path = backup_result["backup_path"]

    # Gemini 리뷰 medium: .verify-pending 마커를 state 파일 수정 BEFORE 생성.
    # 만약 state mutation 후 marker 생성 시점에 실패하면, state는 repair 완료된 상태인데
    # marker가 없어 verify-consistency를 스킵할 수 있다 (bypass 위험).
    # 따라서 marker를 먼저 생성 → 실패 시 repair 자체 중단. marker 성공 후에만 state 수정.
    marker_path = work_root / STATE_DIR_REL / f"{VERIFY_PENDING_PREFIX}{task_id}"
    try:
        marker_path.parent.mkdir(parents=True, exist_ok=True)
        marker_path.write_text(
            json.dumps(
                {
                    "task_id": task_id,
                    "repair_action": repair_action,
                    "actor": actor,
                    "ts": _now_iso(),
                    "stage": "pre-repair",
                },
                ensure_ascii=False,
            ),
            encoding="utf-8",
        )
    except Exception as exc:
        return {
            "ok": False,
            "reason": f"verify-pending 마커 사전 생성 실패: {exc} — repair 중단 (fail-closed)",
            "audit_path": "",
            "backup_path": backup_path,
        }

    # 3. repair_action 실행
    output_sha256 = ""
    repair_ok = False
    repair_reason = ""

    try:
        if repair_action == "recompute_checksum":
            # 기존 state 로드 → _checksum 재계산 → write
            if not state_path.exists():
                repair_reason = "state 파일 없음 (recompute_checksum 불가)"
            else:
                state = json.loads(state_path.read_text(encoding="utf-8"))
                state["_checksum"] = _compute_checksum(state)
                _atomic_write_json(state_path, state)
                output_sha256 = _sha256_file(state_path)
                repair_ok = True
                repair_reason = "checksum 재계산 완료"

        elif repair_action == "rollback_to_backup":
            # 가장 최신 backup 파일 복원
            backup_dir = work_root / BACKUP_DIR_REL
            backups = sorted(backup_dir.glob(f"{task_id}-*.json"), reverse=True)
            # 현재 방금 생성한 백업 제외하고 이전 백업 사용
            prev_backups = [b for b in backups if str(b) != backup_path]
            if not prev_backups:
                repair_reason = "복원할 이전 backup 파일 없음"
            else:
                latest_backup = prev_backups[0]
                content = latest_backup.read_bytes()
                state_path.write_bytes(content)
                output_sha256 = _sha256_file(state_path)
                repair_ok = True
                repair_reason = f"backup 복원 완료: {latest_backup.name}"

        elif repair_action == "manual_fixup":
            # new_state로 덮어쓰기 (위험: evidence 강력 검증)
            if new_state is None:
                repair_reason = "manual_fixup에 new_state 필수"
            else:
                # new_state에 _checksum 추가
                new_state["_checksum"] = _compute_checksum(new_state)
                _atomic_write_json(state_path, new_state)
                output_sha256 = _sha256_file(state_path)
                repair_ok = True
                repair_reason = "manual_fixup 완료 (chairman approval 적용)"
        else:
            repair_reason = f"알 수 없는 repair_action: '{repair_action}'"

    except Exception as exc:
        repair_reason = f"repair 실행 중 예외: {exc}"
        repair_ok = False

    # repair 실패 시 즉시 반환 (audit은 반드시 기록)
    result_str = "ok" if repair_ok else "rejected"

    # 4. .verify-pending 마커는 이미 사전(pre-repair) 단계에서 생성됨.
    # repair 성공 시 마커 메타데이터 update (stage 표시), 실패 시 그대로 유지.
    if repair_ok:
        try:
            marker_path.write_text(
                json.dumps(
                    {
                        "task_id": task_id,
                        "repair_action": repair_action,
                        "actor": actor,
                        "ts": _now_iso(),
                        "stage": "post-repair",
                    },
                    ensure_ascii=False,
                ),
                encoding="utf-8",
            )
        except Exception:
            # post-repair 메타 update 실패는 fail-closed 아님 — marker는 이미 존재 (verify-consistency 차단 유지).
            pass

    # 5. audit 기록 (성공/실패 모두 기록, 실패 시 fail-closed)
    try:
        audit_path = record_repair_audit(
            task_id=task_id,
            actor=actor,
            repair_action=repair_action,
            approved_by_chairman=approved_by_chairman,
            evidence_path=str(ev_path),
            input_state_sha256=input_sha256,
            output_state_sha256=output_sha256,
            backup_path=backup_path,
            result=result_str,
            reason=repair_reason,
            workspace=workspace,
        )
    except Exception as exc:
        # audit 기록 실패 → fail-closed
        return {
            "ok": False,
            "reason": f"audit 기록 실패: {exc} — fail-closed",
            "audit_path": "",
            "backup_path": backup_path,
        }

    return {
        "ok": repair_ok,
        "reason": repair_reason,
        "audit_path": str(audit_path),
        "backup_path": backup_path,
    }


def verify_consistency(task_id: str, *, workspace: Optional[Path] = None) -> dict:
    """repair 후 호출되어야 하는 후속 검증.

    체크:
    - state file checksum 일치
    - .verify-pending 마커 제거 (consistency 통과 시)
    - FAIL 시 마커 유지

    Returns
    -------
    dict
        {"ok": bool, "reason": str, "checks": {...}}
    """
    work_root = _workspace_root(workspace)
    marker_path = work_root / STATE_DIR_REL / f"{VERIFY_PENDING_PREFIX}{task_id}"
    checks: dict = {}

    # state inspect
    inspection = inspect_state(task_id, workspace=workspace)
    checks["state_inspect"] = inspection

    if not inspection["exists"]:
        return {
            "ok": False,
            "reason": "state 파일 없음 — verify-consistency FAIL",
            "checks": checks,
        }

    if not inspection["json_valid"]:
        return {
            "ok": False,
            "reason": "state 파일 JSON 파싱 실패 — verify-consistency FAIL",
            "checks": checks,
        }

    if not inspection["checksum_present"]:
        return {
            "ok": False,
            "reason": "_checksum 필드 없음 — verify-consistency FAIL",
            "checks": checks,
        }

    if not inspection["checksum_match"]:
        return {
            "ok": False,
            "reason": f"checksum mismatch — verify-consistency FAIL. issues: {inspection['issues']}",
            "checks": checks,
        }

    # 모든 검증 통과 → .verify-pending 마커 제거
    checks["checksum_ok"] = True
    if marker_path.exists():
        try:
            marker_path.unlink()
            checks["verify_pending_removed"] = True
        except Exception as exc:
            checks["verify_pending_removed"] = False
            return {
                "ok": False,
                "reason": f"verify-pending 마커 제거 실패: {exc}",
                "checks": checks,
            }
    else:
        checks["verify_pending_removed"] = False
        checks["verify_pending_note"] = "마커가 없어 제거 불필요 (정상)"

    return {
        "ok": True,
        "reason": "verify-consistency PASS: checksum 일치, verify-pending 마커 제거",
        "checks": checks,
    }


def record_repair_audit(
    *,
    task_id: str,
    actor: str,
    repair_action: str,
    approved_by_chairman: str,
    evidence_path: str,
    input_state_sha256: str,
    output_state_sha256: str,
    backup_path: str,
    result: str,
    reason: str,
    workspace: Optional[Path] = None,
) -> Path:
    """checksum repair audit 기록.

    memory/orchestration-audit/checksum-repair.jsonl 에 line append.
    필수 필드: task_id, actor, repair_action, approved_by_chairman, evidence_path,
    input_state_sha256, output_state_sha256, backup_path, result, reason, timestamp, evidence_hash
    """
    timestamp = _now_iso()
    base_record: dict = {
        "task_id": task_id,
        "actor": actor,
        "repair_action": repair_action,
        "approved_by_chairman": approved_by_chairman,
        "evidence_path": evidence_path,
        "input_state_sha256": input_state_sha256,
        "output_state_sha256": output_state_sha256,
        "backup_path": backup_path,
        "result": result,
        "reason": reason,
        "timestamp": timestamp,
    }
    ev_hash = _evidence_hash_str(base_record)
    record = {**base_record, "evidence_hash": ev_hash}

    work_root = _workspace_root(workspace)
    target = work_root / AUDIT_JSONL_REL
    target.parent.mkdir(parents=True, exist_ok=True)

    line = json.dumps(record, ensure_ascii=False) + "\n"
    fd = os.open(str(target), os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o644)
    try:
        os.write(fd, line.encode("utf-8"))
    finally:
        os.close(fd)

    return target


# ===========================================================================
# task-2472+1: reconcile 모듈 (state_orphaned_after_valid_merge)
# ---------------------------------------------------------------------------
# state file missing + PR merged + valid .done 상태(state_orphaned_after_valid_merge)를
# silent corruption 없이 정합화한다.
#
# ENUM:
# - state_orphaned_after_valid_merge: PR merge + .done valid + state file missing (1차 지원)
# - state_corrupted_with_partial_merge / state_inconsistent_after_force_push /
#   state_missing_after_taskctl_crash: enum 인식만 (1차 reject)
#
# audit jsonl schema (14 fields): task_id, ts, classification, pr, merge_commit,
#   origin_main_head, ancestry, done_path, done_sha256, g3_fail_classification,
#   actor, evidence_path, approved_by_chairman, decision
# ===========================================================================


# ---------------------------------------------------------------------------
# 상수 (reconcile)
# ---------------------------------------------------------------------------

RECONCILE_CLASSIFICATIONS: set[str] = {
    "state_orphaned_after_valid_merge",
    "state_corrupted_with_partial_merge",
    "state_inconsistent_after_force_push",
    "state_missing_after_taskctl_crash",
}

# 1차 실 동작 지원 enum (그 외는 인식만 + reject)
SUPPORTED_CLASSIFICATIONS: set[str] = {
    "state_orphaned_after_valid_merge",
}

RECONCILE_AUDIT_FIELDS: tuple[str, ...] = (
    "task_id", "ts", "classification", "pr", "merge_commit",
    "origin_main_head", "ancestry", "done_path", "done_sha256",
    "g3_fail_classification", "actor", "evidence_path",
    "approved_by_chairman", "decision",
)

# state 합성 시 사용할 transitions 시퀀스 (CREATED → DONE)
_RECONCILE_TRANSITIONS: tuple[str, ...] = (
    "CREATED", "DISPATCHED", "ACKED", "RUNNING", "COMMITTED",
    "PR_OPEN", "CI_PENDING", "GEMINI_PENDING", "REVIEW_READY",
    "VERIFIED", "HUMAN_APPROVED", "MERGING", "MERGED", "DONE",
)


# ---------------------------------------------------------------------------
# 내부 헬퍼 (reconcile 전용 — taskctl.py와 동일한 canonical_json/checksum 사용)
# ---------------------------------------------------------------------------


def _reconcile_actor_str() -> str:
    """현재 git user 또는 OS user 문자열."""
    try:
        user = getpass.getuser()
    except Exception:
        user = os.environ.get("USER", "unknown")
    try:
        proc = subprocess.run(
            ["git", "config", "user.email"],
            capture_output=True, text=True, timeout=5,
        )
        email = proc.stdout.strip() or "unknown@local"
    except Exception:
        email = "unknown@local"
    return f"{user} <{email}>"


def _reconcile_canonical_json(state: dict) -> str:
    """_checksum 제외 후 sort_keys + compact separators JSON 직렬화.

    taskctl.py의 _canonical_json과 동일 알고리즘을 사용한다 (compact separators).
    이 모듈 본체의 _compute_checksum은 default separators를 사용하므로,
    reconcile 결과 state 파일은 반드시 이 함수로 checksum을 계산해야 taskctl이 인식한다.
    """
    payload = {k: v for k, v in state.items() if k != "_checksum"}
    return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))


def _reconcile_compute_checksum(state: dict) -> str:
    """sha256(_reconcile_canonical_json(state)) — taskctl.py 호환."""
    return hashlib.sha256(_reconcile_canonical_json(state).encode("utf-8")).hexdigest()


def _reconcile_new_state(task_id: str) -> dict:
    """taskctl.py의 _new_state와 동일 schema로 초기 state dict 생성."""
    return {
        "task_id": task_id,
        "current_state": "CREATED",
        "transitions": [],
        "evidence": {
            "git_diff_sha": None,
            "changed_paths": [],
            "branch": None,
            "pr_number": None,
            "pr_state": None,
            "ci_checks": {},
            "guard_sh_result": None,
            "qc_report_guard_result": None,
            "merge_timestamp": None,
            "exit_codes": {},
        },
        "human_approved": False,
        "bypass": {"used": False, "ts": None, "actor": None},
        "admin_override": {
            "used": False,
            "ts": None,
            "actor": None,
            "reason": None,
            "audit_log_offset": None,
        },
    }


def _reconcile_save_state(state: dict, state_dir: Path) -> None:
    """atomic write (tmp + rename). reconcile checksum 재계산 포함."""
    state_dir.mkdir(parents=True, exist_ok=True)
    state.pop("_checksum", None)
    state["_checksum"] = _reconcile_compute_checksum(state)
    task_id = state["task_id"]
    p = state_dir / f"{task_id}.json"
    tmp = p.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
    tmp.replace(p)


def _reconcile_load_state(task_id: str, state_dir: Path) -> Optional[dict]:
    """state 파일 읽기 + reconcile checksum 검증. 파일 없거나 mismatch 시 None."""
    p = state_dir / f"{task_id}.json"
    if not p.exists():
        return None
    try:
        state = json.loads(p.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return None
    stored = state.pop("_checksum", None)
    expected = _reconcile_compute_checksum(state)
    if stored != expected:
        return None
    state["_checksum"] = stored
    return state


# ---------------------------------------------------------------------------
# 공개 API: validate_done_payload
# ---------------------------------------------------------------------------


def validate_done_payload(done_path: Path) -> dict:
    """.done JSON을 읽고 두 포맷 중 하나에 부합하는지 검증.

    지원 포맷:
    - task-timer 포맷: ``task_id`` + ``qc_result`` 필드 + ``qc_result == "PASS"`` 필수
    - taskctl done 포맷: ``task_id`` + ``merge_commit_sha`` 필드 (둘 다 truthy)

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "schema_version": str, "payload": dict}``
    """
    if not done_path.exists():
        return {
            "ok": False,
            "reason": f".done 파일 없음: {done_path}",
            "schema_version": "unknown",
            "payload": {},
        }

    try:
        raw = done_path.read_text(encoding="utf-8")
        payload = json.loads(raw)
    except (json.JSONDecodeError, OSError) as exc:
        return {
            "ok": False,
            "reason": f".done JSON 파싱 실패: {exc}",
            "schema_version": "unknown",
            "payload": {},
        }

    if not isinstance(payload, dict):
        return {
            "ok": False,
            "reason": ".done payload가 dict가 아님",
            "schema_version": "unknown",
            "payload": {},
        }

    task_id = payload.get("task_id")

    # task-timer 포맷 검사
    qc_result = payload.get("qc_result")
    if task_id and qc_result:
        if qc_result == "PASS":
            return {
                "ok": True,
                "reason": "task-timer 포맷 PASS (qc_result=PASS)",
                "schema_version": "task_timer",
                "payload": payload,
            }
        else:
            return {
                "ok": False,
                "reason": f"task-timer 포맷: qc_result={qc_result} (PASS 필요)",
                "schema_version": "task_timer",
                "payload": payload,
            }

    # taskctl done 포맷 검사
    merge_commit_sha = payload.get("merge_commit_sha")
    if task_id and merge_commit_sha:
        return {
            "ok": True,
            "reason": "taskctl done 포맷 PASS (task_id + merge_commit_sha 존재)",
            "schema_version": "taskctl_done",
            "payload": payload,
        }

    return {
        "ok": False,
        "reason": (
            f".done 포맷 불일치: task_id={task_id!r}, "
            f"qc_result={payload.get('qc_result')!r}, "
            f"merge_commit_sha={payload.get('merge_commit_sha')!r} — "
            "task-timer(task_id+qc_result=PASS) 또는 taskctl_done(task_id+merge_commit_sha) 필요"
        ),
        "schema_version": "unknown",
        "payload": payload,
    }


# ---------------------------------------------------------------------------
# 공개 API: compute_done_sha256
# ---------------------------------------------------------------------------


def compute_done_sha256(done_path: Path) -> str:
    """파일 바이트 sha256 계산. 실패 시 빈 문자열."""
    try:
        data = done_path.read_bytes()
        return hashlib.sha256(data).hexdigest()
    except OSError:
        return ""


# ---------------------------------------------------------------------------
# 공개 API: verify_audit_sha256_match
# ---------------------------------------------------------------------------


def verify_audit_sha256_match(
    done_path: Path,
    audit_path: Path,
    task_id: str,
) -> dict:
    """state-recovery.jsonl 마지막 N줄 중 task_id 일치 라인의 done_sha256 필드와
    현재 .done sha256을 비교 (역방향 탐색).

    Returns
    -------
    dict
        ``{"ok": bool, "reason": str, "done_sha256": str, "audit_sha256": str}``
    """
    current_sha = compute_done_sha256(done_path)
    if not current_sha:
        return {
            "ok": False,
            "reason": f".done 파일 sha256 계산 실패: {done_path}",
            "done_sha256": "",
            "audit_sha256": "",
        }

    if not audit_path.exists():
        return {
            "ok": False,
            "reason": f"audit 파일 없음: {audit_path}",
            "done_sha256": current_sha,
            "audit_sha256": "",
        }

    try:
        lines = audit_path.read_text(encoding="utf-8").splitlines()
    except OSError as exc:
        return {
            "ok": False,
            "reason": f"audit 파일 읽기 실패: {exc}",
            "done_sha256": current_sha,
            "audit_sha256": "",
        }

    audit_entry: Optional[dict] = None
    for line in reversed(lines):
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue
        if obj.get("task_id") == task_id and "done_sha256" in obj:
            audit_entry = obj
            break

    if audit_entry is None:
        return {
            "ok": False,
            "reason": f"audit에 task_id={task_id!r} 일치 + done_sha256 필드 있는 라인 없음",
            "done_sha256": current_sha,
            "audit_sha256": "",
        }

    audit_sha = audit_entry["done_sha256"]
    if current_sha != audit_sha:
        return {
            "ok": False,
            "reason": f"sha256 불일치: done_sha256={current_sha}, audit_sha256={audit_sha}",
            "done_sha256": current_sha,
            "audit_sha256": audit_sha,
        }

    return {
        "ok": True,
        "reason": "done_sha256 일치",
        "done_sha256": current_sha,
        "audit_sha256": audit_sha,
    }


# ---------------------------------------------------------------------------
# 공개 API: synthesize_state_for_orphaned_merge
# ---------------------------------------------------------------------------


def synthesize_state_for_orphaned_merge(
    task_id: str,
    pr_number: Any,
    merge_commit_sha: str,
    *,
    merge_timestamp: Optional[str] = None,
) -> dict:
    """state_orphaned_after_valid_merge 케이스 — state 재구성.

    _reconcile_new_state(task_id)를 만든 후 _RECONCILE_TRANSITIONS 시퀀스를
    합성하여 transitions 목록을 채우고, current_state="DONE"으로 마무리.

    각 transition entry에는 ``meta={"reconciled": True}`` 포함.
    evidence dict에 pr_number, merge_commit_sha, merge_timestamp 채움.
    """
    state = _reconcile_new_state(task_id)
    ts_now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    merge_ts = merge_timestamp or ts_now
    actor = "reconcile <taskctl@reconcile>"

    transitions: list[dict] = []
    for i, to_state in enumerate(_RECONCILE_TRANSITIONS[1:], start=1):
        from_state = _RECONCILE_TRANSITIONS[i - 1]
        entry: dict = {
            "from": from_state,
            "to": to_state,
            "ts": ts_now,
            "actor": actor,
            "command": "taskctl reconcile (synthesized)",
            "exit_code": 0,
            "evidence_path": f".tasks/evidence/{task_id}",
            "meta": {"reconciled": True},
        }
        transitions.append(entry)

    state["transitions"] = transitions
    state["current_state"] = "DONE"
    state["human_approved"] = True
    state["evidence"]["pr_number"] = int(pr_number) if pr_number is not None else None
    state["evidence"]["pr_state"] = "MERGED"
    state["evidence"]["merge_timestamp"] = merge_ts
    state["evidence"]["merge_commit_sha"] = merge_commit_sha
    state["reconciled"] = True
    state["reconcile_ts"] = ts_now
    state["reconcile_classification"] = "state_orphaned_after_valid_merge"

    return state


# ---------------------------------------------------------------------------
# 공개 API: append_reconcile_audit
# ---------------------------------------------------------------------------


def append_reconcile_audit(workspace: Path, record: dict) -> Path:
    """reconcile audit jsonl(state-recovery.jsonl)에 14필드 레코드를 atomic append.

    Raises
    ------
    ValueError
        필수 14필드 중 빠진 필드가 있으면 발생.
    """
    missing = [f for f in RECONCILE_AUDIT_FIELDS if f not in record]
    if missing:
        raise ValueError(f"audit record 필수 필드 누락: {missing}")

    audit_path = workspace / "memory" / "orchestration-audit" / "state-recovery.jsonl"
    audit_path.parent.mkdir(parents=True, exist_ok=True)

    line = json.dumps(record, ensure_ascii=False) + "\n"

    fd = os.open(str(audit_path), os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o644)
    try:
        os.write(fd, line.encode("utf-8"))
        os.fsync(fd)
    finally:
        os.close(fd)

    return audit_path


# ---------------------------------------------------------------------------
# 공개 API: reconcile_orphaned_merge (핵심 함수, 10단계 검증)
# ---------------------------------------------------------------------------


def reconcile_orphaned_merge(
    task_id: str,
    pr_number: Any,
    merge_commit_sha: str,
    evidence_path: str,
    *,
    repo: str = "Jeon-Jonghyuk/dev_workspace",
    workspace: Optional[Path] = None,
    approved_by_chairman: bool = False,
    gh_cmd: Optional[list] = None,
    state_dir_override: Optional[Path] = None,
    events_dir_override: Optional[Path] = None,
    audit_path_override: Optional[Path] = None,
) -> dict:
    """state_orphaned_after_valid_merge 케이스 reconcile (10단계 fail-closed).

    Returns
    -------
    dict
        ``{"ok": bool, "step_failed": int | None, "reason": str,
           "audit_path": str, "state_path": str, "detail": dict}``
    """
    ws = workspace or Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))
    state_dir = state_dir_override or (ws / ".tasks" / "state")
    events_dir = events_dir_override or (ws / "memory" / "events")
    audit_path = audit_path_override or (ws / "memory" / "orchestration-audit" / "state-recovery.jsonl")

    detail: dict[str, Any] = {
        "task_id": task_id,
        "pr_number": pr_number,
        "merge_commit_sha": merge_commit_sha,
        "repo": repo,
        "workspace": str(ws),
        "steps": {},
    }

    def _fail(step: int, reason: str) -> dict:
        detail["steps"][f"step_{step}"] = {"ok": False, "reason": reason}
        return {
            "ok": False,
            "step_failed": step,
            "reason": reason,
            "audit_path": str(audit_path),
            "state_path": str(state_dir / f"{task_id}.json"),
            "detail": detail,
        }

    if not approved_by_chairman:
        return _fail(0, "회장 승인(--approved-by-chairman) 필수")

    # ── Step 1: PR state=MERGED 검증 ──
    _gh = list(gh_cmd) if gh_cmd else ["gh"]
    try:
        proc = subprocess.run(
            _gh + ["pr", "view", str(pr_number), "--repo", repo,
                   "--json", "state,mergedAt,mergeCommit"],
            capture_output=True, text=True, timeout=30, shell=False,
        )
        if proc.returncode != 0:
            return _fail(1, f"gh pr view 실패 (exit={proc.returncode}): {proc.stderr.strip()[:300]}")
        pr_data = json.loads(proc.stdout)
    except subprocess.TimeoutExpired:
        return _fail(1, "gh pr view timeout")
    except (json.JSONDecodeError, OSError) as exc:
        return _fail(1, f"gh pr view JSON 파싱 실패: {exc}")

    if pr_data.get("state") != "MERGED":
        return _fail(1, f"PR state={pr_data.get('state')} (MERGED 필요)")

    merged_at = pr_data.get("mergedAt") or ""
    gh_merge_commit = (pr_data.get("mergeCommit") or {}).get("oid") or ""
    detail["steps"]["step_1"] = {
        "ok": True,
        "state": pr_data.get("state"),
        "mergedAt": merged_at,
        "gh_merge_commit": gh_merge_commit,
    }

    # ── Step 2: mergeCommit ancestor of origin/main ──
    try:
        from utils.silent_corruption_guard import check_origin_main_ancestry  # type: ignore[import]
        ancestry_result = check_origin_main_ancestry(merge_commit_sha, cwd=ws)
    except ImportError:
        try:
            proc2 = subprocess.run(
                ["git", "merge-base", "--is-ancestor", merge_commit_sha, "origin/main"],
                capture_output=True, text=True, timeout=30, cwd=str(ws),
            )
            ok2 = proc2.returncode == 0
            ancestry_result = {
                "ok": ok2,
                "reason": "ancestry PASS" if ok2 else "merge_commit not ancestor of origin/main",
                "detail": {"merge_commit_sha": merge_commit_sha, "origin_sha": ""},
            }
        except Exception as exc:
            return _fail(2, f"ancestry 검증 실패: {exc}")

    if not ancestry_result["ok"]:
        return _fail(2, f"origin/main ancestry 검증 실패: {ancestry_result['reason']}")

    origin_main_head = ancestry_result["detail"].get("origin_sha", "")
    detail["steps"]["step_2"] = {
        "ok": True,
        "origin_main_head": origin_main_head,
        "ancestry": "PASS",
    }

    # ── Step 3: .done payload schema 검증 ──
    done_path = events_dir / f"{task_id}.done"
    done_result = validate_done_payload(done_path)
    if not done_result["ok"]:
        return _fail(3, f".done payload 검증 실패: {done_result['reason']}")
    detail["steps"]["step_3"] = {
        "ok": True,
        "schema_version": done_result["schema_version"],
        "done_path": str(done_path),
    }

    # ── Step 4: .done sha256 일치 (audit jsonl 대조) ──
    sha256_result = verify_audit_sha256_match(done_path, audit_path, task_id)
    if not sha256_result["ok"]:
        return _fail(4, f".done sha256 불일치: {sha256_result['reason']}")
    done_sha256 = sha256_result["done_sha256"]
    detail["steps"]["step_4"] = {
        "ok": True,
        "done_sha256": done_sha256,
    }

    # ── Step 5: .g3-fail 자동 분류 ──
    g3_fail_path = events_dir / f"{task_id}.g3-fail"
    report_path = ws / "memory" / "reports" / f"{task_id}.md"

    g3_classification: str
    if not g3_fail_path.exists():
        g3_classification = "no_g3_fail"
    else:
        try:
            from utils.g3_fail_classifier import classify_g3_fail  # type: ignore[import]
            g3_classification = classify_g3_fail(g3_fail_path, report_path)
        except ImportError:
            return _fail(5, "g3_fail_classifier import 실패 (utils/g3_fail_classifier.py 확인)")

        if g3_classification != "false_alert_resolved_by_late_report":
            return _fail(
                5,
                f".g3-fail 분류 = {g3_classification!r} (reconcile 차단 — "
                "false_alert_resolved_by_late_report만 허용)",
            )

    detail["steps"]["step_5"] = {
        "ok": True,
        "g3_fail_classification": g3_classification,
        "g3_fail_path": str(g3_fail_path),
    }

    # ── Step 6: state 파일 재구성 ──
    synthesized = synthesize_state_for_orphaned_merge(
        task_id, pr_number, merge_commit_sha, merge_timestamp=merged_at or None
    )
    detail["steps"]["step_6"] = {
        "ok": True,
        "current_state": synthesized["current_state"],
        "transitions_count": len(synthesized["transitions"]),
    }

    # ── Step 7: checksum 생성 (taskctl 호환) ──
    synthesized["_checksum"] = _reconcile_compute_checksum(synthesized)
    detail["steps"]["step_7"] = {
        "ok": True,
        "checksum": synthesized["_checksum"],
    }

    # ── Step 8: audit 14필드 jsonl 기록 ──
    audit_record = {
        "task_id": task_id,
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "classification": "state_orphaned_after_valid_merge",
        "pr": int(pr_number) if pr_number is not None else pr_number,
        "merge_commit": merge_commit_sha,
        "origin_main_head": origin_main_head,
        "ancestry": "PASS",
        "done_path": str(done_path),
        "done_sha256": done_sha256,
        "g3_fail_classification": g3_classification,
        "actor": _reconcile_actor_str(),
        "evidence_path": evidence_path,
        "approved_by_chairman": approved_by_chairman,
        "decision": "reconcile_state_created",
    }

    # audit_path_override 사용 시 직접 append (workspace 기반 default 경로 우회)
    try:
        if audit_path_override is not None:
            missing = [f for f in RECONCILE_AUDIT_FIELDS if f not in audit_record]
            if missing:
                return _fail(8, f"audit record 필수 필드 누락: {missing}")
            audit_path_override.parent.mkdir(parents=True, exist_ok=True)
            line = json.dumps(audit_record, ensure_ascii=False) + "\n"
            fd = os.open(str(audit_path_override), os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o644)
            try:
                os.write(fd, line.encode("utf-8"))
                os.fsync(fd)
            finally:
                os.close(fd)
            recorded_audit_path = audit_path_override
        else:
            recorded_audit_path = append_reconcile_audit(ws, audit_record)
    except (ValueError, OSError) as exc:
        return _fail(8, f"audit jsonl 기록 실패: {exc}")

    detail["steps"]["step_8"] = {
        "ok": True,
        "audit_path": str(recorded_audit_path),
        "audit_record_ts": audit_record["ts"],
    }

    # ── Step 9 (사전): state 파일 저장 ──
    try:
        _reconcile_save_state(synthesized, state_dir)
    except OSError as exc:
        return _fail(9, f"state 파일 저장 실패: {exc}")

    state_path = state_dir / f"{task_id}.json"

    # ── Step 9: verify-consistency ──
    reloaded = _reconcile_load_state(task_id, state_dir)
    if reloaded is None:
        return _fail(9, "verify-consistency 실패: 재구성 state load 불가 또는 checksum 불일치")
    if reloaded.get("current_state") != "DONE":
        return _fail(9, f"verify-consistency 실패: current_state={reloaded.get('current_state')!r} (DONE 필요)")

    detail["steps"]["step_9"] = {
        "ok": True,
        "verify_consistency": "PASS",
        "current_state": reloaded["current_state"],
        "checksum_match": True,
    }

    # ── Step 10: marker read-only 유지 ──
    detail["steps"]["step_10"] = {
        "ok": True,
        "note": "marker read-only 유지 — reconcile은 .done/.g3-fail 등 marker 변경 없음",
    }

    return {
        "ok": True,
        "step_failed": None,
        "reason": "reconcile 성공 (10단계 PASS)",
        "audit_path": str(recorded_audit_path),
        "state_path": str(state_path),
        "detail": detail,
    }
