Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.
88
86%
Does it follow best practices?
Impact
97%
2.93x — Average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Prepare condensed session transcripts for judge-based analysis.
Reads normalized JSONL session files and produces condensed text transcripts
optimized for LLM consumption. Keeps user messages, assistant messages, tool
calls and results, with turn numbers for citation.
Output: per-session .txt files + manifest.json in _prepared/
No external dependencies.
"""
from __future__ import annotations
import argparse
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# ─── Helpers ────────────────────────────────────────────────────────────────
def _text(event: dict) -> str | None:
segments = event.get("segments", [])
texts = [
s.get("data", "")
for s in segments
if isinstance(s, dict) and s.get("type") == "text"
]
return " ".join(texts).strip() if texts else None
def _json_data(event: dict) -> dict | None:
"""Extract first JSON segment as parsed dict."""
for s in event.get("segments", []):
if isinstance(s, dict) and s.get("type") == "json":
data = s.get("data", "")
if isinstance(data, str):
try:
return json.loads(data)
except (json.JSONDecodeError, ValueError):
pass
elif isinstance(data, dict):
return data
return None
def _tool_name(event: dict) -> str:
tool = event.get("tool", {})
return tool.get("display_name") or tool.get("name") or "?"
def _tool_status(event: dict) -> str | None:
return event.get("tool", {}).get("status")
def _trunc(s: str, n: int) -> str:
if len(s) <= n:
return s
return s[:n] + "..."
def _trunc_multiline(s: str, max_lines: int = 100, keep: int = 20) -> str:
"""Truncate content with many lines, keeping first/last `keep` lines."""
lines = s.splitlines()
if len(lines) <= max_lines:
return s
head = lines[:keep]
tail = lines[-keep:]
omitted = len(lines) - 2 * keep
return "\n".join(head + [f"[... {omitted} lines omitted ...]"] + tail)
def _summarize_tool_args(event: dict) -> str:
    """Build a short, human-readable summary of a tool call's arguments."""
    jdata = _json_data(event)
    text = _text(event)
    summary: list[str] = []
    if jdata and isinstance(jdata, dict):
        # Well-known argument names first, in priority order.
        summary = [
            f"{key}={_trunc(str(jdata[key]), 80)}"
            for key in ("file_path", "path", "command", "pattern", "query", "url", "skill")
            if key in jdata
        ]
        if not summary:
            # No recognized keys: show the first two arbitrary ones.
            summary = [
                f"{key}={_trunc(str(jdata[key]), 60)}"
                for key in list(jdata.keys())[:2]
            ]
    elif text:
        summary = [_trunc(text, 100)]
    return ", ".join(summary)
def _extract_file_content(event: dict) -> str | None:
    """Extract written/edited file content from a tool_call event.

    Works across agents:
    - Claude Write/Edit: JSON segment has ``content`` or ``new_string``
    - Codex cat-redirect: JSON segment has ``cmd`` with heredoc content
    - Cursor/Gemini: similar ``content`` field in JSON segment

    Returns None for events that are not writes/edits or carry no content.
    """
    action = event.get("action", "")
    if action not in ("file_write", "file_edit"):
        return None
    jdata = _json_data(event)
    if not jdata or not isinstance(jdata, dict):
        return None
    # Direct content fields (Claude Write, Cursor write, Gemini)
    content = (jdata.get("content") or jdata.get("contents")
               or jdata.get("new_string") or "")
    if content:
        return content
    # Codex: content embedded in cmd after heredoc marker (cat > file <<'TAG'\n...)
    cmd = jdata.get("cmd", "")
    if cmd and ("<<" in cmd):
        # Everything after the first newline is the heredoc body
        # (skips the ``cat > ... <<'TAG'`` line itself).
        first_nl = cmd.find("\n")
        if first_nl < 0:
            return None
        header = cmd[:first_nl]
        body = cmd[first_nl + 1:]
        # Parse the actual delimiter tag from the header instead of relying
        # only on a fixed list — handles any tag (``<<'TAG'``, ``<<-TAG``,
        # ``<< "TAG"``), not just the few hard-coded below.
        tag = ""
        if "<<" in header:
            tag = header.rsplit("<<", 1)[1].strip().lstrip("-").strip().strip("'\"")
        # Strip the trailing delimiter line if present. Matching the parsed
        # tag is preferred; the hard-coded list is kept as a fallback so
        # previously-handled commands behave identically.
        parts = body.rsplit("\n", 1)
        if len(parts) == 2:
            trailing = parts[1].strip().strip("'\"")
            known = ("HTML", "EOF", "HEREDOC", "END", "DOC", "CSS", "JS", "PY")
            if (tag and trailing == tag) or trailing in known:
                body = parts[0]
        return body
    return None
