CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-data-fabric

Core data management capabilities for CDAP including dataset operations, metadata management, lineage tracking, audit functionality, and data registry services for Hadoop-based applications.

Pending
Overview
Eval results
Files

docs/stream-processing.md

Stream Processing

Real-time stream processing capabilities with coordination, file management, partitioning, and multiple decoder support for various data formats. The stream processing system provides essential functionality for handling high-throughput real-time data ingestion, processing, and consumption within the CDAP platform.

Capabilities

Stream Administration

The primary interface for stream lifecycle management and configuration with comprehensive administrative operations.

public interface StreamAdmin {
    // Stream lifecycle operations

    /** Creates a stream with default properties. */
    void create(StreamId streamId) throws Exception;
    /** Creates a stream configured with the given property map. */
    void create(StreamId streamId, Map<String, String> properties) throws Exception;
    /** Drops (deletes) the stream. */
    void drop(StreamId streamId) throws Exception;
    /** Truncates the stream — removes stored events while keeping the stream itself. */
    void truncate(StreamId streamId) throws Exception;
    
    // Stream configuration management
    void updateConfig(StreamId streamId, StreamProperties properties) throws Exception;
    StreamProperties getConfig(StreamId streamId) throws Exception;
    
    // Stream metadata and queries
    StreamSpecification getSpecification(StreamId streamId) throws Exception;
    /** Lists the specifications of all streams in the given namespace. */
    List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception;
    boolean exists(StreamId streamId) throws Exception;
    
    // Administrative operations
    // NOTE(review): upgrade() takes no target — presumably upgrades internal storage/metadata
    // for all streams; confirm scope against the implementation.
    void upgrade() throws Exception;
    
    // Stream statistics and monitoring
    StreamStats getStats(StreamId streamId) throws Exception;
    /** Returns the stream size; units not shown here — likely bytes, verify against implementation. */
    long getSize(StreamId streamId) throws Exception;
}

Stream View Management

Stream view store interface for managing logical views over streams with comprehensive CRUD operations and view listing capabilities.

public interface ViewStore {
    /**
     * Creates a view or updates it if it already exists.
     * @param viewId the view identifier
     * @param config the view configuration specification
     * @return true if a new view was created, false if an existing one was updated
     */
    boolean createOrUpdate(StreamViewId viewId, ViewSpecification config);
    
    /**
     * Checks if a view exists.
     * @param viewId the view identifier
     * @return true if the view exists
     */
    boolean exists(StreamViewId viewId);
    
    /**
     * Deletes a stream view.
     * @param viewId the view identifier to delete
     * @throws NotFoundException if no view with the given identifier exists
     */
    void delete(StreamViewId viewId) throws NotFoundException;
    
    /**
     * Lists all views for a given stream.
     * @param streamId the stream identifier
     * @return list of view identifiers for the stream
     */
    List<StreamViewId> list(StreamId streamId);
    
    /**
     * Gets the details of a specific view.
     * @param viewId the view identifier
     * @return the view details including configuration and metadata
     * @throws NotFoundException if no view with the given identifier exists
     */
    ViewDetail get(StreamViewId viewId) throws NotFoundException;
}

Stream Coordination

Stream coordination client for managing distributed stream processing with service lifecycle management.

public interface StreamCoordinatorClient extends Service {
    // Stream coordination operations

    /** Creates a stream through the coordinator with the given properties. */
    void createStream(StreamId streamId, Map<String, String> properties) throws Exception;
    void deleteStream(StreamId streamId) throws Exception;
    
    // Stream configuration coordination
    void updateStreamProperties(StreamId streamId, StreamProperties properties) throws Exception;
    StreamProperties getStreamProperties(StreamId streamId) throws Exception;
    
    // Consumer coordination
    /** Registers a consumer group (identified by a numeric group id) on the stream. */
    void addConsumerGroup(StreamId streamId, long groupId) throws Exception;
    void removeConsumerGroup(StreamId streamId, long groupId) throws Exception;
    
    // Partition management
    int getPartitionCount(StreamId streamId) throws Exception;
    void setPartitionCount(StreamId streamId, int partitions) throws Exception;
}

Stream File Management

Stream file writers and management for persistent stream storage with partitioning and format support.

