Collect and normalize agent logs, discover installed verifiers, and dispatch LLM judges to evaluate adherence. Produces per-session verdicts and aggregated reports.
91
90%
Does it follow best practices?
Impact
96%
3.09xAverage score across 3 eval scenarios
Passed
No known issues
#!/usr/bin/env python3
"""
Normalize raw agent logs to NormalizedEvent JSONL format.
Self-contained — no external dependencies beyond stdlib.
Supports: claude-code, codex, gemini, cursor-ide, cursor-agent.
Security: All raw log content is passed through redact_secrets() before parsing
to strip credentials (API keys, JWTs, private keys, database URLs, etc.).
Downstream pipeline stages (prepare_sessions, dispatch_judges, generate_report)
only ever read from the normalized output, so secrets never reach LLM judges or
HTML reports.
Input: .tessl/logs/<project>/raw/<agent>/*.(jsonl|json)
Output: .tessl/logs/<project>/normalized/<agent>/*.jsonl
Each output line is a JSON object with:
kind: message | tool_call | tool_result | usage | system | error
turn: int >= 1
timestamp: ISO 8601 with trailing Z
stream_id: string
sequence: int >= 0
channel: output | thinking | actions | status
actor: user | assistant | tool | system
segments: [{type, data}]
tool: {provider, name, display_name, call_id, status} (optional)
annotations: {}
raw: original payload (optional)
"""
import argparse
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterator
# ---------------------------------------------------------------------------
# Secret redaction (applied to raw text before parsing)
# ---------------------------------------------------------------------------
# Ordered (pattern, replacement) pairs applied sequentially by redact_secrets().
# Order matters where patterns could overlap (e.g. the PEM block pattern runs
# before the narrower token patterns).
_REDACTION_PATTERNS: list[tuple[re.Pattern, str]] = [
    # Private keys (PEM blocks)
    (re.compile(
        r"-----BEGIN [A-Z ]*PRIVATE KEY-----[\s\S]*?-----END [A-Z ]*PRIVATE KEY-----"
    ), "[REDACTED]"),
    # JWT tokens
    (re.compile(
        r"eyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+"
    ), "[REDACTED]"),
    # Database URLs with credentials
    (re.compile(
        r"(postgres|mysql|mongodb)://[^\s'\"]+:[^\s'\"]+@[^\s'\"]+"
    ), "[REDACTED]"),
    # AWS access keys
    (re.compile(r"AKIA[0-9A-Z]{16}"), "[REDACTED]"),
    # Authorization Bearer tokens (replacement keeps the header name readable)
    (re.compile(
        r"Authorization:\s*Bearer\s+[A-Za-z0-9_-]+"
    ), "Authorization: Bearer [REDACTED]"),
    # API keys (sk_*, pk_* style — Stripe, OpenAI, etc.)
    (re.compile(r"(sk|pk)_[a-z]+_[A-Za-z0-9]{20,}"), "[REDACTED]"),
]
def redact_secrets(text: str) -> str:
    """Run every built-in redaction pattern over *text* and return the result."""
    scrubbed = text
    for matcher, stand_in in _REDACTION_PATTERNS:
        scrubbed = matcher.sub(stand_in, scrubbed)
    return scrubbed
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
_CAMEL = re.compile(r"(?<!^)(?=[A-Z])")
_NON_ALNUM = re.compile(r"[^a-z0-9]+")
def _slugify(value: str) -> str:
lowered = _CAMEL.sub("_", value.strip()).lower()
return _NON_ALNUM.sub("_", lowered).strip("_")
def _iso_from_epoch(value: int | float) -> str:
if value > 1e12: # assume milliseconds
value = value / 1000.0
dt = datetime.fromtimestamp(value, tz=timezone.utc)
return dt.isoformat().replace("+00:00", "Z")
def _iso(ts: str | int | float | datetime | None) -> str:
"""Normalise to ISO-8601 with trailing Z."""
if ts is None:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
if isinstance(ts, datetime):
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
return ts.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
if isinstance(ts, (int, float)):
return _iso_from_epoch(ts)
s = str(ts).strip()
if s.isdigit():
return _iso_from_epoch(int(s))
if s.endswith("+00:00"):
s = s[:-6] + "Z"
if not s.endswith("Z"):
s += "Z"
return s
class _Sequencer:
"""Track per-stream sequence numbers."""
def __init__(self):
self._seqs: defaultdict[str, int] = defaultdict(int)
def next(self, stream_id: str) -> int:
seq = self._seqs[stream_id]
self._seqs[stream_id] = seq + 1
return seq
def _event(
kind: str,
actor: str,
channel: str,
timestamp: str,
stream_id: str,
sequence: int,
turn: int,
segments: list[dict],
*,
tool: dict | None = None,
annotations: dict | None = None,
usage: dict | None = None,
raw: dict | None = None,
) -> dict:
d: dict[str, Any] = {
"kind": kind,
"turn": turn,
"timestamp": timestamp,
"stream_id": stream_id,
"sequence": sequence,
"channel": channel,
"actor": actor,
"segments": segments,
"annotations": annotations or {},
}
if tool:
d["tool"] = tool
if usage:
d["usage"] = usage
if raw:
d["raw"] = raw
return d
def _parse_tool_name(raw_name: str) -> dict:
    """Parse a tool name into {provider, namespace?, name, display_name}.

    Names of the form ``mcp__<namespace>__<tool>`` are MCP tools (a missing
    second separator gets the "default" namespace); anything else is builtin.
    """
    if not raw_name.startswith("mcp__"):
        return {
            "provider": "builtin",
            "name": _slugify(raw_name),
            "display_name": raw_name,
        }
    remainder = raw_name[len("mcp__"):]
    namespace, sep, tool = remainder.partition("__")
    if not sep:
        # Single-part MCP name: no namespace segment present.
        namespace, tool = "default", remainder
    return {
        "provider": "mcp",
        "namespace": _slugify(namespace),
        "name": _slugify(tool),
        "display_name": raw_name,
    }
def _text_seg(text: str) -> dict:
return {"type": "text", "data": text}
def _json_seg(obj: Any) -> dict:
return {"type": "json", "data": json.dumps(obj, ensure_ascii=False, separators=(",", ":"))}
# Cap on tool-output text kept per segment; larger payloads are truncated
# and flagged in the event annotations.
_MAX_TOOL_OUTPUT_CHARS = 5000


def _add_text_segment(segments: list[dict], text: str, annotations: dict, max_chars: int) -> None:
    """Append *text* as a text segment, truncating to *max_chars*.

    Truncation is recorded in *annotations* so downstream consumers know the
    segment is partial. Empty text appends nothing.
    """
    if not text:
        return
    clipped = text[:max_chars]
    if clipped != text:
        annotations["truncated"] = True
        annotations["truncated_max_chars"] = max_chars
    segments.append(_text_seg(clipped))
def _add_json_and_text_segments(
    segments: list[dict], obj: Any, annotations: dict, max_chars: int
) -> None:
    """Append *obj* as a JSON segment plus a (truncated) text rendering of it."""
    segments.append(_json_seg(obj))
    try:
        rendered = json.dumps(obj, ensure_ascii=False)
    except TypeError:
        # Not JSON-serializable: fall back to str() for the text rendering.
        rendered = str(obj)
    _add_text_segment(segments, rendered, annotations, max_chars)
