or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2@v2.0.1

docs

checkpoint-store.mdconfiguration.mdconsumer.mdevent-types.mdindex.mdprocessor.mdproducer.md
tile.json

tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2

tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0

Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.

configuration.mddocs/

Configuration and Utilities

This document covers connection string parsing, retry options, error handling, logging, and other configuration utilities.

Connection String Parsing

ParseConnectionString Function

func ParseConnectionString(connStr string) (ConnectionStringProperties, error)

Parses a connection string from the Azure portal and returns the parsed representation.

Supported Formats:

  1. Connection string with embedded key and keyname:
Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyname;SharedAccessKey=key
  1. Connection string with embedded SharedAccessSignature:
Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessSignature=SharedAccessSignature sr=...
  1. Connection string with EntityPath:
Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyname;SharedAccessKey=key;EntityPath=eventhub-name
  1. Emulator connection string:
Endpoint=sb://localhost:6765;SharedAccessKeyName=key;SharedAccessKey=key;UseDevelopmentEmulator=true

Parameters:

  • connStr (string) - The connection string to parse

Returns:

  • ConnectionStringProperties - Parsed connection string properties
  • error - Error if parsing fails

Example:

import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"

connStr := "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=mykey;SharedAccessKey=secret;EntityPath=my-eventhub"

props, err := azeventhubs.ParseConnectionString(connStr)
if err != nil {
    // handle error
}

fmt.Printf("Namespace: %s\n", props.FullyQualifiedNamespace)
fmt.Printf("Endpoint: %s\n", props.Endpoint)
if props.EntityPath != nil {
    fmt.Printf("Entity Path: %s\n", *props.EntityPath)
}
if props.SharedAccessKeyName != nil {
    fmt.Printf("Key Name: %s\n", *props.SharedAccessKeyName)
}

ConnectionStringProperties

type ConnectionStringProperties struct {
    // Endpoint is the Endpoint value in the connection string.
    // Ex: sb://example.servicebus.windows.net
    Endpoint string

    // EntityPath is EntityPath value in the connection string.
    EntityPath *string

    // FullyQualifiedNamespace is the Endpoint value without the protocol scheme.
    // Ex: example.servicebus.windows.net
    FullyQualifiedNamespace string

    // SharedAccessKey is the SharedAccessKey value in the connection string.
    SharedAccessKey *string

    // SharedAccessKeyName is the SharedAccessKeyName value in the connection string.
    SharedAccessKeyName *string

    // SharedAccessSignature is the SharedAccessSignature value in the connection string.
    SharedAccessSignature *string

    // Emulator indicates that the connection string is for an emulator:
    // ex: Endpoint=localhost:6765;SharedAccessKeyName=...;SharedAccessKey=...;UseDevelopmentEmulator=true
    Emulator bool
}

Fields:

  • Endpoint (string) - Full endpoint URL (e.g., "sb://example.servicebus.windows.net")
  • EntityPath (*string) - Entity path from the connection string (Event Hub name)
  • FullyQualifiedNamespace (string) - Namespace without protocol (e.g., "example.servicebus.windows.net")
  • SharedAccessKey (*string) - Shared access key for authentication
  • SharedAccessKeyName (*string) - Shared access key name
  • SharedAccessSignature (*string) - Shared access signature (alternative to key/keyname)
  • Emulator (bool) - Whether this is an emulator connection string

Retry Options

type RetryOptions struct {
    // MaxRetries specifies the maximum number of attempts a failed operation will be retried
    // before producing an error.
    // The default value is three. A value less than zero means one try and no retries.
    MaxRetries int32

    // RetryDelay specifies the initial amount of delay to use before retrying an operation.
    // The delay increases exponentially with each retry up to the maximum specified by MaxRetryDelay.
    // The default value is four seconds. A value less than zero means no delay between retries.
    RetryDelay time.Duration

    // MaxRetryDelay specifies the maximum delay allowed before retrying an operation.
    // Typically the value is greater than or equal to the value specified in RetryDelay.
    // The default Value is 120 seconds. A value less than zero means there is no cap.
    MaxRetryDelay time.Duration
}

Fields:

  • MaxRetries (int32) - Maximum retry attempts (default: 3, < 0 means no retries)
  • RetryDelay (time.Duration) - Initial delay before first retry (default: 4 seconds, < 0 means no delay)
  • MaxRetryDelay (time.Duration) - Maximum delay cap for exponential backoff (default: 120 seconds, < 0 means no cap)

