# Tweet Input Format

The core input format reads Twitter JSON data streams and parses them into structured `Tweet` objects inside Apache Flink processing pipelines.

## Capabilities

### SimpleTweetInputFormat

`SimpleTweetInputFormat` is the main input format class. It extends Flink's `DelimitedInputFormat` to parse Twitter JSON data, tolerates malformed records, and reports its output type to Flink through `ResultTypeQueryable`.

```java { .api }
/**
 * Apache Flink input format for parsing Twitter JSON data into Tweet objects.
 * Extends DelimitedInputFormat to provide streaming JSON parsing with error handling.
 */
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
        implements ResultTypeQueryable<Tweet> {

    /**
     * Initialize the input format with JSON parser and tweet handler.
     * Called by Flink runtime before processing begins.
     *
     * @param split FileInputSplit containing the data source information
     * @throws IOException if initialization fails
     */
    public void open(FileInputSplit split) throws IOException;

    /**
     * Read the next tweet record from the input stream with error handling.
     * Continues processing even if individual tweets are malformed.
     *
     * @param record Tweet object to reuse for parsing (for efficiency)
     * @return Parsed Tweet object or null if end of stream
     * @throws IOException if stream reading fails
     */
    public Tweet nextRecord(Tweet record) throws IOException;

    /**
     * Parse raw JSON bytes into a Tweet object using streaming JSON parser.
     * Handles malformed JSON gracefully with proper logging.
     *
     * @param reuse Tweet object to reuse for parsing
     * @param bytes Raw JSON bytes to parse
     * @param offset Starting position in byte array
     * @param numBytes Number of bytes to read
     * @return Parsed Tweet object
     * @throws IOException if parsing fails critically
     */
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;

    /**
     * Provide type information for Flink's serialization system.
     * Required for proper Kryo serialization of Tweet objects.
     *
     * @return TypeInformation for Tweet class
     */
    public TypeInformation<Tweet> getProducedType();
}
```
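
The `readRecord(reuse, bytes, offset, numBytes)` signature implies that each record arrives as a slice of a larger shared buffer and is written into a reused object. The following is a minimal plain-Java sketch of that contract, not Flink's implementation: `RawRecord`, `DelimitedSliceSketch`, and the newline delimiter are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a reusable record; not the library's Tweet class. */
class RawRecord {
    String json;
}

class DelimitedSliceSketch {

    /** Decodes only bytes[offset, offset + numBytes) into the reused object. */
    static RawRecord readRecord(RawRecord reuse, byte[] bytes, int offset, int numBytes) {
        reuse.json = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        return reuse;
    }

    /** Splits a buffer on '\n' (assumed delimiter) and passes each non-empty slice on. */
    static List<String> readAll(byte[] buffer) {
        List<String> out = new ArrayList<>();
        RawRecord reuse = new RawRecord(); // one instance reused for every record
        int start = 0;
        for (int i = 0; i <= buffer.length; i++) {
            boolean atEnd = (i == buffer.length);
            if (atEnd || buffer[i] == '\n') {
                if (i > start) {
                    RawRecord r = readRecord(reuse, buffer, start, i - start);
                    // Keep the decoded value; the record object is overwritten next call.
                    out.add(r.json);
                }
                start = i + 1;
            }
        }
        return out;
    }
}
```

Because the same object is handed back for every record, a caller that needs to retain a record beyond the next call has to copy the values it cares about.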

**Usage Examples:**

```java
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Basic usage with file input
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
SimpleTweetInputFormat inputFormat = new SimpleTweetInputFormat();
DataSet<Tweet> tweets = env.readFile(inputFormat, "tweets.json");

// Process tweets
tweets.filter(tweet -> tweet.getRetweet_count() > 100)
      .map(tweet -> tweet.getUser().getScreen_name() + ": " + tweet.getText())
      .print();

// Usage with streaming (DataStream API)
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tweet> tweetStream = streamEnv.readFile(inputFormat, "stream/");

// TweetSummary and TweetSink are application-defined placeholders, not part of this library.
// "en".equals(...) avoids a NullPointerException when a tweet has no language set.
tweetStream.filter(tweet -> "en".equals(tweet.getLang()))
           .map(tweet -> new TweetSummary(
               tweet.getId_str(),
               tweet.getUser().getScreen_name(),
               tweet.getText(),
               tweet.getCreated_at()
           ))
           .addSink(new TweetSink());

// A streaming job only runs once execute() is called
streamEnv.execute();
```

### TweetHandler

`TweetHandler` is the JSON parsing handler. It implements the streaming `ContentHandler` interface, so a `Tweet` is populated from parser callbacks as the Twitter JSON is read.

```java { .api }
/**
 * Streaming JSON parser handler for Twitter data structures.
 * Implements ContentHandler for efficient parsing of nested JSON objects.
 */
public class TweetHandler implements ContentHandler {

    /** Tweet object being populated during parsing */
    protected Tweet reuse;

    /**
     * Handle the start of JSON parsing.
     * @throws ParseException if parsing setup fails
     * @throws IOException if I/O error occurs
     */
    public void startJSON() throws ParseException, IOException;

    /**
     * Handle the end of JSON parsing.
     * @throws ParseException if parsing cleanup fails
     * @throws IOException if I/O error occurs
     */
    public void endJSON() throws ParseException, IOException;

    /**
     * Handle start of JSON object.
     * @return true to continue parsing
     * @throws ParseException if object parsing fails
     * @throws IOException if I/O error occurs
     */
    public boolean startObject() throws ParseException, IOException;

    /**
     * Handle end of JSON object.
     * @return true to continue parsing
     * @throws ParseException if object parsing fails
     * @throws IOException if I/O error occurs
     */
    public boolean endObject() throws ParseException, IOException;

    /**
     * Handle start of object property with key name.
     * @param key Property name
     * @return true to continue parsing
     * @throws ParseException if property parsing fails
     * @throws IOException if I/O error occurs
     */
    public boolean startObjectEntry(String key) throws ParseException, IOException;

    /**
     * Handle primitive values (strings, numbers, booleans).
     * @param value Primitive value to process
     * @return true to continue parsing
     * @throws ParseException if value parsing fails
     * @throws IOException if I/O error occurs
     */
    public boolean primitive(Object value) throws ParseException, IOException;
}
```
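
A callback-style handler has to remember where it is in the document, because `primitive` receives only the value. The sketch below shows one way such state could be tracked, using a stack of enclosing keys plus the key currently awaiting a value. It is an illustration only: `MiniTweet` and `MiniTweetHandler` are hypothetical names, the events are issued by hand instead of by a real parser, arrays are not modeled, and the actual `TweetHandler` may be organized differently.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, trimmed-down target object; not the library's Tweet class. */
class MiniTweet {
    String text;
    String userScreenName;
    long retweetCount;
}

/** Illustrative handler that mirrors the callback names above without a parser library. */
class MiniTweetHandler {
    final MiniTweet reuse = new MiniTweet();
    private final Deque<String> path = new ArrayDeque<>(); // keys of enclosing objects
    private String currentKey;                              // key awaiting its value

    boolean startObject() {
        // The root object has no key, so it is recorded as an empty string.
        path.push(currentKey == null ? "" : currentKey);
        currentKey = null;
        return true;
    }

    boolean endObject() {
        path.pop();
        return true;
    }

    boolean startObjectEntry(String key) {
        currentKey = key;
        return true;
    }

    boolean primitive(Object value) {
        boolean topLevel = path.size() == 1;
        boolean insideUser = path.size() == 2 && "user".equals(path.peek());
        if (topLevel && "text".equals(currentKey)) {
            reuse.text = (String) value;
        } else if (topLevel && "retweet_count".equals(currentKey)) {
            reuse.retweetCount = ((Number) value).longValue();
        } else if (insideUser && "screen_name".equals(currentKey)) {
            reuse.userScreenName = (String) value;
        }
        currentKey = null;
        return true; // true means "keep parsing"
    }
}
```

Tracking the enclosing key is what lets the handler tell a top-level `text` field apart from a `text` field nested inside another object.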

## Error Handling

The input format is designed to keep processing when individual records are bad:

- **Malformed JSON**: Individual malformed tweets are logged and skipped; processing continues.
- **Type Conversion**: Fields with invalid data types fall back to default values.
- **Parse Exceptions**: Parsing problems are logged in detail without stopping the stream.
- **I/O Errors**: Critical I/O failures are propagated rather than swallowed.

**Example Error Scenarios:**

```java
// The input format handles these gracefully:
// 1. Malformed JSON tweets
{"invalid": json, "missing": quotes}

// 2. Missing required fields
{"text": "Hello", "missing_id": true}

// 3. Wrong data types
{"id": "not_a_number", "retweet_count": "invalid"}

// 4. Truncated JSON
{"text": "Hello world", "incomplete":
```
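
The skip-and-continue behavior described above can be sketched in plain Java. This is an assumption-laden illustration, not the library's code: `SkipMalformedSketch` is a hypothetical name, and a crude brace check plus a regular expression stand in for a real JSON parser. Structurally broken records are logged and skipped, while a missing or non-numeric `retweet_count` falls back to a default of 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SkipMalformedSketch {
    // Crude stand-in for a real JSON parser: captures the raw retweet_count value.
    private static final Pattern COUNT =
            Pattern.compile("\"retweet_count\"\\s*:\\s*\"?([^\",}]*)\"?");

    /** Throws if the record is structurally broken; bad or missing counts become 0. */
    static long parseRetweetCount(String line) {
        String s = line.trim();
        if (!s.startsWith("{") || !s.endsWith("}")) {
            throw new IllegalArgumentException("truncated or malformed record");
        }
        Matcher m = COUNT.matcher(s);
        if (!m.find()) {
            return 0L; // missing field: fall back to a default
        }
        try {
            return Long.parseLong(m.group(1).trim());
        } catch (NumberFormatException e) {
            return 0L; // wrong data type: fall back to a default
        }
    }

    /** Parses every line, logging and skipping records that cannot be parsed. */
    static List<Long> readAll(List<String> lines) {
        List<Long> counts = new ArrayList<>();
        for (String line : lines) {
            try {
                counts.add(parseRetweetCount(line));
            } catch (IllegalArgumentException e) {
                System.err.println("Skipping record: " + e.getMessage());
            }
        }
        return counts;
    }
}
```

Catching the failure per record, inside the loop, is what keeps one bad tweet from ending the whole read.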

## Integration with Flink

The input format works with both Flink's DataSet and DataStream APIs:

```java
// DataSet API (batch processing)
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), inputPath);

// DataStream API (stream processing)
DataStream<Tweet> tweetStream = streamEnv.readFile(new SimpleTweetInputFormat(), inputPath);

// Custom serialization (if needed): register the model classes with Kryo
env.getConfig().registerKryoType(Tweet.class);
env.getConfig().registerKryoType(Users.class);
env.getConfig().registerKryoType(Entities.class);
```