CtrlK
BlogDocsLog inGet started
Tessl Logo

jbaruch/nanoclaw-travel

Travel assistant for NanoClaw: byAir flight notifications (delay, gate, connection risk, inbound aircraft delay, time-to-leave, arrival logistics), travel-booking gap checks, and nightly TripIt sync. Per-chat overlay tile.

75

Quality

93%

Does it follow best practices?

Impact

No eval scenarios have been run

SecuritybySnyk

Advisory

Suggest reviewing before use

Overview
Quality
Evals
Security
Files

calendar_plan.pyskills/flight-assist/

"""Pure reconciliation planner for flight-assist calendar events.

Given the current per-flight state (the `calendar_events` ledger + flight
facts) and a normalized snapshot of what is currently on the relevant
Google Calendars, compute the set of create / update / delete / adopt
operations that converge the calendar to the desired state. The planner
is a pure function: no network, no clock reads, no I/O. The caller
(`reconcile` script, PR3) fetches the events via Composio, runs this
planner, executes the returned ops, and writes the resulting event IDs
back into the ledger.

Everything deterministic lives here so it is unit-testable in CI without
any external service, per `coding-policy: script-delegation`.

byAir is the tile's single flight-data upstream and also the app that
writes the flight events onto the user's writable calendar (per
`rules/flight-data-locality.md`). TripIt's iCal feed is read-only and
never touched. Reclaim layers travel-time blocks on top.

Three classes of managed event, distinguished by which calendar they
live on (classification is by calendar ID, never summary regex):

  - boarding — flight-assist CREATES this (boarding-start -> departure)
    on the byAir calendar. `managed == "created"`; full lifecycle.
  - flight   — byAir CREATES this; flight-assist ADOPTS it by tagging,
    then shifts it delta-only and deletes it on a true cancel/switch.
    `managed == "adopted"`.
  - reclaim_travel — Reclaim CREATES this; flight-assist DELETES the
    bogus ones in a same-airport layover gap (positional rule). Gated on
    BOTH the Reclaim calendar ID and an `is_reclaim_travel` flag, since
    Reclaim's calendar also holds habit / focus / task blocks. Never
    tracked in the ledger (we delete, we do not own).

Ledger entry shape (see `state-schema.md`): per kind, a dict with
`event_id`, `calendar_id`, `managed`, `synced_signature`. The signature
is the `<start>/<end>` instant pair (normalized UTC) flight-assist last
wrote, so a reconcile is a no-op when the live event already matches.

stdlib-only (`datetime`) per `coding-policy: dependency-management`.
"""

from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Event-kind tags stamped into `extendedProperties.private`.
TAG_FLIGHT_ID = "faFlightId"
TAG_KIND = "faKind"
TAG_MANAGED = "faManaged"

KIND_BOARDING = "boarding"
KIND_FLIGHT = "flight"
KIND_RECLAIM_TRAVEL = "reclaim_travel"

MANAGED_CREATED = "created"
MANAGED_ADOPTED = "adopted"

# A flight is "active" and reconciled normally; any other disposition
# tears its managed events down (or leaves them, for a completed flight).
# The caller classifies disposition — it needs wall-clock + active-flights
# membership, which stay out of this pure planner.
DISPOSITION_ACTIVE = "active"
DISPOSITION_CANCELLED = "cancelled"
DISPOSITION_DIVERTED = "diverted"
DISPOSITION_SWITCHED_AWAY = "switched_away"
DISPOSITION_COMPLETED = "completed"

_TEARDOWN_DISPOSITIONS = frozenset(
    {DISPOSITION_CANCELLED, DISPOSITION_DIVERTED, DISPOSITION_SWITCHED_AWAY}
)

# How far apart a byAir-authored flight event's start may be from the
# flight's departure and still be considered the same leg when adopting.
# Wide enough to absorb a delay byAir already baked into the event before
# flight-assist first saw it; narrow enough not to collide with the next
# day's same-numbered flight. Same-code candidates are further tie-broken
# by closest start, so this is an outer bound, not the match precision.
ADOPT_MATCH_TOLERANCE_MINUTES = 360


