or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.md builds.md index.md logging.md request-queues.md runs.md schedules.md storage.md store.md tasks.md users.md webhooks.md

docs/request-queues.md

0

# Request Queue Management

1

2

Request queue operations for managing crawling workflows and Actor communication. Request queues provide distributed, persistent storage for web crawling requests with advanced features like deduplication, prioritization, and request locking.

3

4

## Capabilities

5

6

### Request Queue Operations

7

8

Individual request queue management with comprehensive request handling capabilities.

9

10

```python { .api }

11

class RequestQueueClient:

12

def get(self) -> dict | None:

13

"""Get request queue information."""

14

15

def update(self, *, name: str | None = None, general_access: StorageGeneralAccess | None = None) -> dict:

16

"""Update queue configuration.

17

18

Args:

19

name: Queue name

20

general_access: Storage access level (from apify_shared.consts)

21

"""

22

23

def delete(self) -> None:

24

"""Delete queue."""

25

26

def list_head(self, *, limit: int | None = None) -> dict:

27

"""Get requests from queue head.

28

29

Args:

30

limit: Maximum number of requests to return

31

"""

32

33

def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> dict:

34

"""Get and lock requests from head.

35

36

Args:

37

lock_secs: Lock duration in seconds

38

limit: Maximum number of requests to return

39

"""

40

41

def add_request(self, request: dict, *, forefront: bool | None = None) -> dict:

42

"""Add single request to queue.

43

44

Args:

45

request: Request object with url, method, headers, etc.

46

forefront: Whether to add to front of queue

47

"""

48

49

def get_request(self, request_id: str) -> dict | None:

50

"""Get specific request by ID.

51

52

Args:

53

request_id: Request identifier

54

"""

55

56

def update_request(self, request: dict, *, forefront: bool | None = None) -> dict:

57

"""Update existing request.

58

59

Args:

60

request: Updated request object

61

forefront: Whether to move to front of queue

62

"""

63

64

def delete_request(self, request_id: str) -> None:

65

"""Delete request by ID.

66

67

Args:

68

request_id: Request identifier

69

"""

70

71

def prolong_request_lock(

72

self,

73

request_id: str,

74

*,

75

forefront: bool | None = None,

76

lock_secs: int

77

) -> dict:

78

"""Extend request lock duration.

79

80

Args:

81

request_id: Request identifier

82

forefront: Whether to move to front when unlocked

83

lock_secs: New lock duration in seconds

84

"""

85

86

def delete_request_lock(self, request_id: str, *, forefront: bool | None = None) -> None:

87

"""Remove request lock.

88

89

Args:

90

request_id: Request identifier

91

forefront: Whether to move to front of queue

92

"""

93

94

def batch_add_requests(self, requests: list[dict], **kwargs) -> BatchAddRequestsResult:

95

"""Add multiple requests in batches.

96

97

Args:

98

requests: List of request objects

99

forefront: Whether to add to front of queue

100

**kwargs: Additional batch parameters

101

"""

102

103

def batch_delete_requests(self, requests: list[dict]) -> dict:

104

"""Delete multiple requests.

105

106

Args:

107

requests: List of request objects with IDs

108

"""

109

110

def list_requests(

111

self,

112

*,

113

limit: int | None = None,

114

exclusive_start_id: str | None = None

115

) -> dict:

116

"""List all requests in queue.

117

118

Args:

119

limit: Maximum number of requests

120

exclusive_start_id: ID to start listing from

121

"""

122

123

def unlock_requests(self) -> dict:

124

"""Unlock all requests locked by this client."""

125

126

class RequestQueueClientAsync:

127

"""Async version of RequestQueueClient with identical methods."""

128

129

class RequestQueueCollectionClient:

130

def list(self, **kwargs) -> ListPage[dict]:

131

"""List request queues.

132

133

Args:

134

unnamed (bool, optional): Include unnamed queues

135

limit (int, optional): Maximum number of items

136

offset (int, optional): Offset for pagination

137

desc (bool, optional): Sort in descending order

138

"""

139

140

def get_or_create(self, *, name: str | None = None) -> dict:

141

"""Get or create request queue.

142

143

Args:

144

name: Queue name

145

"""

146

147

class RequestQueueCollectionClientAsync:

148

"""Async version of RequestQueueCollectionClient with identical methods."""

149

```

150

151

### Batch Operations Result Types

152

153

```python { .api }

154

class BatchAddRequestsResult:

155

"""Result of batch add requests operation."""

156

157

added_requests: list[dict]

158

"""Successfully added requests."""

159

160

unprocessed_requests: list[dict]

161

"""Requests that could not be processed."""

162

163

processed_requests: int

164

"""Total number of processed requests."""

165

166

was_limit_reached: bool

167

"""Whether the operation hit rate limits."""

168

```

169

170

