or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md · external-systems.md · index.md · iteration.md · joins.md · machine-learning.md · side-output.md · socket.md · utilities.md · windowing.md · wordcount.md

docs/async.md

0

# Asynchronous I/O Examples

1

2

Non-blocking external system integration with configurable parallelism and error handling. Demonstrates async functions, thread pool management, and ordered/unordered async processing patterns.

3

4

## Capabilities

5

6

### AsyncIOExample (Java)

7

8

Comprehensive async I/O example with configurable parameters for simulating external system interactions.

9

10

```java { .api }

11

/**

12

* Example illustrating asynchronous I/O operations with external systems

13

* Supports ordered/unordered processing, checkpointing, and error simulation

14

* @param args Command line arguments for configuration

15

*/

16

public class AsyncIOExample {

17

public static void main(String[] args) throws Exception;

18

}

19

```

20

21

**Usage Example:**

22

23

```bash

24

# Run with default configuration
# NOTE(review): unless this jar bundles Flink, add the Flink runtime to -cp
# or submit the jar with bin/flink run instead.
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.async.AsyncIOExample

# Run with custom configuration
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.async.AsyncIOExample \
  --fsStatePath /tmp/checkpoints \
  --checkpointMode exactly_once \
  --maxCount 50000 \
  --sleepFactor 200 \
  --failRatio 0.01 \
  --waitMode ordered \
  --eventType EventTime \
  --timeout 5000

```

40

41

### AsyncIOExample (Scala)

42

43

Scala implementation of async I/O patterns using functional programming constructs.

44

45

```scala { .api }

46

/**
 * Scala version of the async I/O example.
 */
object AsyncIOExample {
  /**
   * Entry point of the example.
   *
   * @param args command line arguments for configuration
   */
  def main(args: Array[String]): Unit;
}

```

54

55

## Key Async Patterns

56

57

### Async Function Implementation

58

59

```java

60

private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {

61

private static ExecutorService executorService;

62

63

@Override

64

public void open(Configuration parameters) throws Exception {

65

super.open(parameters);

66

synchronized (SampleAsyncFunction.class) {

67

if (counter == 0) {

68

executorService = Executors.newFixedThreadPool(30);

69

}

70

++counter;

71

}

72

}

73

74

@Override

75

public void asyncInvoke(final Integer input, final AsyncCollector<String> collector)

76

throws Exception {

77

executorService.submit(new Runnable() {

78

@Override

79

public void run() {

80

try {

81

// Simulate async operation delay

82

long sleep = (long) (random.nextFloat() * sleepFactor);

83

Thread.sleep(sleep);

84

85

// Simulate occasional failures

86

if (random.nextFloat() < failRatio) {

87

collector.collect(new Exception("Simulated async error"));

88

} else {

89

collector.collect(Collections.singletonList("key-" + (input % 10)));

90

}

91

} catch (InterruptedException e) {

92

collector.collect(new ArrayList<String>(0));

93

}

94

}

95

});

96

}

97

98

@Override

99

public void close() throws Exception {

100

synchronized (SampleAsyncFunction.class) {

101

--counter;

102

if (counter == 0) {

103

executorService.shutdown();

104

if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {

105

executorService.shutdownNow();

106

}

107

}

108

}

109

}

110

}

111

```

112

113

### Ordered vs Unordered Async Processing

114

115

```java

116

// Create async function

117

AsyncFunction<Integer, String> function = new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);

118

119

// Ordered async processing (maintains element order)

120

DataStream<String> orderedResult = AsyncDataStream.orderedWait(

121

inputStream,

122

function,

123

timeout,

124

TimeUnit.MILLISECONDS,

125

20 // Queue capacity

126

).setParallelism(taskNum);

127

128

// Unordered async processing (higher throughput)

129

DataStream<String> unorderedResult = AsyncDataStream.unorderedWait(

130

inputStream,

131

function,

132

timeout,

133

TimeUnit.MILLISECONDS,

134

20 // Queue capacity

135

).setParallelism(taskNum);

136

```

137

138

### Checkpointed Source Function

139

140

```java

141

private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {

142

private volatile boolean isRunning = true;

143

private int counter = 0;

144

private int start = 0;

145

146

@Override

147

public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {

148

return Collections.singletonList(start);

149

}

150

151

@Override

152

public void restoreState(List<Integer> state) throws Exception {

153

for (Integer i : state) {

154

this.start = i;

155

}

156

}

157

158

@Override

159

public void run(SourceContext<Integer> ctx) throws Exception {

160

while ((start < counter || counter == -1) && isRunning) {

161

synchronized (ctx.getCheckpointLock()) {

162

ctx.collect(start);

163

++start;

164

if (start == Integer.MAX_VALUE) {

165

start = 0; // Loop back to 0

166

}

167

}

168

Thread.sleep(10L);

169

}

170

}

171

172

@Override

173

public void cancel() {

174

isRunning = false;

175

}

176

}

177

```

