"""utils/cron_targeting_audit.py — task-2526 §4 + §5.

회장 §본질 (2026-05-10):
  독립 bot task cron에 ``--session`` 옵션이 잘못 붙어 봇이 아니라 **아누 자기 세션이
  resume된 사고**를 다시 일으키지 않기 위한 audit / kill / recover helper.

본 모듈의 4가지 책임:
  1. **Audit JSONL 기록 (회장 §4)** —
     ``memory/orchestration-audit/cron-targeting-audit.jsonl``에 8 필드 + 보강 2 필드를
     append한다. token / raw key / session secret 원문은 절대 기록하지 않는다.
  2. **Sanitize helper** — ``--key`` / ``--session`` / 환경변수 ``*KEY*`` / ``*TOKEN*``
     값을 정적 패턴으로 ``<redacted>``로 치환한다.
  3. **Misroute detection (회장 §5)** — 잘못된 ``--session`` cron PID를 감지하고
     soft kill (dry-run 우선)을 제안한다.
  4. **Evidence-based recover (회장 §5)** — worktree / PR / CI / audit 7 signal로
     실제 변경 발생 여부를 점검하고 ``contaminated_execution`` 또는 ``clean_abort``로
     분류한다.

회장 §금지:
  - ❌ token / raw key / session secret 원문 기록
  - ❌ admin override / owner_pat fallback
  - ❌ dispatch.py / cokacdir CLI 본체 수정
  - ❌ Critical 7종 외 회장 보고
  - ❌ SIGKILL (soft kill = SIGTERM only, dry-run default)

CLI (sub-commands):
  python3 utils/cron_targeting_audit.py append --task-kind merge_task ...
  python3 utils/cron_targeting_audit.py detect-misroute --pid 448820
  python3 utils/cron_targeting_audit.py soft-kill --pid 448820 [--apply]
  python3 utils/cron_targeting_audit.py recover --task-id task-2526
"""
from __future__ import annotations

import argparse
import fcntl
import hashlib
import json
import os
import re
import signal
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Mapping, Optional, Sequence, Tuple

# Names exported by ``from <module> import *``.
# All five TASK_KIND_* values, the ALLOWED_TASK_KINDS tuple and both
# audit-field tuples are listed so the exported surface matches the
# public (non-underscore) constants defined in this module.
__all__ = [
    "DEFAULT_AUDIT_JSONL_PATH",
    "REQUIRED_AUDIT_FIELDS_2526",
    "SUPPLEMENTARY_AUDIT_FIELDS_2526",
    "ALLOWED_TASK_KINDS",
    "TASK_KIND_INDEPENDENT",
    "TASK_KIND_MERGE",
    "TASK_KIND_BOT",
    "TASK_KIND_FOLLOWUP_RO",
    "TASK_KIND_HUMAN_RESPONSE",
    "ACTOR_BOT_SESSION",
    "ACTOR_ANU_SESSION",
    "BlockedReason",
    "CronTargetingAuditRecord",
    "MisrouteReport",
    "RecoveryPlan",
    "hash_bot_key",
    "sanitize_command_preview",
    "ensure_no_raw_secrets",
    "build_audit_record",
    "append_audit_jsonl",
    "detect_misrouted_session",
    "soft_kill_misrouted",
    "evidence_based_recover",
]


# ---------------------------------------------------------------------------
# Constants — the five task_kind enum values (chairman §3)
# ---------------------------------------------------------------------------

# String identifiers accepted as ``task_kind``.
TASK_KIND_INDEPENDENT = "independent_task"
TASK_KIND_MERGE = "merge_task"
TASK_KIND_BOT = "bot_task"
TASK_KIND_FOLLOWUP_RO = "followup_readonly"
TASK_KIND_HUMAN_RESPONSE = "human_response"

# Closed set of valid task kinds; build_audit_record rejects anything else and
# the CLI uses it as the ``--task-kind`` choices.
ALLOWED_TASK_KINDS: Tuple[str, ...] = (
    TASK_KIND_INDEPENDENT,
    TASK_KIND_MERGE,
    TASK_KIND_BOT,
    TASK_KIND_FOLLOWUP_RO,
    TASK_KIND_HUMAN_RESPONSE,
)

# Actor identifiers; the CLI offers these as the ``--actor-expected`` choices.
ACTOR_BOT_SESSION = "bot_session"
ACTOR_ANU_SESSION = "anu_session"

