CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-twitter4j--twitter4j-core

A 100% pure Java library for the Twitter API with no extra dependency

Overview
Eval results
Files

docs/streaming.md

Real-time Streaming

Real-time Twitter data streaming with customizable filters and listeners.

Core Streaming Interface

TwitterStream

Main interface for accessing Twitter's Streaming API.

/**
 * Handle on a streaming connection: register listeners, start one of the
 * available streams, and release the connection when finished.
 * <p>
 * NOTE(review): {@code firehose} and {@code retweet} are declared as returning
 * {@code TwitterStream} while the other stream-starting methods return
 * {@code void}; confirm these signatures against the library's Javadoc.
 */
interface TwitterStream {
    /**
     * Registers a listener that will be notified of stream events.
     *
     * @param listener Stream event listener
     */
    void addListener(StreamListener listener);
    
    /**
     * Starts a filtered stream described by the given query.
     *
     * @param query Filter query with keywords, users, locations
     */
    void filter(FilterQuery query);
    
    /**
     * Starts the sample stream (random 1% of public tweets).
     */
    void sample();
    
    /**
     * Starts the sample stream restricted to one language.
     *
     * @param language Language code (e.g., "en", "es")
     */
    void sample(String language);
    
    /**
     * Starts the firehose stream (approved parties only).
     *
     * @param count Number of tweets to retrieve
     *              (NOTE(review): possibly a backfill count — confirm)
     * @return presumably this stream, to allow chaining — TODO confirm
     */
    TwitterStream firehose(int count);
    
    /**
     * Starts the retweet stream.
     *
     * @return presumably this stream, to allow chaining — TODO confirm
     */
    TwitterStream retweet();
    
    /**
     * Cleans up resources and closes connections.
     */
    void cleanUp();
    
    /**
     * Shuts the stream down completely.
     */
    void shutdown();
}

Usage Examples:

TwitterV1 v1 = twitter.v1();
TwitterStream stream = v1.stream();

// Add status listener. StatusAdapter supplies no-op implementations of every
// StatusListener callback, so only the callbacks of interest are overridden.
// (An anonymous StatusListener would have to implement all of its methods
// to compile.)
stream.addListener(new StatusAdapter() {
    @Override
    public void onStatus(Status status) {
        System.out.println("@" + status.getUser().getScreenName() + ": " + status.getText());
    }
    
    @Override
    public void onException(Exception ex) {
        ex.printStackTrace();
    }
});

// Start sample stream
stream.sample();

// Or start filtered stream
FilterQuery filter = FilterQuery.ofTrack("Twitter", "API");
stream.filter(filter);

// Stop streaming after some time
try {
    Thread.sleep(60000); // Stream for 1 minute
} finally {
    // Release the connection even when the wait is interrupted
    stream.cleanUp();
}

Stream Listeners

StatusListener

Primary listener interface for tweet stream events.

/**
 * Primary listener interface for tweet stream events: new tweets, deletion
 * notices, track limitation notices, geo-scrub notices, stall warnings and
 * stream exceptions.
 */
interface StatusListener extends StreamListener {
    /**
     * Called when a new tweet arrives.
     *
     * @param status Tweet status object
     */
    void onStatus(Status status);
    
    /**
     * Called when a tweet deletion notice is received.
     *
     * @param statusDeletionNotice Deletion notice details
     */
    void onDeletionNotice(StatusDeletionNotice statusDeletionNotice);
    
    /**
     * Called when a track limitation notice is received.
     *
     * @param numberOfLimitedStatuses Number of tweets limited
     */
    void onTrackLimitationNotice(int numberOfLimitedStatuses);
    
    /**
     * Called when a notice to scrub location data is received.
     *
     * @param userId User ID
     * @param upToStatusId Status ID up to which location data should be scrubbed
     */
    void onScrubGeo(long userId, long upToStatusId);
    
