CtrlK
BlogDocsLog inGet started
Tessl Logo

jbaruch/nanoclaw-flight-assist

Flight notifications via byAir: delay, gate, connection risk, inbound aircraft delay, time-to-leave, arrival logistics. NanoClaw per-chat overlay tile.

69

Quality

87%

Does it follow best practices?

Impact

No eval scenarios have been run

SecuritybySnyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

precheck.pyskills/sync-tripit/

#!/usr/bin/env python3
"""Adaptive precheck for sync_tripit (TripIt/byAir → active-flights.json refresh).

Fires every 5 minutes via the cadence-registry. Most fires are no-ops:
the precheck reads `/workspace/state/flight-assist/active-flights.json`
and per-flight state files via the non-owner snapshot API in
`state.py`, decides whether the byAir round-trip is warranted, and
emits `wake_agent: false` when it isn't. The byAir call fires only
when:

  - Any tracked flight has scheduled_dep_time within the next 24 hours
    (day-of-travel polling for delays / gate changes / cancellations).
  - `active-flights.json` mtime is older than 6 hours (catches newly-
    booked trips landing in TripIt between travel windows).
  - No state has been written yet (cold start).

When the gate passes, the precheck delegates to flight-assist's
`sync_tripit.py` via subprocess and forwards its stdout — sync_tripit
already emits the same `{wake_agent, data}` contract this script
needs, so the wake-payload composition lives in one place.

Scheduled-task contract: emits single-line JSON `{"wake_agent": <bool>,
"data": {...}}` on stdout. Exit 0 always (per agent-runner contract —
non-zero exit silently disables the wake, per `coding-policy:
error-handling` outer-boundary-process-contract). The sole catch-all
sits inside `main()` so the carve-out's "outermost process boundary"
precondition holds; all bootstrap (path resolution + state import)
happens inside that same try block.

stdlib-only per `coding-policy: dependency-management`.

References:
  - skills/flight-assist/sync_tripit.py — the underlying byAir sync
  - skills/flight-assist/state.py — read_active_flights_snapshot +
    read_flight_state_snapshot (non-owner reader API per
    coding-policy: stateful-artifacts)
"""

from __future__ import annotations

import json
import os
import subprocess
import sys
import traceback
from datetime import datetime, timedelta, timezone
from pathlib import Path
from types import ModuleType

# Locations the co-shipped flight-assist skill may live at. Runtime is
# the container mount; dev is the sibling clone path the test suite
# resolves under pytest. Both checked inside main()'s try block — no
# module-level filesystem access, so a missing co-deployment surfaces
# through the outer-boundary handler rather than via an import-time
# crash.
_FLIGHT_ASSIST_RUNTIME = Path("/home/node/.claude/skills/tessl__flight-assist")
_FLIGHT_ASSIST_DEV = Path(__file__).resolve().parent.parent / "flight-assist"

# Gate thresholds. Imminent-flight window matches the user-stated
# requirement ("only that frequent when there are flights in the next
# 24 hours"). Stale-state threshold catches new bookings landing in
# TripIt between travel windows — 6h is well under the typical "I
# booked something" → "I want it to show up" gap, and well over the
# 5-min cadence so the gate doesn't pin-fire every cycle.
_IMMINENT_FLIGHT_WINDOW = timedelta(hours=24)
_STALE_STATE_THRESHOLD = timedelta(hours=6)

# Subprocess timeout for the sync_tripit.py delegation. byAir has a per-
# call 30s timeout in its own client; one list_trips + N per-flight
# state writes is bounded comfortably below 60s.
_SYNC_SUBPROCESS_TIMEOUT = 60.0


def _load_flight_assist() -> tuple[ModuleType, Path]:
    """Resolve and import the co-shipped flight-assist skill.

    Returns `(state_module, sync_tripit_path)`. Raises `FileNotFoundError`
    when neither the runtime mount nor the dev-clone location holds the
    skill — main()'s outer-boundary handler converts that into the
    safe-shape wake payload. Idempotent: re-importing `state` is a
    cheap dict lookup in `sys.modules` after the first call, and
    repeated `sys.path.insert(0, ...)` of the same prefix is a no-op
    for module resolution (Python deduplicates by path during import).
    """
    if _FLIGHT_ASSIST_RUNTIME.is_dir():
        flight_assist_dir = _FLIGHT_ASSIST_RUNTIME
    elif _FLIGHT_ASSIST_DEV.is_dir():
        flight_assist_dir = _FLIGHT_ASSIST_DEV
    else:
        raise FileNotFoundError(
            "sync-tripit precheck: cannot locate the co-shipped flight-assist skill at "
            f"{_FLIGHT_ASSIST_RUNTIME} (runtime) or {_FLIGHT_ASSIST_DEV} (dev). Both skills "
            "ship from the same tile (jbaruch/nanoclaw-flight-assist); if one is missing the "
            "other can't function."
        )
    sys.path.insert(0, str(flight_assist_dir))
    import state as state_module

    return state_module, flight_assist_dir / "sync_tripit.py"


