# Message Publishing (Sink)

The `PubSubSink` provides reliable message publishing to Google Cloud Pub/Sub topics with at-least-once delivery semantics. Batching and retries are handled automatically by the underlying Google Cloud Pub/Sub client library.

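## Capabilities

### PubSubSink Builder

Creates a new `PubSubSink` using the builder pattern. The builder enforces required parameters through type-safe interfaces. Each step returns a builder type that exposes only the next required setting, so the serialization schema, project name, and topic name must be supplied, in that order, before the optional settings and `build()` become available.

```java { .api }
/**
 * Creates a new builder for PubSubSink
 * @return SerializationSchemaBuilder for setting serialization schema
 */
public static SerializationSchemaBuilder newBuilder();
```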

### Serialization Schema Configuration

Configure how messages are serialized for Pub/Sub.

```java { .api }
public static class SerializationSchemaBuilder {
    /**
     * Set serialization schema for converting objects to byte arrays
     * @param serializationSchema Schema for serializing message data
     * @return ProjectNameBuilder for next configuration step
     */
    public <IN> ProjectNameBuilder<IN> withSerializationSchema(
            SerializationSchema<IN> serializationSchema);
}
```

### Project and Topic Configuration

Configure the GCP project and Pub/Sub topic.

```java { .api }
public interface ProjectNameBuilder<IN> {
    /**
     * Set the GCP project name containing the topic
     * @param projectName GCP project name
     * @return TopicNameBuilder for next configuration step
     */
    TopicNameBuilder<IN> withProjectName(String projectName);
}

public interface TopicNameBuilder<IN> {
    /**
     * Set the Pub/Sub topic name to publish to
     * @param topicName Topic name
     * @return PubSubSinkBuilder for additional configuration
     */
    PubSubSinkBuilder<IN> withTopicName(String topicName);
}
```

### Sink Builder Configuration

Main builder class for configuring optional parameters.

```java { .api }
public static class PubSubSinkBuilder<IN> {
    /**
     * Set authentication credentials (optional - uses default credentials if not set)
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);

    /**
     * Set custom hostname/port for Pub/Sub emulator (testing only)
     * @param hostAndPort Host and port combination ("hostname:1234")
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);

    /**
     * Build the configured PubSubSink
     * @return Configured PubSubSink instance
     * @throws IOException If credentials cannot be obtained
     * @throws IllegalArgumentException If required fields are missing
     */
    public PubSubSink<IN> build() throws IOException;
}
```
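
The staged builder chain above keeps `build()` out of reach until every required setting has been supplied. The following minimal JDK-only sketch shows how that works. All class and method names are hypothetical stand-ins that mirror the shape of the chain, not Flink's implementation, and the "schema" is only a label string.

```java
// Hypothetical, simplified model of a staged ("step") builder. It mirrors the shape of
// the PubSubSink chain but is not Flink's code; the schema is only a label string.
final class StagedBuilderDemo {

    // Each stage exposes only the next required setter.
    interface ProjectStage {
        TopicStage withProjectName(String projectName);
    }

    interface TopicStage {
        FinalStage withTopicName(String topicName);
    }

    // Final stage: optional settings plus build(), reachable only after all required steps.
    static final class FinalStage {
        private final String schema;
        private final String project;
        private final String topic;
        private String emulatorHostAndPort; // optional, stays null unless set

        private FinalStage(String schema, String project, String topic) {
            this.schema = schema;
            this.project = project;
            this.topic = topic;
        }

        // Optional setter returns the same builder, so calls can be chained.
        FinalStage withHostAndPortForEmulator(String hostAndPort) {
            this.emulatorHostAndPort = hostAndPort;
            return this;
        }

        String build() {
            String target = (emulatorHostAndPort == null) ? "<default endpoint>" : emulatorHostAndPort;
            return schema + " -> projects/" + project + "/topics/" + topic + " @ " + target;
        }
    }

    // Entry point, analogous to newBuilder().withSerializationSchema(...).
    static ProjectStage withSerializationSchema(String schemaName) {
        return project -> topic -> new FinalStage(schemaName, project, topic);
    }
}
```

Calling `StagedBuilderDemo.withSerializationSchema("S").build()` does not compile, because `ProjectStage` has no `build()` method. The same mechanism applies to the real builder: it turns a missing required field into a compile-time error rather than a runtime one.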

### Core Sink Methods

Key methods of the `PubSubSink` class.

```java { .api }
public class PubSubSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {

    /**
     * Process and publish a message to Pub/Sub
     * @param message Message to publish
     * @param context Sink context (provides processing time, etc.)
     */
    public void invoke(IN message, SinkFunction.Context context);

    /**
     * Called during checkpointing - ensures all pending messages are published
     * @param context Checkpoint context
     * @throws Exception If publishing fails
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * Initialize state - called once per subtask
     * @param context Initialization context
     */
    public void initializeState(FunctionInitializationContext context);
}
```

## Usage Examples

### Basic String Publishing

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// The sink waits for outstanding publishes on each checkpoint, so enable
// checkpointing for at-least-once delivery (interval in milliseconds)
env.enableCheckpointing(10_000);

// build() throws IOException if default credentials cannot be obtained
PubSubSink<String> sink = PubSubSink.newBuilder()
        .withSerializationSchema(new SimpleStringSchema())
        .withProjectName("my-gcp-project")
        .withTopicName("my-topic")
        .build();

env.fromElements("Hello", "World", "Pub/Sub")
        .addSink(sink);

env.execute("Basic Pub/Sub Producer");
```

### JSON Object Publishing

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
// Flink's shaded Jackson; with a direct Jackson dependency,
// import com.fasterxml.jackson.databind.ObjectMapper instead
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

public class JsonSerializationSchema<T> implements SerializationSchema<T> {
    // Created lazily on the task side instead of being shipped with the serialized schema
    private transient ObjectMapper mapper;

    @Override
    public byte[] serialize(T element) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.writeValueAsBytes(element);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize object", e);
        }
    }
}

// Jackson serializes public fields by default
public class MyEvent {
    public String eventType;
    public long timestamp;
    public String userId;
    // ... other fields
}

PubSubSink<MyEvent> sink = PubSubSink.newBuilder()
        .withSerializationSchema(new JsonSerializationSchema<MyEvent>())
        .withProjectName("my-gcp-project")
        .withTopicName("events")
        .build();

DataStream<MyEvent> events = env.addSource(/* some source */);
events.addSink(sink);
```

### Advanced Configuration with Custom Credentials

```java
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

// Load service account credentials; try-with-resources closes the key file
Credentials credentials;
try (InputStream keyFile = new FileInputStream("path/to/service-account.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyFile);
}

PubSubSink<String> sink = PubSubSink.newBuilder()
        .withSerializationSchema(new SimpleStringSchema())
        .withProjectName("my-gcp-project")
        .withTopicName("my-topic")
        .withCredentials(credentials) // optional settings come after withTopicName
        .build();

env.addSource(/* some source */)
        .addSink(sink);
```

### Using with Pub/Sub Emulator for Testing

```java
// Start Pub/Sub emulator first:
// gcloud beta emulators pubsub start --host-port=localhost:8085

PubSubSink<String> sink = PubSubSink.newBuilder()
        .withSerializationSchema(new SimpleStringSchema())
        .withProjectName("test-project")
        .withTopicName("test-topic")
        .withHostAndPortForEmulator("localhost:8085")
        .build();

env.fromElements("test-message-1", "test-message-2")
        .addSink(sink);
```

### Publishing with Custom Message Attributes

`PubSubSink` only sets the message data produced by the `SerializationSchema`. If you need message attributes, either encode them into the payload with a custom serialization schema, or use the Pub/Sub client library directly within a custom sink function:

```java
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// MyEventWithAttributes: hypothetical application class exposing
// getData(), getEventType() and getSource()
public class CustomPubSubSink extends RichSinkFunction<MyEventWithAttributes> {
    // Publisher is not serializable, so each subtask creates its own in open()
    private transient Publisher publisher;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Uses default credentials; replace "project" and "topic" with real names
        publisher = Publisher.newBuilder(TopicName.of("project", "topic")).build();
    }

    @Override
    public void invoke(MyEventWithAttributes event, Context context) throws Exception {
        PubsubMessage message = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(event.getData()))
                .putAttributes("eventType", event.getEventType())
                .putAttributes("source", event.getSource())
                .build();

        // publish() is asynchronous and returns a future with the message ID.
        // This sketch neither tracks those futures nor flushes on checkpoints,
        // so unlike PubSubSink it does not provide at-least-once guarantees.
        publisher.publish(message);
    }

    @Override
    public void close() throws Exception {
        if (publisher != null) {
            // Publishes any buffered messages, then waits for the publisher to stop
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
```

## Error Handling and Reliability

### At-Least-Once Delivery

The `PubSubSink` provides at-least-once delivery guarantees when checkpointing is enabled. Messages may be published more than once after a failure, but no messages are lost.
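
A record that is replayed after a failure gets published again, so consumers of the topic may see duplicates. Pub/Sub assigns a new message ID to every publish, which means these replays cannot be detected by message ID. One possible mitigation is to remember recently seen IDs and skip repeats. The sketch below assumes each payload carries an application-level event ID. The class name and capacity are hypothetical, and a bounded in-memory window only catches duplicates that arrive close together.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical consumer-side deduplicator for at-least-once delivery. It remembers the
// most recent `capacity` event IDs (an application-level key carried in the payload).
final class RecentIdDeduplicator {
    private final Map<String, Boolean> seen;

    RecentIdDeduplicator(int capacity) {
        // Insertion-ordered map that evicts the oldest ID once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is seen (process the event), false for a repeat.
    boolean firstTime(String eventId) {
        return seen.putIfAbsent(eventId, Boolean.TRUE) == null;
    }
}
```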

### Automatic Retries

The sink relies on the Google Cloud Pub/Sub client library to retry failed publish operations according to the client's default retry policy:

- Maximum attempts: based on the client's gRPC retry configuration
- Exponential backoff with jitter
- Retry behavior is part of the client's publisher settings, which the `PubSubSink` builder does not expose; changing it requires a custom sink
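
The following sketch of exponential backoff with jitter shows how the wait between retries grows. It only models how wait times grow and are randomized. The class name and the values used in the check (100 ms initial delay, multiplier 2, 5 s cap) are assumptions, not the client library's defaults.

```java
import java.time.Duration;
import java.util.Random;

// Illustrative exponential backoff with "full jitter". The parameters are assumptions for
// this sketch and are not the Pub/Sub client's actual default values.
final class BackoffSketch {
    private final long initialMillis;
    private final double multiplier;
    private final long maxMillis;
    private final Random random;

    BackoffSketch(Duration initialDelay, double multiplier, Duration maxDelay, Random random) {
        this.initialMillis = initialDelay.toMillis();
        this.multiplier = multiplier;
        this.maxMillis = maxDelay.toMillis();
        this.random = random;
    }

    // Upper bound for a 0-based retry attempt: initial * multiplier^attempt, capped at max.
    long capMillis(int attempt) {
        double raw = initialMillis * Math.pow(multiplier, attempt);
        return (long) Math.min(raw, (double) maxMillis);
    }

    // Actual wait: uniformly random in [0, cap] so many clients do not retry in lockstep.
    long delayMillis(int attempt) {
        long cap = capMillis(attempt);
        return (long) (random.nextDouble() * (cap + 1));
    }
}
```

The cap keeps delays bounded once the exponential term grows large. The random component spreads out retries from many subtasks that failed at the same moment.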

### Checkpoint Integration

The sink integrates with Flink's checkpointing mechanism:

- All outstanding publish requests are completed before checkpoint completion
- Failed publish operations will fail the checkpoint
- Provides durability guarantees in combination with Flink's state management
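
The behavior described above can be modeled in a simplified, JDK-only form. Each publish yields a future, and the snapshot step waits for all of them, failing if any failed. In this sketch, `CompletableFuture` stands in for the client's publish future. The class and method names are hypothetical and do not reproduce `PubSubSink` internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical model of "complete all outstanding publishes before the checkpoint".
final class PendingPublishTracker {
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    // Record an in-flight publish; the future completes with a message ID or an error.
    void track(CompletableFuture<String> publishResult) {
        pending.add(publishResult);
    }

    int pendingCount() {
        return pending.size();
    }

    // In this model, called from snapshotState: blocks until every tracked publish has
    // finished and throws if any failed, which in Flink would fail the checkpoint.
    void awaitAllOutstanding() throws Exception {
        try {
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).get();
        } catch (ExecutionException e) {
            throw new Exception("Publish failed; checkpoint must not complete", e.getCause());
        } finally {
            pending.clear();
        }
    }
}
```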

### Back-pressure Handling

The sink handles back-pressure scenarios:

- Blocks on publish when Pub/Sub service is unavailable
- Respects Flink's back-pressure mechanisms
- Provides flow control to prevent memory issues
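
Flow control of this kind can be sketched with a semaphore that caps the number of unacknowledged messages. Once the cap is reached, the caller blocks, and in a Flink sink that blocking propagates as back-pressure to upstream operators. This is a hypothetical illustration, not how `PubSubSink` or the client library implements flow control, and the class name and limit are assumptions.

```java
import java.util.concurrent.Semaphore;

// Hypothetical flow-control sketch: cap the number of unacknowledged messages so a slow or
// unavailable service blocks the caller instead of letting buffered messages grow unbounded.
final class OutstandingLimiter {
    private final Semaphore permits;

    OutstandingLimiter(int maxOutstanding) {
        this.permits = new Semaphore(maxOutstanding);
    }

    // Called before handing a message to the publisher; blocks once the limit is reached.
    void beforePublish() throws InterruptedException {
        permits.acquire();
    }

    // Non-blocking variant, useful for tests and diagnostics.
    boolean tryBeforePublish() {
        return permits.tryAcquire();
    }

    // Called from the publish callback once the message succeeds or fails.
    void onPublishComplete() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }
}
```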

## Performance Considerations

### Batching

The underlying Google Cloud Pub/Sub publisher automatically batches messages to improve throughput. A batch is sent when it reaches a size threshold or when a maximum delay elapses, which bounds the added latency.
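
The following simplified model illustrates size-based batching. Messages are buffered and sent as one request once a message-count or byte threshold is reached. The thresholds and class name are hypothetical. The real publisher also flushes on a delay threshold, which this sketch omits to keep it deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count/byte-threshold batcher; "sending" a batch just records it.
final class BatcherSketch {
    private final int maxMessages;
    private final long maxBytes;
    private final List<byte[]> buffer = new ArrayList<>();
    private final List<List<byte[]>> sentBatches = new ArrayList<>();
    private long bufferedBytes = 0;

    BatcherSketch(int maxMessages, long maxBytes) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
    }

    void add(byte[] message) {
        // Send the current batch first if this message would push it past the byte limit.
        if (!buffer.isEmpty() && bufferedBytes + message.length > maxBytes) {
            flush();
        }
        buffer.add(message);
        bufferedBytes += message.length;
        // Send as soon as either threshold is reached.
        if (buffer.size() >= maxMessages || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    // Sends whatever is buffered as one batch (for example on shutdown or checkpoint).
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sentBatches.add(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }

    List<List<byte[]>> sentBatches() {
        return sentBatches;
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```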

### Publisher Settings

The `PubSubSink` builder does not expose the client's publisher settings (batching, retries, flow control). For high-throughput scenarios that need to tune them, implement a custom sink that follows the `PubSubSink` pattern and configures the `Publisher` explicitly.

### Resource Management

- Each sink subtask creates its own `Publisher` instance
- Each `Publisher` is shut down when its sink subtask is closed
- gRPC channels are managed automatically by the Google Cloud client library

## Important Notes

- **Message Ordering**: Pub/Sub does not guarantee message ordering by default. Ordering requires message ordering keys, which the `PubSubSink` builder does not expose, so ordered publishing needs a custom sink.
- **Message Size Limits**: Pub/Sub limits the size of a single message to 10 MB. Ensure your serialized messages stay within this limit.
- **Topic Creation**: Topics must exist before publishing. The sink does not create topics automatically.
- **Authentication**: If no explicit credentials are provided, the sink uses Google Cloud default credentials (or emulator credentials when `withHostAndPortForEmulator` is set). Ensure proper service account configuration in production environments.
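
To reject oversized records before they reach the sink, a small guard can be called from a custom `SerializationSchema`, for example. `MessageSizeGuard` is a hypothetical helper. It assumes the limit is 10 MiB (10 × 1024 × 1024 bytes) of message data, so check the current Pub/Sub quota documentation for the exact definition.

```java
// Hypothetical pre-publish size check. Assumption: the 10 MB limit is treated as
// 10 * 1024 * 1024 bytes of message data; verify against the current Pub/Sub quotas.
final class MessageSizeGuard {
    static final int MAX_MESSAGE_BYTES = 10 * 1024 * 1024;

    private MessageSizeGuard() {}

    static boolean fits(byte[] payload) {
        return payload.length <= MAX_MESSAGE_BYTES;
    }

    // Returns the payload unchanged, or fails fast with a descriptive error.
    static byte[] requireFits(byte[] payload) {
        if (!fits(payload)) {
            throw new IllegalArgumentException(
                    "Serialized message is " + payload.length + " bytes; limit is " + MAX_MESSAGE_BYTES);
        }
        return payload;
    }
}
```

Failing fast in the serialization step surfaces the oversized record directly, instead of waiting for a publish error reported later by the client.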