or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

adapters.md authentication.md cookies-exceptions.md downloads.md index.md multipart.md sessions-streaming.md threading.md utilities.md

docs/sessions-streaming.md

0

# Sessions and Streaming

1

2

Enhanced session classes with base URL support and streaming iterators for efficient handling of large uploads with known sizes, providing better control over HTTP request lifecycle and data streaming.

3

4

## Capabilities

5

6

### Base URL Sessions

7

8

Session class that automatically prepends a base URL to all requests, simplifying API client development.

9

10

```python { .api }

11

class BaseUrlSession:

12

"""

13

Session with automatic base URL handling for all requests.

14

15

Parameters:

16

- base_url: str, base URL for all requests (optional)

17

"""

18

def __init__(self, base_url=None): ...

19

20

def request(self, method, url, *args, **kwargs):

21

"""

22

Send request with base URL automatically prepended.

23

24

Parameters:

25

- method: str, HTTP method

26

- url: str, relative or absolute URL

27

- *args, **kwargs: standard requests arguments

28

29

Returns:

30

Response: HTTP response object

31

"""

32

33

def prepare_request(self, request, *args, **kwargs):

34

"""

35

Prepare request with base URL handling.

36

37

Parameters:

38

- request: Request object to prepare

39

- *args, **kwargs: preparation arguments

40

41

Returns:

42

PreparedRequest: prepared request with full URL

43

"""

44

45

def create_url(self, url):

46

"""

47

Create full URL from base URL and relative path.

48

49

Parameters:

50

- url: str, relative or absolute URL

51

52

Returns:

53

str: complete URL

54

"""

55

```

56

57

#### Usage Examples

58

59

```python

60

import requests

61

from requests_toolbelt.sessions import BaseUrlSession

62

63

# API client with base URL

64

api_session = BaseUrlSession(base_url='https://api.example.com/v1/')

65

66

# All requests automatically use base URL

67

users = api_session.get('users/') # GET https://api.example.com/v1/users/

68

user = api_session.get('users/123') # GET https://api.example.com/v1/users/123

69

api_session.post('users/', json={'name': 'John'}) # POST https://api.example.com/v1/users/

70

71

# Absolute URLs still work

72

external = api_session.get('https://external-api.com/data')

73

74

# Trailing slash behavior

75

session1 = BaseUrlSession('https://api.example.com/v1/')

76

session2 = BaseUrlSession('https://api.example.com/v1')

77

78

# Both work the same way

79

response1 = session1.get('users')

80

response2 = session2.get('users')

81

82

# Leading slash changes behavior (absolute path)

83

api_session = BaseUrlSession('https://api.example.com/v1/')

84

relative = api_session.get('users') # https://api.example.com/v1/users

85

absolute = api_session.get('/users') # https://api.example.com/users (note: no /v1/)

86

87

# Custom URL creation

88

class CustomBaseUrlSession(BaseUrlSession):

89

def create_url(self, url):

90

"""Custom URL creation with version parameter."""

91

if not url.startswith(('http://', 'https://')):

92

# Add version parameter to all relative URLs

93

separator = '&' if '?' in url else '?'

94

url = f"{url}{separator}version=2"

95

return super().create_url(url)

96

97

custom_session = CustomBaseUrlSession('https://api.example.com/')

98

response = custom_session.get('data') # GET https://api.example.com/data?version=2

99

```

100

101

### Streaming Iterator

102

103

Iterator interface for streaming large uploads with known sizes without using chunked transfer encoding.

104

105

```python { .api }

106

class StreamingIterator:

107

"""

108

Iterator for streaming data with known size to avoid chunked encoding.

109

110

Parameters:

111

- size: int, total size of data in bytes (must be positive)

112

- iterator: iterator yielding data chunks or file-like object with read method

113

- encoding: str, character encoding (default: 'utf-8')

114

"""

115

def __init__(self, size, iterator, encoding='utf-8'): ...

116

117

# Instance attributes

118

size: int # Expected size of the upload

119

len: int # Attribute that requests checks for body length

120

encoding: str # Encoding the input data is using

121

iterator: any # The iterator used to generate upload data

122

123

def read(self, n=-1):

124

"""

125

Read up to n bytes from the iterator.

126

127

Parameters:

128

- n: int, number of bytes to read (-1 to read all currently available data)

129

130

Returns:

131

bytes: data chunk

132

"""

133

134

def __len__(self):

135

"""

136

Return total size of data.

137

138

Returns:

139

int: total size in bytes

140

"""

141

142

def __iter__(self):

143

"""Return iterator interface."""

144

```

145

146

#### Usage Examples

147

148

