Rules for trusted NanoClaw groups. Shared memory, session bootstrap, cross-group memory updates. Loaded for trusted and main containers only.
94
94%
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Risky
Do not use without reviewing
#!/usr/bin/env python3
"""Read-only system-status probe for trusted-tier NanoClaw containers.
Trusted containers see the orchestrator's SQLite at
`/workspace/store/messages.db`. This script collects the three
read-only health signals the trusted skill surfaces — stuck
scheduled tasks, DB size, recent task-run failures — and emits one
JSON payload the SKILL.md consumes.
It is the trusted-tier counterpart to admin's `heartbeat-checks.py`,
narrowed to the data sources trusted containers actually have:
- Trusted tier does NOT have the disk-/IPC-/container-health probes
(those live in admin and require admin-only mounts).
- Trusted tier does NOT manage the dismiss file
(`/workspace/group/system-health-dismissed.json`); that's an admin
responsibility. This script reports task-run failures verbatim;
the operator decides what to do with them.
Per `jbaruch/coding-policy: script-delegation`: the deterministic
SQL queries belong here, not inlined in the SKILL.md prose. The
previous `tessl__check-system-health` skill in trusted had three
`python3 -c` blocks; this script replaces them.
Per `jbaruch/coding-policy: testing-standards`: covered by
`tests/test_system_status_checks.py` in this tile.
Usage:
system-status-checks.py [--db <path>] [--stuck-grace-minutes <int>]
[--message-row-warn <int>]
[--task-log-row-warn <int>]
[--db-size-mb-warn <int>]
--db Path to the messages.db (default
`/workspace/store/messages.db`).
--stuck-grace-minutes How long past `next_run` an active task
must be to count as stuck. Default 5.
Mirrors the prior inline check.
--message-row-warn Alert threshold for `messages` row count.
Default 100000 (the prior inline value).
--task-log-row-warn Alert threshold for `task_run_logs` row
count. Default 10000.
--db-size-mb-warn Alert threshold for the SQLite file size
in MB. Default 500.
Output (single-line JSON on stdout):
{
"checked_at": "<ISO 8601 UTC>",
"db_path": "...",
"stuck_tasks": [
{"id": "task-...", "prompt_preview": "...", "next_run": "..."}
],
"stuck_count": <int>,
"row_counts": {"messages": <int>, "task_run_logs": <int>},
"db_size_mb": <float>,
"recent_failures": [
{"task_id": "...", "run_at": "...", "error_summary": "..."}
],
"alerts": [<short string>] // one per crossed threshold
}
Exit codes:
0 — checks ran (alerts may or may not be populated; absence of
alerts means silence per the SKILL.md output contract).
1 — DB unreachable or every check raised: stdout still emits
the canonical shape with `alerts` describing what failed,
so the SKILL.md can report uniformly.
2 — CLI usage error.
"""
import argparse
import json
import os
import sqlite3
import sys
from datetime import datetime, timezone
def _now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _query_stuck_tasks(conn: sqlite3.Connection, grace_minutes: int) -> list[dict]:
rows = conn.execute(
"SELECT id, substr(prompt, 1, 50), next_run "
"FROM scheduled_tasks "
"WHERE status = 'active' AND next_run <= datetime('now', ?)",
(f"-{int(grace_minutes)} minutes",),
).fetchall()
return [
{"id": r[0], "prompt_preview": r[1], "next_run": r[2]} for r in rows
]
def _query_row_counts(conn: sqlite3.Connection) -> dict[str, int]:
msgs = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
logs = conn.execute("SELECT COUNT(*) FROM task_run_logs").fetchone()[0]
return {"messages": msgs, "task_run_logs": logs}
def _query_recent_failures(conn: sqlite3.Connection) -> list[dict]:
"""Failures within the last 24h. The dismiss file lives in admin's
domain and is NOT consulted here; trusted reports verbatim and
the operator decides what to do (per the SKILL.md contract)."""
rows = conn.execute(
"SELECT task_id, run_at, substr(coalesce(last_result, ''), 1, 200) "
"FROM task_run_logs "
"WHERE run_at >= datetime('now', '-24 hours') "
" AND coalesce(last_result, '') LIKE '%error%' "
"ORDER BY run_at DESC "
"LIMIT 20"
).fetchall()
return [
{"task_id": r[0], "run_at": r[1], "error_summary": r[2]} for r in rows
]
def main() -> int:
    """Run the status checks and print the JSON payload on stdout.

    Parses the CLI flags, measures the DB file size, runs the three
    queries (stuck tasks, row counts, recent failures) and appends one
    alert per failed query or crossed threshold. The payload is printed
    exactly once on every path.

    Returns:
        0 when at least one query succeeded; 1 when the DB file is
        missing, the connection cannot be opened, or every query raised.
    """
    cli = argparse.ArgumentParser(description=__doc__.split("\n\n")[0])
    cli.add_argument("--db", default="/workspace/store/messages.db")
    cli.add_argument("--stuck-grace-minutes", type=int, default=5)
    cli.add_argument("--message-row-warn", type=int, default=100_000)
    cli.add_argument("--task-log-row-warn", type=int, default=10_000)
    cli.add_argument("--db-size-mb-warn", type=int, default=500)
    args = cli.parse_args()

    payload: dict = {
        "checked_at": _now_iso(),
        "db_path": args.db,
        "stuck_tasks": [],
        "stuck_count": 0,
        "row_counts": {"messages": None, "task_run_logs": None},
        "db_size_mb": None,
        "recent_failures": [],
        "alerts": [],
    }
    # Alias of the list stored in the payload: appends show up in the output.
    alerts = payload["alerts"]

    def finish(exit_code: int) -> int:
        """Print the payload as single-line JSON and hand back the exit code."""
        print(json.dumps(payload))
        return exit_code

    if not os.path.exists(args.db):
        alerts.append(f"db missing: {args.db}")
        print(
            f"system-status-checks: db missing at {args.db}; "
            f"verify the trusted-tier mount points at the orchestrator's "
            f"messages.db, or pass --db <path> to override.",
            file=sys.stderr,
        )
        return finish(1)

    # An unreadable size is reported as an alert but does not stop the probe.
    try:
        size_bytes = os.path.getsize(args.db)
    except OSError as exc:
        alerts.append(f"db size unreadable: {exc}")
    else:
        payload["db_size_mb"] = round(size_bytes / 1048576, 1)

    try:
        conn = sqlite3.connect(args.db, timeout=5)
    except sqlite3.Error as exc:
        alerts.append(f"db connect failed: {exc}")
        print(
            f"system-status-checks: sqlite3 connect failed against {args.db}: {exc}. "
            f"Check that the file is a valid SQLite database and that the "
            f"trusted container has read access to it (the trusted tier is "
            f"read-only on this mount; if the file was moved, update the "
            f"--db flag or the orchestrator mount).",
            file=sys.stderr,
        )
        return finish(1)

    # (payload key, alert text on failure, query thunk), in the order run.
    checks = (
        (
            "stuck_tasks",
            "stuck-tasks query failed",
            lambda: _query_stuck_tasks(conn, args.stuck_grace_minutes),
        ),
        (
            "row_counts",
            "row-counts query failed",
            lambda: _query_row_counts(conn),
        ),
        (
            "recent_failures",
            "recent-failures query failed",
            lambda: _query_recent_failures(conn),
        ),
    )
    succeeded = 0
    try:
        for key, failure_text, run_check in checks:
            # Each check is isolated so one failing query cannot mask the rest.
            try:
                payload[key] = run_check()
            except sqlite3.Error as exc:
                alerts.append(f"{failure_text}: {exc}")
            else:
                succeeded += 1
    finally:
        conn.close()
    # Stays 0 when the stuck-tasks query failed, because the list is still empty.
    payload["stuck_count"] = len(payload["stuck_tasks"])

    stuck = payload["stuck_count"]
    if stuck > 0:
        alerts.append(f"stuck tasks: {stuck}")

    row_limits = (
        ("messages", args.message_row_warn),
        ("task_run_logs", args.task_log_row_warn),
    )
    for table, limit in row_limits:
        rows = payload["row_counts"].get(table)
        if rows is not None and rows > limit:
            alerts.append(f"{table} rowcount {rows} > {limit}")

    size_mb = payload["db_size_mb"]
    if size_mb is not None and size_mb > args.db_size_mb_warn:
        alerts.append(f"db size {size_mb}MB > {args.db_size_mb_warn}MB")

    failures = payload["recent_failures"]
    if failures:
        alerts.append(f"recent task failures: {len(failures)} in last 24h")

    if succeeded:
        return finish(0)

    # Per-docstring contract: exit 1 if connect succeeded but every
    # query raised — without this, a schema mismatch / dropped table
    # would produce a "silent green" with only failure-alerts populated.
    print(
        f"system-status-checks: connected to {args.db} but every check "
        f"raised; see `alerts` in stdout for per-query reasons. Likely "
        f"causes: schema drift, missing tables, or a corrupted DB. "
        f"Run `sqlite3 {args.db} '.schema'` to verify the expected "
        f"tables (messages, task_run_logs, scheduled_tasks) exist.",
        file=sys.stderr,
    )
    return finish(1)
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())
skills
system-status
trusted-memory