// Stream file writer factory
public interface StreamFileWriterFactory {
    /** Creates a writer for the given stream partition using the default file type. */
    StreamFileWriter create(StreamId streamId, int partition);
    /** Creates a writer for the given stream partition producing the specified file type. */
    StreamFileWriter create(StreamId streamId, int partition, StreamFileType fileType);
}

// Time-partitioned stream file writer for temporal data organization.
// NOTE: method bodies are omitted in this listing; only the public surface is documented.
public class TimePartitionedStreamFileWriter implements StreamFileWriter {
    // Time-based partitioning with configurable partition intervals
    public void append(StreamEvent event) throws IOException;
    public void flush() throws IOException;
    public void close() throws IOException;
    
    // Partition management
    // NOTE(review): presumably closes the current partition file and opens the next —
    // confirm rotation trigger (manual vs. time boundary) in the implementation.
    public void rotatePartition() throws IOException;
    public String getCurrentPartitionPath();
    /** Start timestamp of the partition currently being written; units not shown — likely epoch millis, verify. */
    public long getCurrentPartitionTimestamp();
}

// Stream file type enumeration
public enum StreamFileType {
    EVENT,      // Standard event files
    INDEX,      // Index files for efficient seeking
    META,       // Metadata files
    TEMP        // Temporary files during processing
}

Stream Input Processing

Stream input processing components for handling various data sources and input formats.

// Stream input split finder for distributed processing.
// NOTE: method bodies are omitted in this listing; only the public surface is documented.
public class StreamInputSplitFinder<T> {
    /** Finds splits covering events between startTime and endTime; time units not shown here — verify (likely epoch millis). */
    public List<StreamInputSplit> findSplits(StreamInputFormat<T> inputFormat, 
                                            StreamId streamId, 
                                            long startTime, 
                                            long endTime) throws IOException;
    
    /** Variant taking an explicit TimeRange and capping the number of splits produced. */
    public List<StreamInputSplit> findSplits(StreamInputFormat<T> inputFormat,
                                            StreamId streamId,
                                            TimeRange timeRange,
                                            int maxSplits) throws IOException;
}

// Stream service manager for lifecycle coordination
public class StreamServiceManager implements Service {
    // Service lifecycle management for stream processing
    @Override
    protected void doStart();
    
    @Override
    protected void doStop();
    
    // Stream service coordination: registry of per-stream services keyed by StreamId
    public void registerStreamService(StreamId streamId, StreamService service);
    public void unregisterStreamService(StreamId streamId);
    public StreamService getStreamService(StreamId streamId);
}

Stream Event Decoding

Multiple decoder implementations for various data formats and encoding schemes.

// Base stream event decoder interface
public interface StreamEventDecoder<T> {
    /** Decodes a single event body into {@code T} using the given charset. */
    T decode(StreamEvent event, Charset charset) throws Exception;
    /** Decodes a single event, additionally reporting the outcome through the supplied callback. */
    DecodeResult<T> decode(StreamEvent event, Charset charset, DecodeCallback<T> callback) throws Exception;
}

// String-based stream event decoder
public class StringStreamEventDecoder implements StreamEventDecoder<String> {
    @Override
    public String decode(StreamEvent event, Charset charset) throws Exception;
    
    // Decode with custom processing
    @Override
    public DecodeResult<String> decode(StreamEvent event, Charset charset, 
                                      DecodeCallback<String> callback) throws Exception;
}

// Text stream event decoder with line-based processing
public class TextStreamEventDecoder implements StreamEventDecoder<String> {
    // Line-by-line text processing with configurable delimiters
    public void setLineDelimiter(String delimiter);
    public void setMaxLineLength(int maxLength);
}

// Binary stream event decoder
public class BytesStreamEventDecoder implements StreamEventDecoder<byte[]> {
    // Raw binary data processing.
    // NOTE(review): charset is presumably ignored for raw bytes (usage examples pass null) — verify.
    @Override
    public byte[] decode(StreamEvent event, Charset charset) throws Exception;
    
    // Chunk-based binary processing
    public DecodeResult<byte[]> decodeChunked(StreamEvent event, int chunkSize) throws Exception;
}

// Identity decoder for pass-through processing
public class IdentityStreamEventDecoder implements StreamEventDecoder<StreamEvent> {
    // No transformation - returns the original stream event
    @Override
    public StreamEvent decode(StreamEvent event, Charset charset) throws Exception;
}

