or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connector-annotations.md · context-interfaces.md · index.md · push-sources.md · sink-interfaces.md · source-interfaces.md · utility-classes.md

source-interfaces.mddocs/

0

# Source Interfaces

1

2

Source interfaces define the contract for reading data from external systems and publishing to Pulsar topics.

3

4

## Source<T>

5

6

The basic pull-based source interface for reading data from external sources.

7

8

```java { .api }

9

package org.apache.pulsar.io.core;

10

11

@InterfaceAudience.Public

12

@InterfaceStability.Stable

13

public interface Source<T> extends AutoCloseable {

14

/**

15

* Open connector with configuration.

16

*

17

* @param config initialization config

18

* @param sourceContext environment where the source connector is running

19

* @throws Exception IO type exceptions when opening a connector

20

*/

21

void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;

22

23

/**

24

* Reads the next message from source.

25

* If source does not have any new messages, this call should block.

26

* @return next message from source. The return result should never be null

27

* @throws Exception

28

*/

29

Record<T> read() throws Exception;

30

31

/**

32

* Close the connector and clean up resources.

33

* @throws Exception

34

*/

35

void close() throws Exception;

36

}

37

```

38

39

### Usage Example

40

41

```java

42

public class FileSource implements Source<String> {

43

private BufferedReader reader;

44

private SourceContext context;

45

46

@Override

47

public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {

48

this.context = sourceContext;

49

String filePath = (String) config.get("file.path");

50

this.reader = new BufferedReader(new FileReader(filePath));

51

}

52

53

@Override

54

public Record<String> read() throws Exception {

55

String line = reader.readLine();

56

if (line != null) {

57

return new SimpleRecord<>(null, line);

58

}

59

// Block waiting for more data or return when file ends

60

Thread.sleep(1000);

61

return read(); // Retry

62

}

63

64

@Override

65

public void close() throws Exception {

66

if (reader != null) {

67

reader.close();

68

}

69

}

70

}

71

```

72

73

## BatchSource<T>

74

75

Interface for batch-based sources that process data in batches with distinct lifecycle phases.

76

77

```java { .api }

78

package org.apache.pulsar.io.core;

79

80

@InterfaceAudience.Public

81

@InterfaceStability.Evolving

82

public interface BatchSource<T> extends AutoCloseable {

83

/**

84

* Open and initialize the source with configuration.

85

*

86

* @param config initialization config

87

* @param context environment where the source connector is running

88

* @throws Exception IO type exceptions when opening a connector

89

*/

90

void open(Map<String, Object> config, SourceContext context) throws Exception;

91

92

/**

93

* Discovery phase for finding available tasks/partitions to process.

94

*

95

* @param taskEater consumer that accepts discovered task identifiers

96

* @throws Exception

97

*/

98

void discover(Consumer<byte[]> taskEater) throws Exception;

99

100

/**

101

* Prepare to process a specific task identified during discovery.

102

*

103

* @param task task identifier from discovery phase

104

* @throws Exception

105

*/

106

void prepare(byte[] task) throws Exception;

107

108

/**

109

* Read next record from current task.

110

*

111

* @return next record or null when current task is complete

112

* @throws Exception

113

*/

114

Record<T> readNext() throws Exception;

115

116

/**

117

* Close the connector and clean up resources.

118

* @throws Exception

119

*/

120

void close() throws Exception;

121

}

122

```

123

124

### Usage Example

125

126

```java

127

public class DatabaseBatchSource implements BatchSource<Map<String, Object>> {

128

private Connection connection;

129

private PreparedStatement currentQuery;

130

private ResultSet currentResults;

131

private SourceContext context;

132

133

@Override

134

public void open(Map<String, Object> config, SourceContext context) throws Exception {

135

this.context = context;

136

String jdbcUrl = (String) config.get("jdbc.url");

137

this.connection = DriverManager.getConnection(jdbcUrl);

138

}

139

140

@Override

141

public void discover(Consumer<byte[]> taskEater) throws Exception {

142

// Discover available tables or partitions

143

ResultSet tables = connection.getMetaData().getTables(null, null, "%", new String[]{"TABLE"});

144

while (tables.next()) {

145

String tableName = tables.getString("TABLE_NAME");

146

taskEater.accept(tableName.getBytes());

147

}

148

}

149

150

@Override

151

public void prepare(byte[] task) throws Exception {

152

String tableName = new String(task);

153

currentQuery = connection.prepareStatement("SELECT * FROM " + tableName);

154

currentResults = currentQuery.executeQuery();

155

}

156

157

@Override

158

public Record<Map<String, Object>> readNext() throws Exception {

159

if (currentResults.next()) {

160

Map<String, Object> row = new HashMap<>();

161

ResultSetMetaData metadata = currentResults.getMetaData();

162

for (int i = 1; i <= metadata.getColumnCount(); i++) {

163

row.put(metadata.getColumnName(i), currentResults.getObject(i));

164

}

165

return new SimpleRecord<>(null, row);

166

}

167

return null; // Task complete

168

}

169

170

@Override

171

public void close() throws Exception {

172

if (currentResults != null) currentResults.close();

173

if (currentQuery != null) currentQuery.close();

174

if (connection != null) connection.close();

175

}

176

}

177

```

178

179

## BatchSourceTriggerer

180

181

Interface for triggering discovery in batch sources, allowing external systems to control when batch processing should begin.

182

183

```java { .api }

184

package org.apache.pulsar.io.core;

185

186

@InterfaceAudience.Public

187

@InterfaceStability.Evolving

188

public interface BatchSourceTriggerer {

189

/**

190

* Initialize the triggerer with configuration.

191

*

192

* @param config initialization config

193

* @param sourceContext environment where the source connector is running

194

* @throws Exception

195

*/

196

void init(Map<String, Object> config, SourceContext sourceContext) throws Exception;

197

198

/**

199

* Start triggering discovery with callback function.

200

*

201

* @param trigger callback function to invoke when discovery should be triggered

202

*/

203

void start(Consumer<String> trigger);

204

205

/**

206

* Stop triggering discovery.

207

*/

208

void stop();

209

}

210

```

211

212

### Usage Example

213

214

```java

215

public class ScheduledBatchTriggerer implements BatchSourceTriggerer {

216

private ScheduledExecutorService scheduler;

217

private SourceContext context;

218

219

@Override

220

public void init(Map<String, Object> config, SourceContext sourceContext) throws Exception {

221

this.context = sourceContext;

222

this.scheduler = Executors.newScheduledThreadPool(1);

223

}

224

225

@Override

226

public void start(Consumer<String> trigger) {

227

// Trigger discovery every hour

228

scheduler.scheduleAtFixedRate(

229

() -> trigger.accept("scheduled-trigger"),

230

0, 1, TimeUnit.HOURS

231

);

232

}

233

234

@Override

235

public void stop() {

236

if (scheduler != null) {

237

scheduler.shutdown();

238

}

239

}

240

}

241

```

242

243

## Types

244

245

```java { .api }

246

// Required imports

247

import java.util.Map;

248

import java.util.function.Consumer;

249

import org.apache.pulsar.functions.api.Record;

250

import org.apache.pulsar.common.classification.InterfaceAudience;

251

import org.apache.pulsar.common.classification.InterfaceStability;

252

```