or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-stream-types.md custom-stages.md error-handling.md index.md integration.md materialization.md stream-combining.md stream-control.md stream-sinks.md stream-sources.md stream-transformations.md

integration.mddocs/

0

# Integration

1

2

Integration with file systems, TCP/TLS networking, actors, and external Reactive Streams publishers/subscribers. This provides connectivity between Akka Streams and external systems.

3

4

## Capabilities

5

6

### File I/O Integration

7

8

Stream-based file reading and writing operations.

9

10

```scala { .api }

11

/**

12

* File I/O utilities for streaming file operations

13

*/

14

object FileIO {

15

/**

16

* Create a source that reads from a file

17

* @param f Path to the file to read

18

* @param chunkSize Size of chunks to read at a time

19

* @return Source of ByteString chunks with IOResult materialized value

20

*/

21

def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]

22

23

/**

24

* Create a sink that writes to a file

25

* @param f Path to the file to write

26

* @param options File open options (default: write, truncate, create)

27

* @return Sink that materializes to Future[IOResult]

28

*/

29

def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]]

30

}

31

32

/**

33

* Result of file I/O operations

34

* @param count Number of bytes processed

35

* @param status Success or failure status

36

*/

37

final case class IOResult(count: Long, status: Try[Done]) {

38

def wasSuccessful: Boolean = status.isSuccess

39

}

40

```

41

42

**Usage Examples:**

43

44

```scala

45

import akka.stream.scaladsl.FileIO

46

import akka.util.ByteString

47

import java.nio.file.Paths

48

49

// Read file as stream

50

val filePath = Paths.get("input.txt")

51

val fileSource: Source[ByteString, Future[IOResult]] = FileIO.fromPath(filePath)

52

53

fileSource

54

.map(_.utf8String)

55

.runWith(Sink.foreach(println))

56

57

// Write stream to file

58

val outputPath = Paths.get("output.txt")

59

Source(List("Hello", "World", "!"))

60

.map(s => ByteString(s + "\n"))

61

.runWith(FileIO.toPath(outputPath))

62

.map { result =>

63

println(s"Wrote ${result.count} bytes")

64

}

65

66

// Copy file with transformation

67

FileIO.fromPath(Paths.get("input.txt"))

68

.map(_.utf8String.toUpperCase)

69

.map(ByteString(_))

70

.runWith(FileIO.toPath(Paths.get("output.txt")))

71

```

72

73

### TCP Networking

74

75

TCP client and server streaming capabilities.

76

77

```scala { .api }

78

/**

79

* TCP streaming utilities

80

*/

81

object Tcp {

82

/**

83

* Create an outgoing TCP connection

84

* @param remoteAddress Address to connect to

85

* @param localAddress Optional local address to bind to

86

* @param options TCP socket options

87

* @param halfClose Enable half-close for the connection

88

* @param connectTimeout Connection timeout duration

89

* @param idleTimeout Idle timeout for the connection

90

* @return Flow representing the TCP connection

91

*/

92

def outgoingConnection(

93

remoteAddress: InetSocketAddress,

94

localAddress: Option[InetSocketAddress] = None,

95

options: immutable.Traversable[SocketOption] = Nil,

96

halfClose: Boolean = true,

97

connectTimeout: Duration = Duration.Inf,

98

idleTimeout: Duration = Duration.Inf

99

): Flow[ByteString, ByteString, Future[OutgoingConnection]]

100

101

/**

102

* Bind to a TCP port to accept incoming connections

103

* @param interface Interface to bind to

104

* @param port Port to bind to

105

* @param backlog TCP backlog size

106

* @param options TCP socket options

107

* @param halfClose Enable half-close for connections

108

* @param idleTimeout Idle timeout for connections

109

* @return Source of incoming connections

110

*/

111

def bind(

112

interface: String,

113

port: Int,

114

backlog: Int = 100,

115

options: immutable.Traversable[SocketOption] = Nil,

116

halfClose: Boolean = true,

117

idleTimeout: Duration = Duration.Inf

118

): Source[IncomingConnection, Future[ServerBinding]]

119

}

120

121

/**

122

* Represents an outgoing TCP connection

123

*/

124

final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)

125

126

/**

127

* Represents an incoming TCP connection

128

*/

129

final case class IncomingConnection(

130

remoteAddress: InetSocketAddress,

131

localAddress: InetSocketAddress,

132

flow: Flow[ByteString, ByteString, NotUsed]

133

) {

134

/**

135

* Handle this connection with the given flow

136

*/

137

def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat]): Mat

138

}

139

140

/**

141

* Represents a bound TCP server

142

*/

143

trait ServerBinding {

144

def localAddress: InetSocketAddress

145

def unbind(): Future[Done]

146

}

147

```

