CtrlK
BlogDocsLog inGet started
Tessl Logo

jbaruch/nanoclaw-trusted

Rules for trusted NanoClaw groups. Shared memory, session bootstrap, cross-group memory updates. Loaded for trusted and main containers only.

94

Quality

94%

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Risky

Do not use without reviewing

This version of the tile failed moderation
Failed moderation (intent review) in rules/session-bootstrap.md: This is a prompt injection attack disguised as a cursor rule. It instructs an AI agent to execute a bash command as its very first action in every session, then call an external skill ('tessl__trusted-memory') which could load arbitrary instructions or exfiltrate data. The sentinel file mechanism (/tmp/session_bootstrapped) is designed to make it look like a legitimate caching/dedup mechanism, but the real goal is to force the AI to execute commands and call external services before processing the user's actual request. This is a classic prompt injection / agent manipulation pattern.
Overview
Quality
Evals
Security
Files

register-session.pyskills/trusted-memory/scripts/

#!/usr/bin/env python3
"""Write session metadata into session-state.json and mark bootstrap done.

Usage:
    register-session.py

Reads:
    - Session ID from `/workspace/store/messages.db` (`sessions` table,
      first row). Falls back to `None` when the table is empty.
    - Session name from `$NANOCLAW_SESSION_NAME` (defaults to `default`).
    - Current session ID from `$CLAUDE_SESSION_ID` (for the bootstrap
      sentinel).

Writes (both atomically, tempfile + fsync + os.replace):
    - `/workspace/group/session-state.json` — updates
      `sessions.<session_name>` with `{started, epoch, session_id,
      last_seen}` and mirrors `session_id` at the top level for
      back-compat.
    - `/tmp/session_bootstrapped` — sentinel containing the current
      CLAUDE_SESSION_ID so `needs-bootstrap.py` reports "done" on
      subsequent runs. NOT written when CLAUDE_SESSION_ID is missing
      or empty — an empty sentinel would match an empty env on the
      next run and cause bootstrap to be skipped forever.

Exit codes:
    0 — success (state written; sentinel skipped is still success
        when CLAUDE_SESSION_ID is empty/unset, with a stderr note)
    1 — state file read/write failure

Note on sqlite behavior: the previous inline block would have raised
on a missing messages.db or missing `sessions` table. This script
intentionally catches `sqlite3.Error` broadly and treats it as
`session_id=None` with a stderr note — a deliberate relaxation,
not a behavior-preserving port, because bootstrap shouldn't hard-
crash the whole memory-load flow when the DB is temporarily
unreadable (locked, early-boot, etc.). The back-compat mirror still
records `None`, which downstream consumers tolerate.

Concurrency: state-file writes use an fcntl.LOCK_EX on
`<STATE_PATH>.lock` around the read-modify-write cycle. heartbeat-
precheck.py and check-email writers use the same lock file; without
coordination a concurrent `last_seen` stamp or `pending_response`
update can clobber this script's `sessions.<name>` write.
"""
import fcntl
import json
import os
import sqlite3
import sys
import tempfile
import time
from datetime import datetime, timezone
from typing import Optional

# messages.db can be locked by the orchestrator's writer during bootstrap;
# without a timeout sqlite3.connect waits forever. Other admin-tile
# scripts (heartbeat-precheck / heartbeat-checks) use 5–15s. 10s keeps
# the bootstrap flow responsive while still riding out short contention.
MESSAGES_DB_TIMEOUT_SECONDS = 10

MESSAGES_DB = "/workspace/store/messages.db"
STATE_PATH = "/workspace/group/session-state.json"
STATE_LOCK_PATH = STATE_PATH + ".lock"
SENTINEL = "/tmp/session_bootstrapped"

# Bumped when the on-disk shape of session-state.json changes. v1 is the
# current canonical shape (top-level `schema_version` + `sessions.<name>`
# subtree + back-compat top-level `session_id`). Files written before
# this field existed are read-tolerated below and silently upgraded to
# v1 on the next write — owner-skill migration per
# `jbaruch/coding-policy: stateful-artifacts`.
STATE_SCHEMA_VERSION = 1


def read_session_id_from_db() -> Optional[str]:
    try:
        conn = sqlite3.connect(MESSAGES_DB, timeout=MESSAGES_DB_TIMEOUT_SECONDS)
        try:
            row = conn.execute(
                "SELECT session_id FROM sessions LIMIT 1"
            ).fetchone()
        finally:
            conn.close()
        return row[0] if row else None
    except sqlite3.Error as e:
        print(
            f"register-session: cannot read session_id from {MESSAGES_DB}: {e}",
            file=sys.stderr,
        )
        return None


def atomic_write_text(path: str, content: str, default_mode: int = 0o644) -> None:
    """Atomically replace `path`'s contents with `content`.

    Tempfile in the same directory → write → flush → fsync → chmod (to
    the target's existing mode, or `default_mode` if creating fresh) →
    os.replace. Cleans up the tempfile on any failure so a crash mid-
    write doesn't leave a stray `*.tmp` orphan next to the target.

    The chmod step matters: NamedTemporaryFile defaults to 0600, which
    would silently narrow shared state files (e.g. /workspace/group/*)
    on the first write unless preserved.
    """
    dir_ = os.path.dirname(path) or "."
    try:
        target_mode = os.stat(path).st_mode & 0o777
    except FileNotFoundError:
        target_mode = default_mode

    tmp_path: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(
            "w", dir=dir_, delete=False, suffix=".tmp"
        ) as tf:
            tmp_path = tf.name
            tf.write(content)
            tf.flush()
            os.fsync(tf.fileno())
        os.chmod(tmp_path, target_mode)
        os.replace(tmp_path, path)
        tmp_path = None
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError as e:
                print(
                    f"register-session: failed to clean up temp file {tmp_path}: {e}",
                    file=sys.stderr,
                )


