"""utils/operational_collector_wiring.py

task-2553+25 — OPERATIONAL COLLECTOR WIRING.

목적 (회장 verbatim, task-2553+25 §2):
  normal completion callback collector **durable-success 직후**
  ``run_operational_cancel_seam(operational=True)`` 를 **1회 호출**하도록
  운영 collector 경로에 결선한다. md 박제가 아닌 실행 결선.

결선 위치·방식 (§4 / 9-R.2 — additive seam-call only, frozen byte-0):
  * 본 모듈은 **신규 strict-additive wrapper** 이다.
    ``utils/anu_delegation_completion_callback.py`` (frozen anchor,
    sha256 83b3e307…) 를 **read-only import** 만 하며 1 byte 도 수정하지
    않는다. utils/completion_callback_operational_cancel_seam (+23 산출),
    utils/live_cron_state_verifier (+23 산출), +9a 분리모듈도 무수정.
  * 결선 call-site = ``run_operational_completion_callback_collector``.
    이 wrapper 는 frozen collector ``run_completion_callback_collector``
    를 호출하여 그것이 반환한 **공개 ``CollectorResult``** 를 관찰한다.
    collector 가 ``closeout_candidate = (classification == PASS)`` 로
    durable-success 를 *판정한 그 결과* 를 frozen anchor **밖에서**
    읽으므로, frozen/forbidden collector 본체 편집·import 결합 0 으로
    durable-success 인접 결선이 성립한다 (9-R.2 hard-gate 통과 — frozen
    edit 유일경로 아님, HOLD 불요).
  * durable-success(PASS) 일 때만 +23 ``run_operational_cancel_seam``
    을 ``operational=True`` 로 **정확히 1회** invoke 한다. 그 외 분류
    (DUPLICATE_CALLBACK_IGNORED 포함) → seam 미진입 (무회귀).

exact-once (9-R.3 — seam-entry-time canonical token + atomic claim):
  * event identity 는 collector 가 durable-success 를 판정하는 그 시점에
    이미 손에 든 입력에서만 도출한다 — ``result.json`` 재독 의존 0.
    ``event_id = sha256(task_id + fallback_cron_id + dispatch-fired marker
    경로 + durable-success 판정 입력 해시)`` 로, seam 호출 시도 이전
    (pre-seam) 에 항상 존재·안정하다.
  * seam invoke **직전** ``event_id`` 기반 claim marker 를
    ``O_CREAT|O_EXCL|O_WRONLY`` 로 원자 선점한다. 선점 성공 시에만 seam
    1회 진입, 이미 존재(retries / duplicate callback / 복수 success
    branch / 동시 2호출)면 **즉시 no-op** (seam 미진입). claim marker 는
    ``task-2553+25.*`` 네임스페이스 (git-untracked batch-internal).

디커플 (§3 / §7 — collector success 와 seam 결과 완전 분리):
  * seam (혹은 wrapper 자체) 의 어떤 예외·skip·remove 실패도 frozen
    collector 가 반환한 ``CollectorResult`` 를 변경하지 않는다. wrapper
    는 collector 결과를 **그대로** 돌려준다. cron-remove 실패가 normal
    collector 성공을 실패로 바꾸지 않는다.

본 task 범위 (§6 / 9-R.4 — 실 운영 cron 비접촉):
  * 본 모듈의 구현·테스트·evidence 는 mock/fixture/격리 only. 실 운영
    cron 실제 삭제 0. 실 firing 은 결선완료·verifier PASS 후 운영단계
    동작이며 본 task 행위가 아니다.
"""
from __future__ import annotations

import errno
import hashlib
import json
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, Union

# frozen anchor — READ-ONLY import (수정·결합 0). public 진입점/타입만 사용.
from utils.anu_delegation_completion_callback import (  # pyright: ignore[reportMissingImports]  # noqa: E501
    CallbackInput,
    Classification,
    CollectorResult,
    PostResultReview,
    no_real_codex_post_result_review,
    run_completion_callback_collector,
)

# +23 산출 operational cancel seam — 유일 허용 결선 메커니즘 (9-R.2).
from utils.completion_callback_operational_cancel_seam import (  # pyright: ignore[reportMissingImports]  # noqa: E501
    OperationalCancelOutcome,
    run_operational_cancel_seam,
)

SCHEMA_CANCEL_AUDIT = "task-2553+25.cancel-audit_v1"

#: claim / cancel-audit marker 네임스페이스 prefix (git-untracked).
TASK_MARKER_PREFIX = "task-2553+25"


