tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.
The ConsumerClient and PartitionClient provide manual control over event consumption from specific partitions. For automatic load balancing across multiple consumer instances, see the Processor documentation.
func NewConsumerClient(
fullyQualifiedNamespace string,
eventHub string,
consumerGroup string,
credential azcore.TokenCredential,
options *ConsumerClientOptions,
) (*ConsumerClient, error)Parameters:
fullyQualifiedNamespace - The Event Hubs namespace (e.g., "myeventhub.servicebus.windows.net")eventHub - The name of the Event HubconsumerGroup - The consumer group name (use azeventhubs.DefaultConsumerGroup for the default)credential - An azcore.TokenCredential from the azidentity packageoptions - Optional configuration (can be nil)Example:
import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
// handle error
}
consumerClient, err := azeventhubs.NewConsumerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
azeventhubs.DefaultConsumerGroup,
credential,
nil,
)
if err != nil {
// handle error
}
defer consumerClient.Close(context.TODO())func NewConsumerClientFromConnectionString(
connectionString string,
eventHub string,
consumerGroup string,
options *ConsumerClientOptions,
) (*ConsumerClient, error)Parameters:
connectionString - Connection string from the Azure portaleventHub - The name of the Event Hub (must be empty if the connection string contains EntityPath)consumerGroup - The consumer group nameoptions - Optional configuration (can be nil)Example:
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(
"Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret",
"eventhub-name",
azeventhubs.DefaultConsumerGroup,
nil,
)
if err != nil {
// handle error
}
defer consumerClient.Close(context.TODO())type ConsumerClientOptions struct {
// ApplicationID is used as the identifier when setting the User-Agent property.
ApplicationID string
// CustomEndpoint is a custom endpoint address that can be used when
// establishing the connection to the service.
CustomEndpoint string
// InstanceID is a unique name used to identify the consumer. This can help with
// diagnostics as this name will be returned in error messages. By default,
// an identifier will be automatically generated.
InstanceID string
// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
NewWebSocketConn func(ctx context.Context, args WebSocketConnParams) (net.Conn, error)
// RetryOptions controls how often operations are retried from this client.
RetryOptions RetryOptions
// TLSConfig configures a client with a custom *tls.Config.
TLSConfig *tls.Config
}Fields:
ApplicationID (string) - Application identifier for User-Agent diagnosticsCustomEndpoint (string) - Custom endpoint address for the connectionInstanceID (string) - Unique consumer identifier (auto-generated if not provided)NewWebSocketConn (func) - Function to create WebSocket connectionsRetryOptions (RetryOptions) - Controls retry behavior for operationsTLSConfig (*tls.Config) - Custom TLS configurationconst DefaultConsumerGroup = "$Default"The name of the default consumer group in the Event Hubs service.
func (cc *ConsumerClient) InstanceID() stringReturns the instance ID for this ConsumerClient (useful for diagnostics).
func (cc *ConsumerClient) GetEventHubProperties(
ctx context.Context,
options *GetEventHubPropertiesOptions,
) (EventHubProperties, error)Gets properties of the Event Hub, including available partition IDs.
See Configuration - Event Hub Properties for details.
func (cc *ConsumerClient) GetPartitionProperties(
ctx context.Context,
partitionID string,
options *GetPartitionPropertiesOptions,
) (PartitionProperties, error)Gets properties for a specific partition.
See Configuration - Partition Properties for details.
func (cc *ConsumerClient) Close(ctx context.Context) errorReleases resources for this client. You MUST call this method to avoid leaking resources.
func (cc *ConsumerClient) NewPartitionClient(
partitionID string,
options *PartitionClientOptions,
) (*PartitionClient, error)Creates a PartitionClient for receiving events from a specific partition.
Parameters:
partitionID (string) - The partition ID to read fromoptions (*PartitionClientOptions) - Optional configuration (can be nil)Returns:
*PartitionClient - Client for receiving events from the partitionerror - Error if the client cannot be createdExample:
partitionClient, err := consumerClient.NewPartitionClient("0", nil)
if err != nil {
// handle error
}
defer partitionClient.Close(context.TODO())type PartitionClientOptions struct {
// StartPosition is the position to start receiving events from,
// either an offset (inclusive) with Offset, or receiving events received
// after a specific time using EnqueuedTime.
StartPosition StartPosition
// OwnerLevel is the priority for this partition client, also known as the 'epoch' level.
// When used, a partition client with a higher OwnerLevel will take ownership of a partition
// from partition clients with a lower OwnerLevel.
// Default is off.
OwnerLevel *int64
// Prefetch represents the size of the internal prefetch buffer. When set,
// this client will attempt to always maintain an internal cache of events of
// this size, asynchronously, increasing the odds that ReceiveEvents() will use
// a locally stored cache of events, rather than having to wait for events to
// arrive from the network.
//
// Defaults to 300 events if Prefetch == 0.
// Disabled if Prefetch < 0.
Prefetch int32
}Fields:
StartPosition (StartPosition) - Where to start receiving events from (default: Latest)OwnerLevel (*int64) - Priority level for partition ownership (higher values take ownership)Prefetch (int32) - Size of internal prefetch buffer (default: 300, disabled if < 0)type StartPosition struct {
// Offset will start the consumer after the specified offset. Can be exclusive
// or inclusive, based on the Inclusive property.
// NOTE: offsets are not stable values, and might refer to different events over time
// as the Event Hub events reach their age limit and are discarded.
Offset *string
// SequenceNumber will start the consumer after the specified sequence number. Can be exclusive
// or inclusive, based on the Inclusive property.
SequenceNumber *int64
// EnqueuedTime will start the consumer before events that were enqueued on or after EnqueuedTime.
// Can be exclusive or inclusive, based on the Inclusive property.
EnqueuedTime *time.Time
// Inclusive configures whether the events directly at Offset, SequenceNumber or EnqueuedTime will be included (true)
// or excluded (false).
Inclusive bool
// Earliest will start the consumer at the earliest event.
Earliest *bool
// Latest will start the consumer after the last event.
Latest *bool
}Fields:
Offset (*string) - Start after the specified offset (not stable across time)SequenceNumber (*int64) - Start after the specified sequence numberEnqueuedTime (*time.Time) - Start before events enqueued on or after this timeInclusive (bool) - Whether to include (true) or exclude (false) the event at the positionEarliest (*bool) - Start at the earliest available eventLatest (*bool) - Start after the latest event (default)Note: Only one position field (Offset, SequenceNumber, EnqueuedTime, Earliest, or Latest) can be set.
Examples:
import "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
// Start from the beginning
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Earliest: to.Ptr(true),
},
})
// Start from latest (default behavior)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Latest: to.Ptr(true),
},
})
// Start from specific offset (inclusive)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Offset: to.Ptr("12345"),
Inclusive: true,
},
})
// Start from specific sequence number (exclusive)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
SequenceNumber: to.Ptr(int64(1000)),
Inclusive: false,
},
})
// Start from events enqueued after a specific time
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
EnqueuedTime: to.Ptr(time.Now().Add(-1 * time.Hour)),
Inclusive: true,
},
})
// With prefetch enabled (300 events)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
Prefetch: 300,
})
// With prefetch disabled (manual credit management)
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
Prefetch: -1,
})The PartitionClient receives events from a specific partition.
func (pc *PartitionClient) ReceiveEvents(
ctx context.Context,
count int,
options *ReceiveEventsOptions,
) ([]*ReceivedEventData, error)Receives events until count events have been received or the context has expired or been cancelled.
Parameters:
ctx (context.Context) - Context for cancellation and timeoutscount (int) - Maximum number of events to receiveoptions (*ReceiveEventsOptions) - Optional parameters (currently empty, for future expansion)Returns:
[]*ReceivedEventData - Slice of received eventserror - Error if receiving failsTroubleshooting:
If ReceiveEvents appears stuck:
StartPosition defaults to Latest - The connection is lazily initialized, so it's possible the link was initialized after events were sent. Use an explicit start position (sequence number, offset, or timestamp).
Events sent to different partition - By default, batches created without a partition ID are distributed by Event Hubs. Specify PartitionID in EventDataBatchOptions or open multiple PartitionClient instances.
Network issues - Check log messages using the logging instructions in the Configuration section.
Example:
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Earliest: to.Ptr(true),
},
})
if err != nil {
// handle error
}
defer partitionClient.Close(context.TODO())
events, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil)
if err != nil {
// handle error
}
for _, event := range events {
fmt.Printf("Event: %s\n", string(event.Body))
}func (pc *PartitionClient) Close(ctx context.Context) errorReleases resources for this partition client. You MUST call this method to avoid leaking resources.
type ReceivedEventData struct {
EventData
// EnqueuedTime is the UTC time when the message was accepted and stored by Event Hubs.
EnqueuedTime *time.Time
// PartitionKey is used with a partitioned entity and enables assigning related messages
// to the same internal partition. This ensures that the submission sequence order is correctly
// recorded. The partition is chosen by a hash function in Event Hubs and cannot be chosen
// directly.
PartitionKey *string
// Offset is the offset of the event.
Offset string
// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
// and Header fields.
RawAMQPMessage *AMQPAnnotatedMessage
// SequenceNumber is a unique number assigned to a message by Event Hubs.
SequenceNumber int64
// Properties set by the Event Hubs service.
SystemProperties map[string]any
}ReceivedEventData embeds EventData and adds Event Hubs-specific fields.
EventData Fields (Inherited):
Properties (map[string]any) - Custom application propertiesBody ([]byte) - The message payloadContentType (*string) - Content type descriptorCorrelationID (any) - Client-specific identifier for correlationMessageID (*string) - Unique identifier for the messageAdditional Fields:
EnqueuedTime (*time.Time) - When Event Hubs received and stored the eventPartitionKey (*string) - The partition key used for routing (if any)Offset (string) - The offset of the event in the partitionRawAMQPMessage (*AMQPAnnotatedMessage) - Full AMQP message for low-level accessSequenceNumber (int64) - Unique sequence number assigned by Event HubsSystemProperties (map[string]any) - Event Hubs system propertiesExample:
events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
// handle error
}
for _, event := range events {
fmt.Printf("Sequence Number: %d\n", event.SequenceNumber)
fmt.Printf("Offset: %s\n", event.Offset)
fmt.Printf("Enqueued Time: %s\n", event.EnqueuedTime)
fmt.Printf("Body: %s\n", string(event.Body))
if event.PartitionKey != nil {
fmt.Printf("Partition Key: %s\n", *event.PartitionKey)
}
if event.ContentType != nil {
fmt.Printf("Content Type: %s\n", *event.ContentType)
}
for key, value := range event.Properties {
fmt.Printf("Property %s: %v\n", key, value)
}
}package main
import (
"context"
"fmt"
"log"
"sync"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
func main() {
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Fatal(err)
}
consumerClient, err := azeventhubs.NewConsumerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
azeventhubs.DefaultConsumerGroup,
credential,
nil,
)
if err != nil {
log.Fatal(err)
}
defer consumerClient.Close(context.TODO())
// Get list of partitions
props, err := consumerClient.GetEventHubProperties(context.TODO(), nil)
if err != nil {
log.Fatal(err)
}
var wg sync.WaitGroup
// Create a partition client for each partition
for _, partitionID := range props.PartitionIDs {
wg.Add(1)
go func(partID string) {
defer wg.Done()
partitionClient, err := consumerClient.NewPartitionClient(partID, &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Earliest: to.Ptr(true),
},
})
if err != nil {
log.Printf("Error creating partition client for %s: %v", partID, err)
return
}
defer partitionClient.Close(context.TODO())
// Continuously receive events
for {
events, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil)
if err != nil {
log.Printf("Error receiving from partition %s: %v", partID, err)
return
}
for _, event := range events {
fmt.Printf("[Partition %s] Event: %s\n", partID, string(event.Body))
}
if len(events) == 0 {
// No more events, exit
break
}
}
}(partitionID)
}
wg.Wait()
fmt.Println("Finished reading events")
}package main
import (
"context"
"fmt"
"log"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
func main() {
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Fatal(err)
}
consumerClient, err := azeventhubs.NewConsumerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
azeventhubs.DefaultConsumerGroup,
credential,
nil,
)
if err != nil {
log.Fatal(err)
}
defer consumerClient.Close(context.TODO())
// Create partition client with owner level
// This will take ownership from clients with lower or equal owner levels
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Latest: to.Ptr(true),
},
OwnerLevel: to.Ptr(int64(10)), // Higher owner level
})
if err != nil {
log.Fatal(err)
}
defer partitionClient.Close(context.TODO())
for {
events, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil)
if err != nil {
var ehErr *azeventhubs.Error
if errors.As(err, &ehErr) && ehErr.Code == azeventhubs.ErrorCodeOwnershipLost {
log.Println("Ownership lost to a client with higher owner level")
return
}
log.Fatal(err)
}
for _, event := range events {
fmt.Printf("Event: %s\n", string(event.Body))
}
}
}