Apache Flink input format for processing Twitter tweet data in JSON format with strongly-typed Tweet objects
—
Complete data model representing Twitter's JSON structure with full access to tweet content, user information, engagement metrics, and nested data structures.
Main tweet object containing all tweet data including text content, metadata, user information, and engagement metrics.
/**
* Main Tweet object representing a complete Twitter tweet with all associated data.
* Supports nested retweets and provides Kryo serialization compatibility.
* <p>
* Accessor names keep snake_case suffixes (for example {@code getId_str},
* {@code getCreated_at}); presumably these match the keys of the Twitter JSON
* payload - confirm against the parser before renaming anything.
*/
public class Tweet {
/**
* Default constructor creating a tweet at nesting level 0.
*/
public Tweet();
/**
* Constructor with a specified nesting level for handling retweets.
* @param level Nesting level (0 for original tweets, >0 for retweets)
*/
public Tweet(int level);
/**
* Resets all fields to default values so the instance can be reused.
* @param level Nesting level for retweet handling
*/
public void reset(int level);
// Core tweet content
/**
* Gets the tweet text content.
* @return Tweet text as string
*/
public String getText();
/**
* Sets the tweet text content.
* @param text Tweet text content
*/
public void setText(String text);
/**
* Gets the unique tweet ID.
* @return Tweet ID as long
*/
public long getId();
/**
* Sets the unique tweet ID.
* @param id Tweet ID
*/
public void setId(long id);
/**
* Gets the tweet ID as string.
* @return Tweet ID as string representation
*/
public String getId_str();
/**
* Sets the tweet ID as string.
* @param id_str Tweet ID as string
*/
public void setId_str(String id_str);
/**
* Gets the tweet creation timestamp.
* @return Creation timestamp string in Twitter format
*/
public String getCreated_at();
/**
* Sets the tweet creation timestamp.
* @param created_at Creation timestamp string
*/
public void setCreated_at(String created_at);
/**
* Gets the tweet source/client information.
* @return Source string (e.g., "Twitter for iPhone")
*/
public String getSource();
/**
* Sets the tweet source/client information.
* @param source Source string
*/
public void setSource(String source);
/**
* Gets the tweet language code.
* @return Language code (e.g., "en", "es")
*/
public String getLang();
/**
* Sets the tweet language code.
* @param lang Language code
*/
public void setLang(String lang);
// Engagement metrics
/**
* Gets the retweet count.
* @return Number of retweets as long
*/
public long getRetweet_count();
/**
* Sets the retweet count.
* @param retweet_count Number of retweets
*/
public void setRetweet_count(long retweet_count);
/**
* Gets the favorite/like count.
* @return Number of favorites as long
*/
public long getFavorite_count();
/**
* Sets the favorite/like count.
* @param favorite_count Number of favorites
*/
public void setFavorite_count(long favorite_count);
/**
* Checks if the tweet has been retweeted by the current user.
* @return true if retweeted by current user
*/
public boolean isRetweeted();
/**
* Sets the retweeted status for the current user.
* @param retweeted true if retweeted by current user
*/
public void setRetweeted(boolean retweeted);
/**
* Checks if the tweet has been favorited by the current user.
* @return true if favorited by current user
*/
public boolean isFavorited();
/**
* Sets the favorited status for the current user.
* @param favorited true if favorited by current user
*/
public void setFavorited(boolean favorited);
// Tweet metadata
/**
* Checks if the tweet text was truncated.
* @return true if tweet text was truncated
*/
public boolean isTruncated();
/**
* Sets the truncated status.
* @param truncated true if tweet text was truncated
*/
public void setTruncated(boolean truncated);
/**
* Checks if the tweet contains potentially sensitive content.
* @return true if potentially sensitive
*/
public boolean getPossibly_sensitive();
/**
* Alternative, {@code is}-prefixed getter for the potentially sensitive flag.
* @return true if potentially sensitive
*/
public boolean isPossibly_sensitive(); // Alternative getter
/**
* Sets the potentially sensitive flag.
* @param possibly_sensitive true if potentially sensitive
*/
public void setPossibly_sensitive(boolean possibly_sensitive);
/**
* Gets the filter level applied to the tweet.
* @return Filter level string
*/
public String getFilter_level();
/**
* Sets the filter level.
* @param filter_level Filter level string
*/
public void setFilter_level(String filter_level);
// Reply information
/**
* Gets the screen name this tweet is replying to.
* <p>
* NOTE(review): the usage example in this document compares the result with
* {@code null} to detect replies - confirm that non-replies really yield null.
* @return Screen name of replied-to user
*/
public String getIn_reply_to_screen_name();
/**
* Sets the screen name this tweet is replying to.
* @param in_reply_to_screen_name Screen name of replied-to user
*/
public void setIn_reply_to_screen_name(String in_reply_to_screen_name);
/**
* Gets the status ID this tweet is replying to.
* @return Status ID of replied-to tweet
*/
public long getIn_reply_to_status_id();
/**
* Sets the status ID this tweet is replying to.
* @param in_reply_to_status_id Status ID of replied-to tweet
*/
public void setIn_reply_to_status_id(long in_reply_to_status_id);
/**
* Gets the status ID this tweet is replying to as string.
* @return Status ID string of replied-to tweet
*/
public String getIn_reply_to_status_id_str();
/**
* Sets the status ID this tweet is replying to as string.
* @param in_reply_to_status_id_str Status ID string
*/
public void setIn_reply_to_status_id_str(String in_reply_to_status_id_str);
/**
* Gets the user ID this tweet is replying to.
* @return User ID of replied-to user
*/
public long getIn_reply_to_user_id();
/**
* Sets the user ID this tweet is replying to.
* @param in_reply_to_user_id User ID of replied-to user
*/
public void setIn_reply_to_user_id(long in_reply_to_user_id);
/**
* Gets the user ID this tweet is replying to as string.
* @return User ID string of replied-to user
*/
public String getIn_reply_to_user_id_str();
/**
* Sets the user ID this tweet is replying to as string.
* @param in_reply_to_user_id_str User ID string
*/
public void setIn_reply_to_user_id_str(String in_reply_to_user_id_str);
// Associated objects
/**
* Gets the user who posted this tweet.
* @return Users object containing user information
*/
public Users getUser();
/**
* Sets the user who posted this tweet.
* @param user Users object containing user information
*/
public void setUser(Users user);
/**
* Gets the geographic coordinates of the tweet.
* @return Coordinates object with longitude/latitude
*/
public Coordinates getCoordinates();
/**
* Sets the geographic coordinates of the tweet.
* @param coordinates Coordinates object
*/
public void setCoordinates(Coordinates coordinates);
/**
* Gets the place information associated with the tweet.
* @return Places object containing location data
*/
public Places getPlace();
/**
* Sets the place information associated with the tweet.
* @param place Places object containing location data
*/
public void setPlace(Places place);
/**
* Gets the extracted entities from the tweet text.
* @return Entities object containing hashtags, URLs, mentions, etc.
*/
public Entities getEntities();
/**
* Sets the extracted entities from the tweet text.
* @param entities Entities object
*/
public void setEntities(Entities entities);
/**
* Gets the list of contributors to this tweet.
* @return List of Contributors objects
*/
public List<Contributors> getContributors();
/**
* Sets the list of contributors to this tweet.
* @param contributors List of Contributors objects
*/
public void setContributors(List<Contributors> contributors);
/**
* Gets the current user's retweet information.
* @return CurrentUserRetweet object
*/
public CurrentUserRetweet getCurrentUserRetweet();
/**
* Sets the current user's retweet information.
* @param currentUserRetweet CurrentUserRetweet object
*/
public void setCurrentUserRetweet(CurrentUserRetweet currentUserRetweet);
// Retweet handling
/**
* Gets the original tweet if this is a retweet.
* <p>
* NOTE(review): the usage example in this document null-checks the result
* before dereferencing it - confirm what is returned for a tweet that is not
* a retweet.
* @return Tweet object of the original tweet
*/
public Tweet getRetweeted_status();
/**
* Sets the original tweet if this is a retweet.
* @param retweeted_status Original Tweet object
*/
public void setRetweeted_status(Tweet retweeted_status);
/**
* Gets the nesting level for retweet handling.
* @return Nesting level as integer
*/
public int getTweetLevel();
/**
* Sets the nesting level for retweet handling.
* @param tweetLevel Nesting level
*/
public void setTweetLevel(int tweetLevel);
}