class PlanError(ValueError):
    """Raised when a flight or event input is missing a field the planner needs.

    A ValueError subclass: the caller's recovery is "pass a well-formed
    input", not "retry". The reconcile script validates Composio output
    into the normalized shape before calling the planner, so a PlanError
    signals a caller bug, not bad calendar data.
    """


def _to_instant(value: str, *, field: str) -> datetime:
    """Parse an RFC 3339 string to a timezone-aware UTC datetime.

    Accepts a trailing `Z` or an explicit offset. Raises PlanError on a
    naive or unparseable value — every time the planner compares must be
    an unambiguous instant, so a missing offset is a hard error rather
    than a silent local-time assumption.
    """
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except (ValueError, AttributeError) as exc:
        raise PlanError(f"{field} is not an RFC 3339 datetime: {value!r}") from exc
    if parsed.tzinfo is None:
        raise PlanError(f"{field} is missing a UTC offset: {value!r}")
    return parsed.astimezone(timezone.utc)


def _signature(start: str, end: str) -> str:
    """Canonical `<start>/<end>` signature in normalized UTC seconds.

    Offset-agnostic: the same instant written with `+00:00` or a local
    offset produces the same signature, so a re-serialization with a
    different-but-equal offset never reads as a change.
    """
    start_utc = _to_instant(start, field="start").strftime("%Y-%m-%dT%H:%M:%SZ")
    end_utc = _to_instant(end, field="end").strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"{start_utc}/{end_utc}"


def _require(mapping: dict, key: str, *, where: str):
    if key not in mapping:
        raise PlanError(f"{where} is missing required field {key!r}")
    return mapping[key]


def _event_signature(event: dict) -> str:
    return _signature(
        _require(event, "start", where=f"event {event.get('event_id')}"),
        _require(event, "end", where=f"event {event.get('event_id')}"),
    )


def _boarding_window(flight: dict) -> tuple[str, str]:
    """Desired boarding block: [departure - lead, departure], in dep's offset.

    Keeps the departure time's original UTC offset so the event a human
    sees is phrased in the airport's local time, not normalized UTC.
    """
    dep_raw = _require(flight, "dep_time", where=f"flight {flight.get('flight_id')}")
    # Parse preserving the original offset (not normalized to UTC) so the
    # written event stays in local airport time.
    dep_local = datetime.fromisoformat(dep_raw.replace("Z", "+00:00"))
    if dep_local.tzinfo is None:
        raise PlanError(f"flight {flight.get('flight_id')} dep_time missing offset: {dep_raw!r}")
    lead = _require(flight, "boarding_lead_minutes", where=f"flight {flight.get('flight_id')}")
    start_local = dep_local - timedelta(minutes=int(lead))
    return start_local.isoformat(), dep_local.isoformat()


def _make_op(*, op, kind, flight_id, calendar_id, reason, event_id=None, body=None, signature=None):
    return {
        "op": op,
        "kind": kind,
        "flight_id": flight_id,
        "calendar_id": calendar_id,
        "event_id": event_id,
        "body": body,
        "signature": signature,
        "reason": reason,
    }


def _boarding_body(flight: dict, start: str, end: str) -> dict:
    return {
        "summary": f"Boarding {flight['code']}",
        "start": start,
        "end": end,
        "private_props": {
            TAG_FLIGHT_ID: str(flight["flight_id"]),
            TAG_KIND: KIND_BOARDING,
            TAG_MANAGED: MANAGED_CREATED,
        },
    }


