The official Python library for the anthropic API
—
—
Does it follow best practices?
Impact
—
No eval scenarios have been run
—
The risk profile of this skill
This plugin was archived by the owner on Jun 3, 2026
Reason: Retiring all tiles created prior to the transition to plugin support
Message batching allows efficient processing of multiple message requests in batches, providing cost optimization and throughput improvements for high-volume applications. This is ideal for bulk processing, data analysis, and scenarios where real-time responses are not required.
Create, retrieve, list, and cancel message batches for asynchronous processing of multiple requests.
def create(
requests: List[Dict[str, Any]],
**kwargs
) -> Any
async def create(
requests: List[Dict[str, Any]],
**kwargs
) -> Any
def retrieve(batch_id: str, **kwargs) -> Any
async def retrieve(batch_id: str, **kwargs) -> Any
def list(**kwargs) -> Any
async def list(**kwargs) -> Any
def cancel(batch_id: str, **kwargs) -> Any
async def cancel(batch_id: str, **kwargs) -> AnyThe exact types for batching are part of the beta API and may vary. The following represents the general structure:
class BatchRequest(TypedDict):
custom_id: str
method: Literal["POST"]
url: str
body: Dict[str, Any]
class Batch(TypedDict):
id: str
type: Literal["message_batch"]
processing_status: str
request_counts: Dict[str, int]
ended_at: Optional[str]
created_at: str
expires_at: str
archived_at: Optional[str]
cancel_initiated_at: Optional[str]
results_url: Optional[str]
class BatchResponse(TypedDict):
custom_id: str
result: Optional[Dict[str, Any]]
error: Optional[Dict[str, Any]]from anthropic import Anthropic
client = Anthropic()
# Prepare batch requests
batch_requests = [
{
"custom_id": "request-1",
"method": "POST",
"url": "/v1/messages",
"body": {
"model": "claude-sonnet-4-20250514",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": "What is the capital of France?"}
]
}
},
{
"custom_id": "request-2",
"method": "POST",
"url": "/v1/messages",
"body": {
"model": "claude-sonnet-4-20250514",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": "What is the capital of Germany?"}
]
}
},
{
"custom_id": "request-3",
"method": "POST",
"url": "/v1/messages",
"body": {
"model": "claude-sonnet-4-20250514",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": "What is the capital of Italy?"}
]
}
}
]
# Create the batch
batch = client.messages.batches.create(requests=batch_requests)
print(f"Batch created with ID: {batch.id}")
print(f"Status: {batch.processing_status}")import time
def wait_for_batch_completion(client: Anthropic, batch_id: str, max_wait_time: int = 300) -> Any:
"""Wait for a batch to complete processing"""
start_time = time.time()
while time.time() - start_time < max_wait_time:
batch = client.messages.batches.retrieve(batch_id)
print(f"Batch {batch_id} status: {batch.processing_status}")
if batch.processing_status in ["completed", "failed", "cancelled"]:
return batch
elif batch.processing_status == "in_progress":
print(f"Progress: {batch.request_counts}")
time.sleep(10) # Check every 10 seconds
raise TimeoutError(f"Batch {batch_id} did not complete within {max_wait_time} seconds")
# Usage
try:
completed_batch = wait_for_batch_completion(client, batch.id)
print(f"Batch completed: {completed_batch.processing_status}")
if completed_batch.results_url:
print(f"Results available at: {completed_batch.results_url}")
except TimeoutError as e:
print(f"Timeout error: {e}")def process_documents_in_batch(documents: List[str], task: str) -> str:
"""Process multiple documents in a single batch"""
batch_requests = []
for i, document in enumerate(documents):
request = {
"custom_id": f"doc-{i}",
"method": "POST",
"url": "/v1/messages",
"body": {
"model": "claude-sonnet-4-20250514",
"max_tokens": 2048,
"messages": [
{
"role": "user",
"content": f"{task}\n\nDocument:\n{document}"
}
]
}
}
batch_requests.append(request)
# Create batch
batch = client.messages.batches.create(requests=batch_requests)
# Wait for completion
completed_batch = wait_for_batch_completion(client, batch.id)
return completed_batch.id
# Example usage
documents = [
"Annual report showing 15% growth in Q4...",
"Marketing campaign results indicate 23% increase...",
"Customer feedback survey reveals high satisfaction..."
]
batch_id = process_documents_in_batch(
documents,
"Please summarize the key points from this document in 3 bullet points."
)
print(f"Document summarization batch completed: {batch_id}")def analyze_customer_feedback_batch(feedback_list: List[str]) -> str:
"""Analyze customer feedback in batch for sentiment and themes"""
batch_requests = []
for i, feedback in enumerate(feedback_list):
request = {
"custom_id": f"feedback-{i}",
"method": "POST",
"url": "/v1/messages",
"body": {
"model": "claude-sonnet-4-20250514",
"max_tokens": 512,
"system": "You are an expert at analyzing customer feedback. Provide sentiment (positive/negative/neutral) and key themes.",
"messages": [
{
"role": "user",
"content": f"Analyze this customer feedback:\n\n{feedback}"
}
]
}
}
batch_requests.append(request)
batch = client.messages.batches.create(requests=batch_requests)
return batch.id
# Usage
customer_feedback = [
"The product is amazing, but delivery was slow.",
"Great customer service, very helpful staff.",
"Product quality is poor, disappointed with purchase.",
"Fast shipping, product exactly as described."
]
analysis_batch_id = analyze_customer_feedback_batch(customer_feedback)
print(f"Feedback analysis batch started: {analysis_batch_id}")def multi_model_comparison_batch(prompt: str) -> str:
"""Compare responses from different models in a single batch"""
models = ["claude-haiku-3-20241022", "claude-sonnet-4-20250514"]
batch_requests = []
for model in models:
request = {
"custom_id": f"model-{model}",
"method": "POST",
"url": "/v1/messages",
"body": {
"model": model,
"max_tokens": 1024,
"messages": [
{"role": "user", "content": prompt}
]
}
}
batch_requests.append(request)
batch = client.messages.batches.create(requests=batch_requests)
return batch.id
# Usage
comparison_batch_id = multi_model_comparison_batch(
"Explain quantum computing in simple terms."
)
print(f"Model comparison batch started: {comparison_batch_id}")class BatchManager:
def __init__(self, client: Anthropic):
self.client = client
def list_active_batches(self) -> List[Any]:
"""List all active batches"""
batches = self.client.messages.batches.list()
active_batches = [
batch for batch in batches
if batch.processing_status in ["validating", "in_progress"]
]
return active_batches
def cancel_batch(self, batch_id: str) -> bool:
"""Cancel a batch if it's still processing"""
try:
batch = self.client.messages.batches.retrieve(batch_id)
if batch.processing_status in ["validating", "in_progress"]:
self.client.messages.batches.cancel(batch_id)
print(f"Batch {batch_id} cancellation initiated")
return True
else:
print(f"Cannot cancel batch {batch_id} - status: {batch.processing_status}")
return False
except Exception as e:
print(f"Error cancelling batch {batch_id}: {e}")
return False
def get_batch_stats(self, batch_id: str) -> Dict[str, Any]:
"""Get detailed statistics for a batch"""
batch = self.client.messages.batches.retrieve(batch_id)
stats = {
"id": batch.id,
"status": batch.processing_status,
"created_at": batch.created_at,
"request_counts": batch.request_counts,
"total_requests": sum(batch.request_counts.values()) if batch.request_counts else 0
}
if batch.ended_at:
stats["ended_at"] = batch.ended_at
if batch.results_url:
stats["results_url"] = batch.results_url
return stats
# Usage
manager = BatchManager(client)
# List active batches
active_batches = manager.list_active_batches()
print(f"Active batches: {len(active_batches)}")
# Get stats for a specific batch
if active_batches:
batch_stats = manager.get_batch_stats(active_batches[0].id)
print(f"Batch stats: {batch_stats}")import asyncio
from anthropic import AsyncAnthropic
async def async_batch_processing():
client = AsyncAnthropic()
# Create batch requests
batch_requests = [
{
"custom_id": f"async-request-{i}",
"method": "POST",
"url": "/v1/messages",
"body": {
"model": "claude-sonnet-4-20250514",
"max_tokens": 512,
"messages": [
{"role": "user", "content": f"Generate a creative title for article #{i}"}
]
}
}
for i in range(5)
]
# Create batch
batch = await client.messages.batches.create(requests=batch_requests)
print(f"Async batch created: {batch.id}")
# Monitor progress
while True:
batch_status = await client.messages.batches.retrieve(batch.id)
print(f"Status: {batch_status.processing_status}")
if batch_status.processing_status in ["completed", "failed", "cancelled"]:
break
await asyncio.sleep(5)
return batch_status
# Run async batch
batch_result = asyncio.run(async_batch_processing())
print(f"Final batch status: {batch_result.processing_status}")def robust_batch_creation(requests: List[Dict[str, Any]], max_retries: int = 3) -> Optional[Any]:
"""Create a batch with error handling and retries"""
for attempt in range(max_retries):
try:
batch = client.messages.batches.create(requests=requests)
print(f"Batch created successfully on attempt {attempt + 1}")
return batch
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
# Wait before retrying
time.sleep(2 ** attempt) # Exponential backoff
else:
print("All attempts failed")
return None
# Usage with error handling
batch_requests = [
{
"custom_id": "safe-request-1",
"method": "POST",
"url": "/v1/messages",
"body": {
"model": "claude-sonnet-4-20250514",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": "Hello world"}
]
}
}
]
batch = robust_batch_creation(batch_requests)
if batch:
print(f"Batch created: {batch.id}")
else:
print("Failed to create batch after all retries")