"""utils/fallback_schedule_registry.py
task-2728 — durable fallback schedule registry for idempotent prune.
fallback 등록 시 (task_id, round, head_sha, cron_id, registered_at, owner_key)를 durable JSONL에 기록.
collector가 이 registry를 읽어 (task_id·round·head) 단위로 pending fallback을 prune.
"""
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Optional

try:
    import fcntl  # type: ignore
except ImportError:  # pragma: no cover — 비 POSIX 환경 graceful fallback
    fcntl = None  # type: ignore

from utils.callback_envelope_schema import CANONICAL_ROOT_DEFAULT

REGISTRY_SCHEMA = "fallback_schedule_registry_v1"
_REL_PATH = "memory/state/fallback_schedule_registry.jsonl"


def _append_locked(rp: Path, line: str) -> None:
    """JSONL 한 줄 append. fcntl 지원 시 LOCK_EX 로 쓰기 race 방지, 미지원 시 graceful fallback."""
    with open(rp, "a", encoding="utf-8") as fh:
        if fcntl is not None:
            try:
                fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
            except OSError:
                pass  # flock 미지원 파일시스템 — best-effort
        fh.write(line)


def _now_utc() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def resolve_registry_path(
    canonical_root: Optional[str] = None,
    registry_path: Optional[str] = None,
) -> Path:
    """Return the filesystem location of the registry file.

    An explicit ``registry_path`` (anything other than ``None``) always wins
    and is returned as ``Path(registry_path)``. Otherwise the result is
    ``_REL_PATH`` under ``canonical_root``, falling back to
    ``CANONICAL_ROOT_DEFAULT`` when ``canonical_root`` is falsy.
    The current working directory is never consulted.

    Args:
        canonical_root: Root directory under which the default file lives.
        registry_path: Full path override for the registry file.

    Returns:
        The resolved registry path.
    """
    if registry_path is None:
        base_dir = Path(canonical_root or CANONICAL_ROOT_DEFAULT)
        return base_dir / _REL_PATH
    return Path(registry_path)


@dataclass
class FallbackScheduleRecord:
    """One line of the fallback schedule registry.

    ``status`` defaults to ``"PENDING"``; ``cause`` and ``updated_at``
    default to ``None``.
    """

    # The first four fields together form the record's identity (see ``key``).
    task_id: str
    round: int
    head_sha: str
    cron_id: str
    registered_at: str
    owner_key: str
    status: str = "PENDING"
    cause: Optional[str] = None
    updated_at: Optional[str] = None

    def key(self) -> tuple:
        """Return the ``(task_id, round, head_sha, cron_id)`` identity tuple."""
        return self.task_id, self.round, self.head_sha, self.cron_id

    def to_dict(self) -> dict:
        """Return a plain dict of all fields, led by a ``"schema"`` marker."""
        payload = {"schema": REGISTRY_SCHEMA}
        payload.update(
            task_id=self.task_id,
            round=self.round,
            head_sha=self.head_sha,
            cron_id=self.cron_id,
            registered_at=self.registered_at,
            owner_key=self.owner_key,
            status=self.status,
            cause=self.cause,
            updated_at=self.updated_at,
        )
        return payload

    @classmethod
    def from_dict(cls, d: dict) -> "FallbackScheduleRecord":
        """Build a record from a mapping such as a decoded registry line.

        ``task_id``, ``round``, ``head_sha``, ``cron_id`` and
        ``registered_at`` are required (a missing one raises ``KeyError``);
        ``round`` is coerced with ``int()``. The remaining fields are
        optional and fall back to ``""`` / ``"PENDING"`` / ``None``.
        """
        # Required lookups run first, in field order, so that the kind of
        # exception raised for malformed input does not change.
        task_id = d["task_id"]
        round_no = int(d["round"])
        head_sha = d["head_sha"]
        cron_id = d["cron_id"]
        registered_at = d["registered_at"]
        return cls(
            task_id,
            round_no,
            head_sha,
            cron_id,
            registered_at,
            d.get("owner_key", ""),
            d.get("status", "PENDING"),
            d.get("cause"),
            d.get("updated_at"),
        )


def register_fallback(
    *,
    task_id: str,
    round: int,
    head_sha: str,
    cron_id: str,
    owner_key: str,
    canonical_root: Optional[str] = None,
    registry_path: Optional[str] = None,
    now_fn: Callable[[], str] = _now_utc,
) -> FallbackScheduleRecord:
    """Create a PENDING record and append it to the registry as one JSONL line.

    The registry file's parent directory is created if it does not exist.

    Args:
        task_id, round, head_sha, cron_id: Identity of the fallback schedule.
        owner_key: Stored verbatim on the record.
        canonical_root: Forwarded to ``resolve_registry_path``.
        registry_path: Forwarded to ``resolve_registry_path``.
        now_fn: Supplies the ``registered_at`` timestamp string.

    Returns:
        The record that was written.
    """
    pending = FallbackScheduleRecord(
        task_id,
        round,
        head_sha,
        cron_id,
        now_fn(),
        owner_key,
        status="PENDING",
        cause=None,
        updated_at=None,
    )
    target = resolve_registry_path(
        canonical_root=canonical_root,
        registry_path=registry_path,
    )
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(pending.to_dict(), ensure_ascii=False)
    _append_locked(target, serialized + "\n")
    return pending


