or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.mdconfiguration.mdexceptions.mdindex.mdpaging.mdpipeline.mdpolling.mdserialization.mdservice-client.md

pipeline.mddocs/

0

# Pipeline System

1

2

Configurable HTTP request/response pipeline with policy-based architecture supporting custom middleware, authentication injection, logging, and retry logic. The pipeline system provides a flexible way to process HTTP requests and responses through a chain of policies.

3

4

## Capabilities

5

6

### Pipeline

7

8

Main pipeline class that orchestrates request/response processing through policies.

9

10

```python { .api }

11

class Pipeline:

12

def __init__(self, policies=None, sender=None):

13

"""

14

Initialize HTTP pipeline.

15

16

Parameters:

17

- policies: List of HTTPPolicy or SansIOHTTPPolicy objects

18

- sender: HTTPSender for executing requests (defaults to RequestsHTTPSender)

19

"""

20

21

def run(self, request, **kwargs):

22

"""

23

Execute request through pipeline.

24

25

Parameters:

26

- request: HTTP request object

27

- kwargs: Additional configuration options

28

29

Returns:

30

Response object from pipeline execution

31

"""

32

33

def __enter__(self):

34

"""Enter context manager."""

35

36

def __exit__(self, *exc_details):

37

"""Exit context manager."""

38

```

39

40

### HTTP Policies

41

42

Policy interfaces for processing requests and responses.

43

44

```python { .api }

45

class HTTPPolicy:

46

def __init__(self):

47

"""Initialize HTTP policy."""

48

self.next = None # Next policy in chain

49

50

def send(self, request, **kwargs):

51

"""

52

Process request and call next policy.

53

54

Parameters:

55

- request: Request object to process

56

- kwargs: Additional configuration

57

58

Returns:

59

Response object

60

"""

61

62

class SansIOHTTPPolicy:

63

"""

64

Sans I/O policy for request/response processing.

65

Can act before and after I/O without being tied to specific HTTP implementation.

66

"""

67

68

def on_request(self, request, **kwargs):

69

"""

70

Process request before sending.

71

72

Parameters:

73

- request: Request to process

74

- kwargs: Additional options

75

"""

76

77

def on_response(self, request, response, **kwargs):

78

"""

79

Process response after receiving.

80

81

Parameters:

82

- request: Original request

83

- response: Received response

84

- kwargs: Additional options

85

"""

86

87

def on_exception(self, request, **kwargs) -> bool:

88

"""

89

Handle exceptions during request processing.

90

91

Parameters:

92

- request: Request that caused exception

93

- kwargs: Additional context

94

95

Returns:

96

True if exception was handled, False to re-raise

97

"""

98

```

99

100

### HTTP Sender

101

102

Abstract base for HTTP request execution.

103

104

```python { .api }

105

class HTTPSender:

106

def send(self, request, **config):

107

"""

108

Send HTTP request.

109

110

Parameters:

111

- request: Request object to send

112

- config: Configuration overrides

113

114

Returns:

115

Response object

116

"""

117

118

def build_context(self):

119

"""

120

Build context object for pipeline.

121

122

Returns:

123

Context object (implementation specific)

124

"""

125

126

def __enter__(self):

127

"""Enter context manager."""

128

129

def __exit__(self, *exc_details):

130

"""Exit context manager."""

131

```

132

133

### Request and Response Wrappers

134

135

Pipeline-specific request and response containers.

136

137

```python { .api }

138

class Request:

139

def __init__(self, http_request, context=None):

140

"""

141

Pipeline request wrapper.

142

143

Parameters:

144

- http_request: Underlying HTTP request object

145

- context: Pipeline context data

146

"""

147

148

http_request: any # Underlying HTTP request

149

context: any # Pipeline context

150

151

class Response:

152

def __init__(self, request, http_response, context=None):

153

"""

154

Pipeline response wrapper.

155

156

Parameters:

157

- request: Original Request object

158

- http_response: Underlying HTTP response

159

- context: Pipeline context

160

"""

161

162

request: Request # Original request

163

http_response: any # Underlying HTTP response

164

context: dict # Pipeline context dictionary

165

```

166

167

### Built-in Policies

168

169

Common policies provided by msrest.

170

171

