docs
0
# Batch Processing
1
2
Submit batch requests for asynchronous processing of multiple API calls. Process large volumes of requests cost-effectively, with a 50% discount on costs and a 24-hour completion window.
3
4
## Capabilities
5
6
### Create Batch
7
8
Create a batch request for async processing.
9
10
```python { .api }
11
def create(
12
self,
13
*,
14
completion_window: str,
15
endpoint: str,
16
input_file_id: str,
17
metadata: dict[str, str] | Omit = omit,
18
output_expires_after: dict | Omit = omit,
19
extra_headers: dict[str, str] | None = None,
20
extra_query: dict[str, object] | None = None,
21
extra_body: dict[str, object] | None = None,
22
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
23
) -> Batch:
24
"""
25
Create a batch request for asynchronous processing.
26
27
Args:
28
completion_window: Time frame for completion. Currently only "24h".
29
30
endpoint: API endpoint for batch requests. Supported:
31
- "/v1/responses": Response API (structured outputs)
32
- "/v1/chat/completions": Chat completions
33
- "/v1/embeddings": Embeddings (max 50,000 inputs per batch)
34
- "/v1/completions": Text completions
35
- "/v1/moderations": Moderations
36
37
input_file_id: ID of uploaded JSONL file with requests.
38
Each line: {"custom_id": "...", "method": "POST", "url": "...", "body": {...}}
39
40
metadata: Optional key-value pairs (max 16). Keys max 64 chars, values max 512 chars.
41
42
output_expires_after: Expiration policy for output and error files.
43
Dict with keys:
44
- anchor: "created_at" (file creation time)
45
- seconds: int (3600-2592000, i.e., 1 hour to 30 days)
46
47
extra_headers: Additional HTTP headers.
48
extra_query: Additional query parameters.
49
extra_body: Additional JSON fields.
50
timeout: Request timeout in seconds.
51
52
Returns:
53
Batch: Created batch with status "validating".
54
55
Raises:
56
BadRequestError: Invalid file format or endpoint
57
NotFoundError: Input file not found
58
"""
59
```
60
61
Usage examples:
62
63
```python
64
from openai import OpenAI
65
import json
66
67
client = OpenAI()
68
69
# Create batch input file (JSONL format)
70
batch_requests = [
71
{
72
"custom_id": "request-1",
73
"method": "POST",
74
"url": "/v1/chat/completions",
75
"body": {
76
"model": "gpt-3.5-turbo",
77
"messages": [{"role": "user", "content": "What is 2+2?"}]
78
}
79
},
80
{
81
"custom_id": "request-2",
82
"method": "POST",
83
"url": "/v1/chat/completions",
84
"body": {
85
"model": "gpt-3.5-turbo",
86
"messages": [{"role": "user", "content": "What is the capital of France?"}]
87
}
88
}
89
]
90
91
# Write to JSONL file
92
with open("batch_requests.jsonl", "w") as f:
93
for request in batch_requests:
94
f.write(json.dumps(request) + "\n")
95
96
# Upload file
97
with open("batch_requests.jsonl", "rb") as f:
98
batch_file = client.files.create(file=f, purpose="batch")
99
100
# Create batch
101
batch = client.batches.create(
102
input_file_id=batch_file.id,
103
endpoint="/v1/chat/completions",
104
completion_window="24h"
105
)
106
107
print(f"Batch ID: {batch.id}")
108
print(f"Status: {batch.status}")
109
110
# With metadata
111
batch = client.batches.create(
112
input_file_id=batch_file.id,
113
endpoint="/v1/chat/completions",
114
completion_window="24h",
115
metadata={
116
"experiment": "test-run-1",
117
"dataset": "evaluation-set"
118
}
119
)
120
121
# With output file expiration (e.g., 7 days)
122
batch = client.batches.create(
123
input_file_id=batch_file.id,
124
endpoint="/v1/chat/completions",
125
completion_window="24h",
126
output_expires_after={
127
"anchor": "created_at",
128
"seconds": 604800 # 7 days
129
}
130
)
131
132
# Embeddings batch
133
embeddings_requests = [
134
{
135
"custom_id": "embed-1",
136
"method": "POST",
137
"url": "/v1/embeddings",
138
"body": {
139
"model": "text-embedding-3-small",
140
"input": "Sample text 1"
141
}
142
},
143
{
144
"custom_id": "embed-2",
145
"method": "POST",
146
"url": "/v1/embeddings",
147
"body": {
148
"model": "text-embedding-3-small",
149
"input": "Sample text 2"
150
}
151
}
152
]
153
154
with open("embed_requests.jsonl", "w") as f:
155
for request in embeddings_requests:
156
f.write(json.dumps(request) + "\n")
157
158
with open("embed_requests.jsonl", "rb") as f:
159
embed_file = client.files.create(file=f, purpose="batch")
160
161
batch = client.batches.create(
162
input_file_id=embed_file.id,
163
endpoint="/v1/embeddings",
164
completion_window="24h"
165
)
166
```
167
168
### Retrieve Batch
169
170
Get batch status and results.
171
172
```python { .api }
173
def retrieve(
174
self,
175
batch_id: str,
176
*,
177
extra_headers: dict[str, str] | None = None,
178
extra_query: dict[str, object] | None = None,
179
extra_body: dict[str, object] | None = None,
180
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
181
) -> Batch:
182
"""
183
Retrieve batch details and status.
184
185
Args:
186
batch_id: The ID of the batch.
187
extra_headers: Additional HTTP headers.
188
extra_query: Additional query parameters.
189
extra_body: Additional JSON fields.
190
timeout: Request timeout in seconds.
191
192
Returns:
193
Batch: Batch details with current status.
194
195
Raises:
196
NotFoundError: Batch not found
197
"""
198
```
199
200
Usage example:
201
202
```python
203
# Check batch status
204
batch = client.batches.retrieve("batch_abc123")
205
206
print(f"Status: {batch.status}")
207
print(f"Total: {batch.request_counts.total}")
208
print(f"Completed: {batch.request_counts.completed}")
209
print(f"Failed: {batch.request_counts.failed}")
210
211
if batch.status == "completed":
212
# Download results
213
if batch.output_file_id:
214
result_content = client.files.content(batch.output_file_id)
215
216
# Parse JSONL results
217
import json
218
results = []
219
for line in result_content.text.split("\n"):
220
if line.strip():
221
results.append(json.loads(line))
222
223
for result in results:
224
custom_id = result["custom_id"]
225
response = result["response"]
226
print(f"{custom_id}: {response}")
227
228
# Check error file if any failed
229
if batch.error_file_id:
230
error_content = client.files.content(batch.error_file_id)
231
print("Errors:", error_content.text)
232
```
233
234
### List Batches
235
236
List batch requests with pagination.
237
238
```python { .api }
239
def list(
240
self,
241
*,
242
after: str | Omit = omit,
243
limit: int | Omit = omit,
244
extra_headers: dict[str, str] | None = None,
245
extra_query: dict[str, object] | None = None,
246
extra_body: dict[str, object] | None = None,
247
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
248
) -> SyncCursorPage[Batch]:
249
"""
250
List batches with pagination.
251
252
Args:
253
after: Cursor for pagination. Return batches after this batch ID.
254
limit: Number of batches to retrieve (max 100). Default 20.
255
extra_headers: Additional HTTP headers.
256
extra_query: Additional query parameters.
257
extra_body: Additional JSON fields.
258
timeout: Request timeout in seconds.
259
260
Returns:
261
SyncCursorPage[Batch]: Paginated list of batches.
262
"""
263
```
264
265
Usage example:
266
267
```python
268
# List all batches
269
batches = client.batches.list()
270
271
for batch in batches:
272
print(f"{batch.id}: {batch.status}")
273
274
# Pagination
275
page1 = client.batches.list(limit=10)
276
page2 = client.batches.list(limit=10, after=page1.data[-1].id)
277
278
# Filter by status
279
completed_batches = [
280
b for b in client.batches.list()
281
if b.status == "completed"
282
]
283
```
284
285
### Cancel Batch
286
287
Cancel an in-progress batch.
288
289
```python { .api }
290
def cancel(
291
self,
292
batch_id: str,
293
*,
294
extra_headers: dict[str, str] | None = None,
295
extra_query: dict[str, object] | None = None,
296
extra_body: dict[str, object] | None = None,
297
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
298
) -> Batch:
299
"""
300
Cancel a batch that is in progress.
301
302
Args:
303
batch_id: The ID of the batch to cancel.
304
extra_headers: Additional HTTP headers.
305
extra_query: Additional query parameters.
306
extra_body: Additional JSON fields.
307
timeout: Request timeout in seconds.
308
309
Returns:
310
Batch: Batch with status "cancelling" or "cancelled".
311
312
Raises:
313
NotFoundError: Batch not found
314
BadRequestError: Batch not in cancellable state
315
"""
316
```
317
318
Usage example:
319
320
```python
321
# Cancel batch
322
batch = client.batches.cancel("batch_abc123")
323
324
print(f"Status: {batch.status}") # "cancelling" or "cancelled"
325
```
326
327
## Types
328
329
```python { .api }
330
from typing import Literal
331
from pydantic import BaseModel
332
333
class Batch(BaseModel):
334
"""Batch request."""
335
id: str
336
completion_window: str
337
created_at: int
338
endpoint: str
339
input_file_id: str
340
object: Literal["batch"]
341
status: Literal[
342
"validating", "failed", "in_progress",
343
"finalizing", "completed", "expired", "cancelling", "cancelled"
344
]
345
cancelled_at: int | None
346
cancelling_at: int | None
347
completed_at: int | None
348
error_file_id: str | None
349
errors: dict | None
350
expired_at: int | None
351
expires_at: int | None
352
failed_at: int | None
353
finalizing_at: int | None
354
in_progress_at: int | None
355
metadata: dict[str, str] | None
356
output_file_id: str | None
357
request_counts: BatchRequestCounts
358
359
class BatchRequestCounts(BaseModel):
360
"""Request count statistics."""
361
completed: int
362
failed: int
363
total: int
364
365
class OutputExpiresAfter(BaseModel):
366
"""Expiration policy for batch output files."""
367
anchor: Literal["created_at"] # File creation time anchor
368
seconds: int # Expiration time in seconds (3600-2592000)
369
370
# Pagination
371
class SyncCursorPage[T](BaseModel):
372
data: list[T]
373
object: str
374
first_id: str | None
375
last_id: str | None
376
has_more: bool
377
def __iter__(self) -> Iterator[T]: ...
378
```
379
380
## Input File Format
381
382
Batch input JSONL format:
383
384
```jsonl
385
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hello!"}]}}
386
{"custom_id": "request-2", "method": "POST", "url": "/v1/embeddings", "body": {"model": "text-embedding-3-small", "input": "Hello world"}}
387
```
388
389
## Output File Format
390
391
Batch output JSONL format:
392
393
```jsonl
394
{"id": "batch_req_abc", "custom_id": "request-1", "response": {"status_code": 200, "request_id": "req_xyz", "body": {"id": "chatcmpl-123", "object": "chat.completion", ...}}, "error": null}
395
{"id": "batch_req_def", "custom_id": "request-2", "response": {"status_code": 200, "request_id": "req_uvw", "body": {"object": "list", "data": [...]}}, "error": null}
396
```
397
398
## Best Practices
399
400
```python
401
from openai import OpenAI
402
import json
403
import time
404
405
client = OpenAI()
406
407
# 1. Monitor batch progress
408
def wait_for_batch(batch_id: str, poll_interval: int = 60):
409
"""Wait for batch to complete."""
410
while True:
411
batch = client.batches.retrieve(batch_id)
412
413
if batch.status in ["completed", "failed", "cancelled", "expired"]:
414
return batch
415
416
print(f"Status: {batch.status}")
417
print(f"Progress: {batch.request_counts.completed}/{batch.request_counts.total}")
418
time.sleep(poll_interval)
419
420
# 2. Process results efficiently
421
def process_batch_results(batch_id: str):
422
"""Download and process batch results."""
423
batch = client.batches.retrieve(batch_id)
424
425
if batch.status != "completed":
426
raise Exception(f"Batch not completed: {batch.status}")
427
428
# Download results
429
result_content = client.files.content(batch.output_file_id)
430
431
results = {}
432
for line in result_content.text.split("\n"):
433
if line.strip():
434
result = json.loads(line)
435
custom_id = result["custom_id"]
436
response = result["response"]["body"]
437
results[custom_id] = response
438
439
return results
440
441
# 3. Handle errors
442
def get_batch_errors(batch_id: str):
443
"""Get failed requests from batch."""
444
batch = client.batches.retrieve(batch_id)
445
446
if not batch.error_file_id:
447
return []
448
449
error_content = client.files.content(batch.error_file_id)
450
451
errors = []
452
for line in error_content.text.split("\n"):
453
if line.strip():
454
errors.append(json.loads(line))
455
456
return errors
457
458
# 4. Retry failed requests
459
def retry_failed_requests(batch_id: str):
460
"""Create new batch with failed requests."""
461
errors = get_batch_errors(batch_id)
462
463
if not errors:
464
return None
465
466
# Create retry input file
467
with open("retry_requests.jsonl", "w") as f:
468
for error in errors:
469
# Reconstruct request from error
470
request = {
471
"custom_id": error["custom_id"],
472
"method": error["method"],
473
"url": error["url"],
474
"body": error["body"]
475
}
476
f.write(json.dumps(request) + "\n")
477
478
# Upload and create new batch
479
with open("retry_requests.jsonl", "rb") as f:
480
retry_file = client.files.create(file=f, purpose="batch")
481
482
return client.batches.create(
483
input_file_id=retry_file.id,
484
endpoint=errors[0]["url"],
485
completion_window="24h",
486
metadata={"retry_of": batch_id}
487
)
488
489
# Complete workflow
490
# 1. Create batch
491
batch = create_batch()
492
493
# 2. Wait for completion
494
completed_batch = wait_for_batch(batch.id)
495
496
# 3. Process results
497
if completed_batch.status == "completed":
498
results = process_batch_results(batch.id)
499
print(f"Processed {len(results)} results")
500
501
# Check for failures
502
if completed_batch.request_counts.failed > 0:
503
retry_batch = retry_failed_requests(batch.id)
504
print(f"Created retry batch: {retry_batch.id}")
505
```
506
507
## Async Usage
508
509
```python
510
import asyncio
511
from openai import AsyncOpenAI
512
513
async def create_batch():
514
client = AsyncOpenAI()
515
516
batch = await client.batches.create(
517
input_file_id="file-abc123",
518
endpoint="/v1/chat/completions",
519
completion_window="24h"
520
)
521
522
return batch.id
523
524
batch_id = asyncio.run(create_batch())
525
```
526