pypi-openai

Description
Official Python library for the OpenAI API, providing chat completions, embeddings, audio, images, and more.
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/pypi-openai@1.106.0

batch-operations.md docs/

# Batch Operations

Process large volumes of requests efficiently using the batch API for cost-effective bulk operations with 24-hour processing windows.
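
Each line of a batch input file is a single JSON-encoded request carrying a `custom_id`, an HTTP `method`, a target `url`, and a request `body`. A single input line looks like this (the `custom_id` and body values are illustrative):

```json
{"custom_id": "request-0", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "What is machine learning?"}], "max_tokens": 100}}
```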

## Capabilities

### Batch Creation and Management

Create and manage batch processing jobs for efficient bulk API operations.

```python { .api }
def create(
    self,
    *,
    completion_window: Literal["24h"],
    endpoint: Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"],
    input_file_id: str,
    metadata: Optional[Dict[str, str]] | NotGiven = NOT_GIVEN,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN
) -> Batch: ...

def list(
    self,
    *,
    after: str | NotGiven = NOT_GIVEN,
    limit: int | NotGiven = NOT_GIVEN,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN
) -> SyncPage[Batch]: ...

def retrieve(
    self,
    batch_id: str,
    *,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN
) -> Batch: ...

def cancel(
    self,
    batch_id: str,
    *,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN
) -> Batch: ...
```

Usage examples:

```python
from openai import OpenAI
import json

client = OpenAI()

# Prepare batch requests file
def create_batch_requests_file(requests_data, output_file):
    """Create JSONL file with batch requests"""
    with open(output_file, 'w') as f:
        for i, request_data in enumerate(requests_data):
            batch_request = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": request_data
            }
            f.write(json.dumps(batch_request) + '\n')

    print(f"Created batch requests file: {output_file}")
    return output_file

# Example chat completion requests
chat_requests = [
    {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": "What is machine learning?"}],
        "max_tokens": 100
    },
    {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": "Explain neural networks"}],
        "max_tokens": 100
    },
    {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": "What is deep learning?"}],
        "max_tokens": 100
    }
] * 100  # 300 requests total

# Create requests file
requests_file = create_batch_requests_file(chat_requests, "batch_requests.jsonl")

# Upload requests file
with open(requests_file, "rb") as f:
    input_file = client.files.create(
        file=f,
        purpose="batch"
    )

print(f"Uploaded input file: {input_file.id}")

# Create batch job
batch = client.batches.create(
    input_file_id=input_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={"description": "ML questions batch", "project": "education"}
)

print(f"Created batch: {batch.id}")
print(f"Status: {batch.status}")
print(f"Endpoint: {batch.endpoint}")

# List batches
recent_batches = client.batches.list(limit=10)

print("Recent batches:")
for batch_item in recent_batches:
    print(f"  {batch_item.id}: {batch_item.status} - {batch_item.endpoint}")

# Retrieve specific batch
batch_info = client.batches.retrieve(batch.id)

print("Batch details:")
print(f"  ID: {batch_info.id}")
print(f"  Status: {batch_info.status}")
print(f"  Request counts: {batch_info.request_counts}")
print(f"  Created: {batch_info.created_at}")
print(f"  Metadata: {batch_info.metadata}")
```

### Batch Monitoring and Results

Monitor batch processing progress and retrieve results when completed.

Usage examples:

```python
import time

def monitor_batch_progress(batch_id: str):
    """Monitor batch until completion"""
    print(f"Monitoring batch {batch_id}...")

    while True:
        batch = client.batches.retrieve(batch_id)

        status = batch.status
        request_counts = batch.request_counts

        print(f"Status: {status}")

        if request_counts:
            total = request_counts.total
            completed = request_counts.completed
            failed = request_counts.failed

            if total > 0:
                progress = (completed + failed) / total * 100
                print(f"Progress: {progress:.1f}% ({completed} completed, {failed} failed of {total})")

        if status in ["completed", "failed", "cancelled", "expired"]:
            print(f"Batch finished with status: {status}")
            return batch

        # Wait before checking again
        time.sleep(30)

def process_batch_results(results_file: str):
    """Process and analyze batch results"""
    results = []
    errors = []

    with open(results_file, 'r') as f:
        for line in f:
            result = json.loads(line)

            if result.get('error'):
                errors.append(result)
            else:
                results.append(result)

    print(f"Processed {len(results)} successful results")
    print(f"Found {len(errors)} errors")

    # Analyze successful results
    for result in results[:5]:  # Show first 5
        custom_id = result.get('custom_id')
        response = result.get('response', {})
        body = response.get('body', {})

        if 'choices' in body:
            content = body['choices'][0]['message']['content']
            print(f"{custom_id}: {content[:100]}...")

    # Show errors if any
    for error in errors[:3]:  # Show first 3 errors
        custom_id = error.get('custom_id')
        error_info = error.get('error', {})
        print(f"Error in {custom_id}: {error_info}")

    return results, errors

# Monitor batch
final_batch = monitor_batch_progress(batch.id)

# Download results when completed
if final_batch.status == "completed":
    # Get output file
    output_file_id = final_batch.output_file_id

    if output_file_id:
        # Download results
        output_content = client.files.content(output_file_id)

        # Save results file
        with open("batch_results.jsonl", "wb") as f:
            f.write(output_content.content)

        print("Results downloaded to batch_results.jsonl")

        # Process results
        process_batch_results("batch_results.jsonl")

# Get error file if available
if final_batch.error_file_id:
    error_content = client.files.content(final_batch.error_file_id)

    with open("batch_errors.jsonl", "wb") as f:
        f.write(error_content.content)

    print("Error details downloaded to batch_errors.jsonl")
```

### Embeddings Batch Processing

Process large volumes of text embeddings efficiently using batch operations.

Usage examples:

```python
# Prepare embeddings batch
def create_embeddings_batch(texts: list, model: str = "text-embedding-3-small"):
    """Create embeddings batch requests"""
    requests = []

    for i, text in enumerate(texts):
        request = {
            "custom_id": f"embedding-{i}",
            "method": "POST",
            "url": "/v1/embeddings",
            "body": {
                "model": model,
                "input": text
            }
        }
        requests.append(request)

    return requests

# Large text dataset for embeddings
texts = [
    "Machine learning is a method of data analysis",
    "Deep learning uses neural networks with multiple layers",
    "Natural language processing enables computers to understand text",
    "Computer vision allows machines to interpret visual information",
    "Reinforcement learning learns through trial and error"
] * 1000  # 5000 texts

# Create embeddings batch
embeddings_requests = create_embeddings_batch(texts)

# Save to file
embeddings_file = "embeddings_batch.jsonl"
with open(embeddings_file, 'w') as f:
    for request in embeddings_requests:
        f.write(json.dumps(request) + '\n')

# Upload and create batch
with open(embeddings_file, "rb") as f:
    embeddings_input_file = client.files.create(
        file=f,
        purpose="batch"
    )

embeddings_batch = client.batches.create(
    input_file_id=embeddings_input_file.id,
    endpoint="/v1/embeddings",
    completion_window="24h",
    metadata={"type": "embeddings", "count": str(len(texts))}
)

print(f"Embeddings batch created: {embeddings_batch.id}")

# Monitor embeddings batch
def monitor_embeddings_batch(batch_id: str):
    """Monitor embeddings batch with specific processing"""
    final_batch = monitor_batch_progress(batch_id)

    if final_batch.status == "completed":
        # Download embeddings
        output_content = client.files.content(final_batch.output_file_id)

        # Process embeddings results
        embeddings_data = []

        with open("temp_embeddings.jsonl", "wb") as f:
            f.write(output_content.content)

        with open("temp_embeddings.jsonl", "r") as f:
            for line in f:
                result = json.loads(line)

                if not result.get('error'):
                    response = result['response']['body']
                    embedding = response['data'][0]['embedding']
                    custom_id = result['custom_id']

                    embeddings_data.append({
                        'id': custom_id,
                        'embedding': embedding,
                        'text': texts[int(custom_id.split('-')[1])]
                    })

        print(f"Processed {len(embeddings_data)} embeddings")

        # Save processed embeddings
        with open("processed_embeddings.json", "w") as f:
            json.dump(embeddings_data, f)

        return embeddings_data

# Process when complete
# embeddings_results = monitor_embeddings_batch(embeddings_batch.id)
```

### Legacy Completions Batch Processing

Handle batch processing for legacy text completion models.

Usage examples:

```python
# Create completions batch requests
def create_completions_batch(prompts: list, model: str = "gpt-3.5-turbo-instruct"):
    """Create text completions batch requests"""
    requests = []

    for i, prompt in enumerate(prompts):
        request = {
            "custom_id": f"completion-{i}",
            "method": "POST",
            "url": "/v1/completions",
            "body": {
                "model": model,
                "prompt": prompt,
                "max_tokens": 150,
                "temperature": 0.7
            }
        }
        requests.append(request)

    return requests

# Completion prompts
completion_prompts = [
    "Once upon a time in a distant galaxy,",
    "The future of artificial intelligence is",
    "In the world of quantum computing,",
    "The benefits of renewable energy include",
    "The most important discovery in science was"
] * 200  # 1000 prompts

# Create completions batch
completions_requests = create_completions_batch(completion_prompts)

completions_file = "completions_batch.jsonl"
with open(completions_file, 'w') as f:
    for request in completions_requests:
        f.write(json.dumps(request) + '\n')

# Upload and create batch
with open(completions_file, "rb") as f:
    completions_input_file = client.files.create(
        file=f,
        purpose="batch"
    )

completions_batch = client.batches.create(
    input_file_id=completions_input_file.id,
    endpoint="/v1/completions",
    completion_window="24h",
    metadata={"type": "text_completions"}
)

print(f"Completions batch created: {completions_batch.id}")
```

### Advanced Batch Management

Implement advanced batch management with error handling, retry logic, and optimization.

Usage examples:

```python
from typing import List, Dict, Any
import os

class BatchManager:
    """Advanced batch processing management"""

    def __init__(self, client):
        self.client = client
        self.active_batches = {}

    def create_optimized_batch(self, requests: List[Dict], endpoint: str,
                               batch_size: int = 50000, **kwargs):
        """Create optimized batches with size limits"""
        if len(requests) <= batch_size:
            # Single batch
            return [self._create_single_batch(requests, endpoint, **kwargs)]

        # Split into multiple batches
        batches = []
        for i in range(0, len(requests), batch_size):
            batch_requests = requests[i:i + batch_size]
            batch = self._create_single_batch(
                batch_requests,
                endpoint,
                suffix=f"_part_{i//batch_size + 1}",
                **kwargs
            )
            batches.append(batch)

        return batches

    def _create_single_batch(self, requests: List[Dict], endpoint: str,
                             suffix: str = "", **kwargs):
        """Create single batch from requests"""
        # Create temporary file
        temp_file = f"batch_requests{suffix}.jsonl"

        with open(temp_file, 'w') as f:
            for request in requests:
                f.write(json.dumps(request) + '\n')

        # Upload file
        with open(temp_file, "rb") as f:
            input_file = self.client.files.create(
                file=f,
                purpose="batch"
            )

        # Create batch
        batch = self.client.batches.create(
            input_file_id=input_file.id,
            endpoint=endpoint,
            completion_window="24h",
            **kwargs
        )

        # Track batch
        self.active_batches[batch.id] = {
            'batch': batch,
            'temp_file': temp_file,
            'request_count': len(requests)
        }

        # Clean up temp file
        os.remove(temp_file)

        return batch

    def monitor_all_batches(self):
        """Monitor all active batches"""
        completed_batches = []

        for batch_id in list(self.active_batches.keys()):
            batch = self.client.batches.retrieve(batch_id)

            print(f"Batch {batch_id}: {batch.status}")

            if batch.status in ["completed", "failed", "cancelled", "expired"]:
                completed_batches.append(batch_id)

                # Process completed batch
                self._process_completed_batch(batch_id, batch)

        # Remove completed batches
        for batch_id in completed_batches:
            del self.active_batches[batch_id]

        return len(self.active_batches) == 0  # All done

    def _process_completed_batch(self, batch_id: str, batch):
        """Process completed batch results"""
        if batch.status == "completed" and batch.output_file_id:
            # Download results
            output_content = self.client.files.content(batch.output_file_id)

            results_file = f"results_{batch_id}.jsonl"
            with open(results_file, "wb") as f:
                f.write(output_content.content)

            print(f"Results saved to {results_file}")

        # Process errors if available
        if batch.error_file_id:
            error_content = self.client.files.content(batch.error_file_id)

            errors_file = f"errors_{batch_id}.jsonl"
            with open(errors_file, "wb") as f:
                f.write(error_content.content)

            print(f"Errors saved to {errors_file}")

    def cancel_all_batches(self):
        """Cancel all active batches"""
        for batch_id in self.active_batches:
            try:
                self.client.batches.cancel(batch_id)
                print(f"Cancelled batch {batch_id}")
            except Exception as e:
                print(f"Failed to cancel {batch_id}: {e}")

# Example usage
batch_manager = BatchManager(client)

# Create large batch job: wrap the raw chat request bodies in the
# batch request format (custom_id/method/url/body) the input file expects
large_requests = [
    {
        "custom_id": f"request-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": body
    }
    for i, body in enumerate(chat_requests * 500)  # 150,000 requests
]

# This will automatically split into multiple batches
batches = batch_manager.create_optimized_batch(
    large_requests,
    "/v1/chat/completions",
    metadata={"project": "large_scale_processing"}
)

print(f"Created {len(batches)} batches for {len(large_requests)} requests")

# Monitor progress
while not batch_manager.monitor_all_batches():
    print("Waiting for batches to complete...")
    time.sleep(60)

print("All batches completed!")

# Cost estimation
def estimate_batch_cost(requests: List[Dict], endpoint: str):
    """Estimate batch processing cost"""
    # Approximate token counts and costs
    costs = {
        "/v1/chat/completions": {"input": 0.0015, "output": 0.002},  # per 1K tokens
        "/v1/completions": {"input": 0.0015, "output": 0.002},
        "/v1/embeddings": {"input": 0.0001, "output": 0}
    }

    if endpoint not in costs:
        return "Unknown endpoint"

    # Simple estimation (would need actual token counting)
    avg_tokens_per_request = 100  # Rough estimate
    total_tokens = len(requests) * avg_tokens_per_request

    cost_per_1k = costs[endpoint]["input"]
    estimated_cost = (total_tokens / 1000) * cost_per_1k

    # 50% discount for batch API
    batch_cost = estimated_cost * 0.5

    return {
        "requests": len(requests),
        "estimated_tokens": total_tokens,
        "regular_cost": estimated_cost,
        "batch_cost": batch_cost,
        "savings": estimated_cost - batch_cost
    }

# Estimate cost
cost_estimate = estimate_batch_cost(large_requests, "/v1/chat/completions")
print(f"Cost estimate: {cost_estimate}")
```

## Types

### Core Response Types

```python { .api }
class Batch(BaseModel):
    id: str
    completion_window: str
    created_at: int
    endpoint: str
    input_file_id: str
    object: Literal["batch"]
    status: BatchStatus
    cancelled_at: Optional[int]
    cancelling_at: Optional[int]
    completed_at: Optional[int]
    error_file_id: Optional[str]
    errors: Optional[BatchErrors]
    expired_at: Optional[int]
    expires_at: Optional[int]
    failed_at: Optional[int]
    finalizing_at: Optional[int]
    in_progress_at: Optional[int]
    metadata: Optional[Dict[str, str]]
    output_file_id: Optional[str]
    request_counts: Optional[BatchRequestCounts]

class BatchRequestCounts(BaseModel):
    completed: int
    failed: int
    total: int

class BatchErrors(BaseModel):
    data: List[BatchError]
    object: Literal["list"]

class BatchError(BaseModel):
    code: Optional[str]
    line: Optional[int]
    message: Optional[str]
    param: Optional[str]
```

### Parameter Types

```python { .api }
# Batch creation parameters
BatchCreateParams = TypedDict('BatchCreateParams', {
    'completion_window': Required[Literal["24h"]],
    'endpoint': Required[Literal["/v1/chat/completions", "/v1/embeddings", "/v1/completions"]],
    'input_file_id': Required[str],
    'metadata': NotRequired[Optional[Dict[str, str]]],
}, total=False)

# Batch request format
class BatchRequest(TypedDict, total=False):
    custom_id: Required[str]
    method: Required[Literal["POST"]]
    url: Required[str]
    body: Required[Dict[str, Any]]

# Batch response format
class BatchResponse(TypedDict, total=False):
    id: str
    custom_id: str
    response: Optional[BatchResponseData]
    error: Optional[BatchResponseError]

class BatchResponseData(TypedDict, total=False):
    status_code: int
    request_id: str
    body: Dict[str, Any]

class BatchResponseError(TypedDict, total=False):
    code: str
    message: str
```

### Status and Configuration Types

```python { .api }
# Batch status enumeration
BatchStatus = Literal[
    "validating",
    "failed",
    "in_progress",
    "finalizing",
    "completed",
    "expired",
    "cancelling",
    "cancelled"
]

# Supported endpoints
BatchEndpoint = Literal[
    "/v1/chat/completions",
    "/v1/embeddings",
    "/v1/completions"
]

# Completion windows
CompletionWindow = Literal["24h"]

# Batch limits and configuration
class BatchLimits:
    max_requests_per_batch: int = 50000
    max_file_size_mb: int = 100
    completion_window_hours: int = 24

    # Cost savings
    discount_percentage: float = 50.0  # 50% discount vs regular API

    # Rate limits
    max_queued_batches: int = 50
    max_total_requests: int = 2000000  # Across all batches

    # Supported models by endpoint
    supported_models = {
        "/v1/chat/completions": [
            "gpt-4o", "gpt-4o-mini", "gpt-4-turbo", "gpt-4",
            "gpt-3.5-turbo", "gpt-3.5-turbo-0125"
        ],
        "/v1/completions": [
            "gpt-3.5-turbo-instruct", "davinci-002", "babbage-002"
        ],
        "/v1/embeddings": [
            "text-embedding-3-large", "text-embedding-3-small",
            "text-embedding-ada-002"
        ]
    }
```

## Best Practices

### Batch Design

- Group similar requests together for optimal processing
- Use descriptive custom IDs for easy result identification (see the sketch below)
- Keep batch sizes under the 50,000 request limit
- Include relevant metadata for tracking and organization
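
A minimal sketch of one way to make custom IDs self-describing, so each result line can be joined back to its source record without a separate lookup table (the `task:record_id` scheme is illustrative, not part of the API):

```python
def make_custom_id(task: str, record_id: str) -> str:
    """Encode the task name and source record ID into the custom_id."""
    return f"{task}:{record_id}"

def parse_custom_id(custom_id: str):
    """Recover the task name and source record ID from a result line."""
    task, record_id = custom_id.split(":", 1)
    return task, record_id

# ("summarize", "doc-42")
print(parse_custom_id(make_custom_id("summarize", "doc-42")))
```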

### Cost Optimization

- Use batch API for 50% cost savings on large volumes
- Batch requests during off-peak hours when possible
- Optimize request parameters to minimize token usage (see the token-counting sketch below)
- Monitor usage across multiple batches
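
The `estimate_batch_cost` example above guesses an average token count per request; a sketch like the following can replace that guess with real counts, assuming the `tiktoken` package is installed (it is not a dependency of this library):

```python
import json
import tiktoken

def count_input_tokens(requests_path: str, model: str = "gpt-3.5-turbo") -> int:
    """Count prompt tokens across a batch input file so cost estimates
    can use measured numbers instead of a per-request guess."""
    enc = tiktoken.encoding_for_model(model)
    total = 0
    with open(requests_path) as f:
        for line in f:
            body = json.loads(line)["body"]
            for message in body.get("messages", []):
                total += len(enc.encode(message["content"]))
    return total

print(count_input_tokens("batch_requests.jsonl"))
```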

### Error Handling

- Always check both output and error files
- Implement retry logic for failed requests (see the sketch below)
- Validate request format before batch submission
- Monitor batch status and handle timeouts appropriately
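
A minimal retry sketch, assuming the original input JSONL is still available locally and failed lines should be resubmitted unchanged as a new batch (the file paths and endpoint are illustrative):

```python
import json
from typing import Optional

def retry_failed_requests(client, input_path: str, error_path: str) -> Optional[str]:
    """Rebuild an input file containing only the requests that failed,
    then submit it as a fresh batch. Returns the new batch ID, if any."""
    # Collect custom_ids that appear in the error file
    failed_ids = set()
    with open(error_path) as f:
        for line in f:
            failed_ids.add(json.loads(line)["custom_id"])

    if not failed_ids:
        return None

    # Re-emit only the failed requests from the original input file
    retry_path = "batch_retry.jsonl"
    with open(input_path) as src, open(retry_path, "w") as dst:
        for line in src:
            if json.loads(line)["custom_id"] in failed_ids:
                dst.write(line)

    with open(retry_path, "rb") as f:
        retry_file = client.files.create(file=f, purpose="batch")

    retry_batch = client.batches.create(
        input_file_id=retry_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    return retry_batch.id
```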

### File Management

- Clean up uploaded batch files after processing (see the sketch below)
- Organize results files with clear naming conventions
- Download and process results promptly after completion
- Keep backups of important batch results
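
A minimal cleanup sketch that deletes a finished batch's uploaded input file once its results are safely downloaded, using the standard `client.files.delete` call:

```python
def cleanup_batch_files(client, batch_id: str) -> None:
    """Delete the input file for a completed batch after its results
    have been saved locally."""
    batch = client.batches.retrieve(batch_id)

    if batch.status == "completed" and batch.input_file_id:
        client.files.delete(batch.input_file_id)
        print(f"Deleted input file {batch.input_file_id}")
```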

### Performance Considerations

- Submit batches during periods of lower API usage
- Plan for 24-hour completion windows in your workflows
- Consider splitting very large datasets across multiple batches
- Use appropriate completion windows based on urgency needs (currently only "24h" is supported)

### Monitoring and Maintenance

- Implement comprehensive batch monitoring systems
- Set up alerts for batch completion or failures (see the sketch below)
- Track batch processing times and success rates
- Regularly review and optimize batch processing workflows
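
A minimal alerting sketch that wraps the polling pattern shown earlier and hands terminal-state notifications to any callable (a print, email, or chat-webhook hook; the `notify` parameter is illustrative):

```python
import time

def wait_and_alert(client, batch_id: str, notify, poll_seconds: int = 60) -> None:
    """Poll a batch until it reaches a terminal state, then fire the
    supplied notify callable with a summary message."""
    terminal = {"completed", "failed", "cancelled", "expired"}
    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status in terminal:
            notify(f"Batch {batch_id} finished with status: {batch.status}")
            return
        time.sleep(poll_seconds)

# Example: plain stdout "alert"
# wait_and_alert(client, batch.id, print)
```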