CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-claude-agent-sdk

Python SDK for Claude Code enabling developers to build AI-powered applications and agents with support for custom tools, hooks, and bidirectional interactive conversations

Moderation error
Malicious code detected in tile.json: This tile describes a package named 'claude-agent-sdk' on PyPI, which appears to be typosquatting/impersonating official Anthropic SDK packages. The legitimate Anthropic Python SDK is 'anthropic', not 'claude-agent-sdk'. This naming pattern (claude-agent-sdk) is designed to deceive developers into installing a potentially malicious package by mimicking official Anthropic/Claude branding. This is a classic supply chain attack vector through package name confusion.
Overview
Eval results
Files

edge-cases.md · docs/examples/

Edge Cases and Advanced Scenarios

Handling unusual situations, error recovery, and complex workflows.

Handling Network Failures

Retry logic for transient failures:

import anyio
from claude_agent_sdk import (
    query,
    ClaudeSDKError,
    CLIConnectionError,
    ProcessError
)

async def query_with_retry(prompt: str, max_retries: int = 3):
    """Query with automatic retry on transient errors.

    Args:
        prompt: Prompt forwarded to ``query``.
        max_retries: Total number of attempts to make; must be at least 1.

    Returns:
        The list of messages yielded by the first attempt that completes.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        CLIConnectionError, ProcessError: Re-raised after the final attempt
            has failed.
        ClaudeSDKError: Any other SDK error propagates immediately, without
            a retry.
    """
    if max_retries < 1:
        # Without this guard the loop body never runs and the caller would
        # silently get None back instead of a list of messages.
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            # The list is rebuilt on every attempt, so a failed attempt never
            # leaks partial output into the returned result.
            return [message async for message in query(prompt=prompt)]

        except (CLIConnectionError, ProcessError) as e:
            if attempt >= max_retries - 1:
                print(f"All {max_retries} attempts failed")
                raise

            wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {wait_time}s...")
            await anyio.sleep(wait_time)

        except ClaudeSDKError:
            # Don't retry for other SDK errors
            raise

async def main():
    """Run one retried query and report whether it succeeded or failed."""
    try:
        received = await query_with_retry("What is 2 + 2?")
    except ClaudeSDKError as sdk_error:
        # Reached only after query_with_retry has given up or hit a
        # non-retried SDK error.
        print(f"Final error: {sdk_error}")
    else:
        print(f"Success! Received {len(received)} messages")

anyio.run(main)

Handling Large Files

Process large files incrementally:

import anyio
from pathlib import Path
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions

async def process_large_file(file_path: Path):
    """Process large file in chunks to avoid memory issues.

    The file is walked in fixed-size offset ranges. For each range one query
    is sent over a single client connection, and the text blocks of every
    assistant message in the response are collected.

    Args:
        file_path: Path of the file to analyze; it is ``stat``-ed locally to
            learn its size.

    Returns:
        A list with the text of every ``TextBlock`` received, in arrival
        order across all chunks. Empty for a zero-byte file.
    """
    # Imported locally because this snippet's top-level import omits these
    # two message types, so the isinstance checks below raised NameError.
    from claude_agent_sdk import AssistantMessage, TextBlock

    options = ClaudeAgentOptions(
        allowed_tools=["Read"],
        max_buffer_size=1048576  # 1MB buffer limit
    )

    # Stat before connecting so a missing file fails without starting a client.
    file_size = file_path.stat().st_size
    chunk_size = 100000  # Process 100KB at a time

    results = []
    async with ClaudeSDKClient(options=options) as client:
        for offset in range(0, file_size, chunk_size):
            # Clamp the final range so the prompt never points past the end
            # of the file.
            end = min(offset + chunk_size, file_size)
            await client.query(f"""
            Read {file_path} from offset {offset} to {end}.
            Analyze this chunk for issues.
            """)

            async for msg in client.receive_response():
                if not isinstance(msg, AssistantMessage):
                    continue
                results.extend(
                    block.text
                    for block in msg.content
                    if isinstance(block, TextBlock)
                )

    return results

anyio.run(process_large_file, Path("large_file.txt"))

Concurrent Sessions with Rate Limiting

Manage multiple concurrent operations:

import anyio
from claude_agent_sdk import query, ClaudeAgentOptions

class RateLimiter:
    """Limits how many ``query`` calls may be in flight at the same time."""

    def __init__(self, max_concurrent: int):
        # One semaphore slot per call allowed to run concurrently.
        self.semaphore = anyio.Semaphore(max_concurrent)

    async def execute(self, prompt: str, options: ClaudeAgentOptions):
        """Run one query while holding a semaphore slot; return its messages as a list."""
        async with self.semaphore:
            return [reply async for reply in query(prompt=prompt, options=options)]