```python

149

import requests

150

from requests_toolbelt import StreamingIterator

151

152

# Stream large file with known size

153

def file_chunks(filename, chunk_size=8192):

154

"""Generator that yields file chunks."""

155

with open(filename, 'rb') as f:

156

while True:

157

chunk = f.read(chunk_size)

158

if not chunk:

159

break

160

yield chunk

161

162

# Get file size

163

import os

164

file_size = os.path.getsize('large_file.dat')

165

166

# Create streaming iterator

167

stream = StreamingIterator(file_size, file_chunks('large_file.dat'))

168

169

# Upload without chunked encoding

170

response = requests.post(

171

'https://upload.example.com/files',

172

data=stream,

173

headers={'Content-Length': str(len(stream))}

174

)

175

176

# Stream generated data

177

def generate_data():

178

"""Generator that creates data on the fly."""

179

for i in range(1000):

180

yield f"Line {i}: {'x' * 100}\n".encode('utf-8')

181

182

# Calculate total size

183

data_size = sum(len(f"Line {i}: {'x' * 100}\n".encode('utf-8')) for i in range(1000))

184

185

stream = StreamingIterator(data_size, generate_data())

186

187

response = requests.put(

188

'https://api.example.com/data',

189

data=stream,

190

headers={

191

'Content-Type': 'text/plain',

192

'Content-Length': str(len(stream))

193

}

194

)

195

196

# Stream with progress monitoring

197

def monitored_file_stream(filename, progress_callback=None):

198

"""Stream file with progress updates."""

199

file_size = os.path.getsize(filename)

200

bytes_sent = 0

201

202

def chunks_with_progress():

203

nonlocal bytes_sent

204

with open(filename, 'rb') as f:

205

while True:

206

chunk = f.read(8192)

207

if not chunk:

208

break

209

bytes_sent += len(chunk)

210

if progress_callback:

211

progress_callback(bytes_sent, file_size)

212

yield chunk

213

214

return StreamingIterator(file_size, chunks_with_progress())

215

216

def upload_progress(sent, total):

217

percent = (sent / total) * 100

218

print(f"\rUpload progress: {percent:.1f}% ({sent}/{total} bytes)", end='')

219

220

# Upload with progress

221

stream = monitored_file_stream('large_video.mp4', upload_progress)

222

response = requests.post(

223

'https://upload.service.com/video',

224

data=stream,

225

headers={'Content-Length': str(len(stream))}

226

)

227

print("\nUpload complete!")

228

```

229

230

### Advanced Session Patterns

231

232

```python

233

from requests_toolbelt.sessions import BaseUrlSession

234

from requests_toolbelt.utils import user_agent

235

from requests_toolbelt import GuessAuth

236

237

class APIClient(BaseUrlSession):

238

"""Enhanced API client with authentication and error handling."""

239

240

def __init__(self, base_url, api_key=None, username=None, password=None):

241

super().__init__(base_url)

242

243

# Set user agent

244

self.headers['User-Agent'] = user_agent('api-client', '1.0')

245

246

# Configure authentication

247

if api_key:

248

self.headers['Authorization'] = f'Bearer {api_key}'

249

elif username and password:

250

self.auth = GuessAuth(username, password)

251

252

def request(self, method, url, **kwargs):

253

"""Override to add error handling."""

254

response = super().request(method, url, **kwargs)

255

256

# Handle common errors

257

if response.status_code == 401:

258

raise Exception("Authentication failed")

259

elif response.status_code == 429:

260

raise Exception("Rate limit exceeded")

261

elif response.status_code >= 500:

262

raise Exception(f"Server error: {response.status_code}")

263

264

return response

265

266

# Usage

267

client = APIClient(

268

'https://api.myservice.com/v2/',

269

api_key='your-api-key-here'

270

)

271

272

try:

273

users = client.get('users').json()

274

user = client.post('users', json={'name': 'John', 'email': 'john@example.com'}).json()

275

print(f"Created user: {user}")

276

except Exception as e:

277

print(f"API error: {e}")

278

279

# Multi-environment client

280

class MultiEnvAPIClient:

281

"""API client supporting multiple environments."""

282

283

def __init__(self, environment='production'):

284

base_urls = {

285

'development': 'https://dev-api.example.com/',

286

'staging': 'https://staging-api.example.com/',

287

'production': 'https://api.example.com/'

288

}

289

290

self.session = BaseUrlSession(base_urls[environment])

291

self.session.headers['User-Agent'] = user_agent('multi-env-client', '1.0')

292

293

def get_user(self, user_id):

294

return self.session.get(f'users/{user_id}').json()

295

296

def create_user(self, user_data):

297

return self.session.post('users', json=user_data).json()

298

299

# Usage

300

dev_client = MultiEnvAPIClient('development')

301

prod_client = MultiEnvAPIClient('production')

302

303

# Same interface, different endpoints

304

dev_user = dev_client.get_user(123)

305

prod_user = prod_client.get_user(123)

306

```

307

308

### Memory-Efficient Streaming

309

310

```python

311

import requests

312

from requests_toolbelt import StreamingIterator

313

import hashlib

314

315

def stream_with_checksum(data_iterator, total_size):

316

"""Stream data while calculating checksum."""

317

hasher = hashlib.sha256()

318

319

def chunks_with_hash():

320

for chunk in data_iterator:

321

hasher.update(chunk)

322

yield chunk

323

324

stream = StreamingIterator(total_size, chunks_with_hash())

325

326

# Upload data

327

response = requests.post(

328

'https://upload.example.com/verify',

329

data=stream,

330

headers={

331

'Content-Length': str(total_size),

332

'Content-Type': 'application/octet-stream'

333

}

334

)

335

336

# Return response and checksum

337

return response, hasher.hexdigest()

338

339

# Usage with large file

340

def large_file_iterator(filename):

341

with open(filename, 'rb') as f:

342

while True:

343

chunk = f.read(64 * 1024) # 64KB chunks

344

if not chunk:

345

break

346

yield chunk

347

348

file_size = os.path.getsize('large_database_backup.sql')

349

response, checksum = stream_with_checksum(

350

large_file_iterator('large_database_backup.sql'),

351

file_size

352

)

353

354

print(f"Upload completed: {response.status_code}")

355

print(f"SHA256 checksum: {checksum}")

356

```