CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-replicate

Python client for Replicate

Pending
Overview
Eval results
Files

docs/deployments-webhooks.md

Deployments & Webhooks

Manage model deployments for production workloads, and configure and validate webhooks to receive real-time notifications about predictions.

Capabilities

Deployment Management

Manage dedicated model deployments for production workloads.

class Deployments:
    """Operations for managing dedicated model deployments."""

    def get(self, owner: str, name: str) -> Deployment:
        """
        Look up a single deployment by its owner and name.

        Parameters:
        - owner: Username of the deployment's owner
        - name: Name of the deployment

        Returns:
        The Deployment object, including its configuration and status
        """
        ...

    def list(self, **params) -> Page[Deployment]:
        """
        Enumerate deployments.

        Returns:
        A page of Deployment objects (the listing is paginated)
        """
        ...

    def create(
        self,
        name: str,
        model: str,
        version: str,
        *,
        hardware: Optional[str] = None,
        min_instances: Optional[int] = None,
        max_instances: Optional[int] = None,
        **params
    ) -> Deployment:
        """
        Set up a new deployment.

        Parameters:
        - name: Name to give the deployment
        - model: Model to deploy, written as "owner/name"
        - version: ID of the model version to deploy
        - hardware: Hardware tier the deployment runs on
        - min_instances: Lower bound on the number of instances
        - max_instances: Upper bound on the number of instances

        Returns:
        The newly created Deployment object
        """
        ...

    def update(
        self,
        owner: str,
        name: str,
        *,
        version: Optional[str] = None,
        hardware: Optional[str] = None,
        min_instances: Optional[int] = None,
        max_instances: Optional[int] = None,
        **params
    ) -> Deployment:
        """
        Change the configuration of an existing deployment.

        Parameters:
        - owner: Username of the deployment's owner
        - name: Name of the deployment
        - version: Model version ID to switch to
        - hardware: Hardware tier to switch to
        - min_instances: New lower bound on the number of instances
        - max_instances: New upper bound on the number of instances

        Returns:
        The Deployment object after the update
        """
        ...

    def delete(self, owner: str, name: str) -> None:
        """
        Remove a deployment.

        Parameters:
        - owner: Username of the deployment's owner
        - name: Name of the deployment
        """
        ...

Deployment Objects

Deployments represent dedicated model instances with scaling and performance configuration.

class Deployment:
    """A dedicated model instance together with its scaling configuration."""

    owner: str
    """Owner of this deployment."""

    name: str
    """Name of this deployment."""

    model: str
    """Model that this deployment serves."""

    version: str
    """Version of the model that this deployment serves."""

    status: str
    """Status the deployment is currently in."""

    hardware: Optional[str]
    """Hardware tier the deployment runs on."""

    min_instances: int
    """Lower bound on the number of instances."""

    max_instances: int
    """Upper bound on the number of instances."""

    current_instances: Optional[int]
    """Number of instances running right now."""

    created_at: str
    """Creation time of the deployment."""

    updated_at: str
    """Time of the most recent update to the deployment."""

Webhook Management

Configure and validate webhooks for receiving real-time notifications.

class Webhooks:
    """Webhook operations: signing-secret retrieval and signature validation."""

    @property
    def default(self) -> "Webhooks.Default":
        """Group of operations that act on the default webhook."""
        ...

    class Default:
        """Operations on the default webhook."""

        def secret(self) -> WebhookSigningSecret:
            """
            Retrieve the signing secret of the default webhook.

            Returns:
            WebhookSigningSecret object to validate signatures with
            """
            ...

        async def async_secret(self) -> WebhookSigningSecret:
            """
            Asynchronous variant of secret().

            Returns:
            WebhookSigningSecret object to validate signatures with
            """
            ...

    @staticmethod
    def validate(
        request: Optional[httpx.Request] = None,
        headers: Optional[Dict[str, str]] = None,
        body: Optional[str] = None,
        secret: Optional[WebhookSigningSecret] = None,
        tolerance: Optional[int] = None
    ) -> None:
        """
        Check the signature of an incoming webhook request.

        Supply either `request`, or `headers` together with `body`.

        Parameters:
        - request: HTTPX request object (instead of headers/body)
        - headers: Dict of request headers (instead of request)
        - body: Request body as a string (instead of request)
        - secret: Signing secret of the webhook
        - tolerance: Largest accepted time difference, in seconds

        Raises:
        WebhookValidationError: When the request does not validate
        """
        ...

