Core data management capabilities for CDAP including dataset operations, metadata management, lineage tracking, audit functionality, and data registry services for Hadoop-based applications.
—
Real-time stream processing capabilities with coordination, file management, partitioning, and multiple decoder support for various data formats. The stream processing system provides essential functionality for handling high-throughput real-time data ingestion, processing, and consumption within the CDAP platform.
The primary interface for stream lifecycle management and configuration with comprehensive administrative operations.
/**
 * Administrative interface for stream lifecycle, configuration, metadata,
 * and monitoring operations.
 *
 * <p>All methods declare a broad {@code Exception}; callers are expected to
 * handle failures (missing stream, storage errors, etc.) themselves.
 */
public interface StreamAdmin {
// Stream lifecycle operations
/** Creates a stream with default properties. */
void create(StreamId streamId) throws Exception;
/** Creates a stream configured with the given properties (format, TTL, ...). */
void create(StreamId streamId, Map<String, String> properties) throws Exception;
/** Deletes the stream. */
void drop(StreamId streamId) throws Exception;
/** Removes the stream's events while keeping the stream itself. */
void truncate(StreamId streamId) throws Exception;
// Stream configuration management
/** Replaces the stream's configuration with the given properties. */
void updateConfig(StreamId streamId, StreamProperties properties) throws Exception;
/** Returns the stream's current configuration. */
StreamProperties getConfig(StreamId streamId) throws Exception;
// Stream metadata and queries
/** Returns the stream's specification (name, format, TTL, properties). */
StreamSpecification getSpecification(StreamId streamId) throws Exception;
/** Lists all streams in the given namespace. */
List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception;
/** Returns true if the stream exists. */
boolean exists(StreamId streamId) throws Exception;
// Administrative operations
/** Performs a system-wide upgrade of stream state; exact scope not shown here. */
void upgrade() throws Exception;
// Stream statistics and monitoring
/** Returns event/byte statistics for the stream. */
StreamStats getStats(StreamId streamId) throws Exception;
/** Returns the total size of the stream's stored data in bytes. */
long getSize(StreamId streamId) throws Exception;
}

Stream view store interface for managing logical views over streams with comprehensive CRUD operations and view listing capabilities.
/**
 * Store for logical views defined over streams: create/update, existence
 * checks, deletion, listing, and detail lookup.
 */
public interface ViewStore {
/**
 * Creates a view, or updates it if it already exists.
 * @param viewId the view identifier
 * @param config the view configuration specification
 * @return true if a new view was created, false if an existing view was updated
 */
boolean createOrUpdate(StreamViewId viewId, ViewSpecification config);
/**
 * Checks if a view exists.
 * @param viewId the view identifier
 * @return true if the view exists
 */
boolean exists(StreamViewId viewId);
/**
 * Deletes a stream view.
 * @param viewId the view identifier to delete
 * @throws NotFoundException if the view does not exist
 */
void delete(StreamViewId viewId) throws NotFoundException;
/**
 * Lists all views for a given stream.
 * @param streamId the stream identifier
 * @return list of view identifiers for the stream
 */
List<StreamViewId> list(StreamId streamId);
/**
 * Gets the details of a specific view.
 * @param viewId the view identifier
 * @return the view details including configuration and metadata
 * @throws NotFoundException if the view does not exist
 */
ViewDetail get(StreamViewId viewId) throws NotFoundException;
}

Stream coordination client for managing distributed stream processing with service lifecycle management.
/**
 * Client for coordinating stream state across distributed components.
 * Extends {@code Service}, so it participates in the standard service
 * start/stop lifecycle.
 */
public interface StreamCoordinatorClient extends Service {
// Stream coordination operations
/** Creates a stream with the given properties under coordination. */
void createStream(StreamId streamId, Map<String, String> properties) throws Exception;
/** Deletes the stream under coordination. */
void deleteStream(StreamId streamId) throws Exception;
// Stream configuration coordination
/** Propagates a configuration update for the stream. */
void updateStreamProperties(StreamId streamId, StreamProperties properties) throws Exception;
/** Returns the coordinated view of the stream's properties. */
StreamProperties getStreamProperties(StreamId streamId) throws Exception;
// Consumer coordination
/** Registers a consumer group id for the stream. */
void addConsumerGroup(StreamId streamId, long groupId) throws Exception;
/** Deregisters a consumer group id from the stream. */
void removeConsumerGroup(StreamId streamId, long groupId) throws Exception;
// Partition management
/** Returns the stream's current partition count. */
int getPartitionCount(StreamId streamId) throws Exception;
/** Sets the stream's partition count. */
void setPartitionCount(StreamId streamId, int partitions) throws Exception;
}

