tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0

Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.
The Processor provides automatic load balancing of partition ownership across multiple consumer instances, with built-in checkpointing support. It coordinates with other Processor instances using a CheckpointStore to distribute partitions evenly and resume from checkpoints after restarts.
For manual partition control without load balancing, see ConsumerClient.
func NewProcessor(
consumerClient *ConsumerClient,
checkpointStore CheckpointStore,
options *ProcessorOptions,
) (*Processor, error)

Creates a Processor that automatically manages partition load balancing.
Parameters:
consumerClient (*ConsumerClient) - The ConsumerClient to use for connections
checkpointStore (CheckpointStore) - Store for checkpoints and ownership coordination
options (*ProcessorOptions) - Optional configuration (can be nil)

Returns:
*Processor - The processor instance
error - Error if the processor cannot be created

Example:
import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
)
// Create checkpoint store from an Azure Blob container client
// (see the complete example below for how to create containerClient)
checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
if err != nil {
// handle error
}
// Create consumer client
consumerClient, err := azeventhubs.NewConsumerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
azeventhubs.DefaultConsumerGroup,
credential,
nil,
)
if err != nil {
// handle error
}
defer consumerClient.Close(context.TODO())
// Create processor
processor, err := azeventhubs.NewProcessor(
consumerClient,
checkpointStore,
nil,
)
if err != nil {
// handle error
}

type ProcessorOptions struct {
// LoadBalancingStrategy dictates how concurrent Processor instances distribute
// ownership of partitions between them.
// The default strategy is ProcessorStrategyBalanced.
LoadBalancingStrategy ProcessorStrategy
// UpdateInterval controls how often the Processor attempts to claim partitions.
// The default value is 10 seconds.
UpdateInterval time.Duration
// PartitionExpirationDuration is the amount of time before a partition is considered
// unowned.
// The default value is 60 seconds.
PartitionExpirationDuration time.Duration
// StartPositions are the default start positions (configurable per partition, or with an overall
// default value) if a checkpoint is not found in the CheckpointStore.
//
// - If the Event Hubs namespace has geo-replication enabled, the default is Earliest.
// - If the Event Hubs namespace does NOT have geo-replication enabled, the default position is Latest.
StartPositions StartPositions
// Prefetch represents the size of the internal prefetch buffer for each ProcessorPartitionClient
// created by this Processor. When set, this client will attempt to always maintain
// an internal cache of events of this size, asynchronously, increasing the odds that
// ReceiveEvents() will use a locally stored cache of events, rather than having to
// wait for events to arrive from the network.
//
// Defaults to 300 events if Prefetch == 0.
// Disabled if Prefetch < 0.
Prefetch int32
}

Fields:
LoadBalancingStrategy (ProcessorStrategy) - Strategy for distributing partitions (default: ProcessorStrategyBalanced)
UpdateInterval (time.Duration) - How often to claim partitions (default: 10 seconds)
PartitionExpirationDuration (time.Duration) - How long before a partition is considered unowned (default: 60 seconds)
StartPositions (StartPositions) - Default start positions if no checkpoint exists
Prefetch (int32) - Prefetch buffer size for partition clients (default: 300)

type ProcessorStrategy string
const (
// ProcessorStrategyBalanced will attempt to claim a single partition during each update interval, until
// each active owner has an equal share of partitions. It can take longer for Processors to acquire their
// full share of partitions, but minimizes partition swapping.
// This is the default strategy.
ProcessorStrategyBalanced ProcessorStrategy = "balanced"
// ProcessorStrategyGreedy will attempt to claim all partitions it can during each update interval, respecting
// balance. This can lead to more partition swapping, as Processors steal partitions to get to their fair share,
// but can speed up initial startup.
ProcessorStrategyGreedy ProcessorStrategy = "greedy"
)

Strategies:
ProcessorStrategyBalanced - Claims one partition per interval, minimizes swapping (default)
ProcessorStrategyGreedy - Claims multiple partitions per interval, faster startup, more swapping

type StartPositions struct {
// PerPartition controls the start position for a specific partition,
// by partition ID. If a partition is not configured here it will default
// to Default start position.
PerPartition map[string]StartPosition
// Default is used if the partition is not found in the PerPartition map.
Default StartPosition
}

Fields:
PerPartition (map[string]StartPosition) - Start positions for specific partitions by ID
Default (StartPosition) - Default start position for partitions not in PerPartition

See StartPosition for position options.
Example:
import "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
processor, err := azeventhubs.NewProcessor(
consumerClient,
checkpointStore,
&azeventhubs.ProcessorOptions{
LoadBalancingStrategy: azeventhubs.ProcessorStrategyBalanced,
UpdateInterval: 15 * time.Second,
StartPositions: azeventhubs.StartPositions{
Default: azeventhubs.StartPosition{
Earliest: to.Ptr(true),
},
PerPartition: map[string]azeventhubs.StartPosition{
"0": {
Latest: to.Ptr(true),
},
"1": {
SequenceNumber: to.Ptr(int64(1000)),
},
},
},
Prefetch: 500,
},
)

func (p *Processor) Run(ctx context.Context) error

Handles the load balancing loop, blocking until the context is cancelled or an unrecoverable error occurs. On cancellation, it returns nil.
This function should run for the lifetime of your application, or for as long as you want to continue processing events.
Important: Once a Processor has been stopped, it cannot be restarted. Create a new instance to start processing again.
Parameters:
ctx (context.Context) - Context for cancellation

Returns:
error - Error if an unrecoverable error occurs, or nil if context is cancelled

Example:
// Run in a goroutine
go func() {
err := processor.Run(context.Background())
if err != nil {
log.Printf("Processor error: %v", err)
}
}()

func (p *Processor) NextPartitionClient(
ctx context.Context,
) *ProcessorPartitionClient

Gets the next owned ProcessorPartitionClient. This function blocks until a partition is claimed or the Processor stops running.
When the Processor stops running, this function returns nil.
Important: You MUST call Close() on the returned client to avoid leaking resources.
Parameters:
ctx (context.Context) - Context for cancellation

Returns:
*ProcessorPartitionClient - A partition client for a claimed partition, or nil if the Processor stops

Example:
for {
partitionClient := processor.NextPartitionClient(context.Background())
if partitionClient == nil {
// Processor has stopped
break
}
// Process events from this partition
go func(pc *azeventhubs.ProcessorPartitionClient) {
defer pc.Close(context.Background())
for {
events, err := pc.ReceiveEvents(context.Background(), 100, nil)
if err != nil {
// Handle error
return
}
for _, event := range events {
// Process event
fmt.Printf("Event: %s\n", string(event.Body))
}
// Update checkpoint
if len(events) > 0 {
err = pc.UpdateCheckpoint(context.Background(), events[len(events)-1], nil)
if err != nil {
// Handle error
}
}
}
}(partitionClient)
}

ProcessorPartitionClient is similar to PartitionClient but includes checkpointing capabilities.
func (c *ProcessorPartitionClient) ReceiveEvents(
ctx context.Context,
count int,
options *ReceiveEventsOptions,
) ([]*ReceivedEventData, error)

Receives events until count events have been received or the context has expired or been cancelled.
See PartitionClient.ReceiveEvents for details.
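Because the call blocks until count events have arrived, a common pattern is to bound each receive with a short-lived context and treat that context's deadline expiring as a normal outcome. A minimal sketch, assuming partitionClient came from NextPartitionClient; the one-minute window and batch size of 100 are illustrative values, not defaults:

// Receive up to 100 events, waiting at most one minute for this batch.
receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
cancel()

// Our own receiveCtx expiring simply means fewer than 100 events arrived in
// the window; events received before the deadline are still returned.
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
    // handle error (see the error handling section below)
}

for _, event := range events {
    fmt.Printf("Event: %s\n", string(event.Body))
}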
func (p *ProcessorPartitionClient) UpdateCheckpoint(
ctx context.Context,
latestEvent *ReceivedEventData,
options *UpdateCheckpointOptions,
) error

Updates the checkpoint in the CheckpointStore. New Processors will resume after this checkpoint for this partition.
Parameters:
ctx (context.Context) - Context for cancellation and timeouts
latestEvent (*ReceivedEventData) - The latest successfully processed event
options (*UpdateCheckpointOptions) - Optional parameters (currently empty, for future expansion)

Returns:
error - Error if the checkpoint update fails

Example:
events, err := partitionClient.ReceiveEvents(context.Background(), 100, nil)
if err != nil {
// handle error
}
for _, event := range events {
// Process event
fmt.Printf("Processing: %s\n", string(event.Body))
}
// Update checkpoint to the last event
if len(events) > 0 {
err = partitionClient.UpdateCheckpoint(
context.Background(),
events[len(events)-1],
nil,
)
if err != nil {
// handle error
}
}

func (p *ProcessorPartitionClient) PartitionID() string

Returns the partition ID of the partition being processed. This value does not change during the lifetime of the ProcessorPartitionClient.
func (c *ProcessorPartitionClient) Close(ctx context.Context) error

Releases resources for the partition client. You MUST call this method to avoid leaking resources.
Note: This does NOT close the ConsumerClient that the Processor was created with.
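A minimal sketch of how these lifetimes are typically separated (hedged; consumerClient and processor are assumed to come from the setup shown earlier): each partition goroutine closes only its own ProcessorPartitionClient, while the shared ConsumerClient is closed once by the code that created it.

// Closed once, by the code that created it, after the Processor stops.
defer consumerClient.Close(context.TODO())

for {
    partitionClient := processor.NextPartitionClient(context.TODO())
    if partitionClient == nil {
        break // Processor has stopped
    }

    go func(pc *azeventhubs.ProcessorPartitionClient) {
        // Closing the partition client releases only this partition's
        // resources; the shared ConsumerClient stays open.
        defer pc.Close(context.TODO())
        // ... receive, process, and checkpoint events ...
    }(partitionClient)
}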
The Processor uses a CheckpointStore to coordinate partition ownership across multiple instances (a configuration sketch follows this list):
Claiming Partitions - Each Processor instance periodically (based on UpdateInterval) attempts to claim partitions from the CheckpointStore.
Fair Distribution - The load balancing algorithm ensures each Processor instance gets an approximately equal share of partitions.
Ownership Expiration - If a Processor crashes or stops updating ownership, its partitions are considered expired after PartitionExpirationDuration and can be claimed by other instances.
Partition Swapping - When a new Processor joins, existing Processors may release some partitions to rebalance the load.
Checkpointing - Processors can save their progress using UpdateCheckpoint(), allowing them to resume from the last checkpoint after restarts.
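The timing of this coordination is driven by two ProcessorOptions fields. A minimal configuration sketch, assuming consumerClient and checkpointStore from the setup above; the values shown are the documented defaults, used here only for illustration. Keeping PartitionExpirationDuration several times larger than UpdateInterval gives a healthy owner multiple chances to renew ownership before its partitions can be claimed by others.

processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, &azeventhubs.ProcessorOptions{
    // How often this instance runs a load balancing cycle: renewing its own
    // ownership records and attempting to claim unowned or expired partitions.
    UpdateInterval: 10 * time.Second,

    // How long an ownership record may go without being renewed before other
    // instances treat the partition as unowned and may claim it. This is
    // effectively the worst-case failover time after an instance crashes.
    PartitionExpirationDuration: 60 * time.Second,
})
if err != nil {
    // handle error
}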
package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// Create credential
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
log.Fatal(err)
}
// Create checkpoint store
containerClient, err := container.NewClient(
"https://storageaccount.blob.core.windows.net/checkpoints",
credential,
nil,
)
if err != nil {
log.Fatal(err)
}
checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
if err != nil {
log.Fatal(err)
}
// Create consumer client
consumerClient, err := azeventhubs.NewConsumerClient(
"namespace.servicebus.windows.net",
"eventhub-name",
azeventhubs.DefaultConsumerGroup,
credential,
nil,
)
if err != nil {
log.Fatal(err)
}
defer consumerClient.Close(context.TODO())
// Create processor
processor, err := azeventhubs.NewProcessor(
consumerClient,
checkpointStore,
&azeventhubs.ProcessorOptions{
LoadBalancingStrategy: azeventhubs.ProcessorStrategyBalanced,
UpdateInterval: 10 * time.Second,
StartPositions: azeventhubs.StartPositions{
Default: azeventhubs.StartPosition{
Earliest: to.Ptr(true),
},
},
},
)
if err != nil {
log.Fatal(err)
}
// Create context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle shutdown gracefully
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
go func() {
<-sigChan
fmt.Println("\nShutting down...")
cancel()
}()
// Start processor in a goroutine
processorWg := &sync.WaitGroup{}
processorWg.Add(1)
go func() {
defer processorWg.Done()
err := processor.Run(ctx)
if err != nil {
log.Printf("Processor error: %v", err)
}
}()
// Process partitions as they are claimed
dispatchPartitionClients(ctx, processor)
// Wait for processor to finish
processorWg.Wait()
fmt.Println("Processor stopped")
}
func dispatchPartitionClients(ctx context.Context, processor *azeventhubs.Processor) {
for {
partitionClient := processor.NextPartitionClient(ctx)
if partitionClient == nil {
// Processor has stopped
break
}
// Process this partition in a goroutine
go func(pc *azeventhubs.ProcessorPartitionClient) {
defer pc.Close(context.Background())
processPartition(ctx, pc)
}(partitionClient)
}
}
func processPartition(ctx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient) {
partitionID := partitionClient.PartitionID()
fmt.Printf("Processing partition: %s\n", partitionID)
for {
// Check if we should stop
select {
case <-ctx.Done():
return
default:
}
// Receive events
receiveCtx, receiveCancel := context.WithTimeout(ctx, 30*time.Second)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCancel()
// A deadline on our own receiveCtx is expected when fewer than 100 events
// arrive in the window; any other error ends processing for this partition.
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
var ehErr *azeventhubs.Error
if errors.As(err, &ehErr) && ehErr.Code == azeventhubs.ErrorCodeOwnershipLost {
fmt.Printf("Partition %s: ownership lost\n", partitionID)
return
}
log.Printf("Partition %s: error receiving: %v\n", partitionID, err)
return
}
if len(events) == 0 {
continue
}
// Process events
for _, event := range events {
fmt.Printf("[Partition %s] Event: %s\n", partitionID, string(event.Body))
}
// Update checkpoint
lastEvent := events[len(events)-1]
err = partitionClient.UpdateCheckpoint(ctx, lastEvent, nil)
if err != nil {
log.Printf("Partition %s: error updating checkpoint: %v\n", partitionID, err)
} else {
fmt.Printf("[Partition %s] Checkpoint updated to sequence %d\n",
partitionID, lastEvent.SequenceNumber)
}
}
}

The Processor can encounter several error conditions:
var ehErr *azeventhubs.Error
if errors.As(err, &ehErr) && ehErr.Code == azeventhubs.ErrorCodeOwnershipLost {
// Another processor with higher owner level claimed the partition
// or load balancing moved the partition to another instance
return
}

When ownership is lost, the partition will be reassigned. Your processing goroutine should clean up and exit.
var ehErr *azeventhubs.Error
if errors.As(err, &ehErr) && ehErr.Code == azeventhubs.ErrorCodeConnectionLost {
// Connection lost after all retries
// May need manual intervention
}

err := processor.Run(ctx)
if err != nil {
// Unrecoverable error occurred
// Create a new Processor instance to restart
}

If Run() returns an error, the Processor cannot be restarted. Create a new Processor instance.
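A minimal restart sketch, assuming the existing consumerClient and checkpointStore remain usable, that dispatchPartitionClients is the helper from the complete example above, and that the five-second backoff is an arbitrary choice:

for ctx.Err() == nil {
    // A stopped Processor cannot be reused, so build a fresh one each time.
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
    if err != nil {
        log.Fatalf("creating processor: %v", err)
    }

    // Hand out partition clients for this Processor instance; the loop inside
    // dispatchPartitionClients ends when this Processor stops.
    go dispatchPartitionClients(ctx, processor)

    if err := processor.Run(ctx); err != nil {
        log.Printf("processor stopped with unrecoverable error: %v", err)
        time.Sleep(5 * time.Second) // simple backoff before creating a new instance
        continue
    }

    // Run returned nil: ctx was cancelled, so the loop condition ends the loop.
}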
Always Close Partition Clients - Use defer to ensure Close() is called on ProcessorPartitionClient instances.
Checkpoint Frequently - Update checkpoints regularly to minimize reprocessing after restarts. Consider checkpointing every N events or every M seconds (see the sketch after this list).
Handle Ownership Loss - Expect ownership to change as Processors join and leave. Design your processing to be idempotent.
Monitor Processor Health - Track errors from Run() and implement restart logic if needed.
Use Appropriate Update Interval - Balance between responsiveness (shorter intervals) and checkpoint store load (longer intervals).
Configure Expiration Duration - Set PartitionExpirationDuration based on your expected failure detection time.
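As referenced under "Checkpoint Frequently", here is a minimal sketch of checkpointing every N events or every M seconds inside a partition processing loop. The ctx and partitionClient values are assumed to come from the surrounding processing code, and the thresholds are illustrative, not recommendations.

const (
    checkpointEveryEvents   = 1000             // illustrative threshold
    checkpointEveryInterval = 30 * time.Second // illustrative threshold
)

var lastProcessed *azeventhubs.ReceivedEventData
eventsSinceCheckpoint := 0
lastCheckpoint := time.Now()

for {
    receiveCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
    events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
    cancel()

    if err != nil && !errors.Is(err, context.DeadlineExceeded) {
        return // see the error handling section above
    }

    for _, event := range events {
        // ... process event (idempotently, in case of reprocessing) ...
        lastProcessed = event
    }
    eventsSinceCheckpoint += len(events)

    // Checkpoint after N events or M seconds, whichever comes first.
    if lastProcessed != nil && eventsSinceCheckpoint > 0 &&
        (eventsSinceCheckpoint >= checkpointEveryEvents || time.Since(lastCheckpoint) >= checkpointEveryInterval) {
        if err := partitionClient.UpdateCheckpoint(ctx, lastProcessed, nil); err != nil {
            return
        }
        eventsSinceCheckpoint = 0
        lastCheckpoint = time.Now()
    }
}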