Supabase client for Python providing database operations, authentication, storage, real-time subscriptions, and edge functions.
Serverless function invocation and management for running custom server-side logic at the edge. Provides seamless integration with Supabase Edge Functions for executing TypeScript/JavaScript code with automatic scaling and global distribution.
Access the edge functions client through the main Supabase client instance.
@property
def functions(self) -> SyncFunctionsClient | AsyncFunctionsClient:
"""
Functions client providing edge function invocation capabilities.
Returns:
Functions client instance (sync or async) with methods for:
- Function invocation with custom parameters
- Request/response handling with proper error management
- Integration with authentication context
- Custom headers and configuration options
"""Execute edge functions with parameters, custom headers, and response handling.
def invoke(
self,
function_name: str,
invoke_options: dict = None
) -> FunctionsResponse:
"""
Invoke an edge function by name.
Parameters:
- function_name: Name of the edge function to invoke
- invoke_options: Invocation options including body, headers, method, etc.
Returns:
FunctionsResponse with data, headers, status, and error information
Raises:
FunctionsHttpError: If HTTP request fails
FunctionsRelayError: If function relay fails
FunctionsError: For general function errors
Note: For async clients, this method is async
"""Usage Examples:
# Basic function invocation
response = supabase.functions.invoke("hello-world")
print(f"Response: {response.data}")
# Function with parameters
response = supabase.functions.invoke("greeting", {
"body": {"name": "Alice", "language": "en"}
})
print(f"Greeting: {response.data}")
# Function with custom headers
response = supabase.functions.invoke("protected-function", {
"headers": {
"Authorization": f"Bearer {user_token}",
"Content-Type": "application/json"
},
"body": {"action": "get_user_data"}
})
# POST request with JSON data
response = supabase.functions.invoke("process-data", {
"method": "POST",
"headers": {"Content-Type": "application/json"},
"body": {
"data": [1, 2, 3, 4, 5],
"operation": "sum",
"metadata": {"source": "python-client"}
}
})
# Handle response data
if response.data:
result = response.data
print(f"Processing result: {result}")
else:
print(f"Function failed: {response.error}")
# Async function invocation
async def call_async_function():
client = await create_async_client(url, key)
response = await client.functions.invoke("async-processor", {
"body": {"items": ["a", "b", "c"]}
})
return response.data
Process function responses, handle errors, and extract data from function execution results.
# FunctionsResponse attributes (from supabase_functions)
class FunctionsResponse:
"""Response from edge function invocation."""
data: Any
"""Function response data (parsed JSON or raw text)"""
error: Optional[str]
"""Error message if function execution failed"""
status: int
"""HTTP status code from function response"""
headers: Dict[str, str]
"""Response headers from function"""Usage Examples:
# Comprehensive response handling
response = supabase.functions.invoke("data-processor", {
"body": {"input": "test data"}
})
# Check response status
if response.status == 200:
print("Function executed successfully")
data = response.data
print(f"Result: {data}")
elif response.status == 400:
print(f"Bad request: {response.error}")
elif response.status == 500:
print(f"Function error: {response.error}")
else:
print(f"Unexpected status {response.status}: {response.error}")
# Access response headers
print(f"Content-Type: {response.headers.get('content-type')}")
print(f"Execution time: {response.headers.get('x-execution-time')}")
# Handle different response types
def process_function_response(response):
    """Process function response based on content type.

    Parameters:
    - response: object exposing ``headers`` (a mapping of header names to
      values) and ``data`` (the response payload).

    Returns:
    ``str(response.data)`` for ``text/plain`` responses; ``response.data``
    unchanged for JSON, binary, and unrecognised content types.
    """
    headers = response.headers
    # Case-insensitive header mappings resolve this lookup directly. A plain
    # dict keyed 'Content-Type' does not, so fall back to a scan by
    # lower-cased name: HTTP header names are case-insensitive.
    content_type = headers.get('content-type')
    if content_type is None:
        content_type = next(
            (value for name, value in headers.items()
             if name.lower() == 'content-type'),
            '',
        )
    # Media types are case-insensitive too ("Text/Plain" == "text/plain").
    content_type = (content_type or '').lower()

    if 'application/json' in content_type:
        # JSON response
        return response.data
    if 'text/plain' in content_type:
        # Text response
        return str(response.data)
    # Binary ('application/octet-stream') and unknown content types are
    # returned as-is.
    return response.data
