CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl-labs/audit-logs

Collect and normalize agent logs, discover installed verifiers, and dispatch LLM judges to evaluate adherence. Produces per-session verdicts and aggregated reports.

91

3.09x
Quality

90%

Does it follow best practices?

Impact

96%

3.09x

Average score across 3 eval scenarios

SecuritybySnyk

Passed

No known issues

Overview
Quality
Evals
Security
Files

normalize_logs.py — skills/audit-logs/scripts/

#!/usr/bin/env python3
"""
Normalize raw agent logs to NormalizedEvent JSONL format.

Self-contained — no external dependencies beyond stdlib.
Supports: claude-code, codex, gemini, cursor-ide, cursor-agent.

Security: All raw log content is passed through redact_secrets() before parsing
to strip credentials (API keys, JWTs, private keys, database URLs, etc.).
Downstream pipeline stages (prepare_sessions, dispatch_judges, generate_report)
only ever read from the normalized output, so secrets never reach LLM judges or
HTML reports.

Input:  .tessl/logs/<project>/raw/<agent>/*.(jsonl|json)
Output: .tessl/logs/<project>/normalized/<agent>/*.jsonl

Each output line is a JSON object with:
  kind:      message | tool_call | tool_result | usage | system | error
  turn:      int >= 1
  timestamp: ISO 8601 with trailing Z
  stream_id: string
  sequence:  int >= 0
  channel:   output | thinking | actions | status
  actor:     user | assistant | tool | system
  segments:  [{type, data}]
  tool:      {provider, name, display_name, call_id, status} (optional)
  annotations: {}
  raw:       original payload (optional)
"""

import argparse
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterator

# ---------------------------------------------------------------------------
# Secret redaction (applied to raw text before parsing)
# ---------------------------------------------------------------------------

# Each entry pairs a compiled pattern with its replacement text.  The rules
# run in list order, so e.g. the JWT rule fires before the broader
# Authorization-Bearer rule.
_REDACTION_PATTERNS: list[tuple[re.Pattern, str]] = [
    # PEM-encoded private key blocks
    (re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----[\s\S]*?-----END [A-Z ]*PRIVATE KEY-----"),
     "[REDACTED]"),
    # JWTs: three base64url sections, first two starting with "eyJ"
    (re.compile(r"eyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+"),
     "[REDACTED]"),
    # Connection URLs that embed user:password credentials
    (re.compile(r"(postgres|mysql|mongodb)://[^\s'\"]+:[^\s'\"]+@[^\s'\"]+"),
     "[REDACTED]"),
    # AWS access key IDs
    (re.compile(r"AKIA[0-9A-Z]{16}"),
     "[REDACTED]"),
    # HTTP Authorization headers carrying Bearer tokens
    (re.compile(r"Authorization:\s*Bearer\s+[A-Za-z0-9_-]+"),
     "Authorization: Bearer [REDACTED]"),
    # Stripe/OpenAI-style secret and publishable keys (sk_*, pk_*)
    (re.compile(r"(sk|pk)_[a-z]+_[A-Za-z0-9]{20,}"),
     "[REDACTED]"),
]


def redact_secrets(text: str) -> str:
    """Apply built-in redaction patterns to remove secrets from raw text."""
    scrubbed = text
    for pattern, replacement in _REDACTION_PATTERNS:
        scrubbed = pattern.sub(replacement, scrubbed)
    return scrubbed


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

_CAMEL = re.compile(r"(?<!^)(?=[A-Z])")
_NON_ALNUM = re.compile(r"[^a-z0-9]+")


def _slugify(value: str) -> str:
    lowered = _CAMEL.sub("_", value.strip()).lower()
    return _NON_ALNUM.sub("_", lowered).strip("_")


def _iso_from_epoch(value: int | float) -> str:
    if value > 1e12:  # assume milliseconds
        value = value / 1000.0
    dt = datetime.fromtimestamp(value, tz=timezone.utc)
    return dt.isoformat().replace("+00:00", "Z")


def _iso(ts: str | int | float | datetime | None) -> str:
    """Normalise to ISO-8601 with trailing Z."""
    if ts is None:
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    if isinstance(ts, datetime):
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        return ts.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    if isinstance(ts, (int, float)):
        return _iso_from_epoch(ts)
    s = str(ts).strip()
    if s.isdigit():
        return _iso_from_epoch(int(s))
    if s.endswith("+00:00"):
        s = s[:-6] + "Z"
    if not s.endswith("Z"):
        s += "Z"
    return s


class _Sequencer:
    """Track per-stream sequence numbers."""

    def __init__(self):
        self._seqs: defaultdict[str, int] = defaultdict(int)

    def next(self, stream_id: str) -> int:
        seq = self._seqs[stream_id]
        self._seqs[stream_id] = seq + 1
        return seq


def _event(
    kind: str,
    actor: str,
    channel: str,
    timestamp: str,
    stream_id: str,
    sequence: int,
    turn: int,
    segments: list[dict],
    *,
    tool: dict | None = None,
    annotations: dict | None = None,
    usage: dict | None = None,
    raw: dict | None = None,
) -> dict:
    d: dict[str, Any] = {
        "kind": kind,
        "turn": turn,
        "timestamp": timestamp,
        "stream_id": stream_id,
        "sequence": sequence,
        "channel": channel,
        "actor": actor,
        "segments": segments,
        "annotations": annotations or {},
    }
    if tool:
        d["tool"] = tool
    if usage:
        d["usage"] = usage
    if raw:
        d["raw"] = raw
    return d


def _parse_tool_name(raw_name: str) -> dict:
    """Parse a tool name into {provider, namespace, name, display_name}.

    "mcp__<ns>__<name>" identifies an MCP tool (namespace defaults to
    "default" when absent); anything else is a builtin tool.
    """
    if not raw_name.startswith("mcp__"):
        return {
            "provider": "builtin",
            "name": _slugify(raw_name),
            "display_name": raw_name,
        }
    body = raw_name[len("mcp__"):]
    head, sep, tail = body.partition("__")
    if sep:
        namespace, short_name = head, tail
    else:
        namespace, short_name = "default", head
    return {
        "provider": "mcp",
        "namespace": _slugify(namespace),
        "name": _slugify(short_name),
        "display_name": raw_name,
    }


def _text_seg(text: str) -> dict:
    return {"type": "text", "data": text}


def _json_seg(obj: Any) -> dict:
    return {"type": "json", "data": json.dumps(obj, ensure_ascii=False, separators=(",", ":"))}


# Cap on text extracted from tool outputs; longer text is truncated and the
# event's annotations are flagged with truncated/truncated_max_chars.
_MAX_TOOL_OUTPUT_CHARS = 5000


def _add_text_segment(segments: list[dict], text: str, annotations: dict, max_chars: int) -> None:
    """Append *text* to *segments* as a text segment, truncating at *max_chars*.

    Empty text is dropped.  When truncation happens, *annotations* is marked
    in place with truncated=True and the limit used.
    """
    if not text:
        return
    clipped = text
    if len(clipped) > max_chars:
        annotations["truncated"] = True
        annotations["truncated_max_chars"] = max_chars
        clipped = clipped[:max_chars]
    segments.append(_text_seg(clipped))


def _add_json_and_text_segments(
    segments: list[dict], obj: Any, annotations: dict, max_chars: int
) -> None:
    """Append *obj* twice: as a JSON segment, then flattened to (truncated) text."""
    segments.append(_json_seg(obj))
    try:
        flattened = json.dumps(obj, ensure_ascii=False)
    except TypeError:
        # Non-JSON-serializable objects fall back to their str() form.
        flattened = str(obj)
    _add_text_segment(segments, flattened, annotations, max_chars)


# ---------------------------------------------------------------------------
# claude-code normalizer
# ---------------------------------------------------------------------------


