# Input Format Processing

Core input format functionality for reading line-delimited Twitter JSON files into Flink DataSets or DataStreams, with JSON parsing that skips malformed records.

## Capabilities

### SimpleTweetInputFormat

Main input format class for reading Twitter JSON data files. It extends Flink's `DelimitedInputFormat` to handle line-delimited JSON files in which each line holds one tweet.

```java { .api }
/**
 * Input format for reading Twitter JSON data files into Tweet objects.
 * Extends DelimitedInputFormat to process line-delimited JSON files.
 * Implements ResultTypeQueryable for Flink's type system integration.
 */
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
        implements ResultTypeQueryable<Tweet> {

    /**
     * Opens the input format and initializes JSON parsing components
     * @param split - File input split to process
     * @throws IOException if initialization fails
     */
    public void open(FileInputSplit split) throws IOException;

    /**
     * Reads the next tweet record with error recovery for malformed JSON
     * @param record - Tweet object to reuse for deserialization
     * @return Parsed Tweet object
     * @throws IOException if reading fails
     */
    public Tweet nextRecord(Tweet record) throws IOException;

    /**
     * Parses raw bytes into a Tweet object using JSON parser
     * @param reuse - Tweet object to reuse for deserialization
     * @param bytes - Raw byte data containing JSON
     * @param offset - Starting position in byte array
     * @param numBytes - Number of bytes to read
     * @return Parsed Tweet object
     * @throws IOException if parsing fails
     */
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;

    /**
     * Returns type information for Flink's type system
     * @return TypeInformation for Tweet class
     */
    public TypeInformation<Tweet> getProducedType();
}
```
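
The `offset` and `numBytes` parameters of `readRecord` reflect how a delimited format hands each record to the parser: as a slice of a shared byte buffer rather than as a separate string. The following is a minimal, Flink-free sketch of that contract, assuming newline delimiters and UTF-8 text. `DelimitedRecordSketch` and its methods are hypothetical names used for illustration only; they are not part of the library, and dropping empty slices is a choice of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in for the delimiter-splitting contract; not Flink code. */
final class DelimitedRecordSketch {

    /** Mirrors the shape of readRecord(reuse, bytes, offset, numBytes): decode one slice. */
    static String readRecord(byte[] bytes, int offset, int numBytes) {
        return new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
    }

    /** Walks the buffer and hands each newline-delimited slice to readRecord. */
    static List<String> readAll(byte[] buffer) {
        List<String> records = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < buffer.length; i++) {
            if (buffer[i] == '\n') {
                if (i > start) {
                    // Non-empty slice between the previous delimiter and this one.
                    records.add(readRecord(buffer, start, i - start));
                }
                start = i + 1;
            }
        }
        if (start < buffer.length) {
            // Final record without a trailing delimiter.
            records.add(readRecord(buffer, start, buffer.length - start));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "{\"id\":1}\n{\"id\":2}\n".getBytes(StandardCharsets.UTF_8);
        readAll(data).forEach(System.out::println);
    }
}
```

Splitting on the raw byte `0x0A` is safe for UTF-8 input because that byte never occurs inside a multi-byte sequence.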

**Usage Examples:**

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;

// Basic usage with DataSet API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

// Process tweets
DataSet<String> usernames = tweets.map(tweet -> tweet.getUser().getScreen_name());

// Filter for verified users
DataSet<Tweet> verifiedTweets = tweets.filter(tweet -> tweet.getUser().isVerified());

// Sum retweet counts per user. sum() aggregates a tuple field by position,
// so map each tweet to a (screen name, retweet count) pair first.
DataSet<Tuple2<String, Long>> retweetCounts = tweets
    .map(tweet -> Tuple2.of(tweet.getUser().getScreen_name(), tweet.getRetweet_count()))
    .returns(new TypeHint<Tuple2<String, Long>>() {}) // lambdas lose generic type info
    .groupBy(0)
    .sum(1);
```

```java
import java.util.stream.Collectors;

import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

// Usage with DataStream API for real-time processing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read tweets as a stream (for file monitoring scenarios)
DataStream<Tweet> tweetStream = env.readFile(
    new SimpleTweetInputFormat(),
    "tweet-directory",
    FileProcessingMode.PROCESS_CONTINUOUSLY,
    1000 // monitoring interval in ms
);

// Real-time tweet processing: keep tweets with hashtags and print "user: tag1, tag2"
tweetStream
    .filter(tweet -> !tweet.getEntities().getHashtags().isEmpty())
    .map(tweet -> {
        String hashtags = tweet.getEntities().getHashtags().stream()
            .map(ht -> ht.getText())
            .collect(Collectors.joining(", "));
        return tweet.getUser().getScreen_name() + ": " + hashtags;
    })
    .print();
```

### Error Handling

The input format recovers from malformed JSON inside `nextRecord()`: a `JsonParseException` causes the current record to be discarded, and the loop retries with the next record until one parses successfully.

```java
// Error recovery is built into nextRecord() method
public Tweet nextRecord(Tweet record) throws IOException {
    Boolean result = false;

    do {
        try {
            record.reset(0);
            record = super.nextRecord(record);
            result = true;
        } catch (JsonParseException e) {
            // Skip malformed records and continue processing
            result = false;
        }
    } while (!result);

    return record;
}
```

**Key Features:**

- **Automatic Error Recovery**: Skips malformed JSON records and continues processing
- **Memory Efficiency**: Reuses Tweet objects to minimize garbage collection
- **Flink Integration**: Full compatibility with Flink's type system and serialization
- **JSON Parser Integration**: Uses json-simple library for robust JSON parsing
- **Logging**: Debug-level logging for parsing exceptions and malformed data
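
The first two features can be illustrated without Flink. The sketch below is not the library's code: it assumes a hypothetical `Record` type and a toy `id=<number>` line format in place of `Tweet` and JSON. It shows one reusable object being reset and refilled for each line, with malformed lines dropped rather than failing the read.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative only: a skip-on-error reader with object reuse; not the library's classes. */
final class SkipMalformedSketch {

    /** Hypothetical stand-in for Tweet: one field plus a reset() for reuse. */
    static final class Record {
        long id;

        void reset() {
            id = 0L;
        }
    }

    /** Hypothetical parser: accepts lines of the form "id=<number>", rejects anything else. */
    static void parseInto(Record reuse, String line) {
        if (!line.startsWith("id=")) {
            throw new IllegalArgumentException("malformed record: " + line);
        }
        // NumberFormatException is a subclass of IllegalArgumentException.
        reuse.id = Long.parseLong(line.substring(3));
    }

    /** Returns the next well-formed record, or null when the input is exhausted. */
    static Record nextRecord(Iterator<String> lines, Record reuse) {
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                reuse.reset();
                parseInto(reuse, line);
                return reuse;
            } catch (IllegalArgumentException e) {
                // Malformed line: drop it and try the next one.
            }
        }
        return null;
    }

    /** Drains the input with a single reused Record, copying each id out before the next read. */
    static List<Long> readIds(List<String> input) {
        List<Long> ids = new ArrayList<>();
        Iterator<String> lines = input.iterator();
        Record reuse = new Record();
        while (nextRecord(lines, reuse) != null) {
            ids.add(reuse.id);
        }
        return ids;
    }
}
```

Because the same object is returned on every call, a caller that needs to keep a value must copy it out before requesting the next record, as `readIds` does.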

### Type System Integration

```java { .api }
/**
 * Interface for providing type information to Flink's type system
 */
public interface ResultTypeQueryable<T> {
    /**
     * Returns type information for the produced type
     * @return TypeInformation describing the output type
     */
    TypeInformation<T> getProducedType();
}

/**
 * Generic type information for Tweet objects
 */
public class GenericTypeInfo<T> extends TypeInformation<T> {
    public GenericTypeInfo(Class<T> typeClass);
}
```
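
The input format implements `ResultTypeQueryable<Tweet>` so that Flink can obtain the output type directly from the format instead of inferring it. The type information is described as a `GenericTypeInfo<Tweet>`, which means Flink treats `Tweet` as a generic type and handles it with its general-purpose serializer when records move between operators in a distributed job.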