The official Python library for the Groq API
—
Submit and manage batch jobs for processing large volumes of requests efficiently. The batch API allows you to send multiple requests in a single operation, which is processed asynchronously and can provide significant cost savings for non-time-sensitive workloads.
Submit a batch job for processing multiple requests asynchronously.
def create(
    input_file_id: str,
    endpoint: Literal["/v1/chat/completions"],
    completion_window: Literal["24h"],
    metadata: Optional[Dict[str, str]] = NOT_GIVEN,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> BatchCreateResponse:
    """
    Create a batch job for processing multiple requests.

    Parameters:
    - input_file_id: ID of the uploaded file containing batch requests
    - endpoint: API endpoint to process requests against
    - completion_window: Time window for batch completion
    - metadata: Optional metadata to attach to the batch

    Returns:
    BatchCreateResponse with batch job information and status
    """

# Get detailed information about a specific batch job by its ID.
def retrieve(
    batch_id: str,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> BatchRetrieveResponse:
    """
    Retrieve information about a specific batch job.

    Parameters:
    - batch_id: ID of the batch job to retrieve

    Returns:
    BatchRetrieveResponse with detailed batch information and status
    """

# Retrieve a list of all batch jobs with their current status.
def list(
    after: Optional[str] = NOT_GIVEN,
    limit: Optional[int] = NOT_GIVEN,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> BatchListResponse:
    """
    List batch jobs.

    Parameters:
    - after: Cursor for pagination, returns objects after this ID
    - limit: Number of objects to return (default 20, max 100)

    Returns:
    BatchListResponse containing list of batch jobs
    """

# Cancel a batch job that is in progress or queued.
def cancel(
    batch_id: str,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> BatchCancelResponse:
    """
    Cancel a batch job.

    Parameters:
    - batch_id: ID of the batch job to cancel

    Returns:
    BatchCancelResponse with cancellation confirmation
    """

# All batch operations have asynchronous counterparts with identical parameters.
# Async counterparts of the batch operations above (same parameters, awaitable).
async def create(input_file_id: str, endpoint: str, completion_window: str, **kwargs) -> BatchCreateResponse: ...
async def retrieve(batch_id: str, **kwargs) -> BatchRetrieveResponse: ...
async def list(**kwargs) -> BatchListResponse: ...
async def cancel(batch_id: str, **kwargs) -> BatchCancelResponse: ...


from groq import Groq
import json
import time

client = Groq()

# 1. Create batch request data
batch_requests = [
    {
        "custom_id": "request-1",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "llama3-8b-8192",
            "messages": [{"role": "user", "content": "What is artificial intelligence?"}],
            "max_tokens": 100,
        },
    },
    {
        "custom_id": "request-2",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "llama3-8b-8192",
            "messages": [{"role": "user", "content": "Explain machine learning in simple terms."}],
            "max_tokens": 100,
        },
    },
    {
        "custom_id": "request-3",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "llama3-8b-8192",
            "messages": [{"role": "user", "content": "What are the benefits of deep learning?"}],
            "max_tokens": 100,
        },
    },
]

# 2. Write requests to JSONL file (one JSON object per line)
with open("batch_requests.jsonl", "w") as f:
    for request in batch_requests:
        f.write(json.dumps(request) + "\n")

# 3. Upload the file
with open("batch_requests.jsonl", "rb") as file:
    file_response = client.files.create(
        file=file,
        purpose="batch"
    )
print(f"File uploaded: {file_response.id}")

# 4. Create batch job
batch = client.batches.create(
    input_file_id=file_response.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={"description": "AI Q&A batch processing"}
)
print(f"Batch created: {batch.id}")
print(f"Status: {batch.status}")

# 5. Monitor batch progress by polling until a terminal state is reached
while True:
    batch_status = client.batches.retrieve(batch.id)
    print(f"Batch status: {batch_status.status}")
    if batch_status.status == "completed":
        print("Batch completed!")
        print(f"Output file: {batch_status.output_file_id}")
        break
    elif batch_status.status == "failed":
        print("Batch failed!")
        if batch_status.errors:
            print(f"Errors: {batch_status.errors}")
        break
    elif batch_status.status == "cancelled":
        print("Batch was cancelled!")
        break
    time.sleep(30)  # Wait 30 seconds before checking again

from groq import Groq
client = Groq()

# Assuming you have already uploaded a file with batch requests
batch = client.batches.create(
    input_file_id="file-abc123",
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={
        "project": "customer_support_analysis",
        "batch_type": "chat_completions"
    }
)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.status}")
print(f"Created at: {batch.created_at}")
print(f"Request counts: {batch.request_counts}")

from groq import Groq
client = Groq()

# Get batch information
batch = client.batches.retrieve("batch_abc123")

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.status}")
print(f"Input file: {batch.input_file_id}")
print(f"Endpoint: {batch.endpoint}")
print(f"Completion window: {batch.completion_window}")

# Print request counts
if batch.request_counts:
    print(f"Total requests: {batch.request_counts.total}")
    print(f"Completed: {batch.request_counts.completed}")
    print(f"Failed: {batch.request_counts.failed}")

# Check if completed and get output
if batch.status == "completed" and batch.output_file_id:
    print(f"Output file: {batch.output_file_id}")
    # You can then download the output file using the files API

from groq import Groq
client = Groq()

# List all batches
batches = client.batches.list(limit=10)

