or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2@v2.0.1

docs

checkpoint-store.mdconfiguration.mdconsumer.mdevent-types.mdindex.mdprocessor.mdproducer.md
tile.json

tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2

tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0

Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.

event-types.mddocs/

Event Data Types

This document covers the event data types used for sending and receiving events, as well as the low-level AMQP message representations.

EventData

EventData represents an event to be sent to Event Hubs.

type EventData struct {
    // Properties can be used to store custom metadata for a message.
    Properties map[string]any

    // Body is the payload for a message.
    Body []byte

    // ContentType describes the payload of the message, with a descriptor following
    // the format of Content-Type, specified by RFC2045 (ex: "application/json").
    ContentType *string

    // CorrelationID is a client-specific id that can be used to mark or identify messages
    // between clients.
    // CorrelationID can be a uint64, UUID, []byte, or string
    CorrelationID any

    // MessageID is an application-defined value that uniquely identifies
    // the message and its payload. The identifier is a free-form string.
    //
    // If enabled, the duplicate detection feature identifies and removes further submissions
    // of messages with the same MessageId.
    MessageID *string
}

Fields:

  • Properties (map[string]any) - Custom application properties for the message
  • Body ([]byte) - The message payload
  • ContentType (*string) - Content type descriptor (RFC2045 format, e.g., "application/json")
  • CorrelationID (any) - Client-specific identifier for message correlation (types: uint64, UUID, []byte, string)
  • MessageID (*string) - Unique identifier for the message (used for duplicate detection if enabled)

Example:

import "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"

event := &azeventhubs.EventData{
    Body:        []byte(`{"temperature": 25.5, "humidity": 60}`),
    ContentType: to.Ptr("application/json"),
    MessageID:   to.Ptr("msg-12345"),
    CorrelationID: "correlation-abc",
    Properties: map[string]any{
        "sensorId":  "sensor-01",
        "location":  "building-a",
        "timestamp": time.Now().Unix(),
        "priority":  5,
    },
}

ReceivedEventData

ReceivedEventData represents an event received from Event Hubs. It embeds EventData and adds Event Hubs-specific metadata.

type ReceivedEventData struct {
    EventData

    // EnqueuedTime is the UTC time when the message was accepted and stored by Event Hubs.
    EnqueuedTime *time.Time

    // PartitionKey is used with a partitioned entity and enables assigning related messages
    // to the same internal partition. This ensures that the submission sequence order is correctly
    // recorded. The partition is chosen by a hash function in Event Hubs and cannot be chosen
    // directly.
    PartitionKey *string

    // Offset is the offset of the event.
    Offset string

    // RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
    // to properties that are not exposed by ReceivedEventData such as payloads encoded into the
    // Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
    // and Header fields.
    RawAMQPMessage *AMQPAnnotatedMessage

    // SequenceNumber is a unique number assigned to a message by Event Hubs.
    SequenceNumber int64

    // Properties set by the Event Hubs service.
    SystemProperties map[string]any
}

Inherited Fields from EventData:

  • Properties (map[string]any) - Custom application properties
  • Body ([]byte) - The message payload
  • ContentType (*string) - Content type descriptor
  • CorrelationID (any) - Client-specific correlation identifier
  • MessageID (*string) - Unique message identifier

Additional Fields:

  • EnqueuedTime (*time.Time) - When Event Hubs received and stored the event (UTC)
  • PartitionKey (*string) - The partition key used for routing (if any)
  • Offset (string) - The offset of the event in the partition
  • RawAMQPMessage (*AMQPAnnotatedMessage) - Full AMQP message for low-level access
  • SequenceNumber (int64) - Unique sequence number assigned by Event Hubs
  • SystemProperties (map[string]any) - Event Hubs system properties (beyond the standard fields)

Example:

events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
    // handle error
}

