# -*- coding: utf-8 -*-
"""tests.regression.test_owner_gemini_trigger_router_audit — task-2641 Track A.

회장 verbatim §6 / §8 (2026-05-23) 1:1 정합 — audit JSONL + redaction + 403
header 화이트리스트 + nudge_count_for_pr_head dedupe helper 회귀.

본 회귀는 Layer A / NO-CRON: subprocess / cokacdir / merge / cron 호출 0.
audit 파일은 pytest tmp_path 로 격리.
"""
from __future__ import annotations

import json

import pytest

from anu_v2.owner_gemini_trigger_router_audit import (
    ALL_STATES,
    ALLOWED_AUDIT_KEYS,
    AUDIT_SCHEMA,
    OwnerGeminiTriggerRouterAudit,
    RECORDED_403_HEADERS,
    REDACTED_PLACEHOLDER,
    RouterAuditRedactionError,
    STATE_CHAIR_UI_FALLBACK_REQUIRED,
    STATE_FRESH,
    STATE_GEMINI_EXTERNAL_TRIGGER_STALE,
    STATE_NUDGE_POSTED,
    extract_403_headers,
    redact_diagnostics,
    token_hash_prefix,
)


HEAD_A = "a" * 40
HEAD_B = "b" * 40


# ─── extract_403_headers (회장 verbatim §8 — whitelist + redact) ──────────────


def test_extract_403_headers_whitelist_only():
    """Only whitelisted 403 headers are kept and no token text is serialised.

    Feeds a mixed header dict (whitelist candidates, a request id, an
    Authorization bearer token, a documentation URL) to
    ``extract_403_headers`` and checks that every entry of
    ``RECORDED_403_HEADERS`` is present while the other keys and the raw
    token markers are absent.
    """
    raw_headers = {
        "X-Accepted-GitHub-Permissions": "issues=write; pull_requests=write",
        "X-Accepted-OAuth-Scopes": "repo, write:discussion",
        "X-OAuth-Scopes": "public_repo",
        "X-RateLimit-Remaining": "4321",
        "X-GitHub-Request-Id": "ABCD:1234",
        "Authorization": "Bearer ghp_REDACT_ME_NEVER_LOG",
        "Documentation-URL": "https://docs.github.com/",
    }
    extracted = extract_403_headers(raw_headers)

    # Every whitelisted name must be present (input keys are mixed-case;
    # presumably the helper lower-cases them — see the lowercase lookups below).
    for whitelisted in RECORDED_403_HEADERS:
        assert whitelisted in extracted, f"missing whitelist key {whitelisted!r}"

    # Keys outside the whitelist must be dropped.
    for dropped in ("x-github-request-id", "authorization", "documentation-url"):
        assert dropped not in extracted

    # The raw token markers must never reach the serialised output.
    dumped = json.dumps(extracted)
    for marker in ("ghp_", "Bearer "):
        assert marker not in dumped


def test_extract_403_headers_empty_dict_returns_empty():
    """An empty header mapping yields an empty result dict."""
    outcome = extract_403_headers({})
    assert outcome == {}


def test_extract_403_headers_non_dict_returns_empty():
    """Non-dict inputs (None, a string, a list) all yield an empty dict."""
    for not_a_dict in (None, "string", [1, 2, 3]):
        assert extract_403_headers(not_a_dict) == {}


def test_extract_403_headers_handles_bearer_value_in_whitelisted_key():
    """Defense in depth: a Bearer value under a whitelisted key is still redacted."""
    leaky_headers = {"X-Accepted-GitHub-Permissions": "Bearer ghp_LEAKED_TOKEN_VALUE"}
    extracted = extract_403_headers(leaky_headers)
    # The value is replaced by the redaction placeholder.
    recorded_value = extracted.get("x-accepted-github-permissions")
    assert recorded_value == REDACTED_PLACEHOLDER


# ─── redact_diagnostics ──────────────────────────────────────────────────────


def test_redact_diagnostics_masks_token_key():
    """The value under a ``token`` key is masked; unrelated keys pass through."""
    expected = {"token": REDACTED_PLACEHOLDER, "other": "ok"}
    actual = redact_diagnostics({"token": "ghp_AAA111", "other": "ok"})
    assert actual == expected


def test_redact_diagnostics_masks_authorization_value():
    """A ``Bearer ...`` string value is masked even under an innocuous key name."""
    masked = redact_diagnostics({"header": "Bearer ghp_LEAK"})
    assert masked["header"] == REDACTED_PLACEHOLDER