148

149

**Usage Examples:**

150

151

```scala

152

import akka.stream.scaladsl.{Tcp, Flow}

153

import akka.util.ByteString

154

import java.net.InetSocketAddress

155

156

// TCP client

157

val connection = Tcp.outgoingConnection(new InetSocketAddress("example.com", 80))

158

159

Source.single(ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"))

160

.via(connection)

161

.runWith(Sink.foreach(response => println(response.utf8String)))

162

163

// TCP server

164

val serverBinding = Tcp.bind("localhost", 8080)

165

.runForeach { connection =>

166

println(s"New connection from: ${connection.remoteAddress}")

167

168

connection.handleWith(Flow[ByteString]

169

.map(_.utf8String)

170

.map(_.toUpperCase)

171

.map(ByteString(_))

172

)

173

}

174

175

// Shutdown server

176

serverBinding.flatMap(_.unbind())

177

```

178

179

### TLS/SSL Support

180

181

TLS encryption and decryption for secure communications.

182

183

```scala { .api }

184

/**

185

* TLS/SSL utilities for secure communications

186

*/

187

object TLS {

188

/**

189

* Create a TLS flow for the given role (client-side by default)

190

* @param sslContext SSL context for TLS

191

* @param firstSession Optional session-negotiation settings applied to the first session

192

* @param role TLS role (client or server)

193

* @param closing Closing behavior

194

* @return BidiFlow for TLS encryption/decryption

195

*/

196

def create(

197

sslContext: SSLContext,

198

firstSession: Option[NegotiateNewSession] = None,

199

role: TLSRole = Client,

200

closing: TLSClosing = IgnoreComplete

201

): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]

202

}

203

204

/**

205

* TLS protocol messages

206

*/

207

sealed trait SslTlsInbound

208

sealed trait SslTlsOutbound

209

210

final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound

211

final case class SendBytes(bytes: ByteString) extends SslTlsOutbound

212

213

/**

214

* TLS roles

215

*/

216

sealed trait TLSRole

217

case object Client extends TLSRole

218

case object Server extends TLSRole

219

220

/**

221

* TLS closing behaviors

222

*/

223

sealed trait TLSClosing

224

case object EagerClose extends TLSClosing

225

case object IgnoreCancel extends TLSClosing

226

case object IgnoreComplete extends TLSClosing

227

```

228

229

**Usage Examples:**

230

231

```scala

232

import akka.stream.scaladsl.TLS

233

import javax.net.ssl.SSLContext

234

235

// TLS client

236

val sslContext = SSLContext.getInstance("TLS")

237

sslContext.init(null, null, null)

238

239

val tlsFlow = TLS.create(sslContext, role = TLSRole.Client)

240

241

// Secure TCP connection

242

val secureConnection = Tcp.outgoingConnection(new InetSocketAddress("secure.example.com", 443))

243

.join(tlsFlow)

244

245

Source.single(SendBytes(ByteString("GET / HTTP/1.1\r\nHost: secure.example.com\r\n\r\n")))

246

.via(secureConnection)

247

.runWith(Sink.foreach {

248

case SessionBytes(session, bytes) =>

249

println(s"Received: ${bytes.utf8String}")

250

})

251

```

252

253

### Actor Integration

254

255

Integration with Akka actors for sending and receiving messages.

256

257