# ─── Timestamp fallback ────────────────────────────────────────────────────
def _extract_raw_timestamp(normalized_path: str) -> str:
"""Try to get a timestamp from the raw JSONL log when normalized events lack one.
Reads the first line of the normalized file looking for any timestamp field,
then falls back to the file's mtime as ISO 8601.
"""
if not normalized_path:
return ""
p = Path(normalized_path)
if not p.exists():
return ""
# Try first few lines for any timestamp-like field
try:
with open(p, encoding="utf-8", errors="replace") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
for key in ("timestamp", "ts", "created_at", "time", "date"):
val = event.get(key, "")
if val and isinstance(val, str) and len(val) >= 10:
return val
except (json.JSONDecodeError, ValueError):
pass
break # only check first non-empty line
except OSError:
pass
# Last resort: file modification time
try:
mtime = p.stat().st_mtime
return datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
except OSError:
return ""
# ─── Condensation ──────────────────────────────────────────────────────────
def condense_session(events: list[dict], max_chars: int = 150000) -> str:
    """Convert normalized events into a condensed text transcript.

    Limits are generous to preserve evidence the judge needs — especially
    file write content and tool results. The overall max_chars cap is the
    main safety valve; per-element limits just keep individual items sane.

    Args:
        events: Normalized session events (dicts with ``kind``, ``actor``,
            ``turn``, ``segments`` …).
        max_chars: Hard cap on the returned transcript's length.

    Returns:
        A plain-text transcript, one entry per line, with ``[turn N]``
        prefixes for citation.
    """
    lines: list[str] = []
    # Track the action of the most recent tool_call so we can
    # collapse read results (tool_results don't carry the action).
    last_call_action: str = ""
    prev_turn = None
    for ev in events:
        kind = ev.get("kind", "")
        actor = ev.get("actor", "")
        turn = ev.get("turn", "?")
        # Usage/system events carry no judge-relevant evidence — drop them.
        if kind in ("usage", "system"):
            continue
        # Visual separator between turns
        if turn != prev_turn and prev_turn is not None:
            lines.append(f"{'~' * 40}")
        prev_turn = turn
        prefix = f"[turn {turn}]"
        if kind == "message" and actor == "user":
            text = _text(ev) or ""
            lines.append(f"{prefix} USER: {text}")
        elif kind == "message" and actor == "assistant":
            text = _text(ev) or ""
            # Long assistant messages: keep head and tail, drop the middle.
            if len(text) > 3000:
                text = text[:2000] + " [...] " + text[-800:]
            lines.append(f"{prefix} ASSISTANT: {text}")
        elif kind == "tool_call":
            name = _tool_name(ev)
            action = ev.get("action", "")
            last_call_action = action
            args_summary = _summarize_tool_args(ev)
            # For shell/bash tools, annotate with the semantic action so
            # the judge can distinguish reads from writes (especially for
            # Codex where all operations go through shell()).
            action_label = ""
            if action == "file_write":
                action_label = " [WRITE]"
            elif action == "file_read":
                action_label = " [READ]"
            elif action == "file_search":
                action_label = " [SEARCH]"
            # Include file content for writes/edits — keep generously,
            # since this is the primary evidence the judge needs to
            # verify code-level rules (fonts, CSS variables, animations…).
            # Only truncate truly large files; anything under 25k chars
            # is kept in full since it easily fits the token budget.
            file_content = _extract_file_content(ev) or ""
            content_note = ""
            if file_content:
                if len(file_content) <= 25000:
                    truncated = file_content
                else:
                    # Two-stage truncation: by line count first, then by
                    # raw chars in case the kept lines are still huge.
                    truncated = _trunc_multiline(file_content, max_lines=600, keep=200)
                    if len(truncated) > 40000:
                        truncated = truncated[:28000] + "\n[... content truncated ...]\n" + truncated[-10000:]
                content_note = f"\n FILE_CONTENT:\n{truncated}"
            call_line = f"{prefix} [TOOL] {name}{action_label}"
            if args_summary:
                call_line += f"({_trunc(args_summary, 150)})"
            call_line += content_note
            lines.append(call_line)
        elif kind == "tool_result":
            name = _tool_name(ev)
            status = _tool_status(ev)
            text = _text(ev) or ""
            is_read = (
                name.lower() in ("read", "read_file", "read_file_v2")
                or last_call_action == "file_read"
            )
            if status == "error":
                lines.append(f"{prefix} [ERROR] {name}: {_trunc(text, 500)}")
            elif is_read:
                # Collapse file read results to just metadata.
                # The judge only needs to know what was read, not its
                # content — showing content causes false positives when
                # the judge confuses read code with agent-authored code.
                line_count = text.count("\n") + 1 if text else 0
                lines.append(
                    f"{prefix} [RESULT] {name}: ({line_count} lines read)"
                )
            elif name.lower() in ("glob", "grep", "search") or last_call_action == "file_search":
                # Search results — keep short, truncate long lists
                if len(text) <= 500:
                    lines.append(f"{prefix} [RESULT] {name}: {text}")
                else:
                    result_lines = text.splitlines()
                    kept = result_lines[:10]
                    if len(result_lines) > 10:
                        kept.append(f"[... {len(result_lines) - 10} more results ...]")
                    lines.append(
                        f"{prefix} [RESULT] {name}: {chr(10).join(kept)}"
                    )
            elif len(text) <= 500:
                # Short generic result: keep verbatim.
                lines.append(f"{prefix} [RESULT] {name}: {text}")
            else:
                # Long generic result: line-based then char-based truncation.
                truncated = _trunc_multiline(text, max_lines=120, keep=40)
                if len(truncated) > 3000:
                    truncated = truncated[:2500] + "... ({} chars)".format(len(text))
                lines.append(
                    f"{prefix} [RESULT] {name}: {truncated}"
                )
        elif kind == "error":
            text = _text(ev) or ""
            lines.append(f"{prefix} [ERROR] {_trunc(text, 500)}")
    result = "\n".join(lines)
    if len(result) > max_chars:
        # Smart truncation: keep the beginning (context, skill activation)
        # and end (final output, verification) with more weight on the start
        head_chars = int(max_chars * 0.6)
        tail_chars = max_chars - head_chars
        result = (
            result[:head_chars]
            + "\n\n[... transcript truncated — middle turns omitted ...]\n\n"
            + result[-tail_chars:]
        )
    return result
