# Source code for tools.google_oauth_tools

"""Google API tools using per-user OAuth tokens.

Provides Google Drive, Gmail, and Calendar operations via Google's REST
APIs. Requires the user to have connected their Google account via the
OAuth flow. Distinct from gcp_tools.py which uses service accounts.
"""

from __future__ import annotations

import json
import logging
import base64
from email.mime.text import MIMEText
from typing import Any, TYPE_CHECKING

import aiohttp

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

DRIVE_API = "https://www.googleapis.com/drive/v3"
GMAIL_API = "https://gmail.googleapis.com/gmail/v1"
CALENDAR_API = "https://www.googleapis.com/calendar/v3"


async def _google_request(
    method: str,
    url: str,
    token: str,
    *,
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
    data: bytes | None = None,
    extra_headers: dict[str, str] | None = None,
) -> dict[str, Any] | list[Any] | str:
    """Send one bearer-authenticated HTTP request and normalise the reply.

    A fresh ``aiohttp.ClientSession`` is opened for the call and closed again
    before the function returns.

    Args:
        method: HTTP verb handed to ``session.request``.
        url: Absolute request URL.
        token: OAuth access token, sent as ``Authorization: Bearer <token>``.
        params: Optional query-string parameters.
        json_body: Optional payload passed through aiohttp's ``json=`` argument.
        data: Optional raw payload passed through aiohttp's ``data=`` argument.
        extra_headers: Optional headers merged over the default ones.

    Returns:
        * ``{"status": "success", "code": 204}`` for a 204 response.
        * ``{"error": ..., "detail": ...}`` for any status >= 400, with the
          response text truncated to 2000 characters.
        * The decoded JSON value when the response text parses as JSON.
        * Otherwise the first 4000 characters of the response text.
    """
    request_headers = {"Authorization": f"Bearer {token}", **(extra_headers or {})}

    request_kwargs: dict[str, Any] = {"headers": request_headers, "params": params}
    # Only forward a payload argument when the caller actually supplied one.
    for option, payload in (("json", json_body), ("data", data)):
        if payload is not None:
            request_kwargs[option] = payload

    async with aiohttp.ClientSession() as session:
        async with session.request(method, url, **request_kwargs) as response:
            status = response.status
            if status == 204:
                # No Content: nothing to read, report a synthetic success.
                return {"status": "success", "code": 204}

            text = await response.text()
            if status >= 400:
                return {"error": f"Google API error ({status})", "detail": text[:2000]}

            try:
                parsed = json.loads(text)
            except json.JSONDecodeError:
                # Non-JSON payload (e.g. exported text): hand back a capped slice.
                return text[:4000]
            return parsed


async def _get_token(ctx: ToolContext | None) -> str:
    """Return the Google OAuth token associated with ``ctx``.

    Args:
        ctx: Tool context; it must be present and expose a non-``None``
            ``redis`` attribute.

    Returns:
        Whatever ``oauth_manager.require_oauth_token(ctx, "google")`` yields.

    Raises:
        RuntimeError: If ``ctx`` is ``None`` or ``ctx.redis`` is ``None``.
    """
    context_ready = ctx is not None and ctx.redis is not None
    if not context_ready:
        raise RuntimeError("Context or Redis not available")

    # Imported lazily, mirroring the handlers in this module.
    from oauth_manager import require_oauth_token

    access_token = await require_oauth_token(ctx, "google")
    return access_token


# ---------------------------------------------------------------------------
# Drive handlers
# ---------------------------------------------------------------------------

