The official Python library for the Anthropic API
—
Best practices: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Risk profile: Pending — the risk profile of this skill has not been assessed.
Message batching allows efficient processing of multiple message requests in batches, providing cost optimization and throughput improvements for high-volume applications. This is ideal for bulk processing, data analysis, and scenarios where real-time responses are not required.
Create, retrieve, list, and cancel message batches for asynchronous processing of multiple requests.
def create(
requests: List[Dict[str, Any]],
**kwargs
) -> Any
async def create(
requests: List[Dict[str, Any]],
**kwargs
) -> Any
def retrieve(batch_id: str, **kwargs) -> Any
async def retrieve(batch_id: str, **kwargs) -> Any
def list(**kwargs) -> Any
async def list(**kwargs) -> Any
def cancel(batch_id: str, **kwargs) -> Any
async def cancel(batch_id: str, **kwargs) -> Any

The exact types for batching are part of the beta API and may vary. The following represents the general structure:
class BatchRequest(TypedDict):
    """One request inside a message batch (beta API; exact shape may vary)."""
    custom_id: str           # caller-chosen ID used to correlate results with requests
    method: Literal["POST"]  # only POST is supported
    url: str                 # API path, e.g. "/v1/messages"
    body: Dict[str, Any]     # the Messages API request payload (model, messages, ...)
class Batch(TypedDict):
    """A message batch as returned by create/retrieve/list (beta API; may vary)."""
    id: str                          # server-assigned batch ID
    type: Literal["message_batch"]   # object type discriminator
    processing_status: str           # e.g. "validating", "in_progress", terminal states
    request_counts: Dict[str, int]   # per-status request tallies
    ended_at: Optional[str]          # set once processing finishes; None while running
    created_at: str                  # ISO-8601 creation timestamp
    expires_at: str                  # when unprocessed requests expire
    archived_at: Optional[str]       # set if the batch has been archived
    cancel_initiated_at: Optional[str]  # set once cancellation has been requested
    results_url: Optional[str]       # where to download results, once available
class BatchResponse(TypedDict):
    """A single result entry from a completed batch (beta API; may vary)."""
    custom_id: str                    # echoes the custom_id of the matching BatchRequest
    result: Optional[Dict[str, Any]]  # the message response when the request succeeded
    error: Optional[Dict[str, Any]]   # error details when the request failed

from anthropic import Anthropic
client = Anthropic()

# Prepare batch requests: one /v1/messages call per question, each tagged with
# a unique custom_id so results can be matched back to their request.
_capital_questions = [
    ("request-1", "What is the capital of France?"),
    ("request-2", "What is the capital of Germany?"),
    ("request-3", "What is the capital of Italy?"),
]
batch_requests = [
    {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/messages",
        "body": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": question}
            ],
        },
    }
    for custom_id, question in _capital_questions
]

# Create the batch; it is processed asynchronously on the server.
batch = client.messages.batches.create(requests=batch_requests)
print(f"Batch created with ID: {batch.id}")
print(f"Status: {batch.processing_status}")

import time
def wait_for_batch_completion(
    client: "Anthropic",
    batch_id: str,
    max_wait_time: int = 300,
    poll_interval: int = 10,
) -> Any:
    """Poll a message batch until it reaches a terminal status.

    Args:
        client: Anthropic client used to retrieve the batch.
        batch_id: ID of the batch to monitor.
        max_wait_time: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between polls (previously hard-coded to 10).

    Returns:
        The final batch object once its processing_status is terminal.

    Raises:
        TimeoutError: If the batch does not finish within max_wait_time.
    """
    deadline = time.time() + max_wait_time
    while time.time() < deadline:
        batch = client.messages.batches.retrieve(batch_id)
        print(f"Batch {batch_id} status: {batch.processing_status}")
        # NOTE(review): terminal status names follow this document's examples;
        # confirm against the current API, which may use different values.
        if batch.processing_status in ["completed", "failed", "cancelled"]:
            return batch
        elif batch.processing_status == "in_progress":
            print(f"Progress: {batch.request_counts}")
        time.sleep(poll_interval)  # wait before the next status check
    raise TimeoutError(f"Batch {batch_id} did not complete within {max_wait_time} seconds")
