Apache Flink input format for processing Twitter tweet data in JSON format with strongly-typed Tweet objects
npx @tessl/cli install tessl/maven-org-apache-flink--flink-tweet-inputformat-2-10@1.3.00
# Flink Tweet Input Format

Apache Flink input format for processing Twitter tweet data in JSON format. The library provides a specialized input format that reads Twitter JSON data files and parses each record into a structured `Tweet` object. Flink batch and streaming applications get type-safe access to tweet data, and the format recovers from malformed JSON records.

## Package Information

- **Package Name**: flink-tweet-inputformat_2.10
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Version**: 1.3.3
- **Installation**: Add to Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-tweet-inputformat_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
```

## Basic Usage

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;

public class TweetSummaryJob {

    public static void main(String[] args) throws Exception {
        // Create the Flink batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read tweets from a JSON file using the input format
        DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "path/to/tweets.json");

        // Summarize each tweet; print() triggers execution of the batch job
        tweets.map(tweet -> "User: " + tweet.getUser().getScreen_name()
                + ", Text: " + tweet.getText()
                + ", Retweets: " + tweet.getRetweet_count())
            .print();
    }
}
```

## Architecture

The Flink Tweet Input Format is built around several key components:

- **SimpleTweetInputFormat**: The main input format class. It extends Flink's `DelimitedInputFormat`, so each delimited record of the input file (by default, each line) is parsed as the JSON of one tweet.
- **Tweet Data Model**: An object model mirroring Twitter's JSON schema, with nested objects for users, places, and entities.
- **JSON Parser Integration**: Built-in JSON parsing with error recovery for malformed tweets.
- **Type Safety**: Strongly-typed Java objects for all Twitter data structures.
- **Flink Integration**: The input format implements `ResultTypeQueryable`, so Flink's type system and serialization can obtain the produced `Tweet` type directly.

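The error-recovery idea can be illustrated without Flink. The following is a minimal sketch, not the library's implementation: it assumes newline-delimited input, uses a hypothetical `SkipMalformedRecords.parseAll` helper, and lets a caller-supplied parser signal a malformed record by throwing. Failed records are dropped and reading continues with the next one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper: parses newline-delimited records, skipping any that fail to parse. */
final class SkipMalformedRecords {

    private SkipMalformedRecords() {}

    static <T> List<T> parseAll(String input, Function<String, T> parse) {
        List<T> parsed = new ArrayList<>();
        // Assumption: one JSON document per line, as with a '\n' delimiter
        for (String line : input.split("\n")) {
            if (line.trim().isEmpty()) {
                continue; // blank lines carry no record
            }
            try {
                parsed.add(parse.apply(line));
            } catch (RuntimeException e) {
                // Malformed record: drop it and continue with the next line
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        // Toy parser standing in for a real JSON parser: accepts only {...} lines
        Function<String, String> toyParser = line -> {
            if (!line.startsWith("{") || !line.endsWith("}")) {
                throw new IllegalArgumentException("not a JSON object: " + line);
            }
            return line;
        };
        String input = "{\"id\":1}\nnot json\n{\"id\":2}\n";
        System.out.println(parseAll(input, toyParser)); // [{"id":1}, {"id":2}]
    }
}
```

In a real job the parser would be a JSON parser that fills a `Tweet`; the toy parser here only checks for surrounding braces.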
## Capabilities

### Input Format Processing

Core input format functionality for reading Twitter JSON files into Flink DataSets or DataStreams. The format parses each record's JSON into a `Tweet`, recovers from malformed records, and reports its produced type to Flink through `getProducedType()`.

```java { .api }
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
        implements ResultTypeQueryable<Tweet> {

    public void open(FileInputSplit split) throws IOException;
    public Tweet nextRecord(Tweet record) throws IOException;
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;
    public TypeInformation<Tweet> getProducedType();
}
```

[Input Format Processing](./input-format.md)

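`DelimitedInputFormat` locates each record inside a byte buffer and hands it to `readRecord` as a `(bytes, offset, numBytes)` slice. The sketch below mimics that contract with plain JDK code so the offset arithmetic is visible. `DelimitedSlices`, `decode`, and `records` are hypothetical names, and UTF-8 encoding is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the (bytes, offset, numBytes) record contract. */
final class DelimitedSlices {

    private DelimitedSlices() {}

    /** Decodes exactly numBytes bytes starting at offset, never reading past the record. */
    static String decode(byte[] bytes, int offset, int numBytes) {
        return new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
    }

    /** Splits a buffer on '\n' and decodes each non-empty record slice. */
    static List<String> records(byte[] buffer) {
        List<String> out = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= buffer.length; i++) {
            boolean atEnd = (i == buffer.length);
            if (atEnd || buffer[i] == '\n') {
                if (i > start) {
                    out.add(decode(buffer, start, i - start)); // slice [start, i)
                }
                start = i + 1;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] buffer = "{\"id\":1}\n{\"id\":2}".getBytes(StandardCharsets.UTF_8);
        System.out.println(records(buffer)); // [{"id":1}, {"id":2}]
    }
}
```

The important detail for any `readRecord` implementation is to honor both `offset` and `numBytes`, because the buffer usually holds neighboring records as well.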
### Tweet Data Model

Data model mirroring Twitter's JSON structure, with access to tweet content, user information, geographic data, and extracted entities. The listing below shows the core tweet and engagement accessors.

```java { .api }
public class Tweet {
    public Tweet();
    public Tweet(int level);
    public void reset(int level);

    // Core tweet data
    public String getText();
    public void setText(String text);
    public long getId();
    public void setId(long id);
    public String getCreated_at();
    public void setCreated_at(String created_at);
    public Users getUser();
    public void setUser(Users user);

    // Engagement metrics
    public long getRetweet_count();
    public void setRetweet_count(long retweet_count);
    public long getFavorite_count();
    public void setFavorite_count(long favorite_count);
    public boolean isRetweeted();
    public void setRetweeted(boolean retweeted);
    public boolean isFavorited();
    public void setFavorited(boolean favorited);
}
```

[Tweet Model](./tweet-model.md)

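Because `readRecord` receives a `reuse` object, a single `Tweet` instance may be filled again and again. A field absent from the next record would then keep the previous record's value unless the object is cleared first; this is presumably the role of `reset(int level)`. The sketch uses a hypothetical `ReusableRecord` stand-in rather than the real `Tweet`, so it runs without Flink.

```java
/** Hypothetical stand-in for a reusable record such as Tweet. */
final class ReusableRecord {
    String text;
    long retweetCount;

    /** Clears all fields so nothing leaks from the previously parsed record. */
    void reset() {
        text = null;
        retweetCount = 0L;
    }

    /** Copies only non-null values, mimicking JSON input in which fields may be absent. */
    ReusableRecord fill(String newText, Long newRetweetCount, boolean resetFirst) {
        if (resetFirst) {
            reset();
        }
        if (newText != null) {
            text = newText;
        }
        if (newRetweetCount != null) {
            retweetCount = newRetweetCount;
        }
        return this;
    }

    public static void main(String[] args) {
        ReusableRecord reuse = new ReusableRecord();
        reuse.fill("first tweet", 5L, true);
        reuse.fill("second tweet", null, false); // second record lacks retweet_count
        System.out.println(reuse.retweetCount);  // 5: stale value from the first record

        reuse.fill("first tweet", 5L, true);
        reuse.fill("second tweet", null, true);
        System.out.println(reuse.retweetCount);  // 0: reset cleared it
    }
}
```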
### User Data Model

Twitter user profile information including follower counts, verification status, profile details, and account metadata.

```java { .api }
public class Users {
    public Users();
    public void reset();

    // Basic user information
    public String getScreen_name();
    public void setScreen_name(String screen_name);
    public String getName();
    public void setName(String name);
    public long getId();
    public void setId(long id);
    public String getDescription();
    public void setDescription(String description);

    // User metrics
    public long getFollowers_count();
    public void setFollowers_count(long followers_count);
    public long getFriends_count();
    public void setFriends_count(long friends_count);
    public long getStatuses_count();
    public void setStatuses_count(long statuses_count);

    // Account status
    public boolean isVerified();
    public void setVerified(boolean verified);
    public boolean isProtected_tweet();
    public void setProtected_tweet(boolean protected_tweet);
}
```

[User Model](./user-model.md)

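Since the user metrics are plain `long` getters, they plug directly into standard JDK comparators. The hypothetical `TopUsers.topN` helper below ranks any collection by a `long`-valued metric; with the library on the classpath it could be called as `TopUsers.topN(users, Users::getFollowers_count, 10)`. The `main` method uses strings (length as the metric) only so the sketch runs without Flink.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;

/** Hypothetical helper for ranking user-like objects by a long-valued metric. */
final class TopUsers {

    private TopUsers() {}

    /** Returns the n elements with the largest metric, highest first. */
    static <U> List<U> topN(Collection<U> users, ToLongFunction<U> metric, int n) {
        return users.stream()
                .sorted(Comparator.comparingLong(metric).reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in data: String length plays the role of a follower count
        List<String> names = Arrays.asList("ann", "bartholomew", "cy");
        System.out.println(topN(names, String::length, 2)); // [bartholomew, ann]
    }
}
```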
### Geographic Data

Location information, including coordinates, places, and geographic boundaries, for geo-tagged tweets. Coordinates follow GeoJSON order: longitude first, then latitude.

```java { .api }
public class Coordinates {
    public Coordinates();
    public double[] getCoordinates();
    public void setCoordinates(double[] coordinates);
    public void setCoordinates(double longitude, double latitude);
    public String getType();
}

public class Places {
    public Places();
    public String getName();
    public void setName(String name);
    public String getFull_name();
    public void setFull_name(String full_name);
    public String getCountry();
    public void setCountry(String country);
    public String getPlace_type();
    public void setPlace_type(String place_type);
    public Attributes getAttributes();
    public void setAttributes(Attributes attributes);
    public BoundingBox getBounding_box();
    public void setBounding_box(BoundingBox bounding_box);
}
```

[Geographic Data](./geographic-data.md)

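`Coordinates.getCoordinates()` returns a `double[]` in `{longitude, latitude}` order, so swapping the two indices is an easy mistake. The sketch below computes the great-circle distance between two such arrays with the haversine formula. The helper name `GeoDistance.haversineKm` is hypothetical, and the mean Earth radius of 6,371 km is an assumption.

```java
/** Hypothetical helper: great-circle distance between two {longitude, latitude} pairs. */
final class GeoDistance {

    private static final double EARTH_RADIUS_KM = 6371.0; // mean Earth radius (assumption)

    private GeoDistance() {}

    /** Haversine distance in kilometres; both arrays are {longitude, latitude} in degrees. */
    static double haversineKm(double[] a, double[] b) {
        double lon1 = Math.toRadians(a[0]);
        double lat1 = Math.toRadians(a[1]);
        double lon2 = Math.toRadians(b[0]);
        double lat2 = Math.toRadians(b[1]);
        double dLat = lat2 - lat1;
        double dLon = lon2 - lon1;
        double h = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(lat1) * Math.cos(lat2) * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        // Clamp guards against rounding pushing h slightly above 1
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(Math.min(1.0, h)));
    }

    public static void main(String[] args) {
        double[] berlin = {13.405, 52.52};  // longitude first
        double[] paris = {2.3522, 48.8566};
        System.out.printf("%.0f km%n", haversineKm(berlin, paris));
    }
}
```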
### Tweet Entities

Entities extracted from the tweet text: hashtags, URLs, user mentions, media attachments, and financial symbols (cashtags). `URL` here refers to the library's entity class, not `java.net.URL`.

```java { .api }
public class Entities {
    public Entities();
    public List<HashTags> getHashtags();
    public void setHashtags(List<HashTags> hashtags);
    public List<URL> getUrls();
    public void setUrls(List<URL> urls);
    public List<UserMention> getUser_mentions();
    public void setUser_mentions(List<UserMention> user_mentions);
    public List<Media> getMedia();
    public void setMedia(List<Media> media);
    public List<Symbol> getSymbols();
    public void setSymbols(List<Symbol> symbols);
}
```

[Tweet Entities](./tweet-entities.md)

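A common use of `getHashtags()` is counting hashtag frequency across tweets. The sketch below works on plain hashtag strings; how each string is obtained from a `HashTags` element is an assumption not covered in this overview, and `HashtagCounts.count` is a hypothetical helper. Tags are compared case-insensitively and an optional leading `#` is stripped, so `#Flink` and `flink` count as the same tag.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: case-insensitive hashtag frequency count. */
final class HashtagCounts {

    private HashtagCounts() {}

    static Map<String, Integer> count(List<String> hashtags) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for stable output
        for (String tag : hashtags) {
            // Strip an optional leading '#' and normalize case
            String key = (tag.startsWith("#") ? tag.substring(1) : tag).toLowerCase(Locale.ROOT);
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> tags = Arrays.asList("Flink", "#flink", "BigData");
        System.out.println(count(tags)); // {bigdata=1, flink=2}
    }
}
```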
## Common Types

```java { .api }
/**
 * Flink interface that lets Flink query the produced type
 * for its type system and serialization
 */
public interface ResultTypeQueryable<T> {
    TypeInformation<T> getProducedType();
}

/**
 * Contributors to tweet authorship
 */
public class Contributors {
    public Contributors();
    public Contributors(long id, String id_str, String screenName);
    public void reset();
    public long getId();
    public void setId(long id);
    public String getId_str();
    public void setId_str(String id_str);
    public String getScreenName();
    public void setScreenName(String screenName);
}

/**
 * Current user's retweet information
 */
public class CurrentUserRetweet {
    public CurrentUserRetweet();
    public void reset();
    public long getId();
    public void setId(long id);
    public String getId_str();
    public void setId_str(String id_str);
}
```
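Both `Contributors` and `CurrentUserRetweet` carry an `id` and an `id_str`. Twitter publishes the string form because 64-bit IDs can exceed 2^53, beyond which parsers that store every JSON number as a double (JavaScript among them) silently lose precision. The sketch below demonstrates the loss with the JDK alone; the class `IdPrecision` and its methods are hypothetical.

```java
/** Hypothetical demo: why a string copy of a 64-bit ID is kept alongside the number. */
final class IdPrecision {

    private IdPrecision() {}

    /** Simulates a parser that stores every JSON number as a double. */
    static long viaDouble(long id) {
        return (long) (double) id;
    }

    /** Parsing the string form keeps every digit of the ID. */
    static long viaString(String idStr) {
        return Long.parseLong(idStr);
    }

    public static void main(String[] args) {
        long id = 9007199254740993L; // 2^53 + 1, the smallest positive integer a double cannot hold
        System.out.println(viaDouble(id));                // 9007199254740992
        System.out.println(viaString(Long.toString(id))); // 9007199254740993
    }
}
```

On the Java side, `long` holds these IDs exactly; `id_str` matters mainly when tweet data passes through other systems that parse numbers as doubles.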