or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/maven-org-apache-flink--flink-connector-twitter-2-12

Apache Flink Twitter connector for streaming data from Twitter API with OAuth authentication

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-connector-twitter_2.12@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-twitter-2-12@1.14.0

index.mddocs/

Apache Flink Twitter Connector

Apache Flink Twitter connector provides streaming data ingestion from the Twitter API. It implements a RichSourceFunction that emits tweets as strings, using Twitter's Hosebird Client (HBC) library for reliable streaming connections with OAuth1 authentication and rate limiting support.

Package Information

  • Package Name: flink-connector-twitter_2.12
  • Group ID: org.apache.flink
  • Package Type: Maven
  • Language: Java
  • Installation: Add to Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.12</artifactId>
    <version>1.14.6</version>
</dependency>

Core Imports

import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.connectors.twitter.TwitterSource.EndpointInitializer;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.configuration.Configuration;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import java.util.Properties;

Basic Usage

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import java.util.Properties;

// OAuth1 credentials for the Twitter application (all four keys are required)
Properties credentials = new Properties();
credentials.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
credentials.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
credentials.setProperty(TwitterSource.TOKEN, "your-access-token");
credentials.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");

// Build the source from the credentials
TwitterSource tweets = new TwitterSource(credentials);

// Attach the source to a streaming job and print every emitted tweet
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(tweets).print();

// Submit the job under the given name
env.execute("Twitter Stream Job");

Advanced Usage Examples

Custom Endpoint with Error Handling

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import java.util.Arrays;
import java.util.Properties;

try {
    // Configure properties
    Properties props = new Properties();
    props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
    props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
    props.setProperty(TwitterSource.TOKEN, "your-access-token");
    props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");
    props.setProperty(TwitterSource.CLIENT_NAME, "my-flink-app");
    props.setProperty(TwitterSource.CLIENT_BUFFER_SIZE, "100000");
    
    // Create source with custom endpoint.
    // The source, including its endpoint initializer, is serialized when the job is
    // submitted. A plain lambda targeting EndpointInitializer is not Serializable, so
    // the lambda is cast to an intersection type that also implements Serializable.
    TwitterSource twitterSource = new TwitterSource(props);
    twitterSource.setCustomEndpointInitializer(
        (TwitterSource.EndpointInitializer & java.io.Serializable) () -> {
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
            endpoint.trackTerms(Arrays.asList("apache", "flink", "bigdata"));
            return endpoint;
        });
    
    // Setup streaming environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10000); // Checkpoint every 10 seconds (interval is in milliseconds)
    
    env.addSource(twitterSource)
       .filter(tweet -> tweet.contains("apache"))
       .print();
       
    env.execute("Filtered Twitter Stream");
    
} catch (IllegalArgumentException e) {
    // Thrown by the TwitterSource constructor when a required property is missing
    System.err.println("Configuration error: " + e.getMessage());
} catch (Exception e) {
    // Any failure while building or executing the job
    System.err.println("Execution error: " + e.getMessage());
}

Resource Management Pattern

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import java.util.Properties;

/**
 * Example job that streams tweets with checkpointing and a restart strategy configured.
 */
public class TwitterStreamingJob {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        Properties props = loadTwitterProperties();
        
        TwitterSource twitterSource = new TwitterSource(props);
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Fault-tolerance settings: checkpoint every 5 seconds; on failure restart
        // up to 3 times, waiting 10 seconds between attempts
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
        
        env.addSource(twitterSource)
           .name("Twitter Source")
           .setParallelism(1) // TwitterSource is not parallelizable
           .map(TwitterStreamingJob::processTweet)
           .print();
           
        env.execute("Robust Twitter Stream");
    }
    
    /**
     * Loads the four required OAuth credentials from environment variables so that
     * secrets are not hard-coded in the job. Replace with your own loading logic
     * (properties file, secret store, ...) as needed.
     *
     * @return properties populated with the required TwitterSource keys
     * @throws IllegalStateException if any of the environment variables is unset or empty
     */
    private static Properties loadTwitterProperties() {
        Properties props = new Properties();
        props.setProperty(TwitterSource.CONSUMER_KEY, requireEnv("TWITTER_CONSUMER_KEY"));
        props.setProperty(TwitterSource.CONSUMER_SECRET, requireEnv("TWITTER_CONSUMER_SECRET"));
        props.setProperty(TwitterSource.TOKEN, requireEnv("TWITTER_TOKEN"));
        props.setProperty(TwitterSource.TOKEN_SECRET, requireEnv("TWITTER_TOKEN_SECRET"));
        return props;
    }
    
    /** Returns the value of the named environment variable, failing fast when it is absent. */
    private static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value;
    }
    
    /**
     * Transforms a single raw tweet before it is printed.
     *
     * @param rawTweet tweet string as emitted by the source
     * @return the processed tweet; this placeholder returns the input unchanged
     */
    private static String processTweet(String rawTweet) {
        // Your tweet processing logic
        return rawTweet;
    }
}

Capabilities

Twitter Source Creation

Creates a Twitter streaming source with OAuth authentication and configurable properties. The TwitterSource extends RichSourceFunction<String> and implements Flink's SourceFunction interface.

/**
 * Source function that streams tweets from the Twitter API and emits them as strings
 */
public class TwitterSource extends RichSourceFunction<String> {
    /**
     * Creates a TwitterSource for streaming tweets from Twitter API
     * @param properties Configuration properties containing OAuth credentials and optional settings
     * @throws IllegalArgumentException if required properties are missing
     */
    public TwitterSource(Properties properties);
}

Required Properties:

  • twitter-source.consumerKey - Twitter application consumer key
  • twitter-source.consumerSecret - Twitter application consumer secret
  • twitter-source.token - Twitter access token
  • twitter-source.tokenSecret - Twitter access token secret

Optional Properties:

  • twitter-source.name - Client name (default: "flink-twitter-source")
  • twitter-source.hosts - Twitter API hosts (default: Constants.STREAM_HOST from HBC library)
  • twitter-source.bufferSize - Buffer size for reading (default: "50000")

Source Lifecycle Methods

Core lifecycle methods inherited from RichSourceFunction for managing the Twitter streaming connection.

/**
 * Initializes the source with configuration parameters
 * @param parameters Configuration from Flink runtime
 * @throws Exception if initialization fails
 */
public void open(Configuration parameters) throws Exception;

/**
 * Main execution method that establishes Twitter connection and streams data
 * @param ctx Source context for emitting collected tweets
 * @throws Exception if connection or streaming fails
 */
public void run(SourceContext<String> ctx) throws Exception;

/**
 * Closes the Twitter connection and releases resources
 */
public void close();

/**
 * Cancels the source operation and stops streaming
 */
public void cancel();

Custom Endpoint Configuration

Allows customization of the Twitter streaming endpoint beyond the default sample stream.

/**
 * Sets a custom endpoint initializer for the Twitter source
 * NOTE(review): the source is shipped to the cluster in serialized form, so the
 * initializer presumably also has to be java.io.Serializable - confirm against the
 * TwitterSource implementation before passing a plain lambda.
 * @param initializer Custom endpoint initializer implementation
 * @throws NullPointerException if initializer is null
 */
public void setCustomEndpointInitializer(EndpointInitializer initializer)

Property Constants

Pre-defined property key constants for configuration.

// Required property keys
public static final String CONSUMER_KEY = "twitter-source.consumerKey";
public static final String CONSUMER_SECRET = "twitter-source.consumerSecret";
public static final String TOKEN = "twitter-source.token";
public static final String TOKEN_SECRET = "twitter-source.tokenSecret";

// Optional property keys  
public static final String CLIENT_NAME = "twitter-source.name";
public static final String CLIENT_HOSTS = "twitter-source.hosts";
public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";

Interfaces

EndpointInitializer

Interface for creating custom Twitter streaming endpoints to override the default sample endpoint behavior.

/**
 * Interface for creating custom Twitter streaming endpoints.
 * Declared inside TwitterSource (imported as TwitterSource.EndpointInitializer) and
 * passed to setCustomEndpointInitializer() to replace the default sample endpoint.
 */
public interface EndpointInitializer {
    /**
     * Creates and returns a Twitter streaming endpoint
     * @return StreamingEndpoint instance for Twitter API connection
     */
    StreamingEndpoint createEndpoint();
}

Usage Example:

TwitterSource twitterSource = new TwitterSource(properties);

// Custom endpoint that filters tweets by keywords.
// The initializer travels with the serialized source, and a plain lambda targeting
// EndpointInitializer is not Serializable, so cast it to an intersection type.
twitterSource.setCustomEndpointInitializer(
    (TwitterSource.EndpointInitializer & java.io.Serializable) () -> {
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(Arrays.asList("apache", "flink", "streaming"));
        return endpoint;
    });

Type Definitions

Core Flink Types

/**
 * Abstract base class for rich source functions in Flink; it implements the
 * SourceFunction interface and is the class TwitterSource extends
 */
public abstract class RichSourceFunction<T> implements SourceFunction<T> {
    // Lifecycle methods implemented by TwitterSource
}

/**
 * Context for emitting data from sources
 * (nested in SourceFunction; imported as SourceFunction.SourceContext)
 */
public interface SourceContext<T> {
    void collect(T element);
    // Additional methods for checkpointing and timestamps
}

/**
 * Flink configuration object (org.apache.flink.configuration.Configuration)
 */
public class Configuration {
    // Configuration parameters passed to open() method
}

Twitter HBC Types

/**
 * Base interface for Twitter streaming endpoints from HBC library
 */
public interface StreamingEndpoint {
    // Twitter endpoint configuration methods
}

/**
 * Sample endpoint implementation (used by default)
 */
public class StatusesSampleEndpoint implements StreamingEndpoint {
    public StatusesSampleEndpoint stallWarnings(boolean stallWarnings);
    public StatusesSampleEndpoint delimited(boolean delimited);
}

/**
 * Filter endpoint for tracking specific terms
 */
public class StatusesFilterEndpoint implements StreamingEndpoint {
    public StatusesFilterEndpoint trackTerms(List<String> terms);
    // Additional filtering methods
}

Architecture Notes

  • Non-parallel Source: TwitterSource is not parallelizable due to Twitter API connection limitations
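  • Fault Tolerance: Works with Flink's checkpointing and restart strategies, but the source keeps no replayable position in the Twitter stream, so tweets delivered while the job is down or restarting are not re-emitted (the source itself does not provide exactly-once guarantees)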
  • Authentication: Uses OAuth1 protocol for secure Twitter API access
  • Rate Limiting: Handles Twitter API rate limits automatically through HBC library
  • Data Format: Emits raw tweet JSON strings for downstream processing
  • Dependencies: Built on Twitter's Hosebird Client (HBC) library version 2.2.0

Error Handling

The connector handles several types of errors:

  • Configuration Errors: IllegalArgumentException thrown during constructor if required properties (CONSUMER_KEY, CONSUMER_SECRET, TOKEN, TOKEN_SECRET) are missing
  • Endpoint Initialization Errors: NullPointerException thrown by setCustomEndpointInitializer() if initializer parameter is null
  • Connection and Runtime Errors: Handled by the underlying HBC library and Flink's fault tolerance mechanisms
  • Authentication Errors: OAuth authentication failures are handled by the HBC client during connection establishment
  • Stream Processing Errors: IOException and InterruptedException can be thrown during stream processing and are propagated to Flink's error handling

Declared Exceptions:

// Constructor exceptions
public TwitterSource(Properties properties) throws IllegalArgumentException

// Lifecycle method exceptions  
public void open(Configuration parameters) throws Exception
public void run(SourceContext<String> ctx) throws Exception

// Configuration method exceptions
public void setCustomEndpointInitializer(EndpointInitializer initializer) throws NullPointerException