CtrlK
BlogDocsLog inGet started
Tessl Logo

jbaruch/nanoclaw-travel

Travel assistant for NanoClaw: byAir flight notifications (delay, gate, connection risk, inbound aircraft delay, time-to-leave, arrival logistics), traffic-aware drive planning for in-person meetings (auto drive blocks + leave-by traffic rechecks), travel-booking gap checks, and nightly TripIt sync. Per-chat overlay tile.

77

Quality

96%

Does it follow best practices?

Impact

No eval scenarios have been run

SecuritybySnyk

Advisory

Suggest reviewing before use

Overview
Quality
Evals
Security
Files

fetch_events.pyskills/drive-planner/

"""Wide-window primary-calendar event fetch for the drive-planner sweep.

The sweep needs the upcoming calendar events in one shot so `scan.py` can
classify them (Epic #59 §4). This module is that fetch: a single Composio
tool-execution call against `GOOGLECALENDAR_EVENTS_LIST` (calendarId
"primary", singleEvents) over a time window, returning the raw Google
Calendar event dicts in the exact
shape `scan(events=...)` consumes (`id`, `summary`, `location`, `start`,
`end`, `description`). The recheck poll reads the drive-planner block's machine
state back out of the same `description` field (Epic #59 §4 — the live v3
toolkit has no writable extendedProperties). drive-planner owns its own fetch
rather than sharing
flight-assist's per-calendar `composio_client` — a different action, a
different skill bundle — but mirrors that module's transport faithfully:
stdlib-only `urllib`, HTTP-mockable in CI, the Composio success/failure
envelope, one client per process.

Composio executes the action with a single POST keyed by the slug:

    POST {base}/tools/execute/{action}
    headers: x-api-key: <key>, Content-Type: application/json
    body:    {"user_id": "<id>", "arguments": {"calendarId": "primary",
             "singleEvents": true, "timeMin": "...", "timeMax": "..."}}
    -> 200   {"data": {...events...}, "successful": true,  "error": null}
    -> 200   {"data": {...}, "successful": false, "error": "..."}

The exact `data` container for the events list is Composio-toolkit-version
specific (like the action slugs in `composio_client.py`); the candidate
keys are isolated in `_EVENT_CONTAINER_KEYS` at the top of the file — verify
against the live toolkit when first wiring against the NAS. A `successful:
true` response whose `data` carries none of those keys raises `FetchError`
rather than silently returning zero events (a silent empty fetch would make
the sweep a no-op and quietly stop planning).

stdlib-only per `coding-policy: dependency-management` (Stdlib First).

Caveat: Composio is mid-retirement (nanoclaw#638 → OneCLI workspace MCP);
this fetch is the one piece that re-points later, same as `composio-fetch`.

Public API:
    from fetch_events import CalendarFetcher, FetchError

    fetcher = CalendarFetcher.from_env()
    events = fetcher.fetch_window(time_min=now, time_max=now + timedelta(days=14))
    results = scan(events, now=now, home_address=home, skip_state=active)
"""

from __future__ import annotations

import json
import os
import urllib.error
import urllib.request
from datetime import datetime

_DEFAULT_BASE_URL = "https://backend.composio.dev/api/v3"

# GoogleCalendar action slug — verified against the live v3 toolkit
# (`GET /api/v3/tools/GOOGLECALENDAR_EVENTS_LIST`), matching the proven
# nanoclaw-admin `composio-fetch` precheck. The earlier
# `GOOGLECALENDAR_EVENTS_LIST_ALL_CALENDARS` slug does not exist and 404s.
ACTION_LIST_EVENTS = "GOOGLECALENDAR_EVENTS_LIST"

# The action's required/contract arguments (camelCase). `calendarId` is
# required by the v3 schema; "primary" is the operator's main calendar (where
# meetings live). `singleEvents` expands recurring events into instances so a
# weekly standup surfaces as datable occurrences `scan.py` can classify. The
# wide `[timeMin, timeMax]` window is added per-call in `fetch_window`.
_BASE_ARGS = {"calendarId": "primary", "singleEvents": True}

