"""Gateway service entry point — platform ingress and egress.
The Gateway is the only service that holds live platform connections
(Discord, Matrix, WebChat, …). It runs :class:`GatewayService`, a
:class:`~core.service_base.StargazerService` that:
* Creates a :class:`~platforms.base.PlatformAdapter` per enabled platform
(via :func:`platforms.factory.create_platform`) and registers inbound
callbacks for new messages, edits, deletes, and reactions.
* On each inbound message, translates the
:class:`~platforms.base.IncomingMessage` into a validated
:class:`~core.serialization.InboundEnvelopeModel`
(:func:`_make_inbound_envelope`) and publishes it to the
``sg:stream:inbound`` Redis stream via
:meth:`~core.event_bus.RedisEventBus.publish_inbound`.
* Runs one :class:`~core.outbound_consumer.OutboundStreamConsumer` per
platform that reads ``sg:stream:outbound:{platform}`` and dispatches each
response back through the real adapter (``send`` / ``send_file`` /
``send_with_buttons`` / reactions).
* Periodically publishes a platform inventory (servers/channels) to Redis
for the web dashboard, throttled to at most every 12 hours.
Since Phase T3 (2026-06-02) the StranglerRouter has been removed: the
monolith is gone and all inbound traffic routes unconditionally over the
event bus to the Inference service.
Launched standalone (``python gateway_main.py``) by
``scripts/systemd/stargazer-gateway.service``.
"""
import asyncio
import base64
import logging
import signal
import time
import uuid
import sys
from typing import Any
from platforms.base import IncomingMessage, PlatformAdapter
from config import Config
from core.serialization import InboundEnvelopeModel
from core.service_base import StargazerService
from core.event_bus import RedisEventBus
from core.outbound_consumer import OutboundStreamConsumer
from media_cache import MediaCache
from platforms.factory import create_platform
from core.proxy_adapter import ProxyPlatformAdapter
logger = logging.getLogger("gateway")
def _make_inbound_envelope(
    msg: IncomingMessage,
    adapter: PlatformAdapter,
    route: str,
    trace_id: str | None = None,
) -> InboundEnvelopeModel:
    """Build the validated wire envelope for one inbound platform message.

    This is the single canonical mapping from the platform layer's
    ``IncomingMessage`` dataclass onto the ``InboundEnvelopeModel`` Pydantic
    schema that travels over the inbound Redis stream. Constructing the model
    runs Pydantic validation, so field-type mismatches surface here instead of
    after the payload has been published.

    Args:
        msg: Message delivered by the platform adapter.
        adapter: Adapter that produced *msg*; its ``name`` becomes the
            envelope's ``platform`` and the prefix of ``channel_key``.
        route: Stored verbatim in the envelope's ``strangler_route`` field.
        trace_id: Optional correlation id; when falsy a fresh UUID4 string is
            generated.

    Returns:
        The validated :class:`~core.serialization.InboundEnvelopeModel`.
    """

    def encode_attachment(att) -> dict:
        # Raw bytes are base64-encoded so the payload is text-safe; an
        # attachment without data is sent with an empty "data" string.
        if att.data:
            encoded = base64.b64encode(att.data).decode()
        else:
            encoded = ""
        return {
            "data": encoded,
            "filename": att.filename,
            "mimetype": att.mimetype,
            "source_url": att.source_url,
        }

    platform_name = adapter.name
    chan = msg.channel_id
    extras = msg.extra or {}
    attachments = list(map(encode_attachment, msg.attachments or []))

    return InboundEnvelopeModel(
        channel_key=f"{platform_name}:{chan}",
        platform=platform_name,
        channel_id=chan,
        user_id=msg.user_id,
        username=msg.user_name,
        display_name=msg.user_name,
        content=msg.text,
        message_id=msg.message_id,
        timestamp=msg.timestamp.timestamp(),
        attachments=attachments,
        reply_to=msg.reply_to_id or None,
        room_name=msg.channel_name or None,
        is_addressed=msg.is_addressed,
        is_dm=extras.get("is_dm", False),
        guild_id=extras.get("guild_id"),
        member_roles=extras.get("member_roles", []),
        embeds=extras.get("embeds"),
        user_aliases=msg.user_aliases,
        unified_user_id=msg.unified_user_id,
        reactions=msg.reactions,
        strangler_route=route,
        trace_id=trace_id or str(uuid.uuid4()),
        enqueued_at=time.time(),
    )
