#!/usr/bin/env python3
"""
code-review.py

로컬 패턴 매칭 기반 AI 코드리뷰 (LLM 불필요).
Greptile 대체 구현.

기능:
- 하드코딩된 시크릿 탐지 (API 키, 토큰, 비밀번호)
- TODO/FIXME 잔존 탐지
- 미사용 import 탐지 (Python)
- 과도한 함수 길이 탐지 (>50줄)
- git diff 기반 변경 파일 분석
- JSON 출력

Usage:
    python3 code-review.py
    python3 code-review.py --diff-range HEAD~3..HEAD
    python3 code-review.py --files scripts/foo.py scripts/bar.py
    python3 code-review.py --format json
    python3 code-review.py --format summary
"""

from __future__ import annotations

import argparse
import ast
import json
import re
import subprocess
import sys
from pathlib import Path
from typing import TypedDict

sys.path.insert(0, str(Path(__file__).parent.parent))


# ---------------------------------------------------------------------------
# 타입 정의
# ---------------------------------------------------------------------------


class Finding(TypedDict):
    file: str
    line: int
    severity: str  # critical | warning | info
    category: str
    message: str


class ReviewSummary(TypedDict):
    critical: int
    warning: int
    info: int


class ReviewResult(TypedDict):
    files_analyzed: int
    findings: list[Finding]
    summary: ReviewSummary


# ---------------------------------------------------------------------------
# 시크릿 탐지
# ---------------------------------------------------------------------------

# 탐지할 시크릿 패턴 (변수명 기반)
_SECRET_VAR_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"""(?ix)
        (?:
            api[_\-]?key | secret[_\-]?key | access[_\-]?key | private[_\-]?key |
            auth[_\-]?token | bearer[_\-]?token | password | passwd | pwd |
            token | secret | api[_\-]?secret
        )
        \s*=\s*
        (?P<quote>['"]) (?P<val>[^'"]{4,}) (?P=quote)
        """),
]

# AWS Access Key ID 패턴
_AWS_KEY_PATTERN = re.compile(r"""(AKIA[0-9A-Z]{16})""")

# JWT / Bearer 토큰 (문자열 내 "Bearer <long_token>" 패턴)
_BEARER_PATTERN = re.compile(r"""Bearer\s+(?P<tok>[A-Za-z0-9\-_]{20,}(?:\.[A-Za-z0-9\-_]+)*)""")

# -----BEGIN * KEY----- PEM 헤더
_PEM_PATTERN = re.compile(r"""-----BEGIN\s+[A-Z ]+KEY-----""")

# DB URL with credentials: scheme://user:password@host
_DB_URL_PATTERN = re.compile(r"""(?:postgresql|mysql|mongodb|redis)://[^/\s'"@]+:[^/\s'"@]+@""")

# 환경변수 참조 패턴 (false positive 제외용)
_ENV_REF_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"""os\.environ"""),
    re.compile(r"""os\.getenv"""),
    re.compile(r"""getenv\("""),
]

# 플레이스홀더 패턴 (false positive 제외)
_PLACEHOLDER_PATTERN = re.compile(r"""<[A-Z_]+>""")

# 빈 문자열 제외
_EMPTY_STRING_PATTERN = re.compile(r"""^['"]?\s*['"]?$""")


def _is_env_reference(line: str) -> bool:
    """줄이 환경변수 참조인지 확인"""
    return any(p.search(line) for p in _ENV_REF_PATTERNS)


def _is_placeholder(value: str) -> bool:
    """값이 플레이스홀더인지 확인"""
    return bool(_PLACEHOLDER_PATTERN.search(value))


def check_hardcoded_secrets(filepath: str, content: str) -> list[Finding]:
    """
    하드코딩된 시크릿 탐지.

    Args:
        filepath: 파일 경로
        content: 파일 내용

    Returns:
        Finding 리스트
    """
    findings: list[Finding] = []
    if not content:
        return findings

    lines = content.splitlines()
    for lineno, line in enumerate(lines, start=1):
        # 환경변수 참조는 건너뜀
        if _is_env_reference(line):
            continue

        # 변수명 기반 패턴 탐지
        for pattern in _SECRET_VAR_PATTERNS:
            m = pattern.search(line)
            if m:
                val = m.group("val")
                # 빈 문자열, 플레이스홀더 제외
                if not val or not val.strip() or _is_placeholder(val):
                    continue
                findings.append(
                    Finding(
                        file=filepath,
                        line=lineno,
                        severity="critical",
                        category="hardcoded_secret",
                        message=(
                            f"Possible hardcoded secret found: '{val[:20]}...' "
                            if len(val) > 20
                            else f"Possible hardcoded secret found: '{val}'"
                        ),
                    )
                )
                break  # 한 줄에서 중복 탐지 방지

        # AWS Access Key
        m_aws = _AWS_KEY_PATTERN.search(line)
        if m_aws:
            findings.append(
                Finding(
                    file=filepath,
                    line=lineno,
                    severity="critical",
                    category="hardcoded_secret",
                    message=f"Possible AWS Access Key found: '{m_aws.group(1)}'",
                )
            )

        # Bearer 토큰
        m_bearer = _BEARER_PATTERN.search(line)
        if m_bearer:
            tok = m_bearer.group("tok")
            findings.append(
                Finding(
                    file=filepath,
                    line=lineno,
                    severity="critical",
                    category="hardcoded_secret",
                    message=f"Possible Bearer token found: '{tok[:20]}...'",
                )
            )

        # PEM 헤더
        if _PEM_PATTERN.search(line):
            findings.append(
                Finding(
                    file=filepath,
                    line=lineno,
                    severity="critical",
                    category="hardcoded_secret",
                    message="PEM private key header found",
                )
            )

        # DB URL with credentials
        if _DB_URL_PATTERN.search(line):
            findings.append(
                Finding(
                    file=filepath,
                    line=lineno,
                    severity="critical",
                    category="hardcoded_secret",
                    message="Database URL with embedded credentials found",
                )
            )

    return findings