// Format-based decoder for structured data
public class FormatStreamEventDecoder<T> implements StreamEventDecoder<T> {
    // Configurable format-based decoding (JSON, Avro, etc.)
    public FormatStreamEventDecoder(RecordFormat<T> format);
    public FormatStreamEventDecoder(RecordFormat<T> format, Schema schema);
    
    @Override
    public T decode(StreamEvent event, Charset charset) throws Exception;
}

Stream Consumer Interface

Transaction-aware stream consumer with positioning and batch processing capabilities.

public interface StreamConsumer extends Closeable, TransactionAware {
    // Basic consumption operations

    /**
     * Blocks for up to the given timeout waiting for the next event.
     * The usage examples in this document treat a null return as "no event available".
     */
    DequeInputDatum poll(long timeout, TimeUnit unit) throws InterruptedException;
    /** Consumes up to maxEvents, delivering each through the callback. */
    void consume(int maxEvents, StreamConsumerCallback callback) throws InterruptedException;
    
    // Consumer positioning
    void seek(StreamEventOffset offset);
    StreamEventOffset getPosition();
    
    // Batch consumption with configuration
    ConsumeBatch consume(ConsumeConfig config) throws InterruptedException;
    
    // Consumer state management
    ConsumerState getConsumerState();
    void setConsumerState(ConsumerState state);
}

// Stream consumer factory
public interface StreamConsumerFactory {
    StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config);
    /** Variant that resumes consumption from a previously saved consumer state. */
    StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config, 
                         StreamConsumerState startState);
    
    // Consumer group management
    StreamConsumer createGroupConsumer(StreamId streamId, String namespace, 
                                      String groupId, ConsumerConfig config);
}

Usage Examples

Basic Stream Administration

// Access stream admin (typically injected)
StreamAdmin streamAdmin = // ... obtain instance

// Create a basic stream
StreamId streamId = NamespaceId.DEFAULT.stream("userEvents");
try {
    streamAdmin.create(streamId);
    System.out.println("Created stream: " + streamId.getStream());
} catch (Exception e) {
    // create(...) declares a broad "throws Exception", so narrower handling is not possible here.
    System.err.println("Failed to create stream: " + e.getMessage());
}

// Create stream with custom properties.
// NOTE(review): the property keys below are illustrative — confirm the exact key names
// against the StreamAdmin implementation before relying on them.
Map<String, String> properties = Map.of(
    "format.name", "avro",
    "schema.literal", "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[...]}",
    "retention.seconds", "604800", // 7 days
    "partition.duration", "3600"   // 1 hour partitions
);

StreamId configuredStream = NamespaceId.DEFAULT.stream("configuredEvents");
try {
    streamAdmin.create(configuredStream, properties);
    System.out.println("Created configured stream: " + configuredStream.getStream());
} catch (Exception e) {
    System.err.println("Failed to create configured stream: " + e.getMessage());
}

// Check if stream exists
boolean exists = streamAdmin.exists(streamId);
System.out.println("Stream exists: " + exists);

// Get stream configuration
try {
    StreamProperties config = streamAdmin.getConfig(streamId);
    System.out.println("Stream format: " + config.getFormat());
    System.out.println("Stream TTL: " + config.getTTL());
} catch (Exception e) {
    System.err.println("Failed to get stream config: " + e.getMessage());
}

Stream Configuration Management

// Update stream properties.
// TTL is expressed in seconds here (1209600 s = 14 days), matching the
// "TTL ... seconds" output in the listing loop below.
StreamProperties updatedProperties = StreamProperties.builder()
    .setTTL(1209600) // 14 days
    .setFormat(new FormatSpecification("json", null))
    .setNotificationThresholdMB(100)
    .build();

try {
    streamAdmin.updateConfig(streamId, updatedProperties);
    System.out.println("Updated stream configuration");
} catch (Exception e) {
    System.err.println("Failed to update stream config: " + e.getMessage());
}

// List all streams in namespace
try {
    List<StreamSpecification> streams = streamAdmin.listStreams(NamespaceId.DEFAULT);
    System.out.println("Streams in default namespace:");
    for (StreamSpecification stream : streams) {
        System.out.println("  - " + stream.getName());
        System.out.println("    Format: " + stream.getFormat());
        System.out.println("    TTL: " + stream.getTTL() + " seconds");
    }
} catch (Exception e) {
    System.err.println("Failed to list streams: " + e.getMessage());
}

