CtrlK
BlogDocsLog inGet started
Tessl Logo

odyssey4me/google-docs

Create and modify Google Docs documents. Read content, insert tables, apply heading styles, and manage formatting. Use when asked to edit a gdoc, write a Google document, update a doc, or format document content.

89

Quality

89%

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Skills
Evals
Files

google-docs.pyscripts/

#!/usr/bin/env python3
"""Google Docs integration skill for AI agents.

This is a self-contained script that provides Google Docs functionality.

Usage:
    python google-docs.py check
    python google-docs.py auth setup --client-id ID --client-secret SECRET
    python google-docs.py documents create --title "My Document"
    python google-docs.py documents get DOCUMENT_ID
    python google-docs.py documents read DOCUMENT_ID
    python google-docs.py content append DOCUMENT_ID --text "Hello World"
    python google-docs.py content insert DOCUMENT_ID --text "Insert this" --index 1
    python google-docs.py content delete DOCUMENT_ID --start-index 1 --end-index 10

Requirements:
    pip install --user google-auth google-auth-oauthlib google-api-python-client keyring pyyaml
"""

from __future__ import annotations

# Standard library imports
import argparse
import contextlib
import json
import os
import sys
from pathlib import Path
from typing import Any

# ============================================================================
# DEPENDENCY CHECKS
# ============================================================================

try:
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials
    from google_auth_oauthlib.flow import InstalledAppFlow

    GOOGLE_AUTH_AVAILABLE = True
except ImportError:
    GOOGLE_AUTH_AVAILABLE = False

try:
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError

    GOOGLE_API_CLIENT_AVAILABLE = True
except ImportError:
    GOOGLE_API_CLIENT_AVAILABLE = False

try:
    import keyring

    KEYRING_AVAILABLE = True
except ImportError:
    KEYRING_AVAILABLE = False

try:
    import yaml

    YAML_AVAILABLE = True
except ImportError:
    YAML_AVAILABLE = False


# ============================================================================
# CONSTANTS
# ============================================================================

SERVICE_NAME = "agent-skills"
CONFIG_DIR = Path.home() / ".config" / "agent-skills"

# Google Docs API scopes - granular scopes for different operations
DOCS_SCOPES_READONLY = ["https://www.googleapis.com/auth/documents.readonly"]
DOCS_SCOPES = ["https://www.googleapis.com/auth/documents"]

# Drive API scope needed for markdown export
DRIVE_SCOPES_READONLY = ["https://www.googleapis.com/auth/drive.readonly"]

# Minimal read-only scope (default)
DOCS_SCOPES_DEFAULT = DOCS_SCOPES_READONLY


# ============================================================================
# KEYRING CREDENTIAL STORAGE
# ============================================================================


def get_credential(key: str) -> str | None:
    """Get a credential from the system keyring.

    Args:
        key: The credential key (e.g., "google-docs-token-json").

    Returns:
        The credential value, or None if not found.
    """
    return keyring.get_password(SERVICE_NAME, key)


def set_credential(key: str, value: str) -> None:
    """Store a credential in the system keyring.

    Args:
        key: The credential key.
        value: The credential value.
    """
    keyring.set_password(SERVICE_NAME, key, value)


def delete_credential(key: str) -> None:
    """Delete a credential from the system keyring.

    Args:
        key: The credential key.
    """
    with contextlib.suppress(keyring.errors.PasswordDeleteError):
        keyring.delete_password(SERVICE_NAME, key)


# ============================================================================
# CONFIGURATION MANAGEMENT
# ============================================================================


def load_config(service: str) -> dict[str, Any] | None:
    """Load configuration from file.

    Args:
        service: Service name.

    Returns:
        Configuration dictionary or None if not found.
    """
    config_file = CONFIG_DIR / f"{service}.yaml"
    if config_file.exists():
        with open(config_file) as f:
            return yaml.safe_load(f)
    return None


def save_config(service: str, config: dict[str, Any]) -> None:
    """Save configuration to file.

    Args:
        service: Service name.
        config: Configuration dictionary.
    """
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    config_file = CONFIG_DIR / f"{service}.yaml"
    with open(config_file, "w") as f:
        yaml.safe_dump(config, f, default_flow_style=False)


# ============================================================================
# GOOGLE AUTHENTICATION
# ============================================================================


class AuthenticationError(Exception):
    """Exception raised for authentication errors."""

    pass


def _build_oauth_config(client_id: str, client_secret: str) -> dict[str, Any]:
    """Build OAuth client configuration dict.

    Args:
        client_id: OAuth client ID.
        client_secret: OAuth client secret.

    Returns:
        OAuth client configuration dict.
    """
    return {
        "installed": {
            "client_id": client_id,
            "client_secret": client_secret,
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "redirect_uris": ["http://localhost"],
        }
    }