178

179

## Configuration Options

180

181

### Checkpointing Configuration

182

183

```java

184

// Use a file-system state backend only when a state path was supplied
if (statePath != null) {
    env.setStateBackend(new FsStateBackend(statePath));
}

// Checkpoint every second; anything other than exactly-once falls back to at-least-once
CheckpointingMode checkpointingMode = EXACTLY_ONCE_MODE.equals(cpMode)
    ? CheckpointingMode.EXACTLY_ONCE
    : CheckpointingMode.AT_LEAST_ONCE;
env.enableCheckpointing(1000L, checkpointingMode);

```

196

197

### Time Characteristics

198

199

```java

200

// Event time processing

201

if (EVENT_TIME.equals(timeType)) {

202

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

203

} else if (INGESTION_TIME.equals(timeType)) {

204

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

205

}

206

// Processing time is default

207

```

208

209

### Async Function Parameters

210

211

```java { .api }

212

/**

213

* Async function constructor parameters

214

* @param sleepFactor Maximum sleep time for simulating async delay

215

* @param failRatio Probability of generating exceptions (0.0 to 1.0)

216

* @param shutdownWaitTS Milliseconds to wait for executor shutdown

217

*/

218

SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS);

219

```

220

221

## Command Line Parameters

222

223

### Parameters

All parameters are optional: the example also runs with no arguments (see the usage example above). A file-system state backend is configured only when `--fsStatePath` is given.

- `--fsStatePath`: File system path for checkpointing state
- `--checkpointMode`: `exactly_once` or `at_least_once`
- `--maxCount`: Maximum number of elements emitted by the source (-1 for an unbounded source)
- `--sleepFactor`: Upper bound, in milliseconds, of the simulated async delay
- `--failRatio`: Probability of a simulated async error (0.0 to 1.0)
- `--waitMode`: `ordered` or `unordered` async processing
- `--waitOperatorParallelism`: Parallelism of the async wait operator
- `--eventType`: `EventTime`, `IngestionTime`, or `ProcessingTime`
- `--shutdownWaitTS`: Thread pool shutdown timeout (milliseconds)
- `--timeout`: Timeout for async operations (milliseconds)

234

235

### Parameter Examples

236

```bash

237

# Every parameter set explicitly in a single invocation
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.async.AsyncIOExample \
  --fsStatePath /tmp/flink-checkpoints \
  --checkpointMode exactly_once \
  --maxCount 100000 \
  --sleepFactor 100 \
  --failRatio 0.001 \
  --waitMode ordered \
  --waitOperatorParallelism 4 \
  --eventType EventTime \
  --shutdownWaitTS 20000 \
  --timeout 10000

```

248

249

## Error Handling Patterns

250

251

### Exception Handling in Async Functions

252

```java

253

@Override

254

public void asyncInvoke(final Integer input, final AsyncCollector<String> collector)

255

throws Exception {

256

try {

257

// Perform async operation

258

CompletableFuture<String> result = externalSystemCall(input);

259

result.whenComplete((value, exception) -> {

260

if (exception != null) {

261

collector.collect(exception);

262

} else {

263

collector.collect(Collections.singletonList(value));

264

}

265

});

266

} catch (Exception e) {

267

collector.collect(e);

268

}

269

}

270

```

271

272

### Timeout Configuration

273

```java

274

// Set timeout for async operations

275

AsyncDataStream.orderedWait(

276

inputStream,

277

asyncFunction,

278

5000L, // 5 second timeout

279

TimeUnit.MILLISECONDS,

280

100 // Queue capacity

281

);

282

```

283

284

## Thread Pool Management

285

286

### Thread Pool Lifecycle

287

```java

288

@Override

289

public void open(Configuration parameters) throws Exception {

290

synchronized (SampleAsyncFunction.class) {

291

if (counter == 0) {

292

// Create shared thread pool

293

executorService = Executors.newFixedThreadPool(30);

294

}

295

++counter;

296

}

297

}

298

299

@Override

300

public void close() throws Exception {

301

synchronized (SampleAsyncFunction.class) {

302

--counter;

303

if (counter == 0) {

304

// Shutdown thread pool when last instance closes

305

executorService.shutdown();

306

if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {

307

executorService.shutdownNow();

308

}

309

}

310

}

311

}

312

```

313

314

## Dependencies

315

316

```xml

317

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>1.3.3</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.10</artifactId>
  <version>1.3.3</version>
</dependency>

```

329

330

## Required Imports

331

332

```java

333

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

```