Python SDK for Claude Code enabling developers to build AI-powered applications and agents with support for custom tools, hooks, and bidirectional interactive conversations
Handling unusual situations, error recovery, and complex workflows.
Retry logic for transient failures:
import anyio
from claude_agent_sdk import (
query,
ClaudeSDKError,
CLIConnectionError,
ProcessError
)
async def query_with_retry(prompt: str, max_retries: int = 3):
    """Run a one-shot query, retrying on connection or process failures.

    Args:
        prompt: Prompt text forwarded to ``query``.
        max_retries: Total number of attempts before giving up.

    Returns:
        The list of messages produced by the first attempt that completes.

    Raises:
        CLIConnectionError, ProcessError: Re-raised after the final attempt
            fails.
        ClaudeSDKError: Any other SDK error is not retried and propagates
            straight to the caller.
    """
    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            # A failure part-way through the stream discards what was
            # collected so far; the next attempt starts from scratch.
            return [message async for message in query(prompt=prompt)]
        except (CLIConnectionError, ProcessError) as e:
            if attempt >= final_attempt:
                print(f"All {max_retries} attempts failed")
                raise
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {wait_time}s...")
            await anyio.sleep(wait_time)
async def main():
    """Run the retrying query once and print either the count or the error."""
    try:
        messages = await query_with_retry("What is 2 + 2?")
    except ClaudeSDKError as e:
        print(f"Final error: {e}")
    else:
        print(f"Success! Received {len(messages)} messages")
anyio.run(main)

Process large files incrementally:
import anyio
from pathlib import Path
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
async def process_large_file(file_path: Path):
    """Ask the agent to analyze a large file one fixed-size chunk at a time.

    Args:
        file_path: Path of the file to analyze.

    Returns:
        A list with the text of every ``TextBlock`` found in the assistant
        messages, in the order the chunks were processed.

    Raises:
        OSError: If ``file_path`` cannot be stat'ed.
    """
    # These two types drive the isinstance checks below but are not part of
    # this example's module-level imports; without them the first response
    # would fail with NameError.
    from claude_agent_sdk import AssistantMessage, TextBlock

    options = ClaudeAgentOptions(
        allowed_tools=["Read"],
        max_buffer_size=1048576,  # 1MB buffer limit
    )
    chunk_size = 100000  # Process 100KB at a time

    # Stat the file before connecting so a missing file fails fast without
    # starting a client first.
    file_size = file_path.stat().st_size

    results = []
    async with ClaudeSDKClient(options=options) as client:
        for offset in range(0, file_size, chunk_size):
            prompt = (
                f"\nRead {file_path} from offset {offset} to {offset + chunk_size}.\n"
                "Analyze this chunk for issues.\n"
            )
            await client.query(prompt)
            async for msg in client.receive_response():
                if isinstance(msg, AssistantMessage):
                    results.extend(
                        block.text
                        for block in msg.content
                        if isinstance(block, TextBlock)
                    )
    return results
anyio.run(lambda: process_large_file(Path("large_file.txt")))

Manage multiple concurrent operations:
import anyio
from claude_agent_sdk import query, ClaudeAgentOptions
class RateLimiter:
    """Bounds the number of queries in flight with a semaphore."""

    def __init__(self, max_concurrent: int):
        # The semaphore is created with ``max_concurrent`` slots; ``execute``
        # holds one slot for the duration of each query.
        self.semaphore = anyio.Semaphore(max_concurrent)

    async def execute(self, prompt: str, options: ClaudeAgentOptions):
        """Run one query while holding a semaphore slot and return its messages."""
        async with self.semaphore:
            return [msg async for msg in query(prompt=prompt, options=options)]