def get_oauth_client_config(service: str) -> dict[str, Any]:
    """Get OAuth 2.0 client configuration from config file or environment.

    Priority:
    1. Service-specific config file (~/.config/agent-skills/{service}.yaml)
    2. Service-specific environment variables ({SERVICE}_CLIENT_ID, {SERVICE}_CLIENT_SECRET)
    3. Shared Google config file (~/.config/agent-skills/google.yaml)
    4. Shared environment variables (GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET)

    Args:
        service: Service name (e.g., "google-docs").

    Returns:
        OAuth client configuration dict.

    Raises:
        AuthenticationError: If client configuration is not found.
    """
    # 1. Try service-specific config file first
    config = load_config(service)
    if config and "oauth_client" in config:
        client_id = config["oauth_client"].get("client_id")
        client_secret = config["oauth_client"].get("client_secret")
        if client_id and client_secret:
            return _build_oauth_config(client_id, client_secret)

    # 2. Try service-specific environment variables
    prefix = service.upper().replace("-", "_")
    client_id = os.environ.get(f"{prefix}_CLIENT_ID")
    client_secret = os.environ.get(f"{prefix}_CLIENT_SECRET")
    if client_id and client_secret:
        return _build_oauth_config(client_id, client_secret)

    # 3. Try shared Google config file
    shared_config = load_config("google")
    if shared_config and "oauth_client" in shared_config:
        client_id = shared_config["oauth_client"].get("client_id")
        client_secret = shared_config["oauth_client"].get("client_secret")
        if client_id and client_secret:
            return _build_oauth_config(client_id, client_secret)

    # 4. Try shared environment variables
    client_id = os.environ.get("GOOGLE_CLIENT_ID")
    client_secret = os.environ.get("GOOGLE_CLIENT_SECRET")
    if client_id and client_secret:
        return _build_oauth_config(client_id, client_secret)

    raise AuthenticationError(
        f"OAuth client credentials not found for {service}. "
        f"Options:\n"
        f"  1. Service config: Run python google-docs.py auth setup --client-id YOUR_ID --client-secret YOUR_SECRET\n"
        f"  2. Service env vars: Set GOOGLE_DOCS_CLIENT_ID and GOOGLE_DOCS_CLIENT_SECRET\n"
        f"  3. Shared config: Create ~/.config/agent-skills/google.yaml with oauth_client credentials\n"
        f"  4. Shared env vars: Set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET"
    )


def _run_oauth_flow(service: str, scopes: list[str]) -> Credentials:
    """Run OAuth browser flow and store resulting token.

    Args:
        service: Service name (e.g., "google-docs").
        scopes: List of OAuth scopes required.

    Returns:
        Valid Google credentials.

    Raises:
        AuthenticationError: If OAuth flow fails.
    """
    client_config = get_oauth_client_config(service)
    flow = InstalledAppFlow.from_client_config(client_config, scopes)
    creds = flow.run_local_server(port=0)  # Opens browser for consent
    # Save token to keyring for future use
    set_credential(f"{service}-token-json", creds.to_json())
    return creds


def get_google_credentials(service: str, scopes: list[str]) -> Credentials:
    """Get Google credentials for human-in-the-loop use cases.

    Priority:
    1. Saved OAuth tokens from keyring - from previous OAuth flow
    2. OAuth 2.0 flow - opens browser for user consent

    Note: Service account authentication is NOT supported - this is
    designed for interactive human use cases only.

    Args:
        service: Service name (e.g., "google-docs").
        scopes: List of OAuth scopes required.

    Returns:
        Valid Google credentials.

    Raises:
        AuthenticationError: If authentication fails.
    """
    # 1. Try keyring-stored OAuth token from previous flow
    token_json = get_credential(f"{service}-token-json")
    if token_json:
        try:
            token_data = json.loads(token_json)
            creds = Credentials.from_authorized_user_info(token_data, scopes)
            if creds and creds.valid:
                # Check if stored token has all requested scopes
                granted = set(token_data.get("scopes", []))
                requested = set(scopes)
                if granted and not requested.issubset(granted):
                    # Merge scopes so user doesn't lose existing access
                    merged = list(granted | requested)
                    print(
                        "Current token lacks required scopes. "
                        "Opening browser for re-authentication...",
                        file=sys.stderr,
                    )
                    delete_credential(f"{service}-token-json")
                    return _run_oauth_flow(service, merged)
                return creds
            # Refresh if expired but has refresh token
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
                # Save refreshed token
                set_credential(f"{service}-token-json", creds.to_json())
                return creds
        except Exception:
            # Invalid or corrupted token, fall through to OAuth flow
            pass

    # 2. Initiate OAuth flow - human interaction required
    try:
        return _run_oauth_flow(service, scopes)
    except Exception as e:
        raise AuthenticationError(f"OAuth flow failed: {e}") from e


def build_docs_service(scopes: list[str] | None = None):
    """Build and return Google Docs API service.

    Args:
        scopes: List of OAuth scopes to request. Defaults to read-only.

    Returns:
        Google Docs API service object.

    Raises:
        AuthenticationError: If authentication fails.
    """
    if scopes is None:
        scopes = DOCS_SCOPES_DEFAULT
    creds = get_google_credentials("google-docs", scopes)
    return build("docs", "v1", credentials=creds)


def build_drive_service(scopes: list[str] | None = None):
    """Build and return Google Drive API service for export operations.

    Args:
        scopes: List of OAuth scopes to request. Defaults to drive.readonly.

    Returns:
        Google Drive API service object.

    Raises:
        AuthenticationError: If authentication fails.
    """
    if scopes is None:
        scopes = DRIVE_SCOPES_READONLY
    creds = get_google_credentials("google-docs", scopes)
    return build("drive", "v3", credentials=creds)


# ============================================================================
# GOOGLE DOCS API ERROR HANDLING
# ============================================================================


class DocsAPIError(Exception):
    """Exception raised for Google Docs API errors."""

    def __init__(self, message: str, status_code: int | None = None, details: Any = None):
        super().__init__(message)
        self.status_code = status_code
        self.details = details


