"""Google API tools using per-user OAuth tokens.
Provides Google Drive, Gmail, and Calendar operations via Google's REST
APIs. Requires the user to have connected their Google account via the
OAuth flow. Distinct from gcp_tools.py which uses service accounts.
"""
from __future__ import annotations
import json
import logging
import base64
from email.mime.text import MIMEText
from typing import Any, TYPE_CHECKING
import aiohttp
if TYPE_CHECKING:
from tool_context import ToolContext
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base URLs of the Google REST APIs that the handlers below build their
# request URLs from.
DRIVE_API = "https://www.googleapis.com/drive/v3"
GMAIL_API = "https://gmail.googleapis.com/gmail/v1"
CALENDAR_API = "https://www.googleapis.com/calendar/v3"
async def _google_request(
    method: str,
    url: str,
    token: str,
    *,
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
    data: bytes | None = None,
    extra_headers: dict[str, str] | None = None,
) -> dict[str, Any] | list[Any] | str:
    """Send one bearer-authenticated HTTP request and decode the response.

    A fresh ``aiohttp.ClientSession`` is opened for every call.

    Args:
        method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
        url: Absolute request URL.
        token: OAuth access token, sent as ``Authorization: Bearer <token>``.
        params: Optional query-string parameters.
        json_body: Optional JSON request body.
        data: Optional raw request body.
        extra_headers: Optional headers merged over the authorization header.

    Returns:
        * ``{"status": "success", "code": 204}`` for a 204 response.
        * ``{"error": ..., "detail": ...}`` for any status >= 400, with the
          response text in ``detail`` truncated to 2000 characters.
        * The parsed JSON value when the body is valid JSON.
        * Otherwise the first 4000 characters of the body text.

    Transport-level exceptions raised by aiohttp are not caught here.
    """
    headers = {"Authorization": f"Bearer {token}"}
    if extra_headers:
        headers.update(extra_headers)

    kwargs: dict[str, Any] = {"headers": headers, "params": params}
    if json_body is not None:
        kwargs["json"] = json_body
    if data is not None:
        kwargs["data"] = data

    async with aiohttp.ClientSession() as session:
        async with session.request(method, url, **kwargs) as resp:
            if resp.status == 204:
                return {"status": "success", "code": 204}
            # Decode leniently: bodies are not guaranteed to be text (for
            # example a Drive media download of a binary file), and a strict
            # decode would raise UnicodeDecodeError instead of returning.
            body = await resp.text(errors="replace")
            if resp.status >= 400:
                return {"error": f"Google API error ({resp.status})", "detail": body[:2000]}
            try:
                return json.loads(body)
            except json.JSONDecodeError:
                return body[:4000]
async def _get_token(ctx: ToolContext | None) -> str:
    """Return the Google OAuth token for the user behind ``ctx``.

    Raises:
        RuntimeError: If ``ctx`` is ``None`` or ``ctx.redis`` is ``None``.

    Any exception raised by ``oauth_manager.require_oauth_token`` propagates
    to the caller.
    """
    context_ready = ctx is not None and ctx.redis is not None
    if not context_ready:
        raise RuntimeError("Context or Redis not available")

    # Imported lazily, at call time, as in the rest of this module.
    from oauth_manager import require_oauth_token

    access_token = await require_oauth_token(ctx, "google")
    return access_token
