#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""scan_anu_v3_cli_output_sinks_2616.py — task-2616 READ-ONLY scanner.

회장 승인 범위(task-2616 §1~§3): ANU v3 CLI entrypoint 전수 read-only scan.
- scan 대상 코드는 **read-only·byte-0·수정 0**. 본 스크립트는 어떤 대상 .py 도
  write 모드로 열지 않는다(오직 'r' read + ast.parse).
- 본 스크립트 자체는 argparse/main 을 가지나, 이는 *scanner* entrypoint 이며
  task-2616 §4 의 "공통 guard = import-only" 제약 대상이 아니다(guard 가 아님).
- 산출물은 memory/events·memory/fixtures·memory/reports 의 task-2616.* 만 write.

검출:
  1. CLI 프레임워크: argparse / click / typer
  2. entrypoint: if __name__=='__main__' · def main · def _main
  3. 출력 인자: --out / --output / output_file / report_path / outfile (옵션 dest 포함)
  4. write sink: Path(...).write_text/write_bytes · open(..., 'w'|'a'|'x'|...) · json.dump
  5. arg→sink data-flow (heuristic): 출력 인자 dest 가 write sink 인자식에 등장 → tainted
  6. stdout-only(print/sys.stdout) vs 파일 write 구분
  7. Critical7 후보: CLI 출력 인자 값이 무검증으로 write sink path 에 직접 전달

usage:
  python3 scripts/scan_anu_v3_cli_output_sinks_2616.py \
      [--anu-root anu_v3] [--ws-root /home/jay/workspace] [--emit]
  --emit 없으면 stdout 으로 scan-result JSON 만 출력(파일 write 0).
"""
from __future__ import annotations

import argparse
import ast
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

# Default workspace root; the CLI derives its --ws-root / --anu-root defaults
# from it.
CANONICAL_WS_ROOT = Path("/home/jay/workspace")
# Schema identifier stamped into every scan-result document.
SCHEMA = "anu.task2616.cli_output_sink_scan.v1"

# Option strings that mark an add_argument() call as an output argument.
OUTPUT_ARG_TOKENS = (
    "--out",
    "--output",
    "--outfile",
    "--report-path",
    "--report_path",
    "--output-file",
    "--output_file",
)
# Destination (attribute) names that mark an argument as an output argument.
OUTPUT_DEST_TOKENS = (
    "out",
    "output",
    "outfile",
    "output_file",
    "report_path",
    "out_path",
    "output_path",
)
# Method names treated as file-write sinks (e.g. Path(...).write_text).
WRITE_METHODS = (
    "write_text",
    "write_bytes",
)
# Only calls opened in a 'w'/'a'/'x' (write-capable) mode count as sinks.
WRITE_FUNCS = ("open",)
# json.dump / yaml.dump (the variants that take a file argument).
DUMP_FUNCS = ("dump",)
# Textual markers of stdout-only output.
STDOUT_MARKERS = (
    "print",
    "sys.stdout",
    "stdout.write",
)

# Tasks named explicitly in spec section 2, mapped to their source file
# (a read-only record of facts; no code is modified).
TASK_FILE_MAP = {
    "task-2610": "anu_v3/batch_hold_adjudicator.py",
    "task-2612": "anu_v3/auto_remediation_planner.py",
    "task-2613": "anu_v3/batch_dependency_classifier.py",
    # No decision/events exist for this task, so it is recorded as
    # not dispatched.
    "task-2615": None,
}
# Files that belong to the Critical7 classifier family.
CRITICAL7_CLASSIFIER_FILES = (
    "anu_v3/critical7_classifier.py",
    "anu_v3/codex_high_classifier.py",
)


def _utcnow() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second resolution)."""
    now = datetime.now(tz=timezone.utc)
    return f"{now:%Y-%m-%dT%H:%M:%SZ}"