def handle_api_error(error: HttpError) -> None:
    """Convert Google API HttpError to DocsAPIError.

    Args:
        error: HttpError from Google API.

    Raises:
        DocsAPIError: With appropriate message and status code.
    """
    status_code = error.resp.status
    reason = error.resp.reason
    details = None

    try:
        error_content = json.loads(error.content.decode("utf-8"))
        details = error_content.get("error", {})
        message = details.get("message", reason)
    except Exception:
        message = reason

    # Check for insufficient scope error (403)
    if status_code == 403 and "insufficient" in message.lower():
        scope_help = (
            "\n\nInsufficient OAuth scope. This operation requires additional permissions.\n"
            "To re-authenticate with the required scopes:\n\n"
            "  1. Reset token: python scripts/google-docs.py auth reset\n"
            "  2. Re-run: python scripts/google-docs.py check\n\n"
            "For setup help, see: docs/google-oauth-setup.md\n"
        )
        message = f"{message}{scope_help}"

    raise DocsAPIError(
        f"Google Docs API error: {message} (HTTP {status_code})",
        status_code=status_code,
        details=details,
    )


# ============================================================================
# DOCUMENT OPERATIONS
# ============================================================================


def create_document(service, title: str) -> dict[str, Any]:
    """Create a new blank Google Doc.

    Args:
        service: Google Docs API service object.
        title: Document title.

    Returns:
        Created document dictionary with documentId.

    Raises:
        DocsAPIError: If the API call fails.
    """
    try:
        body = {"title": title}
        doc = service.documents().create(body=body).execute()
        return doc
    except HttpError as e:
        handle_api_error(e)
        return {}  # Unreachable


def get_document(service, document_id: str) -> dict[str, Any]:
    """Get a Google Doc by ID.

    Args:
        service: Google Docs API service object.
        document_id: The document ID.

    Returns:
        Document dictionary with full content and metadata.

    Raises:
        DocsAPIError: If the API call fails.
    """
    try:
        doc = service.documents().get(documentId=document_id).execute()
        return doc
    except HttpError as e:
        handle_api_error(e)
        return {}  # Unreachable


def read_document_content(service, document_id: str) -> str:
    """Extract plain text content from a Google Doc.

    Args:
        service: Google Docs API service object.
        document_id: The document ID.

    Returns:
        Plain text content of the document.

    Raises:
        DocsAPIError: If the API call fails.
    """
    doc = get_document(service, document_id)
    content = doc.get("body", {}).get("content", [])

    text_parts = []
    for element in content:
        if "paragraph" in element:
            para = element["paragraph"]
            for text_element in para.get("elements", []):
                if "textRun" in text_element:
                    text_parts.append(text_element["textRun"].get("content", ""))

    return "".join(text_parts)


def export_document_as_markdown(document_id: str) -> str:
    """Export document as markdown using Google's native export.

    Uses the Drive API to export the document in markdown format.
    This preserves tables, headings, formatting, and structure.

    Args:
        document_id: The Google Docs document ID.

    Returns:
        Markdown content of the document.

    Raises:
        DocsAPIError: If the export fails.
    """
    try:
        service = build_drive_service()
        # Export using Drive API with markdown MIME type
        response = service.files().export(fileId=document_id, mimeType="text/markdown").execute()

        # Response is bytes, decode to string
        if isinstance(response, bytes):
            return response.decode("utf-8")
        return response
    except HttpError as e:
        handle_api_error(e)
        return ""  # Unreachable


def export_document_as_pdf(document_id: str) -> bytes:
    """Export document as PDF using Google's native export.

    Uses the Drive API to export the document in PDF format.

    Args:
        document_id: The Google Docs document ID.

    Returns:
        PDF content as bytes.

    Raises:
        DocsAPIError: If the export fails.
    """
    try:
        service = build_drive_service()
        response = service.files().export(fileId=document_id, mimeType="application/pdf").execute()
        return response
    except HttpError as e:
        handle_api_error(e)
        return b""  # Unreachable


def append_text(service, document_id: str, text: str) -> dict[str, Any]:
    """Append text to the end of a Google Doc.

    Args:
        service: Google Docs API service object.
        document_id: The document ID.
        text: Text to append.

    Returns:
        Response from the batchUpdate API.

    Raises:
        DocsAPIError: If the API call fails.
    """
    try:
        # Get document to find the end index
        doc = get_document(service, document_id)
        end_index = doc.get("body", {}).get("content", [{}])[-1].get("endIndex", 1)

        # Insert text at the end (end_index - 1 because last char is always \n)
        requests = [
            {
                "insertText": {
                    "location": {"index": end_index - 1},
                    "text": text,
                }
            }
        ]

        result = (
            service.documents()
            .batchUpdate(documentId=document_id, body={"requests": requests})
            .execute()
        )
        return result
    except HttpError as e:
        handle_api_error(e)
        return {}  # Unreachable


def insert_text(service, document_id: str, text: str, index: int) -> dict[str, Any]:
    """Insert text at a specific position in a Google Doc.

    Args:
        service: Google Docs API service object.
        document_id: The document ID.
        text: Text to insert.
        index: Position to insert at (0-based).

    Returns:
        Response from the batchUpdate API.

    Raises:
        DocsAPIError: If the API call fails.
    """
    try:
        requests = [
            {
                "insertText": {
                    "location": {"index": index},
                    "text": text,
                }
            }
        ]

        result = (
            service.documents()
            .batchUpdate(documentId=document_id, body={"requests": requests})
            .execute()
        )
        return result
    except HttpError as e:
        handle_api_error(e)
        return {}  # Unreachable


def delete_content(service, document_id: str, start_index: int, end_index: int) -> dict[str, Any]:
    """Delete a range of content from a Google Doc.

    Args:
        service: Google Docs API service object.
        document_id: The document ID.
        start_index: Start position (inclusive).
        end_index: End position (exclusive).

    Returns:
        Response from the batchUpdate API.

    Raises:
        DocsAPIError: If the API call fails.
    """
    try:
        requests = [
            {
                "deleteContentRange": {
                    "range": {
                        "startIndex": start_index,
                        "endIndex": end_index,
                    }
                }
            }
        ]

        result = (
            service.documents()
            .batchUpdate(documentId=document_id, body={"requests": requests})
            .execute()
        )
        return result
    except HttpError as e:
        handle_api_error(e)
        return {}  # Unreachable