# Default audit log location. The path is relative, so it resolves against the
# working directory of the calling process.
DEFAULT_AUDIT_JSONL_PATH = Path(
    "memory/orchestration-audit/cron-targeting-audit.jsonl"
)


# ---------------------------------------------------------------------------
# BlockedReason — enum of reasons the preflight guard blocked a dispatch
# (consistent with chairman §2, regression 6)
# ---------------------------------------------------------------------------

class BlockedReason:
    """Small enum-like holder: a narrow bundle of constants, not a new abstraction.

    Each attribute is the string stored in an audit record's
    ``blocked_reason`` field; ``ALL`` is the tuple build_audit_record
    validates against.
    """

    INDEPENDENT_TASK_WITH_SESSION = "independent_task_must_not_resume_anu_session"
    MERGE_TASK_WITH_SESSION = "merge_task_must_not_inherit_anu_session"
    BOT_KEY_MISSING_FOR_BOT_TASK = "bot_key_missing_for_bot_task"
    TARGET_SESSION_OWNER_MISMATCH = "target_bot_session_owner_mismatch"
    OWNER_PAT_FALLBACK_DETECTED = "owner_pat_fallback_path_detected"

    # Every reason above, in declaration order.
    ALL: Tuple[str, ...] = (
        INDEPENDENT_TASK_WITH_SESSION,
        MERGE_TASK_WITH_SESSION,
        BOT_KEY_MISSING_FOR_BOT_TASK,
        TARGET_SESSION_OWNER_MISMATCH,
        OWNER_PAT_FALLBACK_DETECTED,
    )


# ---------------------------------------------------------------------------
# Required audit fields (chairman §4 — exactly 8, plus 2 supplementary)
# ---------------------------------------------------------------------------

# The eight mandatory field names; they match the first eight attributes of
# CronTargetingAuditRecord.
REQUIRED_AUDIT_FIELDS_2526: Tuple[str, ...] = (
    "cron_id",
    "target_bot",
    "target_bot_key_hash",
    "session_id_present",
    "session_id_allowed",
    "task_kind",
    "actor_expected",
    "actor_actual_if_known",
)

# The two supplementary field names carried alongside the mandatory eight.
SUPPLEMENTARY_AUDIT_FIELDS_2526: Tuple[str, ...] = (
    "command_preview_sanitized",
    "blocked_reason",
)


# ---------------------------------------------------------------------------
# Sanitization — zero exposure of raw tokens / raw keys / session secrets
# (chairman §4, security)
# ---------------------------------------------------------------------------

# Static patterns for well-known raw secret formats. sanitize_command_preview
# replaces every match with ``<redacted>``.
_RAW_TOKEN_PATTERNS: Tuple[re.Pattern, ...] = (
    re.compile(r"ghp_[A-Za-z0-9]{20,}"),
    re.compile(r"ghs_[A-Za-z0-9]{20,}"),
    # GitHub OAuth (gho_), user-to-server (ghu_) and refresh (ghr_) tokens.
    re.compile(r"gh[our]_[A-Za-z0-9]{20,}"),
    re.compile(r"github_pat_[A-Za-z0-9_]{20,}"),
    re.compile(r"\bsk-[A-Za-z0-9]{20,}\b"),
    re.compile(r"\bxoxb-[A-Za-z0-9-]{20,}\b"),
)

# A flag/env value: a double-quoted span, a single-quoted span, or a bare
# token. The closing quote must be the SAME kind as the opening one; a
# pattern that accepts either kind stops at an embedded quote and leaves the
# tail of the secret unredacted (e.g. ``--key "ab'SECRET"``).
_FLAG_VALUE_GROUP = r"""("[^"]*"|'[^']*'|\S+)"""

# Group 1 = flag / variable name, group 3 (group 2 for the env form) = value.
_KEY_FLAG_RE = re.compile(r"(--key)(\s+|=)" + _FLAG_VALUE_GROUP)
_SESSION_FLAG_RE = re.compile(r"(--session)(\s+|=)" + _FLAG_VALUE_GROUP)
_KEY_ENV_RE = re.compile(
    r"\b([A-Z][A-Z0-9_]*(?:KEY|TOKEN|SECRET|PASSWORD)[A-Z0-9_]*)="
    + _FLAG_VALUE_GROUP
)

