or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-auth.md direct-messages.md favorites.md index.md lists.md places.md search.md streaming.md timelines.md tweets.md users.md

streaming.md docs/

0

# Real-time Streaming

1

2

Real-time Twitter data streaming with customizable filters and listeners.

3

4

## Core Streaming Interface

5

6

### TwitterStream

7

8

Main interface for accessing Twitter's Streaming API.

9

10

```java { .api }

11

interface TwitterStream {

12

/**

13

* Add listener for stream events

14

* @param listener Stream event listener

15

*/

16

void addListener(StreamListener listener);

17

18

/**

19

* Start filtered stream with query parameters

20

* @param query Filter query with keywords, users, locations

21

*/

22

void filter(FilterQuery query);

23

24

/**

25

* Start sample stream (random 1% of public tweets)

26

*/

27

void sample();

28

29

/**

30

* Start sample stream with language filter

31

* @param language Language code (e.g., "en", "es")

32

*/

33

void sample(String language);

34

35

/**

36

* Start firehose stream (approved parties only)

37

* @param count Number of previous tweets to backfill before transitioning to the live stream

38

*/

39

TwitterStream firehose(int count);

40

41

/**

42

* Start retweet stream

43

*/

44

TwitterStream retweet();

45

46

/**

47

* Clean up resources and close connections

48

*/

49

void cleanUp();

50

51

/**

52

* Shutdown the stream completely

53

*/

54

void shutdown();

55

}

56

```

57

58

**Usage Examples:**

59

60

```java

61

TwitterV1 v1 = twitter.v1();

62

TwitterStream stream = v1.stream();

63

64

// Add status listener

65

stream.addListener(new StatusListener() {

66

@Override

67

public void onStatus(Status status) {

68

System.out.println("@" + status.getUser().getScreenName() + ": " + status.getText());

69

}

70

71

@Override

72

public void onException(Exception ex) {

73

ex.printStackTrace();

74

}

75

});

76

77

// Start sample stream

78

stream.sample();

79

80

// Or start filtered stream

81

FilterQuery filter = FilterQuery.ofTrack("Twitter", "API");

82

stream.filter(filter);

83

84

// Stop streaming after some time

85

Thread.sleep(60000); // Stream for 1 minute

86

stream.cleanUp();

87

```

88

89

## Stream Listeners

90

91

### StatusListener

92

93

Primary listener interface for tweet stream events.

94

95

```java { .api }

96

interface StatusListener extends StreamListener {

97

/**

98

* Called when a new tweet arrives

99

* @param status Tweet status object

100

*/

101

void onStatus(Status status);

102

103

/**

104

* Called when a tweet deletion notice is received

105

* @param statusDeletionNotice Deletion notice details

106

*/

107

void onDeletionNotice(StatusDeletionNotice statusDeletionNotice);

108

109

/**

110

* Called when track limitation notice is received

111

* @param numberOfLimitedStatuses Number of tweets limited

112

*/

113

void onTrackLimitationNotice(int numberOfLimitedStatuses);

114

115

/**

116

* Called when location deletion notice is received

117

* @param userId User ID

118

* @param upToStatusId Status ID up to which location data should be scrubbed

119

*/

120

void onScrubGeo(long userId, long upToStatusId);

121

122

/**

123

* Called when stall warning is received

124

* @param warning Stall warning details

125

*/

126

void onStallWarning(StallWarning warning);

127

128

/**

129

* Called when stream encounters an exception

130

* @param ex Exception that occurred

131

*/

132

void onException(Exception ex);

133

}

134

```

135

136

### StatusAdapter

137

138

Convenience class with empty implementations of all StatusListener methods.

139

140

```java { .api }

141

class StatusAdapter implements StatusListener {

142

/**

143

* Override only the methods you need

144

*/

145

@Override

146

public void onStatus(Status status) {

147

// Empty implementation - override as needed

148

}

149

150

@Override

151

public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {

152

// Empty implementation

153

}

154

155

@Override

156

public void onTrackLimitationNotice(int numberOfLimitedStatuses) {

157

// Empty implementation

158

}

159

160

@Override

161

public void onScrubGeo(long userId, long upToStatusId) {

162

// Empty implementation

163

}

164

165

@Override

166

public void onStallWarning(StallWarning warning) {

167

// Empty implementation

168

}

169

170

@Override

171

public void onException(Exception ex) {

172

// Empty implementation

173

}

174

}

175

```

