Twitter feed receiver for Apache Spark Streaming that enables real-time consumption of Twitter data streams
```sh
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-twitter-2-10@1.6.0
```

Spark Streaming Twitter is an Apache Spark module that provides integration with Twitter's streaming API. It enables real-time consumption of Twitter data streams using the Twitter4J library, offering utilities to create input streams that receive live tweets for processing within Spark Streaming applications.
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-twitter_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```

Scala:
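```scala
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.storage.StorageLevel
import twitter4j.Status
import twitter4j.auth.Authorization
```

Java: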
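```java
import org.apache.spark.streaming.twitter.TwitterUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.storage.StorageLevel;
import twitter4j.Status;
import twitter4j.auth.Authorization;
```

Scala Example: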
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TwitterStreamingApp {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration and a streaming context with 2-second batches
    val conf = new SparkConf().setAppName("TwitterStreaming")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Create the Twitter stream; None means the OAuth credentials are read
    // from the twitter4j.oauth.* system properties
    val tweets = TwitterUtils.createStream(ssc, None)

    // Print the text of every tweet in each batch (collect() brings the batch to the driver)
    tweets.foreachRDD { rdd =>
      val tweetTexts = rdd.map(_.getText)
      tweetTexts.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Java Example:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.Status;

public class TwitterStreamingApp {
    public static void main(String[] args) throws Exception {
        // Create the Spark configuration and a streaming context with 2-second batches
        SparkConf conf = new SparkConf().setAppName("TwitterStreaming");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

        // Create the Twitter stream; OAuth credentials come from the twitter4j.oauth.* system properties
        JavaReceiverInputDStream<Status> tweets = TwitterUtils.createStream(jssc);

        // Print the text of every tweet in each batch (lambdas require Java 8)
        tweets.foreachRDD(rdd -> {
            rdd.collect().forEach(status -> System.out.println(status.getText()));
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
```

Twitter API access requires OAuth credentials. Configure these as system properties:
```properties
twitter4j.oauth.consumerKey=YOUR_CONSUMER_KEY
twitter4j.oauth.consumerSecret=YOUR_CONSUMER_SECRET
twitter4j.oauth.accessToken=YOUR_ACCESS_TOKEN
twitter4j.oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
```

Creates Twitter input streams with various configuration options for Scala applications.
```scala
object TwitterUtils {
  def createStream(
      ssc: StreamingContext,
      twitterAuth: Option[Authorization],
      filters: Seq[String] = Nil,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[Status]
}
```

Parameters:
- `ssc`: StreamingContext used to create the stream
- `twitterAuth`: Optional Twitter4J authorization object; `None` uses the `twitter4j.oauth.*` system properties
- `filters`: Sequence of filter strings; only matching tweets are received (default: empty, which uses the sample stream)
- `storageLevel`: Storage level for received tweet objects (default: `MEMORY_AND_DISK_SER_2`)

Returns: `ReceiverInputDStream[Status]`, a Spark Streaming DStream of Twitter `Status` objects.
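With `twitterAuth = None`, Twitter4J reads its credentials from the `twitter4j.oauth.*` system properties listed above, so one option is to set them in the driver JVM before calling `TwitterUtils.createStream(ssc, None)`. The sketch below is a minimal illustration of that approach. It assumes the caller already has the four credential strings. The object `TwitterOAuthSetup` and its method `setTwitterOAuthProperties` are hypothetical helpers, not part of Spark or Twitter4J.

```scala
object TwitterOAuthSetup {
  // The four Twitter4J property keys documented above, in a fixed order.
  val OAuthKeys: Seq[String] = Seq(
    "twitter4j.oauth.consumerKey",
    "twitter4j.oauth.consumerSecret",
    "twitter4j.oauth.accessToken",
    "twitter4j.oauth.accessTokenSecret"
  )

  // Hypothetical helper: sets the Twitter4J OAuth system properties in the current JVM.
  // Intended to run on the driver before TwitterUtils.createStream(ssc, None).
  def setTwitterOAuthProperties(
      consumerKey: String,
      consumerSecret: String,
      accessToken: String,
      accessTokenSecret: String): Unit = {
    val values = Seq(consumerKey, consumerSecret, accessToken, accessTokenSecret)
    // Reject missing or blank values up front rather than discovering them once the stream starts.
    require(values.forall(v => v != null && v.trim.nonEmpty),
      "all four OAuth values must be non-empty")
    // Pair each property key with its value and store it as a JVM system property.
    OAuthKeys.zip(values).foreach { case (key, value) => System.setProperty(key, value) }
  }
}
```

In an application, the values would typically come from a secure source such as environment variables of your choosing, followed by `TwitterUtils.createStream(ssc, None)`.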
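Usage Example:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

// Basic stream with default authentication (ssc is an existing StreamingContext)
val stream1 = TwitterUtils.createStream(ssc, None)
// Filtered stream with keyword filters
val stream2 = TwitterUtils.createStream(ssc, None, Seq("spark", "scala"))
// Stream with custom authentication built from a Twitter4J configuration
val config = new ConfigurationBuilder()
  .setOAuthConsumerKey("YOUR_CONSUMER_KEY").setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
  .setOAuthAccessToken("YOUR_ACCESS_TOKEN").setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET")
  .build()
val auth = new OAuthAuthorization(config)
val stream3 = TwitterUtils.createStream(ssc, Some(auth), Seq("tech"))
// Stream with a custom storage level
val stream4 = TwitterUtils.createStream(ssc, None, Seq("news"), StorageLevel.MEMORY_ONLY)
```

Creates Twitter input streams with various configuration options for Java applications.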
```java
class TwitterUtils {
    // Basic stream creation
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc
    );

    // Stream with filters
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        String[] filters
    );

    // Stream with filters and storage level
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        String[] filters,
        StorageLevel storageLevel
    );

    // Stream with custom authentication
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        Authorization twitterAuth
    );

    // Stream with authentication and filters
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        Authorization twitterAuth,
        String[] filters
    );

    // Stream with full customization
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        Authorization twitterAuth,
        String[] filters,
        StorageLevel storageLevel
    );
}
```

Common Parameters:
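- `jssc`: JavaStreamingContext used to create the stream
- `twitterAuth`: Twitter4J `Authorization` object for API access (overloads without it use the `twitter4j.oauth.*` system properties)
- `filters`: Array of filter strings; only matching tweets are received (overloads without it use the sample stream)
- `storageLevel`: Storage level for received tweet objects (overloads without it use `MEMORY_AND_DISK_SER_2`)

Returns: `JavaReceiverInputDStream<Status>`, a Java Spark Streaming DStream of Twitter `Status` objects.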
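The `filters` argument narrows the stream on Twitter's side, and Twitter decides what counts as a match. If an application also wants its own check on the tweet text inside Spark, it can filter the resulting DStream. The sketch below is a minimal illustration of such a check, shown in Scala. `KeywordMatch.mentionsAny` is a hypothetical helper, not part of Spark or Twitter4J. It does plain case-insensitive substring matching and does not reimplement Twitter's own filtering rules.

```scala
import java.util.Locale

object KeywordMatch {
  // Hypothetical helper: true if `text` contains any of `keywords`, ignoring case.
  // Blank keywords are skipped so they cannot match every tweet.
  def mentionsAny(text: String, keywords: Seq[String]): Boolean = {
    if (text == null) false
    else {
      val lower = text.toLowerCase(Locale.ROOT)
      keywords.iterator
        .map(_.trim.toLowerCase(Locale.ROOT))
        .filter(_.nonEmpty)
        .exists(k => lower.contains(k))
    }
  }
}
```

Applied to a Scala stream, this could be used as `tweets.filter(status => KeywordMatch.mentionsAny(status.getText, Seq("spark", "scala")))`. A Java version would pass an equivalent `Function<Status, Boolean>` to `filter`.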
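Usage Examples:

```java
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.Status;
import twitter4j.auth.Authorization;
import twitter4j.auth.OAuthAuthorization;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

// Basic stream with system property authentication (jssc is an existing JavaStreamingContext)
JavaReceiverInputDStream<Status> stream1 = TwitterUtils.createStream(jssc);
// Filtered stream
String[] filters = {"spark", "java"};
JavaReceiverInputDStream<Status> stream2 = TwitterUtils.createStream(jssc, filters);
// Stream with custom authentication built from a Twitter4J configuration
Configuration config = new ConfigurationBuilder()
    .setOAuthConsumerKey("YOUR_CONSUMER_KEY").setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
    .setOAuthAccessToken("YOUR_ACCESS_TOKEN").setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET")
    .build();
Authorization auth = new OAuthAuthorization(config);
JavaReceiverInputDStream<Status> stream3 = TwitterUtils.createStream(jssc, auth);
// Full customization
JavaReceiverInputDStream<Status> stream4 = TwitterUtils.createStream(
    jssc, auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
```

```java
// From the Twitter4J library
public interface Status {
    String getText();
    User getUser();
    java.util.Date getCreatedAt();
    long getId();
    int getRetweetCount();
    int getFavoriteCount();
    // ... many more methods for accessing tweet data
}

public interface Authorization {
    // Twitter4J authentication interface
}

public class OAuthAuthorization implements Authorization {
    // OAuth implementation for Twitter API access
}
```

```scala
// From Spark Streaming
class StreamingContext
class JavaStreamingContext
abstract class ReceiverInputDStream[T] extends InputDStream[T]
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]

// From Spark Core (org.apache.spark.storage)
object StorageLevel {
  // Predefined storage levels
  val MEMORY_ONLY: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel // Default for Twitter streams
  // ... more storage level options
}
```

All streams return Twitter4J `Status` objects containing the tweet data exposed by the accessors above: text, user, creation time, ID, retweet and favorite counts, and more.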
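- `MEMORY_AND_DISK_SER_2` (the default) stores received tweets in serialized form, spills to disk when memory runs short, and replicates each block to two nodes for fault tolerance.
- `MEMORY_ONLY` can be used for better performance, at the cost of disk fallback and replication.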