CtrlK
BlogDocsLog inGet started
Tessl Logo

odyssey4me/gerrit

Submit, amend, and review Gerrit changes using git-review CLI. Use when asked to submit a patchset, download a change, rebase a change request, check CR status, or manage code reviews in Gerrit.

94

Quality

94%

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Skills
Evals
Files

gerrit.pyscripts/

#!/usr/bin/env python3
"""Gerrit wrapper skill for AI agents.

Wraps Gerrit SSH query commands to produce markdown-formatted output for
read/view operations. Action commands (review, abandon, submit) should use
SSH gerrit commands directly.

Usage:
    python gerrit.py check
    python gerrit.py changes list
    python gerrit.py changes view 12345
    python gerrit.py changes search "status:open project:myproject"
    python gerrit.py projects list

Requirements:
    SSH access to a Gerrit server (typically configured via .gitreview)
"""

from __future__ import annotations

import argparse
import configparser
import json
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

# ============================================================================
# SSH / GERRIT HELPERS
# ============================================================================


def _read_gitreview(path: str | None = None) -> dict[str, str]:
    """Parse .gitreview file for Gerrit connection details.

    Args:
        path: Path to .gitreview file. Defaults to .gitreview in cwd.

    Returns:
        Dict with host, port, project, username keys (values may be empty).
    """
    gitreview_path = Path(path) if path else Path(".gitreview")
    result: dict[str, str] = {"host": "", "port": "29418", "project": "", "username": ""}

    if not gitreview_path.exists():
        return result

    config = configparser.ConfigParser()
    config.read(str(gitreview_path))

    if config.has_section("gerrit"):
        result["host"] = config.get("gerrit", "host", fallback="")
        result["port"] = config.get("gerrit", "port", fallback="29418")
        result["project"] = config.get("gerrit", "project", fallback="")

    return result


def _get_ssh_cmd(host: str, port: str = "29418", username: str | None = None) -> list[str]:
    """Build SSH command prefix for Gerrit.

    Args:
        host: Gerrit server hostname.
        port: SSH port (default 29418).
        username: SSH username (optional).

    Returns:
        List of command parts for SSH connection.
    """
    cmd = ["ssh", "-p", port]
    if username:
        cmd.append(f"{username}@{host}")
    else:
        cmd.append(host)
    return cmd


def run_gerrit_query(
    host: str,
    query: str,
    port: str = "29418",
    username: str | None = None,
    extra_args: list[str] | None = None,
) -> list[dict[str, Any]]:
    """Execute a Gerrit SSH query and return parsed results.

    Gerrit query returns newline-delimited JSON with a stats line at the end.

    Args:
        host: Gerrit server hostname.
        query: Gerrit query string.
        port: SSH port.
        username: SSH username.
        extra_args: Additional arguments (e.g., --current-patch-set).

    Returns:
        List of change/result dicts (stats line excluded).

    Raises:
        SystemExit: If SSH command fails.
    """
    ssh_cmd = _get_ssh_cmd(host, port, username)
    gerrit_args = ["gerrit", "query", "--format=JSON", query]
    if extra_args:
        gerrit_args.extend(extra_args)

    cmd = [*ssh_cmd, *gerrit_args]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"Error: {result.stderr.strip()}", file=sys.stderr)
        sys.exit(1)

    results = []
    for line in result.stdout.strip().splitlines():
        if not line.strip():
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue
        # Skip the stats line (has "type": "stats")
        if obj.get("type") == "stats":
            continue
        results.append(obj)

    return results


# ============================================================================
# DATE FORMATTING
# ============================================================================


def format_timestamp(timestamp: int | None) -> str:
    """Format a Unix timestamp to YYYY-MM-DD HH:MM.

    Args:
        timestamp: Unix timestamp (seconds since epoch).

    Returns:
        Formatted date string, or "N/A" if input is None/0.
    """
    if not timestamp:
        return "N/A"
    try:
        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return dt.strftime("%Y-%m-%d %H:%M")
    except (OSError, ValueError, OverflowError):
        return "N/A"


# ============================================================================
# HELPER FUNCTIONS
# ============================================================================


