npx @tessl/cli install tessl/maven-org-apache-flink--flink-tweet-inputformat_2-11@1.3.00
# Flink Tweet Input Format

Apache Flink input format library for parsing Twitter JSON data into structured Tweet objects for stream processing applications. It extends Flink's `DelimitedInputFormat` to read delimited Twitter API JSON records, logs and skips malformed records instead of failing, and maps each record onto a Java object model covering tweets, users, entities, and places.

## Package Information

- **Package Name**: flink-tweet-inputformat_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-tweet-inputformat_2.11</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
```

## Basic Usage

```java
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Create Flink execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create input format for Twitter JSON data
SimpleTweetInputFormat inputFormat = new SimpleTweetInputFormat();

// Read tweets from a file of delimited JSON records
DataSet<Tweet> tweets = env.readFile(inputFormat, "path/to/tweets.json");

// Process tweet data
tweets.map(tweet -> {
    String text = tweet.getText();
    String userName = tweet.getUser().getScreen_name();
    long retweetCount = tweet.getRetweet_count();
    return String.format("@%s: %s [RTs: %d]", userName, text, retweetCount);
}).print();

// In the DataSet API, print() triggers execution itself, so no separate
// env.execute() call follows; calling it with no new sink defined would fail.
```

## Architecture

The library is built around several key components:

- **Input Format**: `SimpleTweetInputFormat` extends Flink's DelimitedInputFormat for efficient stream processing
- **JSON Parser**: `TweetHandler` implements streaming JSON parsing with proper error handling
- **Data Model**: Complete Twitter API object hierarchy with `Tweet` as the root entity
- **Entity Support**: Full parsing of hashtags, URLs, user mentions, media, and geographic data
- **Error Resilience**: Graceful handling of malformed JSON with logging and continuation

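The "logging and continuation" behavior described under Error Resilience can be pictured as a read loop that catches a parse failure, logs it, and moves on to the next record. The following is a minimal sketch of that pattern using only the JDK. `SkipMalformedSketch`, `parseRecord`, and `readAll` are hypothetical names for illustration, and the "parser" is a deliberately crude stand-in, not the library's `TweetHandler`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical illustration of a skip-and-continue read loop; not the library's code.
class SkipMalformedSketch {
    private static final Logger LOG = Logger.getLogger(SkipMalformedSketch.class.getName());

    // Stand-in parser (assumption): accepts only records that look like a JSON object.
    static String parseRecord(String raw) {
        String trimmed = raw.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IllegalArgumentException("not a JSON object: " + trimmed);
        }
        return trimmed;
    }

    // Parse every record, logging and skipping the ones that fail to parse.
    static List<String> readAll(List<String> rawRecords) {
        List<String> parsed = new ArrayList<>();
        for (String raw : rawRecords) {
            try {
                parsed.add(parseRecord(raw));
            } catch (IllegalArgumentException e) {
                // One bad record must not abort the whole read.
                LOG.warning("Skipping malformed record: " + e.getMessage());
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("{\"text\":\"hello\"}", "garbage", "{\"text\":\"world\"}");
        System.out.println(readAll(input).size()); // prints 2: the malformed record is skipped
    }
}
```

The design choice worth noting is that the failure is handled per record, so a single truncated line in a large dump costs one record rather than the whole job.
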
## Capabilities

### Tweet Input Format

Core input format functionality for reading and parsing Twitter JSON data streams into structured Tweet objects within Apache Flink processing pipelines.

```java { .api }
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
        implements ResultTypeQueryable<Tweet> {

    public void open(FileInputSplit split) throws IOException;
    public Tweet nextRecord(Tweet record) throws IOException;
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;
    public TypeInformation<Tweet> getProducedType();
}
```

[Input Format](./input-format.md)

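The `readRecord(reuse, bytes, offset, numBytes)` signature indicates that each record arrives as a slice of a larger byte buffer rather than as its own array. As a rough sketch of what that contract implies, the JDK-only example below splits a buffer on a delimiter and decodes each slice. The newline delimiter, UTF-8 encoding, and the names `DelimitedSliceSketch`, `decodeRecord`, and `splitRecords` are assumptions made for illustration; the actual splitting is done by Flink, not by user code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of slice-based record handling; not Flink's implementation.
class DelimitedSliceSketch {

    // Decode one record given as a (offset, numBytes) slice of a shared buffer.
    static String decodeRecord(byte[] bytes, int offset, int numBytes) {
        return new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
    }

    // Split the buffer on '\n' (assumed delimiter) and decode each non-empty slice.
    static List<String> splitRecords(byte[] buffer) {
        List<String> records = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= buffer.length; i++) {
            boolean atEnd = (i == buffer.length);
            if (atEnd || buffer[i] == '\n') {
                if (i > start) {
                    records.add(decodeRecord(buffer, start, i - start));
                }
                start = i + 1;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "{\"id\":1}\n{\"id\":2}\n".getBytes(StandardCharsets.UTF_8);
        for (String record : splitRecords(data)) {
            System.out.println(record);
        }
    }
}
```

Working on slices avoids copying every record into a fresh array, and the `reuse` parameter in the real signature points at the same goal of limiting per-record allocation.
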
### Tweet Data Model

Complete Twitter data model with `Tweet` as the root entity, containing all standard Twitter API fields including user information, entities, geographic data, and engagement metrics.

```java { .api }
public class Tweet {
    public Tweet();
    public Tweet(int level);
    public void reset(int level);

    // Core tweet properties
    public String getText();
    public void setText(String text);
    public long getId();
    public void setId(long id);
    public String getCreated_at();
    public void setCreated_at(String created_at);

    // Related objects
    public Users getUser();
    public void setUser(Users user);
    public Entities getEntities();
    public void setEntities(Entities entities);
    public Places getPlace();
    public void setPlace(Places place);
    public Tweet getRetweeted_status();
    public void setRetweeted_status(Tweet retweeted_status);
}
```

[Tweet Model](./tweet-model.md)

### User Data Model

Twitter user profile information including demographics, account settings, follower metrics, and profile customization data.

```java { .api }
public class Users {
    public Users();
    public void reset();

    // User identification
    public long getId();
    public void setId(long id);
    public String getScreen_name();
    public void setScreen_name(String screen_name);
    public String getName();
    public void setName(String name);

    // Profile information
    public String getDescription();
    public void setDescription(String description);
    public String getLocation();
    public void setLocation(String location);
    public boolean isVerified();
    public void setVerified(boolean verified);

    // Metrics
    public long getFollowers_count();
    public void setFollowers_count(long followers_count);
    public long getFriends_count();
    public void setFriends_count(long friends_count);
}
```

[User Model](./user-model.md)

### Entity Parsing

Extraction and parsing of entities from tweet text including hashtags, URLs, user mentions, media attachments, and stock symbols.

```java { .api }
public class Entities {
    public Entities();

    public List<HashTags> getHashtags();
    public void setHashtags(List<HashTags> hashtags);
    public List<URL> getUrls();
    public void setUrls(List<URL> urls);
    public List<UserMention> getUser_mentions();
    public void setUser_mentions(List<UserMention> user_mentions);
    public List<Media> getMedia();
    public void setMedia(List<Media> media);
    public List<Symbol> getSymbols();
    public void setSymbols(List<Symbol> symbols);
}
```

[Entity Parsing](./entity-parsing.md)

### Geographic Data

Geographic information including coordinates, places, and location metadata associated with tweets.

```java { .api }
public class Coordinates {
    public Coordinates();

    public double[] getCoordinates();
    public void setCoordinates(double[] coordinates);
    public void setCoordinates(double longitude, double latitude);
    public String getType();
}

public class Places {
    public Places();

    public String getId();
    public void setId(String id);
    public String getName();
    public void setName(String name);
    public String getCountry();
    public void setCountry(String country);
    public Attributes getAttributes();
    public void setAttributes(Attributes attributes);
}
```

[Geographic Data](./geographic-data.md)

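The two-argument setter `setCoordinates(double longitude, double latitude)` takes longitude first, which matches the GeoJSON convention used by Twitter's `coordinates` field. The sketch below shows why the order matters when reading the array back. `SketchCoordinates` is a hypothetical stand-in that mirrors the listed method signatures so the example compiles without Flink, and the assumption that index 0 holds longitude and index 1 holds latitude is inferred from the setter's parameter order, not confirmed here.

```java
// Hypothetical illustration of longitude-first coordinate order; not the library's class.
class CoordinateOrderSketch {

    // Stand-in (assumption) mirroring the Coordinates getters and setters listed above.
    static class SketchCoordinates {
        private double[] coordinates = new double[2];

        void setCoordinates(double[] coordinates) {
            this.coordinates = coordinates;
        }

        void setCoordinates(double longitude, double latitude) {
            this.coordinates = new double[] {longitude, latitude};
        }

        double[] getCoordinates() {
            return coordinates;
        }
    }

    // Index 0 is assumed to be longitude (GeoJSON order).
    static double longitudeOf(SketchCoordinates c) {
        return c.getCoordinates()[0];
    }

    // Index 1 is assumed to be latitude (GeoJSON order).
    static double latitudeOf(SketchCoordinates c) {
        return c.getCoordinates()[1];
    }

    public static void main(String[] args) {
        SketchCoordinates berlin = new SketchCoordinates();
        berlin.setCoordinates(13.405, 52.52); // longitude first, then latitude
        System.out.println("lat=" + latitudeOf(berlin) + ", lon=" + longitudeOf(berlin));
    }
}
```

Many mapping APIs expect latitude first, so swapping the two values at the boundary is a common source of points landing in the wrong place.
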
## Types

```java { .api }
// Core input format for Flink integration
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
        implements ResultTypeQueryable<Tweet>;

// JSON parsing handler
public class TweetHandler implements ContentHandler;

// Main tweet data structure
public class Tweet;

// User profile information
public class Users;

// Entity container
public class Entities;

// Individual entity types
public class HashTags;
public class URL;
public class UserMention;
public class Media;
public class Symbol;

// Geographic data
public class Coordinates;
public class Places;
public class Attributes;
public class BoundingBox;

// Additional data structures
public class Contributors;
public class CurrentUserRetweet;
public class Size;
```