CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-examples-streaming-2-10

Apache Flink streaming examples demonstrating various stream processing patterns and use cases

Pending
Overview
Eval results
Files

docs/external-systems.md

External System Integration

Integration examples for Kafka, Twitter, and other external data sources with fault-tolerant connectors. Demonstrates real-world stream processing with external system integration patterns.

Capabilities

Kafka Integration

ReadFromKafka

Read strings from Kafka topics and process them in Flink streaming jobs.

/**
 * Read strings from Kafka and print to standard output
 * Demonstrates Kafka consumer integration with checkpointing
 * @param args Command line arguments (--topic, --bootstrap.servers, --zookeeper.connect, --group.id)
 */
public class ReadFromKafka {
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run Kafka consumer example
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.kafka.ReadFromKafka \
  --topic test-topic \
  --bootstrap.servers localhost:9092 \
  --zookeeper.connect localhost:2181 \
  --group.id flink-consumer-group

WriteIntoKafka

Write processed data to Kafka topics with continuous data generation. Note: the Kafka 0.8 connector used in this example provides at-least-once delivery at best — exactly-once sink semantics require the Kafka 0.11+ connector (Flink 1.4+).

/**
 * Write data into Kafka topics with continuous data generation
 * Demonstrates Kafka producer integration with fault tolerance
 * @param args Command line arguments (--topic, --bootstrap.servers)
 */
public class WriteIntoKafka {
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run Kafka producer example
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.kafka.WriteIntoKafka \
  --topic test-topic \
  --bootstrap.servers localhost:9092

Twitter Integration

TwitterExample

Real-time Twitter stream processing and analysis with trending hashtag detection.

/**
 * Real-time Twitter stream processing
 * Analyzes tweet streams for trending hashtags and user activity
 * @param args Command line arguments (Twitter API credentials)
 */
public class TwitterExample {
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run Twitter streaming example (requires Twitter API credentials)
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.twitter.TwitterExample \
  --twitter-source.consumerKey YOUR_CONSUMER_KEY \
  --twitter-source.consumerSecret YOUR_CONSUMER_SECRET \
  --twitter-source.token YOUR_ACCESS_TOKEN \
  --twitter-source.tokenSecret YOUR_ACCESS_TOKEN_SECRET

Kafka Integration Patterns

Consumer Configuration

// Parameter validation for Kafka consumer
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 4) {
    System.out.println("Missing parameters! Usage: Kafka --topic <topic> " +
        "--bootstrap.servers <kafka brokers> " +
        "--zookeeper.connect <zk quorum> " +
        "--group.id <consumer group>");
    return;
}

// Configure execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool);

Kafka Source Setup

// Create Kafka consumer source
DataStream<String> messageStream = env.addSource(
    new FlinkKafkaConsumer08<>(
        parameterTool.getRequired("topic"),           // Topic name
        new SimpleStringSchema(),                     // Deserialization schema
        parameterTool.getProperties()                 // Kafka properties
    )
);

// Process the stream
messageStream.print();

Data Generation for Kafka Producer

// Simple sequential data generator
DataStream<String> messageStream = env.addSource(new SourceFunction<String>() {
    // volatile is required: cancel() is invoked from a different thread than
    // run(), and without it the change to this flag may never become visible
    // to the emitting loop, so the source would not stop on cancellation.
    private volatile boolean running = true;

    /**
     * Emits "Element - 0", "Element - 1", ... at 500 ms intervals
     * until the job is cancelled.
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int counter = 0;
        while (running) {
            Thread.sleep(500); // throttle to one element every 500 ms
            ctx.collect("Element - " + counter);
            counter++;
        }
    }

    @Override
    public void cancel() {
        running = false; // observed by run() via the volatile flag
    }
});

// Send generated data to Kafka
messageStream.addSink(new FlinkKafkaProducer08<>(
    parameterTool.getRequired("topic"), 
    new SimpleStringSchema(), 
    parameterTool.getProperties()));

Kafka Producer Configuration

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", bootstrapServers);
kafkaProps.setProperty("group.id", consumerGroup);

// Configure producer for exactly-once semantics.
// NOTE(review): FlinkKafkaProducer.Semantic.EXACTLY_ONCE requires the Kafka
// 0.11+/universal connector (Flink 1.4+). It is NOT available with the
// flink-connector-kafka-0.8 / FlinkKafkaProducer08 used elsewhere on this page.
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
    outputTopic,                    // Target topic
    new SimpleStringSchema(),       // Serialization schema
    kafkaProps,                     // Producer properties
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE // Delivery guarantee
);

// Add producer to stream
processedStream.addSink(kafkaProducer);

Twitter Integration Patterns

Twitter Source Configuration

// Twitter API credentials from parameters
Properties twitterProps = new Properties();
twitterProps.setProperty(TwitterSource.CONSUMER_KEY, params.get("twitter-source.consumerKey"));
twitterProps.setProperty(TwitterSource.CONSUMER_SECRET, params.get("twitter-source.consumerSecret"));
twitterProps.setProperty(TwitterSource.TOKEN, params.get("twitter-source.token"));
twitterProps.setProperty(TwitterSource.TOKEN_SECRET, params.get("twitter-source.tokenSecret"));

// Create Twitter source
DataStream<String> twitterStream = env.addSource(new TwitterSource(twitterProps));

Tweet Processing Pipeline

// Parse JSON tweets and extract hashtags
DataStream<Tuple2<String, Integer>> hashtagCounts = twitterStream
    .flatMap(new HashtagExtractor())
    .keyBy(0)
    .timeWindow(Time.minutes(1))
    .sum(1);

// Extract trending hashtags
DataStream<String> trendingHashtags = hashtagCounts
    .timeWindowAll(Time.minutes(5))
    .apply(new TopHashtagsFunction(10)); // Top 10 hashtags

JSON Processing

private static class HashtagExtractor implements FlatMapFunction<String, Tuple2<String, Integer>> {
    // ObjectMapper is expensive to construct and thread-safe for reading;
    // share one instance instead of allocating a new mapper per tweet.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Parses one raw tweet JSON string and emits a ("#hashtag", 1) pair,
     * lower-cased, for every hashtag found under the tweet's "entities" node.
     * Malformed or hashtag-less tweets are skipped without failing the job.
     *
     * @param tweetJson raw tweet payload as delivered by TwitterSource
     * @param out       collector receiving one (hashtag, 1) tuple per hashtag
     */
    @Override
    public void flatMap(String tweetJson, Collector<Tuple2<String, Integer>> out) throws Exception {
        try {
            JsonNode entities = MAPPER.readTree(tweetJson).get("entities");
            if (entities == null) {
                return; // tweet carries no entity metadata
            }
            JsonNode hashtags = entities.get("hashtags");
            if (hashtags == null || !hashtags.isArray()) {
                return; // no hashtags on this tweet
            }
            for (JsonNode hashtag : hashtags) {
                // Guard against entries missing the "text" field to avoid an NPE.
                JsonNode text = hashtag.get("text");
                if (text != null) {
                    out.collect(new Tuple2<>("#" + text.asText().toLowerCase(), 1));
                }
            }
        } catch (Exception ignored) {
            // Best-effort stream: drop malformed tweets rather than failing the job.
        }
    }
}

Error Handling and Fault Tolerance

Restart Strategies

// Fixed delay restart strategy: retry a fixed number of times with a constant pause.
env.getConfig().setRestartStrategy(
    RestartStrategies.fixedDelayRestart(
        4,      // number of restart attempts before the job is declared failed
        10000   // delay between restart attempts (ms)
    )
);

// Exponential backoff restart strategy.
// NOTE(review): RestartStrategies.exponentialDelayRestart exists only in
// Flink 1.10+ (not in the 1.3.3 release documented here) and takes all five
// arguments shown below — there is no three-argument overload.
env.getConfig().setRestartStrategy(
    RestartStrategies.exponentialDelayRestart(
        Time.milliseconds(1000),   // initial backoff between restarts
        Time.milliseconds(60000),  // maximum backoff cap
        1.2,                       // backoff multiplier applied per failure
        Time.milliseconds(600000), // reset backoff after this much stable running
        0.1                        // jitter factor to de-synchronize restarts
    )
);

Checkpointing Configuration

// Enable checkpointing for fault tolerance
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

// Checkpoint configuration
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setMinPauseBetweenCheckpoints(1000);
checkpointConfig.setCheckpointTimeout(60000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Connection Resilience

// Configure connector resilience
Properties connectorProps = new Properties();
connectorProps.setProperty("flink.partition-discovery.interval-millis", "30000");
connectorProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
connectorProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
connectorProps.setProperty("auto.offset.reset", "latest");
connectorProps.setProperty("enable.auto.commit", "false");

Data Serialization

Simple String Schema

// Basic string serialization/deserialization
SimpleStringSchema stringSchema = new SimpleStringSchema();

// For Kafka string messages
new FlinkKafkaConsumer08<>(topic, stringSchema, properties);
new FlinkKafkaProducer<>(topic, stringSchema, properties);

JSON Schema

// Custom JSON schema for complex objects
public class TweetSchema implements DeserializationSchema<Tweet> {
    // DeserializationSchema instances are serialized and shipped to the task
    // managers. ObjectMapper is NOT Serializable, so holding it as a plain
    // instance field breaks job submission; a static field is re-created per
    // JVM instead of being serialized, and ObjectMapper is thread-safe for
    // concurrent reads.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Deserializes one raw Kafka message into a Tweet.
     *
     * @param message raw message bytes as read from the topic
     * @return the parsed Tweet
     * @throws IOException if the bytes are not valid Tweet JSON
     */
    @Override
    public Tweet deserialize(byte[] message) throws IOException {
        return OBJECT_MAPPER.readValue(message, Tweet.class);
    }

    /** Unbounded stream: never signals end-of-stream. */
    @Override
    public boolean isEndOfStream(Tweet nextElement) {
        return false;
    }

    /** Type information consumed by Flink's serialization stack. */
    @Override
    public TypeInformation<Tweet> getProducedType() {
        return TypeInformation.of(Tweet.class);
    }
}

Performance Tuning

Parallelism Configuration

// Set source parallelism
twitterStream.setParallelism(1); // Single Twitter connection

// Set processing parallelism  
processedStream.setParallelism(4); // Parallel processing

// Set sink parallelism
kafkaSink.setParallelism(2); // Multiple Kafka producers

Backpressure Handling

// Configure buffering and batching
env.setBufferTimeout(100); // 100ms buffer timeout

// Kafka producer batching
kafkaProps.setProperty("batch.size", "16384");
kafkaProps.setProperty("linger.ms", "10");
kafkaProps.setProperty("buffer.memory", "33554432");

External System Requirements

Kafka Setup

  • Apache Kafka 0.8+ cluster
  • Zookeeper ensemble
  • Configured topics with appropriate partitioning
  • Network connectivity from Flink cluster

Twitter API Setup

  • Twitter Developer Account
  • API credentials (Consumer Key/Secret, Access Token/Secret)
  • Rate limit considerations (Twitter API limits)

Dependencies

<!-- Kafka Connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

<!-- Twitter Connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

<!-- JSON Processing -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.8</version>
</dependency>

Required Imports

Kafka Integration

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Twitter Integration

import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10

docs

async.md

external-systems.md

index.md

iteration.md

joins.md

machine-learning.md

side-output.md

socket.md

utilities.md

windowing.md

wordcount.md

tile.json