"""Merge Topology Gate — 회장 §1 정책 enforcement (task-2503).

병렬 작업의 merge topology를 dispatch 단계에서 자동 판정하여
task-2487+1 / task-2502 사고 재발을 방지한다.

정책 본체: memory/feedback/feedback_merge_topology_gate_260508.md
"""

from __future__ import annotations

import json
import logging
import os
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional, Callable

WORKSPACE = Path(os.environ.get("WORKSPACE_ROOT", "/home/jay/workspace"))
AUDIT_LOG_PATH = WORKSPACE / "memory" / "orchestration-audit" / "merge-topology-gate.jsonl"
SCHEMA_PATH = WORKSPACE / "memory" / "specs" / "merge-topology-gate-schema.yml"

logger = logging.getLogger(__name__)

# ─── Decision values ───────────────────────────────────────────────────────
ALLOW = "ALLOW"
LIMITED_PARALLEL = "LIMITED_PARALLEL"
BLOCK = "BLOCK"
REQUIRE_CHAIR_OVERRIDE = "REQUIRE_CHAIR_OVERRIDE"
ALLOW_WITH_CHAIR_OVERRIDE = "ALLOW_WITH_CHAIR_OVERRIDE"  # task-2503+1 §f

# ─── Reason codes (회장 §4 9 룰) ───────────────────────────────────────────
REASON_METADATA_MISSING = "METADATA_MISSING"
REASON_DUPLICATE_FILE = "DUPLICATE_FILE"
REASON_DUPLICATE_FUNCTION = "DUPLICATE_FUNCTION"
REASON_DUPLICATE_VERIFIER = "DUPLICATE_VERIFIER"
REASON_DUPLICATE_LIFECYCLE = "DUPLICATE_LIFECYCLE"
REASON_MISSING_DEPENDENCY = "MISSING_DEPENDENCY"
REASON_CHERRY_PICK_REQUESTED = "CHERRY_PICK_REQUESTED"
REASON_QUEUE_POSITION_MISSING = "QUEUE_POSITION_MISSING"
REASON_PARALLEL_SAFE_FALSE_DECLARATION = "PARALLEL_SAFE_FALSE_DECLARATION"
REASON_STALE_RECHECK_REQUIRED = "STALE_RECHECK_REQUIRED"

# 7 필수 metadata 키
_REQUIRED_KEYS = [
    "expected_files",
    "risk_area",
    "dependency",
    "parallel_policy",
    "merge_queue_position",
    "stale_recheck_required",
    "cherry_pick_allowed",
]

_VALID_PARALLEL_POLICY = {"parallel_safe", "limited_parallel", "serial_only"}


@dataclass
class TopologyDecision:
    decision: str  # ALLOW/LIMITED_PARALLEL/BLOCK/REQUIRE_CHAIR_OVERRIDE
    reason_codes: list[str] = field(default_factory=list)
    overlap_score: float = 0.0
    conflicting_tasks: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    active_tasks_snapshot: list[dict] = field(default_factory=list)


# ─── §3.a dependency 문자열 정규화 (task-2503+1) ──────────────────────────
def _parse_dependency_spec(spec: str) -> dict:
    """`"task-2503.merged"` 형태를 task_id + required_state로 분리.

    후방 호환:
      - 'task-2503.merged' → {'task_id': 'task-2503', 'required_state': 'merged'}
      - 'task-2487+1.merged' → {'task_id': 'task-2487+1', 'required_state': 'merged'}
      - 'task-2487+1' (no .merged 접미사) → {'task_id': 'task-2487+1', 'required_state': 'merged'}
        (단순 task ID 명시는 자동 'merged' 추론)
      - 'none' → {'task_id': 'none', 'required_state': 'none'}
    """
    if not isinstance(spec, str) or not spec.strip():
        return {"task_id": "", "required_state": "merged"}

    s = spec.strip()
    if s == "none":
        return {"task_id": "none", "required_state": "none"}

    m = re.match(r"^(task-[\w\d\+\.]+?)\.(merged|done|completed)$", s)
    if m:
        return {"task_id": m.group(1), "required_state": m.group(2)}

    if re.match(r"^task-[\w\d\+\.]+$", s):
        return {"task_id": s, "required_state": "merged"}

    return {"task_id": s, "required_state": "merged"}


# ─── §3.b merged evidence 4 충족 경로 (task-2503+1) ───────────────────────
_MERGED_VERIFY_CACHE: dict[str, tuple[bool, str]] = {}


