or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md, data-streams.md, execution-environment.md, index.md, keyed-streams.md, processing-functions.md, sinks-output.md, stream-connections.md, window-functions.md, windowing.md

docs/async-io.md

0

# Async I/O Operations

1

2

AsyncDataStream provides high-performance async operations for enriching streams with external data without blocking stream processing. This is essential for calling external services, databases, or APIs while maintaining streaming performance.

3

4

## Capabilities

5

6

### Unordered Async Operations

7

8

Async operations where result order doesn't need to match input order (higher throughput).

9

10

```scala { .api }

11

object AsyncDataStream {

12

/**

13

* Apply async transformation with unordered results using AsyncFunction

14

* @param input Input DataStream to transform

15

* @param asyncFunction AsyncFunction implementation

16

* @param timeout Timeout for async operations

17

* @param timeUnit Time unit for timeout

18

* @param capacity Queue capacity for async operations

19

* @return DataStream with async transformation results

20

*/

21

def unorderedWait[IN, OUT: TypeInformation](

22

input: DataStream[IN],

23

asyncFunction: AsyncFunction[IN, OUT],

24

timeout: Long,

25

timeUnit: TimeUnit,

26

capacity: Int

27

): DataStream[OUT]

28

29

/**

30

* Apply async transformation with unordered results (default capacity)

31

* @param input Input DataStream to transform

32

* @param asyncFunction AsyncFunction implementation

33

* @param timeout Timeout for async operations

34

* @param timeUnit Time unit for timeout

35

* @return DataStream with async transformation results

36

*/

37

def unorderedWait[IN, OUT: TypeInformation](

38

input: DataStream[IN],

39

asyncFunction: AsyncFunction[IN, OUT],

40

timeout: Long,

41

timeUnit: TimeUnit

42

): DataStream[OUT]

43

44

/**

45

* Apply async transformation using function syntax (with capacity)

46

* @param input Input DataStream to transform

47

* @param timeout Timeout for async operations

48

* @param timeUnit Time unit for timeout

49

* @param capacity Queue capacity for async operations

50

* @param asyncFunction Function taking input and ResultFuture

51

* @return DataStream with async transformation results

52

*/

53

def unorderedWait[IN, OUT: TypeInformation](

54

input: DataStream[IN],

55

timeout: Long,

56

timeUnit: TimeUnit,

57

capacity: Int

58

)(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]

59

60

/**

61

* Apply async transformation using function syntax (default capacity)

62

* @param input Input DataStream to transform

63

* @param timeout Timeout for async operations

64

* @param timeUnit Time unit for timeout

65

* @param asyncFunction Function taking input and ResultFuture

66

* @return DataStream with async transformation results

67

*/

68

def unorderedWait[IN, OUT: TypeInformation](

69

input: DataStream[IN],

70

timeout: Long,

71

timeUnit: TimeUnit

72

)(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]

73

}

74

```

75

76

### Ordered Async Operations

77

78

Async operations where result order matches input order (preserves stream ordering).

79

80

```scala { .api }

81

object AsyncDataStream {

82

/**

83

* Apply async transformation with ordered results using AsyncFunction

84

* @param input Input DataStream to transform

85

* @param asyncFunction AsyncFunction implementation

86

* @param timeout Timeout for async operations

87

* @param timeUnit Time unit for timeout

88

* @param capacity Queue capacity for async operations

89

* @return DataStream with async transformation results (ordered)

90

*/

91

def orderedWait[IN, OUT: TypeInformation](

92

input: DataStream[IN],

93

asyncFunction: AsyncFunction[IN, OUT],

94

timeout: Long,

95

timeUnit: TimeUnit,

96

capacity: Int

97

): DataStream[OUT]

98

99

/**

100

* Apply async transformation with ordered results (default capacity)

101

* @param input Input DataStream to transform

102

* @param asyncFunction AsyncFunction implementation

103

* @param timeout Timeout for async operations

104

* @param timeUnit Time unit for timeout

105

* @return DataStream with async transformation results (ordered)

106

*/

107

def orderedWait[IN, OUT: TypeInformation](

108

input: DataStream[IN],

109

asyncFunction: AsyncFunction[IN, OUT],

110

timeout: Long,

111

timeUnit: TimeUnit

112

): DataStream[OUT]

113

114

/**

115

* Apply async transformation using function syntax with ordering (with capacity)

116

* @param input Input DataStream to transform

117

* @param timeout Timeout for async operations

118

* @param timeUnit Time unit for timeout

119

* @param capacity Queue capacity for async operations

120

* @param asyncFunction Function taking input and ResultFuture

121

* @return DataStream with async transformation results (ordered)

122

*/

123

def orderedWait[IN, OUT: TypeInformation](

124

input: DataStream[IN],

125

timeout: Long,

126

timeUnit: TimeUnit,

127

capacity: Int

128

)(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]

129

130

/**

131

* Apply async transformation using function syntax with ordering (default capacity)

132

* @param input Input DataStream to transform

133

* @param timeout Timeout for async operations

134

* @param timeUnit Time unit for timeout

135

* @param asyncFunction Function taking input and ResultFuture

136

* @return DataStream with async transformation results (ordered)

137

*/

138

def orderedWait[IN, OUT: TypeInformation](

139

input: DataStream[IN],

140

timeout: Long,

141

timeUnit: TimeUnit

142

)(asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT]

143

}

144

```