```scala { .api }

258

/**

259

* Create a source that emits the messages sent to its materialized ActorRef

260

* @param bufferSize Size of the buffer for incoming messages

261

* @param overflowStrategy Strategy when buffer overflows

262

* @return Source materialized as ActorRef for sending messages

263

*/

264

def actorRef[T](

265

bufferSize: Int,

266

overflowStrategy: OverflowStrategy

267

): Source[T, ActorRef]

268

269

/**

270

* Create a source with backpressure-aware actor integration

271

* @param ackMessage Message sent to acknowledge element processing

272

* @param completionMatcher Partial function to detect completion messages

273

* @param failureMatcher Partial function to detect failure messages

274

* @return Source materialized as ActorRef with backpressure support

275

*/

276

def actorRefWithBackpressure[T](

277

ackMessage: Any,

278

completionMatcher: PartialFunction[Any, CompletionStrategy] = PartialFunction.empty,

279

failureMatcher: PartialFunction[Any, Throwable] = PartialFunction.empty

280

): Source[T, ActorRef]

281

282

/**

283

* Create a sink that sends messages to an actor

284

* @param ref Target actor reference

285

* @param onCompleteMessage Message sent when stream completes

286

* @return Sink that sends elements as messages

287

*/

288

def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed]

289

290

/**

291

* Create a sink with backpressure support for actors

292

* @param ref Target actor reference

293

* @param messageAdapter Function to wrap elements in messages

294

* @param initMessage Optional initialization message

295

* @param ackMessage Message that actor sends to acknowledge receipt

296

* @param onCompleteMessage Message sent when stream completes

297

* @param onFailureMessage Function to create failure message

298

* @return Sink with backpressure control

299

*/

300

def actorRefWithBackpressure[T](

301

ref: ActorRef,

302

messageAdapter: T => Any,

303

initMessage: Option[Any] = None,

304

ackMessage: Any,

305

onCompleteMessage: Any,

306

onFailureMessage: Throwable => Any = Status.Failure(_)

307

): Sink[T, NotUsed]

308

```

309

310

**Usage Examples:**

311

312

```scala

313

import akka.actor.{Actor, ActorRef, Props}

314

315

// Actor that processes stream elements

316

class ProcessingActor extends Actor {

317

def receive = {

318

case element: String =>

319

println(s"Processing: $element")

320

sender() ! "ack" // Acknowledge processing

321

case "complete" =>

322

println("Stream completed")

323

context.stop(self)

324

}

325

}

326

327

// Actor source

328

val (actorRef, source) = Source.actorRefWithBackpressure[String](

329

ackMessage = "ack",

330

completionMatcher = {

331

case "complete" => CompletionStrategy.immediately

332

}

333

).preMaterialize()

334

335

// Send messages to the source

336

actorRef ! "Hello"

337

actorRef ! "World"

338

actorRef ! "complete"

339

340

// Actor sink

341

val processingActor = system.actorOf(Props[ProcessingActor])

342

Source(List("msg1", "msg2", "msg3"))

343

.runWith(Sink.actorRefWithBackpressure(

344

ref = processingActor,

345

messageAdapter = identity,

346

ackMessage = "ack",

347

onCompleteMessage = "complete"

348

))

349

```

350

351

### Reactive Streams Integration

352

353

Integration with standard Reactive Streams publishers and subscribers.

354

355

```scala { .api }

356

/**

357

* Create a source from a Reactive Streams Publisher

358

* @param publisher Publisher to wrap as a source

359

* @return Source that subscribes to the publisher

360

*/

361

def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]

362

363

/**

364

* Create a sink from a Reactive Streams Subscriber

365

* @param subscriber Subscriber to wrap as a sink

366

* @return Sink that publishes to the subscriber

367

*/

368

def fromSubscriber[T](subscriber: Subscriber[T]): Sink[T, NotUsed]

369

370

/**

371

* Convert this source to a Reactive Streams Publisher

372

* @param fanout Whether to support multiple subscribers

373

* @return Source materialized as Publisher

374

*/

375

def toPublisher(fanout: Boolean): Source[T, Publisher[T]]

376

377

/**

378

* Convert this sink to a Reactive Streams Subscriber

379

* @return Sink materialized as Subscriber

380

*/

381

def toSubscriber[T]: Sink[T, Subscriber[T]]

382

```

383

384

**Usage Examples:**

385

386

