"""anu_v2.owner_trigger_audit — append-only audit log + atomic dedupe + bounded scan.

회장 §명시 14장 §8 1:1 박제 (2026-05-11 KST):
  파일: ``memory/events/owner-trigger-audit.jsonl``
  append-only mode (``open(path, "a")`` 강제).
  4 dedupe 조건: same ``pr`` + same ``head`` + ``action == POST_GEMINI_REVIEW_TRIGGER_COMMENT``
                  + ``result == POSTED``.
  head 변경 시 기존 trigger stale, 새 decision 생성 필수.
  atomic 보장: ``fcntl.flock`` (POSIX advisory lock).

task-2554+1 회장 §명시 (2026-05-12 KST) HIGH race condition 보완:
  - ``transaction()`` context manager — check_dedupe + http_post + record_outcome 을
    단일 lock 범위 안에서 원자 처리.
  - audit JSONL 본 파일과 sidecar lock 파일 (``...jsonl.lock``) 2 단 lock.
  - http_post 예외 시 audit FAILED 기록은 caller (``owner_trigger_only``) 에서
    ``transaction.record(...)`` 로 명시 호출.

task-2554+2 회장 §명시 (2026-05-12 KST) §2 1:1 완성:
  - **bounded/reverse scan**: dedupe 조회는 audit JSONL 의 마지막 N (=512) 행만
    tail-read 로 역방향 스캔. O(N) full scan 제거. 회장 §2 직접 일치.
  - ``RESULT_PENDING`` sentinel: http_post 호출 직전 caller 가 transaction.record(PENDING)
    을 호출하면 본 모듈에 영구 기록되어 process crash 후 다음 runner 가 DEDUPED 판정.
  - ``_iter_rows_reverse(max_rows=N)`` helper 추가. 기존 ``_iter_rows`` 는 호환성 보존.

token redaction (§6 1:1):
  허용 3 필드만 audit 에 기록: ``token_present`` / ``token_hash_prefix`` / ``token_value_logged: false``.
  token raw value 는 어떤 분기에서도 audit 에 기록 금지.

one-way isolation: anu_v2/ 외부 import 금지.
"""

from __future__ import annotations

import fcntl
import hashlib
import json
import os
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Final, Iterator


AUDIT_REL_PATH: Final[str] = "memory/events/owner-trigger-audit.jsonl"

ALLOWED_ACTION: Final[str] = "POST_GEMINI_REVIEW_TRIGGER_COMMENT"
RESULT_POSTED: Final[str] = "POSTED"
RESULT_PENDING: Final[str] = "PENDING"
RESULT_FAILED: Final[str] = "FAILED"
RESULT_DEDUPED: Final[str] = "DEDUPED"

# task-2554+2 §2: bounded scan 한계. 한 PR/head 에 대한 trigger 시도가 512 회를 넘으면
# 다른 운영 문제 (audit 폭주). 일반 dedupe 판정은 최신 N 행만 검사하면 충분.
DEDUPE_SCAN_MAX_ROWS: Final[int] = 512

# tail-read 시 한 번에 읽을 바이트 청크. audit 한 줄은 보통 < 1KB 이므로 64KB 면 충분.
_TAIL_CHUNK_SIZE: Final[int] = 65536

# audit 한 줄 허용 top-level key 집합 — additionalProperties: false
ALLOWED_AUDIT_KEYS: Final[frozenset[str]] = frozenset(
    {
        "schema",
        "ts",
        "task_id",
        "pr",
        "head",
        "action",
        "result",
        "comment_body",
        "endpoint",
        "decision_path",
        "token_present",
        "token_hash_prefix",
        "token_value_logged",
        "error_code",
    }
)

AUDIT_SCHEMA: Final[str] = "anu_v2.owner_trigger_audit.v1"


class DedupeViolation(RuntimeError):
    """동일 PR + head + action + result=POSTED|PENDING 조합 재시도 차단."""