# Example with file processing function
response = supabase.functions.invoke("image-resize", {
"body": {"image_url": "https://example.com/image.jpg", "width": 300},
"headers": {"Accept": "application/json"}
})
if response.status == 200:
result = response.data
print(f"Resized image URL: {result['resized_url']}")
print(f"Original size: {result['original_size']}")
print(f"New size: {result['new_size']}")
Invoke functions with user authentication context and handle auth-dependent operations.
# Functions automatically inherit auth context from client
# No explicit API methods, but functions receive user context
Usage Examples:
# Functions automatically receive user auth context
# The user's JWT token is automatically passed to functions
# Get current user session
session = supabase.auth.get_session()
if session:
# Function will receive authenticated user context
response = supabase.functions.invoke("user-profile", {
"body": {"operation": "get_preferences"}
})
user_data = response.data
print(f"User preferences: {user_data}")
else:
print("User not authenticated")
# Functions can access user data through auth context
response = supabase.functions.invoke("protected-operation", {
"body": {"action": "update_profile", "data": {"theme": "dark"}}
})
# Function can use auth to authorize operations
if response.status == 401:
print("Authentication required")
elif response.status == 403:
print("Insufficient permissions")
elif response.status == 200:
print(f"Operation successful: {response.data}")
# Example: User-specific data processing
def process_user_data(operation, data):
    """Run *operation* on *data* via the "user-data-processor" edge function.

    Requires an active session: raises ``Exception("Authentication required")``
    when ``supabase.auth.get_session()`` returns a falsy value, and raises
    ``Exception`` with the response error when the status is not 200.
    Returns ``response.data`` on success.
    """
    session = supabase.auth.get_session()
    # Guard clause: nothing is invoked without a session.
    if not session:
        raise Exception("Authentication required")

    payload = {
        "operation": operation,
        "data": data,
        "user_id": session.user.id  # Explicit user ID
    }
    response = supabase.functions.invoke("user-data-processor", {"body": payload})

    if response.status == 200:
        return response.data
    raise Exception(f"Function failed: {response.error}")
# Usage
try:
result = process_user_data("calculate_stats", {"period": "monthly"})
print(f"Stats: {result}")
except Exception as e:
print(f"Error: {e}")
Send custom headers, configure request options, and handle different HTTP methods.
# Invoke options configuration
invoke_options = {
"method": str, # HTTP method (GET, POST, PUT, etc.)
"headers": Dict[str, str], # Custom request headers
"body": Any, # Request body (JSON serializable)
"region": str, # Function region (optional)
"timeout": int # Request timeout in milliseconds
}
Usage Examples:
# Custom headers for API integration
response = supabase.functions.invoke("external-api-proxy", {
"method": "POST",
"headers": {
"Content-Type": "application/json",
"X-API-Key": "external-service-key",
"User-Agent": "MyApp/1.0"
},
"body": {
"endpoint": "https://api.external.com/data",
"params": {"limit": 100}
}
})
# Different HTTP methods
# GET request
get_response = supabase.functions.invoke("data-fetcher", {
"method": "GET",
"headers": {"Accept": "application/json"}
})
# POST request with form data
post_response = supabase.functions.invoke("form-processor", {
"method": "POST",
"headers": {"Content-Type": "application/x-www-form-urlencoded"},
"body": "name=John&email=john@example.com"
})
# PUT request for updates
put_response = supabase.functions.invoke("resource-updater", {
"method": "PUT",
"headers": {"Content-Type": "application/json"},
"body": {"id": 123, "name": "Updated Name"}
})
# DELETE request
delete_response = supabase.functions.invoke("resource-deleter", {
"method": "DELETE",
"headers": {"Accept": "application/json"},
"body": {"id": 123}
})
# Configure timeout for long-running functions
response = supabase.functions.invoke("data-processor", {
"body": {"large_dataset": data},
"timeout": 60000 # 60 seconds
})
# Regional function invocation
response = supabase.functions.invoke("region-specific", {
"region": "us-east-1",
"body": {"data": "region-specific-data"}
})
Handle multiple function calls, batch operations, and utility patterns.
Usage Examples:
# Batch function calls
async def batch_invoke_functions(functions_data):
    """Invoke every function in *functions_data* concurrently (async).

    *functions_data* maps function name to its invoke options. Returns a dict
    mapping each function name to whatever ``asyncio.gather`` produced for
    it; because ``return_exceptions=True`` is used, that is either the
    invocation result or the exception it raised.
    """
    client = await create_async_client(url, key)
    calls = list(functions_data.items())
    pending = [client.functions.invoke(name, opts) for name, opts in calls]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)
    # gather() preserves submission order, so names and outcomes line up.
    return {name: outcome for (name, _), outcome in zip(calls, outcomes)}