for _, event := range events {
    // Standard EventData fields
    fmt.Printf("Body: %s\n", string(event.Body))
    if event.ContentType != nil {
        fmt.Printf("Content Type: %s\n", *event.ContentType)
    }
    if event.MessageID != nil {
        fmt.Printf("Message ID: %s\n", *event.MessageID)
    }

    // Event Hubs metadata
    fmt.Printf("Sequence Number: %d\n", event.SequenceNumber)
    fmt.Printf("Offset: %s\n", event.Offset)
    if event.EnqueuedTime != nil {
        fmt.Printf("Enqueued Time: %s\n", event.EnqueuedTime.Format(time.RFC3339))
    }
    if event.PartitionKey != nil {
        fmt.Printf("Partition Key: %s\n", *event.PartitionKey)
    }

    // Custom properties
    for key, value := range event.Properties {
        fmt.Printf("Property %s: %v\n", key, value)
    }

    // System properties (if any)
    for key, value := range event.SystemProperties {
        fmt.Printf("System Property %s: %v\n", key, value)
    }

    // Access raw AMQP message if needed
    if event.RawAMQPMessage != nil {
        // Access AMQP-specific fields
        if event.RawAMQPMessage.Header != nil {
            fmt.Printf("AMQP Priority: %d\n", event.RawAMQPMessage.Header.Priority)
        }
    }
}

AMQP Message Types

For advanced scenarios, you can work directly with AMQP message types. These provide full control over the AMQP message format.

AMQPAnnotatedMessage

AMQPAnnotatedMessage represents the full AMQP message structure.

type AMQPAnnotatedMessage struct {
    // ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
    //
    // The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
    ApplicationProperties map[string]any

    // Body represents the body of an AMQP message.
    Body AMQPAnnotatedMessageBody

    // DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
    //
    // The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
    DeliveryAnnotations map[any]any

    // DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
    // for this message.
    DeliveryTag []byte

    // Footer is the transport footers for this AMQP message.
    //
    // The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
    Footer map[any]any

    // Header is the transport headers for this AMQP message.
    Header *AMQPAnnotatedMessageHeader

    // MessageAnnotations corresponds to the message-annotations section of an AMQP message.
    //
    // The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
    MessageAnnotations map[any]any

    // Properties corresponds to the properties section of an AMQP message.
    Properties *AMQPAnnotatedMessageProperties
}

AMQP Simple Types:

The following types are supported in AMQP maps and properties:

  • int (any size), uint (any size)
  • float (any size)
  • string
  • bool
  • time.Time

Fields:

  • ApplicationProperties (map[string]any) - Application-specific properties (AMQP simple types)
  • Body (AMQPAnnotatedMessageBody) - The message body
  • DeliveryAnnotations (map[any]any) - Delivery annotations (AMQP simple types)
  • DeliveryTag ([]byte) - Delivery tag from the TRANSFER frame
  • Footer (map[any]any) - Transport footers (AMQP simple types)
  • Header (*AMQPAnnotatedMessageHeader) - Transport headers
  • MessageAnnotations (map[any]any) - Message annotations (AMQP simple types)
  • Properties (*AMQPAnnotatedMessageProperties) - AMQP message properties

Reference: AMQP 1.0 Message Format Specification

AMQPAnnotatedMessageProperties

type AMQPAnnotatedMessageProperties struct {
    // AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
    AbsoluteExpiryTime *time.Time

    // ContentEncoding corresponds to the 'content-encoding' property.
    ContentEncoding *string

    // ContentType corresponds to the 'content-type' property
    ContentType *string

    // CorrelationID corresponds to the 'correlation-id' property.
    // The type of CorrelationID can be a uint64, UUID, []byte, or a string
    CorrelationID any

    // CreationTime corresponds to the 'creation-time' property.
    CreationTime *time.Time

    // GroupID corresponds to the 'group-id' property.
    GroupID *string

    // GroupSequence corresponds to the 'group-sequence' property.
    GroupSequence *uint32

    // MessageID corresponds to the 'message-id' property.
    // The type of MessageID can be a uint64, UUID, []byte, or string
    MessageID any

    // ReplyTo corresponds to the 'reply-to' property.
    ReplyTo *string

    // ReplyToGroupID corresponds to the 'reply-to-group-id' property.
    ReplyToGroupID *string

    // Subject corresponds to the 'subject' property.
    Subject *string

    // To corresponds to the 'to' property.
    To *string

    // UserID corresponds to the 'user-id' property.
    UserID []byte
}

