or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2@v2.0.1

docs

checkpoint-store.mdconfiguration.mdconsumer.mdevent-types.mdindex.mdprocessor.mdproducer.md
tile.json

tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2

tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0

Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.

producer.mddocs/

Producer Client - Sending Events

The ProducerClient is used to send events to Azure Event Hubs. Events are sent in batches for efficient transmission over the network.

Creating a Producer Client

Using Azure Identity

func NewProducerClient(fullyQualifiedNamespace string, eventHub string, credential azcore.TokenCredential, options *ProducerClientOptions) (*ProducerClient, error)

Creates a ProducerClient using a token credential from the azidentity package.

Parameters:

  • fullyQualifiedNamespace: Event Hubs namespace (e.g., "myeventhub.servicebus.windows.net")
  • eventHub: Name of the event hub
  • credential: Token credential from azidentity (e.g., DefaultAzureCredential)
  • options: Optional configuration settings

Example:

import (
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
    panic(err)
}

producerClient, err := azeventhubs.NewProducerClient(
    "myeventhub.servicebus.windows.net",
    "myhub",
    cred,
    nil,
)
if err != nil {
    panic(err)
}
defer producerClient.Close(context.TODO())

Using Connection String

func NewProducerClientFromConnectionString(connectionString string, eventHub string, options *ProducerClientOptions) (*ProducerClient, error)

Creates a ProducerClient from a connection string.

Parameters:

  • connectionString: Connection string from Azure portal
  • eventHub: Name of the event hub (empty if EntityPath is in connection string)
  • options: Optional configuration settings

Connection String Formats:

Without EntityPath (eventHub parameter required):

Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret

With EntityPath (eventHub parameter must be empty):

Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret;EntityPath=myhub

Example:

producerClient, err := azeventhubs.NewProducerClientFromConnectionString(
    "Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secret",
    "myhub",
    nil,
)
if err != nil {
    panic(err)
}
defer producerClient.Close(context.TODO())

ProducerClientOptions

type ProducerClientOptions struct {
    // ApplicationID that will be passed to the namespace
    ApplicationID string

    // CustomEndpoint address for establishing the connection to the service
    CustomEndpoint string

    // NewWebSocketConn is a function that can create a net.Conn for use with websockets
    NewWebSocketConn func(ctx context.Context, params WebSocketConnParams) (net.Conn, error)

    // RetryOptions controls how often operations are retried from this client
    RetryOptions RetryOptions

    // TLSConfig configures a client with a custom *tls.Config
    TLSConfig *tls.Config
}

Fields:

  • ApplicationID (string): Application identifier passed to the namespace for diagnostics
  • CustomEndpoint (string): Custom endpoint address (e.g., for using private endpoints)
  • NewWebSocketConn (func): Function to create WebSocket connections (for firewall scenarios)
  • RetryOptions (RetryOptions): Controls retry behavior for operations
  • TLSConfig (*tls.Config): Custom TLS configuration

ProducerClient Type

type ProducerClient struct {
    // Contains filtered or unexported fields
}

The ProducerClient is used to send events to an Event Hub. You must call Close to avoid leaking resources.

Sending Events

Creating Event Data Batches

func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *EventDataBatchOptions) (*EventDataBatch, error)

Creates an EventDataBatch which can contain multiple events. The batch ensures that events don't exceed the maximum size for the Event Hubs link.

Parameters:

  • ctx: Context for the operation
  • options: Optional batch configuration

Returns:

  • *EventDataBatch: Batch container for events
  • error: Error if batch creation fails

Example:

// Create batch with default options (Event Hubs picks partition)
batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)
if err != nil {
    panic(err)
}

// Create batch for specific partition
batch, err := producerClient.NewEventDataBatch(context.TODO(), &azeventhubs.EventDataBatchOptions{
    PartitionID: to.Ptr("0"),
})

// Create batch with partition key for consistent routing
batch, err := producerClient.NewEventDataBatch(context.TODO(), &azeventhubs.EventDataBatchOptions{
    PartitionKey: to.Ptr("user-123"),
})

EventDataBatchOptions

type EventDataBatchOptions struct {
    // MaxBytes overrides the max size (in bytes) for a batch
    // By default uses the max message size provided by the service
    MaxBytes uint64

    // PartitionKey is hashed to calculate the partition assignment
    // Messages with the same PartitionKey are guaranteed to end up in the same partition
    // Cannot be used with PartitionID
    PartitionKey *string

    // PartitionID is the ID of the partition to send these messages to
    // Cannot be used with PartitionKey
    PartitionID *string
}

Fields:

  • MaxBytes (uint64): Maximum size in bytes for the batch (0 uses service default)
  • PartitionKey (*string): Key for consistent partition routing (mutually exclusive with PartitionID)
  • PartitionID (*string): Specific partition to send to (mutually exclusive with PartitionKey)

Sending Event Data Batches

func (pc *ProducerClient) SendEventDataBatch(ctx context.Context, batch *EventDataBatch, options *SendEventDataBatchOptions) error