# ---------------------------------------------------------------------------
# TODO/FIXME 탐지
# ---------------------------------------------------------------------------

_TODO_PATTERN = re.compile(r"""(?i)#.*?\b(TODO|FIXME|HACK|XXX)\b\s*:?\s*(?P<msg>.*)""")


def check_todos(filepath: str, content: str) -> list[Finding]:
    """
    TODO/FIXME/HACK/XXX 잔존 탐지.

    Args:
        filepath: 파일 경로
        content: 파일 내용

    Returns:
        Finding 리스트
    """
    findings: list[Finding] = []
    if not content:
        return findings

    lines = content.splitlines()
    for lineno, line in enumerate(lines, start=1):
        m = _TODO_PATTERN.search(line)
        if m:
            keyword = m.group(1).upper()
            msg_text = m.group("msg").strip()
            display = f"{keyword}: {msg_text}" if msg_text else keyword
            findings.append(
                Finding(
                    file=filepath,
                    line=lineno,
                    severity="info",
                    category="todo",
                    message=f"TODO found: '{display}'",
                )
            )
    return findings


# ---------------------------------------------------------------------------
# 미사용 import 탐지
# ---------------------------------------------------------------------------


def _extract_imports(
    tree: ast.Module,
) -> list[tuple[int, str, str]]:
    """
    AST에서 import 정보 추출.

    Returns:
        (lineno, module_name, alias) 튜플 리스트
        alias: import 시 사용되는 이름 (import X as Y → Y, from X import Y → Y)
    """
    imports: list[tuple[int, str, str]] = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                used_name = alias.asname if alias.asname else alias.name.split(".")[0]
                imports.append((node.lineno, alias.name, used_name))
        elif isinstance(node, ast.ImportFrom):
            for alias in node.names:
                used_name = alias.asname if alias.asname else alias.name
                imports.append((node.lineno, alias.name, used_name))
    return imports


def _get_all_names_in_code(tree: ast.Module, import_lines: set[int]) -> set[str]:
    """
    import 줄을 제외한 코드에서 사용된 모든 이름 수집.
    """
    names: set[str] = set()
    for node in ast.walk(tree):
        if isinstance(node, (ast.Name, ast.Attribute)):
            if isinstance(node, ast.Name):
                names.add(node.id)
            elif isinstance(node, ast.Attribute) and isinstance(node.value, ast.Name):
                names.add(node.value.id)
    return names


def check_unused_imports(filepath: str, content: str) -> list[Finding]:
    """
    미사용 import 탐지 (Python 파일만).

    Args:
        filepath: 파일 경로
        content: 파일 내용

    Returns:
        Finding 리스트
    """
    findings: list[Finding] = []
    if not content:
        return findings

    # Python 파일만 처리
    if not filepath.endswith(".py"):
        return findings

    try:
        tree = ast.parse(content)
    except SyntaxError:
        return findings

    imports = _extract_imports(tree)
    if not imports:
        return findings

    import_lines = {lineno for lineno, _, _ in imports}
    used_names = _get_all_names_in_code(tree, import_lines)

    for lineno, module_name, used_name in imports:
        if used_name not in used_names:
            findings.append(
                Finding(
                    file=filepath,
                    line=lineno,
                    severity="warning",
                    category="unused_import",
                    message=f"Unused import: '{used_name}' (from '{module_name}')",
                )
            )

    return findings


# ---------------------------------------------------------------------------
# 함수 길이 탐지
# ---------------------------------------------------------------------------


def check_function_length(filepath: str, content: str, max_lines: int = 50) -> list[Finding]:
    """
    과도한 함수 길이 탐지 (Python 파일만, >max_lines).

    Args:
        filepath: 파일 경로
        content: 파일 내용
        max_lines: 허용 최대 줄 수 (이 값 초과 시 탐지)

    Returns:
        Finding 리스트
    """
    findings: list[Finding] = []
    if not content:
        return findings

    # Python 파일만 처리
    if not filepath.endswith(".py"):
        return findings

    try:
        tree = ast.parse(content)
    except SyntaxError:
        return findings

    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            start_line = node.lineno
            end_line = node.end_lineno if node.end_lineno is not None else start_line
            func_lines = end_line - start_line + 1
            if func_lines > max_lines:
                findings.append(
                    Finding(
                        file=filepath,
                        line=start_line,
                        severity="warning",
                        category="long_function",
                        message=(f"Function '{node.name}' is {func_lines} lines long " f"(max: {max_lines})"),
                    )
                )

    return findings