Fields:

  • AbsoluteExpiryTime (*time.Time) - Absolute expiry time for the message
  • ContentEncoding (*string) - Content encoding descriptor
  • ContentType (*string) - Content type descriptor
  • CorrelationID (any) - Correlation identifier (types: uint64, UUID, []byte, string)
  • CreationTime (*time.Time) - Message creation time
  • GroupID (*string) - Group identifier for message sequences
  • GroupSequence (*uint32) - Position in a group sequence
  • MessageID (any) - Unique message identifier (types: uint64, UUID, []byte, string)
  • ReplyTo (*string) - Reply-to address
  • ReplyToGroupID (*string) - Reply-to group identifier
  • Subject (*string) - Message subject
  • To (*string) - Destination address
  • UserID ([]byte) - User identifier

Reference: AMQP 1.0 Properties

AMQPAnnotatedMessageHeader

type AMQPAnnotatedMessageHeader struct {
    // DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
    // It corresponds to the 'delivery-count' property.
    DeliveryCount uint32

    // Durable corresponds to the 'durable' property.
    Durable bool

    // FirstAcquirer corresponds to the 'first-acquirer' property.
    FirstAcquirer bool

    // Priority corresponds to the 'priority' property.
    Priority uint8

    // TTL corresponds to the 'ttl' property.
    TTL time.Duration
}

Fields:

  • DeliveryCount (uint32) - Number of unsuccessful delivery attempts
  • Durable (bool) - Whether the message is durable
  • FirstAcquirer (bool) - Whether this is the first acquirer
  • Priority (uint8) - Message priority (0-255)
  • TTL (time.Duration) - Time to live for the message

Reference: AMQP 1.0 Header

AMQPAnnotatedMessageBody

type AMQPAnnotatedMessageBody struct {
    // Data is encoded/decoded as multiple data sections in the body.
    Data [][]byte

    // Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
    //
    // The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
    Sequence [][]any

    // Value is encoded/decoded as the amqp-value section in the body.
    //
    // The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
    // as well as slices or maps of AMQP simple types.
    Value any
}

Fields:

  • Data ([][]byte) - Multiple data sections (most common for Event Hubs)
  • Sequence ([][]any) - One or more AMQP sequence sections (AMQP simple types)
  • Value (any) - AMQP value section (AMQP simple types, slices, or maps)

Important: Only one of these fields should be used at a time. They are mutually exclusive.

Using AMQP Messages

Sending AMQP Messages

func (b *EventDataBatch) AddAMQPAnnotatedMessage(
    annotatedMessage *AMQPAnnotatedMessage,
    options *AddEventDataOptions,
) error

You can send AMQP messages directly for full control over the message structure.

Example:

import "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"

amqpMsg := &azeventhubs.AMQPAnnotatedMessage{
    Body: azeventhubs.AMQPAnnotatedMessageBody{
        Data: [][]byte{
            []byte("first data section"),
            []byte("second data section"),
        },
    },
    Properties: &azeventhubs.AMQPAnnotatedMessageProperties{
        MessageID:   "msg-12345",
        ContentType: to.Ptr("application/octet-stream"),
        Subject:     to.Ptr("sensor-data"),
        CreationTime: to.Ptr(time.Now()),
    },
    Header: &azeventhubs.AMQPAnnotatedMessageHeader{
        Durable:  true,
        Priority: 5,
        TTL:      1 * time.Hour,
    },
    ApplicationProperties: map[string]any{
        "deviceId":  "device-001",
        "location":  "warehouse-a",
        "temperature": 25.5,
    },
    MessageAnnotations: map[any]any{
        "x-custom-annotation": "value",
    },
}

batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)
if err != nil {
    // handle error
}

err = batch.AddAMQPAnnotatedMessage(amqpMsg, nil)
if err != nil {
    // handle error
}

err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
    // handle error
}

Receiving AMQP Messages

Access the full AMQP message through the RawAMQPMessage field of ReceivedEventData.