async def process_files_with_rate_limit(files: list[str]):
    """Analyze several files concurrently, at most three at a time.

    Args:
        files: File names to analyze.

    Returns:
        A list of ``(file, messages)`` tuples in the same order as ``files``.
    """
    limiter = RateLimiter(max_concurrent=3)  # Max 3 concurrent requests
    options = ClaudeAgentOptions(
        allowed_tools=["Read"],
        model="haiku",  # Use cheaper model for batch processing
    )

    # One slot per input file. Tasks finish in arbitrary order, so each task
    # writes to its own index to keep the output aligned with the input.
    results = [None] * len(files)

    async def process_file(index: int, file: str):
        print(f"Processing {file}...")
        messages = await limiter.execute(
            f"Analyze {file} for issues",
            options,
        )
        print(f"Completed {file}")
        results[index] = (file, messages)

    # start_soon() schedules every task up front. tg.start() is the wrong tool
    # here: it expects the task to accept ``task_status`` and call
    # ``started()``, and awaiting it per file would serialize the work.
    async with anyio.create_task_group() as tg:
        for index, file in enumerate(files):
            tg.start_soon(process_file, index, file)

    # Leaving the task group block waits for all tasks, so every slot is
    # filled by the time we get here.
    return results
anyio.run(lambda: process_files_with_rate_limit(["file1.py", "file2.py", "file3.py"]))

Manage conversations approaching context limits:
import anyio
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, ResultMessage
async def long_conversation_with_context_management():
    """Run a long multi-task conversation, forking the session near the limit.

    After each task the reported ``input_tokens`` usage is checked. Once it
    exceeds 900000 the current client is disconnected and replaced by a new
    client that resumes the last session as a fork; the remaining tasks run
    on that new client.

    Returns:
        The session IDs reported by each ``ResultMessage``, in order.
    """
    betas = ["context-1m-2025-08-07"]  # Extended context
    allowed_tools = ["Read", "Write"]
    session_history = []

    # The client is managed by hand (not with ``async with``) because it may
    # be swapped for a forked client part-way through the loop.
    client = ClaudeSDKClient(
        options=ClaudeAgentOptions(betas=betas, allowed_tools=allowed_tools)
    )
    await client.connect()
    try:
        for task_num in range(20):  # Many tasks
            await client.query(f"Task {task_num}: Do something")

            fork_from = None
            async for msg in client.receive_response():
                if isinstance(msg, ResultMessage):
                    session_history.append(msg.session_id)
                    # Check usage
                    if msg.usage and msg.usage.get("input_tokens", 0) > 900000:
                        fork_from = msg.session_id

            # Fork only after the response stream has been fully consumed,
            # not by tearing the client down in the middle of iteration.
            if fork_from is not None:
                print("Approaching context limit, forking session...")
                await client.disconnect()
                # Carry the original betas/tools over so the forked session
                # keeps the same capabilities.
                client = ClaudeSDKClient(
                    options=ClaudeAgentOptions(
                        betas=betas,
                        allowed_tools=allowed_tools,
                        resume=fork_from,
                        fork_session=True,
                    )
                )
                await client.connect()
    finally:
        # Always release whichever client is current.
        # NOTE(review): this may call disconnect() a second time if the
        # reconnect above failed; presumably a no-op, confirm against the SDK.
        await client.disconnect()

    return session_history
anyio.run(long_conversation_with_context_management)

Graceful error recovery with context:
import anyio
from claude_agent_sdk import (
ClaudeSDKClient,
ClaudeAgentOptions,
ClaudeSDKError,
CLIJSONDecodeError,
MessageParseError
)
async def robust_query_handler():
    """Run one query and print targeted diagnostics for each SDK error type.

    Handlers are ordered from most to least specific, so ``ClaudeSDKError``
    only sees errors the earlier clauses did not match. Every error is
    reported with ``print`` and swallowed; nothing is re-raised.
    """
    # ProcessError is used in an except clause below but is missing from this
    # example's own import list; import it here so the snippet works standalone.
    from claude_agent_sdk import ProcessError

    try:
        async with ClaudeSDKClient() as client:
            await client.query("Do something")
            async for msg in client.receive_response():
                print(msg)
    except CLIJSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Malformed line: {e.line}")
        print("This might indicate a version mismatch. Suggestions:")
        print("1. Update SDK: pip install --upgrade claude-agent-sdk")
        print("2. Check CLI version compatibility")
    except MessageParseError as e:
        print(f"Message parse error: {e}")
        print(f"Raw data: {e.data}")
        print("This might indicate a protocol change")
    except ProcessError as e:
        print(f"Process error: {e}")
        if e.exit_code == 137:
            print("Process killed (likely out of memory)")
        elif e.exit_code == 1:
            print("Process exited with error")
        if e.stderr:
            print(f"Error output: {e.stderr}")
    except ClaudeSDKError as e:
        print(f"SDK error: {e}")
        print("Check logs and configuration")
