A 100% pure Java library for the Twitter API with no extra dependencies.
Real-time Twitter data streaming with customizable filters and listeners.
Main interface for accessing Twitter's Streaming API.
/**
 * Operations for starting, listening to and stopping a Twitter stream.
 */
interface TwitterStream {

    /**
     * Registers a listener for stream events.
     *
     * @param listener stream event listener
     */
    void addListener(StreamListener listener);

    /**
     * Starts a filtered stream using the given query parameters.
     *
     * @param query filter query with keywords, users, locations
     */
    void filter(FilterQuery query);

    /**
     * Starts the sample stream (random 1% of public tweets).
     */
    void sample();

    /**
     * Starts the sample stream with a language filter.
     *
     * @param language language code (e.g., "en", "es")
     */
    void sample(String language);

    /**
     * Starts the firehose stream (approved parties only).
     *
     * @param count number of tweets to retrieve
     * @return NOTE(review): not described in this reference; presumably this
     *         stream, to allow chaining - confirm
     */
    TwitterStream firehose(int count);

    /**
     * Starts the retweet stream.
     *
     * @return NOTE(review): not described in this reference; presumably this
     *         stream, to allow chaining - confirm
     */
    TwitterStream retweet();

    /**
     * Cleans up resources and closes connections.
     */
    void cleanUp();

    /**
     * Shuts the stream down completely.
     */
    void shutdown();
}

Usage Examples:
TwitterV1 v1 = twitter.v1();
TwitterStream stream = v1.stream();

// Add status listener.
// StatusAdapter provides empty implementations of every StatusListener
// callback, so only the two callbacks of interest are overridden here. An
// anonymous StatusListener would have to implement all six of its methods.
stream.addListener(new StatusAdapter() {
    @Override
    public void onStatus(Status status) {
        System.out.println("@" + status.getUser().getScreenName() + ": " + status.getText());
    }

    @Override
    public void onException(Exception ex) {
        ex.printStackTrace();
    }
});

// Start sample stream
stream.sample();

// Or start filtered stream
FilterQuery filter = FilterQuery.ofTrack("Twitter", "API");
stream.filter(filter);

// Stop streaming after some time
try {
    Thread.sleep(60000); // Stream for 1 minute
} finally {
    // Runs even when the wait is interrupted, so the stream is always released.
    stream.cleanUp();
}

Primary listener interface for tweet stream events.
/**
 * Callbacks for the events delivered by a tweet stream.
 */
interface StatusListener extends StreamListener {

    /**
     * Invoked when a new tweet arrives.
     *
     * @param status tweet status object
     */
    void onStatus(Status status);

    /**
     * Invoked when a tweet deletion notice is received.
     *
     * @param statusDeletionNotice deletion notice details
     */
    void onDeletionNotice(StatusDeletionNotice statusDeletionNotice);

    /**
     * Invoked when a track limitation notice is received.
     *
     * @param numberOfLimitedStatuses number of tweets limited
     */
    void onTrackLimitationNotice(int numberOfLimitedStatuses);

    /**
     * Invoked when a location deletion notice is received.
     *
     * @param userId       user ID
     * @param upToStatusId status ID up to which location data should be scrubbed
     */
    void onScrubGeo(long userId, long upToStatusId);

    /**
     * Invoked when a stall warning is received.
     *
     * @param warning stall warning details
     */
    void onStallWarning(StallWarning warning);

    /**
     * Invoked when the stream encounters an exception.
     *
     * @param ex exception that occurred
     */
    void onException(Exception ex);
}

Convenience class with empty implementations of all StatusListener methods.
/**
 * {@link StatusListener} whose callbacks all have empty bodies.
 * Subclass it and override only the methods you need.
 */
class StatusAdapter implements StatusListener {

    @Override
    public void onStatus(Status status) {
        // No-op by default - override as needed
    }

    @Override
    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
        // No-op by default
    }

    @Override
    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
        // No-op by default
    }

    @Override
    public void onScrubGeo(long userId, long upToStatusId) {
        // No-op by default
    }

    @Override
    public void onStallWarning(StallWarning warning) {
        // No-op by default
    }

    @Override
    public void onException(Exception ex) {
        // No-op by default
    }
}