Example:

events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
    // handle error
}

for _, event := range events {
    rawMsg := event.RawAMQPMessage
    if rawMsg == nil {
        continue
    }

    // Access header
    if rawMsg.Header != nil {
        fmt.Printf("Priority: %d\n", rawMsg.Header.Priority)
        fmt.Printf("TTL: %s\n", rawMsg.Header.TTL)
        fmt.Printf("Durable: %v\n", rawMsg.Header.Durable)
    }

    // Access properties
    if rawMsg.Properties != nil {
        if rawMsg.Properties.Subject != nil {
            fmt.Printf("Subject: %s\n", *rawMsg.Properties.Subject)
        }
        if rawMsg.Properties.CreationTime != nil {
            fmt.Printf("Creation Time: %s\n", rawMsg.Properties.CreationTime.Format(time.RFC3339))
        }
    }

    // Access multiple data sections
    if len(rawMsg.Body.Data) > 1 {
        fmt.Printf("Message has %d data sections\n", len(rawMsg.Body.Data))
        for i, data := range rawMsg.Body.Data {
            fmt.Printf("  Section %d: %s\n", i, string(data))
        }
    }

    // Access sequence body
    if len(rawMsg.Body.Sequence) > 0 {
        fmt.Printf("Sequence body: %v\n", rawMsg.Body.Sequence)
    }

    // Access value body
    if rawMsg.Body.Value != nil {
        fmt.Printf("Value body: %v\n", rawMsg.Body.Value)
    }

    // Access application properties
    for key, value := range rawMsg.ApplicationProperties {
        fmt.Printf("App Property %s: %v\n", key, value)
    }

    // Access message annotations
    for key, value := range rawMsg.MessageAnnotations {
        fmt.Printf("Message Annotation %v: %v\n", key, value)
    }

    // Access delivery annotations
    for key, value := range rawMsg.DeliveryAnnotations {
        fmt.Printf("Delivery Annotation %v: %v\n", key, value)
    }

    // Access footer
    for key, value := range rawMsg.Footer {
        fmt.Printf("Footer %v: %v\n", key, value)
    }
}

Common Patterns

JSON Payloads

import (
    "encoding/json"
    "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
)

// Sending
type SensorData struct {
    Temperature float64 `json:"temperature"`
    Humidity    int     `json:"humidity"`
}

data := SensorData{Temperature: 25.5, Humidity: 60}
payload, err := json.Marshal(data)
if err != nil {
    // handle error
}

event := &azeventhubs.EventData{
    Body:        payload,
    ContentType: to.Ptr("application/json"),
}

// Receiving
events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
    // handle error
}

for _, event := range events {
    if event.ContentType != nil && *event.ContentType == "application/json" {
        var data SensorData
        err := json.Unmarshal(event.Body, &data)
        if err != nil {
            // handle error
        }
        fmt.Printf("Temperature: %.1f, Humidity: %d\n", data.Temperature, data.Humidity)
    }
}

Binary Payloads

// Sending
event := &azeventhubs.EventData{
    Body:        []byte{0x01, 0x02, 0x03, 0x04},
    ContentType: to.Ptr("application/octet-stream"),
}

// Receiving
events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
    // handle error
}

for _, event := range events {
    fmt.Printf("Binary data length: %d bytes\n", len(event.Body))
    // Process binary data...
}

Message Correlation

// Sending request
requestEvent := &azeventhubs.EventData{
    Body:          []byte("request data"),
    MessageID:     to.Ptr("request-12345"),
    CorrelationID: "session-abc",
}

// Sending response with correlation
responseEvent := &azeventhubs.EventData{
    Body:          []byte("response data"),
    MessageID:     to.Ptr("response-67890"),
    CorrelationID: "request-12345", // Correlate with request
}

// Receiving and matching
events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
    // handle error
}

for _, event := range events {
    if event.CorrelationID != nil {
        fmt.Printf("Correlated to: %v\n", event.CorrelationID)
    }
}

See Also

  • Producer Client
  • Consumer Client
  • AMQP 1.0 Specification