"""tests/regression/test_cron_session_safety_guard_2526.py

회귀 테스트 — task-2526 cron ``--session`` safety guard / safe_cron_dispatch wrapper.

회장 §본질 (2026-05-10):
  독립 bot task cron에 ``--session`` 옵션이 잘못 붙어 봇이 아니라 **아누 자기 세션이
  resume된 사고**가 발생함. 본 테스트는 이 사고를 회귀 fixture로 박제하여 동일 사고가
  다시 발생하지 못하도록 wrapper preflight를 강제한다.

회장 §명시 6 회귀 검증 (정확히 6개):
  1. independent_task + ``--session`` → BLOCK
  2. merge_task + ``--session`` → BLOCK
  3. followup_readonly + ``--session`` → ALLOW
  4. bot_task + bot_key + no ``--session`` → ALLOW
  5. bot_task + no bot_key → BLOCK
  6. owner_pat fallback 감지 → BLOCK

본 사건 fixture (PR #74 재위임 오류):
  - 잘못된 cron ``5C9995CCB`` — ``--session 5eee7634-b0be-4594-b84e-311ae64e557b`` 동반
    → wrapper가 BLOCK 했어야 함 (본 테스트가 박제)
  - 재발사 cron ``74325894`` — ``--session`` 빼고 발사 → 정상 (본 테스트가 ALLOW 박제)

회장 §보안:
  - raw token / raw key / raw session UUID 정적 검사 (wrapper 출력에 0 노출)
  - subprocess 실행 X (preflight only)
  - audit JSONL은 tmp_path에만 기록 (production audit-jsonl 오염 X)
"""
from __future__ import annotations

import json
import re
import sys
from dataclasses import asdict
from pathlib import Path

import pytest

# ---------------------------------------------------------------------------
# Worktree root → sys.path (force position 0) — 패턴 task-2486/task-2523 정합
# ---------------------------------------------------------------------------
# Directory three levels above this file; it is moved to the front of
# sys.path so the ``scripts`` / ``utils`` imports below resolve against it.
_WORKTREE_ROOT = Path(__file__).resolve().parent.parent.parent
try:
    # Drop an existing entry first so the insert below puts it at index 0.
    sys.path.remove(str(_WORKTREE_ROOT))
except ValueError:
    # The root was not on sys.path yet; nothing to remove.
    pass
sys.path.insert(0, str(_WORKTREE_ROOT))

from scripts.safe_cron_dispatch import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    DispatchStatus,
    safe_cron_dispatch,
    format_chairman_block_notice,
)
from utils.cron_targeting_audit import (  # noqa: E402  # pyright: ignore[reportMissingImports]
    ACTOR_ANU_SESSION,
    ACTOR_BOT_SESSION,
    BlockedReason,
    CronTargetingAuditRecord,
    REQUIRED_AUDIT_FIELDS_2526,
    SUPPLEMENTARY_AUDIT_FIELDS_2526,
    TASK_KIND_BOT,
    TASK_KIND_FOLLOWUP_RO,
    TASK_KIND_INDEPENDENT,
    TASK_KIND_MERGE,
    build_audit_record,
    detect_misrouted_session,
    ensure_no_raw_secrets,
    evidence_based_recover,
    hash_bot_key,
    sanitize_command_preview,
    soft_kill_misrouted,
)


# ===========================================================================
# Fixtures — 본 사건 (PR #74 재위임 오류) 박제
# ===========================================================================

# Chat id passed as the ``chat`` argument in the dispatch calls of this module.
CHAIR_CHAT = "6937032012"
DEV3_DAGDA_KEY = "0b94683120a691cf"  # raw key — the tests check that the wrapper records it only as a hash
ANU_SELF_SESSION_UUID = "5eee7634-b0be-4594-b84e-311ae64e557b"  # session id from the incident under test


@pytest.fixture
def audit_path(tmp_path: Path) -> Path:
    """Return an audit JSONL path inside ``tmp_path``.

    Keeping the file under pytest's per-test temporary directory isolates
    the tests from any production audit log.
    """
    return tmp_path.joinpath("cron-targeting-audit.jsonl")


