or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus@v1.10.0

docs

admin

index.md queues.md rules.md subscriptions.md topics.md
client.md errors.md index.md messaging.md receiver.md sender.md sessions.md
tile.json

tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus

tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1

Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.

rules.mddocs/admin/

Rule Management

Rule management operations in the admin client enable creating, updating, and managing filters and actions for topic subscriptions. Rules determine which messages are delivered to a subscription and can optionally transform message properties.

Capabilities

Create Rule

func (ac *Client) CreateRule(ctx context.Context, topicName, subscriptionName string, options *CreateRuleOptions) (CreateRuleResponse, error)

Creates a rule that filters and optionally modifies messages for a subscription.

Example with SQL filter:

resp, err := adminClient.CreateRule(
    context.Background(),
    "events",
    "high-priority-subscription",
    &admin.CreateRuleOptions{
        Name: to.Ptr("HighPriorityFilter"),
        Filter: &admin.SQLFilter{
            Expression: "priority > @threshold AND category = @cat",
            Parameters: map[string]any{
                "threshold": 5,
                "cat":       "important",
            },
        },
    },
)
if err != nil {
    panic(err)
}

fmt.Printf("Created rule: %s\n", resp.Name)

Example with correlation filter:

// Create a rule that matches messages by exact property values.
resp, err := adminClient.CreateRule(
    context.Background(),
    "events",
    "user-events-subscription",
    &admin.CreateRuleOptions{
        Name: to.Ptr("UserEventFilter"),
        Filter: &admin.CorrelationFilter{
            Subject:       to.Ptr("user.action"),
            CorrelationID: to.Ptr("user-123"),
            ApplicationProperties: map[string]any{
                "eventType": "click",
            },
        },
    },
)
// Always check the error: without this the snippet neither compiles (unused
// variables) nor reports a failed rule creation.
if err != nil {
    panic(err)
}

fmt.Printf("Created rule: %s\n", resp.Name)

Get Rule

func (ac *Client) GetRule(ctx context.Context, topicName, subscriptionName, ruleName string, options *GetRuleOptions) (*GetRuleResponse, error)

Gets a rule for a subscription.

Example:

resp, err := adminClient.GetRule(
    context.Background(),
    "events",
    "high-priority-subscription",
    "HighPriorityFilter",
    nil,
)
if err != nil {
    panic(err)
}
// GetRule returns a pointer to its response. Guard against a nil response
// (the rule was not found) before dereferencing it.
if resp == nil {
    panic("rule HighPriorityFilter not found")
}

fmt.Printf("Rule: %s\n", resp.Name)

// Check filter type
switch filter := resp.Filter.(type) {
case *admin.SQLFilter:
    fmt.Printf("SQL Filter: %s\n", filter.Expression)
case *admin.CorrelationFilter:
    // Subject is a *string: dereference it so the value is printed rather
    // than the pointer address.
    subject := "<nil>"
    if filter.Subject != nil {
        subject = *filter.Subject
    }
    fmt.Printf("Correlation Filter - Subject: %s\n", subject)
case *admin.TrueFilter:
    fmt.Println("True Filter (matches all messages)")
case *admin.FalseFilter:
    fmt.Println("False Filter (matches no messages)")
case *admin.UnknownRuleFilter:
    // Filter type this SDK version does not model; only its type name and raw XML are available.
    fmt.Printf("Unknown Filter type: %s\n", filter.Type)
}

Update Rule

func (ac *Client) UpdateRule(ctx context.Context, topicName, subscriptionName string, properties RuleProperties) (UpdateRuleResponse, error)

Updates an existing rule's filter or action.

Example:

// Get current rule
ruleResp, err := adminClient.GetRule(
    context.Background(),
    "events",
    "high-priority-subscription",
    "HighPriorityFilter",
    nil,
)
if err != nil {
    panic(err)
}
// GetRule returns a pointer to its response. Guard against a nil response
// (the rule was not found) before reading its properties.
if ruleResp == nil {
    panic("rule HighPriorityFilter not found")
}

// Update the filter, keeping the rule's name and action unchanged
props := ruleResp.RuleProperties
props.Filter = &admin.SQLFilter{
    Expression: "priority > 7", // Increased threshold
}

// Apply update
updateResp, err := adminClient.UpdateRule(
    context.Background(),
    "events",
    "high-priority-subscription",
    props,
)
if err != nil {
    panic(err)
}

fmt.Printf("Updated rule: %s\n", updateResp.Name)

Delete Rule

func (ac *Client) DeleteRule(ctx context.Context, topicName, subscriptionName, ruleName string, options *DeleteRuleOptions) (DeleteRuleResponse, error)

Deletes a rule from a subscription.

Example:

_, err := adminClient.DeleteRule(
    context.Background(),
    "events",
    "high-priority-subscription",
    "HighPriorityFilter",
    nil,
)
if err != nil {
    panic(err)
}

fmt.Println("Rule deleted successfully")

List Rules

func (ac *Client) NewListRulesPager(topicName, subscriptionName string, options *ListRulesOptions) *runtime.Pager[ListRulesResponse]

Creates a pager for listing all rules for a subscription.

Example:

// Page through every rule of the subscription, at most 20 rules per page.
pager := adminClient.NewListRulesPager("events", "high-priority-subscription", &admin.ListRulesOptions{
    MaxPageSize: 20,
})

for pager.More() {
    page, err := pager.NextPage(context.Background())
    if err != nil {
        panic(err)
    }

    for _, rule := range page.Rules {
        fmt.Printf("Rule: %s\n", rule.Name)

        switch filter := rule.Filter.(type) {
        case *admin.SQLFilter:
            fmt.Printf("  SQL: %s\n", filter.Expression)
        case *admin.CorrelationFilter:
            // Subject is a *string: dereference it so the value is printed
            // rather than the pointer address.
            subject := "<nil>"
            if filter.Subject != nil {
                subject = *filter.Subject
            }
            fmt.Printf("  Correlation - Subject: %s\n", subject)
        }

        // A type assertion on a nil interface simply fails, so no separate
        // nil check on rule.Action is needed.
        if action, ok := rule.Action.(*admin.SQLAction); ok {
            fmt.Printf("  Action: %s\n", action.Expression)
        }
    }
}

Filter Types

SQLFilter

type SQLFilter struct {
    Expression string
    Parameters map[string]any
}

SQL expression filter that evaluates to true for messages matching the WHERE clause. The expression uses a SQL-92-based syntax (a subset of the standard) and can reference both system properties (prefixed with sys.) and custom application properties.

  • Expression: SQL WHERE clause (e.g., "priority > 5", "category = 'important'")
  • Parameters: Parameter values (string, number, or boolean) referenced in the expression with @name syntax

Example:

filter := &admin.SQLFilter{
    Expression: "sys.Label = @category AND amount > @minAmount",
    Parameters: map[string]any{
        "category":  "order",
        "minAmount": 100.0,
    },
}

Supported message properties:

  • sys.Label - Message.Subject
  • sys.CorrelationId - Message.CorrelationID
  • sys.MessageId - Message.MessageID
  • sys.To - Message.To
  • sys.ReplyTo - Message.ReplyTo
  • sys.SessionId - Message.SessionID
  • Custom properties from ApplicationProperties

CorrelationFilter

type CorrelationFilter struct {
    ApplicationProperties map[string]any
    ContentType *string
    CorrelationID *string
    MessageID *string
    ReplyTo *string
    ReplyToSessionID *string
    SessionID *string
    Subject *string
    To *string
}

Property-based correlation filter that matches messages where all specified properties match. More efficient than SQL filters for simple property matching.

  • ApplicationProperties: Custom application properties to match
  • ContentType: Match message ContentType
  • CorrelationID: Match message CorrelationID
  • MessageID: Match message MessageID
  • ReplyTo: Match message ReplyTo
  • ReplyToSessionID: Match message ReplyToSessionID
  • SessionID: Match message SessionID
  • Subject: Match message Subject
  • To: Match message To

Example:

filter := &admin.CorrelationFilter{
    Subject:       to.Ptr("order.created"),
    CorrelationID: to.Ptr("correlation-123"),
    ApplicationProperties: map[string]any{
        "region":   "us-west",
        "priority": 5,
    },
}

TrueFilter

type TrueFilter struct{}

Filter that always evaluates to true, matching all messages. This is the default filter when a subscription is created.

Example:

filter := &admin.TrueFilter{}

FalseFilter

type FalseFilter struct{}

Filter that always evaluates to false, matching no messages.

Example:

filter := &admin.FalseFilter{}

UnknownRuleFilter

type UnknownRuleFilter struct {
    Type string
    RawXML []byte
}

Represents a filter type not yet supported by this SDK version. Update to a newer SDK version to handle this filter type.

  • Type: Service Bus filter type identifier
  • RawXML: Raw XML content from the service

Action Types

SQLAction

type SQLAction struct {
    Expression string
    Parameters map[string]any
}

SQL transformation action that modifies message properties. Actions are executed after a message matches the filter.

  • Expression: SQL transformation expression (e.g., "SET priority = 10")
  • Parameters: Parameter values (string, number, or boolean) referenced in the expression

Example:

action := &admin.SQLAction{
    Expression: "SET sys.Label = @newLabel; SET priority = @newPriority",
    Parameters: map[string]any{
        "newLabel":    "processed",
        "newPriority": 10,
    },
}

UnknownRuleAction

type UnknownRuleAction struct {
    Type string
    RawXML []byte
}

Represents an action type not yet supported by this SDK version. Update to a newer SDK version to handle this action type.

  • Type: Service Bus action type identifier
  • RawXML: Raw XML content from the service

Types

RuleFilter

type RuleFilter interface {
    // ... unexported methods
}

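Filter interface for subscription rules. Implemented by: SQLFilter, CorrelationFilter, TrueFilter, FalseFilter, and UnknownRuleFilter (returned when the service reports a filter type this SDK version does not model).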

RuleAction

type RuleAction interface {
    // ... unexported methods
}

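Action interface for subscription rules. Implemented by: SQLAction and UnknownRuleAction (returned when the service reports an action type this SDK version does not model).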

RuleProperties

type RuleProperties struct {
    Name string
    Filter RuleFilter
    Action RuleAction
}

Rule configuration:

  • Name: Rule name
  • Filter: Rule filter (SQLFilter, CorrelationFilter, TrueFilter, or FalseFilter)
  • Action: Rule action (SQLAction or nil)

Rule

type Rule struct {
    Filter RuleFilter
    Action RuleAction
}

Rule specification without a name field.

CreateRuleOptions

type CreateRuleOptions struct {
    Name *string
    Filter RuleFilter
    Action RuleAction
}

Options for creating a rule:

  • Name: Rule name (defaults to "$Default" if not specified)
  • Filter: Rule filter (SQLFilter, CorrelationFilter, TrueFilter, or FalseFilter)
  • Action: Rule action (SQLAction or nil)

CreateRuleResponse

type CreateRuleResponse struct {
    RuleProperties
}

Response from creating a rule.

GetRuleOptions

type GetRuleOptions struct {
    // Currently empty, reserved for future expansion
}

Options for getting a rule.

GetRuleResponse

type GetRuleResponse struct {
    RuleProperties
}

Response from getting a rule.

UpdateRuleOptions

type UpdateRuleOptions struct {
    // Currently empty, reserved for future expansion
}

Options for updating a rule.

UpdateRuleResponse

type UpdateRuleResponse struct {
    RuleProperties
}

Response from updating a rule.

DeleteRuleOptions

type DeleteRuleOptions struct {
    // Currently empty, reserved for future expansion
}

Options for deleting a rule.

DeleteRuleResponse

type DeleteRuleResponse struct {
    // Empty response
}

Response from deleting a rule.

ListRulesOptions

type ListRulesOptions struct {
    MaxPageSize int32
}

Options for listing rules:

  • MaxPageSize: Maximum number of rules per page

ListRulesResponse

type ListRulesResponse struct {
    Rules []RuleProperties
}

Response from listing rules.

Usage Patterns

Create Multi-Tier Priority System

// createPriorityRules creates the "high-priority" and "medium-priority"
// subscriptions on topicName and gives each one a SQL filter rule so that it
// only receives messages in its priority band.
//
// A newly created subscription carries a "$Default" rule with a TrueFilter
// that matches every message. A message is delivered when ANY rule matches,
// so the default rule is removed before the priority filter is added;
// otherwise the filter would have no effect.
//
// It returns the first error encountered, wrapped with the failing step.
func createPriorityRules(adminClient *admin.Client, topicName string) error {
    ctx := context.Background()

    // One entry per priority tier, processed in order.
    tiers := []struct {
        subscription string
        ruleName     string
        expression   string
    }{
        {subscription: "high-priority", ruleName: "HighPriorityOnly", expression: "priority >= 8"},
        {subscription: "medium-priority", ruleName: "MediumPriorityOnly", expression: "priority >= 4 AND priority < 8"},
    }

    for _, tier := range tiers {
        if _, err := adminClient.CreateSubscription(ctx, topicName, tier.subscription, nil); err != nil {
            return fmt.Errorf("creating subscription %q: %w", tier.subscription, err)
        }

        // Drop the match-everything default rule so only the tier filter applies.
        if _, err := adminClient.DeleteRule(ctx, topicName, tier.subscription, "$Default", nil); err != nil {
            return fmt.Errorf("deleting default rule of subscription %q: %w", tier.subscription, err)
        }

        _, err := adminClient.CreateRule(ctx, topicName, tier.subscription, &admin.CreateRuleOptions{
            Name: to.Ptr(tier.ruleName),
            Filter: &admin.SQLFilter{
                Expression: tier.expression,
            },
        })
        if err != nil {
            return fmt.Errorf("creating rule %q on subscription %q: %w", tier.ruleName, tier.subscription, err)
        }
    }

    return nil
}

Region-Based Routing

// createRegionRules creates one subscription per region on topicName, named
// "<region>-subscription", and restricts each to messages whose "region"
// application property equals that region.
//
// A newly created subscription carries a "$Default" rule with a TrueFilter
// that matches every message. It is removed before the region rule is added,
// because a message is delivered when ANY rule matches.
//
// Processing stops at the first error, which is returned wrapped with the
// failing step. Subscriptions created for earlier regions are left in place.
func createRegionRules(adminClient *admin.Client, topicName string, regions []string) error {
    ctx := context.Background()

    for _, region := range regions {
        subscriptionName := fmt.Sprintf("%s-subscription", region)

        // Create subscription
        if _, err := adminClient.CreateSubscription(ctx, topicName, subscriptionName, nil); err != nil {
            return fmt.Errorf("creating subscription %q: %w", subscriptionName, err)
        }

        // Drop the match-everything default rule so only the region filter applies.
        if _, err := adminClient.DeleteRule(ctx, topicName, subscriptionName, "$Default", nil); err != nil {
            return fmt.Errorf("deleting default rule of subscription %q: %w", subscriptionName, err)
        }

        // Add region filter using a correlation filter (exact property match).
        _, err := adminClient.CreateRule(ctx, topicName, subscriptionName, &admin.CreateRuleOptions{
            Name: to.Ptr(fmt.Sprintf("%sFilter", region)),
            Filter: &admin.CorrelationFilter{
                ApplicationProperties: map[string]any{
                    "region": region,
                },
            },
        })
        if err != nil {
            return fmt.Errorf("creating region rule on subscription %q: %w", subscriptionName, err)
        }
    }

    return nil
}

Content-Based Routing with Actions

// createContentRoutingWithActions creates the "processed-orders" subscription
// on the "orders" topic and attaches a rule that selects pending orders above
// a value threshold and rewrites their label and "processed" property.
//
// A newly created subscription carries a "$Default" rule with a TrueFilter
// that matches every message. It is removed first, because a message is
// delivered when ANY rule matches and the default rule would let unfiltered
// messages through.
//
// It returns the first error encountered, wrapped with the failing step.
func createContentRoutingWithActions(adminClient *admin.Client) error {
    const (
        topicName        = "orders"
        subscriptionName = "processed-orders"
    )
    ctx := context.Background()

    // Create subscription
    if _, err := adminClient.CreateSubscription(ctx, topicName, subscriptionName, nil); err != nil {
        return fmt.Errorf("creating subscription %q: %w", subscriptionName, err)
    }

    // Drop the match-everything default rule so only ProcessOrderRule applies.
    if _, err := adminClient.DeleteRule(ctx, topicName, subscriptionName, "$Default", nil); err != nil {
        return fmt.Errorf("deleting default rule of subscription %q: %w", subscriptionName, err)
    }

    // Add rule with filter and action; the action runs on messages the filter matched.
    _, err := adminClient.CreateRule(ctx, topicName, subscriptionName, &admin.CreateRuleOptions{
        Name: to.Ptr("ProcessOrderRule"),
        Filter: &admin.SQLFilter{
            Expression: "amount > @threshold AND status = @status",
            Parameters: map[string]any{
                "threshold": 1000.0,
                "status":    "pending",
            },
        },
        Action: &admin.SQLAction{
            Expression: "SET sys.Label = 'high-value'; SET processed = true",
        },
    })
    if err != nil {
        return fmt.Errorf("creating rule %q: %w", "ProcessOrderRule", err)
    }

    return nil
}

Session-Based Routing

// createSessionRules creates the session-enabled "user-sessions" subscription
// on topicName and restricts it to "user.action" events whose session ID
// starts with "user-".
//
// Correlation filters compare properties by equality only, so a value such as
// "user-*" would match just that literal string. Prefix matching therefore
// uses a SQL filter with LIKE, where % matches any sequence of characters.
//
// A newly created subscription carries a "$Default" rule with a TrueFilter
// that matches every message. It is removed first, because a message is
// delivered when ANY rule matches.
//
// It returns the first error encountered, wrapped with the failing step.
func createSessionRules(adminClient *admin.Client, topicName string) error {
    const subscriptionName = "user-sessions"
    ctx := context.Background()

    // Create session-enabled subscription
    _, err := adminClient.CreateSubscription(ctx, topicName, subscriptionName, &admin.CreateSubscriptionOptions{
        Properties: &admin.SubscriptionProperties{
            RequiresSession: to.Ptr(true),
        },
    })
    if err != nil {
        return fmt.Errorf("creating subscription %q: %w", subscriptionName, err)
    }

    // Drop the match-everything default rule so only UserSessionFilter applies.
    if _, err := adminClient.DeleteRule(ctx, topicName, subscriptionName, "$Default", nil); err != nil {
        return fmt.Errorf("deleting default rule of subscription %q: %w", subscriptionName, err)
    }

    // Add SQL filter: session IDs with the "user-" prefix and eventType "user.action".
    _, err = adminClient.CreateRule(ctx, topicName, subscriptionName, &admin.CreateRuleOptions{
        Name: to.Ptr("UserSessionFilter"),
        Filter: &admin.SQLFilter{
            Expression: "sys.SessionId LIKE 'user-%' AND eventType = 'user.action'",
        },
    })
    if err != nil {
        return fmt.Errorf("creating rule %q: %w", "UserSessionFilter", err)
    }

    return nil
}

Complex SQL Filter

// createComplexFilter adds the "ComplexBusinessRule" rule to the given
// subscription. The rule's SQL filter combines category, amount, region and
// status conditions, with every literal supplied as a named parameter.
// It returns the error from CreateRule unchanged.
func createComplexFilter(adminClient *admin.Client, topicName, subscriptionName string) error {
    // Values bound to the @-placeholders used in the filter expression.
    filterParams := map[string]any{
        "cat1":          "electronics",
        "cat2":          "computers",
        "minAmount":     500.0,
        "reg1":          "us-west",
        "reg2":          "us-east",
        "reg3":          "eu-west",
        "excludeStatus": "cancelled",
    }

    ruleOptions := &admin.CreateRuleOptions{
        Name: to.Ptr("ComplexBusinessRule"),
        Filter: &admin.SQLFilter{
            // Raw string: the line breaks and leading spaces are part of the expression text.
            Expression: `
                    (category = @cat1 OR category = @cat2)
                    AND amount > @minAmount
                    AND (region IN (@reg1, @reg2, @reg3))
                    AND status != @excludeStatus
                `,
            Parameters: filterParams,
        },
    }

    _, err := adminClient.CreateRule(context.Background(), topicName, subscriptionName, ruleOptions)
    return err
}

Update Rule Dynamically

// updateFilterThreshold replaces the SQL filter of an existing rule with
// "amount > @threshold", binding @threshold to newThreshold. The rule's name
// and action are carried over from the current rule.
//
// It returns an error if the rule cannot be fetched, does not exist, does not
// currently use a SQL filter, or if the update itself fails.
func updateFilterThreshold(adminClient *admin.Client, topicName, subscriptionName, ruleName string, newThreshold float64) error {
    ctx := context.Background()

    // Get current rule
    resp, err := adminClient.GetRule(ctx, topicName, subscriptionName, ruleName, nil)
    if err != nil {
        return err
    }
    // GetRule returns a pointer to its response; a nil response means there
    // is no rule to update, so bail out instead of dereferencing it.
    if resp == nil {
        return fmt.Errorf("rule %q not found", ruleName)
    }

    // Only rules that already use a SQL filter are updated. The asserted
    // value itself is not needed, since the filter is replaced wholesale.
    if _, ok := resp.Filter.(*admin.SQLFilter); !ok {
        return fmt.Errorf("rule is not a SQL filter")
    }

    // Update SQL filter threshold
    props := resp.RuleProperties
    props.Filter = &admin.SQLFilter{
        Expression: "amount > @threshold",
        Parameters: map[string]any{
            "threshold": newThreshold,
        },
    }

    _, err = adminClient.UpdateRule(ctx, topicName, subscriptionName, props)
    return err
}

List and Analyze Rules

// analyzeSubscriptionRules lists every rule of the given subscription and
// prints how many rules exist in total, how many use SQL or correlation
// filters, and how many carry an action.
//
// The total counts all rules, including those with True, False or unknown
// filters, so it can exceed the sum of the SQL and correlation counts.
// The function panics if a page of rules cannot be fetched.
func analyzeSubscriptionRules(adminClient *admin.Client, topicName, subscriptionName string) {
    pager := adminClient.NewListRulesPager(topicName, subscriptionName, nil)

    var totalRules, sqlCount, correlationCount, withActions int

    for pager.More() {
        page, err := pager.NextPage(context.Background())
        if err != nil {
            panic(err)
        }

        // Count every rule on the page, whatever its filter type.
        totalRules += len(page.Rules)

        for _, rule := range page.Rules {
            switch rule.Filter.(type) {
            case *admin.SQLFilter:
                sqlCount++
            case *admin.CorrelationFilter:
                correlationCount++
            }

            if rule.Action != nil {
                withActions++
            }
        }
    }

    fmt.Printf("Total rules: %d\n", totalRules)
    fmt.Printf("SQL filters: %d\n", sqlCount)
    fmt.Printf("Correlation filters: %d\n", correlationCount)
    fmt.Printf("Rules with actions: %d\n", withActions)
}

Replace Default Rule

// replaceDefaultRule swaps the filter of the subscription's "$Default" rule
// for newFilter. The resulting rule has no action.
//
// The rule is updated in place with a single UpdateRule call rather than
// deleted and re-created. Deleting first would leave the subscription with no
// rules until the new one is created, and if the create step failed the
// subscription would stay without any rule.
//
// It returns a wrapped error if the update fails, for example because the
// subscription has no "$Default" rule.
func replaceDefaultRule(adminClient *admin.Client, topicName, subscriptionName string, newFilter admin.RuleFilter) error {
    _, err := adminClient.UpdateRule(
        context.Background(),
        topicName,
        subscriptionName,
        admin.RuleProperties{
            Name:   "$Default",
            Filter: newFilter,
        },
    )
    if err != nil {
        return fmt.Errorf("failed to update default rule: %w", err)
    }

    return nil
}