176

177

**Usage Example:**

178

179

```java

180

stream.addListener(new StatusAdapter() {

181

@Override

182

public void onStatus(Status status) {

183

// Only handle new tweets, ignore other events

184

processTweet(status);

185

}

186

187

@Override

188

public void onException(Exception ex) {

189

System.err.println("Stream error: " + ex.getMessage());

190

}

191

});

192

```

193

194

### RawStreamListener

195

196

Listen to raw JSON data from the stream.

197

198

```java { .api }

199

interface RawStreamListener extends StreamListener {

200

/**

201

* Called when raw JSON data is received

202

* @param rawJSON Raw JSON string from Twitter

203

*/

204

void onMessage(String rawJSON);

205

206

/**

207

* Called when stream encounters an exception

208

* @param ex Exception that occurred

209

*/

210

void onException(Exception ex);

211

}

212

```

213

214

## Stream Filtering

215

216

### FilterQuery

217

218

Configure streaming filters for keywords, users, and locations.

219

220

```java { .api }

221

class FilterQuery {

222

/**

223

* Create filter for specific user IDs

224

* @param follow Array of user IDs to follow

225

* @return FilterQuery with user filter

226

*/

227

static FilterQuery ofFollow(long... follow);

228

229

/**

230

* Create filter for keywords and hashtags

231

* @param track Array of keywords to track

232

* @return FilterQuery with keyword filter

233

*/

234

static FilterQuery ofTrack(String... track);

235

236

/**

237

* Add user IDs to follow

238

* @param follow User IDs to follow

239

* @return FilterQuery with additional user filters

240

*/

241

FilterQuery follow(long... follow);

242

243

/**

244

* Add keywords to track

245

* @param track Keywords to track

246

* @return FilterQuery with additional keyword filters

247

*/

248

FilterQuery track(String... track);

249

250

/**

251

* Add location filters

252

* @param locations Array of bounding boxes [[lon1,lat1,lon2,lat2]...]

253

* @return FilterQuery with location filters

254

*/

255

FilterQuery locations(double[][] locations);

256

257

/**

258

* Add language filters

259

* @param language Language codes to filter by

260

* @return FilterQuery with language filters

261

*/

262

FilterQuery language(String... language);

263

264

/**

265

* Set filter level for content

266

* @param filterLevel Filter level (none, low, medium, high)

267

* @return FilterQuery with filter level

268

*/

269

FilterQuery filterLevel(FilterLevel filterLevel);

270

271

/**

272

* Set count for backfill

273

* @param count Number of tweets for backfill

274

* @return FilterQuery with count setting

275

*/

276

FilterQuery count(int count);

277

278

/**

279

* Filter level enumeration

280

*/

281

enum FilterLevel {

282

/** No filtering */

283

none,

284

/** Low level filtering */

285

low,

286

/** Medium level filtering */

287

medium,

288

/** High level filtering */

289

high

290

}

291

}

292

```

293

294

**Filter Examples:**

295

296

```java

297

// Track specific keywords

298

FilterQuery keywordFilter = FilterQuery.ofTrack("Twitter", "API", "#programming");

299

stream.filter(keywordFilter);

300

301

// Follow specific users

302

FilterQuery userFilter = FilterQuery.ofFollow(783214L, 17874544L, 95731L);

303

stream.filter(userFilter);

304

305

// Geographic filtering (San Francisco Bay Area)

306

double[][] sanFranciscoBBox = {{-122.75, 36.8, -121.75, 37.8}};

307

FilterQuery geoFilter = FilterQuery.ofTrack("earthquake")

308

.locations(sanFranciscoBBox);

309

stream.filter(geoFilter);

310

311

// Complex filter combining multiple criteria

312

FilterQuery complexFilter = FilterQuery.ofTrack("java", "programming")

313

.follow(12345L, 67890L)

314

.language("en", "es")

315

.filterLevel(FilterQuery.FilterLevel.low);

316

stream.filter(complexFilter);

317

```

318

319

## Stream Event Handling

320

321

### Connection Lifecycle

322

323

Monitor stream connection status.

324

325

```java { .api }

326

interface ConnectionLifeCycleListener {

327

/**

328

* Called when connection is established

329

*/

330

void onConnect();

331

332

/**

333

* Called when connection is lost

334

*/

335

void onDisconnect();

336

337

/**

338

* Called when attempting to reconnect

339

*/

340

void onReconnect();

341

}

342

```