def _should_sync_now(state_module: ModuleType, *, now: datetime) -> tuple[bool, str]:
    """Decide whether this 5-min fire should hit byAir.

    Returns `(should, reason)` where `reason` is an opaque diagnostic
    string suitable for inclusion in the wake-payload's `data.reason`
    field. Reasons are stable enough for log-grep but not parsed by
    consumers.

    Uses the non-owner snapshot readers per `coding-policy:
    stateful-artifacts` — schema_version mismatch is treated as "no
    usable prior state" and never triggers an owner-side rewrite.
    """
    # Cold start — no state file yet, OR an old-schema file the owner
    # hasn't yet migrated. Both shapes resolve to "no usable prior
    # state" via the snapshot reader; we sync so the index gets
    # populated (and the owner sync_tripit.py call will rewrite it at
    # the latest schema).
    try:
        flight_ids = state_module.read_active_flights_snapshot()
    except state_module.StateError:
        # Corrupt state file — propagate to the outer-boundary handler;
        # this is a real fault that needs operator attention, not a
        # gate-skip case.
        raise

    if not flight_ids:
        # State file missing, empty, or old-schema. Use the mtime check
        # to decide whether to poll: a present-but-empty file with
        # recent mtime means the last sync confirmed zero tracked
        # flights; an absent or stale file means we should sync now.
        return _stale_state_check(state_module, now=now)

    # Imminent-flight check — any tracked flight's scheduled_dep_time
    # within the next 24 hours triggers a sync. Iteration is bounded by
    # the number of tracked flights (typically <10).
    threshold = now + _IMMINENT_FLIGHT_WINDOW
    for fid in flight_ids:
        flight_state = state_module.read_flight_state_snapshot(fid)
        if flight_state is None:
            # No usable per-flight state (file missing or old schema).
            # Fall through; the owner's next sync will rewrite it.
            continue
        dep_str = flight_state.get("scheduled_dep_time")
        if not dep_str:
            continue
        try:
            dep_time = datetime.fromisoformat(dep_str.replace("Z", "+00:00"))
        except (ValueError, AttributeError):
            # Malformed scheduled_dep_time — skip this flight in the
            # gate (don't crash). The owner skill's writer validates
            # the field; a malformed value here means a writer regressed
            # and should be filed as a bug, not handled by the gate.
            continue
        if now <= dep_time <= threshold:
            return True, f"imminent_flight_{fid}"

    # No imminent flights — fall through to the stale-state check to
    # catch newly-booked trips.
    return _stale_state_check(state_module, now=now)


def _stale_state_check(state_module: ModuleType, *, now: datetime) -> tuple[bool, str]:
    """Return (should_sync, reason) based on active-flights.json mtime."""
    active_path = state_module.state_dir() / state_module.ACTIVE_FLIGHTS_FILE
    if not active_path.exists():
        return True, "cold_start_no_state_file"
    mtime = datetime.fromtimestamp(active_path.stat().st_mtime, tz=timezone.utc)
    age = now - mtime
    if age > _STALE_STATE_THRESHOLD:
        return True, f"stale_state_age_{int(age.total_seconds() // 60)}min"
    return False, "no_imminent_flights_recent_sync"


def _emit(payload: dict) -> None:
    """Write the single-line JSON wake-payload to stdout."""
    print(json.dumps(payload, separators=(",", ":")))


def main() -> int:
    # outer-boundary-process-contract: the agent-runner reads non-zero
    # exit OR invalid stdout JSON as wake_agent=false, which here
    # silently disables the entire flight-assist polling pipeline.
    # Every unexpected exception — including bootstrap failures from
    # `_load_flight_assist` — flows into the safe-shape JSON path and
    # exit 0 so the contract stays honest. See coding-policy:
    # error-handling. This is the sole catch-all in the file.
    try:
        state_module, sync_tripit_path = _load_flight_assist()
        now = datetime.now(timezone.utc)
        should_sync, reason = _should_sync_now(state_module, now=now)

        if not should_sync:
            _emit({"wake_agent": False, "data": {"reason": reason}})
            return 0

        # Gate passed — delegate to flight-assist's sync_tripit.py.
        # That script emits the same {wake_agent, data} contract we
        # need, so forward its stdout verbatim. Inherit env (including
        # FLIGHT_ASSIST_STATE_DIR + BYAIR_MCP_URL).
        result = subprocess.run(
            [sys.executable, str(sync_tripit_path)],
            capture_output=True,
            text=True,
            timeout=_SYNC_SUBPROCESS_TIMEOUT,
            env=os.environ.copy(),
            check=False,
        )
        # Forward sync_tripit.py's diagnostic stderr verbatim (preserves
        # tracebacks from its outer-boundary-process-contract handler).
        if result.stderr:
            sys.stderr.write(result.stderr)
        # Forward stdout — sync_tripit.py is contracted to emit the
        # wake-payload JSON on stdout. If it didn't (subprocess crash,
        # empty output), emit a safe-shape payload so the cadence-
        # registry doesn't silently disable the next wake decision.
        if result.stdout.strip():
            sys.stdout.write(result.stdout)
        else:
            _emit(
                {"wake_agent": False, "data": {"reason": "sync_no_output", "gate_reason": reason}}
            )
        return 0
    except subprocess.TimeoutExpired:
        sys.stderr.write(
            f"sync-tripit precheck: sync_tripit.py exceeded {_SYNC_SUBPROCESS_TIMEOUT}s budget\n"
        )
        _emit({"wake_agent": False, "data": {"reason": "sync_subprocess_timeout"}})
        return 0
    except Exception:  # noqa: BLE001 — outer-boundary-process-contract
        traceback.print_exc(file=sys.stderr)
        _emit({"wake_agent": False, "data": {"reason": "precheck_internal_error"}})
        return 0


if __name__ == "__main__":
    sys.exit(main())

README.md

tile.json