
# Data Processing

Advanced message processing capabilities including custom message handlers, type-safe transformations, and integration with Kinesis Record metadata such as partition keys and sequence numbers.

## Capabilities

### Default Message Handler

The default message handler extracts the raw byte payload from a Kinesis Record.

```scala { .api }
/**
 * Default message handler that extracts the byte array from a Kinesis Record.
 * Handles null records gracefully and converts the ByteBuffer to a byte array.
 *
 * @param record Kinesis Record containing message data and metadata
 * @return Array[Byte] Raw message data, or null if the record is null
 */
def defaultMessageHandler(record: Record): Array[Byte]
```

**Implementation Details:**

- Extracts data from the `record.getData()` ByteBuffer
- Returns null for null input records
- Copies the remaining buffer data into a new byte array
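
The extraction logic described above can be sketched as a standalone helper on the ByteBuffer itself (`extractBytes` is a hypothetical name for illustration; the real handler receives the full Kinesis Record and calls `getData()` first):

```scala
import java.nio.ByteBuffer

// Sketch of the default handler's extraction logic. `extractBytes` is a
// hypothetical helper, not part of the actual API.
def extractBytes(buffer: ByteBuffer): Array[Byte] = {
  if (buffer == null) return null
  // Copy only the remaining bytes (position..limit) into a fresh array
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  bytes
}
```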

### Custom Message Handlers

Custom message handlers enable type-safe processing of Kinesis records, with access to both the message payload and its metadata.

```scala { .api }
/**
 * Type alias for custom message handler functions.
 * Transforms a Kinesis Record into a user-defined type T.
 */
type MessageHandler[T] = com.amazonaws.services.kinesis.model.Record => T
```

### Kinesis Record Structure

Custom message handlers can draw on the following fields of the Kinesis Record:

```scala { .api }
// AWS Kinesis Record (external dependency)
// Available fields for custom message handlers:
// - record.getData(): java.nio.ByteBuffer - The message payload
// - record.getPartitionKey(): String - Partition key used for sharding
// - record.getSequenceNumber(): String - Unique sequence number per shard
// - record.getApproximateArrivalTimestamp(): java.util.Date - Arrival time
```

## Usage Examples

### Text Message Processing

```scala
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

// Simple text message handler
def textMessageHandler(record: Record): String = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

val textStream = KinesisUtils.createStream[String](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  textMessageHandler
)
```

### JSON Message Processing

```scala
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets
import spray.json._
import DefaultJsonProtocol._

case class LogEvent(
  timestamp: Long,
  level: String,
  message: String,
  source: String
)

implicit val logEventFormat: RootJsonFormat[LogEvent] = jsonFormat4(LogEvent)

// JSON message handler with error handling
def jsonLogHandler(record: Record): Option[LogEvent] = {
  try {
    // Copy bytes out of the buffer; getData().array() is unsafe because the
    // buffer may not be array-backed and array() ignores position/limit
    val buffer = record.getData()
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    val data = new String(bytes, StandardCharsets.UTF_8)
    Some(data.parseJson.convertTo[LogEvent])
  } catch {
    case _: Exception => None // Handle malformed JSON gracefully
  }
}

val logStream = KinesisUtils.createStream[Option[LogEvent]](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  jsonLogHandler
)

// Filter out malformed messages and process valid logs
val validLogs = logStream.flatMap(identity)
```

### Message Handler with Metadata

```scala
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

case class EnrichedMessage(
  data: String,
  partitionKey: String,
  sequenceNumber: String,
  arrivalTime: Long,
  shardId: Option[String] = None
)

// Message handler that captures metadata
def enrichedMessageHandler(record: Record): EnrichedMessage = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  val data = new String(bytes, StandardCharsets.UTF_8)
  val partitionKey = record.getPartitionKey()
  val sequenceNumber = record.getSequenceNumber()
  val arrivalTime = record.getApproximateArrivalTimestamp().getTime()

  EnrichedMessage(data, partitionKey, sequenceNumber, arrivalTime)
}

val enrichedStream = KinesisUtils.createStream[EnrichedMessage](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  enrichedMessageHandler
)
```

### Binary Data Processing

```scala
import com.amazonaws.services.kinesis.model.Record

case class ImageMetadata(
  format: String,
  width: Int,
  height: Int,
  size: Long
)

// Handler for binary image data
def imageMetadataHandler(record: Record): ImageMetadata = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)

  // Simple image format detection from the magic bytes (as signed bytes)
  val format = bytes.take(4) match {
    case Array(-1, -40, -1, _*)      => "JPEG"
    case Array(-119, 80, 78, 71, _*) => "PNG"
    case Array(71, 73, 70, _*)       => "GIF"
    case _                           => "UNKNOWN"
  }

  ImageMetadata(
    format = format,
    width = 0, // Would parse from headers in a real implementation
    height = 0,
    size = bytes.length
  )
}

val imageStream = KinesisUtils.createStream[ImageMetadata](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  imageMetadataHandler
)
```

### Avro Message Processing

```scala
import com.amazonaws.services.kinesis.model.Record
import org.apache.avro.Schema
import org.apache.avro.io.{DatumReader, DecoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}

// Generic Avro message handler
def avroMessageHandler[T <: SpecificRecordBase](
  schema: Schema
)(record: Record): Option[T] = {
  try {
    val buffer = record.getData()
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val reader: DatumReader[T] = new SpecificDatumReader[T](schema)
    Some(reader.read(null.asInstanceOf[T], decoder))
  } catch {
    case _: Exception => None // Handle malformed Avro gracefully
  }
}

// Usage with a specific Avro-generated class
val avroStream = KinesisUtils.createStream[Option[MyAvroRecord]](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  avroMessageHandler[MyAvroRecord](MyAvroRecord.getClassSchema())
)
```

## Message Handler Best Practices

### Error Handling

- Always handle potential exceptions in custom message handlers
- Consider returning `Option[T]` or `Either[Error, T]` for error-prone parsing
- Log parsing errors for monitoring and debugging
- Avoid throwing exceptions that could crash the receiver
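
As a sketch of the `Either`-based approach (with a hypothetical `parseTimestamp` step; the left side carries an error description suitable for logging):

```scala
// Sketch of Either-based error handling for one parse step.
// `parseTimestamp` is a hypothetical helper, not part of the Kinesis API.
def parseTimestamp(raw: String): Either[String, Long] =
  try Right(raw.trim.toLong)
  catch {
    case _: NumberFormatException => Left(s"not a timestamp: '$raw'")
  }
```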

### Performance Considerations

- Keep message handlers lightweight and fast
- Avoid expensive operations such as network calls or database queries
- Pre-compile regex patterns and parsers outside the handler
- Use efficient serialization libraries for structured data
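
For example, a regex used by a handler can be compiled once at initialization rather than on every record (hypothetical pattern and helper names):

```scala
import scala.util.matching.Regex

// Compiled once when the enclosing object is initialized,
// not once per record inside the handler
val logLinePattern: Regex = """^(\w+)\s+(.*)$""".r

// Hypothetical per-record parse step that reuses the compiled pattern
def parseLevel(line: String): Option[String] =
  logLinePattern.findFirstMatchIn(line).map(_.group(1))
```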

### Memory Management

- Be careful with large message payloads
- Consider streaming parsers for very large messages
- Avoid keeping references to the original ByteBuffer
- Use appropriate data structures for your use case

### Type Safety

- Use case classes for structured data
- Leverage Scala's type system for compile-time safety
- Consider sealed traits for message variants
- Validate data types and ranges where appropriate
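
A sealed trait for message variants might look like this (hypothetical types for illustration; sealing the trait lets the compiler check match exhaustiveness):

```scala
// Hypothetical message variants modeled as a sealed trait, so pattern
// matches over StreamMessage are checked for exhaustiveness at compile time
sealed trait StreamMessage
final case class Heartbeat(epochMillis: Long) extends StreamMessage
final case class Payload(body: String) extends StreamMessage
case object Malformed extends StreamMessage

def describe(msg: StreamMessage): String = msg match {
  case Heartbeat(t) => s"heartbeat at $t"
  case Payload(b)   => s"payload of ${b.length} chars"
  case Malformed    => "malformed message"
}
```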

## Advanced Processing Patterns

### Batch Message Processing

```scala
// Process messages in batches for efficiency
kinesisStream.foreachRDD { rdd =>
  val messages = rdd.collect() // Be careful with large batches

  // Batch process messages
  val processed = processMessageBatch(messages)

  // Write to external system
  writeToDatabase(processed)
}
```

### Filtering and Transformation

```scala
// Chain filtering and transformation operations
val processedStream = kinesisStream
  .filter(_.nonEmpty)  // Filter out empty messages
  .map(parseMessage)   // Parse into structured format
  .filter(_.isValid)   // Keep valid messages only
  .map(enrichMessage)  // Add additional metadata
```

### Error Stream Handling

```scala
import scala.util.{Failure, Success}

// Tag each record as either a parsed message or an error, then cache the
// tagged stream so both branches below reuse the same computation
val parsed = kinesisStream.map { record =>
  parseMessageSafely(record) match {
    case Success(msg) => (Some(msg), None)
    case Failure(err) => (None, Some(err))
  }
}.cache()

val validStream = parsed.flatMap(_._1)
val errorStream = parsed.flatMap(_._2)

// Process streams separately
validStream.foreachRDD(processValidMessages)
errorStream.foreachRDD(logErrors)
```