    /**
     * Called when a stall warning is received.
     *
     * @param warning Stall warning details
     */
    void onStallWarning(StallWarning warning);
    
    /**
     * Called when the stream encounters an exception.
     *
     * @param ex Exception that occurred
     */
    void onException(Exception ex);
}

StatusAdapter

Convenience class with empty implementations of all StatusListener methods.

/**
 * No-op base implementation of {@link StatusListener}.
 * Subclass it (typically anonymously) and override just the callbacks
 * you care about; every callback left untouched does nothing.
 */
class StatusAdapter implements StatusListener {

    /** Does nothing by default. */
    @Override
    public void onStatus(Status status) { }

    /** Does nothing by default. */
    @Override
    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { }

    /** Does nothing by default. */
    @Override
    public void onTrackLimitationNotice(int numberOfLimitedStatuses) { }

    /** Does nothing by default. */
    @Override
    public void onScrubGeo(long userId, long upToStatusId) { }

    /** Does nothing by default. */
    @Override
    public void onStallWarning(StallWarning warning) { }

    /** Does nothing by default. */
    @Override
    public void onException(Exception ex) { }
}

Usage Example:

// Build a listener that reacts to new tweets and stream errors only;
// every other event falls through to StatusAdapter's empty defaults.
StatusAdapter tweetsOnlyListener = new StatusAdapter() {
    @Override
    public void onStatus(Status status) {
        processTweet(status);
    }

    @Override
    public void onException(Exception ex) {
        String message = "Stream error: " + ex.getMessage();
        System.err.println(message);
    }
};
stream.addListener(tweetsOnlyListener);

RawStreamListener

Listen to raw JSON data from the stream.

/**
 * Listener that receives stream messages as raw JSON strings.
 */
interface RawStreamListener extends StreamListener {
    /**
     * Called when raw JSON data is received.
     *
     * @param rawJSON Raw JSON string from Twitter
     */
    void onMessage(String rawJSON);
    
    /**
     * Called when the stream encounters an exception.
     *
     * @param ex Exception that occurred
     */
    void onException(Exception ex);
}

Stream Filtering

FilterQuery

Configure streaming filters for keywords, users, and locations.

/**
 * Describes what a filtered stream should deliver: users to follow, keywords
 * to track, location bounding boxes, languages, filter level and backfill count.
 * <p>
 * Start from one of the static factories ({@link #ofFollow} / {@link #ofTrack})
 * and chain the instance methods, each of which returns a {@code FilterQuery}.
 * NOTE(review): whether the instance methods return a new query or modify and
 * return the receiver is not visible here — always use the returned value.
 */
class FilterQuery {
    /**
     * Creates a filter for specific user IDs.
     *
     * @param follow Array of user IDs to follow
     * @return FilterQuery with user filter
     */
    static FilterQuery ofFollow(long... follow);
    
    /**
     * Creates a filter for keywords and hashtags.
     *
     * @param track Array of keywords to track
     * @return FilterQuery with keyword filter
     */
    static FilterQuery ofTrack(String... track);
    
    /**
     * Adds user IDs to follow.
     *
     * @param follow User IDs to follow
     * @return FilterQuery with additional user filters
     */
    FilterQuery follow(long... follow);
    
    /**
     * Adds keywords to track.
     *
     * @param track Keywords to track
     * @return FilterQuery with additional keyword filters
     */
    FilterQuery track(String... track);
    
    /**
     * Adds location filters.
     *
     * @param locations Array of bounding boxes [[lon1,lat1,lon2,lat2]...]
     * @return FilterQuery with location filters
     */
    FilterQuery locations(double[][] locations);
    
    /**
     * Adds language filters.
     *
     * @param language Language codes to filter by
     * @return FilterQuery with language filters
     */
    FilterQuery language(String... language);
    
