#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any

import yaml

# playwright는 선택적 의존성: 사용 시점에 import
try:
    from playwright.sync_api import sync_playwright
except ImportError:
    sync_playwright = None  # type: ignore[assignment]


def _load_impact(impact_file: str) -> list[str]:
    if not impact_file or not os.path.isfile(impact_file):
        return []
    try:
        with open(impact_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data.get("affected", [])
    except (json.JSONDecodeError, OSError):
        return []


def _load_scenarios(scenarios_dir: str) -> list[dict]:
    scenarios = []
    for root, _, files in os.walk(scenarios_dir):
        for fname in files:
            if not fname.endswith(".yaml") and not fname.endswith(".yml"):
                continue
            fpath = os.path.join(root, fname)
            try:
                with open(fpath, "r", encoding="utf-8") as f:
                    data = yaml.safe_load(f)
                if isinstance(data, list):
                    scenarios.extend(data)
                elif isinstance(data, dict):
                    scenarios.append(data)
            except (yaml.YAMLError, OSError):
                pass
    return scenarios


def _matches_affected(target: list[str], affected: list[str]) -> bool:
    if not affected:
        return True
    for t in target:
        t_base = t.split(":")[0]
        for a in affected:
            if t_base in a or a in t_base:
                return True
    return False


def _run_curl_step(step: dict) -> tuple[bool, str]:
    action = step.get("action", "")
    expect_status = step.get("expect_status")
    expect_contains = step.get("expect_contains", "")

    try:
        # -w으로 HTTP 상태 코드 추출
        cmd = action
        if expect_status is not None:
            # curl에 -w "%{http_code}" 추가 (이미 있으면 제외)
            if "-w " not in cmd and "--write-out" not in cmd:
                cmd = cmd + ' -w "\\n__STATUS__%{http_code}"'

        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=120)
        output = result.stdout

        if expect_status is not None and "__STATUS__" in output:
            parts = output.rsplit("__STATUS__", 1)
            body = parts[0]
            status_code_str = parts[1].strip()
            try:
                status_code = int(status_code_str)
            except ValueError:
                status_code = 0
            if status_code != expect_status:
                return False, f"HTTP status {status_code} != expected {expect_status}"
            output = body

        if expect_contains and expect_contains not in output:
            return False, f"expected '{expect_contains}' not found in output"

        return True, output

    except subprocess.TimeoutExpired:
        return False, "timeout after 120s"
    except Exception as e:
        return False, f"{type(e).__name__}: {e}"


def _run_subprocess_step(step: dict) -> tuple[bool, str]:
    action = step.get("action", "")
    expect_contains = step.get("expect_contains", "")

    try:
        result = subprocess.run(action, shell=True, capture_output=True, text=True, timeout=120)
        combined = result.stdout + result.stderr

        if expect_contains and expect_contains not in combined:
            return False, f"expected '{expect_contains}' not found in output"

        return True, combined

    except subprocess.TimeoutExpired:
        return False, "timeout after 120s"
    except Exception as e:
        return False, f"{type(e).__name__}: {e}"


