or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connector-annotations.md, context-interfaces.md, index.md, push-sources.md, sink-interfaces.md, source-interfaces.md, utility-classes.md

docs/push-sources.md

# Push Source Classes

Push-based source classes provide queue-based functionality for asynchronous data ingestion using consumer callback patterns.


## AbstractPushSource<T>

Base abstract class providing queue-based push source functionality with internal buffering.

```java { .api }

package org.apache.pulsar.io.core;

10

11

public abstract class AbstractPushSource<T> {

12

/**

13

* Default queue length for internal buffering.

14

*/

15

static final int DEFAULT_QUEUE_LENGTH = 1000;

16

17

/**

18

* Constructor initializing internal queue with default capacity.

19

*/

20

public AbstractPushSource();

21

22

/**

23

* Read next record from internal queue.

24

* This method is used internally by push source implementations.

25

*

26

* @return next record from queue or null if queue is empty

27

* @throws Exception

28

*/

29

protected Record<T> readNext() throws Exception;

30

31

/**

32

* Add record to internal queue.

33

* This method should be called by external systems to push data.

34

*

35

* @param record record to add to queue

36

*/

37

public void consume(Record<T> record);

38

39

/**

40

* Get queue capacity.

41

*

42

* @return queue capacity (default 1000)

43

*/

44

public int getQueueLength();

45

46

/**

47

* Notify of asynchronous errors.

48

* This allows external systems to report errors that occurred during async operations.

49

*

50

* @param ex exception that occurred

51

*/

52

public void notifyError(Exception ex);

53

}

```

## PushSource<T>

Push-based source that uses a consumer callback pattern, extending AbstractPushSource and implementing the Source interface.

```java { .api }

package org.apache.pulsar.io.core;

62

63

@InterfaceAudience.Public

64

@InterfaceStability.Stable

65

public abstract class PushSource<T> extends AbstractPushSource<T> implements Source<T> {

66

/**

67

* Reads the next message using push mechanism.

68

* Overrides Source.read() to use internal queue-based mechanism.

69

*

70

* @return next message from source

71

* @throws Exception

72

*/

73

Record<T> read() throws Exception;

74

}

```

### Usage Example

```java

public class WebSocketPushSource extends PushSource<String> {

81

private WebSocketClient client;

82

private SourceContext context;

83

84

@Override

85

public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {

86

this.context = sourceContext;

87

String wsUrl = (String) config.get("websocket.url");

88

89

this.client = new WebSocketClient();

90

this.client.onMessage(message -> {

91

// Push received message to internal queue

92

this.consume(new SimpleRecord<>(null, message));

93

});

94

95

this.client.onError(error -> {

96

// Notify of async errors

97

this.notifyError(error);

98

});

99

100

client.connect(wsUrl);

101

}

102

103

@Override

104

public void close() throws Exception {

105

if (client != null) {

106

client.disconnect();

107

}

108

}

109

}

```

## BatchPushSource<T>

Batch push source combining batch processing with push pattern, extending AbstractPushSource and implementing BatchSource.

```java { .api }

package org.apache.pulsar.io.core;

118

119

@InterfaceAudience.Public

120

@InterfaceStability.Evolving

121

public abstract class BatchPushSource<T> extends AbstractPushSource<T> implements BatchSource<T> {

122

/**

123

* Read next record using push mechanism.

124

* Overrides BatchSource.readNext() to use internal queue-based mechanism.

125

*

126

* @return next record or null when current task is complete

127

* @throws Exception

128

*/

129

Record<T> readNext() throws Exception;

130

}

```

### Usage Example

```java

public class KafkaBatchPushSource extends BatchPushSource<byte[]> {

137

private KafkaConsumer<String, byte[]> consumer;

138

private SourceContext context;

139

private String currentTopic;

140

141

@Override

142

public void open(Map<String, Object> config, SourceContext context) throws Exception {

143

this.context = context;

144

Properties props = new Properties();

145

props.put("bootstrap.servers", config.get("kafka.brokers"));

146

props.put("group.id", config.get("kafka.group.id"));

147

props.put("key.deserializer", StringDeserializer.class.getName());

148

props.put("value.deserializer", ByteArrayDeserializer.class.getName());

149

150

this.consumer = new KafkaConsumer<>(props);

151

}

152

153

@Override

154

public void discover(Consumer<byte[]> taskEater) throws Exception {

155

// Discover available Kafka topics

156

Map<String, List<PartitionInfo>> topics = consumer.listTopics();

157

for (String topic : topics.keySet()) {

158

taskEater.accept(topic.getBytes());

159

}

160

}

161

162

@Override

163

public void prepare(byte[] task) throws Exception {

164

this.currentTopic = new String(task);

165

consumer.subscribe(Collections.singletonList(currentTopic));

166

167

// Start background polling that pushes records to queue

168

startBackgroundPolling();

169

}

170

171

private void startBackgroundPolling() {

172

new Thread(() -> {

173

try {

174

while (!Thread.currentThread().isInterrupted()) {

175

ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));

176

for (ConsumerRecord<String, byte[]> record : records) {

177

// Push each record to internal queue

178

this.consume(new SimpleRecord<>(record.key(), record.value()));

179

}

180

}

181

} catch (Exception e) {

182

this.notifyError(e);

183

}

184

}).start();

185

}

186

187

@Override

188

public void close() throws Exception {

189

if (consumer != null) {

190

consumer.close();

191

}

192

}

193

}

```

## Event-Driven Push Source Example

```java

public class EventDrivenPushSource extends PushSource<Map<String, Object>> {

200

private EventBus eventBus;

201

private SourceContext context;

202

203

@Override

204

public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {

205

this.context = sourceContext;

206

this.eventBus = new EventBus();

207

208

// Register event handlers that push data to queue

209

eventBus.register(new Object() {

210

@Subscribe

211

public void handleDataEvent(DataEvent event) {

212

Map<String, Object> data = event.getData();

213

consume(new SimpleRecord<>(event.getId(), data));

214

}

215

216

@Subscribe

217

public void handleErrorEvent(ErrorEvent event) {

218

notifyError(event.getException());

219

}

220

});

221

222

// Start event processing

223

eventBus.start();

224

}

225

226

@Override

227

public void close() throws Exception {

228

if (eventBus != null) {

229

eventBus.stop();

230

}

231

}

232

}

```

## Types

```java { .api }

// Required imports

239

import java.util.Map;

240

import java.util.function.Consumer;

241

import org.apache.pulsar.functions.api.Record;

242

import org.apache.pulsar.common.classification.InterfaceAudience;

243

import org.apache.pulsar.common.classification.InterfaceStability;

```