or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2@v2.0.1

docs

checkpoint-store.mdconfiguration.mdconsumer.mdevent-types.mdindex.mdprocessor.mdproducer.md
tile.json

tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2

tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0

Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.

consumer.mddocs/

ConsumerClient and PartitionClient - Receiving Events

The ConsumerClient and PartitionClient provide manual control over event consumption from specific partitions. For automatic load balancing across multiple consumer instances, see the Processor documentation.

Creating a ConsumerClient

Using Azure Identity

func NewConsumerClient(
    fullyQualifiedNamespace string,
    eventHub string,
    consumerGroup string,
    credential azcore.TokenCredential,
    options *ConsumerClientOptions,
) (*ConsumerClient, error)

Parameters:

  • fullyQualifiedNamespace - The Event Hubs namespace (e.g., "myeventhub.servicebus.windows.net")
  • eventHub - The name of the Event Hub
  • consumerGroup - The consumer group name (use azeventhubs.DefaultConsumerGroup for the default)
  • credential - An azcore.TokenCredential from the azidentity package
  • options - Optional configuration (can be nil)

Example:

import (
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
    // handle error
}

consumerClient, err := azeventhubs.NewConsumerClient(
    "namespace.servicebus.windows.net",
    "eventhub-name",
    azeventhubs.DefaultConsumerGroup,
    credential,
    nil,
)
if err != nil {
    // handle error
}
defer consumerClient.Close(context.TODO())

Using Connection String

func NewConsumerClientFromConnectionString(
    connectionString string,
    eventHub string,
    consumerGroup string,
    options *ConsumerClientOptions,
) (*ConsumerClient, error)

Parameters:

  • connectionString - Connection string from the Azure portal
  • eventHub - The name of the Event Hub (must be empty if the connection string contains EntityPath)
  • consumerGroup - The consumer group name
  • options - Optional configuration (can be nil)

Example:

consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(
    "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret",
    "eventhub-name",
    azeventhubs.DefaultConsumerGroup,
    nil,
)
if err != nil {
    // handle error
}
defer consumerClient.Close(context.TODO())

ConsumerClientOptions

type ConsumerClientOptions struct {
    // ApplicationID is used as the identifier when setting the User-Agent property.
    ApplicationID string

    // CustomEndpoint is a custom endpoint address that can be used when
    // establishing the connection to the service.
    CustomEndpoint string

    // InstanceID is a unique name used to identify the consumer. This can help with
    // diagnostics as this name will be returned in error messages. By default,
    // an identifier will be automatically generated.
    InstanceID string

    // NewWebSocketConn is a function that can create a net.Conn for use with websockets.
    NewWebSocketConn func(ctx context.Context, args WebSocketConnParams) (net.Conn, error)

    // RetryOptions controls how often operations are retried from this client.
    RetryOptions RetryOptions

    // TLSConfig configures a client with a custom *tls.Config.
    TLSConfig *tls.Config
}

Fields:

  • ApplicationID (string) - Application identifier for User-Agent diagnostics
  • CustomEndpoint (string) - Custom endpoint address for the connection
  • InstanceID (string) - Unique consumer identifier (auto-generated if not provided)
  • NewWebSocketConn (func) - Function to create WebSocket connections
  • RetryOptions (RetryOptions) - Controls retry behavior for operations
  • TLSConfig (*tls.Config) - Custom TLS configuration

DefaultConsumerGroup Constant

const DefaultConsumerGroup = "$Default"

The name of the default consumer group in the Event Hubs service.

ConsumerClient Methods

Get Instance ID

func (cc *ConsumerClient) InstanceID() string

Returns the instance ID for this ConsumerClient (useful for diagnostics).

Get Event Hub Properties

func (cc *ConsumerClient) GetEventHubProperties(
    ctx context.Context,
    options *GetEventHubPropertiesOptions,
) (EventHubProperties, error)

