tessl/maven-org-apache-flink--flink-connector-twitter-2-12

Apache Flink Twitter connector for streaming data from Twitter API with OAuth authentication

Apache Flink Twitter Connector

The Apache Flink Twitter connector ingests streaming data from the Twitter API. It implements a RichSourceFunction that emits tweets as strings, and it uses Twitter's Hosebird Client (HBC) library for streaming connections with OAuth1 authentication and rate limiting support.

Package Information

  • Package Name: flink-connector-twitter_2.12
  • Group ID: org.apache.flink
  • Package Type: Maven
  • Language: Java
  • Installation: Add to Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.12</artifactId>
    <version>1.14.6</version>
</dependency>

Core Imports

import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.connectors.twitter.TwitterSource.EndpointInitializer;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.configuration.Configuration;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import java.util.Properties;

Basic Usage

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import java.util.Properties;

// Configure Twitter credentials
Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
props.setProperty(TwitterSource.TOKEN, "your-access-token");
props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");

// Create and configure the source
TwitterSource twitterSource = new TwitterSource(props);

// Use in Flink streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(twitterSource)
   .print();

env.execute("Twitter Stream Job");

Advanced Usage Examples

Custom Endpoint with Error Handling

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.connectors.twitter.TwitterSource.EndpointInitializer;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Properties;

try {
    // Configure properties
    Properties props = new Properties();
    props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
    props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
    props.setProperty(TwitterSource.TOKEN, "your-access-token");
    props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");
    props.setProperty(TwitterSource.CLIENT_NAME, "my-flink-app");
    props.setProperty(TwitterSource.CLIENT_BUFFER_SIZE, "100000");

    // Create source with custom endpoint
    TwitterSource twitterSource = new TwitterSource(props);
    // Flink serializes the source when it ships the job, so the lambda is cast to a Serializable type
    twitterSource.setCustomEndpointInitializer((EndpointInitializer & Serializable) () -> {
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(Arrays.asList("apache", "flink", "bigdata"));
        return endpoint;
    });
    
    // Setup streaming environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10000); // Enable checkpointing for fault tolerance
    
    env.addSource(twitterSource)
       .filter(tweet -> tweet.contains("apache"))
       .print();
       
    env.execute("Filtered Twitter Stream");
    
} catch (IllegalArgumentException e) {
    System.err.println("Configuration error: " + e.getMessage());
} catch (Exception e) {
    System.err.println("Execution error: " + e.getMessage());
}

Resource Management Pattern

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import java.util.Properties;

public class TwitterStreamingJob {
    public static void main(String[] args) throws Exception {
        Properties props = loadTwitterProperties(); // Your property loading logic
        
        TwitterSource twitterSource = new TwitterSource(props);
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Configure proper resource management
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
        
        env.addSource(twitterSource)
           .name("Twitter Source")
           .setParallelism(1) // TwitterSource is not parallelizable
           .map(tweet -> processTweet(tweet))
           .print();
           
        env.execute("Robust Twitter Stream");
    }
    
    private static String processTweet(String rawTweet) {
        // Your tweet processing logic
        return rawTweet;
    }
}
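
The `loadTwitterProperties()` call in the job above is left to the reader. One possible shape, sketched here with only the JDK, copies credentials from environment variables into the property keys the connector documents. The environment variable names (`TWITTER_CONSUMER_KEY` and so on) and the class name `TwitterEnvProperties` are assumptions made for this illustration; the connector does not define them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper: builds connector properties from environment-style key/value pairs. */
final class TwitterEnvProperties {

    // Assumed environment variable names mapped to the documented property keys.
    private static final Map<String, String> ENV_TO_KEY = new LinkedHashMap<>();
    static {
        ENV_TO_KEY.put("TWITTER_CONSUMER_KEY", "twitter-source.consumerKey");
        ENV_TO_KEY.put("TWITTER_CONSUMER_SECRET", "twitter-source.consumerSecret");
        ENV_TO_KEY.put("TWITTER_TOKEN", "twitter-source.token");
        ENV_TO_KEY.put("TWITTER_TOKEN_SECRET", "twitter-source.tokenSecret");
    }

    /** Copies every mapped variable that is present and non-empty; others are skipped. */
    static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : ENV_TO_KEY.entrySet()) {
            String value = env.get(entry.getKey());
            if (value != null && !value.isEmpty()) {
                props.setProperty(entry.getValue(), value);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = fromEnv(System.getenv());
        System.out.println("Loaded " + props.size() + " of " + ENV_TO_KEY.size() + " credentials");
    }
}
```

Taking the environment as a `Map` parameter rather than calling `System.getenv()` inside the helper keeps it easy to exercise in a unit test.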

Capabilities

Twitter Source Creation

Creates a Twitter streaming source with OAuth authentication and configurable properties. The TwitterSource extends RichSourceFunction<String> and implements Flink's SourceFunction interface.

/**
 * Creates a TwitterSource for streaming tweets from Twitter API
 * @param properties Configuration properties containing OAuth credentials and optional settings
 * @throws IllegalArgumentException if required properties are missing
 */
public class TwitterSource extends RichSourceFunction<String> {
    public TwitterSource(Properties properties);
}

Required Properties:

  • twitter-source.consumerKey - Twitter application consumer key
  • twitter-source.consumerSecret - Twitter application consumer secret
  • twitter-source.token - Twitter access token
  • twitter-source.tokenSecret - Twitter access token secret

Optional Properties:

  • twitter-source.name - Client name (default: "flink-twitter-source")
  • twitter-source.hosts - Twitter API hosts (default: Constants.STREAM_HOST from HBC library)
  • twitter-source.bufferSize - Buffer size for reading (default: "50000")

Source Lifecycle Methods

Core lifecycle methods inherited from RichSourceFunction for managing the Twitter streaming connection.

/**
 * Initializes the source with configuration parameters
 * @param parameters Configuration from Flink runtime
 * @throws Exception if initialization fails
 */
public void open(Configuration parameters) throws Exception;

/**
 * Main execution method that establishes Twitter connection and streams data
 * @param ctx Source context for emitting collected tweets
 * @throws Exception if connection or streaming fails
 */
public void run(SourceContext<String> ctx) throws Exception;

/**
 * Closes the Twitter connection and releases resources
 */
public void close();

/**
 * Cancels the source operation and stops streaming
 */
public void cancel();
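
Long-running sources commonly pair `run()` and `cancel()` through a volatile flag: `run()` loops while the flag is set, and `cancel()` clears it from another thread. The sketch below shows that general pattern with JDK types only. `LoopingSource` and its queue are stand-ins invented for illustration; this is not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Stand-in source illustrating the run()/cancel() contract; not the connector's code. */
final class LoopingSource {
    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    /** Simulates a message arriving from a network client. */
    void offer(String message) {
        incoming.add(message);
    }

    /** Emits messages until cancel() is called; polling with a timeout rechecks the flag. */
    void run(Consumer<String> out) throws InterruptedException {
        while (running) {
            String next = incoming.poll(50, TimeUnit.MILLISECONDS);
            if (next != null) {
                out.accept(next);
            }
        }
    }

    /** Called from another thread to make run() return. */
    void cancel() {
        running = false;
    }

    /** Runs the source on a worker thread, cancels it, and returns what was emitted. */
    static List<String> demo() {
        LoopingSource source = new LoopingSource();
        List<String> emitted = new ArrayList<>(); // written by the worker, read after join()
        CountDownLatch drained = new CountDownLatch(2);
        source.offer("tweet-1");
        source.offer("tweet-2");
        Thread worker = new Thread(() -> {
            try {
                source.run(message -> {
                    emitted.add(message);
                    drained.countDown();
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            drained.await(2, TimeUnit.SECONDS); // wait until both messages were emitted
            source.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```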

Custom Endpoint Configuration

Allows customization of the Twitter streaming endpoint beyond the default sample stream.

/**
 * Sets a custom endpoint initializer for the Twitter source
 * @param initializer Custom endpoint initializer implementation
 * @throws NullPointerException if initializer is null
 */
public void setCustomEndpointInitializer(EndpointInitializer initializer);

Property Constants

Pre-defined property key constants for configuration.

// Required property keys
public static final String CONSUMER_KEY = "twitter-source.consumerKey";
public static final String CONSUMER_SECRET = "twitter-source.consumerSecret";
public static final String TOKEN = "twitter-source.token";
public static final String TOKEN_SECRET = "twitter-source.tokenSecret";

// Optional property keys  
public static final String CLIENT_NAME = "twitter-source.name";
public static final String CLIENT_HOSTS = "twitter-source.hosts";
public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";

Interfaces

EndpointInitializer

Interface for creating custom Twitter streaming endpoints to override the default sample endpoint behavior.

/**
 * Interface for creating custom Twitter streaming endpoints
 */
public interface EndpointInitializer {
    /**
     * Creates and returns a Twitter streaming endpoint
     * @return StreamingEndpoint instance for Twitter API connection
     */
    StreamingEndpoint createEndpoint();
}

Usage Example:

TwitterSource twitterSource = new TwitterSource(properties);

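// Custom endpoint that filters tweets by keywords.
// Flink serializes the source, so the lambda is cast to a Serializable type (needs java.io.Serializable).
twitterSource.setCustomEndpointInitializer((EndpointInitializer & Serializable) () -> {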
    StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
    endpoint.trackTerms(Arrays.asList("apache", "flink", "streaming"));
    return endpoint;
});
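
Flink serializes source functions when it ships a job to the cluster, so an initializer held by the source generally has to be serializable as well. In Java, a lambda is serializable only when its target type includes `Serializable`, which an intersection cast provides. The sketch below demonstrates that language behaviour with a stand-in interface: `EndpointFactory` is hypothetical and merely mirrors the shape of `EndpointInitializer`. Whether a given connector version rejects non-serializable initializers up front is worth verifying against its source.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializableLambdaSketch {

    /** Hypothetical stand-in with the same shape as EndpointInitializer (not Serializable). */
    interface EndpointFactory {
        String createEndpoint();
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is an IOException
            return false;
        }
    }

    /** A lambda whose target type is only the functional interface. */
    static EndpointFactory plainLambda() {
        return () -> "statuses/filter";
    }

    /** The same lambda with an intersection cast that adds Serializable. */
    static EndpointFactory serializableLambda() {
        return (EndpointFactory & Serializable) () -> "statuses/filter";
    }

    public static void main(String[] args) {
        System.out.println("plain lambda serializable: " + isSerializable(plainLambda()));
        System.out.println("cast lambda serializable:  " + isSerializable(serializableLambda()));
    }
}
```

A named class that implements both the initializer interface and `Serializable` is an equally valid alternative and is often easier to read than the cast.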

Type Definitions

Core Flink Types

/**
 * Abstract base class for rich source functions from Flink
 */
public abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T> {
    // Lifecycle methods implemented by TwitterSource
}

/**
 * Context for emitting data from sources
 */
public interface SourceContext<T> {
    void collect(T element);
    // Additional methods for checkpointing and timestamps
}

/**
 * Flink configuration object
 */
public class Configuration {
    // Configuration parameters passed to open() method
}

Twitter HBC Types

/**
 * Base interface for Twitter streaming endpoints from HBC library
 */
public interface StreamingEndpoint {
    // Twitter endpoint configuration methods
}

/**
 * Sample endpoint implementation (used by default)
 */
public class StatusesSampleEndpoint implements StreamingEndpoint {
    public StatusesSampleEndpoint stallWarnings(boolean stallWarnings);
    public StatusesSampleEndpoint delimited(boolean delimited);
}

/**
 * Filter endpoint for tracking specific terms
 */
public class StatusesFilterEndpoint implements StreamingEndpoint {
    public StatusesFilterEndpoint trackTerms(List<String> terms);
    // Additional filtering methods
}

Architecture Notes

  • Non-parallel Source: TwitterSource is not parallelizable due to Twitter API connection limitations
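  • Fault Tolerance: Relies on Flink's checkpointing and restart mechanisms for job recovery; because the live Twitter stream cannot be replayed, tweets that arrive while the job is down are not recovered, so the source by itself does not provide exactly-once delivery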
  • Authentication: Uses OAuth1 protocol for secure Twitter API access
  • Rate Limiting: Handles Twitter API rate limits automatically through HBC library
  • Data Format: Emits raw tweet JSON strings for downstream processing
  • Dependencies: Built on Twitter's Hosebird Client (HBC) library version 2.2.0

Error Handling

The connector handles several types of errors:

  • Configuration Errors: IllegalArgumentException thrown during constructor if required properties (CONSUMER_KEY, CONSUMER_SECRET, TOKEN, TOKEN_SECRET) are missing
  • Endpoint Initialization Errors: NullPointerException thrown by setCustomEndpointInitializer() if initializer parameter is null
  • Connection and Runtime Errors: Handled by the underlying HBC library and Flink's fault tolerance mechanisms
  • Authentication Errors: OAuth authentication failures are handled by the HBC client during connection establishment
  • Stream Processing Errors: IOException and InterruptedException can be thrown during stream processing and are propagated to Flink's error handling

Declared Exceptions:

// Constructor exceptions (unchecked, listed for documentation)
public TwitterSource(Properties properties) throws IllegalArgumentException;

// Lifecycle method exceptions
public void open(Configuration parameters) throws Exception;
public void run(SourceContext<String> ctx) throws Exception;

// Configuration method exceptions (unchecked, listed for documentation)
public void setCustomEndpointInitializer(EndpointInitializer initializer) throws NullPointerException;
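
The two eagerly thrown exceptions can be reproduced outside Flink to check a configuration before a job is submitted. The following sketch mirrors the documented behaviour using only the JDK. `PreflightCheck` and its methods are hypothetical helpers, and the exception messages are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

/** Hypothetical pre-submission checks mirroring the documented exceptions. */
final class PreflightCheck {
    private static final List<String> REQUIRED = Arrays.asList(
            "twitter-source.consumerKey",
            "twitter-source.consumerSecret",
            "twitter-source.token",
            "twitter-source.tokenSecret");

    /** Lists the required keys that are absent from props, in documented order. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Throws IllegalArgumentException when any required key is missing. */
    static void requireCredentials(Properties props) {
        List<String> missing = missingKeys(props);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required properties: " + missing);
        }
    }

    /** Throws NullPointerException when the initializer is null, otherwise returns it. */
    static <T> T requireInitializer(T initializer) {
        return Objects.requireNonNull(initializer, "initializer must not be null");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("twitter-source.consumerKey", "your-consumer-key");
        try {
            requireCredentials(props);
        } catch (IllegalArgumentException e) {
            System.err.println("Configuration error: " + e.getMessage());
        }
    }
}
```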

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-twitter-2-12
Describes
mavenpkg:maven/org.apache.flink/flink-connector-twitter_2.12@1.14.x