# ``--cron "<prompt>"`` / ``--cron '<prompt>'``; group 3 or 4 holds the body.
_CRON_PROMPT_RE = re.compile(r'(--cron\s+)("([^"]*)"|\'([^\']*)\')')


def hash_bot_key(bot_key: Optional[str]) -> Optional[str]:
    """Mask a bot key as the first 16 hex characters of its SHA-256 digest.

    Returns ``None`` when *bot_key* is ``None`` or an empty string.
    """
    if not bot_key:
        return None
    return hashlib.sha256(bot_key.encode("utf-8")).hexdigest()[:16]


def _redact_uuid(value: str) -> str:
    """Redact a UUID-shaped value, keeping at most a 4-character prefix.

    Values shorter than 8 characters are replaced entirely so that the kept
    prefix never covers most of the value.
    """
    if len(value) >= 8:
        return f"{value[:4]}…(redacted)"
    return "(redacted)"


def _strip_matching_quotes(raw: str) -> str:
    """Drop one pair of identical surrounding quotes (as added by shlex.quote)."""
    if len(raw) >= 2 and raw[0] == raw[-1] and raw[0] in ("'", '"'):
        return raw[1:-1]
    return raw


def sanitize_command_preview(command: str, max_prompt_chars: int = 80) -> str:
    """Mask a ``cokacdir --cron`` style command string for the audit log.

    Transformations, applied in this order:

    1. ``--key <value>`` / ``--key=<value>`` → ``--key <hash:…>``
       (see :func:`hash_bot_key`; surrounding quotes are stripped first).
    2. ``--session <uuid>`` → ``--session <prefix>…(redacted)``.
    3. ``*KEY*=`` / ``*TOKEN*=`` / ``*SECRET*=`` / ``*PASSWORD*=`` environment
       assignments → ``NAME=<redacted>``.
    4. Known raw token prefixes (``ghp_``, ``ghs_``, ``gho_``, ``ghu_``,
       ``ghr_``, ``github_pat_``, ``sk-``, ``xoxb-``) → ``<redacted>``.
    5. A quoted ``--cron`` prompt longer than *max_prompt_chars* is cut to
       that length and suffixed with ``…``; the prompt is always re-emitted
       in double quotes.

    Args:
        command: The command line to sanitize.
        max_prompt_chars: Maximum number of prompt characters kept (default 80).

    Returns:
        The sanitized command string.
    """

    def _hash_key(match: re.Match) -> str:
        key_value = _strip_matching_quotes(match.group(3))
        return f"{match.group(1)} <hash:{hash_bot_key(key_value)}>"

    def _redact_session(match: re.Match) -> str:
        session_value = _strip_matching_quotes(match.group(3))
        return f"{match.group(1)} {_redact_uuid(session_value)}"

    def _redact_env(match: re.Match) -> str:
        return f"{match.group(1)}=<redacted>"

    def _truncate_prompt(match: re.Match) -> str:
        # Exactly one of groups 3 / 4 participates, depending on quote kind.
        body = match.group(3) if match.group(3) is not None else match.group(4)
        if body is None:
            return match.group(0)
        if len(body) > max_prompt_chars:
            body = body[:max_prompt_chars] + "…"
        return f'{match.group(1)}"{body}"'

    text = _KEY_FLAG_RE.sub(_hash_key, command)
    text = _SESSION_FLAG_RE.sub(_redact_session, text)
    text = _KEY_ENV_RE.sub(_redact_env, text)
    for token_pattern in _RAW_TOKEN_PATTERNS:
        text = token_pattern.sub("<redacted>", text)
    return _CRON_PROMPT_RE.sub(_truncate_prompt, text)


# ``--key`` followed by a value that does not start with ``<`` — i.e. not the
# ``<hash:…>`` placeholder produced by sanitization.
_UNMASKED_KEY_FLAG_RE = re.compile(r"--key(?:\s+|=)(?!<)\S")

# Canonical 8-4-4-4-12 UUID. Case-insensitive so upper- or mixed-case session
# ids cannot slip past the check.
_FULL_UUID_RE = re.compile(
    r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b",
    re.IGNORECASE,
)


