Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.

Processor - Automatic Load Balancing

The Processor provides automatic load balancing of partition ownership across multiple consumer instances, with built-in checkpointing support. It coordinates with other Processor instances using a CheckpointStore to distribute partitions evenly and resume from checkpoints after restarts.

For manual partition control without load balancing, see ConsumerClient.
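
At a high level, usage follows a two-part pattern: run the load-balancing loop in one goroutine, and consume partitions as they are claimed. A minimal sketch, assuming a processor and ctx already exist (processPartition stands in for your own handler, as in the Complete Example below):

// Run the load balancer; it blocks until ctx is cancelled or an unrecoverable error occurs.
go func() {
    if err := processor.Run(ctx); err != nil {
        log.Printf("processor error: %v", err)
    }
}()

// Claim partitions as they are assigned to this instance.
for {
    pc := processor.NextPartitionClient(ctx)
    if pc == nil {
        break // the Processor has stopped
    }
    go processPartition(ctx, pc) // receive events, checkpoint, and Close() the client
}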

Creating a Processor

func NewProcessor(
    consumerClient *ConsumerClient,
    checkpointStore CheckpointStore,
    options *ProcessorOptions,
) (*Processor, error)

Creates a Processor that automatically manages partition load balancing.

Parameters:

  • consumerClient (*ConsumerClient) - The ConsumerClient to use for connections
  • checkpointStore (CheckpointStore) - Store for checkpoints and ownership coordination
  • options (*ProcessorOptions) - Optional configuration (can be nil)

Returns:

  • *Processor - The processor instance
  • error - Error if the processor cannot be created

Example:

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

// Create a checkpoint store backed by an Azure Blob Storage container
// (credential is e.g. from azidentity.NewDefaultAzureCredential)
containerClient, err := container.NewClient("https://storageaccount.blob.core.windows.net/checkpoints", credential, nil)
if err != nil {
    // handle error
}

checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
if err != nil {
    // handle error
}

// Create consumer client
consumerClient, err := azeventhubs.NewConsumerClient(
    "namespace.servicebus.windows.net",
    "eventhub-name",
    azeventhubs.DefaultConsumerGroup,
    credential,
    nil,
)
if err != nil {
    // handle error
}
defer consumerClient.Close(context.TODO())

// Create processor
processor, err := azeventhubs.NewProcessor(
    consumerClient,
    checkpointStore,
    nil,
)
if err != nil {
    // handle error
}

ProcessorOptions

type ProcessorOptions struct {
    // LoadBalancingStrategy dictates how concurrent Processor instances distribute
    // ownership of partitions between them.
    // The default strategy is ProcessorStrategyBalanced.
    LoadBalancingStrategy ProcessorStrategy

    // UpdateInterval controls how often the Processor attempts to claim partitions.
    // The default value is 10 seconds.
    UpdateInterval time.Duration

    // PartitionExpirationDuration is the amount of time before a partition is considered
    // unowned.
    // The default value is 60 seconds.
    PartitionExpirationDuration time.Duration

    // StartPositions are the default start positions (configurable per partition, or with an overall
    // default value) if a checkpoint is not found in the CheckpointStore.
    //
    // - If the Event Hubs namespace has geo-replication enabled, the default is Earliest.
    // - If the Event Hubs namespace does NOT have geo-replication enabled, the default position is Latest.
    StartPositions StartPositions

    // Prefetch represents the size of the internal prefetch buffer for each ProcessorPartitionClient
    // created by this Processor. When set, this client will attempt to always maintain
    // an internal cache of events of this size, asynchronously, increasing the odds that
    // ReceiveEvents() will use a locally stored cache of events, rather than having to
    // wait for events to arrive from the network.
    //
    // Defaults to 300 events if Prefetch == 0.
    // Disabled if Prefetch < 0.
    Prefetch int32
}

Fields:

  • LoadBalancingStrategy (ProcessorStrategy) - Strategy for distributing partitions (default: ProcessorStrategyBalanced)
  • UpdateInterval (time.Duration) - How often the Processor attempts to claim partitions (default: 10 seconds)
  • PartitionExpirationDuration (time.Duration) - How long before a partition is considered unowned (default: 60 seconds)
  • StartPositions (StartPositions) - Default start positions if no checkpoint exists
  • Prefetch (int32) - Prefetch buffer size for each partition client (default: 300; negative values disable prefetching)
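
For example, a configuration might lengthen the update interval and disable prefetching; a minimal sketch with illustrative values (a negative Prefetch disables the buffer, per the field comment above):

options := &azeventhubs.ProcessorOptions{
    UpdateInterval: 30 * time.Second, // attempt to claim partitions every 30s
    Prefetch:       -1,               // Prefetch < 0 disables the internal prefetch buffer
}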

ProcessorStrategy

type ProcessorStrategy string

const (
    // ProcessorStrategyBalanced will attempt to claim a single partition during each update interval, until
    // each active owner has an equal share of partitions. It can take longer for Processors to acquire their
    // full share of partitions, but minimizes partition swapping.
    // This is the default strategy.
    ProcessorStrategyBalanced ProcessorStrategy = "balanced"

    // ProcessorStrategyGreedy will attempt to claim all partitions it can during each update interval, respecting
    // balance. This can lead to more partition swapping, as Processors steal partitions to get to their fair share,
    // but can speed up initial startup.
    ProcessorStrategyGreedy ProcessorStrategy = "greedy"
)

Strategies:

  • ProcessorStrategyBalanced - Claims one partition per interval, minimizes swapping (default)
  • ProcessorStrategyGreedy - Claims multiple partitions per interval, faster startup, more swapping
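
For example, to favor faster startup at the cost of more partition movement, the greedy strategy can be selected; a minimal sketch, reusing the consumerClient and checkpointStore from the earlier example:

processor, err := azeventhubs.NewProcessor(
    consumerClient,
    checkpointStore,
    &azeventhubs.ProcessorOptions{
        // Claim as many partitions as possible each interval rather than one at a time.
        LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,
    },
)
if err != nil {
    // handle error
}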

StartPositions

type StartPositions struct {
    // PerPartition controls the start position for a specific partition,
    // by partition ID. If a partition is not configured here it will default
    // to the Default start position.
    PerPartition map[string]StartPosition

    // Default is used if the partition is not found in the PerPartition map.
    Default StartPosition
}

Fields:

  • PerPartition (map[string]StartPosition) - Start positions for specific partitions by ID
  • Default (StartPosition) - Default start position for partitions not in PerPartition

See StartPosition for position options.

Example:

import "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"

processor, err := azeventhubs.NewProcessor(
    consumerClient,
    checkpointStore,
    &azeventhubs.ProcessorOptions{
        LoadBalancingStrategy: azeventhubs.ProcessorStrategyBalanced,
        UpdateInterval:        15 * time.Second,
        StartPositions: azeventhubs.StartPositions{
            Default: azeventhubs.StartPosition{
                Earliest: to.Ptr(true),
            },
            PerPartition: map[string]azeventhubs.StartPosition{
                "0": {
                    Latest: to.Ptr(true),
                },
                "1": {
                    SequenceNumber: to.Ptr(int64(1000)),
                },
            },
        },
        Prefetch: 500,
    },
)

Running the Processor

func (p *Processor) Run(ctx context.Context) error

Handles the load balancing loop, blocking until the context is cancelled or an unrecoverable error occurs. On cancellation, it returns nil.

This function should run for the lifetime of your application, or for as long as you want to continue processing events.

Important: Once a Processor has been stopped, it cannot be restarted. Create a new instance to start processing again.

Parameters:

  • ctx (context.Context) - Context for cancellation

Returns:

  • error - Error if an unrecoverable error occurs, or nil if context is cancelled

Example:

// Run in a goroutine
go func() {
    err := processor.Run(context.Background())
    if err != nil {
        log.Printf("Processor error: %v", err)
    }
}()

Getting Partition Clients

func (p *Processor) NextPartitionClient(
    ctx context.Context,
) *ProcessorPartitionClient

Gets the next owned ProcessorPartitionClient. This function blocks until a partition is claimed or the Processor stops running.

When the Processor stops running, this function returns nil.

Important: You MUST call Close() on the returned client to avoid leaking resources.

Parameters:

  • ctx (context.Context) - Context for cancellation

Returns:

  • *ProcessorPartitionClient - A partition client for a claimed partition, or nil if the Processor stops

Example:

for {
    partitionClient := processor.NextPartitionClient(context.Background())
    if partitionClient == nil {
        // Processor has stopped
        break
    }

    // Process events from this partition
    go func(pc *azeventhubs.ProcessorPartitionClient) {
        defer pc.Close(context.Background())

        for {
            events, err := pc.ReceiveEvents(context.Background(), 100, nil)
            if err != nil {
                // Handle error
                return
            }

            for _, event := range events {
                // Process event
                fmt.Printf("Event: %s\n", string(event.Body))
            }

            // Update checkpoint
            if len(events) > 0 {
                err = pc.UpdateCheckpoint(context.Background(), events[len(events)-1], nil)
                if err != nil {
                    // Handle error
                }
            }
        }
    }(partitionClient)
}

ProcessorPartitionClient

ProcessorPartitionClient is similar to PartitionClient but includes checkpointing capabilities.

Receiving Events

func (c *ProcessorPartitionClient) ReceiveEvents(
    ctx context.Context,
    count int,
    options *ReceiveEventsOptions,
) ([]*ReceivedEventData, error)

Receives events until count events have been received or the context has expired or been cancelled.

See PartitionClient.ReceiveEvents for details.
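
In practice a per-call timeout is often used so the loop can check for shutdown between batches; the deadline expiring with no events is not an error worth exiting over. A minimal sketch (the 30-second timeout and batch size of 100 are arbitrary choices):

receiveCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
cancel()

if err != nil && !errors.Is(err, context.DeadlineExceeded) {
    // A real error, not just "no events within the timeout"; handle or return.
}

for _, event := range events {
    // Process event
}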

Updating Checkpoints

func (p *ProcessorPartitionClient) UpdateCheckpoint(
    ctx context.Context,
    latestEvent *ReceivedEventData,
    options *UpdateCheckpointOptions,
) error

Updates the checkpoint in the CheckpointStore. New Processors will resume after this checkpoint for this partition.

Parameters:

  • ctx (context.Context) - Context for cancellation and timeouts
  • latestEvent (*ReceivedEventData) - The latest successfully processed event
  • options (*UpdateCheckpointOptions) - Optional parameters (currently empty, for future expansion)

Returns:

  • error - Error if the checkpoint update fails

Example:

events, err := partitionClient.ReceiveEvents(context.Background(), 100, nil)
if err != nil {
    // handle error
}

for _, event := range events {
    // Process event
    fmt.Printf("Processing: %s\n", string(event.Body))
}

// Update checkpoint to the last event
if len(events) > 0 {
    err = partitionClient.UpdateCheckpoint(
        context.Background(),
        events[len(events)-1],
        nil,
    )
    if err != nil {
        // handle error
    }
}

Get Partition ID

func (p *ProcessorPartitionClient) PartitionID() string

Returns the partition ID of the partition being processed. This value does not change during the lifetime of the ProcessorPartitionClient.

Close Partition Client

func (c *ProcessorPartitionClient) Close(ctx context.Context) error

Releases resources for the partition client. You MUST call this method to avoid leaking resources.

Note: This does NOT close the ConsumerClient that the Processor was created with.

How Load Balancing Works

The Processor uses a CheckpointStore to coordinate partition ownership across multiple instances:

  1. Claiming Partitions - Each Processor instance periodically (based on UpdateInterval) attempts to claim partitions from the CheckpointStore.

  2. Fair Distribution - The load balancing algorithm ensures each Processor instance gets an approximately equal share of partitions.

  3. Ownership Expiration - If a Processor crashes or stops updating ownership, its partitions are considered expired after PartitionExpirationDuration and can be claimed by other instances.

  4. Partition Swapping - When a new Processor joins, existing Processors may release some partitions to rebalance the load.

  5. Checkpointing - Processors can save their progress using UpdateCheckpoint(), allowing them to resume from the last checkpoint after restarts.

Complete Example

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {
    // Create credential
    credential, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        log.Fatal(err)
    }

    // Create checkpoint store
    containerClient, err := container.NewClient(
        "https://storageaccount.blob.core.windows.net/checkpoints",
        credential,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }

    checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)
    if err != nil {
        log.Fatal(err)
    }

    // Create consumer client
    consumerClient, err := azeventhubs.NewConsumerClient(
        "namespace.servicebus.windows.net",
        "eventhub-name",
        azeventhubs.DefaultConsumerGroup,
        credential,
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer consumerClient.Close(context.TODO())

    // Create processor
    processor, err := azeventhubs.NewProcessor(
        consumerClient,
        checkpointStore,
        &azeventhubs.ProcessorOptions{
            LoadBalancingStrategy: azeventhubs.ProcessorStrategyBalanced,
            UpdateInterval:        10 * time.Second,
            StartPositions: azeventhubs.StartPositions{
                Default: azeventhubs.StartPosition{
                    Earliest: to.Ptr(true),
                },
            },
        },
    )
    if err != nil {
        log.Fatal(err)
    }

    // Create context that can be cancelled
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle shutdown gracefully
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt)
    go func() {
        <-sigChan
        fmt.Println("\nShutting down...")
        cancel()
    }()

    // Start processor in a goroutine
    processorWg := &sync.WaitGroup{}
    processorWg.Add(1)

    go func() {
        defer processorWg.Done()
        err := processor.Run(ctx)
        if err != nil {
            log.Printf("Processor error: %v", err)
        }
    }()

    // Process partitions as they are claimed
    dispatchPartitionClients(ctx, processor)

    // Wait for processor to finish
    processorWg.Wait()
    fmt.Println("Processor stopped")
}

func dispatchPartitionClients(ctx context.Context, processor *azeventhubs.Processor) {
    for {
        partitionClient := processor.NextPartitionClient(ctx)
        if partitionClient == nil {
            // Processor has stopped
            break
        }

        // Process this partition in a goroutine
        go func(pc *azeventhubs.ProcessorPartitionClient) {
            defer pc.Close(context.Background())
            processPartition(ctx, pc)
        }(partitionClient)
    }
}

func processPartition(ctx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient) {
    partitionID := partitionClient.PartitionID()
    fmt.Printf("Processing partition: %s\n", partitionID)

    for {
        // Check if we should stop
        select {
        case <-ctx.Done():
            return
        default:
        }

        // Receive events
        receiveCtx, receiveCancel := context.WithTimeout(ctx, 30*time.Second)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCancel()

        if err != nil {
            // The receive context's deadline firing just means no events arrived
            // within the timeout; keep polling.
            if errors.Is(err, context.DeadlineExceeded) {
                continue
            }

            var ehErr *azeventhubs.Error
            if errors.As(err, &ehErr) && ehErr.Code == azeventhubs.ErrorCodeOwnershipLost {
                fmt.Printf("Partition %s: ownership lost\n", partitionID)
                return
            }

            log.Printf("Partition %s: error receiving: %v\n", partitionID, err)
            return
        }

        if len(events) == 0 {
            continue
        }

        // Process events
        for _, event := range events {
            fmt.Printf("[Partition %s] Event: %s\n", partitionID, string(event.Body))
        }

        // Update checkpoint
        lastEvent := events[len(events)-1]
        err = partitionClient.UpdateCheckpoint(ctx, lastEvent, nil)
        if err != nil {
            log.Printf("Partition %s: error updating checkpoint: %v\n", partitionID, err)
        } else {
            fmt.Printf("[Partition %s] Checkpoint updated to sequence %d\n",
                partitionID, lastEvent.SequenceNumber)
        }
    }
}

Error Handling

The Processor can encounter several error conditions:

Ownership Lost

var ehErr *azeventhubs.Error
if errors.As(err, &ehErr) && ehErr.Code == azeventhubs.ErrorCodeOwnershipLost {
    // Another processor with higher owner level claimed the partition
    // or load balancing moved the partition to another instance
    return
}

When ownership is lost, the partition will be reassigned. Your processing goroutine should clean up and exit.

Connection Lost

var ehErr *azeventhubs.Error
if errors.As(err, &ehErr) && ehErr.Code == azeventhubs.ErrorCodeConnectionLost {
    // Connection lost after all retries
    // May need manual intervention
}

Processor Run Errors

err := processor.Run(ctx)
if err != nil {
    // Unrecoverable error occurred
    // Create a new Processor instance to restart
}

If Run() returns an error, the Processor cannot be restarted. Create a new Processor instance.
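
If automatic recovery is desired, the restart can be wrapped in a loop that builds a fresh Processor each time. A sketch, where newProcessor is a hypothetical helper that recreates the ConsumerClient, checkpoint store, and Processor; a new dispatch loop is started for each instance (see dispatchPartitionClients in the Complete Example):

for ctx.Err() == nil {
    processor, err := newProcessor() // hypothetical helper; rebuilds all clients
    if err != nil {
        log.Printf("creating processor: %v", err)
        time.Sleep(10 * time.Second)
        continue
    }

    // Claim and process partitions for this instance.
    go dispatchPartitionClients(ctx, processor)

    if err := processor.Run(ctx); err != nil {
        log.Printf("processor stopped: %v", err)
        time.Sleep(10 * time.Second)
        continue
    }

    // Run returned nil: the context was cancelled, so stop retrying.
    break
}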

Best Practices

  1. Always Close Partition Clients - Use defer to ensure Close() is called on ProcessorPartitionClient instances.

  2. Checkpoint Frequently - Update checkpoints regularly to minimize reprocessing after restarts. Consider checkpointing every N events or every M seconds (see the sketch after this list).

  3. Handle Ownership Loss - Expect ownership to change as Processors join and leave. Design your processing to be idempotent.

  4. Monitor Processor Health - Track errors from Run() and implement restart logic if needed.

  5. Use Appropriate Update Interval - Balance between responsiveness (shorter intervals) and checkpoint store load (longer intervals).

  6. Configure Expiration Duration - Set PartitionExpirationDuration based on your expected failure detection time.
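
A sketch of the "every N events or every M seconds" pattern from practice 2: the thresholds are arbitrary, the counters are declared before the partition's receive loop, and the processing and checkpoint logic below runs once per received batch.

const checkpointEvery = 1000                // events between checkpoints
const checkpointInterval = 30 * time.Second // or after this much elapsed time

var lastEvent *azeventhubs.ReceivedEventData
uncheckpointed := 0
lastCheckpoint := time.Now()

// ... inside the receive loop, after each batch:
for _, event := range events {
    // ... process event ...
    lastEvent = event
    uncheckpointed++
}

if lastEvent != nil &&
    (uncheckpointed >= checkpointEvery || time.Since(lastCheckpoint) >= checkpointInterval) {
    if err := partitionClient.UpdateCheckpoint(ctx, lastEvent, nil); err != nil {
        log.Printf("checkpoint failed: %v", err)
    } else {
        uncheckpointed = 0
        lastCheckpoint = time.Now()
    }
}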

See Also