"""tests/regression/test_batch_settle_writeback_2553plus53.py

task-2553+53 — NORMAL_COLLECTOR_DURABLE_SUCCESS_WRITEBACK_AND_BATCH_SETTLE
_TRIGGER regression.

Spec: memory/tasks/task-2553+53.md
(sha256 e0e433f7039a0b22ad51015f7c9693d33cf60078fa36985e2ffd6062636cfb63).

회장 §5 필수 regression 1~15 — 실 entrypoint 직접 호출 (mock-only FAIL):

  1  independent ANU AUTHORITATIVE_PASS -> durable-success write-back PASS
  2  self-chain PASS -> write-back FAIL (SELF_CHAIN_NO_WRITEBACK)
  3  collector_role != ANU -> FAIL
  4  collector_key != configured ANU key -> FAIL
  5  duplicate write-back, same binding -> idempotent (no dup append)
  6  duplicate write-back, different verdict -> WRITEBACK_BINDING_CONFLICT
  7  +50/+51/+52 all missing registry -> before state DEFER
  8  +50/+51/+52 write-back after fix -> all-settled
  9  all-settled -> consolidated summary candidate 생성
  10 fallback pending does NOT block all-settled
  11 fixed-time / dead-man as progress trigger -> FAIL (basis is fixed)
  12 +49 self-collector guard 무회귀
  13 +44 4-tuple registry 무회귀
  14 +47/+48 event-trigger 무회귀
  15 profile engine / Track1~3 (+50/+51/+52) 산출물 byte-0 무변

모든 테스트 100% offline — network / git mutation / cron / dispatch /
cokacdir / subprocess(prod) 0. The +44 ledger is NEVER mutated by this
test (isolated tmp ledgers only); the real durable-success write-back is
performed by the driver, not the test.
"""
from __future__ import annotations

import hashlib
import json
import subprocess
import sys
from pathlib import Path

import pytest

WORKSPACE = Path(__file__).resolve().parent.parent.parent
if str(WORKSPACE) in sys.path:
    sys.path.remove(str(WORKSPACE))
sys.path.insert(0, str(WORKSPACE))

from anu_v3.batch_settle_writeback import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    DURABLE_SUCCESS_WRITEBACK_SCHEMA,
    DURABLE_SUCCESS_WRITTEN,
    EVALUATED_AT_BASIS,
    SELF_CHAIN_NO_WRITEBACK,
    TRACK_MISMATCH,
    WRITEBACK_BINDING_CONFLICT,
    WRITEBACK_FIELDS_11,
    WRITEBACK_IDEMPOTENT_SKIP,
    WritebackPathRefused,
    apply_durable_success_writeback,
    evaluate_batch_settle,
    evaluate_durable_success_writeback,
    extract_binding_from_source,
)
from anu_v3.consolidated_summary_candidate_generator import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    generate_consolidated_summary_candidate,
    render_candidate_markdown,
)

FIX = json.loads(
    (WORKSPACE / "memory/fixtures/task-2553plus53.cases.json").read_text(
        encoding="utf-8"
    )
)
BATCH_ID = FIX["batch_id"]
EXPECTED_TRACKS = [(t[0], t[1]) for t in FIX["expected_tracks"]]
LIVE = FIX["live_sources"]
ANU_KEY = "c119085addb0f8b7"


def _write_verdict(tmp_path: Path, name: str, payload: dict) -> Path:
    p = tmp_path / name
    p.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
    return p


def _seed_all_three(tmp_ledger: Path) -> None:
    """Real write-back of the live +50/+51/+52 independent-ANU verdicts
    into an ISOLATED tmp ledger (real entrypoints, not the +44 ledger)."""
    for trk, info in LIVE.items():
        d = evaluate_durable_success_writeback(
            WORKSPACE / info["path"],
            batch_id=BATCH_ID,
            track_id=trk,
            executor_key=info["executor_key"],
            ledger_path=tmp_ledger,
        )
        assert d.ok and d.classification == DURABLE_SUCCESS_WRITTEN, (
            trk,
            d.classification,
            d.reasons,
        )
        applied = apply_durable_success_writeback(
            d, ledger_path=tmp_ledger
        )
        assert applied.appended is True


