tessl/maven-org-apache-flink--flink-tweet-inputformat-2-10

Apache Flink input format for processing Twitter tweet data in JSON format with strongly-typed Tweet objects

Input Format Processing

Core input format functionality for reading line-delimited Twitter JSON files into Flink DataSets or DataStreams. Malformed JSON records are skipped rather than failing the read.

Capabilities

SimpleTweetInputFormat

Main input format class for reading Twitter JSON data files. Extends Flink's DelimitedInputFormat to handle line-delimited JSON files containing tweet data.

/**
 * Input format for reading Twitter JSON data files into Tweet objects.
 * Extends DelimitedInputFormat to process line-delimited JSON files.
 * Implements ResultTypeQueryable for Flink's type system integration.
 */
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> 
    implements ResultTypeQueryable<Tweet> {

    /**
     * Opens the input format and initializes JSON parsing components
     * @param split - File input split to process
     * @throws IOException if initialization fails
     */
    public void open(FileInputSplit split) throws IOException;

    /**
     * Reads the next tweet record with error recovery for malformed JSON
     * @param record - Tweet object to reuse for deserialization
     * @return Parsed Tweet object
     * @throws IOException if reading fails
     */
    public Tweet nextRecord(Tweet record) throws IOException;

    /**
     * Parses raw bytes into a Tweet object using JSON parser
     * @param reuse - Tweet object to reuse for deserialization
     * @param bytes - Raw byte data containing JSON
     * @param offset - Starting position in byte array
     * @param numBytes - Number of bytes to read
     * @return Parsed Tweet object
     * @throws IOException if parsing fails
     */
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;

    /**
     * Returns type information for Flink's type system
     * @return TypeInformation for Tweet class
     */
    public TypeInformation<Tweet> getProducedType();
}
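
The `readRecord` signature above takes a byte array plus an `offset` and `numBytes` rather than a ready-made string: the array is a shared read buffer, and only the slice `[offset, offset + numBytes)` belongs to the current record. The following is a minimal plain-Java sketch of that contract, not the library's implementation. `RecordSliceDemo`, `decode`, and `splitLines` are hypothetical names introduced for illustration, and the newline delimiter and UTF-8 encoding are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink or this package.
final class RecordSliceDemo {

    /** Decodes one record from a shared buffer, as a readRecord-style method would. */
    static String decode(byte[] bytes, int offset, int numBytes) {
        // Only the slice [offset, offset + numBytes) belongs to this record.
        return new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
    }

    /** Splits a buffer on '\n' and decodes each delimited slice. */
    static List<String> splitLines(byte[] buffer) {
        List<String> records = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < buffer.length; i++) {
            if (buffer[i] == '\n') {
                records.add(decode(buffer, start, i - start));
                start = i + 1;
            }
        }
        if (start < buffer.length) {
            // Trailing record without a final delimiter.
            records.add(decode(buffer, start, buffer.length - start));
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] buffer = "{\"id\":1}\n{\"id\":2}\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(splitLines(buffer)); // prints [{"id":1}, {"id":2}]
    }
}
```

Passing offsets instead of copying each line into its own array keeps per-record allocation low, which matters when a split contains millions of tweets.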

Usage Examples:

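import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;

// Basic usage with DataSet API
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

// Extract the screen name of each tweet's author
DataSet<String> usernames = tweets.map(tweet -> tweet.getUser().getScreen_name());

// Filter for verified users
DataSet<Tweet> verifiedTweets = tweets.filter(tweet -> tweet.getUser().isVerified());

// Sum retweet counts per user: map to (screenName, retweetCount), then aggregate.
// An anonymous MapFunction is used because Flink cannot infer the Tuple2
// type parameters from a lambda (they are erased at compile time).
DataSet<Tuple2<String, Long>> retweetCounts = tweets
    .map(new MapFunction<Tweet, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(Tweet tweet) {
            return new Tuple2<String, Long>(
                tweet.getUser().getScreen_name(), (long) tweet.getRetweet_count());
        }
    })
    .groupBy(0)  // group by screen name (tuple field 0)
    .sum(1);     // sum retweet counts (tuple field 1)

import java.util.stream.Collectors;

import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

// Usage with DataStream API for continuous processing
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// Read tweets as a stream (for file monitoring scenarios)
DataStream<Tweet> tweetStream = streamEnv.readFile(
    new SimpleTweetInputFormat(),
    "tweet-directory",
    FileProcessingMode.PROCESS_CONTINUOUSLY,
    1000  // monitoring interval in ms
);

// Print "screenName: tag1, tag2" for every tweet that has at least one hashtag
tweetStream
    .filter(tweet -> !tweet.getEntities().getHashtags().isEmpty())
    .map(tweet -> {
        String hashtags = tweet.getEntities().getHashtags().stream()
            .map(ht -> ht.getText())
            .collect(Collectors.joining(", "));
        return tweet.getUser().getScreen_name() + ": " + hashtags;
    })
    .print();

// Nothing runs until the job is submitted
streamEnv.execute("Tweet hashtags");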

Error Handling

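The input format recovers from malformed JSON inside nextRecord(): when parsing a record raises a JsonParseException, the record is skipped and the loop moves on to the next one until a record parses successfully: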

// Error recovery is built into nextRecord() method
public Tweet nextRecord(Tweet record) throws IOException {
    Boolean result = false;
    
    do {
        try {
            record.reset(0);
            record = super.nextRecord(record);
            result = true;
        } catch (JsonParseException e) {
            // Skip malformed records and continue processing
            result = false;
        }
    } while (!result);
    
    return record;
}

Key Features:

  • Automatic Error Recovery: Skips malformed JSON records and continues processing
  • Memory Efficiency: Reuses Tweet objects to minimize garbage collection
  • Flink Integration: Full compatibility with Flink's type system and serialization
  • JSON Parser Integration: Uses json-simple library for robust JSON parsing
  • Logging: Debug-level logging for parsing exceptions and malformed data
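
The skip-and-continue pattern combined with object reuse, as listed above, can be sketched in plain Java without any Flink dependency. This is an illustration under stated assumptions, not the library's code: `SkipMalformedDemo` and `Record` are hypothetical names, and `Long.parseLong` with `NumberFormatException` stands in for JSON parsing and `JsonParseException`. The sketch also makes end of input explicit by returning `null`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in types; none of these names come from Flink or this package.
final class SkipMalformedDemo {

    /** Mutable record that is reset and refilled instead of reallocated. */
    static final class Record {
        long value;

        void reset() {
            value = 0;
        }
    }

    private final Iterator<String> lines;
    int skipped = 0;

    SkipMalformedDemo(List<String> input) {
        this.lines = input.iterator();
    }

    /** Returns the next parseable record, or null once the input is exhausted. */
    Record nextRecord(Record reuse) {
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                reuse.reset();
                reuse.value = Long.parseLong(line.trim()); // stand-in for JSON parsing
                return reuse;
            } catch (NumberFormatException e) {
                skipped++; // malformed line: drop it and try the next one
            }
        }
        return null; // end of input
    }

    public static void main(String[] args) {
        SkipMalformedDemo reader = new SkipMalformedDemo(Arrays.asList("1", "oops", "3"));
        Record reuse = new Record();
        long sum = 0;
        while (reader.nextRecord(reuse) != null) {
            sum += reuse.value;
        }
        System.out.println("sum=" + sum + ", skipped=" + reader.skipped); // prints sum=4, skipped=1
    }
}
```

Because the same `Record` instance is handed back on every call, callers that need to keep a record beyond the next call must copy it first. The same caution applies to any input format that reuses its output object.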

Type System Integration

/**
 * Interface for providing type information to Flink's type system
 */
public interface ResultTypeQueryable<T> {
    /**
     * Returns type information for the produced type
     * @return TypeInformation describing the output type
     */
    TypeInformation<T> getProducedType();
}

/**
 * Generic type information for Tweet objects
 */
public class GenericTypeInfo<T> extends TypeInformation<T> {
    public GenericTypeInfo(Class<T> typeClass);
}

The input format implements ResultTypeQueryable<Tweet> to provide proper type information to Flink's type system, enabling efficient serialization and deserialization in distributed processing scenarios.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-tweet-inputformat-2-10
