CtrlK
BlogDocsLog inGet started
Tessl Logo

dirien/pulumi-skills

Pulumi infrastructure-as-code skills for Claude Code with ESC, OIDC, and cloud provider best practices.

99

Quality

99%

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Skills
Evals
Files

neo_task.pypulumi-neo/scripts/

#!/usr/bin/env python3
"""
Pulumi Neo Task Manager

Create and manage Pulumi Neo agent tasks via the REST API.
Requires PULUMI_ACCESS_TOKEN environment variable.
"""

import argparse
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from typing import Optional

try:
    import requests
except ImportError:
    print("Error: requests library required. Install with: pip install requests")
    sys.exit(1)


BASE_URL = "https://api.pulumi.com/api/preview/agents"


def get_headers() -> dict:
    """Get required API headers with authentication."""
    token = os.environ.get("PULUMI_ACCESS_TOKEN")
    if not token:
        print("Error: PULUMI_ACCESS_TOKEN environment variable not set")
        print("Set it with: export PULUMI_ACCESS_TOKEN=<your-token>")
        sys.exit(1)

    return {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.pulumi+8",
        "Content-Type": "application/json",
    }


def get_default_org() -> Optional[str]:
    """Get default organization from Pulumi CLI."""
    try:
        result = subprocess.run(
            ["pulumi", "org", "get-default"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if result.returncode == 0 and result.stdout.strip():
            org = result.stdout.strip()
            # Check if it looks like a valid cloud org (not a local path)
            if not org.startswith("/") and not org.startswith("file://"):
                return org
    except (subprocess.TimeoutExpired, FileNotFoundError):
        pass
    return None


def create_task(
    org: str,
    message: str,
    stack_name: Optional[str] = None,
    stack_project: Optional[str] = None,
    repo_name: Optional[str] = None,
    repo_org: Optional[str] = None,
    repo_forge: str = "github",
) -> str:
    """Create a new Neo task and return the task ID."""
    url = f"{BASE_URL}/{org}/tasks"

    entities_to_add = []

    if stack_name and stack_project:
        entities_to_add.append({
            "type": "stack",
            "name": stack_name,
            "project": stack_project,
        })

    if repo_name and repo_org:
        entities_to_add.append({
            "type": "repository",
            "name": repo_name,
            "org": repo_org,
            "forge": repo_forge,
        })

    payload = {
        "message": {
            "type": "user_message",
            "content": message,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "entity_diff": {
                "add": entities_to_add,
                "remove": [],
            },
        }
    }

    response = requests.post(url, headers=get_headers(), json=payload)

    if response.status_code == 201:
        data = response.json()
        return data["taskId"]
    else:
        print(f"Error creating task: {response.status_code}")
        print(response.text)
        sys.exit(1)


def get_task(org: str, task_id: str) -> dict:
    """Get task metadata."""
    url = f"{BASE_URL}/{org}/tasks/{task_id}"
    response = requests.get(url, headers=get_headers())

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error getting task: {response.status_code}")
        print(response.text)
        sys.exit(1)


def list_tasks(org: str, page_size: int = 20) -> list:
    """List all tasks for an organization."""
    url = f"{BASE_URL}/{org}/tasks"
    params = {"pageSize": page_size}
    response = requests.get(url, headers=get_headers(), params=params)

    if response.status_code == 200:
        return response.json().get("tasks", [])
    else:
        print(f"Error listing tasks: {response.status_code}")
        print(response.text)
        sys.exit(1)


def get_events(
    org: str,
    task_id: str,
    continuation_token: Optional[str] = None,
) -> tuple[list, Optional[str]]:
    """Get task events with pagination."""
    url = f"{BASE_URL}/{org}/tasks/{task_id}/events"
    params = {"pageSize": 100}
    if continuation_token:
        params["continuationToken"] = continuation_token

    response = requests.get(url, headers=get_headers(), params=params)

    if response.status_code == 200:
        data = response.json()
        return data.get("events", []), data.get("continuationToken")
    else:
        print(f"Error getting events: {response.status_code}")
        print(response.text)
        return [], None


def send_approval(org: str, task_id: str, approval_request_id: str) -> bool:
    """Send approval confirmation for a pending request."""
    url = f"{BASE_URL}/{org}/tasks/{task_id}"

    payload = {
        "event": {
            "type": "user_confirmation",
            "approval_request_id": approval_request_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    }

    response = requests.post(url, headers=get_headers(), json=payload)
    return response.status_code == 202


def send_cancel(org: str, task_id: str) -> bool:
    """Send cancellation for a pending request."""
    url = f"{BASE_URL}/{org}/tasks/{task_id}"

    payload = {
        "event": {
            "type": "user_cancel",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    }

    response = requests.post(url, headers=get_headers(), json=payload)
    return response.status_code == 202


def send_message(org: str, task_id: str, message: str) -> bool:
    """Send a follow-up message to an existing task."""
    url = f"{BASE_URL}/{org}/tasks/{task_id}"

    payload = {
        "event": {
            "type": "user_message",
            "content": message,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "entity_diff": {"add": [], "remove": []},
        }
    }

    response = requests.post(url, headers=get_headers(), json=payload)
    return response.status_code == 202


def format_event(event: dict) -> str:
    """Format an event for display."""
    event_type = event.get("type", "unknown")
    body = event.get("eventBody", {})
    inner_type = body.get("type", "")

    if event_type == "agentResponse":
        content = body.get("content", "")
        result = f"\n[Neo] {content}" if content else ""

        # Check for tool calls (including approval requests)
        tool_calls = body.get("tool_calls", [])
        for call in tool_calls:
            func = call.get("function", {})
            func_name = func.get("name", "")
            if func_name == "approval_request":
                try:
                    args = json.loads(func.get("arguments", "{}"))
                    req_id = args.get("approval_request_id", "")
                    desc = args.get("description", "")
                    result += f"\n[Approval Required] {desc}\n  Request ID: {req_id}"
                except json.JSONDecodeError:
                    pass
        return result if result else f"\n[agentResponse:{inner_type}]"
    elif event_type == "userInput":
        content = body.get("content", "")
        return f"\n[You] {content}"
    else:
        return f"\n[{event_type}:{inner_type}] {json.dumps(body, indent=2)}"


def find_pending_approval(events: list) -> Optional[str]:
    """Find pending approval request ID from events."""
    for event in reversed(events):
        if event.get("type") == "agentResponse":
            body = event.get("eventBody", {})
            tool_calls = body.get("tool_calls", [])
            for call in tool_calls:
                func = call.get("function", {})
                if func.get("name") == "approval_request":
                    try:
                        args = json.loads(func.get("arguments", "{}"))
                        return args.get("approval_request_id")
                    except json.JSONDecodeError:
                        pass
    return None


def poll_task(
    org: str,
    task_id: str,
    poll_interval: int = 5,
    max_wait: int = 600,
) -> None:
    """Poll a task for updates until completion or timeout."""
    print(f"Polling task {task_id}...")
    print(f"View in console: https://app.pulumi.com/{org}/neo/{task_id}")
    print("-" * 60)

    seen_events = set()
    all_events = []
    continuation_token = None
    start_time = time.time()
    last_status = None

    while True:
        # Check timeout
        elapsed = time.time() - start_time
        if elapsed > max_wait:
            print(f"\nTimeout after {max_wait} seconds. Task may still be running.")
            print(f"Continue with: python neo_task.py --org {org} --task-id {task_id}")
            break

        # Get task status
        task = get_task(org, task_id)
        status = task.get("status", "unknown")

        # Get new events
        events, new_token = get_events(org, task_id, continuation_token)

        for event in events:
            event_id = event.get("id")
            if event_id and event_id not in seen_events:
                seen_events.add(event_id)
                all_events.append(event)
                print(format_event(event))

        if new_token:
            continuation_token = new_token

        # Check for pending approval (detected from events, not status)
        pending_approval_id = find_pending_approval(all_events)

        # If status is idle, check if we're waiting for approval or truly done
        if status == "idle":
            if pending_approval_id:
                print(f"\n{'='*60}")
                print("Task is waiting for approval.")
                print(f"Approve: python neo_task.py --org {org} --task-id {task_id} --approve")
                print(f"Cancel:  python neo_task.py --org {org} --task-id {task_id} --cancel")
                break
            else:
                # Task finished processing
                print(f"\n{'='*60}")
                print("Task completed (idle)")
                break

        # If status changed from running to idle, task is done
        if last_status == "running" and status == "idle":
            print(f"\n{'='*60}")
            print("Task completed")
            break

        last_status = status
        time.sleep(poll_interval)


def main():
    # Validate token early
    token = os.environ.get("PULUMI_ACCESS_TOKEN")
    if not token:
        print("Error: PULUMI_ACCESS_TOKEN environment variable not set")
        print("Set it with: export PULUMI_ACCESS_TOKEN=<your-token>")
        sys.exit(1)

    parser = argparse.ArgumentParser(
        description="Pulumi Neo Task Manager",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Create a new task (with polling)
  %(prog)s --org myorg --message "Analyze my infrastructure"

  # Create task without polling (returns immediately - best for Claude Code)
  %(prog)s --org myorg --message "Analyze my infrastructure" --no-poll

  # Create task with JSON output (best for programmatic use)
  %(prog)s --org myorg --message "Analyze this" --no-poll --json

  # Create task with stack context
  %(prog)s --org myorg --message "Optimize this stack" \\
    --stack-name prod --stack-project my-infra --no-poll

  # List tasks
  %(prog)s --org myorg --list

  # Get current events for a task (single fetch, no polling)
  %(prog)s --org myorg --task-id TASK_ID --get-events

  # Poll an existing task for updates
  %(prog)s --org myorg --task-id TASK_ID

  # Approve a pending request
  %(prog)s --org myorg --task-id TASK_ID --approve

  # Cancel a pending request
  %(prog)s --org myorg --task-id TASK_ID --cancel
""",
    )

    parser.add_argument(
        "--org",
        help="Pulumi organization name (auto-detected if not specified)",
    )
    parser.add_argument(
        "--message", "-m",
        help="Message to send to Neo (creates new task if no task-id)",
    )
    parser.add_argument(
        "--task-id", "-t",
        help="Existing task ID to poll or respond to",
    )
    parser.add_argument(
        "--list", "-l",
        action="store_true",
        help="List existing tasks",
    )
    parser.add_argument(
        "--approve",
        action="store_true",
        help="Approve a pending request",
    )
    parser.add_argument(
        "--cancel",
        action="store_true",
        help="Cancel a pending request",
    )
    parser.add_argument(
        "--approval-id",
        help="Specific approval request ID (auto-detected from last event if not specified)",
    )

    # Entity context options
    parser.add_argument(
        "--stack-name",
        help="Stack name for context",
    )
    parser.add_argument(
        "--stack-project",
        help="Stack project for context (required with --stack-name)",
    )
    parser.add_argument(
        "--repo-name",
        help="Repository name for context",
    )
    parser.add_argument(
        "--repo-org",
        help="Repository organization for context (required with --repo-name)",
    )
    parser.add_argument(
        "--repo-forge",
        default="github",
        choices=["github", "gitlab", "bitbucket"],
        help="Repository forge (default: github)",
    )

    # Polling options
    parser.add_argument(
        "--poll-interval",
        type=int,
        default=5,
        help="Seconds between polls (default: 5)",
    )
    parser.add_argument(
        "--max-wait",
        type=int,
        default=600,
        help="Maximum seconds to wait (default: 600)",
    )
    parser.add_argument(
        "--no-poll",
        action="store_true",
        help="Create task and return immediately without polling (useful for CI/CD or Claude Code)",
    )
    parser.add_argument(
        "--get-events",
        action="store_true",
        help="Fetch and display current events once without polling",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON (useful for programmatic use)",
    )

    args = parser.parse_args()

    # Determine organization
    org = args.org
    if not org:
        org = get_default_org()
        if org:
            print(f"Using organization: {org}")
        else:
            print("Error: Could not detect organization.")
            print("Please specify with --org <organization>")
            print("")
            print("To set a default organization:")
            print("  pulumi org set-default <organization>")
            sys.exit(1)

    # Handle list command
    if args.list:
        tasks = list_tasks(org)
        if args.json:
            print(json.dumps({"tasks": tasks}, indent=2))
        elif not tasks:
            print("No tasks found.")
        else:
            print(f"{'ID':<40} {'Status':<15} {'Created'}")
            print("-" * 80)
            for task in tasks:
                print(f"{task.get('id', 'N/A'):<40} {task.get('status', 'N/A'):<15} {task.get('createdAt', 'N/A')}")
        return

    # Handle get-events command (single fetch, no polling)
    if args.get_events:
        if not args.task_id:
            print("Error: --task-id required for --get-events")
            sys.exit(1)
        task = get_task(org, args.task_id)
        events, _ = get_events(org, args.task_id)
        if args.json:
            print(json.dumps({"task": task, "events": events}, indent=2))
        else:
            print(f"Task: {args.task_id}")
            print(f"Status: {task.get('status', 'unknown')}")
            print(f"Console: https://app.pulumi.com/{org}/neo/{args.task_id}")
            print("-" * 60)
            for event in events:
                print(format_event(event))
        return

    # Handle approval/cancel
    if args.approve or args.cancel:
        if not args.task_id:
            print("Error: --task-id required for approval/cancel")
            sys.exit(1)

        approval_id = args.approval_id
        if not approval_id and args.approve:
            # Try to find approval ID from events
            events, _ = get_events(org, args.task_id)
            approval_id = find_pending_approval(events)

        if args.approve:
            if not approval_id:
                print("Error: Could not find approval request ID")
                print("Specify with --approval-id")
                sys.exit(1)
            if send_approval(org, args.task_id, approval_id):
                print("Approval sent. Continuing to poll...")
                poll_task(org, args.task_id, args.poll_interval, args.max_wait)
            else:
                print("Error sending approval")
                sys.exit(1)
        else:
            if send_cancel(org, args.task_id):
                print("Cancellation sent.")
            else:
                print("Error sending cancellation")
                sys.exit(1)
        return

    # Handle existing task polling
    if args.task_id and not args.message:
        poll_task(org, args.task_id, args.poll_interval, args.max_wait)
        return

    # Handle follow-up message to existing task
    if args.task_id and args.message:
        if send_message(org, args.task_id, args.message):
            print("Message sent. Polling for response...")
            poll_task(org, args.task_id, args.poll_interval, args.max_wait)
        else:
            print("Error sending message")
            sys.exit(1)
        return

    # Create new task
    if args.message:
        # Validate entity context combinations
        if args.stack_name and not args.stack_project:
            print("Error: --stack-project required with --stack-name")
            sys.exit(1)
        if args.repo_name and not args.repo_org:
            print("Error: --repo-org required with --repo-name")
            sys.exit(1)

        task_id = create_task(
            org=org,
            message=args.message,
            stack_name=args.stack_name,
            stack_project=args.stack_project,
            repo_name=args.repo_name,
            repo_org=args.repo_org,
            repo_forge=args.repo_forge,
        )

        console_url = f"https://app.pulumi.com/{org}/neo/{task_id}"

        if args.json:
            print(json.dumps({
                "taskId": task_id,
                "consoleUrl": console_url,
                "org": org,
            }, indent=2))
        else:
            print(f"Created task: {task_id}")
            print(f"Console: {console_url}")

        if not args.no_poll:
            poll_task(org, task_id, args.poll_interval, args.max_wait)
        return

    # No action specified
    parser.print_help()


if __name__ == "__main__":
    main()

Install with Tessl CLI

npx tessl i dirien/pulumi-skills

pulumi-neo

CLAUDE.md

README.md

tile.json