def ensure_no_raw_secrets(payload: object) -> None:
    """Raise if *payload* still contains a raw secret pattern.

    Intended to be called immediately before an audit JSONL append
    (chairman §4, security). The payload is serialized with ``json.dumps``
    (non-serializable values fall back to ``str``) and the resulting text is
    scanned for:

    * any pattern in ``_RAW_TOKEN_PATTERNS``;
    * a ``--key`` flag whose value is not a ``<…>`` placeholder;
    * a full UUID in any letter case.

    Values that went through ``sanitize_command_preview`` have the form
    ``<hash:…>`` / ``<redacted>`` / ``…(redacted)`` and therefore pass.

    Args:
        payload: Any JSON-serializable object (typically the audit dict).

    Raises:
        ValueError: If one of the checks above matches. The message names the
            failed check but never echoes the matched secret.
    """
    text = json.dumps(payload, ensure_ascii=False, default=str)

    for pat in _RAW_TOKEN_PATTERNS:
        if pat.search(text):
            raise ValueError(
                f"raw secret pattern detected in audit payload "
                f"(pattern={pat.pattern!r}); refusing to write JSONL"
            )

    if _UNMASKED_KEY_FLAG_RE.search(text):
        raise ValueError(
            "raw --key value detected in audit payload (must be hashed)"
        )

    # A complete session UUID is never allowed in the audit log.
    if _FULL_UUID_RE.search(text):
        raise ValueError(
            "raw session UUID detected in audit payload (must be redacted)"
        )


# ---------------------------------------------------------------------------
# Audit record dataclass (chairman §4 — 8 fields + 2 supplementary)
# ---------------------------------------------------------------------------

def _utc_timestamp_z() -> str:
    """Return the current UTC time as ISO-8601 text with a ``Z`` suffix."""
    stamp = datetime.now(tz=timezone.utc).isoformat()
    return stamp.replace("+00:00", "Z")


@dataclass(frozen=True)
class CronTargetingAuditRecord:
    """Immutable task-2526 audit record; holds no raw token or session secret.

    The bot key is stored only as a hash and the session id only as a
    presence flag. ``ts`` defaults to the creation time in UTC.
    """

    # --- the eight required fields ---
    cron_id: Optional[str]
    target_bot: Optional[str]
    target_bot_key_hash: Optional[str]
    session_id_present: bool
    session_id_allowed: bool
    task_kind: str
    actor_expected: str
    actor_actual_if_known: Optional[str]
    # --- the two supplementary fields ---
    command_preview_sanitized: str
    blocked_reason: Optional[str]
    # --- creation timestamp ---
    ts: str = field(default_factory=_utc_timestamp_z)

    def to_dict(self) -> dict:
        """Return the record as a plain dict in field-declaration order."""
        return asdict(self)


def build_audit_record(
    *,
    cron_id: Optional[str],
    target_bot: Optional[str],
    bot_key: Optional[str],
    session_id: Optional[str],
    session_id_allowed: bool,
    task_kind: str,
    actor_expected: str,
    actor_actual_if_known: Optional[str],
    command_preview: str,
    blocked_reason: Optional[str],
) -> CronTargetingAuditRecord:
    """Build an audit record from raw inputs without retaining any secret.

    The raw ``bot_key`` is reduced to a hash, the raw ``session_id`` to a
    presence flag, and ``command_preview`` is passed through
    ``sanitize_command_preview``.

    Raises:
        ValueError: If ``task_kind`` is not in ``ALLOWED_TASK_KINDS``, or if
            ``blocked_reason`` is neither ``None`` nor in ``BlockedReason.ALL``.
    """
    # Validate both enum-like inputs before doing any work.
    if task_kind not in ALLOWED_TASK_KINDS:
        raise ValueError(
            f"task_kind must be one of {ALLOWED_TASK_KINDS}, got {task_kind!r}"
        )
    reason_is_valid = blocked_reason is None or blocked_reason in BlockedReason.ALL
    if not reason_is_valid:
        raise ValueError(
            f"blocked_reason must be one of {BlockedReason.ALL} or None, "
            f"got {blocked_reason!r}"
        )

    return CronTargetingAuditRecord(
        cron_id=cron_id,
        target_bot=target_bot,
        target_bot_key_hash=hash_bot_key(bot_key),
        # Only the presence of a session id is recorded, never its value.
        session_id_present=session_id not in (None, ""),
        session_id_allowed=bool(session_id_allowed),
        task_kind=task_kind,
        actor_expected=actor_expected,
        actor_actual_if_known=actor_actual_if_known,
        command_preview_sanitized=sanitize_command_preview(command_preview),
        blocked_reason=blocked_reason,
    )