# ---------------------------------------------------------------------------
# claude-code normalizer
# ---------------------------------------------------------------------------
def normalize_claude_code(lines: list[str]) -> Iterator[dict]:
    """Normalize claude-code raw JSONL into NormalizedEvent dicts.

    Handles entry types: system, user, assistant, progress (subagent
    streaming), and result. Fixes the upstream gap: user text messages are
    now emitted.

    BUGFIXES vs. previous version:
    - next_ts() used string splicing that appended a 6-digit counter after
      the microseconds of the ISO timestamp, producing invalid timestamps;
      it now uses timedelta arithmetic.
    - subagent tool_result events could be emitted with turn=0 before any
      user message, violating the documented ``turn >= 1``; now clamped.
    """
    seq = _Sequencer()
    turn = 0
    pending_tools: dict[str, dict] = {}  # call_id -> tool dict (joined on result)
    event_counter = 0
    base_dt: datetime | None = None

    def next_ts() -> str:
        # Synthesize strictly increasing timestamps for payloads that lack one.
        nonlocal event_counter, base_dt
        if base_dt is None:
            base_dt = datetime.now(timezone.utc)
        ts = (base_dt + timedelta(microseconds=event_counter)).isoformat().replace("+00:00", "Z")
        event_counter += 1
        return ts

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            payload = json.loads(raw_line)
        except json.JSONDecodeError:
            # Skip unparseable lines rather than failing the whole file.
            continue
        if not isinstance(payload, dict):
            continue
        entry_type = payload.get("type")
        # Use timestamp from payload if available
        ts = _iso(payload.get("timestamp")) if payload.get("timestamp") else next_ts()
        if entry_type == "system":
            subtype = payload.get("subtype")
            annotations = {"system_subtype": subtype} if subtype else {}
            if subtype == "init":
                parts = []
                if payload.get("model"):
                    parts.append(f"Model: {payload['model']}")
                if payload.get("claude_code_version"):
                    parts.append(f"Version: {payload['claude_code_version']}")
                if payload.get("permissionMode"):
                    parts.append(f"Permission Mode: {payload['permissionMode']}")
                text = "\n".join(parts)
                if not text:
                    continue
                sid = "system_init"
                yield _event("system", "system", "status", ts, sid, seq.next(sid), max(turn, 1),
                             [_text_seg(text)], annotations=annotations, raw=payload)
            else:
                text = payload.get("message") or payload.get("text") or ""
                if not text and subtype:
                    text = f"System event: {subtype}"
                if not text:
                    continue
                sid = f"system_{subtype or 'event'}"
                yield _event("system", "system", "status", ts, sid, seq.next(sid), max(turn, 1),
                             [_text_seg(text)], annotations=annotations, raw=payload)
        elif entry_type == "user":
            # Each user entry starts a new turn.
            turn += 1
            message = payload.get("message") or {}
            content = message.get("content")
            # Handle string content (user text message)
            if isinstance(content, str) and content.strip():
                msg_id = payload.get("uuid") or f"user_msg_{turn}"
                yield _event("message", "user", "output", ts, msg_id, seq.next(msg_id), turn,
                             [_text_seg(content)], raw={"type": "text", "text": content})
            # Handle list content (tool results + possibly text)
            elif isinstance(content, list):
                for item in content:
                    if not isinstance(item, dict):
                        # Plain string item in content list
                        if isinstance(item, str) and item.strip():
                            msg_id = payload.get("uuid") or f"user_msg_{turn}"
                            yield _event("message", "user", "output", ts, msg_id, seq.next(msg_id),
                                         turn, [_text_seg(item)],
                                         raw={"type": "text", "text": item})
                        continue
                    item_type = item.get("type")
                    if item_type == "text":
                        text = item.get("text", "")
                        if text.strip():
                            msg_id = payload.get("uuid") or f"user_msg_{turn}"
                            yield _event("message", "user", "output", ts, msg_id,
                                         seq.next(msg_id), turn, [_text_seg(text)],
                                         raw=item)
                    elif item_type == "tool_result":
                        tool_use_id = item.get("tool_use_id")
                        is_error = item.get("is_error", False)
                        content_data = item.get("content", "")
                        # Resolve pending tool call
                        tool_info = None
                        if tool_use_id and tool_use_id in pending_tools:
                            tool_info = dict(pending_tools[tool_use_id])
                            tool_info["status"] = "error" if is_error else "ok"
                        segments: list[dict] = []
                        annotations: dict[str, Any] = {}
                        if isinstance(content_data, str) and content_data:
                            _add_text_segment(segments, content_data, annotations, _MAX_TOOL_OUTPUT_CHARS)
                        elif isinstance(content_data, list):
                            for block in content_data:
                                if isinstance(block, dict):
                                    if block.get("type") == "text" and block.get("text"):
                                        _add_text_segment(segments, block["text"], annotations,
                                                          _MAX_TOOL_OUTPUT_CHARS)
                                    else:
                                        _add_json_and_text_segments(segments, block, annotations,
                                                                    _MAX_TOOL_OUTPUT_CHARS)
                                else:
                                    _add_text_segment(segments, str(block), annotations,
                                                      _MAX_TOOL_OUTPUT_CHARS)
                        sid = tool_use_id or "tool_result"
                        yield _event("tool_result", "tool", "actions", ts, sid, seq.next(sid),
                                     turn, segments, tool=tool_info, annotations=annotations, raw=item)
        elif entry_type == "assistant":
            message = payload.get("message") or {}
            content = message.get("content") or []
            message_id = message.get("id") or f"assistant_msg_{turn}"
            for item in content:
                if not isinstance(item, dict):
                    continue
                item_type = item.get("type")
                if item_type == "text":
                    text = item.get("text")
                    if text:
                        yield _event("message", "assistant", "output", ts, message_id,
                                     seq.next(message_id), max(turn, 1), [_text_seg(text)],
                                     raw=item)
                elif item_type == "tool_use":
                    tool_id = item.get("id")
                    tool_name = item.get("name", "unknown")
                    tool_input = item.get("input", {})
                    tool_info = _parse_tool_name(tool_name)
                    tool_info["call_id"] = tool_id
                    if tool_id:
                        # Remember the call so its result can be attributed later.
                        pending_tools[tool_id] = dict(tool_info)
                    segments = []
                    if tool_input:
                        segments.append(_json_seg(tool_input))
                    sid = tool_id or tool_name
                    yield _event("tool_call", "assistant", "actions", ts, sid, seq.next(sid),
                                 max(turn, 1), segments, tool=tool_info, raw=item)
        elif entry_type == "progress":
            # Subagent streaming events. data.message has the same
            # structure as top-level user/assistant events.
            data = payload.get("data") or {}
            msg = data.get("message") or {}
            msg_type = msg.get("type")  # "assistant" or "user"
            inner_content = (msg.get("message") or {}).get("content") or []
            if not isinstance(inner_content, list) or not msg_type:
                continue
            parent_tool_id = payload.get("parentToolUseID", "")
            subagent_ann = {"subagent": True}
            if parent_tool_id:
                subagent_ann["parent_tool_use_id"] = parent_tool_id
            if msg_type == "assistant":
                for item in inner_content:
                    if not isinstance(item, dict):
                        continue
                    if item.get("type") == "tool_use":
                        tool_id = item.get("id")
                        tool_name = item.get("name", "unknown")
                        tool_input = item.get("input", {})
                        tool_info = _parse_tool_name(tool_name)
                        tool_info["call_id"] = tool_id
                        if tool_id:
                            pending_tools[tool_id] = dict(tool_info)
                        segments = []
                        if tool_input:
                            segments.append(_json_seg(tool_input))
                        sid = tool_id or tool_name
                        yield _event("tool_call", "assistant", "actions", ts,
                                     sid, seq.next(sid), max(turn, 1), segments,
                                     tool=tool_info, annotations=subagent_ann,
                                     raw=item)
            elif msg_type == "user":
                for item in inner_content:
                    if not isinstance(item, dict):
                        continue
                    if item.get("type") == "tool_result":
                        tool_use_id = item.get("tool_use_id")
                        is_error = item.get("is_error", False)
                        content_data = item.get("content", "")
                        tool_info = None
                        if tool_use_id and tool_use_id in pending_tools:
                            tool_info = dict(pending_tools[tool_use_id])
                            tool_info["status"] = "error" if is_error else "ok"
                        segments: list[dict] = []
                        annotations: dict[str, Any] = dict(subagent_ann)
                        if isinstance(content_data, str) and content_data:
                            _add_text_segment(segments, content_data, annotations,
                                              _MAX_TOOL_OUTPUT_CHARS)
                        elif isinstance(content_data, list):
                            for block in content_data:
                                if isinstance(block, dict):
                                    if (block.get("type") == "text"
                                            and block.get("text")):
                                        _add_text_segment(
                                            segments, block["text"], annotations,
                                            _MAX_TOOL_OUTPUT_CHARS)
                                    else:
                                        _add_json_and_text_segments(
                                            segments, block, annotations,
                                            _MAX_TOOL_OUTPUT_CHARS)
                                else:
                                    _add_text_segment(segments, str(block),
                                                      annotations,
                                                      _MAX_TOOL_OUTPUT_CHARS)
                        sid = tool_use_id or "tool_result"
                        # BUGFIX: clamp to >= 1 — subagent results can arrive
                        # before the first top-level user turn.
                        yield _event("tool_result", "tool", "actions", ts,
                                     sid, seq.next(sid), max(turn, 1), segments,
                                     tool=tool_info, annotations=annotations,
                                     raw=item)
        elif entry_type == "result":
            usage_data: dict[str, Any] = {}
            if payload.get("usage"):
                usage_data.update(payload["usage"])
            if payload.get("modelUsage"):
                usage_data["model_usage"] = payload["modelUsage"]
            if payload.get("total_cost_usd") is not None:
                usage_data["cost_usd"] = payload["total_cost_usd"]
            if payload.get("duration_ms") is not None:
                usage_data["duration_ms"] = payload["duration_ms"]
            segments = []
            if payload.get("result"):
                segments.append(_text_seg(payload["result"]))
            sid = "turn_result"
            yield _event("usage", "system", "status", ts, sid, seq.next(sid), max(turn, 1),
                         segments, usage=usage_data or None, raw=payload)
# ---------------------------------------------------------------------------
# codex normalizer
# ---------------------------------------------------------------------------
def _unwrap_codex_rs(lines: list[str]) -> list[str]:
"""Unwrap codex_cli_rs wrapper format to legacy flat format.
The new format wraps each entry in {type, timestamp, payload}.
- response_item → payload is the legacy entry (message, function_call, etc.)
- event_msg → map user_message/agent_message to legacy message format
- session_meta → map to legacy session init format
"""
# Detect format: if first parseable line has type=session_meta or response_item
is_new = False
for raw_line in lines:
raw_line = raw_line.strip()
if not raw_line:
continue
try:
data = json.loads(raw_line)
except json.JSONDecodeError:
continue
if isinstance(data, dict) and data.get("type") in ("session_meta", "response_item", "event_msg"):
is_new = True
break
if not is_new:
return lines
unwrapped = []
for raw_line in lines:
raw_line = raw_line.strip()
if not raw_line:
continue
try:
data = json.loads(raw_line)
except json.JSONDecodeError:
continue
if not isinstance(data, dict):
continue
wrapper_type = data.get("type")
ts = data.get("timestamp")
payload = data.get("payload", {})
if not isinstance(payload, dict):
continue
if wrapper_type == "session_meta":
# Convert to legacy session init format
entry = dict(payload)
if ts and "timestamp" not in entry:
entry["timestamp"] = ts
unwrapped.append(json.dumps(entry))
elif wrapper_type == "response_item":
# Payload is already in legacy format (message, function_call, etc.)
entry = dict(payload)
if ts and "timestamp" not in entry:
entry["timestamp"] = ts
unwrapped.append(json.dumps(entry))
elif wrapper_type == "event_msg":
ptype = payload.get("type", "")
if ptype == "user_message":
entry = {
"type": "message",
"role": "user",
"timestamp": ts,
"content": [{"type": "input_text", "text": payload.get("message", "")}],
}
unwrapped.append(json.dumps(entry))
elif ptype == "agent_message":
entry = {
"type": "message",
"role": "assistant",
"timestamp": ts,
"content": [{"type": "output_text", "text": payload.get("text", "")}],
}
unwrapped.append(json.dumps(entry))
elif ptype == "agent_reasoning":
entry = {
"type": "reasoning",
"timestamp": ts,
"summary": [{"text": payload.get("text", "")}],
}
unwrapped.append(json.dumps(entry))
# Skip token_count, task_started, task_complete, etc.
return unwrapped
def normalize_codex(lines: list[str]) -> Iterator[dict]:
    """Normalize codex (OpenAI CLI) raw JSONL.

    Accepts both the legacy flat format and the codex_cli_rs wrapper format
    (unwrapped first via _unwrap_codex_rs). Emits system, message, tool_call,
    tool_result and thinking events.
    """
    lines = _unwrap_codex_rs(lines)
    seq = _Sequencer()
    turn = 0
    # call_id -> tool dict; lets function_call_output events carry tool metadata.
    pending_tools: dict[str, dict] = {}
    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            payload = json.loads(raw_line)
        except json.JSONDecodeError:
            # Skip unparseable lines rather than failing the whole file.
            continue
        if not isinstance(payload, dict):
            continue
        record_type = payload.get("record_type")
        if record_type == "state":
            # Internal state snapshots carry no conversational content.
            continue
        entry_type = payload.get("type")
        ts = _iso(payload.get("timestamp"))
        # Session init (first line with id + instructions)
        if "instructions" in payload and "id" in payload and not entry_type:
            sid = "system_init"
            parts = []
            if payload.get("instructions"):
                parts.append(f"Instructions loaded ({len(payload['instructions'])} chars)")
            git = payload.get("git", {})
            if git.get("branch"):
                parts.append(f"Branch: {git['branch']}")
            text = "\n".join(parts) if parts else "Session init"
            yield _event("system", "system", "status", ts, sid, seq.next(sid), 1,
                         [_text_seg(text)], raw=payload)
            continue
        if entry_type == "message":
            role = payload.get("role")
            content = payload.get("content") or []
            if role == "user":
                # Each user message starts a new turn.
                turn += 1
                for item in content:
                    if isinstance(item, dict) and item.get("type") == "input_text":
                        text = item.get("text", "")
                        if text.strip():
                            msg_id = f"user_msg_{turn}"
                            yield _event("message", "user", "output", ts, msg_id,
                                         seq.next(msg_id), turn, [_text_seg(text)], raw=item)
            elif role == "assistant":
                for item in content:
                    if isinstance(item, dict) and item.get("type") == "output_text":
                        text = item.get("text", "")
                        if text.strip():
                            msg_id = f"assistant_msg_{turn}"
                            yield _event("message", "assistant", "output", ts, msg_id,
                                         seq.next(msg_id), max(turn, 1), [_text_seg(text)],
                                         raw=item)
        elif entry_type == "function_call":
            call_id = payload.get("call_id", "")
            name = payload.get("name", "unknown")
            args_str = payload.get("arguments", "{}")
            tool_info = {"provider": "builtin", "name": _slugify(name),
                         "display_name": name, "call_id": call_id}
            if call_id:
                # Remember the call so its output can be attributed later.
                pending_tools[call_id] = dict(tool_info)
            segments = []
            try:
                args = json.loads(args_str)
                segments.append(_json_seg(args))
            except (json.JSONDecodeError, TypeError):
                # Arguments were not valid JSON: keep them as raw text.
                if args_str:
                    segments.append(_text_seg(args_str))
            sid = call_id or name
            yield _event("tool_call", "assistant", "actions", ts, sid, seq.next(sid),
                         max(turn, 1), segments, tool=tool_info, raw=payload)
        elif entry_type == "function_call_output":
            call_id = payload.get("call_id", "")
            output = payload.get("output", "")
            tool_info = None
            if call_id and call_id in pending_tools:
                tool_info = dict(pending_tools[call_id])
                tool_info["status"] = "ok"
            segments = []
            annotations: dict[str, Any] = {}
            if isinstance(output, str) and output:
                # Try to parse as JSON to extract metadata
                try:
                    parsed = json.loads(output)
                    if isinstance(parsed, dict):
                        actual_output = parsed.get("output", "")
                        exit_code = parsed.get("metadata", {}).get("exit_code")
                        if actual_output:
                            _add_text_segment(segments, actual_output, annotations,
                                              _MAX_TOOL_OUTPUT_CHARS)
                        if tool_info and exit_code is not None:
                            tool_info["exit_code"] = exit_code
                            # Non-zero exit code marks the tool run as failed.
                            if exit_code != 0:
                                tool_info["status"] = "error"
                        _add_json_and_text_segments(segments, parsed, annotations,
                                                    _MAX_TOOL_OUTPUT_CHARS)
                    else:
                        _add_text_segment(segments, output, annotations, _MAX_TOOL_OUTPUT_CHARS)
                except (json.JSONDecodeError, TypeError):
                    _add_text_segment(segments, output, annotations, _MAX_TOOL_OUTPUT_CHARS)
            sid = call_id or "tool_result"
            yield _event("tool_result", "tool", "actions", ts, sid, seq.next(sid),
                         max(turn, 1), segments, tool=tool_info, annotations=annotations, raw=payload)
        elif entry_type == "reasoning":
            summary = payload.get("summary", [])
            for s in summary:
                if isinstance(s, dict) and s.get("text"):
                    sid = payload.get("id", "reasoning")
                    yield _event("message", "assistant", "thinking", ts, sid, seq.next(sid),
                                 max(turn, 1), [_text_seg(s["text"])], raw=payload)
# ---------------------------------------------------------------------------
# gemini normalizer
# ---------------------------------------------------------------------------
# Gemini tool names → canonical slugs (aligned with claude-code naming so
# downstream verifiers can match the same tool across agents). Unmapped names
# fall back to _slugify() at the call site.
_GEMINI_TOOL_MAP = {
    "run_shell_command": "bash",
    "read_file": "read",
    "write_file": "write",
    "edit_file": "edit",
    "list_dir": "glob",
    "activate_skill": "skill",
}
def normalize_gemini(data: dict | list) -> Iterator[dict]:
    """Normalize gemini JSON.

    Handles two formats:
    1. Session format: {sessionId, messages: [{id, timestamp, type, content, toolCalls}]}
    2. Flat log format (logs.json): [{sessionId, messageId, type, message, timestamp}]

    Yields nothing for inputs that are neither a dict nor a list.
    """
    # Detect format
    if isinstance(data, list) and data and isinstance(data[0], dict):
        if "messages" in data[0]:
            # List of session objects
            sessions = data
        elif "message" in data[0] and "messageId" in data[0]:
            # Flat log format — group by sessionId into pseudo-sessions
            by_session: dict[str, list] = {}
            for entry in data:
                sid = entry.get("sessionId", "unknown")
                by_session.setdefault(sid, []).append(entry)
            sessions = []
            for sid, entries in by_session.items():
                msgs = []
                # Order entries by messageId so turns are reconstructed in sequence.
                for e in sorted(entries, key=lambda x: x.get("messageId", 0)):
                    msgs.append({
                        "id": f"{sid}_{e.get('messageId', 0)}",
                        "timestamp": e.get("timestamp"),
                        "type": e.get("type", ""),
                        "content": e.get("message", ""),
                    })
                sessions.append({"sessionId": sid, "messages": msgs})
        else:
            sessions = data
    elif isinstance(data, dict):
        # A single session object.
        sessions = [data]
    else:
        return
    for session in sessions:
        if not isinstance(session, dict):
            continue
        messages = session.get("messages", [])
        # Sequencer and turn counter are per-session: streams never span sessions.
        seq = _Sequencer()
        turn = 0
        for msg in messages:
            if not isinstance(msg, dict):
                continue
            msg_type = msg.get("type", "")
            ts = _iso(msg.get("timestamp"))
            raw_content = msg.get("content", "")
            msg_id = msg.get("id", "")
            # Gemini content can be a string or a list of {text: "..."} parts
            if isinstance(raw_content, list):
                content = "\n".join(
                    p.get("text", "") for p in raw_content
                    if isinstance(p, dict) and p.get("text")
                ).strip()
            elif isinstance(raw_content, str):
                content = raw_content.strip()
            else:
                content = str(raw_content).strip()
            if msg_type == "info":
                if content:
                    sid = msg_id or "info"
                    yield _event("system", "system", "status", ts, sid, seq.next(sid),
                                 max(turn, 1), [_text_seg(content)], raw=msg)
            elif msg_type == "user":
                # Each user message starts a new turn.
                turn += 1
                if content:
                    sid = msg_id or f"user_{turn}"
                    yield _event("message", "user", "output", ts, sid, seq.next(sid), turn,
                                 [_text_seg(content)], raw=msg)
            elif msg_type == "gemini":
                if content:
                    sid = msg_id or f"gemini_{turn}"
                    yield _event("message", "assistant", "output", ts, sid, seq.next(sid),
                                 max(turn, 1), [_text_seg(content)], raw=msg)
                # Process tool calls
                for tc in msg.get("toolCalls", []):
                    tc_id = tc.get("id", "")
                    tc_name = tc.get("name", "unknown")
                    tc_args = tc.get("args", {})
                    canonical = _GEMINI_TOOL_MAP.get(tc_name)
                    tool_info = {"provider": "builtin",
                                 "name": canonical or _slugify(tc_name),
                                 "display_name": tc_name, "call_id": tc_id}
                    segments = [_json_seg(tc_args)] if tc_args else []
                    sid = tc_id or tc_name
                    yield _event("tool_call", "assistant", "actions", ts, sid, seq.next(sid),
                                 max(turn, 1), segments, tool=tool_info, raw=tc)
                    # Tool results are inline in gemini format
                    for result in tc.get("result", []):
                        if isinstance(result, dict):
                            resp = result.get("functionResponse", {})
                            output = resp.get("response", {}).get("output", "")
                            result_tool = dict(tool_info)
                            result_tool["status"] = "ok"
                            r_segments: list[dict] = []
                            annotations: dict[str, Any] = {}
                            if output:
                                # Prefer structured JSON output when parseable.
                                try:
                                    parsed = json.loads(output)
                                    _add_json_and_text_segments(r_segments, parsed, annotations,
                                                                _MAX_TOOL_OUTPUT_CHARS)
                                except (json.JSONDecodeError, TypeError):
                                    _add_text_segment(r_segments, output, annotations,
                                                      _MAX_TOOL_OUTPUT_CHARS)
                            r_sid = tc_id or "tool_result"
                            yield _event("tool_result", "tool", "actions", ts, r_sid,
                                         seq.next(r_sid), max(turn, 1), r_segments,
                                         tool=result_tool, annotations=annotations, raw=result)
# ---------------------------------------------------------------------------
# cursor-ide normalizer
# ---------------------------------------------------------------------------
# Cursor-ide tool names → canonical slugs (matching claude-code conventions).
# Cursor ships several versioned variants (_v2 etc.) of the same logical tool;
# all map to one canonical name. Unmapped names fall back to _slugify().
_CURSOR_TOOL_MAP = {
    # Shell execution
    "run_terminal_cmd": "bash",
    "run_terminal_command": "bash",
    "run_terminal_command_v2": "bash",
    # File reads
    "read_file": "read",
    "read_file_v2": "read",
    # File edits / writes
    "search_replace": "edit",
    "edit_file": "edit",
    "edit_file_v2": "edit",
    "write": "write",
    "write_file": "write",
    # Search tools
    "grep": "grep",
    "grep_search": "grep",
    "codebase_search": "grep",
    "file_search": "grep",
    "glob_file_search": "glob",
    "list_dir": "glob",
    "list_dir_v2": "glob",
    # Misc
    "todo_write": "task",
    "read_lints": "lint",
}
def _normalize_cursor_args(name: str, raw_args: dict) -> dict:
"""Normalize cursor-ide tool arg names to canonical field names.
Ensures field names match what other agents produce so verifiers
can query file_path, command, etc. consistently.
"""
args = dict(raw_args)
# file_path normalization
if "target_file" in args and "file_path" not in args:
args["file_path"] = args.pop("target_file")
if "target_directory" in args and "path" not in args:
args["path"] = args.pop("target_directory")
# write tool: "contents" → "content"
if name in ("write", "write_file") and "contents" in args and "content" not in args:
args["content"] = args.pop("contents")
return args
def _parse_cursor_tool_event(tfd: dict, ts: str, turn: int,
                             seq: "_Sequencer") -> list[dict]:
    """Parse a toolFormerData dict into tool_call + tool_result events.

    Returns a list containing one tool_call event and, when a result payload
    is present, one matching tool_result event on the same stream id.
    """
    events: list[dict] = []
    # BUGFIX: use .get() — callers only guarantee toolFormerData is a dict,
    # so a bubble without "name" previously raised KeyError and aborted
    # normalization of the whole file. "unknown" matches the other parsers.
    cursor_name = tfd.get("name", "unknown")
    canonical = _CURSOR_TOOL_MAP.get(cursor_name)
    call_id = tfd.get("toolCallId", "")
    if canonical:
        tool_info = {"provider": "builtin", "name": canonical,
                     "display_name": cursor_name, "call_id": call_id}
    elif cursor_name.startswith("mcp_"):
        tool_info = _parse_tool_name(cursor_name)
        tool_info["call_id"] = call_id
    else:
        tool_info = {"provider": "builtin", "name": _slugify(cursor_name),
                     "display_name": cursor_name, "call_id": call_id}
    # Parse and normalize tool input (rawArgs preferred, params fallback)
    segments: list[dict] = []
    raw_args_str = tfd.get("rawArgs") or tfd.get("params")
    if raw_args_str:
        try:
            args = json.loads(raw_args_str)
            if isinstance(args, dict):
                # Map cursor-specific arg names onto the canonical schema.
                if "relativeWorkspacePath" in args:
                    args["file_path"] = args.pop("relativeWorkspacePath")
                if "streamingContent" in args:
                    args["content"] = args.pop("streamingContent")
                args = _normalize_cursor_args(cursor_name, args)
            segments.append(_json_seg(args))
        except (json.JSONDecodeError, TypeError):
            # Arguments were not valid JSON: keep them as raw text.
            segments.append(_text_seg(raw_args_str))
    sid = call_id or cursor_name
    events.append(_event("tool_call", "assistant", "actions", ts, sid,
                         seq.next(sid), turn, segments, tool=tool_info, raw=tfd))
    # Emit tool result if available
    result_str = tfd.get("result")
    if result_str is not None:
        result_tool = dict(tool_info)
        status = tfd.get("status", "")
        if status == "error":
            result_tool["status"] = "error"
        elif status in ("completed", "ok", "success"):
            result_tool["status"] = "ok"
        else:
            result_tool["status"] = "unknown"
        result_segments: list[dict] = []
        annotations: dict[str, Any] = {}
        if isinstance(result_str, str) and result_str:
            try:
                parsed = json.loads(result_str)
                if isinstance(parsed, dict):
                    # Prefer the human-readable field when present.
                    out = (parsed.get("contents")
                           or parsed.get("output")
                           or parsed.get("text")
                           or "")
                    if out:
                        _add_text_segment(result_segments, str(out), annotations,
                                          _MAX_TOOL_OUTPUT_CHARS)
                    _add_json_and_text_segments(result_segments, parsed, annotations,
                                                _MAX_TOOL_OUTPUT_CHARS)
                else:
                    _add_text_segment(result_segments, result_str, annotations,
                                      _MAX_TOOL_OUTPUT_CHARS)
            except (json.JSONDecodeError, TypeError):
                _add_text_segment(result_segments, result_str, annotations,
                                  _MAX_TOOL_OUTPUT_CHARS)
        events.append(_event("tool_result", "tool", "actions", ts, sid,
                             seq.next(sid), turn, result_segments,
                             tool=result_tool, annotations=annotations, raw=tfd))
    return events
def _parse_cursor_session(lines: list[str]) -> tuple[dict | None, list[dict], list[dict]]:
    """Parse a cursor-ide JSONL file into metadata, bubbles, and transcript entries.

    A single file may contain any combination of:
    - ``"type": "metadata"`` — session metadata (workspace, composer ID)
    - ``"type": "bubble"`` — cursorDiskKV bubble data (tool calls, text)
    - ``"type": "transcript"`` — agent-transcript lines (full reasoning text)

    Returns (metadata, bubbles, transcript_entries).
    """
    metadata: dict | None = None
    bubbles: list[dict] = []
    transcript: list[dict] = []
    for entry in lines:
        entry = entry.strip()
        if not entry:
            continue
        try:
            record = json.loads(entry)
        except json.JSONDecodeError:
            continue
        if not isinstance(record, dict):
            continue
        record_type = record.get("type")
        if record_type == "metadata":
            metadata = record
        elif record_type == "bubble":
            raw = record.get("raw", {})
            tfd = raw.get("toolFormerData")
            thinking = raw.get("thinking")
            bubbles.append({
                "bubble_id": record.get("bubble_id", ""),
                "bubble_type": raw.get("type"),  # 1=user, 2=assistant
                "text": (raw.get("text") or raw.get("rawText") or "").strip(),
                "ts": _iso(raw.get("timestamp") or raw.get("createdAt")),
                "tfd": tfd if isinstance(tfd, dict) else None,
                "thinking": (thinking if isinstance(thinking, str) else "").strip(),
                "raw": raw,
            })
        elif record_type == "transcript":
            role = record.get("role", "")
            pieces = [
                part.get("text", "")
                for part in record.get("message", {}).get("content", [])
                if isinstance(part, dict) and part.get("type") == "text"
            ]
            text = "\n".join(pieces).strip()
            if not text:
                continue
            # Strip <user_query> wrapper
            if role == "user" and "<user_query>" in text:
                wrapped = re.search(r"<user_query>\s*(.*?)\s*</user_query>", text, re.DOTALL)
                if wrapped:
                    text = wrapped.group(1)
            transcript.append({"role": role, "text": text})
    return metadata, bubbles, transcript
def _text_prefix(text: str, length: int = 60) -> str:
"""Normalize text prefix for fuzzy matching."""
return " ".join(text.split())[:length].lower()
def normalize_cursor_ide(lines: list[str]) -> Iterator[dict]:
    """Normalize cursor-ide session, merging bubbles and transcript when both exist.

    A single JSONL file may contain bubble records (from cursorDiskKV) and/or
    transcript records (from agent-transcripts), combined at collection time.

    When both are present:
    - Transcript provides the message text (full reasoning, no truncation)
    - Bubbles provide structured tool calls (name, args, results)
    - Events are interleaved: message → tool calls → message → tool calls ...

    When only bubbles are available (older Cursor): uses bubble text + tool data.
    When only transcript is available: text-only messages, no tool events.
    """
    seq = _Sequencer()  # per-stream sequence counter shared across all emitted events
    metadata, bubbles, transcript = _parse_cursor_session(lines)
    # Emit system init from metadata
    if metadata:
        sid = "system_init"
        parts = []
        if metadata.get("workspace_path"):
            parts.append(f"Workspace: {metadata['workspace_path']}")
        if metadata.get("composer_id"):
            parts.append(f"Composer: {metadata['composer_id']}")
        text = "\n".join(parts) if parts else "Session init"
        ts = _iso(metadata.get("composer_data_raw", {}).get("createdAt"))
        yield _event("system", "system", "status", ts, sid, seq.next(sid), 1,
                     [_text_seg(text)], raw=metadata)
    # --- Merged path: both bubbles and transcript ---
    if bubbles and transcript:
        # Build index of tool events between text/user bubbles.
        # Walk the bubble list and group: each text/user bubble starts a group,
        # tool bubbles following it belong to that group.
        #
        # Bubble types:
        #   user (type=1, has text)                         → user message
        #   assistant text (type=2, has text, no tfd)       → assistant message
        #   assistant tool (type=2, has tfd)                → tool call
        #   assistant thinking (type=2, has thinking, no text, no tfd) → skip
        # Separate bubbles into text entries and tool groups
        text_bubbles: list[dict] = []     # user + assistant text bubbles in order
        tool_groups: list[list[dict]] = []  # tool bubbles following each text bubble
        current_tools: list[dict] = []
        for b in bubbles:
            is_user = b["bubble_type"] == 1
            is_text = b["bubble_type"] == 2 and b["text"] and not b["tfd"]
            is_tool = b["bubble_type"] == 2 and b["tfd"]
            if is_user or is_text:
                # Flush accumulated tools to the previous text bubble's group
                if text_bubbles:
                    tool_groups.append(current_tools)
                    current_tools = []
                text_bubbles.append(b)
            elif is_tool:
                current_tools.append(b)
        # Flush last group (tool_groups[i] holds the tools after text_bubbles[i])
        if text_bubbles:
            tool_groups.append(current_tools)
        # Match transcript entries to text bubbles by content prefix.
        # Build a map: transcript_index → bubble_index
        tb_prefixes = [_text_prefix(b["text"]) for b in text_bubbles]
        tr_prefixes = [_text_prefix(t["text"]) for t in transcript]
        # Greedy forward matching: for each transcript entry, find the next
        # unmatched text bubble whose prefix matches.  next_tb only advances,
        # so matches are strictly in order (a later transcript entry can never
        # match an earlier bubble).
        tr_to_tb: dict[int, int] = {}
        next_tb = 0
        for ti, tp in enumerate(tr_prefixes):
            for bi in range(next_tb, len(tb_prefixes)):
                if tb_prefixes[bi] == tp:
                    tr_to_tb[ti] = bi
                    next_tb = bi + 1
                    break
        # Emit events: walk transcript entries, emit message from transcript text,
        # then emit any tool events from the matched bubble's tool group.
        turn = 0  # advances on each user message; assistant events before the first user land in turn 1 via max(turn, 1)
        for ti, tr_entry in enumerate(transcript):
            if tr_entry["role"] == "user":
                turn += 1
            bi = tr_to_tb.get(ti)
            # Use bubble timestamp if matched, else current time
            ts = text_bubbles[bi]["ts"] if bi is not None else _iso(None)
            actor = "user" if tr_entry["role"] == "user" else "assistant"
            sid = text_bubbles[bi]["bubble_id"] if bi is not None else f"{actor}_{turn}"
            yield _event("message", actor, "output", ts, sid, seq.next(sid),
                         max(turn, 1), [_text_seg(tr_entry["text"])])
            # Emit tool calls from the matched bubble's tool group
            if bi is not None and bi < len(tool_groups):
                for tool_bubble in tool_groups[bi]:
                    if tool_bubble["tfd"] and tool_bubble["tfd"].get("name"):
                        yield from _parse_cursor_tool_event(
                            tool_bubble["tfd"], tool_bubble["ts"],
                            max(turn, 1), seq)
        # Emit any remaining tool events from unmatched bubble groups (e.g. tools
        # after the last text bubble that didn't match a transcript entry)
        # NOTE: these are attributed to the final turn value reached above.
        matched_bis = set(tr_to_tb.values())
        for bi, tb in enumerate(text_bubbles):
            if bi not in matched_bis and bi < len(tool_groups):
                for tool_bubble in tool_groups[bi]:
                    if tool_bubble["tfd"] and tool_bubble["tfd"].get("name"):
                        yield from _parse_cursor_tool_event(
                            tool_bubble["tfd"], tool_bubble["ts"],
                            max(turn, 1), seq)
        return
    # --- Bubbles only (older Cursor, no transcript available) ---
    if bubbles:
        turn = 0
        for b in bubbles:
            if b["bubble_type"] == 1:
                turn += 1
                if b["text"]:
                    bid = b["bubble_id"] or f"user_{turn}"
                    yield _event("message", "user", "output", b["ts"], bid,
                                 seq.next(bid), turn, [_text_seg(b["text"])], raw=b["raw"])
            elif b["bubble_type"] == 2:
                # A type-2 bubble may carry both text and tool data; emit both.
                if b["text"] and not b["tfd"]:
                    bid = b["bubble_id"] or f"assistant_{turn}"
                    yield _event("message", "assistant", "output", b["ts"], bid,
                                 seq.next(bid), max(turn, 1), [_text_seg(b["text"])],
                                 raw=b["raw"])
                if b["tfd"] and b["tfd"].get("name"):
                    yield from _parse_cursor_tool_event(
                        b["tfd"], b["ts"], max(turn, 1), seq)
        return
    # --- Transcript only (no cursorDiskKV data) ---
    if transcript:
        turn = 0
        for tr_entry in transcript:
            if tr_entry["role"] == "user":
                turn += 1
            actor = "user" if tr_entry["role"] == "user" else "assistant"
            sid = f"{actor}_{turn}"
            # No per-message timestamps exist on this path; _iso(None) is used.
            yield _event("message", actor, "output", _iso(None), sid,
                         seq.next(sid), max(turn, 1), [_text_seg(tr_entry["text"])])
# ---------------------------------------------------------------------------
# cursor-agent normalizer
# ---------------------------------------------------------------------------
def normalize_cursor_agent(lines: list[str]) -> Iterator[dict]:
    """Normalize cursor-agent JSONL (metadata + blob records).

    Yields NormalizedEvent dicts for the session init, user/assistant/system
    messages, tool calls, and tool results.  cursor-agent blobs carry no
    per-event timestamps, so synthetic monotonic timestamps are derived from
    the session ``createdAt`` when available (else the current time) plus a
    per-event microsecond offset; the chosen source is recorded in
    ``annotations["timestamp_source"]``.

    Fixes over the previous revision:
    - list-shaped content stored as a repr'd string is parsed with
      ``ast.literal_eval`` instead of ``eval`` — log content is untrusted,
      and ``eval`` would execute arbitrary expressions embedded in a
      captured message.  Non-literal strings now fall back to plain text.
    - a non-numeric ``createdAt`` in metadata no longer raises (it is
      ignored and the inferred timestamp base is used instead).
    """
    import ast  # stdlib; imported locally to keep this fix self-contained

    seq = _Sequencer()
    turn = 0
    pending_tools: dict[str, dict] = {}  # call_id -> tool_info awaiting its result
    event_counter = 0
    session_base_ts: datetime | None = None
    session_ts_source: str | None = None

    def next_ts() -> tuple[str, str]:
        """Return (iso_timestamp, source): session base time + N microseconds."""
        nonlocal event_counter, session_base_ts, session_ts_source
        if session_base_ts is None:
            session_base_ts = datetime.now(timezone.utc)
            session_ts_source = "inferred"
        ts = session_base_ts + timedelta(microseconds=event_counter)
        event_counter += 1
        return _iso(ts), session_ts_source or "inferred"

    def parse_repr_list(value: str) -> list | None:
        """Safely parse a repr'd Python list stored by cursor.

        Security: uses ast.literal_eval (never eval) because log content is
        untrusted.  Returns None when the string is not a list literal;
        callers then fall back to treating the raw string as plain text.
        """
        try:
            parsed = ast.literal_eval(value)
        except Exception:
            return None
        return parsed if isinstance(parsed, list) else None

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            payload = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if not isinstance(payload, dict):
            continue
        ptype = payload.get("type")
        if ptype == "metadata":
            sid = "system_init"
            parts = []
            if payload.get("workspace_path"):
                parts.append(f"Workspace: {payload['workspace_path']}")
            if payload.get("session_id"):
                parts.append(f"Session: {payload['session_id']}")
            text = "\n".join(parts) if parts else "Session init"
            created_at = payload.get("meta_raw", {}).get("createdAt")
            # Guard against non-numeric createdAt (e.g. an ISO string) so one
            # malformed metadata record cannot abort the whole file.
            if isinstance(created_at, (int, float)):
                # Heuristic: values above 1e12 are epoch milliseconds.
                session_base_ts = datetime.fromtimestamp(
                    (created_at / 1000.0) if created_at > 1e12 else created_at,
                    tz=timezone.utc,
                )
                session_ts_source = "session_createdAt"
            ts, ts_source = next_ts()
            yield _event("system", "system", "status", ts, sid, seq.next(sid), 1,
                         [_text_seg(text)], annotations={"timestamp_source": ts_source}, raw=payload)
        elif ptype == "blob" and payload.get("format") == "json":
            raw = payload.get("raw", {})
            if not isinstance(raw, dict):
                continue
            role = raw.get("role", "")
            content = raw.get("content", "")
            ts, ts_source = next_ts()
            ts_annotations = {"timestamp_source": ts_source}
            if role == "system":
                sid = "system_msg"
                text = content if isinstance(content, str) else str(content)
                if text.strip():
                    # System prompts can be huge; keep only a 500-char preview.
                    yield _event("system", "system", "status", ts, sid, seq.next(sid),
                                 max(turn, 1), [_text_seg(text[:500])],
                                 annotations=ts_annotations, raw=raw)
            elif role == "user":
                turn += 1
                # Content can be string, stringified list, or native list
                text_parts = []
                items = content if isinstance(content, list) else None
                if items is None and isinstance(content, str):
                    items = parse_repr_list(content)  # cursor stores as repr'd list
                    if items is None:
                        text_parts.append(content)
                if isinstance(items, list):
                    for item in items:
                        if isinstance(item, dict):
                            if item.get("type") == "text":
                                t = item.get("text", "")
                                # Strip XML-like wrapper tags
                                t = re.sub(r"<user_query>\s*", "", t)
                                t = re.sub(r"\s*</user_query>", "", t)
                                t = re.sub(r"<user_info>.*?</user_info>", "", t, flags=re.DOTALL)
                                t = t.strip()
                                if t:
                                    text_parts.append(t)
                if text_parts:
                    sid = f"user_{turn}"
                    for tp in text_parts:
                        if tp.strip():
                            yield _event("message", "user", "output", ts, sid, seq.next(sid),
                                         turn, [_text_seg(tp)], annotations=ts_annotations, raw=raw)
            elif role == "assistant":
                # Content can be string, stringified list, or native list
                text_parts = []
                tool_calls = []
                items = content if isinstance(content, list) else None
                if items is None and isinstance(content, str):
                    items = parse_repr_list(content)
                    if items is None:
                        text_parts.append(("output", content))
                if isinstance(items, list):
                    for item in items:
                        if isinstance(item, dict):
                            if item.get("type") == "text":
                                t = item.get("text", "")
                                # Separate thinking from output
                                think_match = re.match(r"<think>(.*?)</think>\s*(.*)", t, re.DOTALL)
                                if think_match:
                                    thinking = think_match.group(1).strip()
                                    rest = think_match.group(2).strip()
                                    if thinking:
                                        text_parts.append(("thinking", thinking))
                                    if rest:
                                        text_parts.append(("output", rest))
                                elif t.strip():
                                    text_parts.append(("output", t.strip()))
                            elif item.get("type") in ("tool_use", "tool-call"):
                                tool_calls.append(item)
                for channel, text in text_parts:
                    if text.strip():
                        sid = f"assistant_{turn}"
                        yield _event("message", "assistant", channel, ts, sid, seq.next(sid),
                                     max(turn, 1), [_text_seg(text)],
                                     annotations=ts_annotations, raw=raw)
                for tc in tool_calls:
                    tc_id = tc.get("id", "") or tc.get("toolCallId", "")
                    tc_name = tc.get("name", "") or tc.get("toolName", "") or "unknown"
                    tc_input = tc.get("input", {}) or tc.get("args", {})
                    tool_info = {"provider": "builtin", "name": _slugify(tc_name),
                                 "display_name": tc_name, "call_id": tc_id}
                    if tc_id:
                        # Remember the call so the matching tool_result can reuse its info.
                        pending_tools[tc_id] = dict(tool_info)
                    segments = [_json_seg(tc_input)] if tc_input else []
                    sid = tc_id or tc_name
                    yield _event("tool_call", "assistant", "actions", ts, sid, seq.next(sid),
                                 max(turn, 1), segments, tool=tool_info,
                                 annotations=ts_annotations, raw=tc)
            elif role == "tool":
                # Tool results — content can be string, stringified list, or native list
                items = content if isinstance(content, list) else None
                if items is None and isinstance(content, str):
                    items = parse_repr_list(content)
                if isinstance(items, list):
                    for item in items:
                        if isinstance(item, dict) and item.get("type") == "tool-result":
                            tc_id = item.get("toolCallId", "")
                            tc_name = item.get("toolName", "unknown")
                            result = item.get("result", "")
                            tool_info = None
                            if tc_id and tc_id in pending_tools:
                                tool_info = dict(pending_tools[tc_id])
                                tool_info["status"] = "unknown"
                            else:
                                tool_info = {"provider": "builtin",
                                             "name": _slugify(tc_name),
                                             "display_name": tc_name,
                                             "call_id": tc_id, "status": "unknown"}
                            segments: list[dict] = []
                            annotations: dict[str, Any] = dict(ts_annotations)
                            parsed_result = None
                            if isinstance(result, str) and result:
                                try:
                                    parsed_result = json.loads(result)
                                    _add_json_and_text_segments(segments, parsed_result,
                                                                annotations,
                                                                _MAX_TOOL_OUTPUT_CHARS)
                                except (json.JSONDecodeError, TypeError):
                                    _add_text_segment(segments, result, annotations,
                                                      _MAX_TOOL_OUTPUT_CHARS)
                            elif result:
                                parsed_result = result
                                _add_json_and_text_segments(segments, result, annotations,
                                                            _MAX_TOOL_OUTPUT_CHARS)
                            # Infer success/failure from common result fields.
                            if isinstance(parsed_result, dict):
                                if parsed_result.get("error") or parsed_result.get("stderr"):
                                    tool_info["status"] = "error"
                                elif parsed_result.get("exit_code") is not None:
                                    tool_info["status"] = "ok" if parsed_result.get("exit_code") == 0 else "error"
                            sid = tc_id or "tool_result"
                            yield _event("tool_result", "tool", "actions", ts, sid,
                                         seq.next(sid), max(turn, 1), segments,
                                         tool=tool_info, annotations=annotations, raw=item)
# ---------------------------------------------------------------------------
# Enrichment — add semantic `action` and `files` fields to tool_call events
# ---------------------------------------------------------------------------
# Matches apply_patch file headers, e.g. "*** Update File: src/app.py";
# group 1 captures the file path (trailing whitespace stripped by callers).
_PATCH_FILE_RE = re.compile(r"\*\*\* (?:Update|Add|Delete) File: (.+)")
# Bash commands that are primarily file reads: pagers/dumpers (cat, head,
# tail, less, bat, more) and `sed -n` range prints, optionally prefixed
# with a `bash -<flags>` wrapper.
_BASH_READ_RE = re.compile(
    r"^(?:bash\s+-\w+\s+)?(?:cat|head|tail|less|bat|more)\s+"
    r"|^(?:bash\s+-\w+\s+)?sed\s+-?n\s+"
)
# Bash commands that are file writes: apply_patch invocations / patch bodies,
# in-place sed, tee, output redirection to a file, and heredocs.
_BASH_WRITE_RE = re.compile(
    r"apply_patch\b|\*\*\* Begin Patch|^(?:bash\s+-\w+\s+)?(?:sed\s+-i|tee)\s+"
    r"|cat\s*>\s*\S+|>\s*\S+\.\w+|<<['\"]?\w+"
)
# Bash commands that are file/content searches (grep-likes plus find/ls).
_BASH_SEARCH_RE = re.compile(
    r"^(?:bash\s+-\w+\s+)?(?:grep|rg|find|fd|ag|ack|ls)\s+"
)
# Extract file path from simple bash commands like `sed -n '1,200p' path/to/file`
# or `cat path/to/file` — take the last non-flag, non-range argument.
# Group 1 requires a dot-extension, so bare directory arguments never match.
_BASH_FILE_ARG_RE = re.compile(
    r"(?:cat|head|tail|less|bat|more|sed\s+[^|]+?)\s+"
    r"(?:'[^']*'\s+)*"  # skip quoted args like '1,200p'
    r"([^\s|><;'\"]+\.\w+)"  # file path with extension
)
def _json_input_from_event(event: dict) -> dict | None:
"""Parse first JSON segment into a dict."""
for seg in event.get("segments", []):
if isinstance(seg, dict) and seg.get("type") == "json":
try:
parsed = json.loads(seg["data"])
return parsed if isinstance(parsed, dict) else None
except (json.JSONDecodeError, KeyError):
pass
return None
def _get_bash_command(event: dict) -> str | None:
    """Extract the bash command string from a bash/shell tool_call.

    Accepts either a ``command`` or ``cmd`` field; argv-style lists yield
    their final element (the script passed to e.g. ``bash -c``).
    """
    parsed = _json_input_from_event(event)
    if not parsed:
        return None
    cmd = parsed.get("command") or parsed.get("cmd")
    if isinstance(cmd, str):
        return cmd
    if isinstance(cmd, list):
        return cmd[-1] if cmd else None
    return None
def _extract_files_from_tool_input(event: dict) -> list[str]:
    """Extract file paths from structured tool input (read/edit/write/grep/glob)."""
    parsed = _json_input_from_event(event)
    if parsed:
        path = parsed.get("file_path") or parsed.get("path") or parsed.get("file")
        return [str(path)] if path else []
    # Cursor-IDE fallback: raw.uri
    raw = event.get("raw", {})
    if isinstance(raw, dict):
        uri = raw.get("uri")
        if isinstance(uri, dict):
            fs_path = uri.get("_fsPath") or uri.get("path")
            if fs_path:
                return [fs_path]
    return []
# Captures (delimiter, body) of a heredoc: <<'DELIM'\n...body...
_HEREDOC_RE = re.compile(r"<<['\"]?(\w+)['\"]?\n(.*)", re.DOTALL)
def _extract_new_text(event: dict) -> str | None:
    """Extract written/edited content from a tool_call event.

    Handles:
    - Structured tool input: content, new_string, or contents field
    - Bash heredoc: cat > file <<'DELIM'\\n...content...\\nDELIM
    """
    inp = _json_input_from_event(event)
    if not inp:
        return None
    # Structured write/edit tools
    direct = inp.get("new_string") or inp.get("content") or inp.get("contents")
    if isinstance(direct, str) and direct:
        return direct
    # Bash heredoc (codex style: cat > file <<'PY'\n...code...\nPY)
    cmd = inp.get("cmd") or inp.get("command")
    if not (isinstance(cmd, str) and "<<" in cmd):
        return None
    heredoc = _HEREDOC_RE.search(cmd)
    if not heredoc:
        return None
    delimiter, body = heredoc.group(1), heredoc.group(2)
    closing = body.rfind(delimiter)
    return body[:closing].rstrip("\n") if closing > 0 else body
# Captures the target of an output redirection, e.g. `> out.txt`.
_BASH_REDIRECT_RE = re.compile(r">\s*([^\s|><;'\"]+\.\w+)")
def _extract_files_from_bash(cmd: str) -> list[str]:
    """Extract file paths from bash commands (patch headers, redirects, args)."""
    # apply_patch: extract from patch headers
    headers = _PATCH_FILE_RE.findall(cmd)
    if headers:
        return [h.strip() for h in headers]
    # Redirect writes: cat > file.html, echo > file.txt
    redirect = _BASH_REDIRECT_RE.search(cmd)
    if redirect:
        return [redirect.group(1)]
    # Simple commands: sed -n '1,200p' path/to/file, cat path/to/file
    arg = _BASH_FILE_ARG_RE.search(cmd)
    return [arg.group(1)] if arg else []
def _skill_name_from_id(skill_id: str) -> str:
"""Extract the bare skill name from a possibly-prefixed identifier.
Skill identifiers may be bare ("pdf"), tile-prefixed ("tessl__pdf"),
or fully qualified ("org__tile__skill-name"). The bare name is
everything after the last ``__`` separator, or the whole string if
there is no separator.
"""
if "__" in skill_id:
return skill_id.rsplit("__", 1)[-1]
return skill_id
# Regex to pull the skill name out of "Launching skill: <id>"
_LAUNCHING_SKILL_RE = re.compile(r"Launching skill:\s*(.+)")
# Matches the <activated_skill name="..."> marker in tool output;
# group 1 captures the skill identifier.
_ACTIVATED_SKILL_RE = re.compile(r'<activated_skill\s+name="([^"]+)"')
# Regex to detect a SKILL.md read — captures the skill directory name
# (the path component between "skills/" and "/SKILL.md").
_SKILL_MD_PATH_RE = re.compile(r"(?:^|/)skills/([^/]+)/SKILL\.md$")
def _enrich_event(event: dict) -> dict:
"""Add semantic fields to normalized events.
For tool_call events: ``action``, ``files``, and skill activation fields.
For tool_result events: skill activation fields when tool is ``skill``.
Skill activation fields (added to both the tool_call and tool_result):
- ``skill_id`` — the identifier as passed to the tool (e.g. "tessl__pdf")
- ``skill_name`` — the bare name with any prefix stripped (e.g. "pdf")
- ``action`` — set to ``"skill_activate"`` on the tool_call
SKILL.md direct reads also get ``skill_name`` so verifiers can detect
both activation paths with a single field check.
"""
kind = event.get("kind")
tool = event.get("tool")
if not isinstance(tool, dict):
return event
name = tool.get("name", "")
# ---- Skill activation (tool_call) ----
if kind == "tool_call" and name == "skill":
inp = _json_input_from_event(event)
if inp:
sid = inp.get("skill", "") or inp.get("name", "")
if sid:
event["action"] = "skill_activate"
event["skill_id"] = sid
event["skill_name"] = _skill_name_from_id(sid)
return event
# ---- Skill activation (tool_result) ----
if kind == "tool_result" and name == "skill":
# Extract from "Launching skill: <id>" or <activated_skill name="..."> in text segments
for seg in event.get("segments", []):
if isinstance(seg, dict) and seg.get("type") == "text":
text = seg.get("data", "")
m = _LAUNCHING_SKILL_RE.search(text) or _ACTIVATED_SKILL_RE.search(text)
if m:
sid = m.group(1).strip()
event["skill_id"] = sid
event["skill_name"] = _skill_name_from_id(sid)
break
return event
# ---- Only tool_calls from here on ----
if kind != "tool_call":
return event
action = None
files: list[str] = []
if name in ("read",):
action = "file_read"
files = _extract_files_from_tool_input(event)
# Detect direct SKILL.md reads
for fp in files:
sm = _SKILL_MD_PATH_RE.search(fp)
if sm:
event["skill_name"] = sm.group(1)
break
elif name in ("edit", "write"):
action = "file_write"
files = _extract_files_from_tool_input(event)
new_text = _extract_new_text(event)
if new_text:
event["new_text"] = new_text
elif name in ("grep",):
action = "file_search"
files = _extract_files_from_tool_input(event)
elif name in ("glob",):
action = "file_search"
files = _extract_files_from_tool_input(event)
elif name in ("bash", "shell", "shell_command", "exec_command"):
cmd = _get_bash_command(event)
if cmd:
if _BASH_WRITE_RE.search(cmd):
action = "file_write"
files = _extract_files_from_bash(cmd)
new_text = _extract_new_text(event)
if new_text:
event["new_text"] = new_text
elif _BASH_READ_RE.search(cmd):
action = "file_read"
files = _extract_files_from_bash(cmd)
# Detect SKILL.md reads via bash (e.g. cat .../SKILL.md)
for fp in files:
sm = _SKILL_MD_PATH_RE.search(fp)
if sm:
event["skill_name"] = sm.group(1)
break
elif _BASH_SEARCH_RE.search(cmd):
action = "file_search"
files = _extract_files_from_bash(cmd)
else:
action = "command_run"
else:
action = "command_run"
elif name == "task":
action = "command_run"
if action:
event["action"] = action
if files:
event["files"] = files
return event
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
# Maps agent name -> (raw input format, normalizer callable).
# "jsonl" inputs are passed to the normalizer as a list of lines;
# "json" inputs are json.loads()-parsed first (see normalize_file).
NORMALIZERS = {
    "claude-code": ("jsonl", normalize_claude_code),
    "codex": ("jsonl", normalize_codex),
    "gemini": ("json", normalize_gemini),
    "cursor-ide": ("jsonl", normalize_cursor_ide),
    "cursor-agent": ("jsonl", normalize_cursor_agent),
}
def normalize_file(agent: str, raw_path: Path, out_path: Path) -> int:
    """Normalize one raw log file to JSONL at *out_path*; return the event count."""
    fmt, normalizer = NORMALIZERS[agent]
    # Security: redact secrets from raw text before any parsing so that
    # credentials never appear in normalized output or downstream stages.
    redacted = redact_secrets(raw_path.read_text(encoding="utf-8", errors="replace"))
    payload = redacted.splitlines() if fmt == "jsonl" else json.loads(redacted)
    # Enrich with semantic action/files fields
    events = [_enrich_event(ev) for ev in normalizer(payload)]
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as out:
        out.writelines(json.dumps(ev, ensure_ascii=False) + "\n" for ev in events)
    return len(events)
def _discover_projects(base_dir: Path) -> list[str]:
"""Find all project directories under base_dir that have a raw/ subdirectory."""
projects = []
if not base_dir.is_dir():
return projects
for d in sorted(base_dir.iterdir()):
if d.is_dir() and (d / "raw").is_dir():
projects.append(d.name)
return projects
def _normalize_project(raw_dir: Path, out_dir: Path, agents: list[str] | None,
                       refresh: bool) -> tuple[int, int]:
    """Normalize one project's raw logs. Returns (files, events)."""
    n_files = 0
    n_events = 0
    for agent_dir in sorted(raw_dir.iterdir()):
        if not agent_dir.is_dir():
            continue
        agent = agent_dir.name
        if agents and agent not in agents:
            continue
        if agent not in NORMALIZERS:
            print(f" {agent}: no normalizer available (skipping)")
            continue
        fmt = NORMALIZERS[agent][0]
        pattern = "*.jsonl" if fmt == "jsonl" else "*.json"
        for raw_file in sorted(agent_dir.glob(pattern)):
            out_file = out_dir / agent / raw_file.with_suffix(".jsonl").name
            # Skip already-normalized files unless a refresh was requested.
            if out_file.exists() and not refresh:
                continue
            try:
                count = normalize_file(agent, raw_file, out_file)
            except Exception as e:
                # One bad file must not stop the rest of the project.
                print(f" {agent}/{raw_file.name}: ERROR {e}", file=sys.stderr)
                continue
            n_files += 1
            n_events += count
            print(f" {agent}/{raw_file.name}: {count} events")
    return n_files, n_events
def main():
    """CLI entry point: normalize raw logs for one or all discovered projects."""
    parser = argparse.ArgumentParser(description="Normalize raw agent logs")
    parser.add_argument("--project", default=None,
                        help="Project name (default: auto-discover all projects)")
    parser.add_argument("--cwd", default=os.getcwd(),
                        help="Project directory (default: cwd)")
    parser.add_argument("--audit-dir", default=None,
                        help="Output directory for audit data (default: <cwd>/.tessl/logs)")
    parser.add_argument("--base-dir", default=None, dest="audit_dir_compat",
                        help=argparse.SUPPRESS)
    parser.add_argument("--raw-dir", default=None,
                        help="Raw logs directory (overrides --base-dir/--project)")
    parser.add_argument("--out-dir", default=None,
                        help="Output directory (overrides --base-dir/--project)")
    parser.add_argument("--agents", nargs="*", default=None,
                        help="Only normalize these agents (default: all found)")
    parser.add_argument("--refresh", action="store_true",
                        help="Re-normalize even if output exists")
    args = parser.parse_args()

    workdir = os.path.realpath(args.cwd)
    audit_dir = args.audit_dir or args.audit_dir_compat
    base_dir = Path(audit_dir) if audit_dir else Path(workdir) / ".tessl" / "logs"

    # If explicit dirs given, use them directly (backwards compat)
    if args.raw_dir or args.out_dir:
        raw_dir = Path(args.raw_dir or f"{base_dir}/raw")
        out_dir = Path(args.out_dir or f"{base_dir}/normalized")
        if not raw_dir.exists():
            print(f"Raw logs not found: {raw_dir}")
            return
        n_files, n_events = _normalize_project(raw_dir, out_dir, args.agents, args.refresh)
        print(f"\nNormalized {n_files} files, {n_events} events -> {out_dir}")
        return

    # Project mode: one named project, or every project found under base_dir.
    projects = [args.project] if args.project else _discover_projects(base_dir)
    if not projects:
        print(f"No projects found under {base_dir}/ (looking for <project>/raw/)")
        return

    grand_files = 0
    grand_events = 0
    for project in projects:
        raw_dir = base_dir / project / "raw"
        out_dir = base_dir / project / "normalized"
        if not raw_dir.exists():
            print(f"Project '{project}': raw dir not found ({raw_dir})")
            continue
        print(f"── {project} ──")
        n_files, n_events = _normalize_project(raw_dir, out_dir, args.agents, args.refresh)
        grand_files += n_files
        grand_events += n_events
    print(f"\nNormalized {grand_files} files, {grand_events} events")
if __name__ == "__main__":  # script entry point; module stays importable without side effects
    main()