# Spark Streaming Twitter

Spark Streaming Twitter is an Apache Spark module that integrates Spark Streaming with Twitter's streaming API. It uses the Twitter4J library to open the connection and provides `TwitterUtils.createStream`, which creates a receiver-based input stream that delivers live tweets as Twitter4J `Status` objects for processing in Spark Streaming applications.
## Package Information

- **Package Name**: spark-streaming-twitter_2.10
- **Package Type**: Maven
- **Language**: Scala (with Java API support)
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-streaming-twitter_2.10
- **Version**: 1.6.3
- **Installation**: Add the Maven dependency below. The module is not part of the Spark assembly, so either bundle it (with its Twitter4J dependencies) into your application JAR or pass it to `spark-submit` with `--packages org.apache.spark:spark-streaming-twitter_2.10:1.6.3`.

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-twitter_2.10</artifactId>
  <version>1.6.3</version>
</dependency>
```
## Core Imports

**Scala:**

```scala
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.StreamingContext
import twitter4j.Status
import twitter4j.auth.Authorization
```

**Java:**

```java
import org.apache.spark.streaming.twitter.TwitterUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import twitter4j.Status;
import twitter4j.auth.Authorization;
```
## Basic Usage

**Scala Example:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TwitterStreaming {
  def main(args: Array[String]): Unit = {
    // Spark configuration and a streaming context with 2-second batches
    val conf = new SparkConf().setAppName("TwitterStreaming")
    val ssc = new StreamingContext(conf, Seconds(2))

    // None: OAuth credentials are read from the twitter4j.oauth.* system properties
    val tweets = TwitterUtils.createStream(ssc, None)

    // Print each tweet's text on the driver; collect() pulls the whole batch to the
    // driver, which is fine for a demo but not for high-volume streams
    tweets.foreachRDD { rdd =>
      rdd.map(_.getText).collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```
**Java Example:**

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.Status;

public class TwitterStreaming {
  public static void main(String[] args) throws InterruptedException {
    // Spark configuration and a streaming context with 2-second batches
    SparkConf conf = new SparkConf().setAppName("TwitterStreaming");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

    // OAuth credentials are read from the twitter4j.oauth.* system properties
    JavaReceiverInputDStream<Status> tweets = TwitterUtils.createStream(jssc);

    // Print each tweet's text on the driver (Java 8 lambda); collect() pulls the
    // whole batch to the driver, which is fine for a demo only
    tweets.foreachRDD(rdd -> {
      rdd.collect().forEach(status -> System.out.println(status.getText()));
    });

    jssc.start();
    jssc.awaitTermination();
  }
}
```
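Per-tweet logic is often easiest to write and unit-test as a plain Java method that the stream transformations then call. The sketch below is a hypothetical helper (`TweetText` is not part of the library) that extracts hashtags from the text returned by `Status.getText()` by splitting on whitespace, similar in spirit to Spark's own TwitterPopularTags example. It is deliberately simple: punctuation stays attached to a tag (`#spark,`), and Twitter4J's `Status.getHashtagEntities()` is the more accurate source when the `Status` object is at hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper (not part of spark-streaming-twitter): extracts hashtags from tweet text.
final class TweetText {
  private TweetText() {}

  // Splits on runs of whitespace and keeps tokens that start with '#' and have at least one
  // more character. Tags are lower-cased so "#Spark" and "#spark" count as the same tag.
  static List<String> hashtags(String text) {
    List<String> tags = new ArrayList<>();
    if (text == null) {
      return tags;
    }
    for (String token : text.trim().split("\\s+")) {
      if (token.length() > 1 && token.charAt(0) == '#') {
        tags.add(token.toLowerCase(Locale.ROOT));
      }
    }
    return tags;
  }
}
```

Assuming Spark 1.6, where a Java `FlatMapFunction` returns an `Iterable`, it could be wired in as `tweets.flatMap(status -> TweetText.hashtags(status.getText()))`.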
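## Authentication

Twitter API access requires OAuth credentials. When no `Authorization` object is passed to `createStream`, the stream builds one from Twitter4J's default configuration. Any configuration method supported by Twitter4J works; the most common is to set the following JVM system properties. Set them in the driver (for example with `-D` options or `System.setProperty`) before calling `createStream`, because the authorization is created when the stream is defined:

```properties
twitter4j.oauth.consumerKey=YOUR_CONSUMER_KEY
twitter4j.oauth.consumerSecret=YOUR_CONSUMER_SECRET
twitter4j.oauth.accessToken=YOUR_ACCESS_TOKEN
twitter4j.oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
```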
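A common pattern is to copy credentials from the environment into these system properties at driver start-up and fail fast if one is missing, rather than discovering the problem when the receiver cannot connect. The sketch below assumes hypothetical environment variable names (`TWITTER_CONSUMER_KEY` and so on) and a hypothetical `TwitterCredentials` class; only the `twitter4j.oauth.*` property names come from the list above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical driver-side helper (not part of spark-streaming-twitter): copies OAuth
// credentials into the twitter4j.oauth.* system properties read by Twitter4J.
final class TwitterCredentials {
  private TwitterCredentials() {}

  // Twitter4J property suffix -> assumed (hypothetical) environment variable name.
  private static final Map<String, String> ENV_NAMES = new LinkedHashMap<>();

  static {
    ENV_NAMES.put("consumerKey", "TWITTER_CONSUMER_KEY");
    ENV_NAMES.put("consumerSecret", "TWITTER_CONSUMER_SECRET");
    ENV_NAMES.put("accessToken", "TWITTER_ACCESS_TOKEN");
    ENV_NAMES.put("accessTokenSecret", "TWITTER_ACCESS_TOKEN_SECRET");
  }

  // Reads each credential from `source` (for example System.getenv()). Every value is
  // checked first, so a missing one throws before any property has been set.
  static void apply(Map<String, String> source) {
    for (String envName : ENV_NAMES.values()) {
      String value = source.get(envName);
      if (value == null || value.trim().isEmpty()) {
        throw new IllegalStateException("Missing Twitter credential: " + envName);
      }
    }
    for (Map.Entry<String, String> entry : ENV_NAMES.entrySet()) {
      System.setProperty("twitter4j.oauth." + entry.getKey(), source.get(entry.getValue()));
    }
  }
}
```

On the driver, calling `TwitterCredentials.apply(System.getenv());` before `TwitterUtils.createStream(jssc)` makes a missing secret fail at start-up. Validating every value before setting any of them avoids leaving the JVM half-configured.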
## Capabilities

### Twitter Stream Creation - Scala API

Creates Twitter input streams for Scala applications. A single method with default arguments covers every configuration.

```scala { .api }
object TwitterUtils {
  def createStream(
      ssc: StreamingContext,
      twitterAuth: Option[Authorization],
      filters: Seq[String] = Nil,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[Status]
}
```

**Parameters:**
- `ssc`: `StreamingContext` in which the stream is created
- `twitterAuth`: Twitter4J authorization to use, or `None` to build an `OAuthAuthorization` from Twitter4J's default configuration (the `twitter4j.oauth.*` system properties)
- `filters`: Keywords to track; only matching tweets are received. When empty (the default), the stream uses Twitter's public sample instead
- `storageLevel`: Storage level for received tweets (default: `MEMORY_AND_DISK_SER_2`)

**Returns:** `ReceiverInputDStream[Status]`, a DStream of Twitter4J `Status` objects

**Usage Example:**
```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

// Assumes an existing StreamingContext named ssc
// Sample stream, credentials from system properties
val stream1 = TwitterUtils.createStream(ssc, None)

// Filtered stream tracking two keywords
val stream2 = TwitterUtils.createStream(ssc, None, Seq("spark", "scala"))

// Explicit OAuth credentials instead of system properties
val config = new ConfigurationBuilder()
  .setOAuthConsumerKey("YOUR_CONSUMER_KEY")
  .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
  .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
  .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET")
  .build()
val auth = new OAuthAuthorization(config)
val stream3 = TwitterUtils.createStream(ssc, Some(auth), Seq("tech"))

// Custom storage level (see Performance Considerations for the trade-off)
val stream4 = TwitterUtils.createStream(ssc, None, Seq("news"), StorageLevel.MEMORY_ONLY)
```
### Twitter Stream Creation - Java API

Creates Twitter input streams for Java applications through a set of overloads. The methods are defined on the Scala `TwitterUtils` object and are called from Java as static methods.

```java { .api }
class TwitterUtils {
    // Sample stream, system-property authentication, default storage level
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc
    );

    // Stream with filters
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        String[] filters
    );

    // Stream with filters and storage level
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        String[] filters,
        StorageLevel storageLevel
    );

    // Stream with custom authentication
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        Authorization twitterAuth
    );

    // Stream with authentication and filters
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        Authorization twitterAuth,
        String[] filters
    );

    // Stream with full customization
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        Authorization twitterAuth,
        String[] filters,
        StorageLevel storageLevel
    );
}
```
**Common Parameters:**
- `jssc`: `JavaStreamingContext` in which the stream is created
- `twitterAuth`: Twitter4J `Authorization` for API access; overloads without it read credentials from the `twitter4j.oauth.*` system properties
- `filters`: Array of keywords to track; only matching tweets are received. An empty array behaves like omitting filters and yields the sample stream
- `storageLevel`: Storage level for received tweets; overloads without it use `MEMORY_AND_DISK_SER_2`

**Returns:** `JavaReceiverInputDStream<Status>`, a Java DStream of Twitter4J `Status` objects

**Usage Examples:**
```java
// In addition to the Core Imports
import org.apache.spark.storage.StorageLevel;
import twitter4j.auth.OAuthAuthorization;
import twitter4j.conf.ConfigurationBuilder;

// Sample stream with system-property authentication (assumes an existing jssc)
JavaReceiverInputDStream<Status> stream1 = TwitterUtils.createStream(jssc);

// Filtered stream
String[] filters = {"spark", "java"};
JavaReceiverInputDStream<Status> stream2 = TwitterUtils.createStream(jssc, filters);

// Explicit OAuth credentials instead of system properties
Authorization auth = new OAuthAuthorization(new ConfigurationBuilder()
    .setOAuthConsumerKey("YOUR_CONSUMER_KEY")
    .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
    .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
    .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET")
    .build());
JavaReceiverInputDStream<Status> stream3 = TwitterUtils.createStream(jssc, auth);

// Full customization
JavaReceiverInputDStream<Status> stream4 = TwitterUtils.createStream(
    jssc, auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
```
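Because an empty filter array silently switches the stream to Twitter's sample endpoint, it can be worth normalizing user-supplied keywords before the call and failing loudly if nothing usable remains. A minimal sketch: the `FilterKeywords` class is hypothetical, and its policy (trim, drop blanks, treat keywords differing only in case as duplicates) is an assumption for illustration, not library behaviour.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper (not part of spark-streaming-twitter) that prepares the String[]
// passed to TwitterUtils.createStream(jssc, filters).
final class FilterKeywords {
  private FilterKeywords() {}

  // Trims each keyword, drops null or blank entries, and removes case-insensitive duplicates
  // while keeping the first spelling seen. Throws instead of returning an empty array,
  // because an empty array would make createStream fall back to the sample stream.
  static String[] normalize(List<String> raw) {
    Set<String> seen = new HashSet<>();
    List<String> result = new ArrayList<>();
    for (String keyword : raw) {
      if (keyword == null) {
        continue;
      }
      String trimmed = keyword.trim();
      if (!trimmed.isEmpty() && seen.add(trimmed.toLowerCase(Locale.ROOT))) {
        result.add(trimmed);
      }
    }
    if (result.isEmpty()) {
      throw new IllegalArgumentException("No usable filter keywords in " + raw);
    }
    return result.toArray(new String[0]);
  }
}
```

Usage would look like `TwitterUtils.createStream(jssc, FilterKeywords.normalize(keywords))`, so an accidentally empty keyword list stops the driver instead of quietly producing a sample stream.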
## Types

### Core Types

```scala { .api }
// From Twitter4J (Java interfaces and classes, shown in Scala notation)
trait Status {
  def getText(): String
  def getUser(): User
  def getCreatedAt(): java.util.Date
  def getId(): Long
  def getRetweetCount(): Int
  def getFavoriteCount(): Int
  // ... many more methods for accessing tweet data
}

trait Authorization extends java.io.Serializable {
  // Twitter4J authentication interface; serializable so it can be shipped to the receiver
}

class OAuthAuthorization(conf: twitter4j.conf.Configuration) extends Authorization {
  // OAuth implementation for Twitter API access
}

// From Spark Streaming
class StreamingContext
class JavaStreamingContext

abstract class ReceiverInputDStream[T] extends InputDStream[T]
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]

// From Spark Core
object StorageLevel {
  // Predefined storage levels (from Java, call them as methods, e.g. StorageLevel.MEMORY_ONLY())
  val MEMORY_ONLY: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel // Default for Twitter streams
  // ... more storage level options
}
```
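## Stream Behavior

### Filtering vs Sampling
- **Filtered streams**: When filters are provided, the receiver uses Twitter's filter endpoint and passes the strings as keywords to track. A tweet is delivered if it matches any of them; following Twitter's track semantics, a string containing several space-separated words matches only tweets that contain all of those words.
- **Sample streams**: When no filters are provided (or the filter collection is empty), the receiver uses Twitter's sample endpoint, which delivers a small random sample of all public tweets.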
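
Keyword tracking treats separate filter strings as alternatives (OR) and the space-separated words inside one string as requirements (AND). The sketch below is a simplified, hypothetical model of that rule (`TrackMatcher` is not a library class) for rough offline checks of a keyword list. It assumes case-insensitive matching and treats punctuation as a word separator, so `#Scala.` counts as the word `scala`; Twitter's real matcher has further rules (URLs, mentions, non-space-separated languages) that this does not reproduce.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical, simplified model of keyword tracking (not Twitter's actual matcher):
// a tweet matches if ANY filter phrase matches, and a phrase matches if ALL of its words
// appear in the tweet. Matching ignores case and treats punctuation as a separator.
final class TrackMatcher {
  private TrackMatcher() {}

  // Lower-cases and splits on anything that is not a letter, digit or underscore.
  private static Set<String> words(String text) {
    Set<String> out = new HashSet<>();
    for (String w : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}_]+")) {
      if (!w.isEmpty()) {
        out.add(w);
      }
    }
    return out;
  }

  static boolean matches(String tweetText, List<String> filters) {
    Set<String> tweetWords = words(tweetText);
    for (String phrase : filters) {
      Set<String> phraseWords = words(phrase);
      if (!phraseWords.isEmpty() && tweetWords.containsAll(phraseWords)) {
        return true; // OR across phrases, AND within a phrase
      }
    }
    return false;
  }
}
```

For example, with filters `apache spark` and `scala`, the text "Spark is an Apache project" matches (both words of the first phrase appear), while "Spark streaming rocks" does not.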
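### Error Handling
- If starting the Twitter stream fails, or Twitter4J reports an exception on an open stream, the receiver calls Spark's `Receiver.restart`, which stops the receiver and starts it again after `spark.streaming.receiverRestartDelay` (2000 ms by default)
- Spark's restart delay is fixed rather than exponential; any additional back-off comes from Twitter4J's own reconnection handling
- A restart opens a new connection instead of resuming the old one, so tweets published while the receiver was down are not recovered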
### Data Format
All streams return Twitter4J `Status` objects containing:
- Tweet text, user information and timestamps
- Engagement metrics (retweet and favorite, i.e. "like", counts)
- Metadata (language, source, and geo-location when available)
- Reply and mention relationships
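
`Status` objects carry far more fields than most jobs need. A common pattern is to map each tweet to a small serializable record right after the receiver, so that windowed or cached data stays compact. The sketch below uses a hypothetical `TweetSummary` class whose fields are chosen for illustration; it depends only on the JDK, so it can be unit-tested without Twitter4J.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical compact record for the fields a job actually uses. Implements Serializable
// because Spark may need to serialize DStream elements (shuffles, serialized caching,
// checkpointing).
final class TweetSummary implements Serializable {
  private static final long serialVersionUID = 1L;

  final long id;
  final String screenName;
  final String text;
  final long createdAtMillis; // epoch milliseconds, e.g. from Status.getCreatedAt().getTime()
  final boolean retweet;

  TweetSummary(long id, String screenName, String text, long createdAtMillis, boolean retweet) {
    this.id = id;
    this.screenName = Objects.requireNonNull(screenName, "screenName");
    this.text = Objects.requireNonNull(text, "text");
    this.createdAtMillis = createdAtMillis;
    this.retweet = retweet;
  }

  // Tweet ids are unique, so equality is based on the id alone.
  @Override
  public boolean equals(Object o) {
    return o instanceof TweetSummary && ((TweetSummary) o).id == id;
  }

  @Override
  public int hashCode() {
    return Long.hashCode(id);
  }

  @Override
  public String toString() {
    return "@" + screenName + " [" + id + "]: " + text;
  }
}
```

With Twitter4J on the classpath it could be used as `tweets.map(s -> new TweetSummary(s.getId(), s.getUser().getScreenName(), s.getText(), s.getCreatedAt().getTime(), s.isRetweet()))`.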
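### Performance Considerations
- The default storage level, `MEMORY_AND_DISK_SER_2`, stores received tweets in serialized form and spills to disk when memory runs short
- The `_2` suffix replicates each received block to a second executor, so data already received survives the loss of the receiver's executor
- `MEMORY_ONLY` avoids serialization and replication overhead, but keeps deserialized objects (more memory) and has no replica, so received but unprocessed data is lost if that executor fails; weigh this before choosing it for high-volume streams
- Each batch must be processed in less time than the batch interval; otherwise batches queue up and the scheduling delay keeps growing. Since Spark 1.5, setting `spark.streaming.backpressure.enabled` lets Spark throttle the receiver's ingestion rate when processing falls behind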
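
A batch that takes longer than the batch interval delays every batch after it. Assuming batches are processed one at a time (the default), the scheduling delay of the next batch is the current delay plus the current batch's processing time minus the batch interval, floored at zero. The hypothetical `BatchDelay` helper below applies that recurrence to a list of measured processing times, for example values read from the Streaming tab of the Spark UI.

```java
// Hypothetical helper that models scheduling delay for a streaming job whose batches run
// one at a time: a batch cannot start until the previous one has finished.
final class BatchDelay {
  private BatchDelay() {}

  // processingMillis[i] is how long batch i took; batchIntervalMillis is the batch duration.
  // Returns the scheduling delay (time spent waiting to start) of each batch, in ms.
  static long[] schedulingDelays(long batchIntervalMillis, long[] processingMillis) {
    if (batchIntervalMillis <= 0) {
      throw new IllegalArgumentException("batch interval must be positive");
    }
    long[] delays = new long[processingMillis.length];
    long delay = 0; // the first batch starts on time
    for (int i = 0; i < processingMillis.length; i++) {
      delays[i] = delay;
      // The next batch arrives one interval later but must wait for this one to finish.
      delay = Math.max(0L, delay + processingMillis[i] - batchIntervalMillis);
    }
    return delays;
  }
}
```

For instance, `BatchDelay.schedulingDelays(2000, new long[] {2500, 2500, 2500})` returns `{0, 500, 1000}`: each 2.5-second batch adds half a second of backlog that never drains, whereas a single slow batch followed by fast ones lets the delay fall back to zero.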