Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.
—
Rule management operations in the admin client enable creating, updating, and managing filters and actions for topic subscriptions. Rules determine which messages are delivered to a subscription and can optionally transform message properties.
func (ac *Client) CreateRule(ctx context.Context, topicName, subscriptionName string, options *CreateRuleOptions) (CreateRuleResponse, error)Creates a rule that filters and optionally modifies messages for a subscription.
Example with SQL filter:
resp, err := adminClient.CreateRule(
context.Background(),
"events",
"high-priority-subscription",
&admin.CreateRuleOptions{
Name: to.Ptr("HighPriorityFilter"),
Filter: &admin.SQLFilter{
Expression: "priority > @threshold AND category = @cat",
Parameters: map[string]any{
"threshold": 5,
"cat": "important",
},
},
},
)
if err != nil {
panic(err)
}
fmt.Printf("Created rule: %s\n", resp.Name)Example with correlation filter:
resp, err := adminClient.CreateRule(
context.Background(),
"events",
"user-events-subscription",
&admin.CreateRuleOptions{
Name: to.Ptr("UserEventFilter"),
Filter: &admin.CorrelationFilter{
Subject: to.Ptr("user.action"),
CorrelationID: to.Ptr("user-123"),
ApplicationProperties: map[string]any{
"eventType": "click",
},
},
},
)func (ac *Client) GetRule(ctx context.Context, topicName, subscriptionName, ruleName string, options *GetRuleOptions) (*GetRuleResponse, error)Gets a rule for a subscription.
Example:
resp, err := adminClient.GetRule(
context.Background(),
"events",
"high-priority-subscription",
"HighPriorityFilter",
nil,
)
if err != nil {
panic(err)
}
fmt.Printf("Rule: %s\n", resp.Name)
// Check filter type
switch filter := resp.Filter.(type) {
case *admin.SQLFilter:
fmt.Printf("SQL Filter: %s\n", filter.Expression)
case *admin.CorrelationFilter:
fmt.Printf("Correlation Filter - Subject: %v\n", filter.Subject)
case *admin.TrueFilter:
fmt.Println("True Filter (matches all messages)")
case *admin.FalseFilter:
fmt.Println("False Filter (matches no messages)")
}func (ac *Client) UpdateRule(ctx context.Context, topicName, subscriptionName string, properties RuleProperties) (UpdateRuleResponse, error)Updates an existing rule's filter or action.
Example:
// Get current rule
ruleResp, err := adminClient.GetRule(
context.Background(),
"events",
"high-priority-subscription",
"HighPriorityFilter",
nil,
)
if err != nil {
panic(err)
}
// Update the filter
props := ruleResp.RuleProperties
props.Filter = &admin.SQLFilter{
Expression: "priority > 7", // Increased threshold
}
// Apply update
updateResp, err := adminClient.UpdateRule(
context.Background(),
"events",
"high-priority-subscription",
props,
)
if err != nil {
panic(err)
}
fmt.Printf("Updated rule: %s\n", updateResp.Name)func (ac *Client) DeleteRule(ctx context.Context, topicName, subscriptionName, ruleName string, options *DeleteRuleOptions) (DeleteRuleResponse, error)Deletes a rule from a subscription.
Example:
_, err := adminClient.DeleteRule(
context.Background(),
"events",
"high-priority-subscription",
"HighPriorityFilter",
nil,
)
if err != nil {
panic(err)
}
fmt.Println("Rule deleted successfully")func (ac *Client) NewListRulesPager(topicName, subscriptionName string, options *ListRulesOptions) *runtime.Pager[ListRulesResponse]Creates a pager for listing all rules for a subscription.
Example:
pager := adminClient.NewListRulesPager("events", "high-priority-subscription", &admin.ListRulesOptions{
MaxPageSize: 20,
})
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, rule := range page.Rules {
fmt.Printf("Rule: %s\n", rule.Name)
switch filter := rule.Filter.(type) {
case *admin.SQLFilter:
fmt.Printf(" SQL: %s\n", filter.Expression)
case *admin.CorrelationFilter:
fmt.Printf(" Correlation - Subject: %v\n", filter.Subject)
}
if rule.Action != nil {
switch action := rule.Action.(type) {
case *admin.SQLAction:
fmt.Printf(" Action: %s\n", action.Expression)
}
}
}
}type SQLFilter struct {
Expression string
Parameters map[string]any
}SQL expression filter that evaluates to true for messages matching the WHERE clause. The expression uses SQL-92 syntax with support for message properties.
Example:
filter := &admin.SQLFilter{
Expression: "sys.Label = @category AND amount > @minAmount",
Parameters: map[string]any{
"category": "order",
"minAmount": 100.0,
},
}Supported message properties:
sys.Label - Message.Subjectsys.CorrelationId - Message.CorrelationIDsys.MessageId - Message.MessageIDsys.To - Message.Tosys.ReplyTo - Message.ReplyTosys.SessionId - Message.SessionIDtype CorrelationFilter struct {
ApplicationProperties map[string]any
ContentType *string
CorrelationID *string
MessageID *string
ReplyTo *string
ReplyToSessionID *string
SessionID *string
Subject *string
To *string
}Property-based correlation filter that matches messages where all specified properties match. More efficient than SQL filters for simple property matching.
Example:
filter := &admin.CorrelationFilter{
Subject: to.Ptr("order.created"),
CorrelationID: to.Ptr("correlation-123"),
ApplicationProperties: map[string]any{
"region": "us-west",
"priority": 5,
},
}type TrueFilter struct{}Filter that always evaluates to true, matching all messages. This is the default filter when a subscription is created.
Example:
filter := &admin.TrueFilter{}type FalseFilter struct{}Filter that always evaluates to false, matching no messages.
Example:
filter := &admin.FalseFilter{}type UnknownRuleFilter struct {
Type string
RawXML []byte
}Represents a filter type not yet supported by this SDK version. Update to a newer SDK version to handle this filter type.
type SQLAction struct {
Expression string
Parameters map[string]any
}SQL transformation action that modifies message properties. Actions are executed after a message matches the filter.
Example:
action := &admin.SQLAction{
Expression: "SET sys.Label = @newLabel; SET priority = @newPriority",
Parameters: map[string]any{
"newLabel": "processed",
"newPriority": 10,
},
}type UnknownRuleAction struct {
Type string
RawXML []byte
}Represents an action type not yet supported by this SDK version. Update to a newer SDK version to handle this action type.
type RuleFilter interface {
// ... unexported methods
}Filter interface for subscription rules. Implemented by: SQLFilter, CorrelationFilter, TrueFilter, FalseFilter.
type RuleAction interface {
// ... unexported methods
}Action interface for subscription rules. Implemented by: SQLAction.
type RuleProperties struct {
Name string
Filter RuleFilter
Action RuleAction
}Rule configuration:
type Rule struct {
Filter RuleFilter
Action RuleAction
}Rule specification without a name field.
type CreateRuleOptions struct {
Name *string
Filter RuleFilter
Action RuleAction
}Options for creating a rule:
type CreateRuleResponse struct {
RuleProperties
}Response from creating a rule.
type GetRuleOptions struct {
// Currently empty, reserved for future expansion
}Options for getting a rule.
type GetRuleResponse struct {
RuleProperties
}Response from getting a rule.
type UpdateRuleOptions struct {
// Currently empty, reserved for future expansion
}Options for updating a rule.
type UpdateRuleResponse struct {
RuleProperties
}Response from updating a rule.
type DeleteRuleOptions struct {
// Currently empty, reserved for future expansion
}Options for deleting a rule.
type DeleteRuleResponse struct {
// Empty response
}Response from deleting a rule.
type ListRulesOptions struct {
MaxPageSize int32
}Options for listing rules:
type ListRulesResponse struct {
Rules []RuleProperties
}Response from listing rules.
func createPriorityRules(adminClient *admin.Client, topicName string) error {
// Create high-priority subscription
_, err := adminClient.CreateSubscription(
context.Background(),
topicName,
"high-priority",
nil,
)
if err != nil {
return err
}
// Add high-priority filter
_, err = adminClient.CreateRule(
context.Background(),
topicName,
"high-priority",
&admin.CreateRuleOptions{
Name: to.Ptr("HighPriorityOnly"),
Filter: &admin.SQLFilter{
Expression: "priority >= 8",
},
},
)
if err != nil {
return err
}
// Create medium-priority subscription
_, err = adminClient.CreateSubscription(
context.Background(),
topicName,
"medium-priority",
nil,
)
if err != nil {
return err
}
_, err = adminClient.CreateRule(
context.Background(),
topicName,
"medium-priority",
&admin.CreateRuleOptions{
Name: to.Ptr("MediumPriorityOnly"),
Filter: &admin.SQLFilter{
Expression: "priority >= 4 AND priority < 8",
},
},
)
return err
}func createRegionRules(adminClient *admin.Client, topicName string, regions []string) error {
for _, region := range regions {
subscriptionName := fmt.Sprintf("%s-subscription", region)
// Create subscription
_, err := adminClient.CreateSubscription(
context.Background(),
topicName,
subscriptionName,
nil,
)
if err != nil {
return err
}
// Add region filter using correlation filter (more efficient)
_, err = adminClient.CreateRule(
context.Background(),
topicName,
subscriptionName,
&admin.CreateRuleOptions{
Name: to.Ptr(fmt.Sprintf("%sFilter", region)),
Filter: &admin.CorrelationFilter{
ApplicationProperties: map[string]any{
"region": region,
},
},
},
)
if err != nil {
return err
}
}
return nil
}func createContentRoutingWithActions(adminClient *admin.Client) error {
// Create subscription
_, err := adminClient.CreateSubscription(
context.Background(),
"orders",
"processed-orders",
nil,
)
if err != nil {
return err
}
// Add rule with filter and action
_, err = adminClient.CreateRule(
context.Background(),
"orders",
"processed-orders",
&admin.CreateRuleOptions{
Name: to.Ptr("ProcessOrderRule"),
Filter: &admin.SQLFilter{
Expression: "amount > @threshold AND status = @status",
Parameters: map[string]any{
"threshold": 1000.0,
"status": "pending",
},
},
Action: &admin.SQLAction{
Expression: "SET sys.Label = 'high-value'; SET processed = true",
},
},
)
return err
}func createSessionRules(adminClient *admin.Client, topicName string) error {
// Create session-enabled subscription
_, err := adminClient.CreateSubscription(
context.Background(),
topicName,
"user-sessions",
&admin.CreateSubscriptionOptions{
Properties: &admin.SubscriptionProperties{
RequiresSession: to.Ptr(true),
},
},
)
if err != nil {
return err
}
// Add correlation filter for specific session pattern
_, err = adminClient.CreateRule(
context.Background(),
topicName,
"user-sessions",
&admin.CreateRuleOptions{
Name: to.Ptr("UserSessionFilter"),
Filter: &admin.CorrelationFilter{
SessionID: to.Ptr("user-*"), // Pattern matching
ApplicationProperties: map[string]any{
"eventType": "user.action",
},
},
},
)
return err
}func createComplexFilter(adminClient *admin.Client, topicName, subscriptionName string) error {
_, err := adminClient.CreateRule(
context.Background(),
topicName,
subscriptionName,
&admin.CreateRuleOptions{
Name: to.Ptr("ComplexBusinessRule"),
Filter: &admin.SQLFilter{
Expression: `
(category = @cat1 OR category = @cat2)
AND amount > @minAmount
AND (region IN (@reg1, @reg2, @reg3))
AND status != @excludeStatus
`,
Parameters: map[string]any{
"cat1": "electronics",
"cat2": "computers",
"minAmount": 500.0,
"reg1": "us-west",
"reg2": "us-east",
"reg3": "eu-west",
"excludeStatus": "cancelled",
},
},
},
)
return err
}func updateFilterThreshold(adminClient *admin.Client, topicName, subscriptionName, ruleName string, newThreshold float64) error {
// Get current rule
resp, err := adminClient.GetRule(context.Background(), topicName, subscriptionName, ruleName, nil)
if err != nil {
return err
}
// Update SQL filter threshold
if sqlFilter, ok := resp.Filter.(*admin.SQLFilter); ok {
props := resp.RuleProperties
props.Filter = &admin.SQLFilter{
Expression: "amount > @threshold",
Parameters: map[string]any{
"threshold": newThreshold,
},
}
_, err = adminClient.UpdateRule(context.Background(), topicName, subscriptionName, props)
return err
}
return fmt.Errorf("rule is not a SQL filter")
}func analyzeSubscriptionRules(adminClient *admin.Client, topicName, subscriptionName string) {
pager := adminClient.NewListRulesPager(topicName, subscriptionName, nil)
sqlCount := 0
correlationCount := 0
withActions := 0
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, rule := range page.Rules {
switch rule.Filter.(type) {
case *admin.SQLFilter:
sqlCount++
case *admin.CorrelationFilter:
correlationCount++
}
if rule.Action != nil {
withActions++
}
}
}
fmt.Printf("Total rules: %d\n", sqlCount+correlationCount)
fmt.Printf("SQL filters: %d\n", sqlCount)
fmt.Printf("Correlation filters: %d\n", correlationCount)
fmt.Printf("Rules with actions: %d\n", withActions)
}func replaceDefaultRule(adminClient *admin.Client, topicName, subscriptionName string, newFilter admin.RuleFilter) error {
// Delete default rule (TrueFilter)
_, err := adminClient.DeleteRule(context.Background(), topicName, subscriptionName, "$Default", nil)
if err != nil {
return fmt.Errorf("failed to delete default rule: %w", err)
}
// Create new rule with custom filter
_, err = adminClient.CreateRule(
context.Background(),
topicName,
subscriptionName,
&admin.CreateRuleOptions{
Name: to.Ptr("$Default"),
Filter: newFilter,
},
)
if err != nil {
return fmt.Errorf("failed to create new rule: %w", err)
}
return nil
}Install with Tessl CLI
npx tessl i tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus