Apache Flink input format for processing Twitter tweet data in JSON format with strongly-typed Tweet objects
—
Extracted entities from tweet text including hashtags, URLs, user mentions, media attachments, and financial symbols.
Container for all entities parsed from tweet text, providing structured access to hashtags, URLs, mentions, media, and symbols.
/**
 * Container for all entities parsed from tweet text.
 * Groups the extracted content by kind: hashtags, URLs, user mentions,
 * media attachments, and financial symbols.
 */
public class Entities {
    /**
     * Default constructor; initializes all entity lists.
     */
    public Entities();

    /**
     * Gets the hashtags extracted from the tweet text.
     *
     * @return list of {@link HashTags} objects
     */
    public List<HashTags> getHashtags();

    /**
     * Sets the list of hashtags.
     *
     * @param hashtags list of {@link HashTags} objects
     */
    public void setHashtags(List<HashTags> hashtags);

    /**
     * Gets the URLs found in the tweet text.
     *
     * @return list of {@link URL} objects
     */
    public List<URL> getUrls();

    /**
     * Sets the list of URLs.
     *
     * @param urls list of {@link URL} objects
     */
    public void setUrls(List<URL> urls);

    /**
     * Gets the user mentions found in the tweet text.
     *
     * @return list of {@link UserMention} objects
     */
    public List<UserMention> getUser_mentions();

    /**
     * Sets the list of user mentions.
     *
     * @param user_mentions list of {@link UserMention} objects
     */
    public void setUser_mentions(List<UserMention> user_mentions);

    /**
     * Gets the media attachments.
     *
     * @return list of {@link Media} objects
     */
    public List<Media> getMedia();

    /**
     * Sets the list of media attachments.
     *
     * @param media list of {@link Media} objects
     */
    public void setMedia(List<Media> media);

    /**
     * Gets the financial symbols (cashtags).
     *
     * @return list of {@link Symbol} objects
     */
    public List<Symbol> getSymbols();

    /**
     * Sets the list of financial symbols.
     *
     * @param symbols list of {@link Symbol} objects
     */
    public void setSymbols(List<Symbol> symbols);
}

Hashtags extracted from tweet text with position information.
/**
 * Hashtag entity parsed from tweet text.
 * Carries the hashtag text and its position indices within the tweet.
 */
public class HashTags {
    /**
     * Gets the hashtag text (without the # symbol).
     *
     * @return hashtag text
     */
    public String getText();

    /**
     * Sets the hashtag text, with optional handling of a leading hash symbol.
     * NOTE(review): how the # symbol is handled when {@code hashExist} is true is
     * not shown in this listing - confirm against the implementation.
     *
     * @param text hashtag text
     * @param hashExist whether {@code text} already includes the # symbol
     */
    public void setText(String text, boolean hashExist);

    /**
     * Gets the character positions where the hashtag appears in the tweet text.
     *
     * @return array of [start, end] positions
     */
    public long[] getIndices();

    /**
     * Sets the character position indices.
     *
     * @param indices array of [start, end] positions
     */
    public void setIndices(long[] indices);

    /**
     * Sets the position indices from separate start and end values.
     *
     * @param start starting character position
     * @param end ending character position
     */
    public void setIndices(long start, long end);
}

URLs found in tweet text with expanded and display versions.
/**
 * URL entity included in tweet text.
 * Exposes the original, display, and expanded versions of the URL.
 */
public class URL {
    /**
     * Default constructor.
     */
    public URL();

    /**
     * Gets the original URL as it appears in the tweet text (usually shortened).
     *
     * @return original URL string
     */
    public String getUrl();

    /**
     * Sets the original URL.
     *
     * @param url original URL string
     */
    public void setUrl(String url);

    /**
     * Gets the display URL shown to users (truncated if long).
     *
     * @return display URL string
     */
    public String getDisplay_url();

    /**
     * Sets the display URL.
     *
     * @param display_url display URL string
     */
    public void setDisplay_url(String display_url);

    /**
     * Gets the fully expanded URL.
     *
     * @return expanded URL string
     */
    public String getExpanded_url();

