"""One-shot hosting provider and geographic location for the system prompt.
Probed asynchronously at bot startup and merged into
:class:`~prompt_renderer.PromptRenderer` ``default_extras``.
"""
from __future__ import annotations

import asyncio
import ipaddress
import logging
from typing import Any

import aiohttp

import jsonutil as json
# Module-level logger; the probes in this file report their failures through it at debug level.
logger = logging.getLogger(__name__)
# Timeout for the cloud instance-metadata probes (AWS, OCI, GCP, Azure): 2 s total, 1 s to connect.
_IMDS_TIMEOUT = aiohttp.ClientTimeout(total=2.0, connect=1.0)
# Timeout for the public-internet lookups (ipify, ipapi.co): 5 s total, 3 s to connect.
_EXTERNAL_TIMEOUT = aiohttp.ClientTimeout(total=5.0, connect=3.0)
# NOTE(review): the next two names are not referenced in this part of the file; presumably they
# serialize the one-shot probe and record that its result was already applied - confirm at the use site.
_HOSTING_PROBE_LOCK = asyncio.Lock()
_HOSTING_APPLIED = False
# Link-local metadata address. Despite the name, the OCI probe builds its URL from it as well.
_AWS_BASE = "http://169.254.169.254"
def _strip_gcp_zone(raw: str) -> str:
    """Return the last ``/``-separated segment of a GCP metadata value.

    The GCP metadata server reports zones and regions as fully qualified
    resource paths (for example ``projects/123456/zones/us-central1-a``).
    This helper trims surrounding whitespace and keeps only the text after
    the final slash, yielding a short name such as ``us-central1-a``.

    Args:
        raw: Text body of a metadata response; either a slash-delimited
            resource path or an already-short value.

    Returns:
        str: The trailing path segment, or the whole trimmed string when it
        contains no ``/``. A value ending in ``/`` yields ``""``.
    """
    # rpartition returns ("", "", text) when no "/" is present, so the last
    # element is the full trimmed string in that case.
    return raw.strip().rpartition("/")[2]
async def _aws_imds_text(
    session: aiohttp.ClientSession,
    path: str,
    headers: dict[str, str],
) -> str | None:
    """GET one IMDS ``meta-data`` path; return the stripped body, or ``None`` on a non-200 status.

    Network errors and timeouts are not handled here and propagate to the caller.
    """
    async with session.get(
        f"{_AWS_BASE}/latest/meta-data/{path}",
        headers=headers,
        timeout=_IMDS_TIMEOUT,
    ) as resp:
        if resp.status != 200:
            return None
        return (await resp.text()).strip()


async def _aws_imds_optional_text(
    session: aiohttp.ClientSession,
    path: str,
    headers: dict[str, str],
) -> str:
    """Best-effort variant of :func:`_aws_imds_text`: ``""`` on any failure, logged at debug level."""
    try:
        return await _aws_imds_text(session, path, headers) or ""
    except Exception:
        # Deliberately best effort: the probe result is still useful without
        # this field, but leave a trace instead of swallowing silently.
        logger.debug("AWS IMDS optional lookup %r failed", path, exc_info=True)
        return ""


