CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

try-tessl/agent-quality

Analyze agent sessions against verifier checklists, detect friction points, and create structured verifiers from skills and docs. Produces per-session verdicts and aggregated quality reports.

88

2.93x
Quality

86%

Does it follow best practices?

Impact

97%

2.93x

Average score across 3 eval scenarios

Security by Snyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

skills/analyze-sessions/scripts/search_sessions.py

#!/usr/bin/env python3
"""
Search prepared session transcripts for keywords or patterns.

Fast text search across all prepared transcripts to find sessions relevant
to a particular behavior, skill, tool, or topic. Useful when users want to
analyze a specific behavior but don't know which sessions contain it.

Reads from prepared/ transcripts (or normalized/ JSONL if prepared/ doesn't
exist yet). Returns matching sessions with context snippets.

No external dependencies.

Usage:
    python3 search_sessions.py --query "tessl tile new"
    python3 search_sessions.py --query "tessl tile new" --context 3
    python3 search_sessions.py --project-dir /path/to/project --query "pattern1" "pattern2" --match-all
"""

import argparse
import json
import os
import re
import sys
from pathlib import Path


def _analysis_dir_from_project(project_dir: str) -> str:
    """Derive the analysis data directory from a project directory path."""
    slug = project_dir.replace("/", "-")
    return os.path.join(os.path.expanduser("~"), ".tessl", "session-analyses", slug)


def find_prepared_sessions(analysis_dir: Path) -> list[dict]:
    """Find all prepared session transcripts, deduplicating across runs.

    Walks every run directory newest-first and keeps the first occurrence
    of each agent/session_id pair, so search covers all sessions rather
    than only the most recent run.
    """
    runs_dir = analysis_dir / "runs"
    if not runs_dir.exists():
        return []

    # Insertion-ordered dict keyed by "agent/session_id"; setdefault keeps
    # the first (newest-run) entry for each session.
    unique: dict[str, dict] = {}
    for run_dir in sorted(runs_dir.iterdir(), reverse=True):
        prepared_dir = run_dir / "prepared"
        if not prepared_dir.exists():
            continue
        for entry in _scan_prepared_dir(prepared_dir, str(run_dir)):
            key = f"{entry['agent']}/{entry['session_id']}"
            unique.setdefault(key, entry)

    return list(unique.values())


def find_normalized_sessions(analysis_dir: Path) -> list[dict]:
    """Find all normalized session JSONL files (fallback when no prepared transcripts)."""
    norm_dir = analysis_dir / "normalized"
    if not norm_dir.exists():
        return []

    found: list[dict] = []
    for agent_dir in sorted(norm_dir.iterdir()):
        # Skip plain files and hidden/internal directories (".foo", "_foo").
        if not agent_dir.is_dir() or agent_dir.name[:1] in (".", "_"):
            continue
        found.extend(
            {
                "agent": agent_dir.name,
                "session_id": jsonl_file.stem,
                "file": jsonl_file,
                "source": "normalized",
            }
            for jsonl_file in sorted(agent_dir.glob("*.jsonl"))
        )
    return found


def _scan_prepared_dir(prepared_dir: Path, run_path: str) -> list[dict]:
    """Scan a prepared/ directory for session transcript files."""
    sessions = []
    for agent_dir in sorted(prepared_dir.iterdir()):
        if not agent_dir.is_dir() or agent_dir.name.startswith((".", "_")):
            continue
        for txt_file in sorted(agent_dir.glob("*.txt")):
            sessions.append({
                "agent": agent_dir.name,
                "session_id": txt_file.stem,
                "file": txt_file,
                "source": "prepared",
                "run_path": run_path,
            })
    return sessions


def search_file(
    file_path: Path,
    patterns: list[re.Pattern],
    match_all: bool = False,
    context_lines: int = 2,
    max_matches_per_file: int = 5,
) -> list[dict]:
    """Search a file for patterns, returning matches with context.

    Args:
        file_path: Text file to scan line by line.
        patterns: Compiled regex patterns to look for.
        match_all: When True, scan the whole file (no early exit) and keep
            at least one match for every pattern that occurs anywhere in
            the file, so callers can verify all-pattern coverage from the
            (truncated) result.
        context_lines: Lines of surrounding context captured per match.
        max_matches_per_file: Soft cap on returned matches. In match_all
            mode a few extra entries beyond the cap may be returned to keep
            one representative match per pattern.

    Returns:
        List of match dicts (line number, matched line, matched patterns,
        context snippet). Empty list if the file can't be read or nothing
        matches.
    """
    try:
        content = file_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return []

    lines = content.splitlines()
    all_matches = []

    for i, line in enumerate(lines):
        matched_patterns = [pat.pattern for pat in patterns if pat.search(line)]
        if not matched_patterns:
            continue

        start = max(0, i - context_lines)
        end = min(len(lines), i + context_lines + 1)
        all_matches.append({
            "line_number": i + 1,
            "matched_line": line.strip(),
            "matched_patterns": matched_patterns,
            "context": "\n".join(lines[start:end]),
        })

        # In any-match mode we can stop as soon as we have enough samples;
        # in match_all mode we must see the whole file.
        if not match_all and len(all_matches) >= max_matches_per_file:
            break

    if not match_all:
        return all_matches[:max_matches_per_file]

    # BUG FIX: naive truncation could drop every match of a pattern that
    # only appears late in the file, making the caller's "all patterns
    # present?" check fail even though the file matches everything.
    # Keep the first max_matches_per_file entries, then append the first
    # match of any pattern not yet represented.
    kept = all_matches[:max_matches_per_file]
    covered: set[str] = set()
    for m in kept:
        covered.update(m["matched_patterns"])
    for m in all_matches[max_matches_per_file:]:
        if any(p not in covered for p in m["matched_patterns"]):
            kept.append(m)
            covered.update(m["matched_patterns"])
    return kept