def append_audit_jsonl(
    record: CronTargetingAuditRecord,
    *,
    audit_path: Optional[Path] = None,
) -> Path:
    """Append *record* as one JSON line under an exclusive ``flock``.

    The serialized payload is checked with ``ensure_no_raw_secrets`` before
    anything is written. Missing parent directories are created, and the
    line is flushed and fsynced while the lock is held.

    Args:
        record: The audit record to persist.
        audit_path: Destination file; defaults to ``DEFAULT_AUDIT_JSONL_PATH``.

    Returns:
        The path that was written to.

    Raises:
        ValueError: Propagated from ``ensure_no_raw_secrets``.
    """
    destination = (
        DEFAULT_AUDIT_JSONL_PATH if audit_path is None else Path(audit_path)
    )

    data = record.to_dict()
    ensure_no_raw_secrets(data)

    # ``Path.parent`` is never an empty path (it is "." at minimum), so the
    # directory can be created unconditionally.
    destination.parent.mkdir(parents=True, exist_ok=True)

    serialized = json.dumps(data, ensure_ascii=False) + "\n"
    with open(destination, "a", encoding="utf-8") as handle:
        fd = handle.fileno()
        fcntl.flock(fd, fcntl.LOCK_EX)
        try:
            handle.write(serialized)
            handle.flush()
            os.fsync(fd)
        finally:
            fcntl.flock(fd, fcntl.LOCK_UN)

    return destination


# ---------------------------------------------------------------------------
# Misroute detection / soft kill (chairman §5)
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class MisrouteReport:
    """Immutable result of inspecting one PID's command line for a misroute."""

    # PID that was inspected.
    pid: int
    # True when a ``--session <value>`` flag was found in the command line.
    has_session_flag: bool
    # Redacted form of the session value; None when no flag was found.
    session_id_redacted: Optional[str]
    # True when the session flag appears on a cron-dispatch command line.
    suspected_misroute: bool
    # Sanitized command line, or "<unreachable>" when it could not be read.
    cmdline_preview_sanitized: str
    # Short machine-readable reason, or None when nothing is suspected.
    reason: Optional[str]

def _read_proc_cmdline(pid: int, *, proc_root: Path = Path("/proc")) -> Optional[str]:
    """Read ``<proc_root>/<pid>/cmdline`` as one space-separated string.

    The NUL separators between arguments are replaced with spaces, invalid
    UTF-8 is replaced rather than raised, and surrounding whitespace is
    stripped.

    Returns:
        The flattened command line, or ``None`` when the file cannot be read.
    """
    try:
        payload = proc_root.joinpath(str(pid), "cmdline").read_bytes()
    except OSError:
        # FileNotFoundError and PermissionError are both OSError subclasses.
        return None
    decoded = payload.decode("utf-8", errors="replace")
    return decoded.replace("\x00", " ").strip()


def detect_misrouted_session(
    pid: int,
    *,
    cmdline_reader: Callable[[int], Optional[str]] = _read_proc_cmdline,
) -> MisrouteReport:
    """Classify a PID as a suspected misroute from its command line.

    A process is suspected when its command line carries a
    ``--session <value>`` flag AND looks like a cron dispatch (contains both
    ``cokacdir`` and ``--cron``, or contains ``safe_cron_dispatch``).

    Args:
        pid: Process id to inspect.
        cmdline_reader: Callable returning the command line for a PID, or
            ``None`` when it is unreadable. Injection point for tests.

    Returns:
        A :class:`MisrouteReport`. When the command line cannot be read the
        report has ``suspected_misroute=False`` and
        ``reason="proc_cmdline_unreadable"``.
    """
    cmdline = cmdline_reader(pid)
    if cmdline is None:
        return MisrouteReport(
            pid=pid,
            has_session_flag=False,
            session_id_redacted=None,
            suspected_misroute=False,
            cmdline_preview_sanitized="<unreachable>",
            reason="proc_cmdline_unreadable",
        )

    # Search once and reuse the match for both the flag and the value.
    session_match = _SESSION_FLAG_RE.search(cmdline)
    has_session = session_match is not None

    session_redacted: Optional[str] = None
    if session_match is not None:
        session_value = session_match.group(3)
        # Strip one pair of matching quotes before redacting so the kept
        # prefix is taken from the id itself, exactly as the sanitized
        # preview does.
        if (
            len(session_value) >= 2
            and session_value[0] == session_value[-1]
            and session_value[0] in ("'", '"')
        ):
            session_value = session_value[1:-1]
        session_redacted = _redact_uuid(session_value)

    is_cron_dispatch = (
        ("cokacdir" in cmdline and "--cron" in cmdline)
        or ("safe_cron_dispatch" in cmdline)
    )
    suspected = has_session and is_cron_dispatch

    return MisrouteReport(
        pid=pid,
        has_session_flag=has_session,
        session_id_redacted=session_redacted,
        suspected_misroute=suspected,
        cmdline_preview_sanitized=sanitize_command_preview(cmdline),
        reason="cron_dispatch_with_session_flag" if suspected else None,
    )