async def google_drive_list(
    query: str = "",
    folder_id: str = "",
    limit: int = 20,
    page_token: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """List non-trashed files in the user's Google Drive.

    Args:
        query: Optional Drive search expression. It is wrapped in parentheses
            before being combined with the other filters.
        folder_id: Optional folder ID; restricts results to its children.
        limit: Requested page size, clamped to the range 1..100.
        page_token: Pagination token from a previous response.
        ctx: Tool context used to resolve the user's OAuth token.

    Returns:
        A JSON string. On success it holds ``count``, ``files`` and, when
        Drive reports one, ``next_page_token``. On an API error it is the
        error dict from ``_google_request``. If the account is not connected
        it is the ``OAuthNotConnected`` message.
    """
    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as exc:
        return str(exc)

    clauses: list[str] = []
    user_query = query.strip()
    if user_query:
        # Parenthesise the caller's expression so an embedded ``or`` cannot
        # swallow the folder / trash filters appended below.
        clauses.append(f"({user_query})")
    if folder_id:
        # Escape backslashes and single quotes so the ID stays one literal.
        safe_folder_id = folder_id.replace("\\", "\\\\").replace("'", "\\'")
        clauses.append(f"'{safe_folder_id}' in parents")
    clauses.append("trashed = false")

    params: dict[str, Any] = {
        # A zero or negative page size would be rejected by the API.
        "pageSize": max(1, min(limit, 100)),
        "fields": "nextPageToken,files(id,name,mimeType,size,modifiedTime,webViewLink,parents)",
        "q": " and ".join(clauses),
    }
    if page_token:
        params["pageToken"] = page_token

    data = await _google_request("GET", f"{DRIVE_API}/files", token, params=params)
    if isinstance(data, dict) and "error" in data:
        return json.dumps(data)

    entries = data.get("files", []) if isinstance(data, dict) else []
    files = [
        {
            "id": entry.get("id"),
            "name": entry.get("name"),
            "mimeType": entry.get("mimeType"),
            "size": entry.get("size"),
            "modifiedTime": entry.get("modifiedTime"),
            "url": entry.get("webViewLink"),
        }
        for entry in entries
    ]

    result: dict[str, Any] = {"count": len(files), "files": files}
    if isinstance(data, dict) and data.get("nextPageToken"):
        result["next_page_token"] = data["nextPageToken"]
    return json.dumps(result)
async def google_drive_read(
    file_id: str,
    ctx: ToolContext | None = None,
) -> str:
    """Fetch a Drive file's metadata and its content as text.

    Google Docs and Slides are exported as ``text/plain`` and Sheets as
    ``text/csv``; every other MIME type is downloaded with ``alt=media``.

    Args:
        file_id: Google Drive file ID.
        ctx: Tool context used to resolve the user's OAuth token.

    Returns:
        A JSON string with ``metadata`` and ``content`` (capped at 16000
        characters). If the metadata request fails it is the error dict from
        ``_google_request``. If the account is not connected it is the
        ``OAuthNotConnected`` message.
    """
    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as exc:
        return str(exc)

    file_url = f"{DRIVE_API}/files/{file_id}"
    meta = await _google_request(
        "GET",
        file_url,
        token,
        params={"fields": "id,name,mimeType,size,modifiedTime,webViewLink"},
    )
    meta_is_dict = isinstance(meta, dict)
    if meta_is_dict and "error" in meta:
        return json.dumps(meta)

    mime = meta.get("mimeType", "") if meta_is_dict else ""

    # Google-native formats cannot be downloaded directly; map each one to
    # the export format requested from the /export endpoint.
    export_format = {
        "application/vnd.google-apps.document": "text/plain",
        "application/vnd.google-apps.spreadsheet": "text/csv",
        "application/vnd.google-apps.presentation": "text/plain",
    }.get(mime)

    if export_format is not None:
        download_url = f"{file_url}/export"
        download_params = {"mimeType": export_format}
    else:
        download_url = file_url
        download_params = {"alt": "media"}
    content = await _google_request("GET", download_url, token, params=download_params)

    # Non-string results (parsed JSON, error dicts) are serialised as-is.
    text = content if isinstance(content, str) else json.dumps(content)
    return json.dumps({
        "metadata": meta if meta_is_dict else {},
        "content": text[:16000],
    })
async def google_drive_upload(
    name: str,
    content: str,
    mime_type: str = "text/plain",
    folder_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Create a new Drive file from text content via a multipart upload.

    Args:
        name: File name to create.
        content: File content (text); encoded with ``str.encode()`` on send.
        mime_type: MIME type recorded in the metadata and on the media part.
        folder_id: Optional parent folder ID.
        ctx: Tool context used to resolve the user's OAuth token.

    Returns:
        A JSON string with ``id``, ``name``, ``mimeType`` and
        ``status: "uploaded"``. On an API error it is the error dict from
        ``_google_request``. If the account is not connected it is the
        ``OAuthNotConnected`` message.
    """
    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as exc:
        return str(exc)

    metadata: dict[str, Any] = {"name": name, "mimeType": mime_type}
    if folder_id:
        metadata["parents"] = [folder_id]
    metadata_json = json.dumps(metadata)

    # A multipart boundary must not occur inside any part, otherwise the
    # payload is split in the wrong place. Keep the usual boundary when it is
    # safe and add a numeric suffix until it is unique otherwise.
    base_boundary = "stargazer_upload_boundary"
    boundary = base_boundary
    attempt = 0
    while boundary in content or boundary in metadata_json:
        attempt += 1
        boundary = f"{base_boundary}_{attempt}"

    body = "\r\n".join([
        f"--{boundary}",
        "Content-Type: application/json; charset=UTF-8",
        "",
        metadata_json,
        f"--{boundary}",
        f"Content-Type: {mime_type}",
        "",
        content,
        f"--{boundary}--",
    ])

    data = await _google_request(
        "POST",
        "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart",
        token,
        data=body.encode(),
        extra_headers={"Content-Type": f"multipart/related; boundary={boundary}"},
    )
    if isinstance(data, dict) and "error" in data:
        return json.dumps(data)

    # Fall back to the requested values when the reply is not a JSON object.
    created = data if isinstance(data, dict) else {"id": None, "name": name, "mimeType": mime_type}
    return json.dumps({
        "id": created.get("id"),
        "name": created.get("name"),
        "mimeType": created.get("mimeType"),
        "status": "uploaded",
    })
# ---------------------------------------------------------------------------
# Gmail handlers
# ---------------------------------------------------------------------------
async def google_gmail_list(
    query: str = "",
    label: str = "INBOX",
    limit: int = 15,
    page_token: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """List Gmail messages with their From/To/Subject/Date headers.

    The message list is fetched first, then each message's metadata is
    fetched one at a time. Messages whose metadata request fails are skipped.

    Args:
        query: Optional Gmail search query.
        label: Label ID to filter by; an empty string disables the filter.
        limit: Maximum number of messages, clamped to the range 1..100.
        page_token: Pagination token from a previous response.
        ctx: Tool context used to resolve the user's OAuth token.

    Returns:
        A JSON string with ``count``, ``messages`` and, when Gmail reports
        one, ``next_page_token``. On a list error it is the error dict from
        ``_google_request``. If the account is not connected it is the
        ``OAuthNotConnected`` message.
    """
    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as exc:
        return str(exc)

    # A zero or negative value would be rejected by the API and would also
    # turn the slice below into "all but the last N".
    page_size = max(1, min(limit, 100))

    params: dict[str, Any] = {"maxResults": page_size}
    if query:
        params["q"] = query
    if label:
        params["labelIds"] = label
    if page_token:
        params["pageToken"] = page_token

    data = await _google_request("GET", f"{GMAIL_API}/users/me/messages", token, params=params)
    if isinstance(data, dict) and "error" in data:
        return json.dumps(data)

    stubs = data.get("messages", []) if isinstance(data, dict) else []

    # ``metadataHeaders`` is a repeated parameter: each header needs its own
    # key=value pair. A single comma-joined value is read as one header name.
    # The query is put in the URL because ``params`` is typed as a dict and
    # cannot carry duplicate keys.
    metadata_query = "format=metadata&" + "&".join(
        f"metadataHeaders={header}" for header in ("From", "To", "Subject", "Date")
    )

    results = []
    for stub in stubs[:page_size]:
        message = await _google_request(
            "GET",
            f"{GMAIL_API}/users/me/messages/{stub['id']}?{metadata_query}",
            token,
        )
        if not isinstance(message, dict) or "error" in message:
            continue

        # Header field names are case-insensitive, so index them lower-cased.
        hdrs = {
            h["name"].lower(): h["value"]
            for h in message.get("payload", {}).get("headers", [])
        }
        results.append({
            "id": message.get("id"),
            "threadId": message.get("threadId"),
            "from": hdrs.get("from", ""),
            "to": hdrs.get("to", ""),
            "subject": hdrs.get("subject", ""),
            "date": hdrs.get("date", ""),
            "snippet": message.get("snippet", ""),
            "labels": message.get("labelIds", []),
        })

    result: dict[str, Any] = {"count": len(results), "messages": results}
    if isinstance(data, dict) and data.get("nextPageToken"):
        result["next_page_token"] = data["nextPageToken"]
    return json.dumps(result)
async def google_gmail_read(
    message_id: str,
    ctx: ToolContext | None = None,
) -> str:
    """Read one Gmail message, returning key headers and a text body.

    Args:
        message_id: Gmail message ID.
        ctx: Tool context used to resolve the user's OAuth token.

    Returns:
        A JSON string with ``id``, ``threadId``, ``from``, ``to``,
        ``subject``, ``date``, ``body`` (capped at 16000 characters) and
        ``labels``. On an API error, or when the reply is not a JSON object,
        it is an ``error``/``detail`` dict. If the account is not connected
        it is the ``OAuthNotConnected`` message.
    """
    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as exc:
        return str(exc)

    data = await _google_request(
        "GET",
        f"{GMAIL_API}/users/me/messages/{message_id}",
        token,
        params={"format": "full"},
    )
    if isinstance(data, dict) and "error" in data:
        return json.dumps(data)
    if not isinstance(data, dict):
        # ``_google_request`` may return a str or list; report that in the
        # same error shape instead of failing on ``.get`` below.
        return json.dumps({
            "error": "Unexpected response from Gmail API",
            "detail": str(data)[:2000],
        })

    def _decode(encoded: str) -> str:
        """Decode base64url text, restoring any stripped ``=`` padding."""
        padded = encoded + "=" * (-len(encoded) % 4)
        return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")

    def _extract_body(payload: dict) -> str:
        """Return the first decodable body found in ``payload``.

        Order: the payload's own body, then a direct ``text/plain`` part,
        then a depth-first search through every part.
        """
        own_data = payload.get("body", {}).get("data")
        if own_data:
            return _decode(own_data)

        parts = payload.get("parts", [])
        for part in parts:
            part_data = part.get("body", {}).get("data")
            if part.get("mimeType") == "text/plain" and part_data:
                return _decode(part_data)
        for part in parts:
            nested = _extract_body(part)
            if nested:
                return nested
        return ""

    payload = data.get("payload", {})
    # Header field names are case-insensitive, so index them lower-cased.
    headers = {h["name"].lower(): h["value"] for h in payload.get("headers", [])}
    body = _extract_body(payload)

    return json.dumps({
        "id": data.get("id"),
        "threadId": data.get("threadId"),
        "from": headers.get("from", ""),
        "to": headers.get("to", ""),
        "subject": headers.get("subject", ""),
        "date": headers.get("date", ""),
        "body": body[:16000],
        "labels": data.get("labelIds", []),
    })
async def google_gmail_send(
    to: str,
    subject: str,
    body: str,
    ctx: ToolContext | None = None,
) -> str:
    """Send a plain-text email from the user's Gmail account.

    Args:
        to: Value for the message's ``to`` header.
        subject: Value for the message's ``subject`` header.
        body: Plain-text message body.
        ctx: Tool context used to resolve the user's OAuth token.

    Returns:
        A JSON string with ``status: "sent"``, ``id`` and ``threadId``. On an
        API error it is the error dict from ``_google_request``. If the
        account is not connected it is the ``OAuthNotConnected`` message.
    """
    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as exc:
        return str(exc)

    message = MIMEText(body)
    message["to"] = to
    message["subject"] = subject

    # Gmail's send endpoint takes the whole message as base64url text.
    message_bytes = message.as_bytes()
    encoded_message = base64.urlsafe_b64encode(message_bytes).decode()

    response = await _google_request(
        "POST",
        f"{GMAIL_API}/users/me/messages/send",
        token,
        json_body={"raw": encoded_message},
    )

    if not isinstance(response, dict):
        # Non-object reply: nothing to extract, but it was not an error.
        return json.dumps({"status": "sent", "id": None, "threadId": None})
    if "error" in response:
        return json.dumps(response)
    return json.dumps({
        "status": "sent",
        "id": response.get("id"),
        "threadId": response.get("threadId"),
    })
# ---------------------------------------------------------------------------
# Calendar handlers
# ---------------------------------------------------------------------------
async def google_calendar_list_events(
    time_min: str = "",
    time_max: str = "",
    calendar_id: str = "primary",
    limit: int = 20,
    query: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """List events from one of the user's Google calendars.

    Recurring events are expanded (``singleEvents=true``) and results are
    ordered by start time.

    Args:
        time_min: Optional lower bound, sent as ``timeMin``.
        time_max: Optional upper bound, sent as ``timeMax``.
        calendar_id: Calendar ID; percent-encoded before it is placed in the
            URL path.
        limit: Maximum number of events, clamped to the range 1..250.
        query: Optional free-text search, sent as ``q``.
        ctx: Tool context used to resolve the user's OAuth token.

    Returns:
        A JSON string with ``count`` and ``events``. Each event carries at
        most 500 characters of description and at most 10 attendee emails.
        On an API error it is the error dict from ``_google_request``. If the
        account is not connected it is the ``OAuthNotConnected`` message.
    """
    from urllib.parse import quote

    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as exc:
        return str(exc)

    params: dict[str, Any] = {
        # A zero or negative value would be rejected by the API.
        "maxResults": max(1, min(limit, 250)),
        "singleEvents": "true",
        "orderBy": "startTime",
    }
    if time_min:
        params["timeMin"] = time_min
    if time_max:
        params["timeMax"] = time_max
    if query:
        params["q"] = query

    # Calendar IDs may contain "#" or "/", which would otherwise be parsed as
    # a fragment or an extra path segment.
    encoded_calendar_id = quote(calendar_id, safe="")
    data = await _google_request(
        "GET",
        f"{CALENDAR_API}/calendars/{encoded_calendar_id}/events",
        token,
        params=params,
    )
    if isinstance(data, dict) and "error" in data:
        return json.dumps(data)

    items = data.get("items", []) if isinstance(data, dict) else []
    events = []
    for item in items:
        start = item.get("start", {})
        end = item.get("end", {})
        events.append({
            "id": item.get("id"),
            "summary": item.get("summary"),
            "description": (item.get("description") or "")[:500],
            # Timed events carry ``dateTime``; all-day events carry ``date``.
            "start": start.get("dateTime") or start.get("date"),
            "end": end.get("dateTime") or end.get("date"),
            "location": item.get("location"),
            "status": item.get("status"),
            "url": item.get("htmlLink"),
            "attendees": [a.get("email") for a in item.get("attendees", [])[:10]],
        })
    return json.dumps({"count": len(events), "events": events})
async def google_calendar_create_event(
    summary: str,
    start_time: str,
    end_time: str,
    description: str = "",
    location: str = "",
    calendar_id: str = "primary",
    attendees: list[str] | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Create a timed event on one of the user's Google calendars.

    Args:
        summary: Event title.
        start_time: Start, sent as ``start.dateTime``.
        end_time: End, sent as ``end.dateTime``.
        description: Optional event description.
        location: Optional event location.
        calendar_id: Calendar ID; percent-encoded before it is placed in the
            URL path.
        attendees: Optional attendee email addresses.
        ctx: Tool context used to resolve the user's OAuth token.

    Returns:
        A JSON string with ``status: "created"``, ``id``, ``summary``,
        ``url``, ``start`` and ``end``. On an API error it is the error dict
        from ``_google_request``. If the account is not connected it is the
        ``OAuthNotConnected`` message.
    """
    from urllib.parse import quote

    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as exc:
        return str(exc)

    event_body: dict[str, Any] = {
        "summary": summary,
        "start": {"dateTime": start_time},
        "end": {"dateTime": end_time},
    }
    if description:
        event_body["description"] = description
    if location:
        event_body["location"] = location
    if attendees:
        event_body["attendees"] = [{"email": address} for address in attendees]

    # Calendar IDs may contain "#" or "/", which would otherwise be parsed as
    # a fragment or an extra path segment.
    encoded_calendar_id = quote(calendar_id, safe="")
    data = await _google_request(
        "POST",
        f"{CALENDAR_API}/calendars/{encoded_calendar_id}/events",
        token,
        json_body=event_body,
    )
    if isinstance(data, dict) and "error" in data:
        return json.dumps(data)

    if not isinstance(data, dict):
        # Non-object reply: echo the requested values back to the caller.
        return json.dumps({
            "status": "created",
            "id": None,
            "summary": summary,
            "url": None,
            "start": start_time,
            "end": end_time,
        })
    return json.dumps({
        "status": "created",
        "id": data.get("id"),
        "summary": data.get("summary"),
        "url": data.get("htmlLink"),
        "start": data.get("start"),
        "end": data.get("end"),
    })
# ---------------------------------------------------------------------------
# TOOLS registration
# ---------------------------------------------------------------------------

# Each entry pairs a tool name and JSON-Schema parameter description with the
# coroutine in this module that implements it.
TOOLS = [
    # -- Drive --------------------------------------------------------------
    {
        "name": "google_drive_list",
        "description": "List files in the user's Google Drive. Supports search queries and folder filtering.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Drive search query (e.g. \"name contains 'report'\")",
                },
                "folder_id": {
                    "type": "string",
                    "description": "Filter to files within this folder ID",
                },
                "limit": {
                    "type": "integer",
                    "description": "Max results (default 20, max 100)",
                },
                "page_token": {
                    "type": "string",
                    "description": "Pagination token from previous response",
                },
            },
        },
        "handler": google_drive_list,
    },
    {
        "name": "google_drive_read",
        "description": "Read the contents of a file from Google Drive. Exports Google Docs/Sheets/Slides as text/CSV.",
        "parameters": {
            "type": "object",
            "properties": {
                "file_id": {
                    "type": "string",
                    "description": "Google Drive file ID",
                },
            },
            "required": ["file_id"],
        },
        "handler": google_drive_read,
    },
    {
        "name": "google_drive_upload",
        "description": "Upload/create a new file in Google Drive.",
        "parameters": {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "File name",
                },
                "content": {
                    "type": "string",
                    "description": "File content (text)",
                },
                "mime_type": {
                    "type": "string",
                    "description": "MIME type (default: text/plain)",
                },
                "folder_id": {
                    "type": "string",
                    "description": "Parent folder ID (optional)",
                },
            },
            "required": ["name", "content"],
        },
        "handler": google_drive_upload,
    },
    # -- Gmail --------------------------------------------------------------
    {
        "name": "google_gmail_list",
        "description": "List emails from the user's Gmail with optional search query and label filter.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Gmail search query (same syntax as Gmail search bar)",
                },
                "label": {
                    "type": "string",
                    "description": "Label to filter by (default: INBOX)",
                },
                "limit": {
                    "type": "integer",
                    "description": "Max results (default 15)",
                },
                "page_token": {
                    "type": "string",
                    "description": "Pagination token",
                },
            },
        },
        "handler": google_gmail_list,
    },
    {
        "name": "google_gmail_read",
        "description": "Read the full contents of a specific Gmail message by ID.",
        "parameters": {
            "type": "object",
            "properties": {
                "message_id": {
                    "type": "string",
                    "description": "Gmail message ID",
                },
            },
            "required": ["message_id"],
        },
        "handler": google_gmail_read,
    },
    {
        "name": "google_gmail_send",
        "description": "Send an email from the user's Gmail account.",
        "parameters": {
            "type": "object",
            "properties": {
                "to": {
                    "type": "string",
                    "description": "Recipient email address",
                },
                "subject": {
                    "type": "string",
                    "description": "Email subject",
                },
                "body": {
                    "type": "string",
                    "description": "Email body (plain text)",
                },
            },
            "required": ["to", "subject", "body"],
        },
        "handler": google_gmail_send,
    },
    # -- Calendar -----------------------------------------------------------
    {
        "name": "google_calendar_list_events",
        "description": "List upcoming events from the user's Google Calendar.",
        "parameters": {
            "type": "object",
            "properties": {
                "time_min": {
                    "type": "string",
                    "description": "Start of time range (ISO 8601, e.g. '2025-01-01T00:00:00Z')",
                },
                "time_max": {
                    "type": "string",
                    "description": "End of time range (ISO 8601)",
                },
                "calendar_id": {
                    "type": "string",
                    "description": "Calendar ID (default: 'primary')",
                },
                "limit": {
                    "type": "integer",
                    "description": "Max events to return",
                },
                "query": {
                    "type": "string",
                    "description": "Free text search within events",
                },
            },
        },
        "handler": google_calendar_list_events,
    },
    {
        "name": "google_calendar_create_event",
        "description": "Create a new event on the user's Google Calendar.",
        "parameters": {
            "type": "object",
            "properties": {
                "summary": {
                    "type": "string",
                    "description": "Event title",
                },
                "start_time": {
                    "type": "string",
                    "description": "Start time (ISO 8601 with timezone, e.g. '2025-03-15T10:00:00-05:00')",
                },
                "end_time": {
                    "type": "string",
                    "description": "End time (ISO 8601 with timezone)",
                },
                "description": {
                    "type": "string",
                    "description": "Event description",
                },
                "location": {
                    "type": "string",
                    "description": "Event location",
                },
                "calendar_id": {
                    "type": "string",
                    "description": "Calendar ID (default: 'primary')",
                },
                "attendees": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Email addresses of attendees",
                },
            },
            "required": ["summary", "start_time", "end_time"],
        },
        "handler": google_calendar_create_event,
    },
]