CtrlK
BlogDocsLog inGet started
Tessl Logo

jbaruch/nanoclaw-flight-assist

Flight notifications via byAir: delay, gate, connection risk, inbound aircraft delay, time-to-leave, arrival logistics. NanoClaw per-chat overlay tile.

69

Quality

87%

Does it follow best practices?

Impact

No eval scenarios have been run

SecuritybySnyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

precheck.pyskills/flight-assist/

#!/usr/bin/env python3
"""Flight-assist precheck script — the scheduler-invoked entry point.

The scheduled task runs this script every ~2 minutes. It reads the
active-flights index, cadence-gates per flight, queries byAir for
flights whose poll-interval has elapsed, runs wake_rules + phase_markers
on each, writes updated state, and emits a single-line JSON payload
on stdout:

    {"wake_agent": <bool>, "data": {"events": [...]}}

When `wake_agent` is true, the scheduler wakes the agent with the
events list as context; the agent composes user notifications.
When false, zero LLM tokens spent. Per `coding-policy:
script-delegation` "Precheck Gating".

The cadence ladder (per `state-schema.md`'s `last_polled_at`
discipline):

    scheduled, T > 6h:                    30 min
    scheduled, 2h < T ≤ 6h:               10 min
    check_in_open / boarding:             2 min
    departed / en_route, T-arr > 30 min:  5 min
    en_route, T-arr ≤ 30 min:             2 min  (catch carousel reveal)
    landed (until acknowledged):          5 min
    cancelled / diverted:                 60 min (visibility only)

The script is the OUTER PROCESS BOUNDARY of the scheduled-task
contract — the scheduler reads non-zero exit OR malformed stdout as
"skip waking the agent this cycle". An unhandled exception escaping
this script would therefore silently disable the contract, so `main`
catches everything and emits a safe-shape payload per `coding-policy:
error-handling` outer-boundary-process-contract carve-out.
"""

from __future__ import annotations

import json
import os
import sys
import traceback
import urllib.error
from datetime import datetime, timedelta, timezone
from pathlib import Path

# Directory containing this script. It is pushed to the front of
# `sys.path` so the bare-name imports that follow (which carry
# `noqa: E402` because they must come after this line) resolve against
# this directory first — presumably the sibling modules shipped in the
# same bundle; confirm against the bundle layout.
_BUNDLE_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(_BUNDLE_DIR))

from byair_client import ByAirClient, ByAirError  # noqa: E402
from connection_risk import (  # noqa: E402
    DEFAULT_MIN_TRANSFER_MINUTES,
    detect_connection_risks,
)
from maps_client import MapsClient, MapsError  # noqa: E402
from phase_markers import (  # noqa: E402
    check_arrival_logistics,
    check_day_before,
    check_time_to_leave,
)
from state import (  # noqa: E402
    read_active_flights,
    read_config,
    read_current_location,
    read_flight_state,
    write_flight_state,
)
from wake_rules import detect_wake_events  # noqa: E402

# Cadence ladder (minutes). Keyed by `computed_status`; the
# scheduled / departed / en_route bins split further on
# time-to-departure or time-to-arrival inside `_interval_for`, which
# also falls back to 30 minutes for any status not listed here.
_BASE_CADENCE_MINUTES = {
    "scheduled": 30,
    "check_in_open": 2,
    "boarding": 2,
    "departed": 5,
    "en_route": 5,
    "landed": 5,
    "cancelled": 60,
    "diverted": 60,
}

# Window (hours before scheduled departure) inside which
# `_maybe_query_travel_time` queries maps_client for travel time.
# Earlier than this the query is skipped; the time-to-leave marker has
# either fired or doesn't matter yet.
_TIME_TO_LEAVE_QUERY_WINDOW_HOURS = 6

# Maximum age of a `current-location.json` snapshot to be considered
# fresh enough to use as the time-to-leave origin. Older than this and
# the precheck falls back to `home_address`, on the principle that a
# stale location guess is worse than the user's static home base.
# Issue #18 suggested 15–30 min; 30 is the more permissive value and
# matches the orchestrator's typical location-write cadence on
# Telegram live-location sharing.
_MAX_CURRENT_LOCATION_AGE_MINUTES = 30


