tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources.
Message transforms allow you to modify or filter messages before they are delivered to subscribers or written to destinations. This document covers JavaScript UDF transforms and their configuration.
type MessageTransform struct {
Transform Transform
Disabled bool
}

A single instance of a message transformation to apply to messages.
Fields:
- Transform: The transformation function (currently only JavaScriptUDF is supported)
- Disabled: If true, the transform is disabled and will not be applied (defaults to false)

type Transform interface {
isTransform() bool
}

Interface for message transforms. Currently only JavaScriptUDF implements this interface.
type JavaScriptUDF struct {
FunctionName string
Code string
}
func (i JavaScriptUDF) isTransform() bool

User-defined JavaScript function that can transform or filter a Pub/Sub message.
Fields:
- FunctionName: Name of the JavaScript function to apply to messages
- Code: JavaScript code containing the function

The JavaScript function must have the following signature:
/**
* Transforms a Pub/Sub message.
*
* @param {Object} message - Pub/Sub message. Keys:
* - (required) 'data' : {string}
* - (required) 'attributes' : {Object<string, string>}
*
* @param {Object} metadata - Pub/Sub message metadata. Keys:
* - (required) 'message_id' : {string}
* - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format
* - (optional) 'ordering_key': {string}
*
* @return {Object|null} - To filter a message, return null. To transform
* a message, return a map with the following keys:
* - (required) 'data' : {string}
* - (optional) 'attributes' : {Object<string, string>}
* Returning empty attributes will remove all attributes from the message.
*/
function transformFunction(message, metadata) {
// Transform logic here
return {
data: transformedData,
attributes: transformedAttributes
};
}

Create a topic with a message transform:
topic, err := client.CreateTopicWithConfig(ctx, "transformed-topic", &pubsub.TopicConfig{
MessageTransforms: []pubsub.MessageTransform{
{
Transform: pubsub.JavaScriptUDF{
FunctionName: "addTimestamp",
Code: `
function addTimestamp(message, metadata) {
var data = JSON.parse(message.data);
data.timestamp = metadata.publish_time;
return {
data: JSON.stringify(data),
attributes: message.attributes
};
}
`,
},
},
},
})

Update the message transforms on an existing topic:
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
MessageTransforms: []pubsub.MessageTransform{
{
Transform: pubsub.JavaScriptUDF{
FunctionName: "enrichMessage",
Code: `
function enrichMessage(message, metadata) {
var attrs = message.attributes || {};
attrs.message_id = metadata.message_id;
attrs.publish_time = metadata.publish_time;
return {
data: message.data,
attributes: attrs
};
}
`,
},
},
},
})

Create a subscription with a message transform:
sub, err := client.CreateSubscription(ctx, "transformed-sub", pubsub.SubscriptionConfig{
Topic: topic,
MessageTransforms: []pubsub.MessageTransform{
{
Transform: pubsub.JavaScriptUDF{
FunctionName: "filterHighPriority",
Code: `
function filterHighPriority(message, metadata) {
var attrs = message.attributes || {};
if (attrs.priority === "high") {
return {
data: message.data,
attributes: attrs
};
}
return null; // Filter out non-high priority
}
`,
},
},
},
})

Update the message transforms on an existing subscription:
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
MessageTransforms: []pubsub.MessageTransform{
{
Transform: pubsub.JavaScriptUDF{
FunctionName: "maskSensitiveData",
Code: `
function maskSensitiveData(message, metadata) {
var data = JSON.parse(message.data);
if (data.ssn) {
data.ssn = "***-**-" + data.ssn.slice(-4);
}
return {
data: JSON.stringify(data),
attributes: message.attributes
};
}
`,
},
},
},
})

Add metadata to messages:
enrichTransform := pubsub.JavaScriptUDF{
FunctionName: "enrichWithMetadata",
Code: `
function enrichWithMetadata(message, metadata) {
var data = JSON.parse(message.data);
data.enriched = {
message_id: metadata.message_id,
publish_time: metadata.publish_time,
ordering_key: metadata.ordering_key
};
return {
data: JSON.stringify(data),
attributes: message.attributes
};
}
`,
}

Filter messages based on attributes:
filterTransform := pubsub.JavaScriptUDF{
FunctionName: "filterByRegion",
Code: `
function filterByRegion(message, metadata) {
var attrs = message.attributes || {};
var allowedRegions = ["us-east1", "us-west1"];
if (allowedRegions.indexOf(attrs.region) !== -1) {
return {
data: message.data,
attributes: attrs
};
}
return null; // Filter out messages from other regions
}
`,
}

Transform message payload:
transformData := pubsub.JavaScriptUDF{
FunctionName: "convertToUppercase",
Code: `
function convertToUppercase(message, metadata) {
var data = JSON.parse(message.data);
if (data.text) {
data.text = data.text.toUpperCase();
}
return {
data: JSON.stringify(data),
attributes: message.attributes
};
}
`,
}

Modify message attributes:
attributeTransform := pubsub.JavaScriptUDF{
FunctionName: "normalizeAttributes",
Code: `
function normalizeAttributes(message, metadata) {
var attrs = message.attributes || {};
// Normalize attribute keys to lowercase
var normalizedAttrs = {};
for (var key in attrs) {
normalizedAttrs[key.toLowerCase()] = attrs[key];
}
return {
data: message.data,
attributes: normalizedAttrs
};
}
`,
}

Mask sensitive information:
maskTransform := pubsub.JavaScriptUDF{
FunctionName: "maskPII",
Code: `
function maskPII(message, metadata) {
var data = JSON.parse(message.data);
// Mask email addresses
if (data.email) {
var parts = data.email.split('@');
if (parts.length === 2) {
data.email = parts[0].substring(0, 2) + "***@" + parts[1];
}
}
// Mask phone numbers
if (data.phone) {
data.phone = "***-***-" + data.phone.slice(-4);
}
return {
data: JSON.stringify(data),
attributes: message.attributes
};
}
`,
}

Convert between data formats:
formatTransform := pubsub.JavaScriptUDF{
FunctionName: "jsonToCsv",
Code: `
function jsonToCsv(message, metadata) {
var data = JSON.parse(message.data);
// Convert JSON to CSV row
var csvRow = [
data.id || "",
data.name || "",
data.value || ""
].join(",");
return {
data: csvRow,
attributes: message.attributes
};
}
`,
}

Add routing attributes based on content:
routingTransform := pubsub.JavaScriptUDF{
FunctionName: "addRoutingKey",
Code: `
function addRoutingKey(message, metadata) {
var data = JSON.parse(message.data);
var attrs = message.attributes || {};
// Add routing key based on priority
if (data.priority >= 90) {
attrs.route = "critical";
} else if (data.priority >= 50) {
attrs.route = "normal";
} else {
attrs.route = "low";
}
return {
data: message.data,
attributes: attrs
};
}
`,
}

Apply multiple transforms in sequence:
topic, err := client.CreateTopicWithConfig(ctx, "multi-transform-topic", &pubsub.TopicConfig{
MessageTransforms: []pubsub.MessageTransform{
// First: Filter out low priority messages
{
Transform: pubsub.JavaScriptUDF{
FunctionName: "filterLowPriority",
Code: `
function filterLowPriority(message, metadata) {
var attrs = message.attributes || {};
if (attrs.priority === "low") {
return null;
}
return {
data: message.data,
attributes: attrs
};
}
`,
},
},
// Second: Enrich remaining messages
{
Transform: pubsub.JavaScriptUDF{
FunctionName: "enrichMessage",
Code: `
function enrichMessage(message, metadata) {
var data = JSON.parse(message.data);
data.processed_at = new Date().toISOString();
data.message_id = metadata.message_id;
return {
data: JSON.stringify(data),
attributes: message.attributes
};
}
`,
},
},
},
})

Note: Each JavaScript UDF must have a unique FunctionName within the same resource.
Temporarily disable a transform without removing it:
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
MessageTransforms: []pubsub.MessageTransform{
{
Transform: pubsub.JavaScriptUDF{
FunctionName: "myTransform",
Code: existingCode,
},
Disabled: true, // Disable the transform
},
},
})

Remove all transforms from a topic or subscription:
// Remove transforms from topic
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
MessageTransforms: []pubsub.MessageTransform{}, // Empty slice removes all
})
// Remove transforms from subscription
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
MessageTransforms: []pubsub.MessageTransform{}, // Empty slice removes all
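})

When both the topic and the subscription have transforms, the topic's transforms run first (in list order), followed by the subscription's transforms (in list order):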
// Topic transform
topic, err := client.CreateTopicWithConfig(ctx, "topic", &pubsub.TopicConfig{
MessageTransforms: []pubsub.MessageTransform{
{Transform: topicTransform1},
{Transform: topicTransform2},
},
})
// Subscription transform
sub, err := client.CreateSubscription(ctx, "sub", pubsub.SubscriptionConfig{
Topic: topic,
MessageTransforms: []pubsub.MessageTransform{
{Transform: subTransform1},
{Transform: subTransform2},
},
})
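// Execution order: topicTransform1 -> topicTransform2 -> subTransform1 -> subTransform2

If a transform fails (throws an error), the message delivery will fail and be retried according to the subscription's retry policy. To avoid this, catch errors inside the function and return the original message, as in the following example: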
errorHandlingTransform := pubsub.JavaScriptUDF{
FunctionName: "safeTransform",
Code: `
function safeTransform(message, metadata) {
try {
var data = JSON.parse(message.data);
// Transform logic
return {
data: JSON.stringify(data),
attributes: message.attributes
};
} catch (e) {
// Log error in attributes
var attrs = message.attributes || {};
attrs.transform_error = e.toString();
return {
data: message.data,
attributes: attrs
};
}
}
`,
}