pypi-anthropic

Description
The official Python library for the Anthropic API
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/pypi-anthropic@0.66.0

docs/batching.md

# Message Batching

Message batching allows multiple message requests to be processed together asynchronously, reducing cost and improving throughput for high-volume applications. It is ideal for bulk processing, data analysis, and other scenarios where real-time responses are not required.

## Capabilities

### Batch Operations

Create, retrieve, list, and cancel message batches for asynchronous processing of multiple requests.

```python { .api }
def create(
    requests: List[Dict[str, Any]],
    **kwargs
) -> Any

async def create(
    requests: List[Dict[str, Any]],
    **kwargs
) -> Any

def retrieve(batch_id: str, **kwargs) -> Any
async def retrieve(batch_id: str, **kwargs) -> Any

def list(**kwargs) -> Any
async def list(**kwargs) -> Any

def cancel(batch_id: str, **kwargs) -> Any
async def cancel(batch_id: str, **kwargs) -> Any
```

## Core Types

### Batch Types

The exact types for batching are part of the beta API and may vary. The following represents the general structure:

```python { .api }
class BatchRequest(TypedDict):
    custom_id: str
    method: Literal["POST"]
    url: str
    body: Dict[str, Any]

class Batch(TypedDict):
    id: str
    type: Literal["message_batch"]
    processing_status: str
    request_counts: Dict[str, int]
    ended_at: Optional[str]
    created_at: str
    expires_at: str
    archived_at: Optional[str]
    cancel_initiated_at: Optional[str]
    results_url: Optional[str]

class BatchResponse(TypedDict):
    custom_id: str
    result: Optional[Dict[str, Any]]
    error: Optional[Dict[str, Any]]
```
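
Because these are TypedDicts, the same shapes can be used to type-check payloads before submission. A minimal sketch, assuming the definition is declared locally as above (it is illustrative, not importable from the package):

```python
from typing import Any, Dict, Literal, TypedDict

# Illustrative local declaration mirroring the structure documented above;
# not an import from the anthropic package.
class BatchRequest(TypedDict):
    custom_id: str
    method: Literal["POST"]
    url: str
    body: Dict[str, Any]

# A static type checker (e.g. mypy) will flag missing or misspelled keys here.
request: BatchRequest = {
    "custom_id": "example-1",
    "method": "POST",
    "url": "/v1/messages",
    "body": {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 256,
        "messages": [{"role": "user", "content": "Hello"}],
    },
}
```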

## Usage Examples

### Basic Batch Creation

```python
from anthropic import Anthropic

client = Anthropic()

# Prepare batch requests
batch_requests = [
    {
        "custom_id": "request-1",
        "method": "POST",
        "url": "/v1/messages",
        "body": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": "What is the capital of France?"}
            ]
        }
    },
    {
        "custom_id": "request-2",
        "method": "POST",
        "url": "/v1/messages",
        "body": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": "What is the capital of Germany?"}
            ]
        }
    },
    {
        "custom_id": "request-3",
        "method": "POST",
        "url": "/v1/messages",
        "body": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": "What is the capital of Italy?"}
            ]
        }
    }
]

# Create the batch
batch = client.messages.batches.create(requests=batch_requests)

print(f"Batch created with ID: {batch.id}")
print(f"Status: {batch.processing_status}")
```

### Monitoring Batch Progress

```python
import time
from typing import Any

def wait_for_batch_completion(client: Anthropic, batch_id: str, max_wait_time: int = 300) -> Any:
    """Wait for a batch to complete processing."""
    start_time = time.time()

    while time.time() - start_time < max_wait_time:
        batch = client.messages.batches.retrieve(batch_id)

        print(f"Batch {batch_id} status: {batch.processing_status}")

        if batch.processing_status in ["completed", "failed", "cancelled"]:
            return batch
        elif batch.processing_status == "in_progress":
            print(f"Progress: {batch.request_counts}")

        time.sleep(10)  # Check every 10 seconds

    raise TimeoutError(f"Batch {batch_id} did not complete within {max_wait_time} seconds")

# Usage
try:
    completed_batch = wait_for_batch_completion(client, batch.id)
    print(f"Batch completed: {completed_batch.processing_status}")

    if completed_batch.results_url:
        print(f"Results available at: {completed_batch.results_url}")

except TimeoutError as e:
    print(f"Timeout error: {e}")
```
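
Once a batch has ended, `results_url` points to a JSON Lines file in which each line matches the `BatchResponse` shape from Core Types. Here is a minimal sketch of fetching and parsing it, assuming the file is served to requests carrying the standard API key and version headers (httpx, which the SDK itself depends on, is used directly):

```python
import json
import os

import httpx

def fetch_batch_results(results_url: str) -> list:
    # Assumption: the results file accepts the same authentication
    # headers as regular Anthropic API calls.
    response = httpx.get(
        results_url,
        headers={
            "x-api-key": os.environ["ANTHROPIC_API_KEY"],
            "anthropic-version": "2023-06-01",
        },
    )
    response.raise_for_status()

    # One JSON object per line: {"custom_id": ..., "result": ..., "error": ...}
    return [json.loads(line) for line in response.text.splitlines() if line]

if completed_batch.results_url:
    for record in fetch_batch_results(completed_batch.results_url):
        print(record["custom_id"], "error" if record.get("error") else "ok")
```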

### Bulk Text Processing

```python
from typing import List

def process_documents_in_batch(documents: List[str], task: str) -> str:
    """Process multiple documents in a single batch and return the batch ID."""
    batch_requests = []

    for i, document in enumerate(documents):
        request = {
            "custom_id": f"doc-{i}",
            "method": "POST",
            "url": "/v1/messages",
            "body": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 2048,
                "messages": [
                    {
                        "role": "user",
                        "content": f"{task}\n\nDocument:\n{document}"
                    }
                ]
            }
        }
        batch_requests.append(request)

    # Create batch
    batch = client.messages.batches.create(requests=batch_requests)

    # Wait for completion
    completed_batch = wait_for_batch_completion(client, batch.id)

    return completed_batch.id

# Example usage
documents = [
    "Annual report showing 15% growth in Q4...",
    "Marketing campaign results indicate 23% increase...",
    "Customer feedback survey reveals high satisfaction..."
]

batch_id = process_documents_in_batch(
    documents,
    "Please summarize the key points from this document in 3 bullet points."
)

print(f"Document summarization batch completed: {batch_id}")
```
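
For document sets too large to submit comfortably as one batch, the work can be split across several batches and submitted up front, so the batches process in parallel rather than one after another. A sketch with a hypothetical `chunk_size` (check the service's current per-batch limits before choosing one):

```python
def submit_documents_in_chunks(documents: List[str], task: str, chunk_size: int = 100) -> List[str]:
    """Submit one batch per chunk without waiting, returning all batch IDs."""
    batch_ids = []

    for start in range(0, len(documents), chunk_size):
        chunk = documents[start:start + chunk_size]
        requests = [
            {
                "custom_id": f"doc-{start + i}",  # IDs stay unique across chunks
                "method": "POST",
                "url": "/v1/messages",
                "body": {
                    "model": "claude-sonnet-4-20250514",
                    "max_tokens": 2048,
                    "messages": [
                        {"role": "user", "content": f"{task}\n\nDocument:\n{doc}"}
                    ]
                }
            }
            for i, doc in enumerate(chunk)
        ]
        batch_ids.append(client.messages.batches.create(requests=requests).id)

    return batch_ids
```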

### Data Analysis Batch

```python
def analyze_customer_feedback_batch(feedback_list: List[str]) -> str:
    """Analyze customer feedback in batch for sentiment and themes."""
    batch_requests = []

    for i, feedback in enumerate(feedback_list):
        request = {
            "custom_id": f"feedback-{i}",
            "method": "POST",
            "url": "/v1/messages",
            "body": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 512,
                "system": "You are an expert at analyzing customer feedback. Provide sentiment (positive/negative/neutral) and key themes.",
                "messages": [
                    {
                        "role": "user",
                        "content": f"Analyze this customer feedback:\n\n{feedback}"
                    }
                ]
            }
        }
        batch_requests.append(request)

    batch = client.messages.batches.create(requests=batch_requests)
    return batch.id

# Usage
customer_feedback = [
    "The product is amazing, but delivery was slow.",
    "Great customer service, very helpful staff.",
    "Product quality is poor, disappointed with purchase.",
    "Fast shipping, product exactly as described."
]

analysis_batch_id = analyze_customer_feedback_batch(customer_feedback)
print(f"Feedback analysis batch started: {analysis_batch_id}")
```

### Batch with Different Models

```python
def multi_model_comparison_batch(prompt: str) -> str:
    """Compare responses from different models in a single batch."""
    models = ["claude-3-5-haiku-20241022", "claude-sonnet-4-20250514"]

    batch_requests = []

    for model in models:
        request = {
            "custom_id": f"model-{model}",
            "method": "POST",
            "url": "/v1/messages",
            "body": {
                "model": model,
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": prompt}
                ]
            }
        }
        batch_requests.append(request)

    batch = client.messages.batches.create(requests=batch_requests)
    return batch.id

# Usage
comparison_batch_id = multi_model_comparison_batch(
    "Explain quantum computing in simple terms."
)
print(f"Model comparison batch started: {comparison_batch_id}")
```

### Batch Management

```python
from typing import Any, Dict, List

class BatchManager:
    def __init__(self, client: Anthropic):
        self.client = client

    def list_active_batches(self) -> List[Any]:
        """List all active batches."""
        batches = self.client.messages.batches.list()
        active_batches = [
            batch for batch in batches
            if batch.processing_status in ["validating", "in_progress"]
        ]
        return active_batches

    def cancel_batch(self, batch_id: str) -> bool:
        """Cancel a batch if it's still processing."""
        try:
            batch = self.client.messages.batches.retrieve(batch_id)

            if batch.processing_status in ["validating", "in_progress"]:
                self.client.messages.batches.cancel(batch_id)
                print(f"Batch {batch_id} cancellation initiated")
                return True
            else:
                print(f"Cannot cancel batch {batch_id} - status: {batch.processing_status}")
                return False

        except Exception as e:
            print(f"Error cancelling batch {batch_id}: {e}")
            return False

    def get_batch_stats(self, batch_id: str) -> Dict[str, Any]:
        """Get detailed statistics for a batch."""
        batch = self.client.messages.batches.retrieve(batch_id)

        stats = {
            "id": batch.id,
            "status": batch.processing_status,
            "created_at": batch.created_at,
            "request_counts": batch.request_counts,
            "total_requests": sum(batch.request_counts.values()) if batch.request_counts else 0
        }

        if batch.ended_at:
            stats["ended_at"] = batch.ended_at

        if batch.results_url:
            stats["results_url"] = batch.results_url

        return stats

# Usage
manager = BatchManager(client)

# List active batches
active_batches = manager.list_active_batches()
print(f"Active batches: {len(active_batches)}")

# Get stats for a specific batch
if active_batches:
    batch_stats = manager.get_batch_stats(active_batches[0].id)
    print(f"Batch stats: {batch_stats}")
```

### Async Batch Processing

```python
import asyncio
from anthropic import AsyncAnthropic

async def async_batch_processing():
    client = AsyncAnthropic()

    # Create batch requests
    batch_requests = [
        {
            "custom_id": f"async-request-{i}",
            "method": "POST",
            "url": "/v1/messages",
            "body": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 512,
                "messages": [
                    {"role": "user", "content": f"Generate a creative title for article #{i}"}
                ]
            }
        }
        for i in range(5)
    ]

    # Create batch
    batch = await client.messages.batches.create(requests=batch_requests)
    print(f"Async batch created: {batch.id}")

    # Monitor progress
    while True:
        batch_status = await client.messages.batches.retrieve(batch.id)
        print(f"Status: {batch_status.processing_status}")

        if batch_status.processing_status in ["completed", "failed", "cancelled"]:
            break

        await asyncio.sleep(5)

    return batch_status

# Run async batch
batch_result = asyncio.run(async_batch_processing())
print(f"Final batch status: {batch_result.processing_status}")
```
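
The async client pays off when several batches are in flight at once, since they can be polled concurrently instead of one at a time. A minimal sketch using `asyncio.gather` (the batch IDs at the end are hypothetical placeholders):

```python
import asyncio
from typing import Dict, List

from anthropic import AsyncAnthropic

async def poll_until_terminal(client: AsyncAnthropic, batch_id: str) -> str:
    # Poll one batch until it reaches a terminal status.
    while True:
        batch = await client.messages.batches.retrieve(batch_id)
        if batch.processing_status in ["completed", "failed", "cancelled"]:
            return batch.processing_status
        await asyncio.sleep(5)

async def monitor_batches(batch_ids: List[str]) -> Dict[str, str]:
    client = AsyncAnthropic()
    # All batches are polled in parallel on one event loop.
    statuses = await asyncio.gather(
        *(poll_until_terminal(client, batch_id) for batch_id in batch_ids)
    )
    return dict(zip(batch_ids, statuses))

# Hypothetical IDs for illustration:
# print(asyncio.run(monitor_batches(["msgbatch_abc", "msgbatch_def"])))
```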

### Error Handling in Batches

```python
import time
from typing import Any, Dict, List, Optional

def robust_batch_creation(requests: List[Dict[str, Any]], max_retries: int = 3) -> Optional[Any]:
    """Create a batch with error handling and retries."""
    for attempt in range(max_retries):
        try:
            batch = client.messages.batches.create(requests=requests)
            print(f"Batch created successfully on attempt {attempt + 1}")
            return batch

        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")

            if attempt < max_retries - 1:
                # Wait before retrying
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                print("All attempts failed")

    return None

# Usage with error handling
batch_requests = [
    {
        "custom_id": "safe-request-1",
        "method": "POST",
        "url": "/v1/messages",
        "body": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": "Hello world"}
            ]
        }
    }
]

batch = robust_batch_creation(batch_requests)
if batch:
    print(f"Batch created: {batch.id}")
else:
    print("Failed to create batch after all retries")
```
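
Retries help with transient failures, but a malformed payload will fail on every attempt, so it is worth validating before submitting. A small pre-flight check (a hypothetical helper, not part of the SDK):

```python
def validate_batch_requests(requests: List[Dict[str, Any]]) -> List[str]:
    """Return a list of problems found; an empty list means the batch looks well-formed."""
    problems = []
    seen_ids = set()

    for i, req in enumerate(requests):
        custom_id = req.get("custom_id")
        if not custom_id:
            problems.append(f"request {i}: missing custom_id")
        elif custom_id in seen_ids:
            problems.append(f"request {i}: duplicate custom_id {custom_id!r}")
        seen_ids.add(custom_id)

        body = req.get("body", {})
        for field in ("model", "max_tokens", "messages"):
            if field not in body:
                problems.append(f"request {i}: body missing {field!r}")

    return problems

issues = validate_batch_requests(batch_requests)
if issues:
    print("Fix before submitting:", issues)
else:
    batch = robust_batch_creation(batch_requests)
```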