Apache Flink input format library for parsing Twitter JSON data into structured Tweet objects for stream processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-tweet-inputformat_2-11@1.3.0

Apache Flink input format library for parsing Twitter JSON data into structured Tweet objects for stream processing applications. This library extends Flink's DelimitedInputFormat to handle Twitter API JSON data with comprehensive error handling, parsing capabilities, and complete Twitter data model support.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tweet-inputformat_2.11</artifactId>
<version>1.3.3</version>
</dependency>

import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;

import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
// Create the Flink batch (DataSet) execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create input format for Twitter JSON data
SimpleTweetInputFormat inputFormat = new SimpleTweetInputFormat();
// Read tweets from a file of Twitter JSON records
DataSet<Tweet> tweets = env.readFile(inputFormat, "path/to/tweets.json");
// Process tweet data: format each tweet as "@user: text [RTs: n]" and print it.
// NOTE: on a DataSet, print() triggers execution of the program by itself.
// Do not follow it with env.execute(...): with no new sink defined after
// print(), that call fails with "No new data sinks have been defined since
// the last execution".
tweets.map(tweet -> {
    String text = tweet.getText();
    String userName = tweet.getUser().getScreen_name();
    long retweetCount = tweet.getRetweet_count();
    return String.format("@%s: %s [RTs: %d]", userName, text, retweetCount);
}).print();

The library is built around several key components:
- SimpleTweetInputFormat extends Flink's DelimitedInputFormat for efficient stream processing
- TweetHandler implements streaming JSON parsing with proper error handling
- A complete Twitter data model with Tweet as the root entity

Core input format functionality for reading and parsing Twitter JSON data streams into structured Tweet objects within Apache Flink processing pipelines.
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
implements ResultTypeQueryable<Tweet> {
public void open(FileInputSplit split) throws IOException;
public Tweet nextRecord(Tweet record) throws IOException;
public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;
public TypeInformation<Tweet> getProducedType();
}

Complete Twitter data model with Tweet as the root entity, containing all standard Twitter API fields including user information, entities, geographic data, and engagement metrics.
public class Tweet {
public Tweet();
public Tweet(int level);
public void reset(int level);
// Core tweet properties
public String getText();
public void setText(String text);
public long getId();
public void setId(long id);
public String getCreated_at();
public void setCreated_at(String created_at);
// Related objects
public Users getUser();
public void setUser(Users user);
public Entities getEntities();
public void setEntities(Entities entities);
public Places getPlace();
public void setPlace(Places place);
public Tweet getRetweeted_status();
public void setRetweeted_status(Tweet retweeted_status);
}

Twitter user profile information including demographics, account settings, follower metrics, and profile customization data.
public class Users {
public Users();
public void reset();
// User identification
public long getId();
public void setId(long id);
public String getScreen_name();
public void setScreen_name(String screen_name);
public String getName();
public void setName(String name);
// Profile information
public String getDescription();
public void setDescription(String description);
public String getLocation();
public void setLocation(String location);
public boolean isVerified();
public void setVerified(boolean verified);
// Metrics
public long getFollowers_count();
public void setFollowers_count(long followers_count);
public long getFriends_count();
public void setFriends_count(long friends_count);
}

Extraction and parsing of entities from tweet text including hashtags, URLs, user mentions, media attachments, and stock symbols.
public class Entities {
public Entities();
public List<HashTags> getHashtags();
public void setHashtags(List<HashTags> hashtags);
public List<URL> getUrls();
public void setUrls(List<URL> urls);
public List<UserMention> getUser_mentions();
public void setUser_mentions(List<UserMention> user_mentions);
public List<Media> getMedia();
public void setMedia(List<Media> media);
public List<Symbol> getSymbols();
public void setSymbols(List<Symbol> symbols);
}

Geographic information including coordinates, places, and location metadata associated with tweets.
public class Coordinates {
public Coordinates();
public double[] getCoordinates();
public void setCoordinates(double[] coordinates);
public void setCoordinates(double longitude, double latitude);
public String getType();
}
public class Places {
public Places();
public String getId();
public void setId(String id);
public String getName();
public void setName(String name);
public String getCountry();
public void setCountry(String country);
public Attributes getAttributes();
public void setAttributes(Attributes attributes);
}

// Core input format for Flink integration
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
implements ResultTypeQueryable<Tweet>;
// JSON parsing handler
public class TweetHandler implements ContentHandler;
// Main tweet data structure
public class Tweet;
// User profile information
public class Users;
// Entity container
public class Entities;
// Individual entity types
public class HashTags;
public class URL;
public class UserMention;
public class Media;
public class Symbol;
// Geographic data
public class Coordinates;
public class Places;
public class Attributes;
public class BoundingBox;
// Additional data structures
public class Contributors;
public class CurrentUserRetweet;
public class Size;