    /**
     * Sets the filter level for content.
     *
     * @param filterLevel Filter level (none, low, medium, high)
     * @return FilterQuery with filter level
     */
    FilterQuery filterLevel(FilterLevel filterLevel);
    
    /**
     * Sets the count for backfill.
     *
     * @param count Number of tweets for backfill
     * @return FilterQuery with count setting
     */
    FilterQuery count(int count);
    
    /**
     * Filter level enumeration; referenced from outside this class as
     * {@code FilterQuery.FilterLevel}.
     */
    enum FilterLevel {
        /** No filtering */
        none,
        /** Low level filtering */
        low,
        /** Medium level filtering */
        medium,
        /** High level filtering */
        high
    }
}

Filter Examples:

// Keyword filter: deliver tweets matching any of the tracked terms
String[] trackedTerms = {"Twitter", "API", "#programming"};
stream.filter(FilterQuery.ofTrack(trackedTerms));

// User filter: deliver tweets related to the listed account IDs
long[] followedIds = {783214L, 17874544L, 95731L};
stream.filter(FilterQuery.ofFollow(followedIds));

// Location filter: one bounding box around the San Francisco Bay Area,
// combined with a tracked keyword
double[][] bayAreaBox = {{-122.75, 36.8, -121.75, 37.8}};
FilterQuery quakeQuery = FilterQuery.ofTrack("earthquake");
stream.filter(quakeQuery.locations(bayAreaBox));

// Combined filter, built one criterion at a time
FilterQuery combined = FilterQuery.ofTrack("java", "programming");
combined = combined.follow(12345L, 67890L);
combined = combined.language("en", "es");
combined = combined.filterLevel(FilterQuery.FilterLevel.low);
stream.filter(combined);

Stream Event Handling

Connection Lifecycle

Monitor stream connection status.

/**
 * Callbacks for monitoring the state of a stream connection.
 * <p>
 * NOTE(review): this interface does not extend {@code StreamListener}; confirm
 * how instances are meant to be registered on a stream.
 */
interface ConnectionLifeCycleListener {
    /**
     * Called when the connection is established.
     */
    void onConnect();
    
    /**
     * Called when the connection is lost.
     */
    void onDisconnect();
    
    /**
     * Called when a reconnection is being attempted.
     */
    void onReconnect();
}

Stream Data Models

/**
 * Details of a stall warning delivered on a stream.
 */
interface StallWarning {
    /**
     * Warning message.
     */
    String getMessage();
    
    /**
     * How full the stream is, as a percentage.
     * NOTE(review): presumably this is the fill level of the server-side
     * delivery queue for this connection, not the share of tweets being
     * delivered — confirm against Twitter's stall-warning documentation.
     */
    int getPercentFull();
}

/**
 * Notice that a status (tweet) has been deleted.
 */
interface StatusDeletionNotice {
    /**
     * ID of the deleted status.
     */
    long getStatusId();
    
    /**
     * ID of the user who deleted the status.
     */
    long getUserId();
}

Advanced Streaming Patterns

Real-time Analytics

/**
 * Keeps running statistics (total tweets, hashtag frequency, language
 * frequency) for a stream and prints a report every
 * {@link #REPORT_INTERVAL} tweets.
 * <p>
 * All counters are concurrent structures, so the listener may be invoked
 * from more than one thread.
 */
public class StreamAnalytics {
    /** A report is printed each time the tweet total reaches a multiple of this. */
    private static final long REPORT_INTERVAL = 1000;
    /** Number of entries shown in each ranking. */
    private static final int TOP_N = 5;

    private final AtomicLong tweetCount = new AtomicLong(0);
    private final Map<String, AtomicLong> hashtagCounts = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> languageCounts = new ConcurrentHashMap<>();
    