anyio.run(robust_query_handler)

Load tools based on runtime conditions:
import anyio
from claude_agent_sdk import (
tool,
create_sdk_mcp_server,
ClaudeSDKClient,
ClaudeAgentOptions
)
def create_dynamic_tools(environment: str):
    """Build the list of tools appropriate for ``environment``.

    ``"production"`` yields a single restricted tool; any other value is
    treated as development and yields the debug and experimental tools.
    """

    def text_reply(text):
        # Every tool here answers with a single text content item.
        return {"content": [{"type": "text", "text": text}]}

    if environment != "production":  # development
        @tool("debug_operation", "Debug operation", {"input": str})
        async def debug_op(args):
            # More verbose for development
            return text_reply(f"Debug: {args['input']}")

        @tool("experimental", "Experimental feature", {"data": dict})
        async def experimental(args):
            return text_reply("Experimental result")

        return [debug_op, experimental]

    @tool("safe_operation", "Safe prod operation", {"input": str})
    async def safe_op(args):
        # Limited functionality for production
        return text_reply("Safe operation completed")

    return [safe_op]
async def main():
    """Register the environment's tools on an SDK MCP server and run a query."""
    environment = "development"  # or "production"
    server_name = f"{environment}_tools"

    tools = create_dynamic_tools(environment)
    server = create_sdk_mcp_server(server_name, tools=tools)

    # Allowed-tool names are built as mcp__<server name>__<tool name>.
    allowed = [f"mcp__{server_name}__{t.name}" for t in tools]
    options = ClaudeAgentOptions(
        mcp_servers={server_name: server},
        allowed_tools=allowed,
    )

    async with ClaudeSDKClient(options=options) as client:
        await client.query("Perform operations")
        async for msg in client.receive_response():
            print(msg)
anyio.run(main)

Resume or recover from incomplete sessions:
import anyio
import json
from pathlib import Path
from claude_agent_sdk import (
ClaudeSDKClient,
ClaudeAgentOptions,
ResultMessage,
ClaudeSDKError
)
async def resumable_workflow(state_file: Path):
    """Run a fixed list of tasks, persisting progress so a rerun can resume.

    Progress (completed task names and the latest session ID) is written to
    ``state_file`` as JSON after every task result. When the file exists on
    entry, completed tasks are skipped and the saved session is resumed. The
    file is removed once every task has completed.

    A task whose ``ResultMessage`` reports ``is_error`` is NOT recorded as
    completed; the workflow stops so the task is retried on the next run.

    Args:
        state_file: Location of the JSON progress file.
    """
    # Load previous state if exists
    if state_file.exists():
        with open(state_file, encoding="utf-8") as f:
            state = json.load(f)
        print(f"Resuming from session: {state['session_id']}")
        print(f"Completed tasks: {state['completed_tasks']}")
        options = ClaudeAgentOptions(
            resume=state["session_id"],
            continue_conversation=True,
        )
    else:
        print("Starting new workflow")
        state = {"completed_tasks": [], "session_id": None}
        options = ClaudeAgentOptions()

    def save_state():
        with open(state_file, "w", encoding="utf-8") as f:
            json.dump(state, f)

    tasks = ["Task A", "Task B", "Task C", "Task D"]
    remaining_tasks = [t for t in tasks if t not in state["completed_tasks"]]

    try:
        async with ClaudeSDKClient(options=options) as client:
            for task in remaining_tasks:
                print(f"\nExecuting: {task}")
                await client.query(f"Complete {task}")
                async for msg in client.receive_response():
                    if not isinstance(msg, ResultMessage):
                        continue
                    state["session_id"] = msg.session_id
                    if msg.is_error:
                        # Keep the session ID for resuming, but leave the task
                        # out of completed_tasks so it runs again next time.
                        save_state()
                        raise ClaudeSDKError(f"{task} failed: {msg.result}")
                    state["completed_tasks"].append(task)
                    # Save state after each task
                    save_state()
                    print(f"✅ {task} completed and saved")
        print("\n✅ All tasks completed!")
        # missing_ok: no state file exists if no task ever produced a result.
        state_file.unlink(missing_ok=True)  # Clean up state file
    except (ClaudeSDKError, KeyboardInterrupt) as e:
        print(f"\n⚠️ Workflow interrupted: {e}")
        print("Progress saved. Resume by running again.")
        print(f"Completed: {state['completed_tasks']}")
