Apache Flink input format for processing Twitter tweet data in JSON format with strongly-typed Tweet objects
Core input format functionality for reading Twitter JSON files into Flink DataSets or DataStreams. Records containing malformed JSON are skipped during reading.
SimpleTweetInputFormat is the main input format class for reading Twitter JSON data files. It extends Flink's DelimitedInputFormat to handle line-delimited JSON files containing tweet data.
/**
 * Input format for reading Twitter JSON data files into Tweet objects.
 * Extends DelimitedInputFormat to process line-delimited JSON files.
 * Implements ResultTypeQueryable for Flink's type system integration.
 */
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
        implements ResultTypeQueryable<Tweet> {

    /**
     * Opens the input format and initializes JSON parsing components
     * @param split - File input split to process
     * @throws IOException if initialization fails
     */
    public void open(FileInputSplit split) throws IOException;

    /**
     * Reads the next tweet record with error recovery for malformed JSON
     * @param record - Tweet object to reuse for deserialization
     * @return Parsed Tweet object
     * @throws IOException if reading fails
     */
    public Tweet nextRecord(Tweet record) throws IOException;

    /**
     * Parses raw bytes into a Tweet object using JSON parser
     * @param reuse - Tweet object to reuse for deserialization
     * @param bytes - Raw byte data containing JSON
     * @param offset - Starting position in byte array
     * @param numBytes - Number of bytes to read
     * @return Parsed Tweet object
     * @throws IOException if parsing fails
     */
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;

    /**
     * Returns type information for Flink's type system
     * @return TypeInformation for Tweet class
     */
    public TypeInformation<Tweet> getProducedType();
}

Usage Examples:
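Before the Flink-based examples, it may help to see what the (bytes, offset, numBytes) arguments of readRecord mean in practice. The following is a minimal sketch that uses only the Java standard library, not the library's actual implementation: the class and method names (RecordSliceSketch, decodeRecord, splitRecords) are hypothetical, and it assumes UTF-8 input with newline as the record delimiter. It shows a delimited reader handing each record to a decoder as a slice of a shared buffer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RecordSliceSketch {

    /** Decodes one record from a shared buffer (hypothetical helper, not part of Flink). */
    public static String decodeRecord(byte[] bytes, int offset, int numBytes) {
        // Only the slice [offset, offset + numBytes) belongs to the current record.
        return new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
    }

    /** Splits a buffer on '\n' and decodes every non-empty line as one record. */
    public static List<String> splitRecords(byte[] buffer) {
        List<String> records = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= buffer.length; i++) {
            boolean atEnd = (i == buffer.length);
            if (atEnd || buffer[i] == '\n') {
                if (i > start) {
                    records.add(decodeRecord(buffer, start, i - start));
                }
                start = i + 1; // the next record begins after the delimiter
            }
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] buffer = "{\"id\":1}\n{\"id\":2}\n".getBytes(StandardCharsets.UTF_8);
        for (String record : splitRecords(buffer)) {
            System.out.println(record);
        }
    }
}
```

In the real input format the decoded slice would be handed to a JSON parser that fills the reused Tweet object; the sketch stops at the string so that it stays self-contained.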
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;

// Basic usage with DataSet API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

// Extract the screen name of each tweet's author
DataSet<String> usernames = tweets.map(tweet -> tweet.getUser().getScreen_name());

// Filter for verified users
DataSet<Tweet> verifiedTweets = tweets.filter(tweet -> tweet.getUser().isVerified());

// Sum retweet counts per user. sum() aggregates a tuple field by position,
// so each tweet is first mapped to a (screen name, retweet count) tuple.
DataSet<Tuple2<String, Long>> retweetCounts = tweets
    .map(new MapFunction<Tweet, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(Tweet tweet) {
            return new Tuple2<>(tweet.getUser().getScreen_name(), tweet.getRetweet_count());
        }
    })
    .groupBy(0)
    .sum(1);

import org.apache.flink.streaming.api.datastream.DataStream;
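import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import java.util.stream.Collectors;

// Usage with the DataStream API for continuous processing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read tweets as a stream (for file monitoring scenarios)
DataStream<Tweet> tweetStream = env.readFile(
    new SimpleTweetInputFormat(),
    "tweet-directory",
    FileProcessingMode.PROCESS_CONTINUOUSLY,
    1000 // monitoring interval in ms
);

// Print "<screen name>: <hashtags>" for every tweet with at least one hashtag
tweetStream
    .filter(tweet -> !tweet.getEntities().getHashtags().isEmpty())
    .map(tweet -> {
        String hashtags = tweet.getEntities().getHashtags().stream()
            .map(ht -> ht.getText())
            .collect(Collectors.joining(", "));
        return tweet.getUser().getScreen_name() + ": " + hashtags;
    })
    .print();

// A DataStream program only runs once execute() is called
env.execute();

The input format recovers from malformed JSON data: nextRecord() catches JsonParseException and retries with the next record: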
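// Error recovery is built into the nextRecord() method
public Tweet nextRecord(Tweet record) throws IOException {
    Boolean result = false;
    do {
        try {
            record.reset(0);
            record = super.nextRecord(record);
            result = true;
        } catch (JsonParseException e) {
            // Skip malformed records and continue processing
            result = false;
        }
    } while (!result);
    return record;
}

Key Features: records that fail with a JsonParseException are skipped and reading continues with the next record, and the Tweet object passed in is reset and handed back to the parser for reuse.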
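The skip-and-continue loop in nextRecord() above can be illustrated without Flink. The following is a minimal sketch under stated assumptions, not the library's code: SkipMalformedSketch and its methods are hypothetical names, and parse() is a stand-in that only checks for a {...} shape instead of running a real JSON parser. Unlike the excerpt above, the sketch checks explicitly for the end of the input and returns null there.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SkipMalformedSketch {

    /** Hypothetical stand-in for a JSON parser: accepts only lines shaped like {...}. */
    public static String parse(String line) {
        String trimmed = line.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IllegalArgumentException("malformed record: " + line);
        }
        return trimmed;
    }

    /** Returns the next parseable record, or null once the input is exhausted. */
    public static String nextRecord(Iterator<String> lines) {
        while (lines.hasNext()) {
            try {
                return parse(lines.next());
            } catch (IllegalArgumentException e) {
                // Malformed line: skip it and try the next one.
            }
        }
        return null;
    }

    /** Drains the input, keeping only the records that parsed successfully. */
    public static List<String> readAll(List<String> input) {
        List<String> parsed = new ArrayList<>();
        Iterator<String> lines = input.iterator();
        String record;
        while ((record = nextRecord(lines)) != null) {
            parsed.add(record);
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("{\"id\":1}", "not json", "{\"id\":2");
        System.out.println(readAll(input));
    }
}
```

Silently skipping bad records keeps a long-running job alive when a few lines are corrupt, at the cost of hiding how many records were dropped; a counter or log statement in the catch block is one way to make that visible.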
/**
 * Interface for providing type information to Flink's type system
 */
public interface ResultTypeQueryable<T> {
    /**
     * Returns type information for the produced type
     * @return TypeInformation describing the output type
     */
    TypeInformation<T> getProducedType();
}

/**
 * Generic type information for an arbitrary class; used here with Tweet
 */
public class GenericTypeInfo<T> extends TypeInformation<T> {
    public GenericTypeInfo(Class<T> typeClass);
}

The input format implements ResultTypeQueryable<Tweet> so that Flink's type system receives explicit type information for Tweet, which it uses to serialize and deserialize records in distributed processing.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tweet-inputformat-2-10