// Get stream statistics.
// NOTE(review): the "last hour" label assumes getRecentEvents() covers a one-hour
// window — the API shown here does not define the window; verify.
try {
    StreamStats stats = streamAdmin.getStats(streamId);
    System.out.println("Stream statistics:");
    System.out.println("  Events: " + stats.getEvents());
    System.out.println("  Size: " + stats.getBytes() + " bytes");
    System.out.println("  Ingested in last hour: " + stats.getRecentEvents());
} catch (Exception e) {
    System.err.println("Failed to get stream stats: " + e.getMessage());
}

Stream Event Decoding

// String decoder for text-based events
StringStreamEventDecoder stringDecoder = new StringStreamEventDecoder();

// Process stream events as strings.
// convertToStreamEvent(...) is a helper assumed to exist in the surrounding code;
// it adapts a polled DequeInputDatum to the StreamEvent the decoders expect.
public void processTextEvents(StreamConsumer consumer) {
    try {
        DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
        if (event != null) {
            StreamEvent streamEvent = convertToStreamEvent(event);
            String decodedText = stringDecoder.decode(streamEvent, StandardCharsets.UTF_8);
            
            System.out.println("Decoded event: " + decodedText);
            processTextEvent(decodedText);
        }
    } catch (Exception e) {
        System.err.println("Failed to decode stream event: " + e.getMessage());
    }
}

// Binary decoder for raw data
BytesStreamEventDecoder binaryDecoder = new BytesStreamEventDecoder();

public void processBinaryEvents(StreamConsumer consumer) {
    try {
        DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
        if (event != null) {
            StreamEvent streamEvent = convertToStreamEvent(event);
            // null charset: assumes the bytes decoder ignores the charset argument — verify.
            byte[] binaryData = binaryDecoder.decode(streamEvent, null);
            
            System.out.println("Decoded binary data: " + binaryData.length + " bytes");
            processBinaryData(binaryData);
        }
    } catch (Exception e) {
        System.err.println("Failed to decode binary event: " + e.getMessage());
    }
}

// Format-based decoder for structured data
Schema avroSchema = // ... load Avro schema
RecordFormat<GenericRecord> avroFormat = new AvroRecordFormat<>(avroSchema);
FormatStreamEventDecoder<GenericRecord> avroDecoder = 
    new FormatStreamEventDecoder<>(avroFormat, avroSchema);

public void processAvroEvents(StreamConsumer consumer) {
    try {
        DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
        if (event != null) {
            StreamEvent streamEvent = convertToStreamEvent(event);
            GenericRecord record = avroDecoder.decode(streamEvent, StandardCharsets.UTF_8);
            
            System.out.println("Decoded Avro record: " + record);
            processAvroRecord(record);
        }
    } catch (Exception e) {
        System.err.println("Failed to decode Avro event: " + e.getMessage());
    }
}

Stream Consumer Usage

// Create stream consumer
StreamConsumerFactory consumerFactory = // ... obtain factory
ConsumerConfig config = ConsumerConfig.builder()
    .setDequeueTimeout(5000)  // 5 seconds
    .setMaxDequeueSize(100)   // Max 100 events per batch
    .build();

StreamConsumer consumer = consumerFactory.create(streamId, "default", config);

// Basic event consumption
try {
    DequeInputDatum event = consumer.poll(10, TimeUnit.SECONDS);
    if (event != null) {
        // Always specify a charset when converting bytes to text; relying on the
        // platform default charset is non-portable.
        System.out.println("Received event: " + new String(event.getData(), StandardCharsets.UTF_8));
        System.out.println("Event timestamp: " + event.getTimestamp());
        System.out.println("Event headers: " + event.getHeaders());
    } else {
        System.out.println("No events available");
    }
} catch (InterruptedException e) {
    // Restore the interrupt flag so surrounding code can observe the cancellation;
    // swallowing InterruptedException hides shutdown requests.
    Thread.currentThread().interrupt();
    System.out.println("Consumer polling interrupted");
}

