
# PubSub Sink

The PubSubSink publishes messages to Google Cloud Pub/Sub topics with at-least-once delivery guarantees. It synchronizes with Flink checkpoints so that all outstanding messages are delivered before a checkpoint completes.

## Capabilities

### Sink Creation

Create a PubSubSink with the builder pattern. The serialization schema, project name, and topic name are required.

```java { .api }
/**
 * Creates a new builder for PubSubSink configuration
 * @return SerializationSchemaBuilder instance to start configuration
 */
public static SerializationSchemaBuilder newBuilder();

public static class SerializationSchemaBuilder {
    /**
     * Set serialization schema for converting objects to PubSub message payloads
     * @param serializationSchema Schema for serializing objects to byte arrays
     * @return ProjectNameBuilder for next configuration step
     */
    public <IN> ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema);
}
```

### Builder Configuration

Configure the project name, the topic name, and optional parameters.

```java { .api }
public interface ProjectNameBuilder<IN> {
    /**
     * Set the GCP project name containing the topic
     * @param projectName Google Cloud project name
     * @return TopicNameBuilder for next configuration step
     */
    TopicNameBuilder<IN> withProjectName(String projectName);
}

public interface TopicNameBuilder<IN> {
    /**
     * Set the Pub/Sub topic name to publish to
     * @param topicName Pub/Sub topic name
     * @return PubSubSinkBuilder for optional configuration
     */
    PubSubSinkBuilder<IN> withTopicName(String topicName);
}

public static class PubSubSinkBuilder<IN> {
    /**
     * Set custom GCP credentials (optional, defaults to environment credentials)
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);

    /**
     * Set emulator host and port for testing (optional, for emulator use only)
     * @param hostAndPort Host and port combination (e.g., "localhost:8085")
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);

    /**
     * Build the configured PubSubSink instance
     * @return Configured PubSubSink ready for use
     * @throws IOException If credentials cannot be obtained
     * @throws IllegalArgumentException If required fields are missing or topic does not exist
     */
    public PubSubSink<IN> build() throws IOException;
}
```
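The builder interfaces listed here form a staged builder: each step returns a type that exposes only the next required setter, so leaving out the project or topic name is a compile-time error rather than a runtime one. The following is a minimal JDK-only sketch of that pattern. `StagedBuilderSketch`, `ProjectStage`, `TopicStage`, `FinalStage`, and `Target` are hypothetical names used for illustration and are not part of the connector.

```java
import java.util.Objects;

/** Hypothetical, JDK-only sketch of a staged builder; not the connector's code. */
final class StagedBuilderSketch {

    /** Stage 1: only the project name can be set. */
    interface ProjectStage {
        TopicStage withProjectName(String projectName);
    }

    /** Stage 2: only the topic name can be set. */
    interface TopicStage {
        FinalStage withTopicName(String topicName);
    }

    /** Final stage: optional settings plus build(). */
    static final class FinalStage implements ProjectStage, TopicStage {
        private String projectName;
        private String topicName;
        private String emulatorHostAndPort; // optional; null when unset

        @Override
        public TopicStage withProjectName(String projectName) {
            this.projectName = Objects.requireNonNull(projectName, "projectName");
            return this;
        }

        @Override
        public FinalStage withTopicName(String topicName) {
            this.topicName = Objects.requireNonNull(topicName, "topicName");
            return this;
        }

        FinalStage withHostAndPortForEmulator(String hostAndPort) {
            this.emulatorHostAndPort = hostAndPort;
            return this;
        }

        Target build() {
            return new Target(projectName, topicName, emulatorHostAndPort);
        }
    }

    /** Immutable result produced by the builder. */
    static final class Target {
        final String projectName;
        final String topicName;
        final String emulatorHostAndPort;

        Target(String projectName, String topicName, String emulatorHostAndPort) {
            this.projectName = projectName;
            this.topicName = topicName;
            this.emulatorHostAndPort = emulatorHostAndPort;
        }

        /** Fully qualified Pub/Sub topic path. */
        String topicPath() {
            return "projects/" + projectName + "/topics/" + topicName;
        }
    }

    /** The entry point exposes the first stage only. */
    static ProjectStage newBuilder() {
        return new FinalStage();
    }

    private StagedBuilderSketch() {}
}
```

With this shape, `StagedBuilderSketch.newBuilder().build()` does not compile, because `ProjectStage` has no `build()` method. The caller has to supply the project and topic names first.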

## Usage Examples

### Basic String Message Publisher

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create input stream
DataStream<String> inputStream = env.fromElements(
    "Hello PubSub",
    "Message 1",
    "Message 2"
);

// Create and configure sink (build() can throw IOException)
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withTopicName("my-topic")
    .build();

// Add sink to stream
inputStream.addSink(sink);

env.execute("Basic PubSub Producer");
```

### JSON Object Publisher

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonUserSerializer implements SerializationSchema<User> {
    // Created in open() on each task instead of being shipped with the job graph
    private transient ObjectMapper objectMapper;

    @Override
    public void open(SerializationSchema.InitializationContext context) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public byte[] serialize(User user) {
        try {
            return objectMapper.writeValueAsBytes(user);
        } catch (Exception e) {
            // Unchecked so that a serialization failure fails the job
            throw new RuntimeException("Failed to serialize user", e);
        }
    }
}

// Usage
DataStream<User> userStream = // ... create user stream

PubSubSink<User> userSink = PubSubSink.newBuilder()
    .withSerializationSchema(new JsonUserSerializer())
    .withProjectName("my-project")
    .withTopicName("user-events")
    .build();

userStream.addSink(userSink);
```

### Publisher with Custom Credentials

```java
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

// Load the service account key and close the stream once it has been read
Credentials credentials;
try (FileInputStream keyStream = new FileInputStream("path/to/service-account-key.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyStream);
}

PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withTopicName("my-topic")
    .withCredentials(credentials)
    .build();
```

### Streaming Data Pipeline

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 30 seconds; the sink flushes outstanding messages on each checkpoint
env.enableCheckpointing(30000);

// Source: Read from one PubSub subscription
PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withSubscriptionName("input-subscription")
    .build();

// Transform: Process messages
DataStream<String> processedStream = env.addSource(source)
    .map(message -> "Processed: " + message.toUpperCase())
    .filter(message -> message.length() > 10);

// Sink: Publish to another PubSub topic
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withTopicName("output-topic")
    .build();

processedStream.addSink(sink);

env.execute("PubSub Processing Pipeline");
```

## Message Publishing Behavior

### Delivery Guarantees

The PubSubSink provides **at-least-once** delivery guarantees. Because a restarted job replays records from the last successful checkpoint, some messages may be published more than once.

1. **Asynchronous Publishing**: Messages are published asynchronously to Pub/Sub.
2. **Checkpoint Synchronization**: All outstanding publish requests must complete successfully before a checkpoint can complete.
3. **Failure Handling**: If any publish operation fails, the checkpoint fails and the job restarts.
4. **Retry Logic**: Built-in retry mechanisms handle transient failures.
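The interplay between asynchronous publishing and checkpoints can be modeled with plain JDK futures. The sketch below is a simplified illustration, not the connector's implementation. `FlushOnCheckpointSketch` and its methods are hypothetical, each `CompletableFuture` stands in for the client's publish response, and a single caller thread is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical, JDK-only model of flush-on-checkpoint; not the connector's code. */
final class FlushOnCheckpointSketch {
    // Futures for publishes that have been sent but not yet confirmed.
    // Assumes a single caller thread, so no synchronization is shown.
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    /** Registers an in-flight publish; the future stands in for the client's response. */
    void publish(CompletableFuture<String> publishFuture) {
        pending.add(publishFuture);
    }

    /** Checkpoint hook: waits for every publish and fails if any of them failed. */
    void snapshot() {
        Throwable firstFailure = null;
        for (CompletableFuture<String> future : pending) {
            try {
                future.join(); // blocks until this publish completes
            } catch (CompletionException | CancellationException e) {
                if (firstFailure == null) {
                    firstFailure = e.getCause() != null ? e.getCause() : e;
                }
            }
        }
        pending.clear();
        if (firstFailure != null) {
            // Failing here fails the checkpoint, so the records are replayed later.
            throw new IllegalStateException("Publish failed; failing checkpoint", firstFailure);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Since a checkpoint only succeeds after every pending future has completed, no acknowledged checkpoint can cover an unpublished message. Replay after a failure is what turns this into at-least-once rather than exactly-once delivery.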

### Checkpoint Integration

The sink integrates with Flink's checkpointing mechanism to ensure reliable delivery:

```java
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Flush all buffered messages
    publisher.publishAllOutstanding();

    // Wait for all pending publish operations to complete
    waitForFuturesToComplete();

    // If any publish operation failed, throw exception to fail checkpoint
    if (exceptionAtomicReference.get() != null) {
        throw exceptionAtomicReference.get();
    }
}
```

### Message Format

Published messages contain:

- **Data**: Serialized message payload as bytes
- **Message ID**: Unique identifier assigned by Pub/Sub
- **Publish Time**: Timestamp when the message was published
- **Attributes**: Empty (custom attributes are not currently supported)

## Error Handling

### Publishing Errors

- **Serialization Errors**: Exceptions during serialization cause immediate job failure
- **Network Errors**: Handled by the Google Cloud client library with automatic retry
- **Authentication Errors**: Cause job failure with clear error messages
- **Topic Not Found**: Causes job failure; the topic must exist before the job starts

### Retry Configuration

The sink relies on the default retry settings of the Google Cloud Pub/Sub client, which cover:

- **Maximum Attempts**: Configurable through the client library
- **Exponential Backoff**: The delay between retries increases automatically
- **Total Timeout**: Maximum time to spend on retries
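To show how these three settings interact, here is a minimal sketch of capped exponential backoff. `BackoffSketch`, its parameter names, and the numbers in the usage note are illustrative assumptions and not the client's actual defaults. Real clients often add random jitter as well, which is omitted here.

```java
/** Hypothetical backoff calculator; values are illustrative, not the client's defaults. */
final class BackoffSketch {

    /** Delay before the retry that follows the given attempt (1-based), capped at maxDelayMillis. */
    static long delayMillis(int attempt, long initialDelayMillis, double multiplier, long maxDelayMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double delay = initialDelayMillis * Math.pow(multiplier, attempt - 1);
        return (long) Math.min(delay, (double) maxDelayMillis);
    }

    /** Number of attempts that fit within totalTimeoutMillis, never exceeding maxAttempts. */
    static int attemptsWithin(long totalTimeoutMillis, int maxAttempts,
                              long initialDelayMillis, double multiplier, long maxDelayMillis) {
        long elapsed = 0;
        int attempts = 1; // the first attempt happens at time zero
        while (attempts < maxAttempts) {
            long next = delayMillis(attempts, initialDelayMillis, multiplier, maxDelayMillis);
            if (elapsed + next > totalTimeoutMillis) {
                break; // the next retry would exceed the total timeout
            }
            elapsed += next;
            attempts++;
        }
        return attempts;
    }

    private BackoffSketch() {}
}
```

With an initial delay of 100 ms, a multiplier of 2.0, and a 1000 ms cap, the delays are 100, 200, 400, 800, and then 1000 ms. Under a total timeout of 1000 ms, only four attempts fit, even if the attempt limit is higher.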

### Monitoring and Metrics

- **Pending Futures**: Number of outstanding publish operations
- **Publish Failures**: Count of failed publish attempts
- **Throughput**: Messages published per second
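The source does not say how these values are exposed, so the following is only a sketch of how application code could track them with JDK counters. `PublishStatsSketch` and its callback methods are hypothetical and would need to be wired into your own publish path.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical, thread-safe counters for publish activity; not a connector API. */
final class PublishStatsSketch {
    private final AtomicInteger pending = new AtomicInteger();
    private final LongAdder succeeded = new LongAdder();
    private final LongAdder failed = new LongAdder();

    /** Call when a publish request is handed to the client. */
    void onPublishStarted() {
        pending.incrementAndGet();
    }

    /** Call when the publish future completes, successfully or not. */
    void onPublishFinished(boolean success) {
        pending.decrementAndGet();
        if (success) {
            succeeded.increment();
        } else {
            failed.increment();
        }
    }

    /** Outstanding publish operations. */
    int pending() {
        return pending.get();
    }

    /** Failed publish attempts so far. */
    long failures() {
        return failed.sum();
    }

    /** Successful publishes per second over the given observation window. */
    double throughputPerSecond(long elapsedMillis) {
        if (elapsedMillis <= 0) {
            return 0.0;
        }
        return succeeded.sum() * 1000.0 / elapsedMillis;
    }
}
```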

## Performance Considerations

### Batching

The Google Cloud Pub/Sub client automatically batches messages for efficiency:

- **Batch Size**: Multiple messages are sent in a single request
- **Batch Delay**: Maximum time to wait before sending a partial batch
- **Memory Usage**: Batched messages consume memory until they are published
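The size and delay thresholds can be pictured with a small model: a batch is sent when it is full, or when its oldest message has waited long enough. This is a sketch under simplifying assumptions, not the client's batching code. `BatcherSketch` is hypothetical, time is passed in explicitly, and "sending" only increments a counter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of size- and delay-based batching; not the client's code. */
final class BatcherSketch {
    private final int maxMessages;
    private final long maxDelayMillis;
    private final List<byte[]> buffer = new ArrayList<>();
    private long firstMessageAtMillis;
    private int batchesSent;

    BatcherSketch(int maxMessages, long maxDelayMillis) {
        this.maxMessages = maxMessages;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Buffers a message and flushes as soon as the batch is full. */
    void add(byte[] payload, long nowMillis) {
        if (buffer.isEmpty()) {
            firstMessageAtMillis = nowMillis; // start of the delay window
        }
        buffer.add(payload);
        if (buffer.size() >= maxMessages) {
            flush();
        }
    }

    /** Periodic check: flushes a partial batch once its oldest message has waited long enough. */
    void tick(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstMessageAtMillis >= maxDelayMillis) {
            flush();
        }
    }

    /** Sends whatever is buffered as one batch. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        batchesSent++; // a real client would issue one publish request here
        buffer.clear(); // buffered payloads are released only after sending
    }

    int buffered() {
        return buffer.size();
    }

    int batchesSent() {
        return batchesSent;
    }
}
```

The model also makes the memory point concrete: payloads stay in `buffer` until a threshold is reached, so larger batch sizes or longer delays hold more data in memory.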

### Parallelism

- **Parallel Sinks**: Each parallel subtask creates its own publisher
- **Independent Publishing**: Subtasks publish independently without coordination
- **Scaling**: Increase parallelism to improve throughput

### Resource Management

- **Connection Pooling**: Managed by the Google Cloud client library
- **Memory Management**: Outstanding publish requests consume memory until they complete
- **CPU Usage**: Serialization and network I/O both consume CPU