145

146

**Usage Examples:**

147

148

```scala

149

import org.apache.flink.streaming.api.scala._

150

import org.apache.flink.streaming.api.scala.async.AsyncDataStream

151

import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

152

import java.util.concurrent.TimeUnit

153

import scala.concurrent.Future

154

import scala.util.{Success, Failure}

155

156

/** Input record of the examples: a user identified by `id`. */
final case class User(id: String, name: String)

/** Enriched record produced by the async lookups in the examples. */
final case class UserProfile(id: String, name: String, email: String, department: String)

158

159

// Execution environment the example pipeline is built on; the snippet below
// reads from `env`, so it has to exist before the source is created.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Small bounded source of users that the async lookups enrich.
val users: DataStream[User] = env.fromElements(
  User("u1", "Alice"),
  User("u2", "Bob"),
  User("u3", "Charlie")
)

164

165

// Using AsyncFunction implementation

166

/**
 * Example AsyncFunction that turns a `User` into a `UserProfile`.
 *
 * The lookup is only simulated inside a `scala.concurrent.Future`; a real
 * implementation would call a non-blocking database or HTTP client instead.
 */
class DatabaseLookupFunction extends AsyncFunction[User, UserProfile] {

  // `Future { ... }` and `onComplete` both take an implicit ExecutionContext;
  // without one in scope this class does not compile. It is a @transient lazy val
  // so it is created on first use instead of being held as eager instance state.
  @transient private implicit lazy val executionContext: scala.concurrent.ExecutionContext =
    scala.concurrent.ExecutionContext.global

  /**
   * Starts the simulated lookup and hands its outcome to `resultFuture`.
   *
   * @param user         element to enrich
   * @param resultFuture completed with the profile on success, or with the
   *                     exception if the lookup fails
   */
  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserProfile]): Unit = {
    // Simulate async database call
    Future {
      // Database lookup simulation
      UserProfile(user.id, user.name, s"${user.name.toLowerCase}@company.com", "Engineering")
    }.onComplete {
      // NOTE(review): confirm the collection type expected by ResultFuture.complete for
      // the Flink version in use (the Scala API may expect a scala Iterable).
      case Success(profile) => resultFuture.complete(java.util.Collections.singletonList(profile))
      case Failure(exception) => resultFuture.completeExceptionally(exception)
    }
  }
}

178

179

// Unordered async transformation (higher throughput)

180

// Results are emitted as soon as each lookup finishes, regardless of input order.
val enrichedUsersUnordered: DataStream[UserProfile] =
  AsyncDataStream.unorderedWait(
    users,
    new DatabaseLookupFunction,
    5000L, // 5 second timeout
    TimeUnit.MILLISECONDS
  )

186

187

// Ordered async transformation (preserves order)

188

// Results are emitted in the same order as the input elements.
val enrichedUsersOrdered: DataStream[UserProfile] =
  AsyncDataStream.orderedWait(
    users,
    new DatabaseLookupFunction,
    5000L, // 5 second timeout
    TimeUnit.MILLISECONDS
  )

194

195

// Using function syntax

196

// Same enrichment as above, but passing the async logic as a function literal
// in the second parameter list instead of an AsyncFunction instance.
val enrichedWithFunction = AsyncDataStream.unorderedWait(
  users,
  5000,
  TimeUnit.MILLISECONDS
) { (user: User, resultFuture: ResultFuture[UserProfile]) =>
  // `Future { ... }` and `onComplete` need an implicit ExecutionContext in scope;
  // without it the lambda body does not compile.
  implicit val executionContext: scala.concurrent.ExecutionContext =
    scala.concurrent.ExecutionContext.global

  // Async operation using function syntax
  Future {
    UserProfile(user.id, user.name, s"${user.name.toLowerCase}@company.com", "Engineering")
  }.onComplete {
    case Success(profile) => resultFuture.complete(java.util.Collections.singletonList(profile))
    case Failure(exception) => resultFuture.completeExceptionally(exception)
  }
}

209

```

