# Push Source Classes

Push-based source classes provide queue-based functionality for asynchronous data ingestion: external callbacks push records into an internal queue, and the source's read method hands them on from there.

## AbstractPushSource<T>

Base abstract class providing the queue-based push functionality, including the internal buffer shared by the concrete push source classes.

```java { .api }
9
package org.apache.pulsar.io.core;
10
11
public abstract class AbstractPushSource<T> {
12
/**
13
* Default queue length for internal buffering.
14
*/
15
static final int DEFAULT_QUEUE_LENGTH = 1000;
16
17
/**
18
* Constructor initializing internal queue with default capacity.
19
*/
20
public AbstractPushSource();
21
22
/**
23
* Read next record from internal queue.
24
* This method is used internally by push source implementations.
25
*
26
* @return next record from queue or null if queue is empty
27
* @throws Exception
28
*/
29
protected Record<T> readNext() throws Exception;
30
31
/**
32
* Add record to internal queue.
33
* This method should be called by external systems to push data.
34
*
35
* @param record record to add to queue
36
*/
37
public void consume(Record<T> record);
38
39
/**
40
* Get queue capacity.
41
*
42
* @return queue capacity (default 1000)
43
*/
44
public int getQueueLength();
45
46
/**
47
* Notify of asynchronous errors.
48
* This allows external systems to report errors that occurred during async operations.
49
*
50
* @param ex exception that occurred
51
*/
52
public void notifyError(Exception ex);
53
}
54
```

## PushSource<T>

Push-based source that uses a consumer callback pattern. It extends `AbstractPushSource` and implements the `Source` interface.

```java { .api }
61
package org.apache.pulsar.io.core;
62
63
@InterfaceAudience.Public
64
@InterfaceStability.Stable
65
public abstract class PushSource<T> extends AbstractPushSource<T> implements Source<T> {
66
/**
67
* Reads the next message using push mechanism.
68
* Overrides Source.read() to use internal queue-based mechanism.
69
*
70
* @return next message from source
71
* @throws Exception
72
*/
73
Record<T> read() throws Exception;
74
}
75
```

### Usage Example

```java
80
public class WebSocketPushSource extends PushSource<String> {
81
private WebSocketClient client;
82
private SourceContext context;
83
84
@Override
85
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
86
this.context = sourceContext;
87
String wsUrl = (String) config.get("websocket.url");
88
89
this.client = new WebSocketClient();
90
this.client.onMessage(message -> {
91
// Push received message to internal queue
92
this.consume(new SimpleRecord<>(null, message));
93
});
94
95
this.client.onError(error -> {
96
// Notify of async errors
97
this.notifyError(error);
98
});
99
100
client.connect(wsUrl);
101
}
102
103
@Override
104
public void close() throws Exception {
105
if (client != null) {
106
client.disconnect();
107
}
108
}
109
}
110
```

## BatchPushSource<T>

Batch push source that combines batch processing with the push pattern. It extends `AbstractPushSource` and implements `BatchSource`.

```java { .api }
117
package org.apache.pulsar.io.core;
118
119
@InterfaceAudience.Public
120
@InterfaceStability.Evolving
121
public abstract class BatchPushSource<T> extends AbstractPushSource<T> implements BatchSource<T> {
122
/**
123
* Read next record using push mechanism.
124
* Overrides BatchSource.readNext() to use internal queue-based mechanism.
125
*
126
* @return next record or null when current task is complete
127
* @throws Exception
128
*/
129
Record<T> readNext() throws Exception;
130
}
131
```

### Usage Example

```java
136
public class KafkaBatchPushSource extends BatchPushSource<byte[]> {
137
private KafkaConsumer<String, byte[]> consumer;
138
private SourceContext context;
139
private String currentTopic;
140
141
@Override
142
public void open(Map<String, Object> config, SourceContext context) throws Exception {
143
this.context = context;
144
Properties props = new Properties();
145
props.put("bootstrap.servers", config.get("kafka.brokers"));
146
props.put("group.id", config.get("kafka.group.id"));
147
props.put("key.deserializer", StringDeserializer.class.getName());
148
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
149
150
this.consumer = new KafkaConsumer<>(props);
151
}
152
153
@Override
154
public void discover(Consumer<byte[]> taskEater) throws Exception {
155
// Discover available Kafka topics
156
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
157
for (String topic : topics.keySet()) {
158
taskEater.accept(topic.getBytes());
159
}
160
}
161
162
@Override
163
public void prepare(byte[] task) throws Exception {
164
this.currentTopic = new String(task);
165
consumer.subscribe(Collections.singletonList(currentTopic));
166
167
// Start background polling that pushes records to queue
168
startBackgroundPolling();
169
}
170
171
private void startBackgroundPolling() {
172
new Thread(() -> {
173
try {
174
while (!Thread.currentThread().isInterrupted()) {
175
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
176
for (ConsumerRecord<String, byte[]> record : records) {
177
// Push each record to internal queue
178
this.consume(new SimpleRecord<>(record.key(), record.value()));
179
}
180
}
181
} catch (Exception e) {
182
this.notifyError(e);
183
}
184
}).start();
185
}
186
187
@Override
188
public void close() throws Exception {
189
if (consumer != null) {
190
consumer.close();
191
}
192
}
193
}
194
```

## Event-Driven Push Source Example

```java
199
public class EventDrivenPushSource extends PushSource<Map<String, Object>> {
200
private EventBus eventBus;
201
private SourceContext context;
202
203
@Override
204
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
205
this.context = sourceContext;
206
this.eventBus = new EventBus();
207
208
// Register event handlers that push data to queue
209
eventBus.register(new Object() {
210
@Subscribe
211
public void handleDataEvent(DataEvent event) {
212
Map<String, Object> data = event.getData();
213
consume(new SimpleRecord<>(event.getId(), data));
214
}
215
216
@Subscribe
217
public void handleErrorEvent(ErrorEvent event) {
218
notifyError(event.getException());
219
}
220
});
221
222
// Start event processing
223
eventBus.start();
224
}
225
226
@Override
227
public void close() throws Exception {
228
if (eventBus != null) {
229
eventBus.stop();
230
}
231
}
232
}
233
```
234
235
## Types
236
237
```java { .api }
238
// Required imports
239
import java.util.Map;
240
import java.util.function.Consumer;
241
import org.apache.pulsar.functions.api.Record;
242
import org.apache.pulsar.common.classification.InterfaceAudience;
243
import org.apache.pulsar.common.classification.InterfaceStability;
244
```