Python SDK for interacting with the LangGraph Platform REST API to build and manage AI assistants and conversational workflows.
Comprehensive authentication and authorization system supporting custom authentication handlers, fine-grained authorization rules, and flexible security policies for all resources and actions.
Custom authentication handlers for verifying user credentials and extracting user information from requests.
from typing import Callable, TypeVar
from collections.abc import Sequence
from langgraph_sdk.auth import types, exceptions
TH = TypeVar("TH", bound=types.Handler)
AH = TypeVar("AH", bound=types.Authenticator)
class Auth:
    """Add custom authentication and authorization management to your LangGraph application.

    The Auth class provides a unified system for handling authentication and
    authorization in LangGraph applications. It supports custom user authentication
    protocols and fine-grained authorization rules for different resources and
    actions.
    """

    types = types
    """Reference to auth type definitions.

    Provides access to all type definitions used in the auth system,
    like ThreadsCreate, AssistantsRead, etc."""

    exceptions = exceptions
    """Reference to auth exception definitions.

    Provides access to all exception definitions used in the auth system,
    like HTTPException, etc.
    """

    def __init__(self) -> None:
        # The value assigned here is the Ellipsis placeholder; only the
        # attribute's declared type (_On) is shown in this listing.
        self.on: _On = ... # Authorization handlers

    def authenticate(self, fn: AH) -> AH:
        """Register an authentication handler function.

        The authentication handler is responsible for verifying credentials
        and returning user scopes. It can accept any of the following parameters
        by name:

            - request (Request): The raw ASGI request object
            - body (dict): The parsed request body
            - path (str): The request path
            - method (str): The HTTP method
            - path_params (dict[str, str]): URL path parameters
            - query_params (dict[str, str]): URL query parameters
            - headers (dict[bytes, bytes]): Request headers
            - authorization (str | None): The Authorization header value

        Args:
            fn: The authentication handler function to register.
                Must return a representation of the user. This could be a:
                    - string (the user id)
                    - dict containing {"identity": str, "permissions": list[str]}
                    - or an object with identity and permissions properties

        Returns:
            The registered handler function.
        """
        # NOTE(review): no statements follow the docstring in this listing, so
        # as written the call evaluates to None rather than `fn` — confirm
        # against the real implementation.
Fine-grained authorization control with resource-specific and action-specific handlers.
class _On:
"""Authorization handler registration system."""
assistants: _AssistantsOn
threads: _ThreadsOn
crons: _CronsOn
store: _StoreOn
def __call__(
self,
fn: Callable = None,
*,
resources: str | list[str] = None,
actions: str | list[str] = None
) -> Callable:
"""
Register global or filtered authorization handler.
Parameters:
- fn: Handler function (for direct decoration)
- resources: Resource names to handle
- actions: Action names to handle
Returns:
Handler function or decorator
"""
# Resource-specific authorization handlers
class _AssistantsOn:
    """Authorization handlers for assistant operations.

    Each attribute is the registration entry point for one assistant action
    (used as a decorator, as shown in the trailing comments). Calling the
    instance itself registers a handler for all assistant operations.
    """

    create: Callable # @auth.on.assistants.create
    read: Callable # @auth.on.assistants.read
    update: Callable # @auth.on.assistants.update
    delete: Callable # @auth.on.assistants.delete
    search: Callable # @auth.on.assistants.search

    def __call__(self, fn: Callable) -> Callable:
        """Handle all assistant operations: @auth.on.assistants"""
class _ThreadsOn:
    """Authorization handlers for thread operations.

    Each attribute is the registration entry point for one thread action
    (used as a decorator, as shown in the trailing comments). Calling the
    instance itself registers a handler for all thread operations.
    """

    create: Callable # @auth.on.threads.create
    read: Callable # @auth.on.threads.read
    update: Callable # @auth.on.threads.update
    delete: Callable # @auth.on.threads.delete
    search: Callable # @auth.on.threads.search
    create_run: Callable # @auth.on.threads.create_run

    def __call__(self, fn: Callable) -> Callable:
        """Handle all thread operations: @auth.on.threads"""
