Apache Flink Twitter connector for streaming data from Twitter API with OAuth authentication
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
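Apache Flink Twitter connector provides streaming data ingestion from the Twitter API. It implements a non-parallel RichSourceFunction<String> that emits each message received from the stream (tweets arrive as raw JSON) as a string. It uses Twitter's Hosebird Client (HBC) library to manage the streaming HTTP connection, OAuth 1.0a authentication, and automatic reconnection with back-off (including after rate-limit responses).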
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.connectors.twitter.TwitterSource.EndpointInitializer;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.configuration.Configuration;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
// OAuth 1.0a credentials of your Twitter app (placeholders: substitute real values).
Properties twitterProps = new Properties();
twitterProps.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
twitterProps.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
twitterProps.setProperty(TwitterSource.TOKEN, "your-access-token");
twitterProps.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");
// With no custom endpoint initializer, the source reads the default sample stream.
TwitterSource source = new TwitterSource(twitterProps);
// Wire source -> stdout sink and submit the job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(source).print();
env.execute("Twitter Stream Job");
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
try {
    // Configure OAuth credentials plus optional client settings.
    Properties props = new Properties();
    props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
    props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
    props.setProperty(TwitterSource.TOKEN, "your-access-token");
    props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");
    props.setProperty(TwitterSource.CLIENT_NAME, "my-flink-app");
    props.setProperty(TwitterSource.CLIENT_BUFFER_SIZE, "100000");

    // Create the source with a keyword-filter endpoint.
    // The initializer is serialized together with the source, so it must be Serializable.
    // EndpointInitializer does not extend Serializable, so a plain lambda would be rejected
    // by setCustomEndpointInitializer with an InvalidProgramException. The intersection
    // cast makes the lambda serializable.
    TwitterSource twitterSource = new TwitterSource(props);
    twitterSource.setCustomEndpointInitializer(
            (TwitterSource.EndpointInitializer & Serializable) () -> {
                StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
                endpoint.trackTerms(Arrays.asList("apache", "flink", "bigdata"));
                return endpoint;
            });

    // Set up the streaming environment.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing protects downstream operator state only. TwitterSource keeps no
    // checkpointed state, so tweets that arrive while the job is down are not replayed.
    env.enableCheckpointing(10000);
    env.addSource(twitterSource)
            // Tweets arrive as raw JSON text; compare case-insensitively so "Apache" matches too.
            .filter(tweet -> tweet.toLowerCase(Locale.ROOT).contains("apache"))
            .print();
    env.execute("Filtered Twitter Stream");
} catch (IllegalArgumentException e) {
    // Thrown by the TwitterSource constructor when a required property is missing.
    System.err.println("Configuration error: " + e.getMessage());
} catch (Exception e) {
    // Do not swallow job failures: keep the cause (full stack trace) and let the caller fail.
    throw new RuntimeException("Execution error: " + e.getMessage(), e);
}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import java.util.Properties;

/**
 * Twitter streaming job with checkpointing and a bounded restart strategy.
 *
 * <p>OAuth credentials are read from environment variables so they never end up in source
 * control.
 */
public class TwitterStreamingJob {
    public static void main(String[] args) throws Exception {
        Properties props = loadTwitterProperties();
        TwitterSource twitterSource = new TwitterSource(props);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 s and retry a failed job up to 3 times, 10 s apart.
        // TwitterSource itself keeps no checkpointed state: tweets that arrive while the job
        // is restarting are not replayed; checkpoints only protect downstream operator state.
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
        env.addSource(twitterSource)
                .name("Twitter Source")
                .setParallelism(1) // TwitterSource is a non-parallel source function
                .map(tweet -> processTweet(tweet))
                .print();
        env.execute("Robust Twitter Stream");
    }

    /**
     * Builds the TwitterSource configuration from environment variables.
     *
     * @return properties holding the four required OAuth settings
     * @throws IllegalArgumentException if any of the variables is unset or blank
     */
    private static Properties loadTwitterProperties() {
        Properties props = new Properties();
        props.setProperty(TwitterSource.CONSUMER_KEY, requireEnv("TWITTER_CONSUMER_KEY"));
        props.setProperty(TwitterSource.CONSUMER_SECRET, requireEnv("TWITTER_CONSUMER_SECRET"));
        props.setProperty(TwitterSource.TOKEN, requireEnv("TWITTER_TOKEN"));
        props.setProperty(TwitterSource.TOKEN_SECRET, requireEnv("TWITTER_TOKEN_SECRET"));
        return props;
    }

    /**
     * Returns the value of an environment variable, failing fast when it is missing.
     * (Properties.setProperty would otherwise throw a bare NullPointerException.)
     */
    private static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Environment variable " + name + " is not set");
        }
        return value;
    }

    /** Per-tweet transformation hook; replace with your own processing logic. */
    private static String processTweet(String rawTweet) {
        return rawTweet;
    }
}
Creates a Twitter streaming source with OAuth authentication and configurable properties. The TwitterSource extends RichSourceFunction<String> and implements Flink's SourceFunction interface.
/**
* Streaming source that reads from the Twitter API and emits each received message as a String.
* It is a RichSourceFunction<String> configured entirely through a Properties object.
*/
public class TwitterSource extends RichSourceFunction<String> {
/**
* Creates a TwitterSource for streaming tweets from the Twitter API.
* @param properties configuration holding the required OAuth credentials
*     (CONSUMER_KEY, CONSUMER_SECRET, TOKEN, TOKEN_SECRET) and, optionally,
*     CLIENT_NAME, CLIENT_HOSTS and CLIENT_BUFFER_SIZE
* @throws IllegalArgumentException if any required property is missing
*/
public TwitterSource(Properties properties);
}Required Properties:
- twitter-source.consumerKey - Twitter application consumer key
- twitter-source.consumerSecret - Twitter application consumer secret
- twitter-source.token - Twitter access token
- twitter-source.tokenSecret - Twitter access token secret

Optional Properties:
- twitter-source.name - Client name (default: "flink-twitter-source")
- twitter-source.hosts - Twitter API hosts (default: Constants.STREAM_HOST from the HBC library)
- twitter-source.bufferSize - Buffer size for reading the stream (default: "50000")

Core lifecycle methods that TwitterSource overrides from RichSourceFunction (open/close) and SourceFunction (run/cancel) to manage the Twitter streaming connection.
/**
* Initializes the source before run() is invoked.
* @param parameters Configuration from the Flink runtime
* @throws Exception if initialization fails
*/
public void open(Configuration parameters) throws Exception;
/**
* Main execution method: establishes the Twitter connection and emits tweets through ctx.
* NOTE(review): per the SourceFunction contract this is expected to keep running until
* cancel()/close() stops the source; confirm against the implementation.
* @param ctx Source context used to emit each collected tweet
* @throws Exception if connecting or streaming fails
*/
public void run(SourceContext<String> ctx) throws Exception;
/**
* Closes the Twitter connection and releases resources.
* Unlike open(), this override declares no checked exception.
*/
public void close();
/**
* Cancels the source operation and stops streaming.
* NOTE(review): Flink calls cancel() from a different thread than run(); presumably it
* delegates to close() here. Confirm against the implementation.
*/
public void cancel();Allows customization of the Twitter streaming endpoint beyond the default sample stream.
/**
* Sets a custom endpoint initializer for the Twitter source
* @param initializer Custom endpoint initializer implementation
* @throws NullPointerException if initializer is null
*/
public void setCustomEndpointInitializer(EndpointInitializer initializer)Pre-defined property key constants for configuration.
// Required property keys: the TwitterSource constructor throws IllegalArgumentException if any
// of these is absent. They hold the OAuth consumer key/secret and access token/secret.
public static final String CONSUMER_KEY = "twitter-source.consumerKey";
public static final String CONSUMER_SECRET = "twitter-source.consumerSecret";
public static final String TOKEN = "twitter-source.token";
public static final String TOKEN_SECRET = "twitter-source.tokenSecret";
// Optional property keys. Defaults: name "flink-twitter-source",
// hosts = HBC Constants.STREAM_HOST, buffer size "50000".
public static final String CLIENT_NAME = "twitter-source.name";
public static final String CLIENT_HOSTS = "twitter-source.hosts";
public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";Interface for creating custom Twitter streaming endpoints to override the default sample endpoint behavior.
/**
* Factory for the Twitter streaming endpoint the source connects to. Implement it (for example
* returning a StatusesFilterEndpoint) to override the default sample endpoint.
*
* NOTE(review): this interface does not extend java.io.Serializable, yet the initializer is
* stored inside the source. Implementations presumably must be Serializable themselves (a
* plain lambda is not). Confirm against setCustomEndpointInitializer.
*/
public interface EndpointInitializer {
/**
* Creates and returns a Twitter streaming endpoint
* @return StreamingEndpoint instance for Twitter API connection
*/
StreamingEndpoint createEndpoint();
}Usage Example:
TwitterSource twitterSource = new TwitterSource(properties);
// Custom endpoint that filters tweets by keywords.
// The cast makes the lambda Serializable: EndpointInitializer does not extend Serializable,
// and setCustomEndpointInitializer rejects non-serializable initializers.
twitterSource.setCustomEndpointInitializer(
        (TwitterSource.EndpointInitializer & java.io.Serializable) () -> {
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
            endpoint.trackTerms(Arrays.asList("apache", "flink", "streaming"));
            return endpoint;
        });
/**
* Abstract base class (not an interface) for Flink sources with lifecycle hooks;
* TwitterSource extends it and implements open/run/close/cancel.
*/
public abstract class RichSourceFunction<T> implements SourceFunction<T> {
// Lifecycle methods implemented by TwitterSource
}
/**
* Context through which a source emits records; run() receives it as its ctx argument.
* It is nested in SourceFunction (imported as SourceFunction.SourceContext).
*/
public interface SourceContext<T> {
void collect(T element);
// Additional methods for checkpointing and timestamps
}
/**
* Flink configuration object (org.apache.flink.configuration.Configuration)
*/
public class Configuration {
// Configuration parameters passed to open() method
}/**
* Base interface for Twitter streaming endpoints from the HBC library
* (com.twitter.hbc.core.endpoint.StreamingEndpoint); EndpointInitializer returns one.
*/
public interface StreamingEndpoint {
// Twitter endpoint configuration methods
}
/**
* Sample endpoint implementation, used when no custom EndpointInitializer is set.
* NOTE(review): the return types of the two setters below are as listed in this document;
* in HBC they may be declared on a base class with a void return. Confirm before chaining.
*/
public class StatusesSampleEndpoint implements StreamingEndpoint {
public StatusesSampleEndpoint stallWarnings(boolean stallWarnings);
public StatusesSampleEndpoint delimited(boolean delimited);
}
/**
* Filter endpoint for tracking specific terms (see trackTerms in the usage examples)
*/
public class StatusesFilterEndpoint implements StreamingEndpoint {
public StatusesFilterEndpoint trackTerms(List<String> terms);
// Additional filtering methods
}The connector handles several types of errors:
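- IllegalArgumentException: thrown by the constructor if any required property (CONSUMER_KEY, CONSUMER_SECRET, TOKEN, TOKEN_SECRET) is missing.
- NullPointerException: thrown by setCustomEndpointInitializer() if the initializer parameter is null.
- InvalidProgramException: thrown by setCustomEndpointInitializer() if the initializer is not serializable. For example, a plain lambda targeting EndpointInitializer is not serializable, because that interface does not extend Serializable.
- IOException and InterruptedException: declared by HBC's message processing while the stream is read. They are raised on the HBC client thread, not inside run(). HBC handles them by disconnecting and reconnecting, so they do not normally fail the Flink task.

Exception Hierarchy: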
// Constructor exceptions: IllegalArgumentException when a required property key is absent
public TwitterSource(Properties properties) throws IllegalArgumentException
// Lifecycle method exceptions: open() and run() declare the broad checked Exception type
public void open(Configuration parameters) throws Exception
public void run(SourceContext<String> ctx) throws Exception
// Configuration method exceptions: NullPointerException for a null initializer.
// NOTE(review): a non-serializable initializer is presumably rejected here too, with Flink's
// unchecked InvalidProgramException. Confirm against the TwitterSource implementation.
public void setCustomEndpointInitializer(EndpointInitializer initializer) throws NullPointerException