tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.
Azure Event Hubs is a big data streaming platform and event ingestion service from Microsoft. This Go SDK provides clients for sending events to Event Hubs, consuming events from partitions, and automatic load balancing with checkpoint management across multiple consumer instances.
The library provides three primary consumption patterns:
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)For checkpoint store functionality:
import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
// Create producer client with Azure Identity
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
producerClient, err := azeventhubs.NewProducerClient(
"myeventhub.servicebus.windows.net",
"myhub",
cred,
nil,
)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// Create a batch
batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
// Add events to batch
err = batch.AddEventData(&azeventhubs.EventData{
Body: []byte("hello world"),
}, nil)
if err != nil {
panic(err)
}
// Send the batch
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}import (
"context"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
// Create consumer client
cred, err := azidentity.NewDefaultAzureCredential(nil)
consumerClient, err := azeventhubs.NewConsumerClient(
"myeventhub.servicebus.windows.net",
"myhub",
azeventhubs.DefaultConsumerGroup,
cred,
nil,
)
defer consumerClient.Close(context.TODO())
// Create partition client to read from partition "0"
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Earliest: to.Ptr(true),
},
})
defer partitionClient.Close(context.TODO())
// Receive up to 100 events
receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
cancel()
for _, event := range events {
fmt.Printf("Event received with body '%s'\n", string(event.Body))
}import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)
// Create consumer client
cred, err := azidentity.NewDefaultAzureCredential(nil)
consumerClient, err := azeventhubs.NewConsumerClient(
"myeventhub.servicebus.windows.net",
"myhub",
azeventhubs.DefaultConsumerGroup,
cred,
nil,
)
defer consumerClient.Close(context.TODO())
// Create checkpoint store using Azure Blob Storage
blobClient, err := azblob.NewClient("https://mystorageaccount.blob.core.windows.net", cred, nil)
containerClient := blobClient.ServiceClient().NewContainerClient("checkpoints")
checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
// Create processor
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
// Start processing partitions in background
go func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go processPartition(partitionClient)
}
}()
// Run processor (blocks until context is cancelled)
err = processor.Run(context.TODO())Send events to Event Hubs using batches for efficient transmission.
func NewProducerClient(fullyQualifiedNamespace string, eventHub string, credential azcore.TokenCredential, options *ProducerClientOptions) (*ProducerClient, error)func NewProducerClientFromConnectionString(connectionString string, eventHub string, options *ProducerClientOptions) (*ProducerClient, error)Key Methods:
func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *EventDataBatchOptions) (*EventDataBatch, error)func (pc *ProducerClient) SendEventDataBatch(ctx context.Context, batch *EventDataBatch, options *SendEventDataBatchOptions) errorfunc (pc *ProducerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)func (pc *ProducerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)func (pc *ProducerClient) Close(ctx context.Context) errorManually consume events from specific partitions.
func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error)func NewConsumerClientFromConnectionString(connectionString string, eventHub string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error)Key Methods:
func (cc *ConsumerClient) NewPartitionClient(partitionID string, options *PartitionClientOptions) (*PartitionClient, error)func (cc *ConsumerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)func (cc *ConsumerClient) InstanceID() stringfunc (cc *ConsumerClient) Close(ctx context.Context) errorPartitionClient Methods:
func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)func (pc *PartitionClient) Close(ctx context.Context) errorAutomatically balance partition consumption across multiple instances with checkpoint management.
func NewProcessor(consumerClient *ConsumerClient, checkpointStore CheckpointStore, options *ProcessorOptions) (*Processor, error)Key Methods:
func (p *Processor) Run(ctx context.Context) errorfunc (p *Processor) NextPartitionClient(ctx context.Context) *ProcessorPartitionClientProcessorPartitionClient Methods:
func (ppc *ProcessorPartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)func (ppc *ProcessorPartitionClient) UpdateCheckpoint(ctx context.Context, latestEvent *ReceivedEventData, options *UpdateCheckpointOptions) errorfunc (ppc *ProcessorPartitionClient) PartitionID() stringfunc (ppc *ProcessorPartitionClient) Close(ctx context.Context) errorCore types for event data and AMQP messages.
type EventData struct {
Properties map[string]any
Body []byte
ContentType *string
CorrelationID any
MessageID *string
}type ReceivedEventData struct {
EventData
EnqueuedTime *time.Time
PartitionKey *string
Offset string
RawAMQPMessage *AMQPAnnotatedMessage
SequenceNumber int64
SystemProperties map[string]any
}type StartPosition struct {
Offset *string
SequenceNumber *int64
EnqueuedTime *time.Time
Inclusive bool
Earliest *bool
Latest *bool
}Interface for checkpoint persistence and Azure Blob Storage implementation.
type CheckpointStore interface {
ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error)
ListCheckpoints(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListCheckpointsOptions) ([]Checkpoint, error)
ListOwnership(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)
SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}func NewBlobStore(containerClient *container.Client, options *BlobStoreOptions) (*BlobStore, error)Checkpoint Store Documentation
Connection string parsing, retry options, error handling, and logging.
type RetryOptions struct {
MaxRetries int32
RetryDelay time.Duration
MaxRetryDelay time.Duration
}func ParseConnectionString(connStr string) (ConnectionStringProperties, error)const DefaultConsumerGroup = "$Default"Default consumer group name used by Event Hubs.
var ErrEventDataTooLarge errorError returned when an event is too large to fit in a batch.
Event Hubs errors implement the Error interface with specific error codes:
type Error struct {
Code ErrorCode
}func (e *Error) Error() stringtype ErrorCode stringError code constants:
const (
ErrorCodeUnauthorizedAccess ErrorCode = "unauthorized"
ErrorCodeConnectionLost ErrorCode = "connlost"
ErrorCodeOwnershipLost ErrorCode = "ownershiplost"
)The SDK uses the azcore logging package with specific log events:
const (
EventConn log.Event = "azeh.Conn"
EventAuth log.Event = "azeh.Auth"
EventProducer log.Event = "azeh.Producer"
EventConsumer log.Event = "azeh.Consumer"
)Configure logging:
import (
azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
// Set up listener
azlog.SetListener(func(event azlog.Event, s string) {
fmt.Printf("[%s] %s\n", event, s)
})
// Enable specific events
azlog.SetEvents(
azeventhubs.EventConn,
azeventhubs.EventAuth,
azeventhubs.EventProducer,
azeventhubs.EventConsumer,
)import "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
credential, err := azidentity.NewDefaultAzureCredential(nil)
producerClient, err := azeventhubs.NewProducerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
credential,
nil,
)// Without EntityPath in connection string
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(
"Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret",
"eventhub-name",
nil,
)
// With EntityPath in connection string
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(
"Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret;EntityPath=eventhub-name",
"",
nil,
)Events in Event Hubs are organized into partitions. Each partition is an ordered sequence of events. You can:
EventDataBatchOptions.PartitionIDEventDataBatchOptions.PartitionKeyConsumer groups allow multiple applications to each have a separate view of the event stream. Each consumer group maintains its own position in the stream. The default consumer group is azeventhubs.DefaultConsumerGroup (value: "$Default").
Checkpoints track the last successfully processed event in a partition. This allows consumers to resume processing from where they left off after restarts or failures. Use the Processor with a CheckpointStore for automatic checkpoint management.