tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.
The ProducerClient is used to send events to Azure Event Hubs. Events are sent in batches for efficient transmission over the network.
func NewProducerClient(fullyQualifiedNamespace string, eventHub string, credential azcore.TokenCredential, options *ProducerClientOptions) (*ProducerClient, error)Creates a ProducerClient using a token credential from the azidentity package.
Parameters:
fullyQualifiedNamespace: Event Hubs namespace (e.g., "myeventhub.servicebus.windows.net")eventHub: Name of the event hubcredential: Token credential from azidentity (e.g., DefaultAzureCredential)options: Optional configuration settingsExample:
import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
producerClient, err := azeventhubs.NewProducerClient(
"myeventhub.servicebus.windows.net",
"myhub",
cred,
nil,
)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())func NewProducerClientFromConnectionString(connectionString string, eventHub string, options *ProducerClientOptions) (*ProducerClient, error)Creates a ProducerClient from a connection string.
Parameters:
connectionString: Connection string from Azure portaleventHub: Name of the event hub (empty if EntityPath is in connection string)options: Optional configuration settingsConnection String Formats:
Without EntityPath (eventHub parameter required):
Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secretWith EntityPath (eventHub parameter must be empty):
Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key;SharedAccessKey=secret;EntityPath=myhubExample:
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(
"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secret",
"myhub",
nil,
)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())type ProducerClientOptions struct {
// ApplicationID that will be passed to the namespace
ApplicationID string
// CustomEndpoint address for establishing the connection to the service
CustomEndpoint string
// NewWebSocketConn is a function that can create a net.Conn for use with websockets
NewWebSocketConn func(ctx context.Context, params WebSocketConnParams) (net.Conn, error)
// RetryOptions controls how often operations are retried from this client
RetryOptions RetryOptions
// TLSConfig configures a client with a custom *tls.Config
TLSConfig *tls.Config
}Fields:
ApplicationID (string): Application identifier passed to the namespace for diagnosticsCustomEndpoint (string): Custom endpoint address (e.g., for using private endpoints)NewWebSocketConn (func): Function to create WebSocket connections (for firewall scenarios)RetryOptions (RetryOptions): Controls retry behavior for operationsTLSConfig (*tls.Config): Custom TLS configurationtype ProducerClient struct {
// Contains filtered or unexported fields
}The ProducerClient is used to send events to an Event Hub. You must call Close to avoid leaking resources.
func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *EventDataBatchOptions) (*EventDataBatch, error)Creates an EventDataBatch which can contain multiple events. The batch ensures that events don't exceed the maximum size for the Event Hubs link.
Parameters:
ctx: Context for the operationoptions: Optional batch configurationReturns:
*EventDataBatch: Batch container for eventserror: Error if batch creation failsExample:
// Create batch with default options (Event Hubs picks partition)
batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
// Create batch for specific partition
batch, err := producerClient.NewEventDataBatch(context.TODO(), &azeventhubs.EventDataBatchOptions{
PartitionID: to.Ptr("0"),
})
// Create batch with partition key for consistent routing
batch, err := producerClient.NewEventDataBatch(context.TODO(), &azeventhubs.EventDataBatchOptions{
PartitionKey: to.Ptr("user-123"),
})type EventDataBatchOptions struct {
// MaxBytes overrides the max size (in bytes) for a batch
// By default uses the max message size provided by the service
MaxBytes uint64
// PartitionKey is hashed to calculate the partition assignment
// Messages with the same PartitionKey are guaranteed to end up in the same partition
// Cannot be used with PartitionID
PartitionKey *string
// PartitionID is the ID of the partition to send these messages to
// Cannot be used with PartitionKey
PartitionID *string
}Fields:
MaxBytes (uint64): Maximum size in bytes for the batch (0 uses service default)PartitionKey (*string): Key for consistent partition routing (mutually exclusive with PartitionID)PartitionID (*string): Specific partition to send to (mutually exclusive with PartitionKey)func (pc *ProducerClient) SendEventDataBatch(ctx context.Context, batch *EventDataBatch, options *SendEventDataBatchOptions) errorSends an EventDataBatch to Event Hubs.
Parameters:
ctx: Context for the operationbatch: Batch of events to sendoptions: Optional send configuration (currently empty, reserved for future use)Returns:
error: Error if send failsExample:
err := producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}type SendEventDataBatchOptions struct {
// For future expansion
}Currently empty, reserved for future options.
type EventDataBatch struct {
// Contains filtered or unexported fields
}Container for batching events together for efficient sending.
func (edb *EventDataBatch) AddEventData(ed *EventData, options *AddEventDataOptions) errorAdds an EventData to the batch. Returns ErrEventDataTooLarge if the event cannot fit.
Parameters:
ed: Event data to addoptions: Optional add configuration (currently empty)Returns:
error: ErrEventDataTooLarge if event is too large, other errors for failuresExample:
import (
"errors"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
err := batch.AddEventData(&azeventhubs.EventData{
Body: []byte("hello world"),
}, nil)
if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
if batch.NumEvents() == 0 {
// This single event is too large
panic("Event exceeds maximum size")
}
// Batch is full, send it and create a new one
producerClient.SendEventDataBatch(context.TODO(), batch, nil)
batch, _ = producerClient.NewEventDataBatch(context.TODO(), nil)
batch.AddEventData(&azeventhubs.EventData{
Body: []byte("hello world"),
}, nil)
}func (edb *EventDataBatch) AddAMQPAnnotatedMessage(annotatedMessage *AMQPAnnotatedMessage, options *AddEventDataOptions) errorAdds a full AMQP message to the batch for advanced scenarios requiring low-level AMQP control.
Parameters:
annotatedMessage: AMQP annotated message to addoptions: Optional add configurationReturns:
error: ErrEventDataTooLarge if message is too large, other errors for failuresfunc (edb *EventDataBatch) NumBytes() uint64Returns the current size of the batch in bytes.
func (edb *EventDataBatch) NumEvents() int32Returns the number of events currently in the batch.
type AddEventDataOptions struct {
// For future expansion
}Currently empty, reserved for future options.
type EventData struct {
// Properties can be used to store custom metadata for a message
Properties map[string]any
// Body is the payload for a message
Body []byte
// ContentType describes the payload of the message, with a descriptor following
// the format of Content-Type, specified by RFC2045 (ex: "application/json")
ContentType *string
// CorrelationID is a client-specific id that can be used to mark or identify messages
// between clients. Can be a uint64, UUID, []byte, or string
CorrelationID any
// MessageID is an application-defined value that uniquely identifies
// the message and its payload. The identifier is a free-form string.
// If enabled, the duplicate detection feature identifies and removes further
// submissions of messages with the same MessageID
MessageID *string
}Fields:
Properties (map[string]any): Custom application properties (key-value metadata)Body ([]byte): Event payload as byte arrayContentType (*string): MIME content type descriptor (e.g., "application/json")CorrelationID (any): Client-specific identifier for correlation (uint64, UUID, []byte, or string)MessageID (*string): Unique message identifier for duplicate detectionExample:
event := &azeventhubs.EventData{
Body: []byte(`{"temperature": 72, "humidity": 45}`),
ContentType: to.Ptr("application/json"),
MessageID: to.Ptr("msg-12345"),
Properties: map[string]any{
"deviceId": "sensor-001",
"timestamp": time.Now().Unix(),
},
}func (pc *ProducerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)Gets properties of the Event Hub, including partition IDs and creation time.
Parameters:
ctx: Context for the operationoptions: Optional configuration (currently empty)Returns:
EventHubProperties: Event hub metadataerror: Error if operation failsExample:
props, err := producerClient.GetEventHubProperties(context.TODO(), nil)
if err != nil {
panic(err)
}
fmt.Printf("Event Hub: %s\n", props.Name)
fmt.Printf("Partitions: %v\n", props.PartitionIDs)
fmt.Printf("Created: %s\n", props.CreatedOn)type GetEventHubPropertiesOptions struct {
// For future expansion
}type EventHubProperties struct {
// CreatedOn is the time when the event hub was created
CreatedOn time.Time
// Name of the event hub
Name string
// PartitionIDs for the event hub
PartitionIDs []string
// GeoReplicationEnabled is true if the event hub has geo-replication enabled
GeoReplicationEnabled bool
}Fields:
CreatedOn (time.Time): UTC time when the Event Hub was createdName (string): Name of the Event HubPartitionIDs ([]string): List of partition identifiersGeoReplicationEnabled (bool): Whether geo-replication is enabledfunc (pc *ProducerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)Gets properties for a specific partition, including sequence numbers and last enqueued event information.
Parameters:
ctx: Context for the operationpartitionID: Partition identifieroptions: Optional configuration (currently empty)Returns:
PartitionProperties: Partition metadataerror: Error if operation failsExample:
props, err := producerClient.GetPartitionProperties(context.TODO(), "0", nil)
if err != nil {
panic(err)
}
fmt.Printf("Partition: %s\n", props.PartitionID)
fmt.Printf("Beginning sequence: %d\n", props.BeginningSequenceNumber)
fmt.Printf("Last sequence: %d\n", props.LastEnqueuedSequenceNumber)
fmt.Printf("Last enqueued: %s\n", props.LastEnqueuedOn)
fmt.Printf("Is empty: %v\n", props.IsEmpty)type GetPartitionPropertiesOptions struct {
// For future expansion
}type PartitionProperties struct {
// BeginningSequenceNumber is the first sequence number for a partition
BeginningSequenceNumber int64
// EventHubName is the name of the Event Hub for this partition
EventHubName string
// IsEmpty is true if the partition is empty, false otherwise
IsEmpty bool
// LastEnqueuedOffset is the offset of latest enqueued event
LastEnqueuedOffset string
// LastEnqueuedOn is the date of latest enqueued event
LastEnqueuedOn time.Time
// LastEnqueuedSequenceNumber is the sequence number of the latest enqueued event
LastEnqueuedSequenceNumber int64
// PartitionID is the partition ID of this partition
PartitionID string
}Fields:
BeginningSequenceNumber (int64): First available sequence number in the partitionEventHubName (string): Name of the Event HubIsEmpty (bool): Whether the partition contains any eventsLastEnqueuedOffset (string): Offset of the most recently enqueued eventLastEnqueuedOn (time.Time): UTC time when the last event was enqueuedLastEnqueuedSequenceNumber (int64): Sequence number of the most recently enqueued eventPartitionID (string): Identifier for this partitionfunc (pc *ProducerClient) Close(ctx context.Context) errorCloses the ProducerClient and releases all resources. You MUST call this to avoid leaking resources.
Parameters:
ctx: Context for the close operationReturns:
error: Error if close failsExample:
err := producerClient.Close(context.TODO())
if err != nil {
panic(err)
}var ErrEventDataTooLarge errorError returned when an event is too large to fit in the current batch. Check this error when adding events to handle batch overflow:
import "errors"
err := batch.AddEventData(event, nil)
if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
// Handle batch full scenario
if batch.NumEvents() > 0 {
// Send current batch and create new one
producerClient.SendEventDataBatch(context.TODO(), batch, nil)
batch, _ = producerClient.NewEventDataBatch(context.TODO(), options)
} else {
// Single event is too large
panic("Event exceeds maximum size")
}
}package main
import (
"context"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
func main() {
// Create producer client
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
producerClient, err := azeventhubs.NewProducerClient(
"myeventhub.servicebus.windows.net",
"myhub",
cred,
nil,
)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// Create events
events := []*azeventhubs.EventData{
{Body: []byte("event 1")},
{Body: []byte("event 2")},
{Body: []byte("event 3")},
}
// Create batch
batchOptions := &azeventhubs.EventDataBatchOptions{
// Optional: target specific partition
// PartitionID: to.Ptr("0"),
// Optional: use partition key for routing
// PartitionKey: to.Ptr("user-123"),
}
batch, err := producerClient.NewEventDataBatch(context.TODO(), batchOptions)
if err != nil {
panic(err)
}
// Add events to batch
for i, event := range events {
err = batch.AddEventData(event, nil)
if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
if batch.NumEvents() == 0 {
panic(fmt.Sprintf("Event %d is too large", i))
}
// Send current batch
if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
// Create new batch
batch, err = producerClient.NewEventDataBatch(context.TODO(), batchOptions)
if err != nil {
panic(err)
}
// Retry adding event
if err := batch.AddEventData(event, nil); err != nil {
panic(err)
}
} else if err != nil {
panic(err)
}
}
// Send final batch
if batch.NumEvents() > 0 {
if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
fmt.Printf("Successfully sent %d events\n", len(events))
}