    /**
     * Sets the fully expanded URL.
     *
     * @param expanded_url expanded URL string
     */
    public void setExpanded_url(String expanded_url);

    /**
     * Gets the character positions where the URL appears in the tweet text.
     *
     * @return array of [start, end] positions
     */
    public long[] getIndices();

    /**
     * Sets the character position indices.
     *
     * @param indices array of [start, end] positions
     */
    public void setIndices(long[] indices);
}

User mentions (@username) found in tweet text with user details.
/**
 * User mention entity representing an @username reference in tweet text.
 * Carries the mentioned user's ID, screen name, and display name.
 */
public class UserMention {
    /**
     * Default constructor.
     */
    public UserMention();

    /**
     * Gets the mentioned user's unique ID.
     *
     * @return user ID as long
     */
    public long getId();

    /**
     * Sets the mentioned user's ID.
     *
     * @param id user ID
     */
    public void setId(long id);

    /**
     * Gets the mentioned user's ID as a string.
     *
     * @return user ID string (computed from the long ID)
     */
    public String getId_str();

    /**
     * Sets the user ID string (internal method). Takes no argument.
     * NOTE(review): presumably derives the string from the numeric ID, matching
     * the "computed from long ID" note on the getter - confirm against the implementation.
     */
    public void setId_str();

    /**
     * Gets the mentioned user's screen name (without the @ symbol).
     *
     * @return screen name string
     */
    public String getScreen_name();

    /**
     * Sets the mentioned user's screen name.
     *
     * @param screen_name screen name string
     */
    public void setScreen_name(String screen_name);

    /**
     * Gets the mentioned user's display name.
     *
     * @return display name string
     */
    public String getName();

    /**
     * Sets the mentioned user's display name.
     *
     * @param name display name string
     */
    public void setName(String name);

    /**
     * Gets the character positions where the mention appears in the tweet text.
     *
     * @return array of [start, end] positions
     */
    public long[] getIndices();

    /**
     * Sets the character position indices.
     *
     * @param indices array of [start, end] positions
     */
    public void setIndices(long[] indices);
}

Media attachments (photos, videos) uploaded with the tweet.
/**
 * Media entity representing a photo, video, or other media uploaded with a tweet.
 * Carries URLs, metadata, and the available size variants.
 */
public class Media {
    /**
     * Default constructor.
     */
    public Media();

    /**
     * Gets the unique media ID.
     *
     * @return media ID as long
     */
    public long getId();

    /**
     * Sets the unique media ID.
     *
     * @param id media ID
     */
    public void setId(long id);

    /**
     * Gets the media ID as a string.
     *
     * @return media ID string
     */
    public String getId_str();

    /**
     * Sets the media ID as a string.
     *
     * @param id_str media ID string
     */
    public void setId_str(String id_str);

    /**
     * Gets the media URL (HTTP).
     *
     * @return media URL string
     */
    public String getMedia_url();

    /**
     * Sets the media URL.
     *
     * @param media_url media URL string
     */
    public void setMedia_url(String media_url);

    /**
     * Gets the media URL (HTTPS).
     *
     * @return HTTPS media URL string
     */
    public String getMedia_url_https();

    /**
     * Sets the HTTPS media URL.
     *
     * @param media_url_https HTTPS media URL string
     */
    public void setMedia_url_https(String media_url_https);

    /**
     * Gets the display URL shown in the tweet text.
     *
     * @return display URL string
     */
    public String getDisplay_url();

    /**
     * Sets the display URL.
     *
     * @param display_url display URL string
     */
    public void setDisplay_url(String display_url);

    /**
     * Gets the expanded URL linking to the media.
     *
     * @return expanded URL string
     */
    public String getExpanded_url();

    /**
     * Sets the expanded URL.
     *
     * @param expanded_url expanded URL string
     */
    public void setExpanded_url(String expanded_url);

    /**
     * Gets the shortened URL as it appears in the tweet text.
     *
     * @return shortened URL string
     */
    public String getUrl();