```scala

387

import org.reactivestreams.{Publisher, Subscriber}

388

389

// From publisher

390

val publisher: Publisher[Int] = createSomePublisher()

391

val source = Source.fromPublisher(publisher)

392

393

// To publisher

394

val (publisher2, source2) = Source(1 to 10)

395

.toPublisher(fanout = false)

396

.preMaterialize()

397

398

// From subscriber

399

val subscriber: Subscriber[String] = createSomeSubscriber()

400

val sink = Sink.fromSubscriber(subscriber)

401

402

// To subscriber

403

val (subscriber2, sink2) = Sink.seq[Int]

404

.toSubscriber

405

.preMaterialize()

406

```

407

408

### Stream Converters

409

410

Utilities for converting between different stream types and Java I/O.

411

412

```scala { .api }

413

/**

414

* Conversion utilities for integrating with Java I/O and other stream types

415

*/

416

object StreamConverters {

417

/**

418

* Create a source from an InputStream

419

* @param createInputStream Function that creates the InputStream

420

* @param chunkSize Size of chunks to read

421

* @return Source of ByteString chunks

422

*/

423

def fromInputStream(createInputStream: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]

424

425

/**

426

* Create a source from an OutputStream

427

* @param createOutputStream Function that creates the OutputStream

428

* @return Source materialized as OutputStream for writing

429

*/

430

def fromOutputStream(createOutputStream: () => OutputStream): Source[ByteString, OutputStream]

431

432

/**

433

* Convert this source to an InputStream

434

* @param readTimeout Timeout for read operations

435

* @return Source materialized as InputStream

436

*/

437

def asInputStream(readTimeout: FiniteDuration = 5.seconds): Source[ByteString, InputStream]

438

439

/**

440

* Convert this sink to an OutputStream

441

* @param writeTimeout Timeout for write operations

442

* @return Sink materialized as OutputStream

443

*/

444

def asOutputStream(writeTimeout: FiniteDuration = 5.seconds): Sink[ByteString, OutputStream]

445

446

/**

447

* Convert source to Java 8 Stream

448

* @return Source materialized as Java Stream

449

*/

450

def asJavaStream[T]: Source[T, java.util.stream.Stream[T]]

451

}

452

```

453

454

**Usage Examples:**

455

456

```scala

457

import akka.stream.scaladsl.StreamConverters

458

import java.io.{FileInputStream, FileOutputStream}

459

460

// From InputStream

461

val inputSource = StreamConverters.fromInputStream(() => new FileInputStream("input.txt"))

462

inputSource.runWith(Sink.foreach(chunk => println(chunk.utf8String)))

463

464

// To OutputStream

465

Source(List("Hello", "World"))

466

.map(s => ByteString(s + "\n"))

467

.runWith(StreamConverters.asOutputStream())

468

.map { outputStream =>

469

// Use the OutputStream

470

new PrintWriter(outputStream).println("Additional data")

471

}

472

473

// Java Stream integration

474

val javaStream: java.util.stream.Stream[Int] = Source(1 to 100)

475

.runWith(StreamConverters.asJavaStream())

476

```

477

478

## Types

479

480

```scala { .api }

481

// I/O result

482

final case class IOResult(count: Long, status: Try[Done]) {

483

def wasSuccessful: Boolean = status.isSuccess

484

}

485

486

// TCP connection types

487

final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)

488

final case class IncomingConnection(

489

remoteAddress: InetSocketAddress,

490

localAddress: InetSocketAddress,

491

flow: Flow[ByteString, ByteString, NotUsed]

492

)

493

494

trait ServerBinding {

495

def localAddress: InetSocketAddress

496

def unbind(): Future[Done]

497

}

498

499

// TLS types

500

sealed trait SslTlsInbound

501

sealed trait SslTlsOutbound

502

final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound

503

final case class SendBytes(bytes: ByteString) extends SslTlsOutbound

504

505

sealed trait TLSRole

506

case object Client extends TLSRole

507

case object Server extends TLSRole

508

509

// Actor integration

510

sealed abstract class CompletionStrategy

511

case object ImmediateCompletionStrategy extends CompletionStrategy

512

case object DrainAndCompletionStrategy extends CompletionStrategy

513

```