or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md, external-systems.md, index.md, iteration.md, joins.md, machine-learning.md, side-output.md, socket.md, utilities.md, windowing.md, wordcount.md

docs/external-systems.md

0

# External System Integration

1

2

Integration examples for Kafka and Twitter built on Flink's fault-tolerant connectors. The page also covers common patterns for connecting stream-processing jobs to external systems: configuration, serialization, fault tolerance and tuning.

3

4

## Capabilities

5

6

### Kafka Integration

7

8

#### ReadFromKafka

9

10

Read strings from a Kafka topic in a Flink streaming job and print them to standard output.

11

12

```java { .api }

13

/**

14

* Read strings from Kafka and print to standard output

15

* Demonstrates Kafka consumer integration with checkpointing

16

* @param args Command line arguments (--topic, --bootstrap.servers, --zookeeper.connect, --group.id)

17

*/

18

public class ReadFromKafka {

19

public static void main(String[] args) throws Exception;

20

}

21

```

22

23

**Usage Example:**

24

25

```bash

26

# Run Kafka consumer example

27

java -cp flink-examples-streaming_2.10-1.3.3.jar \

28

org.apache.flink.streaming.examples.kafka.ReadFromKafka \

29

--topic test-topic \

30

--bootstrap.servers localhost:9092 \

31

--zookeeper.connect localhost:2181 \

32

--group.id flink-consumer-group

33

```

34

35

#### WriteIntoKafka

36

37

Continuously generate strings (one every 500 ms) and write them to a Kafka topic using the Kafka 0.8 producer, which does not provide exactly-once delivery.

38

39

```java { .api }

40

/**

41

* Write data into Kafka topics with continuous data generation

42

* Demonstrates Kafka producer integration with fault tolerance

43

* @param args Command line arguments (--topic, --bootstrap.servers)

44

*/

45

public class WriteIntoKafka {

46

public static void main(String[] args) throws Exception;

47

}

48

```

49

50

**Usage Example:**

51

52

```bash

53

# Run Kafka producer example

54

java -cp flink-examples-streaming_2.10-1.3.3.jar \

55

org.apache.flink.streaming.examples.kafka.WriteIntoKafka \

56

--topic test-topic \

57

--bootstrap.servers localhost:9092

58

```

59

60

### Twitter Integration

61

62

#### TwitterExample

63

64

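Real-time processing of the Twitter streaming API: the example tokenizes tweet text and computes running word counts. When no Twitter credentials are given, it runs on built-in sample tweets. The hashtag pipeline shown under *Twitter Integration Patterns* is an illustrative extension, not part of the example itself.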

65

66

```java { .api }

67

/**

68

* Real-time Twitter stream processing

69

* Analyzes tweet streams for trending hashtags and user activity

70

* @param args Command line arguments (Twitter API credentials)

71

*/

72

public class TwitterExample {

73

public static void main(String[] args) throws Exception;

74

}

75

```

76

77

**Usage Example:**

78

79

```bash

80

# Run Twitter streaming example (requires Twitter API credentials)

81

java -cp flink-examples-streaming_2.10-1.3.3.jar \

82

org.apache.flink.streaming.examples.twitter.TwitterExample \

83

--twitter-source.consumerKey YOUR_CONSUMER_KEY \

84

--twitter-source.consumerSecret YOUR_CONSUMER_SECRET \

85

--twitter-source.token YOUR_ACCESS_TOKEN \

86

--twitter-source.tokenSecret YOUR_ACCESS_TOKEN_SECRET

87

```

88

89

## Kafka Integration Patterns

90

91

### Consumer Configuration

92

93

```java

94

// The consumer example needs four arguments: topic, brokers, ZooKeeper quorum and group id
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
final int parameterCount = parameterTool.getNumberOfParameters();
if (parameterCount < 4) {
    System.out.println("Missing parameters! Usage: Kafka --topic <topic> "
            + "--bootstrap.servers <kafka brokers> "
            + "--zookeeper.connect <zk quorum> "
            + "--group.id <consumer group>");
    return;
}

// Execution environment setup:
// - sysout logging disabled
// - bounded fixed-delay restarts
// - periodic checkpoints
// - parsed arguments registered as global job parameters
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); // 4 attempts, 10 s apart
env.enableCheckpointing(5000); // one checkpoint every 5 s
env.getConfig().setGlobalJobParameters(parameterTool);

111

```

112

113

### Kafka Source Setup

114

115

```java

116

// Create Kafka consumer source

117

DataStream<String> messageStream = env.addSource(

118

new FlinkKafkaConsumer08<>(

119

parameterTool.getRequired("topic"), // Topic name

120

new SimpleStringSchema(), // Deserialization schema

121

parameterTool.getProperties() // Kafka properties

122

)

123

);

124

125

// Process the stream

126

messageStream.print();

127

```

128

129

### Data Generation for Kafka Producer

130

131

```java

132

// Source that emits "Element - 0", "Element - 1", ... every 500 ms until it is cancelled
DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
    // volatile: cancel() is called from a different thread than run(). Without it, the
    // emitting loop is not guaranteed to see the update and the source may never stop.
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int counter = 0;
        while (running) {
            Thread.sleep(500); // 500 ms between elements
            ctx.collect("Element - " + counter);
            counter++;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
});

// Write each generated element to the topic named by --topic. The command-line
// parameters are passed through as Kafka producer properties.
messageStream.addSink(new FlinkKafkaProducer08<>(
        parameterTool.getRequired("topic"),
        new SimpleStringSchema(),
        parameterTool.getProperties()));

157

```

158

159

### Kafka Producer Configuration

160

161

```java

162

// Producer properties: only the broker list is required (group.id is a consumer-only setting).
// bootstrapServers, outputTopic and processedStream are placeholders for your own values.
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", bootstrapServers);

// Flink 1.3.3 with flink-connector-kafka-0.8 only offers FlinkKafkaProducer08, which has no
// exactly-once mode. The transactional producers (FlinkKafkaProducer011 from Flink 1.4,
// FlinkKafkaProducer from Flink 1.7) require Kafka 0.11+ and a newer Flink release.
FlinkKafkaProducer08<String> kafkaProducer = new FlinkKafkaProducer08<>(
        outputTopic,              // target topic
        new SimpleStringSchema(), // serialization schema
        kafkaProps);              // producer properties

// With checkpointing enabled, wait for pending records to be acknowledged before a checkpoint
// completes, and fail on send errors instead of only logging them. Flink documents
// at-least-once producer guarantees only for Kafka 0.9+, so with 0.8 this is best effort.
kafkaProducer.setFlushOnCheckpoint(true);
kafkaProducer.setLogFailuresOnly(false);

// Attach the producer as the sink of the processed stream
processedStream.addSink(kafkaProducer);

176

```

177

178

## Twitter Integration Patterns

179

180

### Twitter Source Configuration

181

182

```java

183

// Twitter API credentials from parameters

184

Properties twitterProps = new Properties();

185

twitterProps.setProperty(TwitterSource.CONSUMER_KEY, params.get("twitter-source.consumerKey"));

186

twitterProps.setProperty(TwitterSource.CONSUMER_SECRET, params.get("twitter-source.consumerSecret"));

187

twitterProps.setProperty(TwitterSource.TOKEN, params.get("twitter-source.token"));

188

twitterProps.setProperty(TwitterSource.TOKEN_SECRET, params.get("twitter-source.tokenSecret"));

189

190

// Create Twitter source

191

DataStream<String> twitterStream = env.addSource(new TwitterSource(twitterProps));

192

```

193

194

### Tweet Processing Pipeline

195

196

```java

197

// Parse JSON tweets and extract hashtags

198

DataStream<Tuple2<String, Integer>> hashtagCounts = twitterStream

199

.flatMap(new HashtagExtractor())

200

.keyBy(0)

201

.timeWindow(Time.minutes(1))

202

.sum(1);

203

204

// Extract trending hashtags

205

DataStream<String> trendingHashtags = hashtagCounts

206

.timeWindowAll(Time.minutes(5))

207

.apply(new TopHashtagsFunction(10)); // Top 10 hashtags

208

```

209

210

### JSON Processing

211

212

```java

213

private static class HashtagExtractor implements FlatMapFunction<String, Tuple2<String, Integer>> {

214

@Override

215

public void flatMap(String tweetJson, Collector<Tuple2<String, Integer>> out) throws Exception {

216

try {

217

// Parse tweet JSON

218

ObjectMapper mapper = new ObjectMapper();

219

JsonNode tweet = mapper.readTree(tweetJson);

220

221

JsonNode entities = tweet.get("entities");

222

if (entities != null) {

223

JsonNode hashtags = entities.get("hashtags");

224

if (hashtags != null && hashtags.isArray()) {

225

for (JsonNode hashtag : hashtags) {

226

String tag = hashtag.get("text").asText();

227

out.collect(new Tuple2<>("#" + tag.toLowerCase(), 1));

228

}

229

}

230

}

231

} catch (Exception e) {

232

// Skip malformed tweets

233

}

234

}

235

}

236

```

237

238

## Error Handling and Fault Tolerance

239

240

### Restart Strategies

241

242

```java

243

// Use ONE of the options below: each setRestartStrategy call replaces the previously set strategy.

// Option 1 - fixed delay: restart up to 4 times, waiting 10 s between attempts
env.getConfig().setRestartStrategy(
        RestartStrategies.fixedDelayRestart(
                4,     // number of restart attempts
                10000  // delay between restarts (ms)
        )
);

// Option 2 - failure rate: keep restarting as long as at most 3 failures occur within any
// 5-minute interval, waiting 10 s between attempts.
// Flink 1.3.3 has no exponential-delay strategy; RestartStrategies.exponentialDelayRestart
// only exists in much later Flink releases.
// This Time is org.apache.flink.api.common.time.Time, not the windowing Time class.
env.getConfig().setRestartStrategy(
        RestartStrategies.failureRateRestart(
                3,                                                  // max failures per interval
                org.apache.flink.api.common.time.Time.minutes(5),   // failure measuring interval
                org.apache.flink.api.common.time.Time.seconds(10)   // delay between restarts
        )
);

259

```

260

261

### Checkpointing Configuration

262

263

```java

264

// Enable checkpointing for fault tolerance

265

env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

266

267

// Checkpoint configuration

268

CheckpointConfig checkpointConfig = env.getCheckpointConfig();

269

checkpointConfig.setMinPauseBetweenCheckpoints(1000);

270

checkpointConfig.setCheckpointTimeout(60000);

271

checkpointConfig.setMaxConcurrentCheckpoints(1);

272

checkpointConfig.enableExternalizedCheckpoints(

273

CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

274

```

275

276

### Consumer Properties

277

278

```java

279

// Configure connector resilience

280

Properties connectorProps = new Properties();

281

connectorProps.setProperty("flink.partition-discovery.interval-millis", "30000");

282

connectorProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

283

connectorProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

284

connectorProps.setProperty("auto.offset.reset", "latest");

285

connectorProps.setProperty("enable.auto.commit", "false");

286

```

287

288

## Data Serialization

289

290

### Simple String Schema

291

292

```java

293

// Basic string serialization/deserialization

294

SimpleStringSchema stringSchema = new SimpleStringSchema();

295

296

// For Kafka string messages

297

new FlinkKafkaConsumer08<>(topic, stringSchema, properties);

298

new FlinkKafkaProducer<>(topic, stringSchema, properties);

299

```

300

301

### JSON Schema

302

303

```java

304

// Custom JSON schema for complex objects

305

public class TweetSchema implements DeserializationSchema<Tweet> {

306

private ObjectMapper objectMapper = new ObjectMapper();

307

308

@Override

309

public Tweet deserialize(byte[] message) throws IOException {

310

return objectMapper.readValue(message, Tweet.class);

311

}

312

313

@Override

314

public boolean isEndOfStream(Tweet nextElement) {

315

return false;

316

}

317

318

@Override

319

public TypeInformation<Tweet> getProducedType() {

320

return TypeInformation.of(Tweet.class);

321

}

322

}

323

```

324

325

## Performance Tuning

326

327

### Parallelism Configuration

328

329

```java

330

// Set source parallelism

331

twitterStream.setParallelism(1); // Single Twitter connection

332

333

// Set processing parallelism

334

processedStream.setParallelism(4); // Parallel processing

335

336

// Set sink parallelism

337

kafkaSink.setParallelism(2); // Multiple Kafka producers

338

```

339

340

### Buffering and Batching

341

342

```java

343

// Network buffer timeout: flush partially filled output buffers at least every 100 ms.
// This trades some throughput for bounded latency.
env.setBufferTimeout(100);

// Producer-side batching, applied through the properties handed to the Kafka producer
kafkaProps.setProperty("batch.size", "16384");       // maximum batch size in bytes
kafkaProps.setProperty("linger.ms", "10");           // wait up to 10 ms to fill a batch
kafkaProps.setProperty("buffer.memory", "33554432"); // 32 MiB of send buffer memory

350

```

351

352

## External System Requirements

353

354

### Kafka Setup

355

- Apache Kafka 0.8+ cluster

356

- ZooKeeper ensemble (required by the Kafka 0.8 consumer and passed via `--zookeeper.connect`)

357

- Configured topics with appropriate partitioning

358

- Network connectivity from Flink cluster

359

360

### Twitter API Setup

361

- Twitter Developer Account

362

- API credentials (Consumer Key/Secret, Access Token/Secret)

363

- Rate limit considerations (Twitter API limits)

364

365

## Dependencies

366

367

```xml

368

<!-- Kafka Connector -->

369

<dependency>

370

<groupId>org.apache.flink</groupId>

371

<artifactId>flink-connector-kafka-0.8_2.10</artifactId>

372

<version>1.3.3</version>

373

</dependency>

374

375

<!-- Twitter Connector -->

376

<dependency>

377

<groupId>org.apache.flink</groupId>

378

<artifactId>flink-connector-twitter_2.10</artifactId>

379

<version>1.3.3</version>

380

</dependency>

381

382

<!-- JSON Processing -->

383

<dependency>

384

<groupId>com.fasterxml.jackson.core</groupId>

385

<artifactId>jackson-databind</artifactId>

386

<version>2.8.8</version>

387

</dependency>

388

```

389

390

## Required Imports

391

392

### Kafka Integration

393

```java

394

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

400

```

401

402

### Twitter Integration

403

```java

404

import org.apache.flink.streaming.connectors.twitter.TwitterSource;

405

import org.apache.flink.api.common.functions.FlatMapFunction;

406

import org.apache.flink.streaming.api.windowing.time.Time;

407

import com.fasterxml.jackson.databind.JsonNode;

408

import com.fasterxml.jackson.databind.ObjectMapper;

409

```