def _attr_chain(node: ast.AST) -> str:
    """Render an attribute chain as a dotted string (best effort).

    ``a.b.c`` becomes ``"a.b.c"`` and ``Path(x).write_text`` becomes
    ``"Path().write_text"``. When the chain is rooted in something other
    than a name or a call (a subscript, a literal, ...), only the attribute
    names are returned; a node with no attributes and no such root yields
    an empty string.
    """
    attrs: List[str] = []
    root = node
    # Peel attribute accesses from the outside in, keeping source order.
    while isinstance(root, ast.Attribute):
        attrs.insert(0, root.attr)
        root = root.value

    if isinstance(root, ast.Name):
        head = [root.id]
    elif isinstance(root, ast.Call):
        # A call in the chain is rendered as "<callee>()"; arguments are dropped.
        head = [_attr_chain(root.func) + "()"]
    else:
        head = []
    return ".".join(head + attrs)


def _names_in(node: ast.AST) -> List[str]:
    """Return the id of every ``ast.Name`` found by ``ast.walk(node)``.

    Duplicates are kept and the order is the traversal order of ``ast.walk``.
    """
    found: List[str] = []
    for child in ast.walk(node):
        if isinstance(child, ast.Name):
            found.append(child.id)
    return found


def _open_mode_is_write(call: ast.Call) -> Optional[str]:
    """Classify the mode of an ``open(...)`` call.

    The mode is taken from the second positional argument; a ``mode=``
    keyword, when present, takes precedence over it.

    Returns:
        The literal mode string when it contains ``w``, ``a``, ``x`` or ``+``;
        ``None`` when no mode is given (default read mode) or the literal
        mode is read-only; ``"<dynamic>"`` when the mode is not a string
        literal, so it is conservatively reported for review.
    """
    positional = call.args[1] if len(call.args) >= 2 else None
    keyword_modes = [kw.value for kw in call.keywords if kw.arg == "mode"]
    mode_expr = keyword_modes[-1] if keyword_modes else positional

    if mode_expr is None:
        return None  # default 'r': a read, not a sink
    is_str_literal = (isinstance(mode_expr, ast.Constant)
                      and isinstance(mode_expr.value, str))
    if not is_str_literal:
        return "<dynamic>"  # mode computed at runtime: flag for review
    literal = mode_expr.value
    return literal if set(literal) & set("wax+") else None


def _describe_output_argument(call: ast.Call) -> Optional[Dict[str, Any]]:
    """Describe an ``add_argument(...)`` call if it declares an output argument.

    All string-literal option strings are considered, so
    ``add_argument("-o", "--output")`` is recognised. The destination follows
    argparse's rule: an explicit string ``dest=`` wins, otherwise it is derived
    from the first long option (falling back to the first option string),
    with leading dashes stripped and inner dashes turned into underscores.

    Returns ``{"option", "dest", "line"}`` or ``None`` when the call does not
    match OUTPUT_ARG_TOKENS / OUTPUT_DEST_TOKENS.
    """
    option_strings = [
        arg.value for arg in call.args
        if isinstance(arg, ast.Constant) and isinstance(arg.value, str)
    ]
    if not option_strings:
        return None
    long_options = [opt for opt in option_strings if opt.startswith("--")]
    primary = long_options[0] if long_options else option_strings[0]
    dest = primary.lstrip("-").replace("-", "_")
    for kw in call.keywords:
        # Only a string literal can be matched against the dest tokens.
        if (kw.arg == "dest" and isinstance(kw.value, ast.Constant)
                and isinstance(kw.value.value, str)):
            dest = kw.value.value
    matched = [opt for opt in option_strings if opt in OUTPUT_ARG_TOKENS]
    if not matched and dest not in OUTPUT_DEST_TOKENS:
        return None
    return {
        "option": matched[0] if matched else primary,
        "dest": dest,
        "line": call.lineno,
    }