// Batch event consumption with callback
try {
    consumer.consume(50, new StreamConsumerCallback() {
        @Override
        public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
            String eventData = new String(event.getData(), StandardCharsets.UTF_8);
            System.out.println("Processing event: " + eventData);
            
            // Process the event
            processEvent(eventData);
        }
        
        @Override
        public void onFinish() throws Exception {
            System.out.println("Batch processing completed");
        }
        
        @Override
        public void onError(Exception error) throws Exception {
            System.err.println("Error processing batch: " + error.getMessage());
            throw error;
        }
    });
} catch (InterruptedException e) {
    // Restore the interrupt flag — see note on the poll() catch above.
    Thread.currentThread().interrupt();
    System.out.println("Consumer batch processing interrupted");
}

// Consumer positioning
StreamEventOffset currentPosition = consumer.getPosition();
System.out.println("Current position: " + currentPosition);

// Seek to specific offset
StreamEventOffset seekOffset = new StreamEventOffset(0, 1000);
consumer.seek(seekOffset);
System.out.println("Seeked to offset: " + seekOffset);

Advanced Stream Processing Patterns

// Time-partitioned stream processing
public class TimePartitionedStreamProcessor {
    private final StreamAdmin streamAdmin;
    private final StreamConsumerFactory consumerFactory;
    private final TimePartitionedStreamFileWriter fileWriter;
    
    /**
     * @param streamAdmin     stream administration facade
     * @param consumerFactory factory used to open consumers over the processed stream
     * @param fileWriter      destination writer for time-partitioned output
     */
    public TimePartitionedStreamProcessor(StreamAdmin streamAdmin, 
                                        StreamConsumerFactory consumerFactory,
                                        TimePartitionedStreamFileWriter fileWriter) {
        this.streamAdmin = streamAdmin;
        this.consumerFactory = consumerFactory;
        this.fileWriter = fileWriter;
    }
    
    /**
     * Processes the stream's events between startTime and endTime in fixed one-hour
     * partitions. Times appear to be epoch millis, judging by the new Date(...) logging.
     */
    public void processTimePartitions(StreamId streamId, long startTime, long endTime) {
        // StreamConsumer extends Closeable; try-with-resources guarantees the consumer
        // is released even if partition processing fails (the previous version leaked it).
        try (StreamConsumer consumer = consumerFactory.create(streamId, "default", 
                ConsumerConfig.builder().build())) {
            
            // Seek to start time
            StreamEventOffset startOffset = findOffsetForTime(streamId, startTime);
            consumer.seek(startOffset);
            
            long currentTime = startTime;
            long partitionInterval = 3600000; // 1 hour partitions
            
            while (currentTime < endTime) {
                long partitionEnd = Math.min(currentTime + partitionInterval, endTime);
                
                System.out.println("Processing partition: " + 
                    new Date(currentTime) + " to " + new Date(partitionEnd));
                
                processTimePartition(consumer, currentTime, partitionEnd);
                
                currentTime = partitionEnd;
            }
            
        } catch (Exception e) {
            System.err.println("Failed to process time partitions: " + e.getMessage());
        }
    }
    
    // Drains up to 1000 events from the consumer, writing those whose timestamps fall
    // inside [startTime, endTime) to the time-partitioned file writer.
    private void processTimePartition(StreamConsumer consumer, long startTime, long endTime) 
            throws Exception {
        consumer.consume(1000, new StreamConsumerCallback() {
            @Override
            public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
                if (event.getTimestamp() >= startTime && event.getTimestamp() < endTime) {
                    // Write event to time-partitioned file
                    StreamEvent streamEvent = convertToStreamEvent(event);
                    fileWriter.append(streamEvent);
                }
            }
            
            @Override
            public void onFinish() throws Exception {
                // Flush once per partition so a crash loses at most the current batch.
                fileWriter.flush();
                System.out.println("Partition processing completed");
            }
            
            @Override
            public void onError(Exception error) throws Exception {
                System.err.println("Error in partition processing: " + error.getMessage());
                throw error;
            }
        });
    }
}

// Multi-consumer group processing
public class MultiConsumerGroupProcessor {
    private final StreamConsumerFactory consumerFactory;
    private final ExecutorService executorService;
    