Sends an EventDataBatch to Event Hubs.

Parameters:

  • ctx: Context for the operation
  • batch: Batch of events to send
  • options: Optional send configuration (currently empty, reserved for future use)

Returns:

  • error: Error if send fails

Example:

err := producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
    panic(err)
}

SendEventDataBatchOptions

type SendEventDataBatchOptions struct {
    // For future expansion
}

Currently empty, reserved for future options.

EventDataBatch Type

type EventDataBatch struct {
    // Contains filtered or unexported fields
}

Container for batching events together for efficient sending.

Adding Events to Batch

func (edb *EventDataBatch) AddEventData(ed *EventData, options *AddEventDataOptions) error

Adds an EventData to the batch. Returns ErrEventDataTooLarge if the event cannot fit.

Parameters:

  • ed: Event data to add
  • options: Optional add configuration (currently empty)

Returns:

  • error: ErrEventDataTooLarge if event is too large, other errors for failures

Example:

import (
    "errors"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

err := batch.AddEventData(&azeventhubs.EventData{
    Body: []byte("hello world"),
}, nil)

if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
    if batch.NumEvents() == 0 {
        // This single event is too large
        panic("Event exceeds maximum size")
    }

    // Batch is full, send it and create a new one
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
    batch, _ = producerClient.NewEventDataBatch(context.TODO(), nil)
    batch.AddEventData(&azeventhubs.EventData{
        Body: []byte("hello world"),
    }, nil)
}

Adding AMQP Annotated Messages to Batch

func (edb *EventDataBatch) AddAMQPAnnotatedMessage(annotatedMessage *AMQPAnnotatedMessage, options *AddEventDataOptions) error

Adds a full AMQP message to the batch for advanced scenarios requiring low-level AMQP control.

Parameters:

  • annotatedMessage: AMQP annotated message to add
  • options: Optional add configuration

Returns:

  • error: ErrEventDataTooLarge if message is too large, other errors for failures

Batch Information Methods

func (edb *EventDataBatch) NumBytes() uint64

Returns the current size of the batch in bytes.

func (edb *EventDataBatch) NumEvents() int32

Returns the number of events currently in the batch.

AddEventDataOptions

type AddEventDataOptions struct {
    // For future expansion
}

Currently empty, reserved for future options.

EventData Type

type EventData struct {
    // Properties can be used to store custom metadata for a message
    Properties map[string]any

    // Body is the payload for a message
    Body []byte

    // ContentType describes the payload of the message, with a descriptor following
    // the format of Content-Type, specified by RFC2045 (ex: "application/json")
    ContentType *string

    // CorrelationID is a client-specific id that can be used to mark or identify messages
    // between clients. Can be a uint64, UUID, []byte, or string
    CorrelationID any

    // MessageID is an application-defined value that uniquely identifies
    // the message and its payload. The identifier is a free-form string.
    // If enabled, the duplicate detection feature identifies and removes further
    // submissions of messages with the same MessageID
    MessageID *string
}

Fields:

  • Properties (map[string]any): Custom application properties (key-value metadata)
  • Body ([]byte): Event payload as byte array
  • ContentType (*string): MIME content type descriptor (e.g., "application/json")
  • CorrelationID (any): Client-specific identifier for correlation (uint64, UUID, []byte, or string)
  • MessageID (*string): Unique message identifier for duplicate detection

Example:

event := &azeventhubs.EventData{
    Body: []byte(`{"temperature": 72, "humidity": 45}`),
    ContentType: to.Ptr("application/json"),
    MessageID: to.Ptr("msg-12345"),
    Properties: map[string]any{
        "deviceId": "sensor-001",
        "timestamp": time.Now().Unix(),
    },
}

Management Operations

Getting Event Hub Properties

func (pc *ProducerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)

Gets properties of the Event Hub, including partition IDs and creation time.

Parameters:

  • ctx: Context for the operation
  • options: Optional configuration (currently empty)

Returns:

  • EventHubProperties: Event hub metadata
  • error: Error if operation fails

Example:

props, err := producerClient.GetEventHubProperties(context.TODO(), nil)
if err != nil {
    panic(err)
}

fmt.Printf("Event Hub: %s\n", props.Name)
fmt.Printf("Partitions: %v\n", props.PartitionIDs)
fmt.Printf("Created: %s\n", props.CreatedOn)

GetEventHubPropertiesOptions

type GetEventHubPropertiesOptions struct {
    // For future expansion
}

EventHubProperties

type EventHubProperties struct {
    // CreatedOn is the time when the event hub was created
    CreatedOn time.Time

    // Name of the event hub
    Name string

    // PartitionIDs for the event hub
    PartitionIDs []string

    // GeoReplicationEnabled is true if the event hub has geo-replication enabled
    GeoReplicationEnabled bool
}

Fields:

  • CreatedOn (time.Time): UTC time when the Event Hub was created
  • Name (string): Name of the Event Hub
  • PartitionIDs ([]string): List of partition identifiers
  • GeoReplicationEnabled (bool): Whether geo-replication is enabled

