# Input Format Processing

Core input format functionality for reading line-delimited Twitter JSON files into Flink `DataSet`s or `DataStream`s, skipping records that fail JSON parsing.

## Capabilities

### SimpleTweetInputFormat

The main input format class for reading Twitter JSON data files. It extends Flink's `DelimitedInputFormat`, which splits each file into records at a delimiter (a newline by default). Every line is therefore expected to hold one JSON-encoded tweet.

```java { .api }
/**
 * Input format for reading Twitter JSON data files into Tweet objects.
 * Extends DelimitedInputFormat to process line-delimited JSON files.
 * Implements ResultTypeQueryable for Flink's type system integration.
 */
public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet>
        implements ResultTypeQueryable<Tweet> {

    /**
     * Opens the input format and initializes the JSON parsing components.
     * @param split the file input split to process
     * @throws IOException if initialization fails
     */
    public void open(FileInputSplit split) throws IOException;

    /**
     * Reads the next tweet record, skipping records whose JSON fails to parse.
     * @param record Tweet object to reuse for deserialization
     * @return the parsed Tweet object, or null at the end of the split
     * @throws IOException if reading fails
     */
    public Tweet nextRecord(Tweet record) throws IOException;

    /**
     * Parses the raw bytes of one record into a Tweet object using the JSON parser.
     * @param reuse Tweet object to reuse for deserialization
     * @param bytes raw byte data containing JSON
     * @param offset starting position of the record in the byte array
     * @param numBytes number of bytes in the record
     * @return the parsed Tweet object
     * @throws IOException if parsing fails
     */
    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException;

    /**
     * Returns type information for Flink's type system.
     * @return TypeInformation for the Tweet class
     */
    public TypeInformation<Tweet> getProducedType();
}
```
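
`readRecord()` receives each record as a slice of a shared byte buffer. `bytes` is the buffer, `offset` marks where one delimited record starts, and `numBytes` gives its length. The JDK-only sketch below imitates that contract for a newline-delimited buffer. The class `DelimitedSlices` and its methods are hypothetical illustrations and are not part of Flink or of this library.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a newline-delimited buffer into
// (offset, numBytes) slices, the shape in which readRecord() receives records.
final class DelimitedSlices {

    /** One record inside a shared buffer. */
    static final class Slice {
        final int offset;
        final int numBytes;

        Slice(int offset, int numBytes) {
            this.offset = offset;
            this.numBytes = numBytes;
        }
    }

    /** Finds every non-empty record between '\n' delimiters. */
    static List<Slice> split(byte[] bytes) {
        List<Slice> slices = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= bytes.length; i++) {
            // Treat the end of the buffer as an implicit final delimiter
            if (i == bytes.length || bytes[i] == '\n') {
                if (i > start) {
                    slices.add(new Slice(start, i - start));
                }
                start = i + 1;
            }
        }
        return slices;
    }

    /** Decodes exactly numBytes bytes starting at offset, as a record parser would. */
    static String decode(byte[] bytes, int offset, int numBytes) {
        return new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
    }
}
```

A parser that honours `numBytes` reads only the current record, even though `bytes` may also contain the records that follow it.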

**Usage Examples:**

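```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;

public class TweetBatchExample {
    public static void main(String[] args) throws Exception {
        // Basic usage with the DataSet API: each line of tweets.json becomes one Tweet
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tweet> tweets = env.readFile(new SimpleTweetInputFormat(), "tweets.json");

        // Extract the screen name of each tweet's author
        DataSet<String> usernames = tweets.map(tweet -> tweet.getUser().getScreen_name());

        // Keep only tweets from verified users
        DataSet<Tweet> verifiedTweets = tweets.filter(tweet -> tweet.getUser().isVerified());

        // Total retweet count per user: sum() aggregates tuple fields by position,
        // so map to (screen_name, retweet_count) first; returns() supplies the
        // tuple type that Java erases from the lambda
        DataSet<Tuple2<String, Long>> retweetCounts = tweets
            .map(tweet -> Tuple2.of(tweet.getUser().getScreen_name(), (long) tweet.getRetweet_count()))
            .returns(new TypeHint<Tuple2<String, Long>>() {})
            .groupBy(0)
            .sum(1);

        // print() triggers execution of the batch program
        retweetCounts.print();
    }
}
```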

```java
import java.util.stream.Collectors;

import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class TweetStreamExample {
    public static void main(String[] args) throws Exception {
        // Usage with the DataStream API
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Monitor a directory for new or modified files;
        // with PROCESS_CONTINUOUSLY a modified file is re-read in full
        DataStream<Tweet> tweetStream = env.readFile(
            new SimpleTweetInputFormat(),
            "tweet-directory",
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            1000 // monitoring interval in ms
        );

        // Emit "screen_name: tag1, tag2" for every tweet that carries hashtags
        tweetStream
            .filter(tweet -> !tweet.getEntities().getHashtags().isEmpty())
            .map(tweet -> {
                String hashtags = tweet.getEntities().getHashtags().stream()
                    .map(ht -> ht.getText())
                    .collect(Collectors.joining(", "));
                return tweet.getUser().getScreen_name() + ": " + hashtags;
            })
            .print();

        // Streaming programs are lazy; execute() starts the job
        env.execute("Tweet hashtag stream");
    }
}
```

### Error Handling

`nextRecord()` wraps the parent class's `nextRecord()` in a retry loop. If parsing a record throws a `JsonParseException`, that record is skipped and the next one is read:

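```java
// Error recovery is built into nextRecord(): a record whose parsing throws
// JsonParseException is skipped and the next record is read instead
@Override
public Tweet nextRecord(Tweet record) throws IOException {
    boolean result = false;

    do {
        try {
            // Reset the reused Tweet before filling it with the next record
            record.reset(0);
            // Reads the next delimited record and parses it via readRecord();
            // returns null once the end of the split is reached
            record = super.nextRecord(record);
            result = true;
        } catch (JsonParseException e) {
            // Skip the malformed record and continue with the next one
            result = false;
        }
    } while (!result);

    return record;
}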
```
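
You can try the same skip-and-continue pattern outside Flink. The JDK-only sketch below reads newline-delimited records and returns only those that parse. `SkippingReader` and `parseId` are hypothetical. `parseId` stands in for the real JSON parser and accepts only lines of the form `{"id":<number>}`. `IllegalArgumentException` plays the role of `JsonParseException`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of nextRecord()'s recovery loop: keep reading
// until a record parses, skipping the ones that do not.
final class SkippingReader {
    private static final Pattern ID = Pattern.compile("\\{\"id\":(\\d+)\\}");

    private final BufferedReader lines;
    private int skipped = 0;

    SkippingReader(String input) {
        this.lines = new BufferedReader(new StringReader(input));
    }

    /** Stand-in parser: throws on anything that is not {"id":<number>}. */
    static long parseId(String line) {
        Matcher m = ID.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("malformed record: " + line);
        }
        return Long.parseLong(m.group(1));
    }

    /** Returns the next parseable id, or null at the end of the input. */
    Long next() {
        while (true) {
            String line;
            try {
                line = lines.readLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (line == null) {
                return null; // end of input, like nextRecord() returning null
            }
            try {
                return parseId(line);
            } catch (IllegalArgumentException e) {
                skipped++; // skip the malformed record and try the next one
            }
        }
    }

    int skippedCount() {
        return skipped;
    }
}
```

The skip counter is an addition for illustration only. It shows one way to make otherwise silent skipping visible.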

**Key Features:**

- **Automatic Error Recovery**: Skips records whose parsing throws a `JsonParseException` and continues with the next record
- **Memory Efficiency**: Refills the `Tweet` object passed to `nextRecord()` instead of allocating a new one per record, reducing garbage-collection pressure
- **Flink Integration**: Implements `ResultTypeQueryable<Tweet>`, so Flink obtains the produced type from the format itself
- **JSON Parser Integration**: Uses the json-simple library for JSON parsing
- **Logging**: Debug-level logging for parsing exceptions and malformed data
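
Object reuse means that one mutable record instance is refilled for every input record. The JDK-only sketch below shows the pattern and its main pitfall: keeping a reference to the reused instance. `MutableRecord` and `ReuseDemo` are hypothetical names. `reset()` only loosely mirrors the `record.reset(0)` call in `nextRecord()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable record that is refilled for every input line.
final class MutableRecord {
    String text;

    /** Clears state left over from the previous record. */
    void reset() {
        text = null;
    }

    /** Refills this same instance instead of allocating a new one. */
    MutableRecord fill(String line) {
        reset();
        text = line;
        return this;
    }

    /** Defensive copy for callers that need to keep a record around. */
    MutableRecord copy() {
        MutableRecord c = new MutableRecord();
        c.text = text;
        return c;
    }
}

final class ReuseDemo {
    /** Collects records produced through one reused instance, optionally copying each. */
    static List<String> collect(List<String> lines, boolean copy) {
        MutableRecord reuse = new MutableRecord();
        List<MutableRecord> kept = new ArrayList<>();
        for (String line : lines) {
            MutableRecord r = reuse.fill(line);
            // Without a copy, every entry in 'kept' is the same object
            kept.add(copy ? r.copy() : r);
        }
        List<String> texts = new ArrayList<>();
        for (MutableRecord r : kept) {
            texts.add(r.text);
        }
        return texts;
    }
}
```

Without copying, every kept entry ends up holding the last record. In Flink, whether the same instance really is passed to `nextRecord()` again depends on the runtime, for example on whether object reuse is enabled in the `ExecutionConfig`.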

### Type System Integration

```java { .api }
/**
 * Interface through which a function or input format reports its output type
 * to Flink's type system
 */
public interface ResultTypeQueryable<T> {
    /**
     * Returns type information for the produced type
     * @return TypeInformation describing the output type
     */
    TypeInformation<T> getProducedType();
}

/**
 * Type information for classes that Flink handles as generic types
 * (serialized with Kryo); used here for Tweet objects
 */
public class GenericTypeInfo<T> extends TypeInformation<T> {
    public GenericTypeInfo(Class<T> typeClass);
}
```

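The input format implements `ResultTypeQueryable<Tweet>`, so Flink obtains the produced type directly from `getProducedType()` instead of inferring it from the class signature. The returned type information is a `GenericTypeInfo<Tweet>`. Flink therefore handles `Tweet` as a generic type and serializes it with Kryo when records are exchanged between tasks in a distributed job.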
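
The lookup order this enables can be sketched without Flink. A consumer prefers the type that a component declares about itself, and only otherwise falls back to reflection on the component's generic superclass. Flink's type extraction for input formats broadly follows this idea. However, `TypeQueryable`, `BaseFormat`, `TypeResolution` and the format classes below are hypothetical JDK-only stand-ins that use `Class` objects in place of `TypeInformation`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical analogue of ResultTypeQueryable, using Class instead of TypeInformation.
interface TypeQueryable<T> {
    Class<T> producedType();
}

// Hypothetical analogue of a generic base format such as DelimitedInputFormat<T>.
abstract class BaseFormat<T> {
}

// Type is only recoverable from the superclass type argument.
final class StringFormat extends BaseFormat<String> {
}

// Type argument is a type variable, so reflection cannot resolve it.
final class GenericFormat<T> extends BaseFormat<T> {
}

// Declares its type explicitly, as SimpleTweetInputFormat does.
final class IntegerFormat extends BaseFormat<Integer> implements TypeQueryable<Integer> {
    @Override
    public Class<Integer> producedType() {
        return Integer.class;
    }
}

final class TypeResolution {
    /** Prefers the declared type; otherwise reads the superclass type argument. */
    static Class<?> resolve(BaseFormat<?> format) {
        if (format instanceof TypeQueryable) {
            return ((TypeQueryable<?>) format).producedType();
        }
        Type sup = format.getClass().getGenericSuperclass();
        if (sup instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) sup).getActualTypeArguments()[0];
            if (arg instanceof Class) {
                return (Class<?>) arg;
            }
        }
        throw new IllegalStateException("cannot determine produced type of " + format.getClass());
    }
}
```

The `GenericFormat` case shows where reflection alone runs out. When the type argument is itself a type variable, only an explicit declaration can supply the type.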