def normalize_claude_code(lines: list[str]) -> Iterator[dict]:
    """Normalize claude-code raw JSONL.

    Fixes the upstream gap: user text messages are now emitted.

    Entry types handled:
      system    -> "system" events on the status channel
      user      -> user text messages and tool_result events (new turn)
      assistant -> assistant text messages and tool_call events
      progress  -> subagent tool calls/results, annotated subagent=True
      result    -> per-turn "usage" event with cost/duration

    Payloads without a timestamp get a synthetic one: a fixed base time plus
    one microsecond per event, so relative ordering is preserved and the
    string stays valid ISO-8601.  (The previous string-splicing approach
    produced 12-digit fractional seconds, i.e. malformed timestamps.)
    """
    seq = _Sequencer()
    turn = 0
    pending_tools: dict[str, dict] = {}  # call_id -> tool dict
    event_counter = 0
    base_dt: datetime | None = None

    def next_ts() -> str:
        nonlocal event_counter, base_dt
        if base_dt is None:
            base_dt = datetime.now(timezone.utc)
        # Add a microsecond offset per event to preserve ordering.
        stamped = base_dt + timedelta(microseconds=event_counter)
        event_counter += 1
        return stamped.isoformat().replace("+00:00", "Z")

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            payload = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if not isinstance(payload, dict):
            continue

        entry_type = payload.get("type")

        # Use timestamp from payload if available
        ts = _iso(payload.get("timestamp")) if payload.get("timestamp") else next_ts()

        if entry_type == "system":
            subtype = payload.get("subtype")
            annotations = {"system_subtype": subtype} if subtype else {}
            if subtype == "init":
                # Summarize session metadata into one readable status event.
                parts = []
                if payload.get("model"):
                    parts.append(f"Model: {payload['model']}")
                if payload.get("claude_code_version"):
                    parts.append(f"Version: {payload['claude_code_version']}")
                if payload.get("permissionMode"):
                    parts.append(f"Permission Mode: {payload['permissionMode']}")
                text = "\n".join(parts)
                if not text:
                    continue
                sid = "system_init"
                yield _event("system", "system", "status", ts, sid, seq.next(sid), max(turn, 1),
                             [_text_seg(text)], annotations=annotations, raw=payload)
            else:
                text = payload.get("message") or payload.get("text") or ""
                if not text and subtype:
                    text = f"System event: {subtype}"
                if not text:
                    continue
                sid = f"system_{subtype or 'event'}"
                yield _event("system", "system", "status", ts, sid, seq.next(sid), max(turn, 1),
                             [_text_seg(text)], annotations=annotations, raw=payload)

        elif entry_type == "user":
            # Every user entry starts a new turn.
            turn += 1
            message = payload.get("message") or {}
            content = message.get("content")

            # Handle string content (user text message)
            if isinstance(content, str) and content.strip():
                msg_id = payload.get("uuid") or f"user_msg_{turn}"
                yield _event("message", "user", "output", ts, msg_id, seq.next(msg_id), turn,
                             [_text_seg(content)], raw={"type": "text", "text": content})

            # Handle list content (tool results + possibly text)
            elif isinstance(content, list):
                for item in content:
                    if not isinstance(item, dict):
                        # Plain string item in content list
                        if isinstance(item, str) and item.strip():
                            msg_id = payload.get("uuid") or f"user_msg_{turn}"
                            yield _event("message", "user", "output", ts, msg_id, seq.next(msg_id),
                                         turn, [_text_seg(item)],
                                         raw={"type": "text", "text": item})
                        continue

                    item_type = item.get("type")

                    if item_type == "text":
                        text = item.get("text", "")
                        if text.strip():
                            msg_id = payload.get("uuid") or f"user_msg_{turn}"
                            yield _event("message", "user", "output", ts, msg_id,
                                         seq.next(msg_id), turn, [_text_seg(text)],
                                         raw=item)

                    elif item_type == "tool_result":
                        tool_use_id = item.get("tool_use_id")
                        is_error = item.get("is_error", False)
                        content_data = item.get("content", "")

                        # Resolve pending tool call so the result carries the
                        # provider/name metadata recorded at call time.
                        tool_info = None
                        if tool_use_id and tool_use_id in pending_tools:
                            tool_info = dict(pending_tools[tool_use_id])
                            tool_info["status"] = "error" if is_error else "ok"

                        segments: list[dict] = []
                        annotations: dict[str, Any] = {}
                        if isinstance(content_data, str) and content_data:
                            _add_text_segment(segments, content_data, annotations, _MAX_TOOL_OUTPUT_CHARS)
                        elif isinstance(content_data, list):
                            for block in content_data:
                                if isinstance(block, dict):
                                    if block.get("type") == "text" and block.get("text"):
                                        _add_text_segment(segments, block["text"], annotations,
                                                          _MAX_TOOL_OUTPUT_CHARS)
                                    else:
                                        _add_json_and_text_segments(segments, block, annotations,
                                                                    _MAX_TOOL_OUTPUT_CHARS)
                                else:
                                    _add_text_segment(segments, str(block), annotations,
                                                      _MAX_TOOL_OUTPUT_CHARS)

                        sid = tool_use_id or "tool_result"
                        yield _event("tool_result", "tool", "actions", ts, sid, seq.next(sid),
                                     turn, segments, tool=tool_info, annotations=annotations, raw=item)

        elif entry_type == "assistant":
            message = payload.get("message") or {}
            content = message.get("content") or []
            message_id = message.get("id") or f"assistant_msg_{turn}"

            for item in content:
                if not isinstance(item, dict):
                    continue
                item_type = item.get("type")

                if item_type == "text":
                    text = item.get("text")
                    if text:
                        yield _event("message", "assistant", "output", ts, message_id,
                                     seq.next(message_id), max(turn, 1), [_text_seg(text)],
                                     raw=item)

                elif item_type == "tool_use":
                    tool_id = item.get("id")
                    tool_name = item.get("name", "unknown")
                    tool_input = item.get("input", {})

                    tool_info = _parse_tool_name(tool_name)
                    tool_info["call_id"] = tool_id
                    if tool_id:
                        # Remember the call so its tool_result can be matched.
                        pending_tools[tool_id] = dict(tool_info)

                    segments = []
                    if tool_input:
                        segments.append(_json_seg(tool_input))

                    sid = tool_id or tool_name
                    yield _event("tool_call", "assistant", "actions", ts, sid, seq.next(sid),
                                 max(turn, 1), segments, tool=tool_info, raw=item)

        elif entry_type == "progress":
            # Subagent streaming events.  data.message has the same
            # structure as top-level user/assistant events.
            data = payload.get("data") or {}
            msg = data.get("message") or {}
            msg_type = msg.get("type")  # "assistant" or "user"
            inner_content = (msg.get("message") or {}).get("content") or []
            if not isinstance(inner_content, list) or not msg_type:
                continue

            parent_tool_id = payload.get("parentToolUseID", "")
            subagent_ann = {"subagent": True}
            if parent_tool_id:
                subagent_ann["parent_tool_use_id"] = parent_tool_id

            if msg_type == "assistant":
                for item in inner_content:
                    if not isinstance(item, dict):
                        continue
                    if item.get("type") == "tool_use":
                        tool_id = item.get("id")
                        tool_name = item.get("name", "unknown")
                        tool_input = item.get("input", {})

                        tool_info = _parse_tool_name(tool_name)
                        tool_info["call_id"] = tool_id
                        if tool_id:
                            pending_tools[tool_id] = dict(tool_info)

                        segments = []
                        if tool_input:
                            segments.append(_json_seg(tool_input))

                        sid = tool_id or tool_name
                        yield _event("tool_call", "assistant", "actions", ts,
                                     sid, seq.next(sid), max(turn, 1), segments,
                                     tool=tool_info, annotations=subagent_ann,
                                     raw=item)

            elif msg_type == "user":
                for item in inner_content:
                    if not isinstance(item, dict):
                        continue
                    if item.get("type") == "tool_result":
                        tool_use_id = item.get("tool_use_id")
                        is_error = item.get("is_error", False)
                        content_data = item.get("content", "")

                        tool_info = None
                        if tool_use_id and tool_use_id in pending_tools:
                            tool_info = dict(pending_tools[tool_use_id])
                            tool_info["status"] = "error" if is_error else "ok"

                        segments: list[dict] = []
                        annotations: dict[str, Any] = dict(subagent_ann)
                        if isinstance(content_data, str) and content_data:
                            _add_text_segment(segments, content_data, annotations,
                                              _MAX_TOOL_OUTPUT_CHARS)
                        elif isinstance(content_data, list):
                            for block in content_data:
                                if isinstance(block, dict):
                                    if (block.get("type") == "text"
                                            and block.get("text")):
                                        _add_text_segment(
                                            segments, block["text"], annotations,
                                            _MAX_TOOL_OUTPUT_CHARS)
                                    else:
                                        _add_json_and_text_segments(
                                            segments, block, annotations,
                                            _MAX_TOOL_OUTPUT_CHARS)
                                else:
                                    _add_text_segment(segments, str(block),
                                                      annotations,
                                                      _MAX_TOOL_OUTPUT_CHARS)

                        sid = tool_use_id or "tool_result"
                        yield _event("tool_result", "tool", "actions", ts,
                                     sid, seq.next(sid), turn, segments,
                                     tool=tool_info, annotations=annotations,
                                     raw=item)

        elif entry_type == "result":
            # End-of-turn accounting: token usage, cost, and duration.
            usage_data: dict[str, Any] = {}
            if payload.get("usage"):
                usage_data.update(payload["usage"])
            if payload.get("modelUsage"):
                usage_data["model_usage"] = payload["modelUsage"]
            if payload.get("total_cost_usd") is not None:
                usage_data["cost_usd"] = payload["total_cost_usd"]
            if payload.get("duration_ms") is not None:
                usage_data["duration_ms"] = payload["duration_ms"]

            segments = []
            if payload.get("result"):
                segments.append(_text_seg(payload["result"]))

            sid = "turn_result"
            yield _event("usage", "system", "status", ts, sid, seq.next(sid), max(turn, 1),
                         segments, usage=usage_data or None, raw=payload)


# ---------------------------------------------------------------------------
# codex normalizer
# ---------------------------------------------------------------------------


def _unwrap_codex_rs(lines: list[str]) -> list[str]:
    """Unwrap codex_cli_rs wrapper format to legacy flat format.

    The new format wraps each entry in {type, timestamp, payload}.
    - response_item → payload is the legacy entry (message, function_call, etc.)
    - event_msg → map user_message/agent_message to legacy message format
    - session_meta → map to legacy session init format
    """
    # Detect format: if first parseable line has type=session_meta or response_item
    is_new = False
    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            data = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if isinstance(data, dict) and data.get("type") in ("session_meta", "response_item", "event_msg"):
            is_new = True
        break

    if not is_new:
        return lines

    unwrapped = []
    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            data = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if not isinstance(data, dict):
            continue

        wrapper_type = data.get("type")
        ts = data.get("timestamp")
        payload = data.get("payload", {})
        if not isinstance(payload, dict):
            continue

        if wrapper_type == "session_meta":
            # Convert to legacy session init format
            entry = dict(payload)
            if ts and "timestamp" not in entry:
                entry["timestamp"] = ts
            unwrapped.append(json.dumps(entry))

        elif wrapper_type == "response_item":
            # Payload is already in legacy format (message, function_call, etc.)
            entry = dict(payload)
            if ts and "timestamp" not in entry:
                entry["timestamp"] = ts
            unwrapped.append(json.dumps(entry))

        elif wrapper_type == "event_msg":
            ptype = payload.get("type", "")
            if ptype == "user_message":
                entry = {
                    "type": "message",
                    "role": "user",
                    "timestamp": ts,
                    "content": [{"type": "input_text", "text": payload.get("message", "")}],
                }
                unwrapped.append(json.dumps(entry))
            elif ptype == "agent_message":
                entry = {
                    "type": "message",
                    "role": "assistant",
                    "timestamp": ts,
                    "content": [{"type": "output_text", "text": payload.get("text", "")}],
                }
                unwrapped.append(json.dumps(entry))
            elif ptype == "agent_reasoning":
                entry = {
                    "type": "reasoning",
                    "timestamp": ts,
                    "summary": [{"text": payload.get("text", "")}],
                }
                unwrapped.append(json.dumps(entry))
            # Skip token_count, task_started, task_complete, etc.

    return unwrapped


def normalize_codex(lines: list[str]) -> Iterator[dict]:
    """Normalize codex (OpenAI CLI) raw JSONL.

    Handles the legacy flat format directly; the newer codex_cli_rs wrapper
    format is first converted by _unwrap_codex_rs.  Emits NormalizedEvent
    dicts for session init, user/assistant messages, function calls and
    their outputs, and reasoning summaries (on the "thinking" channel).
    Blank and unparseable lines are skipped.
    """
    lines = _unwrap_codex_rs(lines)
    seq = _Sequencer()
    turn = 0
    # call_id -> tool dict, so a function_call_output can inherit the
    # provider/name metadata recorded when the call was first seen.
    pending_tools: dict[str, dict] = {}

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            payload = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if not isinstance(payload, dict):
            continue

        # Internal state snapshots carry no conversational content.
        record_type = payload.get("record_type")
        if record_type == "state":
            continue

        entry_type = payload.get("type")
        ts = _iso(payload.get("timestamp"))

        # Session init (first line with id + instructions)
        if "instructions" in payload and "id" in payload and not entry_type:
            sid = "system_init"
            parts = []
            if payload.get("instructions"):
                parts.append(f"Instructions loaded ({len(payload['instructions'])} chars)")
            git = payload.get("git", {})
            if git.get("branch"):
                parts.append(f"Branch: {git['branch']}")
            text = "\n".join(parts) if parts else "Session init"
            yield _event("system", "system", "status", ts, sid, seq.next(sid), 1,
                         [_text_seg(text)], raw=payload)
            continue

        if entry_type == "message":
            role = payload.get("role")
            content = payload.get("content") or []

            if role == "user":
                # Each user message starts a new turn.
                turn += 1
                for item in content:
                    if isinstance(item, dict) and item.get("type") == "input_text":
                        text = item.get("text", "")
                        if text.strip():
                            msg_id = f"user_msg_{turn}"
                            yield _event("message", "user", "output", ts, msg_id,
                                         seq.next(msg_id), turn, [_text_seg(text)], raw=item)

            elif role == "assistant":
                for item in content:
                    if isinstance(item, dict) and item.get("type") == "output_text":
                        text = item.get("text", "")
                        if text.strip():
                            msg_id = f"assistant_msg_{turn}"
                            yield _event("message", "assistant", "output", ts, msg_id,
                                         seq.next(msg_id), max(turn, 1), [_text_seg(text)],
                                         raw=item)

        elif entry_type == "function_call":
            call_id = payload.get("call_id", "")
            name = payload.get("name", "unknown")
            args_str = payload.get("arguments", "{}")

            tool_info = {"provider": "builtin", "name": _slugify(name),
                         "display_name": name, "call_id": call_id}
            if call_id:
                pending_tools[call_id] = dict(tool_info)

            segments = []
            # Arguments arrive as a JSON string; keep the raw text when they
            # do not parse.
            try:
                args = json.loads(args_str)
                segments.append(_json_seg(args))
            except (json.JSONDecodeError, TypeError):
                if args_str:
                    segments.append(_text_seg(args_str))

            sid = call_id or name
            yield _event("tool_call", "assistant", "actions", ts, sid, seq.next(sid),
                         max(turn, 1), segments, tool=tool_info, raw=payload)

        elif entry_type == "function_call_output":
            call_id = payload.get("call_id", "")
            output = payload.get("output", "")

            tool_info = None
            if call_id and call_id in pending_tools:
                tool_info = dict(pending_tools[call_id])
                tool_info["status"] = "ok"

            segments = []
            annotations: dict[str, Any] = {}
            if isinstance(output, str) and output:
                # Try to parse as JSON to extract metadata
                try:
                    parsed = json.loads(output)
                    if isinstance(parsed, dict):
                        # Shell-style outputs wrap the real text as
                        # {"output": ..., "metadata": {"exit_code": ...}};
                        # a non-zero exit code flips the status to "error".
                        actual_output = parsed.get("output", "")
                        exit_code = parsed.get("metadata", {}).get("exit_code")
                        if actual_output:
                            _add_text_segment(segments, actual_output, annotations,
                                              _MAX_TOOL_OUTPUT_CHARS)
                        if tool_info and exit_code is not None:
                            tool_info["exit_code"] = exit_code
                            if exit_code != 0:
                                tool_info["status"] = "error"
                        _add_json_and_text_segments(segments, parsed, annotations,
                                                    _MAX_TOOL_OUTPUT_CHARS)
                    else:
                        _add_text_segment(segments, output, annotations, _MAX_TOOL_OUTPUT_CHARS)
                except (json.JSONDecodeError, TypeError):
                    _add_text_segment(segments, output, annotations, _MAX_TOOL_OUTPUT_CHARS)

            sid = call_id or "tool_result"
            yield _event("tool_result", "tool", "actions", ts, sid, seq.next(sid),
                         max(turn, 1), segments, tool=tool_info, annotations=annotations, raw=payload)

        elif entry_type == "reasoning":
            # Reasoning summaries surface as assistant "thinking" messages.
            summary = payload.get("summary", [])
            for s in summary:
                if isinstance(s, dict) and s.get("text"):
                    sid = payload.get("id", "reasoning")
                    yield _event("message", "assistant", "thinking", ts, sid, seq.next(sid),
                                 max(turn, 1), [_text_seg(s["text"])], raw=payload)


# ---------------------------------------------------------------------------
# gemini normalizer
# ---------------------------------------------------------------------------

# Gemini tool names → canonical slugs
_GEMINI_TOOL_MAP = {
    "run_shell_command": "bash",
    "read_file": "read",
    "write_file": "write",
    "edit_file": "edit",
    "list_dir": "glob",  # NOTE(review): maps to "glob" rather than an ls/list slug — confirm intended
    "activate_skill": "skill",
}


def normalize_gemini(data: dict | list) -> Iterator[dict]:
    """Normalize gemini JSON.

    Handles two formats:
    1. Session format: {sessionId, messages: [{id, timestamp, type, content, toolCalls}]}
    2. Flat log format (logs.json): [{sessionId, messageId, type, message, timestamp}]

    Yields NormalizedEvent dicts (see module docstring). Turns advance on
    each user message; events seen before the first user message are
    attributed to turn 1 via max(turn, 1). Tool results are embedded in
    each toolCall's "result" list, so tool_call and tool_result events are
    emitted together. Non-dict sessions/messages are skipped silently.
    """
    # Detect format
    if isinstance(data, list) and data and isinstance(data[0], dict):
        if "messages" in data[0]:
            # List of session objects
            sessions = data
        elif "message" in data[0] and "messageId" in data[0]:
            # Flat log format — group by sessionId into pseudo-sessions
            by_session: dict[str, list] = {}
            for entry in data:
                sid = entry.get("sessionId", "unknown")
                by_session.setdefault(sid, []).append(entry)
            sessions = []
            for sid, entries in by_session.items():
                msgs = []
                # messageId provides per-session ordering in the flat format
                for e in sorted(entries, key=lambda x: x.get("messageId", 0)):
                    msgs.append({
                        "id": f"{sid}_{e.get('messageId', 0)}",
                        "timestamp": e.get("timestamp"),
                        "type": e.get("type", ""),
                        "content": e.get("message", ""),
                    })
                sessions.append({"sessionId": sid, "messages": msgs})
        else:
            # Unknown list shape — treat each element as a session dict
            sessions = data
    elif isinstance(data, dict):
        sessions = [data]
    else:
        return

    for session in sessions:
        if not isinstance(session, dict):
            continue
        messages = session.get("messages", [])
        seq = _Sequencer()
        turn = 0  # incremented on each user message

        for msg in messages:
            if not isinstance(msg, dict):
                continue
            msg_type = msg.get("type", "")
            ts = _iso(msg.get("timestamp"))
            raw_content = msg.get("content", "")
            msg_id = msg.get("id", "")

            # Gemini content can be a string or a list of {text: "..."} parts
            if isinstance(raw_content, list):
                content = "\n".join(
                    p.get("text", "") for p in raw_content
                    if isinstance(p, dict) and p.get("text")
                ).strip()
            elif isinstance(raw_content, str):
                content = raw_content.strip()
            else:
                content = str(raw_content).strip()

            if msg_type == "info":
                if content:
                    sid = msg_id or "info"
                    yield _event("system", "system", "status", ts, sid, seq.next(sid),
                                 max(turn, 1), [_text_seg(content)], raw=msg)

            elif msg_type == "user":
                turn += 1
                if content:
                    sid = msg_id or f"user_{turn}"
                    yield _event("message", "user", "output", ts, sid, seq.next(sid), turn,
                                 [_text_seg(content)], raw=msg)

            elif msg_type == "gemini":
                if content:
                    sid = msg_id or f"gemini_{turn}"
                    yield _event("message", "assistant", "output", ts, sid, seq.next(sid),
                                 max(turn, 1), [_text_seg(content)], raw=msg)

                # Process tool calls
                for tc in msg.get("toolCalls", []):
                    tc_id = tc.get("id", "")
                    tc_name = tc.get("name", "unknown")
                    tc_args = tc.get("args", {})

                    # Known names map to canonical slugs; otherwise slugify as-is
                    canonical = _GEMINI_TOOL_MAP.get(tc_name)
                    tool_info = {"provider": "builtin",
                                 "name": canonical or _slugify(tc_name),
                                 "display_name": tc_name, "call_id": tc_id}

                    segments = [_json_seg(tc_args)] if tc_args else []
                    sid = tc_id or tc_name
                    yield _event("tool_call", "assistant", "actions", ts, sid, seq.next(sid),
                                 max(turn, 1), segments, tool=tool_info, raw=tc)

                    # Tool results are inline in gemini format
                    for result in tc.get("result", []):
                        if isinstance(result, dict):
                            resp = result.get("functionResponse", {})
                            output = resp.get("response", {}).get("output", "")
                            result_tool = dict(tool_info)
                            # Always "ok": no error field is inspected on results
                            result_tool["status"] = "ok"
                            r_segments: list[dict] = []
                            annotations: dict[str, Any] = {}
                            if output:
                                try:
                                    # Prefer structured segments when output is JSON
                                    parsed = json.loads(output)
                                    _add_json_and_text_segments(r_segments, parsed, annotations,
                                                                _MAX_TOOL_OUTPUT_CHARS)
                                except (json.JSONDecodeError, TypeError):
                                    _add_text_segment(r_segments, output, annotations,
                                                      _MAX_TOOL_OUTPUT_CHARS)
                            r_sid = tc_id or "tool_result"
                            yield _event("tool_result", "tool", "actions", ts, r_sid,
                                         seq.next(r_sid), max(turn, 1), r_segments,
                                         tool=result_tool, annotations=annotations, raw=result)


# ---------------------------------------------------------------------------
# cursor-ide normalizer
# ---------------------------------------------------------------------------

# Cursor-ide tool names → canonical slugs (matching claude-code conventions)
# Several cursor tools exist in multiple versions (_v2 suffixes); all variants
# collapse to one canonical slug. Names starting with "mcp_" are handled via
# _parse_tool_name instead, and anything else falls back to _slugify
# (see _parse_cursor_tool_event).
_CURSOR_TOOL_MAP = {
    "run_terminal_cmd": "bash",
    "run_terminal_command": "bash",
    "run_terminal_command_v2": "bash",
    "read_file": "read",
    "read_file_v2": "read",
    "search_replace": "edit",
    "edit_file": "edit",
    "edit_file_v2": "edit",
    "write": "write",
    "write_file": "write",
    "grep": "grep",
    "grep_search": "grep",
    "codebase_search": "grep",
    "file_search": "grep",
    "glob_file_search": "glob",
    "list_dir": "glob",
    "list_dir_v2": "glob",
    "todo_write": "task",
    "read_lints": "lint",
}


def _normalize_cursor_args(name: str, raw_args: dict) -> dict:
    """Normalize cursor-ide tool arg names to canonical field names.

    Ensures field names match what other agents produce so verifiers
    can query file_path, command, etc. consistently.
    """
    args = dict(raw_args)

    # file_path normalization
    if "target_file" in args and "file_path" not in args:
        args["file_path"] = args.pop("target_file")
    if "target_directory" in args and "path" not in args:
        args["path"] = args.pop("target_directory")

    # write tool: "contents" → "content"
    if name in ("write", "write_file") and "contents" in args and "content" not in args:
        args["content"] = args.pop("contents")

    return args


def _parse_cursor_tool_event(tfd: dict, ts: str, turn: int,
                             seq: "_Sequencer") -> list[dict]:
    """Parse a toolFormerData dict into tool_call + tool_result events.

    Args:
        tfd: cursor's toolFormerData payload. Must contain "name" —
             callers guard with ``tfd.get("name")`` before calling.
        ts: ISO timestamp stamped on both events.
        turn: turn number (>= 1) for both events.
        seq: shared sequencer providing per-stream sequence numbers.

    Returns a one- or two-element list: the tool_call event, plus a
    tool_result event when tfd carries a "result" field (even an empty
    string result produces an event; only a missing/None result is skipped).
    """
    events: list[dict] = []
    cursor_name = tfd["name"]
    canonical = _CURSOR_TOOL_MAP.get(cursor_name)
    call_id = tfd.get("toolCallId", "")

    # Tool identity: known builtin → canonical slug; mcp_* → parsed MCP
    # provider/name; anything else → slugified builtin.
    if canonical:
        tool_info = {"provider": "builtin", "name": canonical,
                     "display_name": cursor_name, "call_id": call_id}
    elif cursor_name.startswith("mcp_"):
        tool_info = _parse_tool_name(cursor_name)
        tool_info["call_id"] = call_id
    else:
        tool_info = {"provider": "builtin", "name": _slugify(cursor_name),
                     "display_name": cursor_name, "call_id": call_id}

    # Parse and normalize tool input (rawArgs preferred, params fallback)
    segments: list[dict] = []
    raw_args_str = tfd.get("rawArgs") or tfd.get("params")
    if raw_args_str:
        try:
            args = json.loads(raw_args_str)
            if isinstance(args, dict):
                if "relativeWorkspacePath" in args:
                    args["file_path"] = args.pop("relativeWorkspacePath")
                if "streamingContent" in args:
                    args["content"] = args.pop("streamingContent")
                args = _normalize_cursor_args(cursor_name, args)
            segments.append(_json_seg(args))
        except (json.JSONDecodeError, TypeError):
            # Unparseable args are preserved verbatim as a text segment
            segments.append(_text_seg(raw_args_str))

    sid = call_id or cursor_name
    events.append(_event("tool_call", "assistant", "actions", ts, sid,
                         seq.next(sid), turn, segments, tool=tool_info, raw=tfd))

    # Emit tool result if available
    result_str = tfd.get("result")
    if result_str is not None:
        result_tool = dict(tool_info)
        # Map cursor status strings onto the ok/error/unknown vocabulary
        status = tfd.get("status", "")
        if status == "error":
            result_tool["status"] = "error"
        elif status in ("completed", "ok", "success"):
            result_tool["status"] = "ok"
        else:
            result_tool["status"] = "unknown"

        result_segments: list[dict] = []
        annotations: dict[str, Any] = {}
        if isinstance(result_str, str) and result_str:
            try:
                parsed = json.loads(result_str)
                if isinstance(parsed, dict):
                    # Surface the human-readable output field first…
                    out = (parsed.get("contents")
                           or parsed.get("output")
                           or parsed.get("text")
                           or "")
                    if out:
                        _add_text_segment(result_segments, str(out), annotations,
                                          _MAX_TOOL_OUTPUT_CHARS)
                    # …then the structured payload. NOTE(review): the same
                    # output text may appear again inside these JSON segments.
                    _add_json_and_text_segments(result_segments, parsed, annotations,
                                                _MAX_TOOL_OUTPUT_CHARS)
                else:
                    _add_text_segment(result_segments, result_str, annotations,
                                      _MAX_TOOL_OUTPUT_CHARS)
            except (json.JSONDecodeError, TypeError):
                _add_text_segment(result_segments, result_str, annotations,
                                  _MAX_TOOL_OUTPUT_CHARS)

        events.append(_event("tool_result", "tool", "actions", ts, sid,
                             seq.next(sid), turn, result_segments,
                             tool=result_tool, annotations=annotations, raw=tfd))
    return events


def _parse_cursor_session(lines: list[str]) -> tuple[dict | None, list[dict], list[dict]]:
    """Parse a cursor-ide JSONL file into metadata, bubbles, and transcript entries.

    A single file may contain any combination of:
      - ``"type": "metadata"`` — session metadata (workspace, composer ID)
      - ``"type": "bubble"``   — cursorDiskKV bubble data (tool calls, text)
      - ``"type": "transcript"`` — agent-transcript lines (full reasoning text)

    Returns (metadata, bubbles, transcript_entries).
    """
    metadata = None
    bubbles: list[dict] = []
    transcript: list[dict] = []
    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            payload = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if not isinstance(payload, dict):
            continue
        ptype = payload.get("type")
        if ptype == "metadata":
            metadata = payload
        elif ptype == "bubble":
            raw = payload.get("raw", {})
            bubbles.append({
                "bubble_id": payload.get("bubble_id", ""),
                "bubble_type": raw.get("type"),  # 1=user, 2=assistant
                "text": (raw.get("text") or raw.get("rawText") or "").strip(),
                "ts": _iso(raw.get("timestamp") or raw.get("createdAt")),
                "tfd": raw.get("toolFormerData") if isinstance(raw.get("toolFormerData"), dict) else None,
                "thinking": (raw.get("thinking") if isinstance(raw.get("thinking"), str) else "").strip(),
                "raw": raw,
            })
        elif ptype == "transcript":
            role = payload.get("role", "")
            text_parts = []
            for item in payload.get("message", {}).get("content", []):
                if isinstance(item, dict) and item.get("type") == "text":
                    text_parts.append(item.get("text", ""))
            text = "\n".join(text_parts).strip()
            if not text:
                continue
            # Strip <user_query> wrapper
            if role == "user" and "<user_query>" in text:
                m = re.search(r"<user_query>\s*(.*?)\s*</user_query>", text, re.DOTALL)
                if m:
                    text = m.group(1)
            transcript.append({"role": role, "text": text})
    return metadata, bubbles, transcript


def _text_prefix(text: str, length: int = 60) -> str:
    """Normalize text prefix for fuzzy matching."""
    return " ".join(text.split())[:length].lower()


def normalize_cursor_ide(lines: list[str]) -> Iterator[dict]:
    """Normalize cursor-ide session, merging bubbles and transcript when both exist.

    A single JSONL file may contain bubble records (from cursorDiskKV) and/or
    transcript records (from agent-transcripts), combined at collection time.

    When both are present:
      - Transcript provides the message text (full reasoning, no truncation)
      - Bubbles provide structured tool calls (name, args, results)
      - Events are interleaved: message → tool calls → message → tool calls ...

    When only bubbles are available (older Cursor): uses bubble text + tool data.
    When only transcript is available: text-only messages, no tool events.

    Yields NormalizedEvent dicts; turns advance on user messages, and events
    before the first user message are attributed to turn 1 via max(turn, 1).
    """
    seq = _Sequencer()
    metadata, bubbles, transcript = _parse_cursor_session(lines)

    # Emit system init from metadata
    if metadata:
        sid = "system_init"
        parts = []
        if metadata.get("workspace_path"):
            parts.append(f"Workspace: {metadata['workspace_path']}")
        if metadata.get("composer_id"):
            parts.append(f"Composer: {metadata['composer_id']}")
        text = "\n".join(parts) if parts else "Session init"
        ts = _iso(metadata.get("composer_data_raw", {}).get("createdAt"))
        yield _event("system", "system", "status", ts, sid, seq.next(sid), 1,
                     [_text_seg(text)], raw=metadata)

    # --- Merged path: both bubbles and transcript ---
    if bubbles and transcript:
        # Build index of tool events between text/user bubbles.
        # Walk the bubble list and group: each text/user bubble starts a group,
        # tool bubbles following it belong to that group.
        #
        # Bubble types:
        #   user (type=1, has text) → user message
        #   assistant text (type=2, has text, no tfd) → assistant message
        #   assistant tool (type=2, has tfd) → tool call
        #   assistant thinking (type=2, has thinking, no text, no tfd) → skip

        # Separate bubbles into text entries and tool groups
        text_bubbles: list[dict] = []  # user + assistant text bubbles in order
        tool_groups: list[list[dict]] = []  # tool bubbles following each text bubble
        current_tools: list[dict] = []

        for b in bubbles:
            is_user = b["bubble_type"] == 1
            is_text = b["bubble_type"] == 2 and b["text"] and not b["tfd"]
            is_tool = b["bubble_type"] == 2 and b["tfd"]

            if is_user or is_text:
                # Flush accumulated tools to the previous text bubble's group
                if text_bubbles:
                    tool_groups.append(current_tools)
                    current_tools = []
                text_bubbles.append(b)
            elif is_tool:
                current_tools.append(b)
        # Flush last group
        if text_bubbles:
            tool_groups.append(current_tools)

        # Match transcript entries to text bubbles by content prefix.
        # Build a map: transcript_index → bubble_index
        tb_prefixes = [_text_prefix(b["text"]) for b in text_bubbles]
        tr_prefixes = [_text_prefix(t["text"]) for t in transcript]

        # Greedy forward matching: for each transcript entry, find the next
        # unmatched text bubble whose prefix matches.
        tr_to_tb: dict[int, int] = {}
        next_tb = 0
        for ti, tp in enumerate(tr_prefixes):
            for bi in range(next_tb, len(tb_prefixes)):
                if tb_prefixes[bi] == tp:
                    tr_to_tb[ti] = bi
                    next_tb = bi + 1
                    break

        # Emit events: walk transcript entries, emit message from transcript text,
        # then emit any tool events from the matched bubble's tool group.
        turn = 0  # user transcript entries advance the turn counter
        for ti, tr_entry in enumerate(transcript):
            if tr_entry["role"] == "user":
                turn += 1

            bi = tr_to_tb.get(ti)
            # Use bubble timestamp if matched, else current time
            ts = text_bubbles[bi]["ts"] if bi is not None else _iso(None)
            actor = "user" if tr_entry["role"] == "user" else "assistant"
            sid = text_bubbles[bi]["bubble_id"] if bi is not None else f"{actor}_{turn}"

            yield _event("message", actor, "output", ts, sid, seq.next(sid),
                         max(turn, 1), [_text_seg(tr_entry["text"])])

            # Emit tool calls from the matched bubble's tool group
            if bi is not None and bi < len(tool_groups):
                for tool_bubble in tool_groups[bi]:
                    if tool_bubble["tfd"] and tool_bubble["tfd"].get("name"):
                        yield from _parse_cursor_tool_event(
                            tool_bubble["tfd"], tool_bubble["ts"],
                            max(turn, 1), seq)

        # Emit any remaining tool events from unmatched bubble groups (e.g. tools
        # after the last text bubble that didn't match a transcript entry)
        # NOTE: these trailing tool events reuse the final turn number.
        matched_bis = set(tr_to_tb.values())
        for bi, tb in enumerate(text_bubbles):
            if bi not in matched_bis and bi < len(tool_groups):
                for tool_bubble in tool_groups[bi]:
                    if tool_bubble["tfd"] and tool_bubble["tfd"].get("name"):
                        yield from _parse_cursor_tool_event(
                            tool_bubble["tfd"], tool_bubble["ts"],
                            max(turn, 1), seq)
        return

    # --- Bubbles only (older Cursor, no transcript available) ---
    if bubbles:
        turn = 0
        for b in bubbles:
            if b["bubble_type"] == 1:
                turn += 1
                if b["text"]:
                    bid = b["bubble_id"] or f"user_{turn}"
                    yield _event("message", "user", "output", b["ts"], bid,
                                 seq.next(bid), turn, [_text_seg(b["text"])], raw=b["raw"])
            elif b["bubble_type"] == 2:
                if b["text"] and not b["tfd"]:
                    bid = b["bubble_id"] or f"assistant_{turn}"
                    yield _event("message", "assistant", "output", b["ts"], bid,
                                 seq.next(bid), max(turn, 1), [_text_seg(b["text"])],
                                 raw=b["raw"])
                if b["tfd"] and b["tfd"].get("name"):
                    yield from _parse_cursor_tool_event(
                        b["tfd"], b["ts"], max(turn, 1), seq)
        return

    # --- Transcript only (no cursorDiskKV data) ---
    if transcript:
        turn = 0
        for tr_entry in transcript:
            if tr_entry["role"] == "user":
                turn += 1
            actor = "user" if tr_entry["role"] == "user" else "assistant"
            sid = f"{actor}_{turn}"
            # Transcript records carry no timestamps; _iso(None) falls back
            # to the current time (see the merged-path comment above).
            yield _event("message", actor, "output", _iso(None), sid,
                         seq.next(sid), max(turn, 1), [_text_seg(tr_entry["text"])])


# ---------------------------------------------------------------------------
# cursor-agent normalizer
# ---------------------------------------------------------------------------


def normalize_cursor_agent(lines: list[str]) -> Iterator[dict]:
    """Normalize cursor-agent JSONL (metadata + blob records).

    cursor-agent blobs carry no per-event timestamps, so timestamps are
    synthesized: the session metadata's ``createdAt`` (when present) seeds
    a base time, and each event gets base + N microseconds so ordering is
    preserved. Every event is annotated with ``timestamp_source``
    ("session_createdAt" or "inferred") so consumers know times are
    approximate.

    Blob ``content`` may be a native list, a plain string, or — in
    cursor's storage format — the Python ``repr`` of a list. Repr'd lists
    are parsed with ``ast.literal_eval`` (never ``eval``): log content is
    untrusted, and literal_eval only accepts literal structures. Strings
    that are not list literals fall back to being treated as plain text,
    matching the previous behavior on unparseable input.
    """
    import ast  # local import: the module's top-level import block is elsewhere

    seq = _Sequencer()
    turn = 0
    # call_id → tool_info captured at tool_call time, reused for results
    pending_tools: dict[str, dict] = {}
    event_counter = 0
    session_base_ts: datetime | None = None
    session_ts_source: str | None = None

    def next_ts() -> tuple[str, str]:
        """Return (iso_ts, source): monotonically increasing synthetic timestamps."""
        nonlocal event_counter, session_base_ts, session_ts_source
        if session_base_ts is None:
            session_base_ts = datetime.now(timezone.utc)
            session_ts_source = "inferred"
        ts = session_base_ts + timedelta(microseconds=event_counter)
        event_counter += 1
        return _iso(ts), session_ts_source or "inferred"

    def _literal_list(text: str) -> list | None:
        """Safely parse a repr'd Python list; None if not a list literal."""
        try:
            parsed = ast.literal_eval(text)
        except Exception:
            return None
        return parsed if isinstance(parsed, list) else None

    for raw_line in lines:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            payload = json.loads(raw_line)
        except json.JSONDecodeError:
            continue
        if not isinstance(payload, dict):
            continue

        ptype = payload.get("type")

        if ptype == "metadata":
            sid = "system_init"
            parts = []
            if payload.get("workspace_path"):
                parts.append(f"Workspace: {payload['workspace_path']}")
            if payload.get("session_id"):
                parts.append(f"Session: {payload['session_id']}")
            text = "\n".join(parts) if parts else "Session init"
            created_at = payload.get("meta_raw", {}).get("createdAt")
            if created_at is not None:
                # createdAt may be seconds or milliseconds; > 1e12 means ms
                session_base_ts = datetime.fromtimestamp(
                    (created_at / 1000.0) if created_at > 1e12 else created_at,
                    tz=timezone.utc,
                )
                session_ts_source = "session_createdAt"
            ts, ts_source = next_ts()
            yield _event("system", "system", "status", ts, sid, seq.next(sid), 1,
                         [_text_seg(text)], annotations={"timestamp_source": ts_source}, raw=payload)

        elif ptype == "blob" and payload.get("format") == "json":
            raw = payload.get("raw", {})
            if not isinstance(raw, dict):
                continue
            role = raw.get("role", "")
            content = raw.get("content", "")
            ts, ts_source = next_ts()
            ts_annotations = {"timestamp_source": ts_source}

            if role == "system":
                sid = "system_msg"
                text = content if isinstance(content, str) else str(content)
                if text.strip():
                    # System prompts can be huge; keep a 500-char preview only
                    yield _event("system", "system", "status", ts, sid, seq.next(sid),
                                 max(turn, 1), [_text_seg(text[:500])],
                                 annotations=ts_annotations, raw=raw)

            elif role == "user":
                turn += 1
                # Content can be string, repr'd list, or native list
                text_parts = []
                items = None
                if isinstance(content, list):
                    items = content
                elif isinstance(content, str):
                    items = _literal_list(content)
                    if items is None:
                        # Not a list literal — treat as plain text
                        text_parts.append(content)
                if isinstance(items, list):
                    for item in items:
                        if isinstance(item, dict):
                            if item.get("type") == "text":
                                t = item.get("text", "")
                                # Strip XML-like wrapper tags
                                t = re.sub(r"<user_query>\s*", "", t)
                                t = re.sub(r"\s*</user_query>", "", t)
                                t = re.sub(r"<user_info>.*?</user_info>", "", t, flags=re.DOTALL)
                                t = t.strip()
                                if t:
                                    text_parts.append(t)
                if text_parts:
                    sid = f"user_{turn}"
                    for tp in text_parts:
                        if tp.strip():
                            yield _event("message", "user", "output", ts, sid, seq.next(sid),
                                         turn, [_text_seg(tp)], annotations=ts_annotations, raw=raw)

            elif role == "assistant":
                # Content can be string, repr'd list, or native list
                text_parts = []
                tool_calls = []
                items = None
                if isinstance(content, list):
                    items = content
                elif isinstance(content, str):
                    items = _literal_list(content)
                    if items is None:
                        text_parts.append(("output", content))
                if isinstance(items, list):
                    for item in items:
                        if isinstance(item, dict):
                            if item.get("type") == "text":
                                t = item.get("text", "")
                                # Separate thinking from output
                                think_match = re.match(r"<think>(.*?)</think>\s*(.*)", t, re.DOTALL)
                                if think_match:
                                    thinking = think_match.group(1).strip()
                                    rest = think_match.group(2).strip()
                                    if thinking:
                                        text_parts.append(("thinking", thinking))
                                    if rest:
                                        text_parts.append(("output", rest))
                                elif t.strip():
                                    text_parts.append(("output", t.strip()))
                            elif item.get("type") in ("tool_use", "tool-call"):
                                tool_calls.append(item)

                for channel, text in text_parts:
                    if text.strip():
                        sid = f"assistant_{turn}"
                        yield _event("message", "assistant", channel, ts, sid, seq.next(sid),
                                     max(turn, 1), [_text_seg(text)],
                                     annotations=ts_annotations, raw=raw)

                for tc in tool_calls:
                    tc_id = tc.get("id", "") or tc.get("toolCallId", "")
                    tc_name = tc.get("name", "") or tc.get("toolName", "") or "unknown"
                    tc_input = tc.get("input", {}) or tc.get("args", {})
                    tool_info = {"provider": "builtin", "name": _slugify(tc_name),
                                 "display_name": tc_name, "call_id": tc_id}
                    if tc_id:
                        # Remember identity so the matching tool_result reuses it
                        pending_tools[tc_id] = dict(tool_info)
                    segments = [_json_seg(tc_input)] if tc_input else []
                    sid = tc_id or tc_name
                    yield _event("tool_call", "assistant", "actions", ts, sid, seq.next(sid),
                                 max(turn, 1), segments, tool=tool_info,
                                 annotations=ts_annotations, raw=tc)

            elif role == "tool":
                # Tool results — content can be string, repr'd list, or native list
                items = None
                if isinstance(content, list):
                    items = content
                elif isinstance(content, str):
                    items = _literal_list(content)
                if isinstance(items, list):
                    for item in items:
                        if isinstance(item, dict) and item.get("type") == "tool-result":
                            tc_id = item.get("toolCallId", "")
                            tc_name = item.get("toolName", "unknown")
                            result = item.get("result", "")

                            # Prefer the identity captured at tool_call time
                            if tc_id and tc_id in pending_tools:
                                tool_info = dict(pending_tools[tc_id])
                                tool_info["status"] = "unknown"
                            else:
                                tool_info = {"provider": "builtin",
                                             "name": _slugify(tc_name),
                                             "display_name": tc_name,
                                             "call_id": tc_id, "status": "unknown"}

                            segments: list[dict] = []
                            annotations: dict[str, Any] = dict(ts_annotations)
                            parsed_result = None
                            if isinstance(result, str) and result:
                                try:
                                    parsed_result = json.loads(result)
                                    _add_json_and_text_segments(segments, parsed_result,
                                                                annotations,
                                                                _MAX_TOOL_OUTPUT_CHARS)
                                except (json.JSONDecodeError, TypeError):
                                    _add_text_segment(segments, result, annotations,
                                                      _MAX_TOOL_OUTPUT_CHARS)
                            elif result:
                                parsed_result = result
                                _add_json_and_text_segments(segments, result, annotations,
                                                            _MAX_TOOL_OUTPUT_CHARS)

                            # Infer status from error/stderr/exit_code fields
                            if isinstance(parsed_result, dict):
                                if parsed_result.get("error") or parsed_result.get("stderr"):
                                    tool_info["status"] = "error"
                                elif parsed_result.get("exit_code") is not None:
                                    tool_info["status"] = "ok" if parsed_result.get("exit_code") == 0 else "error"

                            sid = tc_id or "tool_result"
                            yield _event("tool_result", "tool", "actions", ts, sid,
                                         seq.next(sid), max(turn, 1), segments,
                                         tool=tool_info, annotations=annotations, raw=item)