Getting Partition Properties

func (pc *ProducerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)

Gets properties for a specific partition, including sequence numbers and last enqueued event information.

Parameters:

  • ctx: Context for the operation
  • partitionID: Partition identifier
  • options: Optional configuration (currently empty)

Returns:

  • PartitionProperties: Partition metadata
  • error: Error if operation fails

Example:

props, err := producerClient.GetPartitionProperties(context.TODO(), "0", nil)
if err != nil {
    panic(err)
}

fmt.Printf("Partition: %s\n", props.PartitionID)
fmt.Printf("Beginning sequence: %d\n", props.BeginningSequenceNumber)
fmt.Printf("Last sequence: %d\n", props.LastEnqueuedSequenceNumber)
fmt.Printf("Last enqueued: %s\n", props.LastEnqueuedOn)
fmt.Printf("Is empty: %v\n", props.IsEmpty)

GetPartitionPropertiesOptions

type GetPartitionPropertiesOptions struct {
    // For future expansion
}

PartitionProperties

type PartitionProperties struct {
    // BeginningSequenceNumber is the first sequence number for a partition
    BeginningSequenceNumber int64

    // EventHubName is the name of the Event Hub for this partition
    EventHubName string

    // IsEmpty is true if the partition is empty, false otherwise
    IsEmpty bool

    // LastEnqueuedOffset is the offset of latest enqueued event
    LastEnqueuedOffset string

    // LastEnqueuedOn is the date of latest enqueued event
    LastEnqueuedOn time.Time

    // LastEnqueuedSequenceNumber is the sequence number of the latest enqueued event
    LastEnqueuedSequenceNumber int64

    // PartitionID is the partition ID of this partition
    PartitionID string
}

Fields:

  • BeginningSequenceNumber (int64): First available sequence number in the partition
  • EventHubName (string): Name of the Event Hub
  • IsEmpty (bool): Whether the partition contains any events
  • LastEnqueuedOffset (string): Offset of the most recently enqueued event
  • LastEnqueuedOn (time.Time): UTC time when the last event was enqueued
  • LastEnqueuedSequenceNumber (int64): Sequence number of the most recently enqueued event
  • PartitionID (string): Identifier for this partition

Closing the Client

func (pc *ProducerClient) Close(ctx context.Context) error

Closes the ProducerClient and releases all resources. You MUST call this to avoid leaking resources.

Parameters:

  • ctx: Context for the close operation

Returns:

  • error: Error if close fails

Example:

err := producerClient.Close(context.TODO())
if err != nil {
    panic(err)
}

Error Handling

ErrEventDataTooLarge

var ErrEventDataTooLarge error

Error returned when an event is too large to fit in the current batch. Check this error when adding events to handle batch overflow:

import "errors"

err := batch.AddEventData(event, nil)
if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
    // Handle batch full scenario
    if batch.NumEvents() > 0 {
        // Send current batch and create new one
        producerClient.SendEventDataBatch(context.TODO(), batch, nil)
        batch, _ = producerClient.NewEventDataBatch(context.TODO(), options)
    } else {
        // Single event is too large
        panic("Event exceeds maximum size")
    }
}

Complete Example

package main

import (
    "context"
    "errors"
    "fmt"
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

func main() {
    // Create producer client
    cred, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        panic(err)
    }

    producerClient, err := azeventhubs.NewProducerClient(
        "myeventhub.servicebus.windows.net",
        "myhub",
        cred,
        nil,
    )
    if err != nil {
        panic(err)
    }
    defer producerClient.Close(context.TODO())

    // Create events
    events := []*azeventhubs.EventData{
        {Body: []byte("event 1")},
        {Body: []byte("event 2")},
        {Body: []byte("event 3")},
    }

    // Create batch
    batchOptions := &azeventhubs.EventDataBatchOptions{
        // Optional: target specific partition
        // PartitionID: to.Ptr("0"),

        // Optional: use partition key for routing
        // PartitionKey: to.Ptr("user-123"),
    }

    batch, err := producerClient.NewEventDataBatch(context.TODO(), batchOptions)
    if err != nil {
        panic(err)
    }

    // Add events to batch
    for i, event := range events {
        err = batch.AddEventData(event, nil)

        if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
            if batch.NumEvents() == 0 {
                panic(fmt.Sprintf("Event %d is too large", i))
            }

            // Send current batch
            if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
                panic(err)
            }

            // Create new batch
            batch, err = producerClient.NewEventDataBatch(context.TODO(), batchOptions)
            if err != nil {
                panic(err)
            }

            // Retry adding event
            if err := batch.AddEventData(event, nil); err != nil {
                panic(err)
            }
        } else if err != nil {
            panic(err)
        }
    }

    // Send final batch
    if batch.NumEvents() > 0 {
        if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
            panic(err)
        }
    }

    fmt.Printf("Successfully sent %d events\n", len(events))
}