# Data Processing

Advanced message processing capabilities, including custom message handlers, type-safe transformations, and integration with Kinesis Record metadata such as partition keys and sequence numbers.

## Capabilities

### Default Message Handler

The default message handler extracts the raw byte data from a Kinesis Record.

```scala { .api }
/**
 * Default message handler that extracts a byte array from a Kinesis Record.
 * Handles null records gracefully and converts the ByteBuffer to a byte array.
 *
 * @param record Kinesis Record containing message data and metadata
 * @return Array[Byte] Raw message data, or null if the record is null
 */
def defaultMessageHandler(record: Record): Array[Byte]
```

**Implementation Details:**
- Extracts data from the `record.getData()` ByteBuffer
- Returns null for null input records
- Creates a new byte array from the buffer's remaining data

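As a sketch of what the default handler does, the copy can be written directly against the `ByteBuffer` returned by `record.getData()`. The `extractBytes` name here is illustrative, not part of the connector API:

```scala
import java.nio.ByteBuffer

// Copy the remaining bytes out of the buffer. Using remaining()/get()
// works for both heap and direct buffers, unlike ByteBuffer.array().
def extractBytes(buffer: ByteBuffer): Array[Byte] = {
  if (buffer == null) null
  else {
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes) // advances the position past the copied data
    bytes
  }
}

val payload = extractBytes(ByteBuffer.wrap("hello".getBytes("UTF-8")))
```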
### Custom Message Handlers

Custom message handlers enable type-safe processing of Kinesis records, with access to both the message data and its metadata.

```scala { .api }
/**
 * Type alias for custom message handler functions.
 * Transforms a Kinesis Record into a user-defined type T.
 */
type MessageHandler[T] = com.amazonaws.services.kinesis.model.Record => T
```

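Because a handler is just a function from the record type to `T`, handlers compose like ordinary function values. A minimal sketch, using a hypothetical `FakeRecord` stand-in so it runs without the AWS SDK on the classpath:

```scala
// Hypothetical stand-in for com.amazonaws.services.kinesis.model.Record,
// used only to make this sketch self-contained.
final case class FakeRecord(data: Array[Byte], partitionKey: String)

// The handler type is an ordinary function type, so handlers compose
// with andThen/compose like any other function value.
type Handler[T] = FakeRecord => T

val toText: Handler[String] = r => new String(r.data, "UTF-8")
val toLength: Handler[Int]  = toText.andThen(_.length)

val record = FakeRecord("hello".getBytes("UTF-8"), "key-1")
```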
### Kinesis Record Structure

Custom message handlers can read the following fields of the Kinesis Record:

```scala { .api }
// AWS Kinesis Record (external dependency)
// Available fields for custom message handlers:
// - record.getData(): java.nio.ByteBuffer - The message payload
// - record.getPartitionKey(): String - Partition key used for sharding
// - record.getSequenceNumber(): String - Unique sequence number per shard
// - record.getApproximateArrivalTimestamp(): java.util.Date - Arrival time
```

## Usage Examples

### Text Message Processing

```scala
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

// Simple text message handler
def textMessageHandler(record: Record): String = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

val textStream = KinesisUtils.createStream[String](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  textMessageHandler
)
```

### JSON Message Processing

```scala
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets
import spray.json._
import DefaultJsonProtocol._

case class LogEvent(
  timestamp: Long,
  level: String,
  message: String,
  source: String
)

implicit val logEventFormat: RootJsonFormat[LogEvent] = jsonFormat4(LogEvent)

// JSON message handler with error handling.
// Copy the payload via remaining()/get() rather than getData().array(),
// which throws for direct buffers and ignores the buffer's offset.
def jsonLogHandler(record: Record): Option[LogEvent] = {
  try {
    val buffer = record.getData()
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    val data = new String(bytes, StandardCharsets.UTF_8)
    Some(data.parseJson.convertTo[LogEvent])
  } catch {
    case _: Exception => None // Handle malformed JSON gracefully
  }
}

val logStream = KinesisUtils.createStream[Option[LogEvent]](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  jsonLogHandler
)

// Filter out malformed messages and process only valid logs
val validLogs = logStream.flatMap(identity)
```

### Message Handler with Metadata

```scala
import com.amazonaws.services.kinesis.model.Record
import java.nio.charset.StandardCharsets

case class EnrichedMessage(
  data: String,
  partitionKey: String,
  sequenceNumber: String,
  arrivalTime: Long,
  shardId: Option[String] = None
)

// Message handler that captures metadata alongside the payload
def enrichedMessageHandler(record: Record): EnrichedMessage = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)
  val data = new String(bytes, StandardCharsets.UTF_8)
  val partitionKey = record.getPartitionKey()
  val sequenceNumber = record.getSequenceNumber()
  val arrivalTime = record.getApproximateArrivalTimestamp().getTime()

  EnrichedMessage(data, partitionKey, sequenceNumber, arrivalTime)
}

val enrichedStream = KinesisUtils.createStream[EnrichedMessage](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  enrichedMessageHandler
)
```

### Binary Data Processing

```scala
import com.amazonaws.services.kinesis.model.Record
import java.nio.ByteBuffer

case class ImageMetadata(
  format: String,
  width: Int,
  height: Int,
  size: Long
)

// Handler for binary image data
def imageMetadataHandler(record: Record): ImageMetadata = {
  val buffer = record.getData()
  val bytes = new Array[Byte](buffer.remaining())
  buffer.get(bytes)

  // Simple image format detection from the magic bytes. Convert to
  // unsigned Ints first: Scala cannot pattern-match an Array[Byte]
  // against Int literals, and unsigned hex values read more clearly.
  val format = bytes.take(4).map(_ & 0xFF) match {
    case Array(0xFF, 0xD8, 0xFF, _*)       => "JPEG"
    case Array(0x89, 0x50, 0x4E, 0x47, _*) => "PNG"
    case Array(0x47, 0x49, 0x46, _*)       => "GIF"
    case _                                 => "UNKNOWN"
  }

  ImageMetadata(
    format = format,
    width = 0, // Would parse from headers in a real implementation
    height = 0,
    size = bytes.length
  )
}

val imageStream = KinesisUtils.createStream[ImageMetadata](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  imageMetadataHandler
)
```

### Avro Message Processing

```scala
import com.amazonaws.services.kinesis.model.Record
import org.apache.avro.Schema
import org.apache.avro.io.{DecoderFactory, DatumReader}
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}

// Generic Avro message handler
def avroMessageHandler[T <: SpecificRecordBase](
  schema: Schema,
  recordClass: Class[T]
)(record: Record): Option[T] = {
  try {
    val buffer = record.getData()
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val reader: DatumReader[T] = new SpecificDatumReader[T](schema)
    Some(reader.read(null.asInstanceOf[T], decoder))
  } catch {
    case _: Exception => None // Handle malformed Avro gracefully
  }
}

// Usage with a specific Avro-generated class
val avroStream = KinesisUtils.createStream[Option[MyAvroRecord]](
  ssc, appName, streamName, endpointUrl, regionName,
  InitialPositionInStream.LATEST, checkpointInterval, storageLevel,
  avroMessageHandler(MyAvroRecord.getClassSchema(), classOf[MyAvroRecord])
)
```

## Message Handler Best Practices

### Error Handling
- Always handle potential exceptions in custom message handlers
- Consider returning `Option[T]` or `Either[Error, T]` for error-prone parsing
- Log parsing errors for monitoring and debugging
- Avoid throwing exceptions that could crash the receiver

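The `Either`-returning style can be sketched as follows; `ParsedEvent` and `parseLine` are illustrative names, not part of the connector API:

```scala
final case class ParsedEvent(level: String, message: String)

// Parsing returns Either instead of throwing: failures become values
// that downstream code can log or route, never receiver crashes.
def parseLine(line: String): Either[String, ParsedEvent] =
  line.split(':') match {
    case Array(level, msg) => Right(ParsedEvent(level.trim, msg.trim))
    case _                 => Left(s"malformed line: $line")
  }

// Keep successes and collect failures for logging, without exceptions
val results = List("INFO: started", "garbage").map(parseLine)
val events  = results.collect { case Right(e)   => e }
val errors  = results.collect { case Left(err) => err }
```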
### Performance Considerations
- Keep message handlers lightweight and fast
- Avoid expensive operations like network calls or database queries
- Consider pre-compiling regex patterns or parsers outside the handler
- Use efficient serialization libraries for structured data

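For instance, a regex compiled once outside the handler is reused across every record instead of being recompiled per call (the pattern and names here are illustrative):

```scala
import scala.util.matching.Regex

// Compiled once at definition time, not once per record
val LogPattern: Regex = """\[(\w+)\]\s+(.*)""".r

// Per-record work is then just a match against the precompiled pattern
def levelOf(line: String): Option[String] = line match {
  case LogPattern(level, _) => Some(level)
  case _                    => None
}
```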
### Memory Management
- Be careful with large message payloads
- Consider streaming parsers for very large messages
- Avoid keeping references to the original ByteBuffer
- Use appropriate data structures for your use case

### Type Safety
- Use case classes for structured data
- Leverage Scala's type system for compile-time safety
- Consider using sealed traits for message variants
- Validate data types and ranges when appropriate

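The sealed-trait point can be sketched as a small message ADT; because the trait is sealed, the compiler warns when a `match` misses a variant (all names illustrative):

```scala
// Each message variant is a case class under one sealed trait
sealed trait Message
final case class Metric(name: String, value: Double) extends Message
final case class LogLine(level: String, text: String) extends Message
final case class Unknown(raw: Array[Byte])           extends Message

// Exhaustiveness is checked at compile time: dropping a case here
// produces a compiler warning.
def describe(m: Message): String = m match {
  case Metric(n, v)  => s"metric $n=$v"
  case LogLine(l, t) => s"[$l] $t"
  case Unknown(raw)  => s"${raw.length} unparsed bytes"
}
```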
## Advanced Processing Patterns

### Batch Message Processing

```scala
// Process messages in batches for efficiency. Note that collect() pulls
// the entire RDD to the driver; prefer rdd.foreachPartition for large batches.
kinesisStream.foreachRDD { rdd =>
  val messages = rdd.collect()

  // Batch process messages
  val processed = processMessageBatch(messages)

  // Write to external system
  writeToDatabase(processed)
}
```

### Filtering and Transformation

```scala
// Chain filtering and transformation operations
val processedStream = kinesisStream
  .filter(_.nonEmpty)   // Filter out empty messages
  .map(parseMessage)    // Parse into a structured format
  .filter(_.isValid)    // Keep valid messages only
  .map(enrichMessage)   // Add additional metadata
```

### Error Stream Handling

```scala
import scala.util.{Success, Failure}

// A DStream of tuples cannot be destructured into two streams with a
// tuple pattern, so parse once, cache, and derive both streams from it.
val parsed = kinesisStream.map { record =>
  parseMessageSafely(record) match {
    case Success(msg) => (Some(msg), None)
    case Failure(err) => (None, Some(err))
  }
}.cache()

val validStream = parsed.flatMap(_._1)
val errorStream = parsed.flatMap(_._2)

// Process the streams separately
validStream.foreachRDD(processValidMessages)
errorStream.foreachRDD(logErrors)
```