"""8 source 동시 조회 collector.

회장 verbatim 필수 구현 9 의 1~8 source 를 한 번의 호출로 수집한다.
9 번째(status enum)는 status_classifier 의 책임이다.

source 8 (회장 verbatim):
  1. legacy worktree path 확인
  2. cokacdir workspace path 확인
  3. schedule_id 기반 workspace dir 확인
  4. wt-<task_id>-<team> glob (두 위치 모두)
  5. executor process (ps + pattern match)
  6. schedule_history (<schedule_id>.log)
  7. result/report/done marker (memory/events + memory/reports)
  8. callback_inbox.acked (memory/.callback_inbox/*.acked)

self-attestation 단독 인정 금지 (ANCHOR-2). callback evidence 는
result/report/done marker 와 callback_inbox.acked 양쪽을 인정하되
최소 2 source 교차가 필요하다(분류는 status_classifier 가 수행).
"""

from __future__ import annotations

import json
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, Sequence

from .path_resolver import (
    COKACDIR_WORKSPACE_ROOT,
    LEGACY_WORKTREE_ROOT,
    SCHEDULE_HISTORY_ROOT,
    WorktreeCandidates,
    resolve_worktree_candidates,
)


# Zero-argument callable returning the process table as text lines (one
# process per line). collect_sources accepts one so tests can inject a canned
# listing instead of running `ps`.
ProcessLister = Callable[[], Sequence[str]]