def apply_formatting(
    service,
    document_id: str,
    start_index: int,
    end_index: int,
    bold: bool | None = None,
    italic: bool | None = None,
    underline: bool | None = None,
    font_size: int | None = None,
) -> dict[str, Any]:
    """Apply text formatting to a range in a Google Doc.

    Args:
        service: Google Docs API service object.
        document_id: The document ID.
        start_index: Start position (inclusive).
        end_index: End position (exclusive).
        bold: Apply bold formatting.
        italic: Apply italic formatting.
        underline: Apply underline formatting.
        font_size: Font size in points.

    Returns:
        Response from the batchUpdate API.

    Raises:
        DocsAPIError: If the API call fails.
    """
    try:
        text_style = {}
        fields = []

        if bold is not None:
            text_style["bold"] = bold
            fields.append("bold")
        if italic is not None:
            text_style["italic"] = italic
            fields.append("italic")
        if underline is not None:
            text_style["underline"] = underline
            fields.append("underline")
        if font_size is not None:
            text_style["fontSize"] = {"magnitude": font_size, "unit": "PT"}
            fields.append("fontSize")

        if not fields:
            return {}

        requests = [
            {
                "updateTextStyle": {
                    "range": {
                        "startIndex": start_index,
                        "endIndex": end_index,
                    },
                    "textStyle": text_style,
                    "fields": ",".join(fields),
                }
            }
        ]

        result = (
            service.documents()
            .batchUpdate(documentId=document_id, body={"requests": requests})
            .execute()
        )
        return result
    except HttpError as e:
        handle_api_error(e)
        return {}  # Unreachable


# ============================================================================
# MARKDOWN PARSING
# ============================================================================


def _parse_inline(text: str) -> list[dict]:
    """Parse inline markdown formatting within a single line.

    Handles **bold** and [text](url) patterns.

    Args:
        text: A single line of text with possible inline formatting.

    Returns:
        List of run dicts with keys: text, bold, link.
    """
    runs: list[dict] = []
    i = 0
    n = len(text)

    while i < n:
        # Check for **bold**
        if text[i : i + 2] == "**":
            end = text.find("**", i + 2)
            if end != -1:
                bold_text = text[i + 2 : end]
                if bold_text:
                    runs.append({"text": bold_text, "bold": True, "link": None})
                i = end + 2
                continue

        # Check for [text](url)
        if text[i] == "[":
            bracket_end = text.find("]", i + 1)
            if bracket_end != -1 and bracket_end + 1 < n and text[bracket_end + 1] == "(":
                paren_end = text.find(")", bracket_end + 2)
                if paren_end != -1:
                    link_text = text[i + 1 : bracket_end]
                    link_url = text[bracket_end + 2 : paren_end]
                    if link_text:
                        runs.append({"text": link_text, "bold": False, "link": link_url})
                    i = paren_end + 1
                    continue

        # Collect plain text until next special character
        j = i + 1
        while j < n:
            if text[j : j + 2] == "**":
                break
            if text[j] == "[":
                break
            j += 1
        runs.append({"text": text[i:j], "bold": False, "link": None})
        i = j

    return runs


def parse_markdown(text: str) -> list[dict]:
    """Parse markdown text into structured elements.

    Supported syntax:
    - Headings: ## Heading text
    - Bullets: - item or * item (indent for nesting)
    - Bold: **text**
    - Links: [text](url)
    - Paragraphs: any other non-blank line

    Args:
        text: Markdown-formatted text.

    Returns:
        List of element dicts with type-specific fields.
    """
    import re

    elements: list[dict] = []

    for line in text.split("\n"):
        # Skip blank lines
        if not line.strip():
            continue

        # Headings
        heading_match = re.match(r"^(#{1,6})\s+(.*)", line)
        if heading_match:
            level = len(heading_match.group(1))
            runs = _parse_inline(heading_match.group(2))
            elements.append({"type": "heading", "level": level, "runs": runs})
            continue

        # Bullets
        bullet_match = re.match(r"^(\s*)[-*]\s+(.*)", line)
        if bullet_match:
            indent = len(bullet_match.group(1))
            level = indent // 2
            runs = _parse_inline(bullet_match.group(2))
            elements.append({"type": "bullet", "level": level, "runs": runs})
            continue

        # Paragraph
        runs = _parse_inline(line)
        elements.append({"type": "paragraph", "runs": runs})

    return elements


# ============================================================================
# ANCHOR FINDING
# ============================================================================


