Apache Flink input format for processing Twitter tweet data in JSON format into strongly-typed Tweet objects.
—
Location information including coordinates, places, and geographic boundaries for geo-tagged tweets.
Geographic coordinates representing the precise location where a tweet was posted.
/**
 * Geographic location of a tweet, modeled as a GeoJSON Point.
 *
 * <p>Coordinates are stored as a {@code [longitude, latitude]} pair following the GeoJSON
 * convention: index 0 is the longitude and index 1 is the latitude. This is the reverse of
 * the familiar "lat, long" ordering, so take care when reading the array.
 */
public class Coordinates {
    /**
     * Creates a new instance (no-argument constructor).
     *
     * <p>NOTE(review): the initial contents of the coordinate array are not specified in this
     * listing — confirm before relying on a default.
     */
    public Coordinates();
    /**
     * Returns the raw coordinate pair.
     *
     * <p>NOTE(review): whether this can return {@code null} for tweets without a precise
     * location is not specified here; callers should check it (and its length) before indexing.
     *
     * @return array of two doubles: {@code [longitude, latitude]}
     */
    public double[] getCoordinates();
    /**
     * Replaces the coordinate pair with the given array.
     *
     * <p>NOTE(review): it is not specified here whether the array is copied or stored by
     * reference, so avoid mutating it after the call.
     *
     * @param coordinates array of {@code [longitude, latitude]} (longitude first)
     */
    public void setCoordinates(double[] coordinates);
    /**
     * Sets the coordinate pair from individual values.
     *
     * @param longitude longitude value (element 0 of {@link #getCoordinates()})
     * @param latitude latitude value (element 1 of {@link #getCoordinates()})
     */
    public void setCoordinates(double longitude, double latitude);
    /**
     * Returns the geometry type, which is always {@code "point"} for tweets.
     *
     * <p>Note the lowercase spelling. GeoJSON (RFC 7946) names this geometry {@code "Point"},
     * so compare case-insensitively when mixing with other GeoJSON sources.
     *
     * @return the string {@code "point"}
     */
    public String getType();
    /**
     * Returns a human-readable rendering of the coordinates.
     *
     * <p>The exact format is not specified here. Use {@link #getCoordinates()} for
     * programmatic access rather than parsing this string.
     *
     * @return string representation of the coordinates
     */
    public String toString();
}
Named locations that can be attached to tweets, representing specific venues, cities, or regions.
/**
 * Named location, with coordinates, that can be attached to a tweet via {@code place_id}.
 *
 * <p>Includes descriptive place information (name, country, type), additional
 * {@link Attributes} and a geographic {@link BoundingBox}.
 *
 * <p>Accessor names contain underscores (e.g. {@code getFull_name}). This presumably mirrors
 * the underlying JSON field names; the names are part of the public API and are kept as-is.
 */
public class Places {
    /**
     * Creates a place. The constructor also initializes the nested {@link Attributes} and
     * {@link BoundingBox} objects, so they exist before any setter is called.
     */
    public Places();
    /**
     * Returns the unique place ID.
     *
     * @return place ID string
     */
    public String getId();
    /**
     * Sets the unique place ID.
     *
     * @param id place ID string
     */
    public void setId(String id);
    /**
     * Returns the short place name.
     *
     * @return short place name
     */
    public String getName();
    /**
     * Sets the short place name.
     *
     * @param name short place name
     */
    public void setName(String name);
    /**
     * Returns the full place name, including its hierarchy.
     *
     * @return full place name (e.g. "Manhattan, NY")
     */
    public String getFull_name();
    /**
     * Sets the full place name.
     *
     * @param full_name full place name, including hierarchy
     */
    public void setFull_name(String full_name);
    /**
     * Returns the country name.
     *
     * @return country name
     */
    public String getCountry();
    /**
     * Sets the country name.
     *
     * @param country country name
     */
    public void setCountry(String country);
    /**
     * Returns the two-letter ISO country code.
     *
     * <p>This differs from {@link Attributes#getIso3()}, which holds a three-letter code.
     *
     * @return two-letter country code (e.g. "US", "GB")
     */
    public String getCountry_code();
    /**
     * Sets the two-letter ISO country code.
     *
     * @param country_code two-letter country code
     */
    public void setCountry_code(String country_code);
    /**
     * Returns the place type.
     *
     * @return place type (e.g. "city", "neighborhood", "poi")
     */
    public String getPlace_type();
    /**
     * Sets the place type.
     *
     * @param place_type place type, such as "city", "neighborhood" or "poi"
     */
    public void setPlace_type(String place_type);
    /**
     * Returns the URL where more information about the place can be found.
     *
     * @return URL string
     */
    public String getUrl();
    /**
     * Sets the URL for more information about the place.
     *
     * @param url URL string
     */
    public void setUrl(String url);
    /**
     * Returns additional place attributes (address, phone, etc.).
     *
     * <p>The constructor initializes this object. It can still be {@code null} if
     * {@link #setAttributes(Attributes)} was later called with {@code null}.
     *
     * @return attributes holding detailed place information
     */
    public Attributes getAttributes();
    /**
     * Replaces the additional place attributes.
     *
     * @param attributes attributes object
     */
    public void setAttributes(Attributes attributes);
    /**
     * Returns the geographic boundary of the place.
     *
     * <p>The constructor initializes this object; it is replaced only via
     * {@link #setBounding_box(BoundingBox)}.
     *
     * @return bounding box defining the place boundaries
     */
    public BoundingBox getBounding_box();
    /**
     * Replaces the geographic boundary of the place.
     *
     * @param bounding_box bounding box object
     */
    public void setBounding_box(BoundingBox bounding_box);
}
Detailed place attributes including address information and contact details.
/**
 * Place attributes holding address and contact information.
 *
 * <p>Provides detailed metadata for a {@link Places} object. Any of the string fields may be
 * absent for a given place, so null-check them before use.
 */