# Batch processing example
batch_data = {
    "process-images": {
        "body": {"images": ["img1.jpg", "img2.jpg"]}
    },
    "send-notifications": {
        "body": {"users": ["user1", "user2"], "message": "Hello"}
    },
    "update-analytics": {
        "body": {"events": ["login", "purchase"]}
    }
}


async def run_batch_example():
    """Run the batch and print each function's result or failure."""
    # async batch execution
    results = await batch_invoke_functions(batch_data)
    for func_name, response in results.items():
        if isinstance(response, Exception):
            print(f"{func_name} failed: {response}")
        else:
            print(f"{func_name} result: {response.data}")


# `await` is only valid inside a coroutine; at module top level it is a
# SyntaxError in a regular script, so drive the coroutine with asyncio.run().
asyncio.run(run_batch_example())
# Synchronous batch with error handling
def batch_invoke_sync(function_calls):
    """Invoke each function in *function_calls* one after another.

    *function_calls* maps function name to invoke options. Returns a dict
    keyed by function name; each entry has "success" plus either
    "data"/"status" from the response or "error" (the exception text) with
    "status" set to None. A failing call does not stop the remaining calls.
    """
    outcomes = {}
    for name, call_options in function_calls.items():
        try:
            reply = supabase.functions.invoke(name, call_options)
            entry = {
                "success": True,
                "data": reply.data,
                "status": reply.status,
            }
        except Exception as exc:
            # Record the failure and keep going with the next function.
            entry = {
                "success": False,
                "error": str(exc),
                "status": None,
            }
        outcomes[name] = entry
    return outcomes
# Pipeline processing
def function_pipeline(data, pipeline_steps):
    """Feed *data* through *pipeline_steps*, one edge function per step.

    Each step is a dict with a "function" name and optional "params". The
    step's params plus the current value (under "input") form the request
    body; the response data becomes the input of the next step. Raises
    ``Exception`` naming the failing function when a step's status is not
    200. Returns the data produced by the last step (or *data* unchanged
    when there are no steps).
    """
    payload = data
    for stage in pipeline_steps:
        name = stage["function"]
        body = dict(stage.get("params", {}))
        # "input" is set last so it overrides any same-named param.
        body["input"] = payload
        reply = supabase.functions.invoke(name, {"body": body})
        if reply.status != 200:
            raise Exception(f"Pipeline failed at {name}: {reply.error}")
        payload = reply.data
    return payload
# Usage
pipeline = [
{"function": "validate-data", "params": {"schema": "user_data"}},
{"function": "transform-data", "params": {"format": "normalized"}},
{"function": "enrich-data", "params": {"source": "external_api"}},
{"function": "store-data", "params": {"table": "processed_users"}}
]
try:
result = function_pipeline(user_input_data, pipeline)
print(f"Pipeline result: {result}")
except Exception as e:
print(f"Pipeline failed: {e}")
Handle function errors, debug issues, and implement retry logic.
# Functions exceptions (from supabase_functions)
class FunctionsError(Exception):
    """Base class for functions errors"""


# Derives from FunctionsError (the documented base class), so that
# `except FunctionsError` also catches HTTP errors.
class FunctionsHttpError(FunctionsError):
    """HTTP request errors when invoking functions"""
