Python client library for interacting with machine learning models deployed on the fal.ai platform.

Status tracking, cancellation, and result retrieval for long-running inference tasks. Provides handle-based request lifecycle management with real-time status updates and event streaming capabilities.
Manage the lifecycle of synchronous requests with status monitoring, event streaming, and result retrieval.
class SyncRequestHandle:
    """Handle for managing synchronous request lifecycle."""

    # Unique identifier of the submitted request.
    request_id: str

    def status(self, *, with_logs: bool = False) -> Status:
        """
        Get the current status of the request.

        Parameters:
        - with_logs: Include logs in the response (default: False)

        Returns:
        Status: Current request status (Queued, InProgress, or Completed)
        """

    def iter_events(self, *, with_logs: bool = False, interval: float = 0.1) -> Iterator[Status]:
        """
        Continuously poll for status updates until the request completes.

        Parameters:
        - with_logs: Include logs in status updates (default: False)
        - interval: Polling interval in seconds (default: 0.1)

        Returns:
        Iterator[Status]: Iterator of status updates
        """

    def cancel(self) -> None:
        """Cancel the request if it's still pending or in progress."""

    def get(self) -> AnyJSON:
        """
        Get the final result, blocking until the request completes.

        Returns:
        dict: The inference result
        """

# Usage example:
import fal_client

# Submit a request and get a handle.
handle = fal_client.submit("fal-ai/fast-sdxl", arguments={"prompt": "a landscape"})

# Check status manually.
current_status = handle.status(with_logs=True)
if isinstance(current_status, fal_client.Queued):
    print(f"Request queued at position: {current_status.position}")

# Or monitor continuously.
for event in handle.iter_events(with_logs=True):
    if isinstance(event, fal_client.Queued):
        print(f"Queued at position: {event.position}")
    elif isinstance(event, fal_client.InProgress):
        if event.logs:
            for log in event.logs[-5:]:  # Show last 5 logs
                print(f"Log: {log.get('message', '')}")
    elif isinstance(event, fal_client.Completed):
        print("Request completed!")
        break

# Get the final result.
result = handle.get()
print(result["images"][0]["url"])

# Manage the lifecycle of asynchronous requests with non-blocking status
# monitoring and event streaming.
class AsyncRequestHandle:
    """Handle for managing asynchronous request lifecycle."""

    # Unique identifier of the submitted request.
    request_id: str

    async def status(self, *, with_logs: bool = False) -> Status:
        """
        Get the current status of the request asynchronously.

        Parameters:
        - with_logs: Include logs in the response (default: False)

        Returns:
        Status: Current request status (Queued, InProgress, or Completed)
        """

    async def iter_events(self, *, with_logs: bool = False, interval: float = 0.1) -> AsyncIterator[Status]:
        """
        Continuously poll for status updates until the request completes asynchronously.

        Parameters:
        - with_logs: Include logs in status updates (default: False)
        - interval: Polling interval in seconds (default: 0.1)

        Returns:
        AsyncIterator[Status]: Async iterator of status updates
        """

    async def cancel(self) -> None:
        """Cancel the request if it's still pending or in progress asynchronously."""

    async def get(self) -> AnyJSON:
        """
        Get the final result asynchronously, awaiting until the request completes.

        Returns:
        dict: The inference result
        """

# Usage example:
import asyncio
import fal_client

async def main():
    # Submit a request and get a handle.
    handle = await fal_client.submit_async("fal-ai/fast-sdxl", arguments={"prompt": "a landscape"})

    # Check status manually.
    current_status = await handle.status(with_logs=True)
    if isinstance(current_status, fal_client.Queued):
        print(f"Request queued at position: {current_status.position}")

    # Or monitor continuously.
    async for event in handle.iter_events(with_logs=True):
        if isinstance(event, fal_client.Queued):
            print(f"Queued at position: {event.position}")
        elif isinstance(event, fal_client.InProgress):
            if event.logs:
                for log in event.logs[-5:]:  # Show last 5 logs
                    print(f"Log: {log.get('message', '')}")
        elif isinstance(event, fal_client.Completed):
            print("Request completed!")
            break

    # Get the final result.
    result = await handle.get()
    print(result["images"][0]["url"])

asyncio.run(main())

# Type-safe status indicators for tracking request progress through the queue
# and execution pipeline.
class Status:
    """Base class for all request statuses."""


class Queued(Status):
    """Request is waiting in the queue."""

    # Position in queue (0-indexed).
    position: int


class InProgress(Status):
    """Request is currently being processed."""

    # Processing logs, present only when requested via with_logs.
    logs: list[dict] | None


class Completed(Status):
    """Request has finished processing."""

    # Processing logs, present only when requested via with_logs.
    logs: list[dict] | None
    # Execution metrics (timing, etc.).
    metrics: dict

# Create handles from existing request IDs for managing requests created elsewhere.
# NOTE: documented as an alternate constructor on SyncRequestHandle; a plain
# `def SyncRequestHandle.from_request_id(...)` is not valid Python, so the
# method is shown here inside its class with the implicit `cls` parameter.
class SyncRequestHandle:
    @classmethod
    def from_request_id(cls, client, application: str, request_id: str) -> SyncRequestHandle:
        """
        Create a handle from an existing request ID.

        Parameters:
        - client: httpx.Client instance
        - application: The fal.ai application ID
        - request_id: Existing request ID

        Returns:
        SyncRequestHandle: Handle for the request
        """

# Common error scenarios and handling patterns for request management.
import time

import fal_client
import httpx

try:
    handle = fal_client.submit("fal-ai/fast-sdxl", arguments={"prompt": "test"})

    # Monitor with a timeout.
    start_time = time.time()
    timeout = 300  # 5 minutes

    for event in handle.iter_events():
        if time.time() - start_time > timeout:
            handle.cancel()
            raise TimeoutError("Request timed out")
        if isinstance(event, fal_client.Completed):
            break

    result = handle.get()
except httpx.HTTPError as e:
    print(f"HTTP error: {e}")
except fal_client.MissingCredentialsError:
    print("API key not found. Set FAL_KEY environment variable.")
except Exception as e:
    print(f"Unexpected error: {e}")

# Managing multiple requests concurrently with async handles.
import asyncio
import fal_client

async def process_batch(prompts):
    """Process multiple prompts concurrently with request tracking."""
    # Submit all requests up front so they queue in parallel.
    handles = []
    for prompt in prompts:
        handle = await fal_client.submit_async(
            "fal-ai/fast-sdxl",
            arguments={"prompt": prompt},
        )
        handles.append(handle)

    # Monitor a single request until completion and return its result.
    async def monitor_request(handle, prompt):
        async for event in handle.iter_events():
            if isinstance(event, fal_client.Queued):
                print(f"'{prompt}' queued at position: {event.position}")
            elif isinstance(event, fal_client.Completed):
                result = await handle.get()
                print(f"'{prompt}' completed: {result['images'][0]['url']}")
                return result

    # Wait for all to complete.
    tasks = [monitor_request(handle, prompt) for handle, prompt in zip(handles, prompts)]
    results = await asyncio.gather(*tasks)
    return results

# Usage
prompts = ["a cat", "a dog", "a bird"]
results = asyncio.run(process_batch(prompts))

# Install with Tessl CLI
npx tessl i tessl/pypi-fal-client