#!/usr/bin/env python3
"""Checkmk special agent for Azure Reservations monitoring.

Fetches Azure Reserved Instances of any reservable resource type, optional
multi-window utilization data (Consumption API), and a per-run RBAC self-check.

Authentication: OAuth2 client credentials flow against Azure AD.
API base: https://management.azure.com
"""

import argparse
import contextlib
import datetime
import hashlib
import io
import json
import os
import re
import sys
import time
from typing import Any

import requests

try:
    from cmk.utils.password_store import replace_passwords
except ImportError:
    def replace_passwords() -> None:
        pass

# Root of every Azure Resource Manager URL this agent builds.
AZURE_MGMT_BASE = "https://management.azure.com"
# OAuth2 scope sent with the client-credentials token request.
AZURE_MGMT_SCOPE = "https://management.azure.com/.default"

# `api-version` query values, one per endpoint family:
# the /subscriptions listing,
API_VERSION_SUBSCRIPTIONS = "2022-12-01"
# Microsoft.Capacity reservationOrders / reservations,
API_VERSION_RESERVATIONS = "2022-11-01"
# Microsoft.Consumption reservationSummaries.
API_VERSION_CONSUMPTION = "2021-10-01"

# Per-call timeout for the RBAC probes (Phase A2). Isolated from the
# user-configured --timeout so a missing role on one subscription can't slow
# the whole run by 30s per failed probe. Bumped above the 5s default because
# the Consumption usageDetails endpoint is reliably slow even on empty
# results (10+s).
PERMISSION_PROBE_TIMEOUT = 20

# ---------------------------------------------------------------------------
# Phase V (v1.5.1): hold-last-good cache for utilization
# ---------------------------------------------------------------------------
#
# Azure's Consumption reservationSummaries endpoint goes flaky during nightly
# aggregation windows and under rate-limit pressure. Rather than let those
# transient failures produce false UNKNOWN / CRIT states, we persist the
# last successful per-reservation utilization values to disk and replay them
# (with a "data stale for N minutes" marker) until either the live API
# recovers or the cache exceeds the operator's configured max age.

# Written into / compared against the `schema_version` key of state.json;
# a mismatch on load makes the utilization cache start empty.
_STATE_SCHEMA_VERSION = 1
# Same role for summary.json, versioned independently of state.json.
_SUMMARY_SCHEMA_VERSION = 1
# Matches every run of characters that is NOT allowed in a cache directory name.
_HOST_SAFE_RE = re.compile(r"[^A-Za-z0-9._-]+")


def _sanitise_host(host: str) -> str:
    """Return a directory-name-safe version of `host`.

    Each run of characters outside ``[A-Za-z0-9._-]`` is replaced by a single
    underscore, then leading/trailing underscores are trimmed.

    Returns ``"unknown_host"`` when `host` is empty, when nothing survives
    sanitising, or when the result would be ``"."`` or ``".."``. Those two
    names consist only of allowed characters but, used as a path component,
    would resolve to the current / parent directory instead of a per-host
    subdirectory.
    """
    fallback = "unknown_host"
    safe = _HOST_SAFE_RE.sub("_", host or fallback).strip("_")
    if not safe or safe in (".", ".."):
        return fallback
    return safe


def _state_path(host: str) -> str:
    """Return the path of the per-host utilization cache file (state.json).

    The file sits in ``<root>/var/check_mk/azure_reservations/<host>/`` where
    ``<root>`` is the ``OMD_ROOT`` environment variable, or ``/tmp`` when that
    variable is not set, and ``<host>`` is the sanitised host key.
    """
    root = os.environ.get("OMD_ROOT", "/tmp")
    segments = (
        "var",
        "check_mk",
        "azure_reservations",
        _sanitise_host(host),
        "state.json",
    )
    return os.path.join(root, *segments)