def _run_pytest_step(step: dict) -> tuple[bool, str]:
    action = step.get("action", "")

    try:
        result = subprocess.run(f"python3 -m pytest {action}", shell=True, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            return False, f"pytest exited with code {result.returncode}: {result.stdout[-500:]}"
        return True, result.stdout

    except subprocess.TimeoutExpired:
        return False, "timeout after 120s"
    except Exception as e:
        return False, f"{type(e).__name__}: {e}"


def _resolve_placeholders(text: str) -> str:
    """환경변수 기반 플레이스홀더 치환. {key} → os.environ[SCENARIO_key] 또는 원본 유지."""
    import re

    def _replace(m: re.Match) -> str:
        key = m.group(1)
        return os.environ.get(f"SCENARIO_{key}") or m.group(0)

    return re.sub(r"\{(\w+)\}", _replace, text)


def _execute_playwright_action(page, action: str, wait=None) -> tuple[bool, str]:
    """개별 Playwright 액션 실행."""
    try:
        action = _resolve_placeholders(action)
        parts = action.split(maxsplit=1)
        cmd = parts[0] if parts else ""
        arg = parts[1] if len(parts) > 1 else ""

        if cmd == "navigate":
            page.goto(arg, wait_until="networkidle" if wait == "networkidle" else "load")
        elif cmd == "fill":
            # "fill selector value" 형태 — 마지막 공백 기준으로 분리
            selector_and_value = arg.rsplit(" ", 1)
            if len(selector_and_value) == 2:
                page.fill(selector_and_value[0], selector_and_value[1])
            else:
                return False, f"fill 액션 형식 오류: {action}"
        elif cmd == "click":
            page.click(arg)
        elif cmd == "assert_visible":
            if not page.locator(arg).is_visible():
                return False, f"assert_visible 실패: '{arg}' 가시적이지 않음"
        elif cmd == "assert_in_viewport":
            box = page.locator(arg).bounding_box()
            if box is None:
                return False, f"assert_in_viewport 실패: '{arg}' bounding box 없음"
        elif cmd == "assert_not_visible":
            if page.locator(arg).is_visible():
                return False, f"assert_not_visible 실패: '{arg}' 가시적임"
        elif cmd == "wait":
            page.wait_for_timeout(int(arg) if arg.isdigit() else 1000)
        else:
            return False, f"알 수 없는 액션: {cmd}"

        # wait 처리
        if isinstance(wait, int) and wait > 0:
            page.wait_for_timeout(wait)

        return True, ""
    except Exception as e:
        return False, f"{type(e).__name__}: {e}"


def _run_playwright_scenario(scenario: dict, storage_state: str = "") -> dict:
    """Playwright E2E 시나리오 실행."""
    sc_id = scenario.get("id", "unknown")
    steps = scenario.get("steps", [])
    screenshots_dir = os.path.join(os.path.dirname(__file__), "screenshots")
    os.makedirs(screenshots_dir, exist_ok=True)

    if sync_playwright is None:
        return {"id": sc_id, "status": "skipped", "reason": "playwright not installed"}

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context_kwargs = {}
            if storage_state and os.path.isfile(storage_state):
                context_kwargs["storage_state"] = storage_state
            context = browser.new_context(**context_kwargs)
            page = context.new_page()

            for i, step in enumerate(steps):
                action = step.get("action", "")
                wait = step.get("wait", None)

                ok, reason = _execute_playwright_action(page, action, wait)
                if not ok:
                    screenshot_path = os.path.join(screenshots_dir, f"{sc_id}-step{i}-fail.png")
                    try:
                        page.screenshot(path=screenshot_path)
                    except Exception:
                        screenshot_path = ""
                    context.close()
                    browser.close()
                    return {
                        "id": sc_id,
                        "status": "failed",
                        "reason": f"step {i}: {reason}",
                        "screenshot": screenshot_path,
                    }

            context.close()
            browser.close()
            return {"id": sc_id, "status": "passed", "reason": ""}
    except Exception as e:
        return {"id": sc_id, "status": "failed", "reason": f"{type(e).__name__}: {e}"}


def _auto_refresh_storage_state(storage_state_path: str = "") -> bool:
    """storageState를 자동 갱신합니다. 성공 시 True, 실패 시 False 반환."""
    setup_auth_path = os.path.join(os.path.dirname(__file__), "auth", "setup_auth.py")
    try:
        result = subprocess.run(
            ["python3", setup_auth_path, "--refresh", "--output", storage_state_path],
            capture_output=True,
            text=True,
            timeout=60,
        )
        return result.returncode == 0
    except Exception:
        return False


def _run_scenario(scenario: dict) -> dict:
    sc_id = scenario.get("id", "unknown")
    sc_type = scenario.get("type", "subprocess")
    steps = scenario.get("steps", [])
    automatable = scenario.get("automatable", True)

    if not automatable:
        return {"id": sc_id, "status": "skipped", "reason": "automatable=false"}

    # Playwright 타입은 별도 러너
    if sc_type == "playwright":
        storage_state = os.path.join(os.path.dirname(__file__), "auth", "storageState.json")
        return _run_playwright_scenario(scenario, storage_state=storage_state)

    # 기존 로직 유지 (subprocess, curl, pytest)
    for step in steps:
        if sc_type == "curl":
            ok, reason = _run_curl_step(step)
        elif sc_type == "pytest":
            ok, reason = _run_pytest_step(step)
        else:
            ok, reason = _run_subprocess_step(step)

        if not ok:
            return {"id": sc_id, "status": "failed", "reason": reason}

    return {"id": sc_id, "status": "passed", "reason": ""}


def _detect_duplicates(scenarios: list[dict]) -> list[str]:
    """id 기준 중복 감지, 중복 id 리스트 반환."""
    from collections import Counter

    ids: list[str] = [str(s["id"]) for s in scenarios if s.get("id") is not None]
    id_counts = Counter(ids)
    return [sc_id for sc_id, count in id_counts.items() if count > 1]


def show_stats(scenarios_dir: str) -> dict:
    """scenarios_dir 하위 프로젝트별 시나리오 카운트 및 중복 감지.

    반환값: {"projects": {"insuwiki": 10, "dashboard": 5}, "total": 15, "duplicates": ["SC-001"]}
    """
    projects: dict[str, int] = {}
    all_scenarios: list[dict] = []

    # 프로젝트별(디렉토리별) 카운트
    if os.path.isdir(scenarios_dir):
        for entry in os.scandir(scenarios_dir):
            if entry.is_dir():
                project_scenarios = _load_scenarios(entry.path)
                if project_scenarios:
                    projects[entry.name] = len(project_scenarios)
                    all_scenarios.extend(project_scenarios)
            elif entry.name.endswith((".yaml", ".yml")):
                # scenarios_dir 직접 하위 yaml 파일 (프로젝트 없이 flat)
                try:
                    with open(entry.path, "r", encoding="utf-8") as f:
                        import yaml as _yaml

                        data = _yaml.safe_load(f)
                    if isinstance(data, list):
                        all_scenarios.extend(data)
                    elif isinstance(data, dict):
                        all_scenarios.append(data)
                except Exception:
                    pass

    duplicates = _detect_duplicates(all_scenarios)
    total = sum(projects.values()) if projects else len(all_scenarios)

    result = {"projects": projects, "total": total, "duplicates": duplicates}

    # stdout 출력
    print(f"[scenario_runner] 시나리오 통계: total={total}")
    for proj, count in sorted(projects.items()):
        print(f"  {proj}: {count}개")
    if duplicates:
        print(f"[scenario_runner] 중복 시나리오 ID: {', '.join(duplicates)}")

    return result


def run_scenarios(scenarios_dir: str, impact_file: str = "", output_file: str = "") -> dict:
    affected = _load_impact(impact_file)
    all_scenarios = _load_scenarios(scenarios_dir)

    filtered = [s for s in all_scenarios if _matches_affected(s.get("target", []), affected)]

    # type별로 분류: playwright는 순차, 나머지는 병렬
    playwright_scenarios = [s for s in filtered if s.get("type") == "playwright"]
    parallel_scenarios = [s for s in filtered if s.get("type") != "playwright"]

    start = time.time()
    total = len(filtered)
    passed = 0
    failed = 0
    skipped = 0
    failures: list[dict] = []

    # playwright 시나리오가 있으면 TTL 체크 (lazy import)
    if playwright_scenarios:
        try:
            from qc.auth.setup_auth import check_and_refresh_ttl

            needs_refresh = check_and_refresh_ttl()
            if needs_refresh:
                success = _auto_refresh_storage_state()
                if success:
                    print("[scenario_runner] storageState 자동 갱신 완료.")
                else:
                    print(
                        "[scenario_runner] storageState 자동 갱신 실패. 수동으로 setup_auth.py --refresh를 실행하세요."
                    )
        except ImportError:
            pass

    # 병렬 실행: subprocess / curl / pytest 타입
    with ThreadPoolExecutor(max_workers=5) as executor:
        future_map = {executor.submit(_run_scenario, s): s for s in parallel_scenarios}
        for future in as_completed(future_map):
            res = future.result()
            if res["status"] == "passed":
                passed += 1
            elif res["status"] == "skipped":
                skipped += 1
            else:
                failed += 1
                failures.append({"id": res["id"], "reason": res["reason"]})

    # 순차 실행: playwright 타입
    for s in playwright_scenarios:
        res = _run_scenario(s)
        if res["status"] == "passed":
            passed += 1
        elif res["status"] == "skipped":
            skipped += 1
        else:
            failed += 1
            failures.append({"id": res["id"], "reason": res["reason"]})

    # gate 결정: must 우선순위 시나리오 중 FAIL 여부
    must_ids = {s.get("id") for s in filtered if s.get("priority") == "must"}
    failed_must = any(f["id"] in must_ids for f in failures)
    gate = "FAIL" if failed_must else "PASS"

    result: dict[str, Any] = {
        "total": total,
        "passed": passed,
        "failed": failed,
        "skipped": skipped,
        "duration_seconds": round(time.time() - start, 2),
        "failures": failures,
        "gate": gate,
    }

    if output_file:
        try:
            os.makedirs(os.path.dirname(os.path.abspath(output_file)), exist_ok=True)
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
        except OSError:
            pass

    return result


def verify(
    task_id: str,
    scenarios_dir: str = "",
    impact_file: str = "",
    check_files: list | None = None,
    level: int = 0,
) -> dict:
    if not scenarios_dir:
        return {"status": "SKIP", "details": ["scenarios_dir 미지정"]}

    if not os.path.isdir(scenarios_dir):
        if level >= 3:
            return {
                "status": "FAIL",
                "details": [f"시나리오 디렉토리 없음: {scenarios_dir} — Lv.3+ 작업은 시나리오 필수"],
            }
        return {"status": "SKIP", "details": [f"시나리오 디렉토리 없음: {scenarios_dir}"]}

    all_scenarios = _load_scenarios(scenarios_dir)

    if len(all_scenarios) == 0:
        if level >= 3:
            return {"status": "FAIL", "details": ["시나리오 없음: Lv.3+ 작업은 시나리오 필수"]}
        return {"status": "SKIP", "details": ["시나리오 0건: Lv.1-2 작업은 시나리오 생략 허용"]}

    # check_files가 있고 impact_file이 없으면, check_files를 affected 목록으로 사용
    effective_impact = impact_file
    if not impact_file and check_files:
        tmp_impact = os.path.join(scenarios_dir, f".impact-{task_id}.json")
        try:
            with open(tmp_impact, "w", encoding="utf-8") as f:
                json.dump({"affected": check_files}, f)
            effective_impact = tmp_impact
        except OSError:
            pass

    result = run_scenarios(scenarios_dir, impact_file=effective_impact)

    # 임시 impact 파일 정리
    if effective_impact != impact_file and os.path.isfile(effective_impact):
        try:
            os.remove(effective_impact)
        except OSError:
            pass

    if result["gate"] == "FAIL":
        details = [f"{f['id']}: {f['reason']}" for f in result.get("failures", [])]
        return {"status": "FAIL", "details": details or ["gate=FAIL"]}

    total = result["total"]
    passed = result["passed"]

    # Lv.3+ playwright 게이트
    if level >= 3:
        pw_scenarios = [s for s in all_scenarios if s.get("type") == "playwright"]
        if not pw_scenarios:
            return {
                "status": "WARN",
                "details": [
                    f"{passed}/{total} 시나리오 통과",
                    "Lv.3+ 작업이지만 playwright 타입 시나리오 없음 (E2E 검증 권장)",
                ],
            }

    return {"status": "PASS", "details": [f"{passed}/{total} 시나리오 통과 (task: {task_id})"]}


def main() -> None:
    parser = argparse.ArgumentParser(description="시나리오 YAML 실행기")
    parser.add_argument("--scenarios-dir", required=True, help="시나리오 YAML 디렉토리")
    parser.add_argument("--impact", default="", help="impact.json 파일 경로")
    parser.add_argument("--output", default="", help="결과 JSON 파일 경로")
    parser.add_argument("--stats", action="store_true", help="프로젝트별 통계 및 중복 감지 후 종료")
    args = parser.parse_args()

    if args.stats:
        result = show_stats(scenarios_dir=args.scenarios_dir)
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return

    result = run_scenarios(
        scenarios_dir=args.scenarios_dir,
        impact_file=args.impact,
        output_file=args.output,
    )
    print(json.dumps(result, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