def _read_audit_lines(audit_path: Path) -> list:
    """Decode every non-blank line of the audit JSONL file.

    Returns an empty list when ``audit_path`` does not exist. Lines that are
    empty or whitespace-only are skipped; every other line is parsed with
    ``json.loads`` and returned in file order.
    """
    records = []
    if audit_path.exists():
        for raw_line in audit_path.read_text("utf-8").splitlines():
            if raw_line.strip():
                records.append(json.loads(raw_line))
    return records


# ===========================================================================
# 회귀 검증 #1 — independent_task + --session → BLOCK
# ===========================================================================

def test_regression_1_independent_task_with_session_blocks(audit_path: Path):
    """Regression #1: an independent task dispatched with a session id is blocked.

    Mirrors the incident cron 5C9995CCB named in the module docstring: an
    independent dev3 task accompanied by the Anu self-session id. Checks both
    the returned result and the single audit record written to ``audit_path``.
    """
    dispatch_kwargs = dict(
        prompt="dev3-team independent_task: rebuild stats",
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_INDEPENDENT,
        session_id=ANU_SELF_SESSION_UUID,
        audit_path=audit_path,
    )
    outcome = safe_cron_dispatch(**dispatch_kwargs)

    assert outcome.status == DispatchStatus.BLOCKED
    assert outcome.blocked_reason == BlockedReason.INDEPENDENT_TASK_WITH_SESSION
    assert outcome.command_argv == (), "BLOCK시 command_argv는 비어야 함"
    assert outcome.chairman_notice is not None
    assert "CRON_TARGETING_GUARD_BLOCKED" in outcome.chairman_notice
    # Core of the incident: the reason must identify the session as the cause.
    assert any(
        marker in outcome.blocked_reason for marker in ("anu_session", "session")
    )

    records = _read_audit_lines(audit_path)
    assert len(records) == 1
    entry = records[0]
    assert entry["task_kind"] == TASK_KIND_INDEPENDENT
    assert entry["session_id_present"] is True
    assert entry["session_id_allowed"] is False
    assert entry["blocked_reason"] == BlockedReason.INDEPENDENT_TASK_WITH_SESSION


# ===========================================================================
# 회귀 검증 #2 — merge_task + --session → BLOCK
# ===========================================================================

def test_regression_2_merge_task_with_session_blocks(audit_path: Path):
    """Regression #2: a PR-merge task dispatched with a session id is blocked."""
    dispatch_kwargs = dict(
        prompt='[task-2526] merge PR #74 — gh pr merge 74 --squash GH_TOKEN=$BOT_GITHUB_TOKEN',
        schedule="0 9 * * 1",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_MERGE,
        session_id=ANU_SELF_SESSION_UUID,
        audit_path=audit_path,
    )
    outcome = safe_cron_dispatch(**dispatch_kwargs)

    # Either reason is acceptable: the session check or the owner-mismatch
    # check may fire first, and both keep the dispatch from running.
    acceptable_reasons = (
        BlockedReason.MERGE_TASK_WITH_SESSION,
        BlockedReason.TARGET_SESSION_OWNER_MISMATCH,
    )
    assert outcome.status == DispatchStatus.BLOCKED
    assert outcome.blocked_reason in acceptable_reasons
    assert outcome.command_argv == ()


# ===========================================================================
# 회귀 검증 #3 — followup_readonly + --session → ALLOW (예외 1건)
# ===========================================================================

