tessl/maven-org-apache-spark--spark-streaming-twitter-2-10

Twitter feed receiver for Apache Spark Streaming that enables real-time consumption of Twitter data streams

Spark Streaming Twitter

Spark Streaming Twitter is an Apache Spark module that provides integration with Twitter's streaming API. It enables real-time consumption of Twitter data streams using the Twitter4J library, offering utilities to create input streams that receive live tweets for processing within Spark Streaming applications.

Package Information

  • Package Name: spark-streaming-twitter_2.10
  • Package Type: Maven
  • Language: Scala (with Java API support)
  • Group ID: org.apache.spark
  • Artifact ID: spark-streaming-twitter_2.10
  • Version: 1.6.3
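  • Installation: Add the Maven dependency below. The module is an external connector and is not bundled in the core Spark assembly, so package it with your application JAR or pass it to spark-submit with --packages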
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-twitter_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

Core Imports

Scala:

import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.StreamingContext
import twitter4j.Status
import twitter4j.auth.Authorization

Java:

import org.apache.spark.streaming.twitter.TwitterUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import twitter4j.Status;
import twitter4j.auth.Authorization;

Basic Usage

Scala Example:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.SparkConf

// Create Spark configuration and streaming context
val conf = new SparkConf().setAppName("TwitterStreaming")
val ssc = new StreamingContext(conf, Seconds(2))

// Create Twitter stream (requires OAuth properties to be set)
val tweets = TwitterUtils.createStream(ssc, None)

// Process tweets
tweets.foreachRDD { rdd =>
  val tweetTexts = rdd.map(_.getText)
  tweetTexts.collect().foreach(println)
}

ssc.start()
ssc.awaitTermination()

Java Example:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.Status;

// Create Spark configuration and streaming context
SparkConf conf = new SparkConf().setAppName("TwitterStreaming");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

// Create Twitter stream
JavaReceiverInputDStream<Status> tweets = TwitterUtils.createStream(jssc);

// Process tweets
tweets.foreachRDD(rdd -> {
    rdd.collect().forEach(status -> System.out.println(status.getText()));
});

jssc.start();
jssc.awaitTermination();

Authentication

Twitter API access requires OAuth credentials. Configure these as system properties:

twitter4j.oauth.consumerKey=YOUR_CONSUMER_KEY
twitter4j.oauth.consumerSecret=YOUR_CONSUMER_SECRET
twitter4j.oauth.accessToken=YOUR_ACCESS_TOKEN
twitter4j.oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
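
When `-D` flags are inconvenient, the same properties can be set programmatically. The following is a minimal sketch using only the Java standard library. The class name `TwitterCredentials` and the helper `apply` are hypothetical and not part of Spark or Twitter4J; only the `twitter4j.oauth.*` property names come from the list above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Spark or Twitter4J): copies OAuth
// credentials into the JVM system properties listed above.
final class TwitterCredentials {
    static final String PREFIX = "twitter4j.oauth.";

    // Sets twitter4j.oauth.<name> for each entry and throws on the
    // first null or empty value it encounters.
    static void apply(Map<String, String> credentials) {
        for (Map.Entry<String, String> entry : credentials.entrySet()) {
            String value = entry.getValue();
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException("Missing credential: " + entry.getKey());
            }
            System.setProperty(PREFIX + entry.getKey(), value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> credentials = new LinkedHashMap<>();
        credentials.put("consumerKey", "YOUR_CONSUMER_KEY");
        credentials.put("consumerSecret", "YOUR_CONSUMER_SECRET");
        credentials.put("accessToken", "YOUR_ACCESS_TOKEN");
        credentials.put("accessTokenSecret", "YOUR_ACCESS_TOKEN_SECRET");
        apply(credentials);
        System.out.println(System.getProperty("twitter4j.oauth.consumerKey"));
    }
}
```

A helper like this would be called in the driver program before the stream is created, so that the properties are in place when the default authorization is built.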

Capabilities

Twitter Stream Creation - Scala API

Creates Twitter input streams with various configuration options for Scala applications.

object TwitterUtils {
  def createStream(
    ssc: StreamingContext,
    twitterAuth: Option[Authorization],
    filters: Seq[String] = Nil,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[Status]
}

Parameters:

  • ssc: StreamingContext object for creating the stream
  • twitterAuth: Optional Twitter4J authentication object (None uses system properties)
  • filters: Sequence of filter strings to get only matching tweets (default: empty for sample stream)
  • storageLevel: Storage level for received tweet objects (default: MEMORY_AND_DISK_SER_2)

Returns: ReceiverInputDStream[Status] - A Spark streaming DStream of Twitter Status objects

Usage Example:

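import org.apache.spark.storage.StorageLevel
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

// Basic stream with default authentication (credentials read from twitter4j system properties)
val stream1 = TwitterUtils.createStream(ssc, None)

// Filtered stream with keyword filters
val stream2 = TwitterUtils.createStream(ssc, None, Seq("spark", "scala"))

// Stream with custom authentication built from explicit credentials
val config = new ConfigurationBuilder()
  .setOAuthConsumerKey("YOUR_CONSUMER_KEY")
  .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
  .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
  .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET")
  .build()
val auth = new OAuthAuthorization(config)
val stream3 = TwitterUtils.createStream(ssc, Some(auth), Seq("tech"))

// Stream with custom storage level
val stream4 = TwitterUtils.createStream(ssc, None, Seq("news"), StorageLevel.MEMORY_ONLY)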

Twitter Stream Creation - Java API

Creates Twitter input streams with various configuration options for Java applications.

class TwitterUtils {
  // Basic stream creation
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc
  );
  
  // Stream with filters
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    String[] filters
  );
  
  // Stream with filters and storage level 
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    String[] filters,
    StorageLevel storageLevel
  );
  
  // Stream with custom authentication
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    Authorization twitterAuth
  );
  
  // Stream with authentication and filters
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    Authorization twitterAuth,
    String[] filters
  );
  
  // Stream with full customization
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    Authorization twitterAuth,
    String[] filters,
    StorageLevel storageLevel
  );
}

Common Parameters:

  • jssc: JavaStreamingContext object for creating the stream
  • twitterAuth: Twitter4J Authorization object for API access
  • filters: Array of filter strings to get only matching tweets
  • storageLevel: Storage level for received tweet objects

Returns: JavaReceiverInputDStream<Status> - A Java Spark streaming DStream of Twitter Status objects

Usage Examples:

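import org.apache.spark.storage.StorageLevel;
import twitter4j.auth.Authorization;
import twitter4j.auth.OAuthAuthorization;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

// Basic stream with system property authentication
JavaReceiverInputDStream<Status> stream1 = TwitterUtils.createStream(jssc);

// Filtered stream
String[] filters = {"spark", "java"};
JavaReceiverInputDStream<Status> stream2 = TwitterUtils.createStream(jssc, filters);

// Stream with custom authentication built from explicit credentials
Configuration config = new ConfigurationBuilder()
  .setOAuthConsumerKey("YOUR_CONSUMER_KEY")
  .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
  .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
  .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET")
  .build();
Authorization auth = new OAuthAuthorization(config);
JavaReceiverInputDStream<Status> stream3 = TwitterUtils.createStream(jssc, auth);

// Full customization
JavaReceiverInputDStream<Status> stream4 = TwitterUtils.createStream(
  jssc, auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()
);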

Types

Core Types

// From Twitter4J library
class Status {
  def getText(): String
  def getUser(): User
  def getCreatedAt(): java.util.Date
  def getId(): Long
  def getRetweetCount(): Int
  def getFavoriteCount(): Int
  // ... many more methods for accessing tweet data
}

interface Authorization {
  // Twitter4J authentication interface
}

class OAuthAuthorization implements Authorization {
  // OAuth implementation for Twitter API access
}

// From Spark Streaming
class StreamingContext
class JavaStreamingContext

abstract class ReceiverInputDStream[T] extends InputDStream[T]
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]

// From Spark Core
class StorageLevel {
  // Predefined storage levels
  MEMORY_ONLY: StorageLevel
  MEMORY_AND_DISK: StorageLevel  
  MEMORY_AND_DISK_SER: StorageLevel
  MEMORY_AND_DISK_SER_2: StorageLevel  // Default for Twitter streams
  // ... more storage level options
}

Stream Behavior

Filtering vs Sampling

  • Filtered streams: When one or more filters are provided, the receiver uses Twitter's filter API and receives only tweets that match the specified keywords
  • Sample streams: When no filters are provided (an empty sequence or array), the receiver uses Twitter's sample API and receives a small random sample of all public tweets
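
The choice between the two modes depends only on whether the filter list is empty. The sketch below models that decision with plain Java. `StreamModeSketch`, `Mode`, and `modeFor` are hypothetical names used for illustration; this is not the receiver's actual code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative model only; not the Spark or Twitter4J API.
final class StreamModeSketch {
    enum Mode { FILTER, SAMPLE }

    // A non-empty filter list selects the filter mode;
    // a null or empty list falls back to the sample mode.
    static Mode modeFor(List<String> filters) {
        return (filters == null || filters.isEmpty()) ? Mode.SAMPLE : Mode.FILTER;
    }

    public static void main(String[] args) {
        System.out.println(modeFor(Arrays.asList("spark", "scala"))); // prints FILTER
        System.out.println(modeFor(Collections.<String>emptyList())); // prints SAMPLE
    }
}
```

In practice this means that passing an empty filter array to `createStream` behaves like the overloads that take no filters at all.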

Error Handling

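  • The receiver restarts itself when the Twitter4J listener reports an exception or the stream fails to start
  • Restarts go through Spark's standard receiver restart mechanism, which waits for a configurable delay (spark.streaming.receiverRestartDelay) before reconnecting
  • Tweets the receiver has already stored remain available after a restart; tweets published while the connection is down are not replayed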

Data Format

All streams return Twitter4J Status objects containing:

  • Tweet text, user information, timestamps
  • Engagement metrics (retweets, likes)
  • Metadata (language, source, geo-location if available)
  • Reply and mention relationships

Performance Considerations

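  • The default storage level, MEMORY_AND_DISK_SER_2, keeps received tweets in serialized form, spills them to disk when memory runs short, and replicates each block to a second node
  • The replication factor of 2 (the _2 suffix) keeps received data available if a single node fails
  • For high-volume streams, MEMORY_ONLY avoids serialization and replication overhead, at the cost of losing received blocks when a node fails
  • Keep per-batch processing time below the batch interval; otherwise batches queue up and scheduling delay grows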
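
To make the trade-off concrete, the sketch below models the flags behind the two storage levels discussed above. `StorageLevelSketch` is a hypothetical stand-in written in plain Java, not Spark's `StorageLevel` class; the flag values follow Spark's definitions of `MEMORY_ONLY` and `MEMORY_AND_DISK_SER_2`.

```java
// Illustrative model only; use org.apache.spark.storage.StorageLevel in real code.
final class StorageLevelSketch {
    final boolean useDisk;      // may blocks spill to disk?
    final boolean useMemory;    // are blocks kept in memory?
    final boolean deserialized; // stored as objects (true) or bytes (false)
    final int replication;      // number of nodes holding each block

    StorageLevelSketch(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
        this.useDisk = useDisk;
        this.useMemory = useMemory;
        this.deserialized = deserialized;
        this.replication = replication;
    }

    static final StorageLevelSketch MEMORY_ONLY =
        new StorageLevelSketch(false, true, true, 1);
    static final StorageLevelSketch MEMORY_AND_DISK_SER_2 =
        new StorageLevelSketch(true, true, false, 2);

    // A block outlives the loss of one node only if another copy exists.
    boolean survivesSingleNodeLoss() {
        return replication > 1;
    }

    public static void main(String[] args) {
        System.out.println(MEMORY_ONLY.survivesSingleNodeLoss());           // prints false
        System.out.println(MEMORY_AND_DISK_SER_2.survivesSingleNodeLoss()); // prints true
    }
}
```

Switching a receiver to `MEMORY_ONLY` therefore trades durability of received tweets for lower per-block overhead, which may or may not be acceptable depending on the application.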

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-twitter-2-10

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-streaming-twitter_2.10@1.6.x