or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2@v2.0.1
tile.json

tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2

tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0

Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.

index.mddocs/

Azure Event Hubs Go SDK

Azure Event Hubs is a big data streaming platform and event ingestion service from Microsoft. This Go SDK provides clients for sending events to Event Hubs, consuming events from partitions, and automatic load balancing with checkpoint management across multiple consumer instances.

The library provides three primary consumption patterns:

  1. ProducerClient - Send events to Event Hubs
  2. ConsumerClient + PartitionClient - Manual consumption from specific partitions
  3. Processor - Automatic load balancing across multiple consumer instances

Package Information

  • Package Name: github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2
  • Package Type: golang
  • Language: Go
  • Version: 2.0.1
  • Installation: go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2

Core Imports

import (
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
    "github.com/Azure/azure-sdk-for-go/sdk/azcore"
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)

For checkpoint store functionality:

import (
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

Basic Usage

Sending Events

import (
    "context"
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

// Create producer client with Azure Identity
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
    panic(err)
}

producerClient, err := azeventhubs.NewProducerClient(
    "myeventhub.servicebus.windows.net",
    "myhub",
    cred,
    nil,
)
if err != nil {
    panic(err)
}
defer producerClient.Close(context.TODO())

// Create a batch
batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)
if err != nil {
    panic(err)
}

// Add events to batch
err = batch.AddEventData(&azeventhubs.EventData{
    Body: []byte("hello world"),
}, nil)
if err != nil {
    panic(err)
}

// Send the batch
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
    panic(err)
}

Receiving Events from a Partition

import (
    "context"
    "time"
    "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

// Create consumer client
cred, err := azidentity.NewDefaultAzureCredential(nil)
consumerClient, err := azeventhubs.NewConsumerClient(
    "myeventhub.servicebus.windows.net",
    "myhub",
    azeventhubs.DefaultConsumerGroup,
    cred,
    nil,
)
defer consumerClient.Close(context.TODO())

// Create partition client to read from partition "0"
partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
    StartPosition: azeventhubs.StartPosition{
        Earliest: to.Ptr(true),
    },
})
defer partitionClient.Close(context.TODO())

// Receive up to 100 events
receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
cancel()

for _, event := range events {
    fmt.Printf("Event received with body '%s'\n", string(event.Body))
}

Using Processor with Checkpoints

import (
    "context"
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

// Create consumer client
cred, err := azidentity.NewDefaultAzureCredential(nil)
consumerClient, err := azeventhubs.NewConsumerClient(
    "myeventhub.servicebus.windows.net",
    "myhub",
    azeventhubs.DefaultConsumerGroup,
    cred,
    nil,
)
defer consumerClient.Close(context.TODO())

// Create checkpoint store using Azure Blob Storage
blobClient, err := azblob.NewClient("https://mystorageaccount.blob.core.windows.net", cred, nil)
containerClient := blobClient.ServiceClient().NewContainerClient("checkpoints")
checkpointStore, err := checkpoints.NewBlobStore(containerClient, nil)

// Create processor
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

// Start processing partitions in background
go func() {
    for {
        partitionClient := processor.NextPartitionClient(context.TODO())
        if partitionClient == nil {
            break
        }
        go processPartition(partitionClient)
    }
}()

// Run processor (blocks until context is cancelled)
err = processor.Run(context.TODO())

Capabilities

Producer Client - Sending Events

Send events to Event Hubs using batches for efficient transmission.

func NewProducerClient(fullyQualifiedNamespace string, eventHub string, credential azcore.TokenCredential, options *ProducerClientOptions) (*ProducerClient, error)
func NewProducerClientFromConnectionString(connectionString string, eventHub string, options *ProducerClientOptions) (*ProducerClient, error)

Key Methods:

func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *EventDataBatchOptions) (*EventDataBatch, error)
func (pc *ProducerClient) SendEventDataBatch(ctx context.Context, batch *EventDataBatch, options *SendEventDataBatchOptions) error
func (pc *ProducerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)
func (pc *ProducerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)
func (pc *ProducerClient) Close(ctx context.Context) error

Producer Client Documentation

Consumer Client - Receiving Events

Manually consume events from specific partitions.

func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error)
func NewConsumerClientFromConnectionString(connectionString string, eventHub string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error)

Key Methods:

func (cc *ConsumerClient) NewPartitionClient(partitionID string, options *PartitionClientOptions) (*PartitionClient, error)
func (cc *ConsumerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)
func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)
func (cc *ConsumerClient) InstanceID() string
func (cc *ConsumerClient) Close(ctx context.Context) error

PartitionClient Methods:

func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)
func (pc *PartitionClient) Close(ctx context.Context) error

Consumer Client Documentation

Processor - Automatic Load Balancing

Automatically balance partition consumption across multiple instances with checkpoint management.

func NewProcessor(consumerClient *ConsumerClient, checkpointStore CheckpointStore, options *ProcessorOptions) (*Processor, error)