def test_regression_3_followup_readonly_with_session_allows(audit_path: Path):
    """Regression #3: a read-only follow-up with a session id is allowed.

    This is the one case in this module where ``--session`` is expected to
    appear in the generated argv.
    """
    dispatch_kwargs = dict(
        prompt="followup readonly: status scan only — no commit/push",
        schedule="30m",
        chat=CHAIR_CHAT,
        target_bot_key=None,
        task_kind=TASK_KIND_FOLLOWUP_RO,
        session_id=ANU_SELF_SESSION_UUID,
        audit_path=audit_path,
    )
    outcome = safe_cron_dispatch(**dispatch_kwargs)

    assert outcome.status == DispatchStatus.ALLOWED
    assert outcome.blocked_reason is None
    argv = outcome.command_argv
    assert argv, "ALLOWED 시 command_argv가 채워져야 함"
    # The allowed exception must forward the session flag to the command.
    assert "--session" in argv

    records = _read_audit_lines(audit_path)
    assert len(records) == 1
    entry = records[0]
    assert entry["task_kind"] == TASK_KIND_FOLLOWUP_RO
    assert entry["session_id_present"] is True
    assert entry["session_id_allowed"] is True
    assert entry["actor_expected"] == ACTOR_ANU_SESSION
    assert entry["blocked_reason"] is None


# ===========================================================================
# 회귀 검증 #4 — bot_task + bot_key + no --session → ALLOW
# ===========================================================================

def test_regression_4_bot_task_with_key_no_session_allows(audit_path: Path):
    """Regression #4: a bot task with a bot key and no session id is allowed.

    Mirrors the re-dispatched cron 74325894 named in the module docstring,
    which was sent without ``--session``. Verifies the generated argv and
    that the audit record stores the bot key only as a hash.
    """
    result = safe_cron_dispatch(
        prompt='[dev3-team] task-2526 — independent bot task, GH_TOKEN=$BOT_GITHUB_TOKEN gh pr view',
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_BOT,
        session_id=None,
        audit_path=audit_path,
    )

    assert result.status == DispatchStatus.ALLOWED
    assert result.blocked_reason is None
    assert result.command_argv, "ALLOWED 시 command_argv가 채워져야 함"
    assert "--session" not in result.command_argv, (
        "bot_task에 session이 새어들어가면 본 사건 재발 — 절대 포함 X"
    )
    assert "--key" in result.command_argv

    # The raw key is expected in argv, but the audit log must only carry its
    # hash.
    lines = _read_audit_lines(audit_path)
    # Same single-record check as the other audit-inspecting tests: a missing
    # write fails here with a clear assertion instead of an IndexError, and a
    # duplicate write is caught as well.
    assert len(lines) == 1
    rec = lines[0]
    assert rec["target_bot_key_hash"] == hash_bot_key(DEV3_DAGDA_KEY)
    assert rec["session_id_present"] is False
    assert rec["actor_expected"] == ACTOR_BOT_SESSION
    assert rec["blocked_reason"] is None


# ===========================================================================
# 회귀 검증 #5 — bot_task + no bot_key → BLOCK
# ===========================================================================

def test_regression_5_bot_task_without_key_blocks(audit_path: Path):
    """Regression #5: a merge cron dispatched without a bot key is blocked."""
    dispatch_kwargs = dict(
        prompt="[task-2526] merge PR #74 GH_TOKEN=$BOT_GITHUB_TOKEN gh pr merge 74 --squash",
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=None,  # bot key deliberately omitted
        task_kind=TASK_KIND_MERGE,
        session_id=None,
        audit_path=audit_path,
    )
    outcome = safe_cron_dispatch(**dispatch_kwargs)

    assert outcome.status == DispatchStatus.BLOCKED
    assert outcome.blocked_reason == BlockedReason.BOT_KEY_MISSING_FOR_BOT_TASK
    assert outcome.command_argv == ()


def test_regression_5b_bot_task_without_key_also_blocks(audit_path: Path):
    """Cross-coverage for #5: ``TASK_KIND_BOT`` without a bot key is blocked too."""
    dispatch_kwargs = dict(
        prompt="[dev3-team] independent bot task GH_TOKEN=$BOT_GITHUB_TOKEN",
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=None,
        task_kind=TASK_KIND_BOT,
        session_id=None,
        audit_path=audit_path,
    )
    outcome = safe_cron_dispatch(**dispatch_kwargs)

    assert outcome.status == DispatchStatus.BLOCKED
    assert outcome.blocked_reason == BlockedReason.BOT_KEY_MISSING_FOR_BOT_TASK