# ---------------------------------------------------------------------------
# 파일 리뷰
# ---------------------------------------------------------------------------


def review_file(filepath: str) -> list[Finding]:
    """
    단일 파일에 대해 모든 체커를 실행.

    Args:
        filepath: 파일 경로

    Returns:
        Finding 리스트
    """
    path = Path(filepath)
    if not path.exists():
        return []

    # 바이너리 파일 감지
    try:
        content = path.read_text(encoding="utf-8", errors="strict")
    except (UnicodeDecodeError, PermissionError):
        # 바이너리 파일 또는 읽기 불가 파일 → 건너뜀
        try:
            content = path.read_text(encoding="latin-1")
            # 바이너리 휴리스틱: null bytes 포함 시 바이너리로 간주
            if "\x00" in content:
                return []
        except Exception:
            return []

    findings: list[Finding] = []
    findings.extend(check_hardcoded_secrets(filepath, content))
    findings.extend(check_todos(filepath, content))
    findings.extend(check_unused_imports(filepath, content))
    findings.extend(check_function_length(filepath, content))
    return findings


# ---------------------------------------------------------------------------
# 전체 리뷰 집계
# ---------------------------------------------------------------------------


def review_all(files: list[str]) -> ReviewResult:
    """
    여러 파일에 대해 리뷰를 실행하고 결과를 집계.

    Args:
        files: 파일 경로 리스트

    Returns:
        ReviewResult
    """
    all_findings: list[Finding] = []
    for filepath in files:
        all_findings.extend(review_file(filepath))

    summary = ReviewSummary(
        critical=sum(1 for f in all_findings if f["severity"] == "critical"),
        warning=sum(1 for f in all_findings if f["severity"] == "warning"),
        info=sum(1 for f in all_findings if f["severity"] == "info"),
    )

    return ReviewResult(
        files_analyzed=len(files),
        findings=all_findings,
        summary=summary,
    )


# ---------------------------------------------------------------------------
# git diff 기반 변경 파일 목록
# ---------------------------------------------------------------------------


def get_changed_files(diff_range: str = "HEAD") -> list[str]:
    """
    git diff 기반으로 변경된 파일 목록 반환.

    Args:
        diff_range: git diff 범위 (예: "HEAD", "HEAD~3..HEAD", "--cached")

    Returns:
        변경된 파일 경로 리스트 (git 실패 시 빈 리스트)
    """
    try:
        cmd = ["git", "diff", "--name-only", diff_range]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
        )
        files = [f.strip() for f in result.stdout.splitlines() if f.strip()]
        return files
    except (subprocess.CalledProcessError, FileNotFoundError, OSError):
        return []


# ---------------------------------------------------------------------------
# 출력 포매터
# ---------------------------------------------------------------------------


def format_json(result: ReviewResult) -> str:
    """JSON 형식으로 출력"""
    return json.dumps(result, indent=2, ensure_ascii=False)


def format_summary(result: ReviewResult) -> str:
    """요약 형식으로 출력"""
    lines = [
        f"Files analyzed: {result['files_analyzed']}",
        f"Critical: {result['summary']['critical']}",
        f"Warning:  {result['summary']['warning']}",
        f"Info:     {result['summary']['info']}",
        f"Total:    {result['summary']['critical'] + result['summary']['warning'] + result['summary']['info']}",
    ]
    if result["findings"]:
        lines.append("")
        lines.append("Findings:")
        for f in result["findings"]:
            lines.append(f"  [{f['severity'].upper():8}] {f['file']}:{f['line']} " f"({f['category']}) {f['message']}")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def _build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="로컬 패턴 매칭 기반 코드리뷰 (LLM 불필요)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        "--diff-range",
        default=None,
        help="git diff 범위 (예: HEAD~3..HEAD, --cached). 기본: HEAD",
    )
    group.add_argument(
        "--files",
        nargs="+",
        metavar="FILE",
        help="직접 지정할 파일 경로",
    )
    parser.add_argument(
        "--format",
        choices=["json", "summary"],
        default="json",
        help="출력 형식 (기본: json)",
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    parser = _build_parser()
    args = parser.parse_args(argv)

    # 분석 대상 파일 결정
    if args.files:
        files = list(args.files)
    else:
        diff_range = args.diff_range if args.diff_range else "HEAD"
        files = get_changed_files(diff_range)

    if not files:
        print("분석할 파일이 없습니다.", file=sys.stderr)
        result = review_all([])
    else:
        result = review_all(files)

    # 출력
    if args.format == "summary":
        print(format_summary(result))
    else:
        print(format_json(result))

    # 종료 코드: critical 발견 시 1
    return 1 if result["summary"]["critical"] > 0 else 0


if __name__ == "__main__":
    sys.exit(main())
