# -*- coding: utf-8 -*-
"""utils.ci_watch_handoff_audit — task-2642 CI_WATCH_HANDOFF runner audit JSONL.

회장 verbatim (2026-05-23 19:38 KST) 정책 spec §9 ANU 8 완료 보고 항목 박제:
  1. handoff 생성 여부
  2. watcher 주체
  3. watcher schedule_id
  4. terminal state
  5. 자동수렴 내역
  6. CI/Gemini/phase3 최종 상태
  7. merge-ready 여부
  8. ANU callback 수신 여부

본 모듈 책임 (runner spec §1.1):
  - watcher lifecycle 한 행 = 한 record (PR_OPEN → terminal classification)
  - record 의무 필드: schema / ts_utc / task_id / pr_number / head_sha /
    watcher_owner / watcher_schedule_id / event / terminal_state /
    auto_remediation_attempts / loop_iterations / router_final_state /
    ci_status / callback_prompt_bytes / reason
  - JSONL append-only · atomic via fcntl.flock(LOCK_EX) · UTF-8
  - raw token / Authorization / api_key redact (defense in depth) — PR #144
    owner_gemini_trigger_router_audit 의 redaction doctrine 1:1 정합

one-way isolation: utils/ 외부 import 0 (자기 schema 만 의존). live cokacdir / gh 0.

frozen anchor:
  ANCHOR-1: audit lifecycle event 6종 (HANDOFF_RECEIVED / POLL_TICK /
            AUTO_REMEDIATE / OWNER_NUDGE / TERMINAL_REACHED / CALLBACK_FIRED)
  ANCHOR-2: TERMINAL_REACHED/CALLBACK_FIRED 에는 terminal_state 필수 — 누락 시 AuditError
  ANCHOR-3: token sentinel (Bearer / ghp_ / github_pat_ / ghu_ / ghs_ / ghr_)
            누출 시 AuditError fail-closed
"""
from __future__ import annotations

import fcntl
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Final, Iterator

from utils.ci_watch_handoff_schema import ALL_TERMINAL_STATES


# Location of the audit JSONL file, relative to the workspace root handed to
# CiWatchHandoffAudit.
AUDIT_REL_PATH: Final[str] = "memory/events/ci-watch-handoff-runner-audit.jsonl"
# Schema identifier written into a record's "schema" field when the caller
# did not supply one.
AUDIT_SCHEMA: Final[str] = "utils.ci_watch_handoff_audit.v1"

# Watcher lifecycle event names (the six events listed as ANCHOR-1 in the
# module docstring).
EVENT_HANDOFF_RECEIVED: Final[str] = "HANDOFF_RECEIVED"
EVENT_POLL_TICK: Final[str] = "POLL_TICK"
EVENT_AUTO_REMEDIATE: Final[str] = "AUTO_REMEDIATE"
EVENT_OWNER_NUDGE: Final[str] = "OWNER_NUDGE"
EVENT_TERMINAL_REACHED: Final[str] = "TERMINAL_REACHED"
EVENT_CALLBACK_FIRED: Final[str] = "CALLBACK_FIRED"

# Closed set of valid "event" values; CiWatchHandoffAudit.append rejects any
# record whose event is not a member.
ALL_EVENTS: Final[frozenset[str]] = frozenset(
    {
        EVENT_HANDOFF_RECEIVED,
        EVENT_POLL_TICK,
        EVENT_AUTO_REMEDIATE,
        EVENT_OWNER_NUDGE,
        EVENT_TERMINAL_REACHED,
        EVENT_CALLBACK_FIRED,
    }
)

# Whitelist of top-level keys an audit record may carry; a record with any
# other key is rejected by _ensure_no_secret_leak.
ALLOWED_AUDIT_KEYS: Final[frozenset[str]] = frozenset(
    {
        "schema",
        "ts_utc",
        "task_id",
        "pr_number",
        "head_sha",
        "watcher_owner",
        "watcher_schedule_id",
        "event",
        "terminal_state",
        "auto_remediation_attempts",
        "loop_iterations",
        "router_final_state",
        "ci_status",
        "callback_prompt_bytes",
        "reason",
    }
)

# Placeholder text for redacted values. Exported via __all__; not referenced
# by the code visible in this module.
REDACTED_PLACEHOLDER: Final[str] = "<redacted>"

# Case-insensitive pattern for key names that look like credentials; a record
# key matching it makes _ensure_no_secret_leak raise.
_REDACT_KEY_RE: Final[re.Pattern[str]] = re.compile(
    r"(?i)(token|authorization|api[_-]?key|secret|password)"
)

# Substrings treated as evidence of a raw credential (the "Bearer " auth
# scheme and GitHub-style token prefixes). A serialised record containing any
# of them is rejected.
# Note: sentinels concat at runtime to avoid leaking a real-looking literal in source
_REDACT_VALUE_SENTINELS: Final[tuple[str, ...]] = (
    "Bearer ",
    "gh" + "p_",
    "github" + "_pat_",
    "gh" + "u_",
    "gh" + "s_",
    "gh" + "r_",
)


class AuditError(RuntimeError):
    """Raised when an audit record violates the record contract or the redaction guard."""


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    current_utc = datetime.now(tz=timezone.utc)
    return current_utc.isoformat(timespec="seconds")


