Apache Flink input format for processing Twitter tweet data in JSON format with strongly-typed Tweet objects
npx @tessl/cli install tessl/maven-org-apache-flink--flink-tweet-inputformat-2-10@1.3.0

Apache Flink input format for processing Twitter tweet data in JSON format. This library provides a specialized input format that reads and parses Twitter JSON data files and converts them into structured Tweet objects. These can be used in Flink batch and streaming applications, with comprehensive error handling and type-safe access to tweet data.
pom.xml:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-tweet-inputformat_2.10</artifactId>
  <version>1.3.3</version>
</dependency>

Basic imports:

import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;

Quick start example:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;

// Obtain the Flink batch (DataSet API) execution environment.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read the JSON file with SimpleTweetInputFormat. It is a DelimitedInputFormat, so each delimited
// record is parsed into one Tweet (presumably one JSON tweet per line -- confirm the delimiter).
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "path/to/tweets.json");

// Build a one-line summary per tweet (author handle, text, retweet count) and print it.
tweets.map(tweet -> "User: " + tweet.getUser().getScreen_name()
        + ", Text: " + tweet.getText()
        + ", Retweets: " + tweet.getRetweet_count())
    .print();

The Flink Tweet Input Format is built around several key components:
Core input format functionality for reading Twitter JSON files into Flink DataSets or DataStreams. Handles JSON parsing, error recovery, and type conversion.
/**
 * Flink input format that reads Twitter JSON data and produces {@link Tweet} records.
 *
 * <p>It extends {@code DelimitedInputFormat}, so the input file is split into delimited records
 * and each record is handed to {@link #readRecord}. It implements {@code ResultTypeQueryable}
 * so Flink can learn the produced element type ({@link Tweet}) without reflection on lambdas.
 */
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
        implements ResultTypeQueryable<Tweet> {

    /**
     * Opens the given file split so that records can be read from it.
     *
     * @param split the file split to read
     * @throws IOException if the split cannot be opened
     */
    public void open(FileInputSplit split) throws IOException;

    /**
     * Reads the next tweet from the currently open split.
     *
     * @param record a Tweet instance that may be reused to hold the result
     * @return the next tweet
     * @throws IOException if reading from the split fails
     */
    public Tweet nextRecord(Tweet record) throws IOException;

    /**
     * Parses a single delimited record (JSON text) into a Tweet.
     *
     * @param reuse    a Tweet instance that may be reused to hold the result
     * @param bytes    buffer containing the record
     * @param offset   start position of the record within {@code bytes}
     * @param numBytes length of the record in bytes
     * @return the parsed tweet
     * @throws IOException if the record cannot be read or parsed
     */
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;

    /**
     * Returns the Flink type information describing the produced {@link Tweet} records.
     *
     * @return type information for {@code Tweet}
     */
    public TypeInformation<Tweet> getProducedType();
}

Complete data model representing Twitter's JSON structure with full access to tweet content, user information, geographic data, and extracted entities.
/**
 * A single tweet as parsed from Twitter's JSON representation.
 *
 * <p>The accessors below cover the core content, the author, and engagement metrics. Field-style
 * names such as {@code created_at} and {@code retweet_count} follow the JSON keys.
 */
public class Tweet {
    /** Creates an empty tweet. */
    public Tweet();

    /**
     * Creates an empty tweet at the given nesting level.
     *
     * @param level NOTE(review): presumably the nesting depth of embedded tweets
     *              (e.g. a retweeted status inside a tweet) -- confirm against the library
     */
    public Tweet(int level);

    /**
     * Resets this tweet's fields. NOTE(review): presumably used so one instance can be reused
     * across records by the input format -- confirm which defaults are restored.
     *
     * @param level nesting level, with the same meaning as in {@link #Tweet(int)}
     */
    public void reset(int level);

    // Core tweet data

    /** @return the tweet text */
    public String getText();

    /** @param text the tweet text */
    public void setText(String text);

    /** @return the numeric tweet ID */
    public long getId();

    /** @param id the numeric tweet ID */
    public void setId(long id);

    /** @return the creation timestamp as a raw string (not parsed into a date type) */
    public String getCreated_at();