def main() -> int:
    """Run one precheck cycle and print the contract JSON; always return 0.

    outer-boundary-process-contract: this function is the outermost
    process boundary of the scheduled task. The scheduler treats a
    non-zero exit or invalid stdout as "skip waking the agent", so any
    exception — including one raised while emitting the success payload —
    is caught here, its traceback goes to stderr, and a safe-shape
    `wake_agent: false` payload is printed instead.
    """
    try:
        wake_events = _run_cycle(now_utc=datetime.now(timezone.utc))
        _emit({"wake_agent": bool(wake_events), "data": {"events": wake_events}})
    except Exception:  # noqa: BLE001 — outer-boundary-process-contract
        traceback.print_exc(file=sys.stderr)
        _emit({"wake_agent": False, "data": {"error": "precheck_exception"}})
    # Exit 0 on both paths: the failure path has already printed a
    # well-formed "no wake" payload for the scheduler to read.
    return 0


def _emit(payload: dict) -> None:
    """Print `payload` to stdout as one compact JSON line.

    Serialisation happens before anything is printed, so a payload that
    cannot be encoded raises without leaving partial output on stdout.
    """
    compact_separators = (",", ":")
    line = json.dumps(payload, separators=compact_separators)
    print(line)


def _run_cycle(*, now_utc: datetime) -> list[dict]:
    """Run a single precheck cycle and return every wake event it produced.

    The clock is passed in as `now_utc` rather than read here so tests
    can pin it without monkey-patching the `datetime` module (which
    breaks `fromisoformat()` deeper in the module). Production callers
    pass `datetime.now(timezone.utc)`.

    Returns a list of `{"flight_id": ..., "event": ...}` dicts: first the
    per-flight events in active-flights order, then the cross-flight
    connection-risk events.
    """
    flight_ids = read_active_flights()
    config = read_config() or {}
    min_transfer_minutes = _resolve_min_transfer_minutes(config)

    # The time-to-leave origin is resolved exactly once per cycle. The
    # host-orchestrator-owned `current-location.json` may be rewritten
    # while the cycle is running; re-reading it per flight would let two
    # flights in the same cycle disagree about where the user is. Per
    # Copilot review on `jbaruch/nanoclaw-flight-assist#19`.
    cycle_origin = _resolve_time_to_leave_origin(
        home_address=config.get("home_address"),
        now_utc=now_utc,
    )

    byair = ByAirClient.from_env()
    maps = _maybe_maps_client()  # None when GOOGLE_MAPS_API_KEY unset

    collected: list[dict] = []
    # Per `coding-policy: stateful-artifacts`, on-disk state is only a
    # last-seen snapshot. Two kinds of flight are kept out of the
    # cross-flight pass below:
    #
    # - `gone_upstream`: byAir answered "not_found", so the stored
    #   snapshot is known to be wrong and must not feed a derived alert.
    # - `unverified`: a poll was attempted this cycle and failed (other
    #   byAir error or URLError), so the snapshot could not be verified
    #   against the live source and is not acted on.
    #
    # Flights that simply were not due this cycle stay eligible; their
    # snapshot is inside the cadence ladder's staleness budget.
    gone_upstream: set[int] = set()
    unverified: set[int] = set()
    for flight_id in flight_ids:
        try:
            per_flight = _process_flight(
                flight_id=flight_id,
                now_utc=now_utc,
                byair=byair,
                maps=maps,
                time_to_leave_origin=cycle_origin,
            )
        except ByAirError as byair_err:
            if byair_err.error_type == "not_found":
                # The flight is no longer tracked upstream: report it.
                collected.append(
                    {"flight_id": flight_id, "event": {"reason": "removed_upstream"}}
                )
                gone_upstream.add(flight_id)
            else:
                # Any other byAir failure: log and skip this flight for
                # this cycle. `_process_flight` did not reach its state
                # write, so `last_polled_at` is unchanged and the flight
                # is retried next cycle.
                print(
                    f"flight-assist precheck: byair error for flight {flight_id}: {byair_err}",
                    file=sys.stderr,
                )
                unverified.add(flight_id)
        except urllib.error.URLError as transport_err:
            # Transient transport failure (network, DNS, byAir down).
            # Only this flight's poll is degraded; the remaining flights
            # still run instead of the whole cycle collapsing into the
            # outer catch. Per `coding-policy: error-handling` "Specific
            # Exceptions" + "Graceful Fallback".
            print(
                f"flight-assist precheck: transport error for flight {flight_id}: {transport_err}",
                file=sys.stderr,
            )
            unverified.add(flight_id)
        else:
            collected.extend(
                {"flight_id": flight_id, "event": flight_event}
                for flight_event in per_flight
            )

    # Connection-risk pass over the freshly written on-disk state,
    # restricted to flights whose snapshot is neither known-stale nor
    # unverified this cycle.
    excluded = gone_upstream | unverified
    eligible_ids = [fid for fid in flight_ids if fid not in excluded]
    collected += _check_connection_risks(
        active_flight_ids=eligible_ids,
        now_utc=now_utc,
        min_transfer_minutes=min_transfer_minutes,
    )
    return collected


