or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

adapters.md authentication.md cookies-exceptions.md downloads.md index.md multipart.md sessions-streaming.md threading.md utilities.md

docs/threading.md

0

# Parallel Requests

1

2

Thread pool implementation for executing multiple HTTP requests concurrently with session reuse, customizable initialization, and comprehensive error handling.

3

4

## Capabilities

5

6

### Simple Parallel Execution

7

8

Execute multiple requests concurrently using a simple interface.

9

10

```python { .api }

11

def map(requests, **kwargs):

12

"""

13

Execute multiple requests in parallel using thread pool.

14

15

Parameters:

16

- requests: list of dict, each containing request parameters

17

- num_processes: int, number of worker threads (default: CPU count)

18

- initializer: callable, function to initialize each session

19

- initargs: tuple, arguments for initializer function

20

21

Returns:

22

tuple: (responses, exceptions) - generators for successful and failed requests

23

"""

24

```

25

26

#### Usage Examples

27

28

```python

29

from requests_toolbelt import threaded

30

31

# Simple parallel GET requests

32

urls_to_get = [

33

{'url': 'https://api.github.com/users/octocat', 'method': 'GET'},

34

{'url': 'https://api.github.com/users/defunkt', 'method': 'GET'},

35

{'url': 'https://api.github.com/repos/requests/requests', 'method': 'GET'},

36

]

37

38

responses, exceptions = threaded.map(urls_to_get)

39

40

for response in responses:

41

print(f"Status: {response.status_code}, URL: {response.url}")

42

43

for exception in exceptions:

44

print(f"Error: {exception}")

45

46

# Mixed request methods with parameters

47

requests_to_make = [

48

{

49

'url': 'https://httpbin.org/get',

50

'method': 'GET',

51

'params': {'key': 'value'}

52

},

53

{

54

'url': 'https://httpbin.org/post',

55

'method': 'POST',

56

'json': {'data': 'test'}

57

},

58

{

59

'url': 'https://httpbin.org/put',

60

'method': 'PUT',

61

'data': 'raw data'

62

}

63

]

64

65

responses, exceptions = threaded.map(requests_to_make, num_processes=5)

66

67

# Custom number of threads

68

responses, exceptions = threaded.map(urls_to_get, num_processes=10)

69

```

70

71

### Session Initialization

72

73

Customize session configuration for all threads.

74

75

```python

76

from requests_toolbelt import threaded, user_agent

77

78

def setup_session(session):

79

"""Initialize session with custom settings."""

80

session.headers['User-Agent'] = user_agent('my-scraper', '1.0')

81

session.headers['Accept'] = 'application/json'

82

session.timeout = 30  # NOTE(review): requests.Session has no built-in timeout attribute; this does not apply a timeout to requests automatically — verify, or pass 'timeout' per request dict

83

84

urls = [

85

{'url': 'https://api.example.com/data/1', 'method': 'GET'},

86

{'url': 'https://api.example.com/data/2', 'method': 'GET'},

87

{'url': 'https://api.example.com/data/3', 'method': 'GET'},

88

]

89

90

responses, exceptions = threaded.map(

91

urls,

92

initializer=setup_session,

93

num_processes=3

94

)

95

96

# Session with authentication

97

def setup_authenticated_session(session, api_key):

98

"""Setup session with API key authentication."""

99

session.headers['Authorization'] = f'Bearer {api_key}'

100

session.headers['Content-Type'] = 'application/json'

101

102

responses, exceptions = threaded.map(

103

requests_to_make,

104

initializer=setup_authenticated_session,

105

initargs=('your-api-key-here',),

106

num_processes=5

107

)

108

```

109

110

### Advanced Thread Pool

111

112

Direct access to the thread pool for more control over execution.

113

114

```python { .api }

115

class Pool:

116

"""

117

Thread pool for parallel HTTP requests.

118

119

Parameters:

120

- num_processes: int, number of worker threads (default: CPU count)

121

- initializer: callable, function to initialize each session

122

- initargs: tuple, arguments for initializer function

123

- job_queue: Queue, custom job queue (optional)

124

"""

125

def __init__(self, num_processes=None, initializer=None, initargs=None, job_queue=None): ...

126

127

def responses(self):

128

"""

129

Generator yielding successful responses.

130

131

Yields:

132

ThreadResponse: wrapped response objects

133

"""

134

135

def exceptions(self):

136

"""

137

Generator yielding exceptions from failed requests.

138

139

Yields:

140

ThreadException: wrapped exception objects

141

"""

142

143

def join_all(self):

144

"""Wait for all threads to complete."""

145

146

class ThreadResponse:

147

"""Wrapper for successful HTTP responses."""

148

def __init__(self, response): ...

149

150

class ThreadException:

151

"""Wrapper for failed HTTP requests."""

152

def __init__(self, exception, request): ...

153

```