## Usage Examples

171

172

### Basic Request Queue Operations

173

174

```python

175

from apify_client import ApifyClient

176

177

client = ApifyClient('your-api-token')

178

179

# Create or get request queue

180

queue = client.request_queues().get_or_create(name='crawling-queue')

181

queue_client = client.request_queue(queue['id'])

182

183

# Add requests to queue

184

requests = [

185

{

186

'url': 'https://example.com',

187

'method': 'GET',

188

'headers': {'User-Agent': 'My Bot 1.0'}

189

},

190

{

191

'url': 'https://example.org/api/data',

192

'method': 'POST',

193

'headers': {'Content-Type': 'application/json'},

194

'payload': '{"query": "search term"}'

195

}

196

]

197

198

for request in requests:

199

result = queue_client.add_request(request)

200

print(f"Added request: {result['id']}")

201

202

# Get requests from head (FIFO)

203

head_requests = queue_client.list_head(limit=10)

204

print(f"Got {len(head_requests['items'])} requests from queue head")

205

```

206

207

### Advanced Request Processing

208

209

```python

210

# Process requests with locking

211

queue_client = client.request_queue('queue-id', client_key='worker-1')

212

213

# Get and lock requests for processing

214

locked_requests = queue_client.list_and_lock_head(lock_secs=300, limit=5)

215

216

for request in locked_requests['items']:

217

try:

218

# Process request

219

response = process_request(request)

220

221

# Update request with results

222

request['userData']['processed'] = True

223

request['userData']['response_status'] = response.status_code

224

queue_client.update_request(request)

225

226

# Remove processed request

227

queue_client.delete_request(request['id'])

228

229

except Exception as e:

230

# On failure, release the lock so other workers can retry the request.
# (Alternatively, if processing simply needs more time, extend the lock
# instead: queue_client.prolong_request_lock(request['id'], lock_secs=300))
        queue_client.delete_request_lock(request['id'])

235

```

236

237

### Batch Request Management

238

239

```python

240

import time

# Add large number of requests efficiently

241

urls = [f'https://example.com/page/{i}' for i in range(1000)]

242

243

batch_requests = []

244

for i, url in enumerate(urls):

245

batch_requests.append({

246

'url': url,

247

'method': 'GET',

248

'userData': {'pageNumber': i}

249

})

250

251

# Add in batches

252

batch_size = 100

253

for i in range(0, len(batch_requests), batch_size):

254

batch = batch_requests[i:i + batch_size]

255

result = queue_client.batch_add_requests(batch)

256

257

print(f"Added {result.processed_requests} requests")

258

if result.was_limit_reached:

259

print("Rate limit reached, waiting...")

260

time.sleep(10)

261

```

262

263

### Queue Monitoring and Management

264

265

```python

266

# Monitor queue status

267

queue_info = queue_client.get()

268

print(f"Queue: {queue_info['name']}")

269

print(f"Total requests: {queue_info['totalRequestCount']}")

270

print(f"Handled requests: {queue_info['handledRequestCount']}")

271

print(f"Pending requests: {queue_info['pendingRequestCount']}")

272

273

# List all requests with pagination

274

all_requests = []

275

exclusive_start_id = None

276

277

while True:

278

batch = queue_client.list_requests(

279

limit=1000,

280

exclusive_start_id=exclusive_start_id

281

)

282

283

if not batch['items']:

284

break

285

286

all_requests.extend(batch['items'])

287

exclusive_start_id = batch['items'][-1]['id']

288

289

print(f"Retrieved {len(all_requests)} total requests")

290

291

# Clean up: delete failed requests

292

failed_requests = [

293

req for req in all_requests

294

if req.get('userData', {}).get('failed', False)

295

]

296

297

if failed_requests:

298

queue_client.batch_delete_requests(failed_requests)

299

print(f"Deleted {len(failed_requests)} failed requests")

300

```

301

302

### Multi-Worker Coordination

303

304

```python

305

import time

# Worker coordination with client keys

306

worker_id = 'worker-001'

307

queue_client = client.request_queue('shared-queue', client_key=worker_id)

308

309

def worker_loop():

310

while True:

311

# Get exclusive access to requests

312

locked_requests = queue_client.list_and_lock_head(

313

lock_secs=600, # 10 minute lock

314

limit=3

315

)

316

317

if not locked_requests['items']:

318

print("No requests available, waiting...")

319

time.sleep(30)

320

continue

321

322

for request in locked_requests['items']:

323

try:

324

# Process request

325

result = crawl_page(request['url'])

326

327

# Mark as completed

328

queue_client.delete_request(request['id'])

329

330

except Exception as e:

331

print(f"Processing failed: {e}")

332

# Release lock for retry by other workers

333

queue_client.delete_request_lock(request['id'])

334

335

# Start worker

336

worker_loop()

337

```