# Request Management

Status tracking, cancellation, and result retrieval for long-running inference tasks. Provides handle-based request lifecycle management with real-time status updates and event streaming capabilities.

## Capabilities

### Synchronous Request Handles

Manage the lifecycle of synchronous requests with status monitoring, event streaming, and result retrieval.

```python { .api }
class SyncRequestHandle:
    """Handle for managing synchronous request lifecycle."""

    request_id: str

    def status(self, *, with_logs: bool = False) -> Status:
        """
        Get the current status of the request.

        Parameters:
        - with_logs: Include logs in the response (default: False)

        Returns:
            Status: Current request status (Queued, InProgress, or Completed)
        """

    def iter_events(self, *, with_logs: bool = False, interval: float = 0.1) -> Iterator[Status]:
        """
        Continuously poll for status updates until the request completes.

        Parameters:
        - with_logs: Include logs in status updates (default: False)
        - interval: Polling interval in seconds (default: 0.1)

        Returns:
            Iterator[Status]: Iterator of status updates
        """

    def cancel(self) -> None:
        """Cancel the request if it's still pending or in progress."""

    def get(self) -> AnyJSON:
        """
        Get the final result, blocking until the request completes.

        Returns:
            dict: The inference result
        """
```

Usage example:

```python
import fal_client

# Submit a request and get handle
handle = fal_client.submit("fal-ai/fast-sdxl", arguments={"prompt": "a landscape"})

# Check status manually
current_status = handle.status(with_logs=True)
if isinstance(current_status, fal_client.Queued):
    print(f"Request queued at position: {current_status.position}")

# Or monitor continuously
for event in handle.iter_events(with_logs=True):
    if isinstance(event, fal_client.Queued):
        print(f"Queued at position: {event.position}")
    elif isinstance(event, fal_client.InProgress):
        if event.logs:
            for log in event.logs[-5:]:  # Show last 5 logs
                print(f"Log: {log.get('message', '')}")
    elif isinstance(event, fal_client.Completed):
        print("Request completed!")
        break

# Get the final result
result = handle.get()
print(result["images"][0]["url"])
```

### Asynchronous Request Handles

Manage the lifecycle of asynchronous requests with non-blocking status monitoring and event streaming.

```python { .api }
class AsyncRequestHandle:
    """Handle for managing asynchronous request lifecycle."""

    request_id: str

    async def status(self, *, with_logs: bool = False) -> Status:
        """
        Get the current status of the request asynchronously.

        Parameters:
        - with_logs: Include logs in the response (default: False)

        Returns:
            Status: Current request status (Queued, InProgress, or Completed)
        """

    async def iter_events(self, *, with_logs: bool = False, interval: float = 0.1) -> AsyncIterator[Status]:
        """
        Continuously poll for status updates until the request completes asynchronously.

        Parameters:
        - with_logs: Include logs in status updates (default: False)
        - interval: Polling interval in seconds (default: 0.1)

        Returns:
            AsyncIterator[Status]: Async iterator of status updates
        """

    async def cancel(self) -> None:
        """Cancel the request if it's still pending or in progress asynchronously."""

    async def get(self) -> AnyJSON:
        """
        Get the final result asynchronously, awaiting until the request completes.

        Returns:
            dict: The inference result
        """
```

Usage example:

```python
import asyncio
import fal_client

async def main():
    # Submit a request and get handle
    handle = await fal_client.submit_async("fal-ai/fast-sdxl", arguments={"prompt": "a landscape"})

    # Check status manually
    current_status = await handle.status(with_logs=True)
    if isinstance(current_status, fal_client.Queued):
        print(f"Request queued at position: {current_status.position}")

    # Or monitor continuously
    async for event in handle.iter_events(with_logs=True):
        if isinstance(event, fal_client.Queued):
            print(f"Queued at position: {event.position}")
        elif isinstance(event, fal_client.InProgress):
            if event.logs:
                for log in event.logs[-5:]:  # Show last 5 logs
                    print(f"Log: {log.get('message', '')}")
        elif isinstance(event, fal_client.Completed):
            print("Request completed!")
            break

    # Get the final result
    result = await handle.get()
    print(result["images"][0]["url"])

asyncio.run(main())
```

### Status Classes

Type-safe status indicators for tracking request progress through the queue and execution pipeline.

```python { .api }
class Status:
    """Base class for all request statuses."""

class Queued(Status):
    """Request is waiting in the queue."""
    position: int  # Position in queue (0-indexed)

class InProgress(Status):
    """Request is currently being processed."""
    logs: list[dict] | None  # Processing logs if requested

class Completed(Status):
    """Request has finished processing."""
    logs: list[dict] | None  # Processing logs if requested
    metrics: dict  # Execution metrics (timing, etc.)
```

### Handle Creation

Create handles from existing request IDs for managing requests created elsewhere.

```python { .api }
class SyncRequestHandle:
    @classmethod
    def from_request_id(cls, client, application: str, request_id: str) -> SyncRequestHandle:
        """
        Create a handle from an existing request ID.

        Parameters:
        - client: httpx.Client instance
        - application: The fal.ai application ID
        - request_id: Existing request ID

        Returns:
            SyncRequestHandle: Handle for the request
        """
```

### Error Handling

Common error scenarios and handling patterns for request management.

```python
import fal_client
import httpx

try:
    handle = fal_client.submit("fal-ai/fast-sdxl", arguments={"prompt": "test"})

    # Monitor with timeout
    import time
    start_time = time.time()
    timeout = 300  # 5 minutes

    for event in handle.iter_events():
        if time.time() - start_time > timeout:
            handle.cancel()
            raise TimeoutError("Request timed out")

        if isinstance(event, fal_client.Completed):
            break

    result = handle.get()

except httpx.HTTPError as e:
    print(f"HTTP error: {e}")
except fal_client.MissingCredentialsError:
    print("API key not found. Set FAL_KEY environment variable.")
except Exception as e:
    print(f"Unexpected error: {e}")
```

### Concurrent Request Management

Managing multiple requests concurrently with async handles.

```python
import asyncio
import fal_client

async def process_batch(prompts):
    """Process multiple prompts concurrently with request tracking."""

    # Submit all requests
    handles = []
    for prompt in prompts:
        handle = await fal_client.submit_async(
            "fal-ai/fast-sdxl",
            arguments={"prompt": prompt}
        )
        handles.append(handle)

    # Monitor all requests concurrently
    async def monitor_request(handle, prompt):
        async for event in handle.iter_events():
            if isinstance(event, fal_client.Queued):
                print(f"'{prompt}' queued at position: {event.position}")
            elif isinstance(event, fal_client.Completed):
                result = await handle.get()
                print(f"'{prompt}' completed: {result['images'][0]['url']}")
                return result

    # Wait for all to complete
    tasks = [monitor_request(handle, prompt) for handle, prompt in zip(handles, prompts)]
    results = await asyncio.gather(*tasks)
    return results

# Usage
prompts = ["a cat", "a dog", "a bird"]
results = asyncio.run(process_batch(prompts))
```