Python client for Replicate
—
Management of model deployments and webhook configuration for production use cases and real-time notifications.
Manage dedicated model deployments for production workloads.
class Deployments:
def get(self, owner: str, name: str) -> Deployment:
"""
Get a deployment by owner and name.
Parameters:
- owner: Deployment owner username
- name: Deployment name
Returns:
Deployment object with configuration and status
"""
def list(self, **params) -> Page[Deployment]:
"""
List deployments.
Returns:
Paginated list of Deployment objects
"""
def create(
self,
name: str,
model: str,
version: str,
*,
hardware: Optional[str] = None,
min_instances: Optional[int] = None,
max_instances: Optional[int] = None,
**params
) -> Deployment:
"""
Create a new deployment.
Parameters:
- name: Name for the deployment
- model: Model name in format "owner/name"
- version: Model version ID
- hardware: Hardware tier for deployment
- min_instances: Minimum number of instances
- max_instances: Maximum number of instances
Returns:
Deployment object
"""
def update(
self,
owner: str,
name: str,
*,
version: Optional[str] = None,
hardware: Optional[str] = None,
min_instances: Optional[int] = None,
max_instances: Optional[int] = None,
**params
) -> Deployment:
"""
Update an existing deployment.
Parameters:
- owner: Deployment owner username
- name: Deployment name
- version: New model version ID
- hardware: New hardware tier
- min_instances: New minimum instances
- max_instances: New maximum instances
Returns:
Updated Deployment object
"""
def delete(self, owner: str, name: str) -> None:
"""
Delete a deployment.
Parameters:
- owner: Deployment owner username
- name: Deployment name
"""Deployments represent dedicated model instances with scaling and performance configuration.
class Deployment:
owner: str
"""The owner of the deployment."""
name: str
"""The name of the deployment."""
model: str
"""The model being deployed."""
version: str
"""The version of the model being deployed."""
status: str
"""The current status of the deployment."""
hardware: Optional[str]
"""The hardware tier for the deployment."""
min_instances: int
"""The minimum number of instances."""
max_instances: int
"""The maximum number of instances."""
current_instances: Optional[int]
"""The current number of running instances."""
created_at: str
"""When the deployment was created."""
updated_at: str
"""When the deployment was last updated."""Configure and validate webhooks for receiving real-time notifications.
class Webhooks:
@property
def default(self) -> "Webhooks.Default":
"""Namespace for operations related to the default webhook."""
class Default:
def secret(self) -> WebhookSigningSecret:
"""
Get the default webhook signing secret.
Returns:
WebhookSigningSecret object for signature validation
"""
async def async_secret(self) -> WebhookSigningSecret:
"""
Get the default webhook signing secret asynchronously.
Returns:
WebhookSigningSecret object for signature validation
"""
@staticmethod
def validate(
request: Optional[httpx.Request] = None,
headers: Optional[Dict[str, str]] = None,
body: Optional[str] = None,
secret: Optional[WebhookSigningSecret] = None,
tolerance: Optional[int] = None
) -> None:
"""
Validate the signature from an incoming webhook request.
Parameters:
- request: The HTTPX request object (alternative to headers/body)
- headers: The request headers dict (alternative to request)
- body: The request body string (alternative to request)
- secret: The webhook signing secret
- tolerance: Maximum allowed time difference in seconds
Raises:
WebhookValidationError: If validation fails
"""class WebhookSigningSecret:
key: str
"""The webhook signing secret key."""
# Webhook validation exceptions
class WebhookValidationError(ValueError):
"""Base webhook validation error."""
class MissingWebhookHeaderError(WebhookValidationError):
"""Missing required webhook header."""
class InvalidSecretKeyError(WebhookValidationError):
"""Invalid webhook secret key."""
class MissingWebhookBodyError(WebhookValidationError):
"""Missing webhook request body."""
class InvalidTimestampError(WebhookValidationError):
"""Invalid or expired timestamp."""
class InvalidSignatureError(WebhookValidationError):
"""Invalid webhook signature."""Access hardware configuration options for deployments.
class Hardware:
name: str
"""The hardware tier name."""
sku: str
"""The hardware SKU identifier."""
class HardwareNamespace:
def list(self, **params) -> List[Hardware]:
"""
List available hardware options.
Returns:
List of Hardware objects with available tiers
"""import replicate
# List available hardware options
hardware_options = replicate.hardware.list()
for hw in hardware_options:
print(f"Hardware: {hw.name} (SKU: {hw.sku})")
# Create a deployment
deployment = replicate.deployments.create(
name="my-production-model",
model="stability-ai/stable-diffusion-3",
version="version-id-here",
hardware="gpu-t4",
min_instances=1,
max_instances=5
)
print(f"Deployment created: {deployment.owner}/{deployment.name}")
print(f"Status: {deployment.status}")
print(f"Hardware: {deployment.hardware}")
print(f"Instances: {deployment.min_instances}-{deployment.max_instances}")import replicate
# Update deployment scaling
deployment = replicate.deployments.update(
owner="myusername",
name="my-production-model",
min_instances=2,
max_instances=10,
hardware="gpu-a40-large"
)
print(f"Updated deployment: {deployment.status}")
print(f"Current instances: {deployment.current_instances}")import replicate
# List all deployments
deployments = replicate.deployments.list()
for deployment in deployments.results:
print(f"Deployment: {deployment.owner}/{deployment.name}")
print(f"Model: {deployment.model} ({deployment.version})")
print(f"Status: {deployment.status}")
print(f"Instances: {deployment.current_instances}/{deployment.max_instances}")
print(f"Created: {deployment.created_at}")
print("---")
# Get specific deployment
deployment = replicate.deployments.get("myusername", "my-production-model")
print(f"Deployment status: {deployment.status}")import replicate
# Run prediction on deployment
deployment_url = "https://my-deployment.replicate.com"
output = replicate.run(
"myusername/my-production-model", # Deployment reference
input={"prompt": "a beautiful landscape"},
# Optionally specify deployment URL directly
# model=deployment_url
)
with open("deployment_output.png", "wb") as f:
f.write(output.read())import replicate
# Get default webhook secret
secret = replicate.webhooks.default.secret()
print(f"Webhook secret: {secret.key}")
# Create prediction with webhook
prediction = replicate.predictions.create(
model="stability-ai/stable-diffusion-3",
input={"prompt": "webhook test"},
webhook="https://myapp.com/webhook",
webhook_events_filter=["start", "output", "logs", "completed"]
)
print(f"Prediction with webhook: {prediction.id}")import replicate
from replicate.exceptions import WebhookValidationError
def handle_webhook(request):
"""Handle incoming webhook request."""
try:
# Extract headers and body
signature = request.headers.get('Replicate-Signature')
timestamp = request.headers.get('Replicate-Timestamp')
body = request.body
# Validate webhook
replicate.webhooks.validate(
headers={"webhook-signature": signature, "webhook-timestamp": timestamp, "webhook-id": "webhook-id"},
body=body.decode('utf-8'),
secret=secret,
tolerance=300 # 5 minutes
)
# Process validated webhook
import json
payload = json.loads(body.decode('utf-8'))
if payload['status'] == 'succeeded':
print(f"Prediction {payload['id']} completed successfully")
# Process output
output_url = payload['output'][0] if payload['output'] else None
elif payload['status'] == 'failed':
print(f"Prediction {payload['id']} failed: {payload.get('error')}")
except WebhookValidationError as e:
print(f"Webhook validation failed: {e}")
return {"error": "Invalid webhook"}, 400
except Exception as e:
print(f"Webhook processing error: {e}")
return {"error": "Processing failed"}, 500
return {"success": True}, 200import replicate
import hashlib
import hmac
import time
from replicate.exceptions import WebhookValidationError
class WebhookHandler:
def __init__(self, secret_key: str):
self.secret_key = secret_key
def validate_signature(self, body: bytes, signature: str, timestamp: str):
"""Validate webhook signature manually."""
try:
# Parse signature
if not signature.startswith('v1='):
raise InvalidSignatureError("Invalid signature format")
expected_signature = signature[3:] # Remove 'v1=' prefix
# Create signature payload
payload = f"{timestamp}.{body.decode('utf-8')}"
# Calculate expected signature
expected = hmac.new(
self.secret_key.encode('utf-8'),
payload.encode('utf-8'),
hashlib.sha256
).hexdigest()
# Compare signatures
if not hmac.compare_digest(expected, expected_signature):
raise InvalidSignatureError("Signature mismatch")
# Check timestamp freshness
current_time = int(time.time())
webhook_time = int(timestamp)
if abs(current_time - webhook_time) > 300: # 5 minutes
raise InvalidTimestampError("Timestamp too old")
except ValueError as e:
raise WebhookValidationError(f"Validation error: {e}")
def process_webhook(self, body: bytes, signature: str, timestamp: str):
"""Process validated webhook."""
self.validate_signature(body, signature, timestamp)
import json
payload = json.loads(body.decode('utf-8'))
event_type = payload.get('status')
prediction_id = payload.get('id')
if event_type == 'processing':
print(f"Prediction {prediction_id} is processing...")
elif event_type == 'succeeded':
print(f"Prediction {prediction_id} completed!")
self.handle_success(payload)
elif event_type == 'failed':
print(f"Prediction {prediction_id} failed: {payload.get('error')}")
self.handle_failure(payload)
def handle_success(self, payload):
"""Handle successful prediction."""
output = payload.get('output')
if output:
# Download and process outputs
for i, url in enumerate(output):
print(f"Output {i}: {url}")
def handle_failure(self, payload):
"""Handle failed prediction."""
error = payload.get('error')
logs = payload.get('logs')
print(f"Error: {error}")
if logs:
print(f"Logs: {logs[-500:]}") # Last 500 chars
# Usage
handler = WebhookHandler("your-webhook-secret")
# handler.process_webhook(request_body, signature, timestamp)import replicate
# Delete a deployment
replicate.deployments.delete("myusername", "my-production-model")
print("Deployment deleted")
# Verify deletion
try:
deployment = replicate.deployments.get("myusername", "my-production-model")
except Exception as e:
print(f"Deployment not found (expected): {e}")Install with Tessl CLI
npx tessl i tessl/pypi-replicate