class _CronsOn:
    """Authorization handlers for cron operations.

    Each attribute is the registration entry point for one cron action
    (used as a decorator, as shown in the trailing comments). Calling the
    instance itself registers a handler for all cron operations.
    """

    create: Callable # @auth.on.crons.create
    read: Callable # @auth.on.crons.read
    # NOTE(review): an `update` hook is declared here, but no matching
    # CronsUpdate payload type appears among the cron types listed in this
    # file — confirm whether one exists.
    update: Callable # @auth.on.crons.update
    delete: Callable # @auth.on.crons.delete
    search: Callable # @auth.on.crons.search

    def __call__(self, fn: Callable) -> Callable:
        """Handle all cron operations: @auth.on.crons"""
class _StoreOn:
"""Authorization handlers for store operations."""
def __call__(
self,
fn: Callable = None,
*,
actions: str | list[str] = None
) -> Callable:
"""
Handle store operations.
Parameters:
- fn: Handler function
- actions: Specific store actions ("put", "get", "search", "list_namespaces", "delete")
"""Type definitions for user representation and authentication context.
from typing import Protocol, TypedDict, Callable, Union, Literal, Any
from collections.abc import Sequence, Awaitable, Mapping
import typing_extensions
class MinimalUser(Protocol):
    """User objects must at least expose the identity property."""

    # Structural protocol: the only member it declares is `identity`.
    @property
    def identity(self) -> str:
        """The unique identifier for the user."""
        ...
class BaseUser(Protocol):
    """The base ASGI user protocol.

    Declares four read-only properties: is_authenticated, display_name,
    identity and permissions.
    """

    @property
    def is_authenticated(self) -> bool:
        """Whether the user is authenticated."""
        ...

    @property
    def display_name(self) -> str:
        """The display name of the user."""
        ...

    @property
    def identity(self) -> str:
        """The unique identifier for the user."""
        ...

    @property
    def permissions(self) -> Sequence[str]:
        """The permissions associated with the user."""
        ...
class StudioUser:
    """A user object that's populated from authenticated requests from the LangGraph studio."""

    identity: str
    display_name: str
    # Class-level default: a StudioUser reports itself as authenticated.
    is_authenticated: bool = True
    # Fixed literal tag; presumably used to tell studio users apart from
    # other user objects — confirm against callers.
    kind: Literal["StudioUser"] = "StudioUser"
class BaseAuthContext:
    """Base class for authentication context."""

    permissions: Sequence[str]
    user: BaseUser


class AuthContext(BaseAuthContext):
    """Complete authentication context with resource and action information.

    Adds `resource` and `action` to the `permissions` and `user` fields
    inherited from BaseAuthContext.
    """

    # The closed set of resource names a context can refer to.
    resource: Literal["runs", "threads", "crons", "assistants", "store"]
    # The closed set of action names; which actions pair with which resource
    # is not expressed by this type.
    action: Literal["create", "read", "update", "delete", "search", "create_run", "put", "get", "list_namespaces"]
class MinimalUserDict(TypedDict, total=False):
    """The dictionary representation of a user.

    Declared with total=False, so every key is optional except `identity`,
    which is explicitly marked Required.
    """

    identity: typing_extensions.Required[str]
    display_name: str
    is_authenticated: bool
    permissions: Sequence[str]
# Filter types for authorization responses
# A filter maps a string key to either a plain string or a dict keyed by one
# of the operators "$eq" / "$contains" whose value is a string.
FilterType = Union[
    dict[str, Union[str, dict[Literal["$eq", "$contains"], str]]],
    dict[str, str],
]

HandlerResult = Union[None, bool, FilterType]
"""The result of a handler can be:
* None | True: accept the request.
* False: reject the request with a 403 error
* FilterType: filter to apply
"""

# An authorization handler: any call signature, awaitable HandlerResult.
Handler = Callable[..., Awaitable[HandlerResult]]

# An authentication handler: any call signature, awaiting to one of the
# accepted user representations (user object, id string, or mapping).
Authenticator = Callable[
    ...,
    Awaitable[
        Union[MinimalUser, str, BaseUser, MinimalUserDict, Mapping[str, Any]],
    ],
]
"""Type for authentication functions."""
Type definitions for operation-specific authorization data.
# Thread operation types
class ThreadsCreate(TypedDict):
    """Authorization data for a thread create operation."""

    metadata: dict
    thread_id: str
    thread_ttl: int  # NOTE(review): unit is not stated in this file — confirm


class ThreadsRead(TypedDict):
    """Authorization data for a thread read operation."""

    thread_id: str


class ThreadsUpdate(TypedDict):
    """Authorization data for a thread update operation."""

    thread_id: str
    metadata: dict


class ThreadsDelete(TypedDict):
    """Authorization data for a thread delete operation."""

    thread_id: str


