
# Tweet Input Format

Core input format functionality for reading and parsing Twitter JSON data streams into structured `Tweet` objects within Apache Flink processing pipelines.

## Capabilities

### SimpleTweetInputFormat

Main input format class. It extends Flink's `DelimitedInputFormat`, so each delimited record (one line by default) is parsed as one tweet. Malformed records do not stop the job, and because the class implements `ResultTypeQueryable`, Flink can obtain the `Tweet` type information directly from the format.

```java { .api }
/**
 * Apache Flink input format for parsing Twitter JSON data into Tweet objects.
 * Extends DelimitedInputFormat to provide streaming JSON parsing with error handling.
 */
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
        implements ResultTypeQueryable<Tweet> {

    /**
     * Initialize the input format with the JSON parser and tweet handler.
     * Called by the Flink runtime for each input split before its records are read.
     *
     * @param split FileInputSplit containing the data source information
     * @throws IOException if initialization fails
     */
    public void open(FileInputSplit split) throws IOException;

    /**
     * Read the next tweet record from the input stream.
     * Processing continues even if individual tweets are malformed.
     *
     * @param record Tweet object to reuse for parsing (avoids allocating a new object per record)
     * @return Parsed Tweet object, or null when no more records are available
     * @throws IOException if reading the stream fails
     */
    public Tweet nextRecord(Tweet record) throws IOException;

    /**
     * Parse raw JSON bytes into a Tweet object using a streaming JSON parser.
     * Malformed JSON is logged rather than propagated.
     *
     * @param reuse Tweet object to reuse for parsing
     * @param bytes Raw JSON bytes to parse
     * @param offset Starting position of the record in the byte array
     * @param numBytes Number of bytes in the record
     * @return Parsed Tweet object
     * @throws IOException if parsing fails critically
     */
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;

    /**
     * Provide type information for Flink's serialization system.
     * Required for proper Kryo serialization of Tweet objects.
     *
     * @return TypeInformation for the Tweet class
     */
    public TypeInformation<Tweet> getProducedType();
}
```
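To make the `readRecord`/`nextRecord` contract more concrete, here is a minimal, Flink-free sketch of the same pattern: split a byte buffer on `'\n'` (the `DelimitedInputFormat` default delimiter), decode only the `[offset, offset + numBytes)` slice of each record, reuse one mutable record object, and skip records that fail to parse. `LineRecordReader`, `MutableRecord`, and the toy brace-checking "parser" are hypothetical stand-ins, not the library's code; a real format would hand each slice to a JSON parser and log failures.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free sketch of delimited reading with object reuse and skip-on-error. */
public class LineRecordReader {

    /** Hypothetical mutable record, reused across reads like the Tweet passed to nextRecord. */
    static final class MutableRecord {
        String raw = "";

        void reset() {
            raw = "";
        }
    }

    private final byte[] data;
    private int pos = 0;
    int skipped = 0; // malformed records skipped so far

    LineRecordReader(byte[] data) {
        this.data = data;
    }

    /** Toy "parser": accepts only text wrapped in braces. A real format would run a JSON parser here. */
    static MutableRecord readRecord(MutableRecord reuse, byte[] bytes, int offset, int numBytes) {
        // Decode only this record's slice [offset, offset + numBytes), not the whole buffer.
        String text = new String(bytes, offset, numBytes, StandardCharsets.UTF_8).trim();
        if (!text.startsWith("{") || !text.endsWith("}")) {
            throw new IllegalArgumentException("malformed record: " + text);
        }
        reuse.raw = text;
        return reuse;
    }

    /** Returns the next well-formed record, or null once the input is exhausted. */
    MutableRecord nextRecord(MutableRecord reuse) {
        while (pos < data.length) {
            int start = pos;
            int end = start;
            while (end < data.length && data[end] != '\n') {
                end++; // scan to the next '\n' delimiter
            }
            pos = end + 1; // continue after the delimiter next time
            reuse.reset();
            try {
                return readRecord(reuse, data, start, end - start);
            } catch (IllegalArgumentException e) {
                skipped++; // a real format would log the problem and move on
            }
        }
        return null;
    }

    public static void main(String[] args) {
        byte[] input = "{\"id\":1}\nnot json\n{\"id\":2}\n".getBytes(StandardCharsets.UTF_8);
        LineRecordReader reader = new LineRecordReader(input);
        MutableRecord reuse = new MutableRecord();
        List<String> accepted = new ArrayList<>();
        MutableRecord r;
        while ((r = reader.nextRecord(reuse)) != null) {
            accepted.add(r.raw);
        }
        System.out.println(accepted + " skipped=" + reader.skipped); // [{"id":1}, {"id":2}] skipped=1
    }
}
```

Because the same `MutableRecord` is returned every time, callers must copy anything they want to keep before asking for the next record; the sketch stores the immutable `raw` string for that reason.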

**Usage Examples:**

```java
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TweetJobs {
    public static void main(String[] args) throws Exception {
        // Basic usage with file input (DataSet API, batch)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

        // Keep popular tweets and print "screen_name: text"; print() triggers execution
        tweets.filter(tweet -> tweet.getRetweet_count() > 100)
              .map(tweet -> tweet.getUser().getScreen_name() + ": " + tweet.getText())
              .print();

        // Usage with streaming (DataStream API)
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tweet> tweetStream = streamEnv.readFile(new SimpleTweetInputFormat(), "stream/");

        // "en".equals(...) avoids a NullPointerException when a tweet has no lang value
        tweetStream.filter(tweet -> "en".equals(tweet.getLang()))
                   .map(tweet -> tweet.getId_str() + "\t"
                           + tweet.getUser().getScreen_name() + "\t"
                           + tweet.getText() + "\t"
                           + tweet.getCreated_at())
                   .print();

        // A streaming job only runs once execute() is called
        streamEnv.execute("English tweets");
    }
}
```

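### TweetHandler

JSON parsing handler that implements the `ContentHandler` callback interface of the json-simple streaming parser. Instead of building a full JSON tree, it receives one event at a time (start or end of an object, each key, each primitive value) and writes the values directly into the reused `Tweet`.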

```java { .api }
/**
 * Streaming JSON parser handler for Twitter data structures.
 * Implements ContentHandler for efficient parsing of nested JSON objects.
 */
public class TweetHandler implements ContentHandler {

    /** Tweet object being populated during parsing */
    protected Tweet reuse;

    /**
     * Handle the start of JSON parsing.
     * @throws ParseException if parsing setup fails
     * @throws IOException if an I/O error occurs
     */
    public void startJSON() throws ParseException, IOException;

    /**
     * Handle the end of JSON parsing.
     * @throws ParseException if parsing cleanup fails
     * @throws IOException if an I/O error occurs
     */
    public void endJSON() throws ParseException, IOException;

    /**
     * Handle the start of a JSON object.
     * @return true to continue parsing
     * @throws ParseException if object parsing fails
     * @throws IOException if an I/O error occurs
     */
    public boolean startObject() throws ParseException, IOException;

    /**
     * Handle the end of a JSON object.
     * @return true to continue parsing
     * @throws ParseException if object parsing fails
     * @throws IOException if an I/O error occurs
     */
    public boolean endObject() throws ParseException, IOException;

    /**
     * Handle the start of an object property with the given key name.
     * @param key Property name
     * @return true to continue parsing
     * @throws ParseException if property parsing fails
     * @throws IOException if an I/O error occurs
     */
    public boolean startObjectEntry(String key) throws ParseException, IOException;

    /**
     * Handle primitive values (strings, numbers, booleans, or null).
     * @param value Primitive value to process
     * @return true to continue parsing
     * @throws ParseException if value parsing fails
     * @throws IOException if an I/O error occurs
     */
    public boolean primitive(Object value) throws ParseException, IOException;

    // The remaining ContentHandler callbacks (endObjectEntry, startArray, endArray) are not listed here.
}
```
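A streaming handler never sees the whole document; it only receives callbacks, so it has to remember which key it is under. The sketch below shows that bookkeeping using a hypothetical `MiniContentHandler` interface that mirrors part of the callback set (including `endObjectEntry`, which closes each key) without the checked exceptions, and a hypothetical `MiniTweet` with only `id`, `text`, and `screenName`. Events are fired by hand instead of by a real parser; the actual `TweetHandler` maps many more fields and may track its state differently.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class HandlerSketch {

    /** Hypothetical subset of the streaming callbacks, without checked exceptions. */
    interface MiniContentHandler {
        boolean startObject();
        boolean endObject();
        boolean startObjectEntry(String key);
        boolean endObjectEntry();
        boolean primitive(Object value);
    }

    /** Hypothetical, trimmed-down tweet model. */
    static final class MiniTweet {
        long id;
        String text = "";
        String screenName = "";
    }

    static final class MiniTweetHandler implements MiniContentHandler {
        final MiniTweet reuse;
        // Keys from the root to the current value, e.g. ["user", "screen_name"].
        private final Deque<String> path = new ArrayDeque<>();

        MiniTweetHandler(MiniTweet reuse) {
            this.reuse = reuse;
        }

        public boolean startObject() { return true; }

        public boolean endObject() { return true; }

        public boolean startObjectEntry(String key) {
            path.addLast(key); // entering a key
            return true;
        }

        public boolean endObjectEntry() {
            path.removeLast(); // leaving the key whose value just finished
            return true;
        }

        public boolean primitive(Object value) {
            String where = String.join(".", path);
            switch (where) {
                case "id": reuse.id = ((Number) value).longValue(); break;
                case "text": reuse.text = (String) value; break;
                case "user.screen_name": reuse.screenName = (String) value; break;
                default: break; // fields this sketch does not model are ignored
            }
            return true; // keep parsing
        }
    }

    public static void main(String[] args) {
        MiniTweet tweet = new MiniTweet();
        MiniTweetHandler h = new MiniTweetHandler(tweet);
        // Events for: {"id": 42, "text": "hi", "user": {"id": 7, "screen_name": "bob"}}
        h.startObject();
        h.startObjectEntry("id"); h.primitive(42L); h.endObjectEntry();
        h.startObjectEntry("text"); h.primitive("hi"); h.endObjectEntry();
        h.startObjectEntry("user");
        h.startObject();
        h.startObjectEntry("id"); h.primitive(7L); h.endObjectEntry();
        h.startObjectEntry("screen_name"); h.primitive("bob"); h.endObjectEntry();
        h.endObject();
        h.endObjectEntry();
        h.endObject();
        System.out.println(tweet.id + " " + tweet.text + " " + tweet.screenName); // 42 hi bob
    }
}
```

Tracking the full key path is what keeps the nested `user.id` (7) from overwriting the tweet's own `id` (42). Arrays are deliberately not handled in this sketch.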

## Error Handling

The input format is designed so that bad individual records do not stop a job:

- **Malformed JSON**: Individual malformed tweets are logged and skipped; processing continues.
- **Type Conversion**: Values of an unexpected type are handled gracefully by falling back to default values.
- **Parse Exceptions**: Parsing problems are logged without stopping the stream.
- **I/O Errors**: Critical I/O failures are propagated as `IOException`.

**Example Error Scenarios:**

Each of the following input records is handled without failing the job:

```text
// 1. Malformed JSON tweets
{"invalid": json, "missing": quotes}

// 2. Missing required fields
{"text": "Hello", "missing_id": true}

// 3. Wrong data types
{"id": "not_a_number", "retweet_count": "invalid"}

// 4. Truncated JSON
{"text": "Hello world", "incomplete":
```
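The "Type Conversion" behavior can be illustrated with a small helper that turns whatever primitive the parser delivers into a `long`, falling back to a default instead of throwing. `Conversions.toLong` and its rules (accept any `Number`, accept a decimal string, otherwise return the default) are assumptions for illustration only; this document does not specify exactly which inputs `TweetHandler` accepts or which defaults it uses.

```java
public final class Conversions {

    private Conversions() {}

    /**
     * Hypothetical lenient conversion: returns the value as a long when possible,
     * otherwise the supplied default. No exception escapes.
     */
    public static long toLong(Object value, long defaultValue) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            try {
                return Long.parseLong(((String) value).trim());
            } catch (NumberFormatException e) {
                return defaultValue; // e.g. "not_a_number"
            }
        }
        return defaultValue; // null, Boolean, or anything else
    }

    public static void main(String[] args) {
        System.out.println(toLong(42L, 0));            // 42 (a JSON number)
        System.out.println(toLong("123", 0));          // 123 (a numeric string)
        System.out.println(toLong("not_a_number", 0)); // 0 (falls back to the default)
        System.out.println(toLong(null, -1));          // -1 (missing value)
    }
}
```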

## Integration with Flink

The input format works with both Flink's DataSet and DataStream APIs:

```java
// env and streamEnv are the execution environments from the usage example above;
// inputPath is the path of the input file or directory.

// DataSet API (batch processing)
DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), inputPath);

// DataStream API (stream processing)
DataStream<Tweet> tweetStream = streamEnv.readFile(new SimpleTweetInputFormat(), inputPath);

// Optional: register the model classes with Kryo, which serializes Tweet objects.
// Registered classes are written as compact IDs instead of full class names.
// For streaming jobs, call the same methods on streamEnv.getConfig().
env.getConfig().registerKryoType(Tweet.class);
env.getConfig().registerKryoType(Users.class);
env.getConfig().registerKryoType(Entities.class);
```