Gets properties of the Event Hub, including available partition IDs.

See Configuration - Event Hub Properties for details.

Get Partition Properties

func (cc *ConsumerClient) GetPartitionProperties(
    ctx context.Context,
    partitionID string,
    options *GetPartitionPropertiesOptions,
) (PartitionProperties, error)

Gets properties for a specific partition.

See Configuration - Partition Properties for details.

Close Consumer Client

func (cc *ConsumerClient) Close(ctx context.Context) error

Releases resources for this client. You MUST call this method to avoid leaking resources.

Creating a PartitionClient

func (cc *ConsumerClient) NewPartitionClient(
    partitionID string,
    options *PartitionClientOptions,
) (*PartitionClient, error)

Creates a PartitionClient for receiving events from a specific partition.

Parameters:

  • partitionID (string) - The partition ID to read from
  • options (*PartitionClientOptions) - Optional configuration (can be nil)

Returns:

  • *PartitionClient - Client for receiving events from the partition
  • error - Error if the client cannot be created

Example:

partitionClient, err := consumerClient.NewPartitionClient("0", nil)
if err != nil {
    // handle error
}
defer partitionClient.Close(context.TODO())

PartitionClientOptions

type PartitionClientOptions struct {
    // StartPosition is the position to start receiving events from,
    // either an offset (inclusive) with Offset, or receiving events received
    // after a specific time using EnqueuedTime.
    StartPosition StartPosition

    // OwnerLevel is the priority for this partition client, also known as the 'epoch' level.
    // When used, a partition client with a higher OwnerLevel will take ownership of a partition
    // from partition clients with a lower OwnerLevel.
    // Default is off.
    OwnerLevel *int64

    // Prefetch represents the size of the internal prefetch buffer. When set,
    // this client will attempt to always maintain an internal cache of events of
    // this size, asynchronously, increasing the odds that ReceiveEvents() will use
    // a locally stored cache of events, rather than having to wait for events to
    // arrive from the network.
    //
    // Defaults to 300 events if Prefetch == 0.
    // Disabled if Prefetch < 0.
    Prefetch int32
}

Fields:

  • StartPosition (StartPosition) - Where to start receiving events from (default: Latest)
  • OwnerLevel (*int64) - Priority level for partition ownership (higher values take ownership)
  • Prefetch (int32) - Size of internal prefetch buffer (default: 300, disabled if < 0)

StartPosition

type StartPosition struct {
    // Offset will start the consumer after the specified offset. Can be exclusive
    // or inclusive, based on the Inclusive property.
    // NOTE: offsets are not stable values, and might refer to different events over time
    // as the Event Hub events reach their age limit and are discarded.
    Offset *string

    // SequenceNumber will start the consumer after the specified sequence number. Can be exclusive
    // or inclusive, based on the Inclusive property.
    SequenceNumber *int64

    // EnqueuedTime will start the consumer before events that were enqueued on or after EnqueuedTime.
    // Can be exclusive or inclusive, based on the Inclusive property.
    EnqueuedTime *time.Time

    // Inclusive configures whether the events directly at Offset, SequenceNumber or EnqueuedTime will be included (true)
    // or excluded (false).
    Inclusive bool

    // Earliest will start the consumer at the earliest event.
    Earliest *bool

    // Latest will start the consumer after the last event.
    Latest *bool
}

Fields:

  • Offset (*string) - Start after the specified offset (not stable across time)
  • SequenceNumber (*int64) - Start after the specified sequence number
  • EnqueuedTime (*time.Time) - Start before events enqueued on or after this time
  • Inclusive (bool) - Whether to include (true) or exclude (false) the event at the position
  • Earliest (*bool) - Start at the earliest available event
  • Latest (*bool) - Start after the latest event (default)

Note: Only one position field (Offset, SequenceNumber, EnqueuedTime, Earliest, or Latest) can be set.

Examples:

import "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"

