or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md data-stream.md functions.md index.md joining.md keyed-stream.md stream-execution-environment.md windowing.md

async-operations.mddocs/

0

# Async Operations

1

2

Flink's Async I/O functionality allows you to perform non-blocking calls to external systems (databases, web services, etc.) without blocking the stream processing pipeline. This can significantly improve throughput when dealing with external data enrichment or lookups.

3

4

## Capabilities

5

6

### AsyncDataStream

7

8

Main entry point for creating asynchronous data streams.

9

10

```scala { .api }

/**
 * AsyncDataStream operations for creating async streams.
 *
 * `orderedWait` maintains the order of results; `unorderedWait` gives better
 * throughput but no order guarantee. `timeout` (expressed in `timeUnit`) is
 * the maximum time to wait for one async operation, and the optional
 * `capacity` is the maximum number of concurrent async operations.
 *
 * NOTE(review): the real Flink methods are believed to also require an
 * implicit `TypeInformation[OUT]` (normally provided by importing
 * `org.apache.flink.streaming.api.scala._`) - confirm against the scaladoc.
 */
object AsyncDataStream {

  /** Applies `function` to `stream`, emitting results in input order. */
  def orderedWait[IN, OUT](
    stream: DataStream[IN],
    function: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]

  /** Ordered variant with an explicit limit on concurrent async operations. */
  def orderedWait[IN, OUT](
    stream: DataStream[IN],
    function: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit,
    capacity: Int
  ): DataStream[OUT]

  /** Applies `function` to `stream` without any guarantee on result order. */
  def unorderedWait[IN, OUT](
    stream: DataStream[IN],
    function: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]

  /** Unordered variant with an explicit limit on concurrent async operations. */
  def unorderedWait[IN, OUT](
    stream: DataStream[IN],
    function: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit,
    capacity: Int
  ): DataStream[OUT]
}

```

46

47

**Usage Examples:**

48

49

```scala

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.AsyncDataStream
import java.util.concurrent.TimeUnit

// Input record and its enriched counterpart.
case class User(id: Int, name: String)
case class UserWithEmail(id: Int, name: String, email: String)

// Execution environment the example stream is created from.
val env = StreamExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice"),
  User(2, "Bob"),
  User(3, "Charlie")
)

// Ordered async operation - maintains order of results
val enrichedUsersOrdered = AsyncDataStream.orderedWait(
  users,
  new UserEmailAsyncFunction(),
  5000, // 5 second timeout
  TimeUnit.MILLISECONDS
)

// Unordered async operation - better throughput, no order guarantee
val enrichedUsersUnordered = AsyncDataStream.unorderedWait(
  users,
  new UserEmailAsyncFunction(),
  5000, // 5 second timeout
  TimeUnit.MILLISECONDS,
  100 // capacity of 100 concurrent requests
)

```

80

81

### AsyncFunction Interface

82

83

Core interface for implementing asynchronous functions.

84

85

```scala { .api }

/**
 * AsyncFunction interface for async operations.
 *
 * Implementations start the asynchronous work in `asyncInvoke` and report its
 * outcome through the supplied `ResultFuture`.
 */
trait AsyncFunction[IN, OUT] {

  /**
   * Starts the async operation for one input record.
   *
   * @param input        the record to process
   * @param resultFuture handle to complete with the results or with a failure
   */
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit

  /**
   * Optional timeout handling. The default fails the `resultFuture` with a
   * `TimeoutException`; override to emit a fallback result instead.
   */
  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
    resultFuture.completeExceptionally(
      new TimeoutException("Async operation timed out")
    )
  }
}

```

100

101

**Usage Examples:**

102

103

```scala

import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Failure}
import java.util.concurrent.{CompletableFuture, TimeoutException}

/**
 * Async function using Scala Future: enriches a `User` with an e-mail address
 * obtained from a (simulated) asynchronous lookup.
 */
class UserEmailAsyncFunction extends AsyncFunction[User, UserWithEmail] {

  // Lazy and transient so the execution context is obtained where the function
  // runs instead of being captured as state of the function instance.
  @transient implicit lazy val ec: ExecutionContext = ExecutionContext.global

  /** Looks up the e-mail and completes `resultFuture` with the enriched user. */
  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    // Simulate async database lookup
    lookupUserEmail(user.id).onComplete {
      case Success(email) =>
        // The Scala API's ResultFuture takes a Scala Iterable, so no Java
        // collection conversion is needed.
        resultFuture.complete(Iterable(UserWithEmail(user.id, user.name, email)))
      case Failure(exception) =>
        resultFuture.completeExceptionally(exception)
    }
  }

  /** Custom timeout handling: emit a placeholder address instead of failing. */
  override def timeout(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit =
    resultFuture.complete(Iterable(UserWithEmail(user.id, user.name, "timeout@example.com")))

  /** Simulates an async call to an external service. */
  private def lookupUserEmail(userId: Int): Future[String] =
    Future {
      // Mark the sleep as blocking so the global pool can compensate for it.
      scala.concurrent.blocking {
        Thread.sleep(100) // Simulate network delay
      }
      s"user$userId@example.com"
    }
}


/**
 * Async function using Java CompletableFuture: builds the enriched user on a
 * `CompletableFuture` and forwards its outcome to the `ResultFuture`.
 */
class UserAsyncFunctionJava extends AsyncFunction[User, UserWithEmail] {

  override def asyncInvoke(user: User, resultFuture: ResultFuture[UserWithEmail]): Unit = {
    val enriched = CompletableFuture
      .supplyAsync(() => s"user${user.id}@example.com")
      .thenApply(email => UserWithEmail(user.id, user.name, email))

    enriched.whenComplete((result, exception) => {
      // `exception` is null when the future completed normally.
      Option(exception) match {
        case Some(failure) => resultFuture.completeExceptionally(failure)
        // The Scala API's ResultFuture takes a Scala Iterable.
        case None => resultFuture.complete(Iterable(result))
      }
    })
  }
}

```