def _check_connection_risks(
    *,
    active_flight_ids: list[int],
    now_utc: datetime,
    min_transfer_minutes: int,
) -> list[dict]:
    """Detect cross-flight connection risks and record that they fired.

    Loads the stored state of every id in `active_flight_ids` (ids with
    no stored state are skipped), hands the states to
    `detect_connection_risks`, and for each `(leg2_flight_id, event)` it
    returns sets `connection_at_risk_fired` on that flight's
    `phase_markers` and writes the state back.

    Returns the fired events as `{"flight_id": ..., "event": ...}` dicts,
    ready to be merged into the cycle's aggregated output.
    """
    loaded_states: list[dict] = [
        stored
        for stored in map(read_flight_state, active_flight_ids)
        if stored is not None
    ]
    detected = detect_connection_risks(
        flight_states=loaded_states,
        now_utc=now_utc,
        min_transfer_minutes=min_transfer_minutes,
    )
    by_flight_id = {stored["flight_id"]: stored for stored in loaded_states}

    fired_events: list[dict] = []
    for leg2_flight_id, event in detected:
        target = by_flight_id.get(leg2_flight_id)
        if target is not None:
            # Persist the marker before reporting the event.
            target["phase_markers"]["connection_at_risk_fired"] = True
            write_flight_state(target)
            fired_events.append({"flight_id": leg2_flight_id, "event": event})
    return fired_events


def _resolve_min_transfer_minutes(config: dict) -> int:
    """Pick a usable `min_transfer_minutes` out of `config`.

    A non-negative `int` (booleans excluded) is returned as-is. A missing
    or `None` value silently yields `DEFAULT_MIN_TRANSFER_MINUTES`. Any
    other value — e.g. the string `"45"`, `True`, or a negative number in
    a hand-edited config — also yields the default, after a diagnostic is
    written to stderr so the operator can see the bad config.

    Validating here keeps a corrupt config from reaching
    `detect_connection_risks`, where the resulting error would be
    swallowed by the outer-boundary catch and suppress the whole cycle.
    """
    value = config.get("min_transfer_minutes")
    usable = isinstance(value, int) and not isinstance(value, bool) and value >= 0
    if usable:
        return value
    if value is not None:
        print(
            f"flight-assist precheck: config.json:min_transfer_minutes is "
            f"{type(value).__name__} {value!r}, expected non-negative int — "
            f"falling back to {DEFAULT_MIN_TRANSFER_MINUTES}",
            file=sys.stderr,
        )
    return DEFAULT_MIN_TRANSFER_MINUTES


def _maybe_maps_client() -> MapsClient | None:
    """Build a MapsClient from the environment, or return None without a key.

    `GOOGLE_MAPS_API_KEY` being unset or empty yields None; the caller
    then skips travel-time queries, so the time-to-leave event cannot
    fire while the rest of the precheck runs normally.
    """
    api_key = os.environ.get("GOOGLE_MAPS_API_KEY")
    return MapsClient.from_env() if api_key else None