// Start from the beginning
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
    StartPosition: azeventhubs.StartPosition{
        Earliest: to.Ptr(true),
    },
})

// Start from latest (default behavior)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
    StartPosition: azeventhubs.StartPosition{
        Latest: to.Ptr(true),
    },
})

// Start from specific offset (inclusive)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
    StartPosition: azeventhubs.StartPosition{
        Offset:    to.Ptr("12345"),
        Inclusive: true,
    },
})

// Start from specific sequence number (exclusive)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
    StartPosition: azeventhubs.StartPosition{
        SequenceNumber: to.Ptr(int64(1000)),
        Inclusive:      false,
    },
})

// Start from events enqueued after a specific time
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
    StartPosition: azeventhubs.StartPosition{
        EnqueuedTime: to.Ptr(time.Now().Add(-1 * time.Hour)),
        Inclusive:    true,
    },
})

// With prefetch enabled (300 events)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
    Prefetch: 300,
})

// With prefetch disabled (manual credit management)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
    Prefetch: -1,
})

PartitionClient

The PartitionClient receives events from a specific partition.

Receiving Events

func (pc *PartitionClient) ReceiveEvents(
    ctx context.Context,
    count int,
    options *ReceiveEventsOptions,
) ([]*ReceivedEventData, error)

Receives events until count events have been received or the context has expired or been cancelled.

Parameters:

  • ctx (context.Context) - Context for cancellation and timeouts
  • count (int) - Maximum number of events to receive
  • options (*ReceiveEventsOptions) - Optional parameters (currently empty, for future expansion)

Returns:

  • []*ReceivedEventData - Slice of received events
  • error - Error if receiving fails

Troubleshooting:

If ReceiveEvents appears stuck:

  1. StartPosition defaults to Latest - The connection is lazily initialized, so it's possible the link was initialized after events were sent. Use an explicit start position (sequence number, offset, or timestamp).

  2. Events sent to different partition - By default, batches created without a partition ID are distributed by Event Hubs. Specify PartitionID in EventDataBatchOptions or open multiple PartitionClient instances.

  3. Network issues - Check log messages using the logging instructions in the Configuration section.

Example:

partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
    StartPosition: azeventhubs.StartPosition{
        Earliest: to.Ptr(true),
    },
})
if err != nil {
    // handle error
}
defer partitionClient.Close(context.TODO())

events, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil)
if err != nil {
    // handle error
}

for _, event := range events {
    fmt.Printf("Event: %s\n", string(event.Body))
}

Close Partition Client

func (pc *PartitionClient) Close(ctx context.Context) error

Releases resources for this partition client. You MUST call this method to avoid leaking resources.

ReceivedEventData

type ReceivedEventData struct {
    EventData

    // EnqueuedTime is the UTC time when the message was accepted and stored by Event Hubs.
    EnqueuedTime *time.Time

    // PartitionKey is used with a partitioned entity and enables assigning related messages
    // to the same internal partition. This ensures that the submission sequence order is correctly
    // recorded. The partition is chosen by a hash function in Event Hubs and cannot be chosen
    // directly.
    PartitionKey *string

    // Offset is the offset of the event.
    Offset string

    // RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
    // to properties that are not exposed by ReceivedEventData such as payloads encoded into the
    // Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
    // and Header fields.
    RawAMQPMessage *AMQPAnnotatedMessage

    // SequenceNumber is a unique number assigned to a message by Event Hubs.
    SequenceNumber int64

    // Properties set by the Event Hubs service.
    SystemProperties map[string]any
}

ReceivedEventData embeds EventData and adds Event Hubs-specific fields.

EventData Fields (Inherited):

  • Properties (map[string]any) - Custom application properties
  • Body ([]byte) - The message payload
  • ContentType (*string) - Content type descriptor
  • CorrelationID (any) - Client-specific identifier for correlation
  • MessageID (*string) - Unique identifier for the message