def _load_state(host: str) -> dict:
    """Read the per-host hold-last-good cache. Never raises.

    Returns the parsed state with a guaranteed ``reservations`` mapping.
    A fresh empty state is returned instead when the file is missing,
    unreadable, not valid JSON, not a JSON object, or carries a different
    ``schema_version`` (future / older schema — overwritten on next save).
    """
    empty = {"schema_version": _STATE_SCHEMA_VERSION, "reservations": {}}
    path = _state_path(host)
    try:
        with open(path, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # ValueError covers both malformed JSON and undecodable bytes.
        return empty
    if not isinstance(data, dict):
        return empty
    if data.get("schema_version") != _STATE_SCHEMA_VERSION:
        return empty
    # A hand-edited or truncated file may carry `"reservations": null` or a
    # list; callers index it as a mapping, so normalise it here.
    if not isinstance(data.get("reservations"), dict):
        data["reservations"] = {}
    return data


def _save_state(host: str, state: dict) -> None:
    """Atomically write the per-host cache. Never raises.

    The state is serialised to ``<path>.tmp`` and then moved over the real
    file with ``os.replace`` so readers never see a half-written cache.
    Any failure (I/O error, or a `state` that cannot be serialised as JSON)
    is reported on stderr and the temp file is cleaned up.
    """
    path = _state_path(host)
    tmp = path + ".tmp"
    try:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(tmp, "w", encoding="utf-8") as fh:
            json.dump(state, fh, separators=(",", ":"))
        os.replace(tmp, path)
    except (OSError, TypeError, ValueError) as exc:
        # json.dump raises TypeError / ValueError for unserialisable input;
        # those must not escape a "never raises" helper.
        with contextlib.suppress(OSError):
            os.remove(tmp)  # don't leave a partial temp file behind
        print(f"WARNING: could not persist util cache to {path}: {exc}", file=sys.stderr)


# ---------------------------------------------------------------------------
# v1.6.x: per-run diagnostics / summary metadata
# ---------------------------------------------------------------------------
# Persisted to summary.json in the same per-host directory as state.json so
# the summary check plugin can surface stale cached metadata even on a cache
# hit (fast path).  Intentionally a SEPARATE file with its own schema_version
# so that a schema bump here NEVER wipes the util cache in state.json.


def _summary_path(host: str) -> str:
    """Return the path of the per-host summary metadata file (summary.json).

    Same directory layout as the utilization cache: rooted at ``OMD_ROOT``
    (``/tmp`` when unset), under ``var/check_mk/azure_reservations/<host>/``.
    """
    root = os.environ.get("OMD_ROOT", "/tmp")
    segments = (
        "var",
        "check_mk",
        "azure_reservations",
        _sanitise_host(host),
        "summary.json",
    )
    return os.path.join(root, *segments)


def _load_summary(host: str) -> dict:
    """Load the persisted summary metadata for `host`. Never raises.

    Yields an empty dict when the file cannot be read or parsed, when its
    top level is not a JSON object, or when its ``schema_version`` differs
    from the one this agent writes.
    """
    summary_file = _summary_path(host)
    try:
        with open(summary_file, "r", encoding="utf-8") as fh:
            loaded = json.load(fh)
    except (OSError, ValueError):
        return {}
    if isinstance(loaded, dict) and loaded.get("schema_version") == _SUMMARY_SCHEMA_VERSION:
        return loaded
    return {}


def _save_summary(host: str, meta: dict) -> None:
    """Atomically persist summary metadata. Never raises.

    Writes `meta` plus the current ``schema_version`` to ``<path>.tmp`` and
    moves it over summary.json with ``os.replace``. Any failure (I/O error or
    unserialisable `meta`) is reported on stderr and the temp file removed.
    """
    path = _summary_path(host)
    tmp = path + ".tmp"
    try:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        payload = {"schema_version": _SUMMARY_SCHEMA_VERSION}
        payload.update(meta)
        # Re-pin the version: a stray "schema_version" key inside `meta`
        # must not make the file unreadable for the loader.
        payload["schema_version"] = _SUMMARY_SCHEMA_VERSION
        with open(tmp, "w", encoding="utf-8") as fh:
            json.dump(payload, fh, separators=(",", ":"))
        os.replace(tmp, path)
    except (OSError, TypeError, ValueError) as exc:
        # TypeError / ValueError come from update() or json.dump on bad input.
        with contextlib.suppress(OSError):
            os.remove(tmp)  # don't leave a partial temp file behind
        print(f"WARNING: could not persist summary metadata to {path}: {exc}", file=sys.stderr)


# Utilization keys that are copied into a cache entry on a successful fetch
# and copied back out of it when a cached entry is replayed.
_CACHED_WINDOW_FIELDS = (
    "utilization_pct_7d", "utilization_pct_30d", "utilization_pct_90d",
    "utilization_pct_6mo", "utilization_pct_12mo", "utilization_pct",
)


def _update_cached_reservation(
    state: dict, reservation_id: str, windows: dict, now: float
) -> None:
    """Store the latest utilization windows for one reservation in `state`.

    The entry records `now` as ``fetched_epoch`` plus whichever of the
    cacheable window fields are present in `windows`. Any previous entry for
    `reservation_id` is replaced; `state` is mutated in place.
    """
    snapshot = {"fetched_epoch": now}
    snapshot.update(
        (field, windows[field])
        for field in _CACHED_WINDOW_FIELDS
        if field in windows
    )
    per_reservation = state.setdefault("reservations", {})
    per_reservation[reservation_id] = snapshot


def _replay_cached_reservation(
    norm: dict, windows: dict, cached: dict, now: float,
    warn_minutes: int, max_minutes: int,
) -> None:
    """Splice cached windows into `norm` with stale-age markers.

    `norm` is mutated; `windows` only contributes its fetch-status flags
    (so the check plugin still sees what's currently broken).

    When the cached entry is older than `max_minutes`, or its age cannot be
    determined (``fetched_epoch`` missing or not a number), nothing is
    replayed and `norm` simply receives `windows`, so the missing-data path
    handles it. An entry of unknown age must never be presented as fresh.
    """
    fetched = cached.get("fetched_epoch")
    # bool is an int subclass; a true/false timestamp is corruption, not data.
    age_known = isinstance(fetched, (int, float)) and not isinstance(fetched, bool)
    # Clamp at zero so a backwards clock step cannot produce a negative age.
    stale_min = max(0.0, (now - fetched) / 60.0) if age_known else None
    if stale_min is None or stale_min > max_minutes:
        # Cache too old (or of unknown age) to trust.
        norm.update(windows)
        return
    for k in _CACHED_WINDOW_FIELDS:
        if k in cached:
            norm[k] = cached[k]
    norm["utilization_cache_stale_minutes"] = round(stale_min, 1)
    norm["utilization_cache_stale_warn_at"] = warn_minutes
    norm["utilization_cache_stale_max_at"] = max_minutes
    # Carry the fetch-status flags so the check plugin can surface which
    # leg failed even when displaying cached values.
    for k, v in windows.items():
        if k.startswith("utilization_fetch_"):
            norm[k] = v


# ---------------------------------------------------------------------------
# v1.6.0: whole-output cache
# ---------------------------------------------------------------------------
#
# Checkmk invokes a special agent on every check cycle (default 60 s), but
# every data class this agent collects (Consumption utilization, reservation
# expiry, RBAC roles) changes at most once a day.
# Re-querying Azure every minute is therefore ~98 % waste and a 429-throttling
# risk. We cache the agent's ENTIRE rendered stdout per host and replay it on
# every cycle until the cache exceeds `--output-cache-minutes` (default 30),
# at which point exactly one cycle does a live refresh. This is the same idea
# the built-in Microsoft Azure agent implements with DataCache; we vendor a
# small purpose-built version rather than import cmk's unstable internals.
#
# This is an OUTER layer, independent of and complementary to the v1.5.1
# hold-last-good utilization cache (which lives inside a live refresh run).


def _output_cache_dir(host: str) -> str:
    """Return the per-host directory that holds the whole-output cache files.

    Rooted at ``OMD_ROOT`` (``/tmp`` when unset), under
    ``var/check_mk/azure_reservations/<sanitised host>``.
    """
    root = os.environ.get("OMD_ROOT", "/tmp")
    segments = ("var", "check_mk", "azure_reservations", _sanitise_host(host))
    return os.path.join(root, *segments)


def _args_fingerprint(args: argparse.Namespace) -> str:
    """Return a short, stable hash over the arguments that shape the output.

    If any of the hashed values changes, a previously cached blob no longer
    reflects what the operator configured and must not be replayed. Hashed:
    tenant ID, client ID, the raw --subscriptions string, timeout, the
    utilization / permission toggles and both util-cache thresholds.

    The client SECRET is intentionally left out (a credential should not be
    fed into a cache key); tenant and client IDs are plain identifiers and
    are included because they decide whose data the blob describes.

    The result is the first 16 hex characters of a SHA-256 digest.
    """
    shaping_values = (
        args.tenant_id,
        args.client_id,
        args.subscriptions,
        str(args.timeout),
        str(bool(args.no_utilization)),
        str(bool(args.check_permissions)),
        str(args.util_cache_warn_minutes),
        str(args.util_cache_max_minutes),
    )
    canonical = "|".join(shaping_values)
    digest = hashlib.sha256(canonical.encode("utf-8"))
    return digest.hexdigest()[:16]


class _OutputCache:
    """Per-host, per-config cache of the agent's full rendered stdout.

    Stored as a plain-text file (the stdout blob verbatim) so a cache hit
    replays byte-identical Checkmk sections. Never raises on I/O errors —
    a broken cache degrades to a live refresh, never to a crash.
    """

    def __init__(self, host: str, fingerprint: str, interval_seconds: int) -> None:
        self._dir = _output_cache_dir(host)
        self._fingerprint = fingerprint
        self._path = os.path.join(self._dir, f"output.{fingerprint}.cache")
        self._interval = max(0, interval_seconds)

    def read_fresh(self) -> str | None:
        """Return the cached blob iff it exists and is younger than interval."""
        try:
            mtime = os.stat(self._path).st_mtime
        except OSError:
            return None
        age = time.time() - mtime
        if 0 <= age < self._interval:
            return self.read_any()
        return None

    def read_any(self) -> str | None:
        """Return the cached blob regardless of age (serve-stale fallback)."""
        try:
            with open(self._path, "r", encoding="utf-8") as fh:
                return fh.read()
        except OSError:
            return None

    def write(self, blob: str) -> None:
        """Atomically persist the blob; prune sibling configs. Never raises."""
        try:
            os.makedirs(self._dir, exist_ok=True)
            tmp = self._path + ".tmp"
            with open(tmp, "w", encoding="utf-8") as fh:
                fh.write(blob)
            os.replace(tmp, self._path)
            self._prune_other_fingerprints()
        except OSError as exc:
            print(
                f"WARNING: could not persist output cache to {self._path}: {exc}",
                file=sys.stderr,
            )

    def _prune_other_fingerprints(self) -> None:
        """Drop cache files from superseded configs so the dir can't grow."""
        try:
            for name in os.listdir(self._dir):
                if (
                    name.startswith("output.")
                    and name.endswith(".cache")
                    and name != os.path.basename(self._path)
                ):
                    with contextlib.suppress(OSError):
                        os.remove(os.path.join(self._dir, name))
        except OSError:
            pass


# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------

def parse_arguments(argv: list[str]) -> argparse.Namespace:
    """Parse the agent's command line.

    `argv` is the argument list without the program name. Besides the
    attributes argparse derives from the options below, the returned
    namespace carries two computed attributes:

      * ``subscription_filter`` — the --subscriptions value split on commas,
        stripped, with empty items dropped (empty list = no filter).
      * ``cache_key`` — first non-empty of --host-key, the legacy positional
        host address, the ``OMD_HOSTNAME`` environment variable, and the
        literal ``"unknown_host"``.

    argparse itself exits the process on invalid or missing arguments.
    """
    p = argparse.ArgumentParser(
        description="Checkmk special agent for Azure Reservations monitoring"
    )
    p.add_argument("--tenant-id", required=True, help="Azure AD Tenant ID (GUID)")
    p.add_argument("--client-id", required=True, help="Service Principal Application (Client) ID")
    p.add_argument("--client-secret", required=True, help="Service Principal client secret")
    p.add_argument(
        "--subscriptions",
        default="",
        help="Comma-separated subscription IDs to monitor (default: all accessible)",
    )
    p.add_argument(
        "--timeout",
        type=int,
        default=60,
        help=(
            "HTTP request timeout in seconds (default: 60). The Consumption "
            "API is reliably slow (10+s baseline); 60 s leaves headroom for "
            "tail-latency spikes. Bump higher if you see frequent timeouts."
        ),
    )
    # Phase A toggles
    p.add_argument("--no-utilization", action="store_true",
                   help="Skip Consumption API utilization queries")
    # Phase A2: permissions self-check
    # Both flags write to the same `check_permissions` attribute. Its default
    # is already True, so --check-permissions only matters as the explicit
    # counterpart of --no-check-permissions.
    p.add_argument("--check-permissions", dest="check_permissions",
                   action="store_true", default=True,
                   help="Probe required RBAC roles and emit a permissions section")
    p.add_argument("--no-check-permissions", dest="check_permissions",
                   action="store_false",
                   help="Skip the RBAC self-check")
    # v1.5.1: hold-last-good cache thresholds for utilization
    p.add_argument(
        "--util-cache-warn-minutes",
        type=int,
        default=90,
        help=(
            "Once the cached utilization value (used when the Consumption "
            "API is transiently unreachable) is older than this many "
            "minutes, the check plugin escalates the per-reservation "
            "service to WARN. Default 90."
        ),
    )
    p.add_argument(
        "--util-cache-max-minutes",
        type=int,
        default=360,
        help=(
            "Above this many minutes the agent stops replaying the cached "
            "utilization value and emits the fetch-failure status instead "
            "(so the check shows UNKNOWN / missing-data rather than a "
            "stale value). Default 360 (6 h)."
        ),
    )
    # v1.5.2: stable per-host identifier used only for the hold-last-good
    # cache path. Sourced from host_config.name in the server-side-calls
    # module; safe to omit when invoking the agent by hand (we fall back
    # to OMD_HOSTNAME or "unknown_host"). The agent intentionally takes no
    # host address — it talks to Azure's REST API using credentials only,
    # mirroring the built-in Microsoft Azure / GCP special agents.
    p.add_argument(
        "--host-key",
        dest="host_key",
        default=None,
        help=(
            "Stable per-host identifier; used only as the cache key for "
            "the hold-last-good utilization state file. Defaults to the "
            "OMD_HOSTNAME env var, then 'unknown_host'."
        ),
    )
    # v1.6.0: whole-output cache. The agent talks to Azure at most once per
    # this interval; every other Checkmk cycle replays the last rendered
    # output from disk, making zero Azure calls.
    p.add_argument(
        "--output-cache-minutes",
        type=int,
        default=30,
        help=(
            "How long a rendered run is cached and replayed before the agent "
            "queries Azure again (default: 30). Reservation/utilization data "
            "is daily-grained, so a longer interval costs nothing in freshness "
            "and protects against Consumption-API 429 throttling. A just-made "
            "Azure or config change can take up to this long to appear."
        ),
    )
    p.add_argument(
        "--no-cache",
        action="store_true",
        help=(
            "Bypass the whole-output cache: always query Azure and do not "
            "read or write the cached blob. Diagnostics / forced refresh only. "
            "The inner hold-last-good utilization cache is unaffected."
        ),
    )
    # Legacy shim: pre-v1.5.2 server-side-calls passed the host address
    # as a trailing positional. Accept it for one release so a mid-upgrade
    # activate-changes window can't fail the agent.
    p.add_argument(
        "hostaddress",
        nargs="?",
        default="",
        help=argparse.SUPPRESS,
    )

    args = p.parse_args(argv)
    # Derived attribute: the comma-separated --subscriptions string as a
    # clean list; blank items (e.g. from a trailing comma) are discarded.
    args.subscription_filter: list[str] = [
        s.strip() for s in args.subscriptions.split(",") if s.strip()
    ]
    # Derived attribute: first non-empty candidate wins. Note that the legacy
    # positional host address ranks above OMD_HOSTNAME, which the --host-key
    # help text does not mention.
    args.cache_key = (
        args.host_key
        or args.hostaddress
        or os.environ.get("OMD_HOSTNAME")
        or "unknown_host"
    )
    return args


# ---------------------------------------------------------------------------
# Authentication
# ---------------------------------------------------------------------------

def get_token(tenant_id: str, client_id: str, client_secret: str, timeout: int) -> str:
    """Request an access token via the OAuth2 client-credentials flow.

    Posts the credentials to the tenant's v2.0 token endpoint on
    login.microsoftonline.com with the Azure management scope and returns
    the ``access_token`` string from the JSON response.

    Prints an error to stderr and exits with status 1 when the endpoint
    answers 400 or 401, or when the response carries no access token. Any
    other error status is raised by ``raise_for_status()``.
    """
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": AZURE_MGMT_SCOPE,
    }
    resp = requests.post(
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        timeout=timeout,
        verify=True,
    )

    # Known credential problems get a tailored message before the generic
    # raise_for_status() below.
    rejection = None
    if resp.status_code == 400:
        rejection = (
            f"ERROR: Token request returned 400. Check tenant_id, client_id, and client_secret. "
            f"Response: {resp.text[:300]}"
        )
    elif resp.status_code == 401:
        rejection = "ERROR: Authentication failed (401). Check client_id and client_secret."
    if rejection is not None:
        print(rejection, file=sys.stderr)
        sys.exit(1)
    resp.raise_for_status()

    body = resp.json()
    access_token = body.get("access_token", "")
    if not access_token:
        print(
            f"ERROR: No access_token in token response. Keys: {list(body.keys())}",
            file=sys.stderr,
        )
        sys.exit(1)
    return access_token


