
tessl/maven-org-apache-pulsar--pulsar-io-http

Apache Pulsar HTTP Sink Connector that streams topic data to HTTP endpoints as JSON webhooks


Apache Pulsar HTTP Sink Connector

Apache Pulsar HTTP Sink Connector streams data from Pulsar topics to external HTTP endpoints as webhook-style POST requests. It converts each Pulsar record into a JSON payload and adds Pulsar metadata headers to the request, bridging Pulsar's publish-subscribe messaging and HTTP-based services.

Package Information

  • Package Name: pulsar-io-http
  • Package Type: maven
  • Language: Java
  • Maven Coordinates: org.apache.pulsar:pulsar-io-http:4.0.6
  • Installation: Include it as a Maven/Gradle dependency, or deploy it as a Pulsar connector NAR
  • License: Apache License 2.0

Core Imports

import org.apache.pulsar.io.http.HttpSink;
import org.apache.pulsar.io.http.HttpSinkConfig;
import org.apache.pulsar.io.http.JsonConverter;

Basic Usage

Maven Dependency

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-io-http</artifactId>
    <version>4.0.6</version>
</dependency>

Connector Configuration

# http-sink-config.yaml
url: "https://example.com/webhook"
headers:
  Authorization: "Bearer your-token"
  Content-Type: "application/json"
  X-Custom-Header: "custom-value"

Pulsar Admin Configuration

# Create sink connector
bin/pulsar-admin sinks create \
    --archive pulsar-io-http-4.0.6.nar \
    --inputs my-topic \
    --name http-sink \
    --sink-config-file http-sink-config.yaml

Programmatic Usage

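import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.http.HttpSink;

public class HttpSinkExample {
    // sinkContext and pulsarRecord are normally supplied by the Pulsar IO runtime;
    // here they are parameters so the example compiles on its own.
    static void run(SinkContext sinkContext, Record<GenericObject> pulsarRecord) throws Exception {
        // Create and configure the sink
        HttpSink httpSink = new HttpSink();
        Map<String, Object> config = new HashMap<>();
        config.put("url", "https://example.com/webhook");
        Map<String, String> headers = new HashMap<>();
        headers.put("Authorization", "Bearer token");
        config.put("headers", headers);

        // Initialize the sink
        httpSink.open(config, sinkContext);
        try {
            // Process records (typically handled by the Pulsar runtime)
            httpSink.write(pulsarRecord);
        } finally {
            // Clean up
            httpSink.close();
        }
    }
}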

Architecture

The HTTP Sink Connector follows the Pulsar IO framework architecture:

  • HttpSink: Main connector class implementing the Sink<GenericObject> interface
  • HttpSinkConfig: Configuration model handling URL and custom headers
  • JsonConverter: Utility for converting AVRO records to JSON format
  • HTTP Client: Java 11 java.net.http.HttpClient, used to send the POST requests
  • Schema Support: Built-in support for primitives, AVRO, JSON, and KEY_VALUE schemas

Capabilities

HTTP Sink Implementation

Main connector class that processes Pulsar records and sends them as HTTP POST requests.

public class HttpSink implements Sink<GenericObject> {
    /**
     * Initialize the HTTP sink with configuration
     * @param config - Configuration map with "url" and "headers" 
     * @param sinkContext - Pulsar sink context for runtime information
     * @throws Exception - Configuration errors or URI parsing failures
     */
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
    
    /**
     * Process a Pulsar record and send via HTTP POST
     * @param record - Pulsar record to be processed and sent via HTTP
     * @throws Exception - JSON conversion errors or HTTP failures (non-2xx status)
     */
    void write(Record<GenericObject> record) throws Exception;
    
    /**
     * Clean up resources (no-op implementation)
     */
    void close();
}

Internal Components:

  • HttpSinkConfig httpSinkConfig: Loaded configuration object
  • HttpClient httpClient: Java 11 HTTP client for requests
  • ObjectMapper mapper: Jackson mapper with JavaTimeModule for JSON serialization
  • URI uri: Parsed target URL for HTTP requests
  • toJsonSerializable(Schema, Object): Private method handling schema-specific JSON conversion

Behavior:

  • Converts record value to JSON payload using schema-aware conversion
  • Adds metadata headers (topic, key, timestamps, message ID, properties)
  • Sends HTTP POST request to configured URL with application/json content type
  • Throws IOException for HTTP errors (status codes outside 200-299 range)
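As a rough illustration of the request shape described above, the JDK's java.net.http API can build an equivalent POST. This is a minimal sketch, not the connector's source: WebhookRequestSketch and build are hypothetical names, and the JSON body is passed in as an already-serialized string instead of being produced from a Pulsar record.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical sketch of the documented request shape; not the connector's implementation. */
final class WebhookRequestSketch {
    private WebhookRequestSketch() {}

    /**
     * Builds (but does not send) a POST request carrying an already-serialized JSON body.
     * The extra headers stand in for the metadata and user-configured headers.
     */
    static HttpRequest build(URI target, String jsonBody, Map<String, String> headers) {
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(target)
                .header("Content-Type", "application/json");
        // Add every supplied header to the request.
        headers.forEach(builder::header);
        return builder
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8))
                .build();
    }
}
```

Building the request separately from sending it keeps the header and body logic testable without a live endpoint; sending would then go through HttpClient.send(request, HttpResponse.BodyHandlers.ofString()).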

Configuration Management

Configuration class holding the HTTP sink connector settings, with static helpers for loading them from a YAML file or a map.

@Data
@Accessors(chain = true)
public class HttpSinkConfig implements Serializable {
    @FieldDoc(defaultValue = "http://localhost", help = "The URL of the HTTP server")
    private String url = "http://localhost";
    
    @FieldDoc(defaultValue = "", help = "The list of default headers added to each request")
    private Map<String, String> headers = new HashMap<>();
    
    /**
     * Load configuration from YAML file
     * @param yamlFile - Path to YAML configuration file
     * @return HttpSinkConfig instance
     * @throws IOException - File reading or parsing errors
     */
    static HttpSinkConfig load(String yamlFile) throws IOException;
    
    /**
     * Load configuration from Map object  
     * @param map - Configuration map with "url" and "headers" keys
     * @return HttpSinkConfig instance
     * @throws IOException - Map conversion or validation errors
     */
    static HttpSinkConfig load(Map<String, Object> map) throws IOException;
    
    // Lombok @Data generates getter and setter methods
    String getUrl();
    HttpSinkConfig setUrl(String url);
    Map<String, String> getHeaders();  
    HttpSinkConfig setHeaders(Map<String, String> headers);
}

Lombok Annotations:

  • @Data: Generates getters, setters, toString, equals, and hashCode methods
  • @Accessors(chain = true): Enables fluent method chaining (setters return this)
  • @FieldDoc: Pulsar IO annotation for configuration field documentation

Configuration Fields:

  • url: Target HTTP endpoint URL (default: "http://localhost")
  • headers: Custom headers to include in HTTP requests (default: empty map)

Loading Methods:

  • load(String yamlFile): Load configuration from YAML file using Jackson YAML factory
  • load(Map<String, Object> map): Load configuration from Map object via JSON conversion
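For readers unfamiliar with Lombok, the class below approximates what @Data and @Accessors(chain = true) generate for the two fields (equals, hashCode and toString are omitted), plus a hypothetical fromMap helper that applies the documented defaults. It is a sketch under assumptions: SinkConfigSketch and fromMap are invented names, and the documented load(Map) goes through a JSON conversion rather than reading keys by hand.

```java
import java.util.HashMap;
import java.util.Map;

/** Hand-written approximation of HttpSinkConfig's Lombok-generated accessors (hypothetical). */
final class SinkConfigSketch {
    private String url = "http://localhost";              // documented default
    private Map<String, String> headers = new HashMap<>(); // documented default: empty map

    String getUrl() { return url; }
    Map<String, String> getHeaders() { return headers; }

    // Chained setters return this, as @Accessors(chain = true) does.
    SinkConfigSketch setUrl(String url) { this.url = url; return this; }
    SinkConfigSketch setHeaders(Map<String, String> headers) { this.headers = headers; return this; }

    /** Hypothetical: copies "url" and "headers" from a config map, keeping defaults for absent keys. */
    @SuppressWarnings("unchecked")
    static SinkConfigSketch fromMap(Map<String, Object> config) {
        SinkConfigSketch cfg = new SinkConfigSketch();
        Object url = config.get("url");
        if (url != null) {
            cfg.setUrl(url.toString());
        }
        Object headers = config.get("headers");
        if (headers instanceof Map) {
            cfg.setHeaders(new HashMap<>((Map<String, String>) headers));
        }
        return cfg;
    }
}
```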

JSON Conversion Utilities

Utility class for converting AVRO GenericRecord objects to Jackson JsonNode format.

public class JsonConverter {
    /**
     * Convert an AVRO GenericRecord to a JsonNode
     * @param genericRecord - AVRO generic record to convert
     * @return JsonNode representation, or null if input is null
     */
    static JsonNode toJson(GenericRecord genericRecord);
    
    /**
     * Convert typed value with schema to JsonNode
     * @param schema - AVRO schema for the value
     * @param value - Value to convert (may be null)
     * @return JsonNode representation
     */
    static JsonNode toJson(Schema schema, Object value);
    
    /**
     * Merge two JSON objects at top level
     * @param n1 - First JsonNode
     * @param n2 - Second JsonNode  
     * @return Merged ObjectNode with fields from both inputs
     */
    static JsonNode topLevelMerge(JsonNode n1, JsonNode n2);
    
    /**
     * Convert JSON object to array with specified field values
     * @param jsonNode - Source JSON object
     * @param fields - List of field names to include in array
     * @return ArrayNode containing values for the specified fields
     */
    static ArrayNode toJsonArray(JsonNode jsonNode, List<String> fields);
}
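The merge and array contracts can be illustrated with plain Map and List values standing in for Jackson's ObjectNode and ArrayNode. This is a hypothetical sketch: JsonShapeSketch is an invented name, and three behaviors are assumptions rather than documented facts (on a key clash the second object wins, array values follow the order of the requested field list, and fields missing from the object are skipped). Nested objects are copied as values, not merged, which is what "top level" implies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Map sketch of the topLevelMerge / toJsonArray contracts (assumed semantics). */
final class JsonShapeSketch {
    private JsonShapeSketch() {}

    static Map<String, Object> topLevelMerge(Map<String, Object> n1, Map<String, Object> n2) {
        Map<String, Object> merged = new LinkedHashMap<>(n1); // fields of the first object
        merged.putAll(n2);                                    // assumption: second object overwrites clashes
        return merged;
    }

    static List<Object> toJsonArray(Map<String, Object> node, List<String> fields) {
        List<Object> values = new ArrayList<>();
        for (String field : fields) {       // assumption: order follows the field list
            if (node.containsKey(field)) {  // assumption: absent fields are skipped
                values.add(node.get(field));
            }
        }
        return values;
    }
}
```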

Supported AVRO Types:

  • Primitives: NULL, BOOLEAN, INT, LONG, FLOAT, DOUBLE, BYTES, STRING
  • Complex Types: RECORD, ENUM, ARRAY, MAP, UNION, FIXED
  • Logical Types: decimal, date, time-millis, time-micros, timestamp-millis, timestamp-micros, uuid
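The Avro specification defines the raw encodings of these logical types (for example, date is an int counting days since the Unix epoch, and decimal is a big-endian two's-complement unscaled integer combined with the schema's scale). The sketch below decodes those raw values into java.time and java.math types. It illustrates the encodings only and does not claim to reproduce the exact JSON text JsonConverter writes for each type; AvroLogicalTypesSketch is a hypothetical name.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.UUID;

/** Decodes raw Avro logical-type encodings (per the Avro spec) into Java value types. */
final class AvroLogicalTypesSketch {
    private AvroLogicalTypesSketch() {}

    // date: int, days since 1970-01-01
    static LocalDate date(int daysSinceEpoch) { return LocalDate.ofEpochDay(daysSinceEpoch); }

    // time-millis: int, milliseconds after midnight
    static LocalTime timeMillis(int millisOfDay) { return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L); }

    // time-micros: long, microseconds after midnight
    static LocalTime timeMicros(long microsOfDay) { return LocalTime.ofNanoOfDay(microsOfDay * 1_000L); }

    // timestamp-millis: long, milliseconds since the Unix epoch (UTC)
    static Instant timestampMillis(long epochMillis) { return Instant.ofEpochMilli(epochMillis); }

    // timestamp-micros: long, microseconds since the Unix epoch (UTC)
    static Instant timestampMicros(long epochMicros) { return Instant.EPOCH.plus(epochMicros, ChronoUnit.MICROS); }

    // uuid: string in the standard 8-4-4-4-12 form
    static UUID uuid(String text) { return UUID.fromString(text); }

    // decimal: big-endian two's-complement unscaled bytes plus the schema's scale
    static BigDecimal decimal(byte[] unscaled, int scale) { return new BigDecimal(new BigInteger(unscaled), scale); }
}
```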

HTTP Request Format

Request Structure

All HTTP requests are sent as POST requests with the following characteristics:

  • Method: POST
  • Content-Type: application/json
  • Body: JSON representation of the Pulsar record value

Metadata Headers

The connector adds the following headers to each HTTP request. The Pulsar metadata headers are included only when the corresponding value is available for the record:

// Standard header
"Content-Type" -> "application/json"

// Pulsar metadata headers
"PulsarTopic" -> topic name from record.getTopicName()
"PulsarKey" -> message key from record.getKey()
"PulsarEventTime" -> event time from record.getEventTime(), as a string
"PulsarPublishTime" -> String.valueOf(message.getPublishTime())
"PulsarMessageId" -> Base64.getEncoder().encodeToString(message.getMessageId().toByteArray())

// Message properties (one prefixed header per property)
"PulsarProperties-<property-name>" -> <property-value>

// User-configured custom headers
<header-name> -> <header-value>
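The header mapping above can be sketched with hypothetical inputs standing in for a Pulsar Record and Message: the Optional parameters mirror values that may be absent, while the publish time and serialized message ID bytes are assumed to be available. PulsarHeaderSketch is an invented name, and the code illustrates the mapping rather than reproducing the connector's implementation.

```java
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the metadata-to-header mapping; inputs replace Record/Message. */
final class PulsarHeaderSketch {
    private PulsarHeaderSketch() {}

    static Map<String, String> headers(Optional<String> topic,
                                       Optional<String> key,
                                       Optional<Long> eventTime,
                                       long publishTime,
                                       byte[] messageIdBytes,
                                       Map<String, String> properties,
                                       Map<String, String> configuredHeaders) {
        // A Map keeps one value per name; HttpRequest.Builder.header(...) adds values instead,
        // so a real request could carry a header name more than once.
        Map<String, String> h = new LinkedHashMap<>();
        h.put("Content-Type", "application/json");
        topic.ifPresent(t -> h.put("PulsarTopic", t));           // only when present
        key.ifPresent(k -> h.put("PulsarKey", k));               // only when present
        eventTime.ifPresent(t -> h.put("PulsarEventTime", t.toString()));
        h.put("PulsarPublishTime", String.valueOf(publishTime));
        // Serialized message ID bytes are Base64-encoded so they are safe as a header value.
        h.put("PulsarMessageId", Base64.getEncoder().encodeToString(messageIdBytes));
        properties.forEach((name, value) -> h.put("PulsarProperties-" + name, value));
        h.putAll(configuredHeaders);
        return h;
    }
}
```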

JSON Payload Examples

Primitive Value:

"hello world"

KEY_VALUE Schema:

{
  "key": "user-123",
  "value": {
    "name": "John Doe",
    "age": 30,
    "active": true
  }
}

AVRO Record:

{
  "id": 123,
  "name": "Product A",
  "price": 29.99,
  "category": {
    "id": 1,
    "name": "Electronics"
  },
  "tags": ["gadget", "mobile"]
}

Schema Support

Supported Schema Types

The connector converts the following Pulsar schema types to JSON automatically. Other schema types, such as PROTOBUF, are rejected with an UnsupportedOperationException:

// Primitive schemas - direct JSON representation
Schema.STRING, Schema.INT8, Schema.INT16, Schema.INT32, Schema.INT64
Schema.BOOL, Schema.FLOAT, Schema.DOUBLE, Schema.BYTES

// Date/time schemas
Schema.DATE, Schema.TIME, Schema.TIMESTAMP, Schema.INSTANT
Schema.LOCAL_DATE, Schema.LOCAL_TIME, Schema.LOCAL_DATE_TIME

// Complex schemas
Schema.JSON(Class)           // Passed through as-is
Schema.AVRO(Class)          // Converted via JsonConverter
Schema.KeyValue(keySchema, valueSchema)  // Converted to {"key": ..., "value": ...}

Schema Conversion Examples

// Primitive types: the value itself becomes the JSON body
"hello" (Schema.STRING) -> "hello"
42 (Schema.INT32) -> 42
true (Schema.BOOL) -> true

// Key-Value pairs (User is an example POJO)
Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(Schema.STRING, Schema.AVRO(User.class));
// Produces: {"key": "user-id", "value": {"name": "John", "age": 30}}

// AVRO records automatically converted to JSON objects
// JSON schema values passed through unchanged
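The conversion rules above amount to a dispatch on the schema type. The sketch below is hypothetical: SchemaKind stands in for Pulsar's schema types (with PROTOBUF representing any unsupported type), KeyValuePair stands in for a KEY_VALUE payload, and the avroToJson function stands in for JsonConverter.toJson. Converting each side of a KEY_VALUE pair with its own schema is an assumption consistent with the documented {"key": ..., "value": ...} output.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical dispatch mirroring the documented schema-to-JSON rules. */
final class SchemaDispatchSketch {
    /** Stand-in for Pulsar schema types; PROTOBUF represents any unsupported type. */
    enum SchemaKind { PRIMITIVE, JSON, AVRO, KEY_VALUE, PROTOBUF }

    /** Stand-in for a KEY_VALUE payload: each side carries its own schema kind. */
    static final class KeyValuePair {
        final SchemaKind keyKind;
        final Object key;
        final SchemaKind valueKind;
        final Object value;

        KeyValuePair(SchemaKind keyKind, Object key, SchemaKind valueKind, Object value) {
            this.keyKind = keyKind;
            this.key = key;
            this.valueKind = valueKind;
            this.value = value;
        }
    }

    private SchemaDispatchSketch() {}

    static Object toJsonSerializable(SchemaKind kind, Object value, Function<Object, Object> avroToJson) {
        switch (kind) {
            case PRIMITIVE:
            case JSON:
                return value; // handed to the JSON mapper unchanged
            case AVRO:
                return avroToJson.apply(value); // stands in for JsonConverter.toJson
            case KEY_VALUE: {
                KeyValuePair kv = (KeyValuePair) value;
                Map<String, Object> out = new LinkedHashMap<>();
                out.put("key", toJsonSerializable(kv.keyKind, kv.key, avroToJson));
                out.put("value", toJsonSerializable(kv.valueKind, kv.value, avroToJson));
                return out;
            }
            default:
                throw new UnsupportedOperationException("Unsupported schema type: " + kind);
        }
    }
}
```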

Error Handling

HTTP Response Handling

// Successful responses (200-299 status codes)
// - Request completes successfully
// - No exception thrown
// - Record processing continues

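// Error responses (all other status codes)
// - Throws IOException with the status code in the message
// - The exception propagates to the Pulsar IO runtime; the connector does not retry the request
// - Recovery relies on Pulsar-level retry and dead-letter settings or manual intervention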
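The success rule reduces to a single range check. This sketch (StatusCheckSketch is a hypothetical name) reproduces the documented message format; note that under this rule any code outside 200-299, including 3xx redirects, fails just like 4xx and 5xx.

```java
import java.io.IOException;
import java.net.URI;

/** Sketch of the documented 2xx success rule and its error message. */
final class StatusCheckSketch {
    private StatusCheckSketch() {}

    static void check(URI target, int statusCode) throws IOException {
        if (statusCode < 200 || statusCode >= 300) {
            throw new IOException(String.format(
                    "HTTP call to %s failed with status code %d", target, statusCode));
        }
    }
}
```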

Exception Types

// HTTP request failures (write method)
IOException: HTTP request failed with non-2xx status code
// Example: "HTTP call to https://example.com/webhook failed with status code 404"

// Configuration errors (open method)
IOException: Configuration map cannot be converted to HttpSinkConfig (e.g., a value of the wrong type)
URISyntaxException: Invalid URL format in configuration

// JSON conversion errors (write method)
UnsupportedOperationException: Unsupported schema type (not AVRO, JSON, KEY_VALUE, or primitive)
IllegalArgumentException: Invalid logical type values (e.g., non-BigDecimal for decimal type)

// HTTP client errors (write method)
IOException: Network connectivity issues, request timeout, or other HTTP client failures

Specific Error Conditions:

  • Status Code Validation: Any HTTP response with status < 200 or >= 300 throws IOException
  • Schema Type Support: Only AVRO, JSON, KEY_VALUE, and primitive schemas supported
  • Logical Type Validation: Strict type checking for AVRO logical types (decimal, date, time, uuid)
  • URI Parsing: Malformed URLs in configuration cause URISyntaxException during open()
  • Network Failures: Connection timeouts, DNS resolution failures throw IOException
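Because a malformed url only surfaces when the sink is opened, it can help to check the value before deploying. The helper below is a hypothetical pre-deployment check, not part of the connector: java.net.URI parsing alone accepts a scheme-less string such as "example.com" as a relative URI, so the sketch additionally requires an http or https scheme and a host.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical pre-deployment check for the "url" setting. */
final class SinkUrlCheck {
    private SinkUrlCheck() {}

    static URI validate(String url) throws URISyntaxException {
        URI uri = new URI(url); // throws URISyntaxException for malformed input
        String scheme = uri.getScheme();
        if (scheme == null || !(scheme.equalsIgnoreCase("http") || scheme.equalsIgnoreCase("https"))) {
            throw new IllegalArgumentException("Expected an http or https URL but got: " + url);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("URL has no host: " + url);
        }
        return uri;
    }
}
```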

Error Recovery

The connector does not implement automatic retry logic. Error handling strategies:

  1. Pulsar Framework: Configure dead letter topic and retry policies at the Pulsar level
  2. Monitoring: Monitor connector status and HTTP endpoint availability
  3. Configuration: Ensure target HTTP endpoints are reliable and properly configured

Configuration Examples

Basic Webhook Configuration

url: "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
headers:
  Content-Type: "application/json"

Authenticated API Integration

url: "https://api.example.com/v1/events"
headers:
  Authorization: "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
  X-API-Key: "your-api-key"
  X-Source: "pulsar-connector"

Custom Webhook with Routing

url: "https://webhook.example.com/pulsar-events"
headers:
  X-Event-Source: "pulsar"
  X-Environment: "production"
  X-Team: "data-platform"

Deployment

NAR Package Deployment

# Download or build the NAR file
wget https://archive.apache.org/dist/pulsar/pulsar-4.0.6/connectors/pulsar-io-http-4.0.6.nar

# Copy to Pulsar connectors directory
cp pulsar-io-http-4.0.6.nar $PULSAR_HOME/connectors/

# Create sink instance
bin/pulsar-admin sinks create \
    --archive pulsar-io-http-4.0.6.nar \
    --inputs persistent://public/default/events \
    --name http-webhook-sink \
    --sink-config '{
        "url": "https://example.com/webhook",
        "headers": {
            "Authorization": "Bearer token"
        }
    }'

Docker Deployment

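# Using the Pulsar Docker image as an admin client for a running cluster
# (replace http://pulsar-broker:8080 with your cluster's admin service URL)
docker run -it \
    -v /path/to/http-sink.yaml:/pulsar/http-sink.yaml \
    -v /path/to/connectors:/pulsar/connectors \
    apachepulsar/pulsar:4.0.6 \
    bin/pulsar-admin --admin-url http://pulsar-broker:8080 \
    sinks create \
    --archive /pulsar/connectors/pulsar-io-http-4.0.6.nar \
    --inputs my-topic \
    --name http-sink \
    --sink-config-file /pulsar/http-sink.yaml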

Management Operations

# List running sinks
bin/pulsar-admin sinks list

# Get sink status
bin/pulsar-admin sinks status --name http-sink

# Update sink configuration
bin/pulsar-admin sinks update \
    --name http-sink \
    --sink-config '{
        "url": "https://new-endpoint.com/webhook",
        "headers": {"Authorization": "Bearer new-token"}
    }'

# Stop and delete sink
bin/pulsar-admin sinks stop --name http-sink
bin/pulsar-admin sinks delete --name http-sink

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-io-http
Describes: mavenpkg:maven/org.apache.pulsar/pulsar-io-http@4.0.x