class ThreadsSearch(TypedDict):
    """Authorization data for a thread search operation."""

    metadata: dict
    values: dict
    status: str
# Assistant operation types
class AssistantsCreate(TypedDict):
    """Authorization data for an assistant create operation."""

    graph_id: str
    config: dict
    metadata: dict


class AssistantsRead(TypedDict):
    """Authorization data for an assistant read operation."""

    assistant_id: str


class AssistantsUpdate(TypedDict):
    """Authorization data for an assistant update operation."""

    assistant_id: str
    config: dict
    metadata: dict


class AssistantsDelete(TypedDict):
    """Authorization data for an assistant delete operation."""

    assistant_id: str


class AssistantsSearch(TypedDict):
    """Authorization data for an assistant search operation."""

    metadata: dict
    graph_id: str
# Run operation types
class RunsCreate(TypedDict):
    """Authorization data for a run create operation."""

    thread_id: str
    assistant_id: str
    input: dict
    config: dict
# Cron operation types
class CronsCreate(TypedDict):
    """Authorization data for a cron create operation."""

    assistant_id: str
    schedule: str  # NOTE(review): schedule format is not stated here — confirm
    thread_id: str
    config: dict


class CronsRead(TypedDict):
    """Authorization data for a cron read operation."""

    cron_id: str


class CronsDelete(TypedDict):
    """Authorization data for a cron delete operation."""

    cron_id: str


class CronsSearch(TypedDict):
    """Authorization data for a cron search operation."""

    assistant_id: str
    thread_id: str
# Store operation types
class StoreGet(TypedDict):
    """Authorization data for a store get operation."""

    namespace: list[str]
    key: str


class StorePut(TypedDict):
    """Authorization data for a store put operation."""

    namespace: list[str]
    key: str
    value: dict


class StoreDelete(TypedDict):
    """Authorization data for a store delete operation."""

    namespace: list[str]
    key: str


class StoreSearch(TypedDict):
    """Authorization data for a store search operation.

    Note that this payload carries `namespace_prefix`, not `namespace`.
    """

    namespace_prefix: list[str]
    query: str
    filter: dict
class StoreListNamespaces(TypedDict):
    """Authorization data for a store list-namespaces operation."""

    prefix: list[str]
    suffix: list[str]


class HTTPException(Exception):
    """HTTP exception for authentication/authorization errors.

    Stores the status code and detail on the instance; the detail string is
    also passed to Exception as the message.
    """

    def __init__(self, status_code: int, detail: str) -> None:
        self.status_code = status_code
        self.detail = detail
        super().__init__(detail)


from langgraph_sdk import Auth
# Create auth instance
auth = Auth()


@auth.authenticate
async def authenticate(authorization: str | None) -> str:
    """Simple token-based authentication.

    Args:
        authorization: The Authorization header value, or None/empty when
            the request did not carry one.

    Returns:
        The user ID if the token is valid.

    Raises:
        Auth.exceptions.HTTPException: 401 when the header is missing, does
            not start with "Bearer ", or the token fails verification.
    """
    scheme = "Bearer "
    if not authorization or not authorization.startswith(scheme):
        raise Auth.exceptions.HTTPException(
            status_code=401,
            detail="Missing or invalid authorization header",
        )

    # Drop the scheme prefix without relying on a hard-coded slice offset.
    token = authorization.removeprefix(scheme)
    user_id = await verify_token(token)  # Your token verification logic
    if not user_id:
        raise Auth.exceptions.HTTPException(
            status_code=401,
            detail="Invalid token",
        )
    return user_id
async def verify_token(token: str) -> str | None:
    """Your token verification implementation.

    In this placeholder, exactly one hard-coded token is accepted.

    Returns:
        "user-123" for the token "valid-token-123", otherwise None.
    """
    # Call your auth service, check database, etc.
    if token == "valid-token-123":
        return "user-123"
    return None