class AuditRedactionError(RuntimeError):
    """audit record 에 token raw value 또는 비허용 key 포함."""


def _ensure_no_secret_leak(record: dict) -> None:
    """audit record 에 raw token / authorization header value 가 절대 포함되지 않도록 검증.

    회장 §6 명시: ``token_present`` / ``token_hash_prefix`` / ``token_value_logged: false`` 외 token 관련
    필드 금지.
    """
    extra = set(record.keys()) - ALLOWED_AUDIT_KEYS
    if extra:
        raise AuditRedactionError(f"disallowed audit keys: {sorted(extra)}")
    if record.get("token_value_logged") is not False:
        raise AuditRedactionError("token_value_logged must be False")
    serialised = json.dumps(record, ensure_ascii=False)
    sentinels = ("Bearer ", "ghp_", "github_pat_", "ghu_", "ghs_", "ghr_")
    for s in sentinels:
        if s in serialised:
            raise AuditRedactionError(f"audit record contains token sentinel {s!r}")


def token_hash_prefix(token: str, *, length: int = 8) -> str:
    """token value → SHA256 16진수 앞 ``length`` 자리 prefix."""
    if not isinstance(token, str) or not token:
        raise ValueError("token must be a non-empty string")
    digest = hashlib.sha256(token.encode("utf-8")).hexdigest()
    return digest[:length]


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _normalise_head(head: str) -> str:
    """head SHA 정규화 — 40-char hex 정확 일치 검증."""
    if not isinstance(head, str) or len(head) != 40:
        raise ValueError("head must be 40-char hex SHA string")
    lowered = head.lower()
    if any(c not in "0123456789abcdef" for c in lowered):
        raise ValueError("head must be 40-char hex SHA string")
    return lowered