158

159

### RichAsyncFunction

160

161

Rich version of AsyncFunction with access to runtime context and lifecycle methods.

162

163

```scala { .api }

/**
 * RichAsyncFunction with lifecycle and context access.
 *
 * Adds `open`/`close` lifecycle hooks and runtime-context accessors on top of
 * `AsyncFunction`.
 */
abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT] {

  /** Lifecycle hook for initializing resources; does nothing by default. */
  def open(parameters: Configuration): Unit = {}

  /** Lifecycle hook for cleaning up resources; does nothing by default. */
  def close(): Unit = {}

  /** Runtime context of this function. */
  def getRuntimeContext: RuntimeContext

  /** Sets the runtime context of this function. */
  def setRuntimeContext(context: RuntimeContext): Unit

  /** Iteration-specific runtime context of this function. */
  def getIterationRuntimeContext: IterationRuntimeContext
}

```

175

176

**Usage Examples:**

177

178

```scala

import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.flink.configuration.Configuration
import java.util.concurrent.{ExecutorService, Executors}
import scala.util.control.NonFatal

/**
 * Runs a lookup query for every input key on a dedicated thread pool.
 *
 * The pool and the connection pool are created in `open` and released in
 * `close`. `DatabaseConnectionPool` is an illustrative placeholder type.
 */
class DatabaseAsyncFunction extends RichAsyncFunction[String, String] {

  private var executor: ExecutorService = _
  private var connectionPool: DatabaseConnectionPool = _

  override def open(parameters: Configuration): Unit = {
    // Initialize resources
    executor = Executors.newFixedThreadPool(10)
    connectionPool = new DatabaseConnectionPool(
      getRuntimeContext.getExecutionConfig.getGlobalJobParameters
    )
  }

  override def close(): Unit = {
    // Cleanup resources; either may still be unset if open() did not finish.
    Option(executor).foreach(_.shutdown())
    Option(connectionPool).foreach(_.close())
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    // execute() rather than submit(): nothing reads the Future that submit()
    // returns, so an error escaping run() would otherwise go unnoticed.
    executor.execute(new Runnable {
      override def run(): Unit =
        try {
          val connection = connectionPool.getConnection()
          // Pass the key as a bind parameter instead of splicing it into the
          // SQL text, which would allow SQL injection.
          // NOTE(review): assumes the placeholder connection API accepts bind
          // parameters - adapt to the client actually used.
          val result = connection.query("SELECT value FROM table WHERE key = ?", input)
          // The Scala API's ResultFuture takes a Scala Iterable.
          resultFuture.complete(Iterable(result))
        } catch {
          case NonFatal(ex) => resultFuture.completeExceptionally(ex)
        }
    })
  }
}

```

221

222

### ResultFuture Interface

223

224

Interface for completing asynchronous operations.

225

226

```scala { .api }

/**
 * ResultFuture interface for completing async operations.
 *
 * In the Scala API (`org.apache.flink.streaming.api.scala.async`) results are
 * passed as a Scala `Iterable`.
 */
trait ResultFuture[OUT] {

  /** Completes the operation with zero or more result elements. */
  def complete(result: Iterable[OUT]): Unit

  /** Completes the operation with a failure. */
  def completeExceptionally(throwable: Throwable): Unit
}

```

235

236

**Usage Examples:**

237

238

```scala

// The Scala API's ResultFuture takes a Scala Iterable, so no Java collection
// conversion is needed.

// Complete with single result
resultFuture.complete(Iterable(result))

// Complete with multiple results
resultFuture.complete(Iterable(result1, result2, result3))

// Complete with empty result (filter effect)
resultFuture.complete(Iterable.empty[OUT])

// Complete with exception
resultFuture.completeExceptionally(new RuntimeException("Async operation failed"))

```

