# Apache Pulsar HTTP Sink Connector

The Apache Pulsar HTTP Sink Connector streams data from Pulsar topics to external HTTP endpoints as webhook-style POST requests. It converts each Pulsar record into a JSON payload and attaches the record's metadata as HTTP headers, bridging Pulsar's publish-subscribe messaging and HTTP-based services.

## Package Information

- **Package Name**: pulsar-io-http
- **Package Type**: maven
- **Language**: Java
- **Maven Coordinates**: `org.apache.pulsar:pulsar-io-http:4.0.6`
- **Installation**: Include as a dependency in Maven/Gradle, or deploy as a Pulsar connector NAR
- **License**: Apache License 2.0

## Core Imports

```java
import org.apache.pulsar.io.http.HttpSink;
import org.apache.pulsar.io.http.HttpSinkConfig;
import org.apache.pulsar.io.http.JsonConverter;
```

## Basic Usage

### Maven Dependency

```xml
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-io-http</artifactId>
  <version>4.0.6</version>
</dependency>
```

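### Connector Configuration

A file passed with `--sink-config-file` describes the whole sink, so the connector's own settings go under its `configs` key:

```yaml
# http-sink-config.yaml
configs:
  url: "https://example.com/webhook"
  headers:
    Authorization: "Bearer your-token"
    X-Custom-Header: "custom-value"
```

The connector always sends `Content-Type: application/json`, so that header does not need to be configured.
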
### Pulsar Admin Configuration

```bash
# Create the sink connector
bin/pulsar-admin sinks create \
  --archive pulsar-io-http-4.0.6.nar \
  --inputs my-topic \
  --name http-sink \
  --sink-config-file http-sink-config.yaml
```

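### Programmatic Usage

In normal deployments the Pulsar IO runtime drives the sink lifecycle. The example below makes the same calls directly; the `SinkContext` and `Record` instances must be supplied by the caller (in practice, the runtime or a test harness).

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.http.HttpSink;

public class HttpSinkExample {

    // sinkContext and record are normally provided by the Pulsar IO runtime
    static void sendOne(SinkContext sinkContext, Record<GenericObject> record) throws Exception {
        // Connector settings use the same keys as the YAML "configs" section
        Map<String, Object> config = new HashMap<>();
        config.put("url", "https://example.com/webhook");
        Map<String, String> headers = new HashMap<>();
        headers.put("Authorization", "Bearer token");
        config.put("headers", headers);

        HttpSink httpSink = new HttpSink();
        try {
            // Loads the configuration and parses the target URL
            httpSink.open(config, sinkContext);
            // Converts the record to JSON and POSTs it; throws on a non-2xx response
            httpSink.write(record);
        } finally {
            // Clean up
            httpSink.close();
        }
    }
}
```
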
## Architecture

The HTTP Sink Connector follows the Pulsar IO framework architecture:

- **HttpSink**: Main connector class, implementing the `Sink<GenericObject>` interface
- **HttpSinkConfig**: Configuration model holding the target URL and custom headers
- **JsonConverter**: Utility for converting AVRO records to JSON
- **HTTP Client**: The JDK `java.net.http.HttpClient` (available since Java 11), used to send the requests
- **Schema Support**: Built-in support for primitive, AVRO, JSON, and KEY_VALUE schemas

## Capabilities

### HTTP Sink Implementation

Main connector class that processes Pulsar records and sends them as HTTP POST requests.

```java { .api }
public class HttpSink implements Sink<GenericObject> {
    /**
     * Initialize the HTTP sink with configuration
     * @param config - Configuration map with "url" and "headers"
     * @param sinkContext - Pulsar sink context for runtime information
     * @throws Exception - Configuration errors or URI parsing failures
     */
    void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;

    /**
     * Process a Pulsar record and send it via HTTP POST
     * @param record - Pulsar record to be processed and sent via HTTP
     * @throws Exception - JSON conversion errors or HTTP failures (non-2xx status)
     */
    void write(Record<GenericObject> record) throws Exception;

    /**
     * Clean up resources (no-op implementation)
     */
    void close();
}
```

**Internal Components:**
- `HttpSinkConfig httpSinkConfig`: Loaded configuration object
- `HttpClient httpClient`: JDK HTTP client used to send requests
- `ObjectMapper mapper`: Jackson mapper with `JavaTimeModule` registered, used for JSON serialization
- `URI uri`: Parsed target URL
- `toJsonSerializable(Schema, Object)`: Internal method that performs the schema-specific JSON conversion

**Behavior:**
- Converts the record value to a JSON payload using schema-aware conversion
- Adds metadata headers (topic, key, event and publish timestamps, message ID, message properties) when the record carries them
- Sends an HTTP POST request to the configured URL with the `application/json` content type
- Throws `IOException` on HTTP errors (status codes outside the 200-299 range)

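The behavior above can be sketched with the JDK's `java.net.http` API. This is an illustrative reconstruction under stated assumptions, not the connector's source: the class `WebhookRequestSketch` and its parameters are hypothetical, and record metadata is passed as plain `Optional` values instead of a Pulsar `Record`. The request is only built, never sent, so the sketch runs without a server.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: assembles a POST request like the one the sink sends.
// Optional parameters model record metadata that may be absent.
final class WebhookRequestSketch {

    static HttpRequest build(URI uri,
                             String jsonBody,
                             Map<String, String> configuredHeaders,
                             Map<String, String> messageProperties,
                             Optional<String> topic,
                             Optional<String> key,
                             Optional<Long> eventTimeMillis,
                             byte[] messageIdBytes,
                             long publishTimeMillis) {
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(uri)
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8));

        // Headers from the connector configuration ("headers" map)
        configuredHeaders.forEach(builder::header);
        // Each message property becomes a "PulsarProperties-<name>" header
        messageProperties.forEach((name, value) -> builder.header("PulsarProperties-" + name, value));

        // Metadata headers are only added when the value is present
        topic.ifPresent(t -> builder.header("PulsarTopic", t));
        key.ifPresent(k -> builder.header("PulsarKey", k));
        eventTimeMillis.ifPresent(t -> builder.header("PulsarEventTime", String.valueOf(t)));

        // The message ID is binary, so it is Base64-encoded to fit in a header
        builder.header("PulsarMessageId", Base64.getEncoder().encodeToString(messageIdBytes));
        builder.header("PulsarPublishTime", String.valueOf(publishTimeMillis));
        builder.header("Content-Type", "application/json");
        return builder.build();
    }
}
```

In this sketch, `HttpRequest.Builder.header` appends a value rather than replacing an existing one, so a `Content-Type` entry in the configured headers would produce a second value alongside the one added at the end.
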
### Configuration Management

Configuration class for the HTTP sink settings, with static methods for loading them from a YAML file or a map.

```java { .api }
@Data
@Accessors(chain = true)
public class HttpSinkConfig implements Serializable {
    @FieldDoc(defaultValue = "http://localhost", help = "The URL of the HTTP server")
    private String url = "http://localhost";

    @FieldDoc(defaultValue = "", help = "The list of default headers added to each request")
    private Map<String, String> headers = new HashMap<>();

    /**
     * Load configuration from a YAML file
     * @param yamlFile - Path to the YAML configuration file
     * @return HttpSinkConfig instance
     * @throws IOException - File reading or parsing errors
     */
    static HttpSinkConfig load(String yamlFile) throws IOException;

    /**
     * Load configuration from a Map
     * @param map - Configuration map with "url" and "headers" keys
     * @return HttpSinkConfig instance
     * @throws IOException - Map conversion or validation errors
     */
    static HttpSinkConfig load(Map<String, Object> map) throws IOException;

    // Lombok @Data generates the getters and setters
    String getUrl();
    HttpSinkConfig setUrl(String url);
    Map<String, String> getHeaders();
    HttpSinkConfig setHeaders(Map<String, String> headers);
}
```

**Lombok Annotations:**
- `@Data`: Generates getters, setters, `toString`, `equals`, and `hashCode`
- `@Accessors(chain = true)`: Makes setters return `this`, enabling fluent method chaining
- `@FieldDoc`: Pulsar IO annotation that documents configuration fields

**Configuration Fields:**
- `url`: Target HTTP endpoint URL (default: `"http://localhost"`)
- `headers`: Custom headers included in every HTTP request (default: empty map)

**Loading Methods:**
- `load(String yamlFile)`: Loads the configuration from a YAML file using Jackson's YAML factory
- `load(Map<String, Object> map)`: Loads the configuration from a map by converting it through JSON

### JSON Conversion Utilities

Utility class for converting AVRO `GenericRecord` objects to Jackson `JsonNode` values.

```java { .api }
public class JsonConverter {
    /**
     * Convert an AVRO GenericRecord to a JsonNode
     * @param genericRecord - AVRO generic record to convert
     * @return JsonNode representation, or null if the input is null
     */
    static JsonNode toJson(GenericRecord genericRecord);

    /**
     * Convert a typed value with its schema to a JsonNode
     * @param schema - AVRO schema for the value
     * @param value - Value to convert (may be null)
     * @return JsonNode representation
     */
    static JsonNode toJson(Schema schema, Object value);

    /**
     * Merge two JSON objects at the top level
     * @param n1 - First JsonNode
     * @param n2 - Second JsonNode
     * @return Merged ObjectNode with fields from both inputs
     */
    static JsonNode topLevelMerge(JsonNode n1, JsonNode n2);

    /**
     * Convert a JSON object to an array of selected field values
     * @param jsonNode - Source JSON object
     * @param fields - Names of the fields to include in the array
     * @return ArrayNode containing the values of the specified fields
     */
    static ArrayNode toJsonArray(JsonNode jsonNode, List<String> fields);
}
```

**Supported AVRO Types:**
- **Primitive types**: NULL, BOOLEAN, INT, LONG, FLOAT, DOUBLE, BYTES, STRING
- **Complex types**: RECORD, ENUM, ARRAY, MAP, UNION, FIXED
- **Logical types**: decimal, date, time-millis, time-micros, timestamp-millis, timestamp-micros, uuid

## HTTP Request Format

### Request Structure

All HTTP requests are sent as POST requests with the following characteristics:

- **Method**: POST
- **Content-Type**: application/json
- **Body**: JSON representation of the Pulsar record value

### Metadata Headers

The connector automatically adds these headers to every HTTP request. The Pulsar metadata headers are set only when the record carries the corresponding value (for example, a record without a key gets no `PulsarKey` header):

```java { .api }
// Standard header
"Content-Type" -> "application/json"

// Pulsar metadata headers (each set only when the value is present)
"PulsarTopic"       -> record.getTopicName()
"PulsarKey"         -> record.getKey()
"PulsarEventTime"   -> record.getEventTime()    // epoch milliseconds, as a string
"PulsarPublishTime" -> String.valueOf(message.getPublishTime())
"PulsarMessageId"   -> Base64.getEncoder().encodeToString(messageId.toByteArray())

// Message properties, one header per property
"PulsarProperties-<property-name>" -> <property-value>

// User-configured custom headers
<header-name> -> <header-value>
```

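On the receiving side, an endpoint usually needs these values back. The helper below is a hypothetical sketch (`PulsarHeaders` is not part of the connector) that reads them from a header map such as the `Map<String, List<String>>` exposed by `com.sun.net.httpserver.Headers`. HTTP header names are case-insensitive, and some servers (and HTTP/2) change their letter case, so the sketch matches names case-insensitively.

```java
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical receiver-side helper for the connector's Pulsar* headers.
final class PulsarHeaders {
    private static final String PROPERTY_PREFIX = "pulsarproperties-";

    // Returns the first value of a header, ignoring the case of its name
    static Optional<String> first(Map<String, List<String>> headers, String name) {
        return headers.entrySet().stream()
                .filter(e -> e.getKey().equalsIgnoreCase(name) && !e.getValue().isEmpty())
                .map(e -> e.getValue().get(0))
                .findFirst();
    }

    // Collects "PulsarProperties-<name>" headers into a map keyed by <name>
    static Map<String, String> properties(Map<String, List<String>> headers) {
        Map<String, String> props = new TreeMap<>();
        headers.forEach((name, values) -> {
            if (name.toLowerCase(Locale.ROOT).startsWith(PROPERTY_PREFIX) && !values.isEmpty()) {
                props.put(name.substring(PROPERTY_PREFIX.length()), values.get(0));
            }
        });
        return props;
    }

    // Decodes the Base64-encoded PulsarMessageId header back into raw bytes
    static Optional<byte[]> messageId(Map<String, List<String>> headers) {
        return first(headers, "PulsarMessageId").map(v -> Base64.getDecoder().decode(v));
    }
}
```

For the same reason, property names recovered from `PulsarProperties-` headers may not keep their original letter case.
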
### JSON Payload Examples

**Primitive Value:**
```json
"hello world"
```

**KEY_VALUE Schema:**
```json
{
  "key": "user-123",
  "value": {
    "name": "John Doe",
    "age": 30,
    "active": true
  }
}
```

**AVRO Record:**
```json
{
  "id": 123,
  "name": "Product A",
  "price": 29.99,
  "category": {
    "id": 1,
    "name": "Electronics"
  },
  "tags": ["gadget", "mobile"]
}
```

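To inspect payloads like these locally, an endpoint only has to accept a POST and answer with a 2xx status. The following is a minimal sketch using the JDK's built-in `com.sun.net.httpserver` package; the `WebhookReceiver` class and the `/webhook` path are hypothetical, and the server binds only to the loopback interface.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical local endpoint that records what the sink sends.
final class WebhookReceiver {
    final List<String> bodies = new CopyOnWriteArrayList<>();
    final List<String> topics = new CopyOnWriteArrayList<>();
    private final HttpServer server;

    // Port 0 lets the operating system pick a free port
    WebhookReceiver(int port) throws IOException {
        server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/webhook", exchange -> {
            int status = 405; // reject anything but POST
            if ("POST".equals(exchange.getRequestMethod())) {
                try (InputStream in = exchange.getRequestBody()) {
                    bodies.add(new String(in.readAllBytes(), StandardCharsets.UTF_8));
                }
                // Headers.getFirst looks names up case-insensitively
                String topic = exchange.getRequestHeaders().getFirst("PulsarTopic");
                topics.add(topic == null ? "" : topic);
                status = 204; // any 2xx status counts as success for the sink
            }
            exchange.sendResponseHeaders(status, -1); // -1: no response body
            exchange.close();
        });
    }

    int start() {
        server.start();
        return server.getAddress().getPort();
    }

    void stop() {
        server.stop(0);
    }
}
```

Pointing the sink's `url` at this receiver (from a host where it is reachable) shows the exact bodies and headers the connector produces.
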
## Schema Support

### Supported Schema Types

The connector converts primitive, AVRO, JSON, and KEY_VALUE schemas to JSON automatically; any other schema type (for example PROTOBUF) causes an `UnsupportedOperationException`:

```java { .api }
// Primitive schemas - values are serialized directly as JSON
Schema.STRING, Schema.INT8, Schema.INT16, Schema.INT32, Schema.INT64
Schema.BOOL, Schema.FLOAT, Schema.DOUBLE, Schema.BYTES

// Date/time schemas - values are serialized by the sink's Jackson mapper
Schema.DATE, Schema.TIME, Schema.TIMESTAMP, Schema.INSTANT
Schema.LOCAL_DATE, Schema.LOCAL_TIME, Schema.LOCAL_DATE_TIME

// Complex schemas
Schema.JSON(Class)                        // Passed through as-is
Schema.AVRO(Class)                        // Converted via JsonConverter
Schema.KeyValue(keySchema, valueSchema)   // Converted to {"key": ..., "value": ...}
```

### Schema Conversion Examples

```java
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;

// Primitive values are written as JSON scalars:
//   "hello" (Schema.STRING) -> "hello"
//   42      (Schema.INT32)  -> 42
//   true    (Schema.BOOL)   -> true

// Key-value pairs; User is a hypothetical POJO used with an AVRO schema
Schema<KeyValue<String, User>> kvSchema = Schema.KeyValue(Schema.STRING, Schema.AVRO(User.class));
// Produces: {"key": "user-id", "value": {"name": "John", "age": 30}}

// AVRO records are converted to JSON objects;
// JSON schema values are passed through unchanged
```

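The conversion rules above can be modeled in a few lines. This is a self-contained sketch, not the connector's code: `SchemaKind` is a hypothetical stand-in for Pulsar's `SchemaType`, an AVRO record is represented by a `Map` of already-converted field values (the connector uses `JsonConverter.toJson` instead), and the result is a plain Java object that a JSON library such as Jackson would then serialize.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the sink's schema-aware conversion rules.
final class ConversionRules {

    // Stand-in for the schema types the rules distinguish
    enum SchemaKind { PRIMITIVE, JSON, AVRO, KEY_VALUE, PROTOBUF }

    // A value tagged with the kind of schema that produced it
    static final class Typed {
        final SchemaKind kind;
        final Object value;

        Typed(SchemaKind kind, Object value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // Payload of a KEY_VALUE value: a typed key and a typed value
    static final class KeyValuePair {
        final Typed key;
        final Typed value;

        KeyValuePair(Typed key, Typed value) {
            this.key = key;
            this.value = value;
        }
    }

    static Object toJsonSerializable(Typed typed) {
        switch (typed.kind) {
            case PRIMITIVE:
            case JSON:
                // Passed through unchanged; the JSON serializer writes them directly
                return typed.value;
            case AVRO:
                // Stand-in: a Map of field values instead of a converted GenericRecord
                return new LinkedHashMap<Object, Object>((Map<?, ?>) typed.value);
            case KEY_VALUE: {
                // Key and value are converted recursively and wrapped in one object
                KeyValuePair pair = (KeyValuePair) typed.value;
                Map<String, Object> wrapper = new LinkedHashMap<>();
                wrapper.put("key", toJsonSerializable(pair.key));
                wrapper.put("value", toJsonSerializable(pair.value));
                return wrapper;
            }
            default:
                // Anything else (e.g. PROTOBUF) is rejected
                throw new UnsupportedOperationException("Unsupported schema type = " + typed.kind);
        }
    }
}
```

The recursion is what lets a KEY_VALUE schema mix kinds, such as a primitive key with an AVRO value, as in the KEY_VALUE payload example.
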
## Error Handling

### HTTP Response Handling

```java { .api }
// Successful responses (200-299 status codes)
// - The request completes successfully
// - No exception is thrown
// - Record processing continues

// Error responses (all other status codes)
// - write() throws an IOException that includes the status code
// - Processing of the record fails
// - The connector does not retry; recovery relies on Pulsar-level retry
//   settings or manual intervention
```

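The success rule amounts to a simple range check. A minimal sketch, assuming nothing beyond the documented behavior: `StatusCheck` is a hypothetical helper, and its message format follows the example given for `IOException` under Exception Types.

```java
import java.io.IOException;
import java.net.URI;

// Hypothetical helper mirroring the documented success rule for HTTP responses.
final class StatusCheck {

    // Only 200-299 counts as success
    static boolean isSuccess(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }

    // Throws IOException for any non-2xx status, in the documented message format
    static void requireSuccess(URI uri, int statusCode) throws IOException {
        if (!isSuccess(statusCode)) {
            throw new IOException(String.format(
                    "HTTP call to %s failed with status code %d", uri, statusCode));
        }
    }
}
```

Under this rule, 3xx responses are failures too. The JDK `HttpClient` uses the `HttpClient.Redirect.NEVER` policy unless it is built with another one, so an endpoint that answers with a redirect would surface as an error rather than being followed.
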
### Exception Types

```java { .api }
// HTTP request failures (write method)
IOException: HTTP request failed with a non-2xx status code
// Example: "HTTP call to https://example.com/webhook failed with status code 404"

// Configuration errors (open method)
Exception: Configuration loading failures (malformed configuration, or values that cannot be mapped to HttpSinkConfig)
URISyntaxException: Invalid URL format in the configuration

// JSON conversion errors (write method)
UnsupportedOperationException: Unsupported schema type (not AVRO, JSON, KEY_VALUE, or primitive)
IllegalArgumentException: Invalid logical type values (e.g., a non-BigDecimal value for the decimal type)

// HTTP client errors (write method)
IOException: Network connectivity issues, request timeout, or other HTTP client failures
InterruptedException: The sending thread was interrupted while waiting for the response
```

**Specific Error Conditions:**
- **Status Code Validation**: Any HTTP response with a status < 200 or >= 300 throws `IOException`
- **Schema Type Support**: Only AVRO, JSON, KEY_VALUE, and primitive schemas are supported
- **Logical Type Validation**: AVRO logical types (decimal, date, time, uuid) are type-checked strictly
- **URI Parsing**: A malformed URL in the configuration causes a `URISyntaxException` during `open()`
- **Network Failures**: Connection timeouts and DNS resolution failures throw `IOException`

### Error Recovery

The connector does not implement automatic retry logic. Possible error-handling strategies:

1. **Pulsar Framework**: Configure retry and dead-letter-topic policies at the Pulsar level
2. **Monitoring**: Monitor the connector status and the availability of the HTTP endpoint
3. **Configuration**: Make sure the target HTTP endpoint is reliable and correctly configured

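When failed writes are retried at the Pulsar level, an endpoint may receive the same message more than once. One option, sketched here under that assumption, is to make the endpoint idempotent by remembering recently seen `PulsarMessageId` header values. `MessageIdDeduplicator` is a hypothetical in-memory guard with a bounded, least-recently-used window; an endpoint with several replicas or frequent restarts would need shared, persistent storage instead.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical idempotency guard keyed on the PulsarMessageId header value.
final class MessageIdDeduplicator {
    private final Set<String> seen;

    MessageIdDeduplicator(int capacity) {
        // Access-ordered map: once it holds more than `capacity` IDs,
        // the least recently seen one is evicted
        Map<String, Boolean> lru = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
        // Synchronized because HTTP handlers may run on several threads
        this.seen = Collections.synchronizedSet(Collections.newSetFromMap(lru));
    }

    // True the first time an ID is seen; false for a repeat still inside the window
    boolean firstDelivery(String pulsarMessageId) {
        return seen.add(pulsarMessageId);
    }
}
```

The window size trades memory for how late a duplicate can arrive and still be caught.
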
## Configuration Examples

The following examples show only the connector's own settings. Pass them as JSON with `--sink-config`, or nest them under the `configs` key of a file passed with `--sink-config-file`.

### Basic Webhook Configuration

```yaml
url: "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
# No headers needed: the connector sets Content-Type: application/json itself
```

The sink posts the record value as-is, so the endpoint must accept that JSON; Slack incoming webhooks, for example, expect an object with a `text` field.

### Authenticated API Integration

```yaml
url: "https://api.example.com/v1/events"
headers:
  Authorization: "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
  X-API-Key: "your-api-key"
  X-Source: "pulsar-connector"
```

### Custom Webhook with Routing

```yaml
url: "https://webhook.example.com/pulsar-events"
headers:
  X-Event-Source: "pulsar"
  X-Environment: "production"
  X-Team: "data-platform"
```

## Deployment

### NAR Package Deployment

```bash
# Download (or build) the NAR file
wget https://archive.apache.org/dist/pulsar/pulsar-4.0.6/connectors/pulsar-io-http-4.0.6.nar

# Copy it to the Pulsar connectors directory
cp pulsar-io-http-4.0.6.nar $PULSAR_HOME/connectors/

# Create a sink instance
bin/pulsar-admin sinks create \
  --archive pulsar-io-http-4.0.6.nar \
  --inputs persistent://public/default/events \
  --name http-webhook-sink \
  --sink-config '{
    "url": "https://example.com/webhook",
    "headers": {
      "Authorization": "Bearer token"
    }
  }'
```

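### Docker Deployment

```bash
# Run pulsar-admin from the Pulsar Docker image against an existing cluster.
# The sink config is mounted at a separate path so the image's own /pulsar/conf
# stays visible; replace http://pulsar-broker:8080 with your cluster's admin URL.
docker run -it --rm \
  -v /path/to/sink-config:/pulsar/sink-conf \
  -v /path/to/connectors:/pulsar/connectors \
  apachepulsar/pulsar:4.0.6 \
  bin/pulsar-admin --admin-url http://pulsar-broker:8080 sinks create \
    --archive /pulsar/connectors/pulsar-io-http-4.0.6.nar \
    --inputs my-topic \
    --name http-sink \
    --sink-config-file /pulsar/sink-conf/http-sink.yaml
```
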
### Management Operations

```bash
# List running sinks
bin/pulsar-admin sinks list

# Get sink status
bin/pulsar-admin sinks status --name http-sink

# Update the sink configuration
bin/pulsar-admin sinks update \
  --name http-sink \
  --sink-config '{
    "url": "https://new-endpoint.com/webhook",
    "headers": {"Authorization": "Bearer new-token"}
  }'

# Stop and delete the sink
bin/pulsar-admin sinks stop --name http-sink
bin/pulsar-admin sinks delete --name http-sink
```