def make_session(token: str) -> requests.Session:
    """Create a requests session that sends `token` as a Bearer credential.

    Every request made through the session also asks for JSON via the
    ``Accept`` header.
    """
    default_headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
    }
    http = requests.Session()
    http.headers.update(default_headers)
    return http


# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------

def _handle_api_error(resp: requests.Response, path: str) -> None:
    """Report a failed API response on stderr.

    A 401 is fatal: the message is printed and the process exits with
    status 1. 403, 429 and every other non-OK status only produce a
    WARNING line; a successful response produces no output at all.
    """
    code = resp.status_code
    if code == 401:
        print(f"ERROR: 401 Unauthorized on {path} — Bearer token rejected.", file=sys.stderr)
        sys.exit(1)

    if code == 403:
        warning = (
            f"WARNING: 403 Forbidden on {path} — insufficient permissions. "
            "Run with --check-permissions for actionable RBAC guidance."
        )
    elif code == 429:
        warning = f"WARNING: Rate limit (429) on {path}."
    elif not resp.ok:
        # Include a bounded slice of the body to aid diagnosis.
        warning = f"WARNING: HTTP {resp.status_code} on {path}: {resp.text[:300]}"
    else:
        return
    print(warning, file=sys.stderr)


def _resp_json_safe(
    resp: requests.Response, url: str
) -> tuple[Any, bool]:
    """Decode a response body as JSON without raising.

    The result is a ``(parsed, ok)`` pair with three possible shapes:
      * ``(None, True)``   — nothing to parse: status 204, a
        ``Content-Length: 0`` header, or a body that is blank after
        stripping. This is a legitimate empty answer, not an error.
      * ``(None, False)``  — a non-empty body that is not JSON (for example
        an HTML error page); the caller is expected to warn.
      * ``(parsed, True)`` — the decoded JSON value.

    `url` is accepted for symmetry with the callers but is not used here.
    """
    if resp.status_code == 204:
        return None, True
    declared_length = resp.headers.get("Content-Length", "").strip()
    if declared_length == "0":
        return None, True
    if not resp.text.strip():
        return None, True

    try:
        parsed = resp.json()
    except ValueError:
        return None, False
    return parsed, True