210

211

### AsyncFunction Interface

212

213

Base interface for implementing async operations.

214

215

```scala { .api }

216

trait AsyncFunction[IN, OUT] {

217

/**

218

* Async invocation method - implement async logic here

219

* @param input Input element to process

220

* @param resultFuture ResultFuture to complete with results

221

*/

222

def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit

223

224

/**

225

* Optional timeout handling method

226

* @param input Input element that timed out

227

* @param resultFuture ResultFuture to complete for timeout case

228

*/

229

def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {

230

resultFuture.completeExceptionally(

231

new RuntimeException(s"Async operation timed out for input: $input")

232

)

233

}

234

}

235

```

236

237

### RichAsyncFunction

238

239

Rich version of AsyncFunction with access to runtime context.

240

241

```scala { .api }

242

abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT] with RichFunction {

243

/**

244

* Get the runtime context

245

* @return RuntimeContext for accessing configuration and metrics

246

*/

247

def getRuntimeContext: RuntimeContext

248

249

/**

250

* Open method called once per parallel instance

251

* @param parameters Configuration parameters

252

*/

253

override def open(parameters: Configuration): Unit = {}

254

255

/**

256

* Close method called when function is shut down

257

*/

258

override def close(): Unit = {}

259

260

/**

261

* Set runtime context (called by Flink runtime)

262

* @param t Runtime context

263

*/

264

override def setRuntimeContext(t: RuntimeContext): Unit

265

}

266

```

267

268

**Usage Examples:**

269

270

```scala

271

import org.apache.flink.streaming.api.scala.async.RichAsyncFunction

272

import org.apache.flink.configuration.Configuration

273

import java.sql.{Connection, DriverManager}

274

275

/**
 * Example RichAsyncFunction that resolves a `UserProfile` for each `User` over JDBC.
 *
 * The connection and the thread that runs the blocking queries are created in
 * `open` and released in `close`.
 */
class DatabaseAsyncFunction extends RichAsyncFunction[User, UserProfile] {
  private var connection: Connection = _

  // Dedicated single-threaded context for the blocking JDBC calls: every lookup
  // runs on this one thread, so the shared connection is never used concurrently.
  private var lookupContext: scala.concurrent.ExecutionContextExecutorService = _

  /**
   * Opens the JDBC connection and starts the lookup thread.
   *
   * @param parameters configuration passed to the function (unused here)
   */
  override def open(parameters: Configuration): Unit = {
    // Initialize database connection
    connection = DriverManager.getConnection("jdbc:h2:mem:testdb", "sa", "")
    // Setup connection pool, initialize client, etc.
    lookupContext = scala.concurrent.ExecutionContext.fromExecutorService(
      java.util.concurrent.Executors.newSingleThreadExecutor()
    )
  }

  /**
   * Schedules the lookup for `user` and forwards its outcome to `resultFuture`.
   *
   * @param user         element to enrich
   * @param resultFuture completed with the profile, or exceptionally when the
   *                     query fails or no row exists for the user
   */
  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserProfile]): Unit = {
    // `Future` and `onComplete` require an implicit ExecutionContext.
    implicit val executionContext: scala.concurrent.ExecutionContext = lookupContext

    Future(lookupProfile(user)).onComplete {
      // NOTE(review): confirm the collection type expected by ResultFuture.complete for
      // the Flink version in use (the Scala API may expect a scala Iterable).
      case Success(profile) => resultFuture.complete(java.util.Collections.singletonList(profile))
      case Failure(exception) => resultFuture.completeExceptionally(exception)
    }
  }

  /**
   * Runs the blocking query for one user and always closes the statement and
   * result set, whether the lookup succeeds or throws.
   */
  private def lookupProfile(user: User): UserProfile = {
    // Use the connection for database lookup
    val stmt = connection.prepareStatement("SELECT * FROM user_profiles WHERE id = ?")
    try {
      stmt.setString(1, user.id)
      val rs = stmt.executeQuery()
      try {
        if (rs.next()) {
          UserProfile(
            rs.getString("id"),
            rs.getString("name"),
            rs.getString("email"),
            rs.getString("department")
          )
        } else {
          throw new RuntimeException(s"User profile not found for ${user.id}")
        }
      } finally {
        rs.close()
      }
    } finally {
      stmt.close()
    }
  }

  /**
   * Custom timeout handling: emits a placeholder profile instead of failing.
   *
   * @param user         element whose lookup timed out
   * @param resultFuture completed with the placeholder profile
   */
  override def timeout(user: User, resultFuture: ResultFuture[UserProfile]): Unit = {
    resultFuture.complete(java.util.Collections.singletonList(
      UserProfile(user.id, user.name, "unknown@company.com", "Unknown")
    ))
  }

  /** Stops the lookup thread and closes the connection if they were created. */
  override def close(): Unit = {
    // Shut the executor down first so no new query is started on a closing connection.
    if (lookupContext != null) {
      lookupContext.shutdown()
    }
    if (connection != null) {
      connection.close()
    }
  }
}

320

```