def _process_flight(
    *,
    flight_id: int,
    now_utc: datetime,
    byair: ByAirClient,
    maps: MapsClient | None,
    time_to_leave_origin: str | None,
) -> list[dict]:
    """Poll one flight if it is due and return the events that fired.

    Steps: load the stored state, stop with `[]` if the cadence gate says
    the flight is not due, fetch the flight from byAir, diff the trimmed
    snapshot against the stored one, evaluate the three time-based phase
    markers, then write the updated state.

    `time_to_leave_origin` is resolved once per cycle by `_run_cycle` and
    passed in, so all flights in a cycle share one view of the user's
    location even if `current-location.json` changes mid-cycle.

    Exceptions raised by `byair.get_flight` are not caught here; they
    propagate to the caller before any state is written.
    """
    prior_state = read_flight_state(flight_id)
    if prior_state:
        prior_snapshot = prior_state.get("last_snapshot")
        phase_markers = prior_state.get("phase_markers")
    else:
        prior_snapshot = None
        phase_markers = None
    if phase_markers is None:
        phase_markers = _initial_phase_markers()

    if not _due_for_poll(prior_state, now_utc):
        return []

    raw_flight = byair.get_flight(flight_id=flight_id)
    new_snapshot = _trim_to_snapshot(raw_flight)

    # Delta-driven events (wake_rules) come first in the output.
    events: list[dict] = list(detect_wake_events(prior_snapshot, new_snapshot))

    # Schedule times come from stored state once a flight has one; the
    # byAir payload is only consulted the first time a flight is seen.
    if prior_state:
        scheduled_dep_time = prior_state["scheduled_dep_time"]
        scheduled_arr_time = prior_state["scheduled_arr_time"]
    else:
        scheduled_dep_time = raw_flight.get("scheduledDepTime")
        scheduled_arr_time = raw_flight.get("scheduledArrTime")

    # Time-based events (phase_markers), each latched by its own flag.
    did_fire, marker_event = check_day_before(
        scheduled_dep_time=scheduled_dep_time,
        phase_markers=phase_markers,
        now_utc=now_utc,
    )
    if did_fire:
        phase_markers["day_before_fired"] = True
        events.append(marker_event)

    already_told_to_leave = phase_markers.get("time_to_leave_fired", False)
    travel_time_seconds = _maybe_query_travel_time(
        maps=maps,
        origin=time_to_leave_origin,
        raw_flight=raw_flight,
        scheduled_dep_time=scheduled_dep_time,
        now_utc=now_utc,
        time_to_leave_already_fired=already_told_to_leave,
    )
    did_fire, marker_event = check_time_to_leave(
        scheduled_dep_time=scheduled_dep_time,
        travel_time_seconds=travel_time_seconds,
        phase_markers=phase_markers,
        now_utc=now_utc,
    )
    if did_fire:
        phase_markers["time_to_leave_fired"] = True
        events.append(marker_event)

    did_fire, marker_event = check_arrival_logistics(
        scheduled_arr_time=scheduled_arr_time,
        phase_markers=phase_markers,
        now_utc=now_utc,
    )
    if did_fire:
        phase_markers["arrival_logistics_fired"] = True
        events.append(marker_event)

    # Persist. `write_flight_state` needs every documented required
    # field, so a flight first seen this cycle is seeded from the byAir
    # payload inside `_build_flight_state`.
    write_flight_state(
        _build_flight_state(
            flight_id=flight_id,
            prior_state=prior_state,
            raw_flight=raw_flight,
            new_snapshot=new_snapshot,
            phase_markers=phase_markers,
            now_utc=now_utc,
        )
    )
    return events


def _due_for_poll(prior_state: dict | None, now_utc: datetime) -> bool:
    """Return True when the cadence-ladder interval has elapsed since last poll.

    Args:
        prior_state: The stored flight-state record, or None when the
            flight has never been processed.
        now_utc: The cycle's timezone-aware "now".

    Returns:
        True when there is no prior state, when `last_polled_at` is
        missing or unparseable, or when at least the interval chosen by
        `_interval_for` has passed since `last_polled_at`.
    """
    if prior_state is None:
        return True  # first cycle for this flight
    last_polled = _parse_iso8601(prior_state.get("last_polled_at"))
    if last_polled is None:
        return True
    snapshot = prior_state.get("last_snapshot") or {}
    # `_trim_to_snapshot` always writes the `computed_status` key, with
    # None when byAir omitted it, so a `.get(key, default)` default would
    # never apply. `or` treats a stored None the same as a missing key:
    # the flight is handled as "scheduled" and still gets the tighter
    # cadence as departure approaches.
    status = snapshot.get("computed_status") or "scheduled"
    scheduled_dep = _parse_iso8601(prior_state.get("scheduled_dep_time"))
    scheduled_arr = _parse_iso8601(prior_state.get("scheduled_arr_time"))
    interval_minutes = _interval_for(status, now_utc, scheduled_dep, scheduled_arr)
    return now_utc - last_polled >= timedelta(minutes=interval_minutes)


