Flight notifications via byAir: delay, gate, connection risk, inbound aircraft delay, time-to-leave, arrival logistics. NanoClaw per-chat overlay tile.
69
87%
Does it follow best practices?
Impact
—
No eval scenarios have been run
Passed
No known issues
#!/usr/bin/env python3
"""Flight-assist precheck script — the scheduler-invoked entry point.
The scheduled task runs this script every ~2 minutes. It reads the
active-flights index, cadence-gates per flight, queries byAir for
flights whose poll-interval has elapsed, runs wake_rules + phase_markers
on each, writes updated state, and emits a single-line JSON payload
on stdout:
{"wake_agent": <bool>, "data": {"events": [...]}}
When `wake_agent` is true, the scheduler wakes the agent with the
events list as context; the agent composes user notifications.
When false, zero LLM tokens spent. Per `coding-policy:
script-delegation` "Precheck Gating".
The cadence ladder (per `state-schema.md`'s `last_polled_at`
discipline), where T is the time until scheduled departure and T-arr
is the time until scheduled arrival:
scheduled, T > 6h: 30 min
scheduled, T ≤ 6h: 10 min
check_in_open / boarding: 2 min
departed / en_route, T-arr > 30 min: 5 min
departed / en_route, T-arr ≤ 30 min: 2 min (catch carousel reveal)
landed (until acknowledged): 5 min
cancelled / diverted: 60 min (visibility only)
any other status: 30 min
The script is the OUTER PROCESS BOUNDARY of the scheduled-task
contract — the scheduler reads a non-zero exit OR malformed stdout as
"skip waking the agent this cycle". An exception escaping this script
would therefore silently disable the contract, so `main()` catches
everything, prints the traceback to stderr, and emits a safe-shape
"no wake" payload instead, per the `coding-policy: error-handling`
outer-boundary-process-contract carve-out.
"""
from __future__ import annotations
import json
import os
import sys
import traceback
import urllib.error
from datetime import datetime, timedelta, timezone
from pathlib import Path
_BUNDLE_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(_BUNDLE_DIR))
from byair_client import ByAirClient, ByAirError # noqa: E402
from connection_risk import ( # noqa: E402
DEFAULT_MIN_TRANSFER_MINUTES,
detect_connection_risks,
)
from maps_client import MapsClient, MapsError # noqa: E402
from phase_markers import ( # noqa: E402
check_arrival_logistics,
check_day_before,
check_time_to_leave,
)
from state import ( # noqa: E402
read_active_flights,
read_config,
read_current_location,
read_flight_state,
write_flight_state,
)
from wake_rules import detect_wake_events # noqa: E402
# Cadence ladder (minutes). Keyed by `computed_status`; the
# scheduled / en_route bins split further on time-to-departure or
# time-to-arrival inside `_minutes_until_next_poll`.
_BASE_CADENCE_MINUTES = {
"scheduled": 30,
"check_in_open": 2,
"boarding": 2,
"departed": 5,
"en_route": 5,
"landed": 5,
"cancelled": 60,
"diverted": 60,
}
# Window for querying maps_client for travel time. Past this window
# the time-to-leave marker has either fired or doesn't matter.
_TIME_TO_LEAVE_QUERY_WINDOW_HOURS = 6
# Maximum age of a `current-location.json` snapshot to be considered
# fresh enough to use as the time-to-leave origin. Older than this and
# the precheck falls back to `home_address`, on the principle that a
# stale location guess is worse than the user's static home base.
# Issue #18 suggested 15–30 min; 30 is the more permissive value and
# matches the orchestrator's typical location-write cadence on
# Telegram live-location sharing.
_MAX_CURRENT_LOCATION_AGE_MINUTES = 30
def main() -> int:
    """Scheduler entry point: run one precheck cycle and print the contract JSON.

    Always returns 0. This script is the scheduled task's outermost
    process boundary (`coding-policy: error-handling`
    outer-boundary-process-contract carve-out). The scheduler reads a
    non-zero exit or invalid stdout as "skip waking the agent", so a
    programming bug escaping here would silently disable the wake
    contract for every later run. Any exception from the cycle is
    therefore reported on stderr as a traceback and replaced with a
    safe-shape `wake_agent: false` payload.
    """
    try:
        cycle_events = _run_cycle(now_utc=datetime.now(timezone.utc))
        _emit({"wake_agent": bool(cycle_events), "data": {"events": cycle_events}})
    except Exception:  # noqa: BLE001 — outer-boundary-process-contract
        traceback.print_exc(file=sys.stderr)
        _emit({"wake_agent": False, "data": {"error": "precheck_exception"}})
    # Exit 0 on both paths so the scheduler parses stdout rather than
    # treating the run as failed.
    return 0