321

322

### ResultFuture Interface

323

324

Interface for completing async operations with results or errors.

325

326

```scala { .api }

327

trait ResultFuture[T] {

328

/**

329

* Complete the async operation with a collection of results

330

* @param result Collection of result elements

331

*/

332

def complete(result: java.util.Collection[T]): Unit

333

334

/**

335

* Complete the async operation with varargs results

336

* @param result Result elements as varargs

337

*/

338

def complete(result: T*): Unit = {

339

complete(java.util.Arrays.asList(result: _*))

340

}

341

342

/**

343

* Complete the async operation with an exception

344

* @param error Exception that occurred during async operation

345

*/

346

def completeExceptionally(error: Throwable): Unit

347

}

348

```

349

350

## Retry Strategies

351

352

Support for retry logic in async operations.

353

354

```scala { .api }

355

// Retry strategy interface

356

trait AsyncRetryStrategy[OUT] {

357

def canRetry(currentAttempts: Int): Boolean

358

def getRetryDelay(currentAttempts: Int): Long

359

}

360

361

// Retry predicate interface

362

trait AsyncRetryPredicate[OUT] {

363

def resultPredicate(result: OUT): Boolean

364

def exceptionPredicate(exception: Throwable): Boolean

365

}

366

367

// Built-in retry strategies

368

object AsyncRetryStrategies {

369

def fixedDelay(maxAttempts: Int, delay: Long): AsyncRetryStrategy[_]

370

def exponentialBackoff(maxAttempts: Int, initialDelay: Long, multiplier: Double): AsyncRetryStrategy[_]

371

def noRetry(): AsyncRetryStrategy[_]

372

}

373

374

// Built-in retry predicates

375

object RetryPredicates {

376

def hasException[OUT](exceptionClass: Class[_ <: Throwable]): AsyncRetryPredicate[OUT]

377

def hasResult[OUT](predicate: OUT => Boolean): AsyncRetryPredicate[OUT]

378

}

379

```

380

381

## Types

382

383

```scala { .api }

384

// Time units for timeout specification

385

object TimeUnit extends Enumeration {

386

val NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS = Value

387

}

388

389

// Runtime context for rich functions

390

trait RuntimeContext {

391

def getTaskName: String

392

def getMetricGroup: MetricGroup

393

def getNumberOfParallelSubtasks: Int

394

def getIndexOfThisSubtask: Int

395

def getExecutionConfig: ExecutionConfig

396

def getUserCodeClassLoader: ClassLoader

397

}

398

399

// Rich function base interface

400

trait RichFunction {

401

def open(parameters: Configuration): Unit

402

def close(): Unit

403

def getRuntimeContext: RuntimeContext

404

def setRuntimeContext(t: RuntimeContext): Unit

405

}

406

407

// Configuration for open method

408

class Configuration extends java.util.HashMap[String, String] {

409

def getString(key: String, defaultValue: String): String

410

def getInteger(key: String, defaultValue: Int): Int

411

def getBoolean(key: String, defaultValue: Boolean): Boolean

412

def getFloat(key: String, defaultValue: Float): Float

413

def getDouble(key: String, defaultValue: Double): Double

414

def getLong(key: String, defaultValue: Long): Long

415

}

416

```