# Source Interfaces

Source interfaces define the contract for reading data from external systems and publishing it to Pulsar topics.

## Source<T>

The basic pull-based source interface for reading data from external sources.

```java { .api }
package org.apache.pulsar.io.core;

@InterfaceAudience.Public
12
@InterfaceStability.Stable
13
public interface Source<T> extends AutoCloseable {
14
/**
15
* Open connector with configuration.
16
*
17
* @param config initialization config
18
* @param sourceContext environment where the source connector is running
19
* @throws Exception IO type exceptions when opening a connector
20
*/
21
void open(Map<String, Object> config, SourceContext sourceContext) throws Exception;
22
23
/**
24
* Reads the next message from source.
25
* If source does not have any new messages, this call should block.
26
* @return next message from source. The return result should never be null
27
* @throws Exception
28
*/
29
Record<T> read() throws Exception;
30
31
/**
32
* Close the connector and clean up resources.
33
* @throws Exception
34
*/
35
void close() throws Exception;
36
}
```

### Usage Example

```java
public class FileSource implements Source<String> {
43
private BufferedReader reader;
44
private SourceContext context;
45
46
@Override
47
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
48
this.context = sourceContext;
49
String filePath = (String) config.get("file.path");
50
this.reader = new BufferedReader(new FileReader(filePath));
51
}
52
53
@Override
54
public Record<String> read() throws Exception {
55
String line = reader.readLine();
56
if (line != null) {
57
return new SimpleRecord<>(null, line);
58
}
59
// Block waiting for more data or return when file ends
60
Thread.sleep(1000);
61
return read(); // Retry
62
}
63
64
@Override
65
public void close() throws Exception {
66
if (reader != null) {
67
reader.close();
68
}
69
}
70
}
```

## BatchSource<T>

Interface for sources that process data in batches through distinct lifecycle phases: discovery, preparation, and reading.

```java { .api }
package org.apache.pulsar.io.core;

@InterfaceAudience.Public
81
@InterfaceStability.Evolving
82
public interface BatchSource<T> extends AutoCloseable {
83
/**
84
* Open and initialize the source with configuration.
85
*
86
* @param config initialization config
87
* @param context environment where the source connector is running
88
* @throws Exception IO type exceptions when opening a connector
89
*/
90
void open(Map<String, Object> config, SourceContext context) throws Exception;
91
92
/**
93
* Discovery phase for finding available tasks/partitions to process.
94
*
95
* @param taskEater consumer that accepts discovered task identifiers
96
* @throws Exception
97
*/
98
void discover(Consumer<byte[]> taskEater) throws Exception;
99
100
/**
101
* Prepare to process a specific task identified during discovery.
102
*
103
* @param task task identifier from discovery phase
104
* @throws Exception
105
*/
106
void prepare(byte[] task) throws Exception;
107
108
/**
109
* Read next record from current task.
110
*
111
* @return next record or null when current task is complete
112
* @throws Exception
113
*/
114
Record<T> readNext() throws Exception;
115
116
/**
117
* Close the connector and clean up resources.
118
* @throws Exception
119
*/
120
void close() throws Exception;
121
}
122
```
123
124
### Usage Example
125
126
```java
127
public class DatabaseBatchSource implements BatchSource<Map<String, Object>> {
128
private Connection connection;
129
private PreparedStatement currentQuery;
130
private ResultSet currentResults;
131
private SourceContext context;
132
133
@Override
134
public void open(Map<String, Object> config, SourceContext context) throws Exception {
135
this.context = context;
136
String jdbcUrl = (String) config.get("jdbc.url");
137
this.connection = DriverManager.getConnection(jdbcUrl);
138
}
139
140
@Override
141
public void discover(Consumer<byte[]> taskEater) throws Exception {
142
// Discover available tables or partitions
143
ResultSet tables = connection.getMetaData().getTables(null, null, "%", new String[]{"TABLE"});
144
while (tables.next()) {
145
String tableName = tables.getString("TABLE_NAME");
146
taskEater.accept(tableName.getBytes());
147
}
148
}
149
150
@Override
151
public void prepare(byte[] task) throws Exception {
152
String tableName = new String(task);
153
currentQuery = connection.prepareStatement("SELECT * FROM " + tableName);
154
currentResults = currentQuery.executeQuery();
155
}
156
157
@Override
158
public Record<Map<String, Object>> readNext() throws Exception {
159
if (currentResults.next()) {
160
Map<String, Object> row = new HashMap<>();
161
ResultSetMetaData metadata = currentResults.getMetaData();
162
for (int i = 1; i <= metadata.getColumnCount(); i++) {
163
row.put(metadata.getColumnName(i), currentResults.getObject(i));
164
}
165
return new SimpleRecord<>(null, row);
166
}
167
return null; // Task complete
168
}
169
170
@Override
171
public void close() throws Exception {
172
if (currentResults != null) currentResults.close();
173
if (currentQuery != null) currentQuery.close();
174
if (connection != null) connection.close();
175
}
176
}
```

## BatchSourceTriggerer

Interface for triggering discovery in batch sources, allowing external systems to control when batch processing should begin.

```java { .api }
package org.apache.pulsar.io.core;

@InterfaceAudience.Public
187
@InterfaceStability.Evolving
188
public interface BatchSourceTriggerer {
189
/**
190
* Initialize the triggerer with configuration.
191
*
192
* @param config initialization config
193
* @param sourceContext environment where the source connector is running
194
* @throws Exception
195
*/
196
void init(Map<String, Object> config, SourceContext sourceContext) throws Exception;
197
198
/**
199
* Start triggering discovery with callback function.
200
*
201
* @param trigger callback function to invoke when discovery should be triggered
202
*/
203
void start(Consumer<String> trigger);
204
205
/**
206
* Stop triggering discovery.
207
*/
208
void stop();
209
}
```

### Usage Example

```java
public class ScheduledBatchTriggerer implements BatchSourceTriggerer {
216
private ScheduledExecutorService scheduler;
217
private SourceContext context;
218
219
@Override
220
public void init(Map<String, Object> config, SourceContext sourceContext) throws Exception {
221
this.context = sourceContext;
222
this.scheduler = Executors.newScheduledThreadPool(1);
223
}
224
225
@Override
226
public void start(Consumer<String> trigger) {
227
// Trigger discovery every hour
228
scheduler.scheduleAtFixedRate(
229
() -> trigger.accept("scheduled-trigger"),
230
0, 1, TimeUnit.HOURS
231
);
232
}
233
234
@Override
235
public void stop() {
236
if (scheduler != null) {
237
scheduler.shutdown();
238
}
239
}
240
}
```

## Types

```java { .api }
// Required imports
import java.util.Map;
import java.util.function.Consumer;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
```