CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

jbaruch/nanoclaw-conferences

Finds open conference CFPs relevant to the user across Java/AI/developer conferences, with persistent sent/dismissed/remind state and source-aware Sessionize verification. NanoClaw per-chat overlay tile.

70

Quality

87%

Does it follow best practices?

Impact

No eval scenarios have been run

Security by Snyk

Advisory

Suggest reviewing before use

Overview
Quality
Evals
Security
Files

run-state.py — skills/check-cfps/scripts/

#!/usr/bin/env python3
"""Run-state checkpoint store for the check-cfps pipeline (resumable runs).

check-cfps is an agent-orchestrated pipeline: deterministic helper scripts
(fetch, prepare-sessionize-batch, apply-sessionize-results, dedup, ...)
bracket agent-only steps (MCP calls, web search, relevance judgment). The
agent historically held every stage's intermediate artifact in context and
persisted only at Step 8, so a token-limit continuation lost the working
set and reconstructed it from a chat summary
(jbaruch/nanoclaw-conferences#4 — the 2026-06-10 run blew its budget
mid-pipeline, then re-derived prep output and the state schema from memory).

This script gives each stage a durable, machine-readable checkpoint on
disk so a continuation re-reads the last artifact instead of rebuilding it.
Artifacts live in a per-run directory (default
`/workspace/group/state/cfp-run/`, override via `CFP_RUN_STATE_DIR`):

  manifest.json   {"schema_version": 1, "run_date": "<YYYY-MM-DD UTC>",
                   "completed": ["fetch", "candidates", ...]}
  <stage>.json    the JSON artifact saved for that stage

Subcommands:
  begin          Start (or resume) a run. If manifest.json exists with
                 run_date == today (UTC), resume: emit
                 {"resume": true, "run_date", "completed": [...]}. Otherwise
                 (absent, stale date, or unreadable manifest) reset the dir
                 to a fresh manifest and emit
                 {"resume": false, "run_date", "completed": []}.
  save <stage>   Read a JSON artifact on stdin, write <stage>.json
                 atomically, append <stage> to manifest.completed
                 (order-preserving, deduped). Emit {"saved": "<stage>"}.
  load <stage>   Print the saved <stage>.json artifact verbatim. Exit 2 if
                 no artifact was saved for that stage.
  done           Remove the run directory (success teardown). Emit
                 {"cleared": true}.

Resume is best-effort, not a correctness requirement: stages are
idempotent and Step 5 re-verifies the full cohort, so a fresh full run is
always safe. `begin` resets across UTC-day boundaries precisely so a
days-later continuation starts clean rather than resuming a stale run.

Stage names are free-form lowercase identifiers (`[a-z0-9][a-z0-9_-]*`);
check-cfps uses fetch, candidates, prep, sessionize_results, decisions,
working_set. Schema doc: `references/run-state.md`.

Exit 0 on success; exit 1 with a stderr diagnostic on bad usage /
malformed input / I/O failure; exit 2 when `load` is asked for an absent
stage.
"""

import argparse
import json
import os
import re
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path

# Fallback run directory, used when CFP_RUN_STATE_DIR is unset or empty.
DEFAULT_RUN_DIR = Path("/workspace/group/state/cfp-run")
# File name of the run manifest inside the run directory.
MANIFEST_NAME = "manifest.json"
# Manifest schema version; `begin` only resumes a manifest carrying this value.
SCHEMA_VERSION = 1
# Accepted stage names: a lowercase letter or digit followed by lowercase
# letters, digits, `_` or `-`. Anchored with `\Z` rather than `$`, because `$`
# also matches just before a trailing newline and would let "fetch\n" through
# validation and into a file name.
STAGE_RE = re.compile(r"^[a-z0-9][a-z0-9_-]*\Z")


def _run_dir() -> Path:
    """Resolve the directory that holds this run's checkpoint files.

    A non-empty `CFP_RUN_STATE_DIR` environment variable wins; when it is
    unset or empty the built-in `DEFAULT_RUN_DIR` is returned instead.
    """
    configured = os.environ.get("CFP_RUN_STATE_DIR")
    if not configured:
        return DEFAULT_RUN_DIR
    return Path(configured)


