"""
git_evidence.py - git 커밋 증거 검증 verifier
task-2031: 코드 커밋 없이 .done 생성 방지

3가지 검증:
1. COMMIT_EXISTS: task ID가 포함된 커밋 최소 1건
2. NO_UNCOMMITTED: uncommitted 변경 없음
3. NON_EMPTY_COMMIT: 마지막 커밋이 빈 커밋이 아님

non-code task (문서만/리서치만) → SKIP
"""

import os
import re
import subprocess
import sys
from typing import Optional

# 워크스페이스 루트를 sys.path 에 등록하여 ``utils.task_id_parser`` 를 안정적으로 import.
# (Gemini PR #47 medium 대응 — 함수 내부 재계산 제거)
_WORKSPACE_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
if _WORKSPACE_ROOT not in sys.path:
    sys.path.insert(0, _WORKSPACE_ROOT)

from utils.task_id_parser import (  # type: ignore[import-not-found]  # noqa: E402
    _TASK_ID_STRICT_PATTERN,
    parse_task_id_v2,
)

# 시스템이 자동으로 생성/수정하는 파일 패턴 (git_evidence 검사에서 제외)
SYSTEM_AUTO_FILES = [
    "memory/heartbeats/",
    "memory/events/",
    "logs/",
    "memory/daily/",
    "whisper/",
    "bot-activity.json",
    "token-ledger.json",
    "memory/pipeline-status.json",
    "memory/preview-state.json",
    "memory/merge-log.json",
    "memory/bot_settings_sync.json",
    "memory/memory-check-log.json",
    "dashboard/data/refine-status.json",
    "dashboard/data/refine-history.json",
    ".heartbeat",
    "memory/.task-counter",
    "memory/task-timers.json",
    "memory/logs/",
    "memory/reports/",
    "memory/tasks/",
    "scripts/gemini_rate_tracker.json",
    "tests/coverage-report.txt",
    "memory/canary-status.json",
    "dashboard/data/medium-comments-log.json",
    "config/constants.json",
]


def _is_system_auto_file(filepath: str) -> bool:
    """시스템이 자동 생성/수정하는 파일인지 판별."""
    normalized = filepath.replace("\\", "/")
    for pattern in SYSTEM_AUTO_FILES:
        if pattern.startswith("."):
            # 확장자 패턴 (예: .heartbeat)
            if normalized.endswith(pattern):
                return True
        elif pattern.endswith("/"):
            # 디렉토리 패턴 (예: memory/heartbeats/)
            if pattern in normalized or normalized.startswith(pattern):
                return True
        else:
            # 파일명 패턴 (예: bot-activity.json)
            if normalized.endswith("/" + pattern) or normalized == pattern:
                return True
    return False


def _is_non_code_task(task_id: str, workspace_root: str) -> bool:
    """task 파일의 ## 레벨 섹션에서 non-code 키워드 판별."""
    task_path = os.path.join(workspace_root, "memory", "tasks", f"{task_id}.md")
    if not os.path.isfile(task_path):
        return False
    try:
        with open(task_path, "r", encoding="utf-8") as f:
            content = f.read()
        # ## 레벨 섹션 내에서만 검사 (본문 설명 텍스트 오탐 방지)
        m = re.search(r"## 레벨\s*\n(.*?)(?=\n## |\Z)", content, re.DOTALL)
        if not m:
            return False
        section = m.group(1)
        return bool(re.search(r"코드 수정 없음|문서 업데이트만|문서만|리서치만", section))
    except OSError:
        return False


def _run_git(args: list[str], cwd: str) -> subprocess.CompletedProcess:
    """git 명령 실행 헬퍼."""
    return subprocess.run(
        ["git"] + args,
        cwd=cwd,
        capture_output=True,
        text=True,
        timeout=30,
    )


def _find_project_root(workspace_root: str) -> str:
    """git rev-parse로 프로젝트 루트 탐지."""
    try:
        result = _run_git(["rev-parse", "--show-toplevel"], workspace_root)
        if result.returncode == 0:
            return result.stdout.strip()
    except (subprocess.TimeoutExpired, OSError):
        pass
    return workspace_root


