#!/usr/bin/env python3
"""
verify_workflow_sha_payload.py

두 모드 지원:
  --mode resolve   : GitHub Actions workflow runtime 에서 SHA/PR 추출 (fallback chain + canonical 검증)
  --mode dry-run   : tests/regression/fixtures/ 의 fixture 4종으로 자체 검증

작성자: 불칸 (dev1-team 백엔드)
task: task-2486 — CI pull_request SHA payload fallback fix
"""

from __future__ import annotations

import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Optional, Tuple


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------

def _env(key: str, default: str = "") -> str:
    """Read environment variable ``key`` and strip surrounding whitespace.

    ``default`` is used (and stripped as well) when the variable is not set.
    """
    raw_value = os.environ.get(key, default)
    return raw_value.strip()


def _run_gh(args: list[str], timeout: int = 15) -> Optional[str]:
    """Invoke the ``gh`` CLI and hand back its stripped stdout.

    Args:
        args: Arguments placed after the ``gh`` executable name.
        timeout: Seconds passed to ``subprocess.run`` as its timeout.

    Returns:
        The stripped stdout when ``gh`` exits with status 0. ``None`` when it
        exits non-zero or when launching/waiting raises any exception
        (timeout included).
    """
    run_options = {"capture_output": True, "text": True, "timeout": timeout}
    try:
        completed = subprocess.run(["gh"] + args, **run_options)
    except Exception:
        # Best-effort helper: every failure mode collapses to "no answer".
        return None
    else:
        if completed.returncode != 0:
            return None
        return completed.stdout.strip()


def _emit_audit(lines: list[str], step_summary_path: str) -> None:
    """Append markdown lines to the step-summary file, else print to stderr.

    The lines are joined with newlines and terminated by one trailing newline.
    When ``step_summary_path`` is empty, or appending to it raises ``OSError``,
    the same text is written to stderr instead.
    """
    payload = "\n".join(lines) + "\n"

    if not step_summary_path:
        sys.stderr.write(payload)
        return

    try:
        with open(step_summary_path, "a", encoding="utf-8") as summary:
            summary.write(payload)
    except OSError:
        # The summary file is unusable; fall back to stderr.
        sys.stderr.write(payload)