def _plan_boarding(flight: dict, events_by_id: dict, config: dict) -> list[dict]:
    """Reconcile the flight-assist-created boarding block for one active flight."""
    flight_id = flight["flight_id"]
    cal = _require(config, "boarding_calendar_id", where="config")
    start, end = _boarding_window(flight)
    desired_sig = _signature(start, end)
    body = _boarding_body(flight, start, end)
    ledger = flight.get("calendar_events", {}) or {}
    entry = ledger.get(KIND_BOARDING)

    if not entry:
        return [
            _make_op(
                op="create",
                kind=KIND_BOARDING,
                flight_id=flight_id,
                calendar_id=cal,
                body=body,
                signature=desired_sig,
                reason=f"no boarding block tracked for {flight['code']}; create at {start}",
            )
        ]

    live = events_by_id.get(entry["event_id"])
    if live is None:
        # Tracked but gone from the calendar (deleted out of band) — recreate.
        return [
            _make_op(
                op="create",
                kind=KIND_BOARDING,
                flight_id=flight_id,
                calendar_id=cal,
                body=body,
                signature=desired_sig,
                reason=f"tracked boarding block {entry['event_id']} missing; recreate",
            )
        ]

    if _event_signature(live) == desired_sig and entry.get("synced_signature") == desired_sig:
        return []  # delta-only: live event already matches desired window

    return [
        _make_op(
            op="update",
            kind=KIND_BOARDING,
            flight_id=flight_id,
            calendar_id=entry["calendar_id"],
            event_id=entry["event_id"],
            body=body,
            signature=desired_sig,
            reason=f"shift boarding block for {flight['code']} to {start}",
        )
    ]


def _strip_whitespace(value: str) -> str:
    """Collapse all whitespace out of a string for tolerant code matching.

    A flight event's summary may render the code with a space (`UA 8018`)
    while byAir's `code` field carries it unspaced (`UA8018`), or vice
    versa, so a raw substring match misses. Stripping all whitespace from
    both sides makes the comparison spacing-agnostic.
    """
    return "".join(value.split())


def _match_byair_event(flight: dict, events: list[dict], config: dict) -> dict | None:
    """Find the byAir-authored flight event for this leg, to adopt it.

    Candidates: events on the byAir calendar whose summary contains the
    flight code (whitespace-insensitively) and whose start is within
    ADOPT_MATCH_TOLERANCE_MINUTES of the flight's departure. The closest by
    start wins, so two legs sharing a code on different days do not
    cross-match. Already-tagged events are skipped — adoption keys off the
    ledger after the first match, never by re-matching a moved event.
    """
    byair_cal = _require(config, "byair_calendar_id", where="config")
    dep = _to_instant(
        _require(flight, "dep_time", where=f"flight {flight['flight_id']}"), field="dep_time"
    )
    code = _strip_whitespace(flight["code"])
    best = None
    best_delta = None
    for event in events:
        if event.get("calendar_id") != byair_cal:
            continue
        if code not in _strip_whitespace(event.get("summary") or ""):
            continue
        if (event.get("private_props") or {}).get(TAG_FLIGHT_ID):
            continue  # already tagged/owned — not an adoption candidate
        start = _to_instant(
            _require(event, "start", where=f"event {event.get('event_id')}"), field="start"
        )
        delta = abs((start - dep).total_seconds()) / 60.0
        if delta > ADOPT_MATCH_TOLERANCE_MINUTES:
            continue
        if best_delta is None or delta < best_delta:
            best, best_delta = event, delta
    return best


