Apache Flink Twitter connector for streaming data from Twitter API with OAuth authentication
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-twitter-2-12@1.14.0

The Apache Flink Twitter connector provides streaming data ingestion from the Twitter API. It implements a RichSourceFunction that emits tweets as strings, using Twitter's Hosebird Client (HBC) library for reliable streaming connections with OAuth1 authentication and rate limiting support.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.connectors.twitter.TwitterSource.EndpointInitializer;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.configuration.Configuration;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import java.util.Properties;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import java.util.Properties;
// OAuth1 credentials for the Twitter application; all four keys are mandatory and the
// TwitterSource constructor rejects a Properties object that lacks any of them.
Properties credentials = new Properties();
credentials.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
credentials.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
credentials.setProperty(TwitterSource.TOKEN, "your-access-token");
credentials.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");

// Without a custom endpoint initializer the source reads the default sample stream.
TwitterSource source = new TwitterSource(credentials);

// Wire the source into a job that prints every received tweet.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(source).print();
env.execute("Twitter Stream Job");
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import java.util.Arrays;
import java.util.Properties;
try {
    // Configure properties
    Properties props = new Properties();
    props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
    props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
    props.setProperty(TwitterSource.TOKEN, "your-access-token");
    props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");
    props.setProperty(TwitterSource.CLIENT_NAME, "my-flink-app");
    props.setProperty(TwitterSource.CLIENT_BUFFER_SIZE, "100000");

    // Create source with custom endpoint.
    // The initializer is shipped with the source, so it must be Serializable. EndpointInitializer
    // itself does not extend Serializable, so a plain lambda would be rejected; the intersection
    // cast makes the compiler emit a serializable lambda.
    TwitterSource twitterSource = new TwitterSource(props);
    twitterSource.setCustomEndpointInitializer(
            (TwitterSource.EndpointInitializer & java.io.Serializable) () -> {
                StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
                endpoint.trackTerms(Arrays.asList("apache", "flink", "bigdata"));
                return endpoint;
            });

    // Setup streaming environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10000); // Enable checkpointing for fault tolerance
    env.addSource(twitterSource)
            .filter(tweet -> tweet.contains("apache"))
            .print();
    env.execute("Filtered Twitter Stream");
} catch (IllegalArgumentException e) {
    // Thrown by the TwitterSource constructor when a required credential property is missing.
    System.err.println("Configuration error: " + e.getMessage());
} catch (Exception e) {
    System.err.println("Execution error: " + e.getMessage());
}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import java.util.Properties;

/**
 * Example job that streams raw tweets with checkpointing and a fixed-delay restart strategy.
 *
 * <p>Credentials are passed as command-line arguments using the connector's property keys, e.g.
 * {@code --twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret>
 * --twitter-source.token <token> --twitter-source.tokenSecret <tokenSecret>}.
 */
public class TwitterStreamingJob {

    public static void main(String[] args) throws Exception {
        // The TwitterSource constructor throws IllegalArgumentException if a required key is missing.
        Properties props = loadTwitterProperties(args);
        TwitterSource twitterSource = new TwitterSource(props);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configure proper resource management
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

        env.addSource(twitterSource)
                .name("Twitter Source")
                .setParallelism(1) // TwitterSource is not parallelizable
                .map(tweet -> processTweet(tweet))
                .print();

        env.execute("Robust Twitter Stream");
    }

    /**
     * Builds the connector configuration from {@code --key value} command-line arguments.
     *
     * @param args program arguments
     * @return properties containing every parsed argument, keyed by argument name
     */
    private static Properties loadTwitterProperties(String[] args) {
        return ParameterTool.fromArgs(args).getProperties();
    }

    /** Placeholder for per-tweet processing; currently returns the raw tweet unchanged. */
    private static String processTweet(String rawTweet) {
        // Your tweet processing logic
        return rawTweet;
    }
}
Creates a Twitter streaming source with OAuth authentication and configurable properties. The TwitterSource extends RichSourceFunction<String> and implements Flink's SourceFunction interface.
/**
 * Source function that streams tweets from the Twitter API and emits each one as a String.
 */
public class TwitterSource extends RichSourceFunction<String> {
    /**
     * Creates a TwitterSource for streaming tweets from Twitter API.
     *
     * @param properties Configuration properties containing OAuth credentials and optional settings
     * @throws IllegalArgumentException if any required property (consumer key, consumer secret,
     *     token, token secret) is missing
     */
    public TwitterSource(Properties properties);
}
Required Properties:
- twitter-source.consumerKey - Twitter application consumer key
- twitter-source.consumerSecret - Twitter application consumer secret
- twitter-source.token - Twitter access token
- twitter-source.tokenSecret - Twitter access token secret

Optional Properties:
- twitter-source.name - Client name (default: "flink-twitter-source")
- twitter-source.hosts - Twitter API hosts (default: Constants.STREAM_HOST from the HBC library)
- twitter-source.bufferSize - Buffer size for reading (default: "50000")

Core lifecycle methods that TwitterSource overrides from RichSourceFunction to manage the Twitter streaming connection.
/**
 * Initializes the source with configuration parameters.
 *
 * @param parameters Configuration from Flink runtime
 * @throws Exception if initialization fails
 */
public void open(Configuration parameters) throws Exception;
/**
 * Main execution method that establishes the Twitter connection and streams data, emitting
 * each received tweet as a String through {@code ctx}.
 *
 * @param ctx Source context for emitting collected tweets
 * @throws Exception if connection or streaming fails; such errors are propagated to Flink's
 *     error handling
 */
public void run(SourceContext<String> ctx) throws Exception;
/**
 * Closes the Twitter connection and releases resources.
 */
public void close();
/**
 * Cancels the source operation and stops streaming.
 */
public void cancel();
Allows customization of the Twitter streaming endpoint beyond the default sample stream.
/**
 * Sets a custom endpoint initializer for the Twitter source, replacing the default sample
 * stream endpoint.
 *
 * <p>NOTE(review): the connector appears to verify that the initializer is serializable. The
 * EndpointInitializer interface does not extend Serializable, so a plain lambda would be
 * rejected. Confirm this for the version in use, and pass either an
 * {@code (EndpointInitializer & Serializable)} lambda or a Serializable class.
 *
 * @param initializer Custom endpoint initializer implementation
 * @throws NullPointerException if initializer is null
 */
public void setCustomEndpointInitializer(EndpointInitializer initializer)
Pre-defined property key constants for configuration.
// Required property keys: the TwitterSource constructor throws IllegalArgumentException
// if any of these is missing from the supplied Properties.
public static final String CONSUMER_KEY = "twitter-source.consumerKey";       // OAuth consumer key
public static final String CONSUMER_SECRET = "twitter-source.consumerSecret"; // OAuth consumer secret
public static final String TOKEN = "twitter-source.token";                    // OAuth access token
public static final String TOKEN_SECRET = "twitter-source.tokenSecret";       // OAuth access token secret
// Optional property keys
public static final String CLIENT_NAME = "twitter-source.name";             // default: "flink-twitter-source"
public static final String CLIENT_HOSTS = "twitter-source.hosts";           // default: Constants.STREAM_HOST (HBC)
public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize"; // default: "50000"
Interface for creating custom Twitter streaming endpoints to override the default sample endpoint behavior.
/**
 * Interface for creating custom Twitter streaming endpoints.
 *
 * <p>An implementation passed to setCustomEndpointInitializer replaces the default sample
 * endpoint. This interface does not extend Serializable, so a plain lambda implementing it is
 * not serializable. NOTE(review): the connector appears to require a serializable initializer;
 * use an {@code (EndpointInitializer & Serializable)} cast or a Serializable class.
 */
public interface EndpointInitializer {
    /**
     * Creates and returns a Twitter streaming endpoint.
     *
     * @return StreamingEndpoint instance for Twitter API connection
     */
    StreamingEndpoint createEndpoint();
}
Usage Example:
TwitterSource twitterSource = new TwitterSource(properties);
// Custom endpoint that filters tweets by keywords.
// EndpointInitializer is not Serializable, but the connector serializes the initializer together
// with the source; the intersection cast makes the compiler emit a serializable lambda.
twitterSource.setCustomEndpointInitializer(
        (TwitterSource.EndpointInitializer & java.io.Serializable) () -> {
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
            endpoint.trackTerms(Arrays.asList("apache", "flink", "streaming"));
            return endpoint;
        });
/**
* Base source function interface from Flink
*/
public abstract class RichSourceFunction<T> implements SourceFunction<T> {
// Lifecycle methods implemented by TwitterSource
}
/**
* Context for emitting data from sources
*/
public interface SourceContext<T> {
void collect(T element);
// Additional methods for checkpointing and timestamps
}
/**
* Flink configuration object
*/
public class Configuration {
// Configuration parameters passed to open() method
}/**
 * Base interface for Twitter streaming endpoints from HBC library
 * (com.twitter.hbc.core.endpoint.StreamingEndpoint)
 */
public interface StreamingEndpoint {
    // Twitter endpoint configuration methods
}
/**
 * Sample endpoint implementation (used by default when no custom EndpointInitializer is set).
 * NOTE(review): in HBC these setters may return void rather than this type; verify before
 * chaining calls.
 */
public class StatusesSampleEndpoint implements StreamingEndpoint {
    public StatusesSampleEndpoint stallWarnings(boolean stallWarnings);
    public StatusesSampleEndpoint delimited(boolean delimited);
}
/**
 * Filter endpoint for tracking specific terms (com.twitter.hbc.core.endpoint.StatusesFilterEndpoint)
 */
public class StatusesFilterEndpoint implements StreamingEndpoint {
    // Restricts the stream to tweets matching any of the given terms.
    public StatusesFilterEndpoint trackTerms(List<String> terms);
    // Additional filtering methods
}
The connector handles several types of errors:
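- IllegalArgumentException is thrown by the constructor if a required property (CONSUMER_KEY, CONSUMER_SECRET, TOKEN, TOKEN_SECRET) is missing.
- NullPointerException is thrown by setCustomEndpointInitializer() if the initializer parameter is null.
- The initializer must also be serializable. EndpointInitializer does not extend Serializable, so a plain lambda is not serializable. Use an (EndpointInitializer & Serializable) cast or a class that implements Serializable.
- IOException and InterruptedException can be thrown during stream processing and are propagated to Flink's error handling.

Exception Hierarchy: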
// Constructor exceptions: thrown when a required credential property is missing
public TwitterSource(Properties properties) throws IllegalArgumentException
// Lifecycle method exceptions: connection/streaming failures propagate to Flink's error handling
public void open(Configuration parameters) throws Exception
public void run(SourceContext<String> ctx) throws Exception
// Configuration method exceptions: NullPointerException for a null initializer.
// NOTE(review): the connector also appears to reject non-serializable initializers
// (org.apache.flink.api.common.InvalidProgramException); confirm for the version in use.
public void setCustomEndpointInitializer(EndpointInitializer initializer) throws NullPointerException