def _emit(payload: dict) -> None:
"""Write the precheck contract JSON to stdout (single line)."""
print(json.dumps(payload, separators=(",", ":")))
def _run_cycle(*, now_utc: datetime) -> list[dict]:
    """Poll every due active flight once and return the aggregated wake events.

    Each returned item has the shape `{"flight_id": <id>, "event": <dict>}`.
    Per-flight `ByAirError` and `urllib.error.URLError` failures are
    logged to stderr and isolated to that flight; any other exception
    propagates to `main()`.

    `now_utc` is injected so tests can pin the clock without
    monkey-patching the `datetime` module (which breaks `fromisoformat()`
    deeper in the module). Production callers pass
    `datetime.now(timezone.utc)`.
    """
    flight_ids = read_active_flights()
    config = read_config() or {}
    home_address = config.get("home_address")
    min_transfer_minutes = _resolve_min_transfer_minutes(config)

    # One origin snapshot per cycle. `current-location.json` belongs to the
    # host orchestrator and may be rewritten while this cycle runs. Re-reading
    # it per flight could leave two flights in the same cycle disagreeing
    # about where the user is (Copilot review on
    # `jbaruch/nanoclaw-flight-assist#19`).
    origin = _resolve_time_to_leave_origin(home_address=home_address, now_utc=now_utc)
    byair = ByAirClient.from_env()
    maps = _maybe_maps_client()  # None when GOOGLE_MAPS_API_KEY is unset

    collected: list[dict] = []
    # `coding-policy: stateful-artifacts` — on-disk state is only a
    # last-seen snapshot. Two kinds of flight are kept out of the
    # cross-flight pass below:
    #   gone_upstream — byAir answered 404, so the stored snapshot is
    #                   known to be wrong;
    #   unverified    — a poll was attempted this cycle and failed (another
    #                   byAir error or a URLError), so the snapshot could
    #                   not be re-verified against the live source.
    # Flights that simply weren't due this cycle stay eligible: the cadence
    # ladder bounds how stale their snapshot can be.
    gone_upstream: set[int] = set()
    unverified: set[int] = set()

    for flight_id in flight_ids:
        try:
            flight_events = _process_flight(
                flight_id=flight_id,
                now_utc=now_utc,
                byair=byair,
                maps=maps,
                time_to_leave_origin=origin,
            )
        except ByAirError as err:
            if err.error_type == "not_found":
                # No longer tracked upstream: tell the agent, and keep the
                # stale snapshot out of connection-risk derivations.
                collected.append(
                    {"flight_id": flight_id, "event": {"reason": "removed_upstream"}}
                )
                gone_upstream.add(flight_id)
            else:
                # Any other byAir failure: skip this flight for this cycle
                # without touching last_polled_at, so it retries next cycle.
                print(
                    f"flight-assist precheck: byair error for flight {flight_id}: {err}",
                    file=sys.stderr,
                )
                unverified.add(flight_id)
            continue
        except urllib.error.URLError as err:
            # Network/DNS/byAir outage: degrade only this flight rather than
            # letting main()'s outer catch drop every flight's events.
            # last_polled_at stays untouched, so the cadence gate fires
            # again next cycle (`coding-policy: error-handling` "Specific
            # Exceptions" + "Graceful Fallback").
            print(
                f"flight-assist precheck: transport error for flight {flight_id}: {err}",
                file=sys.stderr,
            )
            unverified.add(flight_id)
            continue

        collected.extend({"flight_id": flight_id, "event": event} for event in flight_events)

    # Cross-flight connection-risk pass over the freshly written on-disk
    # state (flights grouped by trip_id), skipping the excluded flights.
    excluded = gone_upstream | unverified
    collected.extend(
        _check_connection_risks(
            active_flight_ids=[fid for fid in flight_ids if fid not in excluded],
            now_utc=now_utc,
            min_transfer_minutes=min_transfer_minutes,
        )
    )
    return collected
