or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-stream-types.md custom-stages.md error-handling.md index.md integration.md materialization.md stream-combining.md stream-control.md stream-sinks.md stream-sources.md stream-transformations.md

docs/materialization.md

0

# Materialization and Execution

1

2

System for converting stream blueprints into running streams, managing resources, and controlling materialized values. Materialization is the process that transforms stream descriptions into executing stream processors.

3

4

## Capabilities

5

6

### Materializer

7

8

The core component responsible for turning stream graphs into running stream processors.

9

10

```scala { .api }

11

/**

12

* Component responsible for materializing stream graphs into running streams

13

*/

14

trait Materializer {

15

/**

16

* Materialize a stream graph

17

* @param runnable Complete stream graph ready for execution

18

* @return The materialized value

19

*/

20

def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat

21

22

/**

23

* Schedule a task to run once after a delay

24

* @param delay Duration to wait before execution

25

* @param task Task to execute

26

* @return Cancellable to cancel the scheduled task

27

*/

28

def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable

29

30

/**

31

* Schedule a task to run repeatedly with fixed delay

32

* @param initialDelay Initial delay before first execution

33

* @param delay Delay between executions

34

* @param task Task to execute

35

* @return Cancellable to cancel the scheduled task

36

*/

37

def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration, task: Runnable): Cancellable

38

39

/**

40

* Schedule a task to run repeatedly at fixed rate

41

* @param initialDelay Initial delay before first execution

42

* @param interval Interval between executions

43

* @param task Task to execute

44

* @return Cancellable to cancel the scheduled task

45

*/

46

def scheduleAtFixedRate(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable

47

48

/**

49

* Get the execution context used by this materializer

50

* @return ExecutionContext for async operations

51

*/

52

def executionContext: ExecutionContext

53

54

/**

55

* Create a materializer with a name prefix for debugging

56

* @param name Prefix for materializer name

57

* @return New materializer with the specified name prefix

58

*/

59

def withNamePrefix(name: String): Materializer

60

}

61

62

object Materializer {

63

/**

64

* Create a materializer from an ActorSystem

65

* @param system ActorSystem to create materializer from

66

* @return Materializer instance

67

*/

68

def apply(system: ActorSystem): Materializer

69

70

/**

71

* Create a materializer with custom settings

72

* @param system ActorSystem to use

73

* @param settings Custom materializer settings

74

* @return Materializer with custom configuration

75

*/

76

def apply(settings: ActorMaterializerSettings, system: ActorSystem): Materializer

77

}

78

```

79

80

**Usage Examples:**

81

82

```scala

83

import akka.actor.ActorSystem

84

import akka.stream.Materializer

85

import akka.stream.scaladsl.{Source, Sink}

86

87

// Create materializer

88

implicit val system: ActorSystem = ActorSystem("stream-system")

89

implicit val materializer: Materializer = Materializer(system)

90

91

// Materialize and run stream

92

val result = Source(1 to 10)

93

.map(_ * 2)

94

.runWith(Sink.seq)

95

96

// Schedule tasks

97

val cancellable = materializer.scheduleOnce(1.second, new Runnable {

98

def run(): Unit = println("Delayed task executed")

99

})

100

```

101

102

### Stream Execution Methods

103

104

Methods for running streams and controlling materialization.

105

106

```scala { .api }

107

/**

108

* Run a complete stream graph

109

* @param materializer Implicit materializer for execution

110

* @return The materialized value

111

*/

112

def run()(implicit materializer: Materializer): Mat

113

114

/**

115

* Run this source with the given sink

116

* @param sink Sink to connect to this source

117

* @param materializer Implicit materializer for execution

118

* @return The materialized value from the sink

119

*/

120

def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2

121

122

/**

123

* Connect this source to a sink and get a runnable graph

124

* @param sink Sink to connect to

125

* @return RunnableGraph ready for materialization

126

*/

127

def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat]

128

129

/**

130

* Connect this source to a sink and combine materialized values

131

* @param sink Sink to connect to

132

* @param combine Function to combine materialized values

133

* @return RunnableGraph with combined materialized value

134

*/

135

def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3]

136

```

137

138

**Usage Examples:**

139

140