# ─── Session loading ───────────────────────────────────────────────────────
def load_sessions(
logs_dir: Path, agents: list[str] | None = None
) -> list[dict[str, Any]]:
"""Load all normalized sessions with metadata."""
sessions = []
for agent_dir in sorted(logs_dir.iterdir()):
if not agent_dir.is_dir():
continue
agent = agent_dir.name
if agents and agent not in agents:
continue
for f in sorted(agent_dir.glob("*.jsonl")):
events = []
for line in f.read_text(errors="replace").splitlines():
try:
events.append(json.loads(line))
except json.JSONDecodeError:
pass
if events:
sessions.append(
{
"agent": agent,
"file": str(f),
"session_id": f.stem,
"events": events,
}
)
return sessions
def prepare_session(session: dict, max_chars: int = 150000) -> dict | None:
    """Prepare a single session. Returns metadata + condensed text, or None if too short.

    "Too short" means fewer than 3 events — such sessions carry no useful
    evidence for the judge.
    """
    events = session["events"]
    if len(events) < 3:
        return None
    transcript = condense_session(events, max_chars)
    max_turn = max((e.get("turn", 0) for e in events), default=0)
    timestamps = [e.get("timestamp", "") for e in events if e.get("timestamp")]
    first_ts = timestamps[0] if timestamps else ""
    last_ts = timestamps[-1] if timestamps else ""
    # If no timestamps in normalized events, try to extract from raw log.
    # NOTE: this must run BEFORE the header is built — previously the
    # fallback ran after, leaving the header's DATE line empty even when
    # the manifest carried the fallback timestamp.
    if not first_ts:
        first_ts = _extract_raw_timestamp(session.get("file", ""))
        if first_ts and not last_ts:
            last_ts = first_ts
    # Classify session type based on whether the agent wrote/edited files
    has_writes = any(
        e.get("action") in ("file_write", "file_edit")
        for e in events
        if e.get("kind") == "tool_call"
    )
    session_type = "modifying" if has_writes else "read-only"
    header = (
        f"SESSION: {session['session_id']}\n"
        f"AGENT: {session['agent']}\n"
        f"SOURCE: {session['file']}\n"
        f"EVENTS: {len(events)}\n"
        f"TURNS: {max_turn}\n"
        f"DATE: {first_ts}\n"
        f"SESSION_TYPE: {session_type}\n"
        f"---\n"
    )
    return {
        "session_id": session["session_id"],
        "agent": session["agent"],
        "source_file": session["file"],
        "events": len(events),
        "turns": max_turn,
        "session_timestamp": first_ts,
        "first_timestamp": first_ts,
        "last_timestamp": last_ts,
        "condensed_chars": len(transcript),
        "full_text": header + transcript,
    }