class FunctionsRelayError(FunctionsError):
"""Function relay and communication errors"""
Error Handling Examples:
from supabase_functions.errors import FunctionsError, FunctionsHttpError, FunctionsRelayError
# Comprehensive error handling
def invoke_with_retry(func_name, options, max_retries=3):
    """Invoke function with retry logic.

    Parameters:
    - func_name: Name of the edge function to invoke
    - options: Invoke options passed through to ``supabase.functions.invoke``
    - max_retries: Total number of attempts (must be at least 1)

    Returns:
    The response object once a call comes back with status 200.

    Raises:
    ValueError: If ``max_retries`` is less than 1
    Exception: On a non-200 status below 500 (not retried), on a
    FunctionsError (not retried), or when status >= 500 /
    FunctionsHttpError / FunctionsRelayError persist through the last
    attempt. Wrapped library errors are chained as ``__cause__``.
    """
    import time  # local import: the surrounding snippet never imports `time`

    if max_retries < 1:
        # Without this guard the loop body never runs and None is returned.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            response = supabase.functions.invoke(func_name, options)
        except FunctionsHttpError as e:
            # Network/HTTP error - retry
            if is_last_attempt:
                raise Exception(f"HTTP error after {max_retries} attempts: {e}") from e
            print(f"HTTP error (attempt {attempt + 1}): {e}")
        except FunctionsRelayError as e:
            # Relay error - retry
            if is_last_attempt:
                raise Exception(f"Relay error after {max_retries} attempts: {e}") from e
            print(f"Relay error (attempt {attempt + 1}): {e}")
        except FunctionsError as e:
            # General function error - don't retry
            raise Exception(f"Function error: {e}") from e
        else:
            # Check for successful response
            if response.status == 200:
                return response
            if response.status < 500:
                # Client error - don't retry
                raise Exception(f"Client error {response.status}: {response.error}")
            # Server error - retry
            if is_last_attempt:
                raise Exception(f"Server error after {max_retries} attempts")
            print(f"Server error (attempt {attempt + 1}), retrying...")
        # Only retryable failures on a non-final attempt reach this point.
        time.sleep(2 ** attempt)  # Exponential backoff
# Error categorization and handling
def handle_function_error(response):
    """Classify a response by status into an error type with a retry hint.

    Reads ``response.status`` and ``response.error`` and returns a dict with
    "type", "message" and "retry". Statuses 400, 401, 403, 404 and 429 have
    fixed entries, any status >= 500 is a retryable server error, and
    everything else is "unknown" with the response's own error as message.
    """
    code = response.status
    detail = response.error

    # (type, message, retry) for the statuses that are matched exactly.
    known = {
        400: ("client_error", "Invalid request", False),
        401: ("auth_error", "Authentication required", False),
        403: ("permission_error", "Insufficient permissions", False),
        404: ("not_found", "Function not found", False),
        429: ("rate_limit", "Rate limit exceeded", True),
    }

    if code in known:
        kind, text, should_retry = known[code]
    elif code >= 500:
        kind, text, should_retry = "server_error", "Server error", True
    else:
        kind, text, should_retry = "unknown", detail, False

    return {"type": kind, "message": text, "retry": should_retry}
# Usage with error handling
try:
response = invoke_with_retry("unreliable-function", {
"body": {"data": "test"}
})
print(f"Success: {response.data}")
except Exception as e:
print(f"Function invocation failed: {e}")
# Debugging and logging
def invoke_with_logging(func_name, options):
    """Invoke function with detailed logging.

    Prints the function name and options, invokes the function, then prints
    the elapsed time, status, headers, and either the response data (status
    200) or the response error. Returns the response. If anything in the
    call raises, the elapsed time and exception are printed and the
    exception is re-raised unchanged.
    """
    import time  # local import: the surrounding snippet never imports `time`

    print(f"Invoking function: {func_name}")
    print(f"Options: {options}")
    # perf_counter() is monotonic; time.time() follows the wall clock and can
    # jump (NTP, manual changes), which would corrupt the measured duration.
    start_time = time.perf_counter()
    try:
        response = supabase.functions.invoke(func_name, options)
        duration = time.perf_counter() - start_time
        print(f"Function completed in {duration:.2f}s")
        print(f"Status: {response.status}")
        print(f"Headers: {response.headers}")
        if response.status == 200:
            print(f"Response data: {response.data}")
        else:
            print(f"Error: {response.error}")
        return response
    except Exception as e:
        duration = time.perf_counter() - start_time
        print(f"Function failed after {duration:.2f}s: {e}")
        raise