def _interval_for(
    status: str,
    now_utc: datetime,
    scheduled_dep: datetime | None,
    scheduled_arr: datetime | None,
) -> int:
    """Choose the poll interval, in minutes, for a flight's status and timing.

    - `scheduled` with a known departure: 10 when departure is at most
      6 hours away (including already past), otherwise 30.
    - `en_route` / `departed` with a known arrival at most 30 minutes
      away (including already past): 2.
    - Everything else: the `_BASE_CADENCE_MINUTES` entry for `status`,
      or 30 when the status is not in the table.
    """
    fallback = _BASE_CADENCE_MINUTES.get(status, 30)

    if status == "scheduled" and scheduled_dep is not None:
        departure_is_near = scheduled_dep - now_utc <= timedelta(hours=6)
        return 10 if departure_is_near else 30

    airborne = status in ("en_route", "departed")
    if airborne and scheduled_arr is not None:
        if scheduled_arr - now_utc <= timedelta(minutes=30):
            return 2  # catch carousel reveal
    return fallback


def _resolve_time_to_leave_origin(
    *,
    home_address: str | None,
    now_utc: datetime,
) -> str | None:
    """Origin-resolution ladder for the time-to-leave Distance Matrix query.

    Order of preference:

    1. `current-location.json` (orchestrator-written) if present, complete
       and fresh — `0 <= now_utc - captured_at <= _MAX_CURRENT_LOCATION_AGE_MINUTES`.
       Formatted as `"<lat>,<lng>"` for the Distance Matrix API, which
       accepts numeric origin/destination pairs natively.
    2. `home_address` from `config.json` (the legacy single-source).
    3. `None` — neither origin available; caller skips the maps query.

    A snapshot with a missing or unparseable `captured_at`, a
    `captured_at` in the future, or a missing `latitude` / `longitude`
    is treated like an absent one and falls through to `home_address`.
    The file is written by another process, so an incomplete snapshot
    must degrade to the fallback rather than raise `KeyError` and take
    the whole cycle down through the outer-boundary catch.

    A mobile user (constant traveler) gets correct travel times when
    the orchestrator is publishing live location; the static home
    address remains the fallback for users who never share location
    and for the gap between location updates. Issue
    `jbaruch/nanoclaw-flight-assist#18`.
    """
    loc = read_current_location()
    if loc is None:
        return home_address

    captured = _parse_iso8601(loc.get("captured_at"))
    if captured is None:
        return home_address

    age = now_utc - captured
    max_age = timedelta(minutes=_MAX_CURRENT_LOCATION_AGE_MINUTES)
    # A negative age means `captured_at` lies in the future (clock skew
    # or bad data); such a snapshot is not trusted either.
    if not timedelta() <= age <= max_age:
        return home_address

    latitude = loc.get("latitude")
    longitude = loc.get("longitude")
    if latitude is None or longitude is None:
        return home_address
    return f"{latitude},{longitude}"


