
tessl/maven-co-cask-cdap--cdap-api-common

Core API classes and utilities for CDAP application development, providing common data schema definitions, data format abstractions, stream event handling, and byte manipulation utilities


Stream Processing

Event-driven data processing capabilities for handling streaming data with headers, body content, and timestamps. The stream processing system supports real-time data ingestion, custom event decoders, and structured event handling for building scalable data processing pipelines.

Capabilities

Stream Event Data

Base classes for representing stream event data with headers and typed body content.

/**
 * Generic stream event data with typed body
 * @param <T> Type of event body
 */
public class GenericStreamEventData<T> {
    /**
     * Create generic stream event data
     * @param headers Immutable map of event headers
     * @param body Typed event body
     */
    public GenericStreamEventData(Map<String, String> headers, T body);
    
    /**
     * Get immutable map of event headers
     * @return Map of header key-value pairs
     */
    public Map<String, String> getHeaders();
    
    /**
     * Get typed event body
     * @return Event body of type T
     */
    public T getBody();
}

/**
 * Stream event data with ByteBuffer body
 */
public class StreamEventData extends GenericStreamEventData<ByteBuffer> {
    /**
     * Create stream event data with ByteBuffer body
     * @param headers Map of event headers
     * @param body ByteBuffer containing event data
     */
    public StreamEventData(Map<String, String> headers, ByteBuffer body);
}

Usage Examples:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Create event headers
Map<String, String> headers = new HashMap<>();
headers.put("source", "sensor-01");
headers.put("type", "temperature");
headers.put("format", "json");

// Create event body (encoded explicitly as UTF-8 rather than the platform default charset)
String jsonData = "{\"temperature\": 23.5, \"unit\": \"celsius\"}";
ByteBuffer body = ByteBuffer.wrap(jsonData.getBytes(StandardCharsets.UTF_8));

// Create stream event data
StreamEventData eventData = new StreamEventData(headers, body);

// Access event information
Map<String, String> eventHeaders = eventData.getHeaders();
ByteBuffer eventBody = eventData.getBody();
String sourceId = eventHeaders.get("source"); // "sensor-01"
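Reading the body back as text is a common next step. The sketch below assumes the body holds UTF-8 text, as in the JSON example; `BodyText` is a hypothetical helper, not part of the CDAP API. It decodes a `duplicate()` of the buffer so the event's own position is not moved for other readers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the CDAP API.
final class BodyText {
    private BodyText() {}

    // Decodes the readable bytes [position, limit) as UTF-8. Working on a duplicate
    // leaves the caller's buffer position unchanged, so other readers are unaffected.
    static String utf8(ByteBuffer body) {
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }
}
```

For the event above, `BodyText.utf8(eventData.getBody())` would return the original JSON string, and the body can still be read again afterwards.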

Stream Events with Timestamps

Stream events that extend basic event data with timestamp information for temporal processing.

/**
 * Stream event with timestamp information
 */
public class StreamEvent extends StreamEventData {
    /**
     * Create empty stream event
     */
    public StreamEvent();
    
    /**
     * Create stream event with body only
     * @param body Event body data
     */
    public StreamEvent(ByteBuffer body);
    
    /**
     * Create stream event with headers and body (current time as timestamp)
     * @param headers Event headers
     * @param body Event body data
     */
    public StreamEvent(Map<String, String> headers, ByteBuffer body);
    
    /**
     * Create stream event from existing event data with timestamp
     * @param data Existing stream event data
     * @param timestamp Event timestamp in milliseconds
     */
    public StreamEvent(StreamEventData data, long timestamp);
    
    /**
     * Copy constructor
     * @param event Stream event to copy
     */
    public StreamEvent(StreamEvent event);
    
    /**
     * Create stream event with all parameters
     * @param headers Event headers
     * @param body Event body data
     * @param timestamp Event timestamp in milliseconds since epoch
     */
    public StreamEvent(Map<String, String> headers, ByteBuffer body, long timestamp);
    
    /**
     * Get event timestamp
     * @return Timestamp in milliseconds since epoch
     */
    public long getTimestamp();
}

Usage Examples:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Create event with current timestamp
Map<String, String> headers = new HashMap<>();
headers.put("deviceId", "device-123");
headers.put("location", "warehouse-A");

ByteBuffer data = ByteBuffer.wrap("sensor reading: 42.3".getBytes(StandardCharsets.UTF_8));
StreamEvent event = new StreamEvent(headers, data);

long timestamp = event.getTimestamp(); // Time of construction, in milliseconds since epoch

// Create event with specific timestamp
long specificTime = System.currentTimeMillis() - 3600000; // 1 hour ago
StreamEvent historicalEvent = new StreamEvent(headers, data, specificTime);

// Copy existing event
StreamEvent copiedEvent = new StreamEvent(event);

// Create from existing StreamEventData
StreamEventData baseData = new StreamEventData(headers, data);
long eventTime = System.currentTimeMillis();
StreamEvent fromData = new StreamEvent(baseData, eventTime);
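Timestamps are plain epoch milliseconds, so `java.time` can replace magic numbers such as `3600000` and make event ages readable. `EventTimes` is a hypothetical helper for illustration only; it works on the `long` values that `getTimestamp()` returns.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the CDAP API.
final class EventTimes {
    private EventTimes() {}

    // Duration.ofHours(1).toMillis() == 3_600_000, the "1 hour ago" offset used above.
    static long hoursAgo(long nowMillis, long hours) {
        return nowMillis - Duration.ofHours(hours).toMillis();
    }

    // Interprets an epoch-millisecond timestamp as an Instant (UTC-based).
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Age of an event relative to a reference time; negative if the event is in the future.
    static Duration age(long eventMillis, long nowMillis) {
        return Duration.ofMillis(nowMillis - eventMillis);
    }
}
```

For example, `new StreamEvent(headers, data, EventTimes.hoursAgo(System.currentTimeMillis(), 1))` is equivalent to the historical event created above.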

Stream Event Decoders

Interface for converting stream events into structured key-value pairs with custom processing logic.

/**
 * Interface for decoding stream events into key-value pairs
 * @param <K> Type of decoded key
 * @param <V> Type of decoded value
 */
public interface StreamEventDecoder<K, V> {
    /**
     * Decode stream event into key-value pair
     * @param event Stream event to decode
     * @param result Reusable result object for decoded output
     * @return Decode result (may be the same instance as the result parameter)
     */
    DecodeResult<K, V> decode(StreamEvent event, DecodeResult<K, V> result);

    /**
     * Container for a decoded key-value pair, nested in the interface
     * (referenced as StreamEventDecoder.DecodeResult)
     * Note: not thread-safe; reuse one instance per thread for performance
     * @param <K> Key type
     * @param <V> Value type
     */
    final class DecodeResult<K, V> {
        /**
         * Get decoded key
         * @return Key value
         */
        public K getKey();

        /**
         * Set decoded key
         * @param key Key value
         * @return This result for chaining
         */
        public DecodeResult<K, V> setKey(K key);

        /**
         * Get decoded value
         * @return Value
         */
        public V getValue();

        /**
         * Set decoded value
         * @param value Value
         * @return This result for chaining
         */
        public DecodeResult<K, V> setValue(V value);
    }
}

Usage Examples:

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Example: JSON event decoder that extracts user ID as key and full data as value
public class JsonEventDecoder implements StreamEventDecoder<String, JsonObject> {
    private final Gson gson = new Gson();

    @Override
    public DecodeResult<String, JsonObject> decode(StreamEvent event, DecodeResult<String, JsonObject> result) {
        // Decode only the readable bytes as UTF-8; duplicate() leaves the event's buffer position untouched.
        // (body.array() would ignore position/limit and fails on read-only or direct buffers.)
        String jsonString = StandardCharsets.UTF_8.decode(event.getBody().duplicate()).toString();
        JsonObject jsonData = gson.fromJson(jsonString, JsonObject.class);

        // Extract user ID from JSON as key (null if the field is absent)
        String userId = jsonData.has("userId") ? jsonData.get("userId").getAsString() : null;

        // Fill and return the supplied result object
        return result.setKey(userId).setValue(jsonData);
    }
}

// Usage of decoder
JsonEventDecoder decoder = new JsonEventDecoder();
StreamEventDecoder.DecodeResult<String, JsonObject> result = new StreamEventDecoder.DecodeResult<>();

// Create sample event
Map<String, String> headers = Collections.singletonMap("type", "user_action");
String jsonBody = "{\"userId\": \"user123\", \"action\": \"login\", \"timestamp\": 1623456789}";
ByteBuffer body = ByteBuffer.wrap(jsonBody.getBytes(StandardCharsets.UTF_8));
StreamEvent event = new StreamEvent(headers, body);

// Decode event (always read from the returned instance)
StreamEventDecoder.DecodeResult<String, JsonObject> decodedResult = decoder.decode(event, result);
String key = decodedResult.getKey();           // "user123"
JsonObject value = decodedResult.getValue();   // Full JSON object

// Reuse result object for performance (this overwrites the contents of result)
StreamEvent nextEvent = new StreamEvent(headers,
    ByteBuffer.wrap("{\"userId\": \"user456\", \"action\": \"logout\"}".getBytes(StandardCharsets.UTF_8)));
decoder.decode(nextEvent, result); // Reuses same result object
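Because the example above depends on CDAP and Gson, here is a dependency-free sketch of the same reuse contract. `KvResult`, `KvDecoder`, and `KeyValueTextDecoder` are hypothetical stand-ins that only mirror the shape of `DecodeResult` and `StreamEventDecoder` (they take a `ByteBuffer` body instead of a `StreamEvent`), and the `key=value` body format is an assumption for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in mirroring DecodeResult: a mutable, reusable key-value holder.
final class KvResult<K, V> {
    private K key;
    private V value;

    K getKey() { return key; }
    V getValue() { return value; }
    KvResult<K, V> setKey(K key) { this.key = key; return this; }
    KvResult<K, V> setValue(V value) { this.value = value; return this; }
}

// Hypothetical stand-in mirroring StreamEventDecoder: fill and return the supplied result.
interface KvDecoder<K, V> {
    KvResult<K, V> decode(ByteBuffer body, KvResult<K, V> result);
}

// Decodes UTF-8 bodies of the form "key=value": text before the first '=' is the key,
// the rest is the value. A body without '=' yields a null key and the whole text as value.
final class KeyValueTextDecoder implements KvDecoder<String, String> {
    @Override
    public KvResult<String, String> decode(ByteBuffer body, KvResult<String, String> result) {
        String text = StandardCharsets.UTF_8.decode(body.duplicate()).toString();
        int eq = text.indexOf('=');
        if (eq < 0) {
            return result.setKey(null).setValue(text);
        }
        return result.setKey(text.substring(0, eq)).setValue(text.substring(eq + 1));
    }
}
```

Callers should read from the returned object rather than assuming it is the one passed in, since the contract allows a decoder to return a different instance.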

Custom Event Processing Patterns

Common patterns for processing stream events in data pipelines.

Event Filtering:

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class EventFilter {
    public static Predicate<StreamEvent> byHeader(String headerKey, String expectedValue) {
        return event -> expectedValue.equals(event.getHeaders().get(headerKey));
    }
    
    public static Predicate<StreamEvent> byTimestamp(long minTimestamp, long maxTimestamp) {
        return event -> {
            long timestamp = event.getTimestamp();
            return timestamp >= minTimestamp && timestamp <= maxTimestamp;
        };
    }
}

// Usage
Predicate<StreamEvent> deviceFilter = EventFilter.byHeader("deviceType", "sensor");
Predicate<StreamEvent> timeFilter = EventFilter.byTimestamp(
    System.currentTimeMillis() - 3600000,  // 1 hour ago
    System.currentTimeMillis()
);

// Filter events (getStreamEvents() is a placeholder for your own event source)
List<StreamEvent> events = getStreamEvents();
List<StreamEvent> filteredEvents = events.stream()
    .filter(deviceFilter.and(timeFilter))
    .collect(Collectors.toList());
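The same predicate composition can be exercised without CDAP. `MiniEvent` and `MiniFilters` are hypothetical stand-ins. Unlike `byTimestamp` above, `inWindow` uses a half-open range `[from, to)`, a common choice when consecutive windows must not both claim an event that falls exactly on a boundary.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for StreamEvent: only headers and a timestamp are needed here.
final class MiniEvent {
    final Map<String, String> headers;
    final long timestamp;

    MiniEvent(Map<String, String> headers, long timestamp) {
        this.headers = headers;
        this.timestamp = timestamp;
    }
}

final class MiniFilters {
    private MiniFilters() {}

    // Same logic as EventFilter.byHeader: a missing header never matches.
    static Predicate<MiniEvent> byHeader(String key, String expected) {
        return e -> expected.equals(e.headers.get(key));
    }

    // Half-open window [from, to): an event at exactly `to` belongs to the next window.
    static Predicate<MiniEvent> inWindow(long from, long to) {
        return e -> e.timestamp >= from && e.timestamp < to;
    }

    static List<MiniEvent> select(List<MiniEvent> events, Predicate<MiniEvent> p) {
        return events.stream().filter(p).collect(Collectors.toList());
    }
}
```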

Event Transformation:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class EventTransformer {
    public static Function<StreamEvent, StreamEvent> addHeader(String key, String value) {
        return event -> {
            Map<String, String> newHeaders = new HashMap<>(event.getHeaders());
            newHeaders.put(key, value);
            return new StreamEvent(newHeaders, event.getBody(), event.getTimestamp());
        };
    }
    
    public static Function<StreamEvent, StreamEvent> updateTimestamp(long newTimestamp) {
        return event -> new StreamEvent(event.getHeaders(), event.getBody(), newTimestamp);
    }
}

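// Usage (note: the processedAt value is captured once, when addHeader is called,
// so every event passed through this function receives the same value)
Function<StreamEvent, StreamEvent> addProcessingTime =
    EventTransformer.addHeader("processedAt", String.valueOf(System.currentTimeMillis()));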

StreamEvent originalEvent = new StreamEvent(headers, body);
StreamEvent enrichedEvent = addProcessingTime.apply(originalEvent);
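`addHeader` copies the header map before modifying it, which matters if `getHeaders()` returns an immutable map. A minimal stdlib-only sketch of that copy-on-write step, with `Headers.with` as a hypothetical helper name:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the CDAP API.
final class Headers {
    private Headers() {}

    // Returns a new unmodifiable map containing `headers` plus key -> value.
    // The input map is never modified, so it may itself be immutable.
    static Map<String, String> with(Map<String, String> headers, String key, String value) {
        Map<String, String> copy = new HashMap<>(headers);
        copy.put(key, value);
        return Collections.unmodifiableMap(copy);
    }
}
```

Inside `addHeader`, the two lines that build `newHeaders` could then become `Headers.with(event.getHeaders(), key, value)`.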

Batch Event Processing:

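import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BatchEventProcessor {
    // Shared by all tasks, so the decoder itself must be safe to call from several threads
    private final StreamEventDecoder<String, Object> decoder;
    private final int batchSize;

    public BatchEventProcessor(StreamEventDecoder<String, Object> decoder, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        this.decoder = decoder;
        this.batchSize = batchSize;
    }

    // Splits the events into chunks of at most batchSize and decodes each chunk asynchronously;
    // the returned future completes when every chunk has been processed.
    public CompletableFuture<Void> processBatch(List<StreamEvent> events) {
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int start = 0; start < events.size(); start += batchSize) {
            List<StreamEvent> chunk = events.subList(start, Math.min(start + batchSize, events.size()));
            tasks.add(CompletableFuture.runAsync(() -> {
                // DecodeResult is not thread-safe: one instance per task, reused within the task
                StreamEventDecoder.DecodeResult<String, Object> result =
                    new StreamEventDecoder.DecodeResult<>();
                for (StreamEvent event : chunk) {
                    // Use the returned instance; a decoder may return a different object
                    StreamEventDecoder.DecodeResult<String, Object> decoded = decoder.decode(event, result);
                    processKeyValue(decoded.getKey(), decoded.getValue());
                }
            }));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0]));
    }

    private void processKeyValue(String key, Object value) {
        // Custom processing logic
        System.out.println("Processing: " + key + " -> " + value);
    }
}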

Event Data Access Patterns

Header-Based Routing

public class EventRouter {
    public String determineRoute(StreamEvent event) {
        Map<String, String> headers = event.getHeaders();
        
        String eventType = headers.get("type");
        String priority = headers.get("priority");
        
        if ("error".equals(eventType)) {
            return "error-processing-queue";
        } else if ("high".equals(priority)) {
            return "priority-queue";
        } else {
            return "standard-queue";
        }
    }
}
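The if/else chain above encodes a precedence: an `error` type wins over `high` priority. A table-driven sketch makes that order explicit. `HeaderRouter` is hypothetical and works on a plain header map, so it would be called as `router.route(event.getHeaders())`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical table-driven router: rules are checked in the order they were added
// and the first match wins; unmatched events go to the default route.
final class HeaderRouter {
    private static final class Rule {
        final String header;
        final String value;
        final String route;

        Rule(String header, String value, String route) {
            this.header = header;
            this.value = value;
            this.route = route;
        }
    }

    private final List<Rule> rules = new ArrayList<>();
    private final String defaultRoute;

    HeaderRouter(String defaultRoute) {
        this.defaultRoute = defaultRoute;
    }

    // Adds a rule "if headers[header] equals value, route to route".
    HeaderRouter when(String header, String value, String route) {
        rules.add(new Rule(header, value, route));
        return this;
    }

    String route(Map<String, String> headers) {
        for (Rule rule : rules) {
            if (rule.value.equals(headers.get(rule.header))) {
                return rule.route;
            }
        }
        return defaultRoute;
    }
}
```

Configured as `new HeaderRouter("standard-queue").when("type", "error", "error-processing-queue").when("priority", "high", "priority-queue")`, it reproduces `determineRoute`.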

Body Content Inspection

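import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class ContentAnalyzer {
    public boolean containsKeyword(StreamEvent event, String keyword) {
        // Decode only the readable bytes as UTF-8; duplicate() keeps the event's buffer position intact
        String content = StandardCharsets.UTF_8.decode(event.getBody().duplicate()).toString();
        // Locale.ROOT avoids locale-specific case mappings (e.g. the Turkish dotless i)
        return content.toLowerCase(Locale.ROOT).contains(keyword.toLowerCase(Locale.ROOT));
    }

    // Body size in bytes, from the buffer's current position to its limit
    public int getContentLength(StreamEvent event) {
        return event.getBody().remaining();
    }
}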
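Avoiding `new String(body.array())` matters because `ByteBuffer.array()` returns the entire backing array regardless of the buffer's position and limit, and it throws for read-only or direct buffers. The self-contained sketch below, using a hypothetical `ArrayPitfall` class, builds a body that is a window onto a larger array and compares both decodings.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demonstration class, not part of the CDAP API.
final class ArrayPitfall {
    private ArrayPitfall() {}

    // Naive decoding: reads the whole backing array (and uses the platform charset).
    static String naive(ByteBuffer body) {
        return new String(body.array());
    }

    // Decodes only the readable window [position, limit) as UTF-8.
    static String windowed(ByteBuffer body) {
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }

    // A body that is a view onto part of a larger array, e.g. one record in a framed message.
    static ByteBuffer framed(String frame, int start, int length) {
        byte[] bytes = frame.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.wrap(bytes, start, length);
    }
}
```

With `framed("HDR|user123|TRL", 4, 7)`, `windowed` returns `user123`, while `naive` returns the whole frame including the header and trailer.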

Temporal Event Processing

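import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TemporalProcessor {
    private static final long FIVE_MINUTES = 5 * 60 * 1000; // 5 minutes in milliseconds

    public boolean isRecentEvent(StreamEvent event) {
        long eventTime = event.getTimestamp();
        long currentTime = System.currentTimeMillis();
        return (currentTime - eventTime) <= FIVE_MINUTES;
    }

    // Groups events into fixed (tumbling) windows of windowSizeMs (must be > 0), keyed by each
    // window's start time; the TreeMap orders windows chronologically, and events within a
    // window are sorted by timestamp.
    public Map<Long, List<StreamEvent>> groupByTimeWindow(List<StreamEvent> events, long windowSizeMs) {
        return events.stream()
            .sorted(Comparator.comparingLong(StreamEvent::getTimestamp))
            .collect(Collectors.groupingBy(
                e -> e.getTimestamp() - Math.floorMod(e.getTimestamp(), windowSizeMs),
                TreeMap::new,
                Collectors.toList()));
    }
}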
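`isRecentEvent` reads the system clock directly, which makes the five-minute cutoff hard to test. The sketch below injects a `java.time.Clock` instead. `RecencyCheck` is a hypothetical name, and the boundary behavior (an event exactly five minutes old counts as recent) follows the `<=` comparison above.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical clock-injected variant of TemporalProcessor.isRecentEvent.
final class RecencyCheck {
    private final Clock clock;
    private final Duration maxAge;

    RecencyCheck(Clock clock, Duration maxAge) {
        this.clock = clock;
        this.maxAge = maxAge;
    }

    // Convenience for tests: a check whose "now" is frozen at nowMillis.
    static RecencyCheck fixedAt(long nowMillis, Duration maxAge) {
        return new RecencyCheck(Clock.fixed(Instant.ofEpochMilli(nowMillis), ZoneOffset.UTC), maxAge);
    }

    // eventMillis is an epoch-millisecond timestamp such as StreamEvent.getTimestamp().
    // Future-dated events (negative age) count as recent, matching the original logic.
    boolean isRecent(long eventMillis) {
        long age = clock.millis() - eventMillis;
        return age <= maxAge.toMillis();
    }
}
```

In production, `new RecencyCheck(Clock.systemUTC(), Duration.ofMinutes(5))` behaves like `isRecentEvent`.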

Performance Considerations

Memory Management

  • StreamEvent objects are lightweight but contain references to header maps and ByteBuffers
  • Reuse DecodeResult objects in tight processing loops to reduce garbage collection
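  • ByteBuffer.wrap does not copy, so a body shares its byte array with whoever created it: changes to that array are visible through every event holding the buffer, and reading with get() advances a position shared by all holders of the same buffer object (read through duplicate() to avoid this)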
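The sharing described above can be seen with plain `ByteBuffer`s. The hypothetical `SharedBodyDemo` contrasts two references to one buffer object (shared position) with a `duplicate()` (own position, shared content). Whether a copied event holds the same buffer object or a duplicate is implementation-dependent and is not assumed here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demonstration class, not part of the CDAP API.
final class SharedBodyDemo {
    private SharedBodyDemo() {}

    // Returns {text readable via body, text readable via view} after the mutations below.
    static String[] run() {
        byte[] bytes = "abc".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer body = ByteBuffer.wrap(bytes);  // wraps the array, no copy
        ByteBuffer sameRef = body;                 // e.g. two holders of the same buffer object
        ByteBuffer view = body.duplicate();        // same content, independent position

        sameRef.get();                             // advances the position of body as well
        bytes[2] = (byte) 'z';                     // array change is visible through every view

        return new String[] {
            StandardCharsets.US_ASCII.decode(body.duplicate()).toString(), // "bz"
            StandardCharsets.US_ASCII.decode(view.duplicate()).toString()  // "abz"
        };
    }
}
```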

Threading Considerations

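  • StreamEvent and StreamEventData expose no setters, so headers, body reference, and timestamp are fixed after construction; the ByteBuffer body itself is still mutable (position and contents), so concurrent readers should each read through getBody().duplicate()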
  • DecodeResult is explicitly marked as not thread-safe - use separate instances per thread
  • Header maps returned by getHeaders() are immutable collections
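One way to follow the per-thread rule is a `ThreadLocal` holder. `Holder` below is a hypothetical stand-in for `DecodeResult`, not the CDAP class. In thread pools, values outlive individual tasks, so call `clear()` when a worker no longer needs its holder.

```java
// Hypothetical stand-in for a mutable, non-thread-safe result such as DecodeResult.
final class Holder {
    String key;
    String value;
}

// One reusable holder per thread: reuse it freely within a thread, never share it across threads.
final class PerThreadHolder {
    private static final ThreadLocal<Holder> HOLDER = ThreadLocal.withInitial(Holder::new);

    private PerThreadHolder() {}

    static Holder get() {
        return HOLDER.get();
    }

    // Drops this thread's holder, e.g. before returning a pooled worker thread.
    static void clear() {
        HOLDER.remove();
    }
}
```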

Processing Efficiency

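  • Header lookup by key is O(1) on average when the header map is hash-based
  • Reading the ByteBuffer body works on the existing bytes without copying them (ByteBuffer.wrap creates a heap buffer backed by the original array, not an off-heap direct buffer)
  • Timestamp access is a simple field read (no computation)
  • The decoder's reusable DecodeResult parameter lets a processing loop avoid allocating a result object per event (the decoder itself may still allocate, e.g. when parsing the body)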

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-api-common