251

252

### Async Function Patterns

253

254

Common patterns for implementing async functions.

255

256

**External Service Lookup Pattern (async HTTP client):**

257

258

```scala

import java.net.URLEncoder
import scala.util.{Failure, Success, Try}

import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
// Apache HttpAsyncClient 4.x
import org.apache.http.HttpResponse
import org.apache.http.client.methods.HttpGet
import org.apache.http.concurrent.FutureCallback
import org.apache.http.impl.nio.client.HttpAsyncClients
import org.apache.http.util.EntityUtils

/**
 * Looks up a value for each key over HTTP and emits `(key, value)`.
 */
class DatabaseLookupFunction extends AsyncFunction[String, (String, String)] {

  // Created and started on first use: the client must be started before it
  // can execute requests, and it should not be held as eager instance state.
  // NOTE(review): the client is never closed here; extend RichAsyncFunction
  // and close it in close() if a shutdown hook is needed.
  @transient private lazy val httpClient = {
    val client = HttpAsyncClients.createDefault()
    client.start()
    client
  }

  override def asyncInvoke(key: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    // Encode the key so reserved characters cannot corrupt the query string.
    val encodedKey = URLEncoder.encode(key, "UTF-8")
    val request = new HttpGet(s"http://api.example.com/lookup?key=$encodedKey")

    httpClient.execute(request, new FutureCallback[HttpResponse] {
      override def completed(response: HttpResponse): Unit =
        // Reading the body can throw; route that to the ResultFuture so the
        // record fails immediately instead of waiting for the timeout.
        Try(EntityUtils.toString(response.getEntity)) match {
          case Success(value) => resultFuture.complete(Iterable((key, value)))
          case Failure(ex) => resultFuture.completeExceptionally(ex)
        }

      override def failed(ex: Exception): Unit =
        resultFuture.completeExceptionally(ex)

      override def cancelled(): Unit =
        resultFuture.completeExceptionally(new RuntimeException("Request cancelled"))
    })
  }
}

```

283

284

**Caching Pattern:**

285

286

```scala

import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

/**
 * Answers lookups from an in-memory cache and falls back to an async lookup
 * on a miss, counting hits and misses as metrics.
 */
class CachedAsyncFunction extends RichAsyncFunction[String, String] {

  // NOTE: entries are only ever added, never evicted, so the cache is unbounded.
  private val cache = new TrieMap[String, String]()
  private var cacheHits: Counter = _
  private var cacheMisses: Counter = _

  // Execution context for the Future callbacks below; lazy and transient so it
  // is obtained where the function runs.
  @transient private implicit lazy val ec: ExecutionContext = ExecutionContext.global

  override def open(parameters: Configuration): Unit = {
    cacheHits = getRuntimeContext.getMetricGroup.counter("cache_hits")
    cacheMisses = getRuntimeContext.getMetricGroup.counter("cache_misses")
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    cache.get(input) match {
      case Some(cachedValue) =>
        cacheHits.inc()
        // The Scala API's ResultFuture takes a Scala Iterable.
        resultFuture.complete(Iterable(cachedValue))

      case None =>
        cacheMisses.inc()
        // Perform async lookup and remember the value for later requests
        performAsyncLookup(input).onComplete {
          case Success(value) =>
            cache.put(input, value)
            resultFuture.complete(Iterable(value))
          case Failure(exception) =>
            resultFuture.completeExceptionally(exception)
        }
    }
  }

  private def performAsyncLookup(input: String): Future[String] = {
    // Implementation of actual async lookup
    ???
  }
}

```

325

326

### Configuration and Tuning

327

328

Important configuration parameters for async operations.

329

330

```scala { .api }

/**
 * Configuration parameters for async operations
 */
// Timeout: Maximum time to wait for async operation
// Capacity: Maximum number of concurrent async operations
// Order: Whether to maintain order of results

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Timeout, capacity and ordering are chosen per async operator, through the
// method and arguments used on AsyncDataStream, e.g.:
//   AsyncDataStream.orderedWait(stream, function, 10, TimeUnit.SECONDS, 100)
// NOTE(review): ExecutionConfig does not appear to offer an environment-wide
// async timeout setter - confirm against the Flink version in use.

```

342

343

## Types

344

345

```scala { .api }

// Main async types
object AsyncDataStream
trait AsyncFunction[IN, OUT]
abstract class RichAsyncFunction[IN, OUT] extends AsyncFunction[IN, OUT]
trait ResultFuture[OUT]

// Configuration and context types
class Configuration
trait RuntimeContext
trait IterationRuntimeContext

// Java interop types (from java.util.concurrent)
class TimeUnit
class TimeoutException
class CompletableFuture[T]

// Metric types
trait Counter
trait MetricGroup

```