Core API classes and utilities for CDAP application development, providing common data schema definitions, data format abstractions, stream event handling, and byte manipulation utilities
—
Event-driven data processing capabilities for handling streaming data with headers, body content, and timestamps. The stream processing system supports real-time data ingestion, custom event decoders, and structured event handling for building scalable data processing pipelines.
Base classes for representing stream event data with headers and typed body content.
/**
* Generic stream event data with typed body
* @param <T> Type of event body
*/
public class GenericStreamEventData<T> {
/**
* Create generic stream event data
* @param headers Immutable map of event headers
* @param body Typed event body
*/
public GenericStreamEventData(Map<String, String> headers, T body);
/**
* Get immutable map of event headers
* @return Map of header key-value pairs
*/
public Map<String, String> getHeaders();
/**
* Get typed event body
* @return Event body of type T
*/
public T getBody();
}
/**
* Stream event data with ByteBuffer body
*/
public class StreamEventData extends GenericStreamEventData<ByteBuffer> {
/**
* Create stream event data with ByteBuffer body
* @param headers Map of event headers
* @param body ByteBuffer containing event data
*/
public StreamEventData(Map<String, String> headers, ByteBuffer body);
}Usage Examples:
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
// Create event headers
Map<String, String> headers = new HashMap<>();
headers.put("source", "sensor-01");
headers.put("type", "temperature");
headers.put("format", "json");
// Create event body
String jsonData = "{\"temperature\": 23.5, \"unit\": \"celsius\"}";
ByteBuffer body = ByteBuffer.wrap(jsonData.getBytes());
// Create stream event data
StreamEventData eventData = new StreamEventData(headers, body);
// Access event information
Map<String, String> eventHeaders = eventData.getHeaders();
ByteBuffer eventBody = eventData.getBody();
String sourceId = eventHeaders.get("source"); // "sensor-01"Stream events that extend basic event data with timestamp information for temporal processing.
/**
* Stream event with timestamp information
*/
public class StreamEvent extends StreamEventData {
/**
* Create empty stream event
*/
public StreamEvent();
/**
* Create stream event with body only
* @param body Event body data
*/
public StreamEvent(ByteBuffer body);
/**
* Create stream event with headers and body (current time as timestamp)
* @param headers Event headers
* @param body Event body data
*/
public StreamEvent(Map<String, String> headers, ByteBuffer body);
/**
* Create stream event from existing event data with timestamp
* @param data Existing stream event data
* @param timestamp Event timestamp in milliseconds
*/
public StreamEvent(StreamEventData data, long timestamp);
/**
* Copy constructor
* @param event Stream event to copy
*/
public StreamEvent(StreamEvent event);
/**
* Create stream event with all parameters
* @param headers Event headers
* @param body Event body data
* @param timestamp Event timestamp in milliseconds since epoch
*/
public StreamEvent(Map<String, String> headers, ByteBuffer body, long timestamp);
/**
* Get event timestamp
* @return Timestamp in milliseconds since epoch
*/
public long getTimestamp();
}Usage Examples:
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
// Create event with current timestamp
Map<String, String> headers = new HashMap<>();
headers.put("deviceId", "device-123");
headers.put("location", "warehouse-A");
ByteBuffer data = ByteBuffer.wrap("sensor reading: 42.3".getBytes());
StreamEvent event = new StreamEvent(headers, data);
long timestamp = event.getTimestamp(); // Current time in milliseconds
// Create event with specific timestamp
long specificTime = System.currentTimeMillis() - 3600000; // 1 hour ago
StreamEvent historicalEvent = new StreamEvent(headers, data, specificTime);
// Copy existing event
StreamEvent copiedEvent = new StreamEvent(event);
// Create from existing StreamEventData
StreamEventData baseData = new StreamEventData(headers, data);
long eventTime = System.currentTimeMillis();
StreamEvent fromData = new StreamEvent(baseData, eventTime);Interface for converting stream events into structured key-value pairs with custom processing logic.
/**
* Interface for decoding stream events into key-value pairs
* @param <K> Type of decoded key
* @param <V> Type of decoded value
*/
public interface StreamEventDecoder<K, V> {
/**
* Decode stream event into key-value pair
* @param event Stream event to decode
* @param result Reusable result object for decoded output
* @return Decode result (may be same instance as result parameter)
*/
DecodeResult<K, V> decode(StreamEvent event, DecodeResult<K, V> result);
}
/**
* Container for decoded key-value pair result
* Note: Not thread-safe, reuse for performance
* @param <K> Key type
* @param <V> Value type
*/
public final class DecodeResult<K, V> {
/**
* Get decoded key
* @return Key value
*/
public K getKey();
/**
* Set decoded key
* @param key Key value
* @return This result for chaining
*/
public DecodeResult<K, V> setKey(K key);
/**
* Get decoded value
* @return Value
*/
public V getValue();
/**
* Set decoded value
* @param value Value
* @return This result for chaining
*/
public DecodeResult<K, V> setValue(V value);
}Usage Examples:
import com.google.gson.Gson;
import com.google.gson.JsonObject;
// Example: JSON event decoder that extracts user ID as key and full data as value
public class JsonEventDecoder implements StreamEventDecoder<String, JsonObject> {
private final Gson gson = new Gson();
@Override
public DecodeResult<String, JsonObject> decode(StreamEvent event, DecodeResult<String, JsonObject> result) {
// Extract body as JSON
ByteBuffer body = event.getBody();
String jsonString = new String(body.array());
JsonObject jsonData = gson.fromJson(jsonString, JsonObject.class);
// Extract user ID from JSON as key
String userId = jsonData.get("userId").getAsString();
// Set result
return result.setKey(userId).setValue(jsonData);
}
}
// Usage of decoder
JsonEventDecoder decoder = new JsonEventDecoder();
StreamEventDecoder.DecodeResult<String, JsonObject> result = new StreamEventDecoder.DecodeResult<>();
// Create sample event
Map<String, String> headers = Collections.singletonMap("type", "user_action");
String jsonBody = "{\"userId\": \"user123\", \"action\": \"login\", \"timestamp\": 1623456789}";
ByteBuffer body = ByteBuffer.wrap(jsonBody.getBytes());
StreamEvent event = new StreamEvent(headers, body);
// Decode event
DecodeResult<String, JsonObject> decodedResult = decoder.decode(event, result);
String key = decodedResult.getKey(); // "user123"
JsonObject value = decodedResult.getValue(); // Full JSON object
// Reuse result object for performance
StreamEvent nextEvent = new StreamEvent(headers,
ByteBuffer.wrap("{\"userId\": \"user456\", \"action\": \"logout\"}".getBytes()));
decoder.decode(nextEvent, result); // Reuses same result objectCommon patterns for processing stream events in data pipelines.
Event Filtering:
import java.util.function.Predicate;
public class EventFilter {
public static Predicate<StreamEvent> byHeader(String headerKey, String expectedValue) {
return event -> expectedValue.equals(event.getHeaders().get(headerKey));
}
public static Predicate<StreamEvent> byTimestamp(long minTimestamp, long maxTimestamp) {
return event -> {
long timestamp = event.getTimestamp();
return timestamp >= minTimestamp && timestamp <= maxTimestamp;
};
}
}
// Usage
Predicate<StreamEvent> deviceFilter = EventFilter.byHeader("deviceType", "sensor");
Predicate<StreamEvent> timeFilter = EventFilter.byTimestamp(
System.currentTimeMillis() - 3600000, // 1 hour ago
System.currentTimeMillis()
);
// Filter events
List<StreamEvent> events = getStreamEvents();
List<StreamEvent> filteredEvents = events.stream()
.filter(deviceFilter.and(timeFilter))
.collect(Collectors.toList());Event Transformation:
import java.util.function.Function;
public class EventTransformer {
public static Function<StreamEvent, StreamEvent> addHeader(String key, String value) {
return event -> {
Map<String, String> newHeaders = new HashMap<>(event.getHeaders());
newHeaders.put(key, value);
return new StreamEvent(newHeaders, event.getBody(), event.getTimestamp());
};
}
public static Function<StreamEvent, StreamEvent> updateTimestamp(long newTimestamp) {
return event -> new StreamEvent(event.getHeaders(), event.getBody(), newTimestamp);
}
}
// Usage
Function<StreamEvent, StreamEvent> addProcessingTime =
EventTransformer.addHeader("processedAt", String.valueOf(System.currentTimeMillis()));
StreamEvent originalEvent = new StreamEvent(headers, body);
StreamEvent enrichedEvent = addProcessingTime.apply(originalEvent);Batch Event Processing:
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class BatchEventProcessor {
private final StreamEventDecoder<String, Object> decoder;
private final int batchSize;
public BatchEventProcessor(StreamEventDecoder<String, Object> decoder, int batchSize) {
this.decoder = decoder;
this.batchSize = batchSize;
}
public CompletableFuture<Void> processBatch(List<StreamEvent> events) {
return CompletableFuture.runAsync(() -> {
StreamEventDecoder.DecodeResult<String, Object> result =
new StreamEventDecoder.DecodeResult<>();
for (StreamEvent event : events) {
decoder.decode(event, result);
// Process decoded key-value pair
processKeyValue(result.getKey(), result.getValue());
}
});
}
private void processKeyValue(String key, Object value) {
// Custom processing logic
System.out.println("Processing: " + key + " -> " + value);
}
}public class EventRouter {
public String determineRoute(StreamEvent event) {
Map<String, String> headers = event.getHeaders();
String eventType = headers.get("type");
String priority = headers.get("priority");
if ("error".equals(eventType)) {
return "error-processing-queue";
} else if ("high".equals(priority)) {
return "priority-queue";
} else {
return "standard-queue";
}
}
}public class ContentAnalyzer {
public boolean containsKeyword(StreamEvent event, String keyword) {
ByteBuffer body = event.getBody();
String content = new String(body.array());
return content.toLowerCase().contains(keyword.toLowerCase());
}
public int getContentLength(StreamEvent event) {
return event.getBody().remaining();
}
}public class TemporalProcessor {
private static final long FIVE_MINUTES = 5 * 60 * 1000; // 5 minutes in milliseconds
public boolean isRecentEvent(StreamEvent event) {
long eventTime = event.getTimestamp();
long currentTime = System.currentTimeMillis();
return (currentTime - eventTime) <= FIVE_MINUTES;
}
public List<StreamEvent> groupByTimeWindow(List<StreamEvent> events, long windowSizeMs) {
return events.stream()
.sorted((e1, e2) -> Long.compare(e1.getTimestamp(), e2.getTimestamp()))
.collect(Collectors.toList());
}
}Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api-common