# ── 1. independent ANU AUTHORITATIVE_PASS -> write-back PASS ─────────────────
@pytest.mark.parametrize("trk", list(LIVE.keys()))
def test_01_independent_anu_authoritative_pass_writeback(tmp_path, trk):
    info = LIVE[trk]
    led = tmp_path / "callback_4tuple_index.jsonl"
    d = evaluate_durable_success_writeback(
        WORKSPACE / info["path"],
        batch_id=BATCH_ID,
        track_id=trk,
        executor_key=info["executor_key"],
        ledger_path=led,
    )
    assert d.ok
    assert d.classification == DURABLE_SUCCESS_WRITTEN
    rec = d.durable_record
    assert rec["schema"] == DURABLE_SUCCESS_WRITEBACK_SCHEMA
    assert rec["authoritative_verdict"] == "AUTHORITATIVE_PASS"
    assert rec["collector_role"] == "ANU"
    assert rec["collector_key"] == ANU_KEY
    # §3.2 all 11 mandatory fields present & non-empty.
    for f in WRITEBACK_FIELDS_11:
        assert rec.get(f), f


# ── 2. self-chain PASS -> write-back FAIL ───────────────────────────────────
def test_02_self_chain_pass_no_writeback(tmp_path):
    payload = FIX["self_chain_only"]["verdict_file"]
    src = _write_verdict(tmp_path, "self_chain.json", payload)
    d = evaluate_durable_success_writeback(
        src,
        batch_id=BATCH_ID,
        track_id="TRACK 2",
        executor_key=payload["executor_key"],
        ledger_path=tmp_path / "l.jsonl",
    )
    assert not d.ok
    assert d.classification == SELF_CHAIN_NO_WRITEBACK
    assert d.durable_record is None or d.appended is False


# ── 3. collector_role != ANU -> FAIL ────────────────────────────────────────
def test_03_collector_role_not_anu_fail(tmp_path):
    payload = dict(FIX["independent_anu_writeback"]["verdict_file"])
    payload["collector_role"] = "executor"
    payload["four_tuple_record"] = dict(payload["four_tuple_record"])
    payload["four_tuple_record"]["collector_role"] = "executor"
    src = _write_verdict(tmp_path, "role.json", payload)
    d = evaluate_durable_success_writeback(
        src,
        batch_id=BATCH_ID,
        track_id="TRACK X",
        executor_key=payload["executor_key"],
        ledger_path=tmp_path / "l.jsonl",
    )
    assert not d.ok
    assert d.classification == SELF_CHAIN_NO_WRITEBACK


# ── 4. collector_key != configured ANU key -> FAIL ──────────────────────────
def test_04_collector_key_not_anu_key_fail(tmp_path):
    payload = dict(FIX["independent_anu_writeback"]["verdict_file"])
    payload["collector_key"] = "0000aaaa1111bbbb"
    payload["four_tuple_record"] = dict(payload["four_tuple_record"])
    payload["four_tuple_record"]["collector_key"] = "0000aaaa1111bbbb"
    src = _write_verdict(tmp_path, "key.json", payload)
    d = evaluate_durable_success_writeback(
        src,
        batch_id=BATCH_ID,
        track_id="TRACK X",
        executor_key=payload["executor_key"],
        ledger_path=tmp_path / "l.jsonl",
    )
    assert not d.ok
    assert d.classification == SELF_CHAIN_NO_WRITEBACK