def soft_kill_misrouted(
    pid: int,
    *,
    dry_run: bool = True,
    killer: Callable[[int, int], None] = os.kill,
    audit_path: Optional[Path] = None,
) -> dict:
    """Send SIGTERM to a suspected-misroute PID and audit the outcome.

    SIGKILL is never used (chairman §5). The signal is only sent when the PID
    is classified as a suspected misroute AND ``dry_run`` is false.

    The ``action`` value in the result is one of:

    * ``"dry_run"`` — ``dry_run`` was true; nothing was sent.
    * ``"skipped_not_suspected"`` — apply mode, but the PID is not a suspected
      misroute, so no signal was attempted.
    * ``"sigterm"`` — SIGTERM was sent.
    * ``"sigterm_failed:<ExceptionName>"`` — sending SIGTERM raised ``OSError``.

    Args:
        pid: Process id to inspect and, if applicable, signal.
        dry_run: When true (the default), never send a signal.
        killer: ``os.kill``-compatible callable; injection point for tests.
        audit_path: Audit JSONL file; defaults to ``DEFAULT_AUDIT_JSONL_PATH``.

    Returns:
        A dict with ``pid``, ``suspected_misroute``, ``action``,
        ``signal_sent``, ``reason``, ``session_id_redacted`` and
        ``cmdline_preview_sanitized``.

    Raises:
        ValueError: Propagated from ``ensure_no_raw_secrets`` when the audit
            payload still contains a raw secret.
    """
    report = detect_misrouted_session(pid)

    sent = False
    if dry_run:
        action = "dry_run"
    elif not report.suspected_misroute:
        # Record that nothing was attempted instead of claiming "sigterm".
        action = "skipped_not_suspected"
    else:
        action = "sigterm"
        try:
            killer(pid, signal.SIGTERM)
            sent = True
        except OSError as e:
            # Covers ProcessLookupError and PermissionError (OSError subclasses).
            action = f"sigterm_failed:{type(e).__name__}"

    result = {
        "pid": pid,
        "suspected_misroute": report.suspected_misroute,
        "action": action,
        "signal_sent": sent,
        "reason": report.reason,
        "session_id_redacted": report.session_id_redacted,
        "cmdline_preview_sanitized": report.cmdline_preview_sanitized,
    }

    # Audit append: a small helper record, separate from the main 8-field schema.
    audit_target = (
        Path(audit_path) if audit_path is not None else DEFAULT_AUDIT_JSONL_PATH
    )
    audit_target.parent.mkdir(parents=True, exist_ok=True)
    audit_payload = {
        "ts": datetime.now(tz=timezone.utc).isoformat().replace("+00:00", "Z"),
        "kind": "soft_kill_audit_2526",
        **result,
    }
    # Chairman §4: verify no raw secret remains right before every audit write.
    ensure_no_raw_secrets(audit_payload)
    line = json.dumps(audit_payload, ensure_ascii=False) + "\n"
    with open(audit_target, "a", encoding="utf-8") as fh:
        fd = fh.fileno()
        fcntl.flock(fd, fcntl.LOCK_EX)
        try:
            fh.write(line)
            fh.flush()
            # Same durability as append_audit_jsonl: fsync under the lock.
            os.fsync(fd)
        finally:
            fcntl.flock(fd, fcntl.LOCK_UN)

    return result


# ---------------------------------------------------------------------------
# Evidence-based recover (chairman §5)
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class RecoveryPlan:
    """Immutable outcome of :func:`evidence_based_recover`."""

    task_id: str
    classification: str  # "clean_abort" | "contaminated_execution" | "no_evidence"
    # Recognized signal names present in the input, in canonical order.
    signals_checked: Tuple[str, ...]
    # Subset of ``signals_checked`` whose value was truthy.
    signals_with_evidence: Tuple[str, ...]
    # Human-readable follow-up steps.
    next_steps: Tuple[str, ...]
    redispatch_required: bool