def _verify_merged_state(
    task_id: str,
    *,
    workspace: Optional[Path] = None,
    use_gh: bool = False,
    use_git: bool = True,
    cache: Optional[dict] = None,
) -> tuple[bool, str]:
    """task가 merged 됐는지 4 경로 중 하나로 검증.

    경로 우선순위 (저비용 → 고비용):
      1. memory/events/{task_id}.done 존재
      2. memory/reports/{task_id}.md 내 'merged'/'mergeCommit' evidence 라인
      3. task-timers.json `merge_commit` 필드
      4. git log --grep=task_id --merges (use_git=True 시)
      5. gh pr list (use_gh=True 시 — 호출 비용 큼, default off)

    반환: (satisfied: bool, evidence_kind: str)
      evidence_kind ∈ {'done_event', 'report_evidence', 'task_timer_merge_commit',
                       'git_log_merges', 'gh_pr_merged', 'unsatisfied'}
    """
    ws = workspace or WORKSPACE
    cache_obj = cache if cache is not None else _MERGED_VERIFY_CACHE

    if task_id in cache_obj:
        return cache_obj[task_id]

    # 1. memory/events/{task_id}.done
    done_file = ws / "memory" / "events" / f"{task_id}.done"
    if done_file.exists():
        result = (True, "done_event")
        cache_obj[task_id] = result
        return result

    # 2. memory/reports/{task_id}.md 내 evidence
    report_file = ws / "memory" / "reports" / f"{task_id}.md"
    if report_file.exists():
        try:
            text = report_file.read_text(encoding="utf-8", errors="ignore")
            # 'merged' 또는 'mergeCommit' 라인 — 단순 문자열 'merged' 등장만으로는 부족.
            # 회장 §3.b: "merged 또는 mergeCommit evidence 라인 존재"
            for line in text.splitlines():
                low = line.lower()
                if "mergecommit" in low or "merge commit" in low:
                    result = (True, "report_evidence")
                    cache_obj[task_id] = result
                    return result
                if re.search(r"\bmerged\b", low) and (
                    "main" in low or "pr #" in low
                ):
                    result = (True, "report_evidence")
                    cache_obj[task_id] = result
                    return result
        except Exception:
            pass

    # 3. task-timers.json merge_commit
    tt_path = ws / "memory" / "task-timers.json"
    if tt_path.exists():
        try:
            with open(tt_path, "r", encoding="utf-8") as f:
                tt_data = json.load(f)
            tinfo = tt_data.get("tasks", {}).get(task_id, {})
            mc = tinfo.get("merge_commit")
            if mc:
                result = (True, "task_timer_merge_commit")
                cache_obj[task_id] = result
                return result
        except Exception:
            pass

    # 4. git log --grep
    if use_git:
        try:
            import subprocess
            cp = subprocess.run(
                ["git", "log", "--all", "-F", f"--grep=[{task_id}]", "--oneline", "-1"],
                cwd=str(ws),
                capture_output=True,
                text=True,
                timeout=5,
            )
            if cp.returncode == 0 and cp.stdout.strip():
                result = (True, "git_log_merges")
                cache_obj[task_id] = result
                return result
        except Exception:
            pass

    # 5. gh pr list (opt-in)
    if use_gh:
        try:
            import subprocess
            cp = subprocess.run(
                ["gh", "pr", "list", "--state", "merged", "--search", f"[{task_id}]",
                 "--limit", "1", "--json", "number,mergedAt"],
                cwd=str(ws),
                capture_output=True,
                text=True,
                timeout=8,
            )
            if cp.returncode == 0 and cp.stdout.strip():
                payload = json.loads(cp.stdout)
                if payload and any(p.get("mergedAt") for p in payload):
                    result = (True, "gh_pr_merged")
                    cache_obj[task_id] = result
                    return result
        except Exception:
            pass

    result = (False, "unsatisfied")
    cache_obj[task_id] = result
    return result


# ─── §3.c merged task active/conflict 제외 ────────────────────────────────
def _filter_active_tasks(
    tasks: list,
    *,
    exclude_merged: bool = True,
    workspace: Optional[Path] = None,
    cache: Optional[dict] = None,
) -> list:
    """task-timer 미정리로 active처럼 보이는 task라도 merged evidence가 있으면 제외.

    회장 §3.c: 'merged evidence가 우선' — task-timer status보다 우선한다.
    """
    if not exclude_merged or not tasks:
        return list(tasks)

    filtered: list = []
    for t in tasks:
        tid = t.get("task_id") or ""
        if not tid:
            filtered.append(t)
            continue
        verify_result = _verify_merged_state(
            tid,
            workspace=workspace,
            use_gh=False,
            use_git=True,
            cache=cache,
        )
        if verify_result[0]:
            continue  # merged → active/conflict 후보에서 제외
        filtered.append(t)
    return filtered


# ─── §3.d expected_files / risk_area / verifier overlap 계산 ──────────────
def _compute_overlap(spec_a: dict, spec_b: dict) -> dict:
    """두 task spec 간 overlap 점수 산출.

    반환:
      {
        'files': set[str],   # expected_files 교집합
        'risk_area': set[str],  # risk_area '/' 토큰 교집합
        'verifier': bool,    # 양쪽 모두 verifier_layer 포함 여부
        'mutation_risk': bool,  # files 교집합 OR verifier overlap (실질 mutation)
      }
    """
    files_a = set(spec_a.get("expected_files", []) or [])
    files_b = set(spec_b.get("expected_files", []) or [])
    files_overlap = files_a & files_b

    risk_a = str(spec_a.get("risk_area", ""))
    risk_b = str(spec_b.get("risk_area", ""))
    parts_a = {p.strip() for p in risk_a.split("/") if p.strip()}
    parts_b = {p.strip() for p in risk_b.split("/") if p.strip()}
    risk_overlap = parts_a & parts_b

    verifier_overlap = "verifier_layer" in parts_a and "verifier_layer" in parts_b

    return {
        "files": files_overlap,
        "risk_area": risk_overlap,
        "verifier": verifier_overlap,
        "mutation_risk": bool(files_overlap) or verifier_overlap,
    }


# ─── §3.e read-only report task 예외 ──────────────────────────────────────
_READ_ONLY_PATH_PREFIXES = (
    "memory/reports/",
    "memory/orchestration/",
)
_READ_ONLY_REQUIRED_FORBIDDEN_ACTIONS = {
    "any_code_modification",
    "any_test_modification",
    "any_pr_modification",
    "any_branch_modification",
}


