# Apache Flink Twitter Connector

The Apache Flink Twitter connector streams data from the Twitter Streaming API into Flink. Its `TwitterSource` extends `RichSourceFunction<String>` and emits each message read from the stream as a string. The connection itself is managed by Twitter's Hosebird Client (HBC) library, which provides OAuth1 authentication and automatic reconnection with backoff.

## Package Information

- **Package Name**: flink-connector-twitter_2.12
- **Group ID**: org.apache.flink
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.connectors.twitter.TwitterSource.EndpointInitializer;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.configuration.Configuration;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import java.util.Properties;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;

import java.util.Properties;

public class TwitterStreamJob {
    public static void main(String[] args) throws Exception {
        // Configure the four required OAuth1 credentials
        Properties props = new Properties();
        props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
        props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
        props.setProperty(TwitterSource.TOKEN, "your-access-token");
        props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");

        // Create the source; without a custom endpoint it reads the sample stream
        TwitterSource twitterSource = new TwitterSource(props);

        // Use it in a Flink streaming job
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(twitterSource)
                .print();

        env.execute("Twitter Stream Job");
    }
}
```

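Hard-coding credentials is convenient for a first test, but they are usually kept in a separate file. The sketch below is plain Java (no Flink or HBC classes) and assumes the standard `java.util.Properties` text format; the class name `CredentialLoader` is hypothetical. The keys are the string values of the `TwitterSource` constants.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: loads TwitterSource settings from any Reader
// (for example a FileReader over a file kept out of version control).
public class CredentialLoader {

    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source); // parses key=value lines
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Same keys as TwitterSource.CONSUMER_KEY, CONSUMER_SECRET, TOKEN, TOKEN_SECRET
        String text = String.join("\n",
                "twitter-source.consumerKey=your-consumer-key",
                "twitter-source.consumerSecret=your-consumer-secret",
                "twitter-source.token=your-access-token",
                "twitter-source.tokenSecret=your-access-token-secret");
        Properties props = load(new StringReader(text));
        System.out.println(props.size()); // prints 4
    }
}
```

The returned `Properties` object can be passed directly to `new TwitterSource(props)`.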
## Advanced Usage Examples

### Custom Endpoint with Error Handling

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Properties;

public class FilteredTwitterStreamJob {
    public static void main(String[] args) {
        try {
            // Required credentials plus two optional settings
            Properties props = new Properties();
            props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
            props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
            props.setProperty(TwitterSource.TOKEN, "your-access-token");
            props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");
            props.setProperty(TwitterSource.CLIENT_NAME, "my-flink-app");
            props.setProperty(TwitterSource.CLIENT_BUFFER_SIZE, "100000");

            // Track keywords instead of reading the sample stream. EndpointInitializer
            // is not Serializable, so the intersection cast makes the lambda serializable.
            TwitterSource twitterSource = new TwitterSource(props);
            twitterSource.setCustomEndpointInitializer(
                    (TwitterSource.EndpointInitializer & Serializable) () -> {
                        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
                        endpoint.trackTerms(Arrays.asList("apache", "flink", "bigdata"));
                        return endpoint;
                    });

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpointing does not make TwitterSource replayable (it keeps no state),
            // but it protects the state of any stateful downstream operators.
            env.enableCheckpointing(10000);

            env.addSource(twitterSource)
                    .filter(tweet -> tweet.contains("apache"))
                    .print();

            env.execute("Filtered Twitter Stream");
        } catch (IllegalArgumentException e) {
            // Thrown by the TwitterSource constructor when a required key is missing
            System.err.println("Configuration error: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("Execution error: " + e.getMessage());
        }
    }
}
```

### Checkpointing and Restart Strategy

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class TwitterStreamingJob {
    public static void main(String[] args) throws Exception {
        // Hypothetical convention: path to a credentials file as the first argument
        Properties props = loadTwitterProperties(args[0]);

        TwitterSource twitterSource = new TwitterSource(props);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 s; on failure, restart up to 3 times, 10 s apart
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

        env.addSource(twitterSource)
                .name("Twitter Source")
                .setParallelism(1) // non-parallel source; values > 1 are rejected
                .map(tweet -> processTweet(tweet))
                .print();

        env.execute("Robust Twitter Stream");
    }

    // Loads the connector properties from a java.util.Properties file
    private static Properties loadTwitterProperties(String path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return props;
    }

    private static String processTweet(String rawTweet) {
        // Your tweet processing logic
        return rawTweet;
    }
}
```

## Capabilities

### Twitter Source Creation

Creates a Twitter streaming source with OAuth1 authentication and configurable properties. `TwitterSource` extends `RichSourceFunction<String>` and therefore implements Flink's `SourceFunction` interface.

```java { .api }
public class TwitterSource extends RichSourceFunction<String> {
    /**
     * Creates a TwitterSource for streaming tweets from the Twitter API
     * @param properties Configuration properties containing OAuth credentials and optional settings
     * @throws IllegalArgumentException if required properties are missing
     */
    public TwitterSource(Properties properties);
}
```

**Required Properties** (the constructor checks only that each key is present, not its value):
- `twitter-source.consumerKey` - Twitter application consumer key
- `twitter-source.consumerSecret` - Twitter application consumer secret
- `twitter-source.token` - Twitter access token
- `twitter-source.tokenSecret` - Twitter access token secret

**Optional Properties:**
- `twitter-source.name` - Client name (default: "flink-twitter-source")
- `twitter-source.hosts` - Twitter API hosts (default: `Constants.STREAM_HOST` from the HBC library)
- `twitter-source.bufferSize` - Buffer size used when reading the stream (default: "50000")

### Source Lifecycle Methods

Lifecycle methods that `TwitterSource` overrides from `RichSourceFunction` (`open`, `close`) and `SourceFunction` (`run`, `cancel`). The HBC connection is opened in `run()`, not in `open()`.

```java { .api }
/**
 * Prepares runtime state before run() is called; the Twitter connection is not opened here
 * @param parameters Configuration from Flink runtime
 * @throws Exception if initialization fails
 */
public void open(Configuration parameters) throws Exception;

/**
 * Creates the endpoint, connects the HBC client using OAuth1, and blocks until the
 * source is cancelled; tweets are emitted to ctx from HBC's processing thread
 * @param ctx Source context for emitting collected tweets
 * @throws Exception if connection setup fails
 */
public void run(SourceContext<String> ctx) throws Exception;

/**
 * Stops the HBC client and releases the blocked run() call
 */
public void close();

/**
 * Cancels the source operation and stops streaming (delegates to close())
 */
public void cancel();
```

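The interplay of `run()`, `cancel()` and `close()` can be modeled with plain Java. This is a simplified sketch, not the connector's code. It assumes the pattern described above: `run()` starts a background client, then blocks on a lock until `cancel()`/`close()` clear a running flag and wake it up. `BlockingSourceModel` and its `Consumer<String>` emit hook are hypothetical stand-ins for `TwitterSource` and `SourceContext`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a source whose run() blocks until cancelled,
// while records are produced by another thread (HBC's role in TwitterSource).
public class BlockingSourceModel {
    private final Object waitLock = new Object();
    private volatile boolean running = true;

    public void run(Consumer<String> emit) throws InterruptedException {
        // Stand-in for the background client thread that delivers messages
        Thread client = new Thread(() -> emit.accept("{\"text\":\"hello\"}"));
        client.start();
        client.join();

        // Block until the flag is cleared; the timed wait re-checks it periodically
        while (running) {
            synchronized (waitLock) {
                waitLock.wait(100L);
            }
        }
    }

    public void cancel() {
        close(); // assumption: mirrors TwitterSource, where cancel() calls close()
    }

    public void close() {
        running = false;
        synchronized (waitLock) {
            waitLock.notifyAll(); // wake run() so it can return immediately
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingSourceModel source = new BlockingSourceModel();
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        Thread task = new Thread(() -> {
            try {
                source.run(out::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        Thread.sleep(200);
        source.cancel();
        task.join();
        System.out.println(out); // prints [{"text":"hello"}]
    }
}
```

The timed `wait(100L)` is a defensive choice: even if a notification were missed, `run()` notices the cleared flag within about 100 ms.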
### Custom Endpoint Configuration

Replaces the default endpoint (HBC's `StatusesSampleEndpoint`, a random sample of public tweets) with one supplied by an `EndpointInitializer`.

```java { .api }
/**
 * Sets a custom endpoint initializer for the Twitter source
 * @param initializer Custom endpoint initializer implementation; must be serializable
 * @throws NullPointerException if initializer is null
 * @throws InvalidProgramException if initializer is not serializable
 */
public void setCustomEndpointInitializer(EndpointInitializer initializer);
```

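As far as we can tell from the connector source, `setCustomEndpointInitializer()` verifies that the initializer is serializable, because it travels with the source to the task managers. `EndpointInitializer` itself does not extend `Serializable`, so a plain lambda fails that check. The JDK-only sketch below demonstrates the difference. `Initializer` is a hypothetical stand-in for `TwitterSource.EndpointInitializer` that returns a `String` instead of an HBC endpoint.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InitializerSerializability {

    // Hypothetical stand-in for TwitterSource.EndpointInitializer:
    // a functional interface that does NOT extend Serializable.
    interface Initializer {
        String createEndpoint();
    }

    // Returns true if Java serialization accepts the object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        Initializer plain = () -> "filter-endpoint";
        // Intersection cast: the compiler generates a serializable lambda
        Initializer serializable = (Initializer & Serializable) () -> "filter-endpoint";

        System.out.println(isSerializable(plain));        // prints false
        System.out.println(isSerializable(serializable)); // prints true
    }
}
```

With the real connector the same cast applies, `(TwitterSource.EndpointInitializer & Serializable) () -> ...`. Alternatively, use a named class that implements both interfaces.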
### Property Constants

Pre-defined property key constants for configuration.

```java { .api }
// Required property keys
public static final String CONSUMER_KEY = "twitter-source.consumerKey";
public static final String CONSUMER_SECRET = "twitter-source.consumerSecret";
public static final String TOKEN = "twitter-source.token";
public static final String TOKEN_SECRET = "twitter-source.tokenSecret";

// Optional property keys
public static final String CLIENT_NAME = "twitter-source.name";
public static final String CLIENT_HOSTS = "twitter-source.hosts";
public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";
```

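When the optional keys are unset, the source falls back to the defaults listed under Twitter Source Creation. The following JDK-only sketch shows that lookup with `Properties.getProperty(key, default)`. The `ClientSettings` class is hypothetical. The host default is written out as `https://stream.twitter.com`, which we assume is the value of HBC's `Constants.STREAM_HOST`.

```java
import java.util.Properties;

// Hypothetical holder for the optional settings and their fallbacks.
public final class ClientSettings {
    static final String CLIENT_NAME = "twitter-source.name";
    static final String CLIENT_HOSTS = "twitter-source.hosts";
    static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";

    final String name;
    final String hosts;
    final int bufferSize;

    ClientSettings(Properties p) {
        name = p.getProperty(CLIENT_NAME, "flink-twitter-source");
        // Assumed value of HBC's Constants.STREAM_HOST
        hosts = p.getProperty(CLIENT_HOSTS, "https://stream.twitter.com");
        // Stored as a string; a non-numeric value fails here with NumberFormatException
        bufferSize = Integer.parseInt(p.getProperty(CLIENT_BUFFER_SIZE, "50000"));
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty(CLIENT_BUFFER_SIZE, "100000");
        ClientSettings s = new ClientSettings(p);
        System.out.println(s.name + " " + s.bufferSize); // prints flink-twitter-source 100000
    }
}
```

The buffer size is stored as a string and parsed later, and the constructor validates only the required keys. A typo in `twitter-source.bufferSize` is therefore not caught when the source is created.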
## Interfaces

### EndpointInitializer

Nested interface `TwitterSource.EndpointInitializer` for supplying a custom Twitter streaming endpoint in place of the default sample endpoint. It does not extend `Serializable`, so implementations must add it themselves.

```java { .api }
/**
 * Interface for creating custom Twitter streaming endpoints
 */
public interface EndpointInitializer {
    /**
     * Creates and returns a Twitter streaming endpoint
     * @return StreamingEndpoint instance for Twitter API connection
     */
    StreamingEndpoint createEndpoint();
}
```

**Usage Example:**

```java
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import java.io.Serializable;
import java.util.Arrays;

TwitterSource twitterSource = new TwitterSource(properties);

// Custom endpoint that filters tweets by keywords; the intersection
// cast makes the lambda Serializable
twitterSource.setCustomEndpointInitializer(
        (TwitterSource.EndpointInitializer & Serializable) () -> {
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
            endpoint.trackTerms(Arrays.asList("apache", "flink", "streaming"));
            return endpoint;
        });
```

## Type Definitions

### Core Flink Types

```java { .api }
/**
 * Abstract base class for rich source functions in Flink
 */
public abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T> {
    // Lifecycle methods implemented by TwitterSource
}

/**
 * Context for emitting data from sources (nested interface SourceFunction.SourceContext)
 */
public interface SourceContext<T> {
    void collect(T element);
    // Additional methods for timestamps, watermarks, and the checkpoint lock
}

/**
 * Flink configuration object (org.apache.flink.configuration.Configuration)
 */
public class Configuration {
    // Configuration parameters passed to open() method
}
```

### Twitter HBC Types

```java { .api }
/**
 * Base interface for Twitter streaming endpoints from HBC library
 */
public interface StreamingEndpoint {
    // Twitter endpoint configuration methods
}

/**
 * Sample endpoint implementation (used by default)
 */
public class StatusesSampleEndpoint implements StreamingEndpoint {
    public StatusesSampleEndpoint stallWarnings(boolean stallWarnings);
    public StatusesSampleEndpoint delimited(boolean delimited);
}

/**
 * Filter endpoint for tracking specific terms
 */
public class StatusesFilterEndpoint implements StreamingEndpoint {
    public StatusesFilterEndpoint trackTerms(List<String> terms);
    // Additional filtering methods
}
```

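## Architecture Notes

- **Non-parallel Source**: `TwitterSource` extends `RichSourceFunction` rather than a parallel source function, so Flink runs it with parallelism 1 and rejects higher values
- **Fault Tolerance**: The source keeps no checkpointed state, and the Twitter stream cannot be replayed. Checkpointing protects downstream operator state. However, tweets that were received but not fully processed before a failure are not re-delivered after recovery, and neither are tweets published while the job restarts. The source therefore provides no exactly-once or at-least-once guarantee
- **Authentication**: Uses the OAuth1 protocol for Twitter API access
- **Reconnection and Rate Limiting**: Reconnecting after dropped connections, including backing off when Twitter rate-limits connection attempts, is delegated to the HBC client rather than to Flink
- **Data Format**: Emits each message read from the stream as a raw JSON string; parsing is left to downstream operators
- **Dependencies**: Built on Twitter's Hosebird Client (HBC) library version 2.2.0
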
## Error Handling

The connector surfaces errors in the following ways:

- **Configuration Errors**: The constructor throws `IllegalArgumentException` if a required property (`CONSUMER_KEY`, `CONSUMER_SECRET`, `TOKEN`, `TOKEN_SECRET`) is missing
- **Endpoint Initialization Errors**: `setCustomEndpointInitializer()` throws `NullPointerException` if the initializer is null, and Flink's `InvalidProgramException` if it is not serializable
- **Connection and Authentication Errors**: Handled inside the HBC client during connection establishment. Because `run()` only waits for cancellation, a rejected or dropped connection does not necessarily fail the Flink job; check the task logs if no tweets arrive
- **Stream Processing Errors**: `IOException` and `InterruptedException` can be thrown while HBC's message processor reads the stream. This happens on HBC's thread rather than in `run()`, so these exceptions are not guaranteed to reach Flink's failure handling

**Exception Summary:**
```java { .api }
// Constructor exceptions
public TwitterSource(Properties properties) throws IllegalArgumentException

// Lifecycle method exceptions
public void open(Configuration parameters) throws Exception
public void run(SourceContext<String> ctx) throws Exception

// Configuration method exceptions
public void setCustomEndpointInitializer(EndpointInitializer initializer) throws NullPointerException, InvalidProgramException
```
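The constructor reports a missing key through a single `IllegalArgumentException`. When several keys are missing, fixing them may therefore take several attempts. A small pre-check can report every missing key at once, before the job graph is built. This is a JDK-only sketch: `missingRequiredKeys` is a hypothetical helper, and the key strings are copied from the `TwitterSource` constants. It also flags blank values, which is stricter than a presence check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class TwitterConfigCheck {
    // Same strings as TwitterSource.CONSUMER_KEY, CONSUMER_SECRET, TOKEN, TOKEN_SECRET
    static final List<String> REQUIRED = Arrays.asList(
            "twitter-source.consumerKey",
            "twitter-source.consumerSecret",
            "twitter-source.token",
            "twitter-source.tokenSecret");

    // Hypothetical helper: returns every required key that is absent or blank.
    static List<String> missingRequiredKeys(Properties p) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = p.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("twitter-source.consumerKey", "your-consumer-key");
        p.setProperty("twitter-source.token", " ");
        List<String> missing = missingRequiredKeys(p);
        if (!missing.isEmpty()) {
            // prints Missing: [twitter-source.consumerSecret, twitter-source.token, twitter-source.tokenSecret]
            System.out.println("Missing: " + missing);
        }
    }
}
```

Running the check before `new TwitterSource(props)` turns several one-at-a-time constructor failures into a single, complete error message.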