# The 7 signals, consistent with feedback_bot_no_response_not_dead_260509.md
# (read-only reference).
_RECOVERY_SIGNALS: Tuple[str, ...] = (
    "worktree_diff",
    "worktree_untracked",
    "branch_unpushed_commits",
    "remote_branch_exists",
    "open_pr_for_task",
    "ci_run_for_branch",
    "audit_jsonl_evidence",
)


def evidence_based_recover(
    task_id: str,
    *,
    signals: Optional[Mapping[str, bool]] = None,
) -> RecoveryPlan:
    """Derive a recovery plan from the 7-signal evidence mapping.

    Classification:

    * ``no_evidence`` — ``signals`` is ``None`` or contains none of the
      recognized signal names. Nothing was actually checked, so the task must
      not be declared safe to abort.
    * ``contaminated_execution`` — at least one recognized signal is truthy.
    * ``clean_abort`` — at least one recognized signal was supplied and all
      supplied ones are falsy.

    Keys that are not recognized signal names are ignored.

    Args:
        task_id: Non-empty task identifier.
        signals: Mapping of signal name to whether evidence was found.

    Returns:
        The resulting :class:`RecoveryPlan`.

    Raises:
        ValueError: If ``task_id`` is not a non-empty string.
        TypeError: If ``signals`` is given but is not a mapping.
    """
    if not isinstance(task_id, str) or not task_id.strip():
        raise ValueError("task_id must be a non-empty str")
    if signals is not None and not isinstance(signals, Mapping):
        raise TypeError(
            f"signals must be a mapping of signal name to bool, "
            f"got {type(signals).__name__}"
        )

    checked: Tuple[str, ...] = (
        tuple(name for name in _RECOVERY_SIGNALS if name in signals)
        if signals is not None
        else ()
    )

    # No recognized signal supplied: there is no basis for any verdict.
    if not checked:
        return RecoveryPlan(
            task_id=task_id,
            classification="no_evidence",
            signals_checked=tuple(),
            signals_with_evidence=tuple(),
            next_steps=(
                "1) collect 7 signals from worktree/PR/CI/audit",
                "2) re-run evidence_based_recover with signals dict",
            ),
            redispatch_required=False,
        )

    with_evidence = tuple(name for name in checked if signals[name])

    if with_evidence:
        return RecoveryPlan(
            task_id=task_id,
            classification="contaminated_execution",
            signals_checked=checked,
            signals_with_evidence=with_evidence,
            next_steps=(
                "1) freeze branch / no force-push / no rebase",
                "2) report contaminated_execution to chairman chat (Critical 7종 X)",
                "3) verify whether commit/push/merge happened on bot identity",
                "4) if owner_pat fallback detected → log capability_gap, do not redispatch silently",
                "5) once chairman acknowledges, redispatch via safe_cron_dispatch with proper bot key",
            ),
            redispatch_required=True,
        )

    return RecoveryPlan(
        task_id=task_id,
        classification="clean_abort",
        signals_checked=checked,
        signals_with_evidence=tuple(),
        next_steps=(
            "1) no commit/push/PR/CI/audit evidence → safe to abort",
            "2) redispatch via safe_cron_dispatch with correct bot key + no --session",
        ),
        redispatch_required=True,
    )


# ---------------------------------------------------------------------------
# CLI surface (read-only utility — for verifying appends one record at a time)
# ---------------------------------------------------------------------------

