# Batch Processing

Submit batch prediction jobs for high-volume inference at reduced cost. Batch processing runs large numbers of requests asynchronously, more cheaply than online inference, making it well suited to processing datasets, bulk content generation, and large-scale embedding jobs.
## Capabilities

### Create Batch Job

Create a batch prediction job for content generation. The job reads requests from Cloud Storage or BigQuery and writes results back to the chosen destination.

```python { .api }
class Batches:
    """Synchronous batch prediction jobs API."""

    def create(
        self,
        *,
        model: str,
        src: Union[str, list[dict]],
        dest: Optional[str] = None,
        config: Optional[CreateBatchJobConfig] = None
    ) -> BatchJob:
        """
        Create a batch prediction job for content generation.

        Parameters:
            model (str): Model identifier (e.g., 'gemini-2.0-flash').
            src (Union[str, list[dict]]): Source of requests. Can be:
                - str: GCS URI ('gs://bucket/input.jsonl') or BigQuery table
                  ('bq://project.dataset.table')
                - list[dict]: List of request dictionaries for inline requests
            dest (str, optional): Destination for results. GCS URI ('gs://bucket/output')
                or BigQuery table ('bq://project.dataset.table'). Required for GCS/BigQuery
                sources, optional for inline requests.
            config (CreateBatchJobConfig, optional): Job configuration.

        Returns:
            BatchJob: Created batch job with name and status.

        Raises:
            ClientError: For client errors (4xx status codes).
            ServerError: For server errors (5xx status codes).
        """
        ...

class AsyncBatches:
    """Asynchronous batch prediction jobs API."""

    async def create(
        self,
        *,
        model: str,
        src: Union[str, list[dict]],
        dest: Optional[str] = None,
        config: Optional[CreateBatchJobConfig] = None
    ) -> BatchJob:
        """Async version of create."""
        ...
```
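For a GCS source, the input file is JSONL: one JSON request object per line. The exact request schema depends on the model and API version, so the `'contents'` field below is an illustrative assumption (it mirrors the inline-request shape used later in this document), not a guaranteed schema. A minimal sketch of preparing an input file:

```python
import json
import os
import tempfile

prompts = ['What is AI?', 'Explain machine learning']

# Write one JSON object per line (JSONL); a temp file is used so the sketch
# is runnable anywhere - in practice you would upload this file to GCS.
path = os.path.join(tempfile.mkdtemp(), 'requests.jsonl')
with open(path, 'w') as f:
    for prompt in prompts:
        f.write(json.dumps({'contents': prompt}) + '\n')

# Sanity check: the file parses back line by line
with open(path) as f:
    parsed = [json.loads(line) for line in f]
print(len(parsed))
```

The resulting file would then be passed as `src='gs://my-bucket/requests.jsonl'` after upload.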
**Usage Example - GCS:**

```python
import time

from google.genai import Client

client = Client(vertexai=True, project='PROJECT_ID', location='us-central1')

# Create batch job from GCS
job = client.batches.create(
    model='gemini-2.0-flash',
    src='gs://my-bucket/requests.jsonl',
    dest='gs://my-bucket/results/'
)

print(f"Job created: {job.name}")
print(f"State: {job.state}")

# Poll for completion (queued, pending, and running are all non-terminal)
while job.state in ['JOB_STATE_QUEUED', 'JOB_STATE_PENDING', 'JOB_STATE_RUNNING']:
    time.sleep(60)
    job = client.batches.get(name=job.name)
    print(f"State: {job.state}")

print(f"Final state: {job.state}")
if job.state == 'JOB_STATE_SUCCEEDED':
    print(f"Processed: {job.metadata.completed_requests} requests")
```
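Once the job succeeds, the GCS destination holds JSONL result files. The record layout below (a `request`/`response` pair per line, with a Gemini-style `candidates` response) is an assumption for illustration; inspect the files your job actually writes. A sketch of extracting the generated text from one result line:

```python
import json

# A sample output line in the assumed result shape: one JSON object per line
# pairing the original request with the model response.
sample_line = json.dumps({
    'request': {'contents': 'What is AI?'},
    'response': {'candidates': [{'content': {'parts': [{'text': 'AI is...'}]}}]}
})

record = json.loads(sample_line)
# Drill into the first candidate's first text part
text = record['response']['candidates'][0]['content']['parts'][0]['text']
print(text)
```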
**Usage Example - Inline Requests:**

```python
from google.genai import Client

client = Client(vertexai=True, project='PROJECT_ID', location='us-central1')

# Create batch job with inline requests
requests = [
    {'contents': 'What is AI?'},
    {'contents': 'Explain machine learning'},
    {'contents': 'What is deep learning?'}
]

job = client.batches.create(
    model='gemini-2.0-flash',
    src=requests,
    dest='gs://my-bucket/inline-results/'
)

print(f"Job: {job.name}")
```
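The polling loop from the GCS example can be factored into a reusable helper. This is a sketch, not part of the SDK: `get_job` is injected so the helper is testable without a client; in practice you would pass `lambda: client.batches.get(name=job.name)`. The state strings come from the `JobState` enum in the Types section.

```python
import time
from typing import Callable

# Non-terminal states, per the JobState enum in the Types section.
_ACTIVE_STATES = {'JOB_STATE_QUEUED', 'JOB_STATE_PENDING', 'JOB_STATE_RUNNING'}

def wait_for_job(get_job: Callable[[], object], poll_seconds: float = 60.0,
                 timeout_seconds: float = 24 * 3600):
    """Poll get_job() until the returned job reaches a terminal state."""
    deadline = time.monotonic() + timeout_seconds
    job = get_job()
    while job.state in _ACTIVE_STATES:
        if time.monotonic() > deadline:
            raise TimeoutError(f'job still in {job.state} after timeout')
        time.sleep(poll_seconds)
        job = get_job()
    return job

# Demo with a stub getter that succeeds on the second poll
class _StubGetter:
    def __init__(self):
        self.calls = 0
    def __call__(self):
        self.calls += 1
        state = 'JOB_STATE_RUNNING' if self.calls == 1 else 'JOB_STATE_SUCCEEDED'
        return type('Job', (), {'state': state})()

done = wait_for_job(_StubGetter(), poll_seconds=0.0)
print(done.state)
```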
### Create Embeddings Batch Job

Create a batch job dedicated to embedding generation, optimized for embedding large datasets.

```python { .api }
class Batches:
    """Synchronous batch prediction jobs API."""

    def create_embeddings(
        self,
        *,
        model: str,
        src: Union[str, list[dict]],
        dest: Optional[str] = None,
        config: Optional[CreateBatchJobConfig] = None
    ) -> BatchJob:
        """
        Create a batch embeddings job.

        Parameters:
            model (str): Embedding model (e.g., 'text-embedding-004').
            src (Union[str, list[dict]]): Source of embedding requests.
            dest (str, optional): Destination for embeddings.
            config (CreateBatchJobConfig, optional): Job configuration.

        Returns:
            BatchJob: Created batch embeddings job.
        """
        ...

class AsyncBatches:
    """Asynchronous batch prediction jobs API."""

    async def create_embeddings(
        self,
        *,
        model: str,
        src: Union[str, list[dict]],
        dest: Optional[str] = None,
        config: Optional[CreateBatchJobConfig] = None
    ) -> BatchJob:
        """Async version of create_embeddings."""
        ...
```

**Usage Example:**

```python
from google.genai import Client

client = Client(vertexai=True, project='PROJECT_ID', location='us-central1')

job = client.batches.create_embeddings(
    model='text-embedding-004',
    src='gs://my-bucket/texts.jsonl',
    dest='gs://my-bucket/embeddings/'
)

print(f"Embeddings job: {job.name}")
```
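Embedding jobs likewise write JSONL output. The field layout below (`'embedding'` → `'values'`) is an assumption for illustration only; check the files your job actually produces. A sketch of recovering a vector from one output line:

```python
import json

# Sample output line in an assumed shape: the request text alongside the
# embedding vector for it.
sample_line = json.dumps({
    'request': {'content': 'What is AI?'},
    'embedding': {'values': [0.01, -0.02, 0.03]}
})

vector = json.loads(sample_line)['embedding']['values']
print(len(vector))  # vector dimensionality
```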
### Get Batch Job

Retrieve information about a batch job, including its status and progress.

```python { .api }
class Batches:
    """Synchronous batch prediction jobs API."""

    def get(self, *, name: str) -> BatchJob:
        """
        Get batch job status and information.

        Parameters:
            name (str): Job name in format 'projects/*/locations/*/batchPredictionJobs/*'.

        Returns:
            BatchJob: Job information including state, progress, and metadata.
        """
        ...

class AsyncBatches:
    """Asynchronous batch prediction jobs API."""

    async def get(self, *, name: str) -> BatchJob:
        """Async version of get."""
        ...
```
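The docstring above gives the resource-name format `get` expects. A small helper for building it (the project, location, and job ID values here are placeholders):

```python
def batch_job_name(project: str, location: str, job_id: str) -> str:
    # Format from the get() docstring: 'projects/*/locations/*/batchPredictionJobs/*'
    return f'projects/{project}/locations/{location}/batchPredictionJobs/{job_id}'

name = batch_job_name('my-project', 'us-central1', '1234567890')
print(name)
```

`client.batches.get(name=name)` would then fetch that job.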
### Cancel Batch Job

Cancel a running batch job.

```python { .api }
class Batches:
    """Synchronous batch prediction jobs API."""

    def cancel(self, *, name: str) -> None:
        """
        Cancel a batch job.

        Parameters:
            name (str): Job name.
        """
        ...

class AsyncBatches:
    """Asynchronous batch prediction jobs API."""

    async def cancel(self, *, name: str) -> None:
        """Async version of cancel."""
        ...
```
### Delete Batch Job

Delete a batch job.

```python { .api }
class Batches:
    """Synchronous batch prediction jobs API."""

    def delete(self, *, name: str) -> None:
        """
        Delete a batch job.

        Parameters:
            name (str): Job name.
        """
        ...

class AsyncBatches:
    """Asynchronous batch prediction jobs API."""

    async def delete(self, *, name: str) -> None:
        """Async version of delete."""
        ...
```
### List Batch Jobs

List all batch jobs, with optional filtering and pagination.

```python { .api }
class Batches:
    """Synchronous batch prediction jobs API."""

    def list(
        self,
        *,
        config: Optional[ListBatchJobsConfig] = None
    ) -> Union[Pager[BatchJob], Iterator[BatchJob]]:
        """
        List batch jobs.

        Parameters:
            config (ListBatchJobsConfig, optional): List configuration including:
                - page_size: Number of jobs per page
                - page_token: Token for pagination
                - filter: Filter expression

        Returns:
            Union[Pager[BatchJob], Iterator[BatchJob]]: Paginated job list.
        """
        ...

class AsyncBatches:
    """Asynchronous batch prediction jobs API."""

    async def list(
        self,
        *,
        config: Optional[ListBatchJobsConfig] = None
    ) -> Union[AsyncPager[BatchJob], AsyncIterator[BatchJob]]:
        """Async version of list."""
        ...
```
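The returned pager can be consumed by plain iteration, which walks all pages. The stub class below stands in for the real `Pager` type (defined in the Types section) purely to show the iteration pattern; it is not part of the SDK:

```python
from typing import Iterator

class StubPager:
    """Minimal stand-in with the same surface as the Pager type in the
    Types section: a current `page` plus iteration over all items."""
    def __init__(self, pages):
        self._pages = pages
        self.page = pages[0] if pages else []
    def __iter__(self) -> Iterator[str]:
        for page in self._pages:
            yield from page

pager = StubPager([['job-a', 'job-b'], ['job-c']])
names = [name for name in pager]  # iteration crosses page boundaries
print(names)
```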
## Types

```python { .api }
from typing import Optional, Union, List, Iterator, AsyncIterator, Dict, Any
from datetime import datetime
from enum import Enum

# Configuration types
class CreateBatchJobConfig:
    """
    Configuration for creating batch jobs.

    Attributes:
        display_name (str, optional): Display name for the job.
        labels (dict[str, str], optional): Labels for the job.
    """
    display_name: Optional[str] = None
    labels: Optional[dict[str, str]] = None

class ListBatchJobsConfig:
    """
    Configuration for listing batch jobs.

    Attributes:
        page_size (int, optional): Number of jobs per page.
        page_token (str, optional): Token for pagination.
        filter (str, optional): Filter expression (e.g., 'state=JOB_STATE_RUNNING').
    """
    page_size: Optional[int] = None
    page_token: Optional[str] = None
    filter: Optional[str] = None

# Response types
class BatchJob:
    """
    Batch prediction job information.

    Attributes:
        name (str): Job resource name.
        display_name (str, optional): Display name.
        model (str): Model used for the job.
        state (JobState): Current job state.
        create_time (datetime): When the job was created.
        start_time (datetime, optional): When the job started running.
        end_time (datetime, optional): When the job completed.
        update_time (datetime): Last update time.
        labels (dict[str, str], optional): Job labels.
        metadata (BatchJobMetadata, optional): Job metadata and progress.
        error (JobError, optional): Error if the job failed.
    """
    name: str
    display_name: Optional[str] = None
    model: str
    state: JobState
    create_time: datetime
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    update_time: datetime
    labels: Optional[dict[str, str]] = None
    metadata: Optional[BatchJobMetadata] = None
    error: Optional[JobError] = None

class BatchJobMetadata:
    """
    Batch job metadata and progress.

    Attributes:
        total_requests (int, optional): Total number of requests.
        completed_requests (int, optional): Number of completed requests.
        failed_requests (int, optional): Number of failed requests.
        input_config (dict, optional): Input configuration.
        output_config (dict, optional): Output configuration.
    """
    total_requests: Optional[int] = None
    completed_requests: Optional[int] = None
    failed_requests: Optional[int] = None
    input_config: Optional[dict] = None
    output_config: Optional[dict] = None

class JobState(Enum):
    """Batch job states."""
    JOB_STATE_UNSPECIFIED = 'JOB_STATE_UNSPECIFIED'
    JOB_STATE_QUEUED = 'JOB_STATE_QUEUED'
    JOB_STATE_PENDING = 'JOB_STATE_PENDING'
    JOB_STATE_RUNNING = 'JOB_STATE_RUNNING'
    JOB_STATE_SUCCEEDED = 'JOB_STATE_SUCCEEDED'
    JOB_STATE_FAILED = 'JOB_STATE_FAILED'
    JOB_STATE_CANCELLING = 'JOB_STATE_CANCELLING'
    JOB_STATE_CANCELLED = 'JOB_STATE_CANCELLED'
    JOB_STATE_PAUSED = 'JOB_STATE_PAUSED'
    JOB_STATE_EXPIRED = 'JOB_STATE_EXPIRED'

class JobError:
    """
    Job error information.

    Attributes:
        code (int): Error code.
        message (str): Error message.
        details (list[dict], optional): Error details.
    """
    code: int
    message: str
    details: Optional[list[dict]] = None

# Pager types
class Pager[T]:
    """Synchronous pager."""
    page: list[T]
    def next_page(self) -> None: ...
    def __iter__(self) -> Iterator[T]: ...

class AsyncPager[T]:
    """Asynchronous pager."""
    page: list[T]
    async def next_page(self) -> None: ...
    async def __aiter__(self) -> AsyncIterator[T]: ...
```
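A common need on top of these types is telling terminal states apart from in-flight ones, for example to decide when to stop polling. A sketch, with the `JobState` enum reproduced locally so the snippet is self-contained (which states count as terminal is this sketch's judgment, not something the SDK defines):

```python
from enum import Enum

class JobState(Enum):
    # Values reproduced from the Types section above
    JOB_STATE_UNSPECIFIED = 'JOB_STATE_UNSPECIFIED'
    JOB_STATE_QUEUED = 'JOB_STATE_QUEUED'
    JOB_STATE_PENDING = 'JOB_STATE_PENDING'
    JOB_STATE_RUNNING = 'JOB_STATE_RUNNING'
    JOB_STATE_SUCCEEDED = 'JOB_STATE_SUCCEEDED'
    JOB_STATE_FAILED = 'JOB_STATE_FAILED'
    JOB_STATE_CANCELLING = 'JOB_STATE_CANCELLING'
    JOB_STATE_CANCELLED = 'JOB_STATE_CANCELLED'
    JOB_STATE_PAUSED = 'JOB_STATE_PAUSED'
    JOB_STATE_EXPIRED = 'JOB_STATE_EXPIRED'

# States the job cannot leave on its own (paused and cancelling can still
# transition, so they are treated as non-terminal here).
TERMINAL_STATES = {
    JobState.JOB_STATE_SUCCEEDED,
    JobState.JOB_STATE_FAILED,
    JobState.JOB_STATE_CANCELLED,
    JobState.JOB_STATE_EXPIRED,
}

def is_terminal(state: JobState) -> bool:
    """True once the job will no longer change state on its own."""
    return state in TERMINAL_STATES

print(is_terminal(JobState.JOB_STATE_RUNNING), is_terminal(JobState.JOB_STATE_FAILED))
```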