# External System Integration

Examples that connect Flink streaming jobs to Kafka, Twitter, and other external data sources through fault-tolerant connectors. They demonstrate common patterns for reading from and writing to external systems in a streaming pipeline.

## Capabilities

### Kafka Integration

#### ReadFromKafka

Read strings from a Kafka topic and process them in a Flink streaming job.

```java { .api }
/**
 * Read strings from Kafka and print to standard output
 * Demonstrates Kafka consumer integration with checkpointing
 * @param args Command line arguments (--topic, --bootstrap.servers, --zookeeper.connect, --group.id)
 */
public class ReadFromKafka {
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run Kafka consumer example
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.kafka.ReadFromKafka \
  --topic test-topic \
  --bootstrap.servers localhost:9092 \
  --zookeeper.connect localhost:2181 \
  --group.id flink-consumer-group
```

#### WriteIntoKafka

Write continuously generated strings to a Kafka topic. The example pairs a simple generator source with the Kafka 0.8 producer sink; that producer does not provide exactly-once delivery.

```java { .api }
/**
 * Write data into Kafka topics with continuous data generation
 * Demonstrates Kafka producer integration with fault tolerance
 * @param args Command line arguments (--topic, --bootstrap.servers)
 */
public class WriteIntoKafka {
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run Kafka producer example
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.kafka.WriteIntoKafka \
  --topic test-topic \
  --bootstrap.servers localhost:9092
```

### Twitter Integration

#### TwitterExample

Process and analyze a live Twitter stream, including trending hashtag detection.

```java { .api }
/**
 * Real-time Twitter stream processing
 * Analyzes tweet streams for trending hashtags and user activity
 * @param args Command line arguments (Twitter API credentials)
 */
public class TwitterExample {
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run Twitter streaming example (requires Twitter API credentials)
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.twitter.TwitterExample \
  --twitter-source.consumerKey YOUR_CONSUMER_KEY \
  --twitter-source.consumerSecret YOUR_CONSUMER_SECRET \
  --twitter-source.token YOUR_ACCESS_TOKEN \
  --twitter-source.tokenSecret YOUR_ACCESS_TOKEN_SECRET
```

## Kafka Integration Patterns

### Consumer Configuration

```java
// Parameter validation for Kafka consumer
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 4) {
    System.out.println("Missing parameters! Usage: Kafka --topic <topic> " +
        "--bootstrap.servers <kafka brokers> " +
        "--zookeeper.connect <zk quorum> " +
        "--group.id <consumer group>");
    return;
}

// Configure execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); // 4 attempts, 10 s apart
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // Make parameters visible to the job
```
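
The parameter check above counts parsed `--key value` pairs. As a rough illustration of that idea, the following sketch parses such arguments into a map using only the Java standard library. `ArgsSketch` and `parse` are hypothetical names, and this is a simplified stand-in, not how `ParameterTool.fromArgs` is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ArgsSketch {
    /** Parses "--key value" pairs; a key with no following value maps to an empty string. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        int i = 0;
        while (i < args.length) {
            String token = args[i];
            if (!token.startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but found: " + token);
            }
            String key = token.substring(2);
            // The next token is a value only if it exists and is not itself a key
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
            params.put(key, hasValue ? args[i + 1] : "");
            i += hasValue ? 2 : 1;
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(new String[] {
            "--topic", "test-topic",
            "--bootstrap.servers", "localhost:9092",
            "--zookeeper.connect", "localhost:2181",
            "--group.id", "flink-consumer-group"
        });
        // Mirrors the "fewer than 4 parameters" guard from the consumer example
        if (params.size() < 4) {
            System.out.println("Missing parameters!");
            return;
        }
        System.out.println("Reading from topic " + params.get("topic"));
    }
}
```

Counting parsed keys rather than raw tokens is what makes a threshold of 4 match the four expected options.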

### Kafka Source Setup

```java
// Create Kafka consumer source
DataStream<String> messageStream = env.addSource(
    new FlinkKafkaConsumer08<>(
        parameterTool.getRequired("topic"),  // Topic name
        new SimpleStringSchema(),            // Deserialization schema
        parameterTool.getProperties()        // Kafka properties
    )
);

// Process the stream
messageStream.print();
```

### Data Generation for Kafka Producer

```java
// Simple sequential data generator
DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int counter = 0;
        while (running) {
            Thread.sleep(500); // 500ms intervals
            ctx.collect("Element - " + counter);
            counter++;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
});

// Send generated data to Kafka
messageStream.addSink(new FlinkKafkaProducer08<>(
    parameterTool.getRequired("topic"),
    new SimpleStringSchema(),
    parameterTool.getProperties()));
```

### Kafka Producer Configuration

```java
// bootstrapServers, consumerGroup, outputTopic, and processedStream are assumed to be defined elsewhere.
// FlinkKafkaProducer and its Semantic enum come from the Kafka connector of later Flink releases,
// not from flink-connector-kafka-0.8 1.3.3; check the constructor overloads available in your version.
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", bootstrapServers);
kafkaProps.setProperty("group.id", consumerGroup); // Consumer setting; a Kafka producer does not use it

// Configure producer for exactly-once semantics
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
    outputTopic,                              // Target topic
    new SimpleStringSchema(),                 // Serialization schema
    kafkaProps,                               // Producer properties
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE  // Delivery guarantee
);

// Add producer to stream
processedStream.addSink(kafkaProducer);
```

## Twitter Integration Patterns

### Twitter Source Configuration

```java
// Twitter API credentials from parameters
// (params is assumed to be a ParameterTool built from the command-line arguments)
Properties twitterProps = new Properties();
twitterProps.setProperty(TwitterSource.CONSUMER_KEY, params.get("twitter-source.consumerKey"));
twitterProps.setProperty(TwitterSource.CONSUMER_SECRET, params.get("twitter-source.consumerSecret"));
twitterProps.setProperty(TwitterSource.TOKEN, params.get("twitter-source.token"));
twitterProps.setProperty(TwitterSource.TOKEN_SECRET, params.get("twitter-source.tokenSecret"));

// Create Twitter source
DataStream<String> twitterStream = env.addSource(new TwitterSource(twitterProps));
```

### Tweet Processing Pipeline

```java
// Parse JSON tweets and extract hashtags
DataStream<Tuple2<String, Integer>> hashtagCounts = twitterStream
    .flatMap(new HashtagExtractor())
    .keyBy(0)                      // Key by hashtag (tuple field 0)
    .timeWindow(Time.minutes(1))   // One-minute tumbling windows
    .sum(1);                       // Sum the counts (tuple field 1)

// Extract trending hashtags
// TopHashtagsFunction is a user-defined AllWindowFunction that is not shown here
DataStream<String> trendingHashtags = hashtagCounts
    .timeWindowAll(Time.minutes(5))
    .apply(new TopHashtagsFunction(10)); // Top 10 hashtags
```
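
`TopHashtagsFunction` is not defined in this section. The ranking step it would perform can be sketched in plain Java, independent of Flink: given the per-hashtag counts that fall into one window, keep the N largest. `TopHashtagsSketch` and `topN` are hypothetical names, and the alphabetical tie-break is an assumption made here to keep the output deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TopHashtagsSketch {
    /** Returns up to n hashtags ordered by descending count; ties are broken alphabetically. */
    static List<String> topN(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> top = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : entries.subList(0, Math.min(n, entries.size()))) {
            top.add(entry.getKey());
        }
        return top;
    }

    public static void main(String[] args) {
        Map<String, Integer> windowCounts = new HashMap<>();
        windowCounts.put("#flink", 5);
        windowCounts.put("#kafka", 7);
        windowCounts.put("#java", 5);
        windowCounts.put("#scala", 1);
        System.out.println(topN(windowCounts, 3));
    }
}
```

In a Flink job, similar logic would run inside the window function over the elements of each five-minute window.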

### JSON Processing

```java
private static class HashtagExtractor implements FlatMapFunction<String, Tuple2<String, Integer>> {
    // ObjectMapper is thread-safe once configured, so share one instead of creating one per tweet
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void flatMap(String tweetJson, Collector<Tuple2<String, Integer>> out) throws Exception {
        try {
            // Parse tweet JSON
            JsonNode tweet = MAPPER.readTree(tweetJson);

            JsonNode entities = tweet.get("entities");
            if (entities != null) {
                JsonNode hashtags = entities.get("hashtags");
                if (hashtags != null && hashtags.isArray()) {
                    for (JsonNode hashtag : hashtags) {
                        // path() yields a missing node (empty text) rather than null when "text" is absent
                        String tag = hashtag.path("text").asText();
                        if (!tag.isEmpty()) {
                            out.collect(new Tuple2<>("#" + tag.toLowerCase(), 1));
                        }
                    }
                }
            }
        } catch (Exception e) {
            // Skip malformed tweets
        }
    }
}
```

## Error Handling and Fault Tolerance

### Restart Strategies

```java
// Fixed delay restart strategy
env.getConfig().setRestartStrategy(
    RestartStrategies.fixedDelayRestart(
        4,      // Number of restart attempts
        10000   // Delay between restarts (ms)
    )
);

// Exponential backoff restart strategy (an alternative; the last strategy set wins).
// Available only in later Flink releases, not in 1.3.3. Time here is
// org.apache.flink.api.common.time.Time.
env.getConfig().setRestartStrategy(
    RestartStrategies.exponentialDelayRestart(
        Time.milliseconds(1000),   // Initial delay
        Time.milliseconds(60000),  // Max delay
        1.2,                       // Backoff multiplier
        Time.hours(1),             // Reset the delay after this long without failures
        0.1                        // Jitter factor
    )
);
```
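
To make the exponential backoff parameters concrete, the sketch below computes the sequence of delays for an initial delay of 1000 ms, a cap of 60000 ms, and a multiplier of 1.2. It is plain arithmetic in Java that ignores jitter and any reset threshold, and it uses the hypothetical names `BackoffSketch` and `delays`; it is not Flink's implementation.

```java
import java.util.Arrays;

class BackoffSketch {
    /** Delay before each of the first `attempts` restarts: initial * multiplier^k, capped at maxMs. */
    static long[] delays(long initialMs, long maxMs, double multiplier, int attempts) {
        long[] result = new long[attempts];
        double current = initialMs;
        for (int i = 0; i < attempts; i++) {
            result[i] = Math.min(maxMs, Math.round(current));
            // Grow the delay for the next attempt, never beyond the cap
            current = Math.min((double) maxMs, current * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same parameters as the exponential backoff configuration
        System.out.println(Arrays.toString(delays(1000, 60000, 1.2, 4)));
        // prints [1000, 1200, 1440, 1728]
    }
}
```

With a multiplier this close to 1, the delay grows slowly, so the 60-second cap only matters after many consecutive failures.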

### Checkpointing Configuration

```java
// Enable checkpointing for fault tolerance
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // Every 5 seconds

// Checkpoint configuration
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(1000); // At least 1 second between checkpoints
checkpointConfig.setCheckpointTimeout(60000);         // Abort checkpoints that take over 60 seconds
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```

### Connection Resilience

```java
// Configure connector resilience
Properties connectorProps = new Properties();
// Partition discovery is honored by the Kafka consumers of Flink 1.4 and later, not by 1.3.3
connectorProps.setProperty("flink.partition-discovery.interval-millis", "30000");
// Flink's Kafka consumers deserialize through the DeserializationSchema and override these two settings
connectorProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
connectorProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Kafka 0.9+ names and values; the Kafka 0.8 consumer uses "largest"/"smallest" and "auto.commit.enable"
connectorProps.setProperty("auto.offset.reset", "latest");
connectorProps.setProperty("enable.auto.commit", "false");
```

## Data Serialization

### Simple String Schema

```java
// Basic string serialization/deserialization
SimpleStringSchema stringSchema = new SimpleStringSchema();

// For Kafka string messages (topic and properties are assumed to be defined elsewhere)
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(topic, stringSchema, properties);
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(topic, stringSchema, properties);
```

### JSON Schema

```java
// Custom JSON schema for complex objects
// (Tweet is a user-defined POJO that is not shown here)
public class TweetSchema implements DeserializationSchema<Tweet> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Tweet deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Tweet.class);
    }

    @Override
    public boolean isEndOfStream(Tweet nextElement) {
        return false; // The stream is unbounded
    }

    @Override
    public TypeInformation<Tweet> getProducedType() {
        return TypeInformation.of(Tweet.class);
    }
}
```

## Performance Tuning

### Parallelism Configuration

```java
// setParallelism is defined on SingleOutputStreamOperator and DataStreamSink, not on DataStream,
// so keep the values returned by addSource, transformations, and addSink in those types.

// Set source parallelism
twitterStream.setParallelism(1);    // Single Twitter connection

// Set processing parallelism
processedStream.setParallelism(4);  // Parallel processing

// Set sink parallelism
kafkaSink.setParallelism(2);        // Multiple Kafka producers
```

### Backpressure Handling

```java
// Configure buffering and batching
env.setBufferTimeout(100);  // 100ms buffer timeout

// Kafka producer batching
kafkaProps.setProperty("batch.size", "16384");        // 16 KiB per batch
kafkaProps.setProperty("linger.ms", "10");            // Wait up to 10 ms to fill a batch
kafkaProps.setProperty("buffer.memory", "33554432");  // 32 MiB total send buffer
```
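
The producer settings above interact: `buffer.memory` bounds how many `batch.size` batches can be held in memory at once. The quick calculation below uses the hypothetical helper `ProducerBufferSketch.maxBufferedBatches` and treats the buffer as an exact multiple of full batches, which simplifies how the Kafka producer actually allocates memory.

```java
class ProducerBufferSketch {
    /** Upper bound on the number of full batches that fit into the producer's send buffer. */
    static long maxBufferedBatches(long bufferMemoryBytes, long batchSizeBytes) {
        if (batchSizeBytes <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        return bufferMemoryBytes / batchSizeBytes;
    }

    public static void main(String[] args) {
        // Values from the configuration: buffer.memory = 33554432, batch.size = 16384
        System.out.println(maxBufferedBatches(33_554_432L, 16_384L)); // prints 2048
    }
}
```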

## External System Requirements

### Kafka Setup
- Apache Kafka 0.8+ cluster
- ZooKeeper ensemble
- Topics configured with appropriate partitioning
- Network connectivity from the Flink cluster
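
Partitioning matters because each Kafka partition is read by a single parallel source subtask, so a topic with fewer partitions than the source parallelism leaves some subtasks without data. The sketch below illustrates this with a simple round-robin assignment (partition index modulo parallelism). That assignment rule and the names `PartitionAssignmentSketch` and `assign` are assumptions made for illustration; the connector's actual assignment may differ between versions.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignmentSketch {
    /** Assigns partition p to subtask p % parallelism and returns the partitions of each subtask. */
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            bySubtask.get(partition % parallelism).add(partition);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // 3 partitions, 4 source subtasks: the last subtask receives nothing
        System.out.println(assign(3, 4)); // prints [[0], [1], [2], []]
        // 5 partitions, 2 source subtasks: the load is uneven
        System.out.println(assign(5, 2)); // prints [[0, 2, 4], [1, 3]]
    }
}
```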

### Twitter API Setup
- Twitter Developer Account
- API credentials (Consumer Key/Secret, Access Token/Secret)
- Rate limit considerations (Twitter API limits)

## Dependencies

```xml
<!-- Kafka Connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

<!-- Twitter Connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

<!-- JSON Processing -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.8</version>
</dependency>
```

## Required Imports

### Kafka Integration
```java
import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
// Producer class shipped with flink-connector-kafka-0.8; the version-less FlinkKafkaProducer
// used in the exactly-once snippet comes from a later connector release
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
```

### Twitter Integration
```java
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
```