# -*- coding: utf-8 -*-
"""tests.regression.test_real_merge_inert_pass_path — task-2637.

pass path 에서도 실 GitHub 호출 0 + artifact 4종 (3 writer + 1 schema) 정합 단언.

Spec: memory/specs/system_real_merge_executor_wiring_spec_260523.md §5 / §7
"""
from __future__ import annotations

import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, List

from utils.real_merge_artifact_schema import (
    EXECUTION_RESULT_FILENAME,
    EXECUTION_RESULT_SCHEMA,
    MERGE_DECISION_FILENAME,
    MERGE_DECISION_SCHEMA,
    POST_SMOKE_SCHEMA,
    PRE_GATE_SNAPSHOT_FILENAME,
    PRE_GATE_SNAPSHOT_SCHEMA,
    REAL_MERGE_EVENTS_RELDIR,
    resolve_artifact_dir,
    validate_post_smoke_payload,
)
from utils.real_merge_hooks import (
    FORBIDDEN_PATHS,
    NO_OP_FORBIDDEN_PATH,
    REAL_MERGE_DONE,
    real_merge_execute,
)

FIXTURE_ROOT = (
    Path(__file__).resolve().parent.parent / "fixtures" / "real_merge_wiring"
)
KST = timezone(timedelta(hours=9))


def _clock_at(text: str):
    dt = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S%z")
    return lambda: dt


def _load_pass_fixture():
    base = FIXTURE_ROOT / "pass_path_inert_artifact_only"
    evidence = json.loads((base / "evidence.json").read_text(encoding="utf-8"))
    expected = json.loads((base / "expected.json").read_text(encoding="utf-8"))
    return evidence, expected


def test_pass_path_writes_three_artifacts_and_zero_subprocess(tmp_path):
    evidence, expected = _load_pass_fixture()
    clock = _clock_at(evidence["frozen_now_kst"])
    canonical_root = str(tmp_path)

    seen_calls: List[Any] = []

    def trap(*args: Any, **kwargs: Any) -> Any:
        seen_calls.append((args, kwargs))
        raise AssertionError("subprocess_runner must not be invoked on inert pass path")

    out = real_merge_execute(
        merge_ready_result=evidence["merge_ready_result"],
        pr_identity=evidence["pr_identity"],
        gate_snapshot=evidence["gate_snapshot"],
        dryrun_artifact=evidence["dryrun_artifact"],
        callback_envelope=evidence["callback_envelope"],
        chair_authorization=evidence["chair_authorization"],
        activation_flag=True,
        subprocess_runner=None,  # inert path
        canonical_root=canonical_root,
        changed_files=evidence["changed_files"],
        clock=clock,
    )

    # Result is REAL_MERGE_DONE with actually_executed=True (per spec §7.1).
    assert out["result_enum"] == REAL_MERGE_DONE == expected["result_enum"]
    assert out["decision"]["actually_executed"] is True
    assert out["subprocess_invocations"] == []
    assert seen_calls == []
    assert out["chair_report_required"] is False

    # 3 artifact files exist under canonical_root/memory/events/real_merge/pr_<pr>/<sha>/.
    pr = evidence["pr_identity"]["pr"]
    head_sha = evidence["pr_identity"]["head_sha"]
    artifact_dir = Path(resolve_artifact_dir(pr, head_sha, canonical_root))
    assert artifact_dir.is_dir()
    assert (artifact_dir / MERGE_DECISION_FILENAME).is_file()
    assert (artifact_dir / PRE_GATE_SNAPSHOT_FILENAME).is_file()
    assert (artifact_dir / EXECUTION_RESULT_FILENAME).is_file()

    # Schemas correct.
    decision = json.loads((artifact_dir / MERGE_DECISION_FILENAME).read_text())
    snapshot = json.loads((artifact_dir / PRE_GATE_SNAPSHOT_FILENAME).read_text())
    execution = json.loads((artifact_dir / EXECUTION_RESULT_FILENAME).read_text())
    assert decision["schema"] == MERGE_DECISION_SCHEMA
    assert snapshot["schema"] == PRE_GATE_SNAPSHOT_SCHEMA
    assert execution["schema"] == EXECUTION_RESULT_SCHEMA
    assert execution["non_admin"] is True
    assert execution["admin_override_used"] is False
    assert execution["subprocess_invocations"] == []
    # inert path notes itself.
    assert "inert" in execution.get("note", "").lower()

    # 4th schema (post_smoke) — schema-only, no writer. Synthesize a payload
    # and assert validator accepts it.
    post = {
        "schema": POST_SMOKE_SCHEMA,
        "ts_kst": "2026-05-23T12:05:00+09:00",
        "merge_commit_sha": "deadbeef00000000000000000000000000abcdef",
        "origin_main_before": "88a1f2b0a0141b4d4aed7fb3bcd6b9cba54d78b9",
        "origin_main_after": "deadbeef00000000000000000000000000abcdef",
        "smoke_checks": [{"name": "main_fetch_ff", "ok": True}],
        "all_ok": True,
    }
    assert validate_post_smoke_payload(post) == []
    assert expected["post_smoke_schema_validates"] is True


