Travel assistant for NanoClaw: byAir flight notifications (delay, gate, connection risk, inbound aircraft delay, time-to-leave, arrival logistics), traffic-aware drive planning for in-person meetings (auto drive blocks + leave-by traffic rechecks), travel-booking gap checks, and nightly TripIt sync. Per-chat overlay tile.
77
97%
Does it follow best practices?
Impact
—
No eval scenarios have been run
Advisory
Suggest reviewing before use
#!/usr/bin/env python3
"""Apply sweep decisions to the calendar — idempotent create, skip-removes.
The drive-planner sweep is create-first (Epic #59 §3, confirmed interaction
model): when the precheck surfaces a meeting needing a drive block, the agent
calls this script to CREATE the prepared Free block(s); when the user replies
"skip", the agent calls it to REMOVE that meeting's blocks and record a skip so
the next sweep never recreates them. Both are deterministic calendar writes, so
they live in a script (per `coding-policy: script-delegation`), mirroring
flight-assist's `reconcile.py`.
Idempotency is the load-bearing guard (lombot #50 — requiring both directions
before "handled" produced 6 duplicate blocks). Create ALWAYS finds the
meeting's existing marker blocks first and skips any (meeting, direction) that
already exists: the sweep agent and a user reply can race on the same meeting,
so assume the race and make create a no-op when the block is already there.
Composio surface (`find_events` / `create_event` / `delete_event`) ships in the
co-located flight-assist skill's `composio_client`; imported read-only via the
runtime-mount-with-dev-fallback pattern. The marker codec + skip store are
drive-planner's own (`block_props`, `skip_state`).
Modes (subcommand on argv[1]):
create stdin {"meetings": [{"meeting_id": "...", "create_args": [...]}]}
stdout {"created": [...], "skipped_existing": [...], "failed": [...],
"message": "<ready-to-send notification or null>"} — the agent
relays `message` verbatim (id-free; single block → bare "skip",
several → numbered "skip 1 and 3"). `null` when nothing is worth
sending (the silence rule).
list stdin {"now": "<ISO>", "calendar_id": "primary"}
stdout {"blocks": [{"summary": "...", "meeting_id": "...",
"leave_by": "<ISO>"}]} — one per meeting with a current
drive block, ordered by leave_by. Lets the cancel UX map a user's
ordinal / natural-language reference to the internal `meeting_id`
without that id ever appearing in a user-facing message (#86).
remove stdin {"meeting_id" OR "summary": "...", "now": "<ISO>",
optional: "leave_by", "meeting_end", "calendar_id"}
stdout {"removed": [...], "skip_recorded": true}
`summary` resolves to the meeting_id server-side (by exact meeting
name, position-immune) so the cancel UX never needs a raw id (#86);
an unplannable meeting with no block resolves via the meeting event
itself. Pass optional `leave_by` to pin the exact instance when
meetings share a summary (a daily "Standup"). Non-match /
still-ambiguous return, respectively,
{"removed": [], "skip_recorded": false, "unmatched_summary": "..."}
and {"removed": [], "skip_recorded": false, "ambiguous_summary":
"...", "candidates": [{"summary","leave_by"}]} — never an id.
suppress stdin {"patches": [{"event_id": "...", "calendar_id": "...",
"description": "<full rebuilt block description>"}]}
stdout {"patched": ["<event_id>", ...]}
Invoked by the recheck SKILL.md AFTER the alert is sent, so a
failed send never permanently suppresses an alert.
This script is NOT a scheduler precheck — it is invoked by the agent and its
exit code is read directly: exit 0 on success, non-zero with a `{"error": ...}`
stderr line on a usage error or an unrecovered Composio failure (the agent
surfaces that to the user). stdlib-only (plus in-tile modules).
"""
from __future__ import annotations
import json
import sys
import urllib.error
from datetime import datetime, timedelta
from pathlib import Path
_BUNDLE_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(_BUNDLE_DIR))
_FLIGHT_ASSIST_RUNTIME = Path("/home/node/.claude/skills/tessl__flight-assist")
_FLIGHT_ASSIST_DEV = _BUNDLE_DIR.parent / "flight-assist"
def _flight_assist_dir() -> Path:
if _FLIGHT_ASSIST_RUNTIME.is_dir():
return _FLIGHT_ASSIST_RUNTIME
if _FLIGHT_ASSIST_DEV.is_dir():
return _FLIGHT_ASSIST_DEV
raise FileNotFoundError(
"drive-planner apply: cannot locate the co-shipped flight-assist skill at "
f"{_FLIGHT_ASSIST_RUNTIME} (runtime) or {_FLIGHT_ASSIST_DEV} (dev) — composio_client "
"ships there; both skills are part of jbaruch/nanoclaw-travel"
)
# Composio's find/create/delete surface ships in the co-located flight-assist
# bundle; add it to the path and import its client + error type (the same
# cross-bundle import reconcile.py does in-bundle), so the calendar-write
# failures can be caught by their specific type per `coding-policy:
# error-handling` rather than a bare catch-all.
sys.path.insert(0, str(_flight_assist_dir()))
from block_props import parse_block, parse_marker # noqa: E402
from composio_client import ComposioClient, ComposioError # noqa: E402
from skip_state import add_skip # noqa: E402
# Calendar-write failures worth catching per-op: a Composio tool error or a
# transport error. A non-write bug is not in this set and propagates.
_WRITE_ERRORS = (ComposioError, urllib.error.URLError, urllib.error.HTTPError, OSError)
# Pad the find window around a block so a small clock/timezone skew between
# create and the idempotency find never hides an existing block.
_FIND_PAD = timedelta(hours=1)
# Fallback skip horizon when a remove request carries no meeting_end and no
# blocks are found to derive one from — bounds the search window and the skip
# expiry so a future recurrence is still suppressed without pinning forever.
_DEFAULT_SKIP_HORIZON = timedelta(days=30)
def _load_composio():
"""Construct the in-tile ComposioClient from env (cross-bundle path set above)."""
return ComposioClient.from_env()
def _items(data: object) -> list:
"""Pull the event list out of a Composio FIND_EVENT response, tolerantly.
The live v3 `GOOGLECALENDAR_FIND_EVENT` double-nests the events at
`data.event_data.event_data` (verified against the NAS); a flat
`event_data`/`items` list and a `response_data` wrap are tolerated for other
toolkit shapes. Walks one level into a dict container; first list wins.
"""
if not isinstance(data, dict):
return []
for container in (data, data.get("event_data"), data.get("response_data")):
if not isinstance(container, dict):
continue
for key in ("event_data", "items"):
value = container.get(key)
if isinstance(value, list):
return value
return []
def existing_directions(fetched_events: list, meeting_id: str) -> set:
"""The set of leg directions already blocked for `meeting_id` (lombot #50).
Parses each fetched event with `parse_block` (recognition is by the block's
description state) and collects the directions of blocks that serve
`meeting_id`. "Handled" = ANY block, so a create for a (meeting, direction)
already in this set is skipped.
"""
directions = set()
for event in fetched_events:
state = parse_block(event)
if state is not None and state.meeting_id == meeting_id:
directions.add(state.direction)
return directions
def _arg_direction(create_arg: object) -> str | None:
"""The leg direction of a create-arg, read from its description marker."""
if not isinstance(create_arg, dict):
return None
marker = parse_marker(create_arg.get("description"))
return marker[1] if marker else None
def _calendar_id_of(create_args: list) -> str:
"""The calendar id from the first dict create-arg, defaulting to "primary".
Tolerant of non-dict entries (the create loop already skips malformed args)
so a bad first element never raises on the idempotency find.
"""
for arg in create_args:
if isinstance(arg, dict) and isinstance(arg.get("calendar_id"), str):
return arg["calendar_id"]
return "primary"
def plan_creates(meeting: dict, fetched_events: list) -> tuple[list, list]:
"""Split a meeting's create_args into (to_create, skipped_existing). Pure.
`to_create` are the create-arg dicts whose (meeting, direction) has no
existing marker block; `skipped_existing` are the directions already
present (idempotent no-op, lombot #50).
"""
present = existing_directions(fetched_events, meeting.get("meeting_id", ""))
to_create: list = []
skipped: list = []
create_args = meeting.get("create_args")
for arg in create_args if isinstance(create_args, list) else []:
direction = _arg_direction(arg)
if direction in present:
skipped.append(direction)
else:
to_create.append(arg)
return to_create, skipped
def _find_window(create_args: list) -> tuple[datetime | None, datetime | None]:
"""The padded [min start, max end] across a meeting's create_args.
The v3 create contract is flat `start_datetime` + `event_duration_*`, so
each block's end is start + its duration.
"""
starts, ends = [], []
for arg in create_args:
if not isinstance(arg, dict):
continue
start = _parse_iso(arg.get("start_datetime"))
if not start:
continue
hours = arg.get("event_duration_hour") or 0
minutes = arg.get("event_duration_minutes") or 0
if not isinstance(hours, int) or isinstance(hours, bool):
hours = 0
if not isinstance(minutes, int) or isinstance(minutes, bool):
minutes = 0
starts.append(start)
ends.append(start + timedelta(hours=hours, minutes=minutes))
if not starts or not ends:
return None, None
return min(starts) - _FIND_PAD, max(ends) + _FIND_PAD
def _parse_iso(raw: object) -> datetime | None:
if not isinstance(raw, str):
return None
text = raw.strip()
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(text)
except ValueError:
return None
return parsed if parsed.tzinfo is not None else None
def _fmt_clock(iso: object) -> str | None:
"""Format an ISO leave-by into a 12-hour clock label (e.g. "3:28 PM"), or None.
Uses the timestamp's own offset (the venue-local time the operator drives in).
Manual 12-hour math keeps it portable across platforms' strftime.
"""
dt = _parse_iso(iso)
if dt is None:
return None
hour = dt.hour % 12 or 12
return f"{hour}:{dt.minute:02d} {'AM' if dt.hour < 12 else 'PM'}"
def _summary_of(meeting: dict) -> str:
summary = meeting.get("summary")
return summary if isinstance(summary, str) and summary else "(unnamed meeting)"
def _ordered_blocked(meetings: list, created_ids: set) -> list:
"""Meetings that got >=1 created leg this run, ordered by leave_by (None last).
The order matches `_list_mode`'s leave_by ordering so the numbered skip
affordance lines up with what `apply.py list` returns.
"""
blocked = [
m
for m in meetings
if isinstance(m, dict)
and isinstance(m.get("meeting_id"), str)
and m.get("meeting_id") in created_ids
]
def _key(meeting: dict) -> tuple:
parsed = _parse_iso(meeting.get("leave_by"))
# Sort by the actual instant (timestamp), not the ISO string — a
# lexicographic compare of offset-bearing strings diverges from
# chronological order, which would desync the numbered "skip N" list
# from `_list_mode`'s datetime ordering. None (return-only) sorts last.
return (parsed is None, parsed.timestamp() if parsed else 0.0)
return sorted(blocked, key=_key)
def _blocked_line(meeting: dict, *, index: int | None, anchored: bool) -> str:
"""One blocked-meeting line. `index` None → single-block sentence; else numbered.
`anchored` is True when an outbound/bridge leg (the one the meeting-level
`leave_by` describes) was created THIS run. When only the return leg was
newly created — the outbound block already existed — `anchored` is False and
the line renders as return-only, so it never announces the outbound leave-by
as if it were just added.
"""
summary = _summary_of(meeting)
clock = _fmt_clock(meeting.get("leave_by"))
drive = meeting.get("drive_minutes")
has_leave_by = (
anchored and clock is not None and isinstance(drive, int) and not isinstance(drive, bool)
)
if index is None:
if not has_leave_by:
return f"Added a return drive block for {summary}."
return (
f"Added a drive block for {summary} — leave by {clock} "
f"({drive}-min drive with current traffic)."
)
if not has_leave_by:
return f"{index}. {summary} — return drive block"
return f"{index}. {summary} — leave by {clock} ({drive}-min drive)"
def _status_lines(meetings: list, created_ids: set, skipped_ids: set, failed: list) -> list:
"""The route-error / unplannable / failed lines, in meeting order (id-free)."""
# Key failures only by string ids — a malformed entry's unhashable id
# (dict/list) must not raise while composing the notification.
failed_by_id: dict = {}
for entry in failed if isinstance(failed, list) else []:
if (
isinstance(entry, dict)
and isinstance(entry.get("meeting_id"), str)
and entry["meeting_id"]
):
failed_by_id.setdefault(entry["meeting_id"], []).append(entry)
lines: list = []
for meeting in meetings:
if not isinstance(meeting, dict):
continue
summary = _summary_of(meeting)
# Normalize the id to a string (or None) before any set/dict lookup so an
# unhashable malformed id (dict/list) can't raise here.
meeting_id = meeting.get("meeting_id")
meeting_id = meeting_id if isinstance(meeting_id, str) and meeting_id else None
# Normalize to a list up front so a malformed truthy-but-non-list value
# (e.g. a stray string) reads as empty everywhere — including the later
# `not route_errors` / `if unplannable` membership checks below.
route_errors = meeting.get("route_errors")
route_errors = route_errors if isinstance(route_errors, list) else []
for item in route_errors:
err = item.get("error") if isinstance(item, dict) else None
suffix = f" ({err})" if err else ""
lines.append(
f"Couldn't compute drive time for {summary}{suffix} — "
f"no block created; will retry next sweep."
)
unplannable = meeting.get("unplannable")
unplannable = unplannable if isinstance(unplannable, list) else []
for leg in unplannable:
if not isinstance(leg, dict):
continue
direction = leg.get("direction")
reason = leg.get("reason") or "not plannable"
# Drop the direction word when it's missing/empty so the line reads
# "No drive block ..." rather than "No drive drive block ...".
label = (
f"{direction} drive block"
if isinstance(direction, str) and direction
else "drive block"
)
lines.append(f"No {label} for {summary} — {reason}.")
# Fully unplannable (no block at all): offer the mute affordance.
if (
unplannable
and meeting_id not in created_ids
and meeting_id not in skipped_ids
and not route_errors
):
lines.append(f"Reply don't drive to {summary} to stop seeing it.")
for entry in failed_by_id.get(meeting_id, []):
direction = entry.get("direction")
err = entry.get("error") or "error"
# "the <dir> drive block" when a direction is known, else "a drive
# block" — never "the drive drive block".
label = (
f"the {direction} drive block"
if isinstance(direction, str) and direction
else "a drive block"
)
lines.append(f"Couldn't create {label} for {summary} ({err}) — will retry next sweep.")
return lines
def build_notification(
meetings: list, created: list, skipped_existing: list, failed: list
) -> str | None:
"""Compose the id-free sweep notification the agent relays verbatim.
One created block → a single sentence + "Reply skip if you're not driving.".
Several → a numbered list + "Reply skip 1, or skip 1 and 3, to drop any.".
Route-error / unplannable / failed lines follow. Returns None when nothing
is worth sending (the silence rule). No meeting/event id ever appears and
the skip affordance is by bare "skip" or list number, so the operator never
types an id (#86) and a low-capability wake model can't improvise one.
"""
meetings = meetings if isinstance(meetings, list) else []
# Only string meeting ids count — a malformed entry (None / non-string)
# must never match a meeting whose id is also missing.
# Only non-empty string ids count — `_create_mode` treats the empty string
# as a missing id, so it must never match a meeting here either.
created_ids = {
c["meeting_id"]
for c in (created if isinstance(created, list) else [])
if isinstance(c, dict) and isinstance(c.get("meeting_id"), str) and c["meeting_id"]
}
skipped_ids = {
s["meeting_id"]
for s in (skipped_existing if isinstance(skipped_existing, list) else [])
if isinstance(s, dict) and isinstance(s.get("meeting_id"), str) and s["meeting_id"]
}
# Directions actually created this run, per meeting. A meeting is "anchored"
# (the meeting-level leave_by applies) only when an outbound/bridge leg was
# created now — not when the outbound already existed and only the return
# leg was added this run.
created_dirs: dict = {}
for c in created if isinstance(created, list) else []:
if isinstance(c, dict) and isinstance(c.get("meeting_id"), str) and c["meeting_id"]:
created_dirs.setdefault(c["meeting_id"], set()).add(c.get("direction"))
def _anchored(meeting: dict) -> bool:
return bool(created_dirs.get(meeting.get("meeting_id"), set()) & {"outbound", "bridge"})
blocked = _ordered_blocked(meetings, created_ids)
parts: list = []
if len(blocked) == 1:
parts.append(_blocked_line(blocked[0], index=None, anchored=_anchored(blocked[0])))
parts.append("Reply skip if you're not driving.")
elif len(blocked) > 1:
parts.append("Added drive blocks:")
parts.extend(
_blocked_line(meeting, index=i, anchored=_anchored(meeting))
for i, meeting in enumerate(blocked, 1)
)
# The multi-skip example must reference indices that exist for the count
# (e.g. "skip 1 and 2" when only two blocks were added, not "and 3").
second = min(3, len(blocked))
parts.append(f"Reply skip 1, or skip 1 and {second}, to drop any.")
parts.extend(_status_lines(meetings, created_ids, skipped_ids, failed))
if not parts:
return None
return "\n".join(parts)
def _create_mode(request: dict, client) -> dict:
created, skipped_existing, failed = [], [], []
raw_meetings = request.get("meetings")
meetings = raw_meetings if isinstance(raw_meetings, list) else []
for meeting in meetings:
meeting_id = meeting.get("meeting_id") if isinstance(meeting, dict) else None
if not isinstance(meeting_id, str) or not meeting_id:
# A malformed entry (no usable meeting id) is recorded, not crashed
# — one bad item in the request must not abort the whole batch.
failed.append(
{"meeting_id": meeting_id, "direction": None, "error": "missing meeting_id"}
)
continue
args = meeting.get("create_args")
if not isinstance(args, list):
args = []
time_min, time_max = _find_window(args)
fetched = []
if time_min and time_max and args:
calendar_id = _calendar_id_of(args)
fetched = _items(
client.find_events(
{
"calendar_id": calendar_id,
"timeMin": time_min.isoformat(),
"timeMax": time_max.isoformat(),
}
)
)
to_create, already = plan_creates(meeting, fetched)
skipped_existing.extend({"meeting_id": meeting_id, "direction": d} for d in already)
for arg in to_create:
try:
client.create_event(arg)
created.append({"meeting_id": meeting_id, "direction": _arg_direction(arg)})
except _WRITE_ERRORS as exc:
# One leg's create failing must not abort the batch — record it
# and keep going. The next sweep retries idempotently.
failed.append(
{"meeting_id": meeting_id, "direction": _arg_direction(arg), "error": str(exc)}
)
return {
"created": created,
"skipped_existing": skipped_existing,
"failed": failed,
"message": build_notification(meetings, created, skipped_existing, failed),
}
def _remove_mode(request: dict, client) -> dict:
meeting_id = request.get("meeting_id")
summary = request.get("summary")
has_id = isinstance(meeting_id, str) and bool(meeting_id)
has_summary = isinstance(summary, str) and bool(summary)
if not has_id and not has_summary:
raise ValueError("remove: `meeting_id` or `summary` is required")
now = _parse_iso(request.get("now"))
if now is None:
raise ValueError("remove: `now` must be a timezone-aware ISO-8601 string")
# meeting_end is optional — the user reply that triggers a skip carries only
# the meeting id. When absent it is derived below from the deleted blocks.
meeting_end = _parse_iso(request.get("meeting_end"))
calendar_id = request.get("calendar_id")
# Default a missing / non-string / empty calendar_id rather than forwarding
# an invalid value into find/delete.
if not isinstance(calendar_id, str) or not calendar_id:
calendar_id = "primary"
# Search a generous window: the blocks sit near the meeting, which (when no
# meeting_end is given) we don't yet know — bound the search by the skip
# store's furthest reasonable horizon ahead of now.
# Span the window across now AND meeting_end so a late skip/remove (the
# meeting already finished, blocks in the past) still finds the blocks, and
# a past meeting_end never inverts the window (timeMax < timeMin). With no
# meeting_end, look ahead a bounded horizon for the future blocks.
if meeting_end is not None:
time_min = min(now, meeting_end) - _FIND_PAD
time_max = max(now, meeting_end) + _FIND_PAD
else:
time_min = now - _FIND_PAD
time_max = now + _DEFAULT_SKIP_HORIZON
fetched = _items(
client.find_events(
{
"calendar_id": calendar_id,
"timeMin": time_min.isoformat(),
"timeMax": time_max.isoformat(),
}
)
)
# Resolve a name reference to its meeting_id server-side (position-immune):
# the agent maps the user's ordinal / name onto a summary, and this finds
# the meeting by exact summary regardless of how many other blocks exist
# (#86). An unplannable meeting has no block, so fall back to the meeting
# event itself, giving it a working skip path.
if not has_id:
assert isinstance(summary, str) # validation above requires id or summary
candidates = _resolve_candidates(fetched, summary)
# Pin the exact instance when the caller passes the block's leave-by
# (several meetings can share a summary — a daily "Standup").
leave_by = request.get("leave_by")
if isinstance(leave_by, str) and leave_by:
candidates = [c for c in candidates if c[1] == leave_by]
if not candidates:
return {"removed": [], "skip_recorded": False, "unmatched_summary": summary}
if len(candidates) > 1:
# Don't guess between same-summary meetings — hand the choices back
# (summary + leave_by, never an id) for the agent to disambiguate.
return {
"removed": [],
"skip_recorded": False,
"ambiguous_summary": summary,
"candidates": [{"summary": summary, "leave_by": when} for _, when in candidates],
}
meeting_id = candidates[0][0]
assert isinstance(meeting_id, str) # has_id, or resolved to a single str above
removed = []
block_arrivals = []
for event in fetched:
state = parse_block(event)
if state is None or state.meeting_id != meeting_id:
continue
try:
client.delete_event({"calendar_id": calendar_id, "event_id": state.event_id})
except ComposioError as exc:
# A 404 means the event is already gone (a concurrent delete) — an
# idempotent success, not a failure (matches reconcile.py). Any
# other Composio error propagates to main's handler.
if exc.status_code != 404:
raise
removed.append({"event_id": state.event_id, "direction": state.direction})
block_arrivals.append(state.arrive_by)
# Record the skip so the next sweep does not recreate the block. Expiry is
# the meeting's end when given; otherwise the latest block arrive-by (a skip
# is meaningless once the meeting is over, and scan filters it as `past`
# after its start anyway). For a no-block (unplannable) skip resolved from a
# meeting event, anchor to that event's end rather than the bounded fallback,
# so the persisted expiry matches the documented contract (state-schema.md)
# and never over-suppresses a later recurrence. The fallback horizon applies
# only when nothing time-anchored is available.
if meeting_end is None and not block_arrivals:
meeting_end = _meeting_event_end(fetched, meeting_id)
if meeting_end is not None:
expires = meeting_end
elif block_arrivals:
expires = max(block_arrivals)
else:
expires = now + _DEFAULT_SKIP_HORIZON
add_skip(meeting_id, expires=expires, now=now)
return {"removed": removed, "skip_recorded": True}
# The block summary is "Drive: <meeting summary>" (see precheck `_leg_create_args`).
# Stripping the prefix recovers the meeting summary the operator recognizes.
_DRIVE_SUMMARY_PREFIX = "Drive: "
def _block_start(state) -> datetime:
"""The block's actual start (the leave-by the operator saw) for any direction.
`baseline_leave_by` is the leave-by only for arrival-anchored legs
(outbound / bridge start `baseline + buffer` before arrival). A return
block's stored `arrive_by` is its synthetic leg END, so it starts `baseline`
before that — at the meeting end. Deriving per direction keeps `list` /
resolve reporting the real block start for every leg, not a return leg's
buffer-shifted value.
"""
if state.direction == "return":
return state.arrive_by - timedelta(seconds=state.baseline_seconds)
return state.baseline_leave_by
def _meeting_event_end(fetched: list, meeting_id: str) -> datetime | None:
"""The end (or start) of a fetched meeting event, for a no-block skip expiry."""
for event in fetched:
if not isinstance(event, dict) or event.get("id") != meeting_id:
continue
if parse_block(event) is not None:
continue
for key in ("end", "start"):
block = event.get(key)
if isinstance(block, dict):
when = _parse_iso(block.get("dateTime"))
if when is not None:
return when
return None
def _resolve_candidates(fetched: list, summary: str) -> list[tuple[str, str | None]]:
"""Meetings matching `summary` as `(meeting_id, leave_by_iso)`, position-immune.
Prefer drive blocks serving that meeting (parsed `meeting_id` + leave-by);
fall back to the meeting event itself (its id + start) so an `unplannable`
meeting — which has no block — can still be skipped (#86). Matching is by
exact summary; several distinct meetings can share a summary (a daily
"Standup"), so the caller disambiguates by leave-by rather than this picking
one by fetch order. De-duped by meeting_id, ordered by leave-by.
"""
target = f"{_DRIVE_SUMMARY_PREFIX}{summary}"
found: dict[str, str | None] = {}
# Blocks first. A meeting has several leg blocks (outbound / return) with
# different starts; keep the EARLIEST per meeting so the value matches what
# `list` and the notification showed (the outbound leave-by), since the
# caller disambiguates by exactly that.
for event in fetched:
state = parse_block(event)
if state is None or state.summary != target:
continue
leave_by = _block_start(state).isoformat()
current = found.get(state.meeting_id)
if current is None or leave_by < current:
found[state.meeting_id] = leave_by
# Meeting events too — an `unplannable` meeting has no block, and it can
# share a name with a DIFFERENT occurrence that does (so don't stop at
# blocks). A blocked meeting's event shares its id, so it's already covered.
for event in fetched:
if not isinstance(event, dict) or parse_block(event) is not None:
continue
if event.get("summary") == summary:
event_id = event.get("id")
if isinstance(event_id, str) and event_id and event_id not in found:
start = event.get("start")
# Prefer the timed `dateTime`; fall back to an all-day `date` so
# the candidate still carries a sortable when.
found[event_id] = (
(start.get("dateTime") or start.get("date"))
if isinstance(start, dict)
else None
)
# Order by when; a candidate with no parseable when sorts LAST, not first.
return sorted(found.items(), key=lambda item: (item[1] is None, item[1] or ""))
def _list_mode(request: dict, client) -> dict:
"""List the current drive blocks, one per meeting, for the cancel UX (#86).
Returns `{"blocks": [{summary, meeting_id, leave_by}]}` ordered by leave_by.
The agent maps a user's ordinal ("cancel 2") or natural-language ("don't
drive to swimming") onto a `meeting_id` from this list, so the internal id
never has to appear in — or be typed into — a user-facing message.
"""
now = _parse_iso(request.get("now"))
if now is None:
raise ValueError("list: `now` must be a timezone-aware ISO-8601 string")
calendar_id = request.get("calendar_id")
if not isinstance(calendar_id, str) or not calendar_id:
calendar_id = "primary"
fetched = _items(
client.find_events(
{
"calendar_id": calendar_id,
"timeMin": (now - _FIND_PAD).isoformat(),
"timeMax": (now + _DEFAULT_SKIP_HORIZON).isoformat(),
}
)
)
# One entry per meeting; a meeting has several leg blocks, so key by
# meeting_id and keep the earliest leave-by (the outbound) as the meeting's.
by_meeting: dict[str, dict] = {}
for event in fetched:
state = parse_block(event)
if state is None:
continue
leave_by = _block_start(state)
summary = state.summary
if summary.startswith(_DRIVE_SUMMARY_PREFIX):
summary = summary[len(_DRIVE_SUMMARY_PREFIX) :]
existing = by_meeting.get(state.meeting_id)
if existing is None or leave_by < existing["_leave_by"]:
by_meeting[state.meeting_id] = {
"summary": summary,
"meeting_id": state.meeting_id,
"leave_by": leave_by.isoformat(),
"_leave_by": leave_by,
}
blocks = sorted(by_meeting.values(), key=lambda b: b["_leave_by"])
for block in blocks:
del block["_leave_by"]
return {"blocks": blocks}
def _suppress_mode(request: dict, client) -> dict:
"""Persist the recheck poll's alert-suppression records — AFTER the ping.
The recheck SKILL.md calls this only once `mcp__nanoclaw__send_message` has
delivered the leave-earlier / leave-now alert, so a failed send never
permanently suppresses an alert. Each patch carries the block's full new
`description` (the poll rebuilt it with the updated alert record); since the
machine state lives in the description and `GOOGLECALENDAR_PATCH_EVENT`
supports a partial `description` update, the patch is just that one field.
"""
patched = []
patches = request.get("patches")
for patch in patches if isinstance(patches, list) else []:
if not isinstance(patch, dict):
continue
event_id = patch.get("event_id")
description = patch.get("description")
if not isinstance(event_id, str) or not event_id or not isinstance(description, str):
continue
calendar_id = patch.get("calendar_id")
if not isinstance(calendar_id, str) or not calendar_id:
calendar_id = "primary"
try:
client.patch_event(
{
"calendar_id": calendar_id,
"event_id": event_id,
"description": description,
}
)
except ComposioError as exc:
# A 404 means the block was deleted concurrently — nothing to
# suppress, an idempotent skip. One block's 404 must not fail
# suppression for the others; any other status propagates.
if exc.status_code != 404:
raise
continue
patched.append(event_id)
return {"patched": patched}
def main(argv: list[str]) -> int:
if len(argv) < 2 or argv[1] not in ("create", "list", "remove", "suppress"):
print(
json.dumps(
{"error": "usage: apply.py <create|list|remove|suppress> (request JSON on stdin)"}
),
file=sys.stderr,
)
return 2
try:
request = json.load(sys.stdin)
except json.JSONDecodeError as exc:
print(json.dumps({"error": f"invalid JSON on stdin: {exc}"}), file=sys.stderr)
return 2
if not isinstance(request, dict):
print(json.dumps({"error": "stdin must be a JSON object"}), file=sys.stderr)
return 2
try:
client = _load_composio()
if argv[1] == "create":
result = _create_mode(request, client)
elif argv[1] == "list":
result = _list_mode(request, client)
elif argv[1] == "remove":
result = _remove_mode(request, client)
else:
result = _suppress_mode(request, client)
except ValueError as exc:
# Config / usage error — missing COMPOSIO_* env, or a bad remove request.
print(json.dumps({"error": str(exc)}), file=sys.stderr)
return 2
except _WRITE_ERRORS as exc:
# An unrecovered Composio / transport failure during find/delete —
# surface it to the agent. A non-write bug propagates as a traceback.
print(json.dumps({"error": f"{type(exc).__name__}: {exc}"}), file=sys.stderr)
return 1
print(json.dumps(result))
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))skills
check-travel-bookings
drive-planner
drive-planner-recheck
flight-assist
references
nightly-travel-sync
sync-tripit