# ── 5. duplicate write-back same binding -> idempotent ──────────────────────
def test_05_duplicate_same_binding_idempotent(tmp_path):
    led = tmp_path / "callback_4tuple_index.jsonl"
    info = LIVE["TRACK 1"]
    d1 = evaluate_durable_success_writeback(
        WORKSPACE / info["path"],
        batch_id=BATCH_ID,
        track_id="TRACK 1",
        executor_key=info["executor_key"],
        ledger_path=led,
    )
    apply_durable_success_writeback(d1, ledger_path=led)
    lines_after_1 = led.read_text(encoding="utf-8").splitlines()
    # repeated collector execution / repeated scan -> idempotent SKIP.
    d2 = evaluate_durable_success_writeback(
        WORKSPACE / info["path"],
        batch_id=BATCH_ID,
        track_id="TRACK 1",
        executor_key=info["executor_key"],
        ledger_path=led,
    )
    assert d2.classification == WRITEBACK_IDEMPOTENT_SKIP
    assert d2.ok
    applied2 = apply_durable_success_writeback(d2, ledger_path=led)
    assert applied2.appended is False
    lines_after_2 = led.read_text(encoding="utf-8").splitlines()
    assert lines_after_1 == lines_after_2  # NO duplicate append


# ── 6. duplicate write-back different verdict -> BINDING_CONFLICT ────────────
def test_06_duplicate_different_verdict_conflict(tmp_path):
    led = tmp_path / "callback_4tuple_index.jsonl"
    base = dict(FIX["conflict_different_verdict"]["verdict_file"])
    p1 = dict(base)
    p1["verdict"] = FIX["conflict_different_verdict"]["first_verdict"]
    s1 = _write_verdict(tmp_path, "c1.json", p1)
    d1 = evaluate_durable_success_writeback(
        s1, batch_id=BATCH_ID, track_id="TRACK Y",
        executor_key=base["executor_key"], ledger_path=led,
    )
    apply_durable_success_writeback(d1, ledger_path=led)
    p2 = dict(base)
    p2["verdict"] = FIX["conflict_different_verdict"]["second_verdict"]
    s2 = _write_verdict(tmp_path, "c2.json", p2)
    d2 = evaluate_durable_success_writeback(
        s2, batch_id=BATCH_ID, track_id="TRACK Y",
        executor_key=base["executor_key"], ledger_path=led,
    )
    assert not d2.ok
    assert d2.classification == WRITEBACK_BINDING_CONFLICT
    # silent skip FORBIDDEN — the conflict is RECORDED in the decision.
    assert any("BINDING_CONFLICT" in r or "conflict" in r.lower()
               for r in d2.reasons)
    applied = apply_durable_success_writeback(d2, ledger_path=led)
    assert applied.appended is False


# ── 6b. explicit expected mismatch -> TRACK_MISMATCH ────────────────────────
def test_06b_track_mismatch_recorded(tmp_path):
    info = LIVE["TRACK 3"]
    ov = FIX["track_mismatch"]["expected_override"]
    d = evaluate_durable_success_writeback(
        WORKSPACE / info["path"],
        batch_id=BATCH_ID,
        track_id="TRACK 3",
        executor_key=info["executor_key"],
        expected={"track_id": ov["track_id"], "task_id": ov["task_id"]},
        ledger_path=tmp_path / "l.jsonl",
    )
    assert not d.ok
    assert d.classification == TRACK_MISMATCH


# ── 7. +50/+51/+52 all missing registry -> before state DEFER ───────────────
def test_07_before_state_all_missing_defer(tmp_path):
    led = tmp_path / "callback_4tuple_index.jsonl"  # empty (none written)
    res = evaluate_batch_settle(
        batch_id=BATCH_ID,
        expected_tracks=EXPECTED_TRACKS,
        ledger_path=led,
    )
    assert res.all_settled is False
    assert res.decision == "RECORD_AND_DEFER"
    assert res.tracks_settled == []


# ── 8. write-back after fix -> all-settled ──────────────────────────────────
def test_08_after_fix_all_settled(tmp_path):
    led = tmp_path / "callback_4tuple_index.jsonl"
    _seed_all_three(led)
    res = evaluate_batch_settle(
        batch_id=BATCH_ID,
        expected_tracks=EXPECTED_TRACKS,
        ledger_path=led,
        this_track_id="TRACK 3",
    )
    assert res.all_settled is True
    assert res.all_authoritative_pass is True
    assert res.decision == "ALL_SETTLED_CONSOLIDATE"
    assert sorted(res.tracks_settled) == sorted(
        [t for t, _ in EXPECTED_TRACKS]
    )
    assert res.this_collector_is_last_settle_track is True


