0
# Middleware Layers
1
2
OpenDAL's layered architecture allows composing cross-cutting functionality through middleware layers. Layers intercept operations to add features like logging, retry logic, metrics collection, and performance optimization without modifying core business logic.
3
4
## Capabilities
5
6
### Layer Architecture
7
8
All layers implement the `Layer` trait and can be composed using the builder pattern.
9
10
```rust { .api }
11
pub trait Layer<A: Access> {
12
type LayeredAccessor: Access;
13
14
/// Apply this layer to the given accessor
15
fn layer(&self, inner: A) -> Self::LayeredAccessor;
16
}
17
18
impl<A: Access> OperatorBuilder<A> {
19
/// Add a layer to the operator
20
pub fn layer<L: Layer<A>>(self, layer: L) -> OperatorBuilder<L::LayeredAccessor>;
21
22
/// Finalize the operator with all layers applied
23
pub fn finish(self) -> Operator;
24
}
25
```
26
27
**Usage Pattern:**
28
29
```rust
30
use opendal::{Operator, services, layers};
31
32
let op = Operator::new(services::S3::default())?
33
.layer(layers::LoggingLayer::default())
34
.layer(layers::RetryLayer::default())
35
.layer(layers::MetricsLayer::default())
36
.finish();
37
```
38
39
### Core Layers
40
41
Essential layers for production deployments.
42
43
```rust { .api }
44
/// Logging layer for operation tracing
45
pub struct LoggingLayer {
46
level: log::Level,
47
}
48
49
impl LoggingLayer {
50
pub fn new() -> Self;
51
pub fn with_level(level: log::Level) -> Self;
52
}
53
54
impl Default for LoggingLayer {
55
fn default() -> Self;
56
}
57
58
/// Retry layer with exponential backoff
59
pub struct RetryLayer {
60
builder: ExponentialBuilder,
61
}
62
63
impl RetryLayer {
64
pub fn new() -> Self;
65
pub fn with_jitter(self) -> Self;
66
pub fn with_max_times(self, max_times: usize) -> Self;
67
pub fn with_factor(self, factor: f32) -> Self;
68
pub fn with_min_delay(self, min_delay: Duration) -> Self;
69
pub fn with_max_delay(self, max_delay: Duration) -> Self;
70
}
71
72
/// Configurable retry interceptor
73
pub struct RetryInterceptor;
74
impl RetryInterceptor {
75
pub fn new() -> Self;
76
}
77
78
/// Timeout layer for operation deadlines
79
pub struct TimeoutLayer {
80
timeout: Duration,
81
}
82
83
impl TimeoutLayer {
84
pub fn new(timeout: Duration) -> Self;
85
}
86
87
/// Concurrency limiting layer
88
pub struct ConcurrentLimitLayer {
89
permits: usize,
90
}
91
92
impl ConcurrentLimitLayer {
93
pub fn new(permits: usize) -> Self;
94
}
95
96
/// Immutable file indexing layer
97
pub struct ImmutableIndexLayer;
98
impl ImmutableIndexLayer {
99
pub fn new() -> Self;
100
}
101
```
102
103
**Usage Examples:**
104
105
```rust
106
use opendal::layers::{LoggingLayer, RetryLayer, TimeoutLayer, ConcurrentLimitLayer};
107
use std::time::Duration;
108
109
// Logging with custom level
110
let logging = LoggingLayer::with_level(log::Level::Debug);
111
112
// Retry with custom configuration
113
let retry = RetryLayer::new()
114
.with_max_times(5)
115
.with_min_delay(Duration::from_millis(100))
116
.with_max_delay(Duration::from_secs(30))
117
.with_jitter();
118
119
// Timeout for long operations
120
let timeout = TimeoutLayer::new(Duration::from_secs(300));
121
122
// Limit concurrent operations
123
let concurrent_limit = ConcurrentLimitLayer::new(10);
124
125
let op = Operator::new(services::S3::default())?
126
.layer(logging)
127
.layer(retry)
128
.layer(timeout)
129
.layer(concurrent_limit)
130
.finish();
131
```
132
133
### Observability Layers
134
135
Layers for monitoring, tracing, and metrics collection.
136
137
```rust { .api }
138
/// Distributed tracing integration
139
#[cfg(feature = "layers-tracing")]
140
pub struct TracingLayer;
141
impl TracingLayer {
142
pub fn new() -> Self;
143
}
144
145
/// Metrics collection layer
146
#[cfg(feature = "layers-metrics")]
147
pub struct MetricsLayer;
148
impl MetricsLayer {
149
pub fn new() -> Self;
150
}
151
152
/// Prometheus metrics integration (tikv/prometheus-rs)
153
#[cfg(feature = "layers-prometheus")]
154
pub struct PrometheusLayer {
155
registry: prometheus::Registry,
156
}
157
impl PrometheusLayer {
158
pub fn new(registry: prometheus::Registry) -> Self;
159
}
160
161
/// Prometheus metrics integration (prometheus/client_rust)
162
#[cfg(feature = "layers-prometheus-client")]
163
pub struct PrometheusClientLayer;
164
impl PrometheusClientLayer {
165
pub fn new() -> Self;
166
}
167
168
/// MiniTrace integration for lightweight tracing
169
#[cfg(feature = "layers-minitrace")]
170
pub struct MinitraceLayer;
171
impl MinitraceLayer {
172
pub fn new() -> Self;
173
}
174
175
/// OpenTelemetry tracing integration
176
#[cfg(feature = "layers-otel-trace")]
177
pub struct OtelTraceLayer;
178
impl OtelTraceLayer {
179
pub fn new() -> Self;
180
}
181
182
/// Async task tree visualization
183
#[cfg(feature = "layers-await-tree")]
184
pub struct AwaitTreeLayer;
185
impl AwaitTreeLayer {
186
pub fn new() -> Self;
187
}
188
189
/// Async backtrace support for debugging
190
#[cfg(feature = "layers-async-backtrace")]
191
pub struct AsyncBacktraceLayer;
192
impl AsyncBacktraceLayer {
193
pub fn new() -> Self;
194
}
195
```
196
197
**Usage Examples:**
198
199
```rust
200
use opendal::layers::{TracingLayer, MetricsLayer, PrometheusLayer};
201
202
// Distributed tracing
203
let tracing = TracingLayer::new();
204
205
// General metrics collection
206
let metrics = MetricsLayer::new();
207
208
// Custom Prometheus registry
209
let registry = prometheus::Registry::new();
210
let prometheus = PrometheusLayer::new(registry);
211
212
let op = Operator::new(services::Fs::default())?
213
.layer(tracing)
214
.layer(metrics)
215
.layer(prometheus)
216
.finish();
217
```
218
219
### Performance Layers
220
221
Layers for optimization and rate limiting.
222
223
```rust { .api }
224
/// Rate limiting layer with token bucket algorithm
225
#[cfg(feature = "layers-throttle")]
226
pub struct ThrottleLayer {
227
quota: Quota,
228
}
229
230
impl ThrottleLayer {
231
pub fn new(quota: Quota) -> Self;
232
}
233
234
/// Quota configuration for throttling
235
pub struct Quota;
236
impl Quota {
237
pub fn per_second(rate: std::num::NonZeroU32) -> Self;
238
pub fn per_minute(rate: std::num::NonZeroU32) -> Self;
239
pub fn per_hour(rate: std::num::NonZeroU32) -> Self;
240
}
241
```
242
243
**Usage Examples:**
244
245
```rust
246
use opendal::layers::ThrottleLayer;
247
use std::num::NonZeroU32;
248
249
// Limit to 100 operations per second
250
let quota = Quota::per_second(NonZeroU32::new(100).unwrap());
251
let throttle = ThrottleLayer::new(quota);
252
253
let op = Operator::new(services::S3::default())?
254
.layer(throttle)
255
.finish();
256
```
257
258
### Development and Testing Layers
259
260
Layers for development, testing, and debugging.
261
262
```rust { .api }
263
/// Chaos engineering layer for testing resilience
264
#[cfg(feature = "layers-chaos")]
265
pub struct ChaosLayer {
266
error_ratio: f32,
267
latency_range: (Duration, Duration),
268
}
269
270
impl ChaosLayer {
271
pub fn new() -> Self;
272
pub fn with_error_ratio(self, ratio: f32) -> Self;
273
pub fn with_latency_range(self, min: Duration, max: Duration) -> Self;
274
}
275
276
/// Madsim simulation layer for testing
277
#[cfg(feature = "layers-madsim")]
278
pub struct MadsimLayer;
279
impl MadsimLayer {
280
pub fn new() -> Self;
281
}
282
283
/// Madsim server for distributed simulation
284
#[cfg(feature = "layers-madsim")]
285
pub struct MadsimServer;
286
impl MadsimServer {
287
pub fn new() -> Self;
288
}
289
290
/// DTrace probes for system-level tracing (Linux only)
291
#[cfg(all(target_os = "linux", feature = "layers-dtrace"))]
292
pub struct DtraceLayer;
293
impl DtraceLayer {
294
pub fn new() -> Self;
295
}
296
```
297
298
**Usage Examples:**
299
300
```rust
301
use opendal::layers::ChaosLayer;
302
use std::time::Duration;
303
304
// Chaos testing with 10% error rate and variable latency
305
let chaos = ChaosLayer::new()
306
.with_error_ratio(0.1)
307
.with_latency_range(
308
Duration::from_millis(10),
309
Duration::from_millis(100)
310
);
311
312
let op = Operator::new(services::Memory::default())?
313
.layer(chaos)
314
.finish();
315
```
316
317
### Blocking Layer
318
319
Special layer for async-to-sync conversion.
320
321
```rust { .api }
322
/// Convert async operator to blocking
323
#[cfg(feature = "layers-blocking")]
324
pub struct BlockingLayer;
325
impl BlockingLayer {
326
pub fn new() -> Self;
327
}
328
329
impl Operator {
330
/// Create blocking operator from async operator
331
pub fn blocking(&self) -> BlockingOperator;
332
}
333
```
334
335
**Usage Examples:**
336
337
```rust
338
use opendal::layers::BlockingLayer;
339
340
// Method 1: Use blocking() method (recommended)
341
let async_op = Operator::new(services::Fs::default())?.finish();
342
let blocking_op = async_op.blocking();
343
344
// Method 2: Use BlockingLayer explicitly
345
let blocking_op = Operator::new(services::Fs::default())?
346
.layer(BlockingLayer::new())
347
.finish()
348
.blocking();
349
350
// Use synchronous operations
351
let content = blocking_op.read("file.txt")?;
352
blocking_op.write("output.txt", "data")?;
353
```
354
355
### Layer Composition Examples
356
357
Complex layer combinations for production scenarios.
358
359
**Production Stack:**
360
361
```rust
362
use opendal::{Operator, services, layers};
363
use std::time::Duration;
364
365
let production_op = Operator::new(services::S3::default())?
366
.layer(layers::LoggingLayer::with_level(log::Level::Info))
367
.layer(layers::RetryLayer::new().with_max_times(3))
368
.layer(layers::TimeoutLayer::new(Duration::from_secs(60)))
369
.layer(layers::ConcurrentLimitLayer::new(20))
370
.layer(layers::MetricsLayer::new())
371
.finish();
372
```
373
374
**Development Stack:**
375
376
```rust
377
let development_op = Operator::new(services::Fs::default())?
378
.layer(layers::LoggingLayer::with_level(log::Level::Debug))
379
.layer(layers::TracingLayer::new())
380
.layer(layers::ChaosLayer::new().with_error_ratio(0.05))
381
.finish();
382
```
383
384
**High-Performance Stack:**
385
386
```rust
387
let high_perf_op = Operator::new(services::Memory::default())?
388
.layer(layers::ConcurrentLimitLayer::new(100))
389
.layer(layers::ImmutableIndexLayer::new())
390
.layer(layers::MetricsLayer::new())
391
.finish();
392
```
393
394
### Custom Layer Implementation
395
396
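Framework for implementing custom layers. `CustomConfig` in the snippets below is a placeholder for your own configuration type; it must implement `Clone` (the layer clones it into every accessor it creates) and, for the usage example, `Default`.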
397
398
```rust { .api }
399
use opendal::raw::*;
400
401
/// Example custom layer structure
402
pub struct CustomLayer {
403
config: CustomConfig,
404
}
405
406
impl CustomLayer {
407
pub fn new(config: CustomConfig) -> Self {
408
Self { config }
409
}
410
}
411
412
impl<A: Access> Layer<A> for CustomLayer {
413
type LayeredAccessor = CustomAccessor<A>;
414
415
fn layer(&self, inner: A) -> Self::LayeredAccessor {
416
CustomAccessor {
417
inner,
418
config: self.config.clone(),
419
}
420
}
421
}
422
423
/// Custom accessor implementing the Access trait
424
pub struct CustomAccessor<A> {
425
inner: A,
426
config: CustomConfig,
427
}
428
429
#[async_trait::async_trait]
430
impl<A: Access> Access for CustomAccessor<A> {
431
type Reader = A::Reader;
432
type Writer = A::Writer;
433
type Lister = A::Lister;
434
type BlockingReader = A::BlockingReader;
435
type BlockingWriter = A::BlockingWriter;
436
type BlockingLister = A::BlockingLister;
437
438
fn info(&self) -> AccessorInfo {
439
self.inner.info()
440
}
441
442
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
443
// Custom logic before/after inner operation
444
self.inner.read(path, args).await
445
}
446
447
// Implement other Access methods...
448
}
449
```
450
451
**Usage Example:**
452
453
```rust
454
let custom_layer = CustomLayer::new(CustomConfig::default());
455
456
let op = Operator::new(services::S3::default())?
457
.layer(custom_layer)
458
.layer(layers::LoggingLayer::default())
459
.finish();
460
```