    /**
     * Sets the shortened URL.
     *
     * @param url shortened URL string
     */
    public void setUrl(String url);

    /**
     * Gets the media type (e.g., "photo", "video").
     *
     * @return media type string
     */
    public String getType();

    /**
     * Sets the media type.
     *
     * @param type media type string
     */
    public void setType(String type);

    /**
     * Gets the available size variants for the media.
     *
     * @return map of size names to {@link Size} objects
     */
    public Map<String, Size> getSizes();

    /**
     * Sets the available size variants.
     *
     * @param sizes map of size names to {@link Size} objects
     */
    public void setSizes(Map<String, Size> sizes);

    /**
     * Gets the character positions where the media URL appears in the tweet text.
     *
     * @return array of [start, end] positions
     */
    public long[] getIndices();

    /**
     * Sets the character position indices.
     *
     * @param indices array of [start, end] positions
     */
    public void setIndices(long[] indices);
}

Size variants available for media files.
/**
 * Size information for one variant of a media file.
 * Different sizes are available for different display contexts.
 */
public class Size {
    /**
     * Creates a size variant from its dimensions and resize method.
     *
     * @param width width in pixels
     * @param height height in pixels
     * @param resize resize method ("fit", "crop")
     */
    public Size(long width, long height, String resize);

    /**
     * Gets the width in pixels.
     *
     * @return width as long
     */
    public long getWidth();

    /**
     * Sets the width in pixels.
     *
     * @param width width value
     */
    public void setWidth(long width);

    /**
     * Gets the height in pixels.
     *
     * @return height as long
     */
    public long getHeight();

    /**
     * Sets the height in pixels.
     *
     * @param height height value
     */
    public void setHeight(long height);

    /**
     * Gets the resize method used for this size variant.
     *
     * @return resize method ("fit", "crop")
     */
    public String getResize();

    /**
     * Sets the resize method.
     *
     * @param resize resize method string
     */
    public void setResize(String resize);
}

Financial symbols (cashtags) starting with dollar sign.
/**
 * Financial symbol entity (cashtag) starting with a dollar sign.
 * Used for stock tickers and financial references.
 */
public class Symbol {
    /**
     * Default constructor.
     */
    public Symbol();

    /**
     * Gets the symbol text (without the $ symbol).
     *
     * @return symbol text string
     */
    public String getText();

    /**
     * Sets the symbol text.
     *
     * @param text symbol text string
     */
    public void setText(String text);

    /**
     * Gets the character positions where the symbol appears in the tweet text.
     *
     * @return array of [start, end] positions
     */
    public long[] getIndices();

    /**
     * Sets the character position indices.
     *
     * @param indices array of [start, end] positions
     */
    public void setIndices(long[] indices);
}

Usage Examples:
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.*;
import org.apache.flink.util.Collector;
// Process tweet entities
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

// Extract all hashtags.
// Flink's flatMap emits results through a Collector instead of returning a
// collection, so each hashtag of a tweet is collected individually.
// NOTE(review): Flink may require an explicit .returns(...) type hint for lambdas
// with generic output types - confirm against the Flink version in use.
DataSet<String> hashtags = tweets
    .flatMap((Tweet tweet, Collector<HashTags> out) ->
        tweet.getEntities().getHashtags().forEach(out::collect))
    .map(hashtag -> hashtag.getText());

// Find tweets with specific hashtags
DataSet<Tweet> techTweets = tweets.filter(tweet -> {
    // Named "tags": a lambda body may not redeclare the enclosing local "hashtags".
    List<HashTags> tags = tweet.getEntities().getHashtags();
    return tags.stream().anyMatch(ht -> {
        String text = ht.getText().toLowerCase();
        return text.contains("tech") || text.contains("ai");
    });
});

// Extract URLs and their expanded versions
DataSet<Tuple2<String, String>> urlMappings = tweets
    .flatMap((Tweet tweet, Collector<URL> out) ->
        tweet.getEntities().getUrls().forEach(out::collect))
    .map(url -> new Tuple2<>(url.getUrl(), url.getExpanded_url()));