    /**
     * Creates a listener that updates the statistics for every received tweet.
     *
     * @return listener to register on a stream
     */
    public StatusListener createAnalyticsListener() {
        return new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                // Use the value returned by the increment itself: reading the
                // counter again afterwards could observe another thread's
                // increment and skip or repeat a report.
                long seen = tweetCount.incrementAndGet();
                
                // Count hashtags (Locale.ROOT keeps lower-casing independent
                // of the JVM's default locale)
                for (HashtagEntity hashtag : status.getHashtagEntities()) {
                    increment(hashtagCounts,
                        hashtag.getText().toLowerCase(java.util.Locale.ROOT));
                }
                
                // Count languages
                String lang = status.getLang();
                if (lang != null) {
                    increment(languageCounts, lang);
                }
                
                // Print stats every REPORT_INTERVAL tweets
                if (seen % REPORT_INTERVAL == 0) {
                    printStats();
                }
            }
        };
    }
    
    /** Adds one to the counter stored under {@code key}, creating it on first use. */
    private static void increment(Map<String, AtomicLong> counts, String key) {
        counts.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
    }
    
    /** Prints the tweet total plus the top hashtags and languages. */
    private void printStats() {
        System.out.println("\n=== Stream Statistics ===");
        System.out.println("Total tweets: " + tweetCount.get());
        
        System.out.println("\nTop hashtags:");
        printTop(hashtagCounts, "  #");
        
        System.out.println("\nLanguage distribution:");
        printTop(languageCounts, "  ");
    }
    
    /**
     * Prints the {@link #TOP_N} highest counters of {@code counts}, highest first.
     * Counter values are copied before sorting so that concurrent increments
     * cannot change the ordering while the sort is running.
     */
    private static void printTop(Map<String, AtomicLong> counts, String prefix) {
        counts.entrySet().stream()
            .<Map.Entry<String, Long>>map(entry ->
                new java.util.AbstractMap.SimpleImmutableEntry<>(
                    entry.getKey(), entry.getValue().get()))
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .limit(TOP_N)
            .forEach(entry -> System.out.println(prefix + entry.getKey() + ": " + entry.getValue()));
    }
}

Stream Persistence

/**
 * Appends every received tweet to a tab-separated log file and hands it to
 * {@link #storeInDatabase(Status)}.
 * <p>
 * The log is written as UTF-8 regardless of the platform default charset.
 * Instances hold an open file and must be closed; the class implements
 * {@link AutoCloseable} so it can be used in try-with-resources.
 */
public class StreamPersistence implements AutoCloseable {
    private final TwitterV1 v1;
    private final PrintWriter logWriter;
    /** Guarded by {@code logWriter}; keeps a write failure from being reported per tweet. */
    private boolean writeFailureReported = false;
    
    /**
     * @param v1 API handle (stored; not used by the logging code itself)
     * @param logFile path of the log file; it is opened in append mode
     * @throws IOException if the log file cannot be opened for writing
     */
    public StreamPersistence(TwitterV1 v1, String logFile) throws IOException {
        this.v1 = v1;
        // Explicit UTF-8: a plain FileWriter would use the platform charset
        // on older JVMs and could mangle non-ASCII tweet text.
        this.logWriter = new PrintWriter(new java.io.OutputStreamWriter(
            new java.io.FileOutputStream(logFile, true),
            java.nio.charset.StandardCharsets.UTF_8));
    }
    
    /**
     * Creates a listener that logs and stores each received tweet.
     *
     * @return listener to register on a stream
     */
    public StatusListener createPersistenceListener() {
        return new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                // Log to file: one tab-separated record per tweet
                String logEntry = String.join("\t",
                    status.getCreatedAt().toString(),
                    String.valueOf(status.getId()),
                    status.getUser().getScreenName(),
                    singleLine(status.getText()),
                    String.valueOf(status.getRetweetCount()),
                    String.valueOf(status.getFavoriteCount())
                );
                
                synchronized (logWriter) {
                    logWriter.println(logEntry);
                    // PrintWriter never throws on I/O failure; checkError()
                    // flushes and reports whether any write has failed.
                    if (logWriter.checkError() && !writeFailureReported) {
                        writeFailureReported = true;
                        System.err.println("Failed to write to stream log file");
                    }
                }
                
                // Store in database (example)
                storeInDatabase(status);
            }
            