Stream file writers and management for persistent stream storage with partitioning and format support.
// Stream file writer factory
public interface StreamFileWriterFactory {
/** Creates a writer for the given stream partition using the default file type. */
StreamFileWriter create(StreamId streamId, int partition);
/** Creates a writer for the given stream partition and an explicit file type. */
StreamFileWriter create(StreamId streamId, int partition, StreamFileType fileType);
}
// Time-partitioned stream file writer for temporal data organization
public class TimePartitionedStreamFileWriter implements StreamFileWriter {
// Time-based partitioning with configurable partition intervals
/** Appends one event to the current time partition. */
public void append(StreamEvent event) throws IOException;
/** Flushes buffered events to the underlying storage. */
public void flush() throws IOException;
/** Closes the writer and releases file resources. */
public void close() throws IOException;
// Partition management
/** Rolls over to a new partition file. */
public void rotatePartition() throws IOException;
/** Returns the storage path of the partition currently being written. */
public String getCurrentPartitionPath();
/** Returns the timestamp associated with the current partition. */
public long getCurrentPartitionTimestamp();
}
// Stream file type enumeration
public enum StreamFileType {
EVENT, // Standard event files
INDEX, // Index files for efficient seeking
META, // Metadata files
TEMP // Temporary files during processing
}

Stream input processing components for handling various data sources and input formats.
// Stream input split finder for distributed processing
public class StreamInputSplitFinder<T> {
/** Finds input splits for the stream between startTime and endTime (assumed epoch millis — confirm). */
public List<StreamInputSplit> findSplits(StreamInputFormat<T> inputFormat,
StreamId streamId,
long startTime,
long endTime) throws IOException;
/** Finds at most maxSplits input splits for the stream within the given time range. */
public List<StreamInputSplit> findSplits(StreamInputFormat<T> inputFormat,
StreamId streamId,
TimeRange timeRange,
int maxSplits) throws IOException;
}
// Stream service manager for lifecycle coordination
public class StreamServiceManager implements Service {
// Service lifecycle management for stream processing
@Override
protected void doStart();
@Override
protected void doStop();
// Stream service coordination
/** Registers a per-stream service instance under the given stream id. */
public void registerStreamService(StreamId streamId, StreamService service);
/** Removes the service registration for the given stream id. */
public void unregisterStreamService(StreamId streamId);
/** Returns the registered service for the stream, if any. */
public StreamService getStreamService(StreamId streamId);
}

Multiple decoder implementations for various data formats and encoding schemes.
// Base stream event decoder interface
public interface StreamEventDecoder<T> {
/** Decodes one stream event into a value of type T using the given charset. */
T decode(StreamEvent event, Charset charset) throws Exception;
/** Decodes one stream event, reporting results/errors through the callback. */
DecodeResult<T> decode(StreamEvent event, Charset charset, DecodeCallback<T> callback) throws Exception;
}
// String-based stream event decoder
public class StringStreamEventDecoder implements StreamEventDecoder<String> {
/** Decodes the event body into a String using the given charset. */
@Override
public String decode(StreamEvent event, Charset charset) throws Exception;
// Decode with custom processing
@Override
public DecodeResult<String> decode(StreamEvent event, Charset charset,
DecodeCallback<String> callback) throws Exception;
}
// Text stream event decoder with line-based processing
public class TextStreamEventDecoder implements StreamEventDecoder<String> {
// Line-by-line text processing with configurable delimiters
/** Sets the delimiter used to split the event body into lines. */
public void setLineDelimiter(String delimiter);
/** Sets the maximum accepted line length; behavior on overflow not shown here. */
public void setMaxLineLength(int maxLength);
}
// Binary stream event decoder
public class BytesStreamEventDecoder implements StreamEventDecoder<byte[]> {
// Raw binary data processing; the charset parameter is presumably ignored — confirm
@Override
public byte[] decode(StreamEvent event, Charset charset) throws Exception;
// Chunk-based binary processing
public DecodeResult<byte[]> decodeChunked(StreamEvent event, int chunkSize) throws Exception;
}
// Identity decoder for pass-through processing
public class IdentityStreamEventDecoder implements StreamEventDecoder<StreamEvent> {
// No transformation - returns original stream event
@Override
public StreamEvent decode(StreamEvent event, Charset charset) throws Exception;
}
// Format-based decoder for structured data
public class FormatStreamEventDecoder<T> implements StreamEventDecoder<T> {
// Configurable format-based decoding (JSON, Avro, etc.)
/** Creates a decoder backed by the given record format. */
public FormatStreamEventDecoder(RecordFormat<T> format);
/** Creates a decoder backed by the given record format and explicit schema. */
public FormatStreamEventDecoder(RecordFormat<T> format, Schema schema);
@Override
public T decode(StreamEvent event, Charset charset) throws Exception;
}