async def process_files_with_rate_limit(files: list[str]):
    """Process multiple files with rate limiting.

    Every file gets its own task in one task group. The shared
    ``RateLimiter`` keeps at most three queries running at once.

    Args:
        files: Names of the files to analyze.

    Returns:
        A list of ``(file, messages)`` tuples in the same order as ``files``.

    Raises:
        Whatever the task group raises when one of the tasks fails; in that
        case no result list is returned.
    """
    limiter = RateLimiter(max_concurrent=3)  # Max 3 concurrent requests

    options = ClaudeAgentOptions(
        allowed_tools=["Read"],
        model="haiku"  # Use cheaper model for batch processing
    )

    # Pre-sized so each task writes to its own slot and the output order
    # matches the input order regardless of completion order.
    results = [None] * len(files)

    async def process_file(index: int, file: str):
        print(f"Processing {file}...")
        messages = await limiter.execute(
            f"Analyze {file} for issues",
            options
        )
        print(f"Completed {file}")
        results[index] = (file, messages)

    async with anyio.create_task_group() as tg:
        for index, file in enumerate(files):
            # start_soon() schedules the task without waiting for it.
            # TaskGroup.start() is not suitable here: it requires the task to
            # accept ``task_status`` and yields the value passed to
            # ``task_status.started()``, not the task's return value.
            tg.start_soon(process_file, index, file)

    # Leaving the task group block waits for every task to finish.
    return results

anyio.run(process_files_with_rate_limit, ["file1.py", "file2.py", "file3.py"])

Handling Context Window Limits

Manage conversations approaching context limits:

import anyio
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, ResultMessage

async def long_conversation_with_context_management():
    """Manage long conversations by forking when approaching limits.

    Runs 20 tasks over one client connection. After each task, if the result
    reports more than 900000 input tokens, the current client is disconnected
    and replaced by a new client that resumes the same session with
    ``fork_session=True``. Later tasks then run on the replacement client.
    """
    betas = ["context-1m-2025-08-07"]  # Extended context
    allowed_tools = ["Read", "Write"]
    token_threshold = 900000

    session_history = []

    client = ClaudeSDKClient(
        options=ClaudeAgentOptions(betas=betas, allowed_tools=allowed_tools)
    )
    await client.connect()
    try:
        for task_num in range(20):  # Many tasks
            await client.query(f"Task {task_num}: Do something")

            # Only record the decision while streaming. Swapping the client
            # in the middle of receive_response() would disconnect the very
            # stream that is being iterated.
            fork_from = None
            async for msg in client.receive_response():
                if not isinstance(msg, ResultMessage):
                    continue
                session_history.append(msg.session_id)

                # Check usage
                if msg.usage and msg.usage.get("input_tokens", 0) > token_threshold:
                    fork_from = msg.session_id

            if fork_from is None:
                continue

            print("Approaching context limit, forking session...")
            await client.disconnect()

            # Rebind ``client`` so the remaining tasks and the final cleanup
            # use the forked session. The same betas and tool permissions are
            # passed so the fork behaves like the original session.
            client = ClaudeSDKClient(
                options=ClaudeAgentOptions(
                    betas=betas,
                    allowed_tools=allowed_tools,
                    resume=fork_from,
                    fork_session=True
                )
            )
            await client.connect()
    finally:
        # NOTE(review): assumes disconnect() is safe on a client whose
        # connect() failed — confirm against the SDK.
        await client.disconnect()

anyio.run(long_conversation_with_context_management)

Handling Ambiguous Errors

Graceful error recovery with context:

import anyio
from claude_agent_sdk import (
    ClaudeSDKClient,
    ClaudeAgentOptions,
    ClaudeSDKError,
    CLIJSONDecodeError,
    MessageParseError
)

async def robust_query_handler():
    """Handle various error conditions gracefully.

    Sends one query and prints every response message. Each SDK error type
    is caught separately and reported with the extra detail that the
    exception carries; nothing is re-raised.
    """
    # ProcessError is caught below but is missing from this snippet's
    # top-level import. Without it, evaluating that except clause raised
    # NameError instead of handling the error.
    from claude_agent_sdk import ProcessError

    try:
        async with ClaudeSDKClient() as client:
            await client.query("Do something")
            async for msg in client.receive_response():
                print(msg)

    except CLIJSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Malformed line: {e.line}")
        print("This might indicate a version mismatch. Suggestions:")
        print("1. Update SDK: pip install --upgrade claude-agent-sdk")
        print("2. Check CLI version compatibility")

    except MessageParseError as e:
        print(f"Message parse error: {e}")
        print(f"Raw data: {e.data}")
        print("This might indicate a protocol change")

    except ProcessError as e:
        print(f"Process error: {e}")
        if e.exit_code == 137:
            print("Process killed (likely out of memory)")
        elif e.exit_code == 1:
            print("Process exited with error")
        if e.stderr:
            print(f"Error output: {e.stderr}")

    except ClaudeSDKError as e:
        # Catch-all for SDK errors not matched by the handlers above.
        print(f"SDK error: {e}")
        print("Check logs and configuration")