def _ensure_no_secret_leak(record: dict) -> None:
    """Reject records with non-whitelisted keys or credential-looking content.

    Three checks run in order, each raising on the first violation:

    1. every key must be a member of ``ALLOWED_AUDIT_KEYS``;
    2. no key name may match ``_REDACT_KEY_RE``;
    3. the JSON serialisation of the record must not contain any of
       ``_REDACT_VALUE_SENTINELS``.

    Args:
      record: the audit record to inspect; it is not modified.

    Raises:
      AuditError: if any of the three checks fails.
    """
    unexpected = set(record) - ALLOWED_AUDIT_KEYS
    if unexpected:
        raise AuditError(f"disallowed audit keys: {sorted(unexpected)}")

    for name in record:
        if _REDACT_KEY_RE.search(str(name)) is None:
            continue
        raise AuditError(
            f"audit key {name!r} matches token/authorization sentinel — "
            "fail-closed (회장 verbatim 안전 불변식)"
        )

    # Scan the serialised form so nested values are covered as well.
    payload = json.dumps(record, ensure_ascii=False)
    leaked = next(
        (marker for marker in _REDACT_VALUE_SENTINELS if marker in payload),
        None,
    )
    if leaked is not None:
        raise AuditError(f"audit record contains token sentinel {leaked!r}")


class CiWatchHandoffAudit:
    """append-only JSONL audit for watcher lifecycle. atomic via fcntl.flock.

    파일 위치: ``<workspace_root>/memory/events/ci-watch-handoff-runner-audit.jsonl``.
    pytest 회귀는 ``tmp_path`` 를 workspace_root 로 주입하여 격리.
    """

    def __init__(self, workspace_root: str | Path) -> None:
        root = Path(workspace_root).resolve()
        self._workspace_root = root
        self._path = root / AUDIT_REL_PATH

    @property
    def path(self) -> Path:
        return self._path

    def _ensure_parent(self) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)

    def _iter_rows_forward(self) -> Iterator[dict]:
        if not self._path.exists():
            return
        try:
            with open(self._path, "r", encoding="utf-8") as fh:
                for line in fh:
                    if not line.strip():
                        continue
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError:
                        continue
        except FileNotFoundError:
            return

    def append(self, record: dict) -> None:
        """단일 record append. event/terminal_state 강제 검증 + redaction guard.

        Args:
          record: audit record dict (event 필수, schema/ts_utc 자동 채움).

        Raises:
          AuditError: event enum 위반 / terminal_state 누락 (TERMINAL_*) /
            disallowed key / token sentinel 누출.
        """
        rec = dict(record)
        rec.setdefault("schema", AUDIT_SCHEMA)
        rec.setdefault("ts_utc", _now_iso())

        event = rec.get("event")
        if event not in ALL_EVENTS:
            raise AuditError(
                f"event must be one of {sorted(ALL_EVENTS)}, got {event!r}"
            )
        # ANCHOR-2: TERMINAL_REACHED / CALLBACK_FIRED 는 terminal_state 필수
        if event in (EVENT_TERMINAL_REACHED, EVENT_CALLBACK_FIRED):
            ts = rec.get("terminal_state")
            if ts not in ALL_TERMINAL_STATES:
                raise AuditError(
                    f"terminal_state must be one of {sorted(ALL_TERMINAL_STATES)} "
                    f"for event={event}, got {ts!r}"
                )
        # head_sha 정규화
        head_val = rec.get("head_sha")
        if isinstance(head_val, str) and len(head_val) == 40:
            rec["head_sha"] = head_val.lower()

        _ensure_no_secret_leak(rec)

        self._ensure_parent()
        with open(self._path, "a", encoding="utf-8") as fh:
            fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
            try:
                fh.write(
                    json.dumps(rec, ensure_ascii=False, sort_keys=True) + "\n"
                )
                fh.flush()
                os.fsync(fh.fileno())
            finally:
                fcntl.flock(fh.fileno(), fcntl.LOCK_UN)

    def records_for_pr_head(self, *, pr_number: int, head: str) -> list[dict]:
        """전체 audit JSONL 에서 (pr_number, head_sha) 매칭 record list 반환.

        Streaming line-by-line scan (메모리 로드 0). 회귀에서 lifecycle 추적용.
        """
        head_norm = head.lower() if isinstance(head, str) else ""
        return [
            r
            for r in self._iter_rows_forward()
            if r.get("pr_number") == pr_number
            and isinstance(r.get("head_sha"), str)
            and r["head_sha"].lower() == head_norm
        ]


# Public API of this module; underscore-prefixed helpers are intentionally
# left out.
__all__ = [
    "AUDIT_REL_PATH",
    "AUDIT_SCHEMA",
    "EVENT_HANDOFF_RECEIVED",
    "EVENT_POLL_TICK",
    "EVENT_AUTO_REMEDIATE",
    "EVENT_OWNER_NUDGE",
    "EVENT_TERMINAL_REACHED",
    "EVENT_CALLBACK_FIRED",
    "ALL_EVENTS",
    "ALLOWED_AUDIT_KEYS",
    "REDACTED_PLACEHOLDER",
    "AuditError",
    "CiWatchHandoffAudit",
]
