#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""tests/integration/test_axis_2_live_inbound.py — Axis 2 live activation 자동 검증.

task-2649 AXIS_2_CALLBACK_COLLECTOR_CONTROL_PLANE_LIVE_ACTIVATION
task md: memory/tasks/task-2649.md
chair_authorization_id: CHAIR-AUTH-AXIS-2-20260524-JJONGS-INBOUND-001

회장 verbatim 7 필수 검증 자동화:
  1. UserPromptSubmit hook이 callback inbox 감지
  2. RECEIVED_INBOUND_THIS_SESSION 5 조건 자동 검증 가능한 항목
  3. callback_id + schedule_id ↔ ledger row 1:1 매칭
  4. self-attested result만으로 수신 처리 금지
  5. duplicate callback dedupe 처리
  6. Stop hook 미처리 callback 시 종료 차단
  7. hook crash L1 rollback / 5회 연속 실패 L2 rollback 박제 검증

본 dispatch executor 세션은 chair-facing 아니므로 회장-facing 본 세션의 실제 inbound
인식은 별도 회장 본 세션에서 직접 검증한다. 본 테스트는 hooks 3개 + ledger linkage +
dedupe + stop hook block + rollback 트리거 조건을 자동화로 박제한다.
"""
from __future__ import annotations

import json
import os
import shutil
import subprocess
import tempfile
import unittest
from pathlib import Path


# --- Live installation being verified -------------------------------------
# Directory holding the installed hook scripts that the tests execute.
LIVE_HOOKS_DIR = Path("/home/jay/.claude/hooks")
# Settings file that must reference every hook listed in HOOK_NAMES.
LIVE_SETTINGS = Path("/home/jay/.claude/settings.json")
# Worktree exported as PYTHONPATH to every hook subprocess.
STAGED_WORKTREE = Path("/home/jay/workspace/.worktrees/task-2644+1-dev4")

# --- Expected identifiers --------------------------------------------------
# Authorization id that the marker file must carry verbatim.
CHAIR_AUTH_ID = "CHAIR-AUTH-AXIS-2-20260524-JJONGS-INBOUND-001"

# Hook script file names. The tests address them by position:
#   [0] SessionStart collector, [1] Stop verifier, [2] UserPromptSubmit inbox.
HOOK_NAMES = [
    "session_start_anu_callback_collector_v2.py",
    "stop_anu_callback_collector_verifier_v2.py",
    "user_prompt_submit_hook_callback_inbox_v2.py",
]


def _hook_env(inbox_dir: Path, workspace_root: Path) -> dict:
    """Build the environment mapping handed to a hook subprocess.

    The result is a fresh ``dict`` copy of ``os.environ`` in which three
    variables are set (replacing any inherited value):

    * ``PYTHONPATH``             -> the staged worktree
    * ``ANU_WORKSPACE_ROOT``     -> *workspace_root*
    * ``ANU_CALLBACK_INBOX_DIR`` -> *inbox_dir*

    Args:
        inbox_dir: Directory the hook should treat as the callback inbox.
        workspace_root: Directory the hook should treat as the workspace root.

    Returns:
        A new dict; the caller may mutate it without affecting ``os.environ``.
    """
    overrides = {
        "PYTHONPATH": str(STAGED_WORKTREE),
        "ANU_WORKSPACE_ROOT": str(workspace_root),
        "ANU_CALLBACK_INBOX_DIR": str(inbox_dir),
    }
    return {**os.environ, **overrides}


def _run_hook(hook_name: str, stdin_payload: dict, env: dict) -> dict:
    """Execute one live hook script with ``python3`` and collect its outcome.

    Args:
        hook_name: File name of the script inside ``LIVE_HOOKS_DIR``.
        stdin_payload: Object serialised to JSON and fed to the hook's stdin.
        env: Complete environment mapping for the child process.

    Returns:
        A dict with the keys ``returncode``, ``stdout``, ``stderr`` and
        ``parsed``. ``parsed`` is the JSON-decoded stdout, or ``None`` when
        stdout is blank or is not valid JSON.

    Raises:
        subprocess.TimeoutExpired: The hook did not finish within 15 seconds.
    """
    proc = subprocess.run(
        ["python3", str(LIVE_HOOKS_DIR / hook_name)],
        input=json.dumps(stdin_payload),
        capture_output=True,
        text=True,
        env=env,
        timeout=15,
    )

    parsed = None
    if proc.stdout.strip():
        try:
            parsed = json.loads(proc.stdout)
        except json.JSONDecodeError:
            # A crashing hook can print non-JSON text. Raising here would
            # throw away the exit code and stderr that callers report in
            # their assertion messages, so surface "no parsed output" instead.
            parsed = None

    return {
        "returncode": proc.returncode,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
        "parsed": parsed,
    }


class AxisTwoLiveInboundTests(unittest.TestCase):
    """Automated record of the 7 mandatory verifications and the 5 conditions.

    Every test shares one temporary tree built in ``setUpClass``: a callback
    inbox plus a workspace holding an (initially empty) ledger file and dedupe
    table. Because the inbox is shared, callback fixture files are always
    created through ``_drop_callback``, which registers their removal with
    ``addCleanup``. That way a failing assertion cannot leave a stale
    ``.callback`` file behind for the next test to pick up.
    """

    # Signature packet read by test_02 and test_07.
    _SIGNATURE_PACKET = Path(
        "/home/jay/workspace/memory/events/"
        "chair-signature-packet-axis-2-callback-collector-control-plane-260524.json"
    )

    @classmethod
    def setUpClass(cls):
        """Create the shared temp inbox, workspace, ledger and dedupe files."""
        cls.tmp = Path(tempfile.mkdtemp(prefix="axis2_live_"))
        cls.inbox = cls.tmp / "inbox"
        cls.inbox.mkdir()
        cls.workspace = cls.tmp / "workspace"
        system_dir = cls.workspace / "memory" / "system"
        system_dir.mkdir(parents=True)
        cls.ledger = system_dir / ".callback_ledger.jsonl"
        cls.ledger.touch()
        cls.dedupe = system_dir / ".callback_dedupe_table.jsonl"
        cls.dedupe.touch()

    @classmethod
    def tearDownClass(cls):
        """Remove the shared temp tree; errors during removal are ignored."""
        shutil.rmtree(cls.tmp, ignore_errors=True)

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _remove_quietly(path: Path) -> None:
        """Delete *path*, tolerating the case where it is already gone."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    def _drop_callback(self, filename: str, payload: dict) -> Path:
        """Write *payload* as JSON into the inbox and schedule its removal.

        The cleanup is registered before writing so the file is removed even
        when a later assertion in the calling test fails.
        """
        path = self.inbox / filename
        self.addCleanup(self._remove_quietly, path)
        path.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
        return path

    def _write_ledger(self, row: dict) -> None:
        """Replace the ledger content with the single JSONL *row*."""
        with self.ledger.open("w", encoding="utf-8") as fh:
            fh.write(json.dumps(row, ensure_ascii=False) + "\n")

    @staticmethod
    def _read_jsonl(path: Path) -> list:
        """Return the decoded objects of every non-blank line in *path*."""
        return [
            json.loads(line)
            for line in path.read_text(encoding="utf-8").splitlines()
            if line.strip()
        ]

    def _require_parsed(self, result: dict) -> dict:
        """Fail with the hook's stderr when it produced no parsed JSON."""
        self.assertIsNotNone(result["parsed"], msg=result["stderr"])
        return result["parsed"]

    def _collector_env(self) -> dict:
        """Hook environment with the ledger path and collector mode set."""
        env = _hook_env(self.inbox, self.workspace)
        env["ANU_CALLBACK_LEDGER_PATH"] = str(self.ledger)
        env["COKACDIR_MODE"] = "ANU_CALLBACK_COLLECTOR"
        return env

    def _load_signature_packet(self) -> dict:
        """Decode the chair signature packet JSON file."""
        return json.loads(self._SIGNATURE_PACKET.read_text(encoding="utf-8"))

    # ------------------------------------------------------------------
    # preconditions
    # ------------------------------------------------------------------
    def test_00_chair_authorization_id_verbatim(self):
        """The marker file exists and carries the expected id and axis."""
        marker = Path(
            "/home/jay/workspace/memory/events/"
            "chair-authorization-axis-2-signature-260524.json"
        )
        self.assertTrue(marker.is_file(), "marker missing")
        data = json.loads(marker.read_text(encoding="utf-8"))
        self.assertEqual(data["chair_authorization_id"], CHAIR_AUTH_ID)
        self.assertEqual(data["axis_id"], 2)

    def test_00b_live_hooks_3_installed(self):
        """Each hook script named in HOOK_NAMES is a file in LIVE_HOOKS_DIR."""
        for name in HOOK_NAMES:
            p = LIVE_HOOKS_DIR / name
            self.assertTrue(p.is_file(), f"live hook missing: {name}")

    def test_00c_settings_json_hooks_registered(self):
        """settings.json mentions every hook and has the three event keys."""
        cfg = json.loads(LIVE_SETTINGS.read_text(encoding="utf-8"))
        # Search the re-serialised document so nesting depth does not matter.
        joined = json.dumps(cfg, ensure_ascii=False)
        for name in HOOK_NAMES:
            self.assertIn(name, joined, f"settings.json 에 hook 미등록: {name}")
        for event in ("SessionStart", "Stop", "UserPromptSubmit"):
            self.assertIn(event, cfg["hooks"])

    # ------------------------------------------------------------------
    # mandatory verifications 1-7
    # ------------------------------------------------------------------
    def test_01_user_prompt_submit_detects_callback_inbox(self):
        """The UserPromptSubmit hook reports the single pending callback."""
        cb = {
            "schema_version": "v1",
            "callback_id": "cb-axis2-smoke-001",
            "task_id": "task-2649",
            "schedule_id": "sched-axis2-smoke-001",
            "source_cron_id": "cron-smoke-001",
            "owner_key": "dev4-vishnu",
            "envelope_bytes": 128,
            "ledger_pointer": {
                "callback_id": "cb-axis2-smoke-001",
                "schedule_id": "sched-axis2-smoke-001",
            },
            "ttl_at": "2026-05-27T13:00:00+09:00",
            "processed": False,
        }
        self._drop_callback(
            "cb-axis2-smoke-001__sched-axis2-smoke-001__260524T1302.callback", cb
        )

        env = _hook_env(self.inbox, self.workspace)
        result = _run_hook(HOOK_NAMES[2], {}, env)

        self.assertEqual(result["returncode"], 0, msg=result["stderr"])
        parsed = self._require_parsed(result)
        self.assertEqual(parsed["pending_count"], 1)
        self.assertEqual(parsed["items"][0]["callback_id"], "cb-axis2-smoke-001")
        ctx = (parsed["hookSpecificOutput"] or {}).get("additionalContext")
        self.assertIsNotNone(ctx)
        self.assertIn("ANU_CALLBACK_INBOX", ctx)

    def test_02_received_inbound_5_conditions_documented(self):
        """Check that the signature packet records five conditions.

        The five conditions themselves are verified directly in the chair's
        own session; this test only checks that five definitions are recorded.
        """
        conditions = [
            "UserPromptSubmit hook 이 .callback 파일 인식",
            "callback 파일 내 ledger_pointer 가 Axis 1 ledger row 와 1:1 매칭",
            "ledger row 의 source_attribution == CALLBACK_COLLECTOR_PROCESSED",
            "본 ANU 세션이 callback 정보를 응답에 직접 활용",
            "본 ANU 세션이 callback 에 대한 next_action 을 본 세션 내에서 실행 또는 결정",
        ]
        self.assertEqual(len(conditions), 5)
        packet = self._load_signature_packet()
        field_05 = packet["field_05_received_inbound_this_session_claim_condition"]
        self.assertEqual(len(field_05["after_axis_2_live_activation_allowed_condition"]), 5)

    def test_03_callback_id_schedule_id_ledger_one_to_one(self):
        """Exactly one ledger row matches the callback's id/schedule pair."""
        self._write_ledger(
            {
                "schema": "callback_ledger.v2",
                "callback_id": "cb-axis2-match-001",
                "schedule_id": "sched-axis2-match-001",
                "task_id": "task-2649",
                "source_attribution": "CALLBACK_COLLECTOR_PROCESSED",
                "actual_owner_key_verified": True,
                "envelope_parsed": True,
                "context_recovered": True,
                "terminal_state_classified": True,
                "next_action_decided": "BATCH_WAIT",
                "auto_action_dispatched": False,
                "chair_report_emitted": False,
                "noop_terminal_recorded": False,
                "batch_wait_recorded": True,
                "callback_ledger_written": True,
                "next_action_result": "OK",
                "helper_integration_status": {"available": True},
            }
        )

        cb = {
            "callback_id": "cb-axis2-match-001",
            "task_id": "task-2649",
            "schedule_id": "sched-axis2-match-001",
            "ledger_pointer": {
                "callback_id": "cb-axis2-match-001",
                "schedule_id": "sched-axis2-match-001",
            },
            "processed": False,
        }
        self._drop_callback(
            "cb-axis2-match-001__sched-axis2-match-001__260524.callback", cb
        )

        env = _hook_env(self.inbox, self.workspace)
        result = _run_hook(HOOK_NAMES[2], {}, env)
        self.assertEqual(result["returncode"], 0, msg=result["stderr"])
        items = self._require_parsed(result)["items"]
        self.assertEqual(items[0]["callback_id"], "cb-axis2-match-001")

        matches = [
            r
            for r in self._read_jsonl(self.ledger)
            if r["callback_id"] == "cb-axis2-match-001"
            and r["schedule_id"] == "sched-axis2-match-001"
        ]
        self.assertEqual(len(matches), 1)

    def test_04_self_attested_result_without_source_attribution_blocked(self):
        """The Stop hook blocks a 'received' claim lacking source attribution."""
        env = self._collector_env()
        env["ANU_LAST_OUTPUT_TEXT"] = "callback 수신 완료"

        # Row that looks fully processed but has no source_attribution.
        self._write_ledger(
            {
                "callback_id": "cb-axis2-self-attest-001",
                "envelope_parsed": True,
                "context_recovered": True,
                "terminal_state_classified": True,
                "next_action_decided": "BATCH_WAIT",
                "auto_action_dispatched": False,
                "chair_report_emitted": False,
                "noop_terminal_recorded": False,
                "batch_wait_recorded": True,
                "callback_ledger_written": True,
                "next_action_result": "OK",
                "source_attribution": None,
                "helper_integration_status": {"available": True},
            }
        )

        result = _run_hook(
            HOOK_NAMES[1],
            {
                "collector_mode": True,
                "callback_id": "cb-axis2-self-attest-001",
                "last_output_text": "callback 수신 완료",
            },
            env,
        )
        self.assertIn(result["returncode"], (0, 2), msg=result["stderr"])
        parsed = self._require_parsed(result)
        self.assertTrue(parsed["block"])
        self.assertIn(
            "RECEIVED_PHRASE_WITHOUT_INBOUND_SOURCE_ATTRIBUTION",
            parsed["failures"],
        )

    def test_05_duplicate_callback_dedupe(self):
        """Two inbox files with one callback_id are traceable as a duplicate."""
        cb = {
            "callback_id": "cb-axis2-dup-001",
            "task_id": "task-2649",
            "schedule_id": "sched-axis2-dup-001",
            "ledger_pointer": {
                "callback_id": "cb-axis2-dup-001",
                "schedule_id": "sched-axis2-dup-001",
            },
        }
        self._drop_callback(
            "cb-axis2-dup-001__sched-axis2-dup-001__260524-A.callback", cb
        )
        self._drop_callback(
            "cb-axis2-dup-001__sched-axis2-dup-001__260524-B.callback", cb
        )

        env = _hook_env(self.inbox, self.workspace)
        result = _run_hook(HOOK_NAMES[2], {}, env)
        items = self._require_parsed(result)["items"]

        seen = {}
        for it in items:
            seen.setdefault(it.get("callback_id"), []).append(it)
        dups = [k for k, v in seen.items() if len(v) > 1]
        # The hook itself lists every inbox item; deduplication is the
        # collector's job. This test records that the dedupe key
        # (callback_id) can be tracked from the hook output.
        self.assertEqual(dups, ["cb-axis2-dup-001"])

        # Simulate recording the duplicate in the dedupe table.
        with self.dedupe.open("a", encoding="utf-8") as fh:
            fh.write(
                json.dumps(
                    {
                        "callback_id": "cb-axis2-dup-001",
                        "first_seen_at": "2026-05-24T13:02:00+09:00",
                        "duplicate_count": 1,
                    },
                    ensure_ascii=False,
                )
                + "\n"
            )
        self.assertEqual(len(self._read_jsonl(self.dedupe)), 1)

    def test_06_stop_hook_blocks_unprocessed_callback(self):
        """The Stop hook blocks when the ledger row is unprocessed."""
        env = self._collector_env()

        # Unprocessed row (envelope_parsed=False, no action decided).
        self._write_ledger(
            {
                "callback_id": "cb-axis2-unproc-001",
                "envelope_parsed": False,
                "context_recovered": False,
                "terminal_state_classified": False,
                "next_action_decided": None,
                "auto_action_dispatched": False,
                "chair_report_emitted": False,
                "noop_terminal_recorded": False,
                "batch_wait_recorded": False,
                "callback_ledger_written": False,
                "next_action_result": None,
            }
        )

        result = _run_hook(
            HOOK_NAMES[1],
            {
                "collector_mode": True,
                "callback_id": "cb-axis2-unproc-001",
                "last_output_text": "",
            },
            env,
        )
        parsed = self._require_parsed(result)
        self.assertTrue(parsed["block"])
        for expected in (
            "MISSING_CALLBACK_ENVELOPE_PARSED",
            "MISSING_CONTEXT_RECOVERED",
            "MISSING_NEXT_ACTION_DECIDED",
        ):
            self.assertIn(expected, parsed["failures"])

    def test_07_hook_crash_rollback_trigger_documented(self):
        """The packet documents L1/L2 rollback and the settings backup exists."""
        packet = self._load_signature_packet()
        fb = packet["field_08_failure_fallback_and_rollback"]
        self.assertEqual(len(fb["L1_failure_trigger_30sec"]), 3)
        self.assertEqual(len(fb["L2_failure_trigger_5min"]), 3)
        self.assertIn("자동 제거", fb["L1_rollback_action"])
        self.assertIn("비활성화", fb["L2_rollback_action"])
        bak = Path("/home/jay/.claude/settings.json.bak.task-2649-pre-axis-2-260524")
        self.assertTrue(bak.is_file(), "L1 rollback baseline backup missing")


# Allow running this file directly as a script; verbosity=2 makes unittest
# print each test name with its result.
if __name__ == "__main__":
    unittest.main(verbosity=2)