class GatewayService(StargazerService):
    """Stargazer service that owns every live platform connection.

    Builds one platform adapter per enabled platform, publishes inbound
    messages to ``sg:stream:inbound``, dispatches outbound responses through one
    :class:`~core.outbound_consumer.OutboundStreamConsumer` per adapter, serves
    the gateway-owned slash commands, and keeps a platform inventory in Redis
    for the web dashboard.
    """

    # Interval between platform-inventory sweeps; also the TTL of the shared
    # ``sg:platform:inventory:cooldown`` key.
    _INVENTORY_INTERVAL_S = 12 * 3600
    # TTL of each cached ``sg:platform:inventory:{platform}`` entry; longer than
    # the sweep interval so cached data survives between refreshes.
    _INVENTORY_TTL_S = 24 * 3600
    # Delay before retrying a sweep that found adapters but none running yet.
    _INVENTORY_RETRY_S = 60
    # Slash-command sync cooldown used when the config does not supply a
    # usable ``command_sync_cooldown_seconds``.
    _DEFAULT_COMMAND_SYNC_COOLDOWN_S = 21600
    # Maximum number of pending-response keys listed in /stop diagnostics.
    _CANCEL_DIAG_KEY_LIMIT = 20

    def __init__(
        self,
        config: Config,
        redis_client: Any,
        instance_id: str,
        use_health_server: bool = True,
    ):
        """Construct the Gateway service and initialise empty runtime state.

        Wires the base :class:`~core.service_base.StargazerService` contract
        (service name ``"gateway"``, instance id, Redis client, mandatory Redis,
        optional health server) and stashes ``config`` on ``self.cfg``. All heavy
        resources (event bus, media cache, adapters, outbound consumers, inventory
        and tree-sync tasks) are left as ``None``/empty here and only created
        later in :meth:`on_start`. ``self._stop_event`` is the
        :class:`asyncio.Event` that :meth:`run` blocks on and :meth:`on_stop`
        sets to unblock shutdown.

        Delegates base-class setup to ``StargazerService.__init__`` (which builds
        the optional :class:`~core.health_server.HealthServer`). Called by
        :func:`main` when launching the service standalone, and directly by the
        gateway migration tests under ``tests/core/migration/`` (e.g.
        ``test_gateway_service.py``, ``test_strangler_removal.py``).

        Args:
            config: Loaded :class:`~config.Config` holding platform definitions,
                media-cache settings, and ``tools_dir``.
            redis_client: Async Redis client (``decode_responses=False``) shared
                across the event bus, outbound consumers, and inventory cache.
            instance_id: Unique id for this process (e.g. ``"gateway-ab12cd34"``),
                used as the consumer name for outbound stream reads and for
                service-registry registration.
            use_health_server: When ``True`` (default) the base class starts an
                HTTP health server; tests pass ``False`` to skip it.
        """
        super().__init__(
            service_name="gateway",
            instance_id=instance_id,
            redis_client=redis_client,
            redis_required=True,
            use_health_server=use_health_server,
        )
        self.cfg = config
        self.adapters = []
        self.media_cache = None
        self.outbound_consumers = []
        self.event_bus = None
        self._stop_event = asyncio.Event()
        self._inventory_task = None
        self._tree_sync_task = None

    @staticmethod
    def _platform_name(adapter: Any) -> str:
        """Return *adapter*'s platform name.

        Uses the adapter's ``name`` attribute when it is truthy; otherwise the
        name is derived from the class name (lower-cased, ``"platform"`` removed).
        This is the single naming rule used for outbound stream names, inventory
        keys, and command-sync lock keys.
        """
        return getattr(adapter, "name", None) or adapter.__class__.__name__.lower().replace("platform", "")

    async def on_start(self) -> None:
        """Build and start every Gateway dependency: event bus, adapters, consumers.

        This is the Gateway's implementation of the base-class startup phase. It
        constructs the :class:`~core.event_bus.RedisEventBus` and ensures the
        Redis streams exist, then creates the :class:`~media_cache.MediaCache`.
        For each enabled platform in ``self.cfg.platforms`` it builds a
        :class:`~platforms.base.PlatformAdapter` via
        :func:`platforms.factory.create_platform`, registering
        :meth:`_handle_inbound_message` plus the edit/delete/reaction handlers.
        It also wires the slash-command callbacks (cancel, revoke, purge, feature
        toggle) on every adapter.

        Crucially it starts every
        :class:`~core.outbound_consumer.OutboundStreamConsumer` *before* starting
        the adapters, so the response path on ``sg:stream:outbound:{platform}``
        exists before any inbound message can arrive. Finally it launches the
        background :meth:`_publish_platform_inventory_loop` and
        :meth:`_auto_sync_command_trees` as named asyncio tasks.

        Adapter construction failures are logged and swallowed so one broken
        platform does not abort the whole service. Called by
        :meth:`~core.service_base.StargazerService.boot` during phase 3-7 (which
        :func:`main` invokes), and exercised directly by the adapter-lifecycle and
        strangler-removal migration tests.
        """
        logger.info("Initializing Gateway dependencies...")
        self.event_bus = RedisEventBus(
            redis=self.redis,
            node_role="gateway",
            node_id=self.instance_id,
        )
        await self.event_bus.ensure_streams()
        self.media_cache = MediaCache(
            cache_dir=self.cfg.media_cache_dir,
            max_size_mb=self.cfg.media_cache_max_mb,
        )

        for pcfg in self.cfg.platforms:
            if not pcfg.enabled:
                continue
            try:
                adapter = create_platform(
                    pcfg,
                    self._handle_inbound_message,
                    self.media_cache,
                    message_update_handler=self._handle_inbound_update,
                    message_delete_handler=self._handle_inbound_delete,
                    config=self.cfg,
                    reaction_update_handler=self._handle_inbound_reaction,
                )
            except Exception:
                logger.exception("Failed to create adapter for platform %s", pcfg.type)
            else:
                self.adapters.append(adapter)
                logger.info("Created adapter for platform: %s", pcfg.type)

        # Post-T3 the gateway is the only process with the live client, so ALL
        # slash-command callbacks must be wired here (previously only
        # _cancel_callback was, leaving /revoke_access, /purge_privileges and
        # /emotions dead).
        for adapter in self.adapters:
            adapter._cancel_callback = self._handle_cancellation
            adapter._revoke_callback = self._handle_revoke_access
            adapter._purge_callback = self._handle_purge_privileges
            adapter._toggle_callback = self._handle_feature_toggle

        # Response path: one outbound-stream consumer per adapter.
        for adapter in self.adapters:
            consumer = OutboundStreamConsumer(
                redis=self.redis,
                platform=self._platform_name(adapter),
                adapter=adapter,
                consumer_name=self.instance_id,
                tools_dir=self.cfg.tools_dir,
            )
            await consumer.start()
            self.outbound_consumers.append(consumer)

        # Start platform adapters AFTER outbound consumers are ready so the
        # response path exists before the first inbound message can arrive.
        for adapter in self.adapters:
            if hasattr(adapter, "start"):
                await adapter.start()

        self._inventory_task = asyncio.create_task(
            self._publish_platform_inventory_loop(),
            name="platform_inventory_publisher",
        )
        # Boot-time sync was removed because syncing on every restart trips
        # Discord's rate-limit penalty box; a Redis NX cooldown means at most
        # one sync per command_sync_cooldown_seconds across all gateway
        # instances, so commands stay registered without that risk.
        self._tree_sync_task = asyncio.create_task(
            self._auto_sync_command_trees(),
            name="command_tree_autosync",
        )

    def _command_sync_cooldown(self) -> int:
        """Return the slash-command sync cooldown in seconds.

        Reads ``cfg.command_sync_cooldown_seconds``. A missing, ``None``,
        non-numeric, or non-positive value falls back to
        ``_DEFAULT_COMMAND_SYNC_COOLDOWN_S``. A non-positive value would
        otherwise be passed to Redis as ``EX`` and rejected.
        """
        default = self._DEFAULT_COMMAND_SYNC_COOLDOWN_S
        raw = getattr(self.cfg, "command_sync_cooldown_seconds", None)
        if raw is None:
            return default
        try:
            cooldown = int(raw)
        except (TypeError, ValueError):
            logger.warning("Invalid command_sync_cooldown_seconds=%r; using %ds", raw, default)
            return default
        if cooldown <= 0:
            logger.warning("Non-positive command_sync_cooldown_seconds=%r; using %ds", raw, default)
            return default
        return cooldown

    async def _auto_sync_command_trees(self) -> None:
        """Sync each adapter's slash-command tree at most once per cooldown window.

        For every adapter exposing ``sync_command_tree`` it waits (up to ~30 s)
        for the adapter to report ``is_running``. If it then wins the Redis NX
        lock ``sg:gateway:command_sync:{platform}`` (TTL = cooldown), it calls
        ``adapter.sync_command_tree()``. When Redis is unavailable the sync runs
        without the lock. Failures are logged and never block the gateway.
        """
        cooldown = self._command_sync_cooldown()
        for adapter in self.adapters:
            # Only adapters that actually have a slash tree need syncing.
            if not hasattr(adapter, "sync_command_tree"):
                continue
            name = self._platform_name(adapter)
            try:
                # Bounded wait for the client to be ready before syncing.
                for _ in range(60):
                    if getattr(adapter, "is_running", False):
                        break
                    await asyncio.sleep(0.5)
                if self.redis is not None:
                    lock_key = f"sg:gateway:command_sync:{name}"
                    got = await self.redis.set(lock_key, "1", nx=True, ex=cooldown)
                    if not got:
                        logger.info(
                            "command_tree_autosync skip name=%s (synced within last %ds)",
                            name, cooldown,
                        )
                        continue
                synced = await adapter.sync_command_tree()
                logger.info("command_tree_autosync name=%s synced=%s", name, synced)
            except Exception:
                logger.exception("command_tree_autosync failed name=%s", name)

    async def _handle_cancellation(
        self, platform_name: str, channel_id: str, user_id: str, is_admin: bool
    ) -> str:
        """Authorize and execute a stop/cancel request raised from a slash command.

        Implements the Gateway side of the ``/stop`` (and equivalent) platform
        slash commands. It reads the per-channel Redis hash
        ``pending_response:{platform}:{channel}`` to find the response initiator.
        Cancellation is allowed only when the requester is the initiator or a
        server administrator. On success it publishes a JSON payload (platform,
        channel, user) to the ``sg:channel:cancel`` Redis pub/sub channel, which
        the WorkerCancellationDaemon in ``message_processor/processor.py``
        subscribes to.

        When no marker exists a best-effort cancel is published anyway, and a
        diagnostic listing up to ``_CANCEL_DIAG_KEY_LIMIT`` existing markers is
        logged. Wired up in :meth:`on_start` as ``adapter._cancel_callback`` and
        exercised by ``tests/core/migration/test_gateway_stop_command.py``.

        Args:
            platform_name: Name of the platform the command originated on (e.g.
                ``"discord"``).
            channel_id: Channel/room id whose pending response should be cancelled.
            user_id: Platform user id of the person issuing the stop command.
            is_admin: ``True`` when the requester has administrator privileges and
                may cancel any response, not just their own.

        Returns:
            A human-readable status string for the slash command reply (success,
            not-authorized, or error).
        """
        import json

        logger.info("Gateway: stop slash command triggered by user_id=%s in channel_id=%s", user_id, channel_id)
        pending_key = f"pending_response:{platform_name}:{channel_id}"
        payload = {
            "platform": platform_name,
            "channel_id": channel_id,
            "user_id": user_id,
        }
        try:
            initiator_bytes = await self.redis.hget(pending_key, "user_id")
            if not initiator_bytes:
                # The marker is only an authorization/UX hint. The worker's
                # WorkerCancellationDaemon aborts the in-flight task purely by
                # its name (inference:{platform}:{channel}), which is set at the
                # very start of handling. Publishing anyway is a no-op when
                # nothing runs, and it makes /stop reliable in the window before
                # the marker is written or if that write was missed.
                await self._log_missing_pending_marker(pending_key)
                await self.redis.publish("sg:channel:cancel", json.dumps(payload))
                return "🛑 Stop signal sent."

            initiator_id = initiator_bytes.decode() if isinstance(initiator_bytes, bytes) else initiator_bytes
            logger.info("Gateway: pending response for channel_id=%s has initiator_id=%s", channel_id, initiator_id)
            if not (is_admin or str(initiator_id) == str(user_id)):
                logger.warning("Gateway: user_id=%s is NOT authorized to cancel channel_id=%s", user_id, channel_id)
                return "❌ You are not allowed to stop this response (only the message author or a server administrator can stop it)."

            logger.info("Gateway: user_id=%s is authorized to cancel channel_id=%s (is_admin=%s)", user_id, channel_id, is_admin)
            await self.redis.publish("sg:channel:cancel", json.dumps(payload))
            logger.info("Gateway: published cancellation payload to 'sg:channel:cancel' for channel_id=%s", channel_id)
            return "🛑 Response stopped."
        except Exception:
            logger.exception("Gateway: error during cancellation handling for channel_id=%s", channel_id)
            return "❌ An unexpected error occurred while stopping the response."

    async def _log_missing_pending_marker(self, pending_key: str) -> None:
        """Log that *pending_key* is absent, with a bounded sample of existing markers.

        A ``SCAN ... MATCH`` walks the whole keyspace, so the listing stops
        after ``_CANCEL_DIAG_KEY_LIMIT`` keys. Any Redis error falls back to a
        plain warning; this helper never raises ``Exception``.
        """
        try:
            existing: list[str] = []
            async for key in self.redis.scan_iter(match="pending_response:*", count=100):
                existing.append(key.decode() if isinstance(key, bytes) else key)
                if len(existing) >= self._CANCEL_DIAG_KEY_LIMIT:
                    break
            logger.warning(
                "Gateway: no pending marker at %s; existing markers=%s; sending best-effort cancel",
                pending_key, existing,
            )
        except Exception:
            logger.warning("Gateway: no pending marker at %s; sending best-effort cancel", pending_key)

    async def _privilege_change_refusal(
        self, target_user_id: str, caller_user_id: str
    ) -> str | None:
        """Return a refusal message when a privilege change must not proceed.

        Shared authorization for ``/revoke_access`` and ``/purge_privileges``.
        It refuses when Redis is unavailable, when the caller lacks
        ``ALTER_PRIVILEGES`` (and is not a configured admin), or when the target
        is a configured admin. Returns ``None`` when the change may proceed.
        """
        from tools.alter_privileges import _is_admin

        if self.redis is None:
            return "❌ Redis is not available."
        if not await self._caller_can_alter(caller_user_id):
            return "❌ You do not have the `ALTER_PRIVILEGES` privilege."
        if _is_admin(target_user_id, self.cfg):
            return f"❌ Refused: `{target_user_id}` is a configured admin."
        return None

    async def _handle_revoke_access(
        self, target_user_id: str, caller_user_id: str, reason: str = ""
    ) -> str:
        """Gateway handler for the ``/revoke_access`` slash command.

        Strips ``STARGAZER_USE`` from *target_user_id* at every scope. Requires
        the caller to hold ``ALTER_PRIVILEGES`` (or be a configured admin) and
        refuses to target a configured admin. This mirrors the
        ``stargazer_full_revoke`` tool wrapper, but is driven from the live
        slash command.

        Returns:
            The user-facing reply string (success summary or error).
        """
        from tools.alter_privileges import _stargazer_full_revoke

        refusal = await self._privilege_change_refusal(target_user_id, caller_user_id)
        if refusal is not None:
            return refusal
        try:
            result = await _stargazer_full_revoke(
                target_user_id,
                redis=self.redis,
                config=self.cfg,
                caller_id=caller_user_id,
                reason=reason,
            )
        except Exception as e:
            logger.exception("Gateway: /revoke_access failed for %s", target_user_id)
            return f"❌ Revoke failed: `{e}`"
        if result.get("success"):
            return (
                f"🚫 Revoked all Stargazer access for `{target_user_id}` "
                f"(cleared {result.get('guild_keys_cleared', 0)} guild + "
                f"{result.get('channel_keys_cleared', 0)} channel scopes)."
            )
        return f"❌ Revoke failed: `{result.get('error', 'unknown error')}`"

    async def _handle_purge_privileges(
        self, target_user_id: str, caller_user_id: str, reason: str = ""
    ) -> str:
        """Gateway handler for the ``/purge_privileges`` slash command.

        Zeroes the entire privilege mask for *target_user_id* at all scopes. Same
        authorization as :meth:`_handle_revoke_access`.

        Returns:
            The user-facing reply string (success or error).
        """
        from tools.alter_privileges import _purge_all_privileges

        refusal = await self._privilege_change_refusal(target_user_id, caller_user_id)
        if refusal is not None:
            return refusal
        try:
            result = await _purge_all_privileges(
                target_user_id,
                redis=self.redis,
                config=self.cfg,
                caller_id=caller_user_id,
                reason=reason,
            )
        except Exception as e:
            logger.exception("Gateway: /purge_privileges failed for %s", target_user_id)
            return f"❌ Purge failed: `{e}`"
        if result.get("success"):
            return f"☢️ Purged ALL privilege bits for `{target_user_id}`."
        return f"❌ Purge failed: `{result.get('error', 'unknown error')}`"

    async def _handle_feature_toggle(
        self,
        platform_name: str,
        channel_id: str,
        user_id: str,
        is_server_admin: bool,
        is_dm: bool,
        feature: str,
        action: str,
    ) -> str:
        """Gateway handler for the ``/emotions`` slash command (feature toggle).

        Mirrors the worker's ``!emotions`` handling. It checks the toggle
        permission via :func:`feature_toggles.check_toggle_permission` and then
        persists the per-channel state with :func:`feature_toggles.set_disabled`.
        An *action* of ``"off"`` (case-insensitive, whitespace-trimmed) disables
        the feature; any other value enables it.

        Returns:
            The user-facing reply string.
        """
        import feature_toggles

        if self.redis is None:
            return "❌ Redis is not available."
        # Reuse the worker's permission rule by building a minimal IncomingMessage.
        probe = IncomingMessage(
            platform=platform_name,
            channel_id=channel_id,
            user_id=user_id,
            user_name="",
            text="",
            is_addressed=False,
            extra={
                "is_dm": bool(is_dm),
                "is_server_admin": bool(is_server_admin),
            },
        )
        if not await feature_toggles.check_toggle_permission(probe, self.cfg, self.redis):
            return (
                "❌ You don't have permission to toggle that "
                "(requires bot admin, server/channel admin, `CTX_MANAGE`, or a DM)."
            )
        channel_key = f"{platform_name}:{channel_id}"
        disabled = str(action).strip().lower() == "off"
        try:
            await feature_toggles.set_disabled(self.redis, feature, channel_key, disabled)
        except Exception as e:
            logger.exception("Gateway: /%s toggle failed", feature)
            return f"❌ Toggle failed: `{e}`"
        state_word = "disabled" if disabled else "enabled"
        return f"✅ `{feature}` {state_word} for this channel."

    async def _caller_can_alter(self, caller_user_id: str) -> bool:
        """Return True if *caller_user_id* may alter privileges.

        A caller qualifies if it is listed in ``cfg.admin_user_ids`` or its
        privilege mask has the ``ALTER_PRIVILEGES`` bit set. Ids are compared as
        strings so numeric ids loaded from config match string platform ids
        (the same normalisation :meth:`_handle_cancellation` uses). Lookup
        errors are logged and treated as "not allowed".
        """
        admin_ids = getattr(self.cfg, "admin_user_ids", None) or []
        if str(caller_user_id) in {str(admin_id) for admin_id in admin_ids}:
            return True
        try:
            from tools.alter_privileges import PRIVILEGES, get_user_privileges

            mask = await get_user_privileges(self.redis, caller_user_id, self.cfg)
            return bool(mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"]))
        except Exception:
            logger.exception("Gateway: caller_can_alter check failed for %s", caller_user_id)
            return False

    async def _handle_inbound_message(self, message: IncomingMessage, adapter: PlatformAdapter) -> None:
        """Translate IncomingMessage to a validated InboundEnvelopeModel and publish.

        Phase T3: StranglerRouter removed. All inbound messages route
        unconditionally to ``sg:stream:inbound``, except gateway-owned commands
        (gateway-locus / stealth / cluster control-ops). Those are handled here
        and never published, so stealth ``!sb*`` stays invisible to inference and
        client-bound / fleet-wide commands run where the live connection lives.
        If the command handler raises, the message is published as normal.
        """
        try:
            from message_processor.gateway_command_handler import handle_gateway_command

            if await handle_gateway_command(message, adapter, redis=self.redis, config=self.cfg):
                return  # handled at the gateway; do NOT publish to inference
        except Exception:
            logger.exception("gateway_command_handler raised; publishing as normal message")
        envelope = _make_inbound_envelope(message, adapter, "microservice")
        await self.event_bus.publish_inbound(envelope.model_dump())

    async def _handle_inbound_update(
        self, platform_name: str, channel_id: str, message_id: str, user_name: str, user_id: str, text: str, timestamp: str, reply_to_id: str
    ) -> None:
        """Handle a platform message-edit event (currently a no-op stub).

        Registered as the ``message_update_handler`` passed to
        :func:`platforms.factory.create_platform` in :meth:`on_start`. The body is
        a placeholder; serialization of edit events onto the event bus is not
        yet implemented.

        Args:
            platform_name: Name of the originating platform (e.g. ``"discord"``).
            channel_id: Channel/room id the edited message belongs to.
            message_id: Id of the message that was edited.
            user_name: Display name of the editing user.
            user_id: Platform user id of the editing user.
            text: New message text after the edit.
            timestamp: Edit timestamp as provided by the platform.
            reply_to_id: Id of the message this one replies to, if any.
        """
        pass  # TODO: serialize and publish message-edit events

    async def _handle_inbound_delete(
        self, platform_name: str, channel_id: str, message_id: str, timestamp: str
    ) -> None:
        """Handle a platform message-delete event (currently a no-op stub).

        Registered as the ``message_delete_handler`` passed to
        :func:`platforms.factory.create_platform` in :meth:`on_start`. The body is
        a placeholder pending serialization of delete events onto the event bus.

        Args:
            platform_name: Name of the originating platform (e.g. ``"discord"``).
            channel_id: Channel/room id the deleted message belonged to.
            message_id: Id of the message that was deleted.
            timestamp: Deletion timestamp as provided by the platform.
        """
        pass  # TODO: serialize and publish message-delete events

    async def _handle_inbound_reaction(
        self, platform_name: str, channel_id: str, message_id: str, reactions_str: str
    ) -> None:
        """Handle a platform reaction-change event (currently a no-op stub).

        Registered as the ``reaction_update_handler`` passed to
        :func:`platforms.factory.create_platform` in :meth:`on_start`. The body is
        a placeholder pending serialization of reaction events onto the event bus.

        Args:
            platform_name: Name of the originating platform (e.g. ``"discord"``).
            channel_id: Channel/room id the message belongs to.
            message_id: Id of the message whose reactions changed.
            reactions_str: Serialized representation of the message's current
                reactions, as supplied by the adapter.
        """
        pass  # TODO: serialize and publish reaction events

    async def run(self) -> None:
        """Block until shutdown is requested.

        The Gateway has no foreground polling work: all activity happens in
        adapter callbacks, the outbound consumers, and the background tasks
        started in :meth:`on_start`. This coroutine simply awaits
        ``self._stop_event``, which :meth:`on_stop` sets. Called by :func:`main`
        right after :meth:`~core.service_base.StargazerService.boot` completes.
        """
        await self._stop_event.wait()

    async def _publish_platform_inventory(self) -> bool:
        """Snapshot every running adapter's servers/channels and cache it in Redis.

        Builds the platform inventory the web dashboard consumes. The work is
        throttled via the shared Redis key ``sg:platform:inventory:cooldown``
        (TTL ``_INVENTORY_INTERVAL_S``, 12 h): if the key is present the call
        returns immediately.

        If adapters exist but none reports ``is_running`` yet (e.g. they are
        still connecting right after :meth:`on_start`), the call returns
        ``False`` *without* setting the cooldown. Setting it would publish
        nothing and block the next attempt for 12 h. Otherwise it sets the
        cooldown, awaits ``adapter.list_servers_and_channels()`` for each running
        adapter, and writes the JSON to ``sg:platform:inventory:{platform}``
        (TTL 24 h).

        Redis and adapter failures are logged and swallowed so one bad platform
        cannot abort the sweep. Called by :meth:`_publish_platform_inventory_loop`
        and directly by ``tests/core/migration/test_gateway_platform_inventory.py``.

        Returns:
            ``False`` when the sweep was deferred because no adapter was running
            (the caller should retry soon); ``True`` otherwise.
        """
        import json

        cooldown_key = "sg:platform:inventory:cooldown"
        try:
            if await self.redis.get(cooldown_key):
                logger.info("Platform inventory publisher is in cooldown. Skipping.")
                return True
        except Exception:
            logger.exception("Failed to query platform inventory cooldown key from Redis")

        running = [adapter for adapter in self.adapters if getattr(adapter, "is_running", False)]
        if self.adapters and not running:
            logger.debug("No platform adapter is running yet; deferring platform inventory publish")
            return False

        try:
            await self.redis.set(cooldown_key, "1", ex=self._INVENTORY_INTERVAL_S)
        except Exception:
            logger.exception("Failed to write platform inventory cooldown key to Redis")

        for adapter in running:
            platform_name = self._platform_name(adapter)
            try:
                servers = await adapter.list_servers_and_channels()
                key = f"sg:platform:inventory:{platform_name}"
                await self.redis.set(key, json.dumps(servers), ex=self._INVENTORY_TTL_S)
                logger.info("Published platform inventory for %s to Redis", platform_name)
            except Exception:
                logger.exception("Failed to publish platform inventory for %s", platform_name)
        return True

    async def _publish_platform_inventory_loop(self) -> None:
        """Background task that refreshes the platform inventory every 12 hours.

        Loops until ``self._stop_event`` is set, calling
        :meth:`_publish_platform_inventory` (which applies the Redis cooldown). It
        sleeps ``_INVENTORY_INTERVAL_S`` between passes, or only
        ``_INVENTORY_RETRY_S`` when the pass was deferred because no adapter was
        running yet, so the first inventory lands shortly after adapters connect.
        :class:`asyncio.CancelledError` is treated as a clean exit so
        :meth:`on_stop` can cancel the task; other exceptions are logged and the
        loop continues. Launched as ``platform_inventory_publisher`` in
        :meth:`on_start`.
        """
        while not self._stop_event.is_set():
            delay = self._INVENTORY_INTERVAL_S
            try:
                # Only an explicit False means "deferred"; any other result keeps
                # the normal 12-hour cadence.
                if await self._publish_platform_inventory() is False:
                    delay = self._INVENTORY_RETRY_S
            except asyncio.CancelledError:
                break
            except Exception:
                logger.exception("Error in platform inventory publisher loop")
            try:
                await asyncio.sleep(delay)
            except asyncio.CancelledError:
                break

    @staticmethod
    async def _cancel_background_task(task: Any) -> None:
        """Cancel *task* (if any) and wait for it to finish.

        :class:`asyncio.CancelledError` from the awaited task is swallowed. Any
        other exception the task ended with is logged instead of propagating,
        so it cannot abort the rest of shutdown.
        """
        if not task:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception:
            logger.exception("Background task %r ended with an error during shutdown", task)

    async def on_stop(self) -> None:
        """Gracefully tear down the Gateway: stop the loop, consumers, and adapters.

        Sets ``self._stop_event`` (unblocking :meth:`run`), then cancels and
        awaits the inventory and command-tree-sync background tasks. Next it
        stops every :class:`~core.outbound_consumer.OutboundStreamConsumer` so no
        further outbound responses are dispatched. Finally it stops each platform
        adapter that exposes a ``stop`` method.

        Each step is isolated: a failure in one consumer or adapter is logged
        and the remaining ones are still stopped. Called by
        :meth:`~core.service_base.StargazerService.shutdown` (which the
        SIGINT/SIGTERM handler in :func:`main` triggers).
        """
        logger.info("Shutting down GatewayService...")
        self._stop_event.set()
        await self._cancel_background_task(getattr(self, "_inventory_task", None))
        await self._cancel_background_task(getattr(self, "_tree_sync_task", None))
        for consumer in self.outbound_consumers:
            try:
                await consumer.stop()
            except Exception:
                logger.exception("Failed to stop outbound consumer %r", consumer)
        for adapter in self.adapters:
            if not hasattr(adapter, "stop"):
                continue
            try:
                await adapter.stop()
            except Exception:
                logger.exception("Failed to stop platform adapter %s", self._platform_name(adapter))
async def main() -> None:
    """Configure logging, build the Gateway service, and run it until signalled.

    Process entry point for the standalone Gateway. It sets up basic logging,
    loads :class:`~config.Config` via ``Config.load()``, and mints a unique
    ``instance_id``. It builds a binary (``decode_responses=False``) async
    Redis client and constructs :class:`GatewayService`. SIGINT/SIGTERM
    schedule a single ``service.shutdown()`` task; repeated signals reuse it.
    The lifecycle is ``service.boot()`` followed by ``service.run()``, which
    blocks until shutdown.

    ``on_stop`` sets the stop event *before* tearing down consumers and
    adapters, so ``run()`` returns while shutdown is still in progress. The
    ``finally`` block therefore waits for the shutdown task before closing
    the Redis client via ``aclose()``. Invoked through ``asyncio.run(main())``
    by the ``__main__`` guard when launched by
    ``scripts/systemd/stargazer-gateway.service``.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    cfg = Config.load()
    instance_id = f"gateway-{uuid.uuid4().hex[:8]}"
    redis_client = cfg.build_async_redis_client(decode_responses=False)
    service = GatewayService(cfg, redis_client, instance_id)

    # The event loop holds only weak references to tasks, so the shutdown task
    # must be kept alive here or it can be garbage-collected mid-execution.
    shutdown_task = None

    def request_shutdown() -> None:
        nonlocal shutdown_task
        if shutdown_task is None:
            shutdown_task = asyncio.create_task(service.shutdown())

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, request_shutdown)
    try:
        await service.boot()
        await service.run()
    finally:
        # Let teardown finish before closing the Redis client it still uses;
        # otherwise asyncio.run() would cancel it half-way on exit.
        if shutdown_task is not None:
            try:
                await shutdown_task
            except Exception:
                logger.exception("Gateway shutdown failed")
        if redis_client:
            await redis_client.aclose()
if __name__ == "__main__":
asyncio.run(main())