tessl install tessl/golang-github-com-azure-azure-sdk-for-go-sdk-messaging-azeventhubs-v2@2.0.0Azure Event Hubs client library for Go providing ProducerClient for sending events, ConsumerClient for manual partition-level consumption, and Processor for automatic load balancing with checkpoint management using Azure Blob Storage.
This document covers the event data types used for sending and receiving events, as well as the low-level AMQP message representations.
EventData represents an event to be sent to Event Hubs.
type EventData struct {
// Properties can be used to store custom metadata for a message.
Properties map[string]any
// Body is the payload for a message.
Body []byte
// ContentType describes the payload of the message, with a descriptor following
// the format of Content-Type, specified by RFC2045 (ex: "application/json").
ContentType *string
// CorrelationID is a client-specific id that can be used to mark or identify messages
// between clients.
// CorrelationID can be a uint64, UUID, []byte, or string
CorrelationID any
// MessageID is an application-defined value that uniquely identifies
// the message and its payload. The identifier is a free-form string.
//
// If enabled, the duplicate detection feature identifies and removes further submissions
// of messages with the same MessageId.
MessageID *string
}Fields:
Properties (map[string]any) - Custom application properties for the messageBody ([]byte) - The message payloadContentType (*string) - Content type descriptor (RFC2045 format, e.g., "application/json")CorrelationID (any) - Client-specific identifier for message correlation (types: uint64, UUID, []byte, string)MessageID (*string) - Unique identifier for the message (used for duplicate detection if enabled)Example:
import "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
event := &azeventhubs.EventData{
Body: []byte(`{"temperature": 25.5, "humidity": 60}`),
ContentType: to.Ptr("application/json"),
MessageID: to.Ptr("msg-12345"),
CorrelationID: "correlation-abc",
Properties: map[string]any{
"sensorId": "sensor-01",
"location": "building-a",
"timestamp": time.Now().Unix(),
"priority": 5,
},
}ReceivedEventData represents an event received from Event Hubs. It embeds EventData and adds Event Hubs-specific metadata.
type ReceivedEventData struct {
EventData
// EnqueuedTime is the UTC time when the message was accepted and stored by Event Hubs.
EnqueuedTime *time.Time
// PartitionKey is used with a partitioned entity and enables assigning related messages
// to the same internal partition. This ensures that the submission sequence order is correctly
// recorded. The partition is chosen by a hash function in Event Hubs and cannot be chosen
// directly.
PartitionKey *string
// Offset is the offset of the event.
Offset string
// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
// and Header fields.
RawAMQPMessage *AMQPAnnotatedMessage
// SequenceNumber is a unique number assigned to a message by Event Hubs.
SequenceNumber int64
// Properties set by the Event Hubs service.
SystemProperties map[string]any
}Inherited Fields from EventData:
Properties (map[string]any) - Custom application propertiesBody ([]byte) - The message payloadContentType (*string) - Content type descriptorCorrelationID (any) - Client-specific correlation identifierMessageID (*string) - Unique message identifierAdditional Fields:
EnqueuedTime (*time.Time) - When Event Hubs received and stored the event (UTC)PartitionKey (*string) - The partition key used for routing (if any)Offset (string) - The offset of the event in the partitionRawAMQPMessage (*AMQPAnnotatedMessage) - Full AMQP message for low-level accessSequenceNumber (int64) - Unique sequence number assigned by Event HubsSystemProperties (map[string]any) - Event Hubs system properties (beyond the standard fields)Example:
events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
// handle error
}
for _, event := range events {
// Standard EventData fields
fmt.Printf("Body: %s\n", string(event.Body))
if event.ContentType != nil {
fmt.Printf("Content Type: %s\n", *event.ContentType)
}
if event.MessageID != nil {
fmt.Printf("Message ID: %s\n", *event.MessageID)
}
// Event Hubs metadata
fmt.Printf("Sequence Number: %d\n", event.SequenceNumber)
fmt.Printf("Offset: %s\n", event.Offset)
if event.EnqueuedTime != nil {
fmt.Printf("Enqueued Time: %s\n", event.EnqueuedTime.Format(time.RFC3339))
}
if event.PartitionKey != nil {
fmt.Printf("Partition Key: %s\n", *event.PartitionKey)
}
// Custom properties
for key, value := range event.Properties {
fmt.Printf("Property %s: %v\n", key, value)
}
// System properties (if any)
for key, value := range event.SystemProperties {
fmt.Printf("System Property %s: %v\n", key, value)
}
// Access raw AMQP message if needed
if event.RawAMQPMessage != nil {
// Access AMQP-specific fields
if event.RawAMQPMessage.Header != nil {
fmt.Printf("AMQP Priority: %d\n", event.RawAMQPMessage.Header.Priority)
}
}
}For advanced scenarios, you can work directly with AMQP message types. These provide full control over the AMQP message format.
AMQPAnnotatedMessage represents the full AMQP message structure.
type AMQPAnnotatedMessage struct {
// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
ApplicationProperties map[string]any
// Body represents the body of an AMQP message.
Body AMQPAnnotatedMessageBody
// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
DeliveryAnnotations map[any]any
// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
// for this message.
DeliveryTag []byte
// Footer is the transport footers for this AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
Footer map[any]any
// Header is the transport headers for this AMQP message.
Header *AMQPAnnotatedMessageHeader
// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
//
// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
MessageAnnotations map[any]any
// Properties corresponds to the properties section of an AMQP message.
Properties *AMQPAnnotatedMessageProperties
}AMQP Simple Types:
The following types are supported in AMQP maps and properties:
int (any size), uint (any size)float (any size)stringbooltime.TimeFields:
ApplicationProperties (map[string]any) - Application-specific properties (AMQP simple types)Body (AMQPAnnotatedMessageBody) - The message bodyDeliveryAnnotations (map[any]any) - Delivery annotations (AMQP simple types)DeliveryTag ([]byte) - Delivery tag from the TRANSFER frameFooter (map[any]any) - Transport footers (AMQP simple types)Header (*AMQPAnnotatedMessageHeader) - Transport headersMessageAnnotations (map[any]any) - Message annotations (AMQP simple types)Properties (*AMQPAnnotatedMessageProperties) - AMQP message propertiesReference: AMQP 1.0 Message Format Specification
type AMQPAnnotatedMessageProperties struct {
// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
AbsoluteExpiryTime *time.Time
// ContentEncoding corresponds to the 'content-encoding' property.
ContentEncoding *string
// ContentType corresponds to the 'content-type' property
ContentType *string
// CorrelationID corresponds to the 'correlation-id' property.
// The type of CorrelationID can be a uint64, UUID, []byte, or a string
CorrelationID any
// CreationTime corresponds to the 'creation-time' property.
CreationTime *time.Time
// GroupID corresponds to the 'group-id' property.
GroupID *string
// GroupSequence corresponds to the 'group-sequence' property.
GroupSequence *uint32
// MessageID corresponds to the 'message-id' property.
// The type of MessageID can be a uint64, UUID, []byte, or string
MessageID any
// ReplyTo corresponds to the 'reply-to' property.
ReplyTo *string
// ReplyToGroupID corresponds to the 'reply-to-group-id' property.
ReplyToGroupID *string
// Subject corresponds to the 'subject' property.
Subject *string
// To corresponds to the 'to' property.
To *string
// UserID corresponds to the 'user-id' property.
UserID []byte
}Fields:
AbsoluteExpiryTime (*time.Time) - Absolute expiry time for the messageContentEncoding (*string) - Content encoding descriptorContentType (*string) - Content type descriptorCorrelationID (any) - Correlation identifier (types: uint64, UUID, []byte, string)CreationTime (*time.Time) - Message creation timeGroupID (*string) - Group identifier for message sequencesGroupSequence (*uint32) - Position in a group sequenceMessageID (any) - Unique message identifier (types: uint64, UUID, []byte, string)ReplyTo (*string) - Reply-to addressReplyToGroupID (*string) - Reply-to group identifierSubject (*string) - Message subjectTo (*string) - Destination addressUserID ([]byte) - User identifierReference: AMQP 1.0 Properties
type AMQPAnnotatedMessageHeader struct {
// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
// It corresponds to the 'delivery-count' property.
DeliveryCount uint32
// Durable corresponds to the 'durable' property.
Durable bool
// FirstAcquirer corresponds to the 'first-acquirer' property.
FirstAcquirer bool
// Priority corresponds to the 'priority' property.
Priority uint8
// TTL corresponds to the 'ttl' property.
TTL time.Duration
}Fields:
DeliveryCount (uint32) - Number of unsuccessful delivery attemptsDurable (bool) - Whether the message is durableFirstAcquirer (bool) - Whether this is the first acquirerPriority (uint8) - Message priority (0-255)TTL (time.Duration) - Time to live for the messageReference: AMQP 1.0 Header
type AMQPAnnotatedMessageBody struct {
// Data is encoded/decoded as multiple data sections in the body.
Data [][]byte
// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
//
// The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
Sequence [][]any
// Value is encoded/decoded as the amqp-value section in the body.
//
// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
// as well as slices or maps of AMQP simple types.
Value any
}Fields:
Data ([][]byte) - Multiple data sections (most common for Event Hubs)Sequence ([][]any) - One or more AMQP sequence sections (AMQP simple types)Value (any) - AMQP value section (AMQP simple types, slices, or maps)Important: Only one of these fields should be used at a time. They are mutually exclusive.
func (b *EventDataBatch) AddAMQPAnnotatedMessage(
annotatedMessage *AMQPAnnotatedMessage,
options *AddEventDataOptions,
) errorYou can send AMQP messages directly for full control over the message structure.
Example:
import "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
amqpMsg := &azeventhubs.AMQPAnnotatedMessage{
Body: azeventhubs.AMQPAnnotatedMessageBody{
Data: [][]byte{
[]byte("first data section"),
[]byte("second data section"),
},
},
Properties: &azeventhubs.AMQPAnnotatedMessageProperties{
MessageID: "msg-12345",
ContentType: to.Ptr("application/octet-stream"),
Subject: to.Ptr("sensor-data"),
CreationTime: to.Ptr(time.Now()),
},
Header: &azeventhubs.AMQPAnnotatedMessageHeader{
Durable: true,
Priority: 5,
TTL: 1 * time.Hour,
},
ApplicationProperties: map[string]any{
"deviceId": "device-001",
"location": "warehouse-a",
"temperature": 25.5,
},
MessageAnnotations: map[any]any{
"x-custom-annotation": "value",
},
}
batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)
if err != nil {
// handle error
}
err = batch.AddAMQPAnnotatedMessage(amqpMsg, nil)
if err != nil {
// handle error
}
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
// handle error
}Access the full AMQP message through the RawAMQPMessage field of ReceivedEventData.
Example:
events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
// handle error
}
for _, event := range events {
rawMsg := event.RawAMQPMessage
if rawMsg == nil {
continue
}
// Access header
if rawMsg.Header != nil {
fmt.Printf("Priority: %d\n", rawMsg.Header.Priority)
fmt.Printf("TTL: %s\n", rawMsg.Header.TTL)
fmt.Printf("Durable: %v\n", rawMsg.Header.Durable)
}
// Access properties
if rawMsg.Properties != nil {
if rawMsg.Properties.Subject != nil {
fmt.Printf("Subject: %s\n", *rawMsg.Properties.Subject)
}
if rawMsg.Properties.CreationTime != nil {
fmt.Printf("Creation Time: %s\n", rawMsg.Properties.CreationTime.Format(time.RFC3339))
}
}
// Access multiple data sections
if len(rawMsg.Body.Data) > 1 {
fmt.Printf("Message has %d data sections\n", len(rawMsg.Body.Data))
for i, data := range rawMsg.Body.Data {
fmt.Printf(" Section %d: %s\n", i, string(data))
}
}
// Access sequence body
if len(rawMsg.Body.Sequence) > 0 {
fmt.Printf("Sequence body: %v\n", rawMsg.Body.Sequence)
}
// Access value body
if rawMsg.Body.Value != nil {
fmt.Printf("Value body: %v\n", rawMsg.Body.Value)
}
// Access application properties
for key, value := range rawMsg.ApplicationProperties {
fmt.Printf("App Property %s: %v\n", key, value)
}
// Access message annotations
for key, value := range rawMsg.MessageAnnotations {
fmt.Printf("Message Annotation %v: %v\n", key, value)
}
// Access delivery annotations
for key, value := range rawMsg.DeliveryAnnotations {
fmt.Printf("Delivery Annotation %v: %v\n", key, value)
}
// Access footer
for key, value := range rawMsg.Footer {
fmt.Printf("Footer %v: %v\n", key, value)
}
}import (
"encoding/json"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
)
// Sending
type SensorData struct {
Temperature float64 `json:"temperature"`
Humidity int `json:"humidity"`
}
data := SensorData{Temperature: 25.5, Humidity: 60}
payload, err := json.Marshal(data)
if err != nil {
// handle error
}
event := &azeventhubs.EventData{
Body: payload,
ContentType: to.Ptr("application/json"),
}
// Receiving
events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
// handle error
}
for _, event := range events {
if event.ContentType != nil && *event.ContentType == "application/json" {
var data SensorData
err := json.Unmarshal(event.Body, &data)
if err != nil {
// handle error
}
fmt.Printf("Temperature: %.1f, Humidity: %d\n", data.Temperature, data.Humidity)
}
}// Sending
event := &azeventhubs.EventData{
Body: []byte{0x01, 0x02, 0x03, 0x04},
ContentType: to.Ptr("application/octet-stream"),
}
// Receiving
events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
// handle error
}
for _, event := range events {
fmt.Printf("Binary data length: %d bytes\n", len(event.Body))
// Process binary data...
}// Sending request
requestEvent := &azeventhubs.EventData{
Body: []byte("request data"),
MessageID: to.Ptr("request-12345"),
CorrelationID: "session-abc",
}
// Sending response with correlation
responseEvent := &azeventhubs.EventData{
Body: []byte("response data"),
MessageID: to.Ptr("response-67890"),
CorrelationID: "request-12345", // Correlate with request
}
// Receiving and matching
events, err := partitionClient.ReceiveEvents(context.TODO(), 10, nil)
if err != nil {
// handle error
}
for _, event := range events {
if event.CorrelationID != nil {
fmt.Printf("Correlated to: %v\n", event.CorrelationID)
}
}