Transaction-aware stream consumer with positioning and batch processing capabilities.
/**
 * Transaction-aware stream consumer with positioning and batch processing.
 * Implements {@code Closeable}; callers must close consumers when done.
 */
public interface StreamConsumer extends Closeable, TransactionAware {
// Basic consumption operations
/** Blocks up to the given timeout for the next event; returns null on timeout (per usage examples — confirm). */
DequeInputDatum poll(long timeout, TimeUnit unit) throws InterruptedException;
/** Consumes up to maxEvents, delivering each through the callback. */
void consume(int maxEvents, StreamConsumerCallback callback) throws InterruptedException;
// Consumer positioning
/** Repositions the consumer to the given offset. */
void seek(StreamEventOffset offset);
/** Returns the consumer's current offset. */
StreamEventOffset getPosition();
// Batch consumption with configuration
ConsumeBatch consume(ConsumeConfig config) throws InterruptedException;
// Consumer state management
ConsumerState getConsumerState();
void setConsumerState(ConsumerState state);
}
// Stream consumer factory
public interface StreamConsumerFactory {
/** Creates a consumer for the stream in the given namespace. */
StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config);
/** Creates a consumer starting from the given saved state. */
StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config,
StreamConsumerState startState);
// Consumer group management
/** Creates a consumer that participates in the named consumer group. */
StreamConsumer createGroupConsumer(StreamId streamId, String namespace,
String groupId, ConsumerConfig config);
}

// Access stream admin (typically injected)
// Illustrative snippet (not a complete compilation unit): basic StreamAdmin usage.
StreamAdmin streamAdmin = // ... obtain instance
// Create a basic stream
StreamId streamId = NamespaceId.DEFAULT.stream("userEvents");
try {
streamAdmin.create(streamId);
System.out.println("Created stream: " + streamId.getStream());
} catch (Exception e) {
System.err.println("Failed to create stream: " + e.getMessage());
}
// Create stream with custom properties
Map<String, String> properties = Map.of(
"format.name", "avro",
"schema.literal", "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[...]}",
"retention.seconds", "604800", // 7 days
"partition.duration", "3600" // 1 hour partitions
);
StreamId configuredStream = NamespaceId.DEFAULT.stream("configuredEvents");
try {
streamAdmin.create(configuredStream, properties);
System.out.println("Created configured stream: " + configuredStream.getStream());
} catch (Exception e) {
System.err.println("Failed to create configured stream: " + e.getMessage());
}
// Check if stream exists
// NOTE(review): exists() declares throws Exception; a real caller needs a try/catch here.
boolean exists = streamAdmin.exists(streamId);
System.out.println("Stream exists: " + exists);
// Get stream configuration
try {
StreamProperties config = streamAdmin.getConfig(streamId);
System.out.println("Stream format: " + config.getFormat());
System.out.println("Stream TTL: " + config.getTTL());
} catch (Exception e) {
System.err.println("Failed to get stream config: " + e.getMessage());
}
}