# Usage: wait for the batch created above and report the outcome.
try:
    completed_batch = wait_for_batch_completion(client, batch.id)
    print(f"Batch completed: {completed_batch.processing_status}")
    if completed_batch.results_url:
        print(f"Results available at: {completed_batch.results_url}")
except TimeoutError as e:
    print(f"Timeout error: {e}")


def process_documents_in_batch(documents: List[str], task: str) -> str:
    """Process multiple documents in a single batch.

    Args:
        documents: Raw document texts to process.
        task: Instruction prepended to each document prompt.

    Returns:
        The ID of the completed batch.
    """
    # One request per document, tagged doc-0, doc-1, ... for result matching.
    batch_requests = [
        {
            "custom_id": f"doc-{i}",
            "method": "POST",
            "url": "/v1/messages",
            "body": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 2048,
                "messages": [
                    {
                        "role": "user",
                        "content": f"{task}\n\nDocument:\n{document}",
                    }
                ],
            },
        }
        for i, document in enumerate(documents)
    ]
    # Create the batch, then block until it reaches a terminal status.
    batch = client.messages.batches.create(requests=batch_requests)
    completed_batch = wait_for_batch_completion(client, batch.id)
    return completed_batch.id


# Example usage
documents = [
    "Annual report showing 15% growth in Q4...",
    "Marketing campaign results indicate 23% increase...",
    "Customer feedback survey reveals high satisfaction...",
]
batch_id = process_documents_in_batch(
    documents,
    "Please summarize the key points from this document in 3 bullet points."
)
print(f"Document summarization batch completed: {batch_id}")


def analyze_customer_feedback_batch(feedback_list: List[str]) -> str:
    """Analyze customer feedback in batch for sentiment and themes.

    Args:
        feedback_list: Raw customer feedback strings.

    Returns:
        The ID of the created batch (fire-and-forget; caller polls separately).
    """
    batch_requests = [
        {
            "custom_id": f"feedback-{i}",
            "method": "POST",
            "url": "/v1/messages",
            "body": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 512,
                # System prompt steers every request toward sentiment + themes.
                "system": "You are an expert at analyzing customer feedback. Provide sentiment (positive/negative/neutral) and key themes.",
                "messages": [
                    {
                        "role": "user",
                        "content": f"Analyze this customer feedback:\n\n{feedback}",
                    }
                ],
            },
        }
        for i, feedback in enumerate(feedback_list)
    ]
    batch = client.messages.batches.create(requests=batch_requests)
    return batch.id


# Usage
customer_feedback = [
    "The product is amazing, but delivery was slow.",
    "Great customer service, very helpful staff.",
    "Product quality is poor, disappointed with purchase.",
    "Fast shipping, product exactly as described.",
]
analysis_batch_id = analyze_customer_feedback_batch(customer_feedback)
print(f"Feedback analysis batch started: {analysis_batch_id}")


def multi_model_comparison_batch(prompt: str) -> str:
    """Send the same prompt to multiple models in a single batch.

    Args:
        prompt: The prompt to submit to each model.

    Returns:
        The ID of the created batch.
    """
    # NOTE(review): the original listed "claude-haiku-3-20241022", which is not
    # a published model ID; corrected to the Claude 3.5 Haiku identifier.
    models = ["claude-3-5-haiku-20241022", "claude-sonnet-4-20250514"]
    batch_requests = [
        {
            "custom_id": f"model-{model}",  # custom_id encodes which model answered
            "method": "POST",
            "url": "/v1/messages",
            "body": {
                "model": model,
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": prompt}
                ],
            },
        }
        for model in models
    ]
    batch = client.messages.batches.create(requests=batch_requests)
    return batch.id


# Usage
comparison_batch_id = multi_model_comparison_batch(
    "Explain quantum computing in simple terms."
)
print(f"Model comparison batch started: {comparison_batch_id}")