def _check_connection_risks(
    *,
    active_flight_ids: list[int],
    now_utc: datetime,
    min_transfer_minutes: int,
) -> list[dict]:
    """Run the cross-flight connection-risk pass and persist fired markers.

    Reads each candidate flight's on-disk state and hands the batch to
    `detect_connection_risks`. For every `(leg2_flight_id, event)` pair
    it returns, this function sets that flight's
    `connection_at_risk_fired` marker and writes the state back, so
    subsequent cycles don't re-fire.

    A stored record whose `phase_markers` is missing or null gets a fresh
    all-unset marker map, the same fallback `_process_flight` applies.
    Such records persist for flights not polled this cycle. Without the
    fallback, the marker write below would raise and abort the whole
    cycle through `main()`'s outer catch, losing every flight's events
    even though their state was already written.

    Returns:
        `{flight_id, event}`-shaped dicts ready to merge into the
        precheck's aggregated event output.
    """
    flight_states: list[dict] = []
    for fid in active_flight_ids:
        state = read_flight_state(fid)
        if state is None:
            continue
        if state.get("phase_markers") is None:
            state["phase_markers"] = _initial_phase_markers()
        flight_states.append(state)

    risks = detect_connection_risks(
        flight_states=flight_states,
        now_utc=now_utc,
        min_transfer_minutes=min_transfer_minutes,
    )

    states_by_id = {s["flight_id"]: s for s in flight_states}
    emitted: list[dict] = []
    for leg2_flight_id, event in risks:
        leg2_state = states_by_id.get(leg2_flight_id)
        if leg2_state is None:
            # Risk reported for a flight we didn't load; nothing to persist.
            continue
        leg2_state["phase_markers"]["connection_at_risk_fired"] = True
        write_flight_state(leg2_state)
        emitted.append({"flight_id": leg2_flight_id, "event": event})
    return emitted