343

344

### Stream Data Models

345

346

```java { .api }

347

interface StallWarning {

348

/**

349

* Warning message

350

*/

351

String getMessage();

352

353

/**

354

* Percentage of the delivery queue that is currently full (how far the client is falling behind)

355

*/

356

int getPercentFull();

357

}

358

359

interface StatusDeletionNotice {

360

/**

361

* ID of deleted status

362

*/

363

long getStatusId();

364

365

/**

366

* ID of the user whose status was deleted

367

*/

368

long getUserId();

369

}

370

```

371

372

## Advanced Streaming Patterns

373

374

### Real-time Analytics

375

376

```java

377

public class StreamAnalytics {

378

private final AtomicLong tweetCount = new AtomicLong(0);

379

private final Map<String, AtomicLong> hashtagCounts = new ConcurrentHashMap<>();

380

private final Map<String, AtomicLong> languageCounts = new ConcurrentHashMap<>();

381

382

public StatusListener createAnalyticsListener() {

383

return new StatusAdapter() {

384

@Override

385

public void onStatus(Status status) {

386

tweetCount.incrementAndGet();

387

388

// Count hashtags

389

for (HashtagEntity hashtag : status.getHashtagEntities()) {

390

hashtagCounts.computeIfAbsent(hashtag.getText().toLowerCase(),

391

k -> new AtomicLong(0)).incrementAndGet();

392

}

393

394

// Count languages

395

String lang = status.getLang();

396

if (lang != null) {

397

languageCounts.computeIfAbsent(lang,

398

k -> new AtomicLong(0)).incrementAndGet();

399

}

400

401

// Print stats every 1000 tweets

402

if (tweetCount.get() % 1000 == 0) {

403

printStats();

404

}

405

}

406

};

407

}

408

409

private void printStats() {

410

System.out.println("\n=== Stream Statistics ===");

411

System.out.println("Total tweets: " + tweetCount.get());

412

413

System.out.println("\nTop hashtags:");

414

hashtagCounts.entrySet().stream()

415

.sorted(Map.Entry.<String, AtomicLong>comparingByValue(

416

(a, b) -> Long.compare(b.get(), a.get())))

417

.limit(5)

418

.forEach(entry -> System.out.println(" #" + entry.getKey() + ": " + entry.getValue().get()));

419

420

System.out.println("\nLanguage distribution:");

421

languageCounts.entrySet().stream()

422

.sorted(Map.Entry.<String, AtomicLong>comparingByValue(

423

(a, b) -> Long.compare(b.get(), a.get())))

424

.limit(5)

425

.forEach(entry -> System.out.println(" " + entry.getKey() + ": " + entry.getValue().get()));

426

}

427

}

428

```

429

430

### Stream Persistence

431

432

```java

433

public class StreamPersistence {

434

private final TwitterV1 v1;

435

private final PrintWriter logWriter;

436

437

public StreamPersistence(TwitterV1 v1, String logFile) throws IOException {

438

this.v1 = v1;

439

this.logWriter = new PrintWriter(new FileWriter(logFile, true));

440

}

441

442

public StatusListener createPersistenceListener() {

443

return new StatusAdapter() {

444

@Override

445

public void onStatus(Status status) {

446

// Log to file

447

String logEntry = String.join("\t",

448

status.getCreatedAt().toString(),

449

String.valueOf(status.getId()),

450

status.getUser().getScreenName(),

451

status.getText().replace("\n", " ").replace("\t", " "),

452

String.valueOf(status.getRetweetCount()),

453

String.valueOf(status.getFavoriteCount())

454

);

455

456

synchronized (logWriter) {

457

logWriter.println(logEntry);

458

logWriter.flush();

459

}

460

461

// Store in database (example)

462

storeInDatabase(status);

463

}

464

465

@Override

466

public void onException(Exception ex) {

467

System.err.println("Stream error: " + ex.getMessage());

468

}

469

};

470

}

471

472

private void storeInDatabase(Status status) {

473

// Database storage implementation

474

// Could use JDBC, JPA, or NoSQL database

475

}

476

477

public void close() throws IOException {

478

logWriter.close();

479

}

480

}

481

```

482

483

### Stream Monitoring and Recovery

484

485