def _default_process_lister() -> Sequence[str]:
    """Return the output of ``ps -e -o pid,args`` as a tuple of lines.

    This is a best-effort probe: if ``ps`` cannot be run at all (binary
    missing, not executable, spawn failure), times out after 5 seconds, or
    exits non-zero, an empty tuple is returned instead of raising. An empty
    result therefore means "no process information available", not
    "no processes running".

    NOTE(review): the original note states that status_classifier does not
    conclude a silent drop from process absence alone, which is what makes
    swallowing these failures acceptable — confirm against the classifier.

    Returns:
        A tuple of text lines, or ``()`` on any failure.
    """
    try:
        completed = subprocess.run(
            ["ps", "-e", "-o", "pid,args"],
            capture_output=True,
            text=True,
            # Command lines may contain bytes that are not valid in the
            # locale encoding; substitute them rather than letting a
            # UnicodeDecodeError escape from a best-effort probe.
            errors="replace",
            timeout=5,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError, PermissionError and other
        # spawn-time failures.
        return ()
    if completed.returncode != 0:
        return ()
    return tuple(completed.stdout.splitlines())


def _read_schedule_history(log_path: Path) -> tuple[bool, tuple[dict, ...], Optional[str]]:
    """Parse a schedule-history JSONL file.

    Each non-blank line is decoded as JSON. Lines that fail to decode, and
    lines whose decoded value is not a JSON object, are skipped.

    Args:
        log_path: Path of the history log to read.

    Returns:
        ``(present, records, last_status)`` where ``present`` is False when
        the path is not an existing regular file or reading it raised
        ``OSError``; ``records`` holds the decoded objects in file order; and
        ``last_status`` is the ``"status"`` value of the latest record whose
        status is a string (``None`` when there is no such record).
    """
    unavailable = (False, (), None)
    if not (log_path.exists() and log_path.is_file()):
        return unavailable

    parsed: list[dict] = []
    try:
        with log_path.open("r", encoding="utf-8", errors="replace") as handle:
            for line in handle:
                text = line.strip()
                if not text:
                    continue
                try:
                    entry = json.loads(text)
                except json.JSONDecodeError:
                    continue
                if isinstance(entry, dict):
                    parsed.append(entry)
    except OSError:
        return unavailable

    # Walk backwards so the most recent string-valued status wins.
    statuses = (entry.get("status") for entry in reversed(parsed))
    latest = next((value for value in statuses if isinstance(value, str)), None)

    return True, tuple(parsed), latest


def _collect_marker_paths(memory_dir: Path, task_id: str) -> dict:
    """Look up the done / report / result marker files of a task.

    Checked locations, all relative to ``memory_dir``:
      * ``events/<task_id>.done``
      * ``reports/<task_id>.md``
      * ``events/<task_id>.*-result-*.json`` (glob, may match several files)

    Args:
        memory_dir: Root directory holding ``events`` and ``reports``.
        task_id: Task identifier used to build the file names.

    Returns:
        A dict with the keys ``done_present``, ``done_path``,
        ``report_present``, ``report_path``, ``result_marker_paths`` (tuple
        of path strings in sorted order) and ``result_present``. The two
        ``*_path`` values are reported even when the file does not exist.
    """
    events = memory_dir / "events"
    done_marker = events / f"{task_id}.done"
    report_file = memory_dir / "reports" / f"{task_id}.md"

    # is_dir() / is_file() are already False for a path that does not exist.
    if events.is_dir():
        result_files = sorted(
            str(hit) for hit in events.glob(f"{task_id}.*-result-*.json")
        )
    else:
        result_files = []

    return {
        "done_present": done_marker.is_file(),
        "done_path": str(done_marker),
        "report_present": report_file.is_file(),
        "report_path": str(report_file),
        "result_marker_paths": tuple(result_files),
        "result_present": len(result_files) > 0,
    }


def _collect_callback_inbox(memory_dir: Path, task_id: str) -> dict:
    """Find acknowledged callback files for a task in ``.callback_inbox``.

    A regular file ``<memory_dir>/.callback_inbox/*.acked`` counts when its
    name (without the ``.acked`` suffix) contains the task identifier with
    any leading ``task-`` removed. Matching on the bare identifier also
    covers names that carry the full ``task-…`` form.

    The identifier must not be glued to further digits: when it starts
    (ends) with a digit, the character before (after) it may not be a digit.
    Plain substring matching would otherwise accept ``task-26570.acked`` or
    a timestamp such as ``20260126571234-…`` as evidence for ``task-2657``.

    Args:
        memory_dir: Root directory that contains ``.callback_inbox``.
        task_id: Task identifier, with or without the ``task-`` prefix.

    Returns:
        A dict with ``acked_present`` (bool) and ``acked_paths`` (tuple of
        path strings in sorted order). An empty identifier, or a missing
        inbox directory, yields no matches.
    """
    import re  # local import: the module does not import re at file level

    nothing = {"acked_present": False, "acked_paths": ()}

    inbox = memory_dir / ".callback_inbox"
    if not inbox.is_dir():
        return nothing

    short = task_id.removeprefix("task-")
    if not short:
        # An empty identifier would match every file; treat it as no evidence.
        return nothing

    # Only guard the sides where the identifier itself is numeric, so that a
    # longer digit run cannot masquerade as this task's number.
    lead = r"(?<!\d)" if short[0].isdigit() else ""
    trail = r"(?!\d)" if short[-1].isdigit() else ""
    token = re.compile(lead + re.escape(short) + trail)

    suffix_len = len(".acked")
    acked = sorted(
        str(candidate)
        for candidate in inbox.glob("*.acked")
        if candidate.is_file() and token.search(candidate.name[:-suffix_len])
    )
    return {
        "acked_present": bool(acked),
        "acked_paths": tuple(acked),
    }


def _process_pattern_match(
    processes: Sequence[str], task_id: str, team_short: Optional[str]
) -> tuple[bool, tuple[str, ...]]:
    """Select the process-table lines that mention a task.

    A line matches when it contains any of several spellings of the task
    (worktree directory name, branch-like names, ``--task`` / ``--task-id``
    arguments, path components). A spelling that ends in a digit is only
    accepted when it is not immediately followed by another digit, so
    ``task-2657`` is not reported for a process working on ``task-26570``.

    Args:
        processes: Process-table lines, e.g. the output of ``ps -e -o pid,args``.
        task_id: Task identifier, with or without the ``task-`` prefix.
        team_short: Optional team short name used for team-qualified spellings.

    Returns:
        ``(present, matches)`` where ``matches`` holds the matching lines in
        input order and ``present`` is True when there is at least one.
    """
    import re  # local import: the module does not import re at file level

    short = task_id.removeprefix("task-")
    needles = [
        f"wt-{short}",
        f"task/{task_id}",
        f"task-{task_id}",
        f"task_id={task_id}",
        f"--task-id {task_id}",
        f"--task {task_id}",
        f"/{task_id}.",
        f"/{task_id}/",
    ]
    if team_short:
        # These are supersets of the unqualified spellings above; listed for
        # completeness, they never change the outcome.
        needles.append(f"wt-{short}-{team_short}")
        needles.append(f"task-{task_id}-{team_short}")

    def _bounded(needle: str) -> str:
        # Stop a trailing number from matching as the prefix of a longer one.
        guard = r"(?!\d)" if needle[-1:].isdigit() else ""
        return re.escape(needle) + guard

    matcher = re.compile("|".join(_bounded(needle) for needle in needles))
    matches = tuple(line for line in processes if matcher.search(line))
    return bool(matches), matches


@dataclass(frozen=True)
class SourceSnapshot:
    """Immutable record of what the eight evidence sources reported for a task."""

    task_id: str
    team_short: Optional[str]
    schedule_id: Optional[str]
    # Sources 1-4: worktree / workspace directory candidates.
    worktree_candidates: WorktreeCandidates
    # Source 5: executor process lines that mention the task.
    executor_process_present: bool
    executor_process_matches: tuple[str, ...]
    # Source 6: schedule history log.
    schedule_history_present: bool
    schedule_history_last_status: Optional[str]
    schedule_history_records: tuple[dict, ...] = field(default_factory=tuple)
    # Source 7: done / report / result markers.
    done_present: bool = False
    done_path: str = ""
    report_present: bool = False
    report_path: str = ""
    result_present: bool = False
    result_marker_paths: tuple[str, ...] = field(default_factory=tuple)
    # Source 8: acknowledged callback inbox entries.
    callback_inbox_acked_present: bool = False
    callback_inbox_acked_paths: tuple[str, ...] = field(default_factory=tuple)

    @property
    def legacy_worktree_present(self) -> bool:
        """True when ``worktree_candidates.legacy_matches`` is non-empty."""
        legacy = self.worktree_candidates.legacy_matches
        return bool(legacy)

    @property
    def cokacdir_worktree_present(self) -> bool:
        """True when ``worktree_candidates.cokacdir_matches`` is non-empty."""
        cokacdir = self.worktree_candidates.cokacdir_matches
        return bool(cokacdir)

    @property
    def schedule_workspace_dir_present(self) -> bool:
        """True when ``worktree_candidates.schedule_workspace_dir`` is set."""
        return not (self.worktree_candidates.schedule_workspace_dir is None)

    @property
    def wt_dir_matches(self) -> tuple[str, ...]:
        """Every entry of ``worktree_candidates.all_matches`` as a string."""
        return tuple(map(str, self.worktree_candidates.all_matches))

    def positive_sources(self) -> tuple[str, ...]:
        """Names of the sources observed as present, in a fixed order.

        The result feeds the "at least two sources" cross-check rule.
        """
        observed = (
            ("legacy_worktree", self.legacy_worktree_present),
            ("cokacdir_worktree", self.cokacdir_worktree_present),
            ("schedule_workspace_dir", self.schedule_workspace_dir_present),
            ("executor_process", self.executor_process_present),
            ("schedule_history", self.schedule_history_present),
            ("done_marker", self.done_present),
            ("result_marker", self.result_present),
            ("report_marker", self.report_present),
            ("callback_inbox_acked", self.callback_inbox_acked_present),
        )
        return tuple(label for label, seen in observed if seen)

    def callback_evidence_sources(self) -> tuple[str, ...]:
        """Names of the present sources that count as callback evidence.

        ANCHOR-2: the done / result / report markers and the acknowledged
        callback inbox entries are all accepted as callback evidence.
        """
        observed = (
            ("done_marker", self.done_present),
            ("result_marker", self.result_present),
            ("report_marker", self.report_present),
            ("callback_inbox_acked", self.callback_inbox_acked_present),
        )
        return tuple(label for label, seen in observed if seen)


def collect_sources(
    task_id: str,
    team_short: Optional[str],
    schedule_id: Optional[str],
    *,
    legacy_root: Path = LEGACY_WORKTREE_ROOT,
    cokacdir_root: Path = COKACDIR_WORKSPACE_ROOT,
    schedule_history_dir: Path = SCHEDULE_HISTORY_ROOT,
    memory_dir: Path = Path("/home/jay/workspace/memory"),
    process_lister: Optional[ProcessLister] = None,
    extra_cokacdir_schedule_ids: Sequence[str] = (),
) -> SourceSnapshot:
    """Gather all eight evidence sources for a task in a single call.

    Args:
        task_id: Task identifier, e.g. ``"task-2657"``.
        team_short: Team short name, e.g. ``"dev6"``; ``None`` when unknown.
        schedule_id: Schedule identifier, e.g. ``"426931FE"``. When falsy the
            schedule history log is not read; worktree resolution is left to
            ``resolve_worktree_candidates``.
        legacy_root: Forwarded to ``resolve_worktree_candidates``.
        cokacdir_root: Forwarded to ``resolve_worktree_candidates``.
        schedule_history_dir: Directory holding ``<schedule_id>.log`` files.
        memory_dir: Root of the marker and callback-inbox directories.
        process_lister: Injectable process-table provider for tests; when
            falsy, ``_default_process_lister`` is used.
        extra_cokacdir_schedule_ids: Forwarded to
            ``resolve_worktree_candidates``.

    Returns:
        A ``SourceSnapshot`` populated from every source.
    """
    # Sources 1-4: worktree / workspace directories.
    worktrees = resolve_worktree_candidates(
        task_id,
        team_short,
        schedule_id,
        legacy_root=legacy_root,
        cokacdir_root=cokacdir_root,
        extra_cokacdir_schedule_ids=extra_cokacdir_schedule_ids,
    )

    # Source 5: executor process.
    list_processes = process_lister if process_lister else _default_process_lister
    proc_seen, proc_hits = _process_pattern_match(
        tuple(list_processes()), task_id, team_short
    )

    # Source 6: schedule history, only reachable when the schedule id is known.
    if schedule_id:
        history = _read_schedule_history(schedule_history_dir / f"{schedule_id}.log")
    else:
        history = (False, (), None)
    history_seen, history_rows, history_status = history

    # Sources 7 and 8: markers and acknowledged callbacks.
    marker_info = _collect_marker_paths(memory_dir, task_id)
    inbox_info = _collect_callback_inbox(memory_dir, task_id)

    return SourceSnapshot(
        task_id=task_id,
        team_short=team_short,
        schedule_id=schedule_id,
        worktree_candidates=worktrees,
        executor_process_present=proc_seen,
        executor_process_matches=proc_hits,
        schedule_history_present=history_seen,
        schedule_history_last_status=history_status,
        schedule_history_records=history_rows,
        done_present=bool(marker_info["done_present"]),
        done_path=str(marker_info["done_path"]),
        report_present=bool(marker_info["report_present"]),
        report_path=str(marker_info["report_path"]),
        result_present=bool(marker_info["result_present"]),
        result_marker_paths=tuple(marker_info["result_marker_paths"]),
        callback_inbox_acked_present=bool(inbox_info["acked_present"]),
        callback_inbox_acked_paths=tuple(inbox_info["acked_paths"]),
    )