def _resolve_min_transfer_minutes(config: dict) -> int:
"""Return the validated `min_transfer_minutes` from config, or the default.
The on-disk config can be hand-edited; `write_config` rejects bad
types and negative values but a manually-edited file with
`"min_transfer_minutes": "45"` or `True` would slip past the writer.
Coerce defensively here so a corrupt config doesn't propagate into
`detect_connection_risks` and surface as the `ValueError` the public
API raises on invalid input — which the outer-boundary catch would
then suppress for the entire cycle. Falling back to the default keeps
the cycle running while the stderr diagnostic flags the bad config
for the operator.
"""
value = config.get("min_transfer_minutes")
if value is None:
return DEFAULT_MIN_TRANSFER_MINUTES
if not isinstance(value, int) or isinstance(value, bool) or value < 0:
print(
f"flight-assist precheck: config.json:min_transfer_minutes is "
f"{type(value).__name__} {value!r}, expected non-negative int — "
f"falling back to {DEFAULT_MIN_TRANSFER_MINUTES}",
file=sys.stderr,
)
return DEFAULT_MIN_TRANSFER_MINUTES
return value
def _maybe_maps_client() -> MapsClient | None:
"""Return a MapsClient if GOOGLE_MAPS_API_KEY is set, else None.
Time-to-leave is the only consumer; if no key is configured the
precheck still runs but won't fire that one event type.
"""
if not os.environ.get("GOOGLE_MAPS_API_KEY"):
return None
return MapsClient.from_env()
def _process_flight(
    *,
    flight_id: int,
    now_utc: datetime,
    byair: ByAirClient,
    maps: MapsClient | None,
    time_to_leave_origin: str | None,
) -> list[dict]:
    """Poll one flight if its cadence interval has elapsed and return its events.

    Steps:
    1. Cadence-gate on the stored state.
    2. Fetch the flight from byAir.
    3. Diff the trimmed snapshot against the previous one (`wake_rules`).
    4. Evaluate the day-before, time-to-leave and arrival-logistics
       phase markers.
    5. Persist the updated state.

    Returns `[]` without calling byAir when the flight isn't due yet.

    `time_to_leave_origin` is resolved once per cycle by `_run_cycle`, so
    all flights in a cycle agree on the user's location even if the
    host-orchestrator-owned `current-location.json` changes mid-cycle.
    Exceptions from the byAir fetch (e.g. `ByAirError`, `URLError`) are
    not caught here; `_run_cycle` handles them per flight.
    """
    prior_state = read_flight_state(flight_id)
    if prior_state:
        prior_snapshot = prior_state.get("last_snapshot")
        phase_markers = prior_state.get("phase_markers")
    else:
        prior_snapshot = None
        phase_markers = None
    if phase_markers is None:
        # No record yet, or a record without markers: start all-unset.
        phase_markers = _initial_phase_markers()

    if not _due_for_poll(prior_state, now_utc):
        return []

    raw_flight = byair.get_flight(flight_id=flight_id)
    new_snapshot = _trim_to_snapshot(raw_flight)

    # Delta-driven events from comparing the previous and new snapshots.
    events: list[dict] = list(detect_wake_events(prior_snapshot, new_snapshot))

    # Prefer the stored schedule times; a brand-new flight falls back to
    # the byAir payload.
    if prior_state:
        scheduled_dep_time = prior_state["scheduled_dep_time"]
        scheduled_arr_time = prior_state["scheduled_arr_time"]
    else:
        scheduled_dep_time = raw_flight.get("scheduledDepTime")
        scheduled_arr_time = raw_flight.get("scheduledArrTime")

    def _absorb(marker: str, outcome: tuple) -> None:
        # Latch a fired one-shot marker and queue its event.
        fired, event = outcome
        if fired:
            phase_markers[marker] = True
            events.append(event)

    # Time-based events. Order matters: each check sees markers latched by
    # the ones before it.
    _absorb(
        "day_before_fired",
        check_day_before(
            scheduled_dep_time=scheduled_dep_time,
            phase_markers=phase_markers,
            now_utc=now_utc,
        ),
    )
    travel_time_seconds = _maybe_query_travel_time(
        maps=maps,
        origin=time_to_leave_origin,
        raw_flight=raw_flight,
        scheduled_dep_time=scheduled_dep_time,
        now_utc=now_utc,
        time_to_leave_already_fired=phase_markers.get("time_to_leave_fired", False),
    )
    _absorb(
        "time_to_leave_fired",
        check_time_to_leave(
            scheduled_dep_time=scheduled_dep_time,
            travel_time_seconds=travel_time_seconds,
            phase_markers=phase_markers,
            now_utc=now_utc,
        ),
    )
    _absorb(
        "arrival_logistics_fired",
        check_arrival_logistics(
            scheduled_arr_time=scheduled_arr_time,
            phase_markers=phase_markers,
            now_utc=now_utc,
        ),
    )

    # write_flight_state needs every documented field; _build_flight_state
    # seeds a first-seen flight's fields from the byAir payload.
    write_flight_state(
        _build_flight_state(
            flight_id=flight_id,
            prior_state=prior_state,
            raw_flight=raw_flight,
            new_snapshot=new_snapshot,
            phase_markers=phase_markers,
            now_utc=now_utc,
        )
    )
    return events