def api_get_paged(session: requests.Session, url: str, timeout: int) -> list[dict]:
    """GET a paged collection and return the concatenated `value` items.

    Follows `nextLink` until it is absent. Paging stops early — returning
    whatever was collected so far — on a network error, any non-OK status
    (reported via `_handle_api_error`, which exits on 401), a non-JSON body,
    an empty body, a JSON body that is not an object, or a `nextLink` that
    points at a page already fetched.
    """
    results: list[dict] = []
    visited: set[str] = set()
    next_url: str | None = url
    while next_url:
        if next_url in visited:
            # A repeated nextLink would otherwise loop forever.
            print(
                f"WARNING: pagination loop detected at {next_url}; stopping.",
                file=sys.stderr,
            )
            break
        visited.add(next_url)
        try:
            resp = session.get(next_url, timeout=timeout)
        except requests.exceptions.RequestException as exc:
            print(f"WARNING: Request failed for {next_url}: {exc}", file=sys.stderr)
            break
        if not resp.ok:
            # Covers 403 as well; no separate branch needed.
            _handle_api_error(resp, next_url)
            break
        data, ok = _resp_json_safe(resp, next_url)
        if not ok:
            print(
                f"WARNING: non-JSON response from {next_url}: "
                f"{resp.text[:300].replace(chr(10), ' ')}",
                file=sys.stderr,
            )
            break
        if data is None:
            break
        if not isinstance(data, dict):
            # Valid JSON but not the {"value": [...], "nextLink": ...} shape.
            print(
                f"WARNING: unexpected JSON payload from {next_url}: "
                f"expected an object, got {type(data).__name__}",
                file=sys.stderr,
            )
            break
        page = data.get("value")
        if isinstance(page, list):  # tolerate `"value": null`
            results.extend(page)
        next_url = data.get("nextLink")
    return results