    public MultiConsumerGroupProcessor(StreamConsumerFactory consumerFactory) {
        this.consumerFactory = consumerFactory;
        this.executorService = Executors.newFixedThreadPool(4);
    }
    
    /** Starts one background consumer loop per group; each group sees the full stream independently. */
    public void processWithMultipleGroups(StreamId streamId) {
        List<String> consumerGroups = Arrays.asList("analytics", "monitoring", "alerts", "archive");
        
        for (String groupId : consumerGroups) {
            executorService.submit(() -> {
                // StreamConsumer extends Closeable — try-with-resources closes it when
                // the loop exits or fails (the previous version leaked it).
                try (StreamConsumer consumer = consumerFactory.createGroupConsumer(
                        streamId, "default", groupId, ConsumerConfig.builder().build())) {
                    processStreamWithGroup(consumer, groupId);
                } catch (Exception e) {
                    System.err.println("Error in consumer group " + groupId + ": " + e.getMessage());
                }
            });
        }
    }
    
    /**
     * Interrupts the group loops and stops the executor. Without this the fixed
     * thread pool keeps non-daemon threads alive indefinitely.
     */
    public void shutdown() {
        executorService.shutdownNow();
    }
    
    // Poll-and-dispatch loop for one consumer group; runs until the worker thread is interrupted.
    private void processStreamWithGroup(StreamConsumer consumer, String groupId) {
        System.out.println("Starting processing for group: " + groupId);
        
        try {
            while (!Thread.currentThread().isInterrupted()) {
                DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
                if (event != null) {
                    processEventForGroup(event, groupId);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor observes the cancellation;
            // the previous version swallowed it.
            Thread.currentThread().interrupt();
            System.out.println("Consumer group " + groupId + " interrupted");
        } catch (Exception e) {
            System.err.println("Error in consumer group " + groupId + ": " + e.getMessage());
        }
    }
    
    // Group-specific event processing
    private void processEventForGroup(DequeInputDatum event, String groupId) {
        switch (groupId) {
            case "analytics":
                performAnalytics(event);
                break;
            case "monitoring":
                updateMonitoringMetrics(event);
                break;
            case "alerts":
                checkForAlerts(event);
                break;
            case "archive":
                archiveEvent(event);
                break;
            default:
                // Unknown group ids are ignored, matching the original fall-through behavior.
                break;
        }
    }
}

Stream Coordination and Management

// Stream coordination client usage
public class StreamCoordinationManager {
    private final StreamCoordinatorClient coordinatorClient;
    
    public StreamCoordinationManager(StreamCoordinatorClient coordinatorClient) {
        this.coordinatorClient = coordinatorClient;
    }
    
    /** Creates a coordinated stream, registers consumer groups 1..3, and reports the partition count. */
    public void setupDistributedStream(StreamId streamId, int partitionCount) {
        try {
            Map<String, String> streamProps = Map.of(
                "partition.count", String.valueOf(partitionCount),
                "replication.factor", "3"
            );
            coordinatorClient.createStream(streamId, streamProps);
            System.out.println("Created distributed stream: " + streamId);
            
            // Register the fixed set of consumer groups.
            for (int group = 1; group <= 3; group++) {
                coordinatorClient.addConsumerGroup(streamId, group);
                System.out.println("Added consumer group: " + group);
            }
            
            // Read back the partition count as a sanity check.
            int reported = coordinatorClient.getPartitionCount(streamId);
            System.out.println("Stream partition count: " + reported);
        } catch (Exception e) {
            System.err.println("Failed to setup distributed stream: " + e.getMessage());
        }
    }
    
    /** Changes the stream's partition count if it differs from the requested size. */
    public void scaleStream(StreamId streamId, int newPartitionCount) {
        try {
            int existing = coordinatorClient.getPartitionCount(streamId);
            if (existing == newPartitionCount) {
                return; // already at the requested size — nothing to do
            }
            coordinatorClient.setPartitionCount(streamId, newPartitionCount);
            System.out.println("Scaled stream from " + existing + 
                             " to " + newPartitionCount + " partitions");
        } catch (Exception e) {
            System.err.println("Failed to scale stream: " + e.getMessage());
        }
    }
    
    /** Removes consumer groups 1..3 and then deletes the stream itself. */
    public void cleanupStream(StreamId streamId) {
        try {
            int group = 1;
            while (group <= 3) {
                coordinatorClient.removeConsumerGroup(streamId, group);
                group++;
            }
            coordinatorClient.deleteStream(streamId);
            System.out.println("Cleaned up stream: " + streamId);
        } catch (Exception e) {
            System.err.println("Failed to cleanup stream: " + e.getMessage());
        }
    }
}

Types

// Core stream types
public final class StreamId extends EntityId {
    /** Static factory: builds a StreamId from a namespace name and a stream name. */
    public static StreamId of(String namespace, String stream);
    public String getStream();
    public NamespaceId getParent();
}

// Stream properties and configuration
public final class StreamProperties {
    public static Builder builder();
    
    /** Time-to-live; the usage examples in this document pass seconds (e.g. 1209600 = 14 days). */
    public long getTTL();
    public FormatSpecification getFormat();
    public int getNotificationThresholdMB();
    public Map<String, String> getProperties();
    
    public static class Builder {
        public Builder setTTL(long ttl);
        public Builder setFormat(FormatSpecification format);
        public Builder setNotificationThresholdMB(int threshold);
        public Builder setProperties(Map<String, String> properties);
        public StreamProperties build();
    }
}

// Stream specification and metadata
public final class StreamSpecification {
    public String getName();
    public FormatSpecification getFormat();  
    public long getTTL();
    public Map<String, String> getProperties();
}

// Stream statistics
public final class StreamStats {
    public long getEvents();
    public long getBytes();
    // NOTE(review): the time window covered by "recent" is not defined in this listing — verify.
    public long getRecentEvents();
    public long getLastEventTime();
}

// Stream event structures
public interface StreamEvent {
    ByteBuffer getBody();
    Map<String, String> getHeaders();
    long getTimestamp();
}

public final class StreamEventOffset {
    public long getGeneration();
    public long getOffset();
    
    /** @param generation stream generation; @param offset position within that generation */
    public StreamEventOffset(long generation, long offset);
}

// Consumer configuration and state
public final class ConsumerConfig {
    public static Builder builder();
    
    /** Dequeue timeout; the usage examples pass 5000 with a "5 seconds" note, i.e. milliseconds. */
    public int getDequeueTimeout();
    public int getMaxDequeueSize();
    public String getInstanceId();
    
    public static class Builder {
        public Builder setDequeueTimeout(int timeout);
        public Builder setMaxDequeueSize(int size);
        public Builder setInstanceId(String instanceId);
        public ConsumerConfig build();
    }
}

// Snapshot of a consumer's progress, used for save/restore via StreamConsumer state methods.
public interface ConsumerState {
    StreamEventOffset getOffset();
    Map<String, String> getState();
    long getTimestamp();
}

// Stream input processing types
public interface StreamInputSplit {
    StreamId getStreamId();
    long getStartTime();
    long getEndTime();
    int getPartition();
}

// Simple [startTime, endTime] value pair used by the split finder.
public final class TimeRange {
    public long getStartTime();
    public long getEndTime();
    
    public TimeRange(long startTime, long endTime);
}

// Decoder types
public interface DecodeCallback<T> {
    /** Invoked with the successfully decoded value. */
    void decoded(T decodedObject);
    /** Invoked when decoding fails. */
    void onError(Exception error);
}

// Result wrapper: holds either a decoded value or the decoding error.
public final class DecodeResult<T> {
    public T getResult();
    public boolean hasError();
    public Exception getError();
}

// Format specifications
public final class FormatSpecification {
    public String getName();
    public Schema getSchema();
    public Map<String, String> getSettings();
    
    public FormatSpecification(String name, Schema schema);
    public FormatSpecification(String name, Schema schema, Map<String, String> settings);
}

// Exception types
public class StreamException extends Exception {
    public StreamException(String message);
    /** Preserves the underlying cause for diagnosis. */
    public StreamException(String message, Throwable cause);
}

public class StreamNotFoundException extends StreamException {
    public StreamNotFoundException(StreamId streamId);
}

public class StreamAlreadyExistsException extends StreamException {
    public StreamAlreadyExistsException(StreamId streamId);
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-data-fabric

docs

audit-compliance.md

dataset-management.md

index.md

metadata-management.md

namespace-management.md

stream-processing.md

transaction-management.md

usage-registry.md

tile.json