def _is_pure_read_only(spec: dict) -> bool:
    """read-only report task 판정.

    조건 (모두 충족):
      - expected_files이 memory/reports/** 또는 memory/orchestration/** 아래 단 1건
      - forbidden_actions 4건 모두 포함
      - parallel_policy 'parallel_safe' 또는 risk_area 'read_only_*'

    forbidden_actions는 spec에 없거나 task_md 내 별도 키로 들어올 수 있어
    classify 호출자가 풍부한 spec을 줄 때만 의미가 있다.
    """
    files = spec.get("expected_files") or []
    if not isinstance(files, list) or len(files) != 1:
        return False
    fp = str(files[0]).strip().strip('"').strip("'")
    if not any(fp.startswith(p) for p in _READ_ONLY_PATH_PREFIXES):
        return False

    forbidden = spec.get("forbidden_actions") or []
    forbidden_set = {str(x).strip() for x in forbidden}
    if not _READ_ONLY_REQUIRED_FORBIDDEN_ACTIONS.issubset(forbidden_set):
        return False

    pp = str(spec.get("parallel_policy", "")).strip()
    risk = str(spec.get("risk_area", "")).strip()
    if pp == "parallel_safe":
        return True
    if risk.startswith("read_only_") or risk == "read_only":
        return True
    return False


def parse_topology_metadata(task_desc: str) -> dict:
    """task_desc fenced ```yaml 블록에서 7 metadata를 추출.

    expected_files, risk_area, dependency, parallel_policy, merge_queue_position,
    stale_recheck_required, cherry_pick_allowed 키를 dict로 반환.
    누락 키는 제외. yaml 파싱 실패 시 빈 dict.

    참조 패턴: dispatch/__init__.py의 _parse_allowed_resources() 함수.
    fenced yaml 블록을 모두 찾아 첫 번째 매칭 블록을 사용.
    """
    fenced_blocks = re.findall(r"```yaml\s*\n(.*?)```", task_desc, re.DOTALL)

    for block in fenced_blocks:
        # 7 metadata 키 중 하나라도 포함된 블록 탐색
        if not any(key + ":" in block for key in _REQUIRED_KEYS):
            continue

        # try: import yaml — sanitize_gate.py와 동일 패턴
        try:
            import yaml  # type: ignore[import]
            data = yaml.safe_load(block)
            if isinstance(data, dict):
                result = {}
                for key in _REQUIRED_KEYS:
                    if key in data:
                        result[key] = data[key]
                if result:
                    return result
        except Exception:
            pass

        # yaml import 실패 또는 파싱 실패 시 정규식 fallback
        try:
            return _parse_topology_metadata_regex(block)
        except Exception:
            pass

    return {}


def _parse_topology_metadata_regex(block: str) -> dict:
    """yaml 파서 없이 정규식으로 7 topology metadata 키를 추출하는 fallback."""
    result: dict = {}

    # expected_files (list 형식)
    ef_match = re.search(r"expected_files:\s*\n((?:\s*-\s*.+\n?)*)", block)
    if ef_match:
        items = []
        for line in ef_match.group(1).splitlines():
            line = line.strip()
            if line.startswith("-"):
                item = re.sub(r'^-\s*["\']?|["\']?\s*$', "", line).strip()
                if item:
                    items.append(item)
        if items:
            result["expected_files"] = items
    else:
        # inline list: expected_files: [a, b]
        inline = re.search(r"expected_files:\s*\[([^\]]*)\]", block)
        if inline:
            result["expected_files"] = [
                p.strip().strip("\"'")
                for p in inline.group(1).split(",")
                if p.strip()
            ]

    # risk_area (string)
    ra_match = re.search(r"risk_area:\s*[\"']?(.+?)[\"']?\s*$", block, re.MULTILINE)
    if ra_match:
        result["risk_area"] = ra_match.group(1).strip().strip("\"'")

    # dependency (list or "none")
    dep_match = re.search(r"dependency:\s*\n((?:\s*-\s*.+\n?)*)", block)
    if dep_match:
        items = []
        for line in dep_match.group(1).splitlines():
            line = line.strip()
            if line.startswith("-"):
                item = re.sub(r'^-\s*["\']?|["\']?\s*$', "", line).strip()
                if item:
                    items.append(item)
        result["dependency"] = items if items else "none"
    else:
        dep_inline = re.search(r"dependency:\s*\[([^\]]*)\]", block)
        if dep_inline:
            items = [
                p.strip().strip("\"'")
                for p in dep_inline.group(1).split(",")
                if p.strip()
            ]
            result["dependency"] = items if items else "none"
        else:
            dep_scalar = re.search(r"dependency:\s*[\"']?(\S+)[\"']?\s*$", block, re.MULTILINE)
            if dep_scalar:
                result["dependency"] = dep_scalar.group(1).strip().strip("\"'")

    # parallel_policy (string)
    pp_match = re.search(r"parallel_policy:\s*[\"']?(\S+)[\"']?\s*$", block, re.MULTILINE)
    if pp_match:
        result["parallel_policy"] = pp_match.group(1).strip().strip("\"'")

    # merge_queue_position (int or "n/a")
    mqp_match = re.search(r"merge_queue_position:\s*[\"']?(\S+)[\"']?\s*$", block, re.MULTILINE)
    if mqp_match:
        val = mqp_match.group(1).strip().strip("\"'")
        if val.lower() == "n/a":
            result["merge_queue_position"] = "n/a"
        else:
            try:
                result["merge_queue_position"] = int(val)
            except ValueError:
                result["merge_queue_position"] = val

    # stale_recheck_required (bool)
    srr_match = re.search(r"stale_recheck_required:\s*(\S+)\s*$", block, re.MULTILINE)
    if srr_match:
        val = srr_match.group(1).strip().lower()
        result["stale_recheck_required"] = val in ("true", "yes", "1")

    # cherry_pick_allowed (string)
    cpa_match = re.search(r"cherry_pick_allowed:\s*[\"']?(\S+)[\"']?\s*$", block, re.MULTILINE)
    if cpa_match:
        val = cpa_match.group(1).strip().strip("\"'")
        result["cherry_pick_allowed"] = val

    return result


