or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-operations.md · index.md · layers.md · services.md · types.md

docs/layers.md

0

# Middleware Layers

1

2

OpenDAL's layered architecture allows composing cross-cutting functionality through middleware layers. Layers intercept operations to add features like logging, retry logic, metrics collection, and performance optimization without modifying core business logic.

3

4

## Capabilities

5

6

### Layer Architecture

7

8

All layers implement the `Layer` trait and can be composed using the builder pattern.

9

10

```rust { .api }

11

pub trait Layer<A: Access> {

12

type LayeredAccessor: Access;

13

14

/// Apply this layer to the given accessor

15

fn layer(&self, inner: A) -> Self::LayeredAccessor;

16

}

17

18

impl<A: Access> OperatorBuilder<A> {

19

/// Add a layer to the operator

20

pub fn layer<L: Layer<A>>(self, layer: L) -> OperatorBuilder<L::LayeredAccessor>;

21

22

/// Finalize the operator with all layers applied

23

pub fn finish(self) -> Operator;

24

}

25

```

26

27

**Usage Pattern:**

28

29

```rust

30

use opendal::{Operator, services, layers};

31

32

let op = Operator::new(services::S3::default())?

33

.layer(layers::LoggingLayer::default())

34

.layer(layers::RetryLayer::default())

35

.layer(layers::MetricsLayer::default())

36

.finish();

37

```

38

39

### Core Layers

40

41

Essential layers for production deployments.

42

43

```rust { .api }

44

/// Logging layer for operation tracing

45

pub struct LoggingLayer {

46

level: log::Level,

47

}

48

49

impl LoggingLayer {

50

pub fn new() -> Self;

51

pub fn with_level(level: log::Level) -> Self;

52

}

53

54

impl Default for LoggingLayer {

55

fn default() -> Self;

56

}

57

58

/// Retry layer with exponential backoff

59

pub struct RetryLayer {

60

builder: ExponentialBuilder,

61

}

62

63

impl RetryLayer {

64

pub fn new() -> Self;

65

pub fn with_jitter(self) -> Self;

66

pub fn with_max_times(self, max_times: usize) -> Self;

67

pub fn with_factor(self, factor: f32) -> Self;

68

pub fn with_min_delay(self, min_delay: Duration) -> Self;

69

pub fn with_max_delay(self, max_delay: Duration) -> Self;

70

}

71

72

/// Configurable retry interceptor

73

pub struct RetryInterceptor;

74

impl RetryInterceptor {

75

pub fn new() -> Self;

76

}

77

78

/// Timeout layer for operation deadlines

79

pub struct TimeoutLayer {

80

timeout: Duration,

81

}

82

83

impl TimeoutLayer {

84

pub fn new(timeout: Duration) -> Self;

85

}

86

87

/// Concurrency limiting layer

88

pub struct ConcurrentLimitLayer {

89

permits: usize,

90

}

91

92

impl ConcurrentLimitLayer {

93

pub fn new(permits: usize) -> Self;

94

}

95

96

/// Immutable file indexing layer

97

pub struct ImmutableIndexLayer;

98

impl ImmutableIndexLayer {

99

pub fn new() -> Self;

100

}

101

```

102

103

**Usage Examples:**

104

105

```rust

106

use opendal::layers::{LoggingLayer, RetryLayer, TimeoutLayer, ConcurrentLimitLayer};

107

use std::time::Duration;

108

109

// Logging with custom level

110

let logging = LoggingLayer::with_level(log::Level::Debug);

111

112

// Retry with custom configuration

113

let retry = RetryLayer::new()

114

.with_max_times(5)

115

.with_min_delay(Duration::from_millis(100))

116

.with_max_delay(Duration::from_secs(30))

117

.with_jitter();

118

119

// Timeout for long operations

120

let timeout = TimeoutLayer::new(Duration::from_secs(300));

121

122

// Limit concurrent operations

123

let concurrent_limit = ConcurrentLimitLayer::new(10);

124

125

let op = Operator::new(services::S3::default())?

126

.layer(logging)

127

.layer(retry)

128

.layer(timeout)

129

.layer(concurrent_limit)

130

.finish();

131

```

132

133

### Observability Layers

134

135

Layers for monitoring, tracing, and metrics collection.

136

137

```rust { .api }

138

/// Distributed tracing integration

139

#[cfg(feature = "layers-tracing")]

140

pub struct TracingLayer;

141

impl TracingLayer {

142

pub fn new() -> Self;

143

}

144

145

/// Metrics collection layer

146

#[cfg(feature = "layers-metrics")]

147

pub struct MetricsLayer;

148

impl MetricsLayer {

149

pub fn new() -> Self;

150

}

151

152

/// Prometheus metrics integration (tikv/prometheus-rs)

153

#[cfg(feature = "layers-prometheus")]

154

pub struct PrometheusLayer {

155

registry: prometheus::Registry,

156

}

157

impl PrometheusLayer {

158

pub fn new(registry: prometheus::Registry) -> Self;

159

}

160

161

/// Prometheus metrics integration (prometheus/client_rust)

162

#[cfg(feature = "layers-prometheus-client")]

163

pub struct PrometheusClientLayer;

164

impl PrometheusClientLayer {

165

pub fn new() -> Self;

166

}

167

168

/// MiniTrace integration for lightweight tracing

169

#[cfg(feature = "layers-minitrace")]

170

pub struct MinitraceLayer;

171

impl MinitraceLayer {

172

pub fn new() -> Self;

173

}

174

175

/// OpenTelemetry tracing integration

176

#[cfg(feature = "layers-otel-trace")]

177

pub struct OtelTraceLayer;

178

impl OtelTraceLayer {

179

pub fn new() -> Self;

180

}

181

182

/// Async task tree visualization

183

#[cfg(feature = "layers-await-tree")]

184

pub struct AwaitTreeLayer;

185

impl AwaitTreeLayer {

186

pub fn new() -> Self;

187

}

188

189

/// Async backtrace support for debugging

190

#[cfg(feature = "layers-async-backtrace")]

191

pub struct AsyncBacktraceLayer;

192

impl AsyncBacktraceLayer {

193

pub fn new() -> Self;

194

}

195

```

196

197

**Usage Examples:**

198

199

```rust

200

use opendal::layers::{TracingLayer, MetricsLayer, PrometheusLayer};

201

202

// Distributed tracing

203

let tracing = TracingLayer::new();

204

205

// General metrics collection

206

let metrics = MetricsLayer::new();

207

208

// Custom Prometheus registry

209

let registry = prometheus::Registry::new();

210

let prometheus = PrometheusLayer::new(registry);

211

212

let op = Operator::new(services::Fs::default())?

213

.layer(tracing)

214

.layer(metrics)

215

.layer(prometheus)

216

.finish();

217

```

218

219

### Performance Layers

220

221

Layers for optimization and rate limiting.

222

223

```rust { .api }

224

/// Rate limiting layer with token bucket algorithm

225

#[cfg(feature = "layers-throttle")]

226

pub struct ThrottleLayer {

227

quota: Quota,

228

}

229

230

impl ThrottleLayer {

231

pub fn new(quota: Quota) -> Self;

232

}

233

234

/// Quota configuration for throttling

235

pub struct Quota;

236

impl Quota {

237

pub fn per_second(rate: std::num::NonZeroU32) -> Self;

238

pub fn per_minute(rate: std::num::NonZeroU32) -> Self;

239

pub fn per_hour(rate: std::num::NonZeroU32) -> Self;

240

}

241

```

242

243

**Usage Examples:**

244

245

```rust

246

use opendal::layers::ThrottleLayer;

247

use std::num::NonZeroU32;

248

249

// Limit to 100 operations per second

250

let quota = Quota::per_second(NonZeroU32::new(100).unwrap());

251

let throttle = ThrottleLayer::new(quota);

252

253

let op = Operator::new(services::S3::default())?

254

.layer(throttle)

255

.finish();

256

```

257

258

### Development and Testing Layers

259

260

Layers for development, testing, and debugging.

261

262

```rust { .api }

263

/// Chaos engineering layer for testing resilience

264

#[cfg(feature = "layers-chaos")]

265

pub struct ChaosLayer {

266

error_ratio: f32,

267

latency_range: (Duration, Duration),

268

}

269

270

impl ChaosLayer {

271

pub fn new() -> Self;

272

pub fn with_error_ratio(self, ratio: f32) -> Self;

273

pub fn with_latency_range(self, min: Duration, max: Duration) -> Self;

274

}

275

276

/// Madsim simulation layer for testing

277

#[cfg(feature = "layers-madsim")]

278

pub struct MadsimLayer;

279

impl MadsimLayer {

280

pub fn new() -> Self;

281

}

282

283

/// Madsim server for distributed simulation

284

#[cfg(feature = "layers-madsim")]

285

pub struct MadsimServer;

286

impl MadsimServer {

287

pub fn new() -> Self;

288

}

289

290

/// DTrace probes for system-level tracing (Linux only)

291

#[cfg(all(target_os = "linux", feature = "layers-dtrace"))]

292

pub struct DtraceLayer;

293

impl DtraceLayer {

294

pub fn new() -> Self;

295

}

296

```

297

298

**Usage Examples:**

299

300

```rust

301

use opendal::layers::ChaosLayer;

302

use std::time::Duration;

303

304

// Chaos testing with 10% error rate and variable latency

305

let chaos = ChaosLayer::new()

306

.with_error_ratio(0.1)

307

.with_latency_range(

308

Duration::from_millis(10),

309

Duration::from_millis(100)

310

);

311

312

let op = Operator::new(services::Memory::default())?

313

.layer(chaos)

314

.finish();

315

```

316

317

### Blocking Layer

318

319

Special layer for async-to-sync conversion.

320

321

```rust { .api }

322

/// Convert async operator to blocking

323

#[cfg(feature = "layers-blocking")]

324

pub struct BlockingLayer;

325

impl BlockingLayer {

326

pub fn new() -> Self;

327

}

328

329

impl Operator {

330

/// Create blocking operator from async operator

331

pub fn blocking(&self) -> BlockingOperator;

332

}

333

```

334

335

**Usage Examples:**

336

337

```rust

338

use opendal::layers::BlockingLayer;

339

340

// Method 1: Use blocking() method (recommended)

341

let async_op = Operator::new(services::Fs::default())?.finish();

342

let blocking_op = async_op.blocking();

343

344

// Method 2: Use BlockingLayer explicitly

345

let blocking_op = Operator::new(services::Fs::default())?

346

.layer(BlockingLayer::new())

347

.finish()

348

.blocking();

349

350

// Use synchronous operations

351

let content = blocking_op.read("file.txt")?;

352

blocking_op.write("output.txt", "data")?;

353

```

354

355

### Layer Composition Examples

356

357

Complex layer combinations for production scenarios.

358

359

**Production Stack:**

360

361

```rust

362

use opendal::{Operator, services, layers};

363

use std::time::Duration;

364

365

let production_op = Operator::new(services::S3::default())?

366

.layer(layers::LoggingLayer::with_level(log::Level::Info))

367

.layer(layers::RetryLayer::new().with_max_times(3))

368

.layer(layers::TimeoutLayer::new(Duration::from_secs(60)))

369

.layer(layers::ConcurrentLimitLayer::new(20))

370

.layer(layers::MetricsLayer::new())

371

.finish();

372

```

373

374

**Development Stack:**

375

376

```rust

377

let development_op = Operator::new(services::Fs::default())?

378

.layer(layers::LoggingLayer::with_level(log::Level::Debug))

379

.layer(layers::TracingLayer::new())

380

.layer(layers::ChaosLayer::new().with_error_ratio(0.05))

381

.finish();

382

```

383

384

**High-Performance Stack:**

385

386

```rust

387

let high_perf_op = Operator::new(services::Memory::default())?

388

.layer(layers::ConcurrentLimitLayer::new(100))

389

.layer(layers::ImmutableIndexLayer::new())

390

.layer(layers::MetricsLayer::new())

391

.finish();

392

```

393

394

### Custom Layer Implementation

395

396

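Framework for implementing custom layers. In the example below, `CustomConfig` is a placeholder for your own configuration type; as written, the snippets require it to implement `Clone` (it is cloned inside `layer`) and `Default` (it is built with `CustomConfig::default()` in the usage example).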

397

398

```rust { .api }

399

use opendal::raw::*;

400

401

/// Example custom layer structure

402

pub struct CustomLayer {

403

config: CustomConfig,

404

}

405

406

impl CustomLayer {

407

pub fn new(config: CustomConfig) -> Self {

408

Self { config }

409

}

410

}

411

412

impl<A: Access> Layer<A> for CustomLayer {

413

type LayeredAccessor = CustomAccessor<A>;

414

415

fn layer(&self, inner: A) -> Self::LayeredAccessor {

416

CustomAccessor {

417

inner,

418

config: self.config.clone(),

419

}

420

}

421

}

422

423

/// Custom accessor implementing the Access trait

424

pub struct CustomAccessor<A> {

425

inner: A,

426

config: CustomConfig,

427

}

428

429

#[async_trait::async_trait]

430

impl<A: Access> Access for CustomAccessor<A> {

431

type Reader = A::Reader;

432

type Writer = A::Writer;

433

type Lister = A::Lister;

434

type BlockingReader = A::BlockingReader;

435

type BlockingWriter = A::BlockingWriter;

436

type BlockingLister = A::BlockingLister;

437

438

fn info(&self) -> AccessorInfo {

439

self.inner.info()

440

}

441

442

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {

443

// Custom logic before/after inner operation

444

self.inner.read(path, args).await

445

}

446

447

// Implement other Access methods...

448

}

449

```

450

451

**Usage Example:**

452

453

```rust

454

let custom_layer = CustomLayer::new(CustomConfig::default());

455

456

let op = Operator::new(services::S3::default())?

457

.layer(custom_layer)

458

.layer(layers::LoggingLayer::default())

459

.finish();

460

```