def _plan_flight_event(
    flight: dict, events: list[dict], events_by_id: dict, config: dict
) -> list[dict]:
    """Adopt and delta-shift the byAir-authored flight event for one active flight."""
    flight_id = flight["flight_id"]
    dep = _require(flight, "dep_time", where=f"flight {flight_id}")
    arr = _require(flight, "arr_time", where=f"flight {flight_id}")
    desired_sig = _signature(dep, arr)
    ledger = flight.get("calendar_events", {}) or {}
    entry = ledger.get(KIND_FLIGHT)

    if not entry:
        match = _match_byair_event(flight, events, config)
        if match is None:
            return []  # byAir has not written the event yet, or it is elsewhere
        return [
            _make_op(
                op="adopt",
                kind=KIND_FLIGHT,
                flight_id=flight_id,
                calendar_id=match["calendar_id"],
                event_id=match["event_id"],
                body={
                    "private_props": {
                        TAG_FLIGHT_ID: str(flight_id),
                        TAG_KIND: KIND_FLIGHT,
                        TAG_MANAGED: MANAGED_ADOPTED,
                    }
                },
                signature=_event_signature(match),
                reason=f"adopt byAir flight event {match['event_id']} for {flight['code']}",
            )
        ]

    live = events_by_id.get(entry["event_id"])
    if live is None:
        # Adopted event vanished from the calendar — byAir owns its
        # existence, so drop our tombstone rather than recreating it.
        return [
            _make_op(
                op="forget",
                kind=KIND_FLIGHT,
                flight_id=flight_id,
                calendar_id=entry["calendar_id"],
                event_id=entry["event_id"],
                reason=f"adopted flight event {entry['event_id']} gone; stop tracking",
            )
        ]

    if _event_signature(live) == desired_sig:
        return []  # byAir already shifted it (or we did) — no-op

    return [
        _make_op(
            op="update",
            kind=KIND_FLIGHT,
            flight_id=flight_id,
            calendar_id=entry["calendar_id"],
            event_id=entry["event_id"],
            body={"start": dep, "end": arr},
            signature=desired_sig,
            reason=f"byAir left {flight['code']} event stale; shift to {dep}",
        )
    ]


def _plan_reclaim_deletions(
    trip_flights: list[dict], events: list[dict], config: dict
) -> list[dict]:
    """Delete bogus Reclaim travel blocks in same-airport layover gaps.

    Positional rule: between leg-N and leg-(N+1) of one trip, a same
    departure/arrival airport means no ground transfer, so a Reclaim
    travel block sitting in that gap is bogus. Different airports mean a
    real inter-airport transfer — keep it. Travel before the first leg
    and after the last leg is never in a gap, so it is never deleted.

    Two guards bound every delete: the event must be on the Reclaim
    calendar AND carry `is_reclaim_travel`. Reclaim's calendar also holds
    habit / focus / task blocks, so calendar membership alone is not a
    safe delete signal — `is_reclaim_travel` is set upstream by the
    reconcile script's Reclaim classifier, and the planner refuses to
    delete an unflagged event. User events on other calendars are never
    candidates at all.
    """
    reclaim_cal = _require(config, "reclaim_calendar_id", where="config")
    legs = sorted(
        trip_flights,
        key=lambda f: _to_instant(
            _require(f, "dep_time", where=f"flight {f.get('flight_id')}"), field="dep_time"
        ),
    )
    ops: list[dict] = []
    for leg_n, leg_next in zip(legs, legs[1:]):
        where_n = f"flight {leg_n.get('flight_id')}"
        where_next = f"flight {leg_next.get('flight_id')}"
        if _require(leg_n, "arr_airport_id", where=where_n) != _require(
            leg_next, "dep_airport_id", where=where_next
        ):
            continue  # different airports -> legitimate transfer, keep
        gap_start = _to_instant(_require(leg_n, "arr_time", where=where_n), field="arr_time")
        gap_end = _to_instant(_require(leg_next, "dep_time", where=where_next), field="dep_time")
        for event in events:
            if event.get("calendar_id") != reclaim_cal:
                continue
            if not event.get("is_reclaim_travel"):
                continue  # Reclaim calendar also holds habit/focus/task blocks — never delete those
            ev_start = _to_instant(
                _require(event, "start", where=f"event {event.get('event_id')}"), field="start"
            )
            ev_end = _to_instant(
                _require(event, "end", where=f"event {event.get('event_id')}"), field="end"
            )
            if ev_start >= gap_start and ev_end <= gap_end:
                ops.append(
                    _make_op(
                        op="delete",
                        kind=KIND_RECLAIM_TRAVEL,
                        flight_id=_require(leg_next, "flight_id", where=where_next),
                        calendar_id=reclaim_cal,
                        event_id=event["event_id"],
                        reason=(
                            f"Reclaim travel block in same-airport layover "
                            f"({_require(leg_n, 'code', where=where_n)} -> "
                            f"{_require(leg_next, 'code', where=where_next)}); delete"
                        ),
                    )
                )
    return ops