def _resolve_project_dir_with_source(task_id: str, workspace_root: str) -> tuple:
    """워크트리 경로를 우선순위에 따라 자동 탐지하고, 어떤 소스로 결정됐는지도 반환.

    Returns:
        (project_dir: str, source: str)
        source 라벨: "env_var" | "task_timers" | "task_file" | "fallback"

    우선순위:
    (A) 환경변수 PROJECT_PATH 또는 WORKTREE_PATH + .git 존재  → source="env_var"
    (B) task-timers.json의 worktree_path 또는 project_path   → source="task_timers"
    (C) task 파일에서 "워크트리 경로:" 또는 "프로젝트:" 라인  → source="task_file"
    (D) 폴백: workspace_root에서 git rev-parse              → source="fallback"
    """
    # (A) 환경변수 우선
    for env_var in ("PROJECT_PATH", "WORKTREE_PATH"):
        env_path = os.environ.get(env_var, "").strip()
        if env_path and os.path.isdir(os.path.join(env_path, ".git")):
            return (env_path, "env_var")
        if env_path and os.path.isdir(env_path):
            # .git 파일(worktree 링크 파일)도 허용
            git_file = os.path.join(env_path, ".git")
            if os.path.exists(git_file):
                return (env_path, "env_var")

    # (B) task-timers.json에서 worktree_path / project_path
    timers_path = os.path.join(workspace_root, "memory", "task-timers.json")
    try:
        import json as _json
        with open(timers_path, "r", encoding="utf-8") as f:
            timers = _json.load(f)
        entry = timers.get("tasks", {}).get(task_id, {})
        for field in ("worktree_path", "project_path"):
            candidate = entry.get(field, "").strip()
            if candidate and os.path.isdir(candidate):
                return (candidate, "task_timers")
    except (OSError, ValueError, KeyError):
        pass

    # (C) task 파일에서 "워크트리 경로:" 또는 "프로젝트:" 라인 파싱
    task_path = os.path.join(workspace_root, "memory", "tasks", f"{task_id}.md")
    try:
        with open(task_path, "r", encoding="utf-8") as f:
            for line in f:
                for prefix in ("워크트리 경로:", "프로젝트:"):
                    if prefix in line:
                        candidate = line.split(prefix, 1)[1].strip().strip("`\"'")
                        if candidate and os.path.isdir(candidate):
                            return (candidate, "task_file")
    except OSError:
        pass

    # (D) 폴백: workspace_root에서 git rev-parse
    return (_find_project_root(workspace_root), "fallback")


def _resolve_project_dir(task_id: str, workspace_root: str) -> str:
    """워크트리 경로를 우선순위에 따라 자동 탐지.

    하위 호환 유지: ``_resolve_project_dir_with_source`` 에 위임.
    외부 호출자: ``tests/dev3/test_verifier_fix_pack.py``,
    ``teams/dev7/qc/tests/test_finish_loop_fix.py`` 등.
    """
    return _resolve_project_dir_with_source(task_id, workspace_root)[0]


# 하위 호환 export — 외부에서 ``from ... import _resolve_project_dir`` 형태로 참조하므로
# Pyright의 ``not accessed`` false-positive 회피용 더미 참조.
__all_legacy_exports__ = (_resolve_project_dir,)