def find_anchor_index(
    doc: dict, anchor_type: str, anchor_value: str | None = None, occurrence: int = 1
) -> int:
    """Find the endIndex of a structural anchor in a Google Doc.

    Args:
        doc: Full document dict from Google Docs API.
        anchor_type: One of 'horizontal_rule', 'heading', 'bookmark'.
        anchor_value: For heading: text to match (case-insensitive).
            For bookmark: bookmark ID. For horizontal_rule: ignored.
        occurrence: Which occurrence to match (1-based, default 1).

    Returns:
        The endIndex of the matched anchor element.

    Raises:
        DocsAPIError: If the anchor is not found.
    """
    content = doc.get("body", {}).get("content", [])
    count = 0

    if anchor_type == "horizontal_rule":
        for element in content:
            if "paragraph" in element:
                for para_element in element["paragraph"].get("elements", []):
                    if "horizontalRule" in para_element:
                        count += 1
                        if count == occurrence:
                            return element.get("endIndex", 0)

    elif anchor_type == "heading":
        if not anchor_value:
            raise DocsAPIError("anchor_value is required for heading anchor type")
        for element in content:
            if "paragraph" in element:
                para = element["paragraph"]
                style_type = para.get("paragraphStyle", {}).get("namedStyleType", "")
                if style_type.startswith("HEADING_"):
                    # Extract text from paragraph
                    para_text = ""
                    for text_element in para.get("elements", []):
                        if "textRun" in text_element:
                            para_text += text_element["textRun"].get("content", "")
                    para_text = para_text.strip()
                    if para_text.lower() == anchor_value.lower():
                        count += 1
                        if count == occurrence:
                            return element.get("endIndex", 0)

    elif anchor_type == "bookmark":
        if not anchor_value:
            raise DocsAPIError("anchor_value is required for bookmark anchor type")
        for element in content:
            if "paragraph" in element:
                for para_element in element["paragraph"].get("elements", []):
                    if "bookmarkId" in para_element and para_element["bookmarkId"] == anchor_value:
                        count += 1
                        if count == occurrence:
                            return element.get("endIndex", 0)

    else:
        raise DocsAPIError(f"Unknown anchor type: {anchor_type}")

    raise DocsAPIError(
        f"Anchor not found: type={anchor_type}, value={anchor_value}, occurrence={occurrence}"
    )


# ============================================================================
# MARKDOWN TO DOCS REQUESTS
# ============================================================================


def build_insert_requests(parsed_elements: list[dict], insert_index: int) -> list[dict]:
    """Convert parsed markdown elements to Google Docs API batchUpdate requests.

    Strategy: insert all text as a single insertText request, then apply
    formatting to known ranges. This avoids shifting-index complexity.

    Args:
        parsed_elements: Output from parse_markdown().
        insert_index: Document index to insert at.

    Returns:
        List of Google Docs API request dicts for batchUpdate.
    """
    if not parsed_elements:
        return []

    # Phase 1: Assemble full text and track element/run offsets
    full_text = ""
    element_offsets: list[dict] = []

    for elem in parsed_elements:
        elem_start = len(full_text)
        run_offsets: list[dict] = []

        # Add tab prefixes for nested bullets
        if elem["type"] == "bullet" and elem.get("level", 0) > 0:
            full_text += "\t" * elem["level"]

        for run in elem["runs"]:
            run_start = len(full_text)
            full_text += run["text"]
            run_offsets.append(
                {
                    "start": run_start,
                    "end": len(full_text),
                    "bold": run.get("bold", False),
                    "link": run.get("link"),
                }
            )

        full_text += "\n"
        element_offsets.append(
            {
                "type": elem["type"],
                "level": elem.get("level", 0),
                "start": elem_start,
                "end": len(full_text),
                "run_offsets": run_offsets,
            }
        )

    # Phase 2: Build requests
    requests: list[dict] = []

    # Single insertText request
    requests.append(
        {
            "insertText": {
                "location": {"index": insert_index},
                "text": full_text,
            }
        }
    )

    # Heading styles
    for elem_info in element_offsets:
        if elem_info["type"] == "heading":
            requests.append(
                {
                    "updateParagraphStyle": {
                        "range": {
                            "startIndex": insert_index + elem_info["start"],
                            "endIndex": insert_index + elem_info["end"],
                        },
                        "paragraphStyle": {
                            "namedStyleType": f"HEADING_{elem_info['level']}",
                        },
                        "fields": "namedStyleType",
                    }
                }
            )

    # Bullet ranges - find consecutive bullet elements and apply as ranges
    i = 0
    while i < len(element_offsets):
        if element_offsets[i]["type"] == "bullet":
            # Find the end of consecutive bullets
            j = i
            while j < len(element_offsets) and element_offsets[j]["type"] == "bullet":
                j += 1
            # Apply bullet preset to the range
            requests.append(
                {
                    "createParagraphBullets": {
                        "range": {
                            "startIndex": insert_index + element_offsets[i]["start"],
                            "endIndex": insert_index + element_offsets[j - 1]["end"],
                        },
                        "bulletPreset": "BULLET_DISC_CIRCLE_SQUARE",
                    }
                }
            )
            i = j
        else:
            i += 1

    # Text style formatting (bold and links)
    for elem_info in element_offsets:
        for run_info in elem_info["run_offsets"]:
            if run_info["bold"]:
                requests.append(
                    {
                        "updateTextStyle": {
                            "range": {
                                "startIndex": insert_index + run_info["start"],
                                "endIndex": insert_index + run_info["end"],
                            },
                            "textStyle": {"bold": True},
                            "fields": "bold",
                        }
                    }
                )
            if run_info["link"]:
                requests.append(
                    {
                        "updateTextStyle": {
                            "range": {
                                "startIndex": insert_index + run_info["start"],
                                "endIndex": insert_index + run_info["end"],
                            },
                            "textStyle": {"link": {"url": run_info["link"]}},
                            "fields": "link",
                        }
                    }
                )

    return requests


