Tweet Input Format

Reads Twitter JSON data and parses each record into a structured Tweet object for use in Apache Flink processing pipelines.

Capabilities

SimpleTweetInputFormat

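The main input format class. It extends Flink's DelimitedInputFormat, which splits the input into records at a delimiter (a newline by default), and parses each record as one tweet. Malformed records are skipped rather than failing the job, and the class implements ResultTypeQueryable so that Flink knows it produces Tweet objects.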

/**
 * Apache Flink input format for parsing Twitter JSON data into Tweet objects.
 * Extends DelimitedInputFormat to provide streaming JSON parsing with error handling.
 */
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> 
    implements ResultTypeQueryable<Tweet> {
    
    /**
     * Initialize the input format with JSON parser and tweet handler.
     * Called by Flink runtime before processing begins.
     * 
     * @param split FileInputSplit containing the data source information
     * @throws IOException if initialization fails
     */
    public void open(FileInputSplit split) throws IOException;
    
    /**
     * Read the next tweet record from the input stream with error handling.
     * Continues processing even if individual tweets are malformed.
     * 
     * @param record Tweet object to reuse for parsing (for efficiency)
     * @return Parsed Tweet object or null if end of stream
     * @throws IOException if stream reading fails
     */
    public Tweet nextRecord(Tweet record) throws IOException;
    
    /**
     * Parse raw JSON bytes into a Tweet object using streaming JSON parser.
     * Handles malformed JSON gracefully with proper logging.
     * 
     * @param reuse Tweet object to reuse for parsing
     * @param bytes Raw JSON bytes to parse
     * @param offset Starting position in byte array
     * @param numBytes Number of bytes to read
     * @return Parsed Tweet object
     * @throws IOException if parsing fails critically
     */
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;
    
    /**
     * Provide type information for Flink's serialization system.
     * Required for proper Kryo serialization of Tweet objects.
     * 
     * @return TypeInformation for Tweet class
     */
    public TypeInformation<Tweet> getProducedType();
}
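The nextRecord contract above (keep reading until a record parses, return null at end of input) can be illustrated without Flink. The following is a plain-Java approximation under stated assumptions, not the actual SimpleTweetInputFormat code: an Iterator<String> stands in for the delimited byte stream, Long::parseLong stands in for the JSON parser, and SkippingReader is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/**
 * Plain-Java sketch of the "skip malformed records" read loop.
 * SkippingReader is hypothetical; an Iterator<String> stands in for the
 * delimited input, and any RuntimeException from parse marks a malformed record.
 */
public class SkippingReaderDemo {

    static final class SkippingReader<T> {
        private final Iterator<String> records;
        private final Function<String, T> parse;
        private int skipped = 0;

        SkippingReader(Iterator<String> records, Function<String, T> parse) {
            this.records = records;
            this.parse = parse;
        }

        /** Returns the next record that parses, or null once the input is exhausted. */
        T next() {
            while (records.hasNext()) {
                String raw = records.next();
                try {
                    return parse.apply(raw);
                } catch (RuntimeException e) {
                    skipped++; // a real format would log here, then move on to the next record
                }
            }
            return null;
        }

        int skipped() {
            return skipped;
        }
    }

    public static void main(String[] args) {
        // Long::parseLong stands in for the JSON parser: "oops" and "" fail to parse
        List<String> input = List.of("42", "oops", "7", "");
        SkippingReader<Long> reader = new SkippingReader<>(input.iterator(), Long::parseLong);

        List<Long> parsed = new ArrayList<>();
        for (Long v = reader.next(); v != null; v = reader.next()) {
            parsed.add(v);
        }
        System.out.println(parsed + " skipped=" + reader.skipped()); // prints "[42, 7] skipped=2"
    }
}
```

Using null as the end-of-input signal mirrors the documented return value; it also means a parser that legitimately returns null for a record would end the loop early, which this sketch does not guard against.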

Usage Examples:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Batch usage (DataSet API): read a file with one tweet JSON object per line
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

// Keep tweets with more than 100 retweets and print "screen_name: text";
// print() on a DataSet triggers execution of the batch job
tweets.filter(tweet -> tweet.getRetweet_count() > 100)
      .map(tweet -> tweet.getUser().getScreen_name() + ": " + tweet.getText())
      .print();

// Streaming usage (DataStream API): read the files in the "stream/" directory
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tweet> tweetStream = streamEnv.readFile(new SimpleTweetInputFormat(), "stream/");

// TweetSummary and TweetSink are hypothetical user-defined classes (a POJO and a SinkFunction)
tweetStream.filter(tweet -> "en".equals(tweet.getLang()))   // null-safe language check
           .map(tweet -> new TweetSummary(
               tweet.getId_str(),
               tweet.getUser().getScreen_name(),
               tweet.getText(),
               tweet.getCreated_at()
           ))
           .addSink(new TweetSink());

// A streaming job only runs once execute() is called
streamEnv.execute("tweet-stream");

TweetHandler

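An event-driven JSON handler that implements the ContentHandler callback interface of the json-simple streaming parser. The parser invokes these callbacks as it encounters objects, keys and primitive values, and the handler writes each value into the reused Tweet object, so no intermediate JSON tree is built.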

/**
 * Streaming JSON parser handler for Twitter data structures.
 * Implements ContentHandler for efficient parsing of nested JSON objects.
 */
public class TweetHandler implements ContentHandler {
    
    /** Tweet object being populated during parsing */
    protected Tweet reuse;
    
    /**
     * Handle the start of JSON parsing.
     * @throws ParseException if parsing setup fails
     * @throws IOException if I/O error occurs
     */
    public void startJSON() throws ParseException, IOException;
    
    /**
     * Handle the end of JSON parsing.
     * @throws ParseException if parsing cleanup fails  
     * @throws IOException if I/O error occurs
     */
    public void endJSON() throws ParseException, IOException;
    
    /**
     * Handle start of JSON object.
     * @return true to continue parsing
     * @throws ParseException if object parsing fails
     * @throws IOException if I/O error occurs
     */
    public boolean startObject() throws ParseException, IOException;
    
    /**
     * Handle end of JSON object.
     * @return true to continue parsing
     * @throws ParseException if object parsing fails
     * @throws IOException if I/O error occurs  
     */
    public boolean endObject() throws ParseException, IOException;
    
    /**
     * Handle start of object property with key name.
     * @param key Property name
     * @return true to continue parsing
     * @throws ParseException if property parsing fails
     * @throws IOException if I/O error occurs
     */
    public boolean startObjectEntry(String key) throws ParseException, IOException;
    
    /**
     * Handle primitive values (strings, numbers, booleans).
     * @param value Primitive value to process
     * @return true to continue parsing
     * @throws ParseException if value parsing fails
     * @throws IOException if I/O error occurs
     */
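    public boolean primitive(Object value) throws ParseException, IOException;

    // json-simple's ContentHandler also declares endObjectEntry(), startArray() and endArray();
    // they are omitted from this summary.
}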
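To show how callbacks such as startObjectEntry and primitive can fill a reused record, here is a self-contained sketch that does not depend on json-simple. MiniHandler, MiniTweet and the dotted key-path scheme are hypothetical; the real TweetHandler may track nesting differently. The parseSample method fires, by hand, the events a streaming parser would emit for a small tweet.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Self-contained sketch of an event-driven JSON handler that fills a reusable record.
 * MiniHandler, MiniTweet and the dotted key path are hypothetical stand-ins,
 * not the json-simple ContentHandler or the real Tweet model.
 */
public class HandlerSketch {

    /** Subset of the callbacks a streaming JSON parser invokes. */
    interface MiniHandler {
        boolean startObject();
        boolean endObject();
        boolean startObjectEntry(String key);
        boolean endObjectEntry();
        boolean primitive(Object value);
    }

    /** Hypothetical, much-reduced tweet record. */
    static final class MiniTweet {
        long id;
        String text;
        String screenName; // nested under "user"
    }

    static final class MiniTweetHandler implements MiniHandler {
        MiniTweet reuse;                                        // record being populated
        private final Deque<String> path = new ArrayDeque<>();  // keys from root to current value

        public boolean startObject() { return true; }
        public boolean endObject() { return true; }

        public boolean startObjectEntry(String key) {
            path.addLast(key);
            return true;
        }

        public boolean endObjectEntry() {
            path.removeLast();
            return true;
        }

        public boolean primitive(Object value) {
            // ArrayDeque iterates head to tail, i.e. root key first, giving e.g. "user.screen_name"
            switch (String.join(".", path)) {
                case "id":               reuse.id = ((Number) value).longValue(); break;
                case "text":             reuse.text = (String) value; break;
                case "user.screen_name": reuse.screenName = (String) value; break;
                default:                 break; // fields this sketch does not model are ignored
            }
            return true; // true means "continue", matching the @return contract above
        }
    }

    /** Replays, by hand, the events for {"id": 1, "text": "hello", "user": {"screen_name": "flink"}}. */
    static MiniTweet parseSample() {
        MiniTweetHandler h = new MiniTweetHandler();
        h.reuse = new MiniTweet();
        h.startObject();
        h.startObjectEntry("id");   h.primitive(1L);      h.endObjectEntry();
        h.startObjectEntry("text"); h.primitive("hello"); h.endObjectEntry();
        h.startObjectEntry("user");
        h.startObject();
        h.startObjectEntry("screen_name"); h.primitive("flink"); h.endObjectEntry();
        h.endObject();
        h.endObjectEntry();
        h.endObject();
        return h.reuse;
    }

    public static void main(String[] args) {
        MiniTweet t = parseSample();
        System.out.println(t.id + " " + t.text + " @" + t.screenName); // prints "1 hello @flink"
    }
}
```

Because values are written straight into the reused record as events arrive, the handler never materializes the full JSON object, which is the main reason to prefer a callback-style parser for large tweet streams.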

Error Handling

The input format is designed to keep a job running when individual records are bad:

  • Malformed JSON: individual malformed tweets are logged and skipped, and processing continues
  • Type conversion: fields with an unexpected data type fall back to default values
  • Parse exceptions: parsing problems are logged without stopping the stream
  • I/O errors: critical I/O failures are propagated to Flink as IOException
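The type-conversion behaviour listed above could look like the helper below. This is a sketch under assumptions: the name toLong, accepting numeric strings, and the caller-chosen default are illustrative choices, not taken from the tweetinputformat source.

```java
/**
 * Sketch of lenient field conversion: a value of the wrong type yields a default
 * instead of failing the whole record. Names and defaults are illustrative.
 */
public final class LenientConvert {

    private LenientConvert() {}

    /** Converts a parsed JSON value to a long, or returns defaultValue if that is not possible. */
    static long toLong(Object value, long defaultValue) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            try {
                return Long.parseLong(((String) value).trim());
            } catch (NumberFormatException e) {
                return defaultValue;
            }
        }
        return defaultValue; // null, booleans, objects and arrays all fall back
    }

    public static void main(String[] args) {
        // Values like those in the wrong-data-types scenario: {"id": "not_a_number", "retweet_count": "invalid"}
        System.out.println(toLong("not_a_number", 0L)); // prints 0
        System.out.println(toLong("invalid", 0L));      // prints 0
        // Well-typed and numeric-string values convert normally
        System.out.println(toLong(123L, 0L));           // prints 123
        System.out.println(toLong("456", 0L));          // prints 456
    }
}
```

Whether numeric strings should be accepted is a design choice; a stricter variant would return the default for any non-Number value.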

Example Error Scenarios:

// The input format handles these gracefully:
// 1. Malformed JSON tweets
{"invalid": json, "missing": quotes}

// 2. Missing required fields  
{"text": "Hello", "missing_id": true}

// 3. Wrong data types
{"id": "not_a_number", "retweet_count": "invalid"}

// 4. Truncated JSON
{"text": "Hello world", "incomplete":

Integration with Flink

The input format can be used with both of Flink's APIs, DataSet and DataStream:

// DataSet API (batch processing)
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), inputPath);

// DataStream API (stream processing)  
DataStream<Tweet> tweetStream = streamEnv.readFile(new SimpleTweetInputFormat(), inputPath);

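// Optional: register the model classes with Kryo so it writes a compact registration ID
// instead of the full class name. Users and Entities are model classes from the
// tweetinputformat model packages; register them on every environment you use.
env.getConfig().registerKryoType(Tweet.class);
env.getConfig().registerKryoType(Users.class);
env.getConfig().registerKryoType(Entities.class);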