or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connector-annotations.md, context-interfaces.md, index.md, push-sources.md, sink-interfaces.md, source-interfaces.md, utility-classes.md

docs/sink-interfaces.md

0

# Sink Interfaces

1

2

Sink interfaces define the contract for writing data from Pulsar to external systems.

3

4

## Sink<T>

5

6

The basic sink interface for writing data from Pulsar to external systems.

7

8

```java { .api }

9

package org.apache.pulsar.io.core;

10

11

@InterfaceAudience.Public

12

@InterfaceStability.Stable

13

public interface Sink<T> extends AutoCloseable {

14

/**

15

* Open connector with configuration.

16

*

17

* @param config initialization config

18

* @param sinkContext environment where the sink connector is running

19

* @throws Exception IO type exceptions when opening a connector

20

*/

21

void open(Map<String, Object> config, SinkContext sinkContext) throws Exception;

22

23

/**

24

* Write a message to sink.

25

*

26

* @param record message to write to sink

27

* @throws Exception

28

*/

29

void write(Record<T> record) throws Exception;

30

31

/**

32

* Close the connector and clean up resources.

33

* @throws Exception

34

*/

35

void close() throws Exception;

36

}

37

```

38

39

### Usage Example

40

41

```java

42

public class FileSink implements Sink<String> {

43

private PrintWriter writer;

44

private SinkContext context;

45

46

@Override

47

public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {

48

this.context = sinkContext;

49

String filePath = (String) config.get("file.path");

50

this.writer = new PrintWriter(new FileWriter(filePath, true)); // Append mode

51

}

52

53

@Override

54

public void write(Record<String> record) throws Exception {

55

String value = record.getValue();

56

writer.println(value);

57

writer.flush(); // Ensure data is written immediately

58

}

59

60

@Override

61

public void close() throws Exception {

62

if (writer != null) {

63

writer.close();

64

}

65

}

66

}

67

```

68

69

### Database Sink Example

70

71

```java

72

public class DatabaseSink implements Sink<Map<String, Object>> {

73

private Connection connection;

74

private PreparedStatement insertStatement;

75

private SinkContext context;

76

77

@Override

78

public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {

79

this.context = sinkContext;

80

String jdbcUrl = (String) config.get("jdbc.url");

81

String tableName = (String) config.get("table.name");

82

83

this.connection = DriverManager.getConnection(jdbcUrl);

84

this.insertStatement = connection.prepareStatement(

85

"INSERT INTO " + tableName + " (id, name, value) VALUES (?, ?, ?)"

86

);

87

}

88

89

@Override

90

public void write(Record<Map<String, Object>> record) throws Exception {

91

Map<String, Object> data = record.getValue();

92

93

insertStatement.setObject(1, data.get("id"));

94

insertStatement.setObject(2, data.get("name"));

95

insertStatement.setObject(3, data.get("value"));

96

97

insertStatement.executeUpdate();

98

}

99

100

@Override

101

public void close() throws Exception {

102

if (insertStatement != null) {

103

insertStatement.close();

104

}

105

if (connection != null) {

106

connection.close();

107

}

108

}

109

}

110

```

111

112

### HTTP Sink Example

113

114

```java

115

public class HttpSink implements Sink<String> {

116

private HttpClient httpClient;

117

private String endpoint;

118

private SinkContext context;

119

120

@Override

121

public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {

122

this.context = sinkContext;

123

this.endpoint = (String) config.get("http.endpoint");

124

this.httpClient = HttpClient.newBuilder()

125

.connectTimeout(Duration.ofSeconds(30))

126

.build();

127

}

128

129

@Override

130

public void write(Record<String> record) throws Exception {

131

String payload = record.getValue();

132

133

HttpRequest request = HttpRequest.newBuilder()

134

.uri(URI.create(endpoint))

135

.header("Content-Type", "application/json")

136

.POST(HttpRequest.BodyPublishers.ofString(payload))

137

.build();

138

139

HttpResponse<String> response = httpClient.send(request,

140

HttpResponse.BodyHandlers.ofString());

141

142

if (response.statusCode() >= 400) {

143

throw new Exception("HTTP request failed with status: " + response.statusCode());

144

}

145

}

146

147

@Override

148

public void close() throws Exception {

149

// HttpClient doesn't need explicit closing in Java 11+

150

}

151

}

152

```

153

154

## Types

155

156

```java { .api }

157

// Required imports

158

import java.util.Map;

159

import org.apache.pulsar.functions.api.Record;

160

import org.apache.pulsar.common.classification.InterfaceAudience;

161

import org.apache.pulsar.common.classification.InterfaceStability;

162

```