# Candidate keys under the Composio `data` envelope that hold the event list.
# The live v3 response is the Google-native events.list resource, whose list is
# under `items`; `events` and the `response_data` nesting are tolerated for
# other toolkit shapes. Checked in order; the first present list wins.
_EVENT_CONTAINER_KEYS = ("items", "events")

# Event fields carried through verbatim from the raw event. `scan.py` reads
# id/summary/location/start/end/description; the recheck poll also reads
# `description` — that's where the drive-planner block's machine state lives
# (the live v3 toolkit has no writable extendedProperties), so `description`
# alone carries it back (Epic #59 §4 — calendar event IS the state, fetched by
# API).
_EVENT_FIELDS = (
    "id",
    "summary",
    "location",
    "start",
    "end",
    "description",
    "attendees",  # scan.py reads the operator's RSVP to skip declined meetings
    "status",  # scan.py skips cancelled events
)


class FetchError(Exception):
    """Raised when the calendar fetch fails at the tool level or returns an
    unrecognized shape.

    Distinct from a transport error (`urllib.error.*`, which propagates): a
    `FetchError` means Composio answered but the answer was a tool-level
    failure (`successful: false`) or a `successful: true` body whose `data`
    held no recognizable event container. The fix is to check the Composio
    connection / re-verify the action's response shape, not to retry.
    """

    def __init__(self, message: str, *, status_code: int | None = None):
        super().__init__(message)
        self.message = message
        self.status_code = status_code


class CalendarFetcher:
    """Thin Composio client for the wide-window primary-calendar event fetch.

    Auth (`x-api-key`) and user scoping (`user_id`) are fixed per instance.
    Not thread-safe — one instance per process, matching `composio_client`.
    """

    def __init__(
        self,
        api_key: str,
        user_id: str,
        *,
        base_url: str = _DEFAULT_BASE_URL,
        timeout: float = 30.0,
    ):
        if not api_key:
            raise ValueError(
                "CalendarFetcher: api_key is empty — set COMPOSIO_API_KEY in the env "
                "(from https://app.composio.dev settings) or pass it explicitly"
            )
        if not user_id:
            raise ValueError(
                "CalendarFetcher: user_id is empty — set COMPOSIO_USER_ID in the env "
                "(the Composio entity the Google Calendar account is connected under) "
                "or pass it explicitly"
            )
        self._api_key = api_key
        self._user_id = user_id
        self._base_url = base_url.rstrip("/")
        self._timeout = timeout

    @classmethod
    def from_env(
        cls,
        *,
        api_key_var: str = "COMPOSIO_API_KEY",
        user_id_var: str = "COMPOSIO_USER_ID",
        base_url_var: str = "COMPOSIO_BASE_URL",
        timeout: float = 30.0,
    ) -> CalendarFetcher:
        """Construct from COMPOSIO_API_KEY + COMPOSIO_USER_ID env vars.

        COMPOSIO_BASE_URL optionally overrides the endpoint; unset uses the
        public v3 backend.
        """
        api_key = os.environ.get(api_key_var, "")
        if not api_key:
            raise ValueError(
                f"CalendarFetcher.from_env: ${api_key_var} is unset — add the Composio API "
                f"key (https://app.composio.dev settings) to OneCLI vault and restart the container"
            )
        user_id = os.environ.get(user_id_var, "")
        if not user_id:
            raise ValueError(
                f"CalendarFetcher.from_env: ${user_id_var} is unset — add the Composio user/"
                f"entity id (the entity the Google Calendar account is connected under) to vault"
            )
        base_url = os.environ.get(base_url_var) or _DEFAULT_BASE_URL
        return cls(api_key, user_id, base_url=base_url, timeout=timeout)

    def fetch_window(self, *, time_min: datetime, time_max: datetime) -> list:
        """Fetch primary-calendar events in [time_min, time_max] as scan-shaped dicts.

        Args:
            time_min: window start (tz-aware).
            time_max: window end (tz-aware, after time_min).

        Returns:
            A list of raw event dicts carrying the fields scan.py reads.
            Empty when the window genuinely has no events. Any non-dict entry
            in the upstream list is passed through verbatim for scan.py to
            classify as `filtered` — fetch never silently drops one.

        Raises:
            ValueError: on a naive datetime or time_max <= time_min.
            FetchError: on a Composio tool-level failure or an unrecognized
                successful-response shape.
            urllib.error.HTTPError / URLError: on transport failure.
        """
        if time_min.tzinfo is None or time_max.tzinfo is None:
            raise ValueError("fetch_window: time_min and time_max must be timezone-aware")
        if time_max <= time_min:
            raise ValueError("fetch_window: time_max must be after time_min")

        data = self._execute(
            ACTION_LIST_EVENTS,
            {**_BASE_ARGS, "timeMin": time_min.isoformat(), "timeMax": time_max.isoformat()},
        )
        return [_project_event(event) for event in _extract_events(data)]

    def _execute(self, action: str, arguments: dict) -> dict:
        """Execute one Composio action; return its `data` payload.

        Mirrors `composio_client.ComposioClient.execute`: raises FetchError
        on `successful: false`, normalizes a read timeout to URLError.
        """
        url = f"{self._base_url}/tools/execute/{action}"
        payload = json.dumps({"user_id": self._user_id, "arguments": arguments}).encode("utf-8")
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "x-api-key": self._api_key,
        }
        request = urllib.request.Request(url, data=payload, headers=headers, method="POST")
        try:
            with urllib.request.urlopen(request, timeout=self._timeout) as response:
                raw = response.read().decode("utf-8")
        except TimeoutError as timeout_err:
            raise urllib.error.URLError(f"timed out: {timeout_err}") from timeout_err

        body = json.loads(raw)
        if not body.get("successful", False):
            data = body.get("data") or {}
            status_code = data.get("status_code") if isinstance(data, dict) else None
            message = body.get("error") or (data.get("message") if isinstance(data, dict) else None)
            raise FetchError(
                f"{action} failed: {message or 'Composio reported successful=false'}",
                status_code=status_code,
            )
        return body.get("data") or {}