def test_redact_diagnostics_handles_nested_structures():
    """Redaction reaches into nested dicts and lists, leaving benign items intact."""
    payload = {
        "outer": {
            "api_key": "secret",
            "list": [{"password": "p"}, "Bearer ghp_LEAK", "ok"],
        }
    }
    inner = redact_diagnostics(payload)["outer"]

    # Sensitive key directly inside the nested dict.
    assert inner["api_key"] == REDACTED_PLACEHOLDER

    # List items: a dict with a sensitive key, a bearer string, a benign string.
    items = inner["list"]
    assert items[0]["password"] == REDACTED_PLACEHOLDER
    assert items[1] == REDACTED_PLACEHOLDER
    assert items[2] == "ok"


# ─── token_hash_prefix (12 hex char — spec §3.3) ─────────────────────────────


def test_token_hash_prefix_default_length_is_12():
    """With default arguments the prefix is 12 lowercase hexadecimal characters."""
    digest_prefix = token_hash_prefix("any-token-value")
    assert len(digest_prefix) == 12
    non_hex = [ch for ch in digest_prefix if ch not in "0123456789abcdef"]
    assert not non_hex


def test_token_hash_prefix_deterministic():
    """Hashing the same token twice gives the same prefix."""
    first_call, second_call = [token_hash_prefix("xx") for _ in range(2)]
    assert first_call == second_call


def test_token_hash_prefix_empty_token_raises():
    """An empty token string is rejected with ``ValueError``."""
    empty_token = ""
    with pytest.raises(ValueError):
        token_hash_prefix(empty_token)


# ─── OwnerGeminiTriggerRouterAudit.append ────────────────────────────────────


def _valid_record(**override):
    """Build a baseline audit record dict, with keyword overrides applied on top.

    The defaults describe a posted nudge for PR 143 at ``HEAD_A``. Any keyword
    argument replaces the default of the same name, or adds a new key if the
    name is not among the defaults.
    """
    defaults = {
        "task_id": "task-test",
        "pr_number": 143,
        "current_head_sha": HEAD_A,
        "freshness_state": "STALE",
        "gemini_commit_id_observed": HEAD_B,
        "nudge_attempted": True,
        "nudge_result": "POSTED",
        "permission_header_diagnostics": None,
        "token_present": True,
        "token_hash_prefix": "0123456789ab",
        "token_value_logged": False,
        "final_state": STATE_NUDGE_POSTED,
        "reason": "test record",
    }
    return {**defaults, **override}


def test_append_writes_valid_record_with_schema(tmp_path):
    """A valid record is written as a newline-terminated JSON row with the schema tag."""
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    writer.append(_valid_record())

    raw = writer.path.read_text(encoding="utf-8")
    assert raw.endswith("\n")

    stored = json.loads(raw.strip())
    expected_fields = (
        ("schema", AUDIT_SCHEMA),
        ("pr_number", 143),
        ("current_head_sha", HEAD_A),  # already lower
        ("final_state", STATE_NUDGE_POSTED),
    )
    for field, value in expected_fields:
        assert stored[field] == value
    # Identity check: the stored flag must be the literal JSON false.
    assert stored["token_value_logged"] is False


def test_append_normalises_uppercase_head(tmp_path):
    """An upper-case head SHA is stored in its lower-case form."""
    shouting_head = HEAD_A.upper()
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    writer.append(_valid_record(current_head_sha=shouting_head))

    raw = writer.path.read_text(encoding="utf-8")
    stored = json.loads(raw.strip())
    assert stored["current_head_sha"] == HEAD_A


def test_append_rejects_unknown_key(tmp_path):
    """A record carrying a key outside the known record fields is refused."""
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    # The override kwargs add "extra_key" on top of the baseline fields.
    polluted = _valid_record(extra_key="no")
    with pytest.raises(RouterAuditRedactionError):
        writer.append(polluted)


def test_append_rejects_token_value_logged_true(tmp_path):
    """A record claiming ``token_value_logged=True`` is refused."""
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    overrides = {"token_value_logged": True}
    offending = _valid_record(**overrides)
    with pytest.raises(RouterAuditRedactionError):
        writer.append(offending)


def test_append_rejects_invalid_final_state(tmp_path):
    """A ``final_state`` value that is not a known state is refused."""
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    overrides = {"final_state": "WHATEVER"}
    offending = _valid_record(**overrides)
    with pytest.raises(RouterAuditRedactionError):
        writer.append(offending)