def _plan_teardown(flight: dict) -> list[dict]:
    """Delete flight-assist-managed events for a cancelled / switched flight."""
    flight_id = flight["flight_id"]
    ledger = flight.get("calendar_events", {}) or {}
    ops: list[dict] = []

    boarding = ledger.get(KIND_BOARDING)
    if boarding:
        ops.append(
            _make_op(
                op="delete",
                kind=KIND_BOARDING,
                flight_id=flight_id,
                calendar_id=boarding["calendar_id"],
                event_id=boarding["event_id"],
                reason=f"{flight['code']} {flight['disposition']}; delete boarding block",
            )
        )

    adopted = ledger.get(KIND_FLIGHT)
    if adopted:
        # byAir leaves the stale event behind on a switch/cancel; ayeaye
        # takes over and removes it. Only acts on the event it adopted.
        ops.append(
            _make_op(
                op="delete",
                kind=KIND_FLIGHT,
                flight_id=flight_id,
                calendar_id=adopted["calendar_id"],
                event_id=adopted["event_id"],
                reason=f"{flight['code']} {flight['disposition']}; remove stale byAir flight event",
            )
        )
    return ops


def plan_reconciliation(flights: list[dict], events: list[dict], config: dict) -> list[dict]:
    """Compute the calendar reconciliation plan for the given flights.

    Pure function. `flights` are caller-derived per-flight inputs (flight
    facts + the `calendar_events` ledger + a `disposition` + a resolved
    `boarding_lead_minutes`); `events` is the normalized list of current
    calendar events across the byAir and Reclaim calendars; `config`
    carries the calendar IDs only. The boarding-lead policy (aircraft size
    + TATL/TPAC) is resolved upstream by `boarding_lead.py` and arrives
    here as a per-flight integer, so the volatile policy stays out of the
    planner. Returns an ordered op list — see module docstring for op
    shapes. The caller executes them and writes event IDs back to the
    ledger.

    Order: teardown deletes first, then per-active-flight boarding and
    flight-event ops, then per-trip Reclaim deletions. Deterministic given
    deterministic inputs (flights are processed in input order; trips in
    first-seen order).
    """
    events_by_id = {e["event_id"]: e for e in events if e.get("event_id")}
    ops: list[dict] = []

    active: list[dict] = []
    for flight in flights:
        disposition = _require(flight, "disposition", where=f"flight {flight.get('flight_id')}")
        if disposition == DISPOSITION_ACTIVE:
            active.append(flight)
        elif disposition in _TEARDOWN_DISPOSITIONS:
            ops.extend(_plan_teardown(flight))
        # DISPOSITION_COMPLETED: leave managed events as a record; no ops.

    for flight in active:
        ops.extend(_plan_boarding(flight, events_by_id, config))
        ops.extend(_plan_flight_event(flight, events, events_by_id, config))

    # Reclaim deletions are per-trip over active legs (a cancelled leg no
    # longer shapes the layover). Group by trip_id, first-seen order.
    trips: dict[int, list[dict]] = {}
    for flight in active:
        trips.setdefault(
            _require(flight, "trip_id", where=f"flight {flight['flight_id']}"), []
        ).append(flight)
    for trip_flights in trips.values():
        if len(trip_flights) >= 2:
            ops.extend(_plan_reclaim_deletions(trip_flights, events, config))

    return ops

CHANGELOG.md

README.md

tile.json