anyio.run(lambda: resumable_workflow(Path("workflow_state.json")))

Process batch operations with partial failures:
import anyio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage
async def batch_process_with_partial_failure(items: list[str]):
    """Process each item independently so one failure does not stop the batch.

    Args:
        items: Names of the items to process, one query per item.

    Returns:
        A dict with two keys: ``"success"`` (list of item names) and
        ``"failed"`` (list of ``{"item": ..., "error": ...}`` dicts).
    """
    results = {"success": [], "failed": []}
    options = ClaudeAgentOptions(
        allowed_tools=["Read", "Write"],
        max_budget_usd=0.10,  # Limit per item
    )

    for item in items:
        print(f"Processing {item}...")
        try:
            # The prompt must be passed by keyword, as in every other query()
            # call in this file. Passed positionally it raised a TypeError that
            # the handler below swallowed, marking every item as failed.
            async for msg in query(prompt=f"Process {item}", options=options):
                if isinstance(msg, ResultMessage):
                    if msg.is_error:
                        results["failed"].append({
                            "item": item,
                            "error": msg.result,
                        })
                    else:
                        results["success"].append(item)
        except Exception as e:
            # Deliberately broad: this is the per-item isolation boundary.
            # Any failure is recorded and the loop moves on to the next item.
            results["failed"].append({
                "item": item,
                "error": str(e),
            })

    # Report results
    print(f"\n{'='*60}")
    print("Batch Processing Complete")
    print(f"{'='*60}")
    print(f"Successful: {len(results['success'])}/{len(items)}")
    print(f"Failed: {len(results['failed'])}/{len(items)}")
    if results["failed"]:
        print("\nFailed items:")
        for failure in results["failed"]:
            print(f" - {failure['item']}: {failure['error']}")
    return results
anyio.run(lambda: batch_process_with_partial_failure(["item1", "item2", "item3"]))

Fall back to a simpler task when the primary task times out:
import anyio
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
async def query_with_timeout_and_fallback(primary_task: str, fallback_task: str):
    """Run the primary task under a 60 s limit; on timeout run the fallback.

    Args:
        primary_task: Prompt sent to the "opus" client (up to 20 turns).
        fallback_task: Prompt sent to the "haiku" client (up to 5 turns) if
            reading the primary response times out.
    """

    async def print_response(client):
        # Print each message of the current response as it arrives.
        async for msg in client.receive_response():
            print(msg)

    # More capable but slower
    primary_options = ClaudeAgentOptions(model="opus", max_turns=20)
    # Faster but less capable
    fallback_options = ClaudeAgentOptions(model="haiku", max_turns=5)

    try:
        print("Attempting primary task with opus...")
        async with ClaudeSDKClient(options=primary_options) as client:
            await client.query(primary_task)
            # 60 second timeout, applied to reading the response only
            with anyio.fail_after(60):
                await print_response(client)
        print("✅ Primary task completed")
    except TimeoutError:
        print("⚠️ Primary task timed out, trying fallback...")
        async with ClaudeSDKClient(options=fallback_options) as client:
            await client.query(fallback_task)
            await print_response(client)
        print("✅ Fallback task completed")
anyio.run(lambda: query_with_timeout_and_fallback(
"Perform comprehensive analysis with detailed report",
"Perform quick analysis with summary"
))

Install with Tessl CLI
npx tessl i tessl/pypi-claude-agent-sdk@0.1.3