or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md, checkpointing.md, datastream-transformations.md, execution-environment.md, index.md, keyed-streams-state.md, process-functions.md, sources-sinks.md, time-watermarks.md, windowing.md

docs/async-io.md

0

# Async I/O Operations

1

2

AsyncDataStream provides utilities for asynchronous I/O operations in Apache Flink, enabling efficient integration with external systems without blocking stream processing. This is particularly useful for database lookups, REST API calls, and other I/O-bound operations.

3

4

## Capabilities

5

6

### Async Data Stream Operations

7

8

Create asynchronous processing operators that can handle concurrent I/O operations.

9

10

```java { .api }

11

/**

12

* Apply async function with ordered results

13

* @param in - input DataStream

14

* @param func - async function to apply

15

* @param timeout - timeout for async operations

16

* @param timeUnit - time unit for timeout

17

* @return async processed DataStream with ordered results

18

*/

19

static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(

20

DataStream<IN> in,

21

AsyncFunction<IN, OUT> func,

22

long timeout,

23

TimeUnit timeUnit

24

);

25

26

/**

27

* Apply async function with unordered results for better performance

28

* @param in - input DataStream

29

* @param func - async function to apply

30

* @param timeout - timeout for async operations

31

* @param timeUnit - time unit for timeout

32

* @return async processed DataStream with unordered results

33

*/

34

static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(

35

DataStream<IN> in,

36

AsyncFunction<IN, OUT> func,

37

long timeout,

38

TimeUnit timeUnit

39

);

40

41

/**

42

* Apply async function with ordered results and capacity

43

* @param in - input DataStream

44

* @param func - async function to apply

45

* @param timeout - timeout for async operations

46

* @param timeUnit - time unit for timeout

47

* @param capacity - capacity of async operator

48

* @return async processed DataStream

49

*/

50

static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(

51

DataStream<IN> in,

52

AsyncFunction<IN, OUT> func,

53

long timeout,

54

TimeUnit timeUnit,

55

int capacity

56

);

57

58

/**

59

* Apply async function with unordered results and capacity

60

* @param in - input DataStream

61

* @param func - async function to apply

62

* @param timeout - timeout for async operations

63

* @param timeUnit - time unit for timeout

64

* @param capacity - capacity of async operator

65

* @return async processed DataStream

66

*/

67

static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(

68

DataStream<IN> in,

69

AsyncFunction<IN, OUT> func,

70

long timeout,

71

TimeUnit timeUnit,

72

int capacity

73

);

74

```

75

76

**Usage Examples:**

77

78

```java

79

// Database lookup example

80

DataStream<String> input = env.fromElements("key1", "key2", "key3");

81

82

// Ordered async processing - maintains event order

83

DataStream<String> orderedResult = AsyncDataStream.orderedWait(

84

input,

85

new DatabaseAsyncFunction(),

86

1000, TimeUnit.MILLISECONDS

87

);

88

89

// Unordered async processing - better performance, no order guarantee

90

DataStream<String> unorderedResult = AsyncDataStream.unorderedWait(

91

input,

92

new DatabaseAsyncFunction(),

93

1000, TimeUnit.MILLISECONDS,

94

100 // capacity

95

);

96

97

// Custom async function for database lookup

98

class DatabaseAsyncFunction implements AsyncFunction<String, String> {

99

private transient DatabaseClient client;

100

101

@Override

102

public void open(Configuration parameters) throws Exception {

103

client = new DatabaseClient();

104

}

105

106

@Override

107

public void asyncInvoke(String key, ResultFuture<String> resultFuture) throws Exception {

108

// Perform async database lookup

109

CompletableFuture<String> future = client.asyncGet(key);

110

111

future.whenComplete((result, throwable) -> {

112

if (throwable != null) {

113

resultFuture.completeExceptionally(throwable);

114

} else {

115

resultFuture.complete(Collections.singletonList(result));

116

}

117

});

118

}

119

}

120

```

121

122

### Rich Async Function

123

124

Use RichAsyncFunction for async operations with access to runtime context and lifecycle methods.

125

126