# ===========================================================================
# 회귀 검증 #6 — owner_pat fallback 감지 → BLOCK
# ===========================================================================

def test_regression_6_owner_pat_fallback_blocks(audit_path: Path):
    """Regression #6: a ``gh pr merge`` prompt without the bot token is blocked.

    The prompt intentionally omits ``GH_TOKEN=$BOT_GITHUB_TOKEN``; the wrapper
    is expected to report ``OWNER_PAT_FALLBACK_DETECTED``.
    """
    # No GH_TOKEN injection here on purpose.
    prompt_without_token = "[task-2526] merge PR #74 — gh pr merge 74 --squash --delete-branch"
    outcome = safe_cron_dispatch(
        prompt=prompt_without_token,
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_MERGE,
        session_id=None,
        audit_path=audit_path,
    )

    assert outcome.status == DispatchStatus.BLOCKED
    assert outcome.blocked_reason == BlockedReason.OWNER_PAT_FALLBACK_DETECTED
    assert outcome.command_argv == ()


def test_regression_6b_owner_pat_safe_when_gh_token_injected(audit_path: Path):
    """Control case for #6: with the bot token injected the dispatch is allowed."""
    prompt_with_token = '[task-2526] merge PR #74 — GH_TOKEN=$BOT_GITHUB_TOKEN gh pr merge 74 --squash'
    outcome = safe_cron_dispatch(
        prompt=prompt_with_token,
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_MERGE,
        session_id=None,
        audit_path=audit_path,
    )

    assert outcome.status == DispatchStatus.ALLOWED
    assert outcome.blocked_reason is None


# ===========================================================================
# 본 사건 replay fixture — PR #74 재위임 오류 cron 5C9995CCB
# ===========================================================================

def test_pr74_misroute_incident_replay_blocks_5C9995CCB(audit_path: Path):
    """Replay of incident cron 5C9995CCB: the wrapper must block before execution.

    The original docstring records that this cron was meant to delegate to
    the dev3 bot but carried ``--session 5eee7634...``, so the Anu
    self-session (PID 448820) was resumed instead.
    """
    dispatch_kwargs = dict(
        prompt='[dev3-team] task-2526 발사 — gh pr merge 74',
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_INDEPENDENT,  # classified as the dispatcher intended
        session_id=ANU_SELF_SESSION_UUID,  # the wrongly attached session
        audit_path=audit_path,
    )
    outcome = safe_cron_dispatch(**dispatch_kwargs)

    assert outcome.status == DispatchStatus.BLOCKED, (
        "본 사건 cron 5C9995CCB가 wrapper에 의해 BLOCK되지 않으면 "
        "PR #74 재위임 오류가 다시 발생할 수 있음."
    )
    assert outcome.blocked_reason == BlockedReason.INDEPENDENT_TASK_WITH_SESSION


def test_pr74_redispatch_cron_74325894_allows(audit_path: Path):
    """Replay of re-dispatched cron 74325894: allowed once ``--session`` is removed.

    The original docstring records that this run should trigger on the bot
    process (PID 455634), not on the Anu process (PID 448820).
    """
    dispatch_kwargs = dict(
        prompt='[dev3-team] task-2526 재발사 — GH_TOKEN=$BOT_GITHUB_TOKEN gh pr view 74',
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_BOT,
        session_id=None,  # session removed
        audit_path=audit_path,
    )
    outcome = safe_cron_dispatch(**dispatch_kwargs)

    assert outcome.status == DispatchStatus.ALLOWED
    assert "--session" not in outcome.command_argv


# ===========================================================================
# Audit schema — 8 필드 + 2 보강 (회장 §4)
# ===========================================================================