def atomic_write_json(path: str, data: dict) -> None:
    atomic_write_text(path, json.dumps(data, indent=2))


def main() -> None:
    session_id = read_session_id_from_db()
    session_name = os.environ.get("NANOCLAW_SESSION_NAME", "default")
    claude_session_id = os.environ.get("CLAUDE_SESSION_ID", "")

    # Hold LOCK_EX on STATE_LOCK_PATH for the entire read-modify-write
    # cycle. Other writers on this file — heartbeat-precheck's
    # last_seen stamp, check-email's seen_email_ids append, default-
    # session pending_response/muted_threads updates — use (or will
    # use) the same lock file. Without coordination, our sessions.<name>
    # write can clobber a concurrent last_seen update, or vice versa.
    # heartbeat-precheck.py documents the shared-lock convention.
    try:
        lock_f = open(STATE_LOCK_PATH, "a+")
    except OSError as e:
        print(
            f"register-session: cannot open lock file {STATE_LOCK_PATH}: {e}",
            file=sys.stderr,
        )
        sys.exit(1)

    try:
        fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX)

        try:
            with open(STATE_PATH) as f:
                state = json.load(f)
        except FileNotFoundError:
            state = {}
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # UnicodeDecodeError covers non-UTF-8 bytes (corruption/manual
            # edit); treated like malformed JSON — start fresh with a
            # stderr note rather than letting a traceback bubble up.
            print(
                f"register-session: state file malformed at {STATE_PATH}: {e}; starting fresh",
                file=sys.stderr,
            )
            state = {}
        except OSError as e:
            # PermissionError, EIO, etc. — docstring promises exit 1 on
            # any read failure.
            print(
                f"register-session: state file read failed at {STATE_PATH}: {e}",
                file=sys.stderr,
            )
            sys.exit(1)

        if not isinstance(state, dict):
            print(
                f"register-session: state file at {STATE_PATH} contained non-object JSON (type {type(state).__name__}); starting fresh",
                file=sys.stderr,
            )
            state = {}

        # setdefault("sessions", {}) doesn't guard against an existing but
        # non-dict value (e.g. list/string) — state["sessions"][name] = ...
        # below would crash with TypeError. Reset to {} with a stderr note
        # if the shape is wrong.
        if not isinstance(state.get("sessions"), dict):
            if "sessions" in state:
                print(
                    f"register-session: state.sessions was not an object (type {type(state.get('sessions')).__name__}); resetting",
                    file=sys.stderr,
                )
            state["sessions"] = {}
        now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        state["sessions"][session_name] = {
            "started": now_iso,
            "epoch": int(time.time()),
            "session_id": session_id,
            "last_seen": now_iso,
        }
        state["session_id"] = session_id  # back-compat; see PR #55
        # Stamp schema_version last so it always reflects what this writer
        # produces, even when starting from an unversioned legacy file.
        state["schema_version"] = STATE_SCHEMA_VERSION

        try:
            atomic_write_json(STATE_PATH, state)
        except OSError as e:
            print(
                f"register-session: failed to write {STATE_PATH}: {e}",
                file=sys.stderr,
            )
            sys.exit(1)
    finally:
        # Lock released automatically when lock_f closes, but be
        # explicit so the intent is obvious to readers.
        try:
            fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN)
        except OSError:
            # Already released or fd invalidated — lock_f.close() below
            # handles the rest. Nothing actionable; skip the stderr.
            pass
        lock_f.close()

    # Refuse to write an empty sentinel. An empty string would match the
    # default `$CLAUDE_SESSION_ID` fallback in needs-bootstrap.py
    # (`os.environ.get(..., "")`), so a subsequent run would read an
    # empty sentinel, see an empty current session, and incorrectly
    # decide bootstrap was already done — permanently skipping memory
    # load for every future session until the file is removed.
    if not claude_session_id:
        print(
            "register-session: $CLAUDE_SESSION_ID missing/empty; skipping sentinel write so next run re-bootstraps",
            file=sys.stderr,
        )
        emit_status(session_id, session_name, wrote_sentinel=False)
        return

    try:
        atomic_write_text(SENTINEL, claude_session_id)
    except OSError as e:
        print(
            f"register-session: failed to write sentinel {SENTINEL}: {e}",
            file=sys.stderr,
        )
        sys.exit(1)

    emit_status(session_id, session_name, wrote_sentinel=True)


def emit_status(
    session_id: Optional[str], session_name: str, *, wrote_sentinel: bool
) -> None:
    """Single-line JSON status to stdout per `script-delegation` rule.

    Always prints on a successful state write. `wrote_state` is always true
    here because reaching this point implies the atomic state write
    succeeded; failure paths exit non-zero before getting here. The exit
    code remains the authoritative success signal — JSON is for callers
    that want to log/inspect what was registered."""
    print(
        json.dumps(
            {
                "session_id": session_id,
                "session_name": session_name,
                "schema_version": STATE_SCHEMA_VERSION,
                "wrote_state": True,
                "wrote_sentinel": wrote_sentinel,
            }
        )
    )


if __name__ == "__main__":
    main()

README.md

tile.json