```java { .api }

127

/**

128

* Rich async function with lifecycle methods and runtime context access

129

*/

130

abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {

131

// Inherits open(), close(), getRuntimeContext() from AbstractRichFunction

132

abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

133

134

// Optional timeout handling

135

void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;

136

}

137

```

138

139

**Usage Examples:**

140

141

```java

142

DataStream<UserEvent> events = env.addSource(new UserEventSource());

143

144

DataStream<EnrichedUserEvent> enriched = AsyncDataStream.orderedWait(

145

events,

146

new UserEnrichmentFunction(),

147

2000, TimeUnit.MILLISECONDS

148

);

149

150

class UserEnrichmentFunction extends RichAsyncFunction<UserEvent, EnrichedUserEvent> {

151

private transient UserProfileService profileService;

152

private transient MetricGroup asyncMetrics;

153

154

@Override

155

public void open(Configuration parameters) throws Exception {

156

super.open(parameters);

157

158

// Initialize external service

159

profileService = new UserProfileService();

160

161

// Get metrics for monitoring

162

asyncMetrics = getRuntimeContext()

163

.getMetricGroup()

164

.addGroup("async-enrichment");

165

}

166

167

@Override

168

public void asyncInvoke(UserEvent event, ResultFuture<EnrichedUserEvent> resultFuture) throws Exception {

169

// Async call to external service

170

CompletableFuture<UserProfile> profileFuture = profileService.getUserProfile(event.getUserId());

171

172

profileFuture.whenComplete((profile, throwable) -> {

173

if (throwable != null) {

174

// Handle failure - could emit default value or propagate error

175

resultFuture.complete(Collections.singletonList(

176

new EnrichedUserEvent(event, UserProfile.defaultProfile())

177

));

178

} else {

179

resultFuture.complete(Collections.singletonList(

180

new EnrichedUserEvent(event, profile)

181

));

182

}

183

});

184

}

185

186

@Override

187

public void timeout(UserEvent event, ResultFuture<EnrichedUserEvent> resultFuture) throws Exception {

188

// Handle timeout - provide default enrichment

189

resultFuture.complete(Collections.singletonList(

190

new EnrichedUserEvent(event, UserProfile.timeoutProfile())

191

));

192

}

193

194

@Override

195

public void close() throws Exception {

196

if (profileService != null) {

197

profileService.close();

198

}

199

super.close();

200

}

201

}

202

```

203

204

## Types

205

206

### Async Function Interface

207

208

```java { .api }

209

/**

210

* Interface for asynchronous functions

211

* @param <IN> - input type

212

* @param <OUT> - output type

213

*/

214

interface AsyncFunction<IN, OUT> extends Function {

215

/**

216

* Trigger async operation for the given input

217

* @param input - input element

218

* @param resultFuture - future to complete with results

219

*/

220

void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

221

222

/**

223

* Optional method to handle timeouts

224

* @param input - input element that timed out

225

* @param resultFuture - future to complete with results or error

226

*/

227

default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {

228

resultFuture.completeExceptionally(new TimeoutException("Async operation timed out"));

229

}

230

}

231

232

/**

233

* Rich async function with lifecycle methods

234

*/

235

abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {

236

// Inherits lifecycle methods: open(), close(), getRuntimeContext()

237

}

238

```

239

240

### Result Future

241

242

```java { .api }

243

/**

244

* Future for completing asynchronous operations

245

* @param <OUT> - output type

246

*/

247

interface ResultFuture<OUT> {

248

/**

249

* Complete the async operation with results

250

* @param result - collection of result elements

251

*/

252

void complete(Collection<OUT> result);

253

254

/**

255

* Complete the async operation with an exception

256

* @param error - exception that occurred

257

*/

258

void completeExceptionally(Throwable error);

259

}

260

```

261

262

### AsyncDataStream Utility

263

264

```java { .api }

265

/**

266

* Utility class for creating async operators

267

*/

268

class AsyncDataStream {

269

// Static factory methods for creating async operators

270

static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(

271

DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);

272

273

static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(

274

DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit);

275

276

static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(

277

DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity);

278

279

static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(

280

DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit, int capacity);

281

}

282

```