def api_get(session: requests.Session, url: str, timeout: int) -> Any | None:
    """GET a single resource and return its decoded JSON body.

    Returns None when the request fails at the network level, when the
    status is not OK, when the body is empty, or when it is not JSON.
    A 404 is treated as a quiet "not found"; every other failure is
    reported on stderr (a 401 exits via `_handle_api_error`).
    """
    try:
        resp = session.get(url, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        print(f"WARNING: GET {url} failed: {exc}", file=sys.stderr)
        return None

    if not resp.ok:
        if resp.status_code != 404:
            _handle_api_error(resp, url)
        return None

    parsed, parsed_ok = _resp_json_safe(resp, url)
    if parsed_ok:
        return parsed
    flattened = resp.text[:300].replace(chr(10), ' ')
    print(
        f"WARNING: non-JSON response from {url}: "
        f"{flattened}",
        file=sys.stderr,
    )
    return None


def probe_status(session: requests.Session, url: str, timeout: int) -> tuple[int | None, str]:
    """Perform one GET and report ``(http_status, error_detail)``.

    Intended for the RBAC probe: it neither raises nor exits. A network
    failure yields ``(None, <exception text>)``. A non-OK response yields its
    status plus the first 200 characters of the body with newlines turned
    into spaces and ``|`` turned into ``/``. An OK response yields an empty
    detail string.
    """
    try:
        resp = session.get(url, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        return None, str(exc)

    if resp.ok:
        return resp.status_code, ""
    # Keep the snippet short so section payloads stay small.
    snippet = resp.text[:200].translate(str.maketrans({"\n": " ", "|": "/"}))
    return resp.status_code, snippet


# ---------------------------------------------------------------------------
# Data collection helpers
# ---------------------------------------------------------------------------

def _iso_to_epoch(dt_str: str) -> float:
    """Convert an ISO-8601 timestamp string to Unix epoch seconds.

    Accepts a trailing ``Z`` as well as ``+HH:MM`` / ``-HH:MM`` offsets.
    A value without any offset (including a bare ``YYYY-MM-DD`` date) is
    interpreted as UTC, so the result does not depend on the timezone of
    the machine running the agent.

    Returns 0.0 for empty, non-string or unparseable input. Never raises.
    """
    if not dt_str or not isinstance(dt_str, str):
        return 0.0
    text = dt_str.strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    # Fractions longer than microseconds (7 digits is common for .NET-based
    # services) are rejected by fromisoformat() before Python 3.11; truncate
    # them instead of losing the whole time-of-day in the fallback below.
    text = re.sub(r"(\.\d{6})\d+", r"\1", text)
    try:
        dt = datetime.datetime.fromisoformat(text)
    except ValueError:
        # Fallback: salvage at least the date part (YYYY-MM-DD...).
        try:
            dt = datetime.datetime.strptime(text[:10], "%Y-%m-%d")
        except ValueError:
            return 0.0
    if dt.tzinfo is None:
        # A naive datetime would make timestamp() assume local time.
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt.timestamp()


def collect_subscriptions(
    session: requests.Session, sub_filter: list[str], timeout: int
) -> list[dict]:
    """List the subscriptions returned by the /subscriptions endpoint.

    With a non-empty `sub_filter`, only subscriptions whose
    ``subscriptionId`` appears in the filter (exact match) are kept;
    an empty filter returns everything.
    """
    listing_url = f"{AZURE_MGMT_BASE}/subscriptions?api-version={API_VERSION_SUBSCRIPTIONS}"
    visible = api_get_paged(session, listing_url, timeout)
    if not sub_filter:
        return visible
    return [sub for sub in visible if sub.get("subscriptionId") in sub_filter]


def collect_reservation_orders(session: requests.Session, timeout: int) -> list[str]:
    """Return the `name` of every reservation order the listing returns.

    Orders without a (non-empty) name are left out.
    """
    endpoint = "".join((
        f"{AZURE_MGMT_BASE}/providers/Microsoft.Capacity/reservationOrders",
        f"?api-version={API_VERSION_RESERVATIONS}",
    ))
    listed = api_get_paged(session, endpoint, timeout)
    return [order["name"] for order in listed if order.get("name")]


def collect_reservations(
    session: requests.Session, order_id: str, timeout: int
) -> list[dict]:
    """List the reservations that belong to one reservation order.

    The request asks for ``$expand=utilizationInfo`` so each returned
    reservation can carry its utilization block.
    """
    resource = f"/providers/Microsoft.Capacity/reservationOrders/{order_id}/reservations"
    query = f"?api-version={API_VERSION_RESERVATIONS}&$expand=utilizationInfo"
    return api_get_paged(session, f"{AZURE_MGMT_BASE}{resource}{query}", timeout)


def _mean_pct(records: list[dict]) -> float | None:
    """Return the simple mean of avgUtilizationPercentage across records.

    Returns None when no records (so the check plugin can treat the window
    as 'missing data'). usedQuantity / reservedQuantity on the Consumption
    reservationSummaries response are unreliable (often 0 or null even
    when avgPct is non-zero), so we use the API's pre-computed avg.
    """
    if not records:
        return None
    vals = []
    for r in records:
        v = r.get("properties", {}).get("avgUtilizationPercentage")
        if v is None:
            continue
        try:
            vals.append(float(v))
        except (TypeError, ValueError):
            continue
    if not vals:
        return None
    return sum(vals) / len(vals)


# Phase S2 (v1.5.1): the Consumption reservationSummaries endpoint is slow
# (~10 s baseline) and prone to transient 429 / 503 / timeout. A single
# silent failure used to cascade into a synthetic 0.0 utilization that the
# check plugin then reported as CRIT. We retry up to 2 times with short
# backoff and surface the outcome as a status flag so the check plugin can
# distinguish "fetch failed" from "data is actually zero".
# Seconds slept before each retry; the first attempt runs without a delay.
_CONSUMPTION_RETRY_DELAYS = (3, 7)


def _consumption_get_with_retry(
    session: requests.Session, url: str, timeout: int
) -> tuple[dict | None, str]:
    """GET with retries for transient 429 / 5xx / network failures.

    Returns (payload, status_label) where status_label is one of:
      'ok'        — got a 2xx JSON object on the first attempt (it may
                    still contain no records)
      'retried'   — succeeded after one or more retries
      'throttled' — 429 on the last attempt
      'unavail'   — 5xx or an unusable body on the last attempt, or a
                    non-retryable 4xx other than 403
      'timeout'   — network exception on the last attempt
      'forbidden' — 403 (no retry — RBAC issue, not transient)

    The payload is a dict on success and None otherwise. A 2xx body that is
    not JSON, or is JSON but not an object, is treated like a transient
    failure and retried, so callers can rely on the `dict | None` contract.
    """
    last_status = "timeout"
    for attempt, delay in enumerate((0, *_CONSUMPTION_RETRY_DELAYS)):
        if delay:
            time.sleep(delay)
        try:
            resp = session.get(url, timeout=timeout)
        except requests.exceptions.RequestException:
            last_status = "timeout"
            continue
        status = resp.status_code
        if status == 403:
            _handle_api_error(resp, url)
            return None, "forbidden"
        if status == 429:
            last_status = "throttled"
            continue
        if 500 <= status < 600:
            last_status = "unavail"
            continue
        if not resp.ok:
            # Remaining 4xx: not transient, so report and give up at once.
            _handle_api_error(resp, url)
            return None, "unavail"
        try:
            payload = resp.json()
        except ValueError:
            payload = None
        if isinstance(payload, dict):
            return payload, ("ok" if attempt == 0 else "retried")
        last_status = "unavail"
    return None, last_status


def _usage_records_newest_first(payload: Any) -> list[dict]:
    """Return the dict records of payload['value'], newest usageDate first.

    Anything that is not the expected shape (payload not a dict, `value` not
    a list, record not a dict) is dropped; a missing / null / non-string
    usageDate sorts as the empty string instead of breaking the comparison.
    """
    raw_records = payload.get("value") if isinstance(payload, dict) else None
    if not isinstance(raw_records, list):
        return []

    def usage_date(record: dict) -> str:
        props = record.get("properties")
        value = props.get("usageDate") if isinstance(props, dict) else None
        return value if isinstance(value, str) else ""

    return sorted(
        (r for r in raw_records if isinstance(r, dict)),
        key=usage_date,
        reverse=True,
    )


def fetch_reservation_utilization_windows(
    session: requests.Session, order_id: str, res_id: str, timeout: int
) -> dict:
    """Fetch utilization across multiple lookback windows.

    Returns a dict containing:
      - per-window `utilization_pct_{7d,30d,90d,6mo,12mo}` (each omitted when
        the API returned no data for that window),
      - legacy `utilization_pct` alias (only present when at least one monthly
        record had pct>0 — v1.5.1 no longer synthesises a 0.0 fallback),
      - `utilization_fetch_daily_status` / `utilization_fetch_monthly_status`
        flags (always present) so the check plugin can distinguish
        "fetch failed" from "data is genuinely zero",
      - `utilization_windows_have_daily` / `utilization_windows_have_monthly`
        booleans (present whenever at least one grain returned records).

    The 7d / 30d windows average the newest 7 / 30 daily records; the 90d /
    6mo / 12mo windows average the newest 3 / 6 / 12 monthly records.
    """
    today = datetime.datetime.now(datetime.timezone.utc).date()
    since_daily = (today - datetime.timedelta(days=32)).isoformat()
    since_monthly = (today - datetime.timedelta(days=400)).isoformat()
    today_str = today.isoformat()

    base = (
        f"{AZURE_MGMT_BASE}/providers/Microsoft.Capacity/reservationOrders"
        f"/{order_id}/reservations/{res_id}"
        f"/providers/Microsoft.Consumption/reservationSummaries"
    )
    # The daily-grain endpoint requires BOTH ge and le bounds (the API
    # rejects open-ended ranges with "UsageDate should have ending range value").
    daily_url = (
        f"{base}?grain=daily&api-version={API_VERSION_CONSUMPTION}"
        f"&$filter=properties/usageDate ge {since_daily} "
        f"and properties/usageDate le {today_str}"
    )
    monthly_url = (
        f"{base}?grain=monthly&api-version={API_VERSION_CONSUMPTION}"
        f"&$filter=properties/usageDate ge {since_monthly}"
    )

    daily_payload, daily_status = _consumption_get_with_retry(session, daily_url, timeout)
    monthly_payload, monthly_status = _consumption_get_with_retry(session, monthly_url, timeout)

    daily = _usage_records_newest_first(daily_payload)
    monthly = _usage_records_newest_first(monthly_payload)

    result: dict = {
        "utilization_fetch_daily_status": daily_status,
        "utilization_fetch_monthly_status": monthly_status,
    }

    if not daily and not monthly:
        # Nothing usable — return only the status flags so the check plugin
        # can distinguish "fetch failed" from "data is genuinely zero".
        return result

    # Only emit window fields when the API returned data; omission signals
    # "missing data" to the check plugin (-> configurable state).
    windows = [
        ("utilization_pct_7d", _mean_pct(daily[:7])),
        ("utilization_pct_30d", _mean_pct(daily[:30])),
        ("utilization_pct_90d", _mean_pct(monthly[:3])),
        ("utilization_pct_6mo", _mean_pct(monthly[:6])),
        ("utilization_pct_12mo", _mean_pct(monthly[:12])),
    ]
    for name, value in windows:
        if value is not None:
            result[name] = value
    result["utilization_windows_have_daily"] = bool(daily)
    result["utilization_windows_have_monthly"] = bool(monthly)

    # Legacy alias: most recent non-zero monthly value (v1.1.x callers).
    # If no monthly record has pct>0 we OMIT the field entirely — never
    # synthesise 0.0, which the check plugin would treat as valid data and
    # the operator would see as a false CRIT.
    for record in monthly:
        props = record.get("properties")
        raw = props.get("avgUtilizationPercentage") if isinstance(props, dict) else None
        try:
            pct = float(raw or 0)
        except (TypeError, ValueError):
            # A non-numeric percentage is skipped, not fatal.
            continue
        if pct > 0:
            result["utilization_pct"] = pct
            break

    return result


def normalise_reservation(res: dict, order_id: str) -> dict:
    """Flatten one raw reservation resource into the agent's record format.

    Reads from `res["properties"]` (plus top-level `name`, `sku`, `location`)
    and returns a flat dict with identity, term, dates, scope, resource type,
    a utilization percentage derived from `utilizationInfo`, and the
    reservation's age in days.

    Explicit JSON nulls for `properties`, `sku` or `quantity` are tolerated
    and treated like missing values.
    """
    props = res.get("properties") or {}

    term_raw = props.get("term", "")
    term_human = {
        "P1Y": "1 Year",
        "P3Y": "3 Years",
        "P5Y": "5 Years",
    }.get(term_raw, term_raw)

    effective_dt = props.get("effectiveDateTime", "")
    start_month = effective_dt[:7] if effective_dt else ""
    start_date = effective_dt[:10] if effective_dt else ""

    expiry_dt = props.get("expiryDateTime", "")
    expiry_epoch = _iso_to_epoch(expiry_dt)

    effective_epoch = _iso_to_epoch(effective_dt)
    now = time.time()
    # 999.0 is the placeholder age when the start date is unknown.
    age_days = (now - effective_epoch) / 86400.0 if effective_epoch else 999.0

    util_info = props.get("utilizationInfo", {}) or {}
    util_percentages = util_info.get("utilizationPercentages", []) or []
    if util_percentages:
        total_used = sum(float(p.get("usedQuantity", 0) or 0) for p in util_percentages)
        total_reserved = sum(
            float(p.get("totalReservedQuantity", 0) or p.get("reservedQuantity", 0) or 0)
            for p in util_percentages
        )
        util_pct = (total_used / total_reserved * 100.0) if total_reserved > 0 else 0.0
    else:
        util_pct = float(util_info.get("utilizationPercentage", 0.0) or 0.0)

    applied_scopes: list[str] = props.get("appliedScopes") or []
    scope_type = props.get("appliedScopeType", "")

    # reservedResourceType is what tells us which Azure resource family this
    # reservation covers (VirtualMachines, SqlDatabases, AppService, etc.).
    reserved_resource_type = props.get("reservedResourceType", "")

    quantity = props.get("quantity", 0)
    try:
        reserved_quantity = float(quantity or 0)
    except (TypeError, ValueError):
        # Null / non-numeric quantity must not abort the whole record.
        reserved_quantity = 0.0

    return {
        "reservation_id": res.get("name", ""),
        "order_id": order_id,
        "display_name": props.get("displayName", ""),
        "sku_name": (res.get("sku") or {}).get("name", ""),
        "quantity": quantity,
        "term": term_raw,
        "term_human": term_human,
        "start_month": start_month,
        "start_date": start_date,
        "expiry_dt": expiry_dt,
        "expiry_epoch": expiry_epoch,
        "location": res.get("location", "") or props.get("location", ""),
        "scope_type": scope_type,
        "applied_scopes": applied_scopes,
        "reserved_resource_type": reserved_resource_type,
        "utilization_pct": util_pct,
        # NOTE(review): always 0.0 even when usedQuantity values were summed
        # above — confirm with the check plugin whether this is intentional.
        "used_quantity": 0.0,
        "reserved_quantity": reserved_quantity,
        "provisioning_state": props.get("provisioningState", ""),
        "age_days": round(age_days, 1),
    }


def render_scope(applied_scope: str, sub_name_by_id: dict[str, str]) -> str:
    """Turn one ARM scope ID into a human string for the check details.

    Args:
        applied_scope: ARM scope ID, e.g. ``/subscriptions/<id>``,
            ``/subscriptions/<id>/resourceGroups/<rg>`` or
            ``/providers/Microsoft.Management/managementGroups/<name>``.
        sub_name_by_id: Subscription display names keyed by lower-cased
            subscription ID.

    Returns:
        A readable label for the scope, the empty string for an empty scope,
        or the input unchanged when its shape is not recognised.
    """
    if not applied_scope:
        return ""
    # The subscription / resource-group segments below are matched
    # case-insensitively; do the same for the management-group marker so a
    # differently-cased ID is not returned as a raw string.
    if "/providers/microsoft.management/managementgroups/" in applied_scope.lower():
        # The name itself is taken from the original string to keep its case.
        return f"Management group: {applied_scope.rsplit('/', 1)[-1]}"
    parts = applied_scope.split("/")
    # Format: /subscriptions/<id>[/resourceGroups/<rg>]
    # (parts[0] is the empty string before the leading slash.)
    if len(parts) >= 3 and parts[1].lower() == "subscriptions":
        sub_id = parts[2]
        # Fall back to the raw ID when no display name is known.
        sub_name = sub_name_by_id.get(sub_id.lower(), sub_id)
        if len(parts) >= 5 and parts[3].lower() == "resourcegroups":
            return f"Resource group: {parts[4]} in subscription {sub_name}"
        return f"Subscription: {sub_name}"
    return applied_scope


# ---------------------------------------------------------------------------
# Permissions self-check (Phase A2)
# ---------------------------------------------------------------------------

def _az_cli_command(role: str, client_id: str, scope: str) -> str:
    """Return the `az role assignment create` command line for one grant.

    The role and scope are wrapped in double quotes; the assignee (client ID)
    is inserted as-is.
    """
    pieces = (
        "az role assignment create",
        f'--role "{role}"',
        f"--assignee {client_id}",
        f'--scope "{scope}"',
    )
    return " ".join(pieces)


def probe_permissions(
    session: requests.Session,
    client_id: str,
    expected_subs: list[dict],
    has_explicit_filter: bool,
) -> list[dict]:
    """Run the RBAC self-check and return one record per probed (scope, role).

    `expected_subs` holds the subscriptions the SPN can enumerate plus stubs
    for subscriptions the operator named via --subscriptions that the SPN
    cannot see. The stubs carry `_invisible=True`, which is used to label
    their per-subscription probes as 'not visible to SPN'.

    Record order: subscription enumeration, Reservations Reader, then for
    each subscription Reader followed by Cost Management Reader.
    """

    def _record(
        scope: str,
        scope_human: str,
        role: str,
        criticality: str,
        status: Any,
        detail: Any,
        grant_role: str,
        grant_scope: str,
    ) -> dict:
        # One place that fixes the key order of every emitted record.
        return {
            "scope": scope,
            "scope_human": scope_human,
            "role": role,
            "criticality": criticality,
            "status": status,
            "detail": detail,
            "az_cli_command": _az_cli_command(grant_role, client_id, grant_scope),
        }

    # 0) Tenant subscription enumeration. This hits the same /subscriptions
    # URL main() already used; only the probe's status is kept, the detail
    # text is composed below from the visible / invisible counts.
    enum_status, _ = probe_status(
        session,
        f"{AZURE_MGMT_BASE}/subscriptions"
        f"?api-version={API_VERSION_SUBSCRIPTIONS}&$top=1",
        PERMISSION_PROBE_TIMEOUT,
    )
    invisible_count = sum(1 for entry in expected_subs if entry.get("_invisible"))
    visible_count = len(expected_subs) - invisible_count

    if not has_explicit_filter and visible_count == 0:
        # Nothing visible and nothing listed: the operator must act.
        enum_crit = "critical"
        enum_detail = (
            "0 subscriptions visible. Grant the SPN at least 'Reader' at the "
            "tenant root (root management group, scope '/') or at each "
            "management group containing the subscriptions you want to "
            "monitor, OR pass --subscriptions explicitly via the "
            "special-agent ruleset's 'Subscriptions' field."
        )
    else:
        enum_crit = "advisory"
        if invisible_count > 0:
            enum_detail = (
                f"{visible_count} subscription(s) visible to the SPN, "
                f"{invisible_count} listed in --subscriptions but invisible "
                "(SPN has no role on them). Per-sub probes below will show the "
                "missing roles; grant Reader at the tenant root to make them "
                "auto-discoverable."
            )
        elif has_explicit_filter:
            enum_detail = (
                f"{visible_count} subscription(s) visible and probed. "
                "All explicitly-listed subscriptions are reachable."
            )
        else:
            enum_detail = (
                f"{visible_count} subscription(s) auto-discovered. To probe "
                "subscriptions the SPN currently has no roles on, list them in "
                "the special-agent ruleset's 'Subscriptions' field."
            )

    # Granting Reader at "/" requires Owner / User Access Administrator on
    # the root MG. It is suggested because it is the cleanest single grant
    # that unlocks tenant-wide enumeration.
    records: list[dict] = [
        _record(
            "/subscriptions",
            "tenant (subscription enumeration)",
            "Reader at tenant root or management group",
            enum_crit,
            enum_status,
            enum_detail,
            "Reader",
            "/",
        )
    ]

    # 1) Tenant scope: Reservations Reader (read reservationOrders).
    # Azure rejects `--scope /providers/Microsoft.Capacity/reservationOrders`
    # when granting Reservations Reader; the role only binds at the provider
    # scope itself. The probe URL is the data-plane endpoint, not the RBAC
    # scope, hence the two differ.
    orders_status, orders_detail = probe_status(
        session,
        f"{AZURE_MGMT_BASE}/providers/Microsoft.Capacity/reservationOrders"
        f"?api-version={API_VERSION_RESERVATIONS}&$top=1",
        PERMISSION_PROBE_TIMEOUT,
    )
    records.append(
        _record(
            "/providers/Microsoft.Capacity",
            "tenant (Capacity resource provider)",
            "Reservations Reader",
            "critical",
            orders_status,
            orders_detail,
            "Reservations Reader",
            "/providers/Microsoft.Capacity",
        )
    )

    # 2) Per subscription: Reader, then Cost Management Reader.
    for sub in expected_subs:
        sub_id = sub.get("subscriptionId", "")
        if not sub_id:
            continue
        scope = f"/subscriptions/{sub_id}"
        label = f"subscription '{sub.get('displayName', sub_id)}'"
        if sub.get("_invisible"):
            label += " (not visible to SPN — listed via --subscriptions)"

        role_probes = (
            # Reader: list resource groups in the subscription. Requires
            # Microsoft.Resources/subscriptions/resourceGroups/read, which
            # built-in "Reader" grants. Returns 200 on an empty subscription.
            (
                "Reader",
                f"{AZURE_MGMT_BASE}{scope}/resourceGroups"
                f"?api-version=2021-04-01&$top=1",
            ),
            # Cost Management Reader: read Consumption usageDetails. The
            # reservationSummaries endpoint is NOT subscription-scoped, but
            # usageDetails requires the same RBAC role and is the canonical
            # probe for Consumption-API access.
            (
                "Cost Management Reader",
                f"{AZURE_MGMT_BASE}{scope}"
                f"/providers/Microsoft.Consumption/usageDetails"
                f"?api-version=2023-05-01&$top=1",
            ),
        )
        for role, probe_url in role_probes:
            status, detail = probe_status(session, probe_url, PERMISSION_PROBE_TIMEOUT)
            records.append(
                _record(scope, label, role, "critical", status, detail, role, scope)
            )

    return records


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def _emit_sections(args: argparse.Namespace, _diag: dict) -> None:
    """Run the full Azure collection and print every section to stdout.

    This is the expensive 'live data' path. The OAuth token fetch lives here
    (not in main) so a whole-output cache hit performs zero Azure calls. On a
    hard failure (e.g. token acquisition) this raises; the caller decides
    whether to serve a stale cached blob.

    `_diag` is a mutable dict that is populated with diagnostic counters and
    error strings for the summary section.  The caller (collect_and_render)
    owns the dict and passes it up to main() so the summary section can be
    emitted OUTSIDE the cached blob.

    Args:
        args: Parsed command-line namespace. Reads tenant_id, client_id,
            client_secret, timeout, cache_key, subscription_filter,
            no_utilization, util_cache_warn_minutes, util_cache_max_minutes
            and check_permissions.
        _diag: Output dict; updated in place at the very end with the keys
            subscription_count, reservation_count, total_item_count,
            error_count and errors.

    Sections printed, in order: azure_reservations, then (only with
    args.check_permissions) azure_reservations_permissions.
    """
    # Diagnostics accumulators — written into _diag at the end.
    _diag_errors: list[str] = []

    token = get_token(args.tenant_id, args.client_id, args.client_secret, args.timeout)
    session = make_session(token)
    # Single wall-clock reference for every cache update / replay in this run.
    now = time.time()

    # Phase V (v1.5.1): load the hold-last-good utilization cache for this
    # host so we can replay the last successful values on transient API
    # failures. Missing / unreadable file is silently treated as empty.
    util_cache_state = _load_state(args.cache_key)

    subscriptions = collect_subscriptions(session, args.subscription_filter, args.timeout)
    if not subscriptions:
        # Non-fatal: reservations are collected at tenant level below, so the
        # run continues and the condition is only recorded as a diagnostic.
        print("WARNING: No subscriptions found or accessible.", file=sys.stderr)
        _diag_errors.append("no subscriptions found or accessible")

    # Build the union (visible) + (explicit but invisible) for the permission
    # probes only. Data collection stays restricted to `subscriptions` — there
    # is no point hammering endpoints with guaranteed 403s.
    expected_subs: list[dict] = list(subscriptions)
    if args.subscription_filter:
        visible_ids = {s["subscriptionId"].lower() for s in subscriptions}
        for sub_id in args.subscription_filter:
            if sub_id.lower() not in visible_ids:
                # Stub entry: the raw ID doubles as the display name.
                expected_subs.append({
                    "subscriptionId": sub_id,
                    "displayName": sub_id,
                    "_invisible": True,
                })

    # --- Subscription name lookup (for scope rendering) ---
    # Keys are lower-cased subscription IDs, matching render_scope's lookup.
    sub_name_by_id = {
        s["subscriptionId"].lower(): s.get("displayName", s["subscriptionId"])
        for s in subscriptions
    }

    # --- Reservations (tenant-level) ---
    all_reservations: list[dict] = []
    reservation_keys: list[str] = []  # parallel list of item keys

    # Item keys already handed out, used to disambiguate collisions below.
    seen_keys: set[str] = set()
    for order_id in collect_reservation_orders(session, args.timeout):
        for res in collect_reservations(session, order_id, args.timeout):
            norm = normalise_reservation(res, order_id)
            # Utilization is fetched only for reservations in state
            # "Succeeded", and only when --no-utilization was not given.
            if (not args.no_utilization) and norm["provisioning_state"] == "Succeeded":
                windows = fetch_reservation_utilization_windows(
                    session, order_id, norm["reservation_id"], args.timeout
                )
                cached_entry = util_cache_state.get("reservations", {}).get(
                    norm["reservation_id"]
                )
                # A fetch is "successful enough" if at least one endpoint
                # returned data. We persist on success, replay on failure.
                live_fetch_ok = (
                    windows.get("utilization_fetch_daily_status") in ("ok", "retried")
                    or windows.get("utilization_fetch_monthly_status") in ("ok", "retried")
                )
                # Track non-fatal utilization fetch failures for diagnostics.
                # We count a reservation as a fetch error only when BOTH legs
                # are non-ok — a single-leg failure is less severe.
                _non_ok_statuses = {"throttled", "unavail", "timeout", "forbidden"}
                daily_bad = windows.get("utilization_fetch_daily_status") in _non_ok_statuses
                monthly_bad = windows.get("utilization_fetch_monthly_status") in _non_ok_statuses
                if daily_bad and monthly_bad:
                    _bad_daily = windows.get("utilization_fetch_daily_status", "")
                    _bad_monthly = windows.get("utilization_fetch_monthly_status", "")
                    # Only the first 8 characters of the reservation ID go
                    # into the diagnostic message.
                    _res_short = norm["reservation_id"][:8]
                    _diag_errors.append(
                        f"utilization fetch failed for reservation {_res_short} "
                        f"(daily={_bad_daily}, monthly={_bad_monthly})"
                    )
                # True when the fetch result carries at least one of the
                # fields listed in _CACHED_WINDOW_FIELDS.
                has_live_value = any(
                    k in windows for k in _CACHED_WINDOW_FIELDS
                )
                if live_fetch_ok and has_live_value:
                    # Live data: refresh the cache entry and use the values.
                    _update_cached_reservation(
                        util_cache_state, norm["reservation_id"], windows, now,
                    )
                    norm.update(windows)
                elif cached_entry:
                    # No usable live data but a cached entry exists: replay it.
                    _replay_cached_reservation(
                        norm, windows, cached_entry, now,
                        args.util_cache_warn_minutes,
                        args.util_cache_max_minutes,
                    )
                else:
                    # Neither live data nor a cache entry: pass the fetch
                    # result through unchanged.
                    norm.update(windows)
            # Human-readable applied scopes.
            norm["scope_human"] = [
                render_scope(s, sub_name_by_id)
                for s in (norm.get("applied_scopes") or [])
            ]
            all_reservations.append(norm)
            # Item key: display name plus the first 8 characters of the
            # reservation ID; on collision append _1, _2, ... until unique.
            base_key = f"{norm['display_name']}_{norm['reservation_id'][:8]}"
            key = base_key
            suffix = 1
            while key in seen_keys:
                key = f"{base_key}_{suffix}"
                suffix += 1
            seen_keys.add(key)
            norm["_item_key"] = key
            reservation_keys.append(key)

    # --- Emit sections in order ---

    # 1) Reservations (decorated with scope_human).
    # One line per reservation: "<item key>|<compact JSON>".
    print("<<<azure_reservations:sep(124)>>>")
    for key, norm in zip(reservation_keys, all_reservations):
        print(f"{key}|{json.dumps(norm, separators=(',', ':'))}")

    # --- Permissions self-check (Phase A2) ---
    if args.check_permissions:
        print("<<<azure_reservations_permissions:sep(124)>>>")
        records = probe_permissions(
            session,
            args.client_id,
            expected_subs,
            has_explicit_filter=bool(args.subscription_filter),
        )
        for rec in records:
            # Item key is unique per (scope, role).
            key = f"{rec['scope']}::{rec['role']}"
            print(f"{key}|{json.dumps(rec, separators=(',', ':'))}")

    # Phase V: persist the hold-last-good cache. We do this at the very
    # end so a crash mid-run leaves the previous state intact, not partial.
    _save_state(args.cache_key, util_cache_state)

    # --- Populate diagnostics for the summary section ---
    # total_item_count currently equals reservation_count: reservations are
    # the only items counted here.
    _diag.update({
        "subscription_count": len(subscriptions),
        "reservation_count": len(all_reservations),
        "total_item_count": len(all_reservations),
        "error_count": len(_diag_errors),
        "errors": _diag_errors,
    })


def collect_and_render(args: argparse.Namespace) -> tuple[str, dict]:
    """Perform one live refresh and hand back its stdout plus diagnostics.

    Everything _emit_sections prints to stdout is diverted into an in-memory
    buffer; stderr is not redirected. If _emit_sections raises, the exception
    propagates and nothing is returned, so a caller can never cache a
    half-written render.

    Returns:
        ``(blob, diag)``: the captured stdout text and the diagnostics dict
        filled in by _emit_sections, which main() uses to emit the summary
        section outside the cached blob.
    """
    diagnostics: dict = {}
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        _emit_sections(args, diagnostics)
    blob = captured.getvalue()
    return blob, diagnostics


def _emit_persisted_summary(
    args: argparse.Namespace, cache: "_OutputCache", status: str,
) -> None:
    """Emit the summary section for a run that replayed a cached blob.

    Counters and the fetch timestamp come from the metadata persisted by the
    last completed live run; the data age is derived from the modification
    time of the cached blob file.

    Args:
        args: Parsed command-line namespace (reads cache_key and
            output_cache_minutes).
        cache: The output cache whose blob was just written to stdout.
        status: Value for the summary's "status" field.
    """
    persisted = _load_summary(args.cache_key)
    try:
        blob_age = time.time() - os.stat(cache._path).st_mtime
    except OSError:
        # Blob file could not be stat'ed; report age 0 rather than fail.
        blob_age = 0.0
    _emit_summary_section(
        status=status,
        served_from_cache=True,
        data_age_seconds=max(0.0, blob_age),
        fetch_duration_seconds=float(persisted.get("fetch_duration_seconds", 0.0)),
        fetch_timestamp=int(persisted.get("fetch_timestamp", 0)),
        subscription_count=int(persisted.get("subscription_count", 0)),
        reservation_count=int(persisted.get("reservation_count", 0)),
        total_item_count=int(persisted.get("total_item_count", 0)),
        error_count=int(persisted.get("error_count", 0)),
        output_cache_minutes=args.output_cache_minutes,
        errors=list(persisted.get("errors", [])),
    )


def main(argv: list[str] | None = None) -> int:
    """Special-agent entry point: print all sections and return an exit code.

    Flow:
      1. Fast path - unless --no-cache is set, a fresh cached blob is written
         out together with a "cached" summary; no Azure call is made.
      2. Slow path - a live refresh via collect_and_render(). On success the
         summary metadata is persisted, the blob is cached (unless
         --no-cache) and printed with an "ok" / "partial" summary.
      3. Serve-stale - if the live refresh raises and any cached blob exists,
         that blob is printed with a "failed" summary and a warning goes to
         stderr.

    Args:
        argv: Argument list to parse; ``None`` means use ``sys.argv[1:]``.

    Returns:
        0 whenever output was produced.

    Raises:
        Exception: Whatever the live refresh raised, when no stale blob is
            available or caching is disabled.
    """
    replace_passwords()
    # Only fall back to the process arguments when the caller passed nothing;
    # `argv or ...` would also discard an explicitly empty list.
    args = parse_arguments(sys.argv[1:] if argv is None else argv)

    cache = _OutputCache(
        args.cache_key, _args_fingerprint(args), args.output_cache_minutes * 60
    )

    # Fast path: a fresh cached blob means zero Azure calls this cycle.
    if not args.no_cache:
        fresh = cache.read_fresh()
        if fresh is not None:
            sys.stdout.write(fresh)
            _emit_persisted_summary(args, cache, "cached")
            return 0

    # Slow path: live refresh. On a hard failure, replay the last good blob
    # (serve-stale) rather than emitting nothing — mirrors the v1.5.1
    # hold-last-good intent at the whole-output level.
    fetch_start = time.monotonic()
    fetch_ts = int(time.time())
    try:
        blob, diag = collect_and_render(args)
    except Exception as exc:  # noqa: BLE001 - last-resort resilience
        if not args.no_cache:
            stale = cache.read_any()
            if stale is not None:
                # The exception text may embed request details. Redact the
                # client secret BEFORE truncating (so no partial secret can
                # survive at the cut), then cap the length and replace the
                # characters that would break line / section parsing.
                _exc_msg = str(exc)
                if args.client_secret:
                    # Guarded: replacing an empty string would insert the
                    # marker between every character.
                    _exc_msg = _exc_msg.replace(args.client_secret, "***")
                _exc_msg = _exc_msg[:300].replace("\n", " ").replace("|", "/")
                print(
                    f"WARNING: live refresh failed ({type(exc).__name__}: "
                    f"{_exc_msg}); serving stale cached output",
                    file=sys.stderr,
                )
                sys.stdout.write(stale)
                _emit_persisted_summary(args, cache, "failed")
                return 0
        raise

    fetch_duration = time.monotonic() - fetch_start
    # Determine status: "partial" if any non-fatal error was recorded.
    live_status = "partial" if diag.get("errors") else "ok"

    # Persist summary metadata for future cache-hit runs.
    summary_meta = {
        "fetch_duration_seconds": round(fetch_duration, 3),
        "fetch_timestamp": fetch_ts,
        "subscription_count": diag.get("subscription_count", 0),
        "reservation_count": diag.get("reservation_count", 0),
        "total_item_count": diag.get("total_item_count", 0),
        "error_count": diag.get("error_count", 0),
        "errors": diag.get("errors", []),
    }
    _save_summary(args.cache_key, summary_meta)

    if not args.no_cache:
        cache.write(blob)
    sys.stdout.write(blob)
    # The summary is emitted after (and outside) the cached blob.
    _emit_summary_section(
        status=live_status,
        served_from_cache=False,
        data_age_seconds=0.0,
        fetch_duration_seconds=round(fetch_duration, 3),
        fetch_timestamp=fetch_ts,
        subscription_count=diag.get("subscription_count", 0),
        reservation_count=diag.get("reservation_count", 0),
        total_item_count=diag.get("total_item_count", 0),
        error_count=diag.get("error_count", 0),
        output_cache_minutes=args.output_cache_minutes,
        errors=diag.get("errors", []),
    )
    return 0


def _emit_summary_section(
    *,
    status: str,
    served_from_cache: bool,
    data_age_seconds: float,
    fetch_duration_seconds: float,
    fetch_timestamp: int,
    subscription_count: int,
    reservation_count: int,
    total_item_count: int,
    error_count: int,
    output_cache_minutes: int,
    errors: list,
) -> None:
    """Write the azure_reservations_summary section to stdout.

    Output is the section header followed by a single line of the form
    ``summary|<compact JSON>`` carrying every keyword argument.

    Call this OUTSIDE collect_and_render() so the data sections inside the
    cached blob stay byte-identical on every cache hit.
    """
    # Keyword order here fixes the key order of the emitted JSON object.
    payload = dict(
        status=status,
        fetch_duration_seconds=fetch_duration_seconds,
        fetch_timestamp=fetch_timestamp,
        served_from_cache=served_from_cache,
        data_age_seconds=data_age_seconds,
        subscription_count=subscription_count,
        reservation_count=reservation_count,
        total_item_count=total_item_count,
        error_count=error_count,
        output_cache_minutes=output_cache_minutes,
        errors=errors,
    )
    encoded = json.dumps(payload, separators=(",", ":"))
    print("<<<azure_reservations_summary:sep(124)>>>")
    print("summary|" + encoded)


# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    sys.exit(main())