def _today() -> str:
    """Return the current UTC calendar date as an ISO `YYYY-MM-DD` string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.date().isoformat()


def _atomic_write_json(path: Path, payload) -> None:
    """Serialize `payload` as indented UTF-8 JSON and atomically install it
    at `path`.

    The data goes to a hidden sibling temp file first (flushed and fsynced),
    which is then chmod-ed to the destination's current permission bits
    (0644 when the destination does not exist yet) and moved into place with
    os.replace. Missing parent directories are created. Any failure
    propagates to the caller; the temp file is removed in a try/finally (no
    broad except) per `coding-policy: error-handling`.
    """
    parent = path.parent
    parent.mkdir(parents=True, exist_ok=True)

    # Carry over the permission bits of the file being replaced, if any.
    try:
        perms = path.stat().st_mode & 0o777
    except FileNotFoundError:
        perms = 0o644

    # delete=False: the temp file must outlive close() so it can be renamed.
    handle = tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        dir=parent,
        prefix=f".{path.name}.",
        suffix=".tmp",
        delete=False,
    )
    scratch = handle.name
    committed = False
    try:
        json.dump(payload, handle, indent=2, ensure_ascii=False)
        # Push the bytes to disk before the rename makes them visible.
        handle.flush()
        os.fsync(handle.fileno())
        handle.close()
        os.chmod(scratch, perms)
        os.replace(scratch, path)
        committed = True
    finally:
        if not committed:
            # Failure path: release the handle, then drop the orphaned temp.
            if not handle.closed:
                handle.close()
            try:
                os.unlink(scratch)
            except FileNotFoundError:
                pass


def _read_manifest(run_dir: Path):
    """Load `run_dir`'s manifest and return it as a dict.

    Returns None when the file cannot be read, is not valid UTF-8 JSON, or
    decodes to something other than a JSON object — `begin` treats all of
    these as 'no usable prior run'.
    """
    manifest_path = run_dir / MANIFEST_NAME
    try:
        raw = manifest_path.read_text(encoding="utf-8")
        parsed = json.loads(raw)
    except (OSError, json.JSONDecodeError, UnicodeDecodeError):
        return None
    if not isinstance(parsed, dict):
        return None
    return parsed


def _clear_dir(run_dir: Path) -> None:
    """Remove every regular file directly inside `run_dir` (the manifest and
    the saved stage artifacts).

    The store is flat, so nothing is deleted recursively: subdirectories and
    their contents are left alone. A run dir that does not exist is a no-op.
    """
    if run_dir.exists():
        for entry in run_dir.iterdir():
            if not entry.is_file():
                continue
            entry.unlink()


def cmd_begin(run_dir: Path) -> int:
    """Start a run, resuming the existing one when its manifest is usable.

    A manifest is resumable when it parses to a dict, carries the current
    schema version, is dated today (UTC) and has a list under `completed`.
    Otherwise the run dir's files are cleared and a fresh manifest is
    written. Either way a one-line JSON status
    ({"resume", "run_date", "completed"}) is printed and 0 is returned.
    """
    stamp = _today()
    prior = _read_manifest(run_dir)

    # The None check stays first: the later clauses call methods on `prior`.
    resumable = (
        prior is not None
        and prior.get("schema_version") == SCHEMA_VERSION
        and prior.get("run_date") == stamp
        and isinstance(prior.get("completed"), list)
    )

    if resumable:
        completed = prior["completed"]
    else:
        # Absent, stale or malformed manifest: wipe and start over.
        _clear_dir(run_dir)
        _atomic_write_json(
            run_dir / MANIFEST_NAME,
            {"schema_version": SCHEMA_VERSION, "run_date": stamp, "completed": []},
        )
        completed = []

    print(json.dumps({"resume": resumable, "run_date": stamp, "completed": completed}))
    return 0


def cmd_save(run_dir: Path, stage: str) -> int:
    """Store the JSON artifact read from stdin as `<stage>.json` and mark
    `stage` completed in the manifest.

    The artifact is written before the manifest, so the manifest is never
    updated to list a stage whose file was not written. When no usable
    manifest exists (save before begin, or a corrupt manifest) a minimal one
    dated today (UTC) is started; a save never clears existing artifacts.

    Returns 0 after printing {"saved": "<stage>"}. Returns 1 with a stderr
    diagnostic when the stage name is invalid or reserved, when stdin cannot
    be decoded or parsed as JSON, or when the artifact cannot be encoded as
    UTF-8. OSError raised by the writes propagates to the caller.
    """
    # fullmatch, not match: a `$` anchor also matches before a trailing
    # newline, which would let a name like "fetch\n" into a file name.
    if not STAGE_RE.fullmatch(stage):
        sys.stderr.write(f"run-state: invalid stage name {stage!r}\n")
        return 1
    # A stage whose file name equals the manifest's would be written and then
    # immediately overwritten by the manifest update below, silently losing
    # the artifact — refuse it up front.
    if f"{stage}.json" == MANIFEST_NAME:
        sys.stderr.write(f"run-state: stage name {stage!r} is reserved for the run manifest\n")
        return 1

    try:
        artifact = json.loads(sys.stdin.read())
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Undecodable bytes on stdin are malformed input too; report them the
        # same way instead of letting a traceback escape.
        sys.stderr.write(f"run-state: stage {stage!r} stdin is not valid JSON: {exc}\n")
        return 1

    manifest = _read_manifest(run_dir)
    if manifest is None:
        # save before begin (or after a corrupt manifest): start a minimal
        # run rather than dropping the artifact on the floor. No reset here —
        # never destroy artifacts on a save.
        manifest = {"schema_version": SCHEMA_VERSION, "run_date": _today(), "completed": []}

    try:
        _atomic_write_json(run_dir / f"{stage}.json", artifact)
    except UnicodeEncodeError as exc:
        # JSON escapes such as "\ud800" decode to lone surrogates, which
        # cannot be written back out as UTF-8.
        sys.stderr.write(
            f"run-state: stage {stage!r} artifact cannot be encoded as UTF-8: {exc}\n"
        )
        return 1

    # Record the stage once, keeping first-completion order.
    completed = manifest.get("completed")
    if not isinstance(completed, list):
        completed = []
    if stage not in completed:
        completed.append(stage)
    manifest["completed"] = completed
    manifest.setdefault("schema_version", SCHEMA_VERSION)
    manifest.setdefault("run_date", _today())
    _atomic_write_json(run_dir / MANIFEST_NAME, manifest)

    print(json.dumps({"saved": stage}))
    return 0


def cmd_load(run_dir: Path, stage: str) -> int:
    """Print the artifact previously saved for `stage`.

    The stored text is echoed as-is, with a trailing newline added only when
    it lacks one. Returns 0 on success, 2 when no `<stage>.json` exists, and
    1 (with a stderr diagnostic) for an invalid stage name, an unreadable
    file, or stored text that is not valid JSON.
    """
    if STAGE_RE.match(stage) is None:
        sys.stderr.write(f"run-state: invalid stage name {stage!r}\n")
        return 1

    artifact_path = run_dir / f"{stage}.json"
    try:
        body = artifact_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # "Never saved" gets its own exit code so callers can tell it apart
        # from a real failure.
        sys.stderr.write(f"run-state: no saved artifact for stage {stage!r}\n")
        return 2
    except (OSError, UnicodeDecodeError) as err:
        sys.stderr.write(f"run-state: cannot read stage {stage!r}: {type(err).__name__}: {err}\n")
        return 1

    # Parse only to validate; the original text is what gets printed.
    try:
        json.loads(body)
    except json.JSONDecodeError as err:
        sys.stderr.write(f"run-state: saved stage {stage!r} is corrupt JSON: {err}\n")
        return 1

    if not body.endswith("\n"):
        body += "\n"
    sys.stdout.write(body)
    return 0


def cmd_done(run_dir: Path) -> int:
    """Tear down the run after success: delete the run dir's files, then try
    to remove the directory itself.

    Prints {"cleared": true} and returns 0. A directory that cannot be
    removed is left in place without reporting an error.
    """
    _clear_dir(run_dir)

    directory_remains = run_dir.exists()
    if directory_remains:
        try:
            run_dir.rmdir()
        except OSError:
            # Typically non-empty because of a subdirectory this script did
            # not create. The files are already gone, which is all `done`
            # promises, so leave the directory alone.
            pass

    report = {"cleared": True}
    print(json.dumps(report))
    return 0


def main(argv=None) -> int:
    """Parse the command line and run the selected subcommand.

    `argv` defaults to the process arguments when None. Returns the
    subcommand's exit status; an OSError raised by a subcommand is reported
    on stderr and turned into exit status 1. Bad usage is handled by
    argparse, which exits the process itself.
    """
    parser = argparse.ArgumentParser(
        description="Run-state checkpoint store for the check-cfps pipeline."
    )
    commands = parser.add_subparsers(dest="command", required=True)
    commands.add_parser("begin", help="start or resume a run")
    save_parser = commands.add_parser("save", help="save a stage artifact from stdin")
    save_parser.add_argument("stage")
    load_parser = commands.add_parser("load", help="print a saved stage artifact")
    load_parser.add_argument("stage")
    commands.add_parser("done", help="clear the run directory on success")
    options = parser.parse_args(argv)

    state_dir = _run_dir()
    handlers = {
        "begin": lambda: cmd_begin(state_dir),
        "save": lambda: cmd_save(state_dir, options.stage),
        "load": lambda: cmd_load(state_dir, options.stage),
    }
    # `done` is the only remaining choice under a required subparser, so it
    # doubles as the fallback.
    handler = handlers.get(options.command, lambda: cmd_done(state_dir))

    # Turn write/remove/mkdir failures from any subcommand into the explicit
    # process contract (exit 1 + stderr diagnostic) rather than an accidental
    # traceback, per `coding-policy: script-delegation`. cmd_load deals with
    # its own read errors and returns 2/1 before anything reaches this point.
    try:
        return handler()
    except OSError as failure:
        sys.stderr.write(
            f"run-state: {options.command} failed: {type(failure).__name__}: {failure}\n"
        )
        return 1


# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())

skills

check-cfps

SKILL.md

CHANGELOG.md

README.md

tile.json