class OwnerTriggerAudit:
    """append-only audit JSONL writer + dedupe gate + bounded scan.

    atomic: 매 write 직전 ``fcntl.flock(LOCK_EX)`` 로 advisory lock 획득.
    open mode: 오직 ``"a"`` (append) 또는 ``"rb"`` (read for tail scan).

    transaction (task-2554+1 race condition fix):
      ``transaction()`` context manager 는 별도 sidecar lock 파일에 ``LOCK_EX`` 를 잡고
      caller 가 check_dedupe + record(PENDING) + http_post + record(POSTED|FAILED) 전체를
      단일 lock 안에서 실행한다.

    task-2554+2 §2: bounded reverse scan
      dedupe 조회는 tail-read 로 마지막 ``DEDUPE_SCAN_MAX_ROWS`` 행만 검사. audit 파일이
      거대해져도 dedupe 비용 일정 (O(N) full scan 제거).
    """

    def __init__(self, workspace_root: str | Path) -> None:
        root = Path(workspace_root).resolve()
        self._workspace_root = root
        self._path = root / AUDIT_REL_PATH

    @property
    def path(self) -> Path:
        return self._path

    @property
    def lock_path(self) -> Path:
        """transaction sidecar lock 파일 경로 (audit JSONL append 와 분리)."""
        return Path(str(self._path) + ".lock")

    def _ensure_parent(self) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)

    def _iter_rows(self) -> Iterator[dict]:
        """audit JSONL 스트림 (forward order). 손상된 라인은 관용(skip).

        후방 호환 메서드 — 신규 dedupe 코드는 ``_iter_rows_reverse`` 사용.
        """
        if not self._path.exists():
            return
        with open(self._path, "r", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    yield json.loads(line)
                except json.JSONDecodeError:
                    continue

    def _iter_rows_reverse(
        self,
        *,
        max_rows: int = DEDUPE_SCAN_MAX_ROWS,
    ) -> Iterator[dict]:
        """audit JSONL 의 마지막 ``max_rows`` 행을 역순으로 yield (task-2554+2 §2).

        구현:
          1. 파일 끝에서부터 ``_TAIL_CHUNK_SIZE`` 바이트 단위로 역방향 청크 읽기.
          2. 줄 단위로 split → 마지막 ``max_rows`` 행 확보될 때까지 반복.
          3. 손상 라인은 ``json.JSONDecodeError`` 관용 (skip).

        보장:
          - audit 파일 크기에 무관하게 dedupe 비용 일정 (O(max_rows)).
          - 회장 §2 bounded/reverse scan 직접 일치.
          - tail 가 ``max_rows`` 보다 적으면 가능한 만큼만 yield (file 전체가 작을 때).
        """
        if not self._path.exists() or max_rows <= 0:
            return
        try:
            with open(self._path, "rb") as fh:
                fh.seek(0, os.SEEK_END)
                file_size = fh.tell()
                if file_size == 0:
                    return

                # leftover: 청크 경계에서 잘린 부분 라인을 다음 read 에서 prepend 하기 위한 buffer.
                leftover = b""
                lines_collected: list[bytes] = []
                position = file_size

                while position > 0 and len(lines_collected) < max_rows:
                    read_size = min(_TAIL_CHUNK_SIZE, position)
                    position -= read_size
                    fh.seek(position)
                    chunk = fh.read(read_size)
                    # chunk 끝부분 + 이전 leftover (= 청크 경계에서 next chunk 시작 fragment)
                    buf = chunk + leftover
                    # 마지막 줄이 newline 없이 끝나면 그것은 partial 가능 — 단순화: split 후 첫 token 을
                    # leftover 로 보존 (position > 0 인 경우만; position == 0 이면 그 자체가 첫 줄).
                    parts = buf.split(b"\n")
                    if position > 0:
                        # 첫 part 는 다음 청크의 끝부분과 결합되어야 함
                        leftover = parts[0]
                        parts = parts[1:]
                    else:
                        leftover = b""
                    # parts 는 forward order (오래된 → 최신). 역순으로 lines_collected 에 추가.
                    for line_bytes in reversed(parts):
                        if not line_bytes.strip():
                            continue
                        lines_collected.append(line_bytes)
                        if len(lines_collected) >= max_rows:
                            break

                # 위에서 lines_collected 는 already in reverse order (newest first).
                for line_bytes in lines_collected:
                    try:
                        text = line_bytes.decode("utf-8")
                    except UnicodeDecodeError:
                        continue
                    text = text.strip()
                    if not text:
                        continue
                    try:
                        yield json.loads(text)
                    except json.JSONDecodeError:
                        continue
        except FileNotFoundError:
            return

    def _read_all(self) -> list[dict]:
        """후방 호환: 전체 list 로딩. 신규 코드는 ``_iter_rows_reverse`` 사용 권장."""
        return list(self._iter_rows())

    def _has_posted(self, *, pr: int, head: str) -> bool:
        """POSTED dedupe 검사 — bounded reverse scan (task-2554+2 §2).

        audit JSONL 의 마지막 ``DEDUPE_SCAN_MAX_ROWS`` 행만 검사. 동일 (pr, head) trigger
        시도가 그 안에 없으면 fresh 로 간주 (회장 §2 bounded).
        """
        head_norm = _normalise_head(head)
        for row in self._iter_rows_reverse():
            if (
                row.get("pr") == pr
                and isinstance(row.get("head"), str)
                and row["head"].lower() == head_norm
                and row.get("action") == ALLOWED_ACTION
                and row.get("result") == RESULT_POSTED
            ):
                return True
        return False

    def _has_active_trigger(self, *, pr: int, head: str) -> bool:
        """가장 최근 (pr, head) outcome 이 POSTED 또는 PENDING 이면 True (active).

        의미:
          - POSTED: 이미 성공 — 재시도 차단.
          - PENDING (그 뒤 FAILED 없음): http_post 진행 중 또는 process crash —
            fail-closed (다음 runner 가 DEDUPED 판정).
          - FAILED (가장 최근): 명시적 실패 — 재시도 허용 (transient failure).
          - DEDUPED: outcome 자체가 아님 (skip, 이전 outcome 으로 판정).

        bounded reverse scan (task-2554+2 §2): 마지막 ``DEDUPE_SCAN_MAX_ROWS`` 행을
        역순으로 보고 가장 최근 POSTED/PENDING/FAILED 가 무엇인지 결정.
        """
        head_norm = _normalise_head(head)
        outcome_results = {RESULT_POSTED, RESULT_PENDING, RESULT_FAILED}
        for row in self._iter_rows_reverse():
            if (
                row.get("pr") == pr
                and isinstance(row.get("head"), str)
                and row["head"].lower() == head_norm
                and row.get("action") == ALLOWED_ACTION
            ):
                result = row.get("result")
                if result in outcome_results:
                    # 가장 최근 outcome (POSTED|PENDING|FAILED) 발견.
                    return result in (RESULT_POSTED, RESULT_PENDING)
                # DEDUPED 등은 outcome 이 아님 — 계속 역방향 탐색.
        return False

    def check_dedupe(self, *, pr: int, head: str) -> None:
        """4 조건 일치 시 ``DedupeViolation`` raise. fail-closed.

        후방 호환: POSTED 만 검사. transaction 안의 atomic dedupe 는
        ``_AtomicTriggerTransaction.check_dedupe`` 가 POSTED + PENDING 모두 검사.
        """
        if self._has_posted(pr=pr, head=head):
            raise DedupeViolation(
                f"duplicate POST_GEMINI_REVIEW_TRIGGER_COMMENT for pr={pr} head={head[:8]}..."
            )

    def append(self, record: dict) -> None:
        """atomic append. record key 화이트리스트 + token leak guard 적용 후 기록.

        record 내부에 ``ts`` / ``schema`` 미지정 시 자동 채움.

        atomic: 내부 ``fcntl.flock(LOCK_EX)`` 를 audit 본 파일에 잡고 dedupe 재검 후 기록.
        transaction context 안에서는 별도 sidecar lock 이 보호하므로 본 메서드의 내부 flock
        은 보조 안전망 (deadlock 안남 — sidecar lock 과 본 파일 lock 은 분리).
        """
        rec = dict(record)
        rec.setdefault("schema", AUDIT_SCHEMA)
        rec.setdefault("ts", _now_iso())
        rec.setdefault("token_value_logged", False)
        _ensure_no_secret_leak(rec)
        if "head" in rec and isinstance(rec["head"], str) and len(rec["head"]) == 40:
            rec["head"] = rec["head"].lower()

        self._ensure_parent()
        with open(self._path, "a", encoding="utf-8") as fh:
            fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
            try:
                if rec.get("action") == ALLOWED_ACTION and rec.get("result") == RESULT_POSTED:
                    pr_val = rec.get("pr")
                    head_val = rec.get("head")
                    if isinstance(pr_val, int) and isinstance(head_val, str):
                        head_norm = _normalise_head(head_val)
                        # bounded reverse scan (task-2554+2 §2)
                        for row in self._iter_rows_reverse():
                            if (
                                row.get("pr") == pr_val
                                and isinstance(row.get("head"), str)
                                and row["head"].lower() == head_norm
                                and row.get("action") == ALLOWED_ACTION
                                and row.get("result") == RESULT_POSTED
                            ):
                                raise DedupeViolation(
                                    f"atomic dedupe blocked pr={pr_val} head={head_val[:8]}..."
                                )
                fh.write(json.dumps(rec, ensure_ascii=False, sort_keys=True) + "\n")
                fh.flush()
                os.fsync(fh.fileno())
            finally:
                fcntl.flock(fh.fileno(), fcntl.LOCK_UN)

    # ─── task-2554+1: HIGH race condition fix ────────────────────────────────

    @contextmanager
    def transaction(self) -> Iterator["_AtomicTriggerTransaction"]:
        """trigger 전체 lifecycle 을 단일 lock 안에서 직렬화.

        sidecar lock 파일 (``<audit_path>.lock``) 에 ``LOCK_EX`` 를 획득해 본 with 블록을
        보호한다. 본 블록 안에서 caller 는 check_dedupe → record(PENDING) → http_post →
        record(POSTED|FAILED) 순서대로 호출.

        audit JSONL 본 파일에 대한 write 자체는 여전히 ``open(self._path, "a")`` 만 사용.
        """
        self._ensure_parent()
        lock_path = self.lock_path
        with open(lock_path, "a", encoding="utf-8") as lock_fh:
            fcntl.flock(lock_fh.fileno(), fcntl.LOCK_EX)
            try:
                yield _AtomicTriggerTransaction(self)
            finally:
                fcntl.flock(lock_fh.fileno(), fcntl.LOCK_UN)


class _AtomicTriggerTransaction:
    """transaction 컨텍스트 내부 helper. sidecar flock 외부에서 잡힌 상태로 호출됨.

    제공 메서드:
      - ``check_dedupe(pr, head)``: POSTED + PENDING 모두 검사 (race + crash fail-closed).
      - ``record(record_dict)``: audit 본 파일에 한 줄 append (sidecar lock + 본 파일 flock).
    """

    def __init__(self, audit: "OwnerTriggerAudit") -> None:
        self._audit = audit

    def check_dedupe(self, *, pr: int, head: str) -> None:
        """POSTED OR PENDING 일치 시 ``DedupeViolation``.

        - POSTED: 기존 trigger 가 성공 완료된 (pr, head).
        - PENDING: 다른 transaction 이 http_post 직전 sentinel 을 남겼지만 미완료
          (process crash 가능성). 어느 쪽이든 재시도 차단 — fail-closed.

        bounded reverse scan (task-2554+2 §2): 마지막 ``DEDUPE_SCAN_MAX_ROWS`` 행만 검사.
        """
        if self._audit._has_active_trigger(pr=pr, head=head):
            raise DedupeViolation(
                f"active trigger (POSTED|PENDING) blocks duplicate pr={pr} head={head[:8]}..."
            )

    def record(self, record: dict) -> None:
        """audit 본 파일에 한 줄 append. sidecar lock + 본 파일 flock 2 단 보호.

        2 단 lock 이유: legacy/test 가 ``OwnerTriggerAudit.append`` 를 sidecar lock 없이
        직접 호출하는 경로도 직렬화 보장.

        POSTED record 는 lock 안에서 한번 더 dedupe re-check (defense in depth).
        """
        rec = dict(record)
        rec.setdefault("schema", AUDIT_SCHEMA)
        rec.setdefault("ts", _now_iso())
        rec.setdefault("token_value_logged", False)
        _ensure_no_secret_leak(rec)
        if "head" in rec and isinstance(rec["head"], str) and len(rec["head"]) == 40:
            rec["head"] = _normalise_head(rec["head"])
        self._audit._ensure_parent()
        with open(self._audit._path, "a", encoding="utf-8") as fh:
            fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
            try:
                if rec.get("action") == ALLOWED_ACTION and rec.get("result") == RESULT_POSTED:
                    pr_val = rec.get("pr")
                    head_val = rec.get("head")
                    if isinstance(pr_val, int) and isinstance(head_val, str):
                        head_norm = _normalise_head(head_val)
                        # bounded reverse scan (task-2554+2 §2)
                        for row in self._audit._iter_rows_reverse():
                            if (
                                row.get("pr") == pr_val
                                and isinstance(row.get("head"), str)
                                and row["head"].lower() == head_norm
                                and row.get("action") == ALLOWED_ACTION
                                and row.get("result") == RESULT_POSTED
                            ):
                                raise DedupeViolation(
                                    f"transaction atomic dedupe blocked pr={pr_val} head={head_val[:8]}..."
                                )
                fh.write(json.dumps(rec, ensure_ascii=False, sort_keys=True) + "\n")
                fh.flush()
                os.fsync(fh.fileno())
            finally:
                fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