def _get_owner(owner: dict[str, Any] | None) -> str:
    """Extract owner name from a Gerrit owner dict.

    Args:
        owner: Owner dictionary with 'username' or 'name' key.

    Returns:
        Username/name string or "Unknown".
    """
    if not owner:
        return "Unknown"
    if isinstance(owner, dict):
        return owner.get("username", owner.get("name", "Unknown"))
    return str(owner)


# ============================================================================
# FORMAT FUNCTIONS — one per entity type (markdown output)
# ============================================================================


def format_change_summary(change: dict[str, Any]) -> str:
    """Format a Gerrit change for markdown display.

    Args:
        change: Change dictionary from Gerrit query JSON.

    Returns:
        Markdown-formatted string.
    """
    number = change.get("number", "?")
    subject = change.get("subject", "(No subject)")
    status = change.get("status", "UNKNOWN")
    owner = _get_owner(change.get("owner"))
    project = change.get("project", "")
    branch = change.get("branch", "")
    created = format_timestamp(change.get("createdOn"))

    lines = [
        f"### Change {number}: {subject}",
        f"- **Status:** {status}",
        f"- **Owner:** {owner}",
    ]
    if project:
        lines.append(f"- **Project:** {project}")
    if branch:
        lines.append(f"- **Branch:** {branch}")

    topic = change.get("topic")
    if topic:
        lines.append(f"- **Topic:** {topic}")

    lines.append(f"- **Created:** {created}")

    updated = format_timestamp(change.get("lastUpdated"))
    if updated != "N/A":
        lines.append(f"- **Updated:** {updated}")

    # Current patch set approvals
    patch_set = change.get("currentPatchSet", {})
    if isinstance(patch_set, dict):
        approvals = patch_set.get("approvals", [])
        if approvals:
            lines.append("\n**Approvals:**")
            for approval in approvals:
                if isinstance(approval, dict):
                    by = approval.get("by", {})
                    reviewer = (
                        by.get("username", by.get("name", "?")) if isinstance(by, dict) else "?"
                    )
                    a_type = approval.get("type", "?")
                    value = approval.get("value", "?")
                    lines.append(f"- **{a_type}:** {value} (by {reviewer})")

    # Comments
    comments = change.get("comments", [])
    if comments:
        lines.append(f"\n**Comments ({len(comments)}):**")
        for comment in comments[-5:]:  # Show last 5 comments
            if isinstance(comment, dict):
                reviewer = _get_owner(comment.get("reviewer"))
                message = comment.get("message", "").strip()
                ts = format_timestamp(comment.get("timestamp"))
                if message:
                    # Truncate long messages
                    if len(message) > 200:
                        message = message[:200] + "..."
                    lines.append(f"- **{reviewer}** ({ts}): {message}")

    url = change.get("url")
    if url:
        lines.append(f"\n- **URL:** {url}")

    return "\n".join(lines)


def format_change_row(change: dict[str, Any]) -> str:
    """Format a Gerrit change as a compact markdown entry for lists.

    Args:
        change: Change dictionary from Gerrit query JSON.

    Returns:
        Markdown-formatted string.
    """
    number = change.get("number", "?")
    subject = change.get("subject", "(No subject)")
    status = change.get("status", "UNKNOWN")
    owner = _get_owner(change.get("owner"))
    project = change.get("project", "")
    created = format_timestamp(change.get("createdOn"))

    lines = [
        f"### Change {number}: {subject}",
        f"- **Status:** {status}",
        f"- **Owner:** {owner}",
    ]
    if project:
        lines.append(f"- **Project:** {project}")
    lines.append(f"- **Created:** {created}")
    return "\n".join(lines)


def format_project_row(project_name: str) -> str:
    """Format a Gerrit project name as a compact markdown entry.

    Args:
        project_name: Project name string.

    Returns:
        Markdown-formatted string.
    """
    return f"### {project_name}"


# ============================================================================
# COMMAND HANDLERS — one per subcommand, return exit code
# ============================================================================


