Apache Flink streaming examples demonstrating various stream processing patterns and use cases
—
Integration examples for Kafka, Twitter, and other external data sources with fault-tolerant connectors. Demonstrates real-world stream processing with external system integration patterns.
Read strings from Kafka topics and process them in Flink streaming jobs.
/**
* Read strings from Kafka and print to standard output
* Demonstrates Kafka consumer integration with checkpointing
* @param args Command line arguments (--topic, --bootstrap.servers, --zookeeper.connect, --group.id)
*/
public class ReadFromKafka {
public static void main(String[] args) throws Exception;
}

Usage Example:
# Run Kafka consumer example
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.kafka.ReadFromKafka \
--topic test-topic \
--bootstrap.servers localhost:9092 \
--zookeeper.connect localhost:2181 \
--group.id flink-consumer-group

Write processed data to Kafka topics with exactly-once semantics and continuous data generation.
/**
* Write data into Kafka topics with continuous data generation
* Demonstrates Kafka producer integration with fault tolerance
* @param args Command line arguments (--topic, --bootstrap.servers)
*/
public class WriteIntoKafka {
public static void main(String[] args) throws Exception;
}

Usage Example:
# Run Kafka producer example
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.kafka.WriteIntoKafka \
--topic test-topic \
--bootstrap.servers localhost:9092

Real-time Twitter stream processing and analysis with trending hashtag detection.
/**
* Real-time Twitter stream processing
* Analyzes tweet streams for trending hashtags and user activity
* @param args Command line arguments (Twitter API credentials)
*/
public class TwitterExample {
public static void main(String[] args) throws Exception;
}

Usage Example:
# Run Twitter streaming example (requires Twitter API credentials)
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.twitter.TwitterExample \
--twitter-source.consumerKey YOUR_CONSUMER_KEY \
--twitter-source.consumerSecret YOUR_CONSUMER_SECRET \
--twitter-source.token YOUR_ACCESS_TOKEN \
--twitter-source.tokenSecret YOUR_ACCESS_TOKEN_SECRET

// Parameter validation for Kafka consumer
// Parse CLI args; all four of --topic, --bootstrap.servers, --zookeeper.connect,
// and --group.id are required before the job is built
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if (parameterTool.getNumberOfParameters() < 4) {
System.out.println("Missing parameters! Usage: Kafka --topic <topic> " +
"--bootstrap.servers <kafka brokers> " +
"--zookeeper.connect <zk quorum> " +
"--group.id <consumer group>");
// Exit early rather than submitting a misconfigured job
return;
}
// Configure execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Suppress runtime sysout logging so only job output reaches stdout
env.getConfig().disableSysoutLogging();
// Restart the job up to 4 times with a 10 s pause between attempts
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
// Make the parsed parameters available to all operators (and the web UI)
env.getConfig().setGlobalJobParameters(parameterTool);
// Create Kafka consumer source (0.8 connector; uses --zookeeper.connect from the props)
DataStream<String> messageStream = env.addSource(
new FlinkKafkaConsumer08<>(
parameterTool.getRequired("topic"), // Topic name
new SimpleStringSchema(), // Deserialization schema
parameterTool.getProperties() // Kafka properties
)
);
// Process the stream: print each consumed message to stdout
messageStream.print();
// Simple sequential data generator
// Simple sequential data generator: emits "Element - N" every 500 ms until cancelled
DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
    // volatile is required: cancel() is invoked from a different thread than run(),
    // and without it the run() loop may never observe the flag change and never stop
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int counter = 0;
        while (running) {
            Thread.sleep(500); // 500ms intervals
            ctx.collect("Element - " + counter);
            counter++;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
});
// Send generated data to Kafka
messageStream.addSink(new FlinkKafkaProducer08<>(
parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));

Properties kafkaProps = new Properties();
// Kafka connection properties (bootstrapServers / consumerGroup supplied by the caller)
kafkaProps.setProperty("bootstrap.servers", bootstrapServers);
kafkaProps.setProperty("group.id", consumerGroup);
// Configure producer for exactly-once semantics
// NOTE(review): EXACTLY_ONCE relies on Kafka transactions and the modern
// FlinkKafkaProducer (Kafka 0.11+); it is not available on the 0.8 connector
// used elsewhere in this document — verify the intended connector version
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
outputTopic, // Target topic
new SimpleStringSchema(), // Serialization schema
kafkaProps, // Producer properties
FlinkKafkaProducer.Semantic.EXACTLY_ONCE // Delivery guarantee
);
// Add producer to stream
processedStream.addSink(kafkaProducer);
// Twitter API credentials from parameters
// Four OAuth credentials read from the --twitter-source.* CLI parameters
Properties twitterProps = new Properties();
twitterProps.setProperty(TwitterSource.CONSUMER_KEY, params.get("twitter-source.consumerKey"));
twitterProps.setProperty(TwitterSource.CONSUMER_SECRET, params.get("twitter-source.consumerSecret"));
twitterProps.setProperty(TwitterSource.TOKEN, params.get("twitter-source.token"));
twitterProps.setProperty(TwitterSource.TOKEN_SECRET, params.get("twitter-source.tokenSecret"));
// Create Twitter source; downstream operators parse its output as tweet JSON strings
DataStream<String> twitterStream = env.addSource(new TwitterSource(twitterProps));
// Parse JSON tweets and extract hashtags
// Per-hashtag counts over 1-minute keyed tumbling windows
DataStream<Tuple2<String, Integer>> hashtagCounts = twitterStream
.flatMap(new HashtagExtractor())
.keyBy(0) // key by hashtag text (tuple field 0)
.timeWindow(Time.minutes(1))
.sum(1); // sum the per-tweet counts (tuple field 1)
// Extract trending hashtags over a global (non-keyed) 5-minute window
DataStream<String> trendingHashtags = hashtagCounts
.timeWindowAll(Time.minutes(5))
.apply(new TopHashtagsFunction(10)); // Top 10 hashtags