```scala

141

// Direct execution

142

val future1 = Source(1 to 5).runWith(Sink.seq)

143

144

// Create runnable graph first

145

val graph = Source(1 to 5).toMat(Sink.seq)(Keep.right)

146

val future2 = graph.run()

147

148

// Combine materialized values

149

val (killSwitch, future3) = Source(1 to 5)

150

.viaMat(KillSwitches.single)(Keep.right)

151

.toMat(Sink.seq)(Keep.both)

152

.run()

153

```

154

155

### Materialized Value Control

156

157

Operations for controlling and transforming materialized values.

158

159

```scala { .api }

160

/**

161

* Transform the materialized value of a graph

162

* @param f Function to transform the materialized value

163

* @return Graph with transformed materialized value

164

*/

165

def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]

166

167

/**

168

* Pre-materialize a source to get both materialized value and new source

169

* @param materializer Materializer to use for pre-materialization

170

* @return Tuple of materialized value and equivalent source

171

*/

172

def preMaterialize()(implicit materializer: Materializer): (Mat, Source[Out, NotUsed])

173

174

/**

175

* Utility functions for combining materialized values

176

*/

177

object Keep {

178

/**

179

* Keep the left materialized value

180

*/

181

def left[L, R]: (L, R) => L = (l, _) => l

182

183

/**

184

* Keep the right materialized value

185

*/

186

def right[L, R]: (L, R) => R = (_, r) => r

187

188

/**

189

* Keep both materialized values as a tuple

190

*/

191

def both[L, R]: (L, R) => (L, R) = (l, r) => (l, r)

192

193

/**

194

* Discard both materialized values

195

*/

196

def none[L, R]: (L, R) => NotUsed = (_, _) => NotUsed

197

}

198

```

199

200

**Usage Examples:**

201

202

```scala

203

// Transform materialized value

204

val countGraph: RunnableGraph[Future[Int]] = Source(List("a", "bb", "ccc"))

205

.toMat(Sink.seq)(Keep.right)

206

.mapMaterializedValue(_.map(_.length))

207

208

// Pre-materialize for reuse

209

val (queue: SourceQueueWithComplete[Int], reusableSource: Source[Int, NotUsed]) =

210

Source.queue[Int](16, OverflowStrategy.backpressure)

211

.map(_ * 2)

212

.preMaterialize()

213

214

// Use Keep combinators

215

val (actorRef: ActorRef, future: Future[Done]) =

216

Source.actorRef[String](100, OverflowStrategy.fail)

217

.toMat(Sink.foreach(println))(Keep.both)

218

.run()

219

```

220

221

### System Materializer

222

223

Utility for getting a system-wide materializer instance.

224

225

```scala { .api }

226

/**

227

* System-wide materializer that uses the guardian actor's dispatcher

228

*/

229

object SystemMaterializer {

230

/**

231

* Get the system materializer for an ActorSystem

232

* @param system ActorSystem to get materializer for

233

* @return System materializer instance

234

*/

235

def get(system: ActorSystem): Materializer

236

237

/**

238

* Alias for get()

239

*/

240

def apply(system: ActorSystem): Materializer = get(system)

241

}

242

```

243

244

**Usage Examples:**

245

246

```scala

247

// Get system materializer

248

val system = ActorSystem("my-system")

249

val materializer = SystemMaterializer.get(system)

250

251

// Use in stream operations

252

Source(1 to 10).runWith(Sink.seq)(materializer)

253

```

254

255

### Materializer Settings

256

257

Configuration options for customizing materializer behavior.

258

259

```scala { .api }

260

/**

261

* Settings for configuring materializer behavior

262

*/

263

final class ActorMaterializerSettings private (

264

val initialInputBufferSize: Int,

265

val maxInputBufferSize: Int,

266

val dispatcher: String,

267

val supervisionDecider: Supervision.Decider,

268

val subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,

269

val debugLogging: Boolean,

270

val outputBurstLimit: Int,

271

val fuzzingMode: Boolean,

272

val autoFusing: Boolean,

273

val maxFixedBufferSize: Int,

274

val syncProcessingLimit: Int,

275

val blockingIoDispatcher: String

276

) {

277

/**

278

* Create new settings with different buffer sizes

279

*/

280

def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings

281

282

/**

283

* Create new settings with different dispatcher

284

*/

285

def withDispatcher(dispatcher: String): ActorMaterializerSettings

286

287

/**

288

* Create new settings with different supervision decider

289

*/

290

def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings

291

292

/**

293

* Enable or disable debug logging

294

*/

295

def withDebugLogging(enable: Boolean): ActorMaterializerSettings

296

}

297

298

object ActorMaterializerSettings {

299

/**

300

* Create settings from ActorSystem configuration

301

*/

302

def apply(system: ActorSystem): ActorMaterializerSettings

303

304

/**

305

* Create settings from custom configuration

306

*/

307

def apply(config: Config): ActorMaterializerSettings

308

}

309

```

