Apache Pulsar HTTP Sink Connector that streams topic data to HTTP endpoints as JSON webhooks
npx @tessl/cli install tessl/maven-org-apache-pulsar--pulsar-io-http@4.0.0

The Apache Pulsar HTTP Sink Connector streams data from Pulsar topics to external HTTP endpoints through webhook-style POST requests. It transforms Pulsar records into JSON payloads and adds metadata headers to each request, bridging Pulsar's pub-sub messaging and HTTP-based services.
org.apache.pulsar:pulsar-io-http:4.0.6

import org.apache.pulsar.io.http.HttpSink;
import org.apache.pulsar.io.http.HttpSinkConfig;
import org.apache.pulsar.io.http.JsonConverter;

<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-io-http</artifactId>
  <version>4.0.6</version>
</dependency>

# http-sink-config.yaml
url: "https://example.com/webhook"
headers:
  Authorization: "Bearer your-token"
  Content-Type: "application/json"
  X-Custom-Header: "custom-value"

# Create sink connector
bin/pulsar-admin sinks create \
  --archive pulsar-io-http-4.0.6.nar \
  --inputs my-topic \
  --name http-sink \
  --sink-config-file http-sink-config.yaml

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.io.http.HttpSink;
import org.apache.pulsar.io.http.HttpSinkConfig;
// Create and configure the sink
HttpSink httpSink = new HttpSink();
Map<String, Object> config = new HashMap<>();
config.put("url", "https://example.com/webhook");
Map<String, String> headers = new HashMap<>();
headers.put("Authorization", "Bearer token");
config.put("headers", headers);
// Initialize the sink (sinkContext is supplied by the Pulsar runtime)
httpSink.open(config, sinkContext);
// Process records (typically handled by the Pulsar runtime, which supplies pulsarRecord)
httpSink.write(pulsarRecord);
// Clean up
httpSink.close();

The HTTP Sink Connector follows the Pulsar IO framework architecture:
HttpSink implements the Sink<GenericObject> interface

Main connector class that processes Pulsar records and sends them as HTTP POST requests.
public class HttpSink implements Sink<GenericObject> {
    /**
     * Initialize the HTTP sink with configuration
     * @param config - Configuration map with "url" and "headers"
     * @param sinkContext - Pulsar sink context for runtime information
     * @throws Exception - Configuration errors or URI parsing failures
     */
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;

    /**
     * Process a Pulsar record and send via HTTP POST
     * @param record - Pulsar record to be processed and sent via HTTP
     * @throws Exception - JSON conversion errors or HTTP failures (non-2xx status)
     */
    void write(Record<GenericObject> record) throws Exception;

    /**
     * Clean up resources (no-op implementation)
     */
    void close();
}

Internal Components:
HttpSinkConfig httpSinkConfig: Loaded configuration object
HttpClient httpClient: Java 11 HTTP client for requests
ObjectMapper mapper: Jackson mapper with JavaTimeModule for JSON serialization
URI uri: Parsed target URL for HTTP requests
toJsonSerializable(Schema, Object): Private method handling schema-specific JSON conversion

Behavior:
Throws IOException for HTTP errors (status codes outside the 200-299 range)

Configuration class for HTTP sink connector settings with validation and loading capabilities.
@Data
@Accessors(chain = true)
public class HttpSinkConfig implements Serializable {
    @FieldDoc(defaultValue = "http://localhost", help = "The URL of the HTTP server")
    private String url = "http://localhost";

    @FieldDoc(defaultValue = "", help = "The list of default headers added to each request")
    private Map<String, String> headers = new HashMap<>();

    /**
     * Load configuration from YAML file
     * @param yamlFile - Path to YAML configuration file
     * @return HttpSinkConfig instance
     * @throws IOException - File reading or parsing errors
     */
    static HttpSinkConfig load(String yamlFile) throws IOException;

    /**
     * Load configuration from Map object
     * @param map - Configuration map with "url" and "headers" keys
     * @return HttpSinkConfig instance
     * @throws IOException - Map conversion or validation errors
     */
    static HttpSinkConfig load(Map<String, Object> map) throws IOException;

    // Lombok @Data generates getter and setter methods
    String getUrl();
    HttpSinkConfig setUrl(String url);
    Map<String, String> getHeaders();
    HttpSinkConfig setHeaders(Map<String, String> headers);
}

Annotations:
@Data: Lombok annotation that generates getters, setters, toString, equals, and hashCode methods
@Accessors(chain = true): Lombok annotation that enables fluent method chaining (setters return this)
@FieldDoc: Pulsar IO annotation for configuration field documentation

Configuration Fields:
url: Target HTTP endpoint URL (default: "http://localhost")
headers: Custom headers to include in HTTP requests (default: empty map)

Loading Methods:
load(String yamlFile): Load configuration from YAML file using Jackson YAML factory
load(Map<String, Object> map): Load configuration from Map object via JSON conversion

Utility class for converting AVRO GenericRecord objects to Jackson JsonNode format.
public class JsonConverter {
    /**
     * Convert an AVRO GenericRecord to a JsonNode
     * @param genericRecord - AVRO generic record to convert
     * @return JsonNode representation, or null if input is null
     */
    static JsonNode toJson(GenericRecord genericRecord);

    /**
     * Convert typed value with schema to JsonNode
     * @param schema - AVRO schema for the value
     * @param value - Value to convert (may be null)
     * @return JsonNode representation
     */
    static JsonNode toJson(Schema schema, Object value);

    /**
     * Merge two JSON objects at top level
     * @param n1 - First JsonNode
     * @param n2 - Second JsonNode
     * @return Merged ObjectNode with fields from both inputs
     */
    static JsonNode topLevelMerge(JsonNode n1, JsonNode n2);

    /**
     * Convert JSON object to array with specified field values
     * @param jsonNode - Source JSON object
     * @param fields - List of field names to include in array
     * @return ArrayNode containing values for the specified fields
     */
    static ArrayNode toJsonArray(JsonNode jsonNode, List<String> fields);
}

Supported AVRO Types:
All HTTP requests are sent as POST requests with the following characteristics:
The connector automatically adds these headers to every HTTP request:
// Standard headers
"Content-Type" -> "application/json"
// Pulsar metadata headers
"PulsarTopic" -> record.getTopicName()
"PulsarKey" -> record.getKey()
"PulsarEventTime" -> record.getEventTime().toString()
"PulsarPublishTime" -> String.valueOf(message.getPublishTime())
"PulsarMessageId" -> Base64.getEncoder().encodeToString(messageId.toByteArray())
// Custom message properties (prefixed)
"PulsarProperties-<property-name>" -> property-value
// User-configured custom headers
<header-name> -> <header-value>

Primitive Value:
"hello world"

KEY_VALUE Schema:
{
  "key": "user-123",
  "value": {
    "name": "John Doe",
    "age": 30,
    "active": true
  }
}

AVRO Record:
{
  "id": 123,
  "name": "Product A",
  "price": 29.99,
  "category": {
    "id": 1,
    "name": "Electronics"
  },
  "tags": ["gadget", "mobile"]
}

The connector supports the following Pulsar schema types with automatic JSON conversion:
// Primitive schemas - direct JSON representation
Schema.STRING, Schema.INT8, Schema.INT16, Schema.INT32, Schema.INT64
Schema.BOOL, Schema.FLOAT, Schema.DOUBLE, Schema.BYTES
// Date/time schemas
Schema.DATE, Schema.TIME, Schema.TIMESTAMP, Schema.INSTANT
Schema.LOCAL_DATE, Schema.LOCAL_TIME, Schema.LOCAL_DATE_TIME
// Complex schemas
Schema.JSON(Class) // Passed through as-is
Schema.AVRO(Class) // Converted via JsonConverter
Schema.KeyValue(keySchema, valueSchema) // Converted to {"key": ..., "value": ...}

// Primitive types (record value -> JSON payload)
Schema.STRING value "hello" -> "hello"
Schema.INT32 value 42 -> 42
Schema.BOOL value true -> true
// Key-Value pairs
Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(Schema.STRING, Schema.AVRO(User.class));
// Produces: {"key": "user-id", "value": {"name": "John", "age": 30}}
// AVRO records are automatically converted to JSON objects
// JSON schema values are passed through unchanged

// Successful responses (200-299 status codes)
// - Request completes successfully
// - No exception thrown
// - Record processing continues
// Error responses (all other status codes)
// - Throws IOException with status code
// - Stops record processing
// - Requires manual intervention or retry logic

// HTTP request failures (write method)
IOException: HTTP request failed with non-2xx status code
// Example: "HTTP call to https://example.com/webhook failed with status code 404"
// Configuration errors (open method)
Exception: Configuration loading failures (invalid YAML, missing required fields)
URISyntaxException: Invalid URL format in configuration
// JSON conversion errors (write method)
UnsupportedOperationException: Unsupported schema type (not AVRO, JSON, KEY_VALUE, or primitive)
IllegalArgumentException: Invalid logical type values (e.g., non-BigDecimal for decimal type)
// HTTP client errors (write method)
IOException: Network connectivity issues, request timeout, or other HTTP client failures

Specific Error Conditions:
The connector does not implement automatic retry logic. Error handling strategies:
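One possible strategy, for code that wraps or replaces the send step, is a bounded retry with exponential backoff. The sketch below is illustrative only and is not part of the connector: `RetryingSender`, `Attempt`, and `sendWithRetry` are hypothetical names, and the `Attempt` hook stands in for a single HTTP POST that returns the response status code. It applies the same success rule described above (only 200-299 counts as success).

```java
import java.io.IOException;

// Hypothetical helper, not part of pulsar-io-http.
final class RetryingSender {

    /** Stand-in for one HTTP POST; returns the response status code. */
    @FunctionalInterface
    interface Attempt {
        int post() throws IOException;
    }

    /**
     * Calls the attempt until it returns a 2xx status or maxAttempts is reached,
     * doubling the wait between attempts. Rethrows the last failure as IOException.
     */
    static int sendWithRetry(Attempt attempt, int maxAttempts, long initialBackoffMillis)
            throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoff = initialBackoffMillis;
        IOException last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                int status = attempt.post();
                if (status >= 200 && status < 300) {
                    return status; // success, same 2xx rule as the connector
                }
                last = new IOException("HTTP call failed with status code " + status);
            } catch (IOException e) {
                last = e; // network-level failure; eligible for retry
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting to retry", ie);
                }
                backoff *= 2;
            }
        }
        throw last;
    }
}
```

A caller would pass a lambda that performs the POST and returns the status code, for example `RetryingSender.sendWithRetry(() -> doPost(payload), 5, 200)`, where `doPost` is likewise a placeholder. Retrying every non-2xx status is a simplification; a real policy would probably skip retries for most 4xx responses.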
url: "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
headers:
  Content-Type: "application/json"

url: "https://api.example.com/v1/events"
headers:
  Authorization: "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
  X-API-Key: "your-api-key"
  X-Source: "pulsar-connector"

url: "https://webhook.example.com/pulsar-events"
headers:
  X-Event-Source: "pulsar"
  X-Environment: "production"
  X-Team: "data-platform"

# Download or build the NAR file
wget https://archive.apache.org/dist/pulsar/pulsar-4.0.6/connectors/pulsar-io-http-4.0.6.nar
# Copy to Pulsar connectors directory
cp pulsar-io-http-4.0.6.nar $PULSAR_HOME/connectors/
# Create sink instance
bin/pulsar-admin sinks create \
  --archive pulsar-io-http-4.0.6.nar \
  --inputs persistent://public/default/events \
  --name http-webhook-sink \
  --sink-config '{
    "url": "https://example.com/webhook",
    "headers": {
      "Authorization": "Bearer token"
    }
  }'

# Using Pulsar Docker image
docker run -it \
  -v /path/to/config:/pulsar/conf \
  -v /path/to/connectors:/pulsar/connectors \
  apachepulsar/pulsar:4.0.6 \
  bin/pulsar-admin sinks create \
  --archive /pulsar/connectors/pulsar-io-http-4.0.6.nar \
  --inputs my-topic \
  --name http-sink \
  --sink-config-file /pulsar/conf/http-sink.yaml

# List running sinks
bin/pulsar-admin sinks list
# Get sink status
bin/pulsar-admin sinks status --name http-sink
# Update sink configuration
bin/pulsar-admin sinks update \
  --name http-sink \
  --sink-config '{
    "url": "https://new-endpoint.com/webhook",
    "headers": {"Authorization": "Bearer new-token"}
  }'
# Stop and delete sink
bin/pulsar-admin sinks stop --name http-sink
bin/pulsar-admin sinks delete --name http-sink
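
When trying the commands above, it can help to point the sink's url at a local endpoint before using a real one. The following is a minimal receiver sketch built on the JDK's built-in com.sun.net.httpserver package. It is not part of the connector: the class name `LocalWebhookReceiver`, the `/webhook` path, and the loopback address are assumptions chosen for illustration. It accepts POST requests, records the JSON body together with the PulsarTopic header described earlier, and answers 200 so the sink treats the delivery as successful.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical test receiver, not part of pulsar-io-http.
final class LocalWebhookReceiver {

    /** One captured request: the PulsarTopic header (null if absent) and the raw body. */
    static final class Received {
        final String topic;
        final String body;

        Received(String topic, String body) {
            this.topic = topic;
            this.body = body;
        }
    }

    private final List<Received> received = new CopyOnWriteArrayList<>();
    private final HttpServer server;

    /** Binds to 127.0.0.1 on the given port; port 0 picks any free port. */
    LocalWebhookReceiver(int port) throws IOException {
        server = HttpServer.create(
                new InetSocketAddress(InetAddress.getByName("127.0.0.1"), port), 0);
        server.createContext("/webhook", exchange -> {
            try {
                if (!"POST".equals(exchange.getRequestMethod())) {
                    exchange.sendResponseHeaders(405, -1); // only POST is expected
                    return;
                }
                String body = new String(
                        exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
                String topic = exchange.getRequestHeaders().getFirst("PulsarTopic");
                received.add(new Received(topic, body));
                exchange.sendResponseHeaders(200, -1); // 2xx with an empty response body
            } finally {
                exchange.close();
            }
        });
    }

    void start() {
        server.start();
    }

    void stop() {
        server.stop(0);
    }

    /** URL to put in the sink's "url" setting. */
    URI endpoint() {
        return URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/webhook");
    }

    List<Received> received() {
        return received;
    }
}
```

After `new LocalWebhookReceiver(8080).start()`, the sink configuration would use `http://127.0.0.1:8080/webhook` as its url. Changing the handler to answer with a non-2xx status is a simple way to observe the error behavior described in the error handling section.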