"""anu_v3.batch_join_policy — join-time policy: contamination, authority
packet selection, callback classification, batch next-action.

Authority: task-2553+17.md §4(6,7,8,10) + §12 9-R.6.

All rules are deterministic with explicit tie-breakers (9-R.6):
  * final_authority_packet_selector: per task authority =
        result.json (schema anu_*_result / *final_packet) preferred;
        on multiple packets for the same task -> normal collector > fallback,
        then latest ts.
  * duplicate / pending fallback classifier:
        normal collector completed first, later fallback ->
            DUPLICATE_CALLBACK_IGNORED
        fallback only, no normal -> CALLBACK_PENDING (non-blocking iff that
            track's authority packet already settled via the normal path OR
            the batch final decision does not depend on that track)
        4-tuple cron_id mismatch -> TRACK_MISMATCH
        cross-track artifact citation -> batch-level CONTAMINATION_HOLD
  * cross_track_contamination_checker (implemented in this module by
        cross_track_contamination / batch_contaminated): a track citing an
        artifact owned by another track -> batch-level CONTAMINATION_HOLD.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Sequence, Tuple

# Outcome labels for the join-time policy.
# Fallback callback that arrived after the normal collector had already
# completed (returned by classify_callback).
DUPLICATE_CALLBACK_IGNORED = "DUPLICATE_CALLBACK_IGNORED"
# Fallback callback with no normal-collector completion yet (returned by
# classify_callback).
CALLBACK_PENDING = "CALLBACK_PENDING"
# Callback whose identity check failed, i.e. identity_ok is False (returned by
# classify_callback).
TRACK_MISMATCH = "TRACK_MISMATCH"
# Batch-level hold for a cross-track artifact citation, per the module
# docstring. NOTE(review): no code in the visible part of this file reads this
# constant; batch_next_action emits its own "BATCH_HOLD_CONTAMINATION" string.
CONTAMINATION_HOLD = "CONTAMINATION_HOLD"
# Callback of kind "normal" that passed the identity check (returned by
# classify_callback).
NORMAL_COLLECTOR_ACCEPTED = "NORMAL_COLLECTOR_ACCEPTED"


@dataclass
class PacketCandidate:
    """One candidate result packet for a task, as offered to the selector.

    Candidates are compared through :meth:`rank_key`; a larger key wins.
    """

    task_id: str
    track_id: str
    source: str          # "normal" | "fallback"
    schema: str          # e.g. anu_v3.task_2553p9_final_packet.v1
    ts: str              # ISO8601 string; compared lexicographically
    path: str
    status: str = ""

    # Left unannotated so @dataclass treats it as a plain class attribute
    # rather than an instance field. Higher rank wins.
    _SOURCE_RANK = {"normal": 1, "fallback": 0}

    def rank_key(self) -> Tuple[int, str]:
        """Return the ``(source_rank, ts)`` ordering key for this packet.

        ``source_rank`` is 1 for "normal", 0 for "fallback" and -1 for any
        other source value, so unrecognised sources sort below both. Among
        equal ranks the greater ``ts`` string wins.
        """
        source_rank = self._SOURCE_RANK.get(self.source, -1)
        return source_rank, self.ts


def final_authority_packet_selector(
    candidates: Sequence[PacketCandidate],
) -> Dict[str, PacketCandidate]:
    """Choose one authoritative packet per ``task_id`` (9-R.6).

    Candidates are ranked by ``rank_key()``: source first (normal beats
    fallback), then the greatest ``ts``. When two candidates for the same
    task have equal keys, the one appearing earlier in ``candidates`` is
    kept. The ``schema`` field is not consulted here.

    Returns a dict mapping each task_id to its winning candidate, keyed in
    order of each task's first appearance in ``candidates``.
    """
    winners: Dict[str, PacketCandidate] = {}
    for cand in candidates:
        incumbent = winners.get(cand.task_id)
        # Strictly-greater replacement keeps the earliest candidate on ties.
        if incumbent is None or cand.rank_key() > incumbent.rank_key():
            winners[cand.task_id] = cand
    return winners


@dataclass
class CallbackEvent:
    """A single collector callback as input to the callback classifier.

    classify_callback() reads identity_ok, kind and normal_already_completed;
    pending_blocks_chair_decision() additionally reads
    authority_settled_via_normal and batch_final_depends_on_track.
    """

    track_id: str
    task_id: str
    kind: str            # "normal" | "fallback"
    identity_ok: bool    # 4-tuple validated by callback_track_registry
    normal_already_completed: bool
    # True when this track's authority packet already settled via the normal
    # path; a pending fallback then does not block the chair decision.
    authority_settled_via_normal: bool = False
    # False when the batch final decision does not depend on this track; a
    # pending fallback then does not block the chair decision.
    batch_final_depends_on_track: bool = True
    # HIGH-1: the cron IDs *observed on the live callback* (not reconstructed
    # from the registry). classify_callbacks() validates these against the
    # registered record so a real cron-ID drift surfaces as TRACK_MISMATCH.
    # NOTE(review): classify_callbacks() is not defined in the visible part of
    # this file, and classify_callback() below does not read these three
    # fields — confirm where the validation actually happens.
    observed_dispatch_cron_id: str = ""
    observed_normal_collector_cron_id: str = ""
    observed_fallback_callback_cron_id: str = ""


def classify_callback(ev: CallbackEvent) -> str:
    """Map a callback event to its classification label (9-R.6).

    The identity check takes precedence over everything else, including an
    unrecognised ``kind``.

    Returns:
        TRACK_MISMATCH when ``ev.identity_ok`` is false;
        NORMAL_COLLECTOR_ACCEPTED for kind "normal";
        DUPLICATE_CALLBACK_IGNORED for kind "fallback" once the normal
        collector has already completed, otherwise CALLBACK_PENDING.

    Raises:
        ValueError: identity is ok but ``ev.kind`` is neither "normal" nor
            "fallback".
    """
    if not ev.identity_ok:
        return TRACK_MISMATCH

    kind = ev.kind
    if kind == "normal":
        return NORMAL_COLLECTOR_ACCEPTED
    if kind != "fallback":
        raise ValueError(f"unknown callback kind {ev.kind!r}")

    # Fallback: a duplicate if normal already finished, else still pending.
    return (
        DUPLICATE_CALLBACK_IGNORED
        if ev.normal_already_completed
        else CALLBACK_PENDING
    )


def pending_blocks_chair_decision(ev: CallbackEvent) -> bool:
    """Tell whether this callback holds up the chair decision (9-R.6).

    Only an event classified as CALLBACK_PENDING can block. Even then it is
    non-blocking when the track's authority already settled via the normal
    path; otherwise the answer is ``ev.batch_final_depends_on_track``.

    Raises:
        ValueError: propagated from classify_callback for an unknown kind.
    """
    is_pending = classify_callback(ev) == CALLBACK_PENDING
    if is_pending and not ev.authority_settled_via_normal:
        return ev.batch_final_depends_on_track
    return False


@dataclass
class TrackArtifacts:
    """Artifacts a track owns and artifacts it cites.

    Input record for cross_track_contamination(), which matches entries of
    cited_artifacts against the own_artifacts of other tracks by string
    equality.
    """

    track_id: str
    own_artifacts: List[str]     # artifact identifiers this track owns
    cited_artifacts: List[str]   # artifact identifiers this track cites


def cross_track_contamination(
    tracks: Sequence[TrackArtifacts],
) -> List[Tuple[str, str, str]]:
    """Return (citing_track, owner_track, artifact) for every cross-track
    citation. Non-empty result -> batch-level CONTAMINATION_HOLD.

    An artifact may be listed in ``own_artifacts`` by more than one track.
    Every such owner is remembered, and a citation is reported once for each
    owner other than the citing track. This keeps the result independent of
    the order of ``tracks``: a last-writer-wins ownership map would drop
    citations against earlier owners.

    Citations of artifacts that no track owns are ignored. A repeated entry
    in ``cited_artifacts`` yields a repeated tuple.

    Output order: by position of the citing track in ``tracks``, then by
    position in its ``cited_artifacts``, then by first-claim order of owners.
    """
    # artifact -> owning track ids; the inner dict is used as an ordered set
    # so a track listing the same artifact twice is recorded once.
    owners: Dict[str, Dict[str, None]] = {}
    for track in tracks:
        for artifact in track.own_artifacts:
            owners.setdefault(artifact, {})[track.track_id] = None

    hits: List[Tuple[str, str, str]] = []
    for track in tracks:
        for cited in track.cited_artifacts:
            for owner_id in owners.get(cited, ()):
                if owner_id != track.track_id:
                    hits.append((track.track_id, owner_id, cited))
    return hits


def batch_contaminated(tracks: Sequence[TrackArtifacts]) -> bool:
    """Return True when at least one cross-track citation exists in the batch."""
    citations = cross_track_contamination(tracks)
    return len(citations) > 0


# ---------------------------------------------------------------------------
# batch-level next-action planner (§4(10))
# ---------------------------------------------------------------------------

@dataclass
class TrackJoinView:
    """Per-track summary consumed by batch_next_action().

    batch_next_action() reads ``state`` (compared against "HOLD_FOR_CHAIR"),
    ``blocks_chair`` and ``terminal`` to pick the batch action, and echoes all
    five fields back in its result.
    """

    track_id: str
    state: str               # terminal/loop state from track_loop_state
    terminal: bool           # False keeps the batch in CONTINUE_LOOP
    callback_class: str      # from classify_callback / track machine
    blocks_chair: bool       # pending fallback blocking?


def batch_next_action(
    track_views: Sequence[TrackJoinView],
    contaminated: bool,
) -> Dict[str, object]:
    """Pick the one batch-level next action and report the per-track inputs.

    Precedence (first match wins):
      1. contamination          -> BATCH_HOLD_CONTAMINATION
      2. any HOLD_FOR_CHAIR     -> CHAIR_DECISION_REQUIRED
      3. any blocking pending   -> AWAIT_PENDING_CALLBACK
      4. any non-terminal       -> CONTINUE_LOOP
      5. all terminal & clean   -> CONSOLIDATE_FOR_CHAIR

    An empty ``track_views`` with no contamination falls through to
    CONSOLIDATE_FOR_CHAIR.

    Returns a dict with keys "batch_next_action" (the chosen action),
    "tracks" (one dict per view, in input order, echoing its fields) and
    "contaminated" (the flag as passed in).
    """
    # Per-track rules in precedence order: (action, predicate on one view).
    rules = (
        ("CHAIR_DECISION_REQUIRED", lambda view: view.state == "HOLD_FOR_CHAIR"),
        ("AWAIT_PENDING_CALLBACK", lambda view: view.blocks_chair),
        ("CONTINUE_LOOP", lambda view: not view.terminal),
    )

    chosen = "CONSOLIDATE_FOR_CHAIR"
    if contaminated:
        # Contamination outranks every per-track condition.
        chosen = "BATCH_HOLD_CONTAMINATION"
    else:
        for action_name, matches in rules:
            if any(matches(view) for view in track_views):
                chosen = action_name
                break

    per_track = []
    for view in track_views:
        per_track.append(
            {
                "track_id": view.track_id,
                "state": view.state,
                "terminal": view.terminal,
                "callback_class": view.callback_class,
                "blocks_chair": view.blocks_chair,
            }
        )

    return {
        "batch_next_action": chosen,
        "tracks": per_track,
        "contaminated": contaminated,
    }