Webhook Objects and Exceptions

class WebhookSigningSecret:
    """Holder for the secret used to sign webhook requests."""

    key: str
    """Secret key that webhook signatures are computed with."""

# Exception hierarchy for webhook validation; catching the base class
# WebhookValidationError covers every specific error below.
class WebhookValidationError(ValueError):
    """Root of all webhook validation errors."""


class MissingWebhookHeaderError(WebhookValidationError):
    """A header required for validation is absent."""


class InvalidSecretKeyError(WebhookValidationError):
    """The webhook secret key is not valid."""


class MissingWebhookBodyError(WebhookValidationError):
    """The webhook request has no body."""


class InvalidTimestampError(WebhookValidationError):
    """The timestamp is invalid or has expired."""


class InvalidSignatureError(WebhookValidationError):
    """The webhook signature is not valid."""

Hardware Management

Access hardware configuration options for deployments.

class Hardware:
    """A hardware option that a deployment can run on."""

    name: str
    """Name of the hardware tier."""

    sku: str
    """SKU identifier of the hardware."""

class HardwareNamespace:
    """Operations for discovering hardware options."""

    def list(self, **params) -> List[Hardware]:
        """
        Enumerate the hardware options that are available.

        Returns:
        List of Hardware objects, one per available tier
        """
        ...

Usage Examples

Create and Manage Deployments

import replicate

# Show every hardware option a deployment can run on
for option in replicate.hardware.list():
    print(f"Hardware: {option.name} (SKU: {option.sku})")

# Describe the deployment first, then create it in a single call
deployment_settings = {
    "name": "my-production-model",
    "model": "stability-ai/stable-diffusion-3",
    "version": "version-id-here",
    "hardware": "gpu-t4",
    "min_instances": 1,
    "max_instances": 5,
}
deployment = replicate.deployments.create(**deployment_settings)

# Summarize what was created, one fact per line
print(
    f"Deployment created: {deployment.owner}/{deployment.name}",
    f"Status: {deployment.status}",
    f"Hardware: {deployment.hardware}",
    f"Instances: {deployment.min_instances}-{deployment.max_instances}",
    sep="\n",
)

Update Deployment Configuration

import replicate

# New scaling limits and hardware tier for the existing deployment
scaling_changes = dict(
    min_instances=2,
    max_instances=10,
    hardware="gpu-a40-large",
)

deployment = replicate.deployments.update(
    owner="myusername",
    name="my-production-model",
    **scaling_changes,
)

# Report the state returned by the update call
print(
    f"Updated deployment: {deployment.status}",
    f"Current instances: {deployment.current_instances}",
    sep="\n",
)

List and Monitor Deployments

import replicate

# Fetch the (paginated) deployment listing
deployments = replicate.deployments.list()

# Print a short report for each deployment on the page
for entry in deployments.results:
    print(
        f"Deployment: {entry.owner}/{entry.name}",
        f"Model: {entry.model} ({entry.version})",
        f"Status: {entry.status}",
        f"Instances: {entry.current_instances}/{entry.max_instances}",
        f"Created: {entry.created_at}",
        "---",
        sep="\n",
    )

# Look up one deployment directly by owner and name
deployment = replicate.deployments.get("myusername", "my-production-model")
print(f"Deployment status: {deployment.status}")

Run Predictions on Deployments

import replicate

# Run a prediction against the deployment.
# NOTE(review): this hands an "owner/name" reference to replicate.run();
# confirm that it is routed to the deployment and not to a model of the
# same name.
output = replicate.run(
    "myusername/my-production-model",  # Deployment reference
    input={"prompt": "a beautiful landscape"},
)

