Apache Pulsar HTTP Sink Connector that streams topic data to HTTP endpoints as JSON webhooks
Apache Pulsar HTTP Sink Connector enables streaming data from Pulsar topics to external HTTP endpoints through webhook-style POST requests. The connector transforms Pulsar records into JSON payloads and enriches HTTP requests with metadata headers, providing a reliable bridge between Pulsar's pub-sub messaging and HTTP-based services.
org.apache.pulsar:pulsar-io-http:4.0.6

import org.apache.pulsar.io.http.HttpSink;
import org.apache.pulsar.io.http.HttpSinkConfig;
import org.apache.pulsar.io.http.JsonConverter;

<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-io-http</artifactId>
  <version>4.0.6</version>
</dependency>

# http-sink-config.yaml
url: "https://example.com/webhook"
headers:
  Authorization: "Bearer your-token"
  Content-Type: "application/json"
  X-Custom-Header: "custom-value"

# Create sink connector
bin/pulsar-admin sinks create \
  --archive pulsar-io-http-4.0.6.nar \
  --inputs my-topic \
  --name http-sink \
  --sink-config-file http-sink-config.yaml

import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.io.http.HttpSink;
import org.apache.pulsar.io.http.HttpSinkConfig;
// Create and configure the sink
HttpSink httpSink = new HttpSink();
Map<String, Object> config = new HashMap<>();
config.put("url", "https://example.com/webhook");
Map<String, String> headers = new HashMap<>();
headers.put("Authorization", "Bearer token");
config.put("headers", headers);
// Initialize the sink
httpSink.open(config, sinkContext);
// Process records (typically handled by Pulsar runtime)
httpSink.write(pulsarRecord);
// Clean up
httpSink.close();

The HTTP Sink Connector follows the Pulsar IO framework architecture:
- Implements the Sink<GenericObject> interface

Main connector class that processes Pulsar records and sends them as HTTP POST requests.
public class HttpSink implements Sink<GenericObject> {
    /**
     * Initialize the HTTP sink with configuration
     * @param config - Configuration map with "url" and "headers"
     * @param sinkContext - Pulsar sink context for runtime information
     * @throws Exception - Configuration errors or URI parsing failures
     */
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;

    /**
     * Process a Pulsar record and send via HTTP POST
     * @param record - Pulsar record to be processed and sent via HTTP
     * @throws Exception - JSON conversion errors or HTTP failures (non-2xx status)
     */
    void write(Record<GenericObject> record) throws Exception;

    /**
     * Clean up resources (no-op implementation)
     */
    void close();
}

Internal Components:
- HttpSinkConfig httpSinkConfig: Loaded configuration object
- HttpClient httpClient: Java 11 HTTP client for requests
- ObjectMapper mapper: Jackson mapper with JavaTimeModule for JSON serialization
- URI uri: Parsed target URL for HTTP requests
- toJsonSerializable(Schema, Object): Private method handling schema-specific JSON conversion

Behavior:
- Throws IOException for HTTP errors (status codes outside the 200-299 range)

Configuration class for HTTP sink connector settings with validation and loading capabilities.
@Data
@Accessors(chain = true)
public class HttpSinkConfig implements Serializable {
    @FieldDoc(defaultValue = "http://localhost", help = "The URL of the HTTP server")
    private String url = "http://localhost";

    @FieldDoc(defaultValue = "", help = "The list of default headers added to each request")
    private Map<String, String> headers = new HashMap<>();

    /**
     * Load configuration from YAML file
     * @param yamlFile - Path to YAML configuration file
     * @return HttpSinkConfig instance
     * @throws IOException - File reading or parsing errors
     */
    static HttpSinkConfig load(String yamlFile) throws IOException;

    /**
     * Load configuration from Map object
     * @param map - Configuration map with "url" and "headers" keys
     * @return HttpSinkConfig instance
     * @throws IOException - Map conversion or validation errors
     */
    static HttpSinkConfig load(Map<String, Object> map) throws IOException;

    // Lombok @Data generates getter and setter methods
    String getUrl();
    HttpSinkConfig setUrl(String url);
    Map<String, String> getHeaders();
    HttpSinkConfig setHeaders(Map<String, String> headers);
}

Lombok Annotations:
- @Data: Generates getters, setters, toString, equals, and hashCode methods
- @Accessors(chain = true): Enables fluent method chaining (setters return this)
- @FieldDoc: Pulsar IO annotation for configuration field documentation

Configuration Fields:
- url: Target HTTP endpoint URL (default: "http://localhost")
- headers: Custom headers to include in HTTP requests (default: empty map)

Loading Methods:
- load(String yamlFile): Load configuration from YAML file using Jackson YAML factory
- load(Map<String, Object> map): Load configuration from Map object via JSON conversion

Utility class for converting AVRO GenericRecord objects to Jackson JsonNode format.
public class JsonConverter {
    /**
     * Convert an AVRO GenericRecord to a JsonNode
     * @param genericRecord - AVRO generic record to convert
     * @return JsonNode representation, or null if input is null
     */
    static JsonNode toJson(GenericRecord genericRecord);

    /**
     * Convert typed value with schema to JsonNode
     * @param schema - AVRO schema for the value
     * @param value - Value to convert (may be null)
     * @return JsonNode representation
     */
    static JsonNode toJson(Schema schema, Object value);

    /**
     * Merge two JSON objects at top level
     * @param n1 - First JsonNode
     * @param n2 - Second JsonNode
     * @return Merged ObjectNode with fields from both inputs
     */
    static JsonNode topLevelMerge(JsonNode n1, JsonNode n2);

    /**
     * Convert JSON object to array with specified field values
     * @param jsonNode - Source JSON object
     * @param fields - List of field names to include in array
     * @return ArrayNode containing values for the specified fields
     */
    static ArrayNode toJsonArray(JsonNode jsonNode, List<String> fields);
}

Supported AVRO Types:
All HTTP requests are sent as POST requests with the following characteristics:
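As a rough illustration, a request of this shape could be assembled with the Java 11 HTTP client mentioned under Internal Components. This is a sketch under stated assumptions, not the connector's actual code: the class `WebhookRequestSketch`, its method names, and its parameters are hypothetical, and the choices to skip a missing key and to apply user-configured headers last are assumptions. Only the header names, the `PulsarProperties-` prefix, and the Base64 encoding of the message ID are taken from this document.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

final class WebhookRequestSketch {

    /** Collects headers for one record. All parameters are hypothetical stand-ins for record metadata. */
    static Map<String, String> buildHeaders(String topic, String key, byte[] messageId,
                                            Map<String, String> properties,
                                            Map<String, String> configured) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Content-Type", "application/json");
        headers.put("PulsarTopic", topic);
        if (key != null) {
            headers.put("PulsarKey", key); // assumption: omit the header when the record has no key
        }
        headers.put("PulsarMessageId", Base64.getEncoder().encodeToString(messageId));
        // Message properties are exposed with the "PulsarProperties-" prefix.
        properties.forEach((name, value) -> headers.put("PulsarProperties-" + name, value));
        headers.putAll(configured); // assumption: user-configured headers are applied last
        return headers;
    }

    /** Builds a POST request carrying the JSON body and the collected headers. */
    static HttpRequest buildRequest(URI uri, String jsonBody, Map<String, String> headers) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8));
        headers.forEach(builder::header);
        return builder.build();
    }

    public static void main(String[] args) {
        Map<String, String> headers = buildHeaders(
                "my-topic", "user-123", "msg-1".getBytes(StandardCharsets.UTF_8),
                Map.of("source", "checkout"),
                Map.of("Authorization", "Bearer your-token"));
        HttpRequest request = buildRequest(
                URI.create("https://example.com/webhook"), "{\"id\": 123}", headers);
        System.out.println(request.method() + " " + request.uri());
        request.headers().map().forEach((name, values) -> System.out.println(name + ": " + values));
    }
}
```

The request is only built here, not sent, so the sketch can be run without a reachable endpoint.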
The connector automatically adds these headers to every HTTP request:
// Standard headers
"Content-Type" -> "application/json"
// Pulsar metadata headers
"PulsarTopic" -> record.getTopicName()
"PulsarKey" -> record.getKey()
"PulsarEventTime" -> record.getEventTime().toString()
"PulsarPublishTime" -> String.valueOf(message.getPublishTime())
"PulsarMessageId" -> Base64.getEncoder().encodeToString(messageId.toByteArray())
// Custom message properties (prefixed)
"PulsarProperties-<property-name>" -> property-value
// User-configured custom headers
<header-name> -> <header-value>

Primitive Value:
"hello world"

KEY_VALUE Schema:
{
  "key": "user-123",
  "value": {
    "name": "John Doe",
    "age": 30,
    "active": true
  }
}

AVRO Record:
{
  "id": 123,
  "name": "Product A",
  "price": 29.99,
  "category": {
    "id": 1,
    "name": "Electronics"
  },
  "tags": ["gadget", "mobile"]
}

The connector converts primitive, JSON, AVRO, and KEY_VALUE schemas to JSON automatically:
// Primitive schemas - direct JSON representation
Schema.STRING, Schema.INT8, Schema.INT16, Schema.INT32, Schema.INT64
Schema.BOOL, Schema.FLOAT, Schema.DOUBLE, Schema.BYTES
// Date/time schemas
Schema.DATE, Schema.TIME, Schema.TIMESTAMP, Schema.INSTANT
Schema.LOCAL_DATE, Schema.LOCAL_TIME, Schema.LOCAL_DATE_TIME
// Complex schemas
Schema.JSON(Class) // Passed through as-is
Schema.AVRO(Class) // Converted via JsonConverter
Schema.KeyValue(keySchema, valueSchema) // Converted to {"key": ..., "value": ...}

// Primitive types
Schema.STRING.encode("hello") -> "hello"
Schema.INT32.encode(42) -> 42
Schema.BOOL.encode(true) -> true
// Key-Value pairs
Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(Schema.STRING, Schema.AVRO(User.class));
// Produces: {"key": "user-id", "value": {"name": "John", "age": 30}}
// AVRO records automatically converted to JSON objects
// JSON schema values passed through unchanged

// Successful responses (200-299 status codes)
// - Request completes successfully
// - No exception thrown
// - Record processing continues
// Error responses (all other status codes)
// - Throws IOException with status code
// - Stops record processing
// - Requires manual intervention or retry logic

// HTTP request failures (write method)
IOException: HTTP request failed with non-2xx status code
// Example: "HTTP call to https://example.com/webhook failed with status code 404"
// Configuration errors (open method)
Exception: Configuration loading failures (invalid YAML, missing required fields)
URISyntaxException: Invalid URL format in configuration
// JSON conversion errors (write method)
UnsupportedOperationException: Unsupported schema type (not AVRO, JSON, KEY_VALUE, or primitive)
IllegalArgumentException: Invalid logical type values (e.g., non-BigDecimal for decimal type)
// HTTP client errors (write method)
IOException: Network connectivity issues, request timeout, or other HTTP client failures

Specific Error Conditions:
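The non-2xx condition described above can be expressed as a small check. The following is a minimal sketch, not the connector's source: `StatusCheckSketch` and `ensureSuccess` are hypothetical names, and the message format simply mirrors the example message quoted earlier in this section.

```java
import java.io.IOException;

final class StatusCheckSketch {

    /** Throws IOException when the status code falls outside the 200-299 range. */
    static void ensureSuccess(String url, int statusCode) throws IOException {
        if (statusCode < 200 || statusCode > 299) {
            throw new IOException(
                    String.format("HTTP call to %s failed with status code %d", url, statusCode));
        }
    }

    public static void main(String[] args) {
        try {
            ensureSuccess("https://example.com/webhook", 204); // within 2xx, no exception
            ensureSuccess("https://example.com/webhook", 404); // outside 2xx, throws
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that under this rule redirects (3xx) count as failures too, since only the 200-299 range is treated as success.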
The connector does not implement automatic retry logic. Error handling strategies:
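Since the connector itself does not retry, any retrying has to happen outside it. One possible strategy, sketched here under assumptions, is bounded retry with exponential backoff around the failing call. `RetrySketch`, `IoAction`, and `runWithRetry` are hypothetical names introduced for illustration; nothing here is part of the connector or the Pulsar API, and where such a wrapper would live (a custom sink, a proxy in front of the endpoint) is left open.

```java
import java.io.IOException;

final class RetrySketch {

    /** A unit of work that may fail with IOException, e.g. one HTTP POST. */
    @FunctionalInterface
    interface IoAction {
        void run() throws IOException;
    }

    /**
     * Runs the action up to maxAttempts times, doubling the wait after each failure.
     * Returns the number of attempts used; rethrows the last IOException if all attempts fail.
     */
    static int runWithRetry(IoAction action, int maxAttempts, long initialBackoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last failure
                }
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated endpoint that fails twice before accepting the request.
        int attempts = runWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated failure " + calls[0]);
            }
        }, 5, 10L);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

Retrying a POST can deliver the same record more than once, so the receiving endpoint would need to tolerate duplicates for this approach to be safe.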
url: "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
headers:
  Content-Type: "application/json"

url: "https://api.example.com/v1/events"
headers:
  Authorization: "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
  X-API-Key: "your-api-key"
  X-Source: "pulsar-connector"

url: "https://webhook.example.com/pulsar-events"
headers:
  X-Event-Source: "pulsar"
  X-Environment: "production"
  X-Team: "data-platform"

# Download or build the NAR file
wget https://archive.apache.org/dist/pulsar/pulsar-4.0.6/connectors/pulsar-io-http-4.0.6.nar
# Copy to Pulsar connectors directory
cp pulsar-io-http-4.0.6.nar $PULSAR_HOME/connectors/
# Create sink instance
bin/pulsar-admin sinks create \
  --archive pulsar-io-http-4.0.6.nar \
  --inputs persistent://public/default/events \
  --name http-webhook-sink \
  --sink-config '{
    "url": "https://example.com/webhook",
    "headers": {
      "Authorization": "Bearer token"
    }
  }'

# Using Pulsar Docker image
docker run -it \
  -v /path/to/config:/pulsar/conf \
  -v /path/to/connectors:/pulsar/connectors \
  apachepulsar/pulsar:4.0.6 \
  bin/pulsar-admin sinks create \
    --archive /pulsar/connectors/pulsar-io-http-4.0.6.nar \
    --inputs my-topic \
    --name http-sink \
    --sink-config-file /pulsar/conf/http-sink.yaml

# List running sinks
bin/pulsar-admin sinks list
# Get sink status
bin/pulsar-admin sinks status --name http-sink
# Update sink configuration
bin/pulsar-admin sinks update \
  --name http-sink \
  --sink-config '{
    "url": "https://new-endpoint.com/webhook",
    "headers": {"Authorization": "Bearer new-token"}
  }'
# Stop and delete sink
bin/pulsar-admin sinks stop --name http-sink
bin/pulsar-admin sinks delete --name http-sink