def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the argument parser with its four required sub-commands.

    Sub-commands: ``append``, ``detect-misroute``, ``soft-kill`` and
    ``recover``. The chosen sub-command name is stored in ``args.cmd``.
    """
    root = argparse.ArgumentParser(
        prog="cron_targeting_audit",
        description="task-2526 cron targeting audit / kill / recover helper",
    )
    commands = root.add_subparsers(dest="cmd", required=True)

    # append: write one audit record.
    append_cmd = commands.add_parser("append", help="audit JSONL append 1건")
    for plain_flag in ("--cron-id", "--target-bot"):
        append_cmd.add_argument(plain_flag, default=None)
    append_cmd.add_argument(
        "--bot-key", default=None, help="raw key (해시되어 기록됨)"
    )
    append_cmd.add_argument(
        "--session-id", default=None, help="raw session id (포함 여부만 기록됨)"
    )
    append_cmd.add_argument("--session-allowed", action="store_true")
    append_cmd.add_argument(
        "--task-kind", required=True, choices=ALLOWED_TASK_KINDS
    )
    append_cmd.add_argument(
        "--actor-expected",
        required=True,
        choices=[ACTOR_BOT_SESSION, ACTOR_ANU_SESSION],
    )
    append_cmd.add_argument("--actor-actual", default=None)
    append_cmd.add_argument("--command-preview", required=True)
    for optional_flag in ("--blocked-reason", "--audit-path"):
        append_cmd.add_argument(optional_flag, default=None)

    # detect-misroute: inspect one PID.
    detect_cmd = commands.add_parser("detect-misroute")
    detect_cmd.add_argument("--pid", type=int, required=True)

    # soft-kill: dry-run unless --apply is given.
    kill_cmd = commands.add_parser("soft-kill")
    kill_cmd.add_argument("--pid", type=int, required=True)
    kill_cmd.add_argument(
        "--apply",
        action="store_true",
        help="기본 dry-run; --apply 시 SIGTERM 실송",
    )

    # recover: classify a task from its evidence signals.
    recover_cmd = commands.add_parser("recover")
    recover_cmd.add_argument("--task-id", required=True)
    recover_cmd.add_argument(
        "--signals-json",
        default=None,
        help='dict JSON e.g. \'{"worktree_diff":false,...}\'',
    )

    return root


def _cli_main(argv: Optional[Sequence[str]] = None) -> int:
    """Run the CLI and return a process exit code.

    Each sub-command prints a single JSON document to stdout and returns 0.
    Invalid arguments — including a ``--signals-json`` value that is not a
    JSON object — are reported through ``parser.error``, which prints a usage
    message and exits with status 2.

    Args:
        argv: Argument list; ``None`` means ``sys.argv[1:]``.

    Returns:
        0 on success, 2 if no known sub-command was selected.
    """
    parser = _build_arg_parser()
    args = parser.parse_args(argv)

    if args.cmd == "append":
        record = build_audit_record(
            cron_id=args.cron_id,
            target_bot=args.target_bot,
            bot_key=args.bot_key,
            session_id=args.session_id,
            session_id_allowed=bool(args.session_allowed),
            task_kind=args.task_kind,
            actor_expected=args.actor_expected,
            actor_actual_if_known=args.actor_actual,
            command_preview=args.command_preview,
            blocked_reason=args.blocked_reason,
        )
        path = append_audit_jsonl(
            record,
            audit_path=Path(args.audit_path) if args.audit_path else None,
        )
        print(json.dumps({"ok": True, "audit_path": str(path)}, ensure_ascii=False))
        return 0

    if args.cmd == "detect-misroute":
        report = detect_misrouted_session(args.pid)
        print(json.dumps(asdict(report), ensure_ascii=False))
        return 0

    if args.cmd == "soft-kill":
        result = soft_kill_misrouted(args.pid, dry_run=not args.apply)
        print(json.dumps(result, ensure_ascii=False))
        return 0

    if args.cmd == "recover":
        sig_map: Optional[Mapping[str, bool]] = None
        if args.signals_json:
            try:
                parsed = json.loads(args.signals_json)
            except json.JSONDecodeError as exc:
                parser.error(f"--signals-json is not valid JSON: {exc}")
            else:
                # Only a JSON object can map signal names to booleans; a list
                # or scalar must not reach the classifier.
                if not isinstance(parsed, dict):
                    parser.error(
                        "--signals-json must be a JSON object "
                        "mapping signal name to bool"
                    )
                sig_map = parsed
        plan = evidence_based_recover(args.task_id, signals=sig_map)
        print(json.dumps({
            "task_id": plan.task_id,
            "classification": plan.classification,
            "signals_checked": list(plan.signals_checked),
            "signals_with_evidence": list(plan.signals_with_evidence),
            "next_steps": list(plan.next_steps),
            "redispatch_required": plan.redispatch_required,
        }, ensure_ascii=False))
        return 0

    # Defensive fallback: the sub-parser is declared required, so argparse
    # normally rejects a missing command before this point.
    parser.print_help()
    return 2


if __name__ == "__main__":
    sys.exit(_cli_main())
