Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.
88
86%
Does it follow best practices?
Impact
97%
2.93x — Average score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Search prepared session transcripts for keywords or patterns.
Fast text search across all prepared transcripts to find sessions relevant
to a particular behavior, skill, tool, or topic. Useful when users want to
analyze a specific behavior but don't know which sessions contain it.
Reads from prepared/ transcripts (or normalized/ JSONL if prepared/ doesn't
exist yet). Returns matching sessions with context snippets.
No external dependencies.
Usage:
python3 search_sessions.py --query "tessl tile new"
python3 search_sessions.py --query "tessl tile new" --context 3
python3 search_sessions.py --project-dir /path/to/project --query "pattern1" "pattern2" --match-all
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
def _analysis_dir_from_project(project_dir: str) -> str:
"""Derive the analysis data directory from a project directory path."""
slug = project_dir.replace("/", "-")
return os.path.join(os.path.expanduser("~"), ".tessl", "session-analyses", slug)
def find_prepared_sessions(analysis_dir: Path) -> list[dict]:
    """Collect prepared session transcripts across all runs, deduplicated.

    Walks every run directory (newest first) and keeps the first
    occurrence of each agent/session_id pair, so the search covers
    every session rather than only the latest run.
    """
    runs_dir = analysis_dir / "runs"
    if not runs_dir.exists():
        return []

    unique: list[dict] = []
    seen_keys: set[str] = set()
    # Newest runs win: the first copy of a session we encounter is kept.
    for run_dir in sorted(runs_dir.iterdir(), reverse=True):
        prepared_dir = run_dir / "prepared"
        if not prepared_dir.exists():
            continue
        for entry in _scan_prepared_dir(prepared_dir, str(run_dir)):
            key = f"{entry['agent']}/{entry['session_id']}"
            if key in seen_keys:
                continue
            seen_keys.add(key)
            unique.append(entry)
    return unique
def find_normalized_sessions(analysis_dir: Path) -> list[dict]:
    """List normalized session JSONL files (fallback when nothing is prepared).

    Scans analysis_dir/normalized/<agent>/*.jsonl, skipping hidden and
    underscore-prefixed agent directories, and returns one record per file.
    """
    norm_dir = analysis_dir / "normalized"
    if not norm_dir.exists():
        return []

    records = []
    for agent_dir in sorted(norm_dir.iterdir()):
        # Ignore plain files and private/hidden agent directories.
        if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
            continue
        records.extend(
            {
                "agent": agent_dir.name,
                "session_id": jsonl.stem,
                "file": jsonl,
                "source": "normalized",
            }
            for jsonl in sorted(agent_dir.glob("*.jsonl"))
        )
    return records
def _scan_prepared_dir(prepared_dir: Path, run_path: str) -> list[dict]:
"""Scan a prepared/ directory for session transcript files."""
sessions = []
for agent_dir in sorted(prepared_dir.iterdir()):
if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
continue
for txt_file in sorted(agent_dir.glob("*.txt")):
sessions.append({
"agent": agent_dir.name,
"session_id": txt_file.stem,
"file": txt_file,
"source": "prepared",
"run_path": run_path,
})
return sessions
def search_file(
    file_path: Path,
    patterns: list[re.Pattern],
    match_all: bool = False,
    context_lines: int = 2,
    max_matches_per_file: int = 5,
) -> list[dict]:
    """Search a file for patterns, returning matches with context.

    Each match dict carries the 1-based line number, the stripped matched
    line, the list of pattern strings that hit on that line, and a context
    snippet of surrounding lines joined with newlines.

    When match_all is False, scanning stops once max_matches_per_file
    matches are collected. When match_all is True the whole file is
    scanned and the returned list is guaranteed to cover every pattern
    that matched anywhere in the file (a few matches beyond the cap may
    be kept to preserve that coverage). Previously the unconditional
    truncation could drop the only matches of a pattern, making the
    caller's all-patterns check reject files that actually matched all
    patterns.

    Returns an empty list if the file cannot be read.
    """
    try:
        content = file_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return []
    lines = content.splitlines()
    all_matches = []
    for i, line in enumerate(lines):
        matched_patterns = [p.pattern for p in patterns if p.search(line)]
        if not matched_patterns:
            continue
        start = max(0, i - context_lines)
        end = min(len(lines), i + context_lines + 1)
        all_matches.append({
            "line_number": i + 1,
            "matched_line": line.strip(),
            "matched_patterns": matched_patterns,
            "context": "\n".join(lines[start:end]),
        })
        # For any-match mode we can stop early; for match_all we must scan
        # the whole file so every pattern gets a chance to appear.
        if not match_all and len(all_matches) >= max_matches_per_file:
            break

    if not match_all:
        return all_matches[:max_matches_per_file]

    # Truncate to the cap, but keep one match for any pattern that would
    # otherwise be lost — the caller's all-patterns check depends on it.
    selected = all_matches[:max_matches_per_file]
    covered: set[str] = set()
    for m in selected:
        covered.update(m["matched_patterns"])
    for m in all_matches[max_matches_per_file:]:
        missing = set(m["matched_patterns"]) - covered
        if missing:
            selected.append(m)
            covered.update(missing)
    return selected