def _call_identifiers(call: ast.Call) -> set:
    """Collect names and attribute names used by a call's receiver and arguments.

    For ``Path(args.out).write_text(text)`` this yields
    ``{"Path", "args", "out", "text"}``. The called method name itself is
    excluded so that it can never be mistaken for an argument destination.
    """
    roots: List[ast.AST] = list(call.args) + [kw.value for kw in call.keywords]
    if isinstance(call.func, ast.Attribute):
        roots.append(call.func.value)  # the receiver expression
    found: set = set()
    for root in roots:
        for node in ast.walk(root):
            if isinstance(node, ast.Name):
                found.add(node.id)
            elif isinstance(node, ast.Attribute):
                found.add(node.attr)
    return found


def scan_file(path: Path, rel: str) -> Dict[str, Any]:
    """Scan one Python source file for CLI output arguments and write sinks.

    The file is only read and parsed with ``ast``; it is never executed or
    written.

    Args:
        path: Location of the file to read.
        rel: Path recorded in the result; it is also used as the ``ast``
            filename and matched against TASK_FILE_MAP and
            CRITICAL7_CLASSIFIER_FILES.

    Returns:
        ``{"file", "parse_error"}`` when the file cannot be decoded or parsed.
        Otherwise a record with the keys ``file``, ``frameworks``,
        ``has_dunder_main``, ``entry_funcs``, ``is_cli_entrypoint`` (always a
        bool), ``output_args``, ``write_sinks``, ``stdout_marker_lines``,
        ``arg_to_sink_taint``, ``classification``, ``task_id`` and
        ``is_critical7_classifier_family``.

    Heuristics and known limits:
        * Output arguments are detected on ``add_argument`` calls only.
        * Sinks are ``write_text`` / ``write_bytes`` method calls, bare
          ``open(...)`` calls in a write-capable or dynamic mode, and
          ``json.dump`` / ``yaml.dump``.
        * A sink is tainted when an output-argument dest appears, as a name
          or an attribute name, anywhere in the sink call's receiver or
          arguments. Matching is done on the AST, so it is exact (``out``
          does not match ``output_dir``) and covers multi-line calls.
    """
    try:
        src = path.read_text(encoding="utf-8")  # READ-ONLY
        tree = ast.parse(src, filename=rel)
    # ValueError covers undecodable bytes (UnicodeDecodeError) and sources
    # that ast.parse rejects without a SyntaxError (e.g. null bytes).
    except (SyntaxError, ValueError) as exc:  # pragma: no cover
        return {"file": rel, "parse_error": str(exc)}

    nodes = list(ast.walk(tree))      # walk once, reuse for every detector
    src_lines = src.splitlines()      # split once, not once per sink

    imports: set = set()
    for n in nodes:
        if isinstance(n, ast.Import):
            imports.update(a.name.split(".")[0] for a in n.names)
        elif isinstance(n, ast.ImportFrom) and n.module:
            imports.add(n.module.split(".")[0])
    frameworks = sorted(
        fw for fw in ("argparse", "click", "typer") if fw in imports
    )

    has_dunder_main = any(
        isinstance(n, ast.If) and isinstance(n.test, ast.Compare)
        and isinstance(n.test.left, ast.Name)
        and n.test.left.id == "__name__"
        for n in nodes
    )
    entry_funcs = sorted(
        n.name for n in nodes
        if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
        and n.name in ("main", "_main")
    )

    calls = [n for n in nodes if isinstance(n, ast.Call)]

    # Output arguments (argparse add_argument).
    output_args: List[Dict[str, Any]] = []
    for call in calls:
        if (isinstance(call.func, ast.Attribute)
                and call.func.attr == "add_argument"):
            described = _describe_output_argument(call)
            if described is not None:
                output_args.append(described)
    dest_tokens = {a["dest"] for a in output_args}

    # Write sinks, arg -> sink taint, and stdout markers.
    write_sinks: List[Dict[str, Any]] = []
    tainted: List[Dict[str, Any]] = []
    stdout_lines: List[int] = []
    for call in calls:
        func = call.func
        chain = _attr_chain(func)
        kind: Optional[str] = None
        detail: Dict[str, Any] = {}
        if isinstance(func, ast.Attribute) and func.attr in WRITE_METHODS:
            kind = func.attr
        elif isinstance(func, ast.Name) and func.id in WRITE_FUNCS:
            mode = _open_mode_is_write(call)
            if mode is not None:
                kind = "open"
                detail["mode"] = mode
        elif (isinstance(func, ast.Attribute)
              and func.attr in DUMP_FUNCS
              and isinstance(func.value, ast.Name)
              and func.value.id in ("json", "yaml")):
            kind = f"{func.value.id}.dump"

        if kind:
            arg_exprs = list(call.args) + [k.value for k in call.keywords]
            arg_names = sorted({nm for a in arg_exprs for nm in _names_in(a)})
            write_sinks.append({
                "line": call.lineno,
                "kind": kind,
                "expr": chain,
                "arg_names": arg_names,
                **detail,
            })
            hits = sorted(dest_tokens & _call_identifiers(call))
            if hits:
                first_line = (src_lines[call.lineno - 1]
                              if call.lineno <= len(src_lines) else "")
                tainted.append({
                    "sink_line": call.lineno,
                    "sink_kind": kind,
                    "sink_expr": chain,
                    "from_output_args": hits,
                    "src_line": first_line.strip(),
                })

        if (isinstance(func, ast.Name) and func.id == "print") or \
           chain in ("sys.stdout.write", "stdout.write"):
            stdout_lines.append(call.lineno)

    # bool(...) keeps the JSON field a boolean rather than the entry_funcs list.
    is_cli = bool(frameworks) and (has_dunder_main or bool(entry_funcs))
    if not is_cli:
        classification = "non_cli"
    elif tainted:
        classification = "cli_file_write_tainted"    # Critical7 candidate
    elif write_sinks:
        classification = "cli_file_write_untainted"  # review: write unrelated to args
    else:
        classification = "cli_stdout_only_safe"

    return {
        "file": rel,
        "frameworks": frameworks,
        "has_dunder_main": has_dunder_main,
        "entry_funcs": entry_funcs,
        "is_cli_entrypoint": is_cli,
        "output_args": output_args,
        "write_sinks": write_sinks,
        "stdout_marker_lines": sorted(stdout_lines),
        "arg_to_sink_taint": tainted,
        "classification": classification,
        "task_id": next(
            (t for t, f in TASK_FILE_MAP.items() if f == rel), None),
        "is_critical7_classifier_family": rel in CRITICAL7_CLASSIFIER_FILES,
    }


