# Sink Interfaces

Sink interfaces define the contract for writing data from Pulsar to external systems.

## Sink<T>

The basic sink interface for writing data from Pulsar to external systems.

```java { .api }
package org.apache.pulsar.io.core;

@InterfaceAudience.Public
12
@InterfaceStability.Stable
13
public interface Sink<T> extends AutoCloseable {
14
/**
15
* Open connector with configuration.
16
*
17
* @param config initialization config
18
* @param sinkContext environment where the sink connector is running
19
* @throws Exception IO type exceptions when opening a connector
20
*/
21
void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;
22
23
/**
24
* Write a message to sink.
25
*
26
* @param record message to write to sink
27
* @throws Exception
28
*/
29
void write(Record<T> record) throws Exception;
30
31
/**
32
* Close the connector and clean up resources.
33
* @throws Exception
34
*/
35
void close() throws Exception;
36
}
```

### Usage Example

```java
public class FileSink implements Sink<String> {
43
private PrintWriter writer;
44
private SinkContext context;
45
46
@Override
47
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
48
this.context = sinkContext;
49
String filePath = (String) config.get("file.path");
50
this.writer = new PrintWriter(new FileWriter(filePath, true)); // Append mode
51
}
52
53
@Override
54
public void write(Record<String> record) throws Exception {
55
String value = record.getValue();
56
writer.println(value);
57
writer.flush(); // Ensure data is written immediately
58
}
59
60
@Override
61
public void close() throws Exception {
62
if (writer != null) {
63
writer.close();
64
}
65
}
66
}
```

### Database Sink Example

```java
public class DatabaseSink implements Sink<Map<String, Object>> {
73
private Connection connection;
74
private PreparedStatement insertStatement;
75
private SinkContext context;
76
77
@Override
78
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
79
this.context = sinkContext;
80
String jdbcUrl = (String) config.get("jdbc.url");
81
String tableName = (String) config.get("table.name");
82
83
this.connection = DriverManager.getConnection(jdbcUrl);
84
this.insertStatement = connection.prepareStatement(
85
"INSERT INTO " + tableName + " (id, name, value) VALUES (?, ?, ?)"
86
);
87
}
88
89
@Override
90
public void write(Record<Map<String, Object>> record) throws Exception {
91
Map<String, Object> data = record.getValue();
92
93
insertStatement.setObject(1, data.get("id"));
94
insertStatement.setObject(2, data.get("name"));
95
insertStatement.setObject(3, data.get("value"));
96
97
insertStatement.executeUpdate();
98
}
99
100
@Override
101
public void close() throws Exception {
102
if (insertStatement != null) {
103
insertStatement.close();
104
}
105
if (connection != null) {
106
connection.close();
107
}
108
}
109
}
```

### HTTP Sink Example

```java
public class HttpSink implements Sink<String> {
116
private HttpClient httpClient;
117
private String endpoint;
118
private SinkContext context;
119
120
@Override
121
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
122
this.context = sinkContext;
123
this.endpoint = (String) config.get("http.endpoint");
124
this.httpClient = HttpClient.newBuilder()
125
.connectTimeout(Duration.ofSeconds(30))
126
.build();
127
}
128
129
@Override
130
public void write(Record<String> record) throws Exception {
131
String payload = record.getValue();
132
133
HttpRequest request = HttpRequest.newBuilder()
134
.uri(URI.create(endpoint))
135
.header("Content-Type", "application/json")
136
.POST(HttpRequest.BodyPublishers.ofString(payload))
137
.build();
138
139
HttpResponse<String> response = httpClient.send(request,
140
HttpResponse.BodyHandlers.ofString());
141
142
if (response.statusCode() >= 400) {
143
throw new Exception("HTTP request failed with status: " + response.statusCode());
144
}
145
}
146
147
@Override
148
public void close() throws Exception {
149
// HttpClient doesn't need explicit closing in Java 11+
150
}
151
}
```

## Types

```java { .api }
// Required imports
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
```