Retry Behavior:

  • Delays increase exponentially: RetryDelay, RetryDelay×2, RetryDelay×4, etc.
  • Delays are capped at MaxRetryDelay
  • Jitter is applied to prevent thundering herd

Example:

import "time"

// Custom retry options
retryOptions := azeventhubs.RetryOptions{
    MaxRetries:    5,
    RetryDelay:    2 * time.Second,
    MaxRetryDelay: 60 * time.Second,
}

// Use with ProducerClient
producerClient, err := azeventhubs.NewProducerClient(
    "namespace.servicebus.windows.net",
    "eventhub-name",
    credential,
    &azeventhubs.ProducerClientOptions{
        RetryOptions: retryOptions,
    },
)

// Use with ConsumerClient
consumerClient, err := azeventhubs.NewConsumerClient(
    "namespace.servicebus.windows.net",
    "eventhub-name",
    azeventhubs.DefaultConsumerGroup,
    credential,
    &azeventhubs.ConsumerClientOptions{
        RetryOptions: retryOptions,
    },
)

WebSocket Configuration

type WebSocketConnParams struct {
    // Host is the the `wss://<host>` to connect to
    Host string
}

WebSocket support allows Event Hubs connections through proxies and firewalls that block AMQP ports.

Fields:

  • Host (string) - The WebSocket host (wss://<host>)

Example:

import (
    "context"
    "net"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
    "nhooyr.io/websocket"
)

// WebSocket connection factory
newWebSocketConn := func(ctx context.Context, params azeventhubs.WebSocketConnParams) (net.Conn, error) {
    // Using nhooyr.io/websocket library
    opts := &websocket.DialOptions{
        Subprotocols: []string{"amqp"},
    }

    wssConn, _, err := websocket.Dial(ctx, "wss://"+params.Host, opts)
    if err != nil {
        return nil, err
    }

    return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil
}

// Use with ProducerClient
producerClient, err := azeventhubs.NewProducerClient(
    "namespace.servicebus.windows.net",
    "eventhub-name",
    credential,
    &azeventhubs.ProducerClientOptions{
        NewWebSocketConn: newWebSocketConn,
    },
)

// Use with ConsumerClient
consumerClient, err := azeventhubs.NewConsumerClient(
    "namespace.servicebus.windows.net",
    "eventhub-name",
    azeventhubs.DefaultConsumerGroup,
    credential,
    &azeventhubs.ConsumerClientOptions{
        NewWebSocketConn: newWebSocketConn,
    },
)

Error Handling

Error Type

type Error struct {
    // Code is a stable error code which can be used as part of programatic error handling.
    // The codes can expand in the future, but the values (and their meaning) will remain the same.
    Code ErrorCode
}

func (e *Error) Error() string

Event Hubs-specific error with actionable error codes.

Note: The Code field is part of the stable API, but the error message from Error() and the underlying wrapped error are subject to change.

ErrorCode Type

type ErrorCode string

String-based error code for programmatic error handling.

Error Code Constants

const (
    // ErrorCodeUnauthorizedAccess means the credentials provided are not valid for use with
    // a particular entity, or have expired.
    ErrorCodeUnauthorizedAccess ErrorCode = "unauthorized"

    // ErrorCodeConnectionLost means our connection was lost and all retry attempts failed.
    // This typically reflects an extended outage or connection disruption and may
    // require manual intervention.
    ErrorCodeConnectionLost ErrorCode = "connlost"

    // ErrorCodeOwnershipLost means that a partition that you were reading from was opened
    // by another link with a higher epoch/owner level.
    ErrorCodeOwnershipLost ErrorCode = "ownershiplost"
)

Error Codes:

  • ErrorCodeUnauthorizedAccess - Invalid or expired credentials
  • ErrorCodeConnectionLost - Connection lost after all retries (extended outage)
  • ErrorCodeOwnershipLost - Partition claimed by another consumer with higher epoch

Example:

import (
    "errors"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

events, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil)
if err != nil {
    var ehErr *azeventhubs.Error
    if errors.As(err, &ehErr) {
        switch ehErr.Code {
        case azeventhubs.ErrorCodeUnauthorizedAccess:
            // Credentials are invalid or expired
            // Re-authenticate or check permissions
            log.Println("Authentication failed, refreshing credentials...")

        case azeventhubs.ErrorCodeConnectionLost:
            // Connection lost after all retries
            // May need manual intervention or service investigation
            log.Println("Connection lost, check service health...")

        case azeventhubs.ErrorCodeOwnershipLost:
            // Another consumer took ownership
            // Normal for Processor rebalancing
            log.Println("Ownership lost, partition reassigned")
            return

        default:
            // Other Event Hubs error
            log.Printf("Event Hubs error: %s", ehErr.Code)
        }
    } else {
        // Not an Event Hubs-specific error
        log.Printf("Other error: %v", err)
    }
}

Logging

Log Event Constants

const (
    // EventConn is used whenever we create a connection or any links (ie: producers, consumers).
    EventConn log.Event = "azeh.Conn"

    // EventAuth is used when we're doing authentication/claims negotiation.
    EventAuth log.Event = "azeh.Auth"

    // EventProducer represents operations that happen on Producers.
    EventProducer log.Event = "azeh.Producer"

    // EventConsumer represents operations that happen on Consumers.
    EventConsumer log.Event = "azeh.Consumer"
)

Log Events:

  • EventConn - Connection and link creation events
  • EventAuth - Authentication and claims negotiation events
  • EventProducer - Producer operations (sending, batching)
  • EventConsumer - Consumer operations (receiving, checkpointing)

Enabling Logging

Environment Variable (All SDK Modules):

export AZURE_SDK_GO_LOGGING=all

Programmatic (azeventhubs Only):

import (
    "fmt"
    azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

// Set up listener to print log output to stdout
azlog.SetListener(func(event azlog.Event, s string) {
    fmt.Printf("[%s] %s\n", event, s)
})

// Pick which events to log
azlog.SetEvents(
    azeventhubs.EventConn,
    azeventhubs.EventAuth,
    azeventhubs.EventProducer,
    azeventhubs.EventConsumer,
)

Example with Structured Logging:

import (
    "log/slog"
    azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

logger := slog.Default()

azlog.SetListener(func(event azlog.Event, message string) {
    logger.Info(message,
        slog.String("event", string(event)),
        slog.String("component", "eventhubs"),
    )
})

azlog.SetEvents(
    azeventhubs.EventConn,
    azeventhubs.EventAuth,
    azeventhubs.EventProducer,
    azeventhubs.EventConsumer,
)

Event Hub Properties

EventHubProperties

type EventHubProperties struct {
    // CreatedOn is the time when the event hub was created.
    CreatedOn time.Time

    // Name of the event hub
    Name string

    // PartitionIDs for the event hub
    PartitionIDs []string

    // GeoReplicationEnabled is true if the event hub has geo-replication enabled.
    GeoReplicationEnabled bool
}

Fields:

  • CreatedOn (time.Time) - When the Event Hub was created
  • Name (string) - Name of the Event Hub
  • PartitionIDs ([]string) - List of partition IDs (e.g., ["0", "1", "2", "3"])
  • GeoReplicationEnabled (bool) - Whether geo-replication is enabled

Retrieved via:

  • ProducerClient.GetEventHubProperties()
  • ConsumerClient.GetEventHubProperties()

Example:

props, err := consumerClient.GetEventHubProperties(context.TODO(), nil)
if err != nil {
    // handle error
}

fmt.Printf("Event Hub: %s\n", props.Name)
fmt.Printf("Created: %s\n", props.CreatedOn.Format(time.RFC3339))
fmt.Printf("Partitions: %v\n", props.PartitionIDs)
fmt.Printf("Geo-replication: %v\n", props.GeoReplicationEnabled)

// Iterate over all partitions
for _, partitionID := range props.PartitionIDs {
    partitionClient, err := consumerClient.NewPartitionClient(partitionID, nil)
    if err != nil {
        // handle error
    }
    defer partitionClient.Close(context.TODO())
    // ... use partition client
}

GetEventHubPropertiesOptions

type GetEventHubPropertiesOptions struct {
    // For future expansion
}

Currently empty, reserved for future options.

Partition Properties

PartitionProperties

type PartitionProperties struct {
    // BeginningSequenceNumber is the first sequence number for a partition.
    BeginningSequenceNumber int64

    // EventHubName is the name of the Event Hub for this partition.
    EventHubName string

    // IsEmpty is true if the partition is empty, false otherwise.
    IsEmpty bool

    // LastEnqueuedOffset is the offset of latest enqueued event.
    LastEnqueuedOffset string

    // LastEnqueuedOn is the date of latest enqueued event.
    LastEnqueuedOn time.Time

    // LastEnqueuedSequenceNumber is the sequence number of the latest enqueued event.
    LastEnqueuedSequenceNumber int64

    // PartitionID is the partition ID of this partition.
    PartitionID string
}

Fields:

  • BeginningSequenceNumber (int64) - First available sequence number in the partition
  • EventHubName (string) - Name of the Event Hub
  • IsEmpty (bool) - Whether the partition is empty
  • LastEnqueuedOffset (string) - Offset of the latest enqueued event
  • LastEnqueuedOn (time.Time) - When the latest event was enqueued
  • LastEnqueuedSequenceNumber (int64) - Sequence number of the latest enqueued event
  • PartitionID (string) - The partition ID

Retrieved via:

  • ProducerClient.GetPartitionProperties()
  • ConsumerClient.GetPartitionProperties()

Example:

partProps, err := consumerClient.GetPartitionProperties(context.TODO(), "0", nil)
if err != nil {
    // handle error
}

fmt.Printf("Partition: %s\n", partProps.PartitionID)
fmt.Printf("Event Hub: %s\n", partProps.EventHubName)
fmt.Printf("Is Empty: %v\n", partProps.IsEmpty)
fmt.Printf("Beginning Sequence: %d\n", partProps.BeginningSequenceNumber)
fmt.Printf("Last Sequence: %d\n", partProps.LastEnqueuedSequenceNumber)
fmt.Printf("Last Offset: %s\n", partProps.LastEnqueuedOffset)
fmt.Printf("Last Enqueued: %s\n", partProps.LastEnqueuedOn.Format(time.RFC3339))

// Calculate lag
if !partProps.IsEmpty {
    lag := partProps.LastEnqueuedSequenceNumber - partProps.BeginningSequenceNumber
    fmt.Printf("Total events in partition: %d\n", lag+1)
}

GetPartitionPropertiesOptions

type GetPartitionPropertiesOptions struct {
    // For future expansion
}

Currently empty, reserved for future options.

Complete Configuration Example

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

func main() {
    // Enable logging
    azlog.SetListener(func(event azlog.Event, s string) {
        fmt.Printf("[%s] %s\n", event, s)
    })
    azlog.SetEvents(
        azeventhubs.EventConn,
        azeventhubs.EventAuth,
        azeventhubs.EventProducer,
        azeventhubs.EventConsumer,
    )

    // Parse connection string
    connStr := "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret"
    props, err := azeventhubs.ParseConnectionString(connStr)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Namespace: %s\n", props.FullyQualifiedNamespace)

    // Create credential
    credential, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        log.Fatal(err)
    }

    // Configure retry options
    retryOptions := azeventhubs.RetryOptions{
        MaxRetries:    5,
        RetryDelay:    2 * time.Second,
        MaxRetryDelay: 60 * time.Second,
    }

    // Create producer with custom configuration
    producerClient, err := azeventhubs.NewProducerClient(
        "namespace.servicebus.windows.net",
        "eventhub-name",
        credential,
        &azeventhubs.ProducerClientOptions{
            ApplicationID: "my-application",
            RetryOptions:  retryOptions,
        },
    )
    if err != nil {
        log.Fatal(err)
    }
    defer producerClient.Close(context.TODO())

    // Get Event Hub properties
    ehProps, err := producerClient.GetEventHubProperties(context.TODO(), nil)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Event Hub: %s\n", ehProps.Name)
    fmt.Printf("Partitions: %v\n", ehProps.PartitionIDs)
    fmt.Printf("Geo-replication: %v\n", ehProps.GeoReplicationEnabled)

    // Get partition properties
    for _, partitionID := range ehProps.PartitionIDs {
        partProps, err := producerClient.GetPartitionProperties(context.TODO(), partitionID, nil)
        if err != nil {
            log.Printf("Error getting properties for partition %s: %v", partitionID, err)
            continue
        }

        fmt.Printf("\nPartition %s:\n", partProps.PartitionID)
        fmt.Printf("  Empty: %v\n", partProps.IsEmpty)
        if !partProps.IsEmpty {
            fmt.Printf("  Range: %d - %d\n",
                partProps.BeginningSequenceNumber,
                partProps.LastEnqueuedSequenceNumber)
            fmt.Printf("  Last Event: %s\n",
                partProps.LastEnqueuedOn.Format(time.RFC3339))
        }
    }
}

See Also

  • Producer Client
  • Consumer Client
  • Processor
  • Error Handling Best Practices