# ---------------------------------------------------------------------------
# Enrichment — add semantic `action` and `files` fields to tool_call events
# ---------------------------------------------------------------------------

# apply_patch file headers: "*** Update File: path" (also Add/Delete); group 1
# captures the path.
_PATCH_FILE_RE = re.compile(r"\*\*\* (?:Update|Add|Delete) File: (.+)")

# Bash commands that are primarily file reads
# (optionally prefixed by an interpreter invocation like "bash -c ";
#  "sed -n" covers line-range printing such as `sed -n '1,200p' file`)
_BASH_READ_RE = re.compile(
    r"^(?:bash\s+-\w+\s+)?(?:cat|head|tail|less|bat|more)\s+"
    r"|^(?:bash\s+-\w+\s+)?sed\s+-?n\s+"
)
# Bash commands that are file writes
# (apply_patch invocations/blocks, in-place sed, tee, shell redirection
#  into a dotted filename, and heredocs)
_BASH_WRITE_RE = re.compile(
    r"apply_patch\b|\*\*\* Begin Patch|^(?:bash\s+-\w+\s+)?(?:sed\s+-i|tee)\s+"
    r"|cat\s*>\s*\S+|>\s*\S+\.\w+|<<['\"]?\w+"
)
# Bash commands that are file/content searches
_BASH_SEARCH_RE = re.compile(
    r"^(?:bash\s+-\w+\s+)?(?:grep|rg|find|fd|ag|ack|ls)\s+"
)

# Extract file path from simple bash commands like `sed -n '1,200p' path/to/file`
# or `cat path/to/file` — take the last non-flag, non-range argument
# NOTE(review): only captures paths with a dot-extension; extensionless paths
# (e.g. Makefile) are not matched — confirm this is intentional.
_BASH_FILE_ARG_RE = re.compile(
    r"(?:cat|head|tail|less|bat|more|sed\s+[^|]+?)\s+"
    r"(?:'[^']*'\s+)*"  # skip quoted args like '1,200p'
    r"([^\s|><;'\"]+\.\w+)"  # file path with extension
)


def _json_input_from_event(event: dict) -> dict | None:
    """Parse first JSON segment into a dict."""
    for seg in event.get("segments", []):
        if isinstance(seg, dict) and seg.get("type") == "json":
            try:
                parsed = json.loads(seg["data"])
                return parsed if isinstance(parsed, dict) else None
            except (json.JSONDecodeError, KeyError):
                pass
    return None


def _get_bash_command(event: dict) -> str | None:
    """Return the shell command string carried by a bash/shell tool_call.

    Looks up ``command`` (falling back to ``cmd``) in the tool's JSON
    input.  Argv-style list commands (e.g. ``["bash", "-lc", "ls"]``)
    yield their final element; anything else that is not a string
    yields None.
    """
    tool_input = _json_input_from_event(event)
    if not tool_input:
        return None
    command = tool_input.get("command") or tool_input.get("cmd")
    if isinstance(command, str):
        return command
    if isinstance(command, list) and command:
        return command[-1]
    return None


def _extract_files_from_tool_input(event: dict) -> list[str]:
    """Pull file paths out of structured tool input (read/edit/write/grep/glob).

    Checks ``file_path``, ``path``, then ``file`` in the tool's JSON
    input.  When no JSON input exists, falls back to the Cursor-IDE
    shape (``raw.uri._fsPath`` / ``raw.uri.path``).  Returns [] when
    no path can be found.
    """
    tool_input = _json_input_from_event(event)
    if tool_input:
        path = (tool_input.get("file_path")
                or tool_input.get("path")
                or tool_input.get("file"))
        return [str(path)] if path else []
    # Cursor-IDE fallback: the raw payload carries a VS Code-style URI.
    raw = event.get("raw", {})
    if isinstance(raw, dict):
        uri = raw.get("uri")
        if isinstance(uri, dict):
            fs_path = uri.get("_fsPath") or uri.get("path")
            if fs_path:
                return [fs_path]
    return []


# Captures a heredoc delimiter and everything after it, e.g.
# "<<'EOF'\n...body..." -> groups ("EOF", "...body...").  DOTALL lets
# group 2 span the multi-line heredoc body.
_HEREDOC_RE = re.compile(r"<<['\"]?(\w+)['\"]?\n(.*)", re.DOTALL)


def _extract_new_text(event: dict) -> str | None:
    """Return the text written/edited by a tool_call event, if any.

    Two shapes are recognized:
    - structured write/edit tools carrying ``new_string``, ``content``,
      or ``contents`` in their JSON input;
    - codex-style bash heredocs (``cat > file <<'PY'\\n...code...\\nPY``),
      where the body up to the closing delimiter is returned.

    Returns None when neither shape matches.
    """
    tool_input = _json_input_from_event(event)
    if not tool_input:
        return None

    # Structured write/edit tools: first truthy candidate wins.
    candidate = (tool_input.get("new_string")
                 or tool_input.get("content")
                 or tool_input.get("contents"))
    if isinstance(candidate, str) and candidate:
        return candidate

    # Bash heredoc (codex style: cat > file <<'PY'\n...code...\nPY)
    command = tool_input.get("cmd") or tool_input.get("command")
    if isinstance(command, str) and "<<" in command:
        match = _HEREDOC_RE.search(command)
        if match:
            delimiter, body = match.group(1), match.group(2)
            # Trim at the last occurrence of the closing delimiter; if it
            # is missing (or at position 0) keep the whole body.
            close = body.rfind(delimiter)
            if close > 0:
                return body[:close].rstrip("\n")
            return body

    return None


# Matches the target of a shell output redirect (e.g. `> out.txt`),
# capturing a path that ends in a file extension.
_BASH_REDIRECT_RE = re.compile(r">\s*([^\s|><;'\"]+\.\w+)")


def _extract_files_from_bash(cmd: str) -> list[str]:
    """Best-effort extraction of file paths touched by a bash command.

    Tries, in order: apply_patch headers ("*** Update File: ..."),
    output-redirect targets (``> file.ext``), and the file argument of
    simple commands (``cat path``, ``sed -n '1,200p' path``).  Returns
    [] when nothing matches.
    """
    # apply_patch: every file named in the patch headers
    patched = [name.strip() for name in _PATCH_FILE_RE.findall(cmd)]
    if patched:
        return patched
    # Redirect writes: cat > file.html, echo > file.txt
    redirect = _BASH_REDIRECT_RE.search(cmd)
    if redirect:
        return [redirect.group(1)]
    # Simple commands with a trailing file argument
    file_arg = _BASH_FILE_ARG_RE.search(cmd)
    if file_arg:
        return [file_arg.group(1)]
    return []


def _skill_name_from_id(skill_id: str) -> str:
    """Extract the bare skill name from a possibly-prefixed identifier.

    Skill identifiers may be bare ("pdf"), tile-prefixed ("tessl__pdf"),
    or fully qualified ("org__tile__skill-name").  The bare name is
    everything after the last ``__`` separator, or the whole string if
    there is no separator.
    """
    if "__" in skill_id:
        return skill_id.rsplit("__", 1)[-1]
    return skill_id


# Regex to pull the skill name out of "Launching skill: <id>"
# Regex to pull the skill id out of a "Launching skill: <id>" line.
_LAUNCHING_SKILL_RE = re.compile(r"Launching skill:\s*(.+)")
# Regex to pull the skill name out of an <activated_skill name="..."> tag.
_ACTIVATED_SKILL_RE = re.compile(r'<activated_skill\s+name="([^"]+)"')

# Regex to detect a SKILL.md read — captures the skill directory name
# from paths of the form .../skills/<name>/SKILL.md.
_SKILL_MD_PATH_RE = re.compile(r"(?:^|/)skills/([^/]+)/SKILL\.md$")


def _mark_skill_md(event: dict, files: list[str]) -> None:
    """Tag *event* with ``skill_name`` when any path in *files* is a
    ``skills/<name>/SKILL.md`` read (first match wins)."""
    for fp in files:
        sm = _SKILL_MD_PATH_RE.search(fp)
        if sm:
            event["skill_name"] = sm.group(1)
            return