    /** @param created_at the creation timestamp as a raw string */
    public void setCreated_at(String created_at);

    /** @return the author of the tweet */
    public Users getUser();

    /** @param user the author of the tweet */
    public void setUser(Users user);

    // Engagement metrics

    /** @return how many times this tweet has been retweeted */
    public long getRetweet_count();

    /** @param retweet_count how many times this tweet has been retweeted */
    public void setRetweet_count(long retweet_count);

    /** @return how many times this tweet has been favorited */
    public long getFavorite_count();

    /** @param favorite_count how many times this tweet has been favorited */
    public void setFavorite_count(long favorite_count);

    /** @return the {@code retweeted} flag from the JSON */
    public boolean isRetweeted();

    /** @param retweeted the {@code retweeted} flag */
    public void setRetweeted(boolean retweeted);

    /** @return the {@code favorited} flag from the JSON */
    public boolean isFavorited();

    /** @param favorited the {@code favorited} flag */
    public void setFavorited(boolean favorited);
}

Twitter user profile information including follower counts, verification status, profile details, and account metadata.
/**
 * Profile of a Twitter account, as returned by {@code Tweet.getUser()}.
 */
public class Users {
    /** Creates an empty user profile. */
    public Users();

    /** Resets this profile's fields. NOTE(review): presumably for instance reuse -- confirm defaults. */
    public void reset();

    // Basic user information

    /** @return the account handle (screen name) */
    public String getScreen_name();

    /** @param screen_name the account handle */
    public void setScreen_name(String screen_name);

    /** @return the display name */
    public String getName();

    /** @param name the display name */
    public void setName(String name);

    /** @return the numeric user ID */
    public long getId();

    /** @param id the numeric user ID */
    public void setId(long id);

    /** @return the profile description text */
    public String getDescription();

    /** @param description the profile description text */
    public void setDescription(String description);

    // User metrics

    /** @return the number of followers */
    public long getFollowers_count();

    /** @param followers_count the number of followers */
    public void setFollowers_count(long followers_count);

    /** @return the number of accounts this user follows */
    public long getFriends_count();

    /** @param friends_count the number of accounts this user follows */
    public void setFriends_count(long friends_count);

    /** @return the number of statuses (tweets) posted by this user */
    public long getStatuses_count();

    /** @param statuses_count the number of statuses posted */
    public void setStatuses_count(long statuses_count);

    // Account status

    /** @return whether the account is verified */
    public boolean isVerified();

    /** @param verified whether the account is verified */
    public void setVerified(boolean verified);

    /**
     * Returns whether the account's tweets are protected. The accessor is named
     * {@code protected_tweet}, presumably because {@code protected} is a reserved Java keyword.
     *
     * @return the protected flag
     */
    public boolean isProtected_tweet();

    /** @param protected_tweet whether the account's tweets are protected */
    public void setProtected_tweet(boolean protected_tweet);
}

Location information including coordinates, places, and geographic boundaries for geo-tagged tweets.
/**
 * Coordinates attached to a geo-tagged tweet.
 */
public class Coordinates {

    /** Constructs an empty coordinates holder. */
    public Coordinates();

    /** @return the geometry type string associated with these coordinates */
    public String getType();

    /**
     * Returns the stored coordinate values.
     * NOTE(review): presumably ordered [longitude, latitude], matching the two-argument setter -- confirm.
     *
     * @return the coordinate array
     */
    public double[] getCoordinates();

    /**
     * Stores a point given as separate longitude and latitude values (note: longitude first).
     *
     * @param longitude the longitude
     * @param latitude  the latitude
     */
    public void setCoordinates(double longitude, double latitude);

    /** @param coordinates the coordinate array to store */
    public void setCoordinates(double[] coordinates);
}
/**
 * A named place associated with a tweet, including its bounding box and attributes.
 */
public class Places {
    /** Creates an empty place. */
    public Places();

    /** @return the short name of the place */
    public String getName();

    /** @param name the short name of the place */
    public void setName(String name);

    /** @return the full, human-readable name of the place */
    public String getFull_name();

    /** @param full_name the full name of the place */
    public void setFull_name(String full_name);