```python { .api }

172

class UserAgentPolicy:

173

"""Policy for managing User-Agent header."""

174

175

def __init__(self, user_agent=None):

176

"""

177

Initialize User-Agent policy.

178

179

Parameters:

180

- user_agent: Custom user agent string

181

"""

182

183

user_agent: str

184

185

def add_user_agent(self, value: str):

186

"""Add value to user agent string."""

187

188

class HTTPLogger:

189

"""Policy for logging HTTP requests and responses."""

190

191

enable_http_logger: bool = True

192

193

def __init__(self, enable_http_logger=True):

194

"""

195

Initialize HTTP logger policy.

196

197

Parameters:

198

- enable_http_logger: Enable/disable logging

199

"""

200

201

class RawDeserializer:

202

"""Policy for deserializing raw HTTP responses."""

203

204

CONTEXT_NAME: str = "deserialized_data"

205

206

@staticmethod

207

def deserialize_from_text(data, content_type=None):

208

"""

209

Deserialize text data.

210

211

Parameters:

212

- data: Text data to deserialize

213

- content_type: Content type hint

214

215

Returns:

216

Deserialized data

217

"""

218

219

@staticmethod

220

def deserialize_from_http_generics(text, headers):

221

"""

222

Deserialize from HTTP response components.

223

224

Parameters:

225

- text: Response text

226

- headers: Response headers

227

228

Returns:

229

Deserialized data

230

"""

231

```

232

233

### Async Pipeline (Python 3.5+)

234

235

Async versions of pipeline components.

236

237

```python { .api }

238

class AsyncPipeline:

239

"""Async version of Pipeline."""

240

241

def __init__(self, policies=None, sender=None):

242

"""Initialize async pipeline."""

243

244

async def run(self, request, **kwargs):

245

"""Execute request through async pipeline."""

246

247

class AsyncHTTPPolicy:

248

"""Async HTTP policy interface."""

249

250

async def send(self, request, **kwargs):

251

"""Process request asynchronously."""

252

253

class AsyncHTTPSender:

254

"""Async HTTP sender interface."""

255

256

async def send(self, request, **config):

257

"""Send request asynchronously."""

258

```

259

260

## Usage Examples

261

262

### Basic Pipeline Setup

263

264

```python

265

from msrest.pipeline import Pipeline, HTTPPolicy

266

from msrest import ServiceClient, Configuration

267

268

# Create custom policy

269

class LoggingPolicy(HTTPPolicy):

270

def send(self, request, **kwargs):

271

print(f"Sending request to: {request.http_request.url}")

272

response = self.next.send(request, **kwargs)

273

print(f"Received response: {response.http_response.status_code}")

274

return response

275

276

# Create pipeline with policies

277

policies = [LoggingPolicy()]

278

pipeline = Pipeline(policies)

279

280

# Use with service client

281

config = Configuration(base_url='https://api.example.com')

282

config.pipeline = pipeline

283

284

client = ServiceClient(None, config)

285

```

286

287

### Custom Authentication Policy

288

289

```python

290

from msrest.pipeline import SansIOHTTPPolicy

291

292

class CustomAuthPolicy(SansIOHTTPPolicy):

293

"""Custom authentication policy."""

294

295

def __init__(self, api_key):

296

self.api_key = api_key

297

298

def on_request(self, request, **kwargs):

299

"""Add authentication header to request."""

300

request.http_request.headers['Authorization'] = f'Bearer {self.api_key}'

301

302

def on_response(self, request, response, **kwargs):

303

"""Handle authentication errors."""

304

if response.http_response.status_code == 401:

305

print("Authentication failed - token may be expired")

306

307

# Use custom auth policy

308

auth_policy = CustomAuthPolicy('your-api-key')

309

policies = [auth_policy]

310

pipeline = Pipeline(policies)

311

```

312

313

### Retry Policy

314

315