# ─── Project helpers ───────────────────────────────────────────────────────
def _discover_projects(base_dir: Path) -> list[str]:
if not base_dir.exists():
return []
return [
d.name
for d in sorted(base_dir.iterdir())
if d.is_dir()
and not d.name.startswith((".", "_"))
and (d / "normalized").is_dir()
]
def prepare_project(
    logs_dir: Path,
    out_dir: Path,
    max_chars: int = 150000,
    agents: list[str] | None = None,
    refresh: bool = False,
    max_sessions: int | None = None,
    session_ids: list[str] | None = None,
) -> dict:
    """Prepare all sessions for a single project. Returns manifest data.

    If *max_sessions* is set, only the N most recent sessions (by timestamp)
    are prepared — useful for quick-check mode.
    If *session_ids* is set, only those specific sessions are prepared.
    Format: ``agent/session_id`` (e.g. ``claude-code/abc123``).

    Side effects: writes one ``.txt`` per session under
    ``out_dir/prepared/<agent>/`` and a ``manifest.json`` alongside.
    """
    prep_dir = out_dir / "prepared"
    manifest_path = prep_dir / "manifest.json"
    # Idempotency: an existing manifest means this project was already
    # prepared — reuse it unless the caller forces a refresh.
    if not refresh and manifest_path.exists():
        manifest = json.loads(manifest_path.read_text())
        print(
            f" Already prepared ({manifest.get('total_sessions', '?')} sessions). "
            f"Use --refresh to redo."
        )
        return manifest
    sessions = load_sessions(logs_dir, agents)
    # When --sessions is set, filter to only those specific session IDs
    if session_ids:
        id_set = set(session_ids)
        sessions = [
            s for s in sessions
            if f"{s['agent']}/{s['session_id']}" in id_set
        ]
        print(f" Filtering to {len(sessions)} of {len(id_set)} requested session(s)")
    # When --max-sessions is set, sort by most recent first and take top N.
    # We peek at each session's first timestamp (or file mtime as fallback)
    # to determine recency.
    if max_sessions and max_sessions > 0 and len(sessions) > max_sessions:
        def _session_sort_key(sess: dict) -> str:
            # First event timestamp wins; ISO-8601 strings sort lexically.
            for ev in sess["events"]:
                ts = ev.get("timestamp", "")
                if ts:
                    return ts
            # Fallback: file mtime
            try:
                mtime = Path(sess["file"]).stat().st_mtime
                return datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
            except OSError:
                return ""
        sessions.sort(key=_session_sort_key, reverse=True)
        sessions = sessions[:max_sessions]
        print(f" Limiting to {max_sessions} most recent session(s)")
    prep_dir.mkdir(parents=True, exist_ok=True)
    prepared: list[dict] = []
    skipped = 0
    for sess in sessions:
        # prepare_session returns None for sessions with < 3 events.
        result = prepare_session(sess, max_chars)
        if result is None:
            skipped += 1
            continue
        # Write condensed transcript
        agent_dir = prep_dir / result["agent"]
        agent_dir.mkdir(parents=True, exist_ok=True)
        txt_file = agent_dir / f"{result['session_id']}.txt"
        txt_file.write_text(result["full_text"], encoding="utf-8")
        # Add to manifest (without full_text)
        entry = {k: v for k, v in result.items() if k != "full_text"}
        entry["prepared_file"] = str(txt_file.relative_to(out_dir))
        prepared.append(entry)
    manifest = {
        "prepared_at": datetime.now(timezone.utc).isoformat(),
        "total_sessions": len(prepared),
        "sessions_skipped": skipped,
        "total_condensed_chars": sum(p["condensed_chars"] for p in prepared),
        "sessions": prepared,
    }
    manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
    print(f" Prepared {len(prepared)} sessions ({skipped} skipped, <3 events)")
    print(f" Total condensed: {manifest['total_condensed_chars']:,} chars")
    print(f" Written to {prep_dir}/")
    return manifest
# ─── CLI ────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: discover projects and prepare their transcripts."""
    parser = argparse.ArgumentParser(
        description="Prepare condensed session transcripts for judge-based analysis"
    )
    parser.add_argument(
        "--project", default=None, help="Project name (default: auto-discover)"
    )
    parser.add_argument(
        "--cwd", default=os.getcwd(), help="Project directory (default: cwd)"
    )
    parser.add_argument(
        "--analysis-dir",
        default=None,
        help="Base logs directory (default: <cwd>/.tessl/logs)",
    )
    parser.add_argument(
        "--label",
        default=None,
        help="Label for output directory (default: today's date)",
    )
    parser.add_argument(
        "--agents",
        nargs="+",
        default=None,
        help="Filter to specific agents (e.g. claude-code codex)",
    )
    parser.add_argument(
        "--max-transcript-chars",
        type=int,
        default=150000,
        help="Max chars per condensed transcript (default: 150000)",
    )
    parser.add_argument(
        "--max-sessions",
        type=int,
        default=None,
        help="Only prepare the N most recent sessions (default: all)",
    )
    parser.add_argument(
        "--refresh", action="store_true", help="Re-prepare even if already done"
    )
    parser.add_argument(
        "--sessions",
        nargs="+",
        default=None,
        help="Only prepare specific sessions (agent/session_id format)",
    )
    parser.add_argument(
        "--out-dir",
        default=None,
        help="Output directory for prepared transcripts (default: <analysis-dir>/<project>/results/<label>)",
    )
    args = parser.parse_args()
    # Resolve the layout: <base_dir>/<project>/normalized/ holds input logs.
    cwd = os.path.realpath(args.cwd)
    base_dir = Path(args.analysis_dir) if args.analysis_dir else Path(cwd) / ".tessl" / "logs"
    # Label defaults to today's date so repeated runs land in dated folders.
    label = args.label or datetime.now().strftime("%Y-%m-%d")
    if args.project:
        projects = [args.project]
    else:
        projects = _discover_projects(base_dir)
        if not projects:
            print(f"No projects found under {base_dir}/")
            return
    for project in projects:
        if len(projects) > 1:
            print(f"── {project} ──")
        # Security: reads only from normalized/ where secrets have already been
        # redacted by normalize_logs.py — raw logs are never used past that stage.
        logs_dir = base_dir / project / "normalized"
        if not logs_dir.exists():
            print(f" Normalized logs not found: {logs_dir}")
            continue
        if args.out_dir:
            out_dir = Path(args.out_dir)
        else:
            out_dir = base_dir / project / "results" / label
        prepare_project(
            logs_dir, out_dir, args.max_transcript_chars, args.agents, args.refresh,
            max_sessions=args.max_sessions,
            session_ids=args.sessions,
        )
if __name__ == "__main__":
    main()