# Real-time Streaming

Real-time Twitter data streaming with customizable filters and listeners.

## Core Streaming Interface

### TwitterStream

Main interface for accessing Twitter's Streaming API.

```java { .api }
interface TwitterStream {
    /**
     * Add a listener for stream events
     * @param listener Stream event listener
     */
    void addListener(StreamListener listener);

    /**
     * Start a filtered stream with query parameters
     * @param query Filter query with keywords, users, locations
     */
    void filter(FilterQuery query);

    /**
     * Start the sample stream (a random sample, roughly 1%, of public tweets)
     */
    void sample();

    /**
     * Start the sample stream with a language filter
     * @param language Language code (e.g., "en", "es")
     */
    void sample(String language);

    /**
     * Start the firehose stream (approved parties only)
     * @param count Number of previous tweets to deliver before switching to the live stream
     */
    TwitterStream firehose(int count);

    /**
     * Start the retweet stream
     */
    TwitterStream retweet();

    /**
     * Clean up resources and close connections
     */
    void cleanUp();

    /**
     * Shut down the stream completely
     */
    void shutdown();
}
```

**Usage Examples:**

```java
TwitterV1 v1 = twitter.v1();
TwitterStream stream = v1.stream();

// Add a status listener. StatusAdapter supplies empty defaults,
// so only the callbacks of interest need to be overridden.
stream.addListener(new StatusAdapter() {
    @Override
    public void onStatus(Status status) {
        System.out.println("@" + status.getUser().getScreenName() + ": " + status.getText());
    }

    @Override
    public void onException(Exception ex) {
        ex.printStackTrace();
    }
});

// Start the sample stream...
stream.sample();

// ...or, instead of sample(), start a filtered stream:
// FilterQuery filter = FilterQuery.ofTrack("Twitter", "API");
// stream.filter(filter);

// Stop streaming after some time
Thread.sleep(60000); // Stream for 1 minute (may throw InterruptedException)
stream.cleanUp();
```

## Stream Listeners

### StatusListener

Primary listener interface for tweet stream events.

```java { .api }
interface StatusListener extends StreamListener {
    /**
     * Called when a new tweet arrives
     * @param status Tweet status object
     */
    void onStatus(Status status);

    /**
     * Called when a tweet deletion notice is received
     * @param statusDeletionNotice Deletion notice details
     */
    void onDeletionNotice(StatusDeletionNotice statusDeletionNotice);

    /**
     * Called when a track limitation notice is received
     * @param numberOfLimitedStatuses Number of tweets that matched the filter but were not delivered
     */
    void onTrackLimitationNotice(int numberOfLimitedStatuses);

    /**
     * Called when a location deletion (scrub geo) notice is received
     * @param userId ID of the user whose location data should be scrubbed
     * @param upToStatusId Status ID up to which location data should be scrubbed
     */
    void onScrubGeo(long userId, long upToStatusId);

    /**
     * Called when a stall warning is received
     * @param warning Stall warning details
     */
    void onStallWarning(StallWarning warning);

    /**
     * Called when the stream encounters an exception
     * @param ex Exception that occurred
     */
    void onException(Exception ex);
}
```

### StatusAdapter

Convenience class with empty implementations of all StatusListener methods.

```java { .api }
class StatusAdapter implements StatusListener {
    /**
     * Override only the methods you need
     */
    @Override
    public void onStatus(Status status) {
        // Empty implementation - override as needed
    }

    @Override
    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
        // Empty implementation
    }

    @Override
    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
        // Empty implementation
    }

    @Override
    public void onScrubGeo(long userId, long upToStatusId) {
        // Empty implementation
    }

    @Override
    public void onStallWarning(StallWarning warning) {
        // Empty implementation
    }

    @Override
    public void onException(Exception ex) {
        // Empty implementation
    }
}
```

**Usage Example:**

```java
stream.addListener(new StatusAdapter() {
    @Override
    public void onStatus(Status status) {
        // Only handle new tweets, ignore other events
        processTweet(status);
    }

    @Override
    public void onException(Exception ex) {
        System.err.println("Stream error: " + ex.getMessage());
    }
});
```

### RawStreamListener

Listen to raw JSON data from the stream.

```java { .api }
interface RawStreamListener extends StreamListener {
    /**
     * Called when raw JSON data is received
     * @param rawJSON Raw JSON string from Twitter
     */
    void onMessage(String rawJSON);

    /**
     * Called when the stream encounters an exception
     * @param ex Exception that occurred
     */
    void onException(Exception ex);
}
```
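
A raw listener receives every message type as an unparsed string, so the first step in `onMessage` is usually to decide what kind of message arrived. The sketch below is one rough way to do that. `RawMessages` and its `Kind` enum are hypothetical names, not part of the library, and the check assumes that control messages are single-key JSON objects (`delete`, `limit`, `scrub_geo`, `warning`) and that tweets begin with a `created_at` field. A production listener would parse the JSON properly instead of matching prefixes.

```java
/** Hypothetical helper (not part of the library) for triaging raw stream messages. */
final class RawMessages {
    enum Kind { TWEET, DELETE, LIMIT, SCRUB_GEO, WARNING, OTHER }

    private RawMessages() {}

    /**
     * Guesses the message kind from its leading JSON key.
     * Assumption: control messages start with their type as the first key,
     * and tweets start with "created_at". Anything else is reported as OTHER.
     */
    static Kind classify(String rawJSON) {
        String s = rawJSON == null ? "" : rawJSON.trim();
        if (s.startsWith("{\"delete\"")) return Kind.DELETE;
        if (s.startsWith("{\"limit\"")) return Kind.LIMIT;
        if (s.startsWith("{\"scrub_geo\"")) return Kind.SCRUB_GEO;
        if (s.startsWith("{\"warning\"")) return Kind.WARNING;
        if (s.startsWith("{\"created_at\"")) return Kind.TWEET;
        return Kind.OTHER;
    }
}
```

Inside `onMessage(String rawJSON)`, a call such as `RawMessages.classify(rawJSON)` could route tweets to one handler and notices to another.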

## Stream Filtering

### FilterQuery

Configure streaming filters for keywords, users, and locations.

```java { .api }
class FilterQuery {
    /**
     * Create filter for specific user IDs
     * @param follow Array of user IDs to follow
     * @return FilterQuery with user filter
     */
    static FilterQuery ofFollow(long... follow);

    /**
     * Create filter for keywords and hashtags
     * @param track Array of keywords to track
     * @return FilterQuery with keyword filter
     */
    static FilterQuery ofTrack(String... track);

    /**
     * Add user IDs to follow
     * @param follow User IDs to follow
     * @return FilterQuery with additional user filters
     */
    FilterQuery follow(long... follow);

    /**
     * Add keywords to track
     * @param track Keywords to track
     * @return FilterQuery with additional keyword filters
     */
    FilterQuery track(String... track);

    /**
     * Add location filters
     * @param locations Array of bounding boxes [[lon1,lat1,lon2,lat2]...]
     * @return FilterQuery with location filters
     */
    FilterQuery locations(double[][] locations);

    /**
     * Add language filters
     * @param language Language codes to filter by
     * @return FilterQuery with language filters
     */
    FilterQuery language(String... language);

    /**
     * Set filter level for content
     * @param filterLevel Filter level (none, low, medium, high)
     * @return FilterQuery with filter level
     */
    FilterQuery filterLevel(FilterLevel filterLevel);

    /**
     * Set count for backfill
     * @param count Number of tweets for backfill
     * @return FilterQuery with count setting
     */
    FilterQuery count(int count);

    /**
     * Filter level enumeration
     */
    enum FilterLevel {
        /** No filtering */
        none,
        /** Low level filtering */
        low,
        /** Medium level filtering */
        medium,
        /** High level filtering */
        high
    }
}
```

**Filter Examples:**

```java
// Track specific keywords
FilterQuery keywordFilter = FilterQuery.ofTrack("Twitter", "API", "#programming");
stream.filter(keywordFilter);

// Follow specific users
FilterQuery userFilter = FilterQuery.ofFollow(783214L, 17874544L, 95731L);
stream.filter(userFilter);

// Geographic filtering (San Francisco Bay Area)
double[][] sanFranciscoBBox = {{-122.75, 36.8, -121.75, 37.8}};
FilterQuery geoFilter = FilterQuery.ofTrack("earthquake")
    .locations(sanFranciscoBBox);
stream.filter(geoFilter);

// Complex filter combining multiple criteria
FilterQuery complexFilter = FilterQuery.ofTrack("java", "programming")
    .follow(12345L, 67890L)
    .language("en", "es")
    .filterLevel(FilterQuery.FilterLevel.low);
stream.filter(complexFilter);
```
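
Bounding boxes are easy to get wrong because longitude comes before latitude and the first corner must be the south-west one. The following sketch shows one way to build and sanity-check a box in the `[lon1, lat1, lon2, lat2]` layout described above before passing it to `locations`. `BoundingBoxes` is a hypothetical helper, not part of the library, and it assumes boxes do not cross the antimeridian.

```java
/** Hypothetical helper (not part of the library) for building location filters. */
final class BoundingBoxes {
    private BoundingBoxes() {}

    /**
     * Returns {west, south, east, north} after validating the coordinates.
     * Assumption: the box does not cross the antimeridian, so west < east.
     */
    static double[] of(double west, double south, double east, double north) {
        if (west < -180 || east > 180 || south < -90 || north > 90) {
            throw new IllegalArgumentException("Coordinates out of range");
        }
        if (west >= east || south >= north) {
            throw new IllegalArgumentException("South-west corner must come first");
        }
        return new double[] {west, south, east, north};
    }

    /** True if the point lies inside the box (edges inclusive). */
    static boolean contains(double[] box, double lon, double lat) {
        return lon >= box[0] && lon <= box[2] && lat >= box[1] && lat <= box[3];
    }
}
```

For example, `new double[][] { BoundingBoxes.of(-122.75, 36.8, -121.75, 37.8) }` produces the San Francisco box used in the filter examples.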

## Stream Event Handling

### Connection Lifecycle

Monitor stream connection status.

```java { .api }
interface ConnectionLifeCycleListener {
    /**
     * Called when connection is established
     */
    void onConnect();

    /**
     * Called when connection is lost
     */
    void onDisconnect();

    /**
     * Called when attempting to reconnect
     */
    void onReconnect();
}
```

### Stream Data Models

```java { .api }
interface StallWarning {
    /**
     * Warning message
     */
    String getMessage();

    /**
     * How full the server-side delivery queue is, as a percentage.
     * A high value means the client is consuming tweets too slowly.
     */
    int getPercentFull();
}

interface StatusDeletionNotice {
    /**
     * ID of the deleted status
     */
    long getStatusId();

    /**
     * ID of the user whose status was deleted
     */
    long getUserId();
}
```

## Advanced Streaming Patterns

### Real-time Analytics

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class StreamAnalytics {
    private final AtomicLong tweetCount = new AtomicLong(0);
    private final Map<String, AtomicLong> hashtagCounts = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> languageCounts = new ConcurrentHashMap<>();

    public StatusListener createAnalyticsListener() {
        return new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                // Keep the value returned by the increment so that concurrent
                // callbacks cannot skip or repeat a multiple of 1000
                long count = tweetCount.incrementAndGet();

                // Count hashtags (case-insensitive)
                for (HashtagEntity hashtag : status.getHashtagEntities()) {
                    hashtagCounts.computeIfAbsent(hashtag.getText().toLowerCase(),
                        k -> new AtomicLong(0)).incrementAndGet();
                }

                // Count languages
                String lang = status.getLang();
                if (lang != null) {
                    languageCounts.computeIfAbsent(lang,
                        k -> new AtomicLong(0)).incrementAndGet();
                }

                // Print stats every 1000 tweets
                if (count % 1000 == 0) {
                    printStats();
                }
            }
        };
    }

    private void printStats() {
        System.out.println("\n=== Stream Statistics ===");
        System.out.println("Total tweets: " + tweetCount.get());

        // Sort descending by count and show the five largest entries
        System.out.println("\nTop hashtags:");
        hashtagCounts.entrySet().stream()
            .sorted(Map.Entry.<String, AtomicLong>comparingByValue(
                (a, b) -> Long.compare(b.get(), a.get())))
            .limit(5)
            .forEach(entry -> System.out.println("  #" + entry.getKey() + ": " + entry.getValue().get()));

        System.out.println("\nLanguage distribution:");
        languageCounts.entrySet().stream()
            .sorted(Map.Entry.<String, AtomicLong>comparingByValue(
                (a, b) -> Long.compare(b.get(), a.get())))
            .limit(5)
            .forEach(entry -> System.out.println("  " + entry.getKey() + ": " + entry.getValue().get()));
    }
}
```

### Stream Persistence

```java
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

public class StreamPersistence {
    private final TwitterV1 v1;
    private final PrintWriter logWriter;

    public StreamPersistence(TwitterV1 v1, String logFile) throws IOException {
        this.v1 = v1;
        // Open the log in append mode so earlier entries are kept
        this.logWriter = new PrintWriter(new FileWriter(logFile, true));
    }

    public StatusListener createPersistenceListener() {
        return new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                // Build one tab-separated line per tweet; newlines and tabs in
                // the text are replaced so the entry stays on a single line
                String logEntry = String.join("\t",
                    status.getCreatedAt().toString(),
                    String.valueOf(status.getId()),
                    status.getUser().getScreenName(),
                    status.getText().replace("\n", " ").replace("\t", " "),
                    String.valueOf(status.getRetweetCount()),
                    String.valueOf(status.getFavoriteCount())
                );

                // Serialize writes in case callbacks run on several threads
                synchronized (logWriter) {
                    logWriter.println(logEntry);
                    logWriter.flush();
                }

                // Store in database (example)
                storeInDatabase(status);
            }

            @Override
            public void onException(Exception ex) {
                System.err.println("Stream error: " + ex.getMessage());
            }
        };
    }

    private void storeInDatabase(Status status) {
        // Database storage implementation
        // Could use JDBC, JPA, or NoSQL database
    }

    public void close() throws IOException {
        logWriter.close();
    }
}
```

### Stream Monitoring and Recovery

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class StreamMonitor {
    private final TwitterStream stream;
    private final AtomicLong lastTweetTime = new AtomicLong(System.currentTimeMillis());
    private final ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
    private volatile boolean isConnected = false;

    public StreamMonitor(TwitterStream stream) {
        this.stream = stream;
        setupMonitoring();
    }

    private void setupMonitoring() {
        // Add connection lifecycle listener
        // (assumes the stream accepts this listener type; the registration
        // call may differ between library versions)
        stream.addListener(new ConnectionLifeCycleListener() {
            @Override
            public void onConnect() {
                System.out.println("Stream connected");
                isConnected = true;
            }

            @Override
            public void onDisconnect() {
                System.out.println("Stream disconnected");
                isConnected = false;
            }

            @Override
            public void onReconnect() {
                System.out.println("Stream reconnecting");
            }
        });

        // Add status listener to track activity
        stream.addListener(new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                lastTweetTime.set(System.currentTimeMillis());
            }
        });

        // Check stream health once a minute
        monitor.scheduleAtFixedRate(this::checkStreamHealth, 60, 60, TimeUnit.SECONDS);
    }

    private void checkStreamHealth() {
        long timeSinceLastTweet = System.currentTimeMillis() - lastTweetTime.get();

        // Restart at most once per check, whichever condition applies
        if (!isConnected) {
            System.out.println("Stream disconnected, attempting reconnection");
            restartStream();
        } else if (timeSinceLastTweet > 300000) { // 5 minutes
            System.out.println("Stream appears stalled, attempting reconnection");
            restartStream();
        }
    }

    private void restartStream() {
        try {
            stream.cleanUp();
            Thread.sleep(5000); // Wait before reconnecting
            // Restart with same filter (implementation dependent)
            // Reset the activity clock so the next check does not restart again immediately
            lastTweetTime.set(System.currentTimeMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void shutdown() {
        monitor.shutdown();
        stream.shutdown();
    }
}
```
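
The stall check in a monitor like this is easier to test when the timing logic is separated from the stream and the system clock. The sketch below shows one possible shape for that. `StallDetector` is a hypothetical class, not part of the library, and it takes the current time as a parameter so a test can supply fixed timestamps.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: tracks the last activity time and reports stalls. */
final class StallDetector {
    private final long thresholdMillis;
    private final AtomicLong lastActivityMillis;

    StallDetector(long thresholdMillis, long nowMillis) {
        if (thresholdMillis <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.thresholdMillis = thresholdMillis;
        this.lastActivityMillis = new AtomicLong(nowMillis);
    }

    /** Call from onStatus (or any other stream callback) with the current time. */
    void recordActivity(long nowMillis) {
        lastActivityMillis.set(nowMillis);
    }

    /** True when no activity has been recorded for longer than the threshold. */
    boolean isStalled(long nowMillis) {
        return nowMillis - lastActivityMillis.get() > thresholdMillis;
    }
}
```

In production code the timestamps would come from `System.currentTimeMillis()`, and a scheduled task would call `isStalled` to decide whether to restart the stream.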

## Error Handling and Best Practices

### Stream Error Recovery

```java
public class RobustStreamListener extends StatusAdapter {
    private static final int MAX_RETRIES = 3;
    private int retryCount = 0;

    @Override
    public void onStatus(Status status) {
        try {
            processStatus(status);
            retryCount = 0; // Reset on successful processing
        } catch (Exception e) {
            System.err.println("Error processing status: " + e.getMessage());
            // Continue processing other tweets
        }
    }

    @Override
    public void onException(Exception ex) {
        System.err.println("Stream exception: " + ex.getMessage());

        if (retryCount < MAX_RETRIES) {
            retryCount++;
            System.out.println("Retrying... (" + retryCount + "/" + MAX_RETRIES + ")");

            try {
                // Exponential backoff: 5s, 10s, 20s
                Thread.sleep(5000L << (retryCount - 1));
                // Stream will automatically reconnect
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            System.err.println("Max retries exceeded, stopping stream");
            // Handle permanent failure
        }
    }

    @Override
    public void onStallWarning(StallWarning warning) {
        System.out.println("Stall warning: " + warning.getMessage());
        System.out.println("Stream is " + warning.getPercentFull() + "% full");
    }

    private void processStatus(Status status) {
        // Your tweet processing logic here
    }
}
```
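
Retry delays are simpler to reason about when the schedule lives in its own function. The sketch below computes a capped exponential backoff, doubling the delay on each attempt until it reaches a maximum. `Backoff` and its parameters are hypothetical names chosen for illustration, and the 5-second base and 60-second cap used in the usage note are assumptions, not values mandated by the API.

```java
/** Hypothetical helper: capped exponential backoff schedule. */
final class Backoff {
    private Backoff() {}

    /**
     * Delay before the given attempt (1-based): baseMillis * 2^(attempt - 1),
     * never exceeding maxMillis.
     */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        if (baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("require 0 < baseMillis <= maxMillis");
        }
        long delay = baseMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            // Compare against half the cap first so the doubling cannot overflow
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }
}
```

With a 5,000 ms base and a 60,000 ms cap, `Backoff.delayMillis(retryCount, 5000, 60000)` yields 5, 10, 20, and 40 seconds for the first four attempts and 60 seconds from then on.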

### Rate Limiting and Connection Limits

- Each app can have only one streaming connection at a time
- Follow limits: 5,000 user IDs maximum
- Track limits: 400 keywords maximum
- Location boxes: 25 maximum
- Connection attempts are rate limited, so avoid reconnecting in a tight loop
- Reuse the single connection and reconnect with backoff after failures

```java
// Example of respecting streaming limits
String[] keywords = {"keyword1", "keyword2"};         // Max 400 keywords
long[] userIds = {12345L, 67890L};                     // Max 5,000 users
double[][] boxes = {{-122.75, 36.8, -121.75, 37.8}};  // Max 25 locations

// Check the limit before building the query
if (keywords.length > 400) {
    throw new IllegalArgumentException("Too many track keywords");
}

FilterQuery filter = FilterQuery.ofTrack(keywords)
    .follow(userIds)
    .locations(boxes);

stream.filter(filter);
```
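
The same check can be extended to all three limits listed above and run once before a query is built. The sketch below is one way to do that. `StreamLimits` is a hypothetical helper, not part of the library, and its constants simply restate the limits from the list.

```java
/** Hypothetical helper: validates filter inputs against the limits listed above. */
final class StreamLimits {
    static final int MAX_TRACK = 400;      // keywords
    static final int MAX_FOLLOW = 5000;    // user IDs
    static final int MAX_LOCATIONS = 25;   // bounding boxes

    private StreamLimits() {}

    /** Throws IllegalArgumentException if any input exceeds its limit. */
    static void check(String[] track, long[] follow, double[][] locations) {
        if (track.length > MAX_TRACK) {
            throw new IllegalArgumentException(
                "Too many track keywords: " + track.length + " > " + MAX_TRACK);
        }
        if (follow.length > MAX_FOLLOW) {
            throw new IllegalArgumentException(
                "Too many user IDs: " + follow.length + " > " + MAX_FOLLOW);
        }
        if (locations.length > MAX_LOCATIONS) {
            throw new IllegalArgumentException(
                "Too many location boxes: " + locations.length + " > " + MAX_LOCATIONS);
        }
    }
}
```

Calling `StreamLimits.check(keywords, userIds, boxes)` before `FilterQuery.ofTrack(...)` surfaces an oversized filter as a local error rather than a rejected connection.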