#!/usr/bin/env python3
"""replacement_pr_dry_run_activation_guard — P1 fifth-truncate halt guard (task-2590).

회장 verbatim 결정 (memory/tasks/task-2590.md §0 A~G) 박제:
- 구현 위치: (b) 봇 dispatch wrapper inline + (d) activation marker 발행 직전 preflight 혼합 — 외부 wrapper로 구현.
- 금지: inotify daemon / systemd / timer / long-running watcher / replacement_pr_runner 본문 수정.
- halt action = HOLD_FOR_CHAIR escalation marker + Telegram alert + automatic abort.
- 5번째 truncate 감지 시 activation marker 발행 금지, replacement_pr_runner 실행 금지, non-zero exit (code 87).

본 guard는 단일 패스 (preflight) 동작 — daemon/watch loop 아님.

CLI:
    python3 scripts/replacement_pr_dry_run_activation_guard.py preflight \
        --task-id task-2586 \
        --runner-path utils/replacement_pr_runner.py \
        --test-path tests/regression/test_replacement_pr_runner_2510.py \
        --runner-baseline-sha256 <sha256> \
        --test-baseline-sha256 <sha256> \
        --runner-baseline-size 33557 \
        --test-baseline-size 24403 \
        --evidence-dir memory/events

Exit codes:
- 0  : PASS — baselines match. activation 후속 호출 허용.
- 87 : HALT — fifth-truncate (또는 sha256/size mismatch) 감지. activation 호출 금지.
- 2  : USAGE — CLI 인자/환경 오류.

Env vars:
- WORKSPACE                                   — escalation_marker.py가 events dir 결정에 사용.
- FIFTH_TRUNCATE_GUARD_TELEGRAM_MODE          — production | mock | disabled (default: disabled).
- FIFTH_TRUNCATE_GUARD_TELEGRAM_MOCK_PATH     — mock 모드에서 alert payload 기록할 JSON 경로.
- FIFTH_TRUNCATE_GUARD_ESCALATION_MARKER_PATH — escalation_marker.py 경로 override (test 용).
- TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID       — read in production mode; if either is unset the alert is not sent and an error is recorded.
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

# Exit status returned by run_preflight when either baseline check fails.
HALT_EXIT_CODE = 87
# Exit status returned by main when no subcommand is given.
USAGE_EXIT_CODE = 2

# Default value of --escalation-task-id (the --task-id handed to escalation_marker.py).
DEFAULT_ESCALATION_MARKER_TASK_ID = "task-2586"
# Identifies this guard: used as the "guard" field of emitted JSON records
# and as the --source argument for escalation_marker.py.
GUARD_SOURCE_LABEL = "fifth_truncate_halt_guard"
# Passed to escalation_marker.py as the --blocking argument.
BLOCKING_CONDITION_LABEL = "fifth_truncate_recurrence"


def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    utc_now = datetime.now(tz=timezone.utc)
    # isoformat() renders the UTC offset as "+00:00"; swap it for the "Z" suffix.
    stamp = utc_now.isoformat(timespec="seconds")
    return stamp.replace("+00:00", "Z")


def _sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 8 KiB pieces."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(8192)
            if not piece:
                # An empty read marks end of file.
                break
            digest.update(piece)
    return digest.hexdigest()


def check_file_state(
    path: Path,
    *,
    baseline_sha256: str,
    baseline_size: int,
) -> dict[str, Any]:
    """Compare a file on disk against its expected size and SHA-256 digest.

    Checks run in order and stop at the first failure: a regular file is
    present, ``stat`` succeeds, the size is non-zero, the content can be read,
    the size equals ``baseline_size``, the digest equals ``baseline_sha256``.

    Args:
        path: File to inspect.
        baseline_sha256: Expected hex digest. It is compared case-insensitively
            and with surrounding whitespace ignored, so a digest copied from a
            tool that prints uppercase hex does not produce a false mismatch.
        baseline_size: Expected size in bytes.

    Returns:
        A dict with these keys:
            "path": str
            "exists": bool              # True only for a regular file
            "size": int | None          # None when the file is missing / stat failed
            "sha256": str | None        # None unless the content was hashed
            "baseline_size": int        # echoed back as passed
            "baseline_sha256": str      # echoed back as passed
            "ok": bool
            "reason": str               # "ok" | "missing" | "truncated_zero"
                                        # | "size_mismatch" | "sha256_mismatch"
                                        # | "stat_failure: <err>" | "read_failure: <err>"
        No "label" key is set here; callers add it themselves.
    """
    info: dict[str, Any] = {
        "path": str(path),
        "exists": False,
        "size": None,
        "sha256": None,
        "baseline_size": baseline_size,
        "baseline_sha256": baseline_sha256,
        "ok": False,
        "reason": "missing",
    }

    # is_file() is already False for a non-existent path, so it covers both
    # "missing" and "exists but is not a regular file".
    if not path.is_file():
        return info

    info["exists"] = True
    try:
        size = path.stat().st_size
    except OSError as exc:
        info["reason"] = f"stat_failure: {exc}"
        return info

    info["size"] = size

    if size == 0:
        info["reason"] = "truncated_zero"
        return info

    # The digest is computed before the size comparison, so a size-mismatch
    # report still carries the observed digest.
    try:
        sha256 = _sha256_file(path)
    except OSError as exc:
        info["reason"] = f"read_failure: {exc}"
        return info

    info["sha256"] = sha256

    if size != baseline_size:
        info["reason"] = "size_mismatch"
        return info

    # Hex digests are case-insensitive; normalise both sides before comparing.
    if sha256.lower() != baseline_sha256.strip().lower():
        info["reason"] = "sha256_mismatch"
        return info

    info["ok"] = True
    info["reason"] = "ok"
    return info


def send_telegram_alert(message: str) -> dict[str, Any]:
    """Deliver, simulate, or skip the halt alert depending on the configured mode.

    The mode comes from ``FIFTH_TRUNCATE_GUARD_TELEGRAM_MODE`` (default
    ``disabled``; surrounding whitespace and case are ignored):

    - ``mock``: write ``{"ts", "message"}`` as JSON to the file named by
      ``FIFTH_TRUNCATE_GUARD_TELEGRAM_MOCK_PATH``; nothing is sent.
    - ``production``: POST the message to the Telegram Bot API ``sendMessage``
      endpoint using ``TELEGRAM_BOT_TOKEN`` and ``TELEGRAM_CHAT_ID``.
    - ``disabled`` (or empty): do nothing.
    - anything else: do nothing, but record ``unknown_mode`` so a typo in the
      mode does not silently turn alerting off.

    Args:
        message: Alert text. Only its first 200 characters are kept in the
            returned record; the production request carries at most 4096.

    Returns:
        A record dict with ``mode``, ``ts``, ``message_preview`` and ``called``
        (True only when the mock file was written or the HTTP call returned),
        plus ``mock_path`` / ``http_status`` on success or ``error`` on failure.
        This function never raises for delivery problems; they are reported
        through ``error``.
    """
    mode = os.environ.get("FIFTH_TRUNCATE_GUARD_TELEGRAM_MODE", "disabled").strip().lower()
    record: dict[str, Any] = {
        "mode": mode,
        "ts": _now_iso(),
        "message_preview": message[:200],
        "called": False,
    }

    if mode == "mock":
        mock_path = os.environ.get("FIFTH_TRUNCATE_GUARD_TELEGRAM_MOCK_PATH")
        if not mock_path:
            record["error"] = "mock_path_missing"
            return record
        p = Path(mock_path)
        try:
            p.parent.mkdir(parents=True, exist_ok=True)
            payload = {
                "ts": _now_iso(),
                "message": message,
            }
            p.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
            record["called"] = True
            record["mock_path"] = str(p)
        except OSError as exc:
            record["error"] = f"mock_write_failure: {exc}"
        return record

    if mode == "production":
        # Strip stray whitespace/newlines (common when values come from env
        # files); they would otherwise produce an invalid request URL.
        bot_token = os.environ.get("TELEGRAM_BOT_TOKEN", "").strip()
        chat_id = os.environ.get("TELEGRAM_CHAT_ID", "").strip()
        if not bot_token or not chat_id:
            record["error"] = "missing_bot_token_or_chat_id"
            return record
        try:
            import urllib.parse
            import urllib.request
            url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
            data = urllib.parse.urlencode({
                "chat_id": chat_id,
                # Telegram rejects texts longer than 4096 characters; send a
                # truncated alert rather than lose it entirely.
                "text": message[:4096],
            }).encode("utf-8")
            req = urllib.request.Request(url, data=data, method="POST")
            with urllib.request.urlopen(req, timeout=5) as resp:
                record["http_status"] = resp.status
            record["called"] = True
        except Exception as exc:
            # Deliberately broad: alert delivery is best-effort and must not
            # abort the caller. Some exception texts embed the request URL,
            # which contains the bot token — redact it, because this record is
            # persisted and printed by the caller.
            record["error"] = f"http_failure: {exc}".replace(bot_token, "<redacted>")
        return record

    if mode not in ("", "disabled"):
        record["error"] = f"unknown_mode: {mode}"
    return record


def write_evidence_file(
    *,
    evidence_dir: Path,
    task_id: str,
    runner_check: dict[str, Any],
    test_check: dict[str, Any],
) -> Path:
    """Append one JSON line describing both file checks to the evidence log.

    The log lives at ``<evidence_dir>/<task_id>.fifth-truncate-halt-evidence.jsonl``;
    the directory is created if needed and existing lines are kept (append mode).

    Returns:
        The path of the evidence file (callers pass it on to
        escalation_marker.py as ``--evidence``).
    """
    evidence_dir.mkdir(parents=True, exist_ok=True)
    target = evidence_dir / f"{task_id}.fifth-truncate-halt-evidence.jsonl"
    entry = dict(
        ts=_now_iso(),
        guard=GUARD_SOURCE_LABEL,
        task_id=task_id,
        runner_check=runner_check,
        test_check=test_check,
    )
    with target.open("a", encoding="utf-8") as sink:
        serialized = json.dumps(entry, ensure_ascii=False)
        sink.write(serialized + "\n")
    return target


def write_side_marker(
    *,
    events_dir: Path,
    runner_check: dict[str, Any],
    test_check: dict[str, Any],
    escalation_result: dict[str, Any],
    telegram_result: dict[str, Any],
    evidence_path: Path,
    exit_code: int,
) -> Path:
    """Write the ``task-2590.fifth-truncate-halt-trigger.json`` side marker.

    The marker is written by this function directly, so it does not depend on
    the escalation_marker.py call having succeeded; that call's outcome is only
    embedded in the payload. Any existing marker file is overwritten.

    Returns:
        The path of the marker file inside ``events_dir``.
    """
    events_dir.mkdir(parents=True, exist_ok=True)
    target = events_dir / "task-2590.fifth-truncate-halt-trigger.json"
    body = json.dumps(
        {
            "task_id": "task-2590",
            "trigger": "fifth_truncate_halt_guard",
            "ts": _now_iso(),
            "halt_exit_code": exit_code,
            "runner_check": runner_check,
            "test_check": test_check,
            "escalation_marker_result": escalation_result,
            "telegram_alert_result": telegram_result,
            "evidence_path": str(evidence_path),
        },
        ensure_ascii=False,
        indent=2,
    )
    with target.open("w", encoding="utf-8") as sink:
        sink.write(body)
    return target


def invoke_escalation_marker(
    *,
    task_id: str,
    reason: str,
    evidence_path: Path,
    escalation_marker_path: Path,
) -> dict[str, Any]:
    """Run ``escalation_marker.py emit`` as a subprocess and capture the outcome.

    The script is started with the current interpreter and always asked for
    ``--kind escalated`` (the original note says this is one of the kinds the
    script accepts — not verifiable from this file).

    Args:
        task_id: Value for the script's ``--task-id``.
        reason: Value for ``--reason``.
        evidence_path: Value for ``--evidence``.
        escalation_marker_path: Location of the script to execute.

    Returns:
        A dict with ``cmd``, ``returncode``, ``stdout``, ``stderr`` and
        ``called`` (True once the process ran to completion, whatever its exit
        status). On timeout (30 s) or failure to start, ``called`` stays False
        and ``error`` describes the problem. The function does not raise for
        these cases.
    """
    cmd = [
        sys.executable,
        str(escalation_marker_path),
        "emit",
        "--task-id", task_id,
        "--kind", "escalated",
        "--reason", reason,
        "--source", GUARD_SOURCE_LABEL,
        "--blocking", BLOCKING_CONDITION_LABEL,
        "--evidence", str(evidence_path),
    ]
    result: dict[str, Any] = {
        "cmd": cmd,
        "returncode": None,
        "stdout": "",
        "stderr": "",
        "called": False,
    }
    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Undecodable bytes in the child's output must not raise
            # UnicodeDecodeError here: that would abort the halt sequence
            # before the alert and side marker are produced.
            errors="replace",
            timeout=30,
            check=False,
        )
        result["returncode"] = completed.returncode
        result["stdout"] = completed.stdout
        result["stderr"] = completed.stderr
        result["called"] = True
    except (subprocess.TimeoutExpired, OSError) as exc:
        result["error"] = f"subprocess_failure: {exc}"
    return result


def run_preflight(args: argparse.Namespace) -> int:
    """Run the single-pass baseline check and, on failure, the halt sequence.

    Both target files are checked with ``check_file_state``. If both match
    their baselines a PASS JSON document is printed and 0 is returned.

    Otherwise the halt sequence runs, in this order: append an evidence line,
    invoke escalation_marker.py, send the Telegram alert, write the side
    marker, print a HALT JSON document. The two file writes are best-effort:
    an ``OSError`` from either one is recorded under ``io_errors`` (and added
    to the alert text) instead of propagating, so an unwritable evidence
    directory cannot skip the escalation/alert or replace the halt exit code
    with a traceback.

    Args:
        args: Parsed ``preflight`` arguments (paths, baselines, evidence dir,
            task ids).

    Returns:
        0 on PASS, ``HALT_EXIT_CODE`` on any baseline failure.
    """
    runner_path = Path(args.runner_path).resolve()
    test_path = Path(args.test_path).resolve()
    evidence_dir = Path(args.evidence_dir).resolve()
    task_id = args.task_id

    runner_check = check_file_state(
        runner_path,
        baseline_sha256=args.runner_baseline_sha256,
        baseline_size=args.runner_baseline_size,
    )
    runner_check["label"] = "replacement_pr_runner"

    test_check = check_file_state(
        test_path,
        baseline_sha256=args.test_baseline_sha256,
        baseline_size=args.test_baseline_size,
    )
    test_check["label"] = "test_replacement_pr_runner_2510"

    if runner_check["ok"] and test_check["ok"]:
        result = {
            "status": "PASS",
            "ts": _now_iso(),
            "guard": GUARD_SOURCE_LABEL,
            "task_id": task_id,
            "runner_check": runner_check,
            "test_check": test_check,
        }
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return 0

    reason_parts = [
        f"{prefix}:{check['reason']}"
        for prefix, check in (("runner", runner_check), ("test", test_check))
        if not check["ok"]
    ]
    halt_reason = (
        "fifth_truncate_halt — utils/replacement_pr_runner.py and/or "
        "tests/regression/test_replacement_pr_runner_2510.py truncate detected "
        "(5th known recurrence) — activation halted per task-2586 P1 precondition "
        f"[{', '.join(reason_parts)}]"
    )

    # Failures of the best-effort file writes below are collected here.
    io_errors: list[str] = []

    try:
        evidence_path = write_evidence_file(
            evidence_dir=evidence_dir,
            task_id="task-2590",
            runner_check=runner_check,
            test_check=test_check,
        )
    except OSError as exc:
        # Keep going with the path the evidence file would have had, so the
        # escalation and the alert still reference a location.
        evidence_path = evidence_dir / "task-2590.fifth-truncate-halt-evidence.jsonl"
        io_errors.append(f"evidence_write_failure: {exc}")

    escalation_marker_path = Path(
        os.environ.get(
            "FIFTH_TRUNCATE_GUARD_ESCALATION_MARKER_PATH",
            str(Path(__file__).resolve().parent / "escalation_marker.py"),
        )
    )

    escalation_result = invoke_escalation_marker(
        task_id=args.escalation_task_id,
        reason=halt_reason,
        evidence_path=evidence_path,
        escalation_marker_path=escalation_marker_path,
    )

    alert_lines = [
        f"[HOLD_FOR_CHAIR] fifth_truncate_halt — {halt_reason}",
        f"evidence={evidence_path}",
        f"escalation_marker_returncode={escalation_result.get('returncode')}",
    ]
    if io_errors:
        alert_lines.append(f"io_errors={'; '.join(io_errors)}")
    alert_lines.append(f"ts={_now_iso()}")
    telegram_result = send_telegram_alert("\n".join(alert_lines))

    side_marker_path: str | None
    try:
        side_marker_path = str(
            write_side_marker(
                events_dir=evidence_dir,
                runner_check=runner_check,
                test_check=test_check,
                escalation_result=escalation_result,
                telegram_result=telegram_result,
                evidence_path=evidence_path,
                exit_code=HALT_EXIT_CODE,
            )
        )
    except OSError as exc:
        side_marker_path = None
        io_errors.append(f"side_marker_write_failure: {exc}")

    result = {
        "status": "HALT",
        "ts": _now_iso(),
        "guard": GUARD_SOURCE_LABEL,
        "task_id": task_id,
        "halt_reason": halt_reason,
        "runner_check": runner_check,
        "test_check": test_check,
        "evidence_path": str(evidence_path),
        "side_marker_path": side_marker_path,
        "escalation_marker_result": escalation_result,
        "telegram_alert_result": telegram_result,
        "exit_code": HALT_EXIT_CODE,
    }
    if io_errors:
        # Only present when something went wrong, so the normal HALT document
        # keeps its usual shape.
        result["io_errors"] = io_errors
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return HALT_EXIT_CODE


def main(argv: list[str] | None = None) -> int:
    """Parse the command line and dispatch to the ``preflight`` subcommand.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The result of ``run_preflight`` for ``preflight``; otherwise the help
        text is printed and ``USAGE_EXIT_CODE`` is returned.
    """
    cli = argparse.ArgumentParser(
        prog="replacement_pr_dry_run_activation_guard",
        description="P1 fifth-truncate halt guard — task-2586 dry-run activation preflight (task-2590).",
    )
    commands = cli.add_subparsers(dest="command")
    preflight = commands.add_parser("preflight", help="single-shot baseline preflight check")

    preflight.add_argument("--task-id", default="task-2590", help="guard 호출 task ID (logging 용)")
    preflight.add_argument(
        "--escalation-task-id",
        default=DEFAULT_ESCALATION_MARKER_TASK_ID,
        help="escalation_marker.py emit 시 --task-id (기본: task-2586, spec §6.2 verbatim)",
    )
    # Required string options, then required integer options, registered in
    # the order they should appear in --help.
    for flag in (
        "--runner-path",
        "--test-path",
        "--runner-baseline-sha256",
        "--test-baseline-sha256",
    ):
        preflight.add_argument(flag, required=True)
    for flag in ("--runner-baseline-size", "--test-baseline-size"):
        preflight.add_argument(flag, type=int, required=True)
    preflight.add_argument("--evidence-dir", required=True, help="evidence + side marker 발행 dir (보통 memory/events)")

    parsed = cli.parse_args(argv)

    if parsed.command != "preflight":
        # No subcommand given: show usage and signal a usage error.
        cli.print_help()
        return USAGE_EXIT_CODE
    return run_preflight(parsed)


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