@dataclass
class OperationalSeamParams:
    """+23 ``run_operational_cancel_seam`` 호출에 필요한 운영 입력.

    운영 caller 가 결선 시점에 바인딩한다. 모든 경로는 격리/fixture 로
    주입 가능하며, ``cron_lister``/``remover`` 미주입 시 +23 seam 의
    운영 기본 DI (실 cokacdir) 가 쓰이나 — 본 task 구현/테스트는 항상
    fake/격리 주입으로만 검증한다 (§6 9-R.4).
    """

    target_cron_id: str  # = 사전등록 fallback callback cron id
    dispatch_fired_marker_path: Path
    result_json_path: Path
    report_path: Path
    collector_result_marker_path: Path
    audit_path: Optional[Path] = None
    cron_lister: Optional[Callable[[], dict]] = None
    remover: Optional[Callable[..., object]] = None
    fallback_cancelled_marker_path: Optional[Path] = None
    cancel_lock_path: Optional[Path] = None
    callback_contract: Optional[dict] = None


@dataclass
class OperationalWiringResult:
    """wrapper 1회 실행 결과.

    ``collector_result`` 는 frozen collector 가 반환한 값 **그대로**
    (디커플 — seam 결과로 변경 0). ``seam_invoked`` 가 True 일 때만
    ``seam_outcome`` 존재.
    """

    collector_result: CollectorResult
    durable_success: bool
    seam_invoked: bool
    seam_skipped_reason: str
    event_id: str
    claim_marker_path: Optional[str] = None
    seam_outcome: Optional[OperationalCancelOutcome] = None
    cancel_audit_path: Optional[str] = None
    cancel_audit: dict = field(default_factory=dict)
    notes: list = field(default_factory=list)


def _durable_success_input_hash(result: CollectorResult) -> str:
    """durable-success 판정 입력 해시 (9-R.3).

    collector 가 PASS 를 산출하는 데 쓰인 *결정 입력* 만 canonical JSON
    으로 직렬화해 sha256. ``result.json`` 재독 0 — frozen collector 가
    in-memory ``CallbackInput`` 에서 만들어 반환한 ``delegation_result``
    의 결정 필드만 사용한다 (변동 ts_utc 등 비결정 필드 제외).
    """
    dr = result.delegation_result
    # durable-success 를 *결정* 한 입력만 (artifact 경로 ack_marker_path 등
    # 비결정 필드 제외 — 동일 논리 이벤트는 ack 파일 인스턴스와 무관하게
    # 동일 event identity 를 가져야 한다, 9-R.3).
    decisive = {
        "classification": result.classification.value,
        "callback_type": dr.get("callback_type"),
        "callback_cron_id": dr.get("callback_cron_id"),
        "dispatch_cron_id": dr.get("dispatch_cron_id"),
        "required_closeout_markers": dr.get("required_closeout_markers"),
        "forbidden_action_postcheck": dr.get("forbidden_action_postcheck"),
        "preservation_anchors": dr.get("preservation_anchors"),
        "dev_sunset": dr.get("dev_sunset"),
    }
    canon = json.dumps(decisive, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canon.encode("utf-8")).hexdigest()


def compute_event_id(
    *,
    task_id: str,
    fallback_cron_id: str,
    dispatch_fired_marker_path: Union[str, Path],
    result: CollectorResult,
) -> str:
    """seam-entry-time canonical token (9-R.3).

    pre-seam 항상 존재·안정. 입력은 전부 durable-success 판정 시점에
    손에 든 값 (result.json 재독 0).
    """
    tup = "\x1f".join(
        (
            task_id,
            fallback_cron_id,
            str(dispatch_fired_marker_path),
            _durable_success_input_hash(result),
        )
    )
    return hashlib.sha256(tup.encode("utf-8")).hexdigest()