            @Override
            public void onException(Exception ex) {
                System.err.println("Stream error: " + ex.getMessage());
            }
        };
    }
    
    /** Replaces line breaks and tabs so the text fits in one tab-separated field. */
    private static String singleLine(String text) {
        return text.replace("\r", " ").replace("\n", " ").replace("\t", " ");
    }
    
    private void storeInDatabase(Status status) {
        // Database storage implementation
        // Could use JDBC, JPA, or NoSQL database
    }
    
    /**
     * Closes the log file.
     *
     * @throws IOException if any write to the log file failed
     */
    @Override
    public void close() throws IOException {
        synchronized (logWriter) {
            logWriter.close();
            if (logWriter.checkError()) {
                throw new IOException("Failed to write to stream log file");
            }
        }
    }
}

Stream Monitoring and Recovery

/**
 * Watches a stream for stalls and disconnects and triggers a restart when
 * either is detected. A health check runs once a minute on a dedicated
 * scheduler thread.
 */
public class StreamMonitor {
    /** No tweet for this long is treated as a stall (5 minutes). */
    private static final long STALL_THRESHOLD_MILLIS = 300000;
    /** Pause between closing the stream and reconnecting. */
    private static final long RECONNECT_DELAY_MILLIS = 5000;

    private final TwitterStream stream;
    private final AtomicLong lastTweetTime = new AtomicLong(System.currentTimeMillis());
    private final ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
    private volatile boolean isConnected = false;
    
    /**
     * Registers the monitoring listeners on {@code stream} and starts the
     * periodic health check.
     *
     * @param stream stream to monitor
     */
    public StreamMonitor(TwitterStream stream) {
        this.stream = stream;
        setupMonitoring();
    }
    
    private void setupMonitoring() {
        // Add connection lifecycle listener
        // NOTE(review): addListener is documented as taking a StreamListener;
        // confirm how a ConnectionLifeCycleListener is registered in the
        // library version in use.
        stream.addListener(new ConnectionLifeCycleListener() {
            @Override
            public void onConnect() {
                System.out.println("Stream connected");
                isConnected = true;
            }
            
            @Override
            public void onDisconnect() {
                System.out.println("Stream disconnected");
                isConnected = false;
            }
            
            @Override
            public void onReconnect() {
                System.out.println("Stream reconnecting");
            }
        });
        
        // Add status listener to track activity
        stream.addListener(new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                lastTweetTime.set(System.currentTimeMillis());
            }
        });
        
        // Monitor for stalls
        monitor.scheduleAtFixedRate(this::checkStreamHealth, 60, 60, TimeUnit.SECONDS);
    }
    
    /**
     * Restarts the stream when it is stalled or disconnected. At most one
     * restart is triggered per check, even when both conditions hold.
     */
    private void checkStreamHealth() {
        try {
            long timeSinceLastTweet = System.currentTimeMillis() - lastTweetTime.get();
            
            if (timeSinceLastTweet > STALL_THRESHOLD_MILLIS) {
                System.out.println("Stream appears stalled, attempting reconnection");
                restartStream();
            } else if (!isConnected) {
                System.out.println("Stream disconnected, attempting reconnection");
                restartStream();
            }
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task cancels all
            // further executions, which would silently stop the monitoring.
            System.err.println("Stream health check failed: " + e);
        }
    }
    
    private void restartStream() {
        try {
            stream.cleanUp();
            Thread.sleep(RECONNECT_DELAY_MILLIS); // Wait before reconnecting
            // Restart with same filter (implementation dependent)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Give the restarted stream a fresh stall window; otherwise the
            // stale timestamp would trigger another restart on every check.
            lastTweetTime.set(System.currentTimeMillis());
        }
    }
    
    /**
     * Stops the health checks and shuts the stream down.
     */
    public void shutdown() {
        // shutdownNow interrupts a restart that is currently waiting, so the
        // scheduler thread does not linger after the stream is gone.
        monitor.shutdownNow();
        stream.shutdown();
    }
}