def _due_for_poll(prior_state: dict | None, now_utc: datetime) -> bool:
"""Return True when the cadence-ladder interval has elapsed since last poll."""
if prior_state is None:
return True # first cycle for this flight
last_polled = _parse_iso8601(prior_state.get("last_polled_at"))
if last_polled is None:
return True
snapshot = prior_state.get("last_snapshot") or {}
status = snapshot.get("computed_status", "scheduled")
scheduled_dep = _parse_iso8601(prior_state.get("scheduled_dep_time"))
scheduled_arr = _parse_iso8601(prior_state.get("scheduled_arr_time"))
interval_minutes = _interval_for(status, now_utc, scheduled_dep, scheduled_arr)
return now_utc - last_polled >= timedelta(minutes=interval_minutes)
def _interval_for(
status: str,
now_utc: datetime,
scheduled_dep: datetime | None,
scheduled_arr: datetime | None,
) -> int:
"""Return the cadence interval (minutes) for a given (status, time)."""
base = _BASE_CADENCE_MINUTES.get(status, 30)
if status == "scheduled" and scheduled_dep is not None:
time_to_dep = scheduled_dep - now_utc
if time_to_dep <= timedelta(hours=6):
return 10 # tightened to 10 min as departure approaches
return 30
if status in ("en_route", "departed") and scheduled_arr is not None:
time_to_arr = scheduled_arr - now_utc
if time_to_arr <= timedelta(minutes=30):
return 2 # catch carousel reveal
return base
def _resolve_time_to_leave_origin(
    *,
    home_address: str | None,
    now_utc: datetime,
) -> str | None:
    """Origin-resolution ladder for the time-to-leave Distance Matrix query.

    Order of preference:
    1. `current-location.json` (orchestrator-written), used when all of
       these hold:
       - it is present;
       - its snapshot is fresh:
         `0 <= now_utc - captured_at <= _MAX_CURRENT_LOCATION_AGE_MINUTES`;
       - `latitude` / `longitude` are usable coordinates (numeric or
         numeric strings, not bool, within ±90 / ±180).
       It is formatted as `"<lat>,<lng>"` for the Distance Matrix API,
       which accepts numeric origin/destination pairs natively.
    2. `home_address` from `config.json` (the legacy single source).
    3. `None` — neither origin is available; the caller skips the maps
       query.

    A mobile user (constant traveler) gets correct travel times when
    the orchestrator is publishing live location. The static home
    address remains the fallback for users who never share location
    and for the gap between location updates. Issue
    `jbaruch/nanoclaw-flight-assist#18`.

    A record with missing, null, non-numeric or out-of-range coordinates
    falls back to `home_address` with a stderr diagnostic instead of
    raising. This runs once per cycle, outside `_run_cycle`'s per-flight
    error handling, so a `KeyError` here would abort every cycle (no wake
    for any flight) until the file was rewritten.
    """

    def _usable(value: object, limit: float) -> bool:
        # bool is an int subclass; True/False are never coordinates.
        if isinstance(value, bool):
            return False
        try:
            number = float(value)  # type: ignore[arg-type]
        except (TypeError, ValueError):
            return False
        # NaN fails both comparisons; ±inf is outside any limit.
        return -limit <= number <= limit

    loc = read_current_location()
    if loc is None:
        return home_address
    captured = _parse_iso8601(loc.get("captured_at"))
    if captured is None:
        return home_address
    age = now_utc - captured
    # A future-dated capture (negative age) is rejected as well as a stale one.
    if not timedelta() <= age <= timedelta(minutes=_MAX_CURRENT_LOCATION_AGE_MINUTES):
        return home_address

    latitude = loc.get("latitude")
    longitude = loc.get("longitude")
    if not (_usable(latitude, 90) and _usable(longitude, 180)):
        print(
            f"flight-assist precheck: current-location.json has unusable coordinates "
            f"latitude={latitude!r} longitude={longitude!r} — falling back to home_address",
            file=sys.stderr,
        )
        return home_address
    return f"{latitude},{longitude}"
