tessl/maven-org-apache-spark--spark-streaming-twitter-2-10

Twitter feed receiver for Apache Spark Streaming that enables real-time consumption of Twitter data streams

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.spark/spark-streaming-twitter_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming-twitter-2-10@1.6.0

# Spark Streaming Twitter

Spark Streaming Twitter is an Apache Spark module that provides integration with Twitter's streaming API. It enables real-time consumption of Twitter data streams using the Twitter4J library, offering utilities to create input streams that receive live tweets for processing within Spark Streaming applications.

## Package Information

- **Package Name**: spark-streaming-twitter_2.10
- **Package Type**: Maven
- **Language**: Scala (with Java API support)
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-streaming-twitter_2.10
- **Version**: 1.6.3
- **Installation**: Add Maven dependency or use with pre-built Spark distribution

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-twitter_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
```

## Core Imports

**Scala:**
```scala
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.StreamingContext
import twitter4j.Status
import twitter4j.auth.Authorization
```

**Java:**
```java
import org.apache.spark.streaming.twitter.TwitterUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import twitter4j.Status;
import twitter4j.auth.Authorization;
```

## Basic Usage

**Scala Example:**
```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.SparkConf

// Create Spark configuration and streaming context
val conf = new SparkConf().setAppName("TwitterStreaming")
val ssc = new StreamingContext(conf, Seconds(2))

// Create Twitter stream (requires OAuth properties to be set)
val tweets = TwitterUtils.createStream(ssc, None)

// Process tweets
tweets.foreachRDD { rdd =>
  val tweetTexts = rdd.map(_.getText)
  tweetTexts.collect().foreach(println)
}

ssc.start()
ssc.awaitTermination()
```

**Java Example:**

```java
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import twitter4j.Status;

// Create Spark configuration and streaming context
SparkConf conf = new SparkConf().setAppName("TwitterStreaming");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

// Create Twitter stream (requires OAuth properties to be set)
JavaReceiverInputDStream<Status> tweets = TwitterUtils.createStream(jssc);

// Process tweets
tweets.foreachRDD(rdd -> {
    rdd.collect().forEach(status -> System.out.println(status.getText()));
});

jssc.start();
jssc.awaitTermination();
```

## Authentication

Twitter API access requires OAuth credentials. When no `Authorization` object is passed to `createStream`, Twitter4J reads the credentials from the following JVM system properties (for example, set with `-D` flags):

```properties
twitter4j.oauth.consumerKey=YOUR_CONSUMER_KEY
twitter4j.oauth.consumerSecret=YOUR_CONSUMER_SECRET
twitter4j.oauth.accessToken=YOUR_ACCESS_TOKEN
twitter4j.oauth.accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
```
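
These properties can also be set programmatically with `System.setProperty`. The sketch below is a minimal illustration that uses only the JDK. The class `TwitterCredentials` and its `apply` method are hypothetical helpers, not part of Spark or Twitter4J, and the credential strings are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Spark or Twitter4J: copies OAuth
// credentials into JVM system properties under the twitter4j.oauth.* names.
final class TwitterCredentials {

    // Prefix shared by the four property names listed above.
    static final String PREFIX = "twitter4j.oauth.";

    // Sets one system property per entry, e.g. "consumerKey" becomes
    // "twitter4j.oauth.consumerKey". Fails fast on a null or empty value.
    static void apply(Map<String, String> credentials) {
        for (Map.Entry<String, String> entry : credentials.entrySet()) {
            String value = entry.getValue();
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException("Missing value for " + entry.getKey());
            }
            System.setProperty(PREFIX + entry.getKey(), value);
        }
    }

    public static void main(String[] args) {
        // Placeholder values; a real job would load these from a secure source.
        Map<String, String> credentials = new LinkedHashMap<>();
        credentials.put("consumerKey", "YOUR_CONSUMER_KEY");
        credentials.put("consumerSecret", "YOUR_CONSUMER_SECRET");
        credentials.put("accessToken", "YOUR_ACCESS_TOKEN");
        credentials.put("accessTokenSecret", "YOUR_ACCESS_TOKEN_SECRET");
        apply(credentials);

        // Prints "YOUR_CONSUMER_KEY"
        System.out.println(System.getProperty(PREFIX + "consumerKey"));
    }
}
```

The properties should be set in the JVM that creates the stream, before `TwitterUtils.createStream` is called without an explicit `Authorization`.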

## Capabilities

### Twitter Stream Creation - Scala API

Creates Twitter input streams with various configuration options for Scala applications.

```scala { .api }
object TwitterUtils {
  def createStream(
    ssc: StreamingContext,
    twitterAuth: Option[Authorization],
    filters: Seq[String] = Nil,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[Status]
}
```

**Parameters:**

- `ssc`: StreamingContext object for creating the stream
- `twitterAuth`: Optional Twitter4J authentication object (`None` falls back to the `twitter4j.oauth.*` system properties)
- `filters`: Sequence of filter strings to get only matching tweets (default: empty, which selects the sample stream)
- `storageLevel`: Storage level for received tweet objects (default: `MEMORY_AND_DISK_SER_2`)

**Returns:** `ReceiverInputDStream[Status]` - A Spark streaming DStream of Twitter Status objects

**Usage Example:**

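```scala
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

// Basic stream with default authentication (credentials from system properties)
val stream1 = TwitterUtils.createStream(ssc, None)

// Filtered stream with keyword filters
val stream2 = TwitterUtils.createStream(ssc, None, Seq("spark", "scala"))

// Stream with custom authentication; the credential strings are placeholders
val config = new ConfigurationBuilder()
  .setOAuthConsumerKey("YOUR_CONSUMER_KEY")
  .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
  .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
  .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET")
  .build()
val auth = new OAuthAuthorization(config)
val stream3 = TwitterUtils.createStream(ssc, Some(auth), Seq("tech"))

// Stream with custom storage level
val stream4 = TwitterUtils.createStream(ssc, None, Seq("news"), StorageLevel.MEMORY_ONLY)
```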

### Twitter Stream Creation - Java API

Creates Twitter input streams with various configuration options for Java applications.

```java { .api }
class TwitterUtils {
    // Basic stream creation
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc
    );

    // Stream with filters
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        String[] filters
    );

    // Stream with filters and storage level
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        String[] filters,
        StorageLevel storageLevel
    );

    // Stream with custom authentication
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        Authorization twitterAuth
    );

    // Stream with authentication and filters
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        Authorization twitterAuth,
        String[] filters
    );

    // Stream with full customization
    public static JavaReceiverInputDStream<Status> createStream(
        JavaStreamingContext jssc,
        Authorization twitterAuth,
        String[] filters,
        StorageLevel storageLevel
    );
}
```

**Common Parameters:**

- `jssc`: JavaStreamingContext object for creating the stream
- `twitterAuth`: Twitter4J Authorization object for API access
- `filters`: Array of filter strings to get only matching tweets
- `storageLevel`: Storage level for received tweet objects

**Returns:** `JavaReceiverInputDStream<Status>` - A Java Spark streaming DStream of Twitter Status objects

**Usage Examples:**

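```java
import org.apache.spark.storage.StorageLevel;
import twitter4j.auth.OAuthAuthorization;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

// Basic stream with system property authentication
JavaReceiverInputDStream<Status> stream1 = TwitterUtils.createStream(jssc);

// Filtered stream
String[] filters = {"spark", "java"};
JavaReceiverInputDStream<Status> stream2 = TwitterUtils.createStream(jssc, filters);

// Stream with custom authentication; the credential strings are placeholders
Configuration config = new ConfigurationBuilder()
    .setOAuthConsumerKey("YOUR_CONSUMER_KEY")
    .setOAuthConsumerSecret("YOUR_CONSUMER_SECRET")
    .setOAuthAccessToken("YOUR_ACCESS_TOKEN")
    .setOAuthAccessTokenSecret("YOUR_ACCESS_TOKEN_SECRET")
    .build();
Authorization auth = new OAuthAuthorization(config);
JavaReceiverInputDStream<Status> stream3 = TwitterUtils.createStream(jssc, auth);

// Full customization
JavaReceiverInputDStream<Status> stream4 = TwitterUtils.createStream(
    jssc, auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()
);
```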

## Types

### Core Types

```scala { .api }
// From Twitter4J library (Java interfaces, shown here in Scala notation)
trait Status {
  def getText(): String
  def getUser(): User
  def getCreatedAt(): java.util.Date
  def getId(): Long
  def getRetweetCount(): Int
  def getFavoriteCount(): Int
  // ... many more methods for accessing tweet data
}

trait Authorization {
  // Twitter4J authentication interface
}

class OAuthAuthorization extends Authorization {
  // OAuth implementation for Twitter API access
}

// From Spark Streaming
class StreamingContext
class JavaStreamingContext

abstract class ReceiverInputDStream[T] extends InputDStream[T]
class JavaReceiverInputDStream[T] extends JavaInputDStream[T]

// From Spark Core
object StorageLevel {
  // Predefined storage levels
  val MEMORY_ONLY: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel // Default for Twitter streams
  // ... more storage level options
}
```

## Stream Behavior

### Filtering vs Sampling
- **Filtered streams**: When filters are provided, uses Twitter's filter API to receive tweets matching the specified keywords
- **Sample streams**: When no filters are provided, uses Twitter's sample API to receive a random sample of all public tweets
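
The rule above can be sketched in plain Java. This is an illustration only, not the receiver's actual code: the class `StreamModeSketch`, the enum `Mode`, and both methods are hypothetical names, and joining the keywords with commas reflects the assumption that the filter endpoint takes a comma-separated `track` list.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative stand-in, not Spark or Twitter4J code: models the choice
// between the filter endpoint and the sample endpoint.
final class StreamModeSketch {

    enum Mode { FILTER, SAMPLE }

    // Filters present -> filtered stream; no filters -> sample stream.
    static Mode chooseMode(List<String> filters) {
        return filters.isEmpty() ? Mode.SAMPLE : Mode.FILTER;
    }

    // Joins the keywords into one comma-separated expression
    // (assumed format of the filter endpoint's track parameter).
    static String trackExpression(List<String> filters) {
        return String.join(",", filters);
    }

    public static void main(String[] args) {
        List<String> keywords = Arrays.asList("spark", "scala");
        List<String> none = Collections.emptyList();

        // Prints "FILTER spark,scala"
        System.out.println(chooseMode(keywords) + " " + trackExpression(keywords));
        // Prints "SAMPLE"
        System.out.println(chooseMode(none));
    }
}
```

In practice this means that passing an empty filter list to `createStream` is not a no-op: it switches the stream from keyword matching to random sampling.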

### Error Handling
- Streams automatically restart on connection errors or exceptions
- Failed connections trigger exponential backoff retry logic
- Stream state is preserved across restarts when possible

### Data Format

All streams return Twitter4J `Status` objects containing:

- Tweet text, user information, timestamps
- Engagement metrics (retweets, likes)
- Metadata (language, source, geo-location if available)
- Reply and mention relationships

### Performance Considerations

- The default storage level, `MEMORY_AND_DISK_SER_2`, stores received tweets in serialized form, spills to disk when memory is full, and keeps a second replica for fault tolerance
- The replication factor of 2 (the `_2` suffix) keeps received data available if a single node fails
- For high-volume streams, consider the `MEMORY_ONLY` storage level for better performance, at the cost of replication and disk spill
- Tweet processing should be efficient to avoid backpressure issues
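
To make the replication trade-off concrete, the following back-of-the-envelope sketch estimates how many bytes of received tweets are held for one batch interval. It is plain arithmetic, not a Spark API: the class `ReceiverMemoryEstimate`, the method `bytesPerBatch`, and the input figures (1,000 tweets per second, 2,000 serialized bytes per tweet) are all assumptions chosen for illustration.

```java
// Back-of-the-envelope estimate, not a Spark API. All inputs are assumed.
final class ReceiverMemoryEstimate {

    // Bytes held for one batch interval:
    // tweets/second x bytes/tweet x batch seconds x number of replicas.
    static long bytesPerBatch(long tweetsPerSecond, long bytesPerTweet,
                              long batchSeconds, int replicas) {
        return tweetsPerSecond * bytesPerTweet * batchSeconds * replicas;
    }

    public static void main(String[] args) {
        long rate = 1_000;  // assumed tweets per second
        long size = 2_000;  // assumed serialized bytes per tweet
        long batch = 2;     // batch interval in seconds

        // Two replicas, as with MEMORY_AND_DISK_SER_2: prints 8000000
        System.out.println(bytesPerBatch(rate, size, batch, 2));
        // One replica, as with MEMORY_ONLY: prints 4000000
        System.out.println(bytesPerBatch(rate, size, batch, 1));
    }
}
```

The figure is only a lower bound for a single batch: data is kept longer when processing falls behind, which is why slow tweet processing compounds memory pressure.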