@auth.authenticate
async def authenticate(
    authorization: str | None,
    path: str,
    method: str,
) -> Auth.types.MinimalUserDict:
    """Authentication with user permissions.

    Args:
        authorization: The Authorization header value, or None/empty when
            the request did not carry one.
        path: The request path (accepted by name; not used by this example).
        method: The HTTP method (accepted by name; not used by this example).

    Returns:
        A user dict with "identity", "permissions" and "display_name".

    Raises:
        Auth.exceptions.HTTPException: 401 when the header is missing, the
            token is rejected by verify_jwt_token, or the token payload has
            no user id.
    """
    if not authorization:
        raise Auth.exceptions.HTTPException(401, "Authorization required")

    # Verify token and get user info
    user_data = await verify_jwt_token(authorization)

    # A verified token without a user id cannot identify anyone; answer 401
    # rather than letting a KeyError escape.
    identity = user_data.get("user_id")
    if not identity:
        raise Auth.exceptions.HTTPException(401, "Token is missing a user id")

    return {
        "identity": identity,
        "permissions": user_data.get("permissions", []),
        "display_name": user_data.get("name", "Unknown User"),
    }
async def verify_jwt_token(authorization: str) -> dict:
    """Verify JWT and return user data.

    Args:
        authorization: The Authorization header value; a leading "Bearer "
            scheme is stripped if present.

    Returns:
        The decoded token payload.

    Raises:
        Auth.exceptions.HTTPException: 401 when the token fails validation.
    """
    # JWT verification logic
    import jwt

    # Strip only a leading scheme: str.replace would also remove any later
    # occurrence of "Bearer " inside the credential.
    token = authorization.removeprefix("Bearer ")
    try:
        # NOTE: "secret" is a placeholder signing key — load the real key
        # from configuration instead of hard-coding it.
        return jwt.decode(token, "secret", algorithms=["HS256"])
    except jwt.InvalidTokenError as err:
        # Chain the cause so the underlying validation failure stays visible.
        raise Auth.exceptions.HTTPException(401, "Invalid token") from err


# Global authorization handler
@auth.on
async def global_auth(ctx: Auth.types.AuthContext, value: dict) -> bool:
    """Global handler for all requests.

    Applied when no specific handler matches.

    Args:
        ctx: The authentication context for the request.
        value: The operation-specific data for the request.

    Returns:
        Always True (specific handlers can override).
    """
    # Log all requests. AuthContext declares `resource` and `action` (plus
    # `user` and `permissions`); it has no `method` or `path` attributes.
    print(f"Request: {ctx.action} {ctx.resource} by {ctx.user.identity}")

    # Allow all requests (specific handlers can override)
    return True
# Resource-specific handlers
@auth.on.threads
async def thread_auth(ctx: Auth.types.AuthContext, value: dict) -> bool:
    """Authorize every thread operation.

    More specific than the global handler. Access is granted only when the
    "owner" entry of the value's metadata equals the caller's identity.
    """
    # A missing "metadata" key is treated as empty metadata, so the owner
    # lookup yields None and the comparison fails.
    return value.get("metadata", {}).get("owner") == ctx.user.identity
# Action-specific handlers
@auth.on.threads.create
async def thread_create_auth(
    ctx: Auth.types.AuthContext,
    value: Auth.types.ThreadsCreate,
) -> bool:
    """Authorize thread creation specifically (the most specific handler).

    Returns True only when the user's permissions include "threads:create".
    """
    # Users without a `permissions` attribute are treated as having none.
    granted = getattr(ctx.user, "permissions", [])
    return "threads:create" in granted
@auth.on.threads.delete
async def thread_delete_auth(
    ctx: Auth.types.AuthContext,
    value: Auth.types.ThreadsDelete
) -> bool:
    """
    Restrict thread deletion to admins.

    Returns True only when "admin" is among the user's permissions.
    """
    # Users without a `permissions` attribute are treated as having none.
    return "admin" in getattr(ctx.user, "permissions", [])


@auth.on.store
async def store_auth(ctx: Auth.types.AuthContext, value: dict) -> bool:
    """Authorize store operations and enforce user isolation in namespaces.

    Args:
        ctx: The authentication context; `ctx.user` supplies the identity
            and (optionally) the permissions.
        value: The store operation data. Its "namespace" entry may be
            absent, None or empty, in which case only admins are allowed.

    Returns:
        True when the request is allowed, False otherwise.
    """
    # Not every store payload carries a "namespace" key, so normalise a
    # missing or null value to an empty namespace.
    namespace = value.get("namespace") or []

    # Ensure user can only access their own namespace
    if len(namespace) >= 2 and namespace[0] == "users":
        return namespace[1] == ctx.user.identity

    # Allow access to shared namespaces. The emptiness guard avoids an
    # IndexError when no namespace was supplied.
    if namespace and namespace[0] in ("shared", "public"):
        return True

    # Admins can access everything
    return "admin" in getattr(ctx.user, "permissions", [])