def _maybe_query_travel_time(
*,
maps: MapsClient | None,
origin: str | None,
raw_flight: dict,
scheduled_dep_time: str | None,
now_utc: datetime,
time_to_leave_already_fired: bool,
) -> int | None:
"""Query maps_client for travel time, only when time-to-leave is close.
Returns None when:
- MapsClient is None (key unset)
- `origin` is None (origin-resolution yielded nothing — neither
a fresh `current-location.json` snapshot nor a configured
`home_address` was available at cycle-start)
- Scheduled departure is more than _TIME_TO_LEAVE_QUERY_WINDOW_HOURS away
- time_to_leave has already fired (no need to re-query)
`origin` is resolved once per cycle in `_run_cycle` and passed
through; this function does NOT re-read `current-location.json`,
so every flight in the same cycle agrees on the user's location.
"""
if maps is None or time_to_leave_already_fired or not origin:
return None
dep_dt = _parse_iso8601(scheduled_dep_time)
if dep_dt is None:
return None
if dep_dt - now_utc > timedelta(hours=_TIME_TO_LEAVE_QUERY_WINDOW_HOURS):
return None
dep_airport = raw_flight.get("depAirport", {})
destination = dep_airport.get("name") or dep_airport.get("code") or ""
if not destination:
return None
try:
result = maps.travel_time(origin=origin, destination=destination)
except MapsError as maps_err:
print(
f"flight-assist precheck: maps error for flight: {maps_err}",
file=sys.stderr,
)
return None
except urllib.error.URLError as transport_err:
# Transient transport failure to Google (network, DNS, API down).
# Degrade just this maps query — return None so time_to_leave
# defers to the next cycle. Per `coding-policy: error-handling`
# "Specific Exceptions" + "Graceful Fallback".
print(
f"flight-assist precheck: maps transport error: {transport_err}",
file=sys.stderr,
)
return None
return result.in_traffic_seconds or result.duration_seconds
def _trim_to_snapshot(raw_flight: dict) -> dict:
"""Filter the ~13KB byAir flight payload to the ~1KB operational slice.
Matches the last_snapshot shape in state-schema.md.
"""
inbound_raw = raw_flight.get("inbound") or {}
inbound = {
"aircraft_model": inbound_raw.get("aircraft_model"),
"registration": inbound_raw.get("registration"),
"flew": inbound_raw.get("flew"),
"predicted_delay_minutes": _extract_predicted_delay_minutes(inbound_raw),
}
position = raw_flight.get("position", {}).get("currentPosition", {})
return {
"code": raw_flight.get("code"),
"computed_status": raw_flight.get("computed_status"),
"computed_status_detail": raw_flight.get("computed_status_detail"),
"computed_phase_progress": raw_flight.get("computed_phase_progress"),
"computed_phase_risk": raw_flight.get("computed_phase_risk"),
"computed_phase_overdue": raw_flight.get("computed_phase_overdue"),
"dep_gate": raw_flight.get("depGate"),
"arr_gate": raw_flight.get("arrGate"),
"dep_terminal": raw_flight.get("depTerminal"),
"arr_terminal": raw_flight.get("arrTerminal"),
"dep_time": raw_flight.get("depTime"),
"arr_time": raw_flight.get("arrTime"),
"baggage": raw_flight.get("baggage"),
"inbound": inbound,
"position_lat": position.get("lat"),
"position_lon": position.get("lon"),
}
def _extract_predicted_delay_minutes(inbound: dict) -> int | None:
predicted = inbound.get("predicted_delay")
if not isinstance(predicted, dict):
return None
value = predicted.get("delay_minutes")
if not isinstance(value, int) or isinstance(value, bool):
return None
return value
def _build_flight_state(
*,
flight_id: int,
prior_state: dict | None,
raw_flight: dict,
new_snapshot: dict,
phase_markers: dict,
now_utc: datetime,
) -> dict:
"""Construct a complete flight-state record per state-schema.md."""
if prior_state is not None:
scheduled_dep_time = prior_state["scheduled_dep_time"]
scheduled_arr_time = prior_state["scheduled_arr_time"]
ownership = prior_state["ownership"]
trip_id = prior_state["trip_id"]
dep_airport_id = prior_state["dep_airport_id"]
arr_airport_id = prior_state["arr_airport_id"]
else:
scheduled_dep_time = raw_flight["scheduledDepTime"]
scheduled_arr_time = raw_flight["scheduledArrTime"]
ownership = raw_flight.get("ownership", "mine")
trip_id = raw_flight.get("trip_id") or raw_flight.get("tripId") or 0
dep_airport_id = raw_flight.get("depAirport", {}).get("id", 0)
arr_airport_id = raw_flight.get("arrAirport", {}).get("id", 0)
return {
"flight_id": flight_id,
"code": raw_flight.get("code", ""),
"ownership": ownership,
"trip_id": trip_id,
"scheduled_dep_time": scheduled_dep_time,
"scheduled_arr_time": scheduled_arr_time,
"dep_airport_id": dep_airport_id,
"arr_airport_id": arr_airport_id,
"last_polled_at": now_utc.isoformat().replace("+00:00", "Z"),
"last_snapshot": new_snapshot,
"phase_markers": phase_markers,
"last_wake_at": prior_state["last_wake_at"] if prior_state else None,
"last_wake_reason": prior_state["last_wake_reason"] if prior_state else None,
}
def _initial_phase_markers() -> dict:
return {
"day_before_fired": False,
"time_to_leave_fired": False,
"boarding_fired": False,
"arrival_logistics_fired": False,
"landed_acknowledged": False,
"connection_at_risk_fired": False,
}
def _parse_iso8601(value: str | None) -> datetime | None:
if not value:
return None
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
if __name__ == "__main__":
sys.exit(main())