# Streaming

Tweepy's StreamingClient provides real-time access to Twitter data through the Twitter API v2 streaming endpoints. It supports both filtered streaming (based on rules) and sample streaming (1% random sample) with customizable event handlers and automatic reconnection.

## Capabilities

### StreamingClient Initialization

Create a StreamingClient instance with authentication and configuration options.

```python { .api }
class StreamingClient:
    def __init__(self, bearer_token, *, chunk_size=512, daemon=False,
                 max_retries=float('inf'), proxy=None, return_type=Response,
                 verify=True, wait_on_rate_limit=False):
        """
        Initialize Twitter API v2 streaming client.

        Parameters:
        - bearer_token (str): Bearer token for authentication
        - chunk_size (int): Size of data chunks to read (default: 512)
        - daemon (bool): Run as daemon thread (default: False)
        - max_retries (int): Maximum reconnection attempts (default: infinite)
        - proxy (str, optional): Proxy server URL
        - return_type (type): Response container type (default: Response)
        - verify (bool): Verify SSL certificates (default: True)
        - wait_on_rate_limit (bool): Wait when rate limit hit (default: False)
        """
```

### Stream Control

Start and stop streaming connections with different stream types.

```python { .api }
def filter(self, *, backfill_minutes=None, expansions=None, media_fields=None,
           place_fields=None, poll_fields=None, space_fields=None,
           threaded=False, tweet_fields=None, user_fields=None):
    """
    Start filtered stream based on added rules.

    Parameters:
    - threaded (bool): Run stream in separate thread (default: False)
    - backfill_minutes (int, optional): Backfill tweets from last N minutes (5 max)
    - expansions (list, optional): Additional data to include
    - tweet_fields (list, optional): Tweet fields to include
    - user_fields (list, optional): User fields to include
    - media_fields (list, optional): Media fields to include
    - poll_fields (list, optional): Poll fields to include
    - place_fields (list, optional): Place fields to include
    - space_fields (list, optional): Space fields to include

    Returns:
    threading.Thread if threaded=True; otherwise None (the call blocks
    until the stream disconnects)
    """

def sample(self, *, backfill_minutes=None, expansions=None, media_fields=None,
           place_fields=None, poll_fields=None, space_fields=None,
           threaded=False, tweet_fields=None, user_fields=None):
    """
    Start sample stream (1% random sample of all tweets).

    Parameters: Same as filter()

    Returns:
    threading.Thread if threaded=True; otherwise None (the call blocks
    until the stream disconnects)
    """

def disconnect(self):
    """
    Disconnect from the streaming endpoint.

    Returns:
    None
    """
```

### Rule Management

Manage filtering rules for the filtered stream.

```python { .api }
def add_rules(self, add, *, dry_run=None):
    """
    Add filtering rules for the stream.

    Parameters:
    - add (list): List of StreamRule objects or rule dictionaries
    - dry_run (bool, optional): Test rules without adding them

    Returns:
    Response with rule addition results
    """

def delete_rules(self, ids, *, dry_run=None):
    """
    Delete filtering rules by their IDs.

    Parameters:
    - ids (list): List of rule IDs to delete
    - dry_run (bool, optional): Test deletion without removing rules

    Returns:
    Response with rule deletion results
    """

def get_rules(self, *, ids=None):
    """
    Get current filtering rules.

    Parameters:
    - ids (list, optional): Specific rule IDs to retrieve

    Returns:
    Response with current rules
    """
```

### Event Handlers

Override these methods to handle different streaming events.

```python { .api }
def on_connect(self):
    """
    Called when stream successfully connects.
    Override this method to handle connection events.
    """

def on_disconnect(self):
    """
    Called when stream is disconnected.
    Override this method to handle disconnection events.
    """

def on_tweet(self, tweet):
    """
    Called when a tweet is received.

    Parameters:
    - tweet (Tweet): Tweet object with all requested fields

    Override this method to process incoming tweets.
    """

def on_includes(self, includes):
    """
    Called when includes data is received.

    Parameters:
    - includes (dict): Additional data (users, media, polls, places, spaces)

    Override this method to process included data.
    """

def on_errors(self, errors):
    """
    Called when errors are received in the stream.

    Parameters:
    - errors (list): List of error objects

    Override this method to handle stream errors.
    """

def on_matching_rules(self, matching_rules):
    """
    Called when matching rules information is received.

    Parameters:
    - matching_rules (list): List of StreamRule objects for the rules that matched the tweet

    Override this method to process rule matching information.
    """

def on_response(self, response):
    """
    Called for each response from the stream.

    Parameters:
    - response (Response): Complete response object

    Override this method for low-level response handling.
    """

def on_data(self, raw_data):
    """
    Called with raw JSON data from the stream.

    Parameters:
    - raw_data (bytes): Raw JSON payload as received from the stream

    Override this method for custom data parsing.
    """

def on_keep_alive(self):
    """
    Called when a keep-alive signal is received.
    Override this method to handle keep-alive events.
    """

def on_connection_error(self):
    """
    Called when a connection error occurs.
    Override this method to handle connection errors.
    """

def on_request_error(self, status_code):
    """
    Called when an HTTP error response is received.

    Parameters:
    - status_code (int): HTTP status code

    Override this method to handle HTTP errors.
    """

def on_exception(self, exception):
    """
    Called when an exception occurs during streaming.

    Parameters:
    - exception (Exception): Exception object

    Override this method to handle exceptions.
    """

def on_closed(self, response):
    """
    Called when the stream is closed by Twitter.

    Parameters:
    - response (requests.Response): Final response from Twitter

    Override this method to handle stream closure.
    """
```
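
As a rough illustration of how a single raw stream line can fan out to the handlers above, the sketch below parses one JSON payload and splits it into the parts that `on_tweet`, `on_includes`, `on_errors`, and `on_matching_rules` would receive. The `split_payload` helper and the sample payload are hypothetical and not part of Tweepy; the top-level keys (`data`, `includes`, `errors`, `matching_rules`) are assumed to follow the Twitter API v2 stream response shape. Tweepy's own `on_data` also wraps these parts in model objects, which this sketch does not attempt.

```python
import json


def split_payload(raw_data):
    """Split one raw stream line into handler-sized parts (hypothetical helper)."""
    payload = json.loads(raw_data)  # json.loads accepts both str and bytes
    return {
        "tweet": payload.get("data"),
        "includes": payload.get("includes", {}),
        "errors": payload.get("errors", []),
        "matching_rules": payload.get("matching_rules", []),
    }


# Made-up payload, for illustration only.
sample_line = json.dumps({
    "data": {"id": "1", "text": "hello"},
    "includes": {"users": [{"id": "42", "username": "example"}]},
    "matching_rules": [{"id": "100", "tag": "python"}],
}).encode("utf-8")

parts = split_payload(sample_line)
print(parts["tweet"]["text"])
print([rule["tag"] for rule in parts["matching_rules"]])
```

Overriding `on_data` with logic like this bypasses the higher-level handlers, so it is usually only worth doing when the raw payload itself is needed.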

## Stream Rule Object

Rules define what tweets to receive in filtered streaming.

```python { .api }
class StreamRule:
    def __init__(self, value=None, tag=None, id=None):
        """
        Create a streaming rule.

        Parameters:
        - value (str): Rule filter expression
        - tag (str, optional): Rule identifier tag
        - id (str, optional): Rule ID (set by Twitter)
        """

    # Attributes
    value: str  # Rule filter expression
    tag: str    # Optional rule tag
    id: str     # Rule ID (assigned by Twitter)
```

## Usage Examples

### Basic Filtered Streaming

```python
import tweepy

class MyStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        # author_id is only populated when requested, so identify the tweet by its ID
        print(f"New tweet {tweet.id}: {tweet.text}")

    def on_connect(self):
        print("Connected to Twitter stream")

    def on_disconnect(self):
        print("Disconnected from Twitter stream")

# Initialize streaming client
stream = MyStreamListener(bearer_token="your_bearer_token")

# Add filtering rules
rules = [
    tweepy.StreamRule("python programming -is:retweet lang:en", tag="python"),
    tweepy.StreamRule("javascript OR nodejs", tag="js"),
]
stream.add_rules(rules)

# Start streaming (blocking call)
stream.filter()
```

### Sample Streaming

```python
import tweepy

class SampleStreamListener(tweepy.StreamingClient):
    def __init__(self, bearer_token):
        super().__init__(bearer_token)
        self.tweet_count = 0

    def on_tweet(self, tweet):
        self.tweet_count += 1
        print(f"Tweet #{self.tweet_count}: {tweet.text[:50]}...")

        # Stop after 100 tweets
        if self.tweet_count >= 100:
            self.disconnect()

# Start sample stream
stream = SampleStreamListener(bearer_token="your_bearer_token")
stream.sample()
```

### Advanced Streaming with Full Data

```python
import tweepy

class AdvancedStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        # Access full tweet data
        print(f"Tweet ID: {tweet.id}")
        print(f"Author: {tweet.author_id}")
        print(f"Text: {tweet.text}")
        print(f"Created: {tweet.created_at}")

        # Access metrics if they were requested (the attribute is None otherwise)
        metrics = tweet.public_metrics
        if metrics:
            print(f"Likes: {metrics.get('like_count', 0)}")
            print(f"Retweets: {metrics.get('retweet_count', 0)}")

    def on_includes(self, includes):
        # Process included data (users, media, etc.)
        if 'users' in includes:
            for user in includes['users']:
                print(f"User: {user.username} ({user.name})")

    def on_matching_rules(self, matching_rules):
        # See which rules matched; entries are StreamRule objects, not dicts
        for rule in matching_rules:
            print(f"Matched rule: {rule.tag} (id: {rule.id})")

    def on_errors(self, errors):
        # Handle errors
        for error in errors:
            print(f"Stream error: {error}")

# Initialize with expanded data
stream = AdvancedStreamListener(bearer_token="your_bearer_token")

# Add rules
rules = [
    tweepy.StreamRule("(AI OR \"artificial intelligence\") lang:en -is:retweet", tag="ai_tweets")
]
stream.add_rules(rules)

# Start with expanded fields
stream.filter(
    expansions=["author_id", "attachments.media_keys"],
    tweet_fields=["created_at", "public_metrics", "context_annotations"],
    user_fields=["name", "username", "verified", "public_metrics"]
)
```

### Threaded Streaming

```python
import tweepy
import time

class ThreadedStreamListener(tweepy.StreamingClient):
    def on_tweet(self, tweet):
        print(f"Background tweet: {tweet.text[:30]}...")

# Start streaming in background thread
stream = ThreadedStreamListener(bearer_token="your_bearer_token")

# Add rules
rules = [tweepy.StreamRule("music", tag="music_tweets")]
stream.add_rules(rules)

# Start threaded stream
stream.filter(threaded=True)

# Continue with other work
for i in range(10):
    print(f"Main thread working... {i}")
    time.sleep(2)

# Stop streaming
stream.disconnect()
```

### Rule Management

```python
import tweepy

# Initialize client
stream = tweepy.StreamingClient(bearer_token="your_bearer_token")

# Get current rules
response = stream.get_rules()
if response.data:
    print("Current rules:")
    for rule in response.data:
        print(f"- {rule.id}: {rule.value} (tag: {rule.tag})")

# Add new rules
new_rules = [
    tweepy.StreamRule("cats OR dogs", tag="pets"),
    tweepy.StreamRule("football OR soccer", tag="sports"),
]
result = stream.add_rules(new_rules)
# result.data can be None when no rule was created (for example, duplicates)
print(f"Added {len(result.data or [])} rules")

# Delete the rules that existed before the additions above
if response.data:
    rule_ids = [rule.id for rule in response.data]
    stream.delete_rules(rule_ids)
    print(f"Deleted {len(rule_ids)} rules")
```

## Rule Syntax

Twitter streaming rules support a rich query syntax:

### Basic Operators
- `cats dogs` - AND (tweets containing both "cats" and "dogs")
- `cats OR dogs` - OR (tweets containing either "cats" or "dogs")
- `-cats` - NOT (tweets not containing "cats")
- `(cats OR dogs) -puppies` - Grouping with parentheses

### Field-specific Operators
- `from:username` - Tweets from specific user
- `to:username` - Tweets replying to specific user
- `@username` - Tweets mentioning specific user
- `#hashtag` - Tweets with specific hashtag
- `lang:en` - Tweets in specific language
- `is:retweet` - Only retweets
- `-is:retweet` - Exclude retweets
- `is:reply` - Only replies
- `has:images` - Tweets with images
- `has:videos` - Tweets with videos
- `has:links` - Tweets with URLs

### Advanced Features
- Exact phrase matching: `"exact phrase"`
- Emoji support: `🔥` or `:fire:`
- Keyword proximity: `"word1 word2"~10`
- Regular expressions (limited support)
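
The basic operators above compose by plain string building: a space means AND, `OR` is written out, a leading `-` negates, and parentheses group. The sketch below assembles a rule string from those pieces. The helper names `any_of`, `quote`, and `build_rule` are hypothetical and not part of Tweepy; the result is only the rule text, which would then be passed as the `value` of a `StreamRule`.

```python
def any_of(*terms):
    """Join terms with OR and wrap them in parentheses so the group binds as one unit."""
    return "(" + " OR ".join(terms) + ")"


def quote(phrase):
    """Wrap a phrase in double quotes for exact-phrase matching."""
    return f'"{phrase}"'


def build_rule(*required, exclude=()):
    """Combine required clauses with AND (a space) and negate each excluded clause."""
    clauses = list(required) + [f"-{clause}" for clause in exclude]
    return " ".join(clauses)


rule_value = build_rule(
    any_of("AI", quote("artificial intelligence")),
    "lang:en",
    exclude=["is:retweet"],
)
print(rule_value)  # (AI OR "artificial intelligence") lang:en -is:retweet
```

Building rules this way keeps grouping explicit, which matters because mixing AND and OR without parentheses is easy to get wrong.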

## Error Handling and Reconnection

StreamingClient automatically handles reconnection with exponential backoff:

```python
import tweepy

class RobustStreamListener(tweepy.StreamingClient):
    def on_connection_error(self):
        print("Connection error - will retry automatically")

    def on_request_error(self, status_code):
        print(f"HTTP error {status_code}")
        if status_code in (420, 429):  # Rate limited (API v2 responds with 429)
            print("Rate limited - backing off")

    def on_exception(self, exception):
        print(f"Exception occurred: {exception}")

    def on_closed(self, response):
        print(f"Stream closed by Twitter: {response.status_code}")

# Configure retry behavior
stream = RobustStreamListener(
    bearer_token="your_bearer_token",
    max_retries=10,           # Limit retry attempts
    wait_on_rate_limit=True   # Wait when rate limited
)
```
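
To make the idea of backing off between reconnection attempts concrete, here is a generic sketch of capped exponential backoff. It is not Tweepy's internal implementation: the function name `backoff_delays`, the 5-second starting delay, and the 320-second cap are assumptions chosen for illustration.

```python
def backoff_delays(max_retries, start=5.0, cap=320.0):
    """Yield the wait (in seconds) before each retry, doubling up to a cap."""
    delay = start
    for _ in range(max_retries):
        yield delay
        delay = min(delay * 2, cap)


delays = list(backoff_delays(max_retries=8))
print(delays)  # [5.0, 10.0, 20.0, 40.0, 80.0, 160.0, 320.0, 320.0]
```

The cap keeps a long outage from producing unbounded waits, while a finite `max_retries` bounds the total time spent retrying.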

## Performance Considerations

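- **Chunk Size**: `chunk_size` sets how many bytes are read per socket read (default 512). Larger values mean fewer reads and potentially higher throughput, at the cost of extra latency while waiting for data to arrive
- **Threading**: Use `threaded=True` to run the stream in a background thread so the calling thread is not blocked
- **Field Selection**: Request only the fields and expansions you need to reduce bandwidth
- **Rule Efficiency**: Write specific rules (for example, adding `lang:en` or `-is:retweet`) so the stream delivers only relevant tweets
- **Backfill**: Use `backfill_minutes` sparingly as it counts against rate limits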
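
One pattern that complements the threading point is to keep handlers such as `on_tweet` short by handing each item to a worker thread through a queue, so slow processing does not hold up reading from the stream. The sketch below uses only the standard library. The names `handle` and `worker`, and the plain strings standing in for tweets, are hypothetical, and none of this is part of Tweepy.

```python
import queue
import threading

work_queue = queue.Queue(maxsize=1000)  # bounded so a stalled worker cannot exhaust memory
processed = []


def worker():
    """Drain the queue until the None sentinel arrives."""
    while True:
        item = work_queue.get()
        if item is None:
            break
        processed.append(item.upper())  # stand-in for slow processing


def handle(text):
    """What an on_tweet override might do: enqueue and return (blocks only if the queue is full)."""
    work_queue.put(text)


thread = threading.Thread(target=worker, daemon=True)
thread.start()

for text in ["first tweet", "second tweet"]:
    handle(text)

work_queue.put(None)  # signal the worker to stop
thread.join()
print(processed)
```

With a single worker the items are processed in arrival order; using several workers would trade that ordering for throughput.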