def test_append_rejects_record_with_raw_token_sentinel(tmp_path):
    """A record whose free-text ``reason`` contains bearer-token text is refused."""
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    overrides = {"reason": "oops Bearer ghp_LEAK"}
    offending = _valid_record(**overrides)
    with pytest.raises(RouterAuditRedactionError):
        writer.append(offending)


def test_allowed_audit_keys_contains_all_record_fields():
    """ALLOWED_AUDIT_KEYS is schema-stable; guards against subtle key-set changes."""
    pinned_keys = frozenset(
        (
            "schema",
            "ts_utc",
            "task_id",
            "pr_number",
            "current_head_sha",
            "freshness_state",
            "gemini_commit_id_observed",
            "nudge_attempted",
            "nudge_result",
            "permission_header_diagnostics",
            "token_present",
            "token_hash_prefix",
            "token_value_logged",
            "final_state",
            "reason",
        )
    )
    assert ALLOWED_AUDIT_KEYS == pinned_keys


def test_all_states_set_immutable():
    """Three of the imported state constants are members of ``ALL_STATES``.

    NOTE(review): despite the name, this test checks membership only; it does
    not assert anything about the mutability of ``ALL_STATES``.
    """
    required_states = (
        STATE_FRESH,
        STATE_GEMINI_EXTERNAL_TRIGGER_STALE,
        STATE_CHAIR_UI_FALLBACK_REQUIRED,
    )
    for state in required_states:
        assert state in ALL_STATES


# ─── nudge_count_for_pr_head (회장 verbatim §9 hard limit support) ────────────


def test_nudge_count_zero_when_no_prior_records(tmp_path):
    """With nothing appended yet, the nudge count for a PR/head pair is zero."""
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    count = writer.nudge_count_for_pr_head(pr_number=143, head=HEAD_A)
    assert count == 0


def test_nudge_count_only_increments_when_nudge_attempted_true(tmp_path):
    """A record with ``nudge_attempted=False`` does not add to the nudge count."""
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    # FRESH record (nudge_attempted=False) — no increment
    fresh_record = _valid_record(
        final_state=STATE_FRESH,
        nudge_attempted=False,
        nudge_result=None,
    )
    writer.append(fresh_record)
    count = writer.nudge_count_for_pr_head(pr_number=143, head=HEAD_A)
    assert count == 0


def test_nudge_count_increments_for_matching_pr_head(tmp_path):
    """Two attempted nudges for the same PR/head pair are counted as two.

    Both records set ``nudge_attempted=True``; one has result ``"POSTED"`` and
    the other ``"DEDUPED"``.
    """
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    for outcome in ("POSTED", "DEDUPED"):
        attempted = _valid_record(
            nudge_attempted=True,
            nudge_result=outcome,
            final_state=STATE_NUDGE_POSTED,
        )
        writer.append(attempted)
    count = writer.nudge_count_for_pr_head(pr_number=143, head=HEAD_A)
    assert count == 2


def test_nudge_count_isolated_per_head(tmp_path):
    """Records for different head SHAs on the same PR are counted separately."""
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    heads = (HEAD_A, HEAD_B)
    for sha in heads:
        writer.append(_valid_record(current_head_sha=sha))
    for sha in heads:
        assert writer.nudge_count_for_pr_head(pr_number=143, head=sha) == 1


def test_nudge_count_isolated_per_pr(tmp_path):
    """Records for different PR numbers at the same head are counted separately."""
    writer = OwnerGeminiTriggerRouterAudit(tmp_path)
    pr_numbers = (143, 144)
    for number in pr_numbers:
        writer.append(_valid_record(pr_number=number))
    for number in pr_numbers:
        assert writer.nudge_count_for_pr_head(pr_number=number, head=HEAD_A) == 1


def test_audit_records_are_appended_not_overwritten(tmp_path):
    """Consecutive ``append`` calls add rows; earlier rows are left in place.

    Writes two records that differ only in ``reason`` and checks that the
    audit file holds exactly two JSON lines, in write order, each with its
    own ``reason`` value.
    """
    audit = OwnerGeminiTriggerRouterAudit(tmp_path)
    audit.append(_valid_record())
    audit.append(_valid_record(reason="second"))
    lines = audit.path.read_text(encoding="utf-8").strip().splitlines()
    assert len(lines) == 2
    first, second = (json.loads(line) for line in lines)
    # Checking only the last row would miss a regression that rewrites or
    # duplicates the earlier row, so pin the first record's content as well.
    assert first["reason"] == "test record"
    assert second["reason"] == "second"