```python

316

import time

317

import random

318

from msrest.pipeline import HTTPPolicy

319

from msrest.exceptions import HttpOperationError

320

321

class RetryPolicy(HTTPPolicy):

322

"""Simple retry policy with exponential backoff."""

323

324

def __init__(self, max_retries=3, base_delay=1):

325

super(RetryPolicy, self).__init__()

326

self.max_retries = max_retries

327

self.base_delay = base_delay

328

329

def send(self, request, **kwargs):

330

"""Send request with retry logic."""

331

last_exception = None

332

333

for attempt in range(self.max_retries + 1):

334

try:

335

response = self.next.send(request, **kwargs)

336

337

# Check if we should retry based on status code

338

if response.http_response.status_code >= 500:

339

if attempt < self.max_retries:

340

delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)

341

print(f"Server error, retrying in {delay:.1f}s (attempt {attempt + 1})")

342

time.sleep(delay)

343

continue

344

345

return response

346

347

except Exception as e:

348

last_exception = e

349

if attempt < self.max_retries:

350

delay = self.base_delay * (2 ** attempt)

351

print(f"Request failed, retrying in {delay}s (attempt {attempt + 1})")

352

time.sleep(delay)

353

continue

354

355

# All retries exhausted

356

if last_exception:

357

raise last_exception

358

else:

359

return response

360

361

# Use retry policy

362

retry_policy = RetryPolicy(max_retries=3, base_delay=2)

363

policies = [retry_policy]

364

pipeline = Pipeline(policies)

365

```

366

367

### Request/Response Transformation

368

369

```python

370

from msrest.pipeline import SansIOHTTPPolicy

371

import json

372

373

class RequestTransformPolicy(SansIOHTTPPolicy):

374

"""Transform requests and responses."""

375

376

def on_request(self, request, **kwargs):

377

"""Transform outgoing requests."""

378

# Add timestamp to all requests

379

if hasattr(request.http_request, 'data') and request.http_request.data:

380

try:

381

data = json.loads(request.http_request.data)

382

data['timestamp'] = time.time()

383

request.http_request.data = json.dumps(data)

384

except (json.JSONDecodeError, TypeError):

385

pass # Skip transformation for non-JSON data

386

387

# Add correlation ID

388

import uuid

389

correlation_id = str(uuid.uuid4())

390

request.http_request.headers['X-Correlation-ID'] = correlation_id

391

392

# Store in context for response processing

393

if not request.context:

394

request.context = {}

395

request.context['correlation_id'] = correlation_id

396

397

def on_response(self, request, response, **kwargs):

398

"""Transform incoming responses."""

399

# Log correlation

400

correlation_id = request.context.get('correlation_id')

401

if correlation_id:

402

print(f"Response for correlation ID {correlation_id}")

403

404

# Add custom header to context

405

if hasattr(response.http_response, 'headers'):

406

response.context['server_time'] = response.http_response.headers.get('Date')

407

408

# Use transformation policy

409

transform_policy = RequestTransformPolicy()

410

policies = [transform_policy]

411

pipeline = Pipeline(policies)

412

```

413

414

### Pipeline with Multiple Policies

415

416

```python

417

from msrest.pipeline import Pipeline

418

from msrest.pipeline.universal import UserAgentPolicy, HTTPLogger

419

420

# Create multiple policies

421

user_agent_policy = UserAgentPolicy()

422

user_agent_policy.add_user_agent('MyApp/1.0')

423

424

http_logger = HTTPLogger(enable_http_logger=True)

425

426

class MetricsPolicy(SansIOHTTPPolicy):

427

"""Collect request metrics."""

428

429

def __init__(self):

430

self.request_count = 0

431

self.response_times = []

432

433

def on_request(self, request, **kwargs):

434

self.request_count += 1

435

request.context['start_time'] = time.time()

436

437

def on_response(self, request, response, **kwargs):

438

if 'start_time' in request.context:

439

duration = time.time() - request.context['start_time']

440

self.response_times.append(duration)

441

print(f"Request took {duration:.3f}s")

442

443

metrics_policy = MetricsPolicy()

444

445

# Combine policies (order matters)

446

policies = [

447

user_agent_policy, # Set user agent first

448

metrics_policy, # Collect metrics

449

retry_policy, # Retry on failures

450

http_logger # Log requests (usually last)

451

]

452

453

pipeline = Pipeline(policies)

454

455

# Use pipeline

456

config = Configuration(base_url='https://api.example.com')

457

config.pipeline = pipeline

458

459

with ServiceClient(None, config) as client:

460

# Make multiple requests

461

for i in range(5):

462

request = client.get(f'/data/{i}')

463

response = client.send(request)

464

465

# Check metrics

466

print(f"Total requests: {metrics_policy.request_count}")

467

print(f"Average response time: {sum(metrics_policy.response_times) / len(metrics_policy.response_times):.3f}s")

468

```

469

470

### Async Pipeline Usage

471

472