private static class HashtagExtractor implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
// Emits one ("#tag", 1) tuple per hashtag found in the tweet's entities;
// malformed input is silently dropped (best-effort by design)
public void flatMap(String tweetJson, Collector<Tuple2<String, Integer>> out) throws Exception {
try {
// Parse tweet JSON
// NOTE(review): allocating an ObjectMapper per tweet is expensive; consider
// hoisting it to a field of HashtagExtractor and reusing it
ObjectMapper mapper = new ObjectMapper();
JsonNode tweet = mapper.readTree(tweetJson);
// Hashtags live under "entities" -> "hashtags" in the tweet payload
JsonNode entities = tweet.get("entities");
if (entities != null) {
JsonNode hashtags = entities.get("hashtags");
if (hashtags != null && hashtags.isArray()) {
for (JsonNode hashtag : hashtags) {
String tag = hashtag.get("text").asText();
// Lowercase so counts aggregate case-insensitively
out.collect(new Tuple2<>("#" + tag.toLowerCase(), 1));
}
}
}
} catch (Exception e) {
// Skip malformed tweets
}
}
}

// Fixed delay restart strategy
env.getConfig().setRestartStrategy(
RestartStrategies.fixedDelayRestart(
4, // Number of restart attempts
10000 // Delay between restarts (ms)
)
);
// Exponential backoff restart strategy
// NOTE(review): exponentialDelayRestart was added in Flink 1.10 and takes five
// arguments (initial backoff, max backoff, multiplier, reset threshold, jitter);
// this 3-argument form does not match the 1.3.x API used elsewhere — verify
env.getConfig().setRestartStrategy(
RestartStrategies.exponentialDelayRestart(
Time.milliseconds(1000), // Initial delay
Time.milliseconds(60000), // Max delay
1.2 // Backoff multiplier
)
);
// Enable checkpointing for fault tolerance
// Checkpoint every 5 s with exactly-once state consistency
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Checkpoint configuration
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(1000); // at least 1 s between checkpoints
checkpointConfig.setCheckpointTimeout(60000); // abort checkpoints that take over 60 s
checkpointConfig.setMaxConcurrentCheckpoints(1); // never overlap checkpoints
// Retain externalized checkpoints on cancellation so the job can be restored manually
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Configure connector resilience
Properties connectorProps = new Properties();
// Re-scan the topic for newly added partitions every 30 s
connectorProps.setProperty("flink.partition-discovery.interval-millis", "30000");
connectorProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
connectorProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Start from the latest offset when no committed offset exists for the group
connectorProps.setProperty("auto.offset.reset", "latest");
// Disable Kafka's own auto-commit of offsets
connectorProps.setProperty("enable.auto.commit", "false");
// Basic string serialization/deserialization
SimpleStringSchema stringSchema = new SimpleStringSchema();
// For Kafka string messages — the same schema instance works for both
// the consumer (deserialization) and the producer (serialization)
new FlinkKafkaConsumer08<>(topic, stringSchema, properties);
new FlinkKafkaProducer<>(topic, stringSchema, properties);
// Custom JSON schema for complex objects
/**
 * Custom JSON deserialization schema mapping raw Kafka record bytes to {@code Tweet} objects.
 *
 * <p>Flink serializes schema instances when shipping the job to the cluster, and Jackson's
 * {@code ObjectMapper} is not {@code Serializable} — holding it in a plain instance field
 * would fail job submission. It is kept in a {@code static final} field instead: the mapper
 * is thread-safe for concurrent reads and cheap to share across subtasks in the same JVM.
 */
public class TweetSchema implements DeserializationSchema<Tweet> {
    private static final long serialVersionUID = 1L;

    // Shared, thread-safe mapper; static so the schema itself stays serializable
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Deserializes one Kafka record value into a {@code Tweet}. */
    @Override
    public Tweet deserialize(byte[] message) throws IOException {
        return OBJECT_MAPPER.readValue(message, Tweet.class);
    }

    /** Unbounded stream: never signals end-of-stream. */
    @Override
    public boolean isEndOfStream(Tweet nextElement) {
        return false;
    }

    /** Type information consumed by Flink's serialization stack. */
    @Override
    public TypeInformation<Tweet> getProducedType() {
        return TypeInformation.of(Tweet.class);
    }
}

// Set source parallelism
// NOTE(review): setParallelism is defined on SingleOutputStreamOperator /
// DataStreamSource rather than on every DataStream — confirm these variables' types
twitterStream.setParallelism(1); // Single Twitter connection
// Set processing parallelism
processedStream.setParallelism(4); // Parallel processing
// Set sink parallelism
kafkaSink.setParallelism(2); // Multiple Kafka producers
// Configure buffering and batching
env.setBufferTimeout(100); // 100ms buffer timeout
// Kafka producer batching: up to 16 KB per batch, waiting at most 10 ms to fill one
kafkaProps.setProperty("batch.size", "16384");
kafkaProps.setProperty("linger.ms", "10");
kafkaProps.setProperty("buffer.memory", "33554432");

<!-- Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>1.3.3</version>
</dependency>
<!-- Twitter Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter_2.10</artifactId>
<version>1.3.3</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.8</version>
</dependency>

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10