Describes
golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1


tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources


Message Transforms

Message transforms allow you to modify or filter messages before they are delivered to subscribers or written to destinations. This document covers JavaScript UDF transforms and their configuration.

MessageTransform

type MessageTransform struct {
    Transform Transform
    Disabled  bool
}

A single instance of a message transformation to apply to messages.

Fields:

  • Transform: The transformation to apply (currently only JavaScriptUDF is supported)
  • Disabled: If true, the transform is disabled and will not be applied (defaults to false)

Transform Interface

type Transform interface {
    isTransform() bool
}

Interface for message transforms. Currently only JavaScriptUDF implements this interface.

JavaScript UDF

JavaScriptUDF

type JavaScriptUDF struct {
    FunctionName string
    Code         string
}

func (i JavaScriptUDF) isTransform() bool

User-defined JavaScript function that can transform or filter a Pub/Sub message.

Fields:

  • FunctionName: Name of the JavaScript function to apply to messages
  • Code: JavaScript code containing the function

JavaScript Function Signature

The JavaScript function must have the following signature:

/**
 * Transforms a Pub/Sub message.
 *
 * @param  {Object} message - Pub/Sub message. Keys:
 *   - (required) 'data' : {string}
 *   - (required) 'attributes' : {Object<string, string>}
 *
 * @param  {Object} metadata - Pub/Sub message metadata. Keys:
 *   - (required) 'message_id'  : {string}
 *   - (optional) 'publish_time': {string} YYYY-MM-DDTHH:MM:SSZ format
 *   - (optional) 'ordering_key': {string}
 *
 * @return {Object|null} - To filter a message, return null. To transform
 *   a message, return a map with the following keys:
 *   - (required) 'data' : {string}
 *   - (optional) 'attributes' : {Object<string, string>}
 *   Returning empty attributes will remove all attributes from the message.
 */
function transformFunction(message, metadata) {
    // Transform logic here
    return {
        data: transformedData,
        attributes: transformedAttributes
    };
}
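As a rough illustration of the input and output shapes described above, the following sketch calls a UDF directly in plain JavaScript. The function name `tagSource`, the `source` attribute, and the sample values are hypothetical, and running the function locally (for example in Node.js) only approximates the Pub/Sub UDF runtime.

```javascript
// Hypothetical UDF: copies the attributes and adds a "source" attribute.
function tagSource(message, metadata) {
    var attrs = {};
    for (var key in message.attributes) {
        attrs[key] = message.attributes[key];
    }
    attrs.source = "orders-service"; // illustrative value
    return { data: message.data, attributes: attrs };
}

// Sample inputs shaped like the documented signature.
var sampleMessage = { data: '{"id": 1}', attributes: { region: "us-east1" } };
var sampleMetadata = { message_id: "123", publish_time: "2024-01-01T00:00:00Z" };

var result = tagSource(sampleMessage, sampleMetadata);
console.log(result.data, result.attributes);
```

Copying the attributes into a new object, rather than modifying `message.attributes` in place, keeps the input untouched, which makes the function easier to reason about when testing.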

Configuring Transforms on Topics

Create Topic with Transform

// Assumes ctx (context.Context) and client (*pubsub.Client) already exist.
topic, err := client.CreateTopicWithConfig(ctx, "transformed-topic", &pubsub.TopicConfig{
    MessageTransforms: []pubsub.MessageTransform{
        {
            Transform: pubsub.JavaScriptUDF{
                FunctionName: "addTimestamp",
                Code: `
                    function addTimestamp(message, metadata) {
                        var data = JSON.parse(message.data);
                        data.timestamp = metadata.publish_time;
                        return {
                            data: JSON.stringify(data),
                            attributes: message.attributes
                        };
                    }
                `,
            },
        },
    },
})
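Because `publish_time` is listed as optional in the function signature, `addTimestamp` may receive it as `undefined`. The sketch below runs the same function locally (in plain JavaScript, which only approximates the UDF runtime) to show that `JSON.stringify` then omits the `timestamp` key instead of writing a null. The sample inputs are made up.

```javascript
// Same function body as the topic transform above.
function addTimestamp(message, metadata) {
    var data = JSON.parse(message.data);
    data.timestamp = metadata.publish_time;
    return {
        data: JSON.stringify(data),
        attributes: message.attributes
    };
}

var msg = { data: '{"id":1}', attributes: {} };

// With publish_time present, the field is added to the payload.
var withTime = addTimestamp(msg, { message_id: "1", publish_time: "2024-01-01T00:00:00Z" });

// Without publish_time, JSON.stringify drops the undefined property.
var withoutTime = addTimestamp(msg, { message_id: "1" });

console.log(withTime.data);
console.log(withoutTime.data);
```

Consumers that require a `timestamp` field would therefore need either a fallback value in the transform or a check on the subscriber side.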

Update Topic with Transform

config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
    MessageTransforms: []pubsub.MessageTransform{
        {
            Transform: pubsub.JavaScriptUDF{
                FunctionName: "enrichMessage",
                Code: `
                    function enrichMessage(message, metadata) {
                        var attrs = message.attributes || {};
                        attrs.message_id = metadata.message_id;
                        // publish_time is optional; attribute values must be strings
                        if (metadata.publish_time) {
                            attrs.publish_time = metadata.publish_time;
                        }
                        return {
                            data: message.data,
                            attributes: attrs
                        };
                    }
                `,
            },
        },
    },
})

Configuring Transforms on Subscriptions

Create Subscription with Transform

sub, err := client.CreateSubscription(ctx, "transformed-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    MessageTransforms: []pubsub.MessageTransform{
        {
            Transform: pubsub.JavaScriptUDF{
                FunctionName: "filterHighPriority",
                Code: `
                    function filterHighPriority(message, metadata) {
                        var attrs = message.attributes || {};
                        if (attrs.priority === "high") {
                            return {
                                data: message.data,
                                attributes: attrs
                            };
                        }
                        return null; // Filter out non-high priority
                    }
                `,
            },
        },
    },
})

Update Subscription with Transform

config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    MessageTransforms: []pubsub.MessageTransform{
        {
            Transform: pubsub.JavaScriptUDF{
                FunctionName: "maskSensitiveData",
                Code: `
                    function maskSensitiveData(message, metadata) {
                        var data = JSON.parse(message.data);
                        if (data.ssn) {
                            data.ssn = "***-**-" + data.ssn.slice(-4);
                        }
                        return {
                            data: JSON.stringify(data),
                            attributes: message.attributes
                        };
                    }
                `,
            },
        },
    },
})

Transform Examples

Data Enrichment

Add metadata to messages:

enrichTransform := pubsub.JavaScriptUDF{
    FunctionName: "enrichWithMetadata",
    Code: `
        function enrichWithMetadata(message, metadata) {
            var data = JSON.parse(message.data);
            data.enriched = {
                message_id: metadata.message_id,
                publish_time: metadata.publish_time,
                ordering_key: metadata.ordering_key
            };
            return {
                data: JSON.stringify(data),
                attributes: message.attributes
            };
        }
    `,
}

Message Filtering

Filter messages based on attributes:

filterTransform := pubsub.JavaScriptUDF{
    FunctionName: "filterByRegion",
    Code: `
        function filterByRegion(message, metadata) {
            var attrs = message.attributes || {};
            var allowedRegions = ["us-east1", "us-west1"];

            if (allowedRegions.indexOf(attrs.region) !== -1) {
                return {
                    data: message.data,
                    attributes: attrs
                };
            }

            return null; // Filter out messages from other regions
        }
    `,
}
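To see the filtering behavior on concrete inputs, the function can be exercised locally. The harness below is a hypothetical sketch in plain JavaScript: `applyFilter` is not part of any Pub/Sub API, it only models the rule that returning null drops the message.

```javascript
function filterByRegion(message, metadata) {
    var attrs = message.attributes || {};
    var allowedRegions = ["us-east1", "us-west1"];
    if (allowedRegions.indexOf(attrs.region) !== -1) {
        return { data: message.data, attributes: attrs };
    }
    return null;
}

// Hypothetical local harness: keeps only the messages the UDF does not filter out.
function applyFilter(udf, messages) {
    var kept = [];
    for (var i = 0; i < messages.length; i++) {
        var out = udf(messages[i], { message_id: String(i) });
        if (out !== null) {
            kept.push(out);
        }
    }
    return kept;
}

var kept = applyFilter(filterByRegion, [
    { data: "a", attributes: { region: "us-east1" } },
    { data: "b", attributes: { region: "europe-west1" } },
    { data: "c", attributes: {} } // no region attribute at all
]);
console.log(kept.length);
```

Note that a message with no `region` attribute is dropped as well, since `indexOf(undefined)` never matches an allowed region.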

Data Transformation

Transform message payload:

transformData := pubsub.JavaScriptUDF{
    FunctionName: "convertToUppercase",
    Code: `
        function convertToUppercase(message, metadata) {
            var data = JSON.parse(message.data);
            if (data.text) {
                data.text = data.text.toUpperCase();
            }
            return {
                data: JSON.stringify(data),
                attributes: message.attributes
            };
        }
    `,
}

Attribute Manipulation

Modify message attributes:

attributeTransform := pubsub.JavaScriptUDF{
    FunctionName: "normalizeAttributes",
    Code: `
        function normalizeAttributes(message, metadata) {
            var attrs = message.attributes || {};

            // Normalize attribute keys to lowercase
            var normalizedAttrs = {};
            for (var key in attrs) {
                normalizedAttrs[key.toLowerCase()] = attrs[key];
            }

            return {
                data: message.data,
                attributes: normalizedAttrs
            };
        }
    `,
}
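One edge case in lowercasing keys is that two attributes differing only in case (say `Region` and `region`) collapse into one, and in `normalizeAttributes` the value seen last silently wins. The variant below is a sketch, with the hypothetical name `normalizeKeepFirst`, that keeps the first value instead so the outcome does not depend on later duplicates.

```javascript
// Hypothetical variant: lowercases keys and keeps the first value on a collision.
function normalizeKeepFirst(message, metadata) {
    var attrs = message.attributes || {};
    var normalized = {};
    for (var key in attrs) {
        var lower = key.toLowerCase();
        if (!Object.prototype.hasOwnProperty.call(normalized, lower)) {
            normalized[lower] = attrs[key];
        }
    }
    return { data: message.data, attributes: normalized };
}

var normalizedOut = normalizeKeepFirst(
    { data: "x", attributes: { Region: "us-east1", region: "us-west1", ENV: "prod" } },
    { message_id: "1" }
);
console.log(normalizedOut.attributes);
```

Whether the first or the last value should win is a design decision; the point is to make the choice explicit rather than accidental.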

Data Masking

Mask sensitive information:

maskTransform := pubsub.JavaScriptUDF{
    FunctionName: "maskPII",
    Code: `
        function maskPII(message, metadata) {
            var data = JSON.parse(message.data);

            // Mask email addresses
            if (data.email) {
                var parts = data.email.split('@');
                if (parts.length === 2) {
                    data.email = parts[0].substring(0, 2) + "***@" + parts[1];
                }
            }

            // Mask phone numbers
            if (data.phone) {
                data.phone = "***-***-" + data.phone.slice(-4);
            }

            return {
                data: JSON.stringify(data),
                attributes: message.attributes
            };
        }
    `,
}
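The masking code above calls `slice` on `data.phone`, which throws if the JSON payload carries the phone number as a number rather than a string. A minimal sketch of a more defensive helper follows; the name `maskTail` and the placeholder strings are assumptions for illustration.

```javascript
// Hypothetical helper: converts the value to a string, then keeps only
// the last `visible` characters behind a fixed mask.
function maskTail(value, visible) {
    var text = String(value);
    if (text.length <= visible) {
        return "****"; // too short to reveal any part safely
    }
    return "***-***-" + text.slice(-visible);
}

console.log(maskTail(5551234567, 4));
console.log(maskTail("555-123-4567", 4));
console.log(maskTail("12", 4));
```

Inside a UDF this could replace the direct `data.phone.slice(-4)` call, so that numeric and very short values are masked instead of causing the transform to fail or leaking the whole value.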

Format Conversion

Convert between data formats:

formatTransform := pubsub.JavaScriptUDF{
    FunctionName: "jsonToCsv",
    Code: `
        function jsonToCsv(message, metadata) {
            var data = JSON.parse(message.data);

            // Convert JSON to CSV row
            var csvRow = [
                data.id || "",
                data.name || "",
                data.value || ""
            ].join(",");

            return {
                data: csvRow,
                attributes: message.attributes
            };
        }
    `,
}
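Two details of `jsonToCsv` are worth checking before relying on it: the `||` fallback turns a legitimate `0` into an empty field, and a value containing a comma or quote would break the row. The sketch below shows one way to handle both with the usual CSV quoting convention; `csvField` and `toCsvRow` are hypothetical helper names.

```javascript
// Quotes a field when it contains a comma, double quote, or line break,
// doubling any embedded double quotes. Only undefined and null become empty.
function csvField(value) {
    if (value === undefined || value === null) {
        return "";
    }
    var text = String(value);
    if (/[",\r\n]/.test(text)) {
        return '"' + text.replace(/"/g, '""') + '"';
    }
    return text;
}

// Joins already-escaped fields into a single CSV row.
function toCsvRow(fields) {
    var cells = [];
    for (var i = 0; i < fields.length; i++) {
        cells.push(csvField(fields[i]));
    }
    return cells.join(",");
}

console.log(toCsvRow([7, "Smith, Jane", 0]));
```

Within the UDF, `toCsvRow([data.id, data.name, data.value])` could stand in for the array-and-join expression.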

Conditional Routing

Add routing attributes based on content:

routingTransform := pubsub.JavaScriptUDF{
    FunctionName: "addRoutingKey",
    Code: `
        function addRoutingKey(message, metadata) {
            var data = JSON.parse(message.data);
            var attrs = message.attributes || {};

            // Add routing key based on priority
            if (data.priority >= 90) {
                attrs.route = "critical";
            } else if (data.priority >= 50) {
                attrs.route = "normal";
            } else {
                attrs.route = "low";
            }

            return {
                data: message.data,
                attributes: attrs
            };
        }
    `,
}
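The thresholds in `addRoutingKey` are easiest to verify at their boundaries. The sketch below pulls the decision into a hypothetical helper, `routeFor`, and additionally treats missing or non-numeric priorities as "low", which is an assumption about the desired behavior rather than something the original function spells out.

```javascript
// Hypothetical helper: maps a priority value to a route name.
function routeFor(priority) {
    var value = Number(priority);
    if (isNaN(value)) {
        return "low"; // missing or non-numeric priorities go to the lowest tier
    }
    if (value >= 90) {
        return "critical";
    }
    if (value >= 50) {
        return "normal";
    }
    return "low";
}

var boundaries = [90, 89, 50, 49, undefined, "95"];
for (var i = 0; i < boundaries.length; i++) {
    console.log(boundaries[i], routeFor(boundaries[i]));
}
```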

Multiple Transforms

Apply multiple transforms in sequence:

topic, err := client.CreateTopicWithConfig(ctx, "multi-transform-topic", &pubsub.TopicConfig{
    MessageTransforms: []pubsub.MessageTransform{
        // First: Filter out low priority messages
        {
            Transform: pubsub.JavaScriptUDF{
                FunctionName: "filterLowPriority",
                Code: `
                    function filterLowPriority(message, metadata) {
                        var attrs = message.attributes || {};
                        if (attrs.priority === "low") {
                            return null;
                        }
                        return {
                            data: message.data,
                            attributes: attrs
                        };
                    }
                `,
            },
        },
        // Second: Enrich remaining messages
        {
            Transform: pubsub.JavaScriptUDF{
                FunctionName: "enrichMessage",
                Code: `
                    function enrichMessage(message, metadata) {
                        var data = JSON.parse(message.data);
                        data.processed_at = new Date().toISOString();
                        data.message_id = metadata.message_id;
                        return {
                            data: JSON.stringify(data),
                            attributes: message.attributes
                        };
                    }
                `,
            },
        },
    },
})

Note: Each JavaScript UDF must have a unique FunctionName within the same resource.
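The uniqueness rule can be checked before a configuration is submitted. The following is a sketch of such a check on a plain list of descriptors; the `functionName` property and the helper name `duplicateFunctionNames` are hypothetical and only mirror the `FunctionName` field.

```javascript
// Hypothetical pre-deployment check: returns each function name that
// appears more than once, listed a single time.
function duplicateFunctionNames(transforms) {
    var seen = Object.create(null); // no inherited keys such as "constructor"
    var duplicates = [];
    for (var i = 0; i < transforms.length; i++) {
        var name = transforms[i].functionName;
        if (seen[name] === 1) {
            duplicates.push(name); // report on the second occurrence only
        }
        seen[name] = (seen[name] || 0) + 1;
    }
    return duplicates;
}

var dupes = duplicateFunctionNames([
    { functionName: "filterLowPriority" },
    { functionName: "enrichMessage" },
    { functionName: "enrichMessage" }
]);
console.log(dupes);
```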

Disabling Transforms

Temporarily disable a transform without removing it:

config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
    MessageTransforms: []pubsub.MessageTransform{
        {
            Transform: pubsub.JavaScriptUDF{
                FunctionName: "myTransform",
                Code:         existingCode,
            },
            Disabled: true, // Disable the transform
        },
    },
})

Removing Transforms

Remove all transforms from a topic or subscription:

// Remove transforms from a topic
topicCfg, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
    MessageTransforms: []pubsub.MessageTransform{}, // Empty, non-nil slice removes all
})
if err != nil {
    log.Fatalf("topic.Update: %v", err)
}

// Remove transforms from a subscription
subCfg, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    MessageTransforms: []pubsub.MessageTransform{}, // Empty, non-nil slice removes all
})
if err != nil {
    log.Fatalf("sub.Update: %v", err)
}

Transform Execution Order

When both topic and subscription have transforms:

  1. Topic transforms are applied first (in order)
  2. Subscription transforms are applied second (in order)
  3. If any transform returns null, the message is filtered out

// Topic transforms
topic, err := client.CreateTopicWithConfig(ctx, "topic", &pubsub.TopicConfig{
    MessageTransforms: []pubsub.MessageTransform{
        {Transform: topicTransform1},
        {Transform: topicTransform2},
    },
})

// Subscription transform
sub, err := client.CreateSubscription(ctx, "sub", pubsub.SubscriptionConfig{
    Topic: topic,
    MessageTransforms: []pubsub.MessageTransform{
        {Transform: subTransform1},
        {Transform: subTransform2},
    },
})

// Execution order: topicTransform1 -> topicTransform2 -> subTransform1 -> subTransform2
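The ordering rules can be modeled locally to reason about how filters and transforms interact. The sketch below is a hypothetical simulation in plain JavaScript, not the service's implementation; it assumes that each transform receives the output of the previous one, and `runChain`, `dropLowPriority`, and `addStage` are illustrative names.

```javascript
// Hypothetical model: run each function in turn, feed the previous result
// forward, and stop as soon as one returns null.
function runChain(transforms, message, metadata) {
    var current = message;
    for (var i = 0; i < transforms.length; i++) {
        current = transforms[i](current, metadata);
        if (current === null) {
            return null; // message filtered out; later transforms never run
        }
    }
    return current;
}

// Illustrative stand-ins for a topic transform and a subscription transform.
function dropLowPriority(message, metadata) {
    var attrs = message.attributes || {};
    return attrs.priority === "low" ? null : message;
}
function addStage(message, metadata) {
    return { data: message.data + "|sub", attributes: message.attributes };
}

var chain = [dropLowPriority, addStage]; // topic transforms first, then subscription transforms
var delivered = runChain(chain, { data: "a", attributes: { priority: "high" } }, { message_id: "1" });
var dropped = runChain(chain, { data: "b", attributes: { priority: "low" } }, { message_id: "2" });
console.log(delivered, dropped);
```

Placing filters early in the list, as here, means later transforms do no work for messages that will be dropped anyway.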

Error Handling

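If a transform throws an error, the message is not delivered. For a subscription transform, delivery fails and is retried according to the subscription's retry policy; for a topic transform, the publish request fails and the error is returned to the publisher. Catching errors inside the function lets the message pass through unchanged, with the error recorded in an attribute: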

errorHandlingTransform := pubsub.JavaScriptUDF{
    FunctionName: "safeTransform",
    Code: `
        function safeTransform(message, metadata) {
            try {
                var data = JSON.parse(message.data);
                // Transform logic
                return {
                    data: JSON.stringify(data),
                    attributes: message.attributes
                };
            } catch (e) {
                // Log error in attributes
                var attrs = message.attributes || {};
                attrs.transform_error = e.toString();
                return {
                    data: message.data,
                    attributes: attrs
                };
            }
        }
    `,
}
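The fallback path can be checked locally by feeding the function a payload that is not valid JSON. This sketch reuses `safeTransform` as written above and runs it in plain JavaScript; the exact error text depends on the JavaScript engine, so only its presence is relied on here.

```javascript
function safeTransform(message, metadata) {
    try {
        var data = JSON.parse(message.data);
        return { data: JSON.stringify(data), attributes: message.attributes };
    } catch (e) {
        // Record the failure and pass the original payload through.
        var attrs = message.attributes || {};
        attrs.transform_error = e.toString();
        return { data: message.data, attributes: attrs };
    }
}

var ok = safeTransform({ data: '{"id":1}', attributes: {} }, { message_id: "1" });
var bad = safeTransform({ data: "not json", attributes: {} }, { message_id: "2" });

console.log(ok.attributes.transform_error);  // undefined for valid input
console.log(bad.attributes.transform_error); // engine-specific SyntaxError text
```

Subscribers can then route or inspect messages carrying a `transform_error` attribute instead of losing them to repeated delivery failures.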

Best Practices

  1. Keep Functions Simple: Transforms should be fast and simple; complex logic belongs in subscribers
  2. Error Handling: Include try-catch blocks to handle malformed data
  3. Null for Filtering: Return null to filter out unwanted messages
  4. Unique Names: Each transform must have a unique function name within the resource
  5. Testing: Test transforms thoroughly before production use
  6. Idempotency: Ensure transforms are idempotent for retry scenarios
  7. Performance: Minimize transform complexity to avoid latency
  8. Validation: Validate input data before transformation
  9. Ordering: Be mindful of transform order when using multiple transforms
  10. Documentation: Document transform logic and purpose clearly
  11. Monitoring: Monitor for transform failures and filtered messages
  12. Security: Avoid sensitive data in transform code (it's stored as plain text)
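One way to read the idempotency advice is that a redelivered message should transform to the same result each time. The sketch below compares two runs on identical copies of the input; `isRepeatable` and both sample transforms are hypothetical names used for illustration.

```javascript
// Hypothetical check: runs the UDF twice on fresh copies of the same input
// and reports whether the two results are identical.
function isRepeatable(udf, message, metadata) {
    var first = udf(JSON.parse(JSON.stringify(message)), metadata);
    var second = udf(JSON.parse(JSON.stringify(message)), metadata);
    return JSON.stringify(first) === JSON.stringify(second);
}

// Depends only on its input, so every run gives the same output.
function upperCaseData(message, metadata) {
    return { data: message.data.toUpperCase(), attributes: message.attributes };
}

// Depends on state outside the message, so repeated runs differ.
var calls = 0;
function countCalls(message, metadata) {
    calls += 1;
    return { data: message.data + "#" + calls, attributes: message.attributes };
}

var sample = { data: "hello", attributes: { k: "v" } };
console.log(isRepeatable(upperCaseData, sample, { message_id: "1" }));
console.log(isRepeatable(countCalls, sample, { message_id: "1" }));
```

A check like this will not reliably catch time-based values such as `new Date()`, because two runs can land in the same millisecond; those are better found by reviewing the code.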

Limitations

  • Only JavaScript UDF transforms are currently supported
  • Function code is limited in size
  • No external dependencies or libraries can be imported
  • Limited JavaScript runtime environment
  • Transforms run in a sandbox with restricted capabilities
  • Each transform execution has a timeout limit