or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2@v2.0.1

docs

checkpoint-store.mdconfiguration.mdconsumer.mdevent-types.mdindex.mdprocessor.mdproducer.md
tile.json

tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2

tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0

Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.

checkpoint-store.mddocs/

Checkpoint Store

The CheckpointStore interface provides a way to track processing progress and coordinate partition ownership across multiple consumer instances. The SDK includes a BlobStore implementation that uses Azure Blob Storage.

CheckpointStore Interface

type CheckpointStore interface {
    // ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
    // the actual partitions that were claimed.
    ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error)

    // ListCheckpoints lists all the available checkpoints.
    ListCheckpoints(ctx context.Context, fullyQualifiedNamespace, eventHubName, consumerGroup string, options *ListCheckpointsOptions) ([]Checkpoint, error)

    // ListOwnership lists all ownerships.
    ListOwnership(ctx context.Context, fullyQualifiedNamespace, eventHubName, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)

    // SetCheckpoint updates a specific checkpoint with a sequence and offset.
    SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}

The CheckpointStore interface is used by the Processor for:

  • Coordinating partition ownership across multiple consumer instances
  • Tracking processing progress through checkpoints
  • Enabling resumption from the last processed event after restarts

Checkpoint Type

Checkpoint tracks the last successfully processed event in a partition.

type Checkpoint struct {
    // ConsumerGroup is the consumer group name
    ConsumerGroup string

    // EventHubName is the Event Hub name
    EventHubName string

    // FullyQualifiedNamespace is the Event Hubs namespace
    FullyQualifiedNamespace string

    // PartitionID is the partition ID
    PartitionID string

    // Offset is the last successfully processed Offset.
    Offset *string

    // SequenceNumber is the last successfully processed SequenceNumber.
    SequenceNumber *int64
}

Fields:

  • ConsumerGroup (string) - The consumer group name
  • EventHubName (string) - The Event Hub name
  • FullyQualifiedNamespace (string) - The Event Hubs namespace (e.g., "namespace.servicebus.windows.net")
  • PartitionID (string) - The partition ID
  • Offset (*string) - The offset of the last successfully processed event
  • SequenceNumber (*int64) - The sequence number of the last successfully processed event

Note: Either Offset or SequenceNumber should be set, depending on which is available.

Ownership Type

Ownership tracks which consumer owns a partition for load balancing.

type Ownership struct {
    // ConsumerGroup is the consumer group name
    ConsumerGroup string

    // EventHubName is the Event Hub name
    EventHubName string

    // FullyQualifiedNamespace is the Event Hubs namespace
    FullyQualifiedNamespace string

    // PartitionID is the partition ID
    PartitionID string

    // OwnerID is the owner ID of the Processor
    OwnerID string

    // LastModifiedTime is used when calculating if ownership has expired
    LastModifiedTime time.Time

    // ETag is the ETag, used when attempting to claim or update ownership of a partition.
    ETag *azcore.ETag
}

Fields:

  • ConsumerGroup (string) - The consumer group name
  • EventHubName (string) - The Event Hub name
  • FullyQualifiedNamespace (string) - The Event Hubs namespace
  • PartitionID (string) - The partition ID
  • OwnerID (string) - The owner ID (instance ID) of the Processor that owns this partition
  • LastModifiedTime (time.Time) - Last time ownership was updated (for expiration checks)
  • ETag (*azcore.ETag) - ETag for optimistic concurrency control

Options Types

// ClaimOwnershipOptions contains optional parameters for the ClaimOwnership function
type ClaimOwnershipOptions struct {
    // For future expansion
}

// ListCheckpointsOptions contains optional parameters for the ListCheckpoints function
type ListCheckpointsOptions struct {
    // For future expansion
}

// ListOwnershipOptions contains optional parameters for the ListOwnership function
type ListOwnershipOptions struct {
    // For future expansion
}

// SetCheckpointOptions contains optional parameters for the UpdateCheckpoint function
type SetCheckpointOptions struct {
    // For future expansion
}

These options types are currently empty and reserved for future expansion.

BlobStore Implementation

The SDK provides a BlobStore implementation that uses Azure Blob Storage to persist checkpoints and ownership information.

Package Import

