Twitter feed receiver for Apache Spark Streaming that enables real-time consumption of Twitter data streams
Spark Streaming Twitter is an Apache Spark module that provides integration with Twitter's streaming API. It enables real-time consumption of Twitter data streams using the Twitter4J library, offering utilities to create input streams that receive live tweets for processing within Spark Streaming applications.

Maven:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-twitter_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

Scala:
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.StreamingContext
import twitter4j.Status
import twitter4j.auth.Authorization

Java:
import org.apache.spark.streaming.twitter.TwitterUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import twitter4j.Status;
import twitter4j.auth.Authorization;

Scala Example:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.SparkConf
// Create Spark configuration and streaming context
val conf = new SparkConf().setAppName("TwitterStreaming")
val ssc = new StreamingContext(conf, Seconds(2))
// Create Twitter stream (requires OAuth properties to be set)
val tweets = TwitterUtils.createStream(ssc, None)
// Process tweets
tweets.foreachRDD { rdd =>
  val tweetTexts = rdd.map(_.getText)
  tweetTexts.collect().foreach(println)
}
ssc.start()
ssc.awaitTermination()

Java Example:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.Status;
// Create Spark configuration and streaming context
SparkConf conf = new SparkConf().setAppName("TwitterStreaming");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
// Create Twitter stream (requires OAuth properties to be set)
JavaReceiverInputDStream<Status> tweets = TwitterUtils.createStream(jssc);
// Process tweets: print the text of every tweet in each batch
tweets.foreachRDD(rdd -> {
  rdd.collect().forEach(status -> System.out.println(status.getText()));
});
jssc.start();
jssc.awaitTermination();

Twitter API access requires OAuth credentials. Configure these as system properties:
twitter4j.oauth.consumerKey=YOUR_CONSUMER_KEY
twitter4j.oauth.consumerSecret=YOUR_CONSUMER_SECRET
twitter4j.oauth.accessToken=YOUR_ACCESS_TOKEN
twitter4j.oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET

Creates Twitter input streams with various configuration options for Scala applications.
object TwitterUtils {
  def createStream(
    ssc: StreamingContext,
    twitterAuth: Option[Authorization],
    filters: Seq[String] = Nil,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[Status]
}

Parameters:
ssc: StreamingContext object for creating the stream
twitterAuth: Optional Twitter4J authentication object (None uses system properties)
filters: Sequence of filter strings to get only matching tweets (default: empty for sample stream)
storageLevel: Storage level for received tweet objects (default: MEMORY_AND_DISK_SER_2)

Returns: ReceiverInputDStream[Status], a Spark Streaming DStream of Twitter Status objects
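
When twitterAuth is None, the stream relies on the twitter4j.oauth.* system properties being set before createStream is called. As a minimal sketch in plain Java (it needs neither Spark nor Twitter4J on the classpath), the properties can be populated programmatically. The class TwitterOAuthProperties and its apply method are hypothetical helpers, not part of either library, and the literal values in main are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Spark or Twitter4J): copies the four OAuth
// values into the twitter4j.oauth.* system properties.
class TwitterOAuthProperties {
    static final String PREFIX = "twitter4j.oauth.";

    static void apply(String consumerKey, String consumerSecret,
                      String accessToken, String accessTokenSecret) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("consumerKey", consumerKey);
        values.put("consumerSecret", consumerSecret);
        values.put("accessToken", accessToken);
        values.put("accessTokenSecret", accessTokenSecret);

        // Validate everything first so a bad value never leaves a partial configuration behind
        for (Map.Entry<String, String> entry : values.entrySet()) {
            if (entry.getValue() == null || entry.getValue().isEmpty()) {
                throw new IllegalArgumentException("Missing OAuth value: " + entry.getKey());
            }
        }
        values.forEach((name, value) -> System.setProperty(PREFIX + name, value));
    }

    public static void main(String[] args) {
        // Placeholder credentials; real values would come from configuration or a secrets store
        apply("YOUR_CONSUMER_KEY", "YOUR_CONSUMER_SECRET",
              "YOUR_ACCESS_TOKEN", "YOUR_ACCESS_TOKEN_SECRET");
        System.out.println(System.getProperty(PREFIX + "consumerKey"));
    }
}
```

The same four properties can instead be passed to the JVM as -D flags; the helper is only one way to set them before the stream is created.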
Usage Example:
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
// Basic stream with default authentication
val stream1 = TwitterUtils.createStream(ssc, None)
// Filtered stream with keyword filters
val stream2 = TwitterUtils.createStream(ssc, None, Seq("spark", "scala"))
// Stream with custom authentication
val config = new ConfigurationBuilder().build() // Twitter4J configuration holding the OAuth keys
val auth = new OAuthAuthorization(config)
val stream3 = TwitterUtils.createStream(ssc, Some(auth), Seq("tech"))
// Stream with custom storage level
val stream4 = TwitterUtils.createStream(ssc, None, Seq("news"), StorageLevel.MEMORY_ONLY)

Creates Twitter input streams with various configuration options for Java applications.
class TwitterUtils {
  // Basic stream creation
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc
  );
  // Stream with filters
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    String[] filters
  );
  // Stream with filters and storage level
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    String[] filters,
    StorageLevel storageLevel
  );
  // Stream with custom authentication
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    Authorization twitterAuth
  );
  // Stream with authentication and filters
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    Authorization twitterAuth,
    String[] filters
  );
  // Stream with full customization
  public static JavaReceiverInputDStream<Status> createStream(
    JavaStreamingContext jssc,
    Authorization twitterAuth,
    String[] filters,
    StorageLevel storageLevel
  );
}

Common Parameters:
jssc: JavaStreamingContext object for creating the stream
twitterAuth: Twitter4J Authorization object for API access
filters: Array of filter strings to get only matching tweets
storageLevel: Storage level for received tweet objects

Returns: JavaReceiverInputDStream<Status>, a Java Spark Streaming DStream of Twitter Status objects
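
Because the Java overloads take filters as a plain String[], the array is often built from command-line arguments. The sketch below assumes a hypothetical argument layout (four OAuth values followed by any number of keywords); the class TwitterArgs is illustrative only and not part of Spark or Twitter4J.

```java
import java.util.Arrays;

// Hypothetical helper: splits command-line arguments into OAuth credentials
// and keyword filters, assuming the credentials come first.
class TwitterArgs {
    // Order: consumerKey, consumerSecret, accessToken, accessTokenSecret
    final String[] credentials;
    // Remaining arguments, suitable for the String[] filters parameter
    final String[] filters;

    TwitterArgs(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                "Usage: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> [filter ...]");
        }
        credentials = Arrays.copyOfRange(args, 0, 4);
        filters = Arrays.copyOfRange(args, 4, args.length);
    }

    public static void main(String[] args) {
        TwitterArgs parsed = new TwitterArgs(
            new String[] {"ck", "cs", "at", "ats", "spark", "java"});
        System.out.println(Arrays.toString(parsed.filters)); // prints [spark, java]
    }
}
```

When no keywords are supplied, filters is an empty array, which matches the "empty for sample stream" default described for the Scala API.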
Usage Examples:
import org.apache.spark.storage.StorageLevel;
import twitter4j.auth.Authorization;
import twitter4j.auth.OAuthAuthorization;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;
// Basic stream with system property authentication
JavaReceiverInputDStream<Status> stream1 = TwitterUtils.createStream(jssc);
// Filtered stream
String[] filters = {"spark", "java"};
JavaReceiverInputDStream<Status> stream2 = TwitterUtils.createStream(jssc, filters);
// Stream with custom authentication
Configuration config = new ConfigurationBuilder().build(); // Twitter4J configuration holding the OAuth keys
Authorization auth = new OAuthAuthorization(config);
JavaReceiverInputDStream<Status> stream3 = TwitterUtils.createStream(jssc, auth);
// Full customization
JavaReceiverInputDStream<Status> stream4 = TwitterUtils.createStream(
  jssc, auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()
);

// From Twitter4J library
interface Status {
  def getText(): String
  def getUser(): User
  def getCreatedAt(): java.util.Date
  def getId(): Long
  def getRetweetCount(): Int
  def getFavoriteCount(): Int
  // ... many more methods for accessing tweet data
}
interface Authorization {
  // Twitter4J authentication interface
}
class OAuthAuthorization implements Authorization {
  // OAuth implementation for Twitter API access
}
// From Spark Streaming
class StreamingContext
class JavaStreamingContext
abstract class ReceiverInputDStream[T] extends InputDStream[T]
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]
// From Spark Core
class StorageLevel {
  // Predefined storage levels
  MEMORY_ONLY: StorageLevel
  MEMORY_AND_DISK: StorageLevel
  MEMORY_AND_DISK_SER: StorageLevel
  MEMORY_AND_DISK_SER_2: StorageLevel // Default for Twitter streams
  // ... more storage level options
}

All streams return Twitter4J Status objects, which expose the tweet text, user, creation time, ID, and retweet and favorite counts through the accessors listed above.
MEMORY_AND_DISK_SER_2 (the default) stores received tweets in serialized form and replicates them to a second node, which provides fault tolerance.
MEMORY_ONLY skips serialization and replication for better performance, at the cost of that fault tolerance.
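
Once each Status has been reduced to its text with getText(), further processing is ordinary string handling. As a rough, Spark-free sketch, the following counts hashtags across a batch of tweet texts; the class HashtagCounter and the whitespace-based tokenization are assumptions made for illustration, and the input strings stand in for getText() results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: counts hashtags in plain tweet texts.
class HashtagCounter {
    // Returns the lower-cased hashtags found in one tweet text, split on whitespace
    static List<String> hashtags(String text) {
        List<String> tags = new ArrayList<>();
        for (String word : text.split("\\s+")) {
            if (word.startsWith("#") && word.length() > 1) {
                tags.add(word.toLowerCase(Locale.ROOT));
            }
        }
        return tags;
    }

    // Aggregates hashtag counts over a batch of texts, sorted by tag name
    static Map<String, Integer> count(List<String> texts) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String text : texts) {
            for (String tag : hashtags(text)) {
                counts.merge(tag, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> texts = Arrays.asList("Learning #Spark today", "#spark and #Scala");
        System.out.println(count(texts)); // prints {#scala=1, #spark=2}
    }
}
```

Inside a streaming job the same per-text logic would typically run in a map or flatMap over the DStream rather than on a collected list.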