def insert_after_anchor(
    service,
    document_id: str,
    anchor_type: str,
    anchor_value: str | None,
    markdown: str,
) -> dict[str, Any]:
    """Insert markdown-formatted content after a structural anchor.

    Args:
        service: Google Docs API service object.
        document_id: The document ID.
        anchor_type: One of 'horizontal_rule', 'heading', 'bookmark'.
        anchor_value: Anchor-specific value (heading text, bookmark ID, or occurrence).
        markdown: Markdown-formatted content to insert.

    Returns:
        Response from the batchUpdate API.

    Raises:
        DocsAPIError: If the anchor is not found or the API call fails.
    """
    doc = get_document(service, document_id)

    # Determine occurrence for horizontal_rule
    occurrence = 1
    if anchor_type == "horizontal_rule" and anchor_value:
        occurrence = int(anchor_value)
        anchor_value = None

    insert_index = find_anchor_index(doc, anchor_type, anchor_value, occurrence)
    parsed = parse_markdown(markdown)
    requests = build_insert_requests(parsed, insert_index)

    if not requests:
        return {}

    try:
        result = (
            service.documents()
            .batchUpdate(documentId=document_id, body={"requests": requests})
            .execute()
        )
        return result
    except HttpError as e:
        handle_api_error(e)
        return {}  # Unreachable


# ============================================================================
# OUTPUT FORMATTING
# ============================================================================


def format_document_summary(doc: dict[str, Any]) -> str:
    """Format a document for display.

    Args:
        doc: Document dictionary from Google Docs API.

    Returns:
        Formatted string.
    """
    title = doc.get("title", "(Untitled)")
    doc_id = doc.get("documentId", "(Unknown)")

    # Count content elements
    content = doc.get("body", {}).get("content", [])
    char_count = 0
    for element in content:
        if "paragraph" in element:
            for text_element in element["paragraph"].get("elements", []):
                if "textRun" in text_element:
                    char_count += len(text_element["textRun"].get("content", ""))

    return (
        f"### {title}\n"
        f"- **Document ID:** {doc_id}\n"
        f"- **Characters:** {char_count}\n"
        f"- **Revision ID:** {doc.get('revisionId', 'N/A')}"
    )


# ============================================================================
# HEALTH CHECK
# ============================================================================


def check_docs_connectivity() -> dict[str, Any]:
    """Check Google Docs API connectivity and authentication.

    Returns:
        Dictionary with status information including available scopes.
    """
    result = {
        "authenticated": False,
        "scopes": None,
        "error": None,
    }

    try:
        # Get credentials to check scopes
        creds = get_google_credentials("google-docs", DOCS_SCOPES_DEFAULT)

        # Check which scopes are available
        available_scopes = []
        if hasattr(creds, "scopes"):
            available_scopes = creds.scopes
        elif hasattr(creds, "_scopes"):
            available_scopes = creds._scopes

        # Build service - if this works, we're authenticated
        service = build("docs", "v1", credentials=creds)

        # Try a simple API call to verify connectivity
        # We'll create a test document and immediately get it to verify
        test_doc = service.documents().create(body={"title": "_test_connectivity"}).execute()
        test_doc_id = test_doc.get("documentId")

        result["authenticated"] = True
        result["test_document_id"] = test_doc_id
        result["scopes"] = {
            "readonly": any("documents.readonly" in s for s in available_scopes),
            "write": any("documents" in s and "readonly" not in s for s in available_scopes),
            "all_scopes": available_scopes,
        }
    except Exception as e:
        result["error"] = str(e)

    return result


# ============================================================================
# CLI COMMAND HANDLERS
# ============================================================================


def cmd_check(_args):
    """Handle 'check' command."""
    print("Checking Google Docs connectivity...")
    result = check_docs_connectivity()

    if result["authenticated"]:
        print("✓ Successfully authenticated to Google Docs")

        # Display scope information
        scopes = result.get("scopes", {})
        if scopes:
            print("\nGranted OAuth Scopes:")
            print(f"  Read-only (documents.readonly): {'✓' if scopes.get('readonly') else '✗'}")
            print(f"  Write (documents):               {'✓' if scopes.get('write') else '✗'}")

            # Check if write scope is granted
            if not scopes.get("write"):
                print("\n⚠️  Write scope not granted. Some operations will fail.")
                print("   To grant full access, reset and re-authenticate:")
                print()
                print("   1. Reset token: python scripts/google-docs.py auth reset")
                print("   2. Re-run: python scripts/google-docs.py check")
                print()
                print("   See: docs/google-oauth-setup.md")

        print(f"\nTest document created: {result.get('test_document_id')}")
        print("(You can delete this test document from Google Drive)")
        return 0
    else:
        print(f"✗ Authentication failed: {result['error']}")
        print()
        print("Setup instructions:")
        print()
        print("  1. Set up a GCP project with OAuth credentials:")
        print("     See: docs/gcp-project-setup.md")
        print()
        print("  2. Configure your credentials:")
        print("     Create ~/.config/agent-skills/google.yaml:")
        print()
        print("     oauth_client:")
        print("       client_id: YOUR_CLIENT_ID.apps.googleusercontent.com")
        print("       client_secret: YOUR_CLIENT_SECRET")
        print()
        print("  3. Run check again to trigger OAuth flow:")
        print("     python scripts/google-docs.py check")
        print()
        print("For detailed setup instructions, see: docs/google-oauth-setup.md")
        return 1


def cmd_auth_setup(args):
    """Handle 'auth setup' command."""
    if not args.client_id or not args.client_secret:
        print("Error: Both --client-id and --client-secret are required", file=sys.stderr)
        return 1

    config = load_config("google-docs") or {}
    config["oauth_client"] = {
        "client_id": args.client_id,
        "client_secret": args.client_secret,
    }
    save_config("google-docs", config)
    print("✓ OAuth client credentials saved to config file")
    print(f"  Config location: {CONFIG_DIR / 'google-docs.yaml'}")
    print("\nNext step: Run any Google Docs command to initiate OAuth flow")
    return 0


def cmd_auth_reset(_args):
    """Handle 'auth reset' command."""
    delete_credential("google-docs-token-json")
    print("OAuth token cleared. Next command will trigger re-authentication.")
    return 0


