Apache Flink connector for Google Cloud Pub/Sub that provides streaming data integration capabilities.

---
The Pub/Sub connector provides advanced deserialization capabilities that give access to full Pub/Sub message metadata including attributes, message ID, and publish time. This is essential for applications requiring message metadata or custom deserialization logic.
Core interface for custom deserialization with metadata access.
/**
* Deserialization schema for PubsubMessage objects with metadata access
* @param <T> Type of deserialized objects
*/
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
* Initialization method called before working methods
* @param context Initialization context for metrics and configuration
* @throws Exception If initialization fails
*/
default void open(DeserializationSchema.InitializationContext context) throws Exception {}
/**
* Determine if element signals end of stream
* @param nextElement Element to test for end-of-stream signal
* @return True if element signals end of stream, false otherwise
*/
boolean isEndOfStream(T nextElement);
/**
* Deserialize a PubsubMessage to the target type
* @param message PubsubMessage to deserialize
* @return Deserialized object (null if message cannot be deserialized)
* @throws Exception If deserialization fails
*/
T deserialize(PubsubMessage message) throws Exception;
/**
* Deserialize PubsubMessage using collector for multiple output records
* @param message PubsubMessage to deserialize
* @param out Collector for output records
* @throws Exception If deserialization fails
*/
default void deserialize(PubsubMessage message, Collector<T> out) throws Exception {
T deserialized = deserialize(message);
if (deserialized != null) {
out.collect(deserialized);
}
}
/**
* Get type information for produced elements
* @return TypeInformation for type T
*/
TypeInformation<T> getProducedType();
}Adapter class that wraps standard Flink DeserializationSchema for use with PubSubSource.
/**
* Wrapper that adapts DeserializationSchema to PubSubDeserializationSchema
* @param <T> Type of deserialized objects
*/
class DeserializationSchemaWrapper<T> implements PubSubDeserializationSchema<T> {
/**
* Constructor taking standard DeserializationSchema
* @param deserializationSchema Standard Flink deserialization schema
*/
DeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
// Interface methods implemented to delegate to wrapped schema
}PubsubMessage provides access to comprehensive message metadata:
// From com.google.pubsub.v1.PubsubMessage
public class PubsubMessage {
public ByteString getData(); // Message payload
public Map<String, String> getAttributesMap(); // Message attributes
public String getMessageId(); // Unique message identifier
public Timestamp getPublishTime(); // When message was published
// ... other metadata fields
}import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.google.pubsub.v1.PubsubMessage;
public class MessageWithMetadata {
public String data;
public String messageId;
public long publishTimeSeconds;
public Map<String, String> attributes;
public MessageWithMetadata(String data, String messageId, long publishTime, Map<String, String> attributes) {
this.data = data;
this.messageId = messageId;
this.publishTimeSeconds = publishTime;
this.attributes = attributes;
}
}
public class MetadataDeserializationSchema implements PubSubDeserializationSchema<MessageWithMetadata> {
@Override
public MessageWithMetadata deserialize(PubsubMessage message) throws Exception {
String data = message.getData().toStringUtf8();
String messageId = message.getMessageId();
long publishTime = message.getPublishTime().getSeconds();
Map<String, String> attributes = message.getAttributesMap();
return new MessageWithMetadata(data, messageId, publishTime, attributes);
}
@Override
public boolean isEndOfStream(MessageWithMetadata nextElement) {
return false; // Never signals end of stream
}
@Override
public TypeInformation<MessageWithMetadata> getProducedType() {
return TypeInformation.of(MessageWithMetadata.class);
}
}
// Usage
PubSubSource<MessageWithMetadata> source = PubSubSource.newBuilder()
.withDeserializationSchema(new MetadataDeserializationSchema())
.withProjectName("my-gcp-project")
.withSubscriptionName("my-subscription")
.build();public class ConditionalDeserializationSchema implements PubSubDeserializationSchema<String> {
@Override
public String deserialize(PubsubMessage message) throws Exception {
Map<String, String> attributes = message.getAttributesMap();
// Only process messages with specific attribute
if (!"IMPORTANT".equals(attributes.get("priority"))) {
return null; // Skip this message
}
// Add attribute info to the data
String data = message.getData().toStringUtf8();
String source = attributes.getOrDefault("source", "unknown");
return String.format("[%s] %s", source, data);
}
@Override
public boolean isEndOfStream(String nextElement) {
// End stream on special termination message
return "TERMINATE".equals(nextElement);
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}import org.apache.flink.util.Collector;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
public class BatchMessageDeserializationSchema implements PubSubDeserializationSchema<String> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public String deserialize(PubsubMessage message) throws Exception {
// This method won't be called when using collector version
throw new UnsupportedOperationException("Use collector version");
}
@Override
public void deserialize(PubsubMessage message, Collector<String> out) throws Exception {
String jsonData = message.getData().toStringUtf8();
JsonNode rootNode = mapper.readTree(jsonData);
// Handle batch messages - split array into individual records
if (rootNode.isArray()) {
for (JsonNode item : rootNode) {
out.collect(item.toString());
}
} else {
out.collect(jsonData);
}
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}public class ValidatedEventDeserializationSchema implements PubSubDeserializationSchema<Event> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
// Setup metrics for tracking deserialization errors
context.getMetricGroup().counter("deserialization_errors");
context.getMetricGroup().counter("validation_errors");
}
@Override
public Event deserialize(PubsubMessage message) throws Exception {
try {
// Parse JSON data
String jsonData = message.getData().toStringUtf8();
Event event = mapper.readValue(jsonData, Event.class);
// Add metadata
event.setMessageId(message.getMessageId());
event.setPublishTime(message.getPublishTime().getSeconds());
event.setAttributes(message.getAttributesMap());
// Validate event
if (!isValidEvent(event)) {
throw new IllegalArgumentException("Event validation failed");
}
return event;
} catch (Exception e) {
// Log error but don't fail processing
System.err.println("Failed to deserialize message: " + e.getMessage());
return null; // Skip invalid messages
}
}
private boolean isValidEvent(Event event) {
return event.getEventType() != null &&
event.getTimestamp() > 0 &&
event.getUserId() != null;
}
@Override
public boolean isEndOfStream(Event nextElement) {
return nextElement != null && "SHUTDOWN".equals(nextElement.getEventType());
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}public class VersionedDeserializationSchema implements PubSubDeserializationSchema<VersionedEvent> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public VersionedEvent deserialize(PubsubMessage message) throws Exception {
Map<String, String> attributes = message.getAttributesMap();
String version = attributes.getOrDefault("schema_version", "1.0");
String jsonData = message.getData().toStringUtf8();
switch (version) {
case "1.0":
return deserializeV1(jsonData);
case "2.0":
return deserializeV2(jsonData);
default:
throw new IllegalArgumentException("Unsupported schema version: " + version);
}
}
private VersionedEvent deserializeV1(String json) throws Exception {
EventV1 v1Event = mapper.readValue(json, EventV1.class);
return new VersionedEvent(v1Event.name, v1Event.value, "default_category");
}
private VersionedEvent deserializeV2(String json) throws Exception {
EventV2 v2Event = mapper.readValue(json, EventV2.class);
return new VersionedEvent(v2Event.name, v2Event.value, v2Event.category);
}
@Override
public boolean isEndOfStream(VersionedEvent nextElement) {
return false;
}
@Override
public TypeInformation<VersionedEvent> getProducedType() {
return TypeInformation.of(VersionedEvent.class);
}
}null from deserialize() to skip invalid messagesTypeInformation in getProducedType()TypeHint for complex generic typesnull from deserialize() will skip the message entirelydeserialize() will cause the source to faildeserialize() methodPubSubDeserializationSchema, not standard DeserializationSchemaInstall with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-11