def _maybe_query_travel_time(
    *,
    maps: MapsClient | None,
    origin: str | None,
    raw_flight: dict,
    scheduled_dep_time: str | None,
    now_utc: datetime,
    time_to_leave_already_fired: bool,
) -> int | None:
    """Query maps_client for travel time, only when time-to-leave is close.

    Returns the travel time in seconds (`in_traffic_seconds` when it is
    truthy, else `duration_seconds`), or None when:
    - MapsClient is None (key unset)
    - `origin` is None or empty (origin-resolution yielded nothing —
      neither a fresh `current-location.json` snapshot nor a configured
      `home_address` was available at cycle-start)
    - time_to_leave has already fired (no need to re-query)
    - `scheduled_dep_time` is missing or unparseable
    - Scheduled departure is more than _TIME_TO_LEAVE_QUERY_WINDOW_HOURS away
    - the payload has no usable departure airport (`depAirport` missing,
      null, or without a `name` / `code`)
    - the maps query fails with `MapsError` or `urllib.error.URLError`
      (logged to stderr; time_to_leave defers to the next cycle)

    `origin` is resolved once per cycle in `_run_cycle` and passed
    through; this function does NOT re-read `current-location.json`,
    so every flight in the same cycle agrees on the user's location.
    """
    if maps is None or time_to_leave_already_fired or not origin:
        return None
    dep_dt = _parse_iso8601(scheduled_dep_time)
    if dep_dt is None:
        return None
    if dep_dt - now_utc > timedelta(hours=_TIME_TO_LEAVE_QUERY_WINDOW_HOURS):
        return None
    # `or {}` rather than a `.get` default: a payload carrying an
    # explicit `"depAirport": null` would otherwise hand back None and
    # the next `.get` would raise AttributeError.
    dep_airport = raw_flight.get("depAirport") or {}
    destination = dep_airport.get("name") or dep_airport.get("code") or ""
    if not destination:
        return None
    try:
        result = maps.travel_time(origin=origin, destination=destination)
    except MapsError as maps_err:
        print(
            f"flight-assist precheck: maps error for flight: {maps_err}",
            file=sys.stderr,
        )
        return None
    except urllib.error.URLError as transport_err:
        # Transient transport failure to Google (network, DNS, API down).
        # Degrade just this maps query — return None so time_to_leave
        # defers to the next cycle. Per `coding-policy: error-handling`
        # "Specific Exceptions" + "Graceful Fallback".
        print(
            f"flight-assist precheck: maps transport error: {transport_err}",
            file=sys.stderr,
        )
        return None
    return result.in_traffic_seconds or result.duration_seconds


def _trim_to_snapshot(raw_flight: dict) -> dict:
    """Filter the ~13KB byAir flight payload to the ~1KB operational slice.

    Matches the last_snapshot shape in state-schema.md. Every key of the
    result is always present; fields absent from the payload come back
    as None.

    The nested `inbound`, `position` and `position.currentPosition`
    objects are each read with `or {}`, so a payload that carries them as
    explicit nulls is handled the same as one that omits them instead of
    raising AttributeError.
    """
    inbound_raw = raw_flight.get("inbound") or {}
    inbound = {
        "aircraft_model": inbound_raw.get("aircraft_model"),
        "registration": inbound_raw.get("registration"),
        "flew": inbound_raw.get("flew"),
        "predicted_delay_minutes": _extract_predicted_delay_minutes(inbound_raw),
    }
    position_raw = raw_flight.get("position") or {}
    position = position_raw.get("currentPosition") or {}
    return {
        "code": raw_flight.get("code"),
        "computed_status": raw_flight.get("computed_status"),
        "computed_status_detail": raw_flight.get("computed_status_detail"),
        "computed_phase_progress": raw_flight.get("computed_phase_progress"),
        "computed_phase_risk": raw_flight.get("computed_phase_risk"),
        "computed_phase_overdue": raw_flight.get("computed_phase_overdue"),
        "dep_gate": raw_flight.get("depGate"),
        "arr_gate": raw_flight.get("arrGate"),
        "dep_terminal": raw_flight.get("depTerminal"),
        "arr_terminal": raw_flight.get("arrTerminal"),
        "dep_time": raw_flight.get("depTime"),
        "arr_time": raw_flight.get("arrTime"),
        "baggage": raw_flight.get("baggage"),
        "inbound": inbound,
        "position_lat": position.get("lat"),
        "position_lon": position.get("lon"),
    }


def _extract_predicted_delay_minutes(inbound: dict) -> int | None:
    """Return `inbound["predicted_delay"]["delay_minutes"]` when it is an int.

    Yields None when `predicted_delay` is absent or not a dict, or when
    `delay_minutes` is absent, not an `int`, or a `bool`.
    """
    predicted = inbound.get("predicted_delay")
    minutes = predicted.get("delay_minutes") if isinstance(predicted, dict) else None
    if isinstance(minutes, int) and not isinstance(minutes, bool):
        return minutes
    return None