def test_audit_record_has_required_8_fields(audit_path: Path):
    """The audit JSONL line carries every required and supplementary field.

    Also pins the number of required fields at exactly eight.
    """
    dispatch_kwargs = dict(
        prompt='[dev3-team] task-2526 GH_TOKEN=$BOT_GITHUB_TOKEN',
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_BOT,
        session_id=None,
        audit_path=audit_path,
    )
    safe_cron_dispatch(**dispatch_kwargs)

    records = _read_audit_lines(audit_path)
    assert len(records) == 1
    entry = records[0]
    for fname in REQUIRED_AUDIT_FIELDS_2526:
        assert fname in entry, f"required audit field missing: {fname}"
    for fname in SUPPLEMENTARY_AUDIT_FIELDS_2526:
        assert fname in entry, f"supplementary audit field missing: {fname}"
    # The schema is expected to define exactly eight required fields.
    assert len(REQUIRED_AUDIT_FIELDS_2526) == 8


def test_audit_dataclass_field_set_matches_schema():
    """``build_audit_record`` yields a record with all required fields and no raw key."""
    record_kwargs = dict(
        cron_id=None,
        target_bot=f"chat={CHAIR_CHAT}",
        bot_key=DEV3_DAGDA_KEY,
        session_id=None,
        session_id_allowed=False,
        task_kind=TASK_KIND_BOT,
        actor_expected=ACTOR_BOT_SESSION,
        actor_actual_if_known=None,
        command_preview=f'cokacdir --cron "x" --at 2m --chat {CHAIR_CHAT} --key {DEV3_DAGDA_KEY}',
        blocked_reason=None,
    )
    record = build_audit_record(**record_kwargs)
    assert isinstance(record, CronTargetingAuditRecord)

    as_mapping = asdict(record)
    absent = [fname for fname in REQUIRED_AUDIT_FIELDS_2526 if fname not in as_mapping]
    assert not absent
    # Only the hash of the bot key may be stored; the raw key must not
    # survive in the sanitized command preview.
    assert record.target_bot_key_hash == hash_bot_key(DEV3_DAGDA_KEY)
    assert DEV3_DAGDA_KEY not in as_mapping["command_preview_sanitized"]


# ===========================================================================
# 보안 — token / raw key / raw session UUID 0 노출 (정적 검사)
# ===========================================================================

def test_no_raw_secrets_in_audit_jsonl(audit_path: Path):
    """The audit JSONL text contains no raw bot key, session UUID or GitHub token.

    Two dispatches are issued against the same ``audit_path`` (one with a
    session id, one without) so both are covered by the scan. The token scan
    covers every documented GitHub token prefix (``ghp_``, ``gho_``,
    ``ghu_``, ``ghs_``, ``ghr_``) plus fine-grained ``github_pat_`` tokens,
    not only ``ghp_`` / ``ghs_``.
    """
    # Run two different dispatches so both feed the same audit file.
    safe_cron_dispatch(
        prompt='dev3 indep task',
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_INDEPENDENT,
        session_id=ANU_SELF_SESSION_UUID,
        audit_path=audit_path,
    )
    safe_cron_dispatch(
        prompt='dev3 bot task GH_TOKEN=$BOT_GITHUB_TOKEN',
        schedule="2m",
        chat=CHAIR_CHAT,
        target_bot_key=DEV3_DAGDA_KEY,
        task_kind=TASK_KIND_BOT,
        session_id=None,
        audit_path=audit_path,
    )

    text = audit_path.read_text("utf-8")
    assert DEV3_DAGDA_KEY not in text, "raw bot_key가 audit에 노출되면 안 됨"
    assert ANU_SELF_SESSION_UUID not in text, "raw session UUID가 audit에 노출되면 안 됨"
    # Scan for GitHub token-shaped strings: classic prefixed tokens and
    # fine-grained personal access tokens.
    token_patterns = (
        r"gh[pousr]_[A-Za-z0-9]{20,}",
        r"github_pat_[A-Za-z0-9_]{20,}",
    )
    for pattern in token_patterns:
        assert not re.search(pattern, text), (
            f"GitHub token-like string found in audit (pattern {pattern})"
        )