// Find tweets with media attachments
DataSet<Tweet> mediaTweets = tweets.filter(tweet ->
    !tweet.getEntities().getMedia().isEmpty()
);

// Detailed entity analysis
Tweet tweet = new Tweet();
// Tweet populated by input format...
Entities entities = tweet.getEntities();

// Hashtags: text plus the [start, end] span inside the tweet
System.out.println("Hashtags found:");
List<HashTags> foundHashtags = entities.getHashtags();
for (HashTags tag : foundHashtags) {
    long[] span = tag.getIndices();
    System.out.println(" #" + tag.getText());
    System.out.println(" Position: " + span[0] + "-" + span[1]);
}

// User mentions: screen name, display name, numeric ID
System.out.println("User mentions:");
List<UserMention> foundMentions = entities.getUser_mentions();
for (UserMention mentioned : foundMentions) {
    System.out.println(" @" + mentioned.getScreen_name());
    System.out.println(" User: " + mentioned.getName());
    System.out.println(" ID: " + mentioned.getId());
}

// URLs: original, display, and expanded forms
System.out.println("URLs found:");
List<URL> foundUrls = entities.getUrls();
for (URL link : foundUrls) {
    System.out.println(" Original: " + link.getUrl());
    System.out.println(" Display: " + link.getDisplay_url());
    System.out.println(" Expanded: " + link.getExpanded_url());
}

// Media: type, HTTPS URL, then every available size variant
System.out.println("Media attachments:");
List<Media> attachments = entities.getMedia();
for (Media attachment : attachments) {
    System.out.println(" Type: " + attachment.getType());
    System.out.println(" URL: " + attachment.getMedia_url_https());
    attachment.getSizes().forEach((variantName, variant) ->
        System.out.println(" " + variantName + ": " +
            variant.getWidth() + "x" + variant.getHeight() + " (" + variant.getResize() + ")"));
}

// Financial symbols (cashtags)
System.out.println("Financial symbols:");
List<Symbol> cashtags = entities.getSymbols();
for (Symbol cashtag : cashtags) {
    System.out.println(" $" + cashtag.getText());
}

// Entity-based analytics
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

// NOTE(review): Flink may require explicit .returns(...) type hints for the
// lambdas below because their output types are generic - confirm against the
// Flink version in use.

// Most popular hashtags.
// flatMap emits each hashtag through the Collector; Flink's FlatMapFunction
// does not accept a returned collection.
// NOTE(review): sortPartition orders records within each partition only -
// confirm whether a global ordering is needed here.
DataSet<Tuple2<String, Long>> hashtagCounts = tweets
    .flatMap((Tweet tweet, Collector<HashTags> out) ->
        tweet.getEntities().getHashtags().forEach(out::collect))
    .map(hashtag -> new Tuple2<>(hashtag.getText(), 1L))
    .groupBy(0)
    .sum(1)
    .sortPartition(1, Order.DESCENDING);

// Domain analysis from URLs
DataSet<Tuple2<String, Long>> domainCounts = tweets
    .flatMap((Tweet tweet, Collector<URL> out) ->
        tweet.getEntities().getUrls().forEach(out::collect))
    .map(url -> {
        try {
            // Fully qualified to avoid clashing with the tweet entity class URL.
            java.net.URL parsedUrl = new java.net.URL(url.getExpanded_url());
            return new Tuple2<>(parsedUrl.getHost(), 1L);
        } catch (Exception e) {
            // Best effort: unparseable URLs are counted under "unknown".
            return new Tuple2<>("unknown", 1L);
        }
    })
    .groupBy(0)
    .sum(1);

// User interaction network from mentions: one (author, mentioned user) edge per mention
DataSet<Tuple2<String, String>> mentionNetwork = tweets
    .flatMap((Tweet tweet, Collector<Tuple2<String, String>> out) -> {
        String author = tweet.getUser().getScreen_name();
        for (UserMention mention : tweet.getEntities().getUser_mentions()) {
            out.collect(new Tuple2<>(author, mention.getScreen_name()));
        }
    });

Key Features:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tweet-inputformat-2-10