tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.
This document covers connection string parsing, retry options, error handling, logging, and other configuration utilities.
func ParseConnectionString(connStr string) (ConnectionStringProperties, error)Parses a connection string from the Azure portal and returns the parsed representation.
Supported Formats:
Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyname;SharedAccessKey=keyEndpoint=sb://namespace.servicebus.windows.net/;SharedAccessSignature=SharedAccessSignature sr=...Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyname;SharedAccessKey=key;EntityPath=eventhub-nameEndpoint=sb://localhost:6765;SharedAccessKeyName=key;SharedAccessKey=key;UseDevelopmentEmulator=trueParameters:
connStr (string) - The connection string to parseReturns:
ConnectionStringProperties - Parsed connection string propertieserror - Error if parsing failsExample:
import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
connStr := "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=mykey;SharedAccessKey=secret;EntityPath=my-eventhub"
props, err := azeventhubs.ParseConnectionString(connStr)
if err != nil {
// handle error
}
fmt.Printf("Namespace: %s\n", props.FullyQualifiedNamespace)
fmt.Printf("Endpoint: %s\n", props.Endpoint)
if props.EntityPath != nil {
fmt.Printf("Entity Path: %s\n", *props.EntityPath)
}
if props.SharedAccessKeyName != nil {
fmt.Printf("Key Name: %s\n", *props.SharedAccessKeyName)
}type ConnectionStringProperties struct {
// Endpoint is the Endpoint value in the connection string.
// Ex: sb://example.servicebus.windows.net
Endpoint string
// EntityPath is EntityPath value in the connection string.
EntityPath *string
// FullyQualifiedNamespace is the Endpoint value without the protocol scheme.
// Ex: example.servicebus.windows.net
FullyQualifiedNamespace string
// SharedAccessKey is the SharedAccessKey value in the connection string.
SharedAccessKey *string
// SharedAccessKeyName is the SharedAccessKeyName value in the connection string.
SharedAccessKeyName *string
// SharedAccessSignature is the SharedAccessSignature value in the connection string.
SharedAccessSignature *string
// Emulator indicates that the connection string is for an emulator:
// ex: Endpoint=localhost:6765;SharedAccessKeyName=...;SharedAccessKey=...;UseDevelopmentEmulator=true
Emulator bool
}Fields:
Endpoint (string) - Full endpoint URL (e.g., "sb://example.servicebus.windows.net")EntityPath (*string) - Entity path from the connection string (Event Hub name)FullyQualifiedNamespace (string) - Namespace without protocol (e.g., "example.servicebus.windows.net")SharedAccessKey (*string) - Shared access key for authenticationSharedAccessKeyName (*string) - Shared access key nameSharedAccessSignature (*string) - Shared access signature (alternative to key/keyname)Emulator (bool) - Whether this is an emulator connection stringtype RetryOptions struct {
// MaxRetries specifies the maximum number of attempts a failed operation will be retried
// before producing an error.
// The default value is three. A value less than zero means one try and no retries.
MaxRetries int32
// RetryDelay specifies the initial amount of delay to use before retrying an operation.
// The delay increases exponentially with each retry up to the maximum specified by MaxRetryDelay.
// The default value is four seconds. A value less than zero means no delay between retries.
RetryDelay time.Duration
// MaxRetryDelay specifies the maximum delay allowed before retrying an operation.
// Typically the value is greater than or equal to the value specified in RetryDelay.
// The default Value is 120 seconds. A value less than zero means there is no cap.
MaxRetryDelay time.Duration
}Fields:
MaxRetries (int32) - Maximum retry attempts (default: 3, < 0 means no retries)RetryDelay (time.Duration) - Initial delay before first retry (default: 4 seconds, < 0 means no delay)MaxRetryDelay (time.Duration) - Maximum delay cap for exponential backoff (default: 120 seconds, < 0 means no cap)Retry Behavior:
Example:
import "time"
// Custom retry options
retryOptions := azeventhubs.RetryOptions{
MaxRetries: 5,
RetryDelay: 2 * time.Second,
MaxRetryDelay: 60 * time.Second,
}
// Use with ProducerClient
producerClient, err := azeventhubs.NewProducerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
credential,
&azeventhubs.ProducerClientOptions{
RetryOptions: retryOptions,
},
)
// Use with ConsumerClient
consumerClient, err := azeventhubs.NewConsumerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
azeventhubs.DefaultConsumerGroup,
credential,
&azeventhubs.ConsumerClientOptions{
RetryOptions: retryOptions,
},
)type WebSocketConnParams struct {
// Host is the the `wss://<host>` to connect to
Host string
}WebSocket support allows Event Hubs connections through proxies and firewalls that block AMQP ports.
Fields:
Host (string) - The WebSocket host (wss://<host>)Example:
import (
"context"
"net"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"nhooyr.io/websocket"
)
// WebSocket connection factory
newWebSocketConn := func(ctx context.Context, params azeventhubs.WebSocketConnParams) (net.Conn, error) {
// Using nhooyr.io/websocket library
opts := &websocket.DialOptions{
Subprotocols: []string{"amqp"},
}
wssConn, _, err := websocket.Dial(ctx, "wss://"+params.Host, opts)
if err != nil {
return nil, err
}
return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil
}
// Use with ProducerClient
producerClient, err := azeventhubs.NewProducerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
credential,
&azeventhubs.ProducerClientOptions{
NewWebSocketConn: newWebSocketConn,
},
)
// Use with ConsumerClient
consumerClient, err := azeventhubs.NewConsumerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
azeventhubs.DefaultConsumerGroup,
credential,
&azeventhubs.ConsumerClientOptions{
NewWebSocketConn: newWebSocketConn,
},
)type Error struct {
// Code is a stable error code which can be used as part of programatic error handling.
// The codes can expand in the future, but the values (and their meaning) will remain the same.
Code ErrorCode
}
func (e *Error) Error() stringEvent Hubs-specific error with actionable error codes.
Note: The Code field is part of the stable API, but the error message from Error() and the underlying wrapped error are subject to change.
type ErrorCode stringString-based error code for programmatic error handling.
const (
// ErrorCodeUnauthorizedAccess means the credentials provided are not valid for use with
// a particular entity, or have expired.
ErrorCodeUnauthorizedAccess ErrorCode = "unauthorized"
// ErrorCodeConnectionLost means our connection was lost and all retry attempts failed.
// This typically reflects an extended outage or connection disruption and may
// require manual intervention.
ErrorCodeConnectionLost ErrorCode = "connlost"
// ErrorCodeOwnershipLost means that a partition that you were reading from was opened
// by another link with a higher epoch/owner level.
ErrorCodeOwnershipLost ErrorCode = "ownershiplost"
)Error Codes:
ErrorCodeUnauthorizedAccess - Invalid or expired credentialsErrorCodeConnectionLost - Connection lost after all retries (extended outage)ErrorCodeOwnershipLost - Partition claimed by another consumer with higher epochExample:
import (
"errors"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
events, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil)
if err != nil {
var ehErr *azeventhubs.Error
if errors.As(err, &ehErr) {
switch ehErr.Code {
case azeventhubs.ErrorCodeUnauthorizedAccess:
// Credentials are invalid or expired
// Re-authenticate or check permissions
log.Println("Authentication failed, refreshing credentials...")
case azeventhubs.ErrorCodeConnectionLost:
// Connection lost after all retries
// May need manual intervention or service investigation
log.Println("Connection lost, check service health...")
case azeventhubs.ErrorCodeOwnershipLost:
// Another consumer took ownership
// Normal for Processor rebalancing
log.Println("Ownership lost, partition reassigned")
return
default:
// Other Event Hubs error
log.Printf("Event Hubs error: %s", ehErr.Code)
}
} else {
// Not an Event Hubs-specific error
log.Printf("Other error: %v", err)
}
}const (
// EventConn is used whenever we create a connection or any links (ie: producers, consumers).
EventConn log.Event = "azeh.Conn"
// EventAuth is used when we're doing authentication/claims negotiation.
EventAuth log.Event = "azeh.Auth"
// EventProducer represents operations that happen on Producers.
EventProducer log.Event = "azeh.Producer"
// EventConsumer represents operations that happen on Consumers.
EventConsumer log.Event = "azeh.Consumer"
)Log Events:
EventConn - Connection and link creation eventsEventAuth - Authentication and claims negotiation eventsEventProducer - Producer operations (sending, batching)EventConsumer - Consumer operations (receiving, checkpointing)Environment Variable (All SDK Modules):
export AZURE_SDK_GO_LOGGING=allProgrammatic (azeventhubs Only):
import (
"fmt"
azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
// Set up listener to print log output to stdout
azlog.SetListener(func(event azlog.Event, s string) {
fmt.Printf("[%s] %s\n", event, s)
})
// Pick which events to log
azlog.SetEvents(
azeventhubs.EventConn,
azeventhubs.EventAuth,
azeventhubs.EventProducer,
azeventhubs.EventConsumer,
)Example with Structured Logging:
import (
"log/slog"
azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
logger := slog.Default()
azlog.SetListener(func(event azlog.Event, message string) {
logger.Info(message,
slog.String("event", string(event)),
slog.String("component", "eventhubs"),
)
})
azlog.SetEvents(
azeventhubs.EventConn,
azeventhubs.EventAuth,
azeventhubs.EventProducer,
azeventhubs.EventConsumer,
)type EventHubProperties struct {
// CreatedOn is the time when the event hub was created.
CreatedOn time.Time
// Name of the event hub
Name string
// PartitionIDs for the event hub
PartitionIDs []string
// GeoReplicationEnabled is true if the event hub has geo-replication enabled.
GeoReplicationEnabled bool
}Fields:
CreatedOn (time.Time) - When the Event Hub was createdName (string) - Name of the Event HubPartitionIDs ([]string) - List of partition IDs (e.g., ["0", "1", "2", "3"])GeoReplicationEnabled (bool) - Whether geo-replication is enabledRetrieved via:
ProducerClient.GetEventHubProperties()ConsumerClient.GetEventHubProperties()Example:
props, err := consumerClient.GetEventHubProperties(context.TODO(), nil)
if err != nil {
// handle error
}
fmt.Printf("Event Hub: %s\n", props.Name)
fmt.Printf("Created: %s\n", props.CreatedOn.Format(time.RFC3339))
fmt.Printf("Partitions: %v\n", props.PartitionIDs)
fmt.Printf("Geo-replication: %v\n", props.GeoReplicationEnabled)
// Iterate over all partitions
for _, partitionID := range props.PartitionIDs {
partitionClient, err := consumerClient.NewPartitionClient(partitionID, nil)
if err != nil {
// handle error
}
defer partitionClient.Close(context.TODO())
// ... use partition client
}type GetEventHubPropertiesOptions struct {
// For future expansion
}Currently empty, reserved for future options.
type PartitionProperties struct {
// BeginningSequenceNumber is the first sequence number for a partition.
BeginningSequenceNumber int64
// EventHubName is the name of the Event Hub for this partition.
EventHubName string
// IsEmpty is true if the partition is empty, false otherwise.
IsEmpty bool
// LastEnqueuedOffset is the offset of latest enqueued event.
LastEnqueuedOffset string
// LastEnqueuedOn is the date of latest enqueued event.
LastEnqueuedOn time.Time
// LastEnqueuedSequenceNumber is the sequence number of the latest enqueued event.
LastEnqueuedSequenceNumber int64
// PartitionID is the partition ID of this partition.
PartitionID string
}Fields:
BeginningSequenceNumber (int64) - First available sequence number in the partitionEventHubName (string) - Name of the Event HubIsEmpty (bool) - Whether the partition is emptyLastEnqueuedOffset (string) - Offset of the latest enqueued eventLastEnqueuedOn (time.Time) - When the latest event was enqueuedLastEnqueuedSequenceNumber (int64) - Sequence number of the latest enqueued eventPartitionID (string) - The partition IDRetrieved via:
ProducerClient.GetPartitionProperties()ConsumerClient.GetPartitionProperties()Example:
partProps, err := consumerClient.GetPartitionProperties(context.TODO(), "0", nil)
if err != nil {
// handle error
}
fmt.Printf("Partition: %s\n", partProps.PartitionID)
fmt.Printf("Event Hub: %s\n", partProps.EventHubName)
fmt.Printf("Is Empty: %v\n", partProps.IsEmpty)
fmt.Printf("Beginning Sequence: %d\n", partProps.BeginningSequenceNumber)
fmt.Printf("Last Sequence: %d\n", partProps.LastEnqueuedSequenceNumber)
fmt.Printf("Last Offset: %s\n", partProps.LastEnqueuedOffset)
fmt.Printf("Last Enqueued: %s\n", partProps.LastEnqueuedOn.Format(time.RFC3339))
// Calculate lag
if !partProps.IsEmpty {
lag := partProps.LastEnqueuedSequenceNumber - partProps.BeginningSequenceNumber
fmt.Printf("Total events in partition: %d\n", lag+1)
}type GetPartitionPropertiesOptions struct {
// For future expansion
}Currently empty, reserved for future options.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
func main() {
// Enable logging
azlog.SetListener(func(event azlog.Event, s string) {
fmt.Printf("[%s] %s\n", event, s)
})
azlog.SetEvents(
azeventhubs.EventConn,
azeventhubs.EventAuth,
azeventhubs.EventProducer,
azeventhubs.EventConsumer,
)
// Parse connection string
connStr := "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret"
props, err := azeventhubs.ParseConnectionString(connStr)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Namespace: %s\n", props.FullyQualifiedNamespace)
// Create credential
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Fatal(err)
}
// Configure retry options
retryOptions := azeventhubs.RetryOptions{
MaxRetries: 5,
RetryDelay: 2 * time.Second,
MaxRetryDelay: 60 * time.Second,
}
// Create producer with custom configuration
producerClient, err := azeventhubs.NewProducerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
credential,
&azeventhubs.ProducerClientOptions{
ApplicationID: "my-application",
RetryOptions: retryOptions,
},
)
if err != nil {
log.Fatal(err)
}
defer producerClient.Close(context.TODO())
// Get Event Hub properties
ehProps, err := producerClient.GetEventHubProperties(context.TODO(), nil)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Event Hub: %s\n", ehProps.Name)
fmt.Printf("Partitions: %v\n", ehProps.PartitionIDs)
fmt.Printf("Geo-replication: %v\n", ehProps.GeoReplicationEnabled)
// Get partition properties
for _, partitionID := range ehProps.PartitionIDs {
partProps, err := producerClient.GetPartitionProperties(context.TODO(), partitionID, nil)
if err != nil {
log.Printf("Error getting properties for partition %s: %v", partitionID, err)
continue
}
fmt.Printf("\nPartition %s:\n", partProps.PartitionID)
fmt.Printf(" Empty: %v\n", partProps.IsEmpty)
if !partProps.IsEmpty {
fmt.Printf(" Range: %d - %d\n",
partProps.BeginningSequenceNumber,
partProps.LastEnqueuedSequenceNumber)
fmt.Printf(" Last Event: %s\n",
partProps.LastEnqueuedOn.Format(time.RFC3339))
}
}
}