Error Handling and Best Practices

Stream Error Recovery

/**
 * Listener that keeps a stream running through errors: failures while
 * processing a single tweet are logged and skipped, and stream exceptions are
 * followed by a growing pause, up to {@link #MAX_RETRIES} times in a row.
 */
public class RobustStreamListener extends StatusAdapter {
    private static final int MAX_RETRIES = 3;
    /** Pause before the first retry; doubled for each further retry. */
    private static final long BASE_BACKOFF_MILLIS = 5000;
    private int retryCount = 0;
    
    /**
     * Processes one tweet. A processing failure is logged and does not
     * propagate, so later tweets are still handled.
     */
    @Override
    public void onStatus(Status status) {
        try {
            processStatus(status);
            retryCount = 0; // Reset on successful processing
        } catch (Exception e) {
            System.err.println("Error processing status: " + e.getMessage());
            // Continue processing other tweets
        }
    }
    
    /**
     * Logs the stream exception and, while retries remain, waits before
     * returning: 5 s, then 10 s, then 20 s.
     */
    @Override
    public void onException(Exception ex) {
        System.err.println("Stream exception: " + ex.getMessage());
        
        if (retryCount < MAX_RETRIES) {
            retryCount++;
            System.out.println("Retrying... (" + retryCount + "/" + MAX_RETRIES + ")");
            
            try {
                // Exponential backoff: the delay doubles with every retry
                // (multiplying by retryCount would only grow linearly).
                long delayMillis = BASE_BACKOFF_MILLIS << (retryCount - 1);
                Thread.sleep(delayMillis);
                // Stream will automatically reconnect
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            System.err.println("Max retries exceeded, stopping stream");
            // Handle permanent failure
        }
    }
    
    /** Logs the warning message and how full the stream is. */
    @Override
    public void onStallWarning(StallWarning warning) {
        System.out.println("Stall warning: " + warning.getMessage());
        System.out.println("Stream is " + warning.getPercentFull() + "% full");
    }
    
    private void processStatus(Status status) {
        // Your tweet processing logic here
    }
}

Rate Limiting and Connection Limits

  • Each app can have only one streaming connection at a time
  • Follow limits: 5,000 user IDs maximum
  • Track limits: 400 keywords maximum
  • Location boxes: 25 maximum
  • Streaming connections count against rate limits
  • Use connection pooling and reconnection strategies
// Example of respecting streaming limits.
// The limits are checked on the input arrays before the query is built:
// FilterQuery exposes no accessors for counting what it already contains.
String[] keywords = {"keyword1", "keyword2"};             // Max 400 keywords
long[] userIds = {userId1, userId2};                      // Max 5000 users
double[][] boundingBoxes = {boundingBox1, boundingBox2};  // Max 25 locations

if (keywords.length > 400) {
    throw new IllegalArgumentException("Too many track keywords");
}
if (userIds.length > 5000) {
    throw new IllegalArgumentException("Too many user IDs to follow");
}
if (boundingBoxes.length > 25) {
    throw new IllegalArgumentException("Too many location bounding boxes");
}

// locations() takes all bounding boxes as a single double[][]
FilterQuery filter = FilterQuery.ofTrack(keywords)
    .follow(userIds)
    .locations(boundingBoxes);

stream.filter(filter);

Install with Tessl CLI

npx tessl i tessl/maven-org-twitter4j--twitter4j-core

docs

core-auth.md

direct-messages.md

favorites.md

index.md

lists.md

places.md

search.md

streaming.md

timelines.md

tweets.md

users.md

tile.json