# PubSub Sink

The `PubSubSink` publishes records from a Flink `DataStream` to a Google Cloud Pub/Sub topic with at-least-once delivery. Before a checkpoint completes, the sink waits for every outstanding publish request to finish. Records received before a completed checkpoint are therefore never lost, but after a failure, records since that checkpoint may be published again.

## Capabilities

### Sink Creation

Create a `PubSubSink` with its staged builder. The serialization schema, project name, and topic name are required and must be set in that order.

```java { .api }
/**
 * Creates a new builder for PubSubSink configuration
 * @return SerializationSchemaBuilder instance to start configuration
 */
public static SerializationSchemaBuilder newBuilder();

public static class SerializationSchemaBuilder {
    /**
     * Set the serialization schema that converts records to Pub/Sub message payloads
     * @param serializationSchema Schema for serializing records to byte arrays
     * @return ProjectNameBuilder for the next configuration step
     */
    public <IN> ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema);
}
```
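
To see why the required settings must come in that order, here is a dependency-free sketch of the staged ("type-state") builder idea. Each step returns a different type that exposes only the next legal call, so `build()` is unreachable until the schema, project, and topic are set. The class and step names (`StagedBuilderSketch`, `SchemaStep`, and so on) are hypothetical, and the schema is modeled as a plain string. This is an illustration of the pattern, not the connector's implementation.

```java
import java.util.Objects;

// Hypothetical, dependency-free model of a staged ("type-state") builder.
// Step names mirror the PubSubSink builder, but this is NOT connector code.
public final class StagedBuilderSketch {

    // Stands in for the finished sink; holds the three required settings.
    public record SinkConfig(String schema, String project, String topic) {}

    public static SchemaStep newBuilder() {
        return new SchemaStep();
    }

    // Step 1: only the schema can be set first.
    public static final class SchemaStep {
        public ProjectStep withSerializationSchema(String schema) {
            return new ProjectStep(Objects.requireNonNull(schema, "schema"));
        }
    }

    // Step 2: the project name.
    public static final class ProjectStep {
        private final String schema;

        private ProjectStep(String schema) {
            this.schema = schema;
        }

        public TopicStep withProjectName(String project) {
            return new TopicStep(schema, Objects.requireNonNull(project, "project"));
        }
    }

    // Step 3: the topic name; returns the final stage, the only one with build().
    public static final class TopicStep {
        private final String schema;
        private final String project;

        private TopicStep(String schema, String project) {
            this.schema = schema;
            this.project = project;
        }

        public FinalStep withTopicName(String topic) {
            return new FinalStep(new SinkConfig(schema, project, Objects.requireNonNull(topic, "topic")));
        }
    }

    // Final stage: optional settings would live here alongside build().
    public static final class FinalStep {
        private final SinkConfig config;

        private FinalStep(SinkConfig config) {
            this.config = config;
        }

        public SinkConfig build() {
            return config;
        }
    }
}
```

For example, `StagedBuilderSketch.newBuilder().withProjectName("p")` does not compile, because `SchemaStep` has no `withProjectName` method. A missing required setting becomes a compile-time error rather than a runtime one.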

### Builder Configuration

After the serialization schema, set the project name and the topic name, then any optional parameters.

```java { .api }
public interface ProjectNameBuilder<IN> {
    /**
     * Set the GCP project that contains the topic
     * @param projectName Google Cloud project ID
     * @return TopicNameBuilder for the next configuration step
     */
    TopicNameBuilder<IN> withProjectName(String projectName);
}

public interface TopicNameBuilder<IN> {
    /**
     * Set the Pub/Sub topic to publish to
     * @param topicName Pub/Sub topic name
     * @return PubSubSinkBuilder for optional configuration
     */
    PubSubSinkBuilder<IN> withTopicName(String topicName);
}

public static class PubSubSinkBuilder<IN> {
    /**
     * Set explicit GCP credentials (optional). If unset, Application Default Credentials
     * are used, or emulator credentials when an emulator host is configured.
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);

    /**
     * Connect to a Pub/Sub emulator instead of the real service (optional, for testing only)
     * @param hostAndPort Emulator host and port (e.g., "localhost:8085")
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);

    /**
     * Build the configured PubSubSink instance
     * @return Configured PubSubSink, ready to be added to a DataStream
     * @throws IOException If default credentials cannot be obtained
     */
    public PubSubSink<IN> build() throws IOException;
}
```
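
The optional settings interact when the builder chooses credentials. The sketch below models the fallback order we assume `build()` applies: explicitly supplied credentials win, a configured emulator host selects emulator credentials, and otherwise Application Default Credentials from the environment are used. `CredentialResolutionSketch` and `CredentialSource` are hypothetical names; the real builder resolves actual `Credentials` objects rather than an enum.

```java
import java.util.Optional;

// Hypothetical model of the credential fallback order assumed for build().
// It returns an enum instead of real com.google.auth.Credentials objects.
public final class CredentialResolutionSketch {

    public enum CredentialSource { EXPLICIT, EMULATOR, APPLICATION_DEFAULT }

    /**
     * @param explicitCredentialsSet whether withCredentials(...) was called
     * @param emulatorHostAndPort    value passed to withHostAndPortForEmulator(...), if any
     */
    public static CredentialSource resolve(boolean explicitCredentialsSet,
                                           Optional<String> emulatorHostAndPort) {
        if (explicitCredentialsSet) {
            return CredentialSource.EXPLICIT;            // caller-supplied credentials always win
        }
        if (emulatorHostAndPort.isPresent()) {
            return CredentialSource.EMULATOR;            // a local emulator needs no real credentials
        }
        return CredentialSource.APPLICATION_DEFAULT;     // fall back to the environment
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, Optional.of("localhost:8085")));  // EMULATOR
        System.out.println(resolve(false, Optional.empty()));               // APPLICATION_DEFAULT
    }
}
```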

## Usage Examples

### Basic String Message Publisher

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

public class BasicPubSubProducer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a small bounded input stream
        DataStream<String> inputStream = env.fromElements(
                "Hello PubSub",
                "Message 1",
                "Message 2");

        // Create and configure the sink; build() throws IOException if credentials are unavailable
        PubSubSink<String> sink = PubSubSink.newBuilder()
                .withSerializationSchema(new SimpleStringSchema())
                .withProjectName("my-gcp-project")
                .withTopicName("my-topic")
                .build();

        // Attach the sink to the stream
        inputStream.addSink(sink);

        env.execute("Basic PubSub Producer");
    }
}
```

### JSON Object Publisher

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

// User is assumed to be a Jackson-serializable POJO (public getters or fields)
public class JsonUserSerializer implements SerializationSchema<User> {
    // transient: created on each task instead of being shipped with the serialized job
    private transient ObjectMapper objectMapper;

    @Override
    public void open(SerializationSchema.InitializationContext context) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public byte[] serialize(User user) {
        // Safeguard in case the sink does not call open() before serializing
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
        }
        try {
            return objectMapper.writeValueAsBytes(user);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }
}

// Usage
DataStream<User> userStream = ...; // create user stream

PubSubSink<User> userSink = PubSubSink.newBuilder()
        .withSerializationSchema(new JsonUserSerializer())
        .withProjectName("my-project")
        .withTopicName("user-events")
        .build();

userStream.addSink(userSink);
```

### Publisher with Custom Credentials

```java
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

// Load a service account key; try-with-resources closes the file after parsing
Credentials credentials;
try (InputStream keyStream = new FileInputStream("path/to/service-account-key.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyStream);
}

PubSubSink<String> sink = PubSubSink.newBuilder()
        .withSerializationSchema(new SimpleStringSchema())
        .withProjectName("my-project")
        .withTopicName("my-topic")
        .withCredentials(credentials)
        .build();
```

### Streaming Data Pipeline

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing drives both the source's acknowledgements and the sink's flush
env.enableCheckpointing(30000);

// Source: read from one Pub/Sub subscription
PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-project")
        .withSubscriptionName("input-subscription")
        .build();

// Transform: drop short messages, then tag the rest.
// Filter first: every "Processed: " string is already longer than 10 characters.
DataStream<String> processedStream = env.addSource(source)
        .filter(message -> message.length() > 10)
        .map(message -> "Processed: " + message.toUpperCase());

// Sink: publish to another Pub/Sub topic
PubSubSink<String> sink = PubSubSink.newBuilder()
        .withSerializationSchema(new SimpleStringSchema())
        .withProjectName("my-project")
        .withTopicName("output-topic")
        .build();

processedStream.addSink(sink);

env.execute("PubSub Processing Pipeline");
```

## Message Publishing Behavior

### Delivery Guarantees

The `PubSubSink` provides **at-least-once** delivery:

1. **Asynchronous Publishing**: each record is handed to the Pub/Sub client, which publishes it asynchronously and returns a future.
2. **Checkpoint Synchronization**: when a checkpoint is taken, the sink flushes buffered messages and waits until every outstanding publish request has completed.
3. **Failure Handling**: if any publish request failed, the checkpoint fails and the job fails over (subject to the configured restart strategy) to the last completed checkpoint. Records after that checkpoint may be published again, so duplicates are possible but loss is not.
4. **Retry Logic**: the Pub/Sub client library retries transient failures before it reports a request as failed.
203
### Checkpoint Integration
204
205
The sink integrates with Flink's checkpointing mechanism to ensure reliable delivery:
206
207
```java
208
@Override
209
public void snapshotState(FunctionSnapshotContext context) throws Exception {
210
// Flush all buffered messages
211
publisher.publishAllOutstanding();
212
213
// Wait for all pending publish operations to complete
214
waitForFuturesToComplete();
215
216
// If any publish operation failed, throw exception to fail checkpoint
217
if (exceptionAtomicReference.get() != null) {
218
throw exceptionAtomicReference.get();
219
}
220
}
221
```
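
The helper `waitForFuturesToComplete()` is not shown in the excerpt. A minimal sketch, assuming it blocks on a shared counter of in-flight publishes using `wait`/`notifyAll`: each completion decrements the counter while holding the same lock the waiting thread uses, so no wake-up is missed. `PendingPublishTracker` is a hypothetical name, and `CompletableFuture` stands in for the client's publish futures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of waiting for in-flight publishes before a checkpoint completes.
public final class PendingPublishTracker {

    // Publishes started but not yet completed; also used as the monitor object.
    private final AtomicInteger pending = new AtomicInteger();

    // Called when a publish starts: count it, and decrement when it completes (success or failure).
    public void track(CompletableFuture<?> publishFuture) {
        pending.incrementAndGet();
        publishFuture.whenComplete((result, error) -> {
            synchronized (pending) {
                pending.decrementAndGet();
                pending.notifyAll();       // wake the checkpointing thread if it is waiting
            }
        });
    }

    // Called from the checkpoint: block until no publishes are outstanding.
    public void waitForFuturesToComplete() throws InterruptedException {
        synchronized (pending) {
            while (pending.get() > 0) {    // the loop also guards against spurious wake-ups
                pending.wait();
            }
        }
    }

    public int pendingCount() {
        return pending.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService ioPool = Executors.newFixedThreadPool(2);
        PendingPublishTracker tracker = new PendingPublishTracker();
        for (int i = 0; i < 5; i++) {
            // Simulated publish that completes on another thread after a short delay
            tracker.track(CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, ioPool));
        }
        tracker.waitForFuturesToComplete();
        System.out.println("pending after wait: " + tracker.pendingCount());  // 0
        ioPool.shutdown();
    }
}
```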

### Message Format

Each published message contains:
- **Data**: the serialized payload produced by the serialization schema
- **Message ID**: a unique identifier assigned by Pub/Sub when it accepts the message
- **Publish Time**: the timestamp at which Pub/Sub received the message
- **Attributes**: empty (custom attributes are not currently supported)
231
## Error Handling
232
233
### Publishing Errors
234
235
- **Serialization Errors**: Exceptions during serialization cause immediate job failure
236
- **Network Errors**: Handled by Google Cloud client library with automatic retry
237
- **Authentication Errors**: Cause job failure with clear error messages
238
- **Topic Not Found**: Causes job failure - topic must exist before starting job
239
240
### Retry Configuration
241
242
The sink uses Google Cloud Pub/Sub client's default retry settings:
243
- **Maximum Attempts**: Configurable through client library
244
- **Exponential Backoff**: Automatic delay increases between retries
245
- **Total Timeout**: Maximum time to spend on retries
246
247
### Monitoring and Metrics
248
249
- **Pending Futures**: Number of outstanding publish operations
250
- **Publish Failures**: Count of failed publish attempts
251
- **Throughput**: Messages published per second
252
253
## Performance Considerations
254
255
### Batching
256
257
The Google Cloud Pub/Sub client automatically batches messages for efficiency:
258
- **Batch Size**: Multiple messages sent in single request
259
- **Batch Delay**: Maximum time to wait before sending partial batch
260
- **Memory Usage**: Batched messages consume memory until published
261
262
### Parallelism
263
264
- **Parallel Sinks**: Each parallel subtask creates its own publisher
265
- **Independent Publishing**: Subtasks publish independently without coordination
266
- **Scaling**: Increase parallelism to improve throughput
267
268
### Resource Management
269
270
- **Connection Pooling**: Managed by Google Cloud client library
271
- **Memory Management**: Outstanding publish requests consume memory
272
- **CPU Usage**: Serialization and network I/O are CPU-intensive operations
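
Because every parallel subtask has its own publisher, buffered payload memory scales with both the number of outstanding requests and the sink's parallelism. A back-of-the-envelope sketch, assuming memory is dominated by payload bytes (it ignores per-message and client overhead); `MemoryEstimateSketch` and the numbers in `main` are hypothetical:

```java
// Hypothetical rough estimate of payload bytes held by outstanding publish requests.
public final class MemoryEstimateSketch {

    // Payload bytes buffered by one subtask's publisher.
    public static long perSubtaskBytes(long outstandingMessages, long avgPayloadBytes) {
        return Math.multiplyExact(outstandingMessages, avgPayloadBytes);  // throws on overflow
    }

    // Each parallel subtask has its own publisher, so the total scales with parallelism.
    public static long jobBytes(int parallelism, long outstandingMessages, long avgPayloadBytes) {
        return Math.multiplyExact((long) parallelism, perSubtaskBytes(outstandingMessages, avgPayloadBytes));
    }

    public static void main(String[] args) {
        long perSubtask = perSubtaskBytes(10_000, 2_048);   // 20,480,000 bytes
        long total = jobBytes(4, 10_000, 2_048);            // 81,920,000 bytes
        System.out.printf("per subtask: %.1f MiB, job: %.1f MiB%n",
                perSubtask / (1024.0 * 1024.0), total / (1024.0 * 1024.0));
    }
}
```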