#!/usr/bin/env python3
# Copyright (C) 2026 Benjamin Knapp
# SPDX-License-Identifier: GPL-2.0-only
"""Generic JSON API special agent.

Fetches a JSON document over HTTP(S), extracts the configured fields by path,
and prints a 'json_api' section. One JSON document per run, on a single line.

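The section payload looks like:

    {"url": "...",
     "results": [{"service": "...", "path": "...", "found": true,
                  "value": "UP", "error": null,
                  "levels_upper": ["fixed", [80, 90]], "levels_lower": null,
                  "expected": "UP|ok"}, ...]}

If the request fails or the response is not valid JSON, no section is printed:
the error message is written to stderr and the agent exits with status 1.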
"""

import argparse
import json
import re
import sys
from collections.abc import Sequence
from pathlib import Path

import requests
from cmk.utils import password_store as _legacy_pwstore

try:
    # Checkmk 2.5+: public convenience API for secret options.
    from cmk.password_store.v1_unstable import parser_add_secret_option, resolve_secret_option

    _HAVE_PWSTORE_V1 = True
except ImportError:
    # Checkmk 2.4: fall back to the internal password store (same on-disk format).
    _HAVE_PWSTORE_V1 = False

# One path segment per match: group 1 captures a key name (a run of characters
# other than '.', '[' and ']'), group 2 captures the digits of a '[<n>]' index.
_PATH_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def _add_secret_option(parser: argparse.ArgumentParser, name: str, help_text: str) -> None:
    """Add the mandatory secret option ``name`` to ``parser``.

    When the v1 password store API could be imported, registration is
    delegated to ``parser_add_secret_option``. Otherwise a plain required
    ``--<name>-id`` argument is added; its value is expected to have the form
    ``"<id>:<password_store_file>"``, which is what ``_reveal_secret`` splits.
    """
    if not _HAVE_PWSTORE_V1:
        parser.add_argument(f"--{name}-id", help=help_text, required=True)
        return
    parser_add_secret_option(parser, long=f"--{name}", help=help_text, required=True)


def _reveal_secret(args: argparse.Namespace, name: str) -> str:
    """Return the plain-text secret behind option ``name``.

    With the v1 password store API the lookup is delegated to
    ``resolve_secret_option``. Without it, ``args.<name>_id`` is split at the
    first ':' into the secret id and the password store file, and the secret
    is looked up through the legacy password store module.
    """
    if not _HAVE_PWSTORE_V1:
        reference = getattr(args, f"{name}_id")
        secret_id, store_file = reference.split(":", 1)
        return _legacy_pwstore.lookup(Path(store_file), secret_id)
    return resolve_secret_option(args, name).reveal()


def _resolve_path(data: object, path: str) -> tuple[bool, object]:
    """Walk ``data`` along a dotted path that may contain [index] segments.

    Examples of accepted paths: 'a.b', 'a[0].b'. Surrounding whitespace and
    one leading '$.' (or a bare '$') are removed first. '[*]' wildcards are
    not interpreted here; text that matches neither a key nor a numeric index
    segment is skipped by the tokenizer.

    Returns ``(True, value)`` when every segment resolves, otherwise
    ``(False, None)``. A path without any segment yields ``data`` itself.
    """
    trimmed = path.strip()
    # Only the first matching root prefix is removed, the longer one first.
    for root in ("$.", "$"):
        if trimmed.startswith(root):
            trimmed = trimmed[len(root):]
            break

    node = data
    for token in _PATH_TOKEN.finditer(trimmed):
        name, position = token.groups()
        if name is not None:
            # Key segment: only dictionaries can be descended by name.
            if isinstance(node, dict) and name in node:
                node = node[name]
                continue
            return False, None
        # Index segment: only lists can be descended by position.
        offset = int(position)
        if isinstance(node, list) and offset < len(node):
            node = node[offset]
            continue
        return False, None
    return True, node


def parse_arguments(argv: Sequence[str]) -> argparse.Namespace:
    """Parse the agent's command line.

    Top-level options describe the request (URL, method, body, headers, TLS
    verification, timeout) and the JSON-encoded extraction list. An optional
    trailing sub-command selects authentication: 'auth_login' (username plus
    password secret) or 'auth_token' (bearer token secret). The chosen
    sub-command name is stored in ``auth``.
    """
    parser = argparse.ArgumentParser(description=__doc__)

    # Request definition.
    parser.add_argument("--url", required=True)
    parser.add_argument("--method", choices=["GET", "POST"], default="GET")
    parser.add_argument("--body", default=None)
    parser.add_argument(
        "--header",
        metavar="NAME:VALUE",
        action="append",
        default=[],
        help="Additional request header, repeatable.",
    )
    parser.add_argument("--no-cert-check", action="store_true")
    parser.add_argument("--timeout", default=30.0, type=float)

    # What to pull out of the response.
    parser.add_argument(
        "--extractions",
        help="JSON list of {path, service, levels_upper, levels_lower, expected}.",
        required=True,
    )

    # Authentication variants, one sub-command each.
    auth_commands = parser.add_subparsers(dest="auth")

    login_parser = auth_commands.add_parser("auth_login")
    login_parser.add_argument("--username", required=True)
    _add_secret_option(login_parser, "password", "API password")

    token_parser = auth_commands.add_parser("auth_token")
    _add_secret_option(token_parser, "token", "API bearer token")

    return parser.parse_args(argv)


def _build_session(args: argparse.Namespace) -> tuple[requests.Session, dict[str, str]]:
    """Create the HTTP session and the request headers for this run.

    Each ``--header`` value is split at its first ':' into a stripped name
    and a stripped value. 'auth_login' configures username/password on the
    session, 'auth_token' sets a Bearer ``Authorization`` header. When a body
    is given and no Content-Type header (compared case-insensitively) was
    supplied, 'application/json' is used.
    """
    headers: dict[str, str] = {
        name.strip(): value.strip()
        for name, _, value in (entry.partition(":") for entry in args.header)
    }

    session = requests.Session()
    if args.auth == "auth_login":
        session.auth = (args.username, _reveal_secret(args, "password"))
    elif args.auth == "auth_token":
        headers["Authorization"] = "Bearer " + _reveal_secret(args, "token")

    if args.body is not None:
        lowered_names = {header_name.lower() for header_name in headers}
        if "content-type" not in lowered_names:
            headers["Content-Type"] = "application/json"
    return session, headers


def _fetch(args: argparse.Namespace) -> tuple[object | None, str | None]:
    """Perform the configured HTTP request and decode the JSON response.

    ``args`` is the namespace produced by ``parse_arguments``; its ``method``,
    ``url``, ``body``, ``no_cert_check`` and ``timeout`` attributes are used
    here, the rest by ``_build_session``.

    Returns ``(document, None)`` on success and ``(None, message)`` on
    failure. The message starts with "Request failed:" when sending the
    request or the HTTP status check fails, and with "Response is not valid
    JSON:" when the body cannot be decoded.
    """
    session, headers = _build_session(args)
    # Encode explicitly so a non-Latin-1 body is sent as UTF-8 regardless of
    # how the underlying HTTP stack would encode a str payload.
    body = None if args.body is None else args.body.encode("utf-8")

    # The session is used for exactly one request; close it when done.
    with session:
        try:
            response = session.request(
                args.method,
                args.url,
                data=body,
                headers=headers,
                verify=not args.no_cert_check,
                timeout=args.timeout,
            )
            response.raise_for_status()
        except (requests.exceptions.RequestException, ValueError) as exc:
            # ValueError covers argument problems raised while sending (for
            # example an unusable timeout); those are request failures too.
            return None, f"Request failed: {exc}"

        # Decoding gets its own try block: requests' JSONDecodeError is also a
        # RequestException, so a shared handler chain would misreport invalid
        # JSON as a failed request.
        try:
            return response.json(), None
        except ValueError as exc:
            return None, f"Response is not valid JSON: {exc}"


_WILDCARD = "[*]"


def _split_wildcard(path: str) -> tuple[str, str] | None:
    """Split a path on a single '[*]' wildcard into (before, after).

    'after' is the per-element remainder with a leading '.' stripped. Returns
    None when there is no wildcard. Only the first '[*]' is honoured.
    """
    index = path.find(_WILDCARD)
    if index == -1:
        return None
    before = path[:index]
    after = path[index + len(_WILDCARD) :]
    if after.startswith("."):
        after = after[1:]
    return before, after


def _result(
    spec: dict, service: str, found: bool, value: object, error: str
) -> dict:
    # Values that are not JSON scalars (dict/list) are reported as strings.
    if found and isinstance(value, (dict, list)):
        value = json.dumps(value)
    return {
        "service": service,
        "path": spec["path"],
        "found": found,
        "value": value if found else None,
        "error": None if found else error,
        "levels_upper": spec.get("levels_upper"),
        "levels_lower": spec.get("levels_lower"),
        "expected": spec.get("expected"),
    }


def _extract(document: object, extractions: list[dict]) -> list[dict]:
    """Evaluate every extraction spec against ``document``.

    A spec without '[*]' in its path produces exactly one result. A spec with
    a wildcard produces one result per element of the array in front of the
    wildcard; each service name is the spec's service followed by a space and
    the element's label (see ``_element_labels``). A wildcard spec that cannot
    be expanded (second wildcard in the remainder, or no list at the prefix)
    produces a single not-found result carrying the reason.
    """
    collected: list[dict] = []
    for spec in extractions:
        path = spec["path"]
        parts = _split_wildcard(path)

        if parts is None:
            # Plain path: one lookup, one service.
            hit, resolved = _resolve_path(document, path)
            collected.append(
                _result(spec, spec["service"], hit, resolved, "path not found in response")
            )
            continue

        prefix, remainder = parts
        elements = None
        if _WILDCARD in remainder:
            failure = "nested [*] wildcards are not supported"
        else:
            have_array, elements = _resolve_path(document, prefix)
            if have_array and isinstance(elements, list):
                failure = None
            else:
                failure = "array not found at wildcard path"

        if failure is not None:
            collected.append(_result(spec, spec["service"], False, None, failure))
            continue

        # One service per array element, named after the element's label.
        labels = _element_labels(elements, spec.get("label_path"))
        for label, element in zip(labels, elements):
            # An empty remainder means the element itself is the value.
            hit, resolved = _resolve_path(element, remainder) if remainder else (True, element)
            collected.append(
                _result(
                    spec, f"{spec['service']} {label}", hit, resolved, "path not found in element"
                )
            )
    return collected


def _element_labels(array: list, label_path: str | None) -> list[str]:
    """One label per array element, guaranteed unique.

    The label comes from label_path within each element (falling back to the
    index). If a label value repeats across elements, every occurrence of it is
    suffixed with its index, so two elements can never collapse into one
    service.
    """
    labels = []
    for index, element in enumerate(array):
        if label_path:
            found, value = _resolve_path(element, label_path)
            labels.append(str(value) if found else str(index))
        else:
            labels.append(str(index))
    counts: dict[str, int] = {}
    for label in labels:
        counts[label] = counts.get(label, 0) + 1
    return [
        f"{label} [{index}]" if counts[label] > 1 else label
        for index, label in enumerate(labels)
    ]


def main(argv: Sequence[str] | None = None) -> int:
    """Run the agent and return the process exit status.

    ``argv`` defaults to ``sys.argv[1:]``. On success the 'json_api' section
    header and the single-line JSON payload are written to stdout and 0 is
    returned. When fetching fails, the error is written to stderr and 1 is
    returned without printing a section.
    """
    cli_args = sys.argv[1:] if argv is None else argv
    options = parse_arguments(cli_args)
    specs = json.loads(options.extractions)

    document, failure = _fetch(options)
    if failure is None:
        section = {"url": options.url, "results": _extract(document, specs)}
        sys.stdout.write("<<<json_api:sep(0)>>>\n")
        sys.stdout.write(json.dumps(section) + "\n")
        return 0

    # Failing the whole data source makes Checkmk report a single CRIT on the
    # "Check_MK" service while the JSON services go stale, rather than turning
    # every configured service CRIT at once.
    sys.stderr.write(failure + "\n")
    return 1


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
