tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0

Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.
The CheckpointStore interface provides a way to track processing progress and coordinate partition ownership across multiple consumer instances. The SDK includes a BlobStore implementation that uses Azure Blob Storage.
type CheckpointStore interface {
// ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
// the actual partitions that were claimed.
ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error)
// ListCheckpoints lists all the available checkpoints.
ListCheckpoints(ctx context.Context, fullyQualifiedNamespace, eventHubName, consumerGroup string, options *ListCheckpointsOptions) ([]Checkpoint, error)
// ListOwnership lists all ownerships.
ListOwnership(ctx context.Context, fullyQualifiedNamespace, eventHubName, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)
// SetCheckpoint updates a specific checkpoint with a sequence and offset.
SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}

The CheckpointStore interface is used by the Processor to coordinate partition ownership across consumer instances and to persist checkpoints so that processing can resume from the last recorded position.
Checkpoint tracks the last successfully processed event in a partition.
type Checkpoint struct {
// ConsumerGroup is the consumer group name
ConsumerGroup string
// EventHubName is the Event Hub name
EventHubName string
// FullyQualifiedNamespace is the Event Hubs namespace
FullyQualifiedNamespace string
// PartitionID is the partition ID
PartitionID string
// Offset is the last successfully processed Offset.
Offset *string
// SequenceNumber is the last successfully processed SequenceNumber.
SequenceNumber *int64
}

Fields:
ConsumerGroup (string) - The consumer group name
EventHubName (string) - The Event Hub name
FullyQualifiedNamespace (string) - The Event Hubs namespace (e.g., "namespace.servicebus.windows.net")
PartitionID (string) - The partition ID
Offset (*string) - The offset of the last successfully processed event
SequenceNumber (*int64) - The sequence number of the last successfully processed event

Note: Either Offset or SequenceNumber should be set, depending on which is available.
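For example, a checkpoint that records only a sequence number can be built with to.Ptr; the namespace, hub, and values below are placeholders:

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

checkpoint := azeventhubs.Checkpoint{
	FullyQualifiedNamespace: "namespace.servicebus.windows.net", // placeholder
	EventHubName:            "my-eventhub",                      // placeholder
	ConsumerGroup:           azeventhubs.DefaultConsumerGroup,
	PartitionID:             "0",
	// Only SequenceNumber is set here; set Offset instead (or as well) when it is available.
	SequenceNumber: to.Ptr(int64(1000)),
}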
Ownership tracks which consumer owns a partition for load balancing.
type Ownership struct {
// ConsumerGroup is the consumer group name
ConsumerGroup string
// EventHubName is the Event Hub name
EventHubName string
// FullyQualifiedNamespace is the Event Hubs namespace
FullyQualifiedNamespace string
// PartitionID is the partition ID
PartitionID string
// OwnerID is the owner ID of the Processor
OwnerID string
// LastModifiedTime is used when calculating if ownership has expired
LastModifiedTime time.Time
// ETag is the ETag, used when attempting to claim or update ownership of a partition.
ETag *azcore.ETag
}

Fields:

ConsumerGroup (string) - The consumer group name
EventHubName (string) - The Event Hub name
FullyQualifiedNamespace (string) - The Event Hubs namespace
PartitionID (string) - The partition ID
OwnerID (string) - The owner ID (instance ID) of the Processor that owns this partition
LastModifiedTime (time.Time) - Last time ownership was updated (used for expiration checks)
ETag (*azcore.ETag) - ETag for optimistic concurrency control

// ClaimOwnershipOptions contains optional parameters for the ClaimOwnership function
type ClaimOwnershipOptions struct {
// For future expansion
}
// ListCheckpointsOptions contains optional parameters for the ListCheckpoints function
type ListCheckpointsOptions struct {
// For future expansion
}
// ListOwnershipOptions contains optional parameters for the ListOwnership function
type ListOwnershipOptions struct {
// For future expansion
}
// SetCheckpointOptions contains optional parameters for the SetCheckpoint function
type SetCheckpointOptions struct {
// For future expansion
}

These options types are currently empty and reserved for future expansion.
The SDK provides a BlobStore implementation that uses Azure Blob Storage to persist checkpoints and ownership information.
import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func NewBlobStore(
	containerClient *container.Client,
	options *BlobStoreOptions,
) (*BlobStore, error)

Creates a checkpoint store that stores ownership and checkpoints in Azure Blob Storage.
Important: The container must exist before the checkpoint store can be used.
Parameters:
containerClient (*container.Client) - Azure Blob Storage container client
options (*BlobStoreOptions) - Optional configuration (can be nil)

Returns:
*BlobStore - The checkpoint store instance
error - Error if the store cannot be created

Example:
import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
// handle error
}
// Create container client
containerClient, err := container.NewClient(
"https://storageaccount.blob.core.windows.net/checkpoints",
credential,
nil,
)
if err != nil {
// handle error
}
// Create checkpoint store
checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
if err != nil {
// handle error
}

type BlobStoreOptions struct {
	azcore.ClientOptions
}

BlobStoreOptions embeds azcore.ClientOptions for standard Azure SDK configuration options.
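For example, a sketch of passing custom client options; the retry values are illustrative, not recommendations:

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
)

// Any azcore.ClientOptions fields (retry, telemetry, transport, etc.) can be set here.
opts := &checkpoints.BlobStoreOptions{
	ClientOptions: azcore.ClientOptions{
		Retry: policy.RetryOptions{
			MaxRetries: 5, // illustrative value
		},
	},
}

checkpointStore, err := checkpoints.NewBlobStore(containerClient, opts)
if err != nil {
	// handle error
}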
The BlobStore type implements the CheckpointStore interface:
func (b *BlobStore) ClaimOwnership(
ctx context.Context,
partitionOwnership []azeventhubs.Ownership,
options *azeventhubs.ClaimOwnershipOptions,
) ([]azeventhubs.Ownership, error)

Attempts to claim ownership of partitions. If a claim fails because another consumer updated the ownership blob first (optimistic concurrency), that partition is omitted from the returned slice.
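The Processor normally claims ownership for you, but a minimal sketch of calling ClaimOwnership directly might look like this; the namespace, hub, and owner ID are placeholders:

// Build ownership requests for the partitions this instance wants to own.
requested := []azeventhubs.Ownership{
	{
		FullyQualifiedNamespace: "namespace.servicebus.windows.net", // placeholder
		EventHubName:            "my-eventhub",                      // placeholder
		ConsumerGroup:           azeventhubs.DefaultConsumerGroup,
		PartitionID:             "0",
		OwnerID:                 "my-instance-id", // placeholder owner ID
	},
}

claimed, err := checkpointStore.ClaimOwnership(context.TODO(), requested, nil)
if err != nil {
	// handle error
}

// Only the partitions that were actually claimed are returned.
for _, o := range claimed {
	fmt.Printf("claimed partition %s\n", o.PartitionID)
}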
func (b *BlobStore) ListCheckpoints(
ctx context.Context,
fullyQualifiedNamespace, eventHubName, consumerGroup string,
options *azeventhubs.ListCheckpointsOptions,
) ([]azeventhubs.Checkpoint, error)

Lists all available checkpoints for the given namespace, Event Hub, and consumer group.
func (b *BlobStore) ListOwnership(
ctx context.Context,
fullyQualifiedNamespace, eventHubName, consumerGroup string,
options *azeventhubs.ListOwnershipOptions,
) ([]azeventhubs.Ownership, error)

Lists all ownerships for the given namespace, Event Hub, and consumer group.
func (b *BlobStore) SetCheckpoint(
ctx context.Context,
checkpoint azeventhubs.Checkpoint,
options *azeventhubs.SetCheckpointOptions,
) error

Updates a specific checkpoint with sequence number and offset.
Note: This function doesn't prevent simultaneous checkpoint updates - ownership is assumed.
The BlobStore uses the following blob naming conventions.

Checkpoint blobs:

{fullyQualifiedNamespace}/{eventHubName}/{consumerGroup}/checkpoint/{partitionID}

Example:

namespace.servicebus.windows.net/my-eventhub/$Default/checkpoint/0

Metadata:

sequencenumber - The sequence number as a string
offset - The offset as a string

Ownership blobs:

{fullyQualifiedNamespace}/{eventHubName}/{consumerGroup}/ownership/{partitionID}

Example:

namespace.servicebus.windows.net/my-eventhub/$Default/ownership/0

Metadata:

ownerid - The owner ID (instance ID) of the owning Processor
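The SDK manages these blobs itself, but if you want to inspect them (for debugging, say), a sketch that lists checkpoint blobs by prefix using the same azblob container client might look like this; the prefix values are placeholders:

pager := containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
	// Placeholder prefix following the checkpoint naming convention above.
	Prefix: to.Ptr("namespace.servicebus.windows.net/my-eventhub/$Default/checkpoint/"),
})

for pager.More() {
	page, err := pager.NextPage(context.TODO())
	if err != nil {
		// handle error
		break
	}
	for _, blob := range page.Segment.BlobItems {
		fmt.Println(*blob.Name) // one blob per partition, e.g. ".../checkpoint/0"
	}
}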
The Processor uses the CheckpointStore automatically for load balancing:
import (
"context"
"errors"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
)
// Create checkpoint store
checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
if err != nil {
// handle error
}
// Create processor
processor, err := azeventhubs.NewProcessor(
consumerClient,
checkpointStore,
nil,
)
if err != nil {
// handle error
}
// Run processor
go processor.Run(context.Background())
// Get partition clients and update checkpoints
for {
partitionClient := processor.NextPartitionClient(context.Background())
if partitionClient == nil {
break
}
go func(pc *azeventhubs.ProcessorPartitionClient) {
defer pc.Close(context.Background())
for {
// ReceiveEvents blocks until the requested count arrives or the context
// expires, so bound each batch with a timeout.
receiveCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
events, err := pc.ReceiveEvents(receiveCtx, 100, nil)
cancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return
}
// Process events...
// Update checkpoint
if len(events) > 0 {
lastEvent := events[len(events)-1]
err = pc.UpdateCheckpoint(context.Background(), lastEvent, nil)
if err != nil {
// handle error
}
}
}
}(partitionClient)
}

You can also use the CheckpointStore interface directly without the Processor:
import (
"context"
"fmt"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
// List existing checkpoints
checkpoints, err := checkpointStore.ListCheckpoints(
context.TODO(),
"namespace.servicebus.windows.net",
"my-eventhub",
azeventhubs.DefaultConsumerGroup,
nil,
)
if err != nil {
// handle error
}
for _, cp := range checkpoints {
fmt.Printf("Partition %s: seq=%d, offset=%s\n",
cp.PartitionID, *cp.SequenceNumber, *cp.Offset)
}
// Set a checkpoint
checkpoint := azeventhubs.Checkpoint{
FullyQualifiedNamespace: "namespace.servicebus.windows.net",
EventHubName: "my-eventhub",
ConsumerGroup: azeventhubs.DefaultConsumerGroup,
PartitionID: "0",
SequenceNumber: to.Ptr(int64(1000)),
Offset: to.Ptr("12345"),
}
err = checkpointStore.SetCheckpoint(context.TODO(), checkpoint, nil)
if err != nil {
// handle error
}
// List ownerships
ownerships, err := checkpointStore.ListOwnership(
context.TODO(),
"namespace.servicebus.windows.net",
"my-eventhub",
azeventhubs.DefaultConsumerGroup,
nil,
)
if err != nil {
// handle error
}
for _, ownership := range ownerships {
fmt.Printf("Partition %s owned by: %s (last modified: %s)\n",
ownership.PartitionID,
ownership.OwnerID,
ownership.LastModifiedTime.Format(time.RFC3339))
}

A complete example that reads from a single partition and persists checkpoints with the BlobStore:

package main
import (
"context"
"errors"
"fmt"
"log"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Fatal(err)
}
// Create container client
containerClient, err := container.NewClient(
"https://storageaccount.blob.core.windows.net/checkpoints",
credential,
nil,
)
if err != nil {
log.Fatal(err)
}
// Create checkpoint store
checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
if err != nil {
log.Fatal(err)
}
// Create consumer client
consumerClient, err := azeventhubs.NewConsumerClient(
"namespace.servicebus.windows.net",
"my-eventhub",
azeventhubs.DefaultConsumerGroup,
credential,
nil,
)
if err != nil {
log.Fatal(err)
}
defer consumerClient.Close(context.TODO())
// Create partition client
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Earliest: to.Ptr(true),
},
})
if err != nil {
log.Fatal(err)
}
defer partitionClient.Close(context.TODO())
// Check for existing checkpoint
existingCheckpoints, err := checkpointStore.ListCheckpoints(
context.TODO(),
"namespace.servicebus.windows.net",
"my-eventhub",
azeventhubs.DefaultConsumerGroup,
nil,
)
if err != nil {
log.Fatal(err)
}
for _, cp := range existingCheckpoints {
if cp.PartitionID == "0" {
fmt.Printf("Found existing checkpoint: seq=%d, offset=%s\n",
*cp.SequenceNumber, *cp.Offset)
}
}
// Receive and process events
for i := 0; i < 10; i++ {
// Bound each receive with a timeout so the loop continues even when fewer
// than 10 events are available.
receiveCtx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
events, err := partitionClient.ReceiveEvents(receiveCtx, 10, nil)
cancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.Fatal(err)
}
for _, event := range events {
fmt.Printf("Event: %s (seq: %d)\n", string(event.Body), event.SequenceNumber)
}
// Save checkpoint every batch
if len(events) > 0 {
lastEvent := events[len(events)-1]
checkpoint := azeventhubs.Checkpoint{
FullyQualifiedNamespace: "namespace.servicebus.windows.net",
EventHubName: "my-eventhub",
ConsumerGroup: azeventhubs.DefaultConsumerGroup,
PartitionID: "0",
SequenceNumber: &lastEvent.SequenceNumber,
Offset: &lastEvent.Offset,
}
err = checkpointStore.SetCheckpoint(context.TODO(), checkpoint, nil)
if err != nil {
log.Printf("Error saving checkpoint: %v", err)
} else {
fmt.Printf("Checkpoint saved: seq=%d, offset=%s\n",
lastEvent.SequenceNumber, lastEvent.Offset)
}
}
time.Sleep(1 * time.Second)
}
}

You can implement your own CheckpointStore for other storage backends:
type MyCheckpointStore struct {
// Your storage implementation
}
func (m *MyCheckpointStore) ClaimOwnership(
ctx context.Context,
partitionOwnership []azeventhubs.Ownership,
options *azeventhubs.ClaimOwnershipOptions,
) ([]azeventhubs.Ownership, error) {
// Implement optimistic concurrency control using ETags.
// Return only the ownerships that were successfully claimed.
return nil, nil // placeholder so the skeleton compiles
}
func (m *MyCheckpointStore) ListCheckpoints(
ctx context.Context,
fullyQualifiedNamespace, eventHubName, consumerGroup string,
options *azeventhubs.ListCheckpointsOptions,
) ([]azeventhubs.Checkpoint, error) {
// Retrieve all checkpoints for the given parameters.
return nil, nil // placeholder
}
func (m *MyCheckpointStore) ListOwnership(
ctx context.Context,
fullyQualifiedNamespace, eventHubName, consumerGroup string,
options *azeventhubs.ListOwnershipOptions,
) ([]azeventhubs.Ownership, error) {
// Retrieve all ownerships for the given parameters.
return nil, nil // placeholder
}
func (m *MyCheckpointStore) SetCheckpoint(
ctx context.Context,
checkpoint azeventhubs.Checkpoint,
options *azeventhubs.SetCheckpointOptions,
) error {
// Save the checkpoint.
// No need for optimistic concurrency - ownership is assumed.
return nil // placeholder
}

Checkpoint Frequently - Update checkpoints regularly to minimize reprocessing after restarts. Consider checkpointing every N events or every M seconds.
Handle Checkpoint Failures Gracefully - Checkpoint updates can fail. Log errors but continue processing, as the Processor will retry from the last successful checkpoint.
Use Blob Storage in the Same Region - To minimize latency, deploy your checkpoint store in the same region as your Event Hubs namespace and consumer applications.
Monitor Checkpoint Lag - Track the difference between the last checkpoint and the latest event to detect processing delays (a sketch follows this list).
Secure Access - Use Azure AD authentication (recommended) or connection strings with appropriate access controls for the blob container.
Container Lifecycle - Ensure the blob container exists before creating the BlobStore. Consider implementing container creation in your deployment scripts.
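A minimal sketch of the checkpoint-lag check mentioned above, assuming an existing consumerClient and checkpointStore; the namespace and hub names are placeholders:

// Compare the latest enqueued sequence number in each partition with the
// last checkpointed sequence number to estimate processing lag.
cps, err := checkpointStore.ListCheckpoints(
	context.TODO(),
	"namespace.servicebus.windows.net", // placeholder
	"my-eventhub",                      // placeholder
	azeventhubs.DefaultConsumerGroup,
	nil,
)
if err != nil {
	// handle error
}

for _, cp := range cps {
	props, err := consumerClient.GetPartitionProperties(context.TODO(), cp.PartitionID, nil)
	if err != nil {
		// handle error
		continue
	}
	if cp.SequenceNumber != nil {
		lag := props.LastEnqueuedSequenceNumber - *cp.SequenceNumber
		fmt.Printf("partition %s lag: %d events\n", cp.PartitionID, lag)
	}
}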