Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows
Create and manage cron jobs for automated execution of assistants on threads or with dynamic thread creation. Supports timezone handling, webhook notifications, and flexible scheduling.
Create scheduled tasks for automated assistant execution with flexible scheduling and configuration options.
from collections.abc import Mapping
from typing import Any
from langgraph_sdk.schema import (
Cron, Config, Context, All, QueryParamTypes
)
# Via client.crons
async def create(
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    timezone: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron:
    """Create a cron job that will create a new thread for each run.

    Args:
        assistant_id: The assistant ID or graph name to cron.
        schedule: The cron expression (e.g. ``"0 9 * * *"``) to run the
            assistant on.
        input: The input to the assistant.
        metadata: The metadata to add to the runs.
        config: The config to use for the runs.
        context: The context to add to the runs.
        checkpoint_during: Whether to checkpoint during the run.
        interrupt_before: Nodes to interrupt immediately before they run.
        interrupt_after: Nodes to interrupt immediately after they run.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        timezone: IANA timezone name (e.g. "America/New_York") used to
            interpret the schedule. Added for consistency with the ``Cron``
            schema and the usage examples, which pass ``timezone=...``.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Cron: The created cron job.
    """
async def create_for_thread(
    thread_id: str,
    assistant_id: str,
    *,
    schedule: str,
    input: Mapping[str, Any] | None = None,
    metadata: Mapping[str, Any] | None = None,
    config: Config | None = None,
    context: Context | None = None,
    checkpoint_during: bool | None = None,
    interrupt_before: All | list[str] | None = None,
    interrupt_after: All | list[str] | None = None,
    webhook: str | None = None,
    webhook_mode: str | None = None,
    timezone: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> Cron:
    """Create a cron job that will run on a specific, existing thread.

    Args:
        thread_id: The thread ID to cron.
        assistant_id: The assistant ID or graph name to cron.
        schedule: The cron expression (e.g. ``"0 * * * *"``) to run the
            assistant on.
        input: The input to the assistant.
        metadata: The metadata to add to the runs.
        config: The config to use for the runs.
        context: The context to add to the runs.
        checkpoint_during: Whether to checkpoint during the run.
        interrupt_before: Nodes to interrupt immediately before they run.
        interrupt_after: Nodes to interrupt immediately after they run.
        webhook: Webhook to call after the run is done.
        webhook_mode: Mode to call the webhook. Options are "GET" and "POST".
        timezone: IANA timezone name (e.g. "Asia/Tokyo") used to interpret
            the schedule. Added for consistency with the ``Cron`` schema and
            the usage examples, which pass ``timezone=...``.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        Cron: The created cron job.
    """

# Search, list, count, and delete scheduled tasks with filtering capabilities.
from langgraph_sdk.schema import (
CronSelectField, CronSortBy, SortOrder, QueryParamTypes
)
async def search(
    *,
    assistant_id: str | None = None,
    thread_id: str | None = None,
    limit: int = 10,
    offset: int = 0,
    sort_by: CronSortBy | None = None,
    sort_order: SortOrder | None = None,
    select: list[CronSelectField] | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> list[Cron]:
    """List cron jobs, optionally filtered by assistant and/or thread.

    Args:
        assistant_id: Assistant ID to filter by; all assistants when omitted.
        thread_id: Thread ID to filter by; all threads when omitted.
        limit: Maximum number of cron jobs to return (defaults to 10).
        offset: Number of matching cron jobs to skip (defaults to 0);
            combine with ``limit`` for pagination.
        sort_by: Field to sort by.
        sort_order: Order to sort by.
        select: Fields to include in the response.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        list[Cron]: List of cron jobs matching the criteria.
    """
async def count(
    *,
    assistant_id: str | None = None,
    thread_id: str | None = None,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> int:
    """Count cron jobs matching the given filters.

    Args:
        assistant_id: Assistant ID to filter by; all assistants when omitted.
        thread_id: Thread ID to filter by; all threads when omitted.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.

    Returns:
        int: Number of crons matching the criteria.
    """
async def delete(
    cron_id: str,
    *,
    headers: Mapping[str, str] | None = None,
    params: QueryParamTypes | None = None,
) -> None:
    """Delete a cron job.

    Args:
        cron_id: The cron ID to delete.
        headers: Optional custom headers to include with the request.
        params: Optional query parameters to include with the request.
    """


class Cron(TypedDict):
    """Scheduled task definition."""

    cron_id: str        # Unique identifier of the cron job
    thread_id: str      # Thread the job is bound to (for create_for_thread jobs)
    assistant_id: str   # Assistant executed on each scheduled run
    schedule: str       # Cron expression, e.g. "0 9 * * *"
    timezone: str       # IANA timezone name used to interpret the schedule
    created_at: str     # Creation timestamp — presumably ISO 8601; verify
    updated_at: str     # Last-update timestamp — presumably ISO 8601; verify
    metadata: dict
    config: Config
    input: dict
    next_run_time: str  # When the job will next fire
    last_run_time: str  # When the job last fired
    enabled: bool       # Whether the job is currently active


# Fields of Cron that may be requested via the `select` parameter of search().
CronSelectField = Literal[
    "cron_id", "thread_id", "assistant_id", "schedule",
    "timezone", "created_at", "updated_at", "metadata",
    "config", "input", "next_run_time", "last_run_time", "enabled",
]

# Fields that search() results may be sorted by.
CronSortBy = Literal["created_at", "updated_at", "next_run_time", "last_run_time"]

# Daily report generation
# Runs once a day at 9 AM Eastern and notifies a webhook when done.
daily_report = await client.crons.create(
    assistant_id="report-assistant",
    schedule="0 9 * * *",  # 9 AM daily
    input={"report_type": "daily", "email_list": ["admin@company.com"]},
    config={"timeout": 600},
    metadata={"purpose": "daily_report", "owner": "operations"},
    timezone="America/New_York",
    webhook="https://myapp.com/webhooks/report-complete"
)

# Hourly data processing for specific thread
hourly_processor = await client.crons.create_for_thread(
    thread_id="data-thread-123",
    assistant_id="data-processor",
    schedule="0 * * * *",  # Every hour
    input={"source": "api", "format": "json"},
    config={"batch_size": 1000},
    metadata={"environment": "production"}
)

# Weekly cleanup task
cleanup_job = await client.crons.create(
    assistant_id="cleanup-assistant",
    schedule="0 2 * * 0",  # 2 AM on Sundays
    input={"retention_days": 30, "dry_run": False},
    timezone="UTC",
    # NOTE(review): on_completion is not declared in the create() signature
    # documented above — confirm the API accepts it.
    on_completion="keep"  # Keep run records for audit
)

# List all cron jobs for an assistant
assistant_jobs = await client.crons.search(
    assistant_id="report-assistant",
    limit=50
)

# Find jobs for a specific thread
thread_jobs = await client.crons.search(
    thread_id="data-thread-123"
)

# Get total job count
total_jobs = await client.crons.count()

# Delete a scheduled task by its cron ID
await client.crons.delete("cron-456")

# Common cron schedule patterns
# Every minute
await client.crons.create(
    assistant_id="monitoring-assistant",
    schedule="* * * * *"
)

# Every 15 minutes
await client.crons.create(
    assistant_id="health-check-assistant",
    schedule="*/15 * * * *"
)

# Daily at 3:30 AM
await client.crons.create(
    assistant_id="backup-assistant",
    schedule="30 3 * * *"
)

# Weekly on Monday at 9 AM
await client.crons.create(
    assistant_id="weekly-report-assistant",
    schedule="0 9 * * 1"
)

# Monthly on the 1st at midnight
await client.crons.create(
    assistant_id="monthly-billing-assistant",
    schedule="0 0 1 * *"
)

# Weekdays (Mon-Fri) at 6 PM
await client.crons.create(
    assistant_id="daily-summary-assistant",
    schedule="0 18 * * 1-5"
)

# Schedule with specific timezone
tokyo_job = await client.crons.create(
    assistant_id="tokyo-report-assistant",
    schedule="0 9 * * *",  # 9 AM Tokyo time
    timezone="Asia/Tokyo",
    input={"region": "APAC"}
)

# Multiple timezone jobs for global operation: one job per region,
# each firing at 8 AM in its own local time.
timezones = ["UTC", "America/New_York", "Europe/London", "Asia/Tokyo"]
for tz in timezones:
    await client.crons.create(
        assistant_id="regional-assistant",
        schedule="0 8 * * *",  # 8 AM local time
        timezone=tz,
        input={"timezone": tz, "region": tz.split("/")[-1]}
    )

# Job with webhook notifications
webhook_job = await client.crons.create(
    assistant_id="critical-task-assistant",
    schedule="0 */6 * * *",  # Every 6 hours
    input={"task": "system_health_check"},
    webhook="https://monitoring.company.com/webhooks/cron-complete",
    metadata={"priority": "critical", "alert_on_failure": True}
)

# Webhook payload will include:
# {
#   "cron_id": "cron-123",
#   "run_id": "run-456",
#   "status": "success|error|timeout",
#   "metadata": {...},
#   "completed_at": "2023-12-01T12:00:00Z"
# }

# Create jobs with error handling configuration
robust_job = await client.crons.create(
assistant_id="data-sync-assistant",
schedule="0 2 * * *",
input={"source": "external_api", "retries": 3},
config={"timeout": 1800, "retry_policy": "exponential_backoff"},
multitask_strategy="enqueue", # Queue if previous run still active
webhook="https://alerts.company.com/cron-status"
)
# Monitor scheduled tasks
all_jobs = await client.crons.search(limit=100)
for job in all_jobs:
next_run = job.get("next_run_time")
last_run = job.get("last_run_time")
enabled = job.get("enabled", True)
print(f"Job {job['cron_id']}: next={next_run}, last={last_run}, enabled={enabled}")
if not enabled:
print(f"Warning: Job {job['cron_id']} is disabled")Install with Tessl CLI
npx tessl i tessl/pypi-langgraph-sdk