// Update stream properties
// Illustrative snippet: updating configuration, listing streams, and reading stats.
StreamProperties updatedProperties = StreamProperties.builder()
.setTTL(1209600) // 14 days
.setFormat(new FormatSpecification("json", null))
.setNotificationThresholdMB(100)
.build();
try {
streamAdmin.updateConfig(streamId, updatedProperties);
System.out.println("Updated stream configuration");
} catch (Exception e) {
System.err.println("Failed to update stream config: " + e.getMessage());
}
// List all streams in namespace
try {
List<StreamSpecification> streams = streamAdmin.listStreams(NamespaceId.DEFAULT);
System.out.println("Streams in default namespace:");
for (StreamSpecification stream : streams) {
System.out.println(" - " + stream.getName());
System.out.println(" Format: " + stream.getFormat());
System.out.println(" TTL: " + stream.getTTL() + " seconds");
}
} catch (Exception e) {
System.err.println("Failed to list streams: " + e.getMessage());
}
// Get stream statistics
try {
StreamStats stats = streamAdmin.getStats(streamId);
System.out.println("Stream statistics:");
System.out.println(" Events: " + stats.getEvents());
System.out.println(" Size: " + stats.getBytes() + " bytes");
System.out.println(" Ingested in last hour: " + stats.getRecentEvents());
} catch (Exception e) {
System.err.println("Failed to get stream stats: " + e.getMessage());
}
}

// String decoder for text-based events
// Illustrative snippet: decoding consumed events with the various decoders.
StringStreamEventDecoder stringDecoder = new StringStreamEventDecoder();
// Process stream events as strings
public void processTextEvents(StreamConsumer consumer) {
try {
DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
if (event != null) {
StreamEvent streamEvent = convertToStreamEvent(event);
String decodedText = stringDecoder.decode(streamEvent, StandardCharsets.UTF_8);
System.out.println("Decoded event: " + decodedText);
processTextEvent(decodedText);
}
} catch (Exception e) {
System.err.println("Failed to decode stream event: " + e.getMessage());
}
}
// Binary decoder for raw data
BytesStreamEventDecoder binaryDecoder = new BytesStreamEventDecoder();
public void processBinaryEvents(StreamConsumer consumer) {
try {
DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
if (event != null) {
StreamEvent streamEvent = convertToStreamEvent(event);
// NOTE(review): null charset is passed on the assumption the byte decoder
// ignores it — confirm BytesStreamEventDecoder tolerates null.
byte[] binaryData = binaryDecoder.decode(streamEvent, null);
System.out.println("Decoded binary data: " + binaryData.length + " bytes");
processBinaryData(binaryData);
}
} catch (Exception e) {
System.err.println("Failed to decode binary event: " + e.getMessage());
}
}
// Format-based decoder for structured data
Schema avroSchema = // ... load Avro schema
RecordFormat<GenericRecord> avroFormat = new AvroRecordFormat<>(avroSchema);
FormatStreamEventDecoder<GenericRecord> avroDecoder =
new FormatStreamEventDecoder<>(avroFormat, avroSchema);
public void processAvroEvents(StreamConsumer consumer) {
try {
DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
if (event != null) {
StreamEvent streamEvent = convertToStreamEvent(event);
GenericRecord record = avroDecoder.decode(streamEvent, StandardCharsets.UTF_8);
System.out.println("Decoded Avro record: " + record);
processAvroRecord(record);
}
} catch (Exception e) {
System.err.println("Failed to decode Avro event: " + e.getMessage());
}
}

// Create stream consumer
// Illustrative snippet: creating a consumer and consuming events.
StreamConsumerFactory consumerFactory = // ... obtain factory
ConsumerConfig config = ConsumerConfig.builder()
.setDequeueTimeout(5000) // 5 seconds
.setMaxDequeueSize(100) // Max 100 events per batch
.build();
StreamConsumer consumer = consumerFactory.create(streamId, "default", config);
// Basic event consumption
try {
DequeInputDatum event = consumer.poll(10, TimeUnit.SECONDS);
if (event != null) {
System.out.println("Received event: " + new String(event.getData()));
System.out.println("Event timestamp: " + event.getTimestamp());
System.out.println("Event headers: " + event.getHeaders());
} else {
System.out.println("No events available");
}
} catch (InterruptedException e) {
// Restore the interrupt flag: swallowing it would hide the interruption
// from enclosing loops and the thread's owner.
Thread.currentThread().interrupt();
System.out.println("Consumer polling interrupted");
}
// Batch event consumption with callback
try {
consumer.consume(50, new StreamConsumerCallback() {
@Override
public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
String eventData = new String(event.getData());
System.out.println("Processing event: " + eventData);
// Process the event
processEvent(eventData);
}
@Override
public void onFinish() throws Exception {
System.out.println("Batch processing completed");
}
@Override
public void onError(Exception error) throws Exception {
System.err.println("Error processing batch: " + error.getMessage());
throw error;
}
});
} catch (InterruptedException e) {
// Restore the interrupt flag before reporting.
Thread.currentThread().interrupt();
System.out.println("Consumer batch processing interrupted");
}
// Consumer positioning
StreamEventOffset currentPosition = consumer.getPosition();
System.out.println("Current position: " + currentPosition);
// Seek to specific offset
StreamEventOffset seekOffset = new StreamEventOffset(0, 1000);
consumer.seek(seekOffset);
System.out.println("Seeked to offset: " + seekOffset);

// Time-partitioned stream processing
/**
 * Consumes a stream over a time window and writes the events out in fixed
 * one-hour partitions via a {@code TimePartitionedStreamFileWriter}.
 */
public class TimePartitionedStreamProcessor {
private final StreamAdmin streamAdmin;
private final StreamConsumerFactory consumerFactory;
private final TimePartitionedStreamFileWriter fileWriter;
public TimePartitionedStreamProcessor(StreamAdmin streamAdmin,
StreamConsumerFactory consumerFactory,
TimePartitionedStreamFileWriter fileWriter) {
this.streamAdmin = streamAdmin;
this.consumerFactory = consumerFactory;
this.fileWriter = fileWriter;
}
/**
 * Processes events between startTime and endTime (epoch millis assumed —
 * confirm) in one-hour partitions.
 */
public void processTimePartitions(StreamId streamId, long startTime, long endTime) {
// StreamConsumer extends Closeable: use try-with-resources so the
// consumer is always released, even when a partition fails mid-way.
// (The previous version never closed the consumer.)
try (StreamConsumer consumer = consumerFactory.create(streamId, "default",
ConsumerConfig.builder().build())) {
// Seek to start time
StreamEventOffset startOffset = findOffsetForTime(streamId, startTime);
consumer.seek(startOffset);
long currentTime = startTime;
long partitionInterval = 3600000; // 1 hour partitions
while (currentTime < endTime) {
// Clamp the last partition to endTime.
long partitionEnd = Math.min(currentTime + partitionInterval, endTime);
System.out.println("Processing partition: " +
new Date(currentTime) + " to " + new Date(partitionEnd));
processTimePartition(consumer, currentTime, partitionEnd);
currentTime = partitionEnd;
}
} catch (Exception e) {
System.err.println("Failed to process time partitions: " + e.getMessage());
}
}
// Consumes up to 1000 events and appends those whose timestamps fall in
// [startTime, endTime) to the partitioned file writer.
private void processTimePartition(StreamConsumer consumer, long startTime, long endTime)
throws Exception {
consumer.consume(1000, new StreamConsumerCallback() {
@Override
public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
if (event.getTimestamp() >= startTime && event.getTimestamp() < endTime) {
// Write event to time-partitioned file
StreamEvent streamEvent = convertToStreamEvent(event);
fileWriter.append(streamEvent);
}
}
@Override
public void onFinish() throws Exception {
// Make the partition durable before moving on.
fileWriter.flush();
System.out.println("Partition processing completed");
}
@Override
public void onError(Exception error) throws Exception {
System.err.println("Error in partition processing: " + error.getMessage());
throw error;
}
});
}
}
// Multi-consumer group processing: runs one worker per consumer group on a
// fixed thread pool, each consuming the stream independently.
public class MultiConsumerGroupProcessor {
private final StreamConsumerFactory consumerFactory;
private final ExecutorService executorService;
public MultiConsumerGroupProcessor(StreamConsumerFactory consumerFactory) {
this.consumerFactory = consumerFactory;
this.executorService = Executors.newFixedThreadPool(4);
}
/** Starts one worker per known consumer group for the given stream. */
public void processWithMultipleGroups(StreamId streamId) {
List<String> consumerGroups = Arrays.asList("analytics", "monitoring", "alerts", "archive");
for (String groupId : consumerGroups) {
executorService.submit(() -> {
// StreamConsumer extends Closeable: close it when the worker exits.
// (The previous version leaked the consumer.)
try (StreamConsumer consumer = consumerFactory.createGroupConsumer(
streamId, "default", groupId, ConsumerConfig.builder().build())) {
processStreamWithGroup(consumer, groupId);
} catch (Exception e) {
System.err.println("Error in consumer group " + groupId + ": " + e.getMessage());
}
});
}
}
/**
 * Stops all workers. The executor was never shut down before, so the pool's
 * non-daemon threads kept the JVM alive; call this when processing ends.
 */
public void shutdown() {
executorService.shutdownNow();
}
// Polls the stream until the worker thread is interrupted.
private void processStreamWithGroup(StreamConsumer consumer, String groupId) {
System.out.println("Starting processing for group: " + groupId);
try {
while (!Thread.currentThread().isInterrupted()) {
DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
if (event != null) {
processEventForGroup(event, groupId);
}
}
} catch (InterruptedException e) {
// Restore the interrupt flag so the executor observes the interruption.
Thread.currentThread().interrupt();
System.out.println("Consumer group " + groupId + " interrupted");
} catch (Exception e) {
System.err.println("Error in consumer group " + groupId + ": " + e.getMessage());
}
}
// Group-specific event dispatch; events for unknown groups are ignored.
private void processEventForGroup(DequeInputDatum event, String groupId) {
switch (groupId) {
case "analytics":
performAnalytics(event);
break;
case "monitoring":
updateMonitoringMetrics(event);
break;
case "alerts":
checkForAlerts(event);
break;
case "archive":
archiveEvent(event);
break;
}
}
}