# Action-specific store handlers
@auth.on.store(actions=["put", "delete"])
async def store_write_auth(ctx: Auth.types.AuthContext, value: dict) -> bool:
    """Restrict store write operations ("put" and "delete").

    Returns True only when the user's permissions include "store:write".
    """
    # Users without a `permissions` attribute are treated as having none.
    granted = getattr(ctx.user, "permissions", [])
    return "store:write" in granted
@auth.on.store(actions=["search", "list_namespaces"])
async def store_search_auth(ctx: Auth.types.AuthContext, value: dict) -> bool:
    """Allow search operations for authenticated users.

    Args:
        ctx: The authentication context; `ctx.user` is inspected.
        value: The store operation data (not used here).

    Returns:
        True when the user reports itself as authenticated, False otherwise
        (including when the user object has no `is_authenticated` attribute).
    """
    # The user protocol declares `is_authenticated`, not `is_anonymous`;
    # testing for the latter falls back to its default on any user that
    # lacks the attribute and therefore admits everyone.
    return bool(getattr(ctx.user, "is_authenticated", False))


# Multi-resource authorization
@auth.on(resources=["threads", "runs"], actions=["create", "update"])
async def rate_limit_writes(ctx: Auth.types.AuthContext, value: dict) -> bool:
    """Rate limit write operations across resources.

    Writes are counted per user in one-minute buckets kept in Redis.

    Args:
        ctx: The authentication context; `ctx.user.identity` keys the bucket.
        value: The operation data (not used here).

    Returns:
        True when the write is within the limit.

    Raises:
        Auth.exceptions.HTTPException: 429 once the user has exceeded
            100 writes in the current minute.
    """
    from datetime import datetime, timezone

    max_writes_per_minute = 100
    user_id = ctx.user.identity

    # Bucket by UTC minute so the key does not depend on the host's timezone.
    current_minute = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
    rate_key = f"rate_limit:{user_id}:{current_minute}"

    # Increment first and judge the returned count: a separate read followed
    # by an increment lets concurrent requests all pass the check together.
    current_count = await redis_client.incr(rate_key)
    await redis_client.expire(rate_key, 60)

    if int(current_count) > max_writes_per_minute:
        raise Auth.exceptions.HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
        )
    return True
# Conditional authorization
@auth.on.assistants.update
async def assistant_update_auth(
    ctx: Auth.types.AuthContext,
    value: Auth.types.AssistantsUpdate,
) -> bool:
    """Decide whether the caller may update an assistant.

    The update is allowed for the assistant's owner, for any user holding
    the "admin" permission, and for identities listed as collaborators.
    """
    # Get assistant metadata
    record = await get_assistant_metadata(value["assistant_id"])

    # Owners and admins are always allowed.
    if record.get("owner") == ctx.user.identity:
        return True
    if "admin" in getattr(ctx.user, "permissions", []):
        return True

    # Otherwise the caller must be a listed collaborator.
    return ctx.user.identity in record.get("collaborators", [])
# Data filtering authorization
@auth.on.threads.search
async def thread_search_filter(
    ctx: Auth.types.AuthContext,
    value: Auth.types.ThreadsSearch,
) -> dict:
    """Build the filter applied to thread search results.

    Returns a filter dict rather than a boolean: an empty dict for admins,
    and an owner filter keyed "metadata.owner" for everyone else.
    """
    is_admin = "admin" in getattr(ctx.user, "permissions", [])

    # Admins see everything
    if is_admin:
        return {}

    # Regular users can only see their own threads
    return {"metadata.owner": ctx.user.identity}


# Example langgraph.json configuration
"""
{
"dependencies": ["."],
"graphs": {
"my_assistant": "./assistant.py:graph"
},
"env": ".env",
"auth": {
"path": "./auth.py:auth"
}
}
"""
# auth.py file
auth = Auth()


@auth.authenticate
async def authenticate(authorization: str) -> Auth.types.MinimalUserDict:
    """Authenticate a request by delegating to verify_user."""
    # Your authentication logic
    # NOTE(review): verify_user is not defined in this listing — it is
    # presumably supplied by the application; confirm.
    return await verify_user(authorization)


@auth.on
async def default_auth(ctx: Auth.types.AuthContext, value: dict) -> bool:
    """Default authorization handler; accepts every request."""
    # Default authorization logic
    return True
# Export the auth instance
__all__ = ["auth"]

Install with Tessl CLI
npx tessl i tessl/pypi-langgraph-sdk