Usage Example:
// Build the listener first, then register it with the stream.
StatusListener tweetHandler = new StatusAdapter() {
    @Override
    public void onStatus(Status status) {
        // Only handle new tweets, ignore other events
        processTweet(status);
    }

    @Override
    public void onException(Exception ex) {
        System.err.println("Stream error: " + ex.getMessage());
    }
};
stream.addListener(tweetHandler);

Listen to raw JSON data from the stream.
/**
 * Callbacks for consuming the stream as raw JSON text.
 */
interface RawStreamListener extends StreamListener {

    /**
     * Invoked when raw JSON data is received.
     *
     * @param rawJSON raw JSON string from Twitter
     */
    void onMessage(String rawJSON);

    /**
     * Invoked when the stream encounters an exception.
     *
     * @param ex exception that occurred
     */
    void onException(Exception ex);
}

Configure streaming filters for keywords, users, and locations.
/**
 * Describes what a filtered stream should deliver.
 * Signatures only; method bodies are not part of this reference.
 */
class FilterQuery {

    /**
     * Creates a filter for specific user IDs.
     *
     * @param follow array of user IDs to follow
     * @return FilterQuery with user filter
     */
    static FilterQuery ofFollow(long... follow);

    /**
     * Creates a filter for keywords and hashtags.
     *
     * @param track array of keywords to track
     * @return FilterQuery with keyword filter
     */
    static FilterQuery ofTrack(String... track);

    /**
     * Adds user IDs to follow.
     *
     * @param follow user IDs to follow
     * @return FilterQuery with additional user filters
     */
    FilterQuery follow(long... follow);

    /**
     * Adds keywords to track.
     *
     * @param track keywords to track
     * @return FilterQuery with additional keyword filters
     */
    FilterQuery track(String... track);

    /**
     * Adds location filters.
     *
     * @param locations array of bounding boxes [[lon1,lat1,lon2,lat2]...]
     * @return FilterQuery with location filters
     */
    FilterQuery locations(double[][] locations);

    /**
     * Adds language filters.
     *
     * @param language language codes to filter by
     * @return FilterQuery with language filters
     */
    FilterQuery language(String... language);

    /**
     * Sets the filter level for content.
     *
     * @param filterLevel filter level (none, low, medium, high)
     * @return FilterQuery with filter level
     */
    FilterQuery filterLevel(FilterLevel filterLevel);

    /**
     * Sets the count for backfill.
     *
     * @param count number of tweets for backfill
     * @return FilterQuery with count setting
     */
    FilterQuery count(int count);

    /**
     * Available filter levels.
     */
    enum FilterLevel {
        /** No filtering */
        none,
        /** Low level filtering */
        low,
        /** Medium level filtering */
        medium,
        /** High level filtering */
        high
    }
}

Filter Examples:
// Track specific keywords
stream.filter(FilterQuery.ofTrack("Twitter", "API", "#programming"));

// Follow specific users
stream.filter(FilterQuery.ofFollow(783214L, 17874544L, 95731L));

// Geographic filtering (San Francisco Bay Area)
// NOTE(review): confirm the corner layout the library expects for each box
// against its FilterQuery.locations documentation.
double[][] bayAreaBox = {{-122.75, 36.8, -121.75, 37.8}};
FilterQuery quakeQuery = FilterQuery.ofTrack("earthquake");
stream.filter(quakeQuery.locations(bayAreaBox));

// Complex filter combining multiple criteria
FilterQuery.FilterLevel level = FilterQuery.FilterLevel.low;
stream.filter(
        FilterQuery.ofTrack("java", "programming")
                .follow(12345L, 67890L)
                .language("en", "es")
                .filterLevel(level));

Monitor stream connection status.
/**
 * Callbacks describing the state of the stream connection.
 */
interface ConnectionLifeCycleListener {

    /**
     * Invoked when the connection is established.
     */
    void onConnect();

    /**
     * Invoked when the connection is lost.
     */
    void onDisconnect();

    /**
     * Invoked when a reconnection is being attempted.
     */
    void onReconnect();
}

/**
 * Details of a stall warning delivered by the stream.
 */
interface StallWarning {

    /**
     * Returns the warning message.
     */
    String getMessage();

