"""
schema_contract.py - Worker 스키마 계약 검증 verifier
workers/ 디렉토리를 자동 감지하여 SC-1 ~ SC-8 항목 검증

SC-1: Worker 디렉토리에 models.py 존재 확인
SC-2: Worker 디렉토리에 tests/test_contract.py 존재 확인
SC-3: shared/schemas/{name}.schema.json 존재 확인
SC-4: sample.normal.json → JSON Schema 검증
SC-5: sample.edge.json → JSON Schema 검증 (없으면 WARN)
SC-6: Pydantic 모델 필드 ↔ JSON Schema properties 일치 확인
SC-7: sample.json mtime vs .schema.ts mtime 비교 (sample이 더 새로우면 WARN)
SC-8: Pydantic v1 문법(schema()) 사용 탐지 (WARN)
"""

import glob
import importlib.util
import json
import os
import re
import sys
from typing import Optional


DEFAULT_TEAMS_BASE = "/home/jay/workspace/teams"
DEFAULT_SCHEMAS_DIR = "/home/jay/workspace/teams/shared/schemas"


# ─── 내부 헬퍼 ────────────────────────────────────────────────────────────────


def _try_import_jsonschema():
    """jsonschema 라이브러리를 임포트 시도. 없으면 None 반환."""
    try:
        import jsonschema  # noqa: PLC0415
        return jsonschema
    except ImportError:
        return None


def _find_worker_dirs(workers_base_dir: str) -> list[dict]:
    """
    workers_base_dir 하위에서 models.py 가 있는 디렉토리를 Worker로 감지.
    반환: [{"dir": "/abs/path/info_keyword", "name": "info_keyword"}, ...]
    """
    pattern = os.path.join(workers_base_dir, "*", "models.py")
    workers = []
    for models_path in glob.glob(pattern):
        worker_dir = os.path.dirname(models_path)
        worker_name = os.path.basename(worker_dir)
        workers.append({"dir": worker_dir, "name": worker_name})

    # 한 단계 더 깊이도 탐색 (예: teams/dev1/workers/info_keyword/models.py)
    pattern2 = os.path.join(workers_base_dir, "*", "*", "models.py")
    for models_path in glob.glob(pattern2):
        worker_dir = os.path.dirname(models_path)
        worker_name = os.path.basename(worker_dir)
        # 중복 방지
        if not any(w["dir"] == worker_dir for w in workers):
            workers.append({"dir": worker_dir, "name": worker_name})

    return workers


def _read_schema_name_override(worker_dir: str) -> Optional[str]:
    """
    models.py 에서 __schema_name__ 변수를 파싱. 없으면 None.
    정규식으로 파싱하여 importlib 없이도 동작.
    """
    models_path = os.path.join(worker_dir, "models.py")
    try:
        with open(models_path, "r", encoding="utf-8") as f:
            content = f.read()
        m = re.search(r'^__schema_name__\s*=\s*["\']([^"\']+)["\']', content, re.MULTILINE)
        if m:
            return m.group(1)
    except OSError:
        pass
    return None