anyio.run(robust_query_handler)

Dynamic Tool Loading

Load tools based on runtime conditions:

import anyio
from claude_agent_sdk import (
    tool,
    create_sdk_mcp_server,
    ClaudeSDKClient,
    ClaudeAgentOptions
)

def create_dynamic_tools(environment: str):
    """Return the list of tools to register for ``environment``.

    ``"production"`` yields a single restricted tool; any other value is
    treated as development and yields the debug and experimental tools.
    """
    if environment != "production":
        # Development: more verbose and experimental tooling.
        @tool("debug_operation", "Debug operation", {"input": str})
        async def debug_op(args):
            debug_text = f"Debug: {args['input']}"
            return {"content": [{"type": "text", "text": debug_text}]}

        @tool("experimental", "Experimental feature", {"data": dict})
        async def experimental(args):
            experimental_reply = {"type": "text", "text": "Experimental result"}
            return {"content": [experimental_reply]}

        return [debug_op, experimental]

    # Production: limited functionality only.
    @tool("safe_operation", "Safe prod operation", {"input": str})
    async def safe_op(args):
        safe_reply = {"type": "text", "text": "Safe operation completed"}
        return {"content": [safe_reply]}

    return [safe_op]

async def main():
    """Register the environment's tools on an SDK MCP server and run one query."""
    environment = "development"  # or "production"
    server_name = f"{environment}_tools"

    tools = create_dynamic_tools(environment)
    server = create_sdk_mcp_server(server_name, tools=tools)

    # Allow-list each tool under the name mcp__<server>__<tool>.
    allowed = []
    for registered in tools:
        allowed.append(f"mcp__{server_name}__{registered.name}")

    options = ClaudeAgentOptions(
        mcp_servers={server_name: server},
        allowed_tools=allowed
    )

    async with ClaudeSDKClient(options=options) as client:
        await client.query("Perform operations")
        async for reply in client.receive_response():
            print(reply)

anyio.run(main)

Handling Incomplete Sessions

Resume or recover from incomplete sessions:

import anyio
import json
from pathlib import Path
from claude_agent_sdk import (
    ClaudeSDKClient,
    ClaudeAgentOptions,
    ResultMessage,
    ClaudeSDKError
)

async def resumable_workflow(state_file: Path):
    """Workflow that can be resumed after interruption.

    Progress is kept in ``state_file`` as JSON holding the last session id
    and the list of completed tasks. If the file exists, the stored session
    is resumed and completed tasks are skipped. The file is rewritten after
    every task result and removed once all tasks have finished.

    Args:
        state_file: Location of the JSON progress file.
    """
    # Load previous state if exists
    if state_file.exists():
        with open(state_file, encoding="utf-8") as f:
            state = json.load(f)
        print(f"Resuming from session: {state['session_id']}")
        print(f"Completed tasks: {state['completed_tasks']}")

        options = ClaudeAgentOptions(
            resume=state["session_id"],
            continue_conversation=True
        )
    else:
        print("Starting new workflow")
        state = {"completed_tasks": [], "session_id": None}
        options = ClaudeAgentOptions()

    tasks = ["Task A", "Task B", "Task C", "Task D"]
    remaining_tasks = [t for t in tasks if t not in state["completed_tasks"]]

    try:
        async with ClaudeSDKClient(options=options) as client:
            for task in remaining_tasks:
                print(f"\nExecuting: {task}")
                await client.query(f"Complete {task}")

                task_failed = False
                async for msg in client.receive_response():
                    if not isinstance(msg, ResultMessage):
                        continue

                    state["session_id"] = msg.session_id
                    task_failed = bool(msg.is_error)
                    if not task_failed:
                        # Only a non-error result counts as completed, so a
                        # failed task is retried on the next run.
                        state["completed_tasks"].append(task)

                    # Save state after each task
                    with open(state_file, "w", encoding="utf-8") as f:
                        json.dump(state, f)

                    if not task_failed:
                        print(f"✅ {task} completed and saved")

                if task_failed:
                    # Stop here and keep the state file so a rerun resumes
                    # from this task.
                    print(f"\n⚠️  {task} reported an error; stopping.")
                    print("Progress saved. Resume by running again.")
                    print(f"Completed: {state['completed_tasks']}")
                    return

        print("\n✅ All tasks completed!")
        # missing_ok: the file may never have been written, e.g. when no
        # result message arrived for any task.
        state_file.unlink(missing_ok=True)  # Clean up state file

    except (ClaudeSDKError, KeyboardInterrupt) as e:
        print(f"\n⚠️  Workflow interrupted: {e}")
        print("Progress saved. Resume by running again.")
        print(f"Completed: {state['completed_tasks']}")