    /** @return the country name */
    public String getCountry();

    /** @param country the country name */
    public void setCountry(String country);

    /** @return the kind of place, as a raw string */
    public String getPlace_type();

    /** @param place_type the kind of place */
    public void setPlace_type(String place_type);

    /** @return additional attributes of the place */
    public Attributes getAttributes();

    /** @param attributes additional attributes of the place */
    public void setAttributes(Attributes attributes);

    /** @return the geographic bounding box enclosing the place */
    public BoundingBox getBounding_box();

    /** @param bounding_box the geographic bounding box enclosing the place */
    public void setBounding_box(BoundingBox bounding_box);
}

Extracted entities from tweet text including hashtags, URLs, user mentions, media attachments, and financial symbols.
/**
 * Entities extracted from a tweet's text: hashtags, URLs, user mentions, media and symbols.
 *
 * <p>NOTE(review): {@code URL} here is presumably the library's own entity model class, not
 * {@code java.net.URL} -- confirm the import. Whether an absent entity list is returned as
 * {@code null} or as an empty list is not shown here.
 */
public class Entities {
    /** Creates an empty entities container. */
    public Entities();

    /** @return the hashtags found in the tweet */
    public List<HashTags> getHashtags();

    /** @param hashtags the hashtags found in the tweet */
    public void setHashtags(List<HashTags> hashtags);

    /** @return the URLs found in the tweet */
    public List<URL> getUrls();

    /** @param urls the URLs found in the tweet */
    public void setUrls(List<URL> urls);

    /** @return the users mentioned in the tweet */
    public List<UserMention> getUser_mentions();

    /** @param user_mentions the users mentioned in the tweet */
    public void setUser_mentions(List<UserMention> user_mentions);

    /** @return the media attached to the tweet */
    public List<Media> getMedia();

    /** @param media the media attached to the tweet */
    public void setMedia(List<Media> media);

    /** @return the financial symbols found in the tweet */
    public List<Symbol> getSymbols();

    /** @param symbols the financial symbols found in the tweet */
    public void setSymbols(List<Symbol> symbols);
}

/**
 * Implemented by input formats and functions that can report the Flink type of the records they
 * produce. {@code SimpleTweetInputFormat} implements it to report {@code Tweet}.
 *
 * @param <T> the produced record type
 */
public interface ResultTypeQueryable<T> {
    /** @return the type information for the produced records */
    TypeInformation<T> getProducedType();
}
/**
 * A user credited as a contributor to a tweet's authorship.
 *
 * <p>Note that the screen-name accessor is camelCase ({@code getScreenName}), unlike
 * {@code Users.getScreen_name}.
 */
public class Contributors {

    /** Creates an empty contributor. */
    public Contributors();

    /**
     * Creates a contributor with all fields populated.
     *
     * @param id         numeric user ID
     * @param id_str     user ID as a string
     * @param screenName account handle
     */
    public Contributors(long id, String id_str, String screenName);

    /** Resets this contributor's fields. */
    public void reset();

    /** @return the numeric user ID */
    public long getId();

    /** @return the user ID as a string */
    public String getId_str();

    /** @return the account handle */
    public String getScreenName();

    /** @param id the numeric user ID */
    public void setId(long id);

    /** @param id_str the user ID as a string */
    public void setId_str(String id_str);

    /** @param screenName the account handle */
    public void setScreenName(String screenName);
}
/**
 * Identifies the current user's own retweet of a tweet, if one exists.
 */
public class CurrentUserRetweet {

    /** Creates an empty instance. */
    public CurrentUserRetweet();

    /** Resets the stored IDs. */
    public void reset();

    /** @return the numeric ID of the retweet */
    public long getId();

    /** @return the ID of the retweet as a string */
    public String getId_str();

    /** @param id the numeric ID of the retweet */
    public void setId(long id);

    /**
     * Sets the string form of the ID.
     * NOTE(review): unlike {@code Contributors.setId_str(String)}, this takes no argument.
     * Presumably it derives {@code id_str} from the numeric {@code id}. Confirm against the
     * library source before relying on it.
     */
    public void setId_str();
}