Usage Examples:
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;

// Basic tweet processing: in a real job the input format fills in the Tweet.
Tweet tweet = new Tweet();

// Pull out the core fields, then print a short summary.
String body = tweet.getText();
String screenName = tweet.getUser().getScreen_name();
long retweets = tweet.getRetweet_count();
System.out.println(screenName + " tweeted: " + body);
System.out.println("Retweets: " + retweets);

// A non-null reply target marks the tweet as a reply.
String replyTarget = tweet.getIn_reply_to_screen_name();
if (replyTarget != null) {
    System.out.println("Reply to: " + replyTarget);
}

// Print every hashtag found in the tweet's entities.
tweet.getEntities().getHashtags().forEach(
    tag -> System.out.println("Hashtag: #" + tag.getText()));

// For a retweet, also report the author and text of the original tweet.
Tweet source = tweet.getRetweeted_status();
if (source != null) {
    System.out.println("Original tweet by: " + source.getUser().getScreen_name());
    System.out.println("Original text: " + source.getText());
}

// Filter and transform tweets
// Read tweets from a JSON file into strongly-typed Tweet objects.
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

// Extract high-engagement tweets: more than 100 retweets or more than 500 favorites.
// The lambda parameter is named `t` so it cannot clash with a local `tweet`
// variable when this snippet is pasted next to the basic example.
DataSet<Tweet> popularTweets = tweets.filter(t ->
    t.getRetweet_count() > 100 || t.getFavorite_count() > 500
);

// Get tweet summaries: author, timestamp, a preview of at most 50 chars, and counters.
DataSet<String> tweetSummaries = tweets.map(t -> {
    String text = t.getText();
    int previewEnd = Math.min(50, text.length());
    // Tweets often contain emoji; step back one char rather than cutting a
    // surrogate pair in half, which would leave a malformed string.
    if (previewEnd < text.length() && Character.isHighSurrogate(text.charAt(previewEnd - 1))) {
        previewEnd--;
    }
    return String.format("@%s (%s): %s [RT:%d, Fav:%d]",
        t.getUser().getScreen_name(),
        t.getCreated_at(),
        text.substring(0, previewEnd),
        t.getRetweet_count(),
        t.getFavorite_count()
    );
});

Key Features:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tweet-inputformat-2-10