def validate_metadata(metadata: dict) -> list[str]:
    """7 필드 누락/형식 위반 시 reason_codes 반환. 정상이면 빈 리스트.

    회장 §4 룰 1: METADATA_MISSING.

    검증 항목:
    - 7 필드 모두 존재
    - parallel_policy가 enum에 포함
    - parallel_policy=parallel_safe면 merge_queue_position이 'n/a'
    - parallel_policy != parallel_safe면 merge_queue_position이 양의 정수
    - expected_files가 list이고 비어있지 않음
    - dependency가 list 또는 'none'
    - cherry_pick_allowed가 'false' 또는 'recovery_only' (또는 bool False — YAML 파싱 호환)
    """
    errors: list[str] = []

    # 1. 7 필드 모두 존재 확인
    missing = [k for k in _REQUIRED_KEYS if k not in metadata]
    if missing:
        errors.append(REASON_METADATA_MISSING)
        return errors  # 이후 검증은 의미 없음

    # 2. expected_files: list이고 비어있지 않음
    ef = metadata["expected_files"]
    if not isinstance(ef, list) or len(ef) == 0:
        errors.append(REASON_METADATA_MISSING)

    # 3. parallel_policy enum 검증
    pp = metadata["parallel_policy"]
    if pp not in _VALID_PARALLEL_POLICY:
        errors.append(REASON_METADATA_MISSING)

    # 4. merge_queue_position 검증 (parallel_policy 연동)
    mqp = metadata["merge_queue_position"]
    if pp == "parallel_safe":
        if str(mqp).lower() != "n/a":
            errors.append(REASON_METADATA_MISSING)
    else:
        # limited_parallel / serial_only → 양의 정수 강제
        try:
            pos = int(mqp)
            if pos < 1:
                errors.append(REASON_QUEUE_POSITION_MISSING)
        except (TypeError, ValueError):
            if str(mqp).lower() != "n/a":
                errors.append(REASON_QUEUE_POSITION_MISSING)
            else:
                errors.append(REASON_QUEUE_POSITION_MISSING)

    # 5. dependency: list 또는 "none"
    dep = metadata["dependency"]
    if dep != "none" and not isinstance(dep, list):
        errors.append(REASON_METADATA_MISSING)

    # 6. cherry_pick_allowed: 'false', 'recovery_only', 또는 bool False
    cpa = metadata["cherry_pick_allowed"]
    # YAML 파싱 호환: bool False → "false"
    if isinstance(cpa, bool):
        if cpa is not False:
            errors.append(REASON_METADATA_MISSING)
    elif str(cpa).lower() not in ("false", "recovery_only"):
        errors.append(REASON_METADATA_MISSING)

    return list(dict.fromkeys(errors))  # 중복 제거