# Image models commonly return a list of file outputs rather than a single
# one, so normalize to a list before picking the file to save.
files = list(output) if isinstance(output, (list, tuple)) else [output]
if not files:
    raise RuntimeError("The prediction finished without producing any output")

# Save the first (or only) output file
with open("deployment_output.png", "wb") as f:
    f.write(files[0].read())

Webhook Configuration

import replicate

# Get the default webhook signing secret. The raw key is available as
# `secret.key`; it is a credential, so confirm retrieval without echoing the
# key to stdout or logs.
secret = replicate.webhooks.default.secret()
print("Webhook signing secret retrieved")

# Create a prediction that reports its progress to the webhook URL
prediction = replicate.predictions.create(
    model="stability-ai/stable-diffusion-3",
    input={"prompt": "webhook test"},
    webhook="https://myapp.com/webhook",
    webhook_events_filter=["start", "output", "logs", "completed"],
)

print(f"Prediction with webhook: {prediction.id}")

Webhook Validation

import json

import replicate
from replicate.webhook import WebhookValidationError

# Headers that carry the signature data; all three must be forwarded
# unchanged for validation to be able to succeed.
WEBHOOK_HEADER_NAMES = ("webhook-id", "webhook-timestamp", "webhook-signature")


def handle_webhook(request, secret=None):
    """
    Validate and process an incoming webhook request.

    Parameters:
    - request: Framework request object; this handler reads `request.headers`
      (via `.get`) and `request.body` (bytes)
    - secret: Webhook signing secret to validate against; when omitted, the
      default secret is fetched with `replicate.webhooks.default.secret()`

    Returns:
    Tuple of (response body dict, HTTP status code): 200 on success,
    400 when validation fails, 500 on any other processing error
    """
    try:
        if secret is None:
            # Fetching costs an API call; pass a cached secret in production.
            secret = replicate.webhooks.default.secret()

        body = request.body.decode('utf-8')

        # Forward the signature headers exactly as they were received.
        # Absent headers are left out so validate() can report them.
        headers = {}
        for header_name in WEBHOOK_HEADER_NAMES:
            header_value = request.headers.get(header_name)
            if header_value is not None:
                headers[header_name] = header_value

        # Validate webhook
        replicate.webhooks.validate(
            headers=headers,
            body=body,
            secret=secret,
            tolerance=300  # 5 minutes
        )

        # Process validated webhook
        payload = json.loads(body)
        status = payload.get('status')

        if status == 'succeeded':
            print(f"Prediction {payload['id']} completed successfully")
            # The output may be a single value or a list; take the first
            # element only when it really is a non-empty list.
            output = payload.get('output')
            if isinstance(output, list):
                output_url = output[0] if output else None
            else:
                output_url = output
            # Hand output_url to your own processing here
        elif status == 'failed':
            print(f"Prediction {payload['id']} failed: {payload.get('error')}")

    except WebhookValidationError as e:
        print(f"Webhook validation failed: {e}")
        return {"error": "Invalid webhook"}, 400
    except Exception as e:
        # Top-level boundary: report the failure instead of crashing the server
        print(f"Webhook processing error: {e}")
        return {"error": "Processing failed"}, 500

    return {"success": True}, 200

Advanced Webhook Handling

import hashlib
import hmac
import json
import time

import replicate
from replicate.webhook import (
    InvalidSignatureError,
    InvalidTimestampError,
    WebhookValidationError,
)


