CtrlK
BlogDocsLog inGet started
Tessl Logo

jbaruch/nanoclaw-flight-assist

Flight notifications via byAir: delay, gate, connection risk, inbound aircraft delay, time-to-leave, arrival logistics. NanoClaw per-chat overlay tile.

69

Quality

87%

Does it follow best practices?

Impact

No eval scenarios have been run

SecuritybySnyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

sync_tripit.pyskills/flight-assist/

#!/usr/bin/env python3
"""Daily sync of tracked flights from byAir into the active-flights index.

The precheck polls flights in `active-flights.json`. This script
refreshes that index from byAir's `list_trips` so newly-tracked
flights start getting polled and expired/removed flights stop.

Scheduled-task contract (same as precheck.py): emits a single-line
JSON payload `{"wake_agent": <bool>, "data": {...}}`. wake_agent is
true when the diff produced added/removed flights worth telling the
agent about; false otherwise. Per `coding-policy: script-delegation`
"Precheck Gating".

Uses the outer-boundary-process-contract carve-out for unexpected
exceptions per `coding-policy: error-handling`.

Run cadence: daily at ~04:00 local. The precheck script also calls
into sync logic opportunistically when it encounters a flight_id
not in state — both paths share `_reconcile_active_flights()`.

stdlib-only: `json`, `sys`, `traceback` per `coding-policy:
dependency-management`.
"""

from __future__ import annotations

import json
import sys
import traceback
import urllib.error
from datetime import datetime, timezone
from pathlib import Path

_BUNDLE_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(_BUNDLE_DIR))

from byair_client import ByAirClient, ByAirError  # noqa: E402
from state import (  # noqa: E402
    delete_flight_state,
    read_active_flights,
    read_flight_state,
    write_active_flights,
    write_flight_state,
)


def main() -> int:
    # outer-boundary-process-contract: same shape as precheck.py.
    # The scheduled task reads non-zero exit OR invalid stdout as
    # "skip waking the agent". A bare programming bug bubbling out
    # would silently disable the wake contract for every subsequent
    # run; catch it here, emit safe-shape JSON, return 0.
    try:
        diff = _run_sync(now_utc=datetime.now(timezone.utc))
        _emit_diff(diff)
        return 0
    except Exception:  # noqa: BLE001 — outer-boundary-process-contract
        traceback.print_exc(file=sys.stderr)
        _emit_diff({"added": [], "removed": [], "error": "sync_exception"})
        return 0


def _emit_diff(diff: dict) -> None:
    """Write the sync-contract JSON to stdout (single line).

    Wraps the added/removed lists into the same `data.events` shape
    the precheck script emits, so SKILL.md Step 3's composition
    table is the single consumer contract — no separate sync-wake
    payload to document. Per `coding-policy: script-delegation`
    "Precheck Gating": data must carry the inputs the agent needs.
    """
    events = []
    for entry in diff.get("added", []) or []:
        events.append(
            {
                "flight_id": entry["flight_id"],
                "event": {
                    "reason": "tracked_flight_added",
                    "code": entry.get("code"),
                    "scheduled_dep_time": entry.get("scheduled_dep_time"),
                    "scheduled_arr_time": entry.get("scheduled_arr_time"),
                },
            }
        )
    for entry in diff.get("removed", []) or []:
        events.append(
            {
                "flight_id": entry["flight_id"],
                "event": {
                    "reason": "tracked_flight_removed",
                    "code": entry.get("code"),
                    "scheduled_dep_time": entry.get("scheduled_dep_time"),
                    "scheduled_arr_time": entry.get("scheduled_arr_time"),
                },
            }
        )
    has_events = bool(events)
    payload = {"wake_agent": has_events, "data": {"events": events}}
    if "error" in diff:
        payload["data"]["error"] = diff["error"]
    print(json.dumps(payload, separators=(",", ":")))


def _run_sync(*, now_utc: datetime) -> dict:
    """Execute one sync pass, returning the {added, removed} diff."""
    byair = ByAirClient.from_env()
    try:
        trips_payload = byair.list_trips(status="active")
    except urllib.error.URLError as transport_err:
        # Transport failure: skip this sync pass. The next scheduled
        # run will retry. Per `coding-policy: error-handling`
        # "Specific Exceptions" + "Graceful Fallback".
        print(
            f"flight-assist sync: transport error: {transport_err}",
            file=sys.stderr,
        )
        return {"added": [], "removed": [], "error": "transport_error"}
    except ByAirError as byair_err:
        print(
            f"flight-assist sync: byair error: {byair_err}",
            file=sys.stderr,
        )
        return {"added": [], "removed": [], "error": byair_err.error_type}

    upstream_flights = _extract_flights(trips_payload)
    return _reconcile_active_flights(upstream_flights, now_utc=now_utc)