def cmd_auth_status(_args):
    """Handle 'auth status' command."""
    token_json = get_credential("google-docs-token-json")
    if not token_json:
        print("No OAuth token stored.")
        return 1

    try:
        token_data = json.loads(token_json)
    except json.JSONDecodeError:
        print("Stored token is corrupted.")
        return 1

    print("OAuth token is stored.")

    # Granted scopes
    scopes = token_data.get("scopes", [])
    if scopes:
        print("\nGranted scopes:")
        for scope in scopes:
            print(f"  - {scope}")
    else:
        print("\nGranted scopes: (unknown - legacy token)")

    # Refresh token
    has_refresh = bool(token_data.get("refresh_token"))
    print(f"\nRefresh token: {'present' if has_refresh else 'missing'}")

    # Expiry
    expiry = token_data.get("expiry")
    if expiry:
        print(f"Token expiry: {expiry}")

    # Client ID (truncated)
    client_id = token_data.get("client_id", "")
    if client_id:
        truncated = client_id[:16] + "..." if len(client_id) > 16 else client_id
        print(f"Client ID: {truncated}")

    return 0


def cmd_documents_create(args):
    """Handle 'documents create' command."""
    service = build_docs_service(DOCS_SCOPES)
    doc = create_document(service, args.title)

    if args.json:
        print(json.dumps(doc, indent=2))
    else:
        print("✓ Document created successfully")
        print(f"  Title: {doc.get('title')}")
        print(f"  Document ID: {doc.get('documentId')}")
        print(f"  URL: https://docs.google.com/document/d/{doc.get('documentId')}/edit")

    return 0


def cmd_documents_get(args):
    """Handle 'documents get' command."""
    service = build_docs_service(DOCS_SCOPES_READONLY)
    doc = get_document(service, args.document_id)

    if args.json:
        print(json.dumps(doc, indent=2))
    else:
        print(format_document_summary(doc))

    return 0


def cmd_documents_read(args):
    """Handle 'documents read' command."""
    if args.format == "pdf":
        content = export_document_as_pdf(args.document_id)
        output_file = args.output or f"{args.document_id}.pdf"
        with open(output_file, "wb") as f:
            f.write(content)
        print(f"PDF saved to: {output_file}")
        return 0
    elif args.format == "markdown":
        content = export_document_as_markdown(args.document_id)
    else:
        service = build_docs_service(DOCS_SCOPES_READONLY)
        content = read_document_content(service, args.document_id)

    if args.json:
        print(json.dumps({"content": content}, indent=2))
    else:
        print(content)

    return 0


def cmd_content_append(args):
    """Handle 'content append' command."""
    service = build_docs_service(DOCS_SCOPES)
    result = append_text(service, args.document_id, args.text)

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print("✓ Text appended successfully")

    return 0


def cmd_content_insert(args):
    """Handle 'content insert' command."""
    service = build_docs_service(DOCS_SCOPES)
    result = insert_text(service, args.document_id, args.text, args.index)

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print("✓ Text inserted successfully")

    return 0


def cmd_content_delete(args):
    """Handle 'content delete' command."""
    service = build_docs_service(DOCS_SCOPES)
    result = delete_content(service, args.document_id, args.start_index, args.end_index)

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print("✓ Content deleted successfully")

    return 0


def cmd_content_insert_after_anchor(args):
    """Handle 'content insert-after-anchor' command."""
    service = build_docs_service(DOCS_SCOPES)
    markdown = args.markdown.replace("\\n", "\n")
    result = insert_after_anchor(
        service, args.document_id, args.anchor_type, args.anchor_value, markdown
    )

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print("✓ Content inserted after anchor successfully")

    return 0


def cmd_formatting_apply(args):
    """Handle 'formatting apply' command."""
    service = build_docs_service(DOCS_SCOPES)
    result = apply_formatting(
        service,
        args.document_id,
        args.start_index,
        args.end_index,
        bold=args.bold,
        italic=args.italic,
        underline=args.underline,
        font_size=args.font_size,
    )

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print("✓ Formatting applied successfully")

    return 0


# ============================================================================
# CLI ARGUMENT PARSER
# ============================================================================