310

311

**Usage Examples:**

312

313

```scala

314

// Custom materializer settings

315

val settings = ActorMaterializerSettings(system)

316

.withInputBuffer(initialSize = 4, maxSize = 16)

317

.withDispatcher("my-custom-dispatcher")

318

.withDebugLogging(true)

319

320

val customMaterializer = Materializer(settings, system)

321

322

// Use custom materializer

323

Source(1 to 100).runWith(Sink.seq)(customMaterializer)

324

```

325

326

### Resource Management

327

328

Controlling materializer lifecycle and resource cleanup.

329

330

```scala { .api }

331

/**

332

* Shutdown the materializer and cleanup resources

333

* @return Future that completes when shutdown is finished

334

*/

335

def shutdown(): Future[Done]

336

337

/**

338

* Check if the materializer has been shutdown

339

* @return True if shutdown, false otherwise

340

*/

341

def isShutdown: Boolean

342

```

343

344

**Usage Examples:**

345

346

```scala

347

// Proper resource cleanup

348

val materializer = Materializer(system)

349

350

// Use materializer for stream processing

351

Source(1 to 10).runWith(Sink.seq)(materializer)

352

353

// Shutdown when done

354

materializer.shutdown().onComplete { _ =>

355

println("Materializer shut down")

356

system.terminate()

357

}

358

```

359

360

### Attributes

361

362

Configuration metadata that can be attached to stream operators.

363

364

```scala { .api }

365

/**

366

* Metadata container for stream operators

367

*/

368

final class Attributes private (val attributeList: List[Attributes.Attribute]) {

369

/**

370

* Get an attribute of the specified type

371

* @param c Class of the attribute type

372

* @return Optional attribute value

373

*/

374

def get[T <: Attributes.Attribute](c: Class[T]): Option[T]

375

376

/**

377

* Get an attribute with a default value

378

* @param default Default value if attribute not found

379

* @return Attribute value or default

380

*/

381

def getAttribute[T <: Attributes.Attribute](default: T): T

382

383

/**

384

* Combine with other attributes

385

* @param other Other attributes to combine with

386

* @return Combined attributes

387

*/

388

def and(other: Attributes): Attributes

389

}

390

391

object Attributes {

392

/**

393

* Empty attributes

394

*/

395

val none: Attributes

396

397

/**

398

* Create attributes with name

399

*/

400

def name(name: String): Attributes

401

402

/**

403

* Create async boundary attribute

404

*/

405

def asyncBoundary: Attributes

406

407

/**

408

* Create dispatcher attribute

409

*/

410

def dispatcher(dispatcher: String): Attributes

411

412

/**

413

* Create input buffer attribute

414

*/

415

def inputBuffer(initial: Int, max: Int): Attributes

416

}

417

```

418

419

**Usage Examples:**

420

421

```scala

422

// Add attributes to operators

423

Source(1 to 10)

424

.map(_ * 2)

425

.withAttributes(Attributes.name("doubler"))

426

.async // Add async boundary

427

.filter(_ > 10)

428

.withAttributes(Attributes.dispatcher("my-dispatcher"))

429

.runWith(Sink.seq)

430

```

431

432

## Types

433

434

```scala { .api }

435

// Essential types for materialization

436

type NotUsed = akka.NotUsed

437

438

// Completion marker

439

sealed abstract class Done

440

case object Done extends Done

441

442

// Cancellable interface for scheduled tasks

443

trait Cancellable {

444

def cancel(): Boolean

445

def isCancelled: Boolean

446

}

447

448

// Stream subscription timeout settings

449

final class StreamSubscriptionTimeoutSettings(

450

val mode: StreamSubscriptionTimeoutTerminationMode,

451

val timeout: FiniteDuration

452

)

453

454

sealed abstract class StreamSubscriptionTimeoutTerminationMode

455

case object NoopTermination extends StreamSubscriptionTimeoutTerminationMode

456

case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode

457

case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode

458

```