def _atomic_claim(claim_path: Path, payload: dict) -> bool:
    """``O_CREAT|O_EXCL|O_WRONLY`` 원자 선점.

    True  = 선점 성공 (본 호출만 seam 1회 진입 허용).
    False = 이미 존재 (retries/duplicate/concurrent) → 즉시 no-op.
    """
    claim_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        fd = os.open(
            str(claim_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644
        )
    except FileExistsError:
        return False
    except OSError as exc:  # pragma: no cover - 방어
        if exc.errno == errno.EEXIST:
            return False
        raise
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
        f.flush()
        os.fsync(f.fileno())
    return True


def build_cancel_audit(
    *,
    event_id: str,
    target_cron_id: str,
    seam_invoked: bool,
    seam_skipped_reason: str,
    outcome: Optional[OperationalCancelOutcome],
) -> dict:
    """task-2553+25.cancel-audit_v1 레코드 (9-R.4 필수 필드 전수).

    필수: lookup_status / five_condition_results / remove_attempted /
    remove_result / skip_reason / already_removed_or_missing /
    normal_success_unchanged / event_id.
    """
    if outcome is None:
        return {
            "schema": SCHEMA_CANCEL_AUDIT,
            "event_id": event_id,
            "target_cron_id": target_cron_id,
            "seam_invoked": False,
            "lookup_status": "NOT_INVOKED",
            "five_condition_results": {
                "c1_task_id_match": None,
                "c2_chat_id_owned": None,
                "c3_role_fallback": None,
                "c4_marker_id_crosscheck": None,
                "c5_pending_not_fired_not_removed": None,
            },
            "remove_attempted": False,
            "remove_result": "NOT_ATTEMPTED",
            "skip_reason": seam_skipped_reason,
            "already_removed_or_missing": False,
            "normal_success_unchanged": True,
        }
    lv = outcome.live_verification or {}
    checks = lv.get("checks") or {}
    five = {
        "c1_task_id_match": checks.get("c1_task_id_match"),
        "c2_chat_id_owned": checks.get("c2_chat_id_owned"),
        "c3_role_fallback": checks.get("c3_role_fallback"),
        "c4_marker_id_crosscheck": checks.get("c4_marker_id_crosscheck"),
        "c5_pending_not_fired_not_removed": checks.get(
            "c5_pending_not_fired_not_removed"
        ),
    }
    sc = outcome.seam_classification
    already = sc in (
        "SKIP_LIVE_SKIP_ALREADY_REMOVED",
        "SKIP_LIVE_SKIP_ALREADY_FIRED",
        "SKIP_LIVE_SKIP_NOT_FOUND",
    )
    if outcome.cron_remove_invoked:
        remove_result = (
            "CANCELLED" if outcome.fallback_cancelled else "REMOVE_NONFATAL"
        )
    else:
        remove_result = "NOT_ATTEMPTED"
    return {
        "schema": SCHEMA_CANCEL_AUDIT,
        "event_id": event_id,
        "target_cron_id": outcome.target_cron_id,
        "seam_invoked": seam_invoked,
        "seam_classification": sc,
        "stage": outcome.stage,
        "lookup_status": lv.get("classification", outcome.stage),
        "five_condition_results": five,
        "remove_attempted": bool(outcome.cron_remove_invoked),
        "remove_result": remove_result,
        "fallback_cancelled": bool(outcome.fallback_cancelled),
        "remove_allowed_by_live_verifier": bool(
            outcome.remove_allowed_by_live_verifier
        ),
        "skip_reason": outcome.skip_reason,
        "already_removed_or_missing": already,
        # 디커플 증명 — seam 결과와 무관하게 항상 True.
        "normal_success_unchanged": bool(outcome.normal_success_preserved),
        "plus9a_decision": outcome.plus9a_decision,
        "ts_utc": outcome.ts_utc,
    }


def run_operational_completion_callback_collector(
    inp: CallbackInput,
    ack_path: Union[str, Path],
    *,
    seam_params: OperationalSeamParams,
    claim_dir: Union[str, Path],
    post_result_review_fn: Callable[
        [dict, Classification], PostResultReview
    ] = no_real_codex_post_result_review,
    duplicate_callback_seen: Optional[list] = None,
    evidence_paths: Optional[list] = None,
    post_result_review_marker_path: Optional[Union[str, Path]] = None,
    cancel_audit_path: Optional[Union[str, Path]] = None,
) -> OperationalWiringResult:
    """운영 collector 결선 진입점 — durable-success 직후 seam 1회 결선.

    절차:
      1. frozen ``run_completion_callback_collector`` 호출 (read-only;
         frozen byte-0). 반환 ``CollectorResult`` 만 관찰.
      2. ``classification == PASS`` → durable-success. 그 외 → seam 미진입
         (DUPLICATE_CALLBACK_IGNORED 등 무회귀).
      3. 9-R.3: pre-seam event_id 도출 (result.json 재독 0) → O_EXCL
         atomic claim. 선점 성공 시에만 4 진입, 실패 시 즉시 no-op.
      4. ``run_operational_cancel_seam(operational=True)`` **1회** invoke.
      5. cancel-audit JSON 기록. seam 예외/skip/remove 실패는 collector
         결과로 전파 0 — collector_result 는 1 의 반환을 **그대로** 유지.
    """
    # ── 1. frozen collector (byte-0, read-only) ────────────────────────────
    collector_result = run_completion_callback_collector(
        inp,
        ack_path,
        post_result_review_fn=post_result_review_fn,
        duplicate_callback_seen=duplicate_callback_seen,
        evidence_paths=evidence_paths,
        post_result_review_marker_path=post_result_review_marker_path,
    )

    # ── 2. durable-success 판정 (frozen anchor 밖에서 공개 결과 관찰) ───────
    durable_success = (
        collector_result.classification == Classification.PASS
        and collector_result.closeout_candidate is True
        and collector_result.ack_acquired is True
    )

    # ── 3. pre-seam canonical event_id (9-R.3) ─────────────────────────────
    event_id = compute_event_id(
        task_id=inp.task_id,
        fallback_cron_id=seam_params.target_cron_id,
        dispatch_fired_marker_path=seam_params.dispatch_fired_marker_path,
        result=collector_result,
    )

    if not durable_success:
        return OperationalWiringResult(
            collector_result=collector_result,
            durable_success=False,
            seam_invoked=False,
            seam_skipped_reason=(
                f"durable-success 아님 (classification="
                f"{collector_result.classification.value}) → seam 미진입, "
                "fallback 보존, 무회귀"
            ),
            event_id=event_id,
            notes=[
                "비-PASS (DUPLICATE_CALLBACK_IGNORED 포함) → operational "
                "cancel seam 진입 0 (기존 경로 무회귀)",
            ],
        )

    # ── 3b. seam invoke 직전 atomic claim (exact-once) ─────────────────────
    claim_path = (
        Path(claim_dir)
        / f"{TASK_MARKER_PREFIX}.seam-claim.{event_id}.json"
    )
    claimed = _atomic_claim(
        claim_path,
        {
            "schema": "task-2553+25.seam-claim_v1",
            "event_id": event_id,
            "task_id": inp.task_id,
            "target_cron_id": seam_params.target_cron_id,
            "atomic_create_method": "O_CREAT|O_EXCL",
        },
    )
    if not claimed:
        return OperationalWiringResult(
            collector_result=collector_result,
            durable_success=True,
            seam_invoked=False,
            seam_skipped_reason=(
                "event_id claim 이미 선점됨 (retries/duplicate/concurrent) "
                "→ exact-once no-op, seam 미진입"
            ),
            event_id=event_id,
            claim_marker_path=str(claim_path),
            notes=["exact-once: atomic claim loser → seam 중복호출 0"],
        )

    # ── 4. operational cancel seam 1회 invoke (디커플 가드) ────────────────
    seam_outcome: Optional[OperationalCancelOutcome] = None
    seam_invoked = False
    seam_skipped_reason = ""
    try:
        seam_outcome = run_operational_cancel_seam(
            task_id=inp.task_id,
            target_cron_id=seam_params.target_cron_id,
            dispatch_fired_marker_path=seam_params.dispatch_fired_marker_path,
            result_json_path=seam_params.result_json_path,
            report_path=seam_params.report_path,
            collector_result_marker_path=(
                seam_params.collector_result_marker_path
            ),
            audit_path=seam_params.audit_path,
            cron_lister=seam_params.cron_lister,
            remover=seam_params.remover,
            fallback_cancelled_marker_path=(
                seam_params.fallback_cancelled_marker_path
            ),
            cancel_lock_path=seam_params.cancel_lock_path,
            callback_contract=seam_params.callback_contract,
            operational=True,  # ★ 운영 결선 — 회장 verbatim
        )
        seam_invoked = True
    except Exception as exc:  # noqa: BLE001 - 디커플: seam 예외 collector 비전파
        seam_skipped_reason = (
            f"seam invoke 예외 → fallback 보존, normal collector 성공 불변 "
            f"(디커플): {exc}"
        )

    # ── 5. cancel-audit JSON (9-R.4 필수 필드) ─────────────────────────────
    audit = build_cancel_audit(
        event_id=event_id,
        target_cron_id=seam_params.target_cron_id,
        seam_invoked=seam_invoked,
        seam_skipped_reason=seam_skipped_reason,
        outcome=seam_outcome,
    )
    audit_written: Optional[str] = None
    audit_target = (
        Path(cancel_audit_path)
        if cancel_audit_path is not None
        else Path(claim_dir) / f"{TASK_MARKER_PREFIX}.cancel-audit.json"
    )
    try:
        audit_target.parent.mkdir(parents=True, exist_ok=True)
        audit_target.write_text(
            json.dumps(audit, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        audit_written = str(audit_target)
    except OSError:
        audit_written = None

    return OperationalWiringResult(
        collector_result=collector_result,  # 디커플: 1 의 반환 그대로
        durable_success=True,
        seam_invoked=seam_invoked,
        seam_skipped_reason=seam_skipped_reason,
        event_id=event_id,
        claim_marker_path=str(claim_path),
        seam_outcome=seam_outcome,
        cancel_audit_path=audit_written,
        cancel_audit=audit,
        notes=[
            "결선 위치: frozen collector 공개 CollectorResult 관찰 후 "
            "durable-success(PASS) 직후 +23 run_operational_cancel_seam"
            "(operational=True) 1회 invoke (frozen byte-0, additive)",
            "디커플: seam 결과와 무관하게 collector_result 불변, "
            "normal_success_unchanged=True",
        ],
    )


__all__ = [
    "SCHEMA_CANCEL_AUDIT",
    "TASK_MARKER_PREFIX",
    "OperationalSeamParams",
    "OperationalWiringResult",
    "compute_event_id",
    "build_cancel_audit",
    "run_operational_completion_callback_collector",
]