def run_scan(anu_root: Path, ws_root: Path) -> Dict[str, Any]:
    """Scan every ``*.py`` directly under *anu_root* and build the scan result.

    Args:
        anu_root: Directory whose top-level ``*.py`` files are scanned
            (non-recursive).
        ws_root: Workspace root; recorded file paths are relative to it.

    Returns:
        The scan-result document: schema/task metadata, ``totals``,
        ``task_scope_mapping``, ``critical7_classifier_family``, the
        per-classification lists and the full ``per_file`` records.

    Raises:
        ValueError: If a scanned file is not located under *ws_root*.
    """
    files = sorted(anu_root.glob("*.py"))
    # as_posix() keeps the recorded path comparable with the forward-slash
    # paths in TASK_FILE_MAP on every platform.
    per_file: List[Dict[str, Any]] = [
        scan_file(fp, fp.relative_to(ws_root).as_posix()) for fp in files
    ]

    def _with_class(name: str) -> List[Dict[str, Any]]:
        return [r for r in per_file if r.get("classification") == name]

    cli = [r for r in per_file if r.get("is_cli_entrypoint")]
    crit = _with_class("cli_file_write_tainted")
    safe = _with_class("cli_stdout_only_safe")
    untainted = _with_class("cli_file_write_untainted")

    # Records for unparsable files carry no "classification" key; report
    # them as "parse_error" instead of failing on the lookup.
    classification_of: Dict[str, str] = {}
    for r in per_file:
        classification_of.setdefault(
            r["file"], r.get("classification", "parse_error"))

    return {
        "schema": SCHEMA,
        "task_id": "task-2616",
        "mode": "READ_ONLY_AST_SCAN (scan 대상 byte-0·write 0)",
        "generated_utc": _utcnow(),
        "anu_root": str(anu_root),
        "ws_root": str(ws_root),
        "totals": {
            "py_files": len(files),
            "cli_entrypoints": len(cli),
            "critical7_candidates": len(crit),
            "stdout_only_safe": len(safe),
            "file_write_untainted_review": len(untainted),
        },
        "task_scope_mapping": {
            t: {"file": f,
                # NOTE(review): presence is resolved against anu_root.parent,
                # while recorded paths are relative to ws_root; the two agree
                # only when anu_root sits directly under ws_root.
                "present": bool(f) and (anu_root.parent / f).exists(),
                "note": ("미dispatch — memory/events/task-2615.* 부재"
                         if t == "task-2615" else "decision/result 존재")}
            for t, f in TASK_FILE_MAP.items()
        },
        "critical7_classifier_family": [
            {"file": f,
             "classification": classification_of.get(f, "not_found")}
            for f in CRITICAL7_CLASSIFIER_FILES
        ],
        "critical7_candidates": crit,
        "stdout_only_safe": [r["file"] for r in safe],
        "file_write_untainted_review": [r["file"] for r in untainted],
        "per_file": per_file,
    }


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point of the scanner.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        0 after the scan result has been printed or written.

    Raises:
        SystemExit: Via ``ArgumentParser.error`` (exit status 2) when
            ``--anu-root`` is not a directory or does not lie under
            ``--ws-root``. Without these checks a mistyped root would either
            produce an empty, clean-looking scan or fail with a traceback.

    With ``--emit`` the JSON is written to the fixed path
    ``<ws-root>/memory/events/task-2616.scan-result.json`` (the directory is
    created if missing); otherwise the JSON goes to stdout and no file is
    written.
    """
    p = argparse.ArgumentParser(
        prog="scan_anu_v3_cli_output_sinks_2616",
        description="task-2616 READ-ONLY ANU v3 CLI output-sink scanner")
    p.add_argument("--anu-root", default=str(CANONICAL_WS_ROOT / "anu_v3"))
    p.add_argument("--ws-root", default=str(CANONICAL_WS_ROOT))
    p.add_argument("--emit", action="store_true",
                   help="memory/events/task-2616.scan-result.json write")
    a = p.parse_args(argv)

    anu_root = Path(a.anu_root).resolve()
    ws_root = Path(a.ws_root).resolve()

    # Fail loudly on a bad root: globbing a missing directory yields no files,
    # which would otherwise be reported as a successful scan with no findings.
    if not anu_root.is_dir():
        p.error(f"--anu-root is not a directory: {anu_root}")
    try:
        anu_root.relative_to(ws_root)
    except ValueError:
        p.error(f"--anu-root ({anu_root}) must be located under "
                f"--ws-root ({ws_root})")

    result = run_scan(anu_root, ws_root)
    text = json.dumps(result, ensure_ascii=False, indent=2)

    if a.emit:
        # The artifact is written only to the fixed, task-2616-approved path
        # under memory/events.
        out = ws_root / "memory" / "events" / "task-2616.scan-result.json"
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_text(text + "\n", encoding="utf-8")
        print(f"scan-result → {out}")
    else:
        print(text)
    return 0


if __name__ == "__main__":  # pragma: no cover
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