def _filter_dirty_to_task_scope(files: list, task_id: str) -> list:
    """dirty 파일 목록 중 현재 task 범위에 해당하는 것만 반환.

    경로 안에 등장하는 task id를 ``utils.task_id_parser`` 의 경계 강화 loose
    패턴으로 추출한 뒤, 추출된 id 가 다음 중 하나에 일치하면 scope 내로 인정한다.

    - 현재 task id 와 정확히 동일 (예: ``task-2472+1`` ↔ ``task-2472+1``)
    - 현재 task id 의 base 와 동일 (예: ``task-2472+1`` ↔ ``task-2472``)
    - 현재 base 가 다른 retry suffix 와 함께 등장 (예: ``task-2472`` ↔ ``task-2472+1``)

    부분문자열 매칭은 사용하지 않으므로 ``task-2472+1`` 검사 시
    ``task-2472+10`` / ``task-24720`` / ``foo-task-2472-bar`` 같은
    경계 위반 케이스가 본 task scope 로 오인되지 않는다.
    """
    # Gemini PR #47 high #1 대응: ``num`` raw 그룹을 명시적으로 사용해 base 구성.
    # parse_task_id_v2 가 매치 실패 시 num=None → fallback to task_id 그대로 사용.
    parsed = parse_task_id_v2(task_id)
    raw_num = parsed.get("num")
    target_base = f"task-{raw_num}" if raw_num else task_id

    result = []
    for f in files:
        # 파일 경로 전체에서 가능한 모든 task id 후보를 추출 (경계 강화 strict 패턴)
        matched = False
        for m in _TASK_ID_STRICT_PATTERN.finditer(f):
            candidate_full = m.group(0)
            # 정확 매치: 현재 task id 와 후보 전체 일치
            if candidate_full == task_id:
                matched = True
                break
            # base 만 후보: 본 task 의 base (예: task-2472+1 의 task-2472) 와 동일
            # task-2472+10 같이 다른 retry/phase 가 붙은 후보는 다른 task 로 간주.
            if candidate_full == target_base:
                matched = True
                break
        if matched:
            result.append(f)
    return result


def _safe_grep_pattern(task_id: str) -> str:
    """grep 패턴 반환. git log는 --fixed-strings 플래그와 함께 호출하므로 escape 불필요.

    fallback (--fixed-strings 미지원 환경)을 위해 BRE 메타문자 escape 결과도 함께 제공.
    호출부에서는 보통 task_id 원본을 --fixed-strings 와 같이 사용한다.
    """
    return task_id


def _get_merge_commit_from_timers(task_id: str, workspace_root: str) -> Optional[str]:
    """task-timers.json 및 events 폴백에서 merge commit SHA 검색."""
    import json as _json

    # 1) task-timers.json
    timers_path = os.path.join(workspace_root, "memory", "task-timers.json")
    try:
        with open(timers_path, "r", encoding="utf-8") as f:
            data = _json.load(f)
        entry = data.get("tasks", {}).get(task_id, {})
        for key in ("merge_commit", "pr_merge_commit", "mergeCommit", "merged_commit_sha"):
            val = entry.get(key, "")
            if val and val.strip():
                return val.strip()
    except (OSError, ValueError, KeyError):
        pass

    # 2) events 폴백
    events_dir = os.path.join(workspace_root, "memory", "events")
    candidate_files = [
        os.path.join(events_dir, f"{task_id}.classifications"),
        os.path.join(events_dir, f"{task_id}.essence-pass-escalated-verifier-limitation"),
    ]
    for fpath in candidate_files:
        try:
            with open(fpath, "r", encoding="utf-8") as f:
                content = f.read().strip()
            if not content:
                continue
            try:
                obj = _json.loads(content)
            except ValueError:
                continue
            # multi-path 검색
            for path_keys in (
                ("merge_evidence", "merge_commit"),
                ("amendment_not_enforced_evidence", "task_2503_merge_commit"),
                ("merge_commit",),
                ("mergeCommit",),
                ("merged_commit_sha",),
            ):
                val = obj
                try:
                    for k in path_keys:
                        val = val[k]
                    if isinstance(val, str) and val.strip():
                        return val.strip()
                except (KeyError, TypeError):
                    pass
        except OSError:
            continue

    return None