def _build_flight_state(
    *,
    flight_id: int,
    prior_state: dict | None,
    raw_flight: dict,
    new_snapshot: dict,
    phase_markers: dict,
    now_utc: datetime,
) -> dict:
    """Construct a complete flight-state record per state-schema.md.

    Schedule times, ownership, trip id and airport ids are carried over
    from `prior_state` when one exists; otherwise they are seeded from
    the byAir payload (`ownership` defaults to "mine", the ids to 0).
    `code`, `last_snapshot`, `phase_markers` and `last_polled_at` always
    reflect the current poll, with `last_polled_at` rendered as
    `now_utc` in ISO-8601 with a `Z` suffix in place of `+00:00`.

    `last_wake_at` / `last_wake_reason` are copied from `prior_state`
    and are None for a new flight or when the prior record lacks them.
    Null `depAirport` / `arrAirport` objects in the payload are treated
    as absent.

    Raises:
        KeyError: if `prior_state` lacks one of the carried-over fields,
            or if a new flight's payload lacks `scheduledDepTime` /
            `scheduledArrTime`.
    """
    if prior_state is not None:
        scheduled_dep_time = prior_state["scheduled_dep_time"]
        scheduled_arr_time = prior_state["scheduled_arr_time"]
        ownership = prior_state["ownership"]
        trip_id = prior_state["trip_id"]
        dep_airport_id = prior_state["dep_airport_id"]
        arr_airport_id = prior_state["arr_airport_id"]
        # The wake fields are not written by this script's first-seen
        # path as anything but None, so tolerate records that omit them.
        last_wake_at = prior_state.get("last_wake_at")
        last_wake_reason = prior_state.get("last_wake_reason")
    else:
        scheduled_dep_time = raw_flight["scheduledDepTime"]
        scheduled_arr_time = raw_flight["scheduledArrTime"]
        ownership = raw_flight.get("ownership", "mine")
        trip_id = raw_flight.get("trip_id") or raw_flight.get("tripId") or 0
        # `or {}` so an explicit JSON null airport does not raise
        # AttributeError on the chained `.get`.
        dep_airport_id = (raw_flight.get("depAirport") or {}).get("id", 0)
        arr_airport_id = (raw_flight.get("arrAirport") or {}).get("id", 0)
        last_wake_at = None
        last_wake_reason = None
    return {
        "flight_id": flight_id,
        "code": raw_flight.get("code", ""),
        "ownership": ownership,
        "trip_id": trip_id,
        "scheduled_dep_time": scheduled_dep_time,
        "scheduled_arr_time": scheduled_arr_time,
        "dep_airport_id": dep_airport_id,
        "arr_airport_id": arr_airport_id,
        "last_polled_at": now_utc.isoformat().replace("+00:00", "Z"),
        "last_snapshot": new_snapshot,
        "phase_markers": phase_markers,
        "last_wake_at": last_wake_at,
        "last_wake_reason": last_wake_reason,
    }


def _initial_phase_markers() -> dict:
    """Return a new phase-marker dict with every marker set to False.

    A fresh dict is built on each call, so callers may mutate the result
    freely.
    """
    marker_names = (
        "day_before_fired",
        "time_to_leave_fired",
        "boarding_fired",
        "arrival_logistics_fired",
        "landed_acknowledged",
        "connection_at_risk_fired",
    )
    return dict.fromkeys(marker_names, False)


def _parse_iso8601(value: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    A trailing `Z` is accepted by rewriting it to `+00:00` before
    handing the string to `datetime.fromisoformat`. A timestamp with no
    offset is taken to be UTC.

    Returns None — never raises — when `value` is None, empty, not a
    string, or not parseable. The inputs come from on-disk JSON that can
    be hand-edited or written by another process, so a stray number or
    object in a timestamp field must read as "no timestamp" rather than
    raise AttributeError and abort the whole cycle.
    """
    if not isinstance(value, str) or not value:
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


# Script entry point: the process exit status is whatever `main` returns.
if __name__ == "__main__":
    raise SystemExit(main())

README.md

tile.json