def read_records(
    *,
    canonical_root: Optional[str] = None,
    registry_path: Optional[str] = None,
) -> list:
    """Read the registry and return the latest record for each key.

    The file is append-only, so when several lines share the same
    ``(task_id, round, head_sha, cron_id)`` key the last one wins.

    Lines that cannot be used are skipped: blank lines, lines that are not
    valid UTF-8, lines that are not valid JSON, and JSON values that do not
    form a valid record.

    Args:
        canonical_root: Forwarded to ``resolve_registry_path``.
        registry_path: Forwarded to ``resolve_registry_path``.

    Returns:
        A list of ``FallbackScheduleRecord``; ``[]`` when the file does not
        exist or an ``OSError`` occurs while opening or reading it.
    """
    rp = resolve_registry_path(canonical_root=canonical_root, registry_path=registry_path)
    if not rp.exists():
        return []
    seen: dict = {}
    try:
        # Read bytes and decode per line. In text mode a single invalid
        # UTF-8 byte raises UnicodeDecodeError from the file iterator itself,
        # outside the per-line handler, and would abort the whole read
        # instead of skipping just the damaged line.
        with open(rp, "rb") as fh:
            for raw in fh:
                try:
                    # UnicodeDecodeError is a ValueError, so it is covered
                    # by the handler below.
                    line = raw.decode("utf-8").strip()
                    if not line:
                        continue
                    rec = FallbackScheduleRecord.from_dict(json.loads(line))
                    seen[rec.key()] = rec
                except (json.JSONDecodeError, KeyError, TypeError, ValueError):
                    continue
    except OSError:
        return []
    return list(seen.values())


def pending_for(
    task_id: str,
    round: Optional[int] = None,
    head_sha: Optional[str] = None,
    *,
    canonical_root: Optional[str] = None,
    registry_path: Optional[str] = None,
) -> list:
    """Return the PENDING records for ``task_id`` from ``read_records``.

    ``round`` and ``head_sha`` narrow the match only when they are not
    ``None``.

    Args:
        task_id: Task whose records are wanted.
        round: Optional round filter.
        head_sha: Optional head SHA filter.
        canonical_root: Forwarded to ``read_records``.
        registry_path: Forwarded to ``read_records``.

    Returns:
        The matching records, in the order ``read_records`` yields them.
    """
    return [
        candidate
        for candidate in read_records(
            canonical_root=canonical_root,
            registry_path=registry_path,
        )
        if candidate.status == "PENDING"
        and candidate.task_id == task_id
        and (round is None or candidate.round == round)
        and (head_sha is None or candidate.head_sha == head_sha)
    ]


def mark_pruned(
    *,
    task_id: str,
    round: int,
    head_sha: str,
    cron_id: str,
    cause: str,
    canonical_root: Optional[str] = None,
    registry_path: Optional[str] = None,
    now_fn: Callable[[], str] = _now_utc,
) -> FallbackScheduleRecord:
    """Append a PRUNED tombstone for the given key and return it.

    The tombstone carries ``status="PRUNED"``, the supplied ``cause`` and
    ``updated_at=now_fn()``. Every call appends a new line; nothing is
    checked against the existing file contents, so calling it again for a
    key that is already pruned simply writes another tombstone.

    Args:
        task_id, round, head_sha, cron_id: Key of the record being pruned.
        cause: Reason recorded on the tombstone.
        canonical_root: Forwarded to ``resolve_registry_path``.
        registry_path: Forwarded to ``resolve_registry_path``.
        now_fn: Supplies the ``updated_at`` timestamp string.

    Returns:
        The tombstone record that was written.
    """
    tombstone = FallbackScheduleRecord(
        task_id,
        round,
        head_sha,
        cron_id,
        "",  # registered_at: not needed on a tombstone, kept as an empty field
        "",  # owner_key: likewise written empty
        status="PRUNED",
        cause=cause,
        updated_at=now_fn(),
    )
    target = resolve_registry_path(
        canonical_root=canonical_root,
        registry_path=registry_path,
    )
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(tombstone.to_dict(), ensure_ascii=False)
    _append_locked(target, serialized + "\n")
    return tombstone