def _search_commit_in_workspaces(
    task_id: str, workspace_root: str
) -> tuple[list[str], str]:
    """여러 workspace 디렉토리에서 task_id 커밋 검색.

    Returns:
        (commit_lines, found_dir) — 첫 번째로 1건 이상 발견된 디렉토리 결과.
        모두 0건이면 ([], primary_dir).
    """
    primary_dir = _resolve_project_dir(task_id, workspace_root)
    fallback_env = os.environ.get("WORKSPACE_ROOT_FALLBACK", "/home/jay/workspace")

    # 중복 제거하되 순서 유지
    seen: set[str] = set()
    dirs: list[str] = []
    for d in (primary_dir, fallback_env, workspace_root):
        if d and d not in seen:
            seen.add(d)
            dirs.append(d)

    safe_pattern = _safe_grep_pattern(task_id)

    for d in dirs:
        if not os.path.isdir(d):
            continue
        try:
            result = subprocess.run(
                [
                    "git", "log", "--oneline", "--all",
                    "--fixed-strings", f"--grep={safe_pattern}",
                ],
                cwd=d,
                capture_output=True,
                text=True,
                timeout=30,
            )
            lines = [l for l in result.stdout.strip().splitlines() if l.strip()]
            if lines:
                return (lines, d)
        except (subprocess.TimeoutExpired, OSError):
            continue

    return ([], primary_dir)