import (
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

Creating a BlobStore

func NewBlobStore(
    containerClient *container.Client,
    options *BlobStoreOptions,
) (*BlobStore, error)

Creates a checkpoint store that stores ownership and checkpoints in Azure Blob storage.

Important: The container must exist before the checkpoint store can be used.

Parameters:

  • containerClient (*container.Client) - Azure Blob Storage container client
  • options (*BlobStoreOptions) - Optional configuration (can be nil)

Returns:

  • *BlobStore - The checkpoint store instance
  • error - Error if the store cannot be created

Example:

import (
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
    // handle error
}

// Create container client
containerClient, err := container.NewClient(
    "https://storageaccount.blob.core.windows.net/checkpoints",
    credential,
    nil,
)
if err != nil {
    // handle error
}

// Create checkpoint store
checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
if err != nil {
    // handle error
}

BlobStoreOptions

type BlobStoreOptions struct {
    azcore.ClientOptions
}

BlobStoreOptions embeds azcore.ClientOptions for standard Azure SDK configuration options.

BlobStore Methods

The BlobStore type implements the CheckpointStore interface:

func (b *BlobStore) ClaimOwnership(
    ctx context.Context,
    partitionOwnership []azeventhubs.Ownership,
    options *azeventhubs.ClaimOwnershipOptions,
) ([]azeventhubs.Ownership, error)

Attempts to claim ownership of partitions. If claiming fails due to another update (optimistic concurrency), the partition is omitted from the returned slice.

func (b *BlobStore) ListCheckpoints(
    ctx context.Context,
    fullyQualifiedNamespace, eventHubName, consumerGroup string,
    options *azeventhubs.ListCheckpointsOptions,
) ([]azeventhubs.Checkpoint, error)

Lists all available checkpoints for the given namespace, Event Hub, and consumer group.

func (b *BlobStore) ListOwnership(
    ctx context.Context,
    fullyQualifiedNamespace, eventHubName, consumerGroup string,
    options *azeventhubs.ListOwnershipOptions,
) ([]azeventhubs.Ownership, error)

Lists all ownerships for the given namespace, Event Hub, and consumer group.

func (b *BlobStore) SetCheckpoint(
    ctx context.Context,
    checkpoint azeventhubs.Checkpoint,
    options *azeventhubs.SetCheckpointOptions,
) error

Updates a specific checkpoint with sequence number and offset.

Note: This function doesn't prevent simultaneous checkpoint updates - ownership is assumed.

Blob Storage Layout

The BlobStore uses the following blob naming convention:

Checkpoint Blobs

{fullyQualifiedNamespace}/{eventHubName}/{consumerGroup}/checkpoint/{partitionID}

Example:

namespace.servicebus.windows.net/my-eventhub/$Default/checkpoint/0

Metadata:

  • sequencenumber - The sequence number as a string
  • offset - The offset as a string

Ownership Blobs

{fullyQualifiedNamespace}/{eventHubName}/{consumerGroup}/ownership/{partitionID}

Example:

namespace.servicebus.windows.net/my-eventhub/$Default/ownership/0

Metadata:

  • ownerid - The owner ID (instance ID) of the owning Processor

Using with Processor

The Processor uses the CheckpointStore automatically for load balancing:

import (
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
)

// Create checkpoint store
checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
if err != nil {
    // handle error
}

// Create processor
processor, err := azeventhubs.NewProcessor(
    consumerClient,
    checkpointStore,
    nil,
)
if err != nil {
    // handle error
}

// Run processor
go processor.Run(context.Background())

// Get partition clients and update checkpoints
for {
    partitionClient := processor.NextPartitionClient(context.Background())
    if partitionClient == nil {
        break
    }

    go func(pc *azeventhubs.ProcessorPartitionClient) {
        defer pc.Close(context.Background())

        for {
            events, err := pc.ReceiveEvents(context.Background(), 100, nil)
            if err != nil {
                return
            }

            // Process events...

            // Update checkpoint
            if len(events) > 0 {
                lastEvent := events[len(events)-1]
                err = pc.UpdateCheckpoint(context.Background(), lastEvent, nil)
                if err != nil {
                    // handle error
                }
            }
        }
    }(partitionClient)
}

Manual Checkpoint Usage

You can also use the CheckpointStore interface directly without the Processor:

import (
    "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
)

// List existing checkpoints
checkpoints, err := checkpointStore.ListCheckpoints(
    context.TODO(),
    "namespace.servicebus.windows.net",
    "my-eventhub",
    azeventhubs.DefaultConsumerGroup,
    nil,
)
if err != nil {
    // handle error
}

for _, cp := range checkpoints {
    fmt.Printf("Partition %s: seq=%d, offset=%s\n",
        cp.PartitionID, *cp.SequenceNumber, *cp.Offset)
}

// Set a checkpoint
checkpoint := azeventhubs.Checkpoint{
    FullyQualifiedNamespace: "namespace.servicebus.windows.net",
    EventHubName:            "my-eventhub",
    ConsumerGroup:           azeventhubs.DefaultConsumerGroup,
    PartitionID:             "0",
    SequenceNumber:          to.Ptr(int64(1000)),
    Offset:                  to.Ptr("12345"),
}

err = checkpointStore.SetCheckpoint(context.TODO(), checkpoint, nil)
if err != nil {
    // handle error
}

// List ownerships
ownerships, err := checkpointStore.ListOwnership(
    context.TODO(),
    "namespace.servicebus.windows.net",
    "my-eventhub",
    azeventhubs.DefaultConsumerGroup,
    nil,
)
if err != nil {
    // handle error
}

for _, ownership := range ownerships {
    fmt.Printf("Partition %s owned by: %s (last modified: %s)\n",
        ownership.PartitionID,
        ownership.OwnerID,
        ownership.LastModifiedTime.Format(time.RFC3339))
}

Complete Example

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {
    credential, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        log.Fatal(err)
    }

    // Create container client
    containerClient, err := container.NewClient(
        "https://storageaccount.blob.core.windows.net/checkpoints",
        credential,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    // Create checkpoint store
    checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
    if err != nil {
        log.Fatal(err)
    }

    // Create consumer client
    consumerClient, err := azeventhubs.NewConsumerClient(
        "namespace.servicebus.windows.net",
        "my-eventhub",
        azeventhubs.DefaultConsumerGroup,
        credential,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer consumerClient.Close(context.TODO())

    // Create partition client
    partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
        StartPosition: azeventhubs.StartPosition{
            Earliest: to.Ptr(true),
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    defer partitionClient.Close(context.TODO())

    // Check for existing checkpoint
    checkpoints, err := checkpointStore.ListCheckpoints(
        context.TODO(),
        "namespace.servicebus.windows.net",
        "my-eventhub",
        azeventhubs.DefaultConsumerGroup,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    for _, cp := range checkpoints {
        if cp.PartitionID == "0" {
            fmt.Printf("Found existing checkpoint: seq=%d, offset=%s\n",
                *cp.SequenceNumber, *cp.Offset)
        }
    }

    // Receive and process events
    for i := 0; i < 10; i++ {
        events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
        if err != nil {
            log.Fatal(err)
        }

        for _, event := range events {
            fmt.Printf("Event: %s (seq: %d)\n", string(event.Body), event.SequenceNumber)
        }

        // Save checkpoint every batch
        if len(events) > 0 {
            lastEvent := events[len(events)-1]
            checkpoint := azeventhubs.Checkpoint{
                FullyQualifiedNamespace: "namespace.servicebus.windows.net",
                EventHubName:            "my-eventhub",
                ConsumerGroup:           azeventhubs.DefaultConsumerGroup,
                PartitionID:             "0",
                SequenceNumber:          &lastEvent.SequenceNumber,
                Offset:                  &lastEvent.Offset,
            }

            err = checkpointStore.SetCheckpoint(context.TODO(), checkpoint, nil)
            if err != nil {
                log.Printf("Error saving checkpoint: %v", err)
            } else {
                fmt.Printf("Checkpoint saved: seq=%d, offset=%s\n",
                    lastEvent.SequenceNumber, lastEvent.Offset)
            }
        }

        time.Sleep(1 * time.Second)
    }
}

Implementing a Custom CheckpointStore

You can implement your own CheckpointStore for other storage backends:

type MyCheckpointStore struct {
    // Your storage implementation
}

func (m *MyCheckpointStore) ClaimOwnership(
    ctx context.Context,
    partitionOwnership []azeventhubs.Ownership,
    options *azeventhubs.ClaimOwnershipOptions,
) ([]azeventhubs.Ownership, error) {
    // Implement optimistic concurrency control using ETags
    // Return only successfully claimed ownerships
}

func (m *MyCheckpointStore) ListCheckpoints(
    ctx context.Context,
    fullyQualifiedNamespace, eventHubName, consumerGroup string,
    options *azeventhubs.ListCheckpointsOptions,
) ([]azeventhubs.Checkpoint, error) {
    // Retrieve all checkpoints for the given parameters
}

func (m *MyCheckpointStore) ListOwnership(
    ctx context.Context,
    fullyQualifiedNamespace, eventHubName, consumerGroup string,
    options *azeventhubs.ListOwnershipOptions,
) ([]azeventhubs.Ownership, error) {
    // Retrieve all ownerships for the given parameters
}

func (m *MyCheckpointStore) SetCheckpoint(
    ctx context.Context,
    checkpoint azeventhubs.Checkpoint,
    options *azeventhubs.SetCheckpointOptions,
) error {
    // Save the checkpoint
    // No need for optimistic concurrency - ownership is assumed
}

Best Practices

  1. Checkpoint Frequently - Update checkpoints regularly to minimize reprocessing after restarts. Consider checkpointing every N events or every M seconds.

  2. Handle Checkpoint Failures Gracefully - Checkpoint updates can fail. Log errors but continue processing, as the Processor will retry from the last successful checkpoint.

  3. Use Blob Storage in the Same Region - To minimize latency, deploy your checkpoint store in the same region as your Event Hubs namespace and consumer applications.

  4. Monitor Checkpoint Lag - Track the difference between the last checkpoint and the latest event to detect processing delays.

  5. Secure Access - Use Azure AD authentication (recommended) or connection strings with appropriate access controls for the blob container.

  6. Container Lifecycle - Ensure the blob container exists before creating the BlobStore. Consider implementing container creation in your deployment scripts.

See Also

  • Processor Documentation
  • Consumer Client Documentation
  • Configuration and Error Handling