def _load_event(event_path: str) -> dict:
    """Load the GitHub event payload stored at ``event_path``.

    Args:
        event_path: Path of the event JSON file; may be empty.

    Returns:
        The decoded JSON object. An empty dict is returned when the path is
        empty, the file cannot be read, the content is not valid JSON, or the
        top-level JSON value is not an object.
    """
    if not event_path:
        return {}
    try:
        with open(event_path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed JSON or
        # undecodable bytes (JSONDecodeError and UnicodeDecodeError subclass it).
        return {}
    # Callers use dict methods on the result, so a list/scalar/null payload
    # must not leak through.
    return payload if isinstance(payload, dict) else {}


# ---------------------------------------------------------------------------
# SHA fallback chain
# ---------------------------------------------------------------------------

def _resolve_sha(event: dict, pr_number: Optional[str], repo: str, dry_run: bool = False) -> Tuple[str, str, bool]:
    """
    Resolve the commit SHA through a fallback chain.

    Stages are tried in order and the first non-empty value wins:
        1. ``event.pull_request.head.sha``
        2. ``event.after``
        3. the ``GITHUB_SHA`` environment variable
        4. ``gh pr view <pr_number> --json headRefOid`` (skipped when
           ``dry_run`` is true or ``pr_number`` is empty)

    Each stage is evaluated lazily, so the ``gh`` subprocess is only started
    when stages 1-3 produced nothing.

    Args:
        event: Decoded event payload.
        pr_number: PR number used by stage 4; ``None``/empty disables stage 4.
        repo: Repository passed to ``gh`` via ``--repo``.
        dry_run: When true, stage 4 is never attempted.

    Returns:
        (sha_value, stage_used, is_canonical)
        stage_used: "1" | "2" | "3" | "4" | "none"
        is_canonical: True for stages 1 and 4, meaning the caller does not need
            to run the canonical comparison; False otherwise. When nothing is
            resolved the result is ("", "none", False).
    """
    # A payload may carry `"pull_request": null` or `"head": null`; walk the
    # nesting defensively instead of chaining .get() on a possible None.
    pull_request = event.get("pull_request")
    head = pull_request.get("head") if isinstance(pull_request, dict) else None
    head_sha = head.get("sha") if isinstance(head, dict) else None

    # Stage 1: event.pull_request.head.sha (adopted without canonical check)
    sha = str(head_sha or "").strip()
    if sha:
        return sha, "1", True

    # Stage 2: event.after
    sha = str(event.get("after") or "").strip()
    if sha:
        return sha, "2", False

    # Stage 3: GITHUB_SHA environment variable
    sha = _env("GITHUB_SHA")
    if sha:
        return sha, "3", False

    # Stage 4: gh pr view (this value is itself the canonical one)
    if not dry_run and pr_number:
        raw = _run_gh(["pr", "view", pr_number, "--json", "headRefOid", "--repo", repo])
        if raw:
            try:
                sha = str(json.loads(raw).get("headRefOid") or "").strip()
            except (ValueError, AttributeError):
                # Invalid JSON, or a JSON value that is not an object.
                sha = ""
            if sha:
                return sha, "4", True

    return "", "none", False


# ---------------------------------------------------------------------------
# PR fallback chain
# ---------------------------------------------------------------------------

def _normalize_pr_number(value: object) -> str:
    """Return ``value`` as a stripped PR-number string, or "" when unusable.

    ``None``, empty/whitespace text and the literal strings "None"/"null" all
    count as unusable.
    """
    if value is None:
        return ""
    text = str(value).strip()
    if text in ("", "None", "null"):
        return ""
    return text


def _resolve_pr(event: dict, branch: str, repo: str, dry_run: bool = False) -> Tuple[str, str]:
    """
    Resolve the pull-request number through a fallback chain.

    Stages are tried in order and the first usable value wins:
        1. ``event.pull_request.number``
        2. ``event.number``
        3. ``gh pr list`` for open PRs from ``branch`` into ``main``; accepted
           only when exactly one PR is listed (skipped when ``dry_run`` is
           true or ``branch``/``repo`` is empty)

    Args:
        event: Decoded event payload.
        branch: Head branch name used by stage 3.
        repo: Repository passed to ``gh`` via ``--repo``.
        dry_run: When true, stage 3 is never attempted.

    Returns:
        (pr_number, stage_used)
        stage_used: "1" | "2" | "3" | "none"
        When nothing is resolved the result is ("", "none").
    """
    # Stage 1: event.pull_request.number. The payload may carry
    # `"pull_request": null`, so only look inside a real object.
    pull_request = event.get("pull_request")
    if isinstance(pull_request, dict):
        number = _normalize_pr_number(pull_request.get("number"))
        if number:
            return number, "1"

    # Stage 2: event.number
    number = _normalize_pr_number(event.get("number"))
    if number:
        return number, "2"

    # Stage 3: gh pr list
    if dry_run or not (branch and repo):
        return "", "none"

    raw = _run_gh([
        "pr", "list",
        "--head", branch,
        "--base", "main",
        "--state", "open",
        "--json", "number,headRefName,headRepositoryOwner",
        "--repo", repo,
    ])
    if not raw:
        return "", "none"

    try:
        items = json.loads(raw)
    except ValueError:
        return "", "none"

    # Zero matches or more than one match is ambiguous -> this stage fails.
    if isinstance(items, list) and len(items) == 1 and isinstance(items[0], dict):
        number = _normalize_pr_number(items[0].get("number"))
        if number:
            return number, "3"

    return "", "none"


# ---------------------------------------------------------------------------
# canonical 검증
# ---------------------------------------------------------------------------

def _verify_canonical(sha: str, pr_number: str, repo: str) -> Tuple[str, bool]:
    """
    Compare ``sha`` with the PR head reported by ``gh pr view``.

    When the lookup yields a non-empty ``headRefOid`` that differs from
    ``sha``, the canonical value replaces it. When the lookup fails, cannot be
    parsed, or yields an empty value, ``sha`` is kept unchanged.

    Returns:
        (final_sha, was_mismatch)
    """
    response = _run_gh(["pr", "view", pr_number, "--json", "headRefOid", "--repo", repo])

    canonical_sha = ""
    if response:
        try:
            canonical_sha = json.loads(response).get("headRefOid", "").strip()
        except Exception:
            # Unparseable answer is treated the same as "no answer".
            canonical_sha = ""

    if canonical_sha and canonical_sha != sha:
        return canonical_sha, True
    return sha, False


# ---------------------------------------------------------------------------
# resolve mode
# ---------------------------------------------------------------------------

def mode_resolve(args: argparse.Namespace) -> int:
    """Resolve the SHA and PR number for the current workflow run.

    Inputs come from ``args`` first and fall back to the ``GITHUB_*``
    environment variables. An audit table is appended to the step summary (or
    stderr), and on success two ``KEY=value`` lines are printed to stdout.

    Args:
        args: Parsed CLI arguments; ``event_path``, ``branch`` and ``repo``
            are read.

    Returns:
        0 when both a SHA and a PR number were resolved, otherwise 1 (after
        printing an ``::error::`` line).
    """
    event_path = args.event_path or _env("GITHUB_EVENT_PATH")
    branch = args.branch or _env("GITHUB_HEAD_REF") or _env("GITHUB_REF_NAME")
    repo = args.repo or _env("GITHUB_REPOSITORY")
    step_summary = _env("GITHUB_STEP_SUMMARY")

    event = _load_event(event_path)

    # Normalise the payload once: a non-object event, `"pull_request": null`
    # or `"head": null` would otherwise make chained .get() calls raise
    # AttributeError before the fallback chain gets a chance to run.
    if not isinstance(event, dict):
        event = {}
    pull_request = event.get("pull_request")
    if not isinstance(pull_request, dict):
        pull_request = {}
    head = pull_request.get("head")
    if not isinstance(head, dict):
        head = {}
    event = {**event, "pull_request": {**pull_request, "head": head}}

    # --- PR fallback ---
    pr_number, pr_stage = _resolve_pr(event, branch, repo)

    # --- SHA fallback ---
    sha, sha_stage, sha_is_canonical = _resolve_sha(event, pr_number or None, repo)

    # --- canonical verification (only when a non-canonical stage supplied the SHA) ---
    sha_mismatch = False
    if sha and not sha_is_canonical and pr_number and repo:
        sha, sha_mismatch = _verify_canonical(sha, pr_number, repo)

    # --- audit table ---
    event_head_sha = str(head.get("sha") or "")
    event_after = str(event.get("after") or "")
    event_pr_number = str(pull_request.get("number") or "")
    event_number = str(event.get("number") or "")
    audit_lines = [
        "",
        "## SHA/PR Resolve Audit",
        "",
        "| Candidate | Value | Stage | Used | Notes |",
        "|-----------|-------|-------|------|-------|",
        f"| SHA-1 event.pull_request.head.sha | `{event_head_sha[:16]}` | 1 | {sha_stage == '1'} | primary |",
        f"| SHA-2 event.after | `{event_after[:16]}` | 2 | {sha_stage == '2'} | fallback |",
        f"| SHA-3 GITHUB_SHA | `{_env('GITHUB_SHA')[:16]}` | 3 | {sha_stage == '3'} | env fallback |",
        f"| SHA-4 gh pr view headRefOid | canonical | 4 | {sha_stage == '4'} | canonical |",
        f"| PR-1 event.pull_request.number | `{event_pr_number}` | 1 | {pr_stage == '1'} | primary |",
        f"| PR-2 event.number | `{event_number}` | 2 | {pr_stage == '2'} | fallback |",
        f"| PR-3 gh pr list | (api) | 3 | {pr_stage == '3'} | gh fallback |",
    ]
    if sha_mismatch:
        audit_lines.append("")
        audit_lines.append(f"> **WARNING**: SHA fallback mismatch — canonical override applied (stage={sha_stage}→canonical)")
    audit_lines.append("")

    _emit_audit(audit_lines, step_summary)

    # --- failure check ---
    if not sha or not pr_number:
        print("::error::missing pull_request payload (SHA={} PR={})".format(sha, pr_number))
        return 1

    # --- stdout output (KEY=value lines, suitable for eval) ---
    print(f"RESOLVED_SHA={sha}")
    print(f"RESOLVED_PR={pr_number}")
    return 0


# ---------------------------------------------------------------------------
# dry-run mode
# ---------------------------------------------------------------------------

# Fixture directory, located relative to the directory two levels above this file.
FIXTURES_DIR = Path(__file__).parent.parent.joinpath(
    "tests", "regression", "fixtures", "workflow_sha_payload"
)

# Base names of the fixtures exercised by --mode dry-run; each one has a
# "<name>.json" event file and a "<name>.expected.json" expectation file.
DRY_RUN_FIXTURES = [
    f"pr_event_{variant}"
    for variant in ("normal", "empty_sha", "empty_pr", "empty_both")
]


def _run_fixture_dryrun(name: str) -> Tuple[bool, str]:
    """
    Run the resolve logic against a single fixture without calling ``gh``.

    The fixture consists of ``<name>.json`` (event payload) and
    ``<name>.expected.json`` (expectations) inside ``FIXTURES_DIR``. The
    expectation file may define ``env_overrides`` (applied to ``os.environ``
    for the duration of the check and restored afterwards), ``exit_code``
    (defaults to 0) and, for success cases, ``sha`` and ``pr``.

    A missing, unreadable or malformed fixture is reported as a failed result
    rather than raised, so one broken fixture does not abort the whole run.

    Returns:
        (passed, detail_message)
    """
    event_file = FIXTURES_DIR / f"{name}.json"
    expected_file = FIXTURES_DIR / f"{name}.expected.json"

    if not event_file.exists():
        return False, f"FIXTURE NOT FOUND: {event_file}"
    if not expected_file.exists():
        return False, f"EXPECTED NOT FOUND: {expected_file}"

    try:
        with open(event_file, "r", encoding="utf-8") as f:
            event = json.load(f)
        with open(expected_file, "r", encoding="utf-8") as f:
            expected = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return False, f"FAIL [{name}]: cannot load fixture ({exc})"

    if not isinstance(event, dict) or not isinstance(expected, dict):
        return False, f"FAIL [{name}]: fixture files must contain JSON objects"

    env_overrides = expected.get("env_overrides") or {}
    if not isinstance(env_overrides, dict):
        return False, f"FAIL [{name}]: env_overrides must be a JSON object"

    original_env: dict = {}
    try:
        # Apply the overrides inside the try block so that a failure halfway
        # through still restores whatever was already changed.
        for k, v in env_overrides.items():
            original_env[k] = os.environ.get(k)
            os.environ[k] = str(v)

        # PR fallback (dry-run: the gh stage is skipped)
        pr_number, _ = _resolve_pr(event, branch="", repo="", dry_run=True)

        # SHA fallback (dry-run: the gh stage is skipped)
        sha, _, _ = _resolve_sha(event, pr_number or None, repo="", dry_run=True)

        # Canonical verification is skipped in dry-run.

        # Success requires both values, mirroring resolve mode's exit code.
        resolved_ok = bool(sha and pr_number)
        expected_exit = expected.get("exit_code", 0)
        got_exit = 0 if resolved_ok else 1

        if got_exit != expected_exit:
            return False, (
                f"FAIL [{name}]: exit_code mismatch "
                f"(expected={expected_exit}, got={got_exit}, sha={sha!r}, pr={pr_number!r})"
            )

        # Values are only compared for cases that are expected to succeed.
        if expected_exit == 0:
            exp_sha = expected.get("sha")
            raw_pr = expected.get("pr")
            exp_pr = None if raw_pr is None else str(raw_pr)

            if exp_sha is not None and sha != exp_sha:
                return False, (
                    f"FAIL [{name}]: SHA mismatch (expected={exp_sha!r}, got={sha!r})"
                )
            if exp_pr is not None and pr_number != exp_pr:
                return False, (
                    f"FAIL [{name}]: PR mismatch (expected={exp_pr!r}, got={pr_number!r})"
                )

        return True, f"PASS [{name}]: sha={sha!r} pr={pr_number!r} exit={got_exit}"

    finally:
        # Restore the environment exactly as it was before the overrides.
        for k, orig in original_env.items():
            if orig is None:
                os.environ.pop(k, None)
            else:
                os.environ[k] = orig


def mode_dryrun() -> int:
    """Run every fixture in ``DRY_RUN_FIXTURES`` and print a summary.

    Each fixture's detail line is printed as it completes. Failures are
    counted from the ``passed`` flag of each result, not from the wording of
    the detail message, so "FIXTURE NOT FOUND"/"EXPECTED NOT FOUND" results
    are included in the reported failure count.

    Returns:
        0 when all fixtures passed, otherwise 1.
    """
    print("=== verify_workflow_sha_payload.py --mode dry-run ===")
    fail_count = 0

    for name in DRY_RUN_FIXTURES:
        passed, detail = _run_fixture_dryrun(name)
        print(detail)
        if not passed:
            fail_count += 1

    print()
    total = len(DRY_RUN_FIXTURES)
    if fail_count == 0:
        print(f"dry-run result: PASS ({total}/{total} fixtures)")
        return 0

    print(f"dry-run result: FAIL ({fail_count}/{total} fixtures failed)")
    return 1


# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------

def main() -> int:
    """Parse the command line and dispatch to the selected mode.

    Returns:
        The exit code produced by the selected mode, or 1 for an unknown mode.
    """
    cli = argparse.ArgumentParser(
        description="verify_workflow_sha_payload — SHA/PR fallback chain for GitHub Actions"
    )
    cli.add_argument(
        "--mode",
        choices=["resolve", "dry-run"],
        required=True,
        help="실행 모드: resolve (workflow runtime) | dry-run (자체 검증)",
    )
    # Optional string overrides; each defaults to "".
    optional_flags = (
        ("--event-path", "GitHub event JSON 경로 (기본값: GITHUB_EVENT_PATH)"),
        ("--event-name", "이벤트 이름 (기본값: GITHUB_EVENT_NAME)"),
        ("--branch", "head 브랜치 (기본값: GITHUB_HEAD_REF 또는 GITHUB_REF_NAME)"),
        ("--repo", "저장소 (기본값: GITHUB_REPOSITORY)"),
    )
    for flag, help_text in optional_flags:
        cli.add_argument(flag, default="", help=help_text)

    args = cli.parse_args()
    selected_mode = args.mode

    if selected_mode == "resolve":
        return mode_resolve(args)
    if selected_mode == "dry-run":
        return mode_dryrun()

    # argparse's `choices` should already have rejected anything else.
    print(f"::error::unknown mode: {args.mode}")
    return 1


# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())