def search_sessions(
    analysis_dir: Path,
    queries: list[str],
    match_all: bool = False,
    context_lines: int = 2,
    max_matches_per_session: int = 5,
    case_insensitive: bool = True,
) -> list[dict]:
    """Search all sessions for query patterns.

    Returns list of matching sessions with match details.
    """
    # Prefer prepared transcripts; fall back to normalized JSONL dumps.
    sessions = find_prepared_sessions(analysis_dir)
    source_type = "prepared"
    if not sessions:
        sessions = find_normalized_sessions(analysis_dir)
        source_type = "normalized"
    if not sessions:
        return []

    # Compile patterns — queries come from the local user's own CLI args (not
    # network input), so ReDoS is not a risk here.  We validate anyway to give
    # a clear error on malformed regex.
    flags = re.IGNORECASE if case_insensitive else 0
    patterns = []
    for query in queries:
        try:
            patterns.append(re.compile(query, flags))
        except re.error as exc:
            raise ValueError(f"Invalid regex pattern {query!r}: {exc}") from exc

    hits = []
    for session in sessions:
        matches = search_file(
            session["file"],
            patterns,
            match_all=match_all,
            context_lines=context_lines,
            max_matches_per_file=max_matches_per_session,
        )
        if not matches:
            continue

        if match_all:
            # Every pattern must have matched somewhere in this session.
            covered = {p for m in matches for p in m["matched_patterns"]}
            if len(covered) < len(patterns):
                continue

        hits.append({
            "agent": session["agent"],
            "session_id": session["session_id"],
            "file": str(session["file"]),
            "source": source_type,
            "match_count": len(matches),
            "matches": matches,
        })

    return hits


def main():
    """CLI entry point: parse arguments, run the search, print results."""
    parser = argparse.ArgumentParser(
        description="Search session transcripts for keywords or patterns"
    )
    parser.add_argument(
        "--project-dir", default=os.getcwd(),
        help="Project directory (default: cwd). Analysis dir is derived from this unless --analysis-dir is given.",
    )
    parser.add_argument(
        "--analysis-dir", default=None,
        help="Analysis data directory (default: ~/.tessl/session-analyses/<project-slug>)",
    )
    parser.add_argument(
        "--query", "-q", nargs="+", required=True,
        help="Search terms or regex patterns",
    )
    parser.add_argument(
        "--match-all", action="store_true",
        help="Require all query patterns to match (default: any)",
    )
    parser.add_argument(
        "--context", "-C", type=int, default=2,
        help="Context lines around each match (default: 2)",
    )
    parser.add_argument(
        "--max-matches", type=int, default=5,
        help="Max matches shown per session (default: 5)",
    )
    parser.add_argument(
        "--case-sensitive", action="store_true",
        help="Case-sensitive search (default: case-insensitive)",
    )
    parser.add_argument(
        "--json", action="store_true", dest="json_output",
        help="Output results as JSON (for piping to other scripts)",
    )
    parser.add_argument(
        "--ids-only", action="store_true",
        help="Only output session IDs (agent/session_id), one per line",
    )
    args = parser.parse_args()

    project_dir = os.path.realpath(args.project_dir)
    if args.analysis_dir:
        analysis_dir = Path(os.path.realpath(args.analysis_dir))
    else:
        analysis_dir = Path(_analysis_dir_from_project(project_dir))
    results = search_sessions(
        analysis_dir,
        args.query,
        match_all=args.match_all,
        context_lines=args.context,
        max_matches_per_session=args.max_matches,
        case_insensitive=not args.case_sensitive,
    )

    if args.ids_only:
        for r in results:
            print(f"{r['agent']}/{r['session_id']}")
        return

    if args.json_output:
        print(json.dumps(results, indent=2))
        return

    _print_human_results(results, args.query, analysis_dir)


# ANSI CSI escape-sequence matcher, compiled once instead of re-building the
# pattern inside the per-line output loop.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")


def _print_human_results(results: list[dict], queries: list[str], analysis_dir: Path) -> None:
    """Render search results for a human reader on stdout."""
    if not results:
        print(f"No sessions matched query: {' '.join(queries)}")
        print(f"Searched in: {analysis_dir}")
        return

    print(f"Found {len(results)} session(s) matching: {' '.join(queries)}\n")

    for r in results:
        print(f"{'=' * 60}")
        print(f"  {r['agent']}/{r['session_id']}  ({r['match_count']} matches)")
        print(f"  File: {r['file']}")
        print(f"{'=' * 60}")

        for m in r["matches"]:
            print(f"\n  Line {m['line_number']}:")
            for ctx_line in m["context"].splitlines():
                # Strip terminal escape sequences from untrusted log content
                safe_line = _ANSI_ESCAPE_RE.sub("", ctx_line)
                # Highlight the matched line
                if ctx_line.strip() == m["matched_line"]:
                    print(f"  > {safe_line}")
                else:
                    print(f"    {safe_line}")

        print()

    # Summary suitable for piping to --sessions
    print(f"{'─' * 60}")
    print("Session IDs (use with --sessions flag):")
    for r in results:
        print(f"  {r['agent']}/{r['session_id']}")


if __name__ == "__main__":
    main()

README.md

tile.json