def _ensure_origin_main_fetched(proj_dir: str) -> None:
    """git fetch origin main --quiet (silent, timeout=5s)."""
    try:
        subprocess.run(
            ["git", "fetch", "origin", "main", "--quiet"],
            cwd=proj_dir,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (subprocess.TimeoutExpired, OSError):
        pass


def verify(task_id: str, workspace_root: str = "/home/jay/workspace") -> dict:
    """
    git 커밋 증거 검증.

    Returns:
        {"status": "PASS"|"FAIL"|"SKIP", "details": [...]}
    """
    # non-code task → SKIP
    if _is_non_code_task(task_id, workspace_root):
        return {
            "status": "SKIP",
            "details": ["non-code task (문서/리서치) — git 검증 SKIP"],
        }

    proj_dir, resolved_via = _resolve_project_dir_with_source(task_id, workspace_root)
    is_worktree = resolved_via != "fallback"
    _ensure_origin_main_fetched(proj_dir)
    details = [f"resolved_via={resolved_via} dir={proj_dir}"]
    failed = []

    # 1) COMMIT_EXISTS: task ID 커밋 최소 1건 (worktree → main workspace 폴백)
    commit_lines: list[str] = []
    search_dir = proj_dir
    try:
        commit_lines, search_dir = _search_commit_in_workspaces(task_id, workspace_root)
    except (subprocess.TimeoutExpired, OSError) as e:
        details.append(f"WARN COMMIT_EXISTS: git 검색 일부 실패 — {e}")

    # task-timers / events에서 mergeCommit evidence
    merge_commit_evidence = _get_merge_commit_from_timers(task_id, workspace_root)

    if commit_lines:
        details.append(f"PASS COMMIT_EXISTS: {task_id} 커밋 {len(commit_lines)}건 (검색위치={search_dir})")
    elif merge_commit_evidence:
        details.append(f"PASS COMMIT_EXISTS: mergeCommit evidence (sha={merge_commit_evidence[:8]}, source=timers/events)")
    else:
        details.append(f"FAIL COMMIT_EXISTS: {task_id} 커밋 0건 + mergeCommit evidence 없음")
        failed.append("COMMIT_EXISTS")

    # 2) NO_UNCOMMITTED: uncommitted 변경 없음 (시스템 자동 파일 제외)
    try:
        diff_result = _run_git(["diff", "--name-only"], proj_dir)
        cached_result = _run_git(["diff", "--cached", "--name-only"], proj_dir)
        diff_files = [f for f in diff_result.stdout.strip().splitlines() if f.strip()]
        cached_files = [f for f in cached_result.stdout.strip().splitlines() if f.strip()]
        # 시스템 자동 파일 제외
        real_diff = [f for f in diff_files if not _is_system_auto_file(f)]
        real_cached = [f for f in cached_files if not _is_system_auto_file(f)]
        excluded_count = len(diff_files) + len(cached_files) - len(real_diff) - len(real_cached)

        if is_worktree:
            # worktree 모드: 기존 로직 그대로 (real_diff 또는 real_cached 있으면 FAIL)
            if real_diff or real_cached:
                details.append(f"FAIL NO_UNCOMMITTED: uncommitted 변경 존재 ({len(real_diff)} unstaged, {len(real_cached)} staged)")
                failed.append("NO_UNCOMMITTED")
            else:
                msg = "PASS NO_UNCOMMITTED: uncommitted 변경 없음"
                if excluded_count > 0:
                    msg += f" (시스템 자동 파일 {excluded_count}건 제외)"
                details.append(msg)
        else:
            # fallback 모드 (main repo): task scope 내 dirty만 검사, 다른 task dirty는 무시
            task_scope_diff = _filter_dirty_to_task_scope(real_diff, task_id)
            task_scope_cached = _filter_dirty_to_task_scope(real_cached, task_id)
            other_dirty_count = (len(real_diff) + len(real_cached)) - (len(task_scope_diff) + len(task_scope_cached))
            if task_scope_diff or task_scope_cached:
                details.append(
                    f"FAIL NO_UNCOMMITTED: task scope 내 uncommitted 변경 존재 "
                    f"({len(task_scope_diff)} unstaged, {len(task_scope_cached)} staged)"
                    + (f" (main repo fallback, 다른 task scope dirty {other_dirty_count}건 무시)" if other_dirty_count > 0 else "")
                )
                failed.append("NO_UNCOMMITTED")
            else:
                msg = "PASS NO_UNCOMMITTED: uncommitted 변경 없음"
                if excluded_count > 0:
                    msg += f" (시스템 자동 파일 {excluded_count}건 제외)"
                if other_dirty_count > 0:
                    msg += f" (main repo fallback, 다른 task scope dirty {other_dirty_count}건 무시)"
                details.append(msg)
    except (subprocess.TimeoutExpired, OSError) as e:
        details.append(f"FAIL NO_UNCOMMITTED: git 명령 실패 — {e}")
        failed.append("NO_UNCOMMITTED")

    # 3) NON_EMPTY_COMMIT: 빈 커밋 방어
    # sha 조회 우선순위: commit_lines 첫 줄 → merge_commit_evidence → SKIP
    last_hash = ""
    sha_source = ""
    if commit_lines:
        first_line = commit_lines[0].strip()
        last_hash = first_line.split()[0] if first_line else ""
        sha_source = "commit_lines"
    if not last_hash and merge_commit_evidence:
        last_hash = merge_commit_evidence
        sha_source = "timers/events"

    if not last_hash:
        details.append("SKIP NON_EMPTY_COMMIT: task ID 커밋 없음 (COMMIT_EXISTS에서 처리)")
    else:
        try:
            diff_files_result = subprocess.run(
                ["git", "diff", "--name-only", f"{last_hash}^..{last_hash}"],
                cwd=search_dir,
                capture_output=True,
                text=True,
                timeout=30,
            )
            if diff_files_result.returncode != 0:
                # sha^ 부재(initial commit) 등 — fallback: git show
                show_result = subprocess.run(
                    ["git", "show", "--name-only", "--pretty=format:", last_hash],
                    cwd=search_dir,
                    capture_output=True,
                    text=True,
                    timeout=30,
                )
                file_lines = [l for l in show_result.stdout.strip().splitlines() if l.strip()]
            else:
                file_lines = [l for l in diff_files_result.stdout.strip().splitlines() if l.strip()]
            diff_file_count = len(file_lines)
            if diff_file_count == 0:
                # mergeCommit이지만 search_dir에 sha가 없으면 SKIP
                if sha_source == "timers/events":
                    details.append(f"SKIP NON_EMPTY_COMMIT: mergeCommit sha={last_hash[:8]} not found in {search_dir} (fetch 실패 등)")
                else:
                    details.append("FAIL NON_EMPTY_COMMIT: 빈 커밋(변경 파일 0건)")
                    failed.append("NON_EMPTY_COMMIT")
            else:
                details.append(f"PASS NON_EMPTY_COMMIT: 변경 파일 {diff_file_count}건")
        except (subprocess.TimeoutExpired, OSError) as e:
            details.append(f"FAIL NON_EMPTY_COMMIT: git 명령 실패 — {e}")
            failed.append("NON_EMPTY_COMMIT")

    if failed:
        return {"status": "FAIL", "details": details, "failed_checks": failed}

    return {"status": "PASS", "details": details}
