#!/usr/bin/env python3
"""auto_merge_lock.py — FileLock helper for the auto-merge controller.

Goal: serialize concurrent runs of the auto-merge controller so the bot only
ever processes one PR at a time. Two layers exist:

1. Process-local advisory lock via ``fcntl.flock`` on a file under
   ``memory/cache/auto_merge_controller.lock``.
2. GitHub Actions ``concurrency`` group (defined in the workflow file).

The lock is intentionally narrow: it protects the merge "critical section"
inside ``auto_merge_controller.process_open_prs`` and is released on context
exit (success, exception, or timeout).
"""

from __future__ import annotations

import fcntl
import os
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


class LockTimeout(RuntimeError):
    """Raised when the lock cannot be acquired within the timeout."""


class FileLock:
    """Cross-process advisory lock backed by ``fcntl.flock``.

    Usage::

        with FileLock(Path("memory/cache/auto_merge_controller.lock"), timeout=10):
            critical_section()
    """

    def __init__(self, path: Path, timeout: float = 10.0, poll_interval: float = 0.2):
        self.path = Path(path)
        self.timeout = float(timeout)
        self.poll_interval = float(poll_interval)
        self._fd: int | None = None

    def acquire(self) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._fd = os.open(str(self.path), os.O_RDWR | os.O_CREAT, 0o644)
        deadline = time.monotonic() + self.timeout
        while True:
            try:
                fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                os.ftruncate(self._fd, 0)
                payload = f"{os.getpid()}\n{time.time()}\n".encode("utf-8")
                os.write(self._fd, payload)
                return
            except BlockingIOError:
                if time.monotonic() >= deadline:
                    os.close(self._fd)
                    self._fd = None
                    raise LockTimeout(
                        f"could not acquire {self.path} within {self.timeout}s"
                    )
                time.sleep(self.poll_interval)

    def release(self) -> None:
        if self._fd is None:
            return
        try:
            fcntl.flock(self._fd, fcntl.LOCK_UN)
        finally:
            os.close(self._fd)
            self._fd = None

    def __enter__(self) -> "FileLock":
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.release()


@contextmanager
def lock_path(path: Path, timeout: float = 10.0) -> Iterator[FileLock]:
    """Convenience context manager: ``with lock_path(p): ...``."""
    lock = FileLock(path, timeout=timeout)
    lock.acquire()
    try:
        yield lock
    finally:
        lock.release()


__all__ = ["FileLock", "LockTimeout", "lock_path"]