def cmd_check(args: argparse.Namespace) -> int:
    """Verify Gerrit SSH access is working.

    Args:
        args: Parsed arguments with host, port, username.

    Returns:
        Exit code (0 success, 1 error).
    """
    gitreview = _read_gitreview()
    host = args.host or gitreview["host"]
    port = args.port or gitreview["port"]
    username = args.username or gitreview.get("username") or None

    if not host:
        print(
            "Error: No Gerrit host specified. Use --host or create a .gitreview file.",
            file=sys.stderr,
        )
        return 1

    ssh_cmd = _get_ssh_cmd(host, port, username)
    cmd = [*ssh_cmd, "gerrit", "version"]

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print("Error: Cannot connect to Gerrit.", file=sys.stderr)
        print(result.stderr.strip(), file=sys.stderr)
        print(f"\nCheck SSH access: ssh -p {port} {host} gerrit version", file=sys.stderr)
        return 1

    print("\u2713 Gerrit SSH access is working")
    output = result.stdout.strip() or result.stderr.strip()
    if output:
        print(f"  {output}")
    return 0


def cmd_changes_list(args: argparse.Namespace) -> int:
    """List open changes.

    Args:
        args: Parsed arguments with host, port, username, project, limit, json flags.

    Returns:
        Exit code.
    """
    gitreview = _read_gitreview()
    host = args.host or gitreview["host"]
    port = args.port or gitreview["port"]
    username = args.username or gitreview.get("username") or None
    project = gitreview.get("project", "")

    if not host:
        print("Error: No Gerrit host specified.", file=sys.stderr)
        return 1

    query = "status:open"
    if project:
        query += f" project:{project}"
    query += f" limit:{args.limit}"

    changes = run_gerrit_query(host, query, port, username)

    if args.json:
        print(json.dumps(changes, indent=2))
    else:
        if not changes:
            print("No open changes found")
        else:
            print(f"## Open Changes\n\nFound {len(changes)} change(s):\n")
            print("\n\n".join(format_change_row(c) for c in changes))
    return 0


def cmd_changes_view(args: argparse.Namespace) -> int:
    """View a single change.

    Args:
        args: Parsed arguments with change number, host, port, username, json flags.

    Returns:
        Exit code.
    """
    gitreview = _read_gitreview()
    host = args.host or gitreview["host"]
    port = args.port or gitreview["port"]
    username = args.username or gitreview.get("username") or None

    if not host:
        print("Error: No Gerrit host specified.", file=sys.stderr)
        return 1

    changes = run_gerrit_query(
        host,
        f"change:{args.number}",
        port,
        username,
        extra_args=["--current-patch-set", "--comments"],
    )

    if args.json:
        print(json.dumps(changes, indent=2))
    else:
        if not changes:
            print(f"Change {args.number} not found")
        else:
            print(format_change_summary(changes[0]))
    return 0


def cmd_changes_search(args: argparse.Namespace) -> int:
    """Search changes with a custom query.

    Args:
        args: Parsed arguments with query, host, port, username, limit, json flags.

    Returns:
        Exit code.
    """
    gitreview = _read_gitreview()
    host = args.host or gitreview["host"]
    port = args.port or gitreview["port"]
    username = args.username or gitreview.get("username") or None

    if not host:
        print("Error: No Gerrit host specified.", file=sys.stderr)
        return 1

    query = f"{args.query} limit:{args.limit}"
    changes = run_gerrit_query(host, query, port, username)

    if args.json:
        print(json.dumps(changes, indent=2))
    else:
        if not changes:
            print("No changes found")
        else:
            print(f"## Search Results\n\nFound {len(changes)} change(s):\n")
            print("\n\n".join(format_change_row(c) for c in changes))
    return 0


