tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1

Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.
Rule management operations in the admin client enable creating, updating, and managing filters and actions for topic subscriptions. Rules determine which messages are delivered to a subscription and can optionally transform message properties.
func (ac *Client) CreateRule(ctx context.Context, topicName, subscriptionName string, options *CreateRuleOptions) (CreateRuleResponse, error)

Creates a rule that filters and optionally modifies messages for a subscription.
Example with SQL filter:
resp, err := adminClient.CreateRule(
context.Background(),
"events",
"high-priority-subscription",
&admin.CreateRuleOptions{
Name: to.Ptr("HighPriorityFilter"),
Filter: &admin.SQLFilter{
Expression: "priority > @threshold AND category = @cat",
Parameters: map[string]any{
"threshold": 5,
"cat": "important",
},
},
},
)
if err != nil {
panic(err)
}
fmt.Printf("Created rule: %s\n", resp.Name)

Example with correlation filter:
resp, err := adminClient.CreateRule(
context.Background(),
"events",
"user-events-subscription",
&admin.CreateRuleOptions{
Name: to.Ptr("UserEventFilter"),
Filter: &admin.CorrelationFilter{
Subject: to.Ptr("user.action"),
CorrelationID: to.Ptr("user-123"),
ApplicationProperties: map[string]any{
"eventType": "click",
},
},
},
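)

func (ac *Client) GetRule(ctx context.Context, topicName, subscriptionName, ruleName string, options *GetRuleOptions) (*GetRuleResponse, error)

Gets a rule for a subscription. The response is returned as a pointer; check it for nil before use, because a rule that does not exist is reported as a nil response rather than as an error.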
Example:
// Fetch the rule by name from the subscription.
resp, err := adminClient.GetRule(
	context.Background(),
	"events",
	"high-priority-subscription",
	"HighPriorityFilter",
	nil,
)
if err != nil {
	panic(err)
}
// GetRule returns a pointer, so guard against a nil response before
// dereferencing it.
// NOTE(review): the SDK is understood to return (nil, nil) when the rule does
// not exist — confirm against the admin package documentation.
if resp == nil {
	panic("rule HighPriorityFilter not found")
}
fmt.Printf("Rule: %s\n", resp.Name)
// Check filter type
switch filter := resp.Filter.(type) {
case *admin.SQLFilter:
fmt.Printf("SQL Filter: %s\n", filter.Expression)
case *admin.CorrelationFilter:
fmt.Printf("Correlation Filter - Subject: %v\n", filter.Subject)
case *admin.TrueFilter:
fmt.Println("True Filter (matches all messages)")
case *admin.FalseFilter:
fmt.Println("False Filter (matches no messages)")
}

func (ac *Client) UpdateRule(ctx context.Context, topicName, subscriptionName string, properties RuleProperties) (UpdateRuleResponse, error)

Updates an existing rule's filter or action.
Example:
// Get the current rule so that its name and action are carried over unchanged.
ruleResp, err := adminClient.GetRule(
	context.Background(),
	"events",
	"high-priority-subscription",
	"HighPriorityFilter",
	nil,
)
if err != nil {
	panic(err)
}
// GetRule returns a pointer, so guard against a nil response before reading
// RuleProperties from it.
// NOTE(review): the SDK is understood to return (nil, nil) when the rule does
// not exist — confirm against the admin package documentation.
if ruleResp == nil {
	panic("rule HighPriorityFilter not found")
}
// Replace only the filter; every other property keeps its fetched value.
props := ruleResp.RuleProperties
props.Filter = &admin.SQLFilter{
	Expression: "priority > 7", // Increased threshold
}
// Apply update
updateResp, err := adminClient.UpdateRule(
context.Background(),
"events",
"high-priority-subscription",
props,
)
if err != nil {
panic(err)
}
fmt.Printf("Updated rule: %s\n", updateResp.Name)

func (ac *Client) DeleteRule(ctx context.Context, topicName, subscriptionName, ruleName string, options *DeleteRuleOptions) (DeleteRuleResponse, error)

Deletes a rule from a subscription.
Example:
_, err := adminClient.DeleteRule(
context.Background(),
"events",
"high-priority-subscription",
"HighPriorityFilter",
nil,
)
if err != nil {
panic(err)
}
fmt.Println("Rule deleted successfully")

func (ac *Client) NewListRulesPager(topicName, subscriptionName string, options *ListRulesOptions) *runtime.Pager[ListRulesResponse]

Creates a pager for listing all rules for a subscription.
Example:
pager := adminClient.NewListRulesPager("events", "high-priority-subscription", &admin.ListRulesOptions{
MaxPageSize: 20,
})
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, rule := range page.Rules {
fmt.Printf("Rule: %s\n", rule.Name)
switch filter := rule.Filter.(type) {
case *admin.SQLFilter:
fmt.Printf(" SQL: %s\n", filter.Expression)
case *admin.CorrelationFilter:
fmt.Printf(" Correlation - Subject: %v\n", filter.Subject)
}
if rule.Action != nil {
switch action := rule.Action.(type) {
case *admin.SQLAction:
fmt.Printf(" Action: %s\n", action.Expression)
}
}
}
}

type SQLFilter struct {
Expression string
Parameters map[string]any
}

SQL expression filter that evaluates to true for messages matching the WHERE clause. The expression uses SQL-92 syntax with support for message properties.
Example:
filter := &admin.SQLFilter{
Expression: "sys.Label = @category AND amount > @minAmount",
Parameters: map[string]any{
"category": "order",
"minAmount": 100.0,
},
}

Supported message properties:

- sys.Label - Message.Subject
- sys.CorrelationId - Message.CorrelationID
- sys.MessageId - Message.MessageID
- sys.To - Message.To
- sys.ReplyTo - Message.ReplyTo
- sys.SessionId - Message.SessionID

type CorrelationFilter struct {
ApplicationProperties map[string]any
ContentType *string
CorrelationID *string
MessageID *string
ReplyTo *string
ReplyToSessionID *string
SessionID *string
Subject *string
To *string
}

Property-based correlation filter that matches messages where all specified properties match exactly (equality comparison only; wildcards are not supported). More efficient than SQL filters for simple property matching.
Example:
filter := &admin.CorrelationFilter{
Subject: to.Ptr("order.created"),
CorrelationID: to.Ptr("correlation-123"),
ApplicationProperties: map[string]any{
"region": "us-west",
"priority": 5,
},
}

type TrueFilter struct{}

Filter that always evaluates to true, matching all messages. This is the default filter when a subscription is created.
Example:
filter := &admin.TrueFilter{}

type FalseFilter struct{}

Filter that always evaluates to false, matching no messages.
Example:
filter := &admin.FalseFilter{}

type UnknownRuleFilter struct {
Type string
RawXML []byte
}

Represents a filter type not yet supported by this SDK version. Update to a newer SDK version to handle this filter type.
type SQLAction struct {
Expression string
Parameters map[string]any
}

SQL transformation action that modifies message properties. Actions are executed after a message matches the filter.
Example:
action := &admin.SQLAction{
Expression: "SET sys.Label = @newLabel; SET priority = @newPriority",
Parameters: map[string]any{
"newLabel": "processed",
"newPriority": 10,
},
}

type UnknownRuleAction struct {
Type string
RawXML []byte
}

Represents an action type not yet supported by this SDK version. Update to a newer SDK version to handle this action type.
type RuleFilter interface {
// ... unexported methods
}

Filter interface for subscription rules. Implemented by: SQLFilter, CorrelationFilter, TrueFilter, FalseFilter, and UnknownRuleFilter (returned for filter types this SDK version does not recognize).
type RuleAction interface {
// ... unexported methods
}

Action interface for subscription rules. Implemented by: SQLAction and UnknownRuleAction (returned for action types this SDK version does not recognize).
type RuleProperties struct {
Name string
Filter RuleFilter
Action RuleAction
}

Rule configuration: the rule's name together with its filter and optional action.
type Rule struct {
Filter RuleFilter
Action RuleAction
}

Rule specification without a name field.
type CreateRuleOptions struct {
Name *string
Filter RuleFilter
Action RuleAction
}

Options for creating a rule: the rule name, its filter, and an optional action.
type CreateRuleResponse struct {
RuleProperties
}

Response from creating a rule.
type GetRuleOptions struct {
// Currently empty, reserved for future expansion
}

Options for getting a rule.
type GetRuleResponse struct {
RuleProperties
}

Response from getting a rule.
type UpdateRuleOptions struct {
// Currently empty, reserved for future expansion
}

Options for updating a rule.
type UpdateRuleResponse struct {
RuleProperties
}

Response from updating a rule.
type DeleteRuleOptions struct {
// Currently empty, reserved for future expansion
}

Options for deleting a rule.
type DeleteRuleResponse struct {
// Empty response
}

Response from deleting a rule.
type ListRulesOptions struct {
MaxPageSize int32
}

Options for listing rules. MaxPageSize sets the page size requested from the pager.
type ListRulesResponse struct {
Rules []RuleProperties
}

Response from listing rules.
func createPriorityRules(adminClient *admin.Client, topicName string) error {
// Create high-priority subscription
_, err := adminClient.CreateSubscription(
context.Background(),
topicName,
"high-priority",
nil,
)
if err != nil {
return err
}
// Add high-priority filter
_, err = adminClient.CreateRule(
context.Background(),
topicName,
"high-priority",
&admin.CreateRuleOptions{
Name: to.Ptr("HighPriorityOnly"),
Filter: &admin.SQLFilter{
Expression: "priority >= 8",
},
},
)
if err != nil {
return err
}
// Create medium-priority subscription
_, err = adminClient.CreateSubscription(
context.Background(),
topicName,
"medium-priority",
nil,
)
if err != nil {
return err
}
_, err = adminClient.CreateRule(
context.Background(),
topicName,
"medium-priority",
&admin.CreateRuleOptions{
Name: to.Ptr("MediumPriorityOnly"),
Filter: &admin.SQLFilter{
Expression: "priority >= 4 AND priority < 8",
},
},
)
return err
}func createRegionRules(adminClient *admin.Client, topicName string, regions []string) error {
for _, region := range regions {
subscriptionName := fmt.Sprintf("%s-subscription", region)
// Create subscription
_, err := adminClient.CreateSubscription(
context.Background(),
topicName,
subscriptionName,
nil,
)
if err != nil {
return err
}
// Add region filter using correlation filter (more efficient)
_, err = adminClient.CreateRule(
context.Background(),
topicName,
subscriptionName,
&admin.CreateRuleOptions{
Name: to.Ptr(fmt.Sprintf("%sFilter", region)),
Filter: &admin.CorrelationFilter{
ApplicationProperties: map[string]any{
"region": region,
},
},
},
)
if err != nil {
return err
}
}
return nil
}func createContentRoutingWithActions(adminClient *admin.Client) error {
// Create subscription
_, err := adminClient.CreateSubscription(
context.Background(),
"orders",
"processed-orders",
nil,
)
if err != nil {
return err
}
// Add rule with filter and action
_, err = adminClient.CreateRule(
context.Background(),
"orders",
"processed-orders",
&admin.CreateRuleOptions{
Name: to.Ptr("ProcessOrderRule"),
Filter: &admin.SQLFilter{
Expression: "amount > @threshold AND status = @status",
Parameters: map[string]any{
"threshold": 1000.0,
"status": "pending",
},
},
Action: &admin.SQLAction{
Expression: "SET sys.Label = 'high-value'; SET processed = true",
},
},
)
return err
}func createSessionRules(adminClient *admin.Client, topicName string) error {
// Create session-enabled subscription
_, err := adminClient.CreateSubscription(
context.Background(),
topicName,
"user-sessions",
&admin.CreateSubscriptionOptions{
Properties: &admin.SubscriptionProperties{
RequiresSession: to.Ptr(true),
},
},
)
if err != nil {
return err
}
// Add correlation filter for specific session pattern
_, err = adminClient.CreateRule(
context.Background(),
topicName,
"user-sessions",
&admin.CreateRuleOptions{
Name: to.Ptr("UserSessionFilter"),
Filter: &admin.CorrelationFilter{
SessionID: to.Ptr("user-*"), // Pattern matching
ApplicationProperties: map[string]any{
"eventType": "user.action",
},
},
},
)
return err
}func createComplexFilter(adminClient *admin.Client, topicName, subscriptionName string) error {
_, err := adminClient.CreateRule(
context.Background(),
topicName,
subscriptionName,
&admin.CreateRuleOptions{
Name: to.Ptr("ComplexBusinessRule"),
Filter: &admin.SQLFilter{
Expression: `
(category = @cat1 OR category = @cat2)
AND amount > @minAmount
AND (region IN (@reg1, @reg2, @reg3))
AND status != @excludeStatus
`,
Parameters: map[string]any{
"cat1": "electronics",
"cat2": "computers",
"minAmount": 500.0,
"reg1": "us-west",
"reg2": "us-east",
"reg3": "eu-west",
"excludeStatus": "cancelled",
},
},
},
)
return err
}func updateFilterThreshold(adminClient *admin.Client, topicName, subscriptionName, ruleName string, newThreshold float64) error {
// Get current rule
resp, err := adminClient.GetRule(context.Background(), topicName, subscriptionName, ruleName, nil)
if err != nil {
return err
}
// Update SQL filter threshold
if sqlFilter, ok := resp.Filter.(*admin.SQLFilter); ok {
props := resp.RuleProperties
props.Filter = &admin.SQLFilter{
Expression: "amount > @threshold",
Parameters: map[string]any{
"threshold": newThreshold,
},
}
_, err = adminClient.UpdateRule(context.Background(), topicName, subscriptionName, props)
return err
}
return fmt.Errorf("rule is not a SQL filter")
}func analyzeSubscriptionRules(adminClient *admin.Client, topicName, subscriptionName string) {
pager := adminClient.NewListRulesPager(topicName, subscriptionName, nil)
sqlCount := 0
correlationCount := 0
withActions := 0
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, rule := range page.Rules {
switch rule.Filter.(type) {
case *admin.SQLFilter:
sqlCount++
case *admin.CorrelationFilter:
correlationCount++
}
if rule.Action != nil {
withActions++
}
}
}
fmt.Printf("Total rules: %d\n", sqlCount+correlationCount)
fmt.Printf("SQL filters: %d\n", sqlCount)
fmt.Printf("Correlation filters: %d\n", correlationCount)
fmt.Printf("Rules with actions: %d\n", withActions)
}func replaceDefaultRule(adminClient *admin.Client, topicName, subscriptionName string, newFilter admin.RuleFilter) error {
// Delete default rule (TrueFilter)
_, err := adminClient.DeleteRule(context.Background(), topicName, subscriptionName, "$Default", nil)
if err != nil {
return fmt.Errorf("failed to delete default rule: %w", err)
}
// Create new rule with custom filter
_, err = adminClient.CreateRule(
context.Background(),
topicName,
subscriptionName,
&admin.CreateRuleOptions{
Name: to.Ptr("$Default"),
Filter: newFilter,
},
)
if err != nil {
return fmt.Errorf("failed to create new rule: %w", err)
}
return nil
}