// Stream coordination client usage
// Manages distributed stream setup, scaling, and teardown through the
// coordination client. Errors are reported to stderr rather than rethrown.
public class StreamCoordinationManager {
private final StreamCoordinatorClient coordinatorClient;
public StreamCoordinationManager(StreamCoordinatorClient coordinatorClient) {
this.coordinatorClient = coordinatorClient;
}
// Creates a replicated stream, registers three consumer groups (ids 1..3),
// and reports the resulting partition count.
public void setupDistributedStream(StreamId streamId, int partitionCount) {
try {
Map<String, String> properties = Map.of(
"partition.count", String.valueOf(partitionCount),
"replication.factor", "3"
);
coordinatorClient.createStream(streamId, properties);
System.out.println("Created distributed stream: " + streamId);
int groupId = 1;
while (groupId <= 3) {
coordinatorClient.addConsumerGroup(streamId, groupId);
System.out.println("Added consumer group: " + groupId);
groupId++;
}
int actualPartitions = coordinatorClient.getPartitionCount(streamId);
System.out.println("Stream partition count: " + actualPartitions);
} catch (Exception e) {
System.err.println("Failed to setup distributed stream: " + e.getMessage());
}
}
// Repartitions the stream; a no-op when it is already at the requested size.
public void scaleStream(StreamId streamId, int newPartitionCount) {
try {
int currentPartitions = coordinatorClient.getPartitionCount(streamId);
if (newPartitionCount == currentPartitions) {
return; // already at the requested size
}
coordinatorClient.setPartitionCount(streamId, newPartitionCount);
System.out.println("Scaled stream from " + currentPartitions +
" to " + newPartitionCount + " partitions");
} catch (Exception e) {
System.err.println("Failed to scale stream: " + e.getMessage());
}
}
// Deregisters consumer groups 1..3, then deletes the stream itself.
public void cleanupStream(StreamId streamId) {
try {
int group = 1;
while (group <= 3) {
coordinatorClient.removeConsumerGroup(streamId, group);
group++;
}
coordinatorClient.deleteStream(streamId);
System.out.println("Cleaned up stream: " + streamId);
} catch (Exception e) {
System.err.println("Failed to cleanup stream: " + e.getMessage());
}
}
}