```java

486

public class StreamMonitor {

487

private final TwitterStream stream;

488

private final AtomicLong lastTweetTime = new AtomicLong(System.currentTimeMillis());

489

private final ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);

490

private volatile boolean isConnected = false;

491

492

public StreamMonitor(TwitterStream stream) {

493

this.stream = stream;

494

setupMonitoring();

495

}

496

497

private void setupMonitoring() {

498

// Add connection lifecycle listener

499

stream.addListener(new ConnectionLifeCycleListener() {

500

@Override

501

public void onConnect() {

502

System.out.println("Stream connected");

503

isConnected = true;

504

}

505

506

@Override

507

public void onDisconnect() {

508

System.out.println("Stream disconnected");

509

isConnected = false;

510

}

511

512

@Override

513

public void onReconnect() {

514

System.out.println("Stream reconnecting");

515

}

516

});

517

518

// Add status listener to track activity

519

stream.addListener(new StatusAdapter() {

520

@Override

521

public void onStatus(Status status) {

522

lastTweetTime.set(System.currentTimeMillis());

523

}

524

});

525

526

// Monitor for stalls

527

monitor.scheduleAtFixedRate(this::checkStreamHealth, 60, 60, TimeUnit.SECONDS);

528

}

529

530

private void checkStreamHealth() {

531

long timeSinceLastTweet = System.currentTimeMillis() - lastTweetTime.get();

532

533

if (timeSinceLastTweet > 300000) { // 5 minutes

534

System.out.println("Stream appears stalled, attempting reconnection");

535

restartStream();

536

}

537

538

if (!isConnected) {

539

System.out.println("Stream disconnected, attempting reconnection");

540

restartStream();

541

}

542

}

543

544

private void restartStream() {

545

try {

546

stream.cleanUp();

547

Thread.sleep(5000); // Wait before reconnecting

548

// TODO: re-issue the original filter(...)/sample() call here; this example only cleans up and waits (implementation dependent)

549

} catch (InterruptedException e) {

550

Thread.currentThread().interrupt();

551

}

552

}

553

554

public void shutdown() {

555

monitor.shutdown();

556

stream.shutdown();

557

}

558

}

559

```

560

561

## Error Handling and Best Practices

562

563

### Stream Error Recovery

564

565

```java

566

public class RobustStreamListener extends StatusAdapter {

567

private static final int MAX_RETRIES = 3;

568

private int retryCount = 0;

569

570

@Override

571

public void onStatus(Status status) {

572

try {

573

processStatus(status);

574

retryCount = 0; // Reset on successful processing

575

} catch (Exception e) {

576

System.err.println("Error processing status: " + e.getMessage());

577

// Continue processing other tweets

578

}

579

}

580

581

@Override

582

public void onException(Exception ex) {

583

System.err.println("Stream exception: " + ex.getMessage());

584

585

if (retryCount < MAX_RETRIES) {

586

retryCount++;

587

System.out.println("Retrying... (" + retryCount + "/" + MAX_RETRIES + ")");

588

589

try {

590

Thread.sleep(5000 * retryCount); // Linear backoff: 5s, 10s, 15s

591

// Stream will automatically reconnect

592

} catch (InterruptedException e) {

593

Thread.currentThread().interrupt();

594

}

595

} else {

596

System.err.println("Max retries exceeded, stopping stream");

597

// Handle permanent failure

598

}

599

}

600

601

@Override

602

public void onStallWarning(StallWarning warning) {

603

System.out.println("Stall warning: " + warning.getMessage());

604

System.out.println("Stream is " + warning.getPercentFull() + "% full");

605

}

606

607

private void processStatus(Status status) {

608

// Your tweet processing logic here

609

}

610

}

611

```

612

613

### Rate Limiting and Connection Limits

614

615

- Each app can have only one streaming connection at a time

616

- Follow limits: 5,000 user IDs maximum

617

- Track limits: 400 keywords maximum

618

- Location boxes: 25 maximum

619

- Streaming connections count against rate limits

620

- Reuse one long-lived connection (do not pool multiple streaming connections) and use reconnection strategies with backoff

621

622

```java

623

// Example of respecting streaming limits

624

FilterQuery filter = FilterQuery.ofTrack("keyword1", "keyword2") // Max 400 keywords

625

.follow(userId1, userId2) // Max 5000 users

626

.locations(boundingBox1, boundingBox2); // Max 25 locations

627

628

if (filter.getTrackCount() > 400) {

629

throw new IllegalArgumentException("Too many track keywords");

630

}

631

632

stream.filter(filter);

633

```