def test_pass_path_canonical_root_layout(tmp_path):
    """Artifact directory must match memory/events/real_merge/pr_<pr>/<sha>/ layout."""
    evidence, _ = _load_pass_fixture()
    clock = _clock_at(evidence["frozen_now_kst"])
    canonical_root = str(tmp_path)

    out = real_merge_execute(
        merge_ready_result=evidence["merge_ready_result"],
        pr_identity=evidence["pr_identity"],
        gate_snapshot=evidence["gate_snapshot"],
        dryrun_artifact=evidence["dryrun_artifact"],
        callback_envelope=evidence["callback_envelope"],
        chair_authorization=evidence["chair_authorization"],
        activation_flag=True,
        subprocess_runner=None,
        canonical_root=canonical_root,
        changed_files=evidence["changed_files"],
        clock=clock,
    )
    pr = evidence["pr_identity"]["pr"]
    head_sha = evidence["pr_identity"]["head_sha"]
    expected_dir = os.path.normpath(
        os.path.join(canonical_root, REAL_MERGE_EVENTS_RELDIR, f"pr_{pr}", head_sha)
    )
    assert out["merge_decision_path"].startswith(expected_dir)
    assert out["execution_result_path"].startswith(expected_dir)


def test_dedupe_no_double_merge(tmp_path):
    """Second invocation with same (pr, head_sha) → NO_OP_DUPLICATE."""
    evidence, _ = _load_pass_fixture()
    clock = _clock_at(evidence["frozen_now_kst"])
    canonical_root = str(tmp_path)

    first = real_merge_execute(
        merge_ready_result=evidence["merge_ready_result"],
        pr_identity=evidence["pr_identity"],
        gate_snapshot=evidence["gate_snapshot"],
        dryrun_artifact=evidence["dryrun_artifact"],
        callback_envelope=evidence["callback_envelope"],
        chair_authorization=evidence["chair_authorization"],
        activation_flag=True,
        subprocess_runner=None,
        canonical_root=canonical_root,
        changed_files=evidence["changed_files"],
        clock=clock,
    )
    assert first["result_enum"] == REAL_MERGE_DONE

    second = real_merge_execute(
        merge_ready_result=evidence["merge_ready_result"],
        pr_identity=evidence["pr_identity"],
        gate_snapshot=evidence["gate_snapshot"],
        dryrun_artifact=evidence["dryrun_artifact"],
        callback_envelope=evidence["callback_envelope"],
        chair_authorization=evidence["chair_authorization"],
        activation_flag=True,
        subprocess_runner=None,
        canonical_root=canonical_root,
        changed_files=evidence["changed_files"],
        clock=clock,
    )
    assert second["result_enum"] == "NO_OP_DUPLICATE"
    assert second["subprocess_invocations"] == []


def test_forbidden_path_short_circuits_before_any_gate(tmp_path):
    """changed_files contains any forbidden path → NO_OP_FORBIDDEN_PATH + CHAIR_REPORT."""
    evidence, _ = _load_pass_fixture()
    clock = _clock_at(evidence["frozen_now_kst"])
    canonical_root = str(tmp_path)
    # Inject a single forbidden file alongside legitimate ones.
    contaminated = list(evidence["changed_files"]) + [FORBIDDEN_PATHS[0]]

    out = real_merge_execute(
        merge_ready_result=evidence["merge_ready_result"],
        pr_identity=evidence["pr_identity"],
        gate_snapshot=evidence["gate_snapshot"],
        dryrun_artifact=evidence["dryrun_artifact"],
        callback_envelope=evidence["callback_envelope"],
        chair_authorization=evidence["chair_authorization"],
        activation_flag=True,
        subprocess_runner=None,
        canonical_root=canonical_root,
        changed_files=contaminated,
        clock=clock,
    )
    assert out["result_enum"] == NO_OP_FORBIDDEN_PATH
    assert out["chair_report_required"] is True
    assert out["subprocess_invocations"] == []
    assert out["decision"]["actually_executed"] is False


def test_mock_runner_pass_path_records_invocation_zero_admin_tokens(tmp_path):
    """When a mock runner IS provided + rc=0, REAL_MERGE_DONE records argv with
    NO admin override tokens (--admin / --force)."""
    evidence, _ = _load_pass_fixture()
    clock = _clock_at(evidence["frozen_now_kst"])
    canonical_root = str(tmp_path)

    captured: List[List[str]] = []

    class FakeProc:
        returncode = 0
        stdout = "ok"
        stderr = ""

    def fake_runner(argv: List[str], timeout: int) -> Any:
        captured.append(list(argv))
        return FakeProc()

    out = real_merge_execute(
        merge_ready_result=evidence["merge_ready_result"],
        pr_identity=evidence["pr_identity"],
        gate_snapshot=evidence["gate_snapshot"],
        dryrun_artifact=evidence["dryrun_artifact"],
        callback_envelope=evidence["callback_envelope"],
        chair_authorization=evidence["chair_authorization"],
        activation_flag=True,
        subprocess_runner=fake_runner,
        canonical_root=canonical_root,
        changed_files=evidence["changed_files"],
        clock=clock,
    )
    assert out["result_enum"] == REAL_MERGE_DONE
    assert len(out["subprocess_invocations"]) == 1
    # CRITICAL: composed argv must never carry admin override CLI tokens.
    composed = captured[0]
    assert "--admin" not in composed
    assert "--force" not in composed
    assert "--merge" in composed
    assert "--delete-branch=false" in composed