async def _aws_probe(session: aiohttp.ClientSession) -> dict[str, str] | None:
    """Probe the AWS EC2 instance metadata service (IMDSv2) for region info.

    Performs the IMDSv2 handshake by ``PUT``-ing for a session token, then
    uses the token to read ``placement/region`` (required) and, best effort,
    ``placement/availability-zone`` and ``instance-type``. All requests go
    through ``session`` with ``_IMDS_TIMEOUT``.

    A non-200 status or empty body for the token, or a non-200 status for
    the region, aborts the probe with ``None``. Failures of the two optional
    lookups are logged at debug level and treated as missing values.

    Args:
        session: Client session used for every request of the probe.

    Returns:
        dict[str, str] | None: ``provider_label`` (``"Amazon Web Services
        (EC2)"``, with ``", <instance type>"`` appended when known) and
        ``region_line`` (the region, plus the zone when it is non-empty and
        differs from the region; empty components are omitted). ``None``
        when the probe fails. Never raises for ordinary errors; they are
        logged at debug level.
    """
    try:
        async with session.put(
            f"{_AWS_BASE}/latest/api/token",
            headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"},
            timeout=_IMDS_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return None
            token = (await resp.text()).strip()
        if not token:
            return None
        hdrs = {"X-aws-ec2-metadata-token": token}

        region = await _aws_imds_text(session, "placement/region", hdrs)
        if region is None:
            return None
        zone = await _aws_imds_optional_text(
            session, "placement/availability-zone", hdrs
        )
        inst = await _aws_imds_optional_text(session, "instance-type", hdrs)

        # Skip an empty region so the line never starts with a dangling ", ".
        parts = [region] if region else []
        if zone and zone != region:
            parts.append(zone)
        region_line = ", ".join(parts)

        label = "Amazon Web Services (EC2)"
        if inst:
            label = f"{label}, {inst}"
        return {"provider_label": label, "region_line": region_line}
    except Exception:
        logger.debug("AWS IMDS probe failed", exc_info=True)
        return None
async def _oci_probe(session: aiohttp.ClientSession) -> dict[str, str] | None:
    """Probe the Oracle Cloud Infrastructure instance metadata endpoint.

    Sends one ``GET`` to ``/opc/v2/instance/`` on the link-local metadata
    address with the ``Authorization: Bearer Oracle`` header, decodes the
    body with the module's ``json`` alias, and builds a region line from the
    ``canonicalRegionName`` (or ``region``) and ``availabilityDomain`` keys.

    Args:
        session: Client session used for the request.

    Returns:
        dict[str, str] | None: ``provider_label`` set to ``"Oracle Cloud
        Infrastructure"`` and ``region_line`` holding the non-empty region
        and availability-domain values joined by ``", "`` (``"OCI"`` when
        both are empty). ``None`` on a non-200 status, a non-object JSON
        body, or any error; errors are logged at debug level.
    """
    try:
        endpoint = f"{_AWS_BASE}/opc/v2/instance/"
        async with session.get(
            endpoint,
            headers={"Authorization": "Bearer Oracle"},
            timeout=_IMDS_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return None
            body = await resp.text()

        info = json.loads(body)
        if not isinstance(info, dict):
            return None

        region_name = (
            info.get("canonicalRegionName") or info.get("region") or ""
        ).strip()
        avail_domain = (info.get("availabilityDomain") or "").strip()
        # Joining only the non-empty values gives "" exactly when both are
        # missing, which is when the provider fallback applies.
        joined = ", ".join(filter(None, (region_name, avail_domain)))
        return {
            "provider_label": "Oracle Cloud Infrastructure",
            "region_line": joined or "OCI",
        }
    except Exception:
        logger.debug("OCI metadata probe failed", exc_info=True)
        return None
async def _gcp_probe(session: aiohttp.ClientSession) -> dict[str, str] | None:
    """Probe the Google Cloud Platform compute metadata server for location.

    Reads ``instance/zone`` (required) and, best effort, ``instance/region``
    from ``metadata.google.internal`` using the ``Metadata-Flavor: Google``
    header and ``_IMDS_TIMEOUT``. Both bodies are shortened with
    :func:`_strip_gcp_zone`. A failed or non-200 region lookup is tolerated
    and logged at debug level; the zone alone is then used.

    Args:
        session: Client session used for the requests.

    Returns:
        dict[str, str] | None: ``provider_label`` set to ``"Google Cloud
        Platform"`` and ``region_line`` holding the non-empty region and
        zone values, de-duplicated and joined by ``", "``. ``None`` when the
        zone request returns a non-200 status or raises; such errors are
        logged at debug level.
    """
    base = "http://metadata.google.internal/computeMetadata/v1/instance"
    flavor = {"Metadata-Flavor": "Google"}
    try:
        async with session.get(
            f"{base}/zone",
            headers=flavor,
            timeout=_IMDS_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return None
            zone = _strip_gcp_zone(await resp.text())

        region = ""
        try:
            async with session.get(
                f"{base}/region",
                headers=flavor,
                timeout=_IMDS_TIMEOUT,
            ) as rr:
                if rr.status == 200:
                    region = _strip_gcp_zone(await rr.text())
        except Exception:
            # Best effort only: keep going with the zone, but leave a trace
            # rather than swallowing the error silently.
            logger.debug("GCP region metadata lookup failed", exc_info=True)

        # dict.fromkeys de-duplicates while preserving order. With no
        # non-empty parts the join is "", which equals the (empty) zone.
        parts = [p for p in (region, zone) if p]
        region_line = ", ".join(dict.fromkeys(parts))
        return {
            "provider_label": "Google Cloud Platform",
            "region_line": region_line,
        }
    except Exception:
        logger.debug("GCP metadata probe failed", exc_info=True)
        return None
async def _azure_probe(session: aiohttp.ClientSession) -> dict[str, str] | None:
    """Probe the Azure Instance Metadata Service for the compute location.

    Sends one ``GET`` to ``/metadata/instance`` on ``169.254.169.254`` with
    the ``Metadata: true`` header and a pinned ``api-version``, parses the
    response via ``resp.json()``, and reads ``location`` and ``name`` from
    the ``compute`` object.

    Args:
        session: Client session used for the request.

    Returns:
        dict[str, str] | None: ``provider_label`` set to ``"Microsoft
        Azure"`` and ``region_line`` set to the compute location, else the
        instance name, else ``"Azure"``. ``None`` on a non-200 status, when
        the payload has no ``compute`` object, or on any error; errors are
        logged at debug level.
    """
    try:
        async with session.get(
            "http://169.254.169.254/metadata/instance?"
            "api-version=2021-02-01&format=json",
            headers={"Metadata": "true"},
            timeout=_IMDS_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return None
            payload = await resp.json()

        if not isinstance(payload, dict):
            return None
        compute = payload.get("compute")
        if not isinstance(compute, dict):
            return None

        # Unpacking forces both fields to be read and stripped up front.
        location, vm_name = (
            (compute.get(key) or "").strip() for key in ("location", "name")
        )
        if location:
            region_line = location
        elif vm_name:
            region_line = vm_name
        else:
            region_line = "Azure"
        return {
            "provider_label": "Microsoft Azure",
            "region_line": region_line,
        }
    except Exception:
        logger.debug("Azure metadata probe failed", exc_info=True)
        return None
async def _fetch_public_ipv4(session: aiohttp.ClientSession) -> str:
    """Fetch the host's public IPv4 address from the ipify service.

    Sends one ``GET`` to ``https://api4.ipify.org`` with
    ``_EXTERNAL_TIMEOUT`` and returns the trimmed plain-text body, but only
    if it parses as an IPv4 address. The check matters because the value is
    meant to be placed into another service's URL; a 200 response carrying
    something else (for example an intercepting proxy's HTML page) must not
    be passed along as an address.

    Args:
        session: Client session used for the request.

    Returns:
        str: The public IPv4 address in dotted-quad form, or ``""`` when the
        request fails, returns a non-200 status, or returns a body that is
        not a valid IPv4 address. Never raises for ordinary errors; they are
        logged at debug level.
    """
    try:
        async with session.get(
            "https://api4.ipify.org",
            timeout=_EXTERNAL_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return ""
            candidate = (await resp.text()).strip()
    except Exception:
        logger.debug("Public IPv4 fetch failed", exc_info=True)
        return ""

    try:
        ipaddress.IPv4Address(candidate)
    except ValueError:
        # Do not log the body itself: it may be an arbitrarily large page.
        logger.debug("Public IPv4 fetch returned a non-IPv4 body; ignoring it")
        return ""
    return candidate
async def _geolocate_ipapi(
    session: aiohttp.ClientSession,
    ipv4: str = "",
) -> str:
    """Look up an approximate city/region/country through ipapi.co.

    Requests ``https://ipapi.co/<ipv4>/json/`` when ``ipv4`` is given and
    ``https://ipapi.co/json/`` otherwise, using ``_EXTERNAL_TIMEOUT``, and
    formats the ``city``, ``region`` and ``country_name`` fields of the JSON
    reply into one comma-separated string.

    Args:
        session: Client session used for the request.
        ipv4: Address to geolocate; when empty the address-less endpoint is
            queried instead.

    Returns:
        str: The non-empty fields joined by ``", "`` in city, region,
        country order. ``""`` on a non-200 status, a non-object payload, a
        truthy ``error`` field, or any exception; exceptions are logged at
        debug level.
    """
    try:
        suffix = "json/" if not ipv4 else f"{ipv4}/json/"
        async with session.get(
            f"https://ipapi.co/{suffix}",
            timeout=_EXTERNAL_TIMEOUT,
        ) as resp:
            if resp.status != 200:
                return ""
            payload = await resp.json()

        if not isinstance(payload, dict) or payload.get("error"):
            return ""

        # Read and trim all three fields first, then keep the non-empty ones.
        fields = [
            (payload.get(key) or "").strip()
            for key in ("city", "region", "country_name")
        ]
        return ", ".join(value for value in fields if value)
    except Exception:
        logger.debug("ipapi.co geolocation failed", exc_info=True)
        return ""
def _merge_location(geo: str, region_line: str) -> str:
    """Fold an IP-based geolocation and a cloud region line into one label.

    Both inputs are trimmed first. When both are non-empty they are joined
    with an em dash, unless the region line already occurs inside the
    geolocation text (case-insensitive substring match), in which case the
    geolocation alone is returned. Pure string logic; no I/O.

    Args:
        geo: Human-readable geolocation such as ``"Ashburn, Virginia, United
            States"``; may be empty.
        region_line: Cloud region/zone line such as
            ``"us-east-1, us-east-1a"``; may be empty.

    Returns:
        str: The combined label, whichever single input is non-empty, or
        ``"Location unavailable"`` when both are empty.
    """
    geo_text = geo.strip()
    cloud_text = region_line.strip()

    if not geo_text:
        return cloud_text or "Location unavailable"
    # Nothing to add when there is no region line or geo already mentions it.
    if not cloud_text or cloud_text.lower() in geo_text.lower():
        return geo_text
    return f"{geo_text} — {cloud_text}"