def cmd_projects_list(args: argparse.Namespace) -> int:
    """List projects.

    Args:
        args: Parsed arguments with host, port, username, limit, json flags.

    Returns:
        Exit code.
    """
    gitreview = _read_gitreview()
    host = args.host or gitreview["host"]
    port = args.port or gitreview["port"]
    username = args.username or gitreview.get("username") or None

    if not host:
        print("Error: No Gerrit host specified.", file=sys.stderr)
        return 1

    ssh_cmd = _get_ssh_cmd(host, port, username)
    cmd = [*ssh_cmd, "gerrit", "ls-projects", "--format", "json"]

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"Error: {result.stderr.strip()}", file=sys.stderr)
        sys.exit(1)

    output = result.stdout.strip()
    if not output:
        if args.json:
            print("{}")
        else:
            print("No projects found")
        return 0

    try:
        projects_data = json.loads(output)
    except json.JSONDecodeError:
        # Fallback: treat as line-delimited project names
        project_names = [line.strip() for line in output.splitlines() if line.strip()]
        if args.json:
            print(json.dumps(project_names, indent=2))
        else:
            if not project_names:
                print("No projects found")
            else:
                items = project_names[: args.limit]
                print(f"## Projects\n\nFound {len(items)} project(s):\n")
                print("\n\n".join(format_project_row(p) for p in items))
        return 0

    if args.json:
        print(json.dumps(projects_data, indent=2))
    else:
        # Gerrit ls-projects --format json returns {name: {id: ...}, ...}
        project_names = sorted(projects_data.keys())[: args.limit]
        if not project_names:
            print("No projects found")
        else:
            print(f"## Projects\n\nFound {len(project_names)} project(s):\n")
            print("\n\n".join(format_project_row(p) for p in project_names))
    return 0


# ============================================================================
# ARGUMENT PARSER
# ============================================================================


def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with nested subcommands.

    Returns:
        Configured ArgumentParser.
    """
    parser = argparse.ArgumentParser(
        description="Gerrit wrapper for AI agents \u2014 markdown-formatted query output",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Global connection args
    parser.add_argument("--host", help="Gerrit server hostname")
    parser.add_argument("--port", default="", help="SSH port (default: 29418)")
    parser.add_argument("--username", help="SSH username")

    subparsers = parser.add_subparsers(dest="command", help="Command to execute")

    # check
    subparsers.add_parser("check", help="Verify Gerrit SSH access")

    # changes
    changes_parser = subparsers.add_parser("changes", help="Change operations")
    changes_sub = changes_parser.add_subparsers(dest="changes_command")

    changes_list = changes_sub.add_parser("list", help="List open changes")
    changes_list.add_argument("--limit", type=int, default=30, help="Max results (default 30)")
    changes_list.add_argument("--json", action="store_true", help="Output raw JSON")

    changes_view = changes_sub.add_parser("view", help="View change details")
    changes_view.add_argument("number", type=int, help="Change number")
    changes_view.add_argument("--json", action="store_true", help="Output raw JSON")

    changes_search = changes_sub.add_parser("search", help="Search changes")
    changes_search.add_argument("query", help="Gerrit query string")
    changes_search.add_argument("--limit", type=int, default=30, help="Max results (default 30)")
    changes_search.add_argument("--json", action="store_true", help="Output raw JSON")

    # projects
    projects_parser = subparsers.add_parser("projects", help="Project operations")
    projects_sub = projects_parser.add_subparsers(dest="projects_command")

    projects_list = projects_sub.add_parser("list", help="List projects")
    projects_list.add_argument("--limit", type=int, default=30, help="Max results (default 30)")
    projects_list.add_argument("--json", action="store_true", help="Output raw JSON")

    return parser


# ============================================================================
# MAIN
# ============================================================================


def main() -> int:
    """Main entry point.

    Returns:
        Exit code.
    """
    parser = build_parser()
    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return 1

    if args.command == "check":
        return cmd_check(args)
    elif args.command == "changes":
        if not hasattr(args, "changes_command") or not args.changes_command:
            parser.parse_args(["changes", "--help"])
            return 1
        if args.changes_command == "list":
            return cmd_changes_list(args)
        elif args.changes_command == "view":
            return cmd_changes_view(args)
        elif args.changes_command == "search":
            return cmd_changes_search(args)
    elif args.command == "projects":
        if not hasattr(args, "projects_command") or not args.projects_command:
            parser.parse_args(["projects", "--help"])
            return 1
        if args.projects_command == "list":
            return cmd_projects_list(args)

    parser.print_help()
    return 1


if __name__ == "__main__":
    sys.exit(main())

Install with Tessl CLI

npx tessl i odyssey4me/gerrit@0.2.2

scripts

SKILL.md

tile.json