class BatchManager:
    """Convenience wrapper around client.messages.batches for admin tasks."""

    def __init__(self, client: "Anthropic"):
        self.client = client  # shared Anthropic client used by all helpers

    def list_active_batches(self) -> List[Any]:
        """Return all batches that are still validating or processing."""
        batches = self.client.messages.batches.list()
        return [
            batch
            for batch in batches
            if batch.processing_status in ["validating", "in_progress"]
        ]

    def cancel_batch(self, batch_id: str) -> bool:
        """Cancel a batch if it's still processing.

        Returns:
            True if cancellation was initiated, False otherwise.
        """
        try:
            batch = self.client.messages.batches.retrieve(batch_id)
            if batch.processing_status in ["validating", "in_progress"]:
                self.client.messages.batches.cancel(batch_id)
                print(f"Batch {batch_id} cancellation initiated")
                return True
            else:
                print(f"Cannot cancel batch {batch_id} - status: {batch.processing_status}")
                return False
        except Exception as e:
            # Broad catch is deliberate in this admin helper: report and carry on.
            print(f"Error cancelling batch {batch_id}: {e}")
            return False

    def get_batch_stats(self, batch_id: str) -> Dict[str, Any]:
        """Get detailed statistics for a batch as a plain dict."""
        batch = self.client.messages.batches.retrieve(batch_id)
        stats = {
            "id": batch.id,
            "status": batch.processing_status,
            "created_at": batch.created_at,
            "request_counts": batch.request_counts,
            "total_requests": sum(batch.request_counts.values()) if batch.request_counts else 0,
        }
        # These fields are only populated once processing has finished.
        if batch.ended_at:
            stats["ended_at"] = batch.ended_at
        if batch.results_url:
            stats["results_url"] = batch.results_url
        return stats


# Usage
manager = BatchManager(client)

# List active batches
active_batches = manager.list_active_batches()
print(f"Active batches: {len(active_batches)}")

# Get stats for a specific batch
if active_batches:
    batch_stats = manager.get_batch_stats(active_batches[0].id)
    print(f"Batch stats: {batch_stats}")
print(f"Batch stats: {batch_stats}")

import asyncio
from anthropic import AsyncAnthropic


async def async_batch_processing():
    """Create a small batch with the async client and poll it to completion."""
    client = AsyncAnthropic()

    # Five title-generation requests, each tagged with its own custom_id.
    batch_requests = [
        {
            "custom_id": f"async-request-{i}",
            "method": "POST",
            "url": "/v1/messages",
            "body": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 512,
                "messages": [
                    {"role": "user", "content": f"Generate a creative title for article #{i}"}
                ],
            },
        }
        for i in range(5)
    ]

    # Create batch
    batch = await client.messages.batches.create(requests=batch_requests)
    print(f"Async batch created: {batch.id}")

    # Poll until the batch reaches a terminal status (no overall timeout here;
    # see wait_for_batch_completion for a bounded variant).
    while True:
        batch_status = await client.messages.batches.retrieve(batch.id)
        print(f"Status: {batch_status.processing_status}")
        if batch_status.processing_status in ["completed", "failed", "cancelled"]:
            break
        await asyncio.sleep(5)
    return batch_status


# Run async batch
batch_result = asyncio.run(async_batch_processing())
print(f"Final batch status: {batch_result.processing_status}")


def robust_batch_creation(requests: List[Dict[str, Any]], max_retries: int = 3) -> Optional[Any]:
    """Create a batch with error handling and exponential-backoff retries.

    Args:
        requests: Batch request dicts to submit.
        max_retries: Maximum number of creation attempts.

    Returns:
        The created batch, or None if every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            batch = client.messages.batches.create(requests=requests)
            print(f"Batch created successfully on attempt {attempt + 1}")
            return batch
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
            else:
                print("All attempts failed")
    return None


# Usage with error handling
batch_requests = [
    {
        "custom_id": "safe-request-1",
        "method": "POST",
        "url": "/v1/messages",
        "body": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": "Hello world"}
            ],
        },
    }
]
batch = robust_batch_creation(batch_requests)
if batch:
    print(f"Batch created: {batch.id}")
else:
    print("Failed to create batch after all retries")