tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-11

Apache Flink connector for Google Cloud Pub/Sub that provides streaming data integration capabilities

Custom Deserialization

The Pub/Sub connector provides advanced deserialization capabilities that give access to full Pub/Sub message metadata including attributes, message ID, and publish time. This is essential for applications requiring message metadata or custom deserialization logic.

Capabilities

PubSubDeserializationSchema Interface

Core interface for custom deserialization with metadata access.

/**
 * Deserialization schema for PubsubMessage objects with metadata access
 * @param <T> Type of deserialized objects
 */
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    
    /**
     * Initialization method called before working methods
     * @param context Initialization context for metrics and configuration
     * @throws Exception If initialization fails
     */
    default void open(DeserializationSchema.InitializationContext context) throws Exception {}
    
    /**
     * Determine if element signals end of stream
     * @param nextElement Element to test for end-of-stream signal
     * @return True if element signals end of stream, false otherwise
     */
    boolean isEndOfStream(T nextElement);
    
    /**
     * Deserialize a PubsubMessage to the target type
     * @param message PubsubMessage to deserialize
     * @return Deserialized object (null if message cannot be deserialized)
     * @throws Exception If deserialization fails
     */
    T deserialize(PubsubMessage message) throws Exception;
    
    /**
     * Deserialize PubsubMessage using collector for multiple output records
     * @param message PubsubMessage to deserialize
     * @param out Collector for output records
     * @throws Exception If deserialization fails
     */
    default void deserialize(PubsubMessage message, Collector<T> out) throws Exception {
        T deserialized = deserialize(message);
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }
    
    /**
     * Get type information for produced elements
     * @return TypeInformation for type T
     */
    TypeInformation<T> getProducedType();
}

DeserializationSchemaWrapper

Adapter class that wraps a standard Flink DeserializationSchema for use with PubSubSource. You normally don't construct it directly: passing a plain DeserializationSchema (such as SimpleStringSchema) to the source builder wraps it automatically.

/**
 * Wrapper that adapts DeserializationSchema to PubSubDeserializationSchema
 * @param <T> Type of deserialized objects
 */
class DeserializationSchemaWrapper<T> implements PubSubDeserializationSchema<T> {
    
    /**
     * Constructor taking standard DeserializationSchema
     * @param deserializationSchema Standard Flink deserialization schema
     */
    DeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
    
    // Interface methods implemented to delegate to wrapped schema
}

Available Message Metadata

PubsubMessage provides access to comprehensive message metadata:

// From com.google.pubsub.v1.PubsubMessage
public class PubsubMessage {
    public ByteString getData();              // Message payload
    public Map<String, String> getAttributesMap(); // Message attributes
    public String getMessageId();            // Unique message identifier
    public Timestamp getPublishTime();       // When message was published
    // ... other metadata fields
}

Usage Examples

Basic Metadata Access

import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Map;

public class MessageWithMetadata {
    public String data;
    public String messageId;
    public long publishTimeSeconds;
    public Map<String, String> attributes;
    
    public MessageWithMetadata(String data, String messageId, long publishTime, Map<String, String> attributes) {
        this.data = data;
        this.messageId = messageId;
        this.publishTimeSeconds = publishTime;
        this.attributes = attributes;
    }
}

public class MetadataDeserializationSchema implements PubSubDeserializationSchema<MessageWithMetadata> {
    
    @Override
    public MessageWithMetadata deserialize(PubsubMessage message) throws Exception {
        String data = message.getData().toStringUtf8();
        String messageId = message.getMessageId();
        long publishTime = message.getPublishTime().getSeconds();
        Map<String, String> attributes = message.getAttributesMap();
        
        return new MessageWithMetadata(data, messageId, publishTime, attributes);
    }
    
    @Override
    public boolean isEndOfStream(MessageWithMetadata nextElement) {
        return false; // Never signals end of stream
    }
    
    @Override
    public TypeInformation<MessageWithMetadata> getProducedType() {
        return TypeInformation.of(MessageWithMetadata.class);
    }
}

// Usage
PubSubSource<MessageWithMetadata> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new MetadataDeserializationSchema())
    .withProjectName("my-gcp-project")
    .withSubscriptionName("my-subscription")
    .build();

Conditional Deserialization Based on Attributes

public class ConditionalDeserializationSchema implements PubSubDeserializationSchema<String> {
    
    @Override
    public String deserialize(PubsubMessage message) throws Exception {
        Map<String, String> attributes = message.getAttributesMap();
        
        // Only process messages with specific attribute
        if (!"IMPORTANT".equals(attributes.get("priority"))) {
            return null; // Skip this message
        }
        
        // Add attribute info to the data
        String data = message.getData().toStringUtf8();
        String source = attributes.getOrDefault("source", "unknown");
        
        return String.format("[%s] %s", source, data);
    }
    
    @Override
    public boolean isEndOfStream(String nextElement) {
        // End stream on special termination message
        return "TERMINATE".equals(nextElement);
    }
    
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

Multi-Record Output with Collector

import org.apache.flink.util.Collector;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;

public class BatchMessageDeserializationSchema implements PubSubDeserializationSchema<String> {
    // ObjectMapper is not Serializable, so create it in open() rather than at construction
    private transient ObjectMapper mapper;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        mapper = new ObjectMapper();
    }
    
    @Override
    public String deserialize(PubsubMessage message) throws Exception {
        // This method won't be called when using collector version
        throw new UnsupportedOperationException("Use collector version");
    }
    
    @Override
    public void deserialize(PubsubMessage message, Collector<String> out) throws Exception {
        String jsonData = message.getData().toStringUtf8();
        JsonNode rootNode = mapper.readTree(jsonData);
        
        // Handle batch messages - split array into individual records
        if (rootNode.isArray()) {
            for (JsonNode item : rootNode) {
                out.collect(item.toString());
            }
        } else {
            out.collect(jsonData);
        }
    }
    
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }
    
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

Complex Event Deserialization with Validation

public class ValidatedEventDeserializationSchema implements PubSubDeserializationSchema<Event> {
    // ObjectMapper is not Serializable, so create it in open() rather than at construction
    private transient ObjectMapper mapper;
    private transient Counter deserializationErrors;
    private transient Counter validationErrors;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        mapper = new ObjectMapper();
        // Register metrics for tracking deserialization and validation errors
        deserializationErrors = context.getMetricGroup().counter("deserialization_errors");
        validationErrors = context.getMetricGroup().counter("validation_errors");
    }
    
    @Override
    public Event deserialize(PubsubMessage message) throws Exception {
        try {
            // Parse JSON data
            String jsonData = message.getData().toStringUtf8();
            Event event = mapper.readValue(jsonData, Event.class);
            
            // Add metadata
            event.setMessageId(message.getMessageId());
            event.setPublishTime(message.getPublishTime().getSeconds());
            event.setAttributes(message.getAttributesMap());
            
            // Skip (but count) events that fail validation
            if (!isValidEvent(event)) {
                validationErrors.inc();
                return null;
            }
            
            return event;
            
        } catch (Exception e) {
            // Count the error but don't fail the source
            deserializationErrors.inc();
            return null; // Skip invalid messages
        }
    }
    
    private boolean isValidEvent(Event event) {
        return event.getEventType() != null && 
               event.getTimestamp() > 0 && 
               event.getUserId() != null;
    }
    
    @Override
    public boolean isEndOfStream(Event nextElement) {
        return nextElement != null && "SHUTDOWN".equals(nextElement.getEventType());
    }
    
    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}

Schema Evolution with Version Handling

public class VersionedDeserializationSchema implements PubSubDeserializationSchema<VersionedEvent> {
    // ObjectMapper is not Serializable, so create it in open() rather than at construction
    private transient ObjectMapper mapper;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        mapper = new ObjectMapper();
    }
    
    @Override
    public VersionedEvent deserialize(PubsubMessage message) throws Exception {
        Map<String, String> attributes = message.getAttributesMap();
        String version = attributes.getOrDefault("schema_version", "1.0");
        
        String jsonData = message.getData().toStringUtf8();
        
        switch (version) {
            case "1.0":
                return deserializeV1(jsonData);
            case "2.0":
                return deserializeV2(jsonData);
            default:
                throw new IllegalArgumentException("Unsupported schema version: " + version);
        }
    }
    
    private VersionedEvent deserializeV1(String json) throws Exception {
        EventV1 v1Event = mapper.readValue(json, EventV1.class);
        return new VersionedEvent(v1Event.name, v1Event.value, "default_category");
    }
    
    private VersionedEvent deserializeV2(String json) throws Exception {
        EventV2 v2Event = mapper.readValue(json, EventV2.class);
        return new VersionedEvent(v2Event.name, v2Event.value, v2Event.category);
    }
    
    @Override
    public boolean isEndOfStream(VersionedEvent nextElement) {
        return false;
    }
    
    @Override
    public TypeInformation<VersionedEvent> getProducedType() {
        return TypeInformation.of(VersionedEvent.class);
    }
}

Performance Considerations

Memory Management

  • Avoid storing large metadata objects in memory
  • Use streaming parsing for large JSON payloads
  • Consider object pooling for high-throughput scenarios
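For large array payloads, Jackson's streaming API walks tokens one at a time instead of materializing a full tree. A minimal sketch (using plain Jackson for clarity, whereas the examples above use the flink-shaded artifact; `StreamingJsonCounter` and `countRecords` are hypothetical names):

```java
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

public class StreamingJsonCounter {
    // Count top-level records in a JSON array payload without building a tree
    public static long countRecords(byte[] payload) throws Exception {
        JsonFactory factory = new JsonFactory();
        long count = 0;
        try (JsonParser parser = factory.createParser(payload)) {
            while (parser.nextToken() != null) {
                if (parser.currentToken() == JsonToken.START_OBJECT) {
                    count++;
                    parser.skipChildren(); // jump past nested fields without allocating nodes
                }
            }
        }
        return count;
    }
}
```

The same token-by-token pattern can feed `out.collect(...)` in the collector-based `deserialize` shown earlier, keeping memory flat regardless of batch size.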

Error Handling

  • Return null from deserialize() to skip invalid messages
  • Use metrics to track deserialization errors
  • Implement fallback deserialization strategies for schema evolution
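One way to implement a fallback strategy is to try the newest format first and fall back to the legacy decoder, counting failures as you go. A pure-Java sketch with a hypothetical "v2:"-prefixed format (inside a schema you would increment a Flink `Counter` registered in `open()` rather than an `AtomicLong`):

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

public class FallbackDecoder {
    private final AtomicLong errors = new AtomicLong();

    public String decode(byte[] payload) {
        try {
            return decodeV2(payload);     // try the current format first
        } catch (IllegalArgumentException e) {
            errors.incrementAndGet();     // track how often the fallback fires
            return decodeV1(payload);     // fall back to the legacy format
        }
    }

    // Hypothetical formats: v2 payloads start with "v2:", v1 is raw UTF-8
    private String decodeV2(byte[] p) {
        String s = new String(p, StandardCharsets.UTF_8);
        if (!s.startsWith("v2:")) throw new IllegalArgumentException("not v2");
        return s.substring(3);
    }

    private String decodeV1(byte[] p) {
        return new String(p, StandardCharsets.UTF_8);
    }

    public long errorCount() { return errors.get(); }
}
```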

Type Safety

  • Always provide accurate TypeInformation in getProducedType()
  • Use generic type parameters correctly to maintain type safety
  • Consider using Flink's TypeHint for complex generic types
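For generic result types, `TypeInformation.of(Class)` erases type parameters; an anonymous `TypeHint` subclass captures them at compile time. A sketch for a schema emitting `Tuple2<String, Long>` (class and method names here are illustrative):

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class TupleTypeInfoExample {
    // TypeInformation.of(Tuple2.class) would lose String/Long; TypeHint keeps them
    public static TypeInformation<Tuple2<String, Long>> producedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
    }
}
```

Return the same expression from `getProducedType()` so Flink picks an efficient tuple serializer instead of falling back to Kryo.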

Important Notes

  • Null Handling: Returning null from deserialize() will skip the message entirely
  • Exception Handling: Uncaught exceptions in deserialize() will cause the source to fail
  • Collector Usage: The single-record deserialize() is still required by the interface even when you rely on the collector variant; since the source invokes the collector version, it can simply throw UnsupportedOperationException
  • Metadata Access: Full PubsubMessage metadata is only available through PubSubDeserializationSchema, not standard DeserializationSchema
  • Performance: Metadata access has minimal performance overhead compared to standard deserialization

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-11