def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser."""
    parser = argparse.ArgumentParser(
        description="Google Docs integration for AI agents",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    subparsers = parser.add_subparsers(dest="command", help="Command to execute")

    # check command
    subparsers.add_parser("check", help="Check Google Docs connectivity and authentication")

    # auth commands
    auth_parser = subparsers.add_parser("auth", help="Authentication management")
    auth_subparsers = auth_parser.add_subparsers(dest="auth_command")

    setup_parser = auth_subparsers.add_parser("setup", help="Setup OAuth client credentials")
    setup_parser.add_argument("--client-id", required=True, help="OAuth client ID")
    setup_parser.add_argument("--client-secret", required=True, help="OAuth client secret")

    auth_subparsers.add_parser("reset", help="Clear stored OAuth token")
    auth_subparsers.add_parser("status", help="Show current token info")

    # documents commands
    documents_parser = subparsers.add_parser("documents", help="Document operations")
    documents_subparsers = documents_parser.add_subparsers(dest="documents_command")

    create_parser = documents_subparsers.add_parser("create", help="Create a new document")
    create_parser.add_argument("--title", required=True, help="Document title")
    create_parser.add_argument("--json", action="store_true", help="Output as JSON")

    get_parser = documents_subparsers.add_parser("get", help="Get document metadata")
    get_parser.add_argument("document_id", help="Document ID")
    get_parser.add_argument("--json", action="store_true", help="Output as JSON")

    read_parser = documents_subparsers.add_parser("read", help="Read document content as text")
    read_parser.add_argument("document_id", help="Document ID")
    read_parser.add_argument(
        "--format",
        choices=["text", "markdown", "pdf"],
        default="markdown",
        help="Output format: markdown (default, preserves tables and headings), text (plain text), or pdf",
    )
    read_parser.add_argument(
        "--output",
        "-o",
        help="Output file path (used with pdf format)",
    )
    read_parser.add_argument("--json", action="store_true", help="Output as JSON")

    # content commands
    content_parser = subparsers.add_parser("content", help="Content operations")
    content_subparsers = content_parser.add_subparsers(dest="content_command")

    append_parser = content_subparsers.add_parser("append", help="Append text to document")
    append_parser.add_argument("document_id", help="Document ID")
    append_parser.add_argument("--text", required=True, help="Text to append")
    append_parser.add_argument("--json", action="store_true", help="Output as JSON")

    insert_parser = content_subparsers.add_parser("insert", help="Insert text at position")
    insert_parser.add_argument("document_id", help="Document ID")
    insert_parser.add_argument("--text", required=True, help="Text to insert")
    insert_parser.add_argument("--index", type=int, required=True, help="Position to insert at")
    insert_parser.add_argument("--json", action="store_true", help="Output as JSON")

    delete_parser = content_subparsers.add_parser("delete", help="Delete content range")
    delete_parser.add_argument("document_id", help="Document ID")
    delete_parser.add_argument("--start-index", type=int, required=True, help="Start position")
    delete_parser.add_argument("--end-index", type=int, required=True, help="End position")
    delete_parser.add_argument("--json", action="store_true", help="Output as JSON")

    insert_anchor_parser = content_subparsers.add_parser(
        "insert-after-anchor", help="Insert markdown content after a structural anchor"
    )
    insert_anchor_parser.add_argument("document_id", help="Document ID")
    insert_anchor_parser.add_argument(
        "--anchor-type",
        required=True,
        choices=["horizontal_rule", "heading", "bookmark"],
        help="Type of anchor to find",
    )
    insert_anchor_parser.add_argument(
        "--anchor-value",
        help="Heading text, bookmark ID, or occurrence number for horizontal_rule",
    )
    insert_anchor_parser.add_argument(
        "--markdown", required=True, help="Markdown-formatted content to insert"
    )
    insert_anchor_parser.add_argument("--json", action="store_true", help="Output as JSON")

    # formatting commands
    formatting_parser = subparsers.add_parser("formatting", help="Formatting operations")
    formatting_subparsers = formatting_parser.add_subparsers(dest="formatting_command")

    apply_parser = formatting_subparsers.add_parser("apply", help="Apply text formatting")
    apply_parser.add_argument("document_id", help="Document ID")
    apply_parser.add_argument("--start-index", type=int, required=True, help="Start position")
    apply_parser.add_argument("--end-index", type=int, required=True, help="End position")
    apply_parser.add_argument("--bold", action="store_true", help="Apply bold")
    apply_parser.add_argument("--italic", action="store_true", help="Apply italic")
    apply_parser.add_argument("--underline", action="store_true", help="Apply underline")
    apply_parser.add_argument("--font-size", type=int, help="Font size in points")
    apply_parser.add_argument("--json", action="store_true", help="Output as JSON")

    return parser


# ============================================================================
# MAIN
# ============================================================================


def main():
    """Main entry point."""
    # Check dependencies first (allows --help to work even if deps missing)
    parser = build_parser()
    args = parser.parse_args()

    # Now check dependencies if not just showing help
    if not GOOGLE_AUTH_AVAILABLE:
        print(
            "Error: Google auth libraries not found. Install with: "
            "pip install --user google-auth google-auth-oauthlib",
            file=sys.stderr,
        )
        return 1

    if not GOOGLE_API_CLIENT_AVAILABLE:
        print(
            "Error: 'google-api-python-client' not found. Install with: "
            "pip install --user google-api-python-client",
            file=sys.stderr,
        )
        return 1

    if not KEYRING_AVAILABLE:
        print(
            "Error: 'keyring' library not found. Install with: pip install --user keyring",
            file=sys.stderr,
        )
        return 1

    if not YAML_AVAILABLE:
        print(
            "Error: 'pyyaml' library not found. Install with: pip install --user pyyaml",
            file=sys.stderr,
        )
        return 1

    if not args.command:
        parser.print_help()
        return 1

    try:
        # Route to command handlers
        if args.command == "check":
            return cmd_check(args)
        elif args.command == "auth":
            if args.auth_command == "setup":
                return cmd_auth_setup(args)
            elif args.auth_command == "reset":
                return cmd_auth_reset(args)
            elif args.auth_command == "status":
                return cmd_auth_status(args)
        elif args.command == "documents":
            if args.documents_command == "create":
                return cmd_documents_create(args)
            elif args.documents_command == "get":
                return cmd_documents_get(args)
            elif args.documents_command == "read":
                return cmd_documents_read(args)
        elif args.command == "content":
            if args.content_command == "append":
                return cmd_content_append(args)
            elif args.content_command == "insert":
                return cmd_content_insert(args)
            elif args.content_command == "delete":
                return cmd_content_delete(args)
            elif args.content_command == "insert-after-anchor":
                return cmd_content_insert_after_anchor(args)
        elif args.command == "formatting" and args.formatting_command == "apply":
            return cmd_formatting_apply(args)

        parser.print_help()
        return 1

    except (DocsAPIError, AuthenticationError) as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        print("\nInterrupted", file=sys.stderr)
        return 130
    except Exception as e:
        print(f"Unexpected error: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())

Install with Tessl CLI

npx tessl i odyssey4me/google-docs@0.2.1

SKILL.md

tile.json