public class Attributes {
    /**
     * Creates an empty attributes object (no-argument constructor).
     */
    public Attributes();
    /**
     * Returns the street address.
     *
     * @return street address string
     */
    public String getStreet_address();
    /**
     * Sets the street address.
     *
     * @param street_address street address
     */
    public void setStreet_address(String street_address);
    /**
     * Returns the locality (city or town).
     *
     * @return locality string
     */
    public String getLocality();
    /**
     * Sets the locality (city or town).
     *
     * @param locality locality string
     */
    public void setLocality(String locality);
    /**
     * Returns the region (state or province).
     *
     * @return region string
     */
    public String getRegion();
    /**
     * Sets the region (state or province).
     *
     * @param region region string
     */
    public void setRegion(String region);
    /**
     * Returns the three-letter ISO country code.
     *
     * <p>This differs from {@link Places#getCountry_code()}, which is a two-letter code.
     *
     * @return three-letter country code
     */
    public String getIso3();
    /**
     * Sets the three-letter ISO country code.
     *
     * @param iso3 three-letter country code
     */
    public void setIso3(String iso3);
    /**
     * Returns the postal code.
     *
     * @return postal code string
     */
    public String getPostal_code();
    /**
     * Sets the postal code.
     *
     * @param postal_code postal code
     */
    public void setPostal_code(String postal_code);
    /**
     * Returns the phone number.
     *
     * @return phone number string
     */
    public String getPhone();
    /**
     * Sets the phone number.
     *
     * @param phone phone number
     */
    public void setPhone(String phone);
    /**
     * Returns the Twitter value for this place, which is always {@code "twitter"}.
     *
     * <p>No corresponding setter is declared in this class.
     *
     * <p>NOTE(review): despite the name, this does not appear to carry a per-place Twitter
     * handle.
     *
     * @return the string {@code "twitter"}
     */
    public String getTwitter();
    /**
     * Returns the place's website URL.
     *
     * @return website URL string
     */
    public String getUrl();
    /**
     * Sets the place's website URL.
     *
     * @param url website URL
     */
    public void setUrl(String url);
    /**
     * Returns the ID of the application that created this place.
     *
     * <p>This corresponds to the API field {@code "app:id"}. That name is not a legal Java
     * identifier, hence the camel-cased accessor.
     *
     * @return application ID string
     */
    public String getAppId();
    /**
     * Sets the ID of the application that created this place ({@code "app:id"}).
     *
     * @param appId application ID
     */
    public void setAppId(String appId);
}
Geographic boundaries defining the area containing a place entity.
/**
 * Longitude/latitude points defining a box that contains a {@link Places} entity.
 *
 * <p>Each point is a {@code [longitude, latitude]} array following the GeoJSON convention
 * (longitude first).
 */
public class BoundingBox {
    /**
     * Creates an empty bounding box (no-argument constructor).
     */
    public BoundingBox();
    /**
     * Creates a bounding box from an initial list of points.
     *
     * <p>NOTE(review): presumably this list becomes the single inner list returned by
     * {@link #getCoordinates()} — confirm before relying on it.
     *
     * @param points list of {@code [longitude, latitude]} point arrays
     */
    public BoundingBox(List<double[]> points);
    /**
     * Returns the bounding-box coordinates as nested lists.
     *
     * <p>The outer list contains lists of point arrays. This presumably follows the GeoJSON
     * Polygon layout, where the outer list holds rings.
     *
     * @return list containing lists of {@code [longitude, latitude]} arrays
     */
    public List<List<double[]>> getCoordinates();
    /**
     * Replaces the bounding-box coordinates.
     *
     * @param coordinates nested list of {@code [longitude, latitude]} arrays
     */
    public void setCoordinates(List<List<double[]>> coordinates);
    /**
     * Returns the geometry type. The default is {@code "Polygon"}.
     *
     * @return geometry type string
     */
    public String getType();
    /**
     * Sets the geometry type.
     *
     * @param type geometry type string
     */
    public void setType(String type);
}
Usage Examples:
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.places.Attributes;
import org.apache.flink.contrib.tweetinputformat.model.places.Places;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Coordinates;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
// Process geo-tagged tweets
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