anyio.run(resumable_workflow, Path("workflow_state.json"))

Handling Mixed Success/Failure

Process batch operations with partial failures:

import anyio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

async def batch_process_with_partial_failure(items: list[str]):
    """Process batch with handling of partial failures.

    Items are processed one after another. A failure of one item is recorded
    and does not stop the remaining items.

    Args:
        items: Names of the items to process.

    Returns:
        A dict with two keys: ``"success"`` (list of item names) and
        ``"failed"`` (list of ``{"item": ..., "error": ...}`` dicts). Every
        item appears in exactly one of the two lists.
    """
    results = {"success": [], "failed": []}

    options = ClaudeAgentOptions(
        allowed_tools=["Read", "Write"],
        max_budget_usd=0.10  # Limit per item
    )

    for item in items:
        print(f"Processing {item}...")
        outcome = None
        try:
            # The prompt is passed by keyword, as in the other examples.
            async for msg in query(prompt=f"Process {item}", options=options):
                if isinstance(msg, ResultMessage):
                    outcome = msg
        except Exception as e:
            # Batch boundary: any failure is recorded for this item and the
            # loop moves on to the next one.
            results["failed"].append({
                "item": item,
                "error": str(e)
            })
            continue

        if outcome is None:
            # The stream ended without a result; count it as a failure so
            # the success and failure totals add up to len(items).
            results["failed"].append({
                "item": item,
                "error": "No result message received"
            })
        elif outcome.is_error:
            results["failed"].append({
                "item": item,
                "error": outcome.result
            })
        else:
            results["success"].append(item)

    # Report results
    print(f"\n{'='*60}")
    print("Batch Processing Complete")
    print(f"{'='*60}")
    print(f"Successful: {len(results['success'])}/{len(items)}")
    print(f"Failed: {len(results['failed'])}/{len(items)}")

    if results["failed"]:
        print("\nFailed items:")
        for failure in results["failed"]:
            print(f"  - {failure['item']}: {failure['error']}")

    return results

anyio.run(batch_process_with_partial_failure, ["item1", "item2", "item3"])

Handling Timeout with Graceful Degradation

import anyio
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions

async def query_with_timeout_and_fallback(
    primary_task: str,
    fallback_task: str,
    timeout_seconds: float = 60,
):
    """Try primary task with timeout, fall back to simpler task.

    Args:
        primary_task: Prompt sent first, using the ``opus`` options.
        fallback_task: Prompt sent with the ``haiku`` options if the primary
            response times out.
        timeout_seconds: Time allowed for receiving the primary response.
            The limit covers only the receive loop, not connecting or
            sending the query. Defaults to 60.
    """
    primary_options = ClaudeAgentOptions(
        model="opus",  # More capable but slower
        max_turns=20
    )

    fallback_options = ClaudeAgentOptions(
        model="haiku",  # Faster but less capable
        max_turns=5
    )

    try:
        print("Attempting primary task with opus...")
        async with ClaudeSDKClient(options=primary_options) as client:
            await client.query(primary_task)

            # fail_after raises TimeoutError once the limit is exceeded.
            with anyio.fail_after(timeout_seconds):
                async for msg in client.receive_response():
                    print(msg)

    except TimeoutError:
        print("⚠️  Primary task timed out, trying fallback...")

        # The fallback runs without a time limit.
        async with ClaudeSDKClient(options=fallback_options) as client:
            await client.query(fallback_task)
            async for msg in client.receive_response():
                print(msg)

        print("✅ Fallback task completed")

    else:
        print("✅ Primary task completed")

anyio.run(
    query_with_timeout_and_fallback,
    "Perform comprehensive analysis with detailed report",
    "Perform quick analysis with summary",
)

Next Steps

  • Integration patterns: Integration Patterns
  • Real-world scenarios: Real-World Scenarios
  • Error handling reference: Error Handling

Install with Tessl CLI

npx tessl i tessl/pypi-claude-agent-sdk

docs

examples

edge-cases.md

integration-patterns.md

real-world-scenarios.md

index.md

tile.json