```python

473

import asyncio

474

from msrest.pipeline import AsyncPipeline, AsyncHTTPPolicy

475

476

class AsyncLoggingPolicy(AsyncHTTPPolicy):

477

"""Async logging policy."""

478

479

async def send(self, request, **kwargs):

480

print(f"[ASYNC] Sending request to: {request.http_request.url}")

481

response = await self.next.send(request, **kwargs)

482

print(f"[ASYNC] Received response: {response.http_response.status_code}")

483

return response

484

485

# Create async pipeline

486

async_policies = [AsyncLoggingPolicy()]

487

async_pipeline = AsyncPipeline(async_policies)

488

489

# Use with async client (pseudo-code)

490

async def async_example():

491

async_client = await create_async_client()

492

async_client.config.pipeline = async_pipeline

493

494

request = async_client.get('/async-data')

495

response = await async_client.send(request)

496

497

return response

498

499

# Run async pipeline

500

result = asyncio.run(async_example())

501

```

502

503

### Pipeline Context Usage

504

505

```python

506

from msrest.pipeline import SansIOHTTPPolicy

507

508

class ContextPolicy(SansIOHTTPPolicy):

509

"""Policy demonstrating context usage."""

510

511

def on_request(self, request, **kwargs):

512

"""Add data to request context."""

513

if not request.context:

514

request.context = {}

515

516

# Add request metadata

517

request.context.update({

518

'request_id': str(uuid.uuid4()),

519

'start_time': time.time(),

520

'user_data': kwargs.get('user_data', {})

521

})

522

523

def on_response(self, request, response, **kwargs):

524

"""Process context data in response."""

525

# Calculate request duration

526

if 'start_time' in request.context:

527

duration = time.time() - request.context['start_time']

528

response.context['request_duration'] = duration

529

530

# Copy request ID to response

531

if 'request_id' in request.context:

532

response.context['request_id'] = request.context['request_id']

533

534

# Use context policy

535

context_policy = ContextPolicy()

536

pipeline = Pipeline([context_policy])

537

538

# Send request with context data

539

with ServiceClient(None, config) as client:

540

request = client.get('/data')

541

response = client.send(request, user_data={'session_id': '12345'})

542

543

# Access response context

544

print(f"Request duration: {response.context.get('request_duration', 'unknown')}")

545

print(f"Request ID: {response.context.get('request_id', 'unknown')}")

546

```

547

548

### Error Handling in Policies

549

550

```python

551

from msrest.pipeline import HTTPPolicy

552

from msrest.exceptions import ClientException

553

554

class ErrorHandlingPolicy(HTTPPolicy):

555

"""Policy with comprehensive error handling."""

556

557

def send(self, request, **kwargs):

558

try:

559

response = self.next.send(request, **kwargs)

560

561

# Check for client errors

562

if 400 <= response.http_response.status_code < 500:

563

# Handle client errors

564

self._handle_client_error(request, response)

565

566

# Check for server errors

567

elif response.http_response.status_code >= 500:

568

# Handle server errors

569

self._handle_server_error(request, response)

570

571

return response

572

573

except Exception as e:

574

# Handle network/connection errors

575

self._handle_network_error(request, e)

576

raise

577

578

def _handle_client_error(self, request, response):

579

"""Handle 4xx client errors."""

580

status = response.http_response.status_code

581

if status == 401:

582

print("Authentication required")

583

elif status == 403:

584

print("Access forbidden")

585

elif status == 404:

586

print("Resource not found")

587

elif status == 429:

588

print("Rate limit exceeded")

589

590

def _handle_server_error(self, request, response):

591

"""Handle 5xx server errors."""

592

print(f"Server error: {response.http_response.status_code}")

593

594

def _handle_network_error(self, request, exception):

595

"""Handle network/connection errors."""

596

print(f"Network error: {type(exception).__name__}: {exception}")

597

598

# Use error handling policy

599

error_policy = ErrorHandlingPolicy()

600

pipeline = Pipeline([error_policy])

601

```

602

603

## Types

604

605

```python { .api }

606

class ClientRawResponse:

607

"""

608

Wrapper for response with additional data.

609

610

Attributes:

611

- output: Deserialized response object

612

- response: Raw HTTP response

613

- headers: Dict of deserialized headers

614

"""

615

616

def __init__(self, output, response):

617

"""Initialize raw response wrapper."""

618

619

def add_headers(self, header_dict: dict):

620

"""

621

Deserialize specific headers.

622

623

Parameters:

624

- header_dict: Dict mapping header names to types

625

"""

626

```