    /**
     * Returns the percentage carried by the warning.
     * NOTE(review): the previous description read "percentage of tweets being
     * delivered", which does not match the method name - confirm the meaning.
     */
    int getPercentFull();
}
/**
 * Details of a tweet deletion notice.
 */
interface StatusDeletionNotice {

    /**
     * Returns the ID of the deleted status.
     */
    long getStatusId();

    /**
     * Returns the ID of the user who deleted the status.
     */
    long getUserId();
}

public class StreamAnalytics {
private final AtomicLong tweetCount = new AtomicLong(0);
private final Map<String, AtomicLong> hashtagCounts = new ConcurrentHashMap<>();
private final Map<String, AtomicLong> languageCounts = new ConcurrentHashMap<>();
/**
 * Builds a listener that counts every incoming tweet, its hashtags
 * (lower-cased) and its language, and prints a summary whenever the running
 * total reaches a multiple of 1000.
 *
 * @return a new listener that updates this instance's counters
 */
public StatusListener createAnalyticsListener() {
    return new StatusAdapter() {
        @Override
        public void onStatus(Status status) {
            // Use the value returned by the increment. Re-reading the counter
            // afterwards may observe another thread's increment, so a multiple
            // of 1000 could be reported twice or skipped.
            long seen = tweetCount.incrementAndGet();

            // Count hashtags
            for (HashtagEntity hashtag : status.getHashtagEntities()) {
                // Locale.ROOT keeps the keys independent of the JVM's default
                // locale (e.g. Turkish lower-casing of 'I').
                String tag = hashtag.getText().toLowerCase(java.util.Locale.ROOT);
                hashtagCounts.computeIfAbsent(tag, k -> new AtomicLong(0)).incrementAndGet();
            }

            // Count languages
            String lang = status.getLang();
            if (lang != null) {
                languageCounts.computeIfAbsent(lang, k -> new AtomicLong(0)).incrementAndGet();
            }

            // Print stats every 1000 tweets
            if (seen % 1000 == 0) {
                printStats();
            }
        }
    };
}
/**
 * Prints the total tweet count, then the five most frequent hashtags and the
 * five most frequent languages, to standard output.
 */
private void printStats() {
    System.out.println("\n=== Stream Statistics ===");
    System.out.println("Total tweets: " + tweetCount.get());
    System.out.println("\nTop hashtags:");
    printTopFive(hashtagCounts, " #");
    System.out.println("\nLanguage distribution:");
    printTopFive(languageCounts, " ");
}

/**
 * Prints the five largest counters in descending order, one per line.
 *
 * @param counts counters keyed by label; may be updated while this runs
 * @param prefix text printed in front of each label
 */
private static void printTopFive(Map<String, AtomicLong> counts, String prefix) {
    counts.entrySet().stream()
            // Copy each count to a plain value before sorting. Comparing live
            // AtomicLongs that other threads keep incrementing can make the
            // ordering inconsistent while the sort is running.
            .map(e -> new java.util.AbstractMap.SimpleImmutableEntry<String, Long>(
                    e.getKey(), e.getValue().get()))
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .limit(5)
            .forEach(e -> System.out.println(prefix + e.getKey() + ": " + e.getValue()));
}
}public class StreamPersistence {
private final TwitterV1 v1;
private final PrintWriter logWriter;
/**
 * Creates a persistence helper that appends tweet records to a log file.
 *
 * @param v1      Twitter v1 API handle, stored on this instance
 * @param logFile path of the log file; opened in append mode
 * @throws IOException if the log file cannot be opened for writing
 */
public StreamPersistence(TwitterV1 v1, String logFile) throws IOException {
    this.v1 = v1;
    // Encode explicitly as UTF-8. FileWriter uses the JVM's default charset,
    // which is not UTF-8 everywhere and can corrupt non-ASCII tweet text.
    this.logWriter = new PrintWriter(new java.io.OutputStreamWriter(
            new java.io.FileOutputStream(logFile, true),
            java.nio.charset.StandardCharsets.UTF_8));
}
/**
 * Builds a listener that appends one tab-separated record per tweet to the
 * log file and then hands the tweet to {@code storeInDatabase}.
 *
 * @return a new listener writing through this instance's log writer
 */
public StatusListener createPersistenceListener() {
    return new StatusAdapter() {
        @Override
        public void onStatus(Status status) {
            // Log to file
            String logEntry = String.join("\t",
                    status.getCreatedAt().toString(),
                    String.valueOf(status.getId()),
                    status.getUser().getScreenName(),
                    toSingleLine(status.getText()),
                    String.valueOf(status.getRetweetCount()),
                    String.valueOf(status.getFavoriteCount()));

            boolean writeFailed;
            synchronized (logWriter) {
                logWriter.println(logEntry);
                // PrintWriter never throws on I/O failure. checkError()
                // flushes and reports whether any write has failed.
                writeFailed = logWriter.checkError();
            }
            if (writeFailed) {
                System.err.println("Failed to write log entry for status " + status.getId());
            }

            // Store in database (example)
            storeInDatabase(status);
        }

        @Override
        public void onException(Exception ex) {
            System.err.println("Stream error: " + ex.getMessage());
        }
    };
}

/**
 * Replaces line breaks and tabs with spaces so the text fits in one field of a
 * one-record-per-line, tab-separated file.
 *
 * @param text tweet text
 * @return the text with every {@code \n}, {@code \r} and {@code \t} replaced by a space
 */
private static String toSingleLine(String text) {
    // '\r' is included: a carriage return left in the text would still split
    // the record in tools that treat it as a line break.
    return text.replace("\n", " ").replace("\r", " ").replace("\t", " ");
}
/**
 * Placeholder for storing a tweet in a database; currently does nothing.
 *
 * @param status tweet to store
 */
private void storeInDatabase(Status status) {
    // Intentionally left empty: plug in the storage implementation here,
    // e.g. JDBC, JPA, or a NoSQL database.
}
/**
 * Flushes and closes the log file.
 *
 * @throws IOException if any write to the log file failed
 */
public void close() throws IOException {
    boolean writeFailed;
    // Same lock as the writing listener, so a record in progress is completed
    // before the writer is closed.
    synchronized (logWriter) {
        // PrintWriter swallows I/O errors. checkError() flushes and exposes
        // them, so the declared IOException can actually be raised.
        writeFailed = logWriter.checkError();
        logWriter.close();
    }
    if (writeFailed) {
        throw new IOException("One or more log entries could not be written");
    }
}
}public class StreamMonitor {
private final TwitterStream stream;
private final AtomicLong lastTweetTime = new AtomicLong(System.currentTimeMillis());
private final ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
private volatile boolean isConnected = false;
/**
 * Creates a monitor for the given stream and starts monitoring immediately.
 *
 * @param stream stream to watch
 */
public StreamMonitor(TwitterStream stream) {
    this.stream = stream;
    // Registers the listeners and schedules the periodic health check.
    setupMonitoring();
}
/**
 * Registers the connection and activity listeners on the stream and schedules
 * a health check that runs every 60 seconds, starting after 60 seconds.
 */
private void setupMonitoring() {
    // Add connection lifecycle listener
    // NOTE(review): addListener is declared as taking a StreamListener, and
    // ConnectionLifeCycleListener is not declared as one. Confirm which
    // registration method the library version in use provides.
    stream.addListener(new ConnectionLifeCycleListener() {
        @Override
        public void onConnect() {
            System.out.println("Stream connected");
            isConnected = true;
        }

        @Override
        public void onDisconnect() {
            System.out.println("Stream disconnected");
            isConnected = false;
        }

        @Override
        public void onReconnect() {
            System.out.println("Stream reconnecting");
        }
    });

    // Add status listener to track activity
    stream.addListener(new StatusAdapter() {
        @Override
        public void onStatus(Status status) {
            lastTweetTime.set(System.currentTimeMillis());
        }
    });

    // Monitor for stalls. The executor stops running a periodic task once an
    // execution throws, so failures are reported here rather than propagated.
    monitor.scheduleAtFixedRate(() -> {
        try {
            checkStreamHealth();
        } catch (RuntimeException e) {
            System.err.println("Stream health check failed: " + e);
        }
    }, 60, 60, TimeUnit.SECONDS);
}
/**
 * Restarts the stream when no tweet has arrived for more than five minutes
 * or when the connection is flagged as down.
 */
private void checkStreamHealth() {
    long timeSinceLastTweet = System.currentTimeMillis() - lastTweetTime.get();
    boolean stalled = timeSinceLastTweet > 300000; // 5 minutes
    boolean disconnected = !isConnected;

    if (stalled) {
        System.out.println("Stream appears stalled, attempting reconnection");
    }
    if (disconnected) {
        System.out.println("Stream disconnected, attempting reconnection");
    }

    // One restart per check, even when both conditions hold. Two back-to-back
    // restarts would clean up the stream twice and double the wait.
    if (stalled || disconnected) {
        restartStream();
        // Restart the stall timer. Without this, a single stall would trigger
        // another restart on every following check until a tweet arrives.
        lastTweetTime.set(System.currentTimeMillis());
    }
}
/**
 * Cleans up the stream and waits five seconds; restarting the stream itself
 * is left to the integrator.
 */
private void restartStream() {
    stream.cleanUp();
    try {
        TimeUnit.SECONDS.sleep(5); // Wait before reconnecting
        // Restart with same filter (implementation dependent)
    } catch (InterruptedException interrupted) {
        // Keep the interrupt visible to the calling thread.
        Thread.currentThread().interrupt();
    }
}
/**
 * Stops the scheduled health check, then shuts the stream down.
 */
public void shutdown() {
    // Scheduler first, so no further health check is started.
    monitor.shutdown();
    stream.shutdown();
}
}public class RobustStreamListener extends StatusAdapter {
private static final int MAX_RETRIES = 3;
private int retryCount = 0;
/**
 * Processes a tweet. A failure is reported on standard error and does not
 * propagate, so later tweets are still handled.
 *
 * @param status tweet to process
 */
@Override
public void onStatus(Status status) {
    try {
        processStatus(status);
        // Reset on successful processing
        retryCount = 0;
    } catch (Exception failure) {
        // Continue processing other tweets
        System.err.println("Error processing status: " + failure.getMessage());
    }
}
/**
 * Reports a stream exception and, while retries remain, waits with an
 * exponentially growing delay before returning.
 *
 * @param ex exception raised by the stream
 */
@Override
public void onException(Exception ex) {
    System.err.println("Stream exception: " + ex.getMessage());

    if (retryCount >= MAX_RETRIES) {
        System.err.println("Max retries exceeded, stopping stream");
        // Handle permanent failure
        return;
    }

    retryCount++;
    System.out.println("Retrying... (" + retryCount + "/" + MAX_RETRIES + ")");

    // Exponential backoff: 5s, 10s, 20s, ... The delay doubles on each retry.
    // The previous 5000 * retryCount grew linearly.
    long delayMillis = 5000L << (retryCount - 1);
    try {
        Thread.sleep(delayMillis);
        // Stream will automatically reconnect
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Prints the stall warning's message and percentage to standard output.
 *
 * @param warning stall warning details
 */
@Override
public void onStallWarning(StallWarning warning) {
    String message = warning.getMessage();
    System.out.println("Stall warning: " + message);
    int percentFull = warning.getPercentFull();
    System.out.println("Stream is " + percentFull + "% full");
}
/**
 * Placeholder for application-specific tweet handling; currently does nothing.
 *
 * @param status tweet to process
 */
private void processStatus(Status status) {
    // Intentionally left empty: add your tweet processing logic here.
}
}// Example of respecting streaming limits
String[] keywords = {"keyword1", "keyword2"};
// Check the limit on the keyword array itself. FilterQuery, as documented
// here, declares no accessor for the number of tracked keywords.
if (keywords.length > 400) { // Max 400 keywords
    throw new IllegalArgumentException("Too many track keywords");
}
FilterQuery filter = FilterQuery.ofTrack(keywords)
        .follow(userId1, userId2) // Max 5000 users
        // locations(...) is declared with a single double[][] parameter, so
        // the boxes are passed as one array.
        // NOTE(review): assumes boundingBox1 and boundingBox2 are double[] - confirm.
        .locations(new double[][] {boundingBox1, boundingBox2}); // Max 25 locations
stream.filter(filter);

Install with Tessl CLI
npx tessl i tessl/maven-org-twitter4j--twitter4j-core