# Source code for hosting_metadata

"""One-shot hosting provider and geographic location for the system prompt.

Probed asynchronously at bot startup and merged into
:class:`~prompt_renderer.PromptRenderer` ``default_extras``.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
from typing import Any

import aiohttp

# Module-level logger shared by the probes and the one-shot entry point.
logger = logging.getLogger(__name__)

# Short budget (seconds) for the cloud metadata probes (_aws_probe, _oci_probe,
# _gcp_probe, _azure_probe); kept small so a host that is not on a given cloud
# fails fast.
_IMDS_TIMEOUT = aiohttp.ClientTimeout(total=2.0, connect=1.0)
# Longer budget (seconds) for the public-internet lookups made by
# _fetch_public_ipv4 (ipify) and _geolocate_ipapi (ipapi.co).
_EXTERNAL_TIMEOUT = aiohttp.ClientTimeout(total=5.0, connect=3.0)

# One-shot guard used by ensure_hosting_metadata_in_prompt: the lock serialises
# overlapping callers and the flag records that the extras were already applied.
# reset_hosting_metadata_probe_for_tests clears the flag again.
_HOSTING_PROBE_LOCK = asyncio.Lock()
_HOSTING_APPLIED = False

# Link-local instance-metadata address. Despite the name it is also the base
# URL for _oci_probe (under /opc/v2); _azure_probe hard-codes the same address.
_AWS_BASE = "http://169.254.169.254"


def _strip_gcp_zone(raw: str) -> str:
    """Reduce a GCP metadata path to its trailing short name.

    GCP's metadata server returns zone/region values as fully qualified
    paths such as ``projects/123456/zones/us-central1-a``; this trims the
    string and keeps only the segment after the final ``/`` so the caller
    gets a bare zone or region like ``us-central1-a``.

    Called only by :func:`_gcp_probe`, which applies it to both the zone
    and region metadata responses before assembling the region line. No
    other callers exist in the repo.

    Args:
        raw: Raw text body returned by a GCP metadata endpoint, which may
            be a slash-delimited resource path or an already-short value.

    Returns:
        str: The trailing path segment with surrounding whitespace
        removed, or the whole stripped string when there is no ``/``.
    """
    raw = raw.strip()
    if "/" in raw:
        return raw.rsplit("/", 1)[-1]
    return raw


async def _aws_optional_text(
    session: aiohttp.ClientSession,
    path: str,
    headers: dict[str, str],
) -> str:
    """Read one optional EC2 metadata value, returning ``""`` on any failure.

    Args:
        session: Shared client session used for the request.
        path: Path below ``/latest/meta-data/`` (e.g. ``"instance-type"``).
        headers: Request headers carrying the IMDSv2 session token.

    Returns:
        str: The stripped response body on HTTP 200, otherwise ``""``.
        Never raises; failures are logged at debug level.
    """
    try:
        async with session.get(
            f"{_AWS_BASE}/latest/meta-data/{path}",
            headers=headers,
            timeout=_IMDS_TIMEOUT,
        ) as resp:
            if resp.status == 200:
                return (await resp.text()).strip()
    except Exception:
        # Best effort by design: the value only decorates the label, so a
        # failure is recorded for diagnosis but never aborts the probe.
        logger.debug("AWS IMDS optional lookup %r failed", path, exc_info=True)
    return ""


async def _aws_probe(session: aiohttp.ClientSession) -> dict[str, str] | None:
    """Probe the AWS EC2 instance metadata service (IMDSv2) for region info.

    Performs the IMDSv2 handshake by ``PUT``-ing for a session token against
    the link-local endpoint in ``_AWS_BASE``, then uses that token to read
    ``placement/region`` and, best effort, ``placement/availability-zone``
    and ``instance-type``. All requests use the short ``_IMDS_TIMEOUT``.

    A non-200 status or empty token on the handshake, or a non-200 status on
    the region request, ends the probe with ``None`` so the caller can fall
    through to other providers. The optional lookups go through
    :func:`_aws_optional_text`, which logs failures at debug level instead
    of discarding them silently.

    Args:
        session: Shared :class:`aiohttp.ClientSession` used for all probe
            requests.

    Returns:
        dict[str, str] | None: A mapping with ``provider_label`` (e.g.
        ``"Amazon Web Services (EC2), m5.large"``) and ``region_line``
        (region, plus the zone when it differs from the region) on success,
        or ``None`` when the probe fails. Never raises; exceptions are
        logged at debug level and converted to ``None``.
    """
    try:
        async with session.put(
            f"{_AWS_BASE}/latest/api/token",
            headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"},
            timeout=_IMDS_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return None
            token = (await resp.text()).strip()
        if not token:
            return None
        hdrs = {"X-aws-ec2-metadata-token": token}

        async with session.get(
            f"{_AWS_BASE}/latest/meta-data/placement/region",
            headers=hdrs,
            timeout=_IMDS_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return None
            region = (await resp.text()).strip()

        zone = await _aws_optional_text(
            session, "placement/availability-zone", hdrs
        )
        inst = await _aws_optional_text(session, "instance-type", hdrs)

        parts = [region]
        if zone and zone != region:
            parts.append(zone)
        label = "Amazon Web Services (EC2)"
        if inst:
            label = f"{label}, {inst}"
        return {"provider_label": label, "region_line": ", ".join(parts)}
    except Exception:
        logger.debug("AWS IMDS probe failed", exc_info=True)
        return None


async def _oci_probe(session: aiohttp.ClientSession) -> dict[str, str] | None:
    """Ask the Oracle Cloud Infrastructure metadata service where we run.

    Sends one ``GET`` to ``{_AWS_BASE}/opc/v2/instance/`` (OCI serves its
    metadata at the same link-local address as AWS) with the
    ``Authorization: Bearer Oracle`` header and the short ``_IMDS_TIMEOUT``.
    The body is decoded with the module's ``json`` alias and must be a JSON
    object; the region comes from ``canonicalRegionName`` (falling back to
    ``region``) and is combined with ``availabilityDomain`` when present.

    Args:
        session: Shared :class:`aiohttp.ClientSession` used for the request.

    Returns:
        dict[str, str] | None: ``provider_label`` fixed to
        ``"Oracle Cloud Infrastructure"`` plus a ``region_line`` made of the
        non-empty region / availability-domain values (``"OCI"`` when both
        are empty), or ``None`` on a non-200 status, a non-object body, or
        any exception. Never raises; failures are logged at debug level.
    """
    try:
        async with session.get(
            f"{_AWS_BASE}/opc/v2/instance/",
            headers={"Authorization": "Bearer Oracle"},
            timeout=_IMDS_TIMEOUT,
        ) as reply:
            if reply.status != 200:
                return None
            body = await reply.text()

        document = json.loads(body)
        if not isinstance(document, dict):
            return None

        region_name = (
            document.get("canonicalRegionName") or document.get("region") or ""
        ).strip()
        avail_domain = (document.get("availabilityDomain") or "").strip()

        # Joining only the non-empty pieces yields "" exactly when both are
        # missing, which is the case that falls back to the "OCI" placeholder.
        pieces = [piece for piece in (region_name, avail_domain) if piece]
        return {
            "provider_label": "Oracle Cloud Infrastructure",
            "region_line": ", ".join(pieces) or "OCI",
        }
    except Exception:
        logger.debug("OCI metadata probe failed", exc_info=True)
        return None


async def _gcp_probe(session: aiohttp.ClientSession) -> dict[str, str] | None:
    """Probe the Google Cloud Platform compute metadata server for location.

    Reads the instance ``zone`` and, best effort, the ``region`` from the
    ``metadata.google.internal`` endpoints (sent with the
    ``Metadata-Flavor: Google`` header and ``_IMDS_TIMEOUT``), reduces each
    to its short name with :func:`_strip_gcp_zone`, and assembles a
    deduplicated region line.

    The zone request is mandatory: a non-200 status or an exception ends the
    probe with ``None``. The region request is optional: a non-200 status
    leaves the region empty, and an exception is logged at debug level
    (rather than silently discarded) before continuing with the zone alone.

    Args:
        session: Shared :class:`aiohttp.ClientSession` used for the requests.

    Returns:
        dict[str, str] | None: ``provider_label`` fixed to
        ``"Google Cloud Platform"`` plus ``region_line`` (region then zone,
        with duplicates and empty values dropped) on success, otherwise
        ``None``. Never raises; failures are logged at debug level.
    """
    base = "http://metadata.google.internal/computeMetadata/v1/instance"
    flavor = {"Metadata-Flavor": "Google"}
    try:
        async with session.get(
            f"{base}/zone",
            headers=flavor,
            timeout=_IMDS_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return None
            zone = _strip_gcp_zone(await resp.text())

        region = ""
        try:
            async with session.get(
                f"{base}/region",
                headers=flavor,
                timeout=_IMDS_TIMEOUT,
            ) as rr:
                if rr.status == 200:
                    region = _strip_gcp_zone(await rr.text())
        except Exception:
            # Optional lookup: keep going with the zone, but leave a trace.
            logger.debug(
                "GCP region lookup failed; using zone only", exc_info=True
            )

        # dict.fromkeys keeps first-seen order while dropping a zone that is
        # identical to the region; with no non-empty value the join is "".
        distinct = dict.fromkeys(p for p in (region, zone) if p)
        return {
            "provider_label": "Google Cloud Platform",
            "region_line": ", ".join(distinct),
        }
    except Exception:
        logger.debug("GCP metadata probe failed", exc_info=True)
        return None


async def _azure_probe(session: aiohttp.ClientSession) -> dict[str, str] | None:
    """Ask the Azure Instance Metadata Service for the compute location.

    Sends one ``GET`` to the ``/metadata/instance`` endpoint at
    ``169.254.169.254`` (pinned ``api-version=2021-02-01``, JSON format,
    ``Metadata: true`` header, ``_IMDS_TIMEOUT``), decodes the body with
    ``resp.json()``, and reads ``location`` and ``name`` from the
    ``compute`` object.

    Args:
        session: Shared :class:`aiohttp.ClientSession` used for the request.

    Returns:
        dict[str, str] | None: ``provider_label`` fixed to
        ``"Microsoft Azure"`` plus ``region_line`` (the location, else the
        instance name, else ``"Azure"``), or ``None`` on a non-200 status,
        a payload without a ``compute`` object, or any exception. Never
        raises; failures are logged at debug level.
    """
    try:
        endpoint = (
            "http://169.254.169.254/metadata/instance"
            "?api-version=2021-02-01&format=json"
        )
        async with session.get(
            endpoint,
            headers={"Metadata": "true"},
            timeout=_IMDS_TIMEOUT,
        ) as reply:
            if reply.status != 200:
                return None
            document = await reply.json()

        if not isinstance(document, dict):
            return None
        section = document.get("compute")
        if not isinstance(section, dict):
            return None

        # Both values are normalised before choosing, so a malformed entry in
        # either field fails the whole probe via the handler below.
        location = (section.get("location") or "").strip()
        vm_name = (section.get("name") or "").strip()
        return {
            "provider_label": "Microsoft Azure",
            "region_line": location or vm_name or "Azure",
        }
    except Exception:
        logger.debug("Azure metadata probe failed", exc_info=True)
        return None


async def _fetch_public_ipv4(session: aiohttp.ClientSession) -> str:
    """Fetch the host's public IPv4 address from the ipify service.

    Issues one ``GET`` to ``https://api4.ipify.org`` with
    ``_EXTERNAL_TIMEOUT`` and returns the plain-text address it reports.
    The body is validated as a dotted-quad IPv4 before being returned,
    because the caller interpolates it into the ipapi.co URL path: a 200
    response that is not an address (for example an intercepting proxy
    page) now yields ``""``, which lets :func:`_geolocate_ipapi` fall back
    to geolocating the caller's own address.

    Args:
        session: Shared :class:`aiohttp.ClientSession` used for the request.

    Returns:
        str: The trimmed public IPv4 address on success, or ``""`` when the
        request fails, returns a non-200 status, or the body is not a valid
        IPv4 address. Never raises; failures are logged at debug level.
    """
    # Function-scope import keeps this block self-contained.
    import ipaddress

    try:
        async with session.get(
            "https://api4.ipify.org",
            timeout=_EXTERNAL_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return ""
            candidate = (await resp.text()).strip()
    except Exception:
        logger.debug("Public IPv4 fetch failed", exc_info=True)
        return ""

    try:
        ipaddress.IPv4Address(candidate)
    except ValueError:
        # AddressValueError subclasses ValueError. The body is not logged
        # because it may be an arbitrary, possibly large, document.
        logger.debug("Public IPv4 fetch returned a non-IPv4 body; ignoring it")
        return ""
    return candidate


async def _geolocate_ipapi(
    session: aiohttp.ClientSession,
    ipv4: str = "",
) -> str:
    """Look up an approximate city / region / country through ipapi.co.

    Requests ``https://ipapi.co/<ipv4>/json/`` when an address is given and
    ``https://ipapi.co/json/`` otherwise, using ``_EXTERNAL_TIMEOUT`` and
    ``resp.json()`` for decoding. The ``city``, ``region`` and
    ``country_name`` fields are stripped and the non-empty ones joined with
    ``", "`` in that order.

    Args:
        session: Shared :class:`aiohttp.ClientSession` used for the request.
        ipv4: Optional address to geolocate; when empty the address-less
            endpoint is queried instead.

    Returns:
        str: A ``"City, Region, Country"`` style string with blank
        components omitted, or ``""`` on a non-200 status, a non-object
        body, a truthy ``error`` field, or any exception. Never raises;
        failures are logged at debug level.
    """
    try:
        if ipv4:
            url = f"https://ipapi.co/{ipv4}/json/"
        else:
            url = "https://ipapi.co/json/"

        async with session.get(url, timeout=_EXTERNAL_TIMEOUT) as reply:
            if reply.status != 200:
                return ""
            record = await reply.json()

        if not isinstance(record, dict) or record.get("error"):
            return ""

        # Every field is normalised before filtering, so a malformed value in
        # any of them fails the lookup as a whole via the handler below.
        cleaned = [
            (record.get(field) or "").strip()
            for field in ("city", "region", "country_name")
        ]
        return ", ".join(value for value in cleaned if value)
    except Exception:
        logger.debug("ipapi.co geolocation failed", exc_info=True)
        return ""


def _merge_location(geo: str, region_line: str) -> str:
    """Combine an IP-based geolocation with a cloud region into one label.

    Produces the final ``server_location`` string shown in the system prompt
    by reconciling the two location sources: when both are present and the
    cloud region is not already contained in the geolocation text, they are
    joined with an em dash; otherwise whichever single value exists is used.

    Pure string logic with no I/O. Called only by
    :func:`probe_hosting_metadata`, which passes the geolocation from
    :func:`_geolocate_ipapi` and the cloud ``region_line`` from the winning
    provider probe.

    Args:
        geo: Human-readable geolocation (e.g. ``"Ashburn, Virginia, United
            States"``), possibly empty.
        region_line: Cloud provider region/zone line (e.g.
            ``"us-east-1, us-east-1a"``), possibly empty.

    Returns:
        str: The merged location, a single source when only one is available,
        or ``"Location unavailable"`` when both inputs are empty.
    """
    geo = geo.strip()
    region_line = region_line.strip()
    if geo and region_line:
        if region_line.lower() in geo.lower():
            return geo
        return f"{geo}{region_line}"
    if geo:
        return geo
    if region_line:
        return region_line
    return "Location unavailable"


async def probe_hosting_metadata() -> dict[str, str]:
    """Detect cloud provider / region and geographic location (best effort).

    Opens one shared :class:`aiohttp.ClientSession` and runs the four cloud
    metadata probes (:func:`_aws_probe`, :func:`_oci_probe`,
    :func:`_gcp_probe`, :func:`_azure_probe`) concurrently via
    :func:`asyncio.gather`. Once all of them have finished, the first result
    in gather order (AWS, OCI, GCP, Azure) that is a dict with a non-empty
    ``provider_label`` wins. It then fetches the public IPv4
    (:func:`_fetch_public_ipv4`), geolocates it (:func:`_geolocate_ipapi`),
    and reconciles geolocation and cloud region with
    :func:`_merge_location`.

    The individual probes absorb their own network failures, and
    ``return_exceptions=True`` keeps a raising probe from aborting the
    others. Called by :func:`ensure_hosting_metadata_in_prompt`.

    Returns:
        dict[str, str]: A mapping with ``hosting_provider`` (the winning
        provider label, or ``"Unknown"``) and ``server_location`` (the
        merged geo/region string, or ``"Location unavailable"``).
    """
    hosting_provider = "Unknown"
    region_line = ""

    connector = aiohttp.TCPConnector(ssl=True)
    async with aiohttp.ClientSession(connector=connector) as session:
        # The probe names are looked up here, at call time, so replacing the
        # module attributes (as a test might) still takes effect.
        probes = (_aws_probe, _oci_probe, _gcp_probe, _azure_probe)
        tasks = [asyncio.create_task(probe(session)) for probe in probes]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Priority matches gather order: AWS, OCI, GCP, Azure.
        cloud = next(
            (
                r
                for r in results
                if isinstance(r, dict) and r.get("provider_label")
            ),
            None,
        )
        if cloud is not None:
            hosting_provider = cloud["provider_label"]
            region_line = cloud.get("region_line", "")

        ipv4 = await _fetch_public_ipv4(session)
        geo = await _geolocate_ipapi(session, ipv4)
        server_location = _merge_location(geo, region_line)

    logger.info(
        "Hosting metadata probe: provider=%r location=%r",
        hosting_provider,
        server_location,
    )
    return {
        "hosting_provider": hosting_provider,
        "server_location": server_location,
    }
async def ensure_hosting_metadata_in_prompt(renderer: Any) -> None:
    """Run :func:`probe_hosting_metadata` once per process; update renderer extras.

    Probes the host's cloud provider and location and merges the resulting
    ``hosting_provider`` / ``server_location`` values into
    ``renderer.default_extras``. The module-level ``_HOSTING_PROBE_LOCK``
    serialises overlapping callers and the ``_HOSTING_APPLIED`` flag turns
    every call after the first successful one into a no-op.

    If the probe raises, the exception is logged and placeholder values
    (``"Unknown"`` / ``"Location unavailable"``) are applied instead. The
    flag is set only after ``default_extras.update`` returns, so a failure
    while updating the renderer propagates and leaves the guard unset.

    Args:
        renderer: Object whose ``default_extras`` mapping receives the
            ``hosting_provider`` and ``server_location`` keys; typed
            loosely as ``Any``.
    """
    global _HOSTING_APPLIED

    async with _HOSTING_PROBE_LOCK:
        # Re-checked under the lock: a caller that waited here must not
        # probe again after the first caller finished.
        if _HOSTING_APPLIED:
            return
        try:
            extras = await probe_hosting_metadata()
        except Exception:
            logger.exception("Hosting metadata probe raised; using placeholders")
            extras = {
                "hosting_provider": "Unknown",
                "server_location": "Location unavailable",
            }
        renderer.default_extras.update(extras)
        _HOSTING_APPLIED = True
def reset_hosting_metadata_probe_for_tests() -> None:
    """Reset the process-level idempotency flag so the probe can rerun (tests only).

    Sets the module-level ``_HOSTING_APPLIED`` flag back to ``False`` so the
    next :func:`ensure_hosting_metadata_in_prompt` call probes again instead
    of returning early. Pure in-process state change with no I/O; it does
    not touch ``_HOSTING_PROBE_LOCK`` or any renderer.
    """
    global _HOSTING_APPLIED
    _HOSTING_APPLIED = False