// Core stream types
/** Identifier for a stream: a (namespace, stream-name) pair. */
public final class StreamId extends EntityId {
/** Static factory; preferred over a constructor. */
public static StreamId of(String namespace, String stream);
/** Returns the stream name. */
public String getStream();
/** Returns the owning namespace id. */
public NamespaceId getParent();
}
// Stream properties and configuration
public final class StreamProperties {
public static Builder builder();
/** Time-to-live; units appear to be seconds per the usage examples — confirm. */
public long getTTL();
public FormatSpecification getFormat();
public int getNotificationThresholdMB();
public Map<String, String> getProperties();
/** Fluent builder for StreamProperties. */
public static class Builder {
public Builder setTTL(long ttl);
public Builder setFormat(FormatSpecification format);
public Builder setNotificationThresholdMB(int threshold);
public Builder setProperties(Map<String, String> properties);
public StreamProperties build();
}
}
// Stream specification and metadata
public final class StreamSpecification {
public String getName();
public FormatSpecification getFormat();
public long getTTL();
public Map<String, String> getProperties();
}
// Stream statistics
public final class StreamStats {
/** Total number of events. */
public long getEvents();
/** Total stored bytes. */
public long getBytes();
/** Recently ingested event count; window size not shown here. */
public long getRecentEvents();
public long getLastEventTime();
}
// Stream event structures
public interface StreamEvent {
/** Event payload. */
ByteBuffer getBody();
/** Event headers (key/value metadata). */
Map<String, String> getHeaders();
long getTimestamp();
}
/** Position within a stream, as a (generation, offset) pair. */
public final class StreamEventOffset {
public long getGeneration();
public long getOffset();
public StreamEventOffset(long generation, long offset);
}
// Consumer configuration and state
public final class ConsumerConfig {
public static Builder builder();
/** Dequeue timeout; units appear to be milliseconds per the usage examples — confirm. */
public int getDequeueTimeout();
/** Maximum events per dequeue batch. */
public int getMaxDequeueSize();
public String getInstanceId();
/** Fluent builder for ConsumerConfig. */
public static class Builder {
public Builder setDequeueTimeout(int timeout);
public Builder setMaxDequeueSize(int size);
public Builder setInstanceId(String instanceId);
public ConsumerConfig build();
}
}
/** Snapshot of a consumer's position and auxiliary state. */
public interface ConsumerState {
StreamEventOffset getOffset();
Map<String, String> getState();
long getTimestamp();
}
// Stream input processing types
/** One unit of distributed work: a stream partition over a time range. */
public interface StreamInputSplit {
StreamId getStreamId();
long getStartTime();
long getEndTime();
int getPartition();
}
/** Inclusive-start time interval; units not shown here (epoch millis assumed — confirm). */
public final class TimeRange {
public long getStartTime();
public long getEndTime();
public TimeRange(long startTime, long endTime);
}
// Decoder types
/** Receives decoded values or decode errors from callback-style decoding. */
public interface DecodeCallback<T> {
void decoded(T decodedObject);
void onError(Exception error);
}
/** Result wrapper: either a decoded value or an error. */
public final class DecodeResult<T> {
public T getResult();
public boolean hasError();
public Exception getError();
}
// Format specifications
/** Names a record format (e.g. "json", "avro") with optional schema and settings. */
public final class FormatSpecification {
public String getName();
public Schema getSchema();
public Map<String, String> getSettings();
public FormatSpecification(String name, Schema schema);
public FormatSpecification(String name, Schema schema, Map<String, String> settings);
}
// Exception types
/** Base checked exception for stream operations. */
public class StreamException extends Exception {
public StreamException(String message);
/** Preserves the underlying cause for diagnosis. */
public StreamException(String message, Throwable cause);
}
/** Thrown when an operation targets a stream that does not exist. */
public class StreamNotFoundException extends StreamException {
public StreamNotFoundException(StreamId streamId);
}
/** Thrown when creating a stream that already exists. */
public class StreamAlreadyExistsException extends StreamException {
public StreamAlreadyExistsException(StreamId streamId);
}
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-data-fabric