# ---------------------------------------------------------------------------
# Drive handlers
# ---------------------------------------------------------------------------
# Handler for the "google_drive_list" tool.
async def google_drive_list(
    query: str = "",
    folder_id: str = "",
    limit: int = 20,
    page_token: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """List files in the connected user's Google Drive.

    Args:
        query: Optional Drive search expression. It is combined with the
            folder filter and ``trashed = false`` using ``and``.
        folder_id: Optional folder ID; restricts results to its children.
        limit: Requested page size, capped at 100.
        page_token: Pagination token from a previous response.
        ctx: Tool context used to look up the user's OAuth token.

    Returns:
        A JSON string. On success: ``{"count", "files"}`` plus
        ``"next_page_token"`` when more pages exist. On an API error: the
        error dict produced by ``_google_request``. If the Google account is
        not connected, the text of the ``OAuthNotConnected`` exception.
    """
    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as e:
        return str(e)

    q_parts: list[str] = []
    if query.strip():
        # Parenthesize the caller's expression so an "or" inside it cannot
        # bypass the folder / trashed filters appended below.
        q_parts.append(f"({query})")
    if folder_id:
        # The ID is embedded in a single-quoted literal, so backslashes and
        # single quotes must be escaped to keep the query well-formed.
        safe_folder = folder_id.replace("\\", "\\\\").replace("'", "\\'")
        q_parts.append(f"'{safe_folder}' in parents")
    q_parts.append("trashed = false")

    params: dict[str, Any] = {
        "pageSize": min(limit, 100),
        "fields": "nextPageToken,files(id,name,mimeType,size,modifiedTime,webViewLink,parents)",
        "q": " and ".join(q_parts),
    }
    if page_token:
        params["pageToken"] = page_token

    data = await _google_request("GET", f"{DRIVE_API}/files", token, params=params)
    if not isinstance(data, dict):
        # Non-dict payload: report an empty listing.
        return json.dumps({"count": 0, "files": []})
    if "error" in data:
        return json.dumps(data)

    files = [
        {
            "id": f.get("id"),
            "name": f.get("name"),
            "mimeType": f.get("mimeType"),
            "size": f.get("size"),
            "modifiedTime": f.get("modifiedTime"),
            "url": f.get("webViewLink"),
        }
        for f in data.get("files", [])
    ]
    result: dict[str, Any] = {"count": len(files), "files": files}
    if data.get("nextPageToken"):
        result["next_page_token"] = data["nextPageToken"]
    return json.dumps(result)
# Handler for the "google_drive_read" tool.
async def google_drive_read(
    file_id: str,
    ctx: ToolContext | None = None,
) -> str:
    """Fetch a Drive file's metadata and its content as text.

    Google Docs and Slides are exported as ``text/plain`` and Sheets as
    ``text/csv``; every other MIME type is downloaded with ``alt=media``.

    Args:
        file_id: Drive file ID.
        ctx: Tool context used to look up the user's OAuth token.

    Returns:
        A JSON string ``{"metadata": ..., "content": ...}`` with ``content``
        truncated to 16000 characters. If the metadata request fails, the
        error dict from ``_google_request``. If the Google account is not
        connected, the text of the ``OAuthNotConnected`` exception.

        A failure of the content request is not returned as a top-level
        error; its error dict ends up serialized inside ``content``.
    """
    from urllib.parse import quote

    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as e:
        return str(e)

    # Percent-encode the ID so characters such as "/", "?" or "#" cannot
    # alter which endpoint is requested.
    file_url = f"{DRIVE_API}/files/" + quote(file_id, safe="")

    meta = await _google_request(
        "GET", file_url,
        token,
        params={"fields": "id,name,mimeType,size,modifiedTime,webViewLink"},
    )
    if isinstance(meta, dict) and "error" in meta:
        return json.dumps(meta)
    mime = meta.get("mimeType", "") if isinstance(meta, dict) else ""

    # Google Docs/Sheets/Slides -> export as plain text / CSV
    export_mimes = {
        "application/vnd.google-apps.document": "text/plain",
        "application/vnd.google-apps.spreadsheet": "text/csv",
        "application/vnd.google-apps.presentation": "text/plain",
    }
    if mime in export_mimes:
        content = await _google_request(
            "GET", f"{file_url}/export",
            token,
            params={"mimeType": export_mimes[mime]},
        )
    else:
        content = await _google_request(
            "GET", file_url,
            token,
            params={"alt": "media"},
        )

    # _google_request parses JSON bodies, so re-serialize anything non-str.
    text = content if isinstance(content, str) else json.dumps(content)
    return json.dumps({
        "metadata": meta if isinstance(meta, dict) else {},
        "content": text[:16000],
    })
# Handler for the "google_drive_upload" tool.
async def google_drive_upload(
    name: str,
    content: str,
    mime_type: str = "text/plain",
    folder_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Create a new Drive file from text content via a multipart upload.

    Args:
        name: File name to create.
        content: File content as text; sent UTF-8 encoded.
        mime_type: MIME type recorded in the metadata and on the content part.
        folder_id: Optional parent folder ID.
        ctx: Tool context used to look up the user's OAuth token.

    Returns:
        A JSON string ``{"id", "name", "mimeType", "status": "uploaded"}`` on
        success, the error dict from ``_google_request`` on an API error, or
        the text of the ``OAuthNotConnected`` exception if the Google account
        is not connected.
    """
    import uuid

    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as e:
        return str(e)

    metadata: dict[str, Any] = {"name": name, "mimeType": mime_type}
    if folder_id:
        metadata["parents"] = [folder_id]

    # Use a random boundary per upload: with a fixed one, content that
    # happens to contain the delimiter line would break the multipart body.
    boundary = f"stargazer_upload_boundary_{uuid.uuid4().hex}"
    body_parts = [
        f"--{boundary}",
        "Content-Type: application/json; charset=UTF-8",
        "",
        json.dumps(metadata),
        f"--{boundary}",
        f"Content-Type: {mime_type}",
        "",
        content,
        f"--{boundary}--",
    ]
    body = "\r\n".join(body_parts)

    data = await _google_request(
        "POST",
        "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart",
        token,
        data=body.encode(),
        extra_headers={"Content-Type": f"multipart/related; boundary={boundary}"},
    )
    if not isinstance(data, dict):
        # Non-dict payload: echo back what was requested.
        return json.dumps({
            "id": None,
            "name": name,
            "mimeType": mime_type,
            "status": "uploaded",
        })
    if "error" in data:
        return json.dumps(data)
    return json.dumps({
        "id": data.get("id"),
        "name": data.get("name"),
        "mimeType": data.get("mimeType"),
        "status": "uploaded",
    })
# ---------------------------------------------------------------------------
# Gmail handlers
# ---------------------------------------------------------------------------
# Handler for the "google_gmail_list" tool.
async def google_gmail_list(
    query: str = "",
    label: str = "INBOX",
    limit: int = 15,
    page_token: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """List Gmail messages with their key headers and snippet.

    The message IDs are listed first, then each message is fetched
    individually (sequentially) in ``metadata`` format.

    Args:
        query: Optional Gmail search query.
        label: Label ID to filter by; an empty string disables the filter.
        limit: Requested number of messages, capped at 100 in the request.
        page_token: Pagination token from a previous response.
        ctx: Tool context used to look up the user's OAuth token.

    Returns:
        A JSON string. On success: ``{"count", "messages"}`` plus
        ``"next_page_token"`` when more pages exist. Messages whose
        individual fetch fails are omitted. On a listing error: the error
        dict from ``_google_request``. If the Google account is not
        connected, the text of the ``OAuthNotConnected`` exception.
    """
    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as e:
        return str(e)

    params: dict[str, Any] = {"maxResults": min(limit, 100)}
    if query:
        params["q"] = query
    if label:
        params["labelIds"] = label
    if page_token:
        params["pageToken"] = page_token

    data = await _google_request("GET", f"{GMAIL_API}/users/me/messages", token, params=params)
    if isinstance(data, dict) and "error" in data:
        return json.dumps(data)
    messages = data.get("messages", []) if isinstance(data, dict) else []

    # `metadataHeaders` is a repeated query parameter: pass a list so each
    # header name is sent as its own value. A single comma-joined string is
    # treated as one (non-existent) header name and matches nothing.
    detail_params: dict[str, Any] = {
        "format": "metadata",
        "metadataHeaders": ["From", "To", "Subject", "Date"],
    }

    # Fetch headers for each message
    results = []
    for msg_stub in messages[:limit]:
        msg_data = await _google_request(
            "GET", f"{GMAIL_API}/users/me/messages/{msg_stub['id']}",
            token,
            params=detail_params,
        )
        if not isinstance(msg_data, dict) or "error" in msg_data:
            continue
        # Header field names are case-insensitive, so index them lower-cased.
        hdrs = {
            h["name"].lower(): h["value"]
            for h in msg_data.get("payload", {}).get("headers", [])
        }
        results.append({
            "id": msg_data.get("id"),
            "threadId": msg_data.get("threadId"),
            "from": hdrs.get("from", ""),
            "to": hdrs.get("to", ""),
            "subject": hdrs.get("subject", ""),
            "date": hdrs.get("date", ""),
            "snippet": msg_data.get("snippet", ""),
            "labels": msg_data.get("labelIds", []),
        })

    result: dict[str, Any] = {"count": len(results), "messages": results}
    if isinstance(data, dict) and data.get("nextPageToken"):
        result["next_page_token"] = data["nextPageToken"]
    return json.dumps(result)
# Handler for the "google_gmail_read" tool.
async def google_gmail_read(
    message_id: str,
    ctx: ToolContext | None = None,
) -> str:
    """Read one Gmail message: key headers, labels and a text body.

    Body selection: the payload's own body data if present; otherwise the
    first direct ``text/plain`` part; otherwise the first non-empty body
    found by searching the parts recursively.

    Args:
        message_id: Gmail message ID.
        ctx: Tool context used to look up the user's OAuth token.

    Returns:
        A JSON string with ``id``, ``threadId``, ``from``, ``to``,
        ``subject``, ``date``, ``body`` (truncated to 16000 characters) and
        ``labels``. On failure: an ``{"error", "detail"}`` dict. If the
        Google account is not connected, the text of the
        ``OAuthNotConnected`` exception.
    """
    from urllib.parse import quote

    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as e:
        return str(e)

    data = await _google_request(
        # Percent-encode the ID so it stays a single path segment.
        "GET", f"{GMAIL_API}/users/me/messages/" + quote(message_id, safe=""),
        token,
        params={"format": "full"},
    )
    if not isinstance(data, dict):
        # _google_request may return a str or list; calling .get on those
        # would raise, so report them as an error instead.
        return json.dumps({
            "error": "Unexpected response from Gmail API",
            "detail": str(data)[:2000],
        })
    if "error" in data:
        return json.dumps(data)

    # Header field names are case-insensitive, so index them lower-cased.
    headers = {
        h["name"].lower(): h["value"]
        for h in data.get("payload", {}).get("headers", [])
    }

    def _decode(encoded: str) -> str:
        """Decode base64url text, tolerating missing '=' padding."""
        padded = encoded + "=" * (-len(encoded) % 4)
        return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")

    def _extract_body(payload: dict) -> str:
        """Return the first usable body text in ``payload``, or ``""``."""
        inline = payload.get("body", {}).get("data")
        if inline:
            return _decode(inline)
        parts = payload.get("parts", [])
        # Prefer a direct text/plain part before descending into nested parts.
        for part in parts:
            if part.get("mimeType") == "text/plain" and part.get("body", {}).get("data"):
                return _decode(part["body"]["data"])
        for part in parts:
            nested = _extract_body(part)
            if nested:
                return nested
        return ""

    body = _extract_body(data.get("payload", {}))
    return json.dumps({
        "id": data.get("id"),
        "threadId": data.get("threadId"),
        "from": headers.get("from", ""),
        "to": headers.get("to", ""),
        "subject": headers.get("subject", ""),
        "date": headers.get("date", ""),
        "body": body[:16000],
        "labels": data.get("labelIds", []),
    })
# Handler for the "google_gmail_send" tool.
async def google_gmail_send(
    to: str,
    subject: str,
    body: str,
    ctx: ToolContext | None = None,
) -> str:
    """Send a plain-text email from the connected user's Gmail account.

    The message is built with ``MIMEText``, base64url-encoded and posted to
    the Gmail ``messages/send`` endpoint as the ``raw`` field.

    Args:
        to: Value for the ``to`` header.
        subject: Value for the ``subject`` header.
        body: Message body text.
        ctx: Tool context used to look up the user's OAuth token.

    Returns:
        A JSON string ``{"status": "sent", "id", "threadId"}`` on success,
        the error dict from ``_google_request`` on an API error, or the text
        of the ``OAuthNotConnected`` exception if the Google account is not
        connected.
    """
    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as e:
        return str(e)

    message = MIMEText(body)
    message["to"] = to
    message["subject"] = subject
    encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()

    send_url = f"{GMAIL_API}/users/me/messages/send"
    response = await _google_request(
        "POST", send_url,
        token,
        json_body={"raw": encoded_message},
    )

    if not isinstance(response, dict):
        # Non-dict payload: no identifiers available to report.
        return json.dumps({"status": "sent", "id": None, "threadId": None})
    if "error" in response:
        return json.dumps(response)
    return json.dumps({
        "status": "sent",
        "id": response.get("id"),
        "threadId": response.get("threadId"),
    })
# ---------------------------------------------------------------------------
# Calendar handlers
# ---------------------------------------------------------------------------
# Handler for the "google_calendar_list_events" tool.
async def google_calendar_list_events(
    time_min: str = "",
    time_max: str = "",
    calendar_id: str = "primary",
    limit: int = 20,
    query: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """List events from one of the connected user's calendars.

    Events are requested with ``singleEvents=true`` and ordered by start
    time.

    Args:
        time_min: Optional lower bound, sent as ``timeMin``.
        time_max: Optional upper bound, sent as ``timeMax``.
        calendar_id: Calendar ID; defaults to ``"primary"``.
        limit: Requested number of events, capped at 250.
        query: Optional free-text search, sent as ``q``.
        ctx: Tool context used to look up the user's OAuth token.

    Returns:
        A JSON string ``{"count", "events"}`` on success (descriptions
        truncated to 500 characters, at most 10 attendee emails per event),
        the error dict from ``_google_request`` on an API error, or the text
        of the ``OAuthNotConnected`` exception if the Google account is not
        connected.
    """
    from urllib.parse import quote

    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as e:
        return str(e)

    params: dict[str, Any] = {
        "maxResults": min(limit, 250),
        "singleEvents": "true",
        "orderBy": "startTime",
    }
    if time_min:
        params["timeMin"] = time_min
    if time_max:
        params["timeMax"] = time_max
    if query:
        params["q"] = query

    # Calendar IDs can contain "#" and "@"; unencoded, "#" would start a URL
    # fragment and cut the path short, so percent-encode the whole segment.
    calendar_path = quote(calendar_id, safe="")
    data = await _google_request(
        "GET", f"{CALENDAR_API}/calendars/{calendar_path}/events",
        token, params=params,
    )
    if not isinstance(data, dict):
        # Non-dict payload: report an empty listing.
        return json.dumps({"count": 0, "events": []})
    if "error" in data:
        return json.dumps(data)

    events = [
        {
            "id": ev.get("id"),
            "summary": ev.get("summary"),
            "description": (ev.get("description") or "")[:500],
            # Fall back from "dateTime" to "date" when the former is absent.
            "start": ev.get("start", {}).get("dateTime") or ev.get("start", {}).get("date"),
            "end": ev.get("end", {}).get("dateTime") or ev.get("end", {}).get("date"),
            "location": ev.get("location"),
            "status": ev.get("status"),
            "url": ev.get("htmlLink"),
            "attendees": [a.get("email") for a in ev.get("attendees", [])[:10]],
        }
        for ev in data.get("items", [])
    ]
    return json.dumps({"count": len(events), "events": events})
# Handler for the "google_calendar_create_event" tool.
async def google_calendar_create_event(
    summary: str,
    start_time: str,
    end_time: str,
    description: str = "",
    location: str = "",
    calendar_id: str = "primary",
    attendees: list[str] | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Create an event on one of the connected user's calendars.

    Args:
        summary: Event title.
        start_time: Start, sent as the event's ``start.dateTime``.
        end_time: End, sent as the event's ``end.dateTime``.
        description: Optional event description.
        location: Optional event location.
        calendar_id: Calendar ID; defaults to ``"primary"``.
        attendees: Optional attendee email addresses.
        ctx: Tool context used to look up the user's OAuth token.

    Returns:
        A JSON string ``{"status": "created", "id", "summary", "url",
        "start", "end"}`` on success, the error dict from
        ``_google_request`` on an API error, or the text of the
        ``OAuthNotConnected`` exception if the Google account is not
        connected.
    """
    from urllib.parse import quote

    from oauth_manager import OAuthNotConnected

    try:
        token = await _get_token(ctx)
    except OAuthNotConnected as e:
        return str(e)

    event_body: dict[str, Any] = {
        "summary": summary,
        "start": {"dateTime": start_time},
        "end": {"dateTime": end_time},
    }
    if description:
        event_body["description"] = description
    if location:
        event_body["location"] = location
    if attendees:
        event_body["attendees"] = [{"email": addr} for addr in attendees]

    # Calendar IDs can contain "#" and "@"; unencoded, "#" would start a URL
    # fragment and cut the path short, so percent-encode the whole segment.
    calendar_path = quote(calendar_id, safe="")
    data = await _google_request(
        "POST", f"{CALENDAR_API}/calendars/{calendar_path}/events",
        token, json_body=event_body,
    )
    if not isinstance(data, dict):
        # Non-dict payload: echo back what was requested.
        return json.dumps({
            "status": "created",
            "id": None,
            "summary": summary,
            "url": None,
            "start": start_time,
            "end": end_time,
        })
    if "error" in data:
        return json.dumps(data)
    return json.dumps({
        "status": "created",
        "id": data.get("id"),
        "summary": data.get("summary"),
        "url": data.get("htmlLink"),
        "start": data.get("start"),
        "end": data.get("end"),
    })
# ---------------------------------------------------------------------------
# TOOLS registration
# ---------------------------------------------------------------------------
# Tool registry for this module. Each entry is a dict with:
#   "name"        - tool identifier (matches the handler function's name),
#   "description" - human-readable summary of the tool,
#   "parameters"  - JSON-Schema-style object describing the handler's
#                   arguments ("required" lists the mandatory ones),
#   "handler"     - the coroutine function defined above that implements it.
# NOTE(review): how this list is consumed is not visible in this file;
# presumably a tool loader reads it and supplies `ctx` itself, since `ctx`
# is not described in any "parameters" schema - confirm against the loader.
TOOLS = [
    # --- Drive ---
    {
        "name": "google_drive_list",
        "description": "List files in the user's Google Drive. Supports search queries and folder filtering.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Drive search query (e.g. \"name contains 'report'\")"},
                "folder_id": {"type": "string", "description": "Filter to files within this folder ID"},
                "limit": {"type": "integer", "description": "Max results (default 20, max 100)"},
                "page_token": {"type": "string", "description": "Pagination token from previous response"},
            },
        },
        "handler": google_drive_list,
    },
    {
        "name": "google_drive_read",
        "description": "Read the contents of a file from Google Drive. Exports Google Docs/Sheets/Slides as text/CSV.",
        "parameters": {
            "type": "object",
            "properties": {
                "file_id": {"type": "string", "description": "Google Drive file ID"},
            },
            "required": ["file_id"],
        },
        "handler": google_drive_read,
    },
    {
        "name": "google_drive_upload",
        "description": "Upload/create a new file in Google Drive.",
        "parameters": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "File name"},
                "content": {"type": "string", "description": "File content (text)"},
                "mime_type": {"type": "string", "description": "MIME type (default: text/plain)"},
                "folder_id": {"type": "string", "description": "Parent folder ID (optional)"},
            },
            "required": ["name", "content"],
        },
        "handler": google_drive_upload,
    },
    # --- Gmail ---
    {
        "name": "google_gmail_list",
        "description": "List emails from the user's Gmail with optional search query and label filter.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Gmail search query (same syntax as Gmail search bar)"},
                "label": {"type": "string", "description": "Label to filter by (default: INBOX)"},
                "limit": {"type": "integer", "description": "Max results (default 15)"},
                "page_token": {"type": "string", "description": "Pagination token"},
            },
        },
        "handler": google_gmail_list,
    },
    {
        "name": "google_gmail_read",
        "description": "Read the full contents of a specific Gmail message by ID.",
        "parameters": {
            "type": "object",
            "properties": {
                "message_id": {"type": "string", "description": "Gmail message ID"},
            },
            "required": ["message_id"],
        },
        "handler": google_gmail_read,
    },
    {
        "name": "google_gmail_send",
        "description": "Send an email from the user's Gmail account.",
        "parameters": {
            "type": "object",
            "properties": {
                "to": {"type": "string", "description": "Recipient email address"},
                "subject": {"type": "string", "description": "Email subject"},
                "body": {"type": "string", "description": "Email body (plain text)"},
            },
            "required": ["to", "subject", "body"],
        },
        "handler": google_gmail_send,
    },
    # --- Calendar ---
    {
        "name": "google_calendar_list_events",
        "description": "List upcoming events from the user's Google Calendar.",
        "parameters": {
            "type": "object",
            "properties": {
                "time_min": {"type": "string", "description": "Start of time range (ISO 8601, e.g. '2025-01-01T00:00:00Z')"},
                "time_max": {"type": "string", "description": "End of time range (ISO 8601)"},
                "calendar_id": {"type": "string", "description": "Calendar ID (default: 'primary')"},
                "limit": {"type": "integer", "description": "Max events to return"},
                "query": {"type": "string", "description": "Free text search within events"},
            },
        },
        "handler": google_calendar_list_events,
    },
    {
        "name": "google_calendar_create_event",
        "description": "Create a new event on the user's Google Calendar.",
        "parameters": {
            "type": "object",
            "properties": {
                "summary": {"type": "string", "description": "Event title"},
                "start_time": {"type": "string", "description": "Start time (ISO 8601 with timezone, e.g. '2025-03-15T10:00:00-05:00')"},
                "end_time": {"type": "string", "description": "End time (ISO 8601 with timezone)"},
                "description": {"type": "string", "description": "Event description"},
                "location": {"type": "string", "description": "Event location"},
                "calendar_id": {"type": "string", "description": "Calendar ID (default: 'primary')"},
                "attendees": {"type": "array", "items": {"type": "string"}, "description": "Email addresses of attendees"},
            },
            "required": ["summary", "start_time", "end_time"],
        },
        "handler": google_calendar_create_event,
    },
]