# Apache Flink GCP Pub/Sub Connector

The Apache Flink GCP Pub/Sub Connector integrates Apache Flink streaming applications with the Google Cloud Pub/Sub messaging service. It provides a source with exactly-once processing (through Flink's checkpointing) and a sink with at-least-once delivery, along with authentication support and error handling.

## Package Information

- **Package Name**: flink-connector-gcp-pubsub_2.11
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-gcp-pubsub_2.11
- **Installation**: Add the dependency to your `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import com.google.auth.Credentials;
```

## Basic Usage

### Source (Consumer) Example

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing (required for exactly-once processing); the interval is in milliseconds
env.enableCheckpointing(30000);

// Project and subscription are required; build() can throw IOException
PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-gcp-project")
        .withSubscriptionName("my-subscription")
        .build();

// Print every received message to stdout
env.addSource(source)
        .print();

env.execute("Pub/Sub Consumer");
```

### Sink (Producer) Example

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Project and topic are required; build() can throw IOException
PubSubSink<String> sink = PubSubSink.newBuilder()
        .withSerializationSchema(new SimpleStringSchema())
        .withProjectName("my-gcp-project")
        .withTopicName("my-topic")
        .build();

// Publish three sample messages to the topic
env.fromElements("Hello", "World", "Pub/Sub")
        .addSink(sink);

env.execute("Pub/Sub Producer");
```

## Architecture

The connector is built around several key components:

- **PubSubSource**: Source function implementing exactly-once processing through Flink's checkpointing mechanism
- **PubSubSink**: Sink function providing at-least-once delivery guarantees with automatic retries
- **Deserialization Schema**: Pluggable schema system supporting both standard Flink serialization and Pub/Sub-specific deserialization with metadata access
- **Subscriber Factory**: Configurable factory pattern for customizing connection parameters, timeouts, and retry policies
- **Authentication**: Google Cloud credentials integration with automatic credential discovery and manual credential configuration
- **Emulator Support**: Built-in support for local Pub/Sub emulator for testing scenarios

## Capabilities

### Message Consumption (Source)

Consumes messages from Pub/Sub subscriptions, with exactly-once processing guarantees provided through Flink's checkpointing mechanism. Supports rate limiting, custom deserialization, and configurable subscriber settings.

```java { .api }
public static DeserializationSchemaBuilder newBuilder();

public static class PubSubSourceBuilder<OUT> {
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory factory);
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration timeout, int retries);
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);
    public PubSubSource<OUT> build() throws IOException;
}
```

[Message Consumption](./source.md)
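
The link between checkpointing and acknowledgement can be modeled in a few lines. The following is a minimal sketch, not the connector's implementation: it assumes acknowledge IDs are buffered per checkpoint and sent to Pub/Sub only after that checkpoint completes. `AckSink`, `CheckpointAckBuffer`, and `CheckpointAckDemo` are hypothetical names; `AckSink` stands in for the connector's `Acknowledger<String>`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the connector's Acknowledger<String> interface.
interface AckSink {
    void acknowledge(List<String> ids);
}

// Hypothetical helper: holds acknowledge IDs until the checkpoint covering them completes.
final class CheckpointAckBuffer {
    // IDs of messages emitted since the last snapshot.
    private List<String> current = new ArrayList<>();
    // Snapshots still waiting for their checkpoint to complete, oldest first.
    private final Deque<Long> pendingCheckpointIds = new ArrayDeque<>();
    private final Deque<List<String>> pendingAckIds = new ArrayDeque<>();
    private final AckSink ackSink;

    CheckpointAckBuffer(AckSink ackSink) {
        this.ackSink = ackSink;
    }

    // Record the acknowledge ID of a message that was emitted downstream.
    void messageEmitted(String ackId) {
        current.add(ackId);
    }

    // Freeze the IDs collected so far under the given checkpoint ID.
    void snapshot(long checkpointId) {
        pendingCheckpointIds.addLast(checkpointId);
        pendingAckIds.addLast(current);
        current = new ArrayList<>();
    }

    // Acknowledge everything covered by this checkpoint or an earlier one.
    void checkpointCompleted(long checkpointId) {
        while (!pendingCheckpointIds.isEmpty() && pendingCheckpointIds.peekFirst() <= checkpointId) {
            pendingCheckpointIds.removeFirst();
            ackSink.acknowledge(pendingAckIds.removeFirst());
        }
    }
}

final class CheckpointAckDemo {
    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        CheckpointAckBuffer buffer = new CheckpointAckBuffer(acked::addAll);

        buffer.messageEmitted("ack-1");
        buffer.messageEmitted("ack-2");
        buffer.snapshot(1L);
        buffer.messageEmitted("ack-3"); // belongs to the next checkpoint
        buffer.checkpointCompleted(1L);

        System.out.println(acked); // prints [ack-1, ack-2]
    }
}
```

In this model, a message whose checkpoint never completes is never acknowledged, so Pub/Sub redelivers it once its acknowledgement deadline expires.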

### Message Publishing (Sink)

Publishes messages to Pub/Sub topics with at-least-once delivery semantics, automatic batching, and configurable retry policies. Includes emulator support for testing.

```java { .api }
public static SerializationSchemaBuilder newBuilder();

public static class PubSubSinkBuilder<IN> {
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);
    public PubSubSink<IN> build() throws IOException;
}
```

[Message Publishing](./sink.md)
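
At-least-once delivery in a checkpointed sink usually depends on not completing a checkpoint while publishes are still in flight. The sketch below illustrates that idea under the assumption that the sink tracks asynchronous publish results and waits for them at checkpoint time; it is a simplified model, not the connector's code. `PendingPublishTracker` and `PendingPublishDemo` are hypothetical names, and `CompletableFuture<String>` stands in for whatever future type the Pub/Sub client returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: remembers in-flight publishes so a checkpoint can wait for them.
final class PendingPublishTracker {
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    // Register the result of an asynchronous publish call.
    void track(CompletableFuture<String> publishResult) {
        pending.add(publishResult);
    }

    int pendingCount() {
        return pending.size();
    }

    // Called at checkpoint time: block until every tracked publish has finished.
    // join() throws CompletionException if a publish failed, which fails the checkpoint.
    void flush() {
        for (CompletableFuture<String> publishResult : pending) {
            publishResult.join();
        }
        pending.clear();
    }
}

final class PendingPublishDemo {
    public static void main(String[] args) {
        PendingPublishTracker tracker = new PendingPublishTracker();
        tracker.track(CompletableFuture.completedFuture("message-id-1"));
        tracker.track(CompletableFuture.completedFuture("message-id-2"));

        tracker.flush(); // returns immediately because both publishes already succeeded
        System.out.println(tracker.pendingCount()); // prints 0
    }
}
```

If a publish fails, `flush()` throws and the checkpoint does not complete; after recovery the records since the last successful checkpoint are replayed, which is why duplicates are possible under at-least-once semantics.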

### Custom Deserialization

A Pub/Sub-specific deserialization interface that gives access to message metadata, including attributes, message ID, and publish time. Use it when an application needs message metadata or custom deserialization logic.

```java { .api }
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    void open(InitializationContext context) throws Exception;
    boolean isEndOfStream(T nextElement);
    T deserialize(PubsubMessage message) throws Exception;
    void deserialize(PubsubMessage message, Collector<T> out) throws Exception;
    TypeInformation<T> getProducedType();
}
```

[Custom Deserialization](./deserialization.md)
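
To show what metadata-aware deserialization looks like without pulling in the connector's dependencies, here is a small sketch built on stand-in types. `FakeMessage` is a hypothetical replacement for `PubsubMessage`, `MetadataAwareSchema` is a reduced, hypothetical version of the interface above (only `deserialize`), and the `source` attribute key is invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for PubsubMessage: payload plus the metadata a schema may read.
final class FakeMessage {
    final String messageId;
    final byte[] data;
    final Map<String, String> attributes;

    FakeMessage(String messageId, byte[] data, Map<String, String> attributes) {
        this.messageId = messageId;
        this.data = data;
        this.attributes = attributes;
    }
}

// Hypothetical, reduced schema contract: turn one message into one value.
interface MetadataAwareSchema<T> {
    T deserialize(FakeMessage message);
}

// Combines the message ID, one attribute, and the UTF-8 payload into a single string.
final class TaggedPayloadSchema implements MetadataAwareSchema<String> {
    @Override
    public String deserialize(FakeMessage message) {
        // Fall back to "unknown" when the (invented) "source" attribute is missing.
        String source = message.attributes.getOrDefault("source", "unknown");
        String payload = new String(message.data, StandardCharsets.UTF_8);
        return message.messageId + "|" + source + "|" + payload;
    }
}

final class TaggedPayloadDemo {
    public static void main(String[] args) {
        FakeMessage message = new FakeMessage(
                "42",
                "hello".getBytes(StandardCharsets.UTF_8),
                Collections.singletonMap("source", "web"));

        System.out.println(new TaggedPayloadSchema().deserialize(message)); // prints 42|web|hello
    }
}
```

A real implementation would implement `PubSubDeserializationSchema<T>` and read the same information from the `PubsubMessage` it receives, in addition to supplying `isEndOfStream` and `getProducedType`.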

## Types

```java { .api }
// Core source class
public class PubSubSource<OUT> extends RichSourceFunction<OUT>
        implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>,
                CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>>

// Core sink class
public class PubSubSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction

// Builder interfaces for source
public interface ProjectNameBuilder<OUT> {
    SubscriptionNameBuilder<OUT> withProjectName(String projectName);
}

public interface SubscriptionNameBuilder<OUT> {
    PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
}

// Builder interfaces for sink
public interface ProjectNameBuilder<IN> {
    TopicNameBuilder<IN> withProjectName(String projectName);
}

public interface TopicNameBuilder<IN> {
    PubSubSinkBuilder<IN> withTopicName(String topicName);
}

// Subscriber factory interface
public interface PubSubSubscriberFactory extends Serializable {
    PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;
}

// Subscriber interface
public interface PubSubSubscriber extends Acknowledger<String> {
    List<ReceivedMessage> pull();
    void close() throws Exception;
}

// Acknowledger interface
public interface Acknowledger<AcknowledgeId> {
    void acknowledge(List<AcknowledgeId> ids);
}

// Emulator subscriber factory
public class PubSubSubscriberFactoryForEmulator implements PubSubSubscriberFactory {
    public PubSubSubscriberFactoryForEmulator(String hostAndPort, String project, String subscription,
            int retries, Duration timeout, int maxMessagesPerPull);
}
```
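
The builder interfaces above form a staged builder: each step returns the interface for the next required step, so the optional settings and `build()` only become reachable after the project and subscription names are set. The following self-contained sketch illustrates that pattern with hypothetical names (`SourceSettings`, `ProjectStep`, `SubscriptionStep`, `FinalStep`); it is modeled on the shape of the source builder, not copied from the connector, and the default of 100 is arbitrary.

```java
// Hypothetical, simplified staged builder modeled on the source builder interfaces.
final class SourceSettings {
    final String projectName;
    final String subscriptionName;
    final int maxMessagesPerPull;

    private SourceSettings(String projectName, String subscriptionName, int maxMessagesPerPull) {
        this.projectName = projectName;
        this.subscriptionName = subscriptionName;
        this.maxMessagesPerPull = maxMessagesPerPull;
    }

    // Entry point: only the first required step is reachable from here.
    static ProjectStep newBuilder() {
        return new Builder();
    }

    interface ProjectStep {
        SubscriptionStep withProjectName(String projectName);
    }

    interface SubscriptionStep {
        FinalStep withSubscriptionName(String subscriptionName);
    }

    // Optional settings and build() become available only after both required steps.
    interface FinalStep {
        FinalStep withMaxMessagesPerPull(int maxMessagesPerPull);
        SourceSettings build();
    }

    private static final class Builder implements ProjectStep, SubscriptionStep, FinalStep {
        private String projectName;
        private String subscriptionName;
        private int maxMessagesPerPull = 100; // arbitrary default for this sketch

        @Override
        public SubscriptionStep withProjectName(String projectName) {
            this.projectName = projectName;
            return this;
        }

        @Override
        public FinalStep withSubscriptionName(String subscriptionName) {
            this.subscriptionName = subscriptionName;
            return this;
        }

        @Override
        public FinalStep withMaxMessagesPerPull(int maxMessagesPerPull) {
            this.maxMessagesPerPull = maxMessagesPerPull;
            return this;
        }

        @Override
        public SourceSettings build() {
            return new SourceSettings(projectName, subscriptionName, maxMessagesPerPull);
        }
    }
}

final class SourceSettingsDemo {
    public static void main(String[] args) {
        SourceSettings settings = SourceSettings.newBuilder()
                .withProjectName("my-gcp-project")
                .withSubscriptionName("my-subscription")
                .withMaxMessagesPerPull(50)
                .build();

        // prints my-gcp-project/my-subscription max=50
        System.out.println(settings.projectName + "/" + settings.subscriptionName
                + " max=" + settings.maxMessagesPerPull);
    }
}
```

Because `newBuilder()` returns only `ProjectStep`, calling `build()` before naming the project and subscription is a compile-time error rather than a runtime failure.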