def _enrich_event(event: dict) -> dict:
    """Add semantic fields to normalized events (mutates and returns *event*).

    For tool_call events: ``action``, ``files``, and skill activation fields.
    For tool_result events: skill activation fields when tool is ``skill``.

    Skill activation fields (added to both the tool_call and tool_result):
      - ``skill_id``   — the identifier as passed to the tool (e.g. "tessl__pdf")
      - ``skill_name`` — the bare name with any prefix stripped (e.g. "pdf")
      - ``action``     — set to ``"skill_activate"`` on the tool_call

    SKILL.md direct reads also get ``skill_name`` so verifiers can detect
    both activation paths with a single field check.
    """
    kind = event.get("kind")
    tool = event.get("tool")
    if not isinstance(tool, dict):
        return event
    name = tool.get("name", "")

    # ---- Skill activation (tool_call) ----
    if kind == "tool_call" and name == "skill":
        inp = _json_input_from_event(event)
        if inp:
            sid = inp.get("skill", "") or inp.get("name", "")
            if sid:
                event["action"] = "skill_activate"
                event["skill_id"] = sid
                event["skill_name"] = _skill_name_from_id(sid)
        return event

    # ---- Skill activation (tool_result) ----
    if kind == "tool_result" and name == "skill":
        # Extract from "Launching skill: <id>" or <activated_skill name="..."> in text segments
        for seg in event.get("segments", []):
            if isinstance(seg, dict) and seg.get("type") == "text":
                text = seg.get("data", "")
                m = _LAUNCHING_SKILL_RE.search(text) or _ACTIVATED_SKILL_RE.search(text)
                if m:
                    sid = m.group(1).strip()
                    event["skill_id"] = sid
                    event["skill_name"] = _skill_name_from_id(sid)
                    break
        return event

    # ---- Only tool_calls from here on ----
    if kind != "tool_call":
        return event

    action = None
    files: list[str] = []

    if name == "read":
        action = "file_read"
        files = _extract_files_from_tool_input(event)
        _mark_skill_md(event, files)  # detect direct SKILL.md reads
    elif name in ("edit", "write"):
        action = "file_write"
        files = _extract_files_from_tool_input(event)
        new_text = _extract_new_text(event)
        if new_text:
            event["new_text"] = new_text
    elif name in ("grep", "glob"):
        # Both search tools share identical handling (merged duplicate branches).
        action = "file_search"
        files = _extract_files_from_tool_input(event)
    elif name in ("bash", "shell", "shell_command", "exec_command"):
        cmd = _get_bash_command(event)
        if cmd:
            # Classify the command: write takes precedence over read over search.
            if _BASH_WRITE_RE.search(cmd):
                action = "file_write"
                files = _extract_files_from_bash(cmd)
                new_text = _extract_new_text(event)
                if new_text:
                    event["new_text"] = new_text
            elif _BASH_READ_RE.search(cmd):
                action = "file_read"
                files = _extract_files_from_bash(cmd)
                _mark_skill_md(event, files)  # e.g. cat .../SKILL.md
            elif _BASH_SEARCH_RE.search(cmd):
                action = "file_search"
                files = _extract_files_from_bash(cmd)
            else:
                action = "command_run"
        else:
            action = "command_run"
    elif name == "task":
        action = "command_run"

    if action:
        event["action"] = action
    if files:
        event["files"] = files
    return event


# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------

# Registry mapping agent name -> (raw file format, normalizer callable).
# "jsonl" normalizers receive an iterable of raw lines; "json" normalizers
# receive the parsed top-level object (see normalize_file).
NORMALIZERS = {
    "claude-code": ("jsonl", normalize_claude_code),
    "codex": ("jsonl", normalize_codex),
    "gemini": ("json", normalize_gemini),
    "cursor-ide": ("jsonl", normalize_cursor_ide),
    "cursor-agent": ("jsonl", normalize_cursor_agent),
}


def normalize_file(agent: str, raw_path: Path, out_path: Path) -> int:
    """Normalize one raw log file into NormalizedEvent JSONL.

    Reads *raw_path*, redacts secrets, runs the agent's registered
    normalizer, enriches each event with semantic action/files fields,
    and writes one JSON object per line to *out_path* (creating parent
    directories as needed).  Returns the number of events written.
    """
    fmt, normalizer = NORMALIZERS[agent]

    # Security: redact secrets from raw text before any parsing so that
    # credentials never appear in normalized output or downstream stages.
    raw_text = redact_secrets(raw_path.read_text(encoding="utf-8", errors="replace"))

    payload = raw_text.splitlines() if fmt == "jsonl" else json.loads(raw_text)

    # Enrich with semantic action/files fields
    events = [_enrich_event(ev) for ev in normalizer(payload)]

    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as out:
        out.writelines(json.dumps(ev, ensure_ascii=False) + "\n" for ev in events)

    return len(events)


def _discover_projects(base_dir: Path) -> list[str]:
    """Find all project directories under base_dir that have a raw/ subdirectory."""
    projects = []
    if not base_dir.is_dir():
        return projects
    for d in sorted(base_dir.iterdir()):
        if d.is_dir() and (d / "raw").is_dir():
            projects.append(d.name)
    return projects


def _normalize_project(raw_dir: Path, out_dir: Path, agents: list[str] | None,
                       refresh: bool) -> tuple[int, int]:
    """Normalize one project's raw logs into *out_dir*.

    Walks per-agent subdirectories of *raw_dir*, skipping agents not in
    *agents* (when given) or without a registered normalizer.  Existing
    outputs are kept unless *refresh* is set.  Per-file failures are
    reported to stderr and do not abort the run.  Returns the totals
    (files_normalized, events_written).
    """
    file_count = 0
    event_count = 0

    for agent_dir in sorted(raw_dir.iterdir()):
        if not agent_dir.is_dir():
            continue
        agent = agent_dir.name
        if agents and agent not in agents:
            continue
        if agent not in NORMALIZERS:
            print(f"  {agent}: no normalizer available (skipping)")
            continue

        pattern = "*.jsonl" if NORMALIZERS[agent][0] == "jsonl" else "*.json"
        for raw_file in sorted(agent_dir.glob(pattern)):
            out_file = out_dir / agent / raw_file.with_suffix(".jsonl").name
            if out_file.exists() and not refresh:
                continue
            try:
                n = normalize_file(agent, raw_file, out_file)
            except Exception as exc:
                # One bad file must not abort the whole project run.
                print(f"  {agent}/{raw_file.name}: ERROR {exc}", file=sys.stderr)
            else:
                file_count += 1
                event_count += n
                print(f"  {agent}/{raw_file.name}: {n} events")

    return file_count, event_count


def main():
    """CLI entry point: normalize raw agent logs into NormalizedEvent JSONL.

    Two modes:
    - explicit-dir mode (``--raw-dir``/``--out-dir``): normalize one tree;
    - project mode: use ``--project`` or auto-discover every project
      under the audit base dir (``<cwd>/.tessl/logs`` by default).
    """
    parser = argparse.ArgumentParser(description="Normalize raw agent logs")
    parser.add_argument("--project", default=None,
                        help="Project name (default: auto-discover all projects)")
    parser.add_argument("--cwd", default=os.getcwd(),
                        help="Project directory (default: cwd)")
    parser.add_argument("--audit-dir", default=None,
                        help="Output directory for audit data (default: <cwd>/.tessl/logs)")
    # Hidden backwards-compat alias for --audit-dir.
    parser.add_argument("--base-dir", default=None, dest="audit_dir_compat",
                        help=argparse.SUPPRESS)
    parser.add_argument("--raw-dir", default=None,
                        help="Raw logs directory (overrides --base-dir/--project)")
    parser.add_argument("--out-dir", default=None,
                        help="Output directory (overrides --base-dir/--project)")
    parser.add_argument("--agents", nargs="*", default=None,
                        help="Only normalize these agents (default: all found)")
    parser.add_argument("--refresh", action="store_true",
                        help="Re-normalize even if output exists")
    args = parser.parse_args()

    cwd = os.path.realpath(args.cwd)
    audit_dir = args.audit_dir or args.audit_dir_compat
    base_dir = Path(audit_dir) if audit_dir else Path(cwd) / ".tessl" / "logs"

    # If explicit dirs given, use them directly (backwards compat).
    # Defaults are built with pathlib joins, not f-string concatenation,
    # so they are well-formed on every platform.
    if args.raw_dir or args.out_dir:
        raw_dir = Path(args.raw_dir) if args.raw_dir else base_dir / "raw"
        out_dir = Path(args.out_dir) if args.out_dir else base_dir / "normalized"
        if not raw_dir.exists():
            print(f"Raw logs not found: {raw_dir}")
            return
        files, events = _normalize_project(raw_dir, out_dir, args.agents, args.refresh)
        print(f"\nNormalized {files} files, {events} events -> {out_dir}")
        return

    # Project mode
    if args.project:
        projects = [args.project]
    else:
        projects = _discover_projects(base_dir)
        if not projects:
            print(f"No projects found under {base_dir}/ (looking for <project>/raw/)")
            return

    total_files = 0
    total_events = 0
    for project in projects:
        raw_dir = base_dir / project / "raw"
        out_dir = base_dir / project / "normalized"
        if not raw_dir.exists():
            print(f"Project '{project}': raw dir not found ({raw_dir})")
            continue
        print(f"── {project} ──")
        files, events = _normalize_project(raw_dir, out_dir, args.agents, args.refresh)
        total_files += files
        total_events += events

    print(f"\nNormalized {total_files} files, {total_events} events")


# Script entry point: run the normalizer CLI only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()

tile.json