def _extract_events(data: object) -> list:
    """Pull the event list out of the Composio `data` envelope.

    Tries each `_EVENT_CONTAINER_KEYS` in order and returns the first that is
    a list — verbatim, including any non-dict entries. A `data` with none of
    the keys raises FetchError: a successful response we cannot read is a
    shape regression to surface, not a silent empty fetch that would stop the
    sweep planning. Individual malformed entries are NOT filtered here —
    `scan.py` classifies a non-dict event as `filtered` (it never crashes and
    never silently drops one), so preserving them keeps a partial shape
    regression visible in the sweep's audit rather than hidden.
    """
    if not isinstance(data, dict):
        raise FetchError(
            f"calendar fetch returned a non-object data payload: {type(data).__name__}"
        )
    # Look in the top-level `data` and, for toolkit shapes that wrap the
    # Google payload one level down, in `data.response_data` (the same nesting
    # flight-assist's reconcile `_items` tolerates).
    nested = data.get("response_data")
    containers = [data, nested] if isinstance(nested, dict) else [data]
    for container in containers:
        for key in _EVENT_CONTAINER_KEYS:
            value = container.get(key)
            if isinstance(value, list):
                return value
    raise FetchError(
        "calendar fetch succeeded but no event list found under "
        f"{_EVENT_CONTAINER_KEYS} (top-level or response_data) — verify the action's "
        "response shape against the live toolkit"
    )


def _project_event(event: object) -> object:
    """Keep only the fields scan.py reads, dropping the rest of the GCal resource.

    A non-dict entry is passed through untouched for `scan.py` to classify as
    `filtered` — fetch must not silently drop it (that would turn a shape
    regression into an invisible empty sweep).
    """
    if not isinstance(event, dict):
        return event
    return {field: event[field] for field in _EVENT_FIELDS if field in event}

CHANGELOG.md

README.md

tile.json