def load_active_tasks(
    timer_path: Optional[Path] = None,
    current_task_id: Optional[str] = None,
) -> list[dict]:
    """memory/task-timers.json에서 status=running인 task 목록 반환.

    current_task_id는 제외.
    각 dict: {task_id, expected_files, risk_area, parallel_policy, merge_queue_position, ...}

    task-timers.json은 metadata를 직접 저장하지 않을 수 있으므로,
    각 running task의 task_id에서 memory/tasks/{task_id}.md 파일을 추가로 파싱하여 metadata 보강.
    """
    if timer_path is None:
        timer_path = WORKSPACE / "memory" / "task-timers.json"

    if not timer_path.exists():
        return []

    try:
        with open(timer_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        logger.warning(f"[merge-topology-gate] task-timers.json 읽기 실패: {e}")
        return []

    active: list[dict] = []
    tasks = data.get("tasks", {})

    for task_id, task_info in tasks.items():
        if task_id == current_task_id:
            continue
        if task_info.get("status") != "running":
            continue

        entry: dict = {"task_id": task_id}
        entry.update(task_info)

        # metadata 보강: memory/tasks/{task_id}.md 파싱 시도
        task_md = WORKSPACE / "memory" / "tasks" / f"{task_id}.md"
        if task_md.exists():
            try:
                text = task_md.read_text(encoding="utf-8")
                meta = parse_topology_metadata(text)
                if meta:
                    entry.update(meta)
            except Exception as e:
                logger.debug(f"[merge-topology-gate] {task_id}.md 파싱 실패: {e}")

        active.append(entry)

    return active


def check_dependency_merged(
    dependency_spec: "str | list",
    *,
    gh_pr_resolver: Optional[Callable[[str], dict]] = None,
) -> tuple[bool, list[str]]:
    """dependency 명세가 모두 머지됐는지 확인.

    'none' → (True, [])
    list 항목별 확인:
      - 'task-XXX.merged' 형식 → memory/events/task-XXX.merged 파일 존재
        OR memory/events/task-XXX.done 파일 존재 (간단 fallback)
      - PR resolver가 주어지면 PR mergedAt 확인
    반환: (모두_머지됨, 미머지_list)
    """
    if dependency_spec == "none" or dependency_spec == [] or dependency_spec is None:
        return True, []

    if isinstance(dependency_spec, str):
        items = [dependency_spec]
    else:
        items = list(dependency_spec)

    unmerged: list[str] = []

    for item in items:
        if item == "none":
            continue

        # §3.a 정규화
        parsed = _parse_dependency_spec(str(item))
        tid = parsed["task_id"]
        req = parsed["required_state"]
        if not tid or req == "none":
            continue

        merged = False

        # §3.b 4 충족 경로 (헬퍼 재사용)
        if req == "merged":
            ok = _verify_merged_state(tid, use_gh=False, use_git=True)[0]
            if ok:
                merged = True

        # 후방 호환: 기존 .merged / .done 파일 직접 lookup
        if not merged:
            events_dir = WORKSPACE / "memory" / "events"
            # task-XXX.merged → events/task-XXX.merged
            merged_file = events_dir / item
            if merged_file.exists():
                merged = True
            # fallback: task-XXX.merged → task-XXX.done 파일
            if not merged:
                done_name = re.sub(r"\.merged$", ".done", item)
                done_file = events_dir / done_name
                if done_file.exists():
                    merged = True

        # PR resolver (gh CLI 래퍼 등)
        if not merged and gh_pr_resolver is not None:
            try:
                pr_info = gh_pr_resolver(item)
                if pr_info.get("mergedAt"):
                    merged = True
            except Exception as e:
                logger.debug(f"[merge-topology-gate] PR resolver 실패 ({item}): {e}")

        if not merged:
            unmerged.append(item)

    return (len(unmerged) == 0), unmerged


def classify(
    metadata: dict,
    active_tasks: list[dict],
    *,
    dependency_check: Optional[Callable[["str|list"], tuple[bool, list[str]]]] = None,
) -> TopologyDecision:
    """회장 §4 9 룰 적용.

    1. metadata 누락 → BLOCK + METADATA_MISSING
    2. 동일 파일 (expected_files 교집합) → BLOCK + DUPLICATE_FILE
    3. 동일 함수/state machine (risk_area=lifecycle_state, ssot 동일) → BLOCK + DUPLICATE_FUNCTION
    4. 동일 verifier (risk_area=verifier_layer 동일) → BLOCK + DUPLICATE_VERIFIER
    5. 동일 QC/merge lifecycle (risk_area=lifecycle_state, ci_workflow 동일) → LIMITED_PARALLEL + DUPLICATE_LIFECYCLE
    6. dependency 미머지 → BLOCK + MISSING_DEPENDENCY
    7. cherry_pick_allowed=recovery_only → REQUIRE_CHAIR_OVERRIDE + CHERRY_PICK_REQUESTED
    8. parallel_policy=limited_parallel + merge_queue_position 누락/n/a → BLOCK + QUEUE_POSITION_MISSING
    9. parallel_policy=parallel_safe인데 다른 active task와 expected_files 교집합 발견
       → BLOCK + PARALLEL_SAFE_FALSE_DECLARATION

    overlap_score = (active_tasks와 교집합 파일 수 / max(expected_files 길이, 1))
    conflicting_tasks = 교집합 발생한 active task_id 목록

    우선순위: 1 → 9 순서대로 검사.
    단, reason_codes는 누적해서 모두 기록.
    """
    reason_codes: list[str] = []
    conflicting_tasks: list[str] = []
    overlap_count = 0

    # ── 룰 1: metadata 누락 ────────────────────────────────────────────────
    validation_errors = validate_metadata(metadata)
    if validation_errors:
        # QUEUE_POSITION_MISSING 단독인 경우 분리 처리 (룰 8로 넘김)
        only_queue_missing = (
            validation_errors == [REASON_QUEUE_POSITION_MISSING]
            and REASON_METADATA_MISSING not in validation_errors
        )
        if not only_queue_missing:
            return TopologyDecision(
                decision=BLOCK,
                reason_codes=[REASON_METADATA_MISSING],
                overlap_score=0.0,
                conflicting_tasks=[],
                metadata=metadata,
                active_tasks_snapshot=active_tasks,
            )
        # QUEUE_POSITION_MISSING만 있는 경우 → 룰 8에서 처리하도록 통과

    # §3.c: 이미 merged 된 task는 active/conflict 후보에서 제외
    active_tasks = _filter_active_tasks(active_tasks, exclude_merged=True)

    my_files: set[str] = set(metadata.get("expected_files", []))
    my_risk_area: str = str(metadata.get("risk_area", ""))
    my_risk_parts: set[str] = {p.strip() for p in my_risk_area.split("/")}
    my_pp: str = metadata.get("parallel_policy", "")
    my_mqp = metadata.get("merge_queue_position", "n/a")

    # 파일 교집합 사전 계산
    for task in active_tasks:
        other_files: set[str] = set(task.get("expected_files", []))
        if not other_files:
            continue
        overlap = my_files & other_files
        if overlap:
            overlap_count += len(overlap)
            if task["task_id"] not in conflicting_tasks:
                conflicting_tasks.append(task["task_id"])

    total_overlap_score = overlap_count / max(len(my_files), 1)

    # ── 룰 2: 동일 파일 교집합 ────────────────────────────────────────────
    for task in active_tasks:
        other_files = set(task.get("expected_files", []))
        if my_files & other_files:
            reason_codes.append(REASON_DUPLICATE_FILE)
            break

    # ── 룰 3: 동일 함수/state machine (ssot, lifecycle_state) ─────────────
    ssot_lifecycle = {"ssot", "lifecycle_state"}
    if my_risk_parts & ssot_lifecycle:
        for task in active_tasks:
            other_risk = str(task.get("risk_area", ""))
            other_parts = {p.strip() for p in other_risk.split("/")}
            if (my_risk_parts & ssot_lifecycle) & (other_parts & ssot_lifecycle):
                reason_codes.append(REASON_DUPLICATE_FUNCTION)
                if task["task_id"] not in conflicting_tasks:
                    conflicting_tasks.append(task["task_id"])
                break

    # ── 룰 4: 동일 verifier (verifier_layer) ──────────────────────────────
    if "verifier_layer" in my_risk_parts:
        for task in active_tasks:
            other_risk = str(task.get("risk_area", ""))
            other_parts = {p.strip() for p in other_risk.split("/")}
            if "verifier_layer" in other_parts:
                reason_codes.append(REASON_DUPLICATE_VERIFIER)
                if task["task_id"] not in conflicting_tasks:
                    conflicting_tasks.append(task["task_id"])
                break

    # ── 룰 5: 동일 QC/merge lifecycle (ci_workflow) → LIMITED_PARALLEL ────
    lifecycle_risk = {"lifecycle_state", "ci_workflow"}
    limited_lifecycle_conflict = False
    if my_risk_parts & lifecycle_risk:
        for task in active_tasks:
            other_risk = str(task.get("risk_area", ""))
            other_parts = {p.strip() for p in other_risk.split("/")}
            if (my_risk_parts & lifecycle_risk) & (other_parts & lifecycle_risk):
                reason_codes.append(REASON_DUPLICATE_LIFECYCLE)
                limited_lifecycle_conflict = True
                if task["task_id"] not in conflicting_tasks:
                    conflicting_tasks.append(task["task_id"])
                break

    # ── 룰 6: dependency 미머지 ────────────────────────────────────────────
    dep = metadata.get("dependency", "none")
    if dependency_check is not None:
        _dep_ok, _unmerged_deps = dependency_check(dep)
    else:
        _dep_ok, _unmerged_deps = check_dependency_merged(dep)
    if not _dep_ok:
        reason_codes.append(REASON_MISSING_DEPENDENCY)
        for _udep in _unmerged_deps:
            if _udep not in conflicting_tasks:
                conflicting_tasks.append(_udep)

    # ── 룰 7: cherry_pick_allowed=recovery_only ────────────────────────────
    cpa = metadata.get("cherry_pick_allowed", "false")
    # YAML 파싱 호환: bool False → 'false'
    if isinstance(cpa, bool):
        cpa_str = "false"
    else:
        cpa_str = str(cpa).lower()
    if cpa_str == "recovery_only":
        reason_codes.append(REASON_CHERRY_PICK_REQUESTED)

    # ── 룰 8: limited_parallel + merge_queue_position 누락/n/a ────────────
    if my_pp == "limited_parallel":
        try:
            pos = int(my_mqp)
            if pos < 1:
                reason_codes.append(REASON_QUEUE_POSITION_MISSING)
        except (TypeError, ValueError):
            reason_codes.append(REASON_QUEUE_POSITION_MISSING)

    # ── 룰 9: parallel_safe 허위 선언 ─────────────────────────────────────
    # §3.d: expected_files 교집합 0 + mutation risk 0이면 PARALLEL_SAFE_FALSE_DECLARATION 금지
    if my_pp == "parallel_safe":
        true_mutation_overlap = False
        for task in active_tasks:
            ov = _compute_overlap(metadata, task)
            if ov["mutation_risk"]:
                true_mutation_overlap = True
                break
        if true_mutation_overlap:
            reason_codes.append(REASON_PARALLEL_SAFE_FALSE_DECLARATION)

    # ── 최종 판정 ──────────────────────────────────────────────────────────
    reason_codes = list(dict.fromkeys(reason_codes))  # 중복 제거, 순서 유지

    # BLOCK 사유가 있는 경우 (룰 2~6, 8, 9)
    block_reasons = {
        REASON_DUPLICATE_FILE,
        REASON_DUPLICATE_FUNCTION,
        REASON_DUPLICATE_VERIFIER,
        REASON_MISSING_DEPENDENCY,
        REASON_QUEUE_POSITION_MISSING,
        REASON_PARALLEL_SAFE_FALSE_DECLARATION,
    }
    if any(r in block_reasons for r in reason_codes):
        # §3.e: 순수 read-only report task는 BLOCK → ALLOW (또는 LIMITED_PARALLEL) 다운그레이드.
        # MISSING_DEPENDENCY는 read-only 예외 대상이 아님 — dependency는 그대로 강제.
        if (
            _is_pure_read_only(metadata)
            and REASON_MISSING_DEPENDENCY not in reason_codes
        ):
            return TopologyDecision(
                decision=ALLOW,
                reason_codes=reason_codes,
                overlap_score=total_overlap_score,
                conflicting_tasks=conflicting_tasks,
                metadata=metadata,
                active_tasks_snapshot=active_tasks,
            )
        return TopologyDecision(
            decision=BLOCK,
            reason_codes=reason_codes,
            overlap_score=total_overlap_score,
            conflicting_tasks=conflicting_tasks,
            metadata=metadata,
            active_tasks_snapshot=active_tasks,
        )

    # REQUIRE_CHAIR_OVERRIDE (룰 7)
    if REASON_CHERRY_PICK_REQUESTED in reason_codes:
        return TopologyDecision(
            decision=REQUIRE_CHAIR_OVERRIDE,
            reason_codes=reason_codes,
            overlap_score=total_overlap_score,
            conflicting_tasks=conflicting_tasks,
            metadata=metadata,
            active_tasks_snapshot=active_tasks,
        )

    # LIMITED_PARALLEL (룰 5)
    if limited_lifecycle_conflict:
        return TopologyDecision(
            decision=LIMITED_PARALLEL,
            reason_codes=reason_codes,
            overlap_score=total_overlap_score,
            conflicting_tasks=conflicting_tasks,
            metadata=metadata,
            active_tasks_snapshot=active_tasks,
        )

    # ALLOW
    return TopologyDecision(
        decision=ALLOW,
        reason_codes=reason_codes,
        overlap_score=total_overlap_score,
        conflicting_tasks=conflicting_tasks,
        metadata=metadata,
        active_tasks_snapshot=active_tasks,
    )


def audit_log(
    task_id: str,
    decision: TopologyDecision,
    *,
    override_used: bool = False,
    open_prs_snapshot: Optional[list[dict]] = None,
    dry_run: bool = False,
) -> Path:
    """append-only jsonl 기록. 회장 §5 9 필드 + Phase 1 amendment dry_run 필드.

    {task_id, decision, reason_codes, overlap_score, conflicting_tasks,
     active_tasks_snapshot, open_prs_snapshot, override_used, timestamp,
     dry_run(optional)}

    AUDIT_LOG_PATH 부모 디렉토리 자동 생성.
    timestamp: KST ISO 8601 (+09:00).
    open_prs_snapshot이 None이면 빈 리스트 — gh CLI 호출은 호출자가 (호출 비용 절약).

    회장 amendment 2026-05-08T11:32 (Phase 1):
    - dry_run=True 시 dry_run=true 필드를 추가 기록 (production 차단 동작 X).
    """
    # KST = UTC+09:00
    KST = timezone(timedelta(hours=9))
    now_kst = datetime.now(KST)
    timestamp = now_kst.isoformat()

    record = {
        "task_id": task_id,
        "decision": decision.decision,
        "reason_codes": decision.reason_codes,
        "overlap_score": round(decision.overlap_score, 4),
        "conflicting_tasks": decision.conflicting_tasks,
        "active_tasks_snapshot": decision.active_tasks_snapshot,
        "open_prs_snapshot": open_prs_snapshot if open_prs_snapshot is not None else [],
        "override_used": override_used,
        "timestamp": timestamp,
    }
    if dry_run:
        record["dry_run"] = True

    # 부모 디렉토리 자동 생성
    AUDIT_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)

    with open(AUDIT_LOG_PATH, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

    logger.info(
        f"[merge-topology-gate] audit logged: task={task_id} decision={decision.decision}"
        f"{' (dry_run)' if dry_run else ''}"
    )
    return AUDIT_LOG_PATH


def run_gate(
    task_id: str,
    task_desc: str,
    *,
    override: bool = False,
    override_merge_topology_gate: bool = False,
    chair_override_reason: Optional[str] = None,
    chair_approved_by: str = "chair",
    timer_path: Optional[Path] = None,
) -> tuple[TopologyDecision, bool]:
    """편의 함수: parse → classify → audit_log를 한 번에.

    반환:
      (decision, allowed)
        allowed=True  → dispatch 진행
        allowed=False → dispatch 거부

    BLOCK → allowed=False
        단, override_merge_topology_gate=True + chair_override_reason 명시 시
        → ALLOW_WITH_CHAIR_OVERRIDE (§3.f), audit 9 필드 박제
    REQUIRE_CHAIR_OVERRIDE → override=True면 allowed=True (audit에 override_used=true)
                            → override=False면 allowed=False
    LIMITED_PARALLEL → allowed=True (queue position 검증은 classify가 이미 했음)
    ALLOW → allowed=True
    """
    # 1. parse
    metadata = parse_topology_metadata(task_desc)

    # 2. load active tasks
    active_tasks = load_active_tasks(timer_path=timer_path, current_task_id=task_id)

    # 3. classify
    decision = classify(metadata, active_tasks)
    original_decision_value = decision.decision
    original_reason_codes = list(decision.reason_codes)

    # 4. 판정 → allowed 결정
    chair_override_applied = False
    if decision.decision == BLOCK:
        # §3.f: BLOCK override 허용 룰 (회장 명시 + 플래그)
        if override_merge_topology_gate and chair_override_reason:
            decision = TopologyDecision(
                decision=ALLOW_WITH_CHAIR_OVERRIDE,
                reason_codes=original_reason_codes,
                overlap_score=decision.overlap_score,
                conflicting_tasks=list(decision.conflicting_tasks),
                metadata=decision.metadata,
                active_tasks_snapshot=decision.active_tasks_snapshot,
            )
            allowed = True
            chair_override_applied = True
        else:
            allowed = False
    elif decision.decision == REQUIRE_CHAIR_OVERRIDE:
        allowed = override
    elif decision.decision == LIMITED_PARALLEL:
        allowed = True
    else:  # ALLOW or ALLOW_WITH_CHAIR_OVERRIDE
        allowed = True

    # 5. audit log
    try:
        if chair_override_applied:
            _audit_log_chair_override(
                task_id=task_id,
                decision=decision,
                original_decision=original_decision_value,
                original_reason_codes=original_reason_codes,
                override_reason=chair_override_reason or "",
                approved_by=chair_approved_by,
            )
        else:
            audit_log(
                task_id=task_id,
                decision=decision,
                override_used=(override and original_decision_value == REQUIRE_CHAIR_OVERRIDE),
            )
    except Exception as e:
        logger.error(f"[merge-topology-gate] audit_log 실패: {e}", exc_info=True)

    return decision, allowed


def _audit_log_chair_override(
    task_id: str,
    decision: TopologyDecision,
    *,
    original_decision: str,
    original_reason_codes: list[str],
    override_reason: str,
    approved_by: str = "chair",
) -> Path:
    """§3.f BLOCK override 전용 audit. 회장 9 필드 박제."""
    KST = timezone(timedelta(hours=9))
    now_kst = datetime.now(KST)
    timestamp = now_kst.isoformat()

    record = {
        "task_id": task_id,
        "original_decision": original_decision,
        "override_used": True,
        "override_decision": ALLOW_WITH_CHAIR_OVERRIDE,
        "override_reason": override_reason,
        "approved_by": approved_by,
        "original_reason_codes": original_reason_codes,
        "conflicting_tasks": list(decision.conflicting_tasks),
        "decision": decision.decision,
        "reason_codes": list(decision.reason_codes),
        "overlap_score": round(decision.overlap_score, 4),
        "active_tasks_snapshot": decision.active_tasks_snapshot,
        "timestamp": timestamp,
    }

    AUDIT_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(AUDIT_LOG_PATH, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

    logger.info(
        f"[merge-topology-gate] CHAIR_OVERRIDE audit logged: task={task_id} "
        f"original={original_decision} → ALLOW_WITH_CHAIR_OVERRIDE"
    )
    return AUDIT_LOG_PATH


# ─── Phase 1 dry-run CLI helper (회장 amendment 2026-05-08T11:32) ─────────
# dispatch.py 미경유. classifier만 직접 호출하여 ALLOW/LIMITED/BLOCK/REQUIRE 출력.
# audit jsonl에는 dry_run=true 필드로 결과 기록 (production 차단 동작 X).

def _dry_run_from_task_file(
    task_file: Path,
    *,
    task_id: Optional[str] = None,
    timer_path: Optional[Path] = None,
    write_audit: bool = True,
) -> tuple[TopologyDecision, dict]:
    """task spec 파일을 읽어 classifier만 직접 호출 (dispatch.py 미경유).

    반환: (decision, summary_dict)
    summary_dict는 CLI 출력용 평탄 dict.
    """
    task_file = Path(task_file)
    if not task_file.exists():
        raise FileNotFoundError(f"task file not found: {task_file}")

    task_desc = task_file.read_text(encoding="utf-8")

    # task_id 추출: 명시값 > 파일명 패턴 > task spec 첫 줄
    if task_id is None:
        m = re.match(r"task-[\w\d\+\.]+", task_file.stem)
        if m:
            task_id = m.group(0)
        else:
            head_match = re.search(r"task-[\w\d\+\.]+", task_desc[:500])
            task_id = head_match.group(0) if head_match else task_file.stem

    metadata = parse_topology_metadata(task_desc)
    active_tasks = load_active_tasks(timer_path=timer_path, current_task_id=task_id)
    decision = classify(metadata, active_tasks)

    if write_audit:
        try:
            audit_log(
                task_id=task_id,
                decision=decision,
                override_used=False,
                dry_run=True,
            )
        except Exception as e:
            logger.error(f"[merge-topology-gate] dry-run audit_log 실패: {e}", exc_info=True)

    summary = {
        "task_id": task_id,
        "task_file": str(task_file),
        "decision": decision.decision,
        "reason_codes": decision.reason_codes,
        "overlap_score": round(decision.overlap_score, 4),
        "conflicting_tasks": decision.conflicting_tasks,
        "metadata_keys_present": sorted(list(metadata.keys())),
        "active_tasks_count": len(active_tasks),
        "dry_run": True,
    }
    return decision, summary


def main(argv: Optional[list[str]] = None) -> int:
    """CLI entry point — 회장 amendment 2026-05-08T11:32 Phase 1 dry-run 한정.

    사용:
      python utils/merge_topology_gate.py --dry-run --task-file <path>

    종료 코드:
      0  ALLOW / LIMITED_PARALLEL
      2  BLOCK / REQUIRE_CHAIR_OVERRIDE (dry-run 표시; production 차단 동작 X)
      1  실행 자체 오류 (파일 누락 등)
    """
    import argparse
    parser = argparse.ArgumentParser(
        prog="merge_topology_gate",
        description=(
            "Merge Topology Gate dry-run helper (Phase 1, dispatch.py 미경유). "
            "회장 §1 정책 본체 enforcement는 Phase 2 (task-2504)에서 수행."
        ),
    )
    parser.add_argument("--dry-run", action="store_true", required=True,
                        help="필수. dispatch.py 미경유 dry-run mode.")
    parser.add_argument("--task-file", required=True,
                        help="검사할 task spec 마크다운 파일 경로.")
    parser.add_argument("--task-id", default=None,
                        help="명시 task_id (기본: 파일명에서 추출).")
    parser.add_argument("--timer-path", default=None,
                        help="task-timers.json 경로 override (테스트용).")
    parser.add_argument("--no-audit", action="store_true",
                        help="audit jsonl 미기록 (테스트용).")
    parser.add_argument("--json", action="store_true",
                        help="결과를 JSON 한 줄로 출력.")

    args = parser.parse_args(argv)

    try:
        decision, summary = _dry_run_from_task_file(
            task_file=Path(args.task_file),
            task_id=args.task_id,
            timer_path=Path(args.timer_path) if args.timer_path else None,
            write_audit=(not args.no_audit),
        )
    except FileNotFoundError as e:
        print(json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False))
        return 1
    except Exception as e:
        print(json.dumps({"status": "error", "message": f"dry-run 실패: {e}"}, ensure_ascii=False))
        return 1

    if args.json:
        print(json.dumps({"status": "ok", **summary}, ensure_ascii=False))
    else:
        print(f"[merge-topology-gate dry-run]")
        print(f"  task_id          : {summary['task_id']}")
        print(f"  task_file        : {summary['task_file']}")
        print(f"  decision         : {summary['decision']}")
        print(f"  reason_codes     : {summary['reason_codes']}")
        print(f"  overlap_score    : {summary['overlap_score']}")
        print(f"  conflicting_tasks: {summary['conflicting_tasks']}")
        print(f"  metadata_keys    : {summary['metadata_keys_present']}")
        print(f"  active_tasks     : {summary['active_tasks_count']}")
        print(f"  (production 차단 동작 X — Phase 1 dry-run 한정)")

    if decision.decision in (BLOCK, REQUIRE_CHAIR_OVERRIDE):
        return 2
    return 0


if __name__ == "__main__":
    import sys
    sys.exit(main())