class WebhookHandler:
    """Validates webhook signatures by hand and dispatches on prediction status."""

    def __init__(self, secret_key: str, tolerance: int = 300):
        """
        Parameters:
        - secret_key: Key used to compute the expected HMAC signature
        - tolerance: Maximum allowed clock difference in seconds (default 5 minutes)
        """
        self.secret_key = secret_key
        self.tolerance = tolerance

    def validate_signature(self, body: bytes, signature: str, timestamp: str):
        """
        Validate webhook signature manually.

        The expected signature is the hex HMAC-SHA256 of "<timestamp>.<body>"
        and must be supplied as "v1=<hex digest>".

        NOTE(review): Replicate's published verification guide describes a
        different scheme (content "<webhook-id>.<webhook-timestamp>.<body>",
        a base64 secret and base64 "v1,<signature>" values). Confirm this
        scheme against your provider, or use replicate.webhooks.validate().

        Parameters:
        - body: Raw request body
        - signature: Signature header value
        - timestamp: Timestamp header value (integer seconds as a string)

        Raises:
        InvalidSignatureError: If the format is wrong or the digest differs
        InvalidTimestampError: If the timestamp is malformed or outside the tolerance
        WebhookValidationError: If the body is not valid UTF-8
        """
        # Parse signature
        if not signature.startswith('v1='):
            raise InvalidSignatureError("Invalid signature format")

        received_signature = signature[3:]  # Remove 'v1=' prefix

        # Create signature payload
        try:
            payload = f"{timestamp}.{body.decode('utf-8')}"
        except UnicodeDecodeError as e:
            raise WebhookValidationError(f"Validation error: {e}") from e

        # Calculate expected signature
        expected = hmac.new(
            self.secret_key.encode('utf-8'),
            payload.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        # Compare as bytes: compare_digest() rejects non-ASCII str arguments
        # with TypeError, which would escape as an unhandled error.
        if not hmac.compare_digest(
            expected.encode('utf-8'), received_signature.encode('utf-8')
        ):
            raise InvalidSignatureError("Signature mismatch")

        # Check timestamp freshness
        try:
            webhook_time = int(timestamp)
        except ValueError as e:
            raise InvalidTimestampError(f"Validation error: {e}") from e

        if abs(int(time.time()) - webhook_time) > self.tolerance:
            raise InvalidTimestampError("Timestamp too old")

    def process_webhook(self, body: bytes, signature: str, timestamp: str):
        """
        Validate a webhook request, then dispatch on the prediction status.

        Parameters:
        - body: Raw request body (JSON)
        - signature: Signature header value
        - timestamp: Timestamp header value

        Raises:
        WebhookValidationError: If the signature check fails
        """
        self.validate_signature(body, signature, timestamp)

        payload = json.loads(body.decode('utf-8'))

        event_type = payload.get('status')
        prediction_id = payload.get('id')

        if event_type == 'processing':
            print(f"Prediction {prediction_id} is processing...")
        elif event_type == 'succeeded':
            print(f"Prediction {prediction_id} completed!")
            self.handle_success(payload)
        elif event_type == 'failed':
            print(f"Prediction {prediction_id} failed: {payload.get('error')}")
            self.handle_failure(payload)

    def handle_success(self, payload):
        """Handle successful prediction by listing its outputs."""
        output = payload.get('output')
        if not output:
            return

        # A single string output must not be iterated character by character
        outputs = [output] if isinstance(output, str) else output

        # Download and process outputs
        for i, url in enumerate(outputs):
            print(f"Output {i}: {url}")

    def handle_failure(self, payload):
        """Handle failed prediction by reporting the error and the log tail."""
        error = payload.get('error')
        logs = payload.get('logs')
        print(f"Error: {error}")
        if logs:
            print(f"Logs: {logs[-500:]}")  # Last 500 chars


# Usage
handler = WebhookHandler("your-webhook-secret")
# handler.process_webhook(request_body, signature, timestamp)

Delete Deployment

import replicate
from replicate.exceptions import ReplicateError

# Delete a deployment
replicate.deployments.delete("myusername", "my-production-model")
print("Deployment deleted")

# Verify deletion: only an API "404 Not Found" shows the deployment is gone.
# Any other failure (network, authentication, ...) is re-raised instead of
# being reported as the expected outcome.
try:
    deployment = replicate.deployments.get("myusername", "my-production-model")
except ReplicateError as e:
    if e.status != 404:
        raise
    print(f"Deployment not found (expected): {e}")
else:
    print(f"Deployment still exists: {deployment.owner}/{deployment.name}")

Install with Tessl CLI

npx tessl i tessl/pypi-replicate

docs

client.md

collections-training.md

deployments-webhooks.md

exceptions.md

files.md

index.md

models-predictions.md

tile.json