or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus@v1.10.0

docs

admin

index.md · queues.md · rules.md · subscriptions.md · topics.md
client.md · errors.md · index.md · messaging.md · receiver.md · sender.md · sessions.md
tile.json

tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus

tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1

Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.

subscriptions.mddocs/admin/

Subscription Management

Subscription management operations in the admin client enable creating, configuring, updating, and monitoring topic subscriptions for publish-subscribe messaging patterns.

Capabilities

Create Subscription

func (ac *Client) CreateSubscription(ctx context.Context, topicName, subscriptionName string, options *CreateSubscriptionOptions) (CreateSubscriptionResponse, error)

Creates a subscription to a topic with configurable properties.

Example:

resp, err := adminClient.CreateSubscription(
    context.Background(),
    "events",
    "processor-subscription",
    &admin.CreateSubscriptionOptions{
        Properties: &admin.SubscriptionProperties{
            LockDuration:                                to.Ptr("PT1M"),
            RequiresSession:                             to.Ptr(false),
            DefaultMessageTimeToLive:                    to.Ptr("P7D"),
            DeadLetteringOnMessageExpiration:            to.Ptr(true),
            EnableDeadLetteringOnFilterEvaluationExceptions: to.Ptr(true),
            MaxDeliveryCount:                            to.Ptr(int32(5)),
            EnableBatchedOperations:                     to.Ptr(true),
        },
    },
)
if err != nil {
    panic(err)
}

fmt.Printf("Created subscription: %s on topic: %s\n", resp.SubscriptionName, resp.TopicName)

Get Subscription

func (ac *Client) GetSubscription(ctx context.Context, topicName, subscriptionName string, options *GetSubscriptionOptions) (*GetSubscriptionResponse, error)

Gets subscription configuration properties. Returns nil response and nil error if the subscription does not exist.

Example:

resp, err := adminClient.GetSubscription(context.Background(), "events", "processor-subscription", nil)
if err != nil {
    panic(err)
}

if resp == nil {
    fmt.Println("Subscription does not exist")
    return
}

fmt.Printf("Subscription: %s\n", resp.SubscriptionName)
fmt.Printf("Topic: %s\n", resp.TopicName)
if resp.MaxDeliveryCount != nil {
    fmt.Printf("Max Delivery Count: %d\n", *resp.MaxDeliveryCount)
}

Get Subscription Runtime Properties

func (ac *Client) GetSubscriptionRuntimeProperties(ctx context.Context, topicName, subscriptionName string, options *GetSubscriptionRuntimePropertiesOptions) (*GetSubscriptionRuntimePropertiesResponse, error)

Gets runtime statistics for a subscription including message counts. Returns nil response and nil error if the subscription does not exist.

Example:

resp, err := adminClient.GetSubscriptionRuntimeProperties(
    context.Background(),
    "events",
    "processor-subscription",
    nil,
)
if err != nil {
    panic(err)
}

if resp != nil {
    fmt.Printf("Subscription: %s\n", resp.SubscriptionName)
    fmt.Printf("Topic: %s\n", resp.TopicName)
    fmt.Printf("Active Messages: %d\n", resp.ActiveMessageCount)
    fmt.Printf("Dead Letter Messages: %d\n", resp.DeadLetterMessageCount)
    fmt.Printf("Total Messages: %d\n", resp.TotalMessageCount)
    fmt.Printf("Created At: %s\n", resp.CreatedAt)
}

Update Subscription

func (ac *Client) UpdateSubscription(ctx context.Context, topicName, subscriptionName string, properties SubscriptionProperties, options *UpdateSubscriptionOptions) (UpdateSubscriptionResponse, error)

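Updates an existing subscription's configuration properties. The properties argument is a complete SubscriptionProperties value passed by value, so the example below first reads the current properties with GetSubscription, changes the fields of interest, and passes the result back. Because GetSubscription returns a nil response for a missing subscription, check the response before using it.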

Example:

// Get current properties
subResp, err := adminClient.GetSubscription(context.Background(), "events", "processor-subscription", nil)
if err != nil {
    panic(err)
}

// GetSubscription returns a nil response and a nil error when the
// subscription does not exist, so guard before reading its properties.
if subResp == nil {
    fmt.Println("Subscription does not exist")
    return
}

// Modify properties: start from the current settings so that everything
// not touched here is sent back unchanged.
props := subResp.SubscriptionProperties
props.MaxDeliveryCount = to.Ptr(int32(10))
props.LockDuration = to.Ptr("PT2M")

// Update subscription
updateResp, err := adminClient.UpdateSubscription(
    context.Background(),
    "events",
    "processor-subscription",
    props,
    nil,
)
if err != nil {
    panic(err)
}

fmt.Printf("Updated subscription: %s\n", updateResp.SubscriptionName)

Delete Subscription

func (ac *Client) DeleteSubscription(ctx context.Context, topicName, subscriptionName string, options *DeleteSubscriptionOptions) (DeleteSubscriptionResponse, error)

Deletes a subscription.

Example:

_, err := adminClient.DeleteSubscription(context.Background(), "events", "processor-subscription", nil)
if err != nil {
    panic(err)
}

fmt.Println("Subscription deleted successfully")

List Subscriptions

func (ac *Client) NewListSubscriptionsPager(topicName string, options *ListSubscriptionsOptions) *runtime.Pager[ListSubscriptionsResponse]

Creates a pager for listing all subscriptions for a topic with their configuration properties.

Example:

pager := adminClient.NewListSubscriptionsPager("events", &admin.ListSubscriptionsOptions{
    MaxPageSize: 20,
})

for pager.More() {
    page, err := pager.NextPage(context.Background())
    if err != nil {
        panic(err)
    }

    for _, sub := range page.Subscriptions {
        fmt.Printf("Subscription: %s\n", sub.SubscriptionName)
        if sub.RequiresSession != nil && *sub.RequiresSession {
            fmt.Println("  - Session-enabled")
        }
        if sub.ForwardTo != nil {
            fmt.Printf("  - Forwards to: %s\n", *sub.ForwardTo)
        }
    }
}

List Subscription Runtime Properties

func (ac *Client) NewListSubscriptionsRuntimePropertiesPager(topicName string, options *ListSubscriptionsRuntimePropertiesOptions) *runtime.Pager[ListSubscriptionsRuntimePropertiesResponse]

Creates a pager for listing runtime properties of all subscriptions for a topic.

Example:

pager := adminClient.NewListSubscriptionsRuntimePropertiesPager("events", &admin.ListSubscriptionsRuntimePropertiesOptions{
    MaxPageSize: 20,
})

for pager.More() {
    page, err := pager.NextPage(context.Background())
    if err != nil {
        panic(err)
    }

    for _, subProps := range page.SubscriptionRuntimeProperties {
        fmt.Printf("%s: %d active, %d dead-letter\n",
            subProps.SubscriptionName,
            subProps.ActiveMessageCount,
            subProps.DeadLetterMessageCount,
        )
    }
}

Types

SubscriptionProperties

// SubscriptionProperties holds the configurable settings of a subscription.
// Every field is a pointer; the examples on this page set only the fields
// they need and leave the others nil.
type SubscriptionProperties struct {
    // LockDuration is how long a message stays locked in PeekLock mode, as an
    // ISO 8601 duration (e.g. "PT1M" for 1 minute). Default: 1 minute.
    LockDuration *string
    // RequiresSession enables session support for FIFO ordering. Default: false.
    RequiresSession *bool
    // DefaultMessageTimeToLive is the default message expiration time, as an
    // ISO 8601 duration (e.g. "P7D" for 7 days).
    DefaultMessageTimeToLive *string
    // DeadLetteringOnMessageExpiration moves expired messages to the dead
    // letter queue.
    DeadLetteringOnMessageExpiration *bool
    // EnableDeadLetteringOnFilterEvaluationExceptions moves messages to the
    // dead letter queue when filter evaluation fails.
    EnableDeadLetteringOnFilterEvaluationExceptions *bool
    // MaxDeliveryCount is the maximum number of delivery attempts before a
    // message is dead-lettered. Default: 10.
    MaxDeliveryCount *int32
    // Status is the subscription status (Active, Disabled, SendDisabled,
    // ReceiveDisabled).
    Status *EntityStatus
    // AutoDeleteOnIdle is the idle period after which the subscription is
    // deleted automatically, as an ISO 8601 duration.
    AutoDeleteOnIdle *string
    // ForwardTo names the queue or topic that messages are forwarded to.
    ForwardTo *string
    // ForwardDeadLetteredMessagesTo names the queue or topic that
    // dead-lettered messages are forwarded to.
    ForwardDeadLetteredMessagesTo *string
    // EnableBatchedOperations enables server-side batching.
    EnableBatchedOperations *bool
    // UserMetadata is a custom metadata string.
    UserMetadata *string
    // DefaultRule is the default rule created together with the subscription.
    DefaultRule *RuleProperties
}

Configuration properties for a subscription:

  • LockDuration: Duration a message is locked in PeekLock mode (ISO 8601 format, e.g., "PT1M" for 1 minute). Default: 1 minute.
  • RequiresSession: Enable session support for FIFO message ordering. Default: false.
  • DefaultMessageTimeToLive: Default message expiration time (ISO 8601 duration, e.g., "P7D" for 7 days).
  • DeadLetteringOnMessageExpiration: Move expired messages to dead letter queue.
  • EnableDeadLetteringOnFilterEvaluationExceptions: Move messages to dead letter queue when filter evaluation fails.
  • MaxDeliveryCount: Maximum delivery attempts before dead-lettering. Default: 10.
  • Status: Subscription status (Active, Disabled, SendDisabled, ReceiveDisabled).
  • AutoDeleteOnIdle: Auto-delete subscription after idle period (ISO 8601 duration).
  • ForwardTo: Queue or topic name to forward messages to.
  • ForwardDeadLetteredMessagesTo: Queue or topic name to forward dead-lettered messages to.
  • EnableBatchedOperations: Enable server-side batching for improved throughput.
  • UserMetadata: Custom metadata string.
  • DefaultRule: Default rule created when the subscription is created.

SubscriptionRuntimeProperties

// SubscriptionRuntimeProperties holds runtime statistics for a subscription:
// message counters and timestamps.
type SubscriptionRuntimeProperties struct {
    // TotalMessageCount is the total number of messages in the subscription.
    TotalMessageCount int64
    // ActiveMessageCount is the number of active messages.
    ActiveMessageCount int32
    // DeadLetterMessageCount is the number of dead-lettered messages.
    DeadLetterMessageCount int32
    // TransferMessageCount is the number of messages pending forwarding.
    TransferMessageCount int32
    // TransferDeadLetterMessageCount is the number of messages dead-lettered
    // during forwarding.
    TransferDeadLetterMessageCount int32
    // AccessedAt is the last access timestamp.
    AccessedAt time.Time
    // CreatedAt is the subscription creation timestamp.
    CreatedAt time.Time
    // UpdatedAt is the last update timestamp.
    UpdatedAt time.Time
}

Runtime statistics for a subscription:

  • TotalMessageCount: Total number of messages in the subscription
  • ActiveMessageCount: Number of active messages
  • DeadLetterMessageCount: Number of dead-lettered messages
  • TransferMessageCount: Messages pending forwarding
  • TransferDeadLetterMessageCount: Messages dead-lettered during forwarding
  • AccessedAt: Last access timestamp
  • CreatedAt: Subscription creation timestamp
  • UpdatedAt: Last update timestamp

CreateSubscriptionOptions

// CreateSubscriptionOptions contains the optional parameters for
// Client.CreateSubscription.
type CreateSubscriptionOptions struct {
    // Properties holds the configuration for the new subscription.
    Properties *SubscriptionProperties
}

Options for creating a subscription:

  • Properties: Subscription configuration properties

CreateSubscriptionResponse

// CreateSubscriptionResponse is returned by Client.CreateSubscription.
type CreateSubscriptionResponse struct {
    // SubscriptionName is the name of the subscription.
    SubscriptionName string
    // TopicName is the name of the topic the subscription belongs to.
    TopicName string
    // SubscriptionProperties is embedded, so all subscription properties are
    // accessible directly on the response.
    SubscriptionProperties
}

Response from creating a subscription, includes the subscription name, topic name, and all properties.

GetSubscriptionOptions

// GetSubscriptionOptions contains the optional parameters for
// Client.GetSubscription. The examples on this page pass nil.
type GetSubscriptionOptions struct {
    // Currently empty, reserved for future expansion
}

Options for getting a subscription.

GetSubscriptionResponse

// GetSubscriptionResponse is returned by Client.GetSubscription. The method
// returns a nil *GetSubscriptionResponse (with a nil error) when the
// subscription does not exist.
type GetSubscriptionResponse struct {
    // SubscriptionName is the name of the subscription.
    SubscriptionName string
    // TopicName is the name of the topic the subscription belongs to.
    TopicName string
    // SubscriptionProperties is embedded, so all subscription properties are
    // accessible directly on the response.
    SubscriptionProperties
}

Response from getting a subscription, includes the subscription name, topic name, and all properties.

GetSubscriptionRuntimePropertiesOptions

// GetSubscriptionRuntimePropertiesOptions contains the optional parameters
// for Client.GetSubscriptionRuntimeProperties. The examples on this page
// pass nil.
type GetSubscriptionRuntimePropertiesOptions struct {
    // Currently empty, reserved for future expansion
}

Options for getting subscription runtime properties.

GetSubscriptionRuntimePropertiesResponse

// GetSubscriptionRuntimePropertiesResponse is returned by
// Client.GetSubscriptionRuntimeProperties. The method returns a nil pointer
// (with a nil error) when the subscription does not exist.
type GetSubscriptionRuntimePropertiesResponse struct {
    // TopicName is the name of the topic the subscription belongs to.
    TopicName string
    // SubscriptionName is the name of the subscription.
    SubscriptionName string
    // SubscriptionRuntimeProperties is embedded, so all runtime statistics
    // are accessible directly on the response.
    SubscriptionRuntimeProperties
}

Response from getting subscription runtime properties, includes the subscription name, topic name, and all runtime statistics.

UpdateSubscriptionOptions

// UpdateSubscriptionOptions contains the optional parameters for
// Client.UpdateSubscription. The examples on this page pass nil.
type UpdateSubscriptionOptions struct {
    // Currently empty, reserved for future expansion
}

Options for updating a subscription.

UpdateSubscriptionResponse

// UpdateSubscriptionResponse is returned by Client.UpdateSubscription.
type UpdateSubscriptionResponse struct {
    // TopicName is the name of the topic the subscription belongs to.
    TopicName string
    // SubscriptionName is the name of the subscription.
    SubscriptionName string
    // SubscriptionProperties is embedded and holds the updated properties.
    SubscriptionProperties
}

Response from updating a subscription, includes the subscription name, topic name, and updated properties.

DeleteSubscriptionOptions

// DeleteSubscriptionOptions contains the optional parameters for
// Client.DeleteSubscription. The examples on this page pass nil.
type DeleteSubscriptionOptions struct {
    // Currently empty, reserved for future expansion
}

Options for deleting a subscription.

DeleteSubscriptionResponse

// DeleteSubscriptionResponse is returned by Client.DeleteSubscription. It has
// no fields; the examples on this page discard it and check only the error.
type DeleteSubscriptionResponse struct {
    // Empty response
}

Response from deleting a subscription.

ListSubscriptionsOptions

// ListSubscriptionsOptions contains the optional parameters for
// Client.NewListSubscriptionsPager.
type ListSubscriptionsOptions struct {
    // MaxPageSize is the maximum number of subscriptions returned per page.
    MaxPageSize int32
}

Options for listing subscriptions:

  • MaxPageSize: Maximum number of subscriptions per page

ListSubscriptionsResponse

// ListSubscriptionsResponse is one page of results from the pager created by
// Client.NewListSubscriptionsPager.
type ListSubscriptionsResponse struct {
    // Subscriptions holds the subscriptions contained in this page.
    Subscriptions []SubscriptionPropertiesItem
}

Response from listing subscriptions.

SubscriptionPropertiesItem

// SubscriptionPropertiesItem is a single subscription entry in a
// ListSubscriptionsResponse page.
type SubscriptionPropertiesItem struct {
    // TopicName is the name of the topic the subscription belongs to.
    TopicName string
    // SubscriptionName is the name of the subscription.
    SubscriptionName string
    // SubscriptionProperties is embedded, so the subscription's configuration
    // is accessible directly on the item.
    SubscriptionProperties
}

A single subscription item in the list response.

ListSubscriptionsRuntimePropertiesOptions

// ListSubscriptionsRuntimePropertiesOptions contains the optional parameters
// for Client.NewListSubscriptionsRuntimePropertiesPager.
type ListSubscriptionsRuntimePropertiesOptions struct {
    // MaxPageSize is the maximum number of subscriptions returned per page.
    MaxPageSize int32
}

Options for listing subscription runtime properties:

  • MaxPageSize: Maximum number of subscriptions per page

ListSubscriptionsRuntimePropertiesResponse

// ListSubscriptionsRuntimePropertiesResponse is one page of results from the
// pager created by Client.NewListSubscriptionsRuntimePropertiesPager.
type ListSubscriptionsRuntimePropertiesResponse struct {
    // SubscriptionRuntimeProperties holds the per-subscription runtime
    // statistics contained in this page.
    SubscriptionRuntimeProperties []SubscriptionRuntimePropertiesItem
}

Response from listing subscription runtime properties.

SubscriptionRuntimePropertiesItem

// SubscriptionRuntimePropertiesItem is a single entry in a
// ListSubscriptionsRuntimePropertiesResponse page.
type SubscriptionRuntimePropertiesItem struct {
    // TopicName is the name of the topic the subscription belongs to.
    TopicName string
    // SubscriptionName is the name of the subscription.
    SubscriptionName string
    // SubscriptionRuntimeProperties is embedded, so the runtime statistics
    // are accessible directly on the item.
    SubscriptionRuntimeProperties
}

A single subscription runtime properties item in the list response.

Usage Patterns

Create Session-Enabled Subscription

resp, err := adminClient.CreateSubscription(
    context.Background(),
    "orders",
    "order-processor",
    &admin.CreateSubscriptionOptions{
        Properties: &admin.SubscriptionProperties{
            RequiresSession:          to.Ptr(true),
            LockDuration:             to.Ptr("PT5M"),
            MaxDeliveryCount:         to.Ptr(int32(3)),
            DefaultMessageTimeToLive: to.Ptr("P1D"),
        },
    },
)

Create Subscription with Forwarding

// Create subscription that forwards to a queue
resp, err := adminClient.CreateSubscription(
    context.Background(),
    "events",
    "archive-subscription",
    &admin.CreateSubscriptionOptions{
        Properties: &admin.SubscriptionProperties{
            ForwardTo:                        to.Ptr("archive-queue"),
            ForwardDeadLetteredMessagesTo:    to.Ptr("failed-archive-queue"),
            DeadLetteringOnMessageExpiration: to.Ptr(true),
        },
    },
)

Create Subscription with Default Filter

// Create subscription with a SQL filter
resp, err := adminClient.CreateSubscription(
    context.Background(),
    "events",
    "high-priority-events",
    &admin.CreateSubscriptionOptions{
        Properties: &admin.SubscriptionProperties{
            DefaultRule: &admin.RuleProperties{
                Name: "$Default",
                Filter: &admin.SQLFilter{
                    Expression: "priority > 5",
                },
            },
            EnableDeadLetteringOnFilterEvaluationExceptions: to.Ptr(true),
        },
    },
)

Monitor Subscription Health

// monitorSubscription polls the runtime properties of the given subscription
// once a minute and logs a warning when:
//
//   - more than 100 messages sit in the dead letter queue,
//   - more than 10000 messages are active (backlog), or
//   - there are active messages but the subscription has not been accessed
//     for more than five minutes.
//
// Errors from the admin client, and a subscription that does not exist, are
// logged and the next tick is awaited. The loop has no exit condition, so this
// function does not return.
func monitorSubscription(adminClient *admin.Client, topicName, subscriptionName string) {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        resp, err := adminClient.GetSubscriptionRuntimeProperties(
            context.Background(),
            topicName,
            subscriptionName,
            nil,
        )
        if err != nil {
            log.Printf("Error getting subscription properties: %v", err)
            continue
        }

        // GetSubscriptionRuntimeProperties returns a nil response and a nil
        // error when the subscription does not exist; skip this tick rather
        // than dereferencing nil below.
        if resp == nil {
            log.Printf("Subscription %s on topic %s does not exist", subscriptionName, topicName)
            continue
        }

        // Alert on dead letter messages
        if resp.DeadLetterMessageCount > 100 {
            log.Printf("WARNING: %d messages in dead letter queue", resp.DeadLetterMessageCount)
        }

        // Alert on message backlog
        if resp.ActiveMessageCount > 10000 {
            log.Printf("High message count: %d active messages", resp.ActiveMessageCount)
        }

        // Alert when messages are waiting but the subscription has not been
        // accessed recently, which suggests nothing is consuming them.
        if resp.ActiveMessageCount > 0 && time.Since(resp.AccessedAt) > time.Minute*5 {
            log.Printf("WARNING: Messages not being processed (last access: %s)", resp.AccessedAt)
        }
    }
}

Create Complete Pub-Sub System

// createPubSubSystem provisions the "events" topic and two filtered
// subscriptions on it, "high-priority" and "standard". The steps run in that
// order and the first error is returned immediately; entities created by
// earlier steps are left in place.
func createPubSubSystem(adminClient *admin.Client) error {
    ctx := context.Background()

    // Step 1: the topic, with partitioning and ordering support enabled.
    topicOpts := &admin.CreateTopicOptions{
        Properties: &admin.TopicProperties{
            EnablePartitioning: to.Ptr(true),
            SupportOrdering:    to.Ptr(true),
        },
    }
    if _, err := adminClient.CreateTopic(ctx, "events", topicOpts); err != nil {
        return err
    }

    // Step 2: subscription for messages whose priority is 'high'.
    highPriorityOpts := &admin.CreateSubscriptionOptions{
        Properties: &admin.SubscriptionProperties{
            MaxDeliveryCount:                 to.Ptr(int32(3)),
            DeadLetteringOnMessageExpiration: to.Ptr(true),
            DefaultRule: &admin.RuleProperties{
                Name:   "$Default",
                Filter: &admin.SQLFilter{Expression: "priority = 'high'"},
            },
        },
    }
    if _, err := adminClient.CreateSubscription(ctx, "events", "high-priority", highPriorityOpts); err != nil {
        return err
    }

    // Step 3: subscription for messages whose priority is 'standard'.
    standardOpts := &admin.CreateSubscriptionOptions{
        Properties: &admin.SubscriptionProperties{
            MaxDeliveryCount: to.Ptr(int32(10)),
            DefaultRule: &admin.RuleProperties{
                Name:   "$Default",
                Filter: &admin.SQLFilter{Expression: "priority = 'standard'"},
            },
        },
    }
    _, err := adminClient.CreateSubscription(ctx, "events", "standard", standardOpts)
    return err
}

Update Subscription Dynamically

// scaleSubscriptionDelivery sets MaxDeliveryCount of an existing subscription
// to maxDelivery. It reads the current properties, changes only that field and
// writes the full set back. It returns an error when the lookup or the update
// fails, or when the subscription does not exist.
func scaleSubscriptionDelivery(adminClient *admin.Client, topicName, subscriptionName string, maxDelivery int32) error {
    ctx := context.Background()

    current, err := adminClient.GetSubscription(ctx, topicName, subscriptionName, nil)
    switch {
    case err != nil:
        return err
    case current == nil:
        // nil response with nil error: the subscription was not found.
        return fmt.Errorf("subscription not found")
    }

    // Copy the existing properties so every other setting is preserved.
    updated := current.SubscriptionProperties
    updated.MaxDeliveryCount = to.Ptr(maxDelivery)

    if _, err := adminClient.UpdateSubscription(ctx, topicName, subscriptionName, updated, nil); err != nil {
        return err
    }
    return nil
}

List Subscriptions with Metrics

// listSubscriptionsWithMetrics prints the runtime metrics of every
// subscription of topicName to standard output, one block per subscription
// followed by a blank line. It panics if a page cannot be fetched.
func listSubscriptionsWithMetrics(adminClient *admin.Client, topicName string) {
    for pager := adminClient.NewListSubscriptionsRuntimePropertiesPager(topicName, nil); pager.More(); {
        page, err := pager.NextPage(context.Background())
        if err != nil {
            panic(err)
        }

        for _, item := range page.SubscriptionRuntimeProperties {
            // Header line with the name, then the indented metrics.
            fmt.Printf("Subscription: %s\n", item.SubscriptionName)

            fmt.Printf("  Active: %d\n", item.ActiveMessageCount)
            fmt.Printf("  Dead Letter: %d\n", item.DeadLetterMessageCount)
            fmt.Printf("  Total: %d\n", item.TotalMessageCount)

            fmt.Printf("  Last Access: %s\n", item.AccessedAt)
            fmt.Printf("  Created: %s\n", item.CreatedAt)

            fmt.Println()
        }
    }
}

Clean Up Idle Subscriptions

// cleanupIdleSubscriptions deletes every subscription of topicName that has no
// active messages and has not been accessed for longer than idleThreshold.
//
// The work is done in two passes: the complete listing is read first and the
// idle subscriptions are remembered, and only then are they deleted. This
// keeps the listing unchanged while it is being paged through.
// NOTE(review): the list operation is understood to page by offset, in which
// case deleting entries mid-iteration could shift later entries and cause
// some to be skipped — confirm against the service documentation.
//
// A failure to fetch a page aborts the cleanup before anything is deleted and
// is returned. Deletion is best-effort: a failed delete is logged and the
// remaining candidates are still processed, so a nil return does not guarantee
// that every idle subscription was removed.
func cleanupIdleSubscriptions(adminClient *admin.Client, topicName string, idleThreshold time.Duration) error {
    // idleSubscription records what is needed to log and delete a candidate.
    type idleSubscription struct {
        name       string
        accessedAt time.Time
    }
    var idle []idleSubscription

    // Pass 1: collect the idle subscriptions from the full listing.
    pager := adminClient.NewListSubscriptionsRuntimePropertiesPager(topicName, nil)
    for pager.More() {
        page, err := pager.NextPage(context.Background())
        if err != nil {
            return err
        }

        for _, sub := range page.SubscriptionRuntimeProperties {
            // Idle means: not accessed within the threshold and nothing waiting.
            if time.Since(sub.AccessedAt) > idleThreshold && sub.ActiveMessageCount == 0 {
                idle = append(idle, idleSubscription{
                    name:       sub.SubscriptionName,
                    accessedAt: sub.AccessedAt,
                })
            }
        }
    }

    // Pass 2: delete the candidates now that paging has finished.
    for _, sub := range idle {
        log.Printf("Deleting idle subscription: %s (last access: %s)",
            sub.name, sub.accessedAt)

        _, err := adminClient.DeleteSubscription(
            context.Background(),
            topicName,
            sub.name,
            nil,
        )
        if err != nil {
            // Best-effort: log the failure and continue with the rest.
            log.Printf("Error deleting subscription %s: %v", sub.name, err)
        }
    }

    return nil
}