def test_sanitize_command_preview_redacts_key_and_session():
    """``sanitize_command_preview`` removes the raw key and session id.

    The sanitized text must contain the ``<hash:`` and ``(redacted)``
    markers in their place.
    """
    command_line = (
        f'cokacdir --cron "do thing" --at 2m '
        f'--chat {CHAIR_CHAT} --key {DEV3_DAGDA_KEY} '
        f'--session {ANU_SELF_SESSION_UUID}'
    )
    sanitized = sanitize_command_preview(command_line)

    for secret in (DEV3_DAGDA_KEY, ANU_SELF_SESSION_UUID):
        assert secret not in sanitized
    for marker in ("<hash:", "(redacted)"):
        assert marker in sanitized


def test_ensure_no_raw_secrets_blocks_session_uuid():
    """``ensure_no_raw_secrets`` raises ``ValueError`` for a payload holding a raw session UUID."""
    leaking_payload = dict(session_id_raw=ANU_SELF_SESSION_UUID, kind="bug")
    with pytest.raises(ValueError):
        ensure_no_raw_secrets(leaking_payload)


def test_ensure_no_raw_secrets_blocks_github_pat():
    """``ensure_no_raw_secrets`` raises ``ValueError`` for a payload holding a ``ghp_`` token."""
    leaking_payload = dict(
        cmd="GH_TOKEN=ghp_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa gh pr merge",
    )
    with pytest.raises(ValueError):
        ensure_no_raw_secrets(leaking_payload)


def test_hash_bot_key_is_deterministic_and_short():
    """``hash_bot_key`` is repeatable, 16 characters long, and ``None`` for empty input."""
    first = hash_bot_key(DEV3_DAGDA_KEY)
    second = hash_bot_key(DEV3_DAGDA_KEY)
    # Separate asserts so a failure names the exact property that broke.
    assert first == second
    assert first is not None
    assert len(first) == 16
    for empty_key in (None, ""):
        assert hash_bot_key(empty_key) is None


# ===========================================================================
# Misroute detection / soft kill / recover (회장 §5)
# ===========================================================================

def test_detect_misrouted_session_flags_session_in_cron_dispatch():
    """A cmdline with cokacdir, ``--cron`` and ``--session`` is flagged as a suspected misroute.

    The report must not carry the raw session UUID or the raw bot key.
    """
    fake_cmdline = (
        f"/usr/local/bin/cokacdir --cron 'task' "
        f"--at 2m --chat {CHAIR_CHAT} --key {DEV3_DAGDA_KEY} "
        f"--session {ANU_SELF_SESSION_UUID}"
    )

    def read_cmdline(pid):
        # Stand-in for reading the process cmdline; truthy pids get the fake line.
        return fake_cmdline if pid else None

    # 448820 is the PID recorded for the incident.
    report = detect_misrouted_session(pid=448820, cmdline_reader=read_cmdline)

    assert report.suspected_misroute is True
    assert report.has_session_flag is True
    # Neither raw secret may appear anywhere in the report.
    redacted_session = report.session_id_redacted or ""
    assert ANU_SELF_SESSION_UUID not in redacted_session
    preview = report.cmdline_preview_sanitized
    assert ANU_SELF_SESSION_UUID not in preview
    assert DEV3_DAGDA_KEY not in preview


def test_detect_misrouted_session_no_misroute_for_normal_cron():
    """A cron cmdline without ``--session`` is not reported as a misroute."""
    fake_cmdline = (
        f"/usr/local/bin/cokacdir --cron 'task' "
        f"--at 2m --chat {CHAIR_CHAT} --key {DEV3_DAGDA_KEY}"
    )

    def read_cmdline(pid):
        # Stand-in for reading the process cmdline; truthy pids get the fake line.
        return fake_cmdline if pid else None

    # 455634 is the PID recorded for the re-dispatched bot run.
    report = detect_misrouted_session(pid=455634, cmdline_reader=read_cmdline)

    assert report.suspected_misroute is False
    assert report.has_session_flag is False


def test_detect_misrouted_session_handles_unreadable_proc():
    """When the cmdline reader returns ``None`` the report is a non-misroute with an explicit reason."""

    def unreadable(pid):
        # The reader yields None for every pid, simulating an unreadable cmdline.
        return None

    report = detect_misrouted_session(pid=99999999, cmdline_reader=unreadable)

    assert report.suspected_misroute is False
    assert report.reason == "proc_cmdline_unreadable"


def test_soft_kill_dry_run_does_not_invoke_killer(audit_path: Path):
    """With ``dry_run=True`` the injected killer callable is never invoked."""
    recorded_calls = []

    def recording_killer(pid: int, sig: int) -> None:
        # Records every invocation so the test can prove none happened.
        recorded_calls.append((pid, sig))

    outcome = soft_kill_misrouted(
        448820,
        dry_run=True,
        killer=recording_killer,
        audit_path=audit_path,
    )

    assert outcome["action"] == "dry_run"
    assert outcome["signal_sent"] is False
    assert recorded_calls == []


def test_evidence_based_recover_clean_abort_when_no_evidence():
    """All signals ``False`` gives ``clean_abort`` with redispatch required."""
    signal_names = (
        "worktree_diff",
        "worktree_untracked",
        "branch_unpushed_commits",
        "remote_branch_exists",
        "open_pr_for_task",
        "ci_run_for_branch",
        "audit_jsonl_evidence",
    )
    all_clear = dict.fromkeys(signal_names, False)

    plan = evidence_based_recover("task-2526", signals=all_clear)

    assert plan.classification == "clean_abort"
    assert plan.redispatch_required is True
    assert plan.signals_with_evidence == ()


def test_evidence_based_recover_contaminated_when_any_signal_true():
    """Truthy signals give ``contaminated_execution`` and are listed as evidence."""
    observed = dict(worktree_diff=True, audit_jsonl_evidence=True)

    plan = evidence_based_recover("task-2526", signals=observed)

    assert plan.classification == "contaminated_execution"
    assert plan.redispatch_required is True
    assert "worktree_diff" in plan.signals_with_evidence


def test_evidence_based_recover_no_evidence_when_signals_none():
    """Passing ``signals=None`` gives the ``no_evidence`` classification."""
    classification = evidence_based_recover("task-2526", signals=None).classification
    assert classification == "no_evidence"


# ===========================================================================
# Chairman notice (Critical 7종 X — 짧은 운영 신호만)
# ===========================================================================

def test_chairman_notice_is_short_and_not_critical_taxonomy():
    """The block notice is tagged, under 200 characters, and avoids "Critical" wording."""
    notice_kwargs = dict(
        task_kind=TASK_KIND_INDEPENDENT,
        blocked_reason=BlockedReason.INDEPENDENT_TASK_WITH_SESSION,
        target_bot=f"chat={CHAIR_CHAT}/key=<hash:abcd…>",
    )
    notice = format_chairman_block_notice(**notice_kwargs)

    assert "CRON_TARGETING_GUARD_BLOCKED" in notice
    assert len(notice) < 200, "회장 짧은 보고 정합 — 200자 미만 유지"
    # The notice must not surface Critical-taxonomy labels in either casing.
    for forbidden in ("Critical", "CRITICAL"):
        assert forbidden not in notice


# ===========================================================================
# expected_files 정합 — 본 task가 정확히 4 파일만 생성/수정 (회장 §expected_files)
# ===========================================================================

def test_expected_files_exactly_four_exist():
    """The four expected files of task-2526 all exist under the worktree root."""
    relative_parts = (
        ("scripts", "safe_cron_dispatch.py"),
        ("tests", "regression", "test_cron_session_safety_guard_2526.py"),
        ("memory", "specs", "cron-targeting-spec.md"),
        ("utils", "cron_targeting_audit.py"),
    )
    expected = {_WORKTREE_ROOT.joinpath(*parts) for parts in relative_parts}

    for p in expected:
        assert p.exists(), f"expected_file missing: {p}"
    # A set is used, so this also guards against accidental duplicates.
    assert len(expected) == 4
