CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-langgraph-sdk

Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows

Overview
Eval results
Files

scheduled-tasks.mddocs/

Scheduled Tasks

Create and manage cron jobs for automated execution of assistants on threads or with dynamic thread creation. Supports timezone handling, webhook notifications, and flexible scheduling.

Capabilities

Cron Job Creation

Create scheduled tasks for automated assistant execution with flexible scheduling and configuration options.

from collections.abc import Mapping
from typing import Any
from langgraph_sdk.schema import (
    Cron, Config, Context, All, QueryParamTypes
)

# Via client.crons
async def create(
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron:
    """
    Create a cron job that will create a new thread for each run.

    Args:
        assistant_id: The assistant ID or graph name to cron.
        schedule: The schedule to run the assistant on.
        input: The input to the assistant.
        metadata: The metadata to add to the runs.
        config: The config to use for the runs.
        context: The context to add to the runs.
        checkpoint_during: Whether to checkpoint during the run.
        interrupt_before: Nodes to interrupt immediately before they run.
        interrupt_after: Nodes to interrupt immediately after they run.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Cron: The created cron job.
    """

async def create_for_thread(
    thread_id: str,
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron:
    """
    Create a cron job that will run on a specific thread.

    Args:
        thread_id: The thread ID to cron.
        assistant_id: The assistant ID or graph name to cron.
        schedule: The schedule to run the assistant on.
        input: The input to the assistant.
        metadata: The metadata to add to the runs.
        config: The config to use for the runs.
        context: The context to add to the runs.
        checkpoint_during: Whether to checkpoint during the run.
        interrupt_before: Nodes to interrupt immediately before they run.
        interrupt_after: Nodes to interrupt immediately after they run.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Cron: The created cron job.
    """

Cron Job Management

Search, list, count, and delete scheduled tasks with filtering capabilities.

from langgraph_sdk.schema import (
    CronSelectField, CronSortBy, SortOrder, QueryParamTypes
)

async def search(
    *,
    assistant_id: str | None = None,
    thread_id: str | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: CronSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[CronSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Cron]:
    """
    List cron jobs.

    Args:
        assistant_id: Assistant ID to filter by.
        thread_id: Thread ID to filter by.
        limit: Limit the number of cron jobs to return.
        offset: Offset to start from.
        sort_by: Field to sort by.
        sort_order: Order to sort by.
        select: Fields to include in the response.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Cron]: List of cron jobs.
    """

async def count(
    *,
    assistant_id: str | None = None,
    thread_id: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> int:
    """
    Count cron jobs matching filters.

    Args:
        assistant_id: Assistant ID to filter by.
        thread_id: Thread ID to filter by.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        int: Number of crons matching the criteria.
    """

async def delete(
    cron_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """
    Delete a cron job.

    Args:
        cron_id: The cron ID to delete.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """

Types

class Cron(TypedDict):
    """Scheduled task definition."""
    cron_id: str
    thread_id: str
    assistant_id: str
    schedule: str
    timezone: str
    created_at: str
    updated_at: str
    metadata: dict
    config: Config
    input: dict
    next_run_time: str
    last_run_time: str
    enabled: bool

CronSelectField = Literal[
    "cron_id", "thread_id", "assistant_id", "schedule",
    "timezone", "created_at", "updated_at", "metadata",
    "config", "input", "next_run_time", "last_run_time", "enabled"
]

CronSortBy = Literal["created_at", "updated_at", "next_run_time", "last_run_time"]

Usage Examples

Creating Scheduled Tasks

# Daily report generation
daily_report = await client.crons.create(
    assistant_id="report-assistant",
    schedule="0 9 * * *",  # 9 AM daily
    input={"report_type": "daily", "email_list": ["admin@company.com"]},
    config={"timeout": 600},
    metadata={"purpose": "daily_report", "owner": "operations"},
    timezone="America/New_York",
    webhook="https://myapp.com/webhooks/report-complete"
)

# Hourly data processing for specific thread
hourly_processor = await client.crons.create_for_thread(
    thread_id="data-thread-123",
    assistant_id="data-processor",
    schedule="0 * * * *",  # Every hour
    input={"source": "api", "format": "json"},
    config={"batch_size": 1000},
    metadata={"environment": "production"}
)

# Weekly cleanup task
cleanup_job = await client.crons.create(
    assistant_id="cleanup-assistant",
    schedule="0 2 * * 0",  # 2 AM on Sundays
    input={"retention_days": 30, "dry_run": False},
    timezone="UTC",
    on_completion="keep"  # Keep run records for audit
)

Managing Scheduled Tasks

# List all cron jobs for an assistant
assistant_jobs = await client.crons.search(
    assistant_id="report-assistant",
    limit=50
)

# Find jobs for a specific thread
thread_jobs = await client.crons.search(
    thread_id="data-thread-123"
)

# Get total job count
total_jobs = await client.crons.count()

# Delete a scheduled task
await client.crons.delete("cron-456")

Schedule Patterns

# Common cron schedule patterns

# Every minute
await client.crons.create(
    assistant_id="monitoring-assistant",
    schedule="* * * * *"
)

# Every 15 minutes
await client.crons.create(
    assistant_id="health-check-assistant",
    schedule="*/15 * * * *"
)

# Daily at 3:30 AM
await client.crons.create(
    assistant_id="backup-assistant",
    schedule="30 3 * * *"
)

# Weekly on Monday at 9 AM
await client.crons.create(
    assistant_id="weekly-report-assistant",
    schedule="0 9 * * 1"
)

# Monthly on the 1st at midnight
await client.crons.create(
    assistant_id="monthly-billing-assistant",
    schedule="0 0 1 * *"
)

# Weekdays at 6 PM
await client.crons.create(
    assistant_id="daily-summary-assistant",
    schedule="0 18 * * 1-5"
)

Timezone Handling

# Schedule with specific timezone
tokyo_job = await client.crons.create(
    assistant_id="tokyo-report-assistant",
    schedule="0 9 * * *",  # 9 AM Tokyo time
    timezone="Asia/Tokyo",
    input={"region": "APAC"}
)

# Multiple timezone jobs for global operation
timezones = ["UTC", "America/New_York", "Europe/London", "Asia/Tokyo"]
for tz in timezones:
    await client.crons.create(
        assistant_id="regional-assistant",
        schedule="0 8 * * *",  # 8 AM local time
        timezone=tz,
        input={"timezone": tz, "region": tz.split("/")[-1]}
    )

Webhook Integration

# Job with webhook notifications
webhook_job = await client.crons.create(
    assistant_id="critical-task-assistant",
    schedule="0 */6 * * *",  # Every 6 hours
    input={"task": "system_health_check"},
    webhook="https://monitoring.company.com/webhooks/cron-complete",
    metadata={"priority": "critical", "alert_on_failure": True}
)

# Webhook payload will include:
# {
#   "cron_id": "cron-123",
#   "run_id": "run-456",
#   "status": "success|error|timeout",
#   "metadata": {...},
#   "completed_at": "2023-12-01T12:00:00Z"
# }

Error Handling and Monitoring

# Create jobs with error handling configuration
robust_job = await client.crons.create(
    assistant_id="data-sync-assistant",
    schedule="0 2 * * *",
    input={"source": "external_api", "retries": 3},
    config={"timeout": 1800, "retry_policy": "exponential_backoff"},
    multitask_strategy="enqueue",  # Queue if previous run still active
    webhook="https://alerts.company.com/cron-status"
)

# Monitor scheduled tasks
all_jobs = await client.crons.search(limit=100)
for job in all_jobs:
    next_run = job.get("next_run_time")
    last_run = job.get("last_run_time")
    enabled = job.get("enabled", True)

    print(f"Job {job['cron_id']}: next={next_run}, last={last_run}, enabled={enabled}")

    if not enabled:
        print(f"Warning: Job {job['cron_id']} is disabled")

Install with Tessl CLI

npx tessl i tessl/pypi-langgraph-sdk

docs

assistant-management.md

authentication.md

client-management.md

index.md

persistent-storage.md

run-execution.md

scheduled-tasks.md

thread-management.md

tile.json