0
# Request Queue Management
1
2
Request queue operations for managing crawling workflows and Actor communication. Request queues provide distributed, persistent storage for web crawling requests with advanced features like deduplication, prioritization, and request locking.
3
4
## Capabilities
5
6
### Request Queue Operations
7
8
Individual request queue management with comprehensive request handling capabilities.
9
10
```python { .api }
11
class RequestQueueClient:
12
def get(self) -> dict | None:
13
"""Get request queue information."""
14
15
def update(self, *, name: str | None = None, general_access: StorageGeneralAccess | None = None) -> dict:
16
"""Update queue configuration.
17
18
Args:
19
name: Queue name
20
general_access: Storage access level (from apify_shared.consts)
21
"""
22
23
def delete(self) -> None:
24
"""Delete queue."""
25
26
def list_head(self, *, limit: int | None = None) -> dict:
27
"""Get requests from queue head.
28
29
Args:
30
limit: Maximum number of requests to return
31
"""
32
33
def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> dict:
34
"""Get and lock requests from head.
35
36
Args:
37
lock_secs: Lock duration in seconds
38
limit: Maximum number of requests to return
39
"""
40
41
def add_request(self, request: dict, *, forefront: bool | None = None) -> dict:
42
"""Add single request to queue.
43
44
Args:
45
request: Request object with url, method, headers, etc.
46
forefront: Whether to add to front of queue
47
"""
48
49
def get_request(self, request_id: str) -> dict | None:
50
"""Get specific request by ID.
51
52
Args:
53
request_id: Request identifier
54
"""
55
56
def update_request(self, request: dict, *, forefront: bool | None = None) -> dict:
57
"""Update existing request.
58
59
Args:
60
request: Updated request object
61
forefront: Whether to move to front of queue
62
"""
63
64
def delete_request(self, request_id: str) -> None:
65
"""Delete request by ID.
66
67
Args:
68
request_id: Request identifier
69
"""
70
71
def prolong_request_lock(
72
self,
73
request_id: str,
74
*,
75
forefront: bool | None = None,
76
lock_secs: int
77
) -> dict:
78
"""Extend request lock duration.
79
80
Args:
81
request_id: Request identifier
82
forefront: Whether to move to front when unlocked
83
lock_secs: New lock duration in seconds
84
"""
85
86
def delete_request_lock(self, request_id: str, *, forefront: bool | None = None) -> None:
87
"""Remove request lock.
88
89
Args:
90
request_id: Request identifier
91
forefront: Whether to move to front of queue
92
"""
93
94
def batch_add_requests(self, requests: list[dict], *, forefront: bool | None = None, **kwargs) -> BatchAddRequestsResult:
    """Add multiple requests in batches.

    Args:
        requests: List of request objects
        forefront: Whether to add to front of queue
        **kwargs: Additional batch parameters
    """
102
103
def batch_delete_requests(self, requests: list[dict]) -> dict:
104
"""Delete multiple requests.
105
106
Args:
107
requests: List of request objects with IDs
108
"""
109
110
def list_requests(
111
self,
112
*,
113
limit: int | None = None,
114
exclusive_start_id: str | None = None
115
) -> dict:
116
"""List all requests in queue.
117
118
Args:
119
limit: Maximum number of requests
120
exclusive_start_id: ID to start listing from
121
"""
122
123
def unlock_requests(self) -> dict:
124
"""Unlock all requests locked by this client."""
125
126
class RequestQueueClientAsync:
127
"""Async version of RequestQueueClient with identical methods."""
128
129
class RequestQueueCollectionClient:
130
def list(self, **kwargs) -> ListPage[dict]:
131
"""List request queues.
132
133
Args:
134
unnamed (bool, optional): Include unnamed queues
135
limit (int, optional): Maximum number of items
136
offset (int, optional): Offset for pagination
137
desc (bool, optional): Sort in descending order
138
"""
139
140
def get_or_create(self, *, name: str | None = None) -> dict:
141
"""Get or create request queue.
142
143
Args:
144
name: Queue name
145
"""
146
147
class RequestQueueCollectionClientAsync:
148
"""Async version of RequestQueueCollectionClient with identical methods."""
149
```
150
151
### Batch Operations Result Types
152
153
```python { .api }
154
class BatchAddRequestsResult:
155
"""Result of batch add requests operation."""
156
157
added_requests: list[dict]
158
"""Successfully added requests."""
159
160
unprocessed_requests: list[dict]
161
"""Requests that could not be processed."""
162
163
processed_requests: int
164
"""Total number of processed requests."""
165
166
was_limit_reached: bool
167
"""Whether the operation hit rate limits."""
168
```
169
170
## Usage Examples
171
172
### Basic Request Queue Operations
173
174
```python
175
from apify_client import ApifyClient
176
177
client = ApifyClient('your-api-token')
178
179
# Create or get request queue
180
queue = client.request_queues().get_or_create(name='crawling-queue')
181
queue_client = client.request_queue(queue['id'])
182
183
# Add requests to queue
184
requests = [
185
{
186
'url': 'https://example.com',
187
'method': 'GET',
188
'headers': {'User-Agent': 'My Bot 1.0'}
189
},
190
{
191
'url': 'https://example.org/api/data',
192
'method': 'POST',
193
'headers': {'Content-Type': 'application/json'},
194
'payload': '{"query": "search term"}'
195
}
196
]
197
198
for request in requests:
199
result = queue_client.add_request(request)
200
print(f"Added request: {result['id']}")
201
202
# Get requests from head (FIFO)
203
head_requests = queue_client.list_head(limit=10)
204
print(f"Got {len(head_requests['items'])} requests from queue head")
205
```
206
207
### Advanced Request Processing
208
209
```python
210
# Process requests with locking
211
queue_client = client.request_queue('queue-id', client_key='worker-1')
212
213
# Get and lock requests for processing
214
locked_requests = queue_client.list_and_lock_head(lock_secs=300, limit=5)
215
216
for request in locked_requests['items']:
217
try:
218
# Process request
219
response = process_request(request)
220
221
# Update request with results
222
request['userData']['processed'] = True
223
request['userData']['response_status'] = response.status_code
224
queue_client.update_request(request)
225
226
# Remove processed request
227
queue_client.delete_request(request['id'])
228
229
    except Exception as e:
        # Processing failed: release the lock so another worker can retry
        # the request. (If processing merely needs more time, call
        # queue_client.prolong_request_lock(request['id'], lock_secs=300)
        # instead — do not do both, since deleting the lock immediately
        # discards the extension.)
        print(f"Processing failed: {e}")
        queue_client.delete_request_lock(request['id'])
235
```
236
237
### Batch Request Management
238
239
```python
240
import time

# Add a large number of requests efficiently
urls = [f'https://example.com/page/{i}' for i in range(1000)]

batch_requests = []
244
# Use enumerate so the page number is available inside the loop body
# (the comprehension's loop variable above does not leak into this scope).
for i, url in enumerate(urls):
    batch_requests.append({
        'url': url,
        'method': 'GET',
        'userData': {'pageNumber': i}
    })
250
251
# Add in batches
252
batch_size = 100
253
for i in range(0, len(batch_requests), batch_size):
254
batch = batch_requests[i:i + batch_size]
255
result = queue_client.batch_add_requests(batch)
256
257
print(f"Added {result.processed_requests} requests")
258
if result.was_limit_reached:
259
print("Rate limit reached, waiting...")
260
time.sleep(10)
261
```
262
263
### Queue Monitoring and Management
264
265
```python
266
# Monitor queue status
267
queue_info = queue_client.get()
268
print(f"Queue: {queue_info['name']}")
269
print(f"Total requests: {queue_info['totalRequestCount']}")
270
print(f"Handled requests: {queue_info['handledRequestCount']}")
271
print(f"Pending requests: {queue_info['pendingRequestCount']}")
272
273
# List all requests with pagination
274
all_requests = []
275
exclusive_start_id = None
276
277
while True:
278
batch = queue_client.list_requests(
279
limit=1000,
280
exclusive_start_id=exclusive_start_id
281
)
282
283
if not batch['items']:
284
break
285
286
all_requests.extend(batch['items'])
287
exclusive_start_id = batch['items'][-1]['id']
288
289
print(f"Retrieved {len(all_requests)} total requests")
290
291
# Clean up: delete failed requests
292
failed_requests = [
293
req for req in all_requests
294
if req.get('userData', {}).get('failed', False)
295
]
296
297
if failed_requests:
298
queue_client.batch_delete_requests(failed_requests)
299
print(f"Deleted {len(failed_requests)} failed requests")
300
```
301
302
### Multi-Worker Coordination
303
304
```python
305
import time

# Worker coordination with client keys
worker_id = 'worker-001'
queue_client = client.request_queue('shared-queue', client_key=worker_id)
308
309
def worker_loop():
310
while True:
311
# Get exclusive access to requests
312
locked_requests = queue_client.list_and_lock_head(
313
lock_secs=600, # 10 minute lock
314
limit=3
315
)
316
317
if not locked_requests['items']:
318
print("No requests available, waiting...")
319
time.sleep(30)
320
continue
321
322
for request in locked_requests['items']:
323
try:
324
# Process request
325
result = crawl_page(request['url'])
326
327
# Mark as completed
328
queue_client.delete_request(request['id'])
329
330
except Exception as e:
331
print(f"Processing failed: {e}")
332
# Release lock for retry by other workers
333
queue_client.delete_request_lock(request['id'])
334
335
# Start worker
336
worker_loop()
337
```