Key Methods:

func (p *Processor) Run(ctx context.Context) error
func (p *Processor) NextPartitionClient(ctx context.Context) *ProcessorPartitionClient

ProcessorPartitionClient Methods:

func (ppc *ProcessorPartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)
func (ppc *ProcessorPartitionClient) UpdateCheckpoint(ctx context.Context, latestEvent *ReceivedEventData, options *UpdateCheckpointOptions) error
func (ppc *ProcessorPartitionClient) PartitionID() string
func (ppc *ProcessorPartitionClient) Close(ctx context.Context) error

Processor Documentation

Event Data Types

Core types for event data and AMQP messages.

type EventData struct {
    Properties    map[string]any
    Body          []byte
    ContentType   *string
    CorrelationID any
    MessageID     *string
}
type ReceivedEventData struct {
    EventData
    EnqueuedTime     *time.Time
    PartitionKey     *string
    Offset           string
    RawAMQPMessage   *AMQPAnnotatedMessage
    SequenceNumber   int64
    SystemProperties map[string]any
}
type StartPosition struct {
    Offset         *string
    SequenceNumber *int64
    EnqueuedTime   *time.Time
    Inclusive      bool
    Earliest       *bool
    Latest         *bool
}

Event Types Documentation

Checkpoint Store

Interface for checkpoint persistence and Azure Blob Storage implementation.

type CheckpointStore interface {
    ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error)
    ListCheckpoints(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListCheckpointsOptions) ([]Checkpoint, error)
    ListOwnership(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)
    SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}
func NewBlobStore(containerClient *container.Client, options *BlobStoreOptions) (*BlobStore, error)

Checkpoint Store Documentation

Configuration and Utilities

Connection string parsing, retry options, error handling, and logging.

type RetryOptions struct {
    MaxRetries    int32
    RetryDelay    time.Duration
    MaxRetryDelay time.Duration
}
func ParseConnectionString(connStr string) (ConnectionStringProperties, error)

Configuration Documentation

Constants

const DefaultConsumerGroup = "$Default"

Default consumer group name used by Event Hubs.

var ErrEventDataTooLarge error

Error returned when an event is too large to fit in a batch.

Error Handling

Event Hubs errors implement the Error interface with specific error codes:

type Error struct {
    Code ErrorCode
}
func (e *Error) Error() string
type ErrorCode string

Error code constants:

const (
    ErrorCodeUnauthorizedAccess ErrorCode = "unauthorized"
    ErrorCodeConnectionLost     ErrorCode = "connlost"
    ErrorCodeOwnershipLost      ErrorCode = "ownershiplost"
)
  • ErrorCodeUnauthorizedAccess: Credentials are invalid or expired
  • ErrorCodeConnectionLost: Connection lost after all retry attempts
  • ErrorCodeOwnershipLost: Partition claimed by a consumer with higher epoch/owner level

Logging

The SDK uses the azcore logging package with specific log events:

const (
    EventConn     log.Event = "azeh.Conn"
    EventAuth     log.Event = "azeh.Auth"
    EventProducer log.Event = "azeh.Producer"
    EventConsumer log.Event = "azeh.Consumer"
)

Configure logging:

import (
    azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

// Set up listener
azlog.SetListener(func(event azlog.Event, s string) {
    fmt.Printf("[%s] %s\n", event, s)
})

// Enable specific events
azlog.SetEvents(
    azeventhubs.EventConn,
    azeventhubs.EventAuth,
    azeventhubs.EventProducer,
    azeventhubs.EventConsumer,
)

Authentication

Using Azure Identity (Recommended)

import "github.com/Azure/azure-sdk-for-go/sdk/azidentity"

credential, err := azidentity.NewDefaultAzureCredential(nil)

producerClient, err := azeventhubs.NewProducerClient(
    "namespace.servicebus.windows.net",
    "eventhub-name",
    credential,
    nil,
)

Using Connection String

// Without EntityPath in connection string
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(
    "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret",
    "eventhub-name",
    nil,
)

// With EntityPath in connection string
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(
    "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret;EntityPath=eventhub-name",
    "",
    nil,
)

Event Hub Concepts

Partitions

Events in Event Hubs are organized into partitions. Each partition is an ordered sequence of events. You can:

  • Send to a specific partition using EventDataBatchOptions.PartitionID
  • Use partition keys for consistent routing with EventDataBatchOptions.PartitionKey
  • Let Event Hubs distribute events automatically by leaving both nil

Consumer Groups

Consumer groups allow multiple applications to each have a separate view of the event stream. Each consumer group maintains its own position in the stream. The default consumer group is azeventhubs.DefaultConsumerGroup (value: "$Default").

Checkpointing

Checkpoints track the last successfully processed event in a partition. This allows consumers to resume processing from where they left off after restarts or failures. Use the Processor with a CheckpointStore for automatic checkpoint management.

See Also