Travel assistant for NanoClaw: byAir flight notifications (delay, gate, connection risk, inbound aircraft delay, time-to-leave, arrival logistics), travel-booking gap checks, and nightly TripIt sync. Per-chat overlay tile.
75
93%
Does it follow best practices?
Impact
—
No eval scenarios have been run
Advisory
Suggest reviewing before use
"""Calendar reconciliation orchestrator for flight-assist (#55).
The deterministic glue that connects the pure planner (`calendar_plan.py`)
to live Google Calendars via Composio. No LLM in the loop: it resolves
calendar IDs, fetches + normalizes the current calendar state, builds the
per-flight planner inputs, runs `plan_reconciliation`, executes the
returned ops through `composio_client`, and writes the resulting event IDs
back into each flight's `calendar_events` ledger.
Responsibility split (the same discipline the planner enforces):
- `calendar_plan.py` — pure decisions (what ops converge the calendar).
- `disposition.py` — per-flight disposition (needs the wall clock +
active-flights membership).
- `boarding_lead.py` — boarding-lead policy (aircraft size / TATL-TPAC).
- this module — I/O + glue: resolve IDs, fetch, execute, persist.
Calendar grounding (settled in #55):
- The flight events + the boarding block live on the operator's flight
calendar ("Flighty Flights" — the byAir calendar in tile terms). Its ID
is resolved at runtime from the operator-supplied `byair_calendar_name`
in config and cached as `byair_calendar_id`; never hardcoded in tile
code per `rules/flight-data-locality.md`.
- Reclaim writes its travel blocks onto the user's PRIMARY calendar
interleaved with real meetings — there is no dedicated Reclaim calendar.
They are content-classified (`calendar_normalize.is_reclaim_travel`) and
the planner deletes one only inside a same-airport layover gap.
Two passes per cycle:
- Active pass: reconcile the flights currently in `active-flights.json`
(boarding block, adopt-by-tag + delta-shift the byAir flight event,
Reclaim same-airport-gap deletions).
- Tombstone sweep: flights that have dropped out of `active-flights.json`
but still carry a `calendar_events` ledger. The per-flight wake loop is
structurally blind to these (it only visits active flights), so this is
the one place a switched-away flight's managed events get torn down. The
planner resolves their disposition (switched_away / cancelled / diverted
→ teardown deletes; completed → leave as a record) off the retained
ledger, this module executes the deletes, then archives the state file
once teardown settles (see `_archive_settled_tombstones`).
The exact `GOOGLECALENDAR_*` *argument* field names are Composio-version-
specific. They are isolated in the "Composio argument adapters" section
below so a live-toolkit correction is a one-spot fix — the same treatment
`composio_client.py` gives its action slugs. Verify them against the live
toolkit when first wiring against the NAS.
stdlib-only (`datetime`) per `coding-policy: dependency-management`.
"""
from __future__ import annotations
import sys
from datetime import datetime, timedelta, timezone
from boarding_lead import resolve_boarding_lead_minutes
from calendar_normalize import NormalizeError, normalize_event
from calendar_plan import (
DISPOSITION_COMPLETED,
MANAGED_ADOPTED,
MANAGED_CREATED,
plan_reconciliation,
)
from composio_client import ComposioError
from disposition import resolve_disposition
from state import (
delete_flight_state,
list_flight_state_ids,
read_active_flights,
read_config,
read_flight_state,
write_config,
write_flight_state,
)
# Reclaim travel blocks live on the user's primary calendar (#55). Google's
# well-known alias for the authenticated user's primary calendar.
PRIMARY_CALENDAR_ID = "primary"
# Padding around the flights' own span when fetching calendar events. Wide
# enough to capture a Reclaim travel block sitting just outside a leg's
# scheduled window; the planner's same-airport-gap rule bounds which ones
# are actually deletable, so an over-wide fetch only costs a few extra
# events to classify, never a wrong delete.
_FETCH_WINDOW_PAD = timedelta(hours=6)
class ReconcileError(Exception):
"""Raised on a non-recoverable reconcile setup failure.
Per-op execution failures are NOT this — they are logged and collected
so one bad op never aborts the cycle (mirrors the precheck's per-flight
resilience). This is reserved for failures that make the whole run
meaningless, e.g. the active-flights index being unreadable.
"""
# --- Calendar argument adapters (verify against the live Composio toolkit) ---
#
# Each helper maps to/from the GOOGLECALENDAR_* `arguments` / response shape.
# The response shape (`{"items": [...]}`, an event resource carrying `id`) is
# the Google-native shape the composio_client tests already assume. The
# request-argument field names are the version-specific surface; isolated here.
def _list_calendar_items(client) -> list[dict]:
"""Return the operator's calendar list as `[{id, summary, ...}, ...]`."""
data = client.list_calendars()
return _items(data)
def _find_events_args(*, calendar_id: str, time_min: str, time_max: str) -> dict:
"""Arguments for GOOGLECALENDAR_FIND_EVENT over a calendar + time window.
`singleEvents` expands recurring events into instances so each carries a
concrete start/end the planner can compare; without it a recurring master
comes back with no concrete instance time and `_is_timed` would mishandle
it. The name is Google Calendar's documented camelCase API parameter (the
response is Google-native too) — verify against the live toolkit.
"""
return {
"calendar_id": calendar_id,
"timeMin": time_min,
"timeMax": time_max,
"singleEvents": True,
}
def _create_event_args(op: dict) -> dict:
"""Arguments for GOOGLECALENDAR_CREATE_EVENT from a planner `create` op.
The planner's `body` is `{summary, start, end, private_props}`; map it to
a Google-native event resource under the calendar. Verify field names
against the live toolkit.
"""
body = op["body"]
return {
"calendar_id": op["calendar_id"],
"summary": body["summary"],
"start": {"dateTime": body["start"]},
"end": {"dateTime": body["end"]},
"extendedProperties": {"private": body["private_props"]},
}
def _patch_event_args(op: dict) -> dict:
"""Arguments for GOOGLECALENDAR_PATCH_EVENT from an `update` / `adopt` op.
`update` carries `{start, end}` (a delta-shift to byAir truth); `adopt`
carries `{private_props}` (the tag-only patch that claims a byAir event).
Only the keys present in the op body are sent, so a patch never clobbers
a field it did not intend to touch.
"""
body = op["body"]
args: dict = {"calendar_id": op["calendar_id"], "event_id": op["event_id"]}
if "start" in body:
args["start"] = {"dateTime": body["start"]}
if "end" in body:
args["end"] = {"dateTime": body["end"]}
if "private_props" in body:
args["extendedProperties"] = {"private": body["private_props"]}
return args
def _delete_event_args(op: dict) -> dict:
"""Arguments for GOOGLECALENDAR_DELETE_EVENT from a `delete` op."""
return {"calendar_id": op["calendar_id"], "event_id": op["event_id"]}
def _items(data: dict) -> list[dict]:
"""Pull the `items` list out of a Composio GoogleCalendar list/find response.
Composio returns the Google-native `{"items": [...]}` payload as the
action's `data`; some toolkit versions nest it one level under
`response_data`. Tolerate both, default to empty.
"""
if not isinstance(data, dict):
return []
items = data.get("items")
if items is None:
nested = data.get("response_data")
items = nested.get("items") if isinstance(nested, dict) else None
return items if isinstance(items, list) else []
def _created_event_id(data: dict) -> str | None:
"""Extract the new event's `id` from a CREATE_EVENT response."""
if not isinstance(data, dict):
return None
event_id = data.get("id")
if event_id:
return event_id
nested = data.get("response_data")
if isinstance(nested, dict):
return nested.get("id")
return None
# --- Calendar-ID resolution -------------------------------------------------
def _match_calendar(items: list[dict], name: str) -> str | None:
"""Return the id of the calendar whose summary equals `name`.
Case-insensitive, whitespace-trimmed exact match on the calendar's
display name (`summary`). Returns None when no calendar matches, so the
caller can no-op rather than guess.
"""
target = name.strip().casefold()
for item in items:
summary = (item.get("summary") or "").strip().casefold()
if summary == target and item.get("id"):
return item["id"]
return None
def resolve_byair_calendar_id(client, config: dict) -> str | None:
"""Resolve the flight ("Flighty Flights") calendar ID, caching the result.
Order:
1. `config["byair_calendar_id"]` — already cached, use directly.
2. `config["byair_calendar_name"]` — list calendars, match the name
once, cache the resolved id back into config.json so later cycles
skip the lookup.
3. Neither configured / no match — return None; the caller no-ops
(there is no flight calendar to reconcile against).
Never writes "Flighty" into tile code — the name is operator-supplied
config data per `rules/flight-data-locality.md`.
"""
cached = config.get("byair_calendar_id")
if cached:
return cached
name = config.get("byair_calendar_name")
if not name:
return None
resolved = _match_calendar(_list_calendar_items(client), name)
if resolved is None:
print(
f"flight-assist reconcile: no calendar named {name!r} found — "
f"check config.byair_calendar_name against the operator's calendar list",
file=sys.stderr,
)
return None
# Cache the resolved id. read_config strips schema_version handling; pass
# only the documented optional fields back to write_config.
to_persist = {k: v for k, v in config.items() if k != "schema_version"}
to_persist["byair_calendar_id"] = resolved
write_config(to_persist)
return resolved
# --- Per-flight planner-input building ---------------------------------------
def _effective_times(state: dict) -> tuple[str, str]:
"""Return (dep, arr) byAir-truth instants: actual when known, else scheduled.
The planner converges every managed event to byAir truth, which is the
actual `last_snapshot.dep_time` / `arr_time` once byAir has published
them, falling back to the scheduled times before the first poll.
`is not None`, not truthiness — a present-but-empty actual time is
malformed and must surface downstream rather than silently use scheduled.
"""
snapshot = state.get("last_snapshot") or {}
dep = snapshot.get("dep_time")
arr = snapshot.get("arr_time")
return (
dep if dep is not None else state["scheduled_dep_time"],
arr if arr is not None else state["scheduled_arr_time"],
)
def _resolve_lead(state: dict) -> int:
"""Resolve the boarding-lead minutes for a flight from its snapshot.
Reads the aircraft model + airport coordinates the lead policy needs out
of `last_snapshot`; every input is optional and the resolver degrades
gracefully (widebody by `inbound.aircraft_model`, else the narrowbody
default) until the precheck stamps the richer fields (see
`state-schema.md` last_snapshot, #55 runtime facts).
"""
snapshot = state.get("last_snapshot") or {}
inbound = snapshot.get("inbound") or {}
return resolve_boarding_lead_minutes(
aircraft_model=snapshot.get("aircraft_model"),
inbound_aircraft_model=inbound.get("aircraft_model"),
dep_lat=snapshot.get("dep_lat"),
dep_lon=snapshot.get("dep_lon"),
arr_lat=snapshot.get("arr_lat"),
arr_lon=snapshot.get("arr_lon"),
)
def build_planner_flight(state: dict, *, in_active_flights: bool, now: datetime) -> dict:
"""Build one flight's planner input from its persisted state record.
Resolves the disposition (wall-clock + membership) and the boarding lead,
and selects byAir-truth dep/arr times, leaving the pure planner to decide
the ops. The `calendar_events` ledger is passed through so the planner
keys updates/deletes off it.
"""
dep, arr = _effective_times(state)
return {
"flight_id": state["flight_id"],
"code": state["code"],
"trip_id": state["trip_id"],
"dep_airport_id": state["dep_airport_id"],
"arr_airport_id": state["arr_airport_id"],
"dep_time": dep,
"arr_time": arr,
"boarding_lead_minutes": _resolve_lead(state),
"disposition": resolve_disposition(state, in_active_flights=in_active_flights, now=now),
"calendar_events": state.get("calendar_events") or {},
}
# --- Event fetch + normalization ---------------------------------------------
def _parse_instant(value: str) -> datetime:
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
def _fetch_window(flights: list[dict]) -> tuple[str, str]:
"""Compute the [time_min, time_max] event-fetch window across all flights.
Spans from the earliest boarding-block start (dep − lead) to the latest
arrival, padded both sides so a Reclaim travel block adjacent to a leg is
still fetched. Returns RFC 3339 UTC strings.
"""
starts = [
_parse_instant(f["dep_time"]) - timedelta(minutes=int(f["boarding_lead_minutes"]))
for f in flights
]
ends = [_parse_instant(f["arr_time"]) for f in flights]
lo = min(starts) - _FETCH_WINDOW_PAD
hi = max(ends) + _FETCH_WINDOW_PAD
return _to_rfc3339(lo), _to_rfc3339(hi)
def _to_rfc3339(instant: datetime) -> str:
return instant.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _is_timed(event: dict) -> bool:
"""True when the normalized event has concrete timed start AND end.
All-day events normalize to a bare `YYYY-MM-DD` start (no `T`); they are
never flights or Reclaim travel blocks, and the planner's instant parse
would reject them, so they are filtered before planning.
"""
start = event.get("start")
end = event.get("end")
return isinstance(start, str) and "T" in start and isinstance(end, str) and "T" in end
def collect_events(client, *, byair_calendar_id: str, time_min: str, time_max: str) -> list[dict]:
"""Fetch + normalize the events the planner needs across both calendars.
The flight calendar (`classify_reclaim=False` — its events are byAir
flight events / boarding blocks) and the primary calendar
(`classify_reclaim=True` — where Reclaim writes its travel blocks). Bare
all-day events and events normalization rejects (no `id`) are skipped
with a diagnostic rather than aborting the cycle.
"""
events: list[dict] = []
for calendar_id, classify_reclaim in (
(byair_calendar_id, False),
(PRIMARY_CALENDAR_ID, True),
):
raw = client.find_events(
_find_events_args(calendar_id=calendar_id, time_min=time_min, time_max=time_max)
)
for raw_event in _items(raw):
try:
normalized = normalize_event(
raw_event, calendar_id=calendar_id, classify_reclaim=classify_reclaim
)
except NormalizeError as exc:
print(f"flight-assist reconcile: skipping malformed event: {exc}", file=sys.stderr)
continue
if _is_timed(normalized):
events.append(normalized)
return events
# --- Op execution + ledger writeback -----------------------------------------
def _ledger_entry(*, event_id: str, calendar_id: str, managed: str, signature: str) -> dict:
return {
"event_id": event_id,
"calendar_id": calendar_id,
"managed": managed,
"synced_signature": signature,
}
def _apply_op_to_ledger(op: dict, ledger: dict, client) -> bool:
"""Execute one planner op and mutate `ledger` in place. Return True if the
flight's state needs to be written back (ledger changed).
Raises ComposioError / urllib errors to the caller, which logs and
collects them so one failed op never aborts the cycle. A delete that
404s (event already gone) is an idempotent success, handled here.
"""
kind = op["kind"]
operation = op["op"]
if operation == "create":
data = client.create_event(_create_event_args(op))
new_id = _created_event_id(data)
if new_id is None:
raise ComposioError(f"create for {kind} returned no event id: {data!r}")
ledger[kind] = _ledger_entry(
event_id=new_id,
calendar_id=op["calendar_id"],
managed=MANAGED_CREATED,
signature=op["signature"],
)
return True
if operation == "adopt":
client.patch_event(_patch_event_args(op))
ledger[kind] = _ledger_entry(
event_id=op["event_id"],
calendar_id=op["calendar_id"],
managed=MANAGED_ADOPTED,
signature=op["signature"],
)
return True
if operation == "update":
client.patch_event(_patch_event_args(op))
entry = ledger.get(kind)
if entry is not None:
entry["synced_signature"] = op["signature"]
return True
return False
if operation == "delete":
try:
client.delete_event(_delete_event_args(op))
except ComposioError as exc:
if exc.status_code != 404:
raise
# 404 = already gone; idempotent success, fall through to ledger drop.
# Reclaim travel blocks are not tracked in the ledger; only drop a
# boarding / flight entry that the ledger actually holds.
if kind in ledger:
del ledger[kind]
return True
return False
if operation == "forget":
if kind in ledger:
del ledger[kind]
return True
return False
raise ReconcileError(f"unknown planner op {operation!r}")
# --- Top-level orchestration -------------------------------------------------
def _gather_tombstones(
active_set: set[int], *, now: datetime
) -> tuple[dict[int, dict], list[dict]]:
"""Read on-disk flights that left active-flights but still hold a ledger.
These are the switched-away / cancelled / completed flights the per-flight
wake loop can no longer see. A state file with an empty (or absent)
`calendar_events` ledger has nothing to tear down and is ignored — it is
not a tombstone. Returns `(states_by_id, planner_flights)`, the flights
built with `in_active_flights=False` so the disposition resolver routes
them to teardown / record rather than active reconciliation.
"""
states_by_id: dict[int, dict] = {}
flights: list[dict] = []
for flight_id in list_flight_state_ids():
if flight_id in active_set:
continue
state = read_flight_state(flight_id)
if state is None or not state.get("calendar_events"):
continue
states_by_id[flight_id] = state
flights.append(build_planner_flight(state, in_active_flights=False, now=now))
return states_by_id, flights
def _settled_tombstone_ids(
tombstone_flights: list[dict], states_by_id: dict[int, dict]
) -> set[int]:
"""Return the tombstone flight_ids whose teardown has settled — safe to archive.
A switched_away / cancelled / diverted tombstone settles when its ledger
is empty: every teardown delete succeeded (a failed delete keeps its
ledger entry, so the file is retained for the next cycle's retry — never
archived with events still live). A completed tombstone settles
immediately: the planner emits no ops (its managed events stay as a
historical record), and the state file no longer needs to track a flight
that is done and out of active-flights.
"""
settled: set[int] = set()
for flight in tombstone_flights:
flight_id = flight["flight_id"]
ledger = states_by_id[flight_id].get("calendar_events") or {}
if flight["disposition"] == DISPOSITION_COMPLETED or not ledger:
settled.add(flight_id)
return settled
def run_reconcile(client, *, now: datetime) -> dict:
"""Run one calendar reconciliation cycle: active pass + tombstone sweep.
Returns a summary dict: the resolved calendar id, the op counts, the
number of teardown tombstones archived, and any per-op failures
(collected, not raised — a single failed Composio call defers that op to
the next cycle without aborting the rest).
"""
config = read_config() or {}
byair_calendar_id = resolve_byair_calendar_id(client, config)
if byair_calendar_id is None:
return {"status": "no_calendar", "planned": 0, "executed": 0, "failed": []}
active_ids = read_active_flights()
active_set = set(active_ids)
states_by_id: dict[int, dict] = {}
active_flights: list[dict] = []
for flight_id in active_ids:
state = read_flight_state(flight_id)
if state is None:
continue
states_by_id[flight_id] = state
active_flights.append(build_planner_flight(state, in_active_flights=True, now=now))
# Tombstone sweep — flights gone from active-flights that still hold a
# ledger. The per-flight wake loop never visits these, so the reconcile is
# the only place their managed events get torn down (#55).
tombstone_states, tombstone_flights = _gather_tombstones(active_set, now=now)
states_by_id.update(tombstone_states)
planner_flights = active_flights + tombstone_flights
if not planner_flights:
return {
"status": "no_flights",
"byair_calendar_id": byair_calendar_id,
"planned": 0,
"executed": 0,
"failed": [],
"archived": 0,
}
config_ids = {
"byair_calendar_id": byair_calendar_id,
"boarding_calendar_id": byair_calendar_id,
"reclaim_calendar_id": PRIMARY_CALENDAR_ID,
}
# Only the active pass consults live calendar events (boarding match,
# adopt-by-tag, Reclaim same-airport-gap). Teardown ops come purely off the
# ledger, so a tombstone needs no fetch — and may sit far outside the
# active window. Skip the fetch entirely when nothing is active.
events: list[dict] = []
if active_flights:
time_min, time_max = _fetch_window(active_flights)
events = collect_events(
client, byair_calendar_id=byair_calendar_id, time_min=time_min, time_max=time_max
)
ops = plan_reconciliation(planner_flights, events, config_ids)
executed = 0
failed: list[dict] = []
dirty: set[int] = set()
for op in ops:
flight_id = op["flight_id"]
state = states_by_id.get(flight_id)
if state is None:
# Reclaim deletes carry the downstream leg's flight_id; that state
# always exists here (built above), so this guards only against a
# planner change that emits an op for an untracked flight.
continue
ledger = state.setdefault("calendar_events", {})
try:
changed = _apply_op_to_ledger(op, ledger, client)
except (ComposioError, OSError) as exc:
print(
f"flight-assist reconcile: op {op['op']}/{op['kind']} for flight "
f"{flight_id} failed: {exc}",
file=sys.stderr,
)
failed.append({"flight_id": flight_id, "op": op["op"], "kind": op["kind"]})
continue
executed += 1
if changed:
dirty.add(flight_id)
# Archive tombstones whose teardown settled this cycle; persist the rest of
# the dirty set. A settled tombstone is deleted, not written back — its
# ledger is empty (or intentionally-retained completed record), so writing
# it just to delete it is wasted I/O.
archived_ids = _settled_tombstone_ids(tombstone_flights, states_by_id)
for flight_id in dirty - archived_ids:
write_flight_state(states_by_id[flight_id])
for flight_id in archived_ids:
delete_flight_state(flight_id)
return {
"status": "ok",
"byair_calendar_id": byair_calendar_id,
"planned": len(ops),
"executed": executed,
"failed": failed,
"archived": len(archived_ids),
}skills
check-travel-bookings
flight-assist
nightly-travel-sync
sync-tripit