# Apache Flink Twitter Connector

The Apache Flink Twitter connector ingests streaming data from the Twitter API. It implements a `RichSourceFunction` that emits tweets as strings, using Twitter's Hosebird Client (HBC) library for reliable streaming connections with OAuth1 authentication and rate limiting support.

## Package Information

- **Package Name**: flink-connector-twitter_2.12
- **Group ID**: org.apache.flink
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import org.apache.flink.streaming.connectors.twitter.TwitterSource.EndpointInitializer;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.configuration.Configuration;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import java.util.Properties;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import java.util.Properties;

// Configure Twitter credentials
Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
props.setProperty(TwitterSource.TOKEN, "your-access-token");
props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");

// Create and configure the source
TwitterSource twitterSource = new TwitterSource(props);

// Use in Flink streaming job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(twitterSource)
    .print();

env.execute("Twitter Stream Job");
```
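Hard-coding credentials as in the snippet above is convenient for a first run but awkward to maintain. As a minimal sketch, the same keys could be read from text in `.properties` format. The class `TwitterCredentialsLoader` and its `load` method are hypothetical helpers for illustration and are not part of the connector; the example reads from a string so that it runs without any file on disk.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper (not part of the connector) that reads credentials in .properties format. */
final class TwitterCredentialsLoader {

    /** Reads key=value pairs from the given reader into a Properties object. */
    static Properties load(Reader source) throws IOException {
        Properties props = new Properties();
        props.load(source);
        return props;
    }

    public static void main(String[] args) throws IOException {
        // In a real job this text would come from a file kept out of version control,
        // for example via new FileReader(path).
        String fileContents =
                "twitter-source.consumerKey=your-consumer-key\n"
                        + "twitter-source.consumerSecret=your-consumer-secret\n"
                        + "twitter-source.token=your-access-token\n"
                        + "twitter-source.tokenSecret=your-access-token-secret\n";

        Properties props = load(new StringReader(fileContents));

        // The loaded object has the same shape as the hand-built one in Basic Usage
        // and could be passed to new TwitterSource(props).
        System.out.println("Loaded " + props.size() + " properties");
    }
}
```

Because the file uses the literal key strings (`twitter-source.consumerKey` and so on), it stays valid even if the job code refers to them through the `TwitterSource` constants.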

## Advanced Usage Examples

### Custom Endpoint with Error Handling

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Properties;

try {
    // Configure properties
    Properties props = new Properties();
    props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
    props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
    props.setProperty(TwitterSource.TOKEN, "your-access-token");
    props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");
    props.setProperty(TwitterSource.CLIENT_NAME, "my-flink-app");
    props.setProperty(TwitterSource.CLIENT_BUFFER_SIZE, "100000");

    // Create source with custom endpoint.
    // The initializer is stored inside the source, which Flink serializes when the
    // job is submitted, so the lambda is cast to a serializable type.
    TwitterSource twitterSource = new TwitterSource(props);
    twitterSource.setCustomEndpointInitializer(
        (TwitterSource.EndpointInitializer & Serializable) () -> {
            StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
            endpoint.trackTerms(Arrays.asList("apache", "flink", "bigdata"));
            return endpoint;
        });

    // Setup streaming environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10000); // Checkpoint every 10 seconds for fault tolerance

    env.addSource(twitterSource)
        .filter(tweet -> tweet.contains("apache"))
        .print();

    env.execute("Filtered Twitter Stream");

} catch (IllegalArgumentException e) {
    System.err.println("Configuration error: " + e.getMessage());
} catch (Exception e) {
    System.err.println("Execution error: " + e.getMessage());
}
```

### Resource Management Pattern

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;
import java.util.Properties;

public class TwitterStreamingJob {
    public static void main(String[] args) throws Exception {
        Properties props = loadTwitterProperties();

        TwitterSource twitterSource = new TwitterSource(props);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure proper resource management
        env.enableCheckpointing(5000); // Checkpoint every 5 seconds
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000)); // 3 attempts, 10 seconds apart

        env.addSource(twitterSource)
            .name("Twitter Source")
            .setParallelism(1) // TwitterSource is not parallelizable
            .map(tweet -> processTweet(tweet))
            .print();

        env.execute("Robust Twitter Stream");
    }

    // Placeholder: replace with your own property loading logic
    private static Properties loadTwitterProperties() {
        Properties props = new Properties();
        props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key");
        props.setProperty(TwitterSource.CONSUMER_SECRET, "your-consumer-secret");
        props.setProperty(TwitterSource.TOKEN, "your-access-token");
        props.setProperty(TwitterSource.TOKEN_SECRET, "your-access-token-secret");
        return props;
    }

    private static String processTweet(String rawTweet) {
        // Your tweet processing logic
        return rawTweet;
    }
}
```

## Capabilities

### Twitter Source Creation

Creates a Twitter streaming source with OAuth authentication and configurable properties. `TwitterSource` extends `RichSourceFunction<String>` and therefore implements Flink's `SourceFunction` interface.

```java { .api }
public class TwitterSource extends RichSourceFunction<String> {
    /**
     * Creates a TwitterSource for streaming tweets from Twitter API
     * @param properties Configuration properties containing OAuth credentials and optional settings
     * @throws IllegalArgumentException if required properties are missing
     */
    public TwitterSource(Properties properties);
}
```

**Required Properties:**

- `twitter-source.consumerKey` - Twitter application consumer key
- `twitter-source.consumerSecret` - Twitter application consumer secret
- `twitter-source.token` - Twitter access token
- `twitter-source.tokenSecret` - Twitter access token secret

**Optional Properties:**

- `twitter-source.name` - Client name (default: "flink-twitter-source")
- `twitter-source.hosts` - Twitter API hosts (default: `Constants.STREAM_HOST` from the HBC library)
- `twitter-source.bufferSize` - Buffer size for reading (default: "50000")

### Source Lifecycle Methods

Core lifecycle methods inherited from `RichSourceFunction` for managing the Twitter streaming connection.

```java { .api }
/**
 * Initializes the source with configuration parameters
 * @param parameters Configuration from Flink runtime
 * @throws Exception if initialization fails
 */
public void open(Configuration parameters) throws Exception;

/**
 * Main execution method that establishes Twitter connection and streams data
 * @param ctx Source context for emitting collected tweets
 * @throws Exception if connection or streaming fails
 */
public void run(SourceContext<String> ctx) throws Exception;

/**
 * Closes the Twitter connection and releases resources
 */
public void close();

/**
 * Cancels the source operation and stops streaming
 */
public void cancel();
```
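Flink invokes `run()` on the source's task thread and calls `cancel()` from a different thread, so a source has to keep `run()` alive while streaming and release it promptly on cancellation. The following dependency-free sketch illustrates that interplay with a volatile flag and a wait lock. `BlockingSourceSketch`, its fields, and its `demo` method are hypothetical stand-ins written for this illustration; the sketch does not claim to reproduce the connector's internals, and a plain `Consumer<String>` takes the place of `SourceContext<String>`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a source whose run() blocks until cancel() is called. */
final class BlockingSourceSketch {
    private final Object waitLock = new Object();
    private volatile boolean running = true;

    /** Emits the given messages, then blocks until cancel() is called. */
    void run(List<String> messages, Consumer<String> collector) throws InterruptedException {
        for (String message : messages) {
            if (!running) {
                return;
            }
            collector.accept(message);
        }
        synchronized (waitLock) {
            while (running) {
                waitLock.wait(100); // re-check the flag periodically
            }
        }
    }

    /** Called from another thread to stop run(). */
    void cancel() {
        running = false;
        synchronized (waitLock) {
            waitLock.notifyAll();
        }
    }

    /** Runs the source on a worker thread, cancels it, and returns what was collected. */
    static List<String> demo(List<String> messages) throws InterruptedException {
        BlockingSourceSketch source = new BlockingSourceSketch();
        List<String> collected = new CopyOnWriteArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                source.run(messages, collected::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        while (collected.size() < messages.size()) {
            Thread.sleep(10); // wait until everything has been emitted
        }
        source.cancel();
        worker.join();
        return collected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(Arrays.asList("tweet-1", "tweet-2")));
    }
}
```

The volatile flag makes the cancellation visible to the thread running `run()`, and the `notifyAll()` call wakes it immediately instead of waiting for the next timeout.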

### Custom Endpoint Configuration

Allows customization of the Twitter streaming endpoint beyond the default sample stream.

```java { .api }
/**
 * Sets a custom endpoint initializer for the Twitter source
 * @param initializer Custom endpoint initializer implementation
 * @throws NullPointerException if initializer is null
 */
public void setCustomEndpointInitializer(EndpointInitializer initializer);
```

### Property Constants

Pre-defined property key constants for configuration.

```java { .api }
// Required property keys
public static final String CONSUMER_KEY = "twitter-source.consumerKey";
public static final String CONSUMER_SECRET = "twitter-source.consumerSecret";
public static final String TOKEN = "twitter-source.token";
public static final String TOKEN_SECRET = "twitter-source.tokenSecret";

// Optional property keys
public static final String CLIENT_NAME = "twitter-source.name";
public static final String CLIENT_HOSTS = "twitter-source.hosts";
public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";
```
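Since every property value is a string, a numeric option such as the buffer size has to be parsed before use. The sketch below shows one way a job could resolve the optional settings with the defaults listed under Optional Properties and reject a malformed buffer size before the job is submitted. `OptionalSettingsSketch` and its methods are hypothetical; the key strings and defaults are copied from this document, and the connector's own parsing may behave differently.

```java
import java.util.Properties;

/** Hypothetical helper that resolves optional settings using the documented defaults. */
final class OptionalSettingsSketch {
    // Key strings and defaults copied from the property lists in this document.
    static final String CLIENT_NAME = "twitter-source.name";
    static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";
    static final String DEFAULT_CLIENT_NAME = "flink-twitter-source";
    static final String DEFAULT_BUFFER_SIZE = "50000";

    /** Returns the configured client name, or the documented default. */
    static String clientName(Properties props) {
        return props.getProperty(CLIENT_NAME, DEFAULT_CLIENT_NAME);
    }

    /** Returns the configured buffer size as an int, or the documented default. */
    static int bufferSize(Properties props) {
        String raw = props.getProperty(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    CLIENT_BUFFER_SIZE + " must be an integer, got: " + raw, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(CLIENT_BUFFER_SIZE, "100000");
        System.out.println(clientName(props) + " / " + bufferSize(props));
    }
}
```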

## Interfaces

### EndpointInitializer

Interface for creating custom Twitter streaming endpoints to override the default sample endpoint behavior.

```java { .api }
/**
 * Interface for creating custom Twitter streaming endpoints
 */
public interface EndpointInitializer {
    /**
     * Creates and returns a Twitter streaming endpoint
     * @return StreamingEndpoint instance for Twitter API connection
     */
    StreamingEndpoint createEndpoint();
}
```

**Usage Example:**

```java
TwitterSource twitterSource = new TwitterSource(properties);

// Custom endpoint that filters tweets by keywords.
// The cast makes the lambda serializable (requires java.io.Serializable),
// because Flink serializes the source together with its initializer.
twitterSource.setCustomEndpointInitializer(
    (TwitterSource.EndpointInitializer & Serializable) () -> {
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(Arrays.asList("apache", "flink", "streaming"));
        return endpoint;
    });
```
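Flink serializes source functions, including the objects they hold, when a job is shipped to the cluster, so an initializer stored in the source should be serializable. A lambda is only serializable when its target type includes `Serializable`. The dependency-free sketch below demonstrates the difference using a hypothetical stand-in interface, `EndpointFactory`, which returns a `String` instead of a `StreamingEndpoint` so that the example runs without Flink or HBC on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Shows that a lambda is serializable only if its target type includes Serializable. */
final class SerializableLambdaSketch {

    /** Hypothetical stand-in for EndpointInitializer; does not extend Serializable. */
    interface EndpointFactory {
        String createEndpoint();
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    /** A plain lambda: its only target type is EndpointFactory. */
    static EndpointFactory plainLambda() {
        return () -> "statuses/filter";
    }

    /** The intersection cast adds Serializable to the lambda's target type. */
    static EndpointFactory serializableLambda() {
        return (EndpointFactory & Serializable) () -> "statuses/filter";
    }

    public static void main(String[] args) {
        System.out.println("plain lambda serializable: " + canSerialize(plainLambda()));
        System.out.println("cast lambda serializable: " + canSerialize(serializableLambda()));
    }
}
```

A named static class that implements both the initializer interface and `Serializable` is an equivalent alternative and can be easier to read when the endpoint setup grows beyond a few lines.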

## Type Definitions

### Core Flink Types

```java { .api }
/**
 * Base source function interface from Flink
 */
public abstract class RichSourceFunction<T> implements SourceFunction<T> {
    // Lifecycle methods implemented by TwitterSource
}

/**
 * Context for emitting data from sources
 */
public interface SourceContext<T> {
    void collect(T element);
    // Additional methods for checkpointing and timestamps
}

/**
 * Flink configuration object
 */
public class Configuration {
    // Configuration parameters passed to open() method
}
```

### Twitter HBC Types

```java { .api }
/**
 * Base interface for Twitter streaming endpoints from HBC library
 */
public interface StreamingEndpoint {
    // Twitter endpoint configuration methods
}

/**
 * Sample endpoint implementation (used by default)
 */
public class StatusesSampleEndpoint implements StreamingEndpoint {
    public StatusesSampleEndpoint stallWarnings(boolean stallWarnings);
    public StatusesSampleEndpoint delimited(boolean delimited);
}

/**
 * Filter endpoint for tracking specific terms
 */
public class StatusesFilterEndpoint implements StreamingEndpoint {
    public StatusesFilterEndpoint trackTerms(List<String> terms);
    // Additional filtering methods
}
```

## Architecture Notes

- **Non-parallel Source**: `TwitterSource` is not parallelizable (it always runs with parallelism 1) due to Twitter API connection limitations
- **Fault Tolerance**: Works in jobs that use Flink's checkpointing and restart strategies. The Twitter stream cannot be replayed, so tweets sent while the job is down or restarting are not re-delivered; do not rely on exactly-once ingestion from this source
- **Authentication**: Uses the OAuth1 protocol for secure Twitter API access
- **Rate Limiting**: Handles Twitter API rate limits automatically through the HBC library
- **Data Format**: Emits raw tweet JSON strings for downstream processing
- **Dependencies**: Built on Twitter's Hosebird Client (HBC) library version 2.2.0

## Error Handling

The connector handles several types of errors:

- **Configuration Errors**: `IllegalArgumentException` thrown by the constructor if required properties (`CONSUMER_KEY`, `CONSUMER_SECRET`, `TOKEN`, `TOKEN_SECRET`) are missing
- **Endpoint Initialization Errors**: `NullPointerException` thrown by `setCustomEndpointInitializer()` if the initializer parameter is null
- **Connection and Runtime Errors**: Handled by the underlying HBC library and Flink's fault tolerance mechanisms
- **Authentication Errors**: OAuth authentication failures are handled by the HBC client during connection establishment
- **Stream Processing Errors**: `IOException` and `InterruptedException` can be thrown during stream processing and are propagated to Flink's error handling

**Exception Hierarchy:**

```java { .api }
// Constructor exceptions
public TwitterSource(Properties properties) throws IllegalArgumentException;

// Lifecycle method exceptions
public void open(Configuration parameters) throws Exception;
public void run(SourceContext<String> ctx) throws Exception;

// Configuration method exceptions
public void setCustomEndpointInitializer(EndpointInitializer initializer) throws NullPointerException;
```
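Because a missing credential surfaces as an `IllegalArgumentException` from the constructor, a job may want to check its configuration up front and report every missing key at once. The sketch below is one way to do that without any Flink dependency. `RequiredPropertiesCheck` and its methods are hypothetical helpers; the key strings are copied from the Required Properties list, and the exception message format is an assumption rather than the connector's actual wording.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check that reports all missing credentials together. */
final class RequiredPropertiesCheck {
    // Key strings copied from the Required Properties list in this document.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "twitter-source.consumerKey",
            "twitter-source.consumerSecret",
            "twitter-source.token",
            "twitter-source.tokenSecret");

    /** Returns the required keys that are absent from props, in declaration order. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Throws IllegalArgumentException naming every missing key. */
    static void requireAll(Properties props) {
        List<String> missing = missingKeys(props);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required properties: " + missing);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("twitter-source.consumerKey", "your-consumer-key");
        try {
            requireAll(props);
        } catch (IllegalArgumentException e) {
            System.err.println("Configuration error: " + e.getMessage());
        }
    }
}
```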