Apache Flink connector for Google Cloud Pub/Sub with exactly-once processing guarantees for real-time stream processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12@1.14.00
# Apache Flink GCP Pub/Sub Connector

The Apache Flink connector for Google Cloud Pub/Sub consumes messages from Pub/Sub subscriptions and publishes messages to Pub/Sub topics with exactly-once processing guarantees. It provides both a source and a sink for real-time stream processing applications, and it manages message acknowledgment automatically through Flink's checkpointing mechanism.

## Package Information

- **Package Name**: flink-connector-gcp-pubsub_2.12
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Language**: Java
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.util.Collector;
```

For credentials management:

```java
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
```

For Google Cloud Pub/Sub types:

```java
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.List;
import java.time.Duration;
```

For emulator testing:

```java
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
```

## Basic Usage

### Consuming Messages (Source)

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

public class PubSubConsumerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing (required for exactly-once guarantees)
        env.enableCheckpointing(30000); // checkpoint every 30 seconds

        // Create PubSubSource
        PubSubSource<String> pubsubSource = PubSubSource.newBuilder()
            .withDeserializationSchema(new SimpleStringSchema())
            .withProjectName("my-gcp-project")
            .withSubscriptionName("my-subscription")
            .build();

        // Add source to stream
        DataStream<String> stream = env.addSource(pubsubSource);
        stream.print();

        env.execute("PubSub Consumer");
    }
}
```

### Publishing Messages (Sink)

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

public class PubSubProducerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create data stream
        DataStream<String> inputStream = env.fromElements("Hello", "World", "PubSub");

        // Create PubSubSink
        PubSubSink<String> pubsubSink = PubSubSink.newBuilder()
            .withSerializationSchema(new SimpleStringSchema())
            .withProjectName("my-gcp-project")
            .withTopicName("my-topic")
            .build();

        // Add sink to stream
        inputStream.addSink(pubsubSink);

        env.execute("PubSub Producer");
    }
}
```

## Architecture

The Apache Flink GCP Pub/Sub connector is built around several key components:

- **PubSubSource**: Source function for consuming messages with exactly-once processing guarantees
- **PubSubSink**: Sink function for publishing messages with at-least-once delivery semantics
- **Builder Pattern**: Fluent API for configuring both sources and sinks with optional parameters
- **Checkpointing Integration**: Automatic message acknowledgment tied to Flink checkpoint completion
- **Rate Limiting**: Configurable message consumption rate limits per parallel subtask
- **Emulator Support**: Built-in support for the Google Pub/Sub emulator in testing scenarios

The connector ensures data consistency through Flink's distributed checkpointing mechanism: Pub/Sub messages are acknowledged only after a checkpoint completes successfully, so a failure before that point leads to redelivery of the unacknowledged messages rather than message loss.
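
The acknowledgment flow described above can be sketched with plain JDK types. This is a simplified model under stated assumptions, not the connector's implementation: `CheckpointAckSketch`, `AckSink`, and the three callback names are hypothetical, and state persistence is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of checkpoint-driven acknowledgment.
class CheckpointAckSketch {
    /** Stands in for the client call that acknowledges messages at Pub/Sub. */
    interface AckSink {
        void acknowledge(List<String> ackIds);
    }

    /** Ack IDs frozen under the checkpoint that covers them. */
    private static final class Pending {
        final long checkpointId;
        final List<String> ackIds;

        Pending(long checkpointId, List<String> ackIds) {
            this.checkpointId = checkpointId;
            this.ackIds = ackIds;
        }
    }

    private final AckSink sink;
    private List<String> sinceLastCheckpoint = new ArrayList<>();
    private final List<Pending> awaitingCompletion = new ArrayList<>();

    CheckpointAckSketch(AckSink sink) {
        this.sink = sink;
    }

    /** Called for every message emitted downstream: remember its ack ID. */
    void onMessage(String ackId) {
        sinceLastCheckpoint.add(ackId);
    }

    /** Called when a checkpoint is taken: freeze the current batch under its ID. */
    void onSnapshot(long checkpointId) {
        awaitingCompletion.add(new Pending(checkpointId, sinceLastCheckpoint));
        sinceLastCheckpoint = new ArrayList<>();
    }

    /** Called when a checkpoint is confirmed: acknowledge all batches up to and including it. */
    void onCheckpointComplete(long checkpointId) {
        List<String> toAck = new ArrayList<>();
        Iterator<Pending> it = awaitingCompletion.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (p.checkpointId <= checkpointId) {
                toAck.addAll(p.ackIds);
                it.remove();
            }
        }
        if (!toAck.isEmpty()) {
            sink.acknowledge(toAck);
        }
    }
}
```

Nothing reaches the `AckSink` until `onCheckpointComplete` runs, which is the property that lets unacknowledged messages be redelivered after a failure.
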

## Capabilities

### PubSub Message Source

Source functionality for consuming messages from Google Cloud Pub/Sub subscriptions with exactly-once processing guarantees and configurable rate limiting.

```java { .api }
public static DeserializationSchemaBuilder newBuilder();

public static class PubSubSourceBuilder<OUT> {
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory factory);
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries);
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);
    public PubSubSource<OUT> build();
}
```

[PubSub Source](./pubsub-source.md)
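
The `DeserializationSchemaBuilder` return type of `newBuilder()` suggests a staged builder, in which each step returns a narrower type that exposes only the next required setting. The sketch below illustrates that pattern with plain JDK types. `SourceConfigSketch`, its stage interfaces, and `withRateLimit` are hypothetical names chosen for illustration, not connector classes.

```java
// Hypothetical staged-builder sketch; all names are illustrative stand-ins.
final class SourceConfigSketch {
    final String schema;
    final String project;
    final String subscription;
    final int messagesPerSecond;

    private SourceConfigSketch(String schema, String project, String subscription, int messagesPerSecond) {
        this.schema = schema;
        this.project = project;
        this.subscription = subscription;
        this.messagesPerSecond = messagesPerSecond;
    }

    // Each stage exposes only the next required setting.
    interface SchemaStage { ProjectStage withSchema(String schema); }
    interface ProjectStage { SubscriptionStage withProject(String project); }
    interface SubscriptionStage { Builder withSubscription(String subscription); }

    /** The entry point returns the first stage, so required settings cannot be skipped. */
    static SchemaStage newBuilder() {
        return new Builder();
    }

    static final class Builder implements SchemaStage, ProjectStage, SubscriptionStage {
        private String schema;
        private String project;
        private String subscription;
        private int messagesPerSecond = 100_000; // default rate limit quoted in this document

        private Builder() {}

        @Override public ProjectStage withSchema(String schema) { this.schema = schema; return this; }
        @Override public SubscriptionStage withProject(String project) { this.project = project; return this; }
        @Override public Builder withSubscription(String subscription) { this.subscription = subscription; return this; }

        // Optional settings become reachable only after every required stage.
        Builder withRateLimit(int messagesPerSecond) { this.messagesPerSecond = messagesPerSecond; return this; }

        SourceConfigSketch build() {
            return new SourceConfigSketch(schema, project, subscription, messagesPerSecond);
        }
    }
}
```

With this shape, a call chain that omits the project or subscription does not compile, which moves a class of configuration mistakes from runtime to compile time.
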

### PubSub Message Sink

Sink functionality for publishing messages to Google Cloud Pub/Sub topics with reliable delivery and checkpoint synchronization.

```java { .api }
public static SerializationSchemaBuilder newBuilder();

public static class PubSubSinkBuilder<IN> {
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);
    public PubSubSink<IN> build();
}
```

[PubSub Sink](./pubsub-sink.md)
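
One way to picture checkpoint synchronization is a sink that tracks its asynchronous publishes and waits for all of them when a checkpoint is taken, failing the checkpoint if any publish failed. The sketch below models that idea with `CompletableFuture`. `FlushOnCheckpointSketch` and its methods are hypothetical and do not reproduce the connector's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "flush on checkpoint" for an asynchronous sink.
class FlushOnCheckpointSketch {
    private final List<CompletableFuture<String>> inFlight = new ArrayList<>();

    /** Records an asynchronous publish; the future yields a message ID or fails. */
    void track(CompletableFuture<String> publishResult) {
        inFlight.add(publishResult);
    }

    int inFlightCount() {
        return inFlight.size();
    }

    /**
     * Called at checkpoint time: blocks until every tracked publish has finished.
     * join() throws CompletionException for a failed publish, which would fail the checkpoint.
     */
    void flushForCheckpoint() {
        try {
            for (CompletableFuture<String> f : inFlight) {
                f.join();
            }
        } finally {
            inFlight.clear();
        }
    }
}
```

Because a failed publish surfaces at flush time, the checkpoint cannot complete with undelivered messages, which is consistent with at-least-once delivery.
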

### Testing Support

Emulator support for local development and testing scenarios without requiring actual Google Cloud Pub/Sub infrastructure.

```java { .api }
public final class EmulatorCredentials extends OAuth2Credentials {
    public static EmulatorCredentials getInstance();
}

public final class EmulatorCredentialsProvider implements CredentialsProvider {
    public static EmulatorCredentialsProvider create();
}
```

[Emulator Testing](./emulator-testing.md)

## Core Interfaces

### PubSubDeserializationSchema

Interface for custom deserialization with access to full PubSub message metadata.

```java { .api }
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    default void open(DeserializationSchema.InitializationContext context) throws Exception;
    boolean isEndOfStream(T nextElement);
    T deserialize(PubsubMessage message) throws Exception;
    default void deserialize(PubsubMessage message, Collector<T> out) throws Exception;
}
```

### PubSubSubscriberFactory

Factory interface for creating custom Pub/Sub subscribers with specialized configurations.

```java { .api }
public interface PubSubSubscriberFactory extends Serializable {
    PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;
}
```

### Checkpointing Support Classes

Core classes for managing checkpoint-based message acknowledgment.

```java { .api }
public interface Acknowledger<AcknowledgeId> {
    void acknowledge(List<AcknowledgeId> ids);
}

public interface PubSubSubscriber extends Acknowledger<String> {
    List<ReceivedMessage> pull();
    void close() throws Exception;
}

public class AcknowledgeIdsForCheckpoint<AcknowledgeId> implements Serializable {
    AcknowledgeIdsForCheckpoint(long checkpointId, List<AcknowledgeId> acknowledgeIds);
    public long getCheckpointId();
    public void setCheckpointId(long checkpointId);
    public List<AcknowledgeId> getAcknowledgeIds();
    public void setAcknowledgeIds(List<AcknowledgeId> acknowledgeIds);
}

public class AcknowledgeOnCheckpoint<ACKID extends Serializable>
        implements ListCheckpointed<AcknowledgeIdsForCheckpoint<ACKID>>, CheckpointListener {
    public AcknowledgeOnCheckpoint(Acknowledger<ACKID> acknowledger);
    public void addAcknowledgeId(ACKID acknowledgeId);
    public void acknowledgeIdsUpToCheckpoint(long checkpointId);
}
```
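
To illustrate the pull and acknowledge contract that `PubSubSubscriber` exposes, here is a minimal in-memory stand-in of the kind one might use in a unit test. It is a sketch under simplifying assumptions: `InMemorySubscriberSketch` is a hypothetical class, payloads are plain strings instead of `ReceivedMessage` objects, and acknowledgment deadlines are ignored, so every unacknowledged message is returned on each pull.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a pull/acknowledge subscriber.
class InMemorySubscriberSketch {
    // ackId -> payload, in publish order; entries stay until acknowledged
    private final Map<String, String> outstanding = new LinkedHashMap<>();
    private int nextId = 0;

    /** Adds a message and assigns it a sequential ack ID ("ack-0", "ack-1", ...). */
    void publish(String payload) {
        outstanding.put("ack-" + nextId++, payload);
    }

    /** Returns the ack IDs of all unacknowledged messages, so unacked messages reappear. */
    List<String> pull() {
        return new ArrayList<>(outstanding.keySet());
    }

    /** Looks up the payload for an ack ID, or null once it has been acknowledged. */
    String payloadOf(String ackId) {
        return outstanding.get(ackId);
    }

    /** Removes acknowledged messages so they are never returned again. */
    void acknowledge(List<String> ackIds) {
        for (String id : ackIds) {
            outstanding.remove(id);
        }
    }
}
```

A message leaves the subscriber only through `acknowledge`, so a consumer that crashes between `pull` and `acknowledge` sees the same message again on its next pull.
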

## Important Notes

- **Checkpointing Required**: PubSubSource requires Flink checkpointing to be enabled, because messages are acknowledged only when a checkpoint completes. Keep the checkpoint interval shorter than the subscription's acknowledgment deadline, otherwise Pub/Sub redelivers messages before they can be acknowledged
- **Parallel Processing**: Both source and sink support parallel execution across multiple Flink subtasks
- **Rate Limiting**: The default rate limit is 100,000 messages per second per parallel source instance; change it with `withMessageRateLimit`
- **Credentials**: If credentials are not provided through `withCredentials`, they are loaded automatically from the environment
- **Error Handling**: A failed publish in the sink causes the job to fail during checkpointing
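
Because the rate limit applies per parallel source instance, the job-wide ceiling scales with source parallelism. The arithmetic can be sketched as follows, using the default of 100,000 messages per second from the notes above. `RateLimitMath` is an illustrative helper, not part of the connector, and the parallelism and backlog values in the usage note are hypothetical.

```java
// Illustrative arithmetic for per-subtask rate limits; not part of the connector.
class RateLimitMath {
    /** Upper bound on job-wide messages per second when each source subtask has its own limit. */
    static long aggregateLimit(int perSubtaskLimit, int parallelism) {
        return (long) perSubtaskLimit * parallelism;
    }

    /** Minimum whole seconds needed to drain a backlog at that ceiling, rounded up. */
    static long minSecondsToDrain(long backlog, int perSubtaskLimit, int parallelism) {
        long limit = aggregateLimit(perSubtaskLimit, parallelism);
        return (backlog + limit - 1) / limit;
    }
}
```

For example, with the default limit and a source parallelism of 4, `aggregateLimit(100_000, 4)` gives 400,000 messages per second, and `minSecondsToDrain(1_000_000, 100_000, 4)` gives 3 seconds for a backlog of one million messages.
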