def _load_json_file(path: str) -> tuple[Optional[dict], str]:
    """JSON 파일 로드. (data, error_msg) 반환. 실패 시 data=None."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f), ""
    except FileNotFoundError:
        return None, f"File not found: {path}"
    except json.JSONDecodeError as e:
        return None, f"JSON parse error in {path}: {e}"
    except OSError as e:
        return None, f"OS error reading {path}: {e}"


def _validate_json_against_schema(
    data: dict, schema: dict, jsonschema_mod, sample_path: str
) -> tuple[bool, str]:
    """jsonschema로 data를 schema에 대해 검증. (ok, detail_msg) 반환."""
    try:
        validator_cls = jsonschema_mod.Draft7Validator
        validator = validator_cls(schema)
        errors = sorted(validator.iter_errors(data), key=lambda e: list(e.path))
        if errors:
            msgs = [f"{'.'.join(str(p) for p in e.path) or '<root>'}: {e.message}" for e in errors[:5]]
            return False, f"Schema validation FAILED for {sample_path}: " + "; ".join(msgs)
        return True, f"Schema validation PASSED: {sample_path}"
    except Exception as e:
        return False, f"jsonschema error validating {sample_path}: {type(e).__name__}: {e}"


def _get_pydantic_fields(worker_dir: str, worker_name: str) -> tuple[Optional[set], str]:
    """
    Dynamically import the worker's models.py and collect its Pydantic field names.

    Args:
        worker_dir: Absolute path of the worker directory containing models.py.
        worker_name: Worker name, used to build a unique throwaway module name.

    Returns:
        (fields_set_or_None, detail_msg): the union of all discovered models'
        property names, or None (with an explanatory message) when the import
        or inspection fails for any reason.
    """
    models_path = os.path.join(worker_dir, "models.py")
    # Unique module name per worker avoids clashes between workers' models modules.
    module_name = f"_sc_worker_{worker_name}_models"

    try:
        spec = importlib.util.spec_from_file_location(module_name, models_path)
        if spec is None:
            return None, f"Cannot create module spec for {models_path}"
        mod = importlib.util.module_from_spec(spec)

        # Temporarily add worker_dir to sys.path so models.py can resolve
        # sibling imports during exec_module.
        added_path = False
        if worker_dir not in sys.path:
            sys.path.insert(0, worker_dir)
            added_path = True
        try:
            spec.loader.exec_module(mod)  # type: ignore[union-attr]
        finally:
            # Only remove the entry we added; never disturb a pre-existing one.
            if added_path and worker_dir in sys.path:
                sys.path.remove(worker_dir)

        # Look for Pydantic BaseModel subclasses defined in this module.
        try:
            from pydantic import BaseModel  # noqa: PLC0415
        except ImportError:
            return None, "pydantic is not installed; cannot inspect model fields"

        model_classes = []
        for attr_name in dir(mod):
            attr = getattr(mod, attr_name)
            try:
                # The __module__ check filters out models merely imported
                # into models.py from elsewhere.
                if (
                    isinstance(attr, type)
                    and issubclass(attr, BaseModel)
                    and attr is not BaseModel
                    and attr.__module__ == module_name
                ):
                    model_classes.append((attr_name, attr))
            except TypeError:
                # issubclass can raise TypeError for exotic attributes.
                pass

        if not model_classes:
            return None, f"No Pydantic BaseModel subclass found in {models_path}"

        # Use every discovered model; the result is the union of their fields.
        all_fields: set = set()
        used_names = []
        for cls_name, cls in model_classes:
            try:
                schema_dict = cls.model_json_schema()  # Pydantic v2
            except AttributeError:
                try:
                    schema_dict = cls.schema()  # Pydantic v1 fallback
                except Exception:
                    schema_dict = {}
            props = set(schema_dict.get("properties", {}).keys())
            all_fields |= props
            used_names.append(cls_name)

        if not all_fields:
            return None, f"No fields found in models {used_names} in {models_path}"

        return all_fields, f"Pydantic model(s) {used_names} loaded from {models_path}"

    except Exception as e:
        return None, f"Dynamic import FAILED for {models_path}: {type(e).__name__}: {e}"


def _detect_pydantic_v1_usage(worker_dir: str) -> list[str]:
    """
    Worker 디렉토리의 .py 파일에서 Pydantic v1 문법 (`.schema()`) 사용을 탐지.
    발견된 위치 목록 반환.
    """
    hits = []
    py_files = glob.glob(os.path.join(worker_dir, "**", "*.py"), recursive=True)
    # .schema() 호출 패턴: 모델명.schema() 또는 Model.schema(
    pattern = re.compile(r'\bschema\(\s*\)')
    for py_file in py_files:
        try:
            with open(py_file, "r", encoding="utf-8", errors="ignore") as f:
                for lineno, line in enumerate(f, 1):
                    if pattern.search(line):
                        rel = os.path.relpath(py_file, worker_dir)
                        hits.append(f"{rel}:{lineno}: {line.rstrip()}")
        except OSError:
            pass
    return hits


# ─── 단일 Worker 검증 ─────────────────────────────────────────────────────────


def _verify_worker(
    worker: dict,
    schemas_dir: str,
    jsonschema_mod,
    jsonschema_missing: bool,
) -> tuple[str, list[str]]:
    """
    Run checks SC-1 through SC-8 for a single worker.

    Args:
        worker: {"dir": abs_path, "name": worker_name} as produced by
            _find_worker_dirs().
        schemas_dir: Directory holding the shared *.schema.json files.
        jsonschema_mod: Imported jsonschema module, or None when unavailable.
        jsonschema_missing: True when jsonschema is unavailable; SC-4/SC-5
            are then skipped.

    Returns:
        (overall_status, details_list) where overall_status is "FAIL" when
        any check failed, else "WARN" when any check warned, else "PASS".
    """
    worker_dir = worker["dir"]
    worker_name = worker["name"]
    details: list[str] = []
    has_fail = False
    has_warn = False

    # Resolve the schema base name: an explicit __schema_name__ override in
    # models.py wins; otherwise fall back to the worker directory name.
    schema_name_override = _read_schema_name_override(worker_dir)
    if schema_name_override:
        schema_base = schema_name_override
        details.append(f"Worker '{worker_name}': schema name override → '{schema_base}'")
    else:
        schema_base = worker_name
        details.append(f"Worker '{worker_name}': schema name = '{schema_base}' (from dir name)")

    schema_json_path = os.path.join(schemas_dir, f"{schema_base}.schema.json")

    # ── SC-1: models.py exists ───────────────────────────────────────────────
    models_py = os.path.join(worker_dir, "models.py")
    if os.path.isfile(models_py):
        details.append(f"SC-1 PASS: models.py found: {models_py}")
    else:
        details.append(f"SC-1 FAIL: models.py MISSING: {models_py}")
        has_fail = True

    # ── SC-2: tests/test_contract.py exists ──────────────────────────────────
    test_contract_py = os.path.join(worker_dir, "tests", "test_contract.py")
    if os.path.isfile(test_contract_py):
        details.append(f"SC-2 PASS: tests/test_contract.py found: {test_contract_py}")
    else:
        details.append(f"SC-2 FAIL: tests/test_contract.py MISSING: {test_contract_py}")
        has_fail = True

    # ── SC-3: shared/schemas/{name}.schema.json exists and parses ────────────
    # schema_data stays None on any SC-3 failure; SC-4/SC-5/SC-6 key off that
    # and skip themselves.
    schema_data = None
    if os.path.isfile(schema_json_path):
        details.append(f"SC-3 PASS: schema file found: {schema_json_path}")
        schema_data, schema_err = _load_json_file(schema_json_path)
        if schema_err:
            details.append(f"SC-3 FAIL: {schema_err}")
            has_fail = True
            schema_data = None
    else:
        details.append(f"SC-3 FAIL: schema file MISSING: {schema_json_path}")
        has_fail = True

    # ── SC-4: validate sample.normal.json against the JSON Schema ────────────
    normal_json_path = os.path.join(schemas_dir, f"{schema_base}.sample.normal.json")
    if jsonschema_missing:
        details.append(
            "SC-4 SKIP: jsonschema library not installed. "
            "Install with: pip install jsonschema"
        )
    elif schema_data is None:
        details.append("SC-4 SKIP: schema not loaded (SC-3 failed), cannot validate")
    elif not os.path.isfile(normal_json_path):
        details.append(f"SC-4 FAIL: sample.normal.json MISSING: {normal_json_path}")
        has_fail = True
    else:
        sample_data, sample_err = _load_json_file(normal_json_path)
        if sample_err:
            details.append(f"SC-4 FAIL: {sample_err}")
            has_fail = True
        else:
            ok, msg = _validate_json_against_schema(
                sample_data, schema_data, jsonschema_mod, normal_json_path
            )
            details.append(f"SC-4 {'PASS' if ok else 'FAIL'}: {msg}")
            if not ok:
                has_fail = True

    # ── SC-5: validate sample.edge.json (missing file is only a WARN) ────────
    edge_json_path = os.path.join(schemas_dir, f"{schema_base}.sample.edge.json")
    if jsonschema_missing:
        details.append(
            "SC-5 SKIP: jsonschema library not installed. "
            "Install with: pip install jsonschema"
        )
    elif schema_data is None:
        details.append("SC-5 SKIP: schema not loaded (SC-3 failed), cannot validate")
    elif not os.path.isfile(edge_json_path):
        details.append(f"SC-5 WARN: sample.edge.json NOT FOUND: {edge_json_path}")
        has_warn = True
    else:
        sample_data, sample_err = _load_json_file(edge_json_path)
        if sample_err:
            details.append(f"SC-5 FAIL: {sample_err}")
            has_fail = True
        else:
            ok, msg = _validate_json_against_schema(
                sample_data, schema_data, jsonschema_mod, edge_json_path
            )
            details.append(f"SC-5 {'PASS' if ok else 'FAIL'}: {msg}")
            if not ok:
                has_fail = True

    # ── SC-6: Pydantic model fields ↔ JSON Schema properties must match ──────
    if schema_data is None:
        details.append("SC-6 SKIP: schema not loaded (SC-3 failed), cannot compare")
    else:
        schema_props = set(schema_data.get("properties", {}).keys())
        if not schema_props:
            details.append(
                f"SC-6 WARN: JSON Schema has no 'properties' defined in {schema_json_path}"
            )
            has_warn = True
        else:
            pydantic_fields, import_msg = _get_pydantic_fields(worker_dir, worker_name)
            if pydantic_fields is None:
                details.append(f"SC-6 FAIL: Cannot inspect Pydantic model — {import_msg}")
                has_fail = True
            else:
                details.append(f"SC-6 info: {import_msg}")
                only_in_schema = schema_props - pydantic_fields
                only_in_model = pydantic_fields - schema_props
                if only_in_schema or only_in_model:
                    mismatch_parts = []
                    if only_in_schema:
                        mismatch_parts.append(
                            f"in schema only: {sorted(only_in_schema)}"
                        )
                    if only_in_model:
                        mismatch_parts.append(
                            f"in model only: {sorted(only_in_model)}"
                        )
                    details.append(
                        f"SC-6 FAIL: Field mismatch — {'; '.join(mismatch_parts)}"
                    )
                    has_fail = True
                else:
                    details.append(
                        f"SC-6 PASS: Pydantic fields match JSON Schema properties "
                        f"({len(schema_props)} fields)"
                    )

    # ── SC-7: compare sample.json mtimes against .schema.ts (WARN if newer) ──
    ts_schema_path = os.path.join(schemas_dir, f"{schema_base}.schema.ts")
    sample_paths_for_mtime = [
        p for p in [normal_json_path, edge_json_path] if os.path.isfile(p)
    ]
    if not os.path.isfile(ts_schema_path):
        details.append(
            f"SC-7 SKIP: .schema.ts not found ({ts_schema_path}), skipping mtime check"
        )
    elif not sample_paths_for_mtime:
        details.append("SC-7 SKIP: no sample.json files found for mtime check")
    else:
        try:
            ts_mtime = os.path.getmtime(ts_schema_path)
            stale_samples = []
            for sp in sample_paths_for_mtime:
                s_mtime = os.path.getmtime(sp)
                if s_mtime > ts_mtime:
                    diff = s_mtime - ts_mtime
                    stale_samples.append(
                        f"{os.path.basename(sp)} is {diff:.0f}s newer than {os.path.basename(ts_schema_path)}"
                    )
            if stale_samples:
                details.append(
                    "SC-7 WARN: sample file(s) newer than .schema.ts — "
                    + "; ".join(stale_samples)
                )
                has_warn = True
            else:
                details.append(
                    "SC-7 PASS: .schema.ts is up-to-date relative to sample files"
                )
        except OSError as e:
            details.append(f"SC-7 WARN: mtime check error: {e}")
            has_warn = True

    # ── SC-8: detect Pydantic v1 syntax (.schema()) — WARN only ─────────────
    v1_hits = _detect_pydantic_v1_usage(worker_dir)
    if v1_hits:
        details.append(
            f"SC-8 WARN: Pydantic v1 .schema() usage detected in '{worker_name}' "
            f"({len(v1_hits)} occurrence(s)):"
        )
        # Cap the listing at 10 hits to keep the report readable.
        for hit in v1_hits[:10]:
            details.append(f"  {hit}")
        has_warn = True
    else:
        details.append(f"SC-8 PASS: No Pydantic v1 .schema() usage detected in '{worker_name}'")

    # Final status: FAIL outranks WARN outranks PASS.
    if has_fail:
        status = "FAIL"
    elif has_warn:
        status = "WARN"
    else:
        status = "PASS"

    return status, details


# ─── 공개 인터페이스 ──────────────────────────────────────────────────────────


def verify(
    task_id: str,
    workers_base_dir: Optional[str] = None,
    schemas_dir: Optional[str] = None,
) -> dict:
    """
    Verify the Worker schema contract (checks SC-1 through SC-8).

    Args:
        task_id: Task ID being verified (used for logging only).
        workers_base_dir: Base path for worker discovery.
                          Default: auto-discovery under /home/jay/workspace/teams.
        schemas_dir: Directory holding the JSON Schema files.
                     Default: /home/jay/workspace/teams/shared/schemas/.

    Returns:
        {"status": "PASS"|"FAIL"|"WARN"|"SKIP", "details": [...]}
    """
    base_dir = workers_base_dir if workers_base_dir is not None else DEFAULT_TEAMS_BASE
    schema_dir = schemas_dir if schemas_dir is not None else DEFAULT_SCHEMAS_DIR

    details: list[str] = [
        f"Schema contract check for task: {task_id}",
        f"Workers base dir: {base_dir}",
        f"Schemas dir: {schema_dir}",
    ]

    # Warn (but continue) when the schemas directory itself is missing.
    if not os.path.isdir(schema_dir):
        details.append(f"WARN: schemas_dir not found: {schema_dir}")

    # Probe jsonschema availability once, shared by all workers.
    jsonschema_mod = _try_import_jsonschema()
    jsonschema_missing = jsonschema_mod is None
    if jsonschema_missing:
        details.append(
            "WARN: jsonschema library not installed. "
            "SC-4 and SC-5 will be SKIPPED. Install with: pip install jsonschema"
        )

    # Auto-detect workers; no workers at all means the whole check is a SKIP.
    workers = _find_worker_dirs(base_dir)
    if not workers:
        details.append(
            f"SKIP: No workers found (no models.py) under {base_dir}"
        )
        return {"status": "SKIP", "details": details}

    details.append(f"Found {len(workers)} worker(s): {[w['name'] for w in workers]}")

    # Verify each worker and collect per-worker statuses.
    statuses: list[str] = []
    for worker in workers:
        details.append(f"\n--- Worker: {worker['name']} ({worker['dir']}) ---")
        w_status, w_details = _verify_worker(
            worker=worker,
            schemas_dir=schema_dir,
            jsonschema_mod=jsonschema_mod,
            jsonschema_missing=jsonschema_missing,
        )
        details.extend(w_details)
        details.append(f"--- Worker '{worker['name']}' result: {w_status} ---")
        statuses.append(w_status)

    # Aggregate: any FAIL dominates, then any WARN, else PASS.
    if "FAIL" in statuses:
        final_status = "FAIL"
    elif "WARN" in statuses:
        final_status = "WARN"
    else:
        final_status = "PASS"

    return {"status": final_status, "details": details}


if __name__ == "__main__":
    # CLI usage: schema_contract.py [task_id] [workers_base_dir] [schemas_dir]
    # sys and json are already imported at module level; no re-import needed.
    task = sys.argv[1] if len(sys.argv) > 1 else "test"
    wdir = sys.argv[2] if len(sys.argv) > 2 else None
    sdir = sys.argv[3] if len(sys.argv) > 3 else None

    result = verify(task, workers_base_dir=wdir, schemas_dir=sdir)
    print(json.dumps(result, ensure_ascii=False, indent=2))
