#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""scripts/ci_watch_handoff_runner.py — CI_WATCH_HANDOFF progress watcher runner.

task-2729 Phase 1 — progress watcher dispatch gate (watcher 골격).

회장 목표:
  dispatch 직후 등록되는 progress watcher 의 러너 골격.
  6 상태(head change · non-force push · CI · finish-task .done · normal callback ·
  fallback prune) 추적 + review-settle quiet-window 골격 + terminal 도달 시
  ANU normal callback 발사를 담당한다.

  ★ ACTIVE=false (Phase 1):
    본 러너는 --dry-run/--once 모드에서 네트워크 호출(gh 폴링) 없이 import-safe 하게
    동작하는 골격이다. 실제 live gh polling 루프는 골격만 두고 미실행한다
    (watcher 위임 골격). production ACTIVE 전환은 별도 회장 승인 대상.

  ★ fallback 은 progress trigger 아님:
    fallback_prune 추적 상태는 dead-man safety-net 의 관측 항목일 뿐이며
    progress trigger 가 아니다. progress 판단은 watcher 의 6 상태/terminal classify 가
    담당한다.

기존 classifier(utils.pr_watcher_terminal_state_classifier) 를 재사용하며
raw ANU 키 문자열을 절대 작성하지 않는다 (anu_key=None → registrar 가 env→default 해결).
"""
import argparse
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

# repo root 를 sys.path 에 추가 (standalone 실행 시 utils/dispatch import 보장).
# `python3 scripts/ci_watch_handoff_runner.py` 처럼 직접 실행하면 sys.path[0] 이
# scripts/ 가 되어 repo-root 패키지를 찾지 못하므로 parent(=repo root)를 주입한다.
_REPO_ROOT = str(Path(__file__).resolve().parent.parent)
if _REPO_ROOT not in sys.path:
    sys.path.insert(0, _REPO_ROOT)

from utils.pr_watcher_terminal_state_classifier import (
    PRSnapshot,
    classify,
    TERMINAL_STATES,
    build_callback_envelope,
    register_terminal_callback,
)
from dispatch.progress_watcher_gate import (
    WATCHER_TRACKED_STATES,
    watcher_terminal_callback_status,
    all_states_tracked,
)

# ---------------------------------------------------------------------------
# 상수
# ---------------------------------------------------------------------------
REVIEW_SETTLE_QUIET_WINDOW_SEC = 120  # review-settle quiet-window 골격
DEFAULT_POLL_INTERVAL_SEC = 30
DEFAULT_MAX_WATCH_SEC = 3600


@dataclass
class WatcherState:
    """watcher 6 상태 추적 + poll/elapsed 메타."""

    head_change: bool = False
    non_force_push: bool = False
    ci: bool = False
    finish_done: bool = False
    normal_callback: bool = False
    fallback_prune: bool = False
    polls: int = 0
    elapsed_sec: int = 0

    def as_tracked_dict(self) -> Dict[str, bool]:
        """WATCHER_TRACKED_STATES 키 → bool 매핑."""
        return {
            "head_change": self.head_change,
            "non_force_push": self.non_force_push,
            "ci": self.ci,
            "finish_done": self.finish_done,
            "normal_callback": self.normal_callback,
            "fallback_prune": self.fallback_prune,
        }

    def all_tracked(self) -> bool:
        """6 상태가 모두 True 인지 (all_states_tracked 사용)."""
        return all_states_tracked(self.as_tracked_dict())


def update_state(state: WatcherState, **flags: bool) -> WatcherState:
    """6 상태 갱신 (WATCHER_TRACKED_STATES 키만 허용)."""
    for key, value in flags.items():
        if key in WATCHER_TRACKED_STATES:
            setattr(state, key, bool(value))
    return state


def mark_head_change(state: WatcherState) -> WatcherState:
    state.head_change = True
    return state


def mark_non_force_push(state: WatcherState) -> WatcherState:
    state.non_force_push = True
    return state


def mark_ci(state: WatcherState) -> WatcherState:
    state.ci = True
    return state


def mark_finish_done(state: WatcherState) -> WatcherState:
    state.finish_done = True
    return state


def mark_normal_callback(state: WatcherState) -> WatcherState:
    state.normal_callback = True
    return state


def mark_fallback_prune(state: WatcherState) -> WatcherState:
    state.fallback_prune = True
    return state


def is_quiet_window_settled(
    *,
    last_event_sec: int,
    now_sec: int,
    quiet_window_sec: int = REVIEW_SETTLE_QUIET_WINDOW_SEC,
) -> bool:
    """review-settle quiet-window 판정 골격.

    마지막 이벤트 이후 quiet_window_sec 이상 무이벤트면 settled.
    """
    return (now_sec - last_event_sec) >= quiet_window_sec


def fire_terminal_callback(
    *,
    task_id: str,
    pr_number: int,
    terminal_state: str,
    reason: str,
    state: WatcherState,
    last_snapshot: Optional[Dict[str, Any]] = None,
    anu_key: Optional[str] = None,
    chat_id: Optional[int] = None,
    runner: Optional[Any] = None,
) -> Dict[str, Any]:
    """terminal 도달 시 ANU normal callback 발사 (classifier 재사용).

    - build_callback_envelope 로 envelope 생성 → register_terminal_callback 호출.
    - ★ anu_key=None 이면 registrar 가 env→ANU_KEY_DEFAULT 로 해결 (self-key 차단은
      그쪽 책임). raw 키 문자열 절대 작성 금지.
    - result.fired=False 면 callback_status == WATCHER_TERMINAL_CALLBACK_NOT_WIRED.
    """
    envelope = build_callback_envelope(
        task_id=task_id,
        pr_number=pr_number,
        terminal_state=terminal_state,
        reason=reason,
        polls_completed=state.polls,
        elapsed_sec=state.elapsed_sec,
        last_snapshot=last_snapshot,
    )
    result = register_terminal_callback(
        envelope=envelope,
        anu_key=anu_key,
        chat_id=chat_id,
        runner=runner,
    )
    if result.fired:
        state.normal_callback = True
    return {
        "fired": result.fired,
        "terminal_state": terminal_state,
        "callback_status": watcher_terminal_callback_status(
            terminal_reached=True,
            callback_fired=result.fired,
        ),
    }


def run_once(
    snap: PRSnapshot,
    *,
    expected_head: str,
    elapsed_watcher_sec: int,
    state: WatcherState,
) -> Tuple[str, str]:
    """단일 classify 평가 → (terminal_state_or_empty, reason).

    terminal 이면 반환 state 는 TERMINAL_STATES 에 포함된다.
    """
    state.polls += 1
    state.elapsed_sec = elapsed_watcher_sec
    terminal_state, reason = classify(
        snap,
        elapsed_watcher_sec=elapsed_watcher_sec,
        expected_head=expected_head,
    )
    if terminal_state and terminal_state in TERMINAL_STATES:
        return terminal_state, reason
    return "", reason


# ---------------------------------------------------------------------------
# task-2729 Phase 2 — 결함6·5·4 결선 함수 (기존 함수/동작 무손상, 추가만)
# ---------------------------------------------------------------------------


def maybe_fire_owner_gemini_on_new_head(
    *,
    task_id: str,
    pr_number: int,
    owner: str,
    repo: str,
    current_head_sha: str,
    head_changed: bool,
    state: WatcherState,
    trigger: Optional[Any] = None,
    already_fired_heads: Optional[Any] = None,
    audit_path: Optional[Any] = None,
) -> Dict[str, Any]:
    """결함6 결선: head 변경 시 OWNER /gemini review 자동 발사.

    ★ ACTIVE=false 골격: 실 gh 네트워크 호출 없이 구조·결선만 제공.
    ★ review-settle: 발사 후 state는 변경하지 않음 (progress trigger 아님).
      /gemini review 발사는 리뷰 요청일 뿐이며 watcher progress 판단에 개입하지 않는다.
    ★ head_changed가 False이면 발사하지 않고 NO_NEW_HEAD 반환.
    """
    if not head_changed:
        return {"fired": False, "status": "NO_NEW_HEAD"}

    # head_changed=True: request-only 자동 발사 시도 (ACTIVE=false 골격)
    try:
        import utils.owner_gemini_trigger as _ogt
        result = _ogt.fire_owner_gemini_review(
            pr_number=pr_number,
            current_head_sha=current_head_sha,
            owner=owner,
            repo=repo,
            task_id=task_id,
            observed_comment=None,
            audit_path=audit_path,
            router=trigger,  # trigger 주입 가능 (None이면 ACTIVE=false 골격)
            request_only=True,
            token_provider=None,
            already_fired_heads=already_fired_heads,
        )
    except Exception as exc:  # noqa: BLE001
        result = {
            "fired": False,
            "status": f"FIRE_ERROR: {type(exc).__name__}: {exc}",
            "body": "/gemini review",
            "pr_number": pr_number,
            "head_sha": current_head_sha,
            "dedupe": False,
            "request_only": True,
            "token_hash_prefix": None,
            "active": False,
        }
    # review-settle: state 변경 없음 (주석 명시)
    # state.head_change 등 watcher state 미변경 — /gemini review는 progress trigger 아님.
    return result


def base_sync_before_merge(
    *,
    pr_number: int,
    merge_state_status: str,
    pr_workdir: str,
    runner: Optional[Any],
    merge_approved: bool,
    sync_fn: Optional[Any] = None,
    fetch_state_fn: Optional[Any] = None,
    max_diff_lines: Optional[int] = None,
    observed_diff_lines: Optional[int] = None,
) -> Dict[str, Any]:
    """결함5: merge 전 base sync 파이프라인.

    - merge_approved False → NOT_APPROVED.
    - merge_state_status가 admin·force 필요 신호(DIRTY/BLOCKED) → ABORT_ADMIN_OR_FORCE_REQUIRED.
      절대 admin override/force 호출 금지.
    - UP_TO_DATE면 sync 불필요 → UP_TO_DATE 반환.
    - BEHIND면 sync_fn(기본=merge_queue_executor.sync_pr_base) 호출.
      conflict → ABORT_CONFLICT.
      diff 초과 → ABORT_DIFF_OVER_LIMIT.
      성공 시 fetch_state_fn으로 재검증 → BEHIND면 ABORT_STILL_BEHIND, 아니면 BASE_SYNCED_REVALIDATED.
    - sync_fn/fetch_state_fn 미주입 시 merge_queue_executor 지연 import(테스트 mock 우선).
    """
    # (1) merge 승인 여부
    if not merge_approved:
        return {"merge_ok": False, "status": "NOT_APPROVED"}

    # (2) admin/force 필요 신호 — 절대 admin override/force 호출 금지
    if merge_state_status in ("DIRTY", "BLOCKED"):
        return {"merge_ok": False, "status": "ABORT_ADMIN_OR_FORCE_REQUIRED"}

    # (3) UP_TO_DATE: sync 불필요
    if merge_state_status != "BEHIND":
        return {"merge_ok": True, "status": "UP_TO_DATE", "base_synced": False}

    # (4) BEHIND: sync_pr_base 호출 (update-branch, admin override/rebase/force 아님)
    if sync_fn is None:
        try:
            import utils.merge_queue_executor as _mqe  # 지연 import (테스트 mock 우선)
        except ImportError:
            return {"merge_ok": False, "status": "ABORT_SYNC_FN_UNAVAILABLE"}
        # 실제 sync_pr_base 시그니처: (_pr_branch, pr_workdir, runner, merge_state_status)
        # _pr_branch 는 미사용 → None. update-branch only, admin/force 아님.
        def sync_fn(_pn, _rn, _mqe=_mqe, _wd=pr_workdir, _ms=merge_state_status):
            return _mqe.sync_pr_base(None, _wd, _rn, _ms)

    try:
        sync_result = sync_fn(pr_number, runner)
    except Exception as exc:  # noqa: BLE001
        return {"merge_ok": False, "status": f"ABORT_SYNC_ERROR: {type(exc).__name__}: {exc}"}

    if sync_result.get("conflict"):
        return {"merge_ok": False, "status": "ABORT_CONFLICT", "base_synced": True}

    # (5) diff 초과 검사
    if (
        max_diff_lines is not None
        and observed_diff_lines is not None
        and observed_diff_lines > max_diff_lines
    ):
        return {"merge_ok": False, "status": "ABORT_DIFF_OVER_LIMIT"}

    # (6) 재검증: fetch_merge_state
    if fetch_state_fn is None:
        try:
            import utils.merge_queue_executor as _mqe  # 지연 import (테스트 mock 우선)
            fetch_state_fn = _mqe.fetch_merge_state
        except ImportError:
            return {"merge_ok": False, "status": "ABORT_FETCH_FN_UNAVAILABLE"}

    try:
        revalidated = fetch_state_fn(pr_number, runner)
    except Exception as exc:  # noqa: BLE001
        return {"merge_ok": False, "status": f"ABORT_REVALIDATE_ERROR: {type(exc).__name__}: {exc}"}

    revalidated_status = revalidated.get("mergeStateStatus", "UNKNOWN")
    if revalidated_status == "BEHIND":
        return {"merge_ok": False, "status": "ABORT_STILL_BEHIND"}

    return {"merge_ok": True, "status": "BASE_SYNCED_REVALIDATED", "base_synced": True}


def authoritative_completion_marker(
    *,
    task_id: str,
    external_dirty: bool,
    merge_base_clean: bool,
    ci_passed: bool,
    local_fix_verified: bool,
    marker_writer: Optional[Any] = None,
) -> Dict[str, Any]:
    """결함4: external dirty block 존재 시 scope evidence 3개로 authoritative completion 인정.

    ★ .done 파일 직접 생성 코드 절대 작성 금지.
      marker_writer 주입 시 marker_writer(marker dict)를 호출하는 것뿐이며,
      본 함수는 .done 파일을 직접 생성하지 않는다.
    ★ manual_done_forgery=False 항상 유지 (.done 위조 0).

    - external_dirty False → NO_EXTERNAL_DIRTY_BLOCK (authoritative 인정 전제 없음).
    - external_dirty True + scope evidence 3개(merge_base_clean AND ci_passed AND local_fix_verified) 모두 True
      → AUTHORITATIVE_COMPLETION_BY_SCOPE_EVIDENCE, marker_writer 주입 시 호출.
    - evidence 하나라도 빠지면 → EXTERNAL_DIRTY_BLOCK_UNRESOLVED.
    """
    if not external_dirty:
        return {
            "authoritative_completion": False,
            "status": "NO_EXTERNAL_DIRTY_BLOCK",
            "manual_done_forgery": False,
        }

    # scope evidence 3개 검사
    if merge_base_clean and ci_passed and local_fix_verified:
        marker: Dict[str, Any] = {
            "authoritative_completion": True,
            "status": "AUTHORITATIVE_COMPLETION_BY_SCOPE_EVIDENCE",
            "evidence": {
                "merge_base_clean": True,
                "ci": True,
                "local_fix_verified": True,
            },
            "manual_done_forgery": False,
            "task_id": task_id,
        }
        if marker_writer is not None:
            try:
                marker_writer(marker)
            except Exception as exc:  # noqa: BLE001
                marker["marker_writer_error"] = f"{type(exc).__name__}: {exc}"
        return marker

    return {
        "authoritative_completion": False,
        "status": "EXTERNAL_DIRTY_BLOCK_UNRESOLVED",
        "manual_done_forgery": False,
    }


def main(argv: Optional[list] = None) -> int:
    """argparse 엔트리.

    --dry-run/--once 모드는 실제 gh 폴링 없이 import-safe 하게 동작(네트워크 호출 없이
    0 반환). 실제 live 폴링 루프는 골격만 둔다.
    """
    parser = argparse.ArgumentParser(
        prog="ci_watch_handoff_runner",
        description="task-2729 Phase 1 CI_WATCH_HANDOFF progress watcher runner (ACTIVE=false)",
    )
    parser.add_argument("--task-id", default="", help="task id (e.g. 2729)")
    parser.add_argument("--pr", type=int, default=0, help="PR number")
    parser.add_argument("--expected-head", default="", help="expected head oid")
    parser.add_argument("--once", action="store_true", help="단일 평가 후 종료 (폴링 없음)")
    parser.add_argument("--dry-run", action="store_true", help="네트워크 호출 없이 import-safe dry-run")

    args = parser.parse_args(argv)

    state = WatcherState()

    if args.dry_run or args.once:
        # ACTIVE=false: live gh polling 미실행, watcher 위임 골격.
        # dry-run/once 모드는 네트워크 호출 없이 상태 초기화만 하고 0 반환.
        return 0

    # ACTIVE=false: live gh polling 미실행, watcher 위임 골격.
    #   실제 운영 결선 시 아래에 poll loop(gh pr view/api → PRSnapshot.from_gh →
    #   run_once → terminal 시 fire_terminal_callback) 를 활성화한다.
    #   Phase 1 은 record-only 이므로 live 루프를 실행하지 않는다.
    return 0


if __name__ == "__main__":
    sys.exit(main())