# Function health check
def check_function_health(func_name):
    """Probe *func_name* with a GET invocation and summarise the outcome.

    Returns a dict with "healthy" (True only for status 200), "status",
    "response_time" (the "x-execution-time" response header, if any) and
    "error". Any exception is swallowed deliberately and reported as an
    unhealthy result carrying the exception text.
    """
    probe_options = {
        "method": "GET",
        "timeout": 5000  # 5 second timeout
    }
    try:
        reply = supabase.functions.invoke(func_name, probe_options)
        is_ok = reply.status == 200
        report = {
            "healthy": is_ok,
            "status": reply.status,
            "response_time": reply.headers.get("x-execution-time"),
            "error": None if is_ok else reply.error,
        }
    except Exception as exc:
        report = {
            "healthy": False,
            "status": None,
            "response_time": None,
            "error": str(exc),
        }
    return report
# Monitor function performance
health = check_function_health("critical-function")
if health["healthy"]:
print(f"Function is healthy (response time: {health['response_time']})")
else:
print(f"Function is unhealthy: {health['error']}")
# Optimize function calls for performance and reliability
# 1. Use appropriate timeouts
short_timeout_response = supabase.functions.invoke("quick-function", {
"body": {"data": "test"},
"timeout": 5000 # 5 seconds for quick operations
})
long_timeout_response = supabase.functions.invoke("heavy-processing", {
"body": {"large_dataset": data},
"timeout": 120000 # 2 minutes for heavy processing
})
# 2. Minimize payload size
# Good: Send only necessary data
response = supabase.functions.invoke("processor", {
"body": {"ids": [1, 2, 3], "operation": "summarize"}
})
# Avoid: Sending large unnecessary data
# response = supabase.functions.invoke("processor", {
# "body": {"full_records": large_dataset, "operation": "summarize"}
# })
# 3. Use appropriate error handling
def robust_function_call(func_name, data):
    """Invoke *func_name* with *data* as body and never raise.

    Returns ``{"success": True, "data": ...}`` for a 200 response. Otherwise
    returns ``{"success": False, "error": ..., "status": ...}``, where status
    is the response status, or None when the call itself raised.
    """
    request = {
        "body": data,
        "timeout": 30000
    }
    try:
        reply = supabase.functions.invoke(func_name, request)
        if reply.status != 200:
            return {"success": False, "error": reply.error, "status": reply.status}
        return {"success": True, "data": reply.data}
    except Exception as exc:
        # Failures are reported through the result dict instead of raising.
        return {"success": False, "error": str(exc), "status": None}
# 4. Cache function results when appropriate
# Successful results, keyed by the caller-supplied cache key.
function_cache = {}


def cached_function_call(func_name, cache_key, data):
    """Return the cached result for *cache_key*, invoking on a miss.

    On a miss the function is called through ``robust_function_call``; only
    results whose "success" entry is truthy are stored, so failures are
    retried on the next call.
    """
    try:
        return function_cache[cache_key]
    except KeyError:
        pass
    outcome = robust_function_call(func_name, data)
    if outcome["success"]:
        function_cache[cache_key] = outcome
    return outcome
# 5. Async batch processing for multiple independent calls
async def process_items_concurrently(items):
"""Process multiple items concurrently using functions"""
client = await create_async_client(url, key)
tasks = []
for item in items:
task = client.functions.invoke("item-processor", {
"body": {"item": item}
})
tasks.append(task)
responses = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for i, response in enumerate(responses):
if isinstance(response, Exception):
results.append({"item": items[i], "success": False, "error": str(response)})
else:
results.append({"item": items[i], "success": True, "data": response.data})
return results
Install with Tessl CLI
npx tessl i tessl/pypi-supabase