def _extract_flights(trips_payload: dict) -> list[dict]:
    """Pull every flight dict out of the trips response into a flat list."""
    flights: list[dict] = []
    for trip in trips_payload.get("trips", []) or []:
        for flight in trip.get("flights", []) or []:
            # Carry the trip_id into the flight dict so the writer has it.
            flight = {**flight, "_trip_id": trip.get("id")}
            flights.append(flight)
    return flights


def _reconcile_active_flights(upstream_flights: list[dict], *, now_utc: datetime) -> dict:
    """Diff the upstream list against the on-disk active-flights index.

    Returns `{added: [flight_id, ...], removed: [flight_id, ...]}`.

    For each `added` flight, writes an initial per-flight state record
    so the next precheck cycle has scheduled times to work with.
    For each `removed` flight, deletes its state file.
    """
    upstream_ids = {
        flight["id"] for flight in upstream_flights if isinstance(flight.get("id"), int)
    }
    current_ids = set(read_active_flights())

    added_ids = upstream_ids - current_ids
    removed_ids = current_ids - upstream_ids

    upstream_by_id = {
        flight["id"]: flight for flight in upstream_flights if isinstance(flight.get("id"), int)
    }

    # Build added entries with the upstream flight metadata.
    added: list[dict] = []
    for flight_id in sorted(added_ids):
        flight = upstream_by_id[flight_id]
        write_flight_state(_initial_state(flight, now_utc=now_utc))
        added.append(
            {
                "flight_id": flight_id,
                "code": flight.get("code"),
                "scheduled_dep_time": flight.get("scheduledDepTime"),
                "scheduled_arr_time": flight.get("scheduledArrTime"),
            }
        )

    # Capture removed-flight metadata BEFORE deleting state (the
    # notification template in SKILL.md needs `code` + scheduled times
    # to compose "Flight <code> stopped tracking...").
    removed: list[dict] = []
    for flight_id in sorted(removed_ids):
        prior = read_flight_state(flight_id)
        removed.append(
            {
                "flight_id": flight_id,
                "code": prior.get("code") if prior else None,
                "scheduled_dep_time": prior.get("scheduled_dep_time") if prior else None,
                "scheduled_arr_time": prior.get("scheduled_arr_time") if prior else None,
            }
        )
        delete_flight_state(flight_id)

    # Persist the new active-flights index. read_active_flights returns
    # sorted/preserved order; here we sort for determinism.
    write_active_flights(sorted(upstream_ids))

    return {"added": added, "removed": removed}


def _initial_state(flight: dict, *, now_utc: datetime) -> dict:
    """Build the initial state record for a newly-tracked flight.

    All required fields per state-schema.md must be present so
    `write_flight_state`'s validator accepts it. `last_snapshot` is
    None on first write — the next precheck cycle populates it.
    """
    dep_airport = flight.get("depAirport") or {}
    arr_airport = flight.get("arrAirport") or {}
    return {
        "flight_id": flight["id"],
        "code": flight.get("code", ""),
        "ownership": flight.get("ownership", "mine"),
        "trip_id": flight.get("_trip_id") or 0,
        "scheduled_dep_time": flight.get("scheduledDepTime", ""),
        "scheduled_arr_time": flight.get("scheduledArrTime", ""),
        "dep_airport_id": dep_airport.get("id", 0),
        "arr_airport_id": arr_airport.get("id", 0),
        "last_polled_at": now_utc.isoformat().replace("+00:00", "Z"),
        "last_snapshot": None,
        "phase_markers": {
            "day_before_fired": False,
            "time_to_leave_fired": False,
            "boarding_fired": False,
            "arrival_logistics_fired": False,
            "landed_acknowledged": False,
            "connection_at_risk_fired": False,
        },
        "last_wake_at": None,
        "last_wake_reason": None,
    }


# Public entry point for the precheck to call when it discovers a
# flight_id not on the active-flights index. The precheck doesn't
# need the full sync — just the per-flight state initialization.
def initialize_flight_from_byair(*, flight: dict, now_utc: datetime) -> None:
    """Write the initial state record for a flight first seen via the precheck.

    Tolerates either `id` (byair_get_flight raw shape) or `flight_id`
    (precheck's internal shape) as the integer identifier. Normalizes
    to `id` before calling `_initial_state` so the downstream lookup
    doesn't KeyError on flight_id-only payloads.
    """
    flight_id = flight.get("id")
    if not isinstance(flight_id, int) or isinstance(flight_id, bool):
        flight_id = flight.get("flight_id")
        if not isinstance(flight_id, int) or isinstance(flight_id, bool):
            return
    normalized = {**flight, "id": flight_id}
    write_flight_state(_initial_state(normalized, now_utc=now_utc))


# Provide read_flight_state as a re-export so callers can do the
# "is this already known?" check without importing state directly.
__all__ = [
    "initialize_flight_from_byair",
    "read_flight_state",
]


if __name__ == "__main__":
    sys.exit(main())

README.md

tile.json