# ── 8b. 2/3 settled -> DEFER ────────────────────────────────────────────────
def test_08b_two_of_three_defer(tmp_path):
    led = tmp_path / "callback_4tuple_index.jsonl"
    for trk in ("TRACK 1", "TRACK 2"):
        info = LIVE[trk]
        d = evaluate_durable_success_writeback(
            WORKSPACE / info["path"], batch_id=BATCH_ID, track_id=trk,
            executor_key=info["executor_key"], ledger_path=led,
        )
        apply_durable_success_writeback(d, ledger_path=led)
    res = evaluate_batch_settle(
        batch_id=BATCH_ID, expected_tracks=EXPECTED_TRACKS,
        ledger_path=led,
    )
    assert res.all_settled is False
    assert res.decision == "RECORD_AND_DEFER"
    assert len(res.tracks_settled) == 2


# ── 9. all-settled -> consolidated summary candidate 생성 ───────────────────
def test_09_consolidated_summary_candidate(tmp_path):
    led = tmp_path / "callback_4tuple_index.jsonl"
    _seed_all_three(led)
    res = evaluate_batch_settle(
        batch_id=BATCH_ID, expected_tracks=EXPECTED_TRACKS,
        ledger_path=led, this_track_id="TRACK 3",
    )
    cand = generate_consolidated_summary_candidate(
        res,
        source_paths_by_track={t: LIVE[t]["path"] for t in LIVE},
        canonical_root=str(WORKSPACE),
    )
    assert cand is not None
    assert cand["status"] == "CONSOLIDATED_SUMMARY_CANDIDATE_READY"
    assert cand["track_count"] == 3
    assert all(
        t["authoritative_verdict"] == "AUTHORITATIVE_PASS"
        for t in cand["tracks"]
    )
    md = render_candidate_markdown(cand)
    assert "consolidated summary CANDIDATE" in md
    # NOT all-settled -> no candidate (fail-closed).
    empty = evaluate_batch_settle(
        batch_id=BATCH_ID, expected_tracks=EXPECTED_TRACKS,
        ledger_path=tmp_path / "empty.jsonl",
    )
    assert generate_consolidated_summary_candidate(
        empty, source_paths_by_track={}
    ) is None


# ── 10. fallback pending does NOT block all-settled ─────────────────────────
def test_10_fallback_pending_non_blocking(tmp_path):
    led = tmp_path / "callback_4tuple_index.jsonl"
    _seed_all_three(led)
    # inject an unrelated fallback/dead-man PENDING +44 record — must NOT
    # affect the durable-success-event-based settle gate (§3.6).
    with open(led, "a", encoding="utf-8") as fh:
        fh.write(json.dumps({
            "schema": "callback_4tuple_ledger_record.v1",
            "task_id": "task-2553+50", "dispatch_id": "FB",
            "dispatch_cron_id": "FB", "executor": "fallback",
            "chat_id": "6937032012",
            "normal_collector_cron_id": "FB-NC",
            "fallback_callback_cron_id": "FB-CB",
            "role": "fallback", "status": "REGISTERED",
            "no_fallback": False, "ts_kst": "x",
        }, sort_keys=True) + "\n")
    res = evaluate_batch_settle(
        batch_id=BATCH_ID, expected_tracks=EXPECTED_TRACKS,
        ledger_path=led,
    )
    assert res.all_settled is True
    assert res.fallback_pending_non_blocking is True
    assert res.decision == "ALL_SETTLED_CONSOLIDATE"


# ── 11. fixed-time / dead-man as progress trigger -> FAIL ───────────────────
def test_11_no_fixedtime_deadman_progress_trigger(tmp_path):
    led = tmp_path / "callback_4tuple_index.jsonl"
    _seed_all_three(led)
    res = evaluate_batch_settle(
        batch_id=BATCH_ID, expected_tracks=EXPECTED_TRACKS,
        ledger_path=led,
    )
    # the settle basis is hard-pinned to the normal-callback durable-
    # success event — NEVER fixed-time / dead-man / fallback (§3.7).
    assert res.evaluated_at_basis == EVALUATED_AT_BASIS
    assert res.evaluated_at_basis == "normal_callback_durable_success_event"
    assert any("fixed-time" in r and "dead-man" in r for r in res.reasons)
    # no API accepts a fixed-time / dead-man trigger as a settle input.
    import inspect

    from anu_v3 import batch_settle_writeback as bsw

    sig = inspect.signature(bsw.evaluate_batch_settle)
    for bad in ("fixed_time", "deadman", "dead_man", "wall_clock"):
        assert bad not in sig.parameters