// Keep only tweets that carry a usable [longitude, latitude] pair.
// Check the length too, so that indexing [1] below cannot go out of bounds.
DataSet<Tweet> geoTweets = tweets.filter(tweet -> {
    Coordinates coords = tweet.getCoordinates();
    return coords != null
            && coords.getCoordinates() != null
            && coords.getCoordinates().length >= 2;
});

// Extract (text, longitude, latitude) for each geo-tagged tweet.
// Type erasure prevents Flink from inferring the generic Tuple3 result of a lambda,
// so the result type is declared explicitly with returns(...).
DataSet<Tuple3<String, Double, Double>> tweetLocations = geoTweets
    .map(tweet -> {
        double[] coordArray = tweet.getCoordinates().getCoordinates();
        return new Tuple3<>(
                tweet.getText(),
                coordArray[0], // longitude (GeoJSON order: longitude first)
                coordArray[1]  // latitude
        );
    })
    .returns(new TypeHint<Tuple3<String, Double, Double>>() {});

// Describe where each place-tagged tweet was posted
DataSet<String> placeTweets = tweets
    .filter(tweet -> tweet.getPlace() != null)
    .map(tweet -> {
        Places place = tweet.getPlace();
        return String.format("%s tweeted from %s, %s",
                tweet.getUser().getScreen_name(),
                place.getName(),
                place.getCountry()
        );
    });
// Detailed geographic analysis
Tweet tweet = new Tweet();
// Tweet populated by input format...

// Check for precise coordinates ([longitude, latitude], GeoJSON order)
Coordinates coords = tweet.getCoordinates();
if (coords != null && coords.getCoordinates() != null && coords.getCoordinates().length >= 2) {
    double[] coordArray = coords.getCoordinates();
    System.out.println("Precise location: " + coordArray[0] + ", " + coordArray[1]);
    System.out.println("Coordinates: " + coords.toString());
}

// Check for place information
Places place = tweet.getPlace();
if (place != null) {
    System.out.println("Place: " + place.getFull_name());
    System.out.println("Type: " + place.getPlace_type());
    System.out.println("Country: " + place.getCountry() + " (" + place.getCountry_code() + ")");

    // Access detailed attributes. The constructor creates them, but setAttributes(null)
    // can clear them, so guard before dereferencing.
    Attributes attrs = place.getAttributes();
    if (attrs != null) {
        if (attrs.getStreet_address() != null) {
            System.out.println("Address: " + attrs.getStreet_address());
            System.out.println("City: " + attrs.getLocality());
            System.out.println("State: " + attrs.getRegion());
            System.out.println("Postal: " + attrs.getPostal_code());
        }
        if (attrs.getPhone() != null) {
            System.out.println("Phone: " + attrs.getPhone());
        }
        if (attrs.getUrl() != null) {
            System.out.println("Website: " + attrs.getUrl());
        }
    }
}
// Geographic aggregation and analysis
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

// Count tweets by country.
// reduceGroup requires a GroupReduceFunction, i.e. reduce(Iterable, Collector) returning void.
// A one-argument lambda that returns a value does not compile, so this uses the standard
// "map to (key, 1) then sum" aggregation instead.
// Tweets without a country are dropped because Flink tuple fields must be non-null.
DataSet<Tuple2<String, Long>> tweetsByCountry = tweets
    .filter(tweet -> tweet.getPlace() != null && tweet.getPlace().getCountry() != null)
    .map(tweet -> new Tuple2<>(tweet.getPlace().getCountry(), 1L))
    // The generic tuple type of a lambda is erased, so declare it for Flink.
    .returns(new TypeHint<Tuple2<String, Long>>() {})
    .groupBy(0)
    .sum(1);

// Find tweets within a geographic region
DataSet<Tweet> regionTweets = tweets.filter(tweet -> {
    Coordinates coords = tweet.getCoordinates();
    if (coords == null || coords.getCoordinates() == null || coords.getCoordinates().length < 2) {
        return false;
    }
    double[] coordArray = coords.getCoordinates();
    double longitude = coordArray[0]; // GeoJSON order: longitude first
    double latitude = coordArray[1];
    // Example: tweets within New York City area
    return longitude >= -74.25 && longitude <= -73.70
            && latitude >= 40.49 && latitude <= 40.92;
});
Key Features:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tweet-inputformat-2-10