Additional Fields:

  • EnqueuedTime (*time.Time) - When Event Hubs received and stored the event
  • PartitionKey (*string) - The partition key used for routing (if any)
  • Offset (string) - The offset of the event in the partition
  • RawAMQPMessage (*AMQPAnnotatedMessage) - Full AMQP message for low-level access
  • SequenceNumber (int64) - Unique sequence number assigned by Event Hubs
  • SystemProperties (map[string]any) - Event Hubs system properties

Example:

events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
    // handle error
}

for _, event := range events {
    fmt.Printf("Sequence Number: %d\n", event.SequenceNumber)
    fmt.Printf("Offset: %s\n", event.Offset)
    fmt.Printf("Enqueued Time: %s\n", event.EnqueuedTime)
    fmt.Printf("Body: %s\n", string(event.Body))

    if event.PartitionKey != nil {
        fmt.Printf("Partition Key: %s\n", *event.PartitionKey)
    }

    if event.ContentType != nil {
        fmt.Printf("Content Type: %s\n", *event.ContentType)
    }

    for key, value := range event.Properties {
        fmt.Printf("Property %s: %v\n", key, value)
    }
}

Complete Example: Reading from All Partitions

package main

import (
    "context"
    "fmt"
    "log"
    "sync"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

func main() {
    credential, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        log.Fatal(err)
    }

    consumerClient, err := azeventhubs.NewConsumerClient(
        "namespace.servicebus.windows.net",
        "eventhub-name",
        azeventhubs.DefaultConsumerGroup,
        credential,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer consumerClient.Close(context.TODO())

    // Get list of partitions
    props, err := consumerClient.GetEventHubProperties(context.TODO(), nil)
    if err != nil {
        log.Fatal(err)
    }

    var wg sync.WaitGroup

    // Create a partition client for each partition
    for _, partitionID := range props.PartitionIDs {
        wg.Add(1)

        go func(partID string) {
            defer wg.Done()

            partitionClient, err := consumerClient.NewPartitionClient(partID, &azeventhubs.PartitionClientOptions{
                StartPosition: azeventhubs.StartPosition{
                    Earliest: to.Ptr(true),
                },
            })
            if err != nil {
                log.Printf("Error creating partition client for %s: %v", partID, err)
                return
            }
            defer partitionClient.Close(context.TODO())

            // Continuously receive events
            for {
                events, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil)
                if err != nil {
                    log.Printf("Error receiving from partition %s: %v", partID, err)
                    return
                }

                for _, event := range events {
                    fmt.Printf("[Partition %s] Event: %s\n", partID, string(event.Body))
                }

                if len(events) == 0 {
                    // No more events, exit
                    break
                }
            }
        }(partitionID)
    }

    wg.Wait()
    fmt.Println("Finished reading events")
}

Complete Example: Reading with Owner Level

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

func main() {
    credential, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        log.Fatal(err)
    }

    consumerClient, err := azeventhubs.NewConsumerClient(
        "namespace.servicebus.windows.net",
        "eventhub-name",
        azeventhubs.DefaultConsumerGroup,
        credential,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer consumerClient.Close(context.TODO())

    // Create partition client with owner level
    // This will take ownership from clients with lower or equal owner levels
    partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
        StartPosition: azeventhubs.StartPosition{
            Latest: to.Ptr(true),
        },
        OwnerLevel: to.Ptr(int64(10)), // Higher owner level
    })
    if err != nil {
        log.Fatal(err)
    }
    defer partitionClient.Close(context.TODO())

    for {
        events, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil)
        if err != nil {
            var ehErr *azeventhubs.Error
            if errors.As(err, &ehErr) && ehErr.Code == azeventhubs.ErrorCodeOwnershipLost {
                log.Println("Ownership lost to a client with higher owner level")
                return
            }
            log.Fatal(err)
        }

        for _, event := range events {
            fmt.Printf("Event: %s\n", string(event.Body))
        }
    }
}

See Also