def search_sessions(
    analysis_dir: Path,
    queries: list[str],
    match_all: bool = False,
    context_lines: int = 2,
    max_matches_per_session: int = 5,
    case_insensitive: bool = True,
) -> list[dict]:
    """Search all sessions for query patterns.

    Prefers prepared transcripts and falls back to normalized JSONL when
    no prepared sessions exist. Returns one dict per matching session
    with its match details.

    Raises:
        ValueError: if any query is not a valid regular expression.
    """
    # Prepared transcripts are the primary source; normalized JSONL is
    # the fallback when no run has produced prepared output yet.
    sessions = find_prepared_sessions(analysis_dir)
    source_type = "prepared"
    if not sessions:
        sessions = find_normalized_sessions(analysis_dir)
        source_type = "normalized"
    if not sessions:
        return []

    # Queries come from the local user's own CLI args (not network
    # input), so ReDoS is not a risk here. Compiling up front still
    # gives a clear error on malformed regex.
    flags = re.IGNORECASE if case_insensitive else 0
    compiled = []
    for query in queries:
        try:
            compiled.append(re.compile(query, flags))
        except re.error as exc:
            raise ValueError(f"Invalid regex pattern {query!r}: {exc}") from exc

    hits = []
    for session in sessions:
        matches = search_file(
            session["file"],
            compiled,
            match_all=match_all,
            context_lines=context_lines,
            max_matches_per_file=max_matches_per_session,
        )
        if not matches:
            continue
        if match_all:
            # Require every pattern to have matched somewhere in the file.
            hit_patterns: set[str] = set()
            for m in matches:
                hit_patterns.update(m["matched_patterns"])
            if len(hit_patterns) < len(compiled):
                continue
        hits.append({
            "agent": session["agent"],
            "session_id": session["session_id"],
            "file": str(session["file"]),
            "source": source_type,
            "match_count": len(matches),
            "matches": matches,
        })
    return hits
def main():
    """Command-line entry point.

    Parses arguments, resolves the analysis directory, runs the search,
    and emits results in one of three formats: session IDs only, JSON,
    or a human-readable report with highlighted context.
    """
    parser = argparse.ArgumentParser(
        description="Search session transcripts for keywords or patterns"
    )
    parser.add_argument(
        "--project-dir", default=os.getcwd(),
        help="Project directory (default: cwd). Analysis dir is derived from this unless --analysis-dir is given.",
    )
    parser.add_argument(
        "--analysis-dir", default=None,
        help="Analysis data directory (default: ~/.tessl/session-analyses/<project-slug>)",
    )
    parser.add_argument(
        "--query", "-q", nargs="+", required=True,
        help="Search terms or regex patterns",
    )
    parser.add_argument(
        "--match-all", action="store_true",
        help="Require all query patterns to match (default: any)",
    )
    parser.add_argument(
        "--context", "-C", type=int, default=2,
        help="Context lines around each match (default: 2)",
    )
    parser.add_argument(
        "--max-matches", type=int, default=5,
        help="Max matches shown per session (default: 5)",
    )
    parser.add_argument(
        "--case-sensitive", action="store_true",
        help="Case-sensitive search (default: case-insensitive)",
    )
    parser.add_argument(
        "--json", action="store_true", dest="json_output",
        help="Output results as JSON (for piping to other scripts)",
    )
    parser.add_argument(
        "--ids-only", action="store_true",
        help="Only output session IDs (agent/session_id), one per line",
    )
    args = parser.parse_args()

    project_dir = os.path.realpath(args.project_dir)
    if args.analysis_dir:
        analysis_dir = Path(os.path.realpath(args.analysis_dir))
    else:
        analysis_dir = Path(_analysis_dir_from_project(project_dir))

    results = search_sessions(
        analysis_dir,
        args.query,
        match_all=args.match_all,
        context_lines=args.context,
        max_matches_per_session=args.max_matches,
        case_insensitive=not args.case_sensitive,
    )

    # Machine-friendly outputs short-circuit the human-readable report.
    if args.ids_only:
        for result in results:
            print(f"{result['agent']}/{result['session_id']}")
        return
    if args.json_output:
        print(json.dumps(results, indent=2))
        return

    query_text = " ".join(args.query)
    if not results:
        print(f"No sessions matched query: {query_text}")
        print(f"Searched in: {analysis_dir}")
        return

    rule = "=" * 60
    # Strips terminal escape sequences from untrusted log content;
    # hoisted out of the loop and precompiled.
    ansi_escape = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
    print(f"Found {len(results)} session(s) matching: {query_text}\n")
    for result in results:
        print(rule)
        print(f" {result['agent']}/{result['session_id']} ({result['match_count']} matches)")
        print(f" File: {result['file']}")
        print(rule)
        for match in result["matches"]:
            print(f"\n Line {match['line_number']}:")
            for ctx_line in match["context"].splitlines():
                safe_line = ansi_escape.sub("", ctx_line)
                # Mark the line that actually matched within its context.
                if ctx_line.strip() == match["matched_line"]:
                    print(f" > {safe_line}")
                else:
                    print(f" {safe_line}")
        print()

    # Summary suitable for piping to --sessions
    print("─" * 60)
    print("Session IDs (use with --sessions flag):")
    for result in results:
        print(f" {result['agent']}/{result['session_id']}")


if __name__ == "__main__":
    main()