print(f"Total batches: {len(batches.data)}")
for batch in batches.data:
    print(f"- {batch.id}: {batch.status}")
    print(f"  Endpoint: {batch.endpoint}")
    print(f"  Created: {batch.created_at}")
    if batch.request_counts:
        print(f"  Requests: {batch.request_counts.completed}/{batch.request_counts.total}")

from groq import Groq
client = Groq()

# Cancel a batch that's in progress
try:
    result = client.batches.cancel("batch_abc123")
    print(f"Batch cancelled: {result.id}")
    print(f"Status: {result.status}")
except Exception as e:
    print(f"Failed to cancel batch: {e}")

import asyncio
from groq import AsyncGroq
async def main():
client = AsyncGroq()
# Create batch asynchronously
batch = await client.batches.create(
input_file_id="file-abc123",
endpoint="/v1/chat/completions",
completion_window="24h"
)
print(f"Batch created: {batch.id}")
# Monitor progress asynchronously
while True:
batch_status = await client.batches.retrieve(batch.id)
if batch_status.status in ["completed", "failed", "cancelled"]:
break
await asyncio.sleep(30)
print(f"Final status: {batch_status.status}")
asyncio.run(main())class BatchCreateParams:
input_file_id: str
endpoint: Literal["/v1/chat/completions"]
completion_window: Literal["24h"]
metadata: Optional[Dict[str, str]]class BatchCreateResponse:
id: str
object: Literal["batch"]
endpoint: str
errors: Optional[BatchErrors]
input_file_id: str
completion_window: str
status: Literal["validating", "failed", "in_progress", "finalizing", "completed", "expired", "cancelling", "cancelled"]
output_file_id: Optional[str]
error_file_id: Optional[str]
created_at: int
in_progress_at: Optional[int]
expires_at: Optional[int]
finalizing_at: Optional[int]
completed_at: Optional[int]
failed_at: Optional[int]
expired_at: Optional[int]
cancelling_at: Optional[int]
cancelled_at: Optional[int]
request_counts: Optional[BatchRequestCounts]
metadata: Optional[Dict[str, str]]
class BatchRetrieveResponse:
    """Batch object returned by batches.retrieve (same structure as BatchCreateResponse)."""

    id: str
    object: Literal["batch"]
    endpoint: str
    errors: Optional[BatchErrors]
    input_file_id: str
    completion_window: str
    status: Literal["validating", "failed", "in_progress", "finalizing", "completed", "expired", "cancelling", "cancelled"]
    output_file_id: Optional[str]
    error_file_id: Optional[str]
    # Lifecycle timestamps (presumably Unix epoch seconds — TODO confirm).
    created_at: int
    in_progress_at: Optional[int]
    expires_at: Optional[int]
    finalizing_at: Optional[int]
    completed_at: Optional[int]
    failed_at: Optional[int]
    expired_at: Optional[int]
    cancelling_at: Optional[int]
    cancelled_at: Optional[int]
    request_counts: Optional[BatchRequestCounts]
    metadata: Optional[Dict[str, str]]
class BatchListResponse:
    """Paginated list of batch objects returned by batches.list."""

    object: Literal["list"]
    data: List[BatchRetrieveResponse]
    first_id: Optional[str]   # cursor of the first item in this page
    last_id: Optional[str]    # cursor of the last item in this page
    has_more: bool            # True when more pages are available
class BatchCancelResponse:
    """Batch object returned by batches.cancel (same structure as BatchCreateResponse)."""

    id: str
    object: Literal["batch"]
    endpoint: str
    errors: Optional[BatchErrors]
    input_file_id: str
    completion_window: str
    status: Literal["validating", "failed", "in_progress", "finalizing", "completed", "expired", "cancelling", "cancelled"]
    output_file_id: Optional[str]
    error_file_id: Optional[str]
    # Lifecycle timestamps (presumably Unix epoch seconds — TODO confirm).
    created_at: int
    in_progress_at: Optional[int]
    expires_at: Optional[int]
    finalizing_at: Optional[int]
    completed_at: Optional[int]
    failed_at: Optional[int]
    expired_at: Optional[int]
    cancelling_at: Optional[int]
    cancelled_at: Optional[int]
    request_counts: Optional[BatchRequestCounts]
    metadata: Optional[Dict[str, str]]


class BatchRequestCounts:
    """Per-batch request tallies."""

    total: int
    completed: int
    failed: int
class BatchErrors:
    """Container for the errors reported on a batch."""

    object: str
    data: List[BatchError]
class BatchError:
    """A single error entry attached to a batch."""

    code: str
    message: str
    param: Optional[str]   # offending parameter, if applicable
    line: Optional[int]    # offending line of the input JSONL file, if applicable

# Each line in the input JSONL file should follow this format:
class BatchRequest:
    """Shape of one line of the input JSONL file."""

    custom_id: str           # Your unique identifier for the request
    method: Literal["POST"]  # HTTP method
    url: str                 # API endpoint (e.g., "/v1/chat/completions")
    body: Dict[str, Any]     # Request parameters for the endpoint

# Batch status values:
#   validating  - Input file is being validated
#   failed      - Batch failed during validation or processing
#   in_progress - Batch is currently being processed
#   finalizing  - Batch processing is complete, output file being prepared
#   completed   - Batch completed successfully
#   expired     - Batch expired before completion
#   cancelling  - Batch cancellation in progress
#   cancelled   - Batch was cancelled

# Install with Tessl CLI
npx tessl i tessl/pypi-groq