154

155

#### Usage Examples

156

157

```python

158

from requests_toolbelt.threaded.pool import Pool

159

import queue

160

161

# Create job queue

162

job_queue = queue.Queue()

163

requests_to_make = [

164

{'url': 'https://api.example.com/slow-endpoint', 'method': 'GET'},

165

{'url': 'https://api.example.com/fast-endpoint', 'method': 'GET'},

166

{'url': 'https://api.example.com/medium-endpoint', 'method': 'GET'},

167

]

168

169

for request in requests_to_make:

170

job_queue.put(request)

171

172

# Create and use pool directly

173

pool = Pool(num_processes=2, job_queue=job_queue)

174

175

# Process responses as they complete

176

for response in pool.responses():

177

print(f"Completed: {response.url} - Status: {response.status_code}")

178

print(f"Response time: {response.elapsed.total_seconds()}s")

179

180

# Handle any exceptions

181

for exception in pool.exceptions():

182

print(f"Failed request: {exception.request}")

183

print(f"Exception: {exception.exception}")

184

185

pool.join_all()

186

```

187

188

### Session Thread Management

189

190

Individual thread workers for custom threading scenarios.

191

192

```python { .api }

193

class SessionThread:

194

"""

195

Individual thread worker for HTTP requests.

196

197

Parameters:

198

- job_queue: Queue, queue containing request jobs

199

- response_queue: Queue, queue for successful responses

200

- exception_queue: Queue, queue for exceptions

201

- initializer: callable, session initialization function

202

- initargs: tuple, arguments for initializer

203

"""

204

def __init__(self, job_queue, response_queue, exception_queue,

205

initializer=None, initargs=None): ...

206

```

207

208

### Error Handling and Monitoring

209

210

```python

211

from requests_toolbelt import threaded

212

import time

213

214

def monitor_parallel_requests():

215

"""Example of monitoring parallel requests with error handling."""

216

217

urls = [

218

{'url': 'https://httpbin.org/delay/1', 'method': 'GET'},

219

{'url': 'https://httpbin.org/status/404', 'method': 'GET'},  # Returns a 404 response; requests does not raise on HTTP error statuses, so this appears in responses, not exceptions

220

{'url': 'https://httpbin.org/delay/2', 'method': 'GET'},

221

{'url': 'https://invalid-url-example.com', 'method': 'GET'}, # Will fail

222

]

223

224

start_time = time.time()

225

responses, exceptions = threaded.map(urls, num_processes=4)

226

227

successful_count = 0

228

error_count = 0

229

230

print("Successful responses:")

231

for response in responses:

232

successful_count += 1

233

print(f" {response.url}: {response.status_code}")

234

235

print("\\nErrors:")

236

for exception in exceptions:

237

error_count += 1

238

print(f" {exception.request.get('url', 'Unknown')}: {exception.exception}")

239

240

total_time = time.time() - start_time

241

print(f"\\nCompleted in {total_time:.2f}s")

242

print(f"Success: {successful_count}, Errors: {error_count}")

243

244

# Usage

245

monitor_parallel_requests()

246

```

247

248

### Batch Processing with Rate Limiting

249

250

```python

251

from requests_toolbelt import threaded

252

import time

253

254

def batch_with_rate_limit(urls, batch_size=10, delay_between_batches=1):

255

"""Process URLs in batches with rate limiting."""

256

257

def setup_session(session):

258

session.timeout = 30  # NOTE(review): Session objects ignore a 'timeout' attribute in requests; confirm, or set timeout per request instead

259

session.headers['User-Agent'] = 'Batch Processor 1.0'

260

261

results = []

262

263

for i in range(0, len(urls), batch_size):

264

batch = urls[i:i + batch_size]

265

print(f"Processing batch {i//batch_size + 1} ({len(batch)} requests)")

266

267

responses, exceptions = threaded.map(

268

batch,

269

initializer=setup_session,

270

num_processes=min(5, len(batch))

271

)

272

273

batch_results = {

274

'responses': list(responses),

275

'exceptions': list(exceptions)

276

}

277

results.append(batch_results)

278

279

if i + batch_size < len(urls): # Don't delay after last batch

280

time.sleep(delay_between_batches)

281

282

return results

283

284

# Usage

285

urls = [{'url': f'https://httpbin.org/delay/{i%3}', 'method': 'GET'} for i in range(50)]

286

results = batch_with_rate_limit(urls, batch_size=10, delay_between_batches=2)

287

```