# ── 11b. allowlist write guard fail-closed ──────────────────────────────────
def test_11b_writeback_path_guard_failclosed(tmp_path):
    info = LIVE["TRACK 1"]
    led = tmp_path / "callback_4tuple_index.jsonl"
    d = evaluate_durable_success_writeback(
        WORKSPACE / info["path"], batch_id=BATCH_ID, track_id="TRACK 1",
        executor_key=info["executor_key"], ledger_path=led,
    )
    with pytest.raises(WritebackPathRefused):
        apply_durable_success_writeback(
            d, ledger_path=tmp_path / "not_the_ledger.jsonl"
        )


# ── 12/13/14 무회귀 (real existing regression suites — real entrypoints) ────
@pytest.mark.parametrize(
    "suite",
    [
        "tests/regression/test_self_collector_guard_runtime_2553plus49.py",
        "tests/regression/test_authoritative_verdict_selector_2553plus49.py",
        "tests/regression/test_callback_4tuple_registry_2553plus44.py",
        "tests/regression/test_callback_event_trigger_2553plus47.py",
    ],
)
def test_12_13_14_upstream_no_regression(suite):
    r = subprocess.run(
        [sys.executable, "-m", "pytest", "-q", suite],
        cwd=str(WORKSPACE),
        capture_output=True,
        text=True,
        timeout=600,
    )
    assert r.returncode == 0, f"{suite} regressed:\n{r.stdout}\n{r.stderr}"


# ── 15. profile engine + Track1~3 (+50/+51/+52) byte-0 무변 ─────────────────
def test_15_frozen_and_track_artifacts_byte0():
    # frozen anchors — exact byte-0 hashes recorded by +50 decision.json.
    frozen = {
        "anu_v3/callback_4tuple_registry.py":
            "774d550628410d36962c23a7663c4b6dbf72789de7c7fd940871e9ad8280e5ab",
        "dispatch/executor_completion_contract.py":
            "364caa11904285657abd716d78c5493b1f8b519318387d0f864fb6a136dca0b4",
        "anu_v3/callback_event_trigger.py":
            "352ad0f570e55040e7c1e4a32cbfe0f076cbd53529b4db6222a8da1a4bee9cc5",
        "anu_v3/policy_profile_engine.py":
            "2363e291a0a43884892f5e554f115481a077322bd5caa3000fb75bf5b72bc6be",
        "anu_v3/parallel_batch_coordinator.py":
            "10529421110b3d2765785b6cf911527c8f5e964b5078fcfa6190fcb86d0f2c0f",
    }
    for rel, want in frozen.items():
        got = hashlib.sha256(
            (WORKSPACE / rel).read_bytes()
        ).hexdigest()
        assert got == want, f"FROZEN byte-0 broken: {rel}"
    # Track1~3 independent-ANU verdict sources are read-only consumed —
    # this test never mutates them; assert they are still parseable JSON
    # with the AUTHORITATIVE_PASS the write-back relies on.
    for trk, info in LIVE.items():
        data = json.loads(
            (WORKSPACE / info["path"]).read_text(encoding="utf-8")
        )
        v = data.get("authoritative_verdict") or data.get("verdict")
        assert v == "AUTHORITATIVE_PASS", (trk, v)
    # extraction is a REAL entrypoint (not a mock) over the live files.
    b = extract_binding_from_source(
        WORKSPACE / LIVE["TRACK 1"]["path"],
        batch_id=BATCH_ID, track_id="TRACK 1",
        executor_key=LIVE["TRACK 1"]["executor_key"],
    )
    assert b.task_id == "task-2553+50"
    assert b.collector_role == "ANU"
