# Emulator Testing

The Apache Flink GCP Pub/Sub connector provides built-in support for the Google Cloud Pub/Sub emulator, enabling local development and testing without requiring actual Google Cloud infrastructure.

## Capabilities

### Emulator Credentials

A placeholder credentials implementation for emulator scenarios that bypasses real authentication.

```java { .api }
/**
 * Placeholder credentials for emulator testing scenarios
 * Extends OAuth2Credentials but provides dummy authentication
 */
public final class EmulatorCredentials extends OAuth2Credentials {
    /**
     * Get singleton instance of emulator credentials
     * @return EmulatorCredentials instance
     */
    public static EmulatorCredentials getInstance();

    /**
     * Returns dummy access token for emulator authentication
     * @return AccessToken with dummy value and far-future expiration
     * @throws IOException Never thrown in emulator implementation
     */
    @Override
    public AccessToken refreshAccessToken() throws IOException;
}
```

### Emulator Credentials Provider

A `CredentialsProvider` implementation that supplies `EmulatorCredentials` to Google Cloud client libraries.

```java { .api }
/**
 * CredentialsProvider for emulator scenarios
 * Implements Google Cloud's CredentialsProvider interface
 */
public final class EmulatorCredentialsProvider implements CredentialsProvider {
    /**
     * Create new EmulatorCredentialsProvider instance
     * @return New EmulatorCredentialsProvider
     */
    public static EmulatorCredentialsProvider create();

    /**
     * Get emulator credentials instance
     * @return EmulatorCredentials for emulator authentication
     */
    @Override
    public Credentials getCredentials();
}
```

### Emulator Subscriber Factory

A specialized subscriber factory for connecting to the Pub/Sub emulator over plain-text communication.

```java { .api }
/**
 * PubSubSubscriberFactory for connecting to Pub/Sub emulator
 * Configures plain-text communication without SSL/TLS
 */
public class PubSubSubscriberFactoryForEmulator implements PubSubSubscriberFactory {
    /**
     * Create emulator subscriber factory
     * @param hostAndPort Emulator host and port (e.g., "localhost:8085")
     * @param project GCP project name (can be any value for emulator)
     * @param subscription Subscription name
     * @param retries Number of retries for failed requests
     * @param timeout Timeout for pull requests
     * @param maxMessagesPerPull Maximum messages per pull request
     */
    public PubSubSubscriberFactoryForEmulator(
        String hostAndPort,
        String project,
        String subscription,
        int retries,
        Duration timeout,
        int maxMessagesPerPull
    );

    /**
     * Create subscriber configured for emulator connection
     * @param credentials Ignored for emulator (uses plain-text)
     * @return PubSubSubscriber configured for emulator
     */
    @Override
    public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;
}
```

## Setting Up Pub/Sub Emulator

### Installation

Install the Google Cloud SDK and the Pub/Sub emulator component, then start the emulator. The emulator runs on the JVM, so a Java runtime must be available on the machine:

```bash
# Install Google Cloud SDK
curl https://sdk.cloud.google.com | bash

# Install Pub/Sub emulator component
gcloud components install pubsub-emulator

# Start emulator on localhost:8085
gcloud beta emulators pubsub start --host-port=localhost:8085
```

### Environment Setup

Set environment variables that point to the emulator:

```bash
export PUBSUB_EMULATOR_HOST=localhost:8085
export PUBSUB_PROJECT_ID=test-project
```

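Test code often needs the same `host:port` value that the shell exports. The following is a minimal sketch, using only the JDK, of reading `PUBSUB_EMULATOR_HOST` and splitting it into host and port. The `EmulatorEndpoint` class is a hypothetical helper for illustration and is not part of the connector.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not part of the connector): resolves the emulator address. */
final class EmulatorEndpoint {
    final String host;
    final int port;

    EmulatorEndpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Parses a "host:port" string such as "localhost:8085". */
    static EmulatorEndpoint parse(String hostAndPort) {
        int idx = hostAndPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostAndPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostAndPort);
        }
        int port = Integer.parseInt(hostAndPort.substring(idx + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return new EmulatorEndpoint(hostAndPort.substring(0, idx), port);
    }

    /** Returns the endpoint if PUBSUB_EMULATOR_HOST is set and non-blank, otherwise empty. */
    static Optional<EmulatorEndpoint> fromEnv(Map<String, String> env) {
        String value = env.get("PUBSUB_EMULATOR_HOST");
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(parse(value.trim()));
    }

    String hostAndPort() {
        return host + ":" + port;
    }

    public static void main(String[] args) {
        Optional<EmulatorEndpoint> endpoint = fromEnv(System.getenv());
        System.out.println(endpoint
            .map(e -> "Using emulator at " + e.hostAndPort())
            .orElse("PUBSUB_EMULATOR_HOST is not set"));
    }
}
```

The `hostAndPort()` value can then be passed to `PubSubSubscriberFactoryForEmulator` or `withHostAndPortForEmulator(...)`, so that tests and shell tooling agree on one address.
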
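### Create Topics and Subscriptions

`gcloud pubsub` commands talk to the real Pub/Sub service rather than the emulator, so create test resources through the emulator's own endpoint (or through a client library that honors `PUBSUB_EMULATOR_HOST`):

```bash
# Create topic (PUT against the emulator's REST endpoint)
curl -X PUT "http://localhost:8085/v1/projects/test-project/topics/test-topic"

# Create subscription attached to that topic
curl -X PUT "http://localhost:8085/v1/projects/test-project/subscriptions/test-subscription" \
  -H "Content-Type: application/json" \
  -d '{"topic": "projects/test-project/topics/test-topic"}'
```
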
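For test code, the same topic and subscription can be created from Java. This is a sketch under stated assumptions: it assumes Java 11+ (for `java.net.http`) and that the emulator serves the standard Pub/Sub REST resource paths (`/v1/projects/{project}/topics/{topic}`) on its `host:port`. The `EmulatorResources` class and its method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper: creates emulator topics and subscriptions over REST. */
final class EmulatorResources {
    // HTTP/1.1 is forced to avoid a clear-text HTTP/2 upgrade attempt.
    private final HttpClient client =
        HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
    private final String hostAndPort;
    private final String project;

    EmulatorResources(String hostAndPort, String project) {
        this.hostAndPort = hostAndPort;
        this.project = project;
    }

    URI topicUri(String topic) {
        return URI.create("http://" + hostAndPort + "/v1/projects/" + project + "/topics/" + topic);
    }

    URI subscriptionUri(String subscription) {
        return URI.create(
            "http://" + hostAndPort + "/v1/projects/" + project + "/subscriptions/" + subscription);
    }

    /** JSON body that attaches a new subscription to an existing topic. */
    String subscriptionBody(String topic) {
        return "{\"topic\":\"projects/" + project + "/topics/" + topic + "\"}";
    }

    /** Sends the create-topic request and returns the HTTP status code. */
    int createTopic(String topic) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(topicUri(topic))
            .PUT(HttpRequest.BodyPublishers.noBody())
            .build();
        return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
    }

    /** Sends the create-subscription request and returns the HTTP status code. */
    int createSubscription(String subscription, String topic)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(subscriptionUri(subscription))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(subscriptionBody(topic)))
            .build();
        return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
    }
}
```

A test setup method might call `new EmulatorResources("localhost:8085", "test-project").createTopic("test-topic")` followed by `createSubscription("test-subscription", "test-topic")`, and check the returned status codes before starting the Flink job.
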
## Usage Examples

### Source with Emulator

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The source acknowledges messages when checkpoints complete, so checkpointing must be enabled
env.enableCheckpointing(10000);

// Create emulator subscriber factory
PubSubSubscriberFactoryForEmulator emulatorFactory =
    new PubSubSubscriberFactoryForEmulator(
        "localhost:8085",        // emulator host:port
        "test-project",          // project (any value for emulator)
        "test-subscription",     // subscription name
        3,                       // retries
        Duration.ofSeconds(15),  // timeout
        100                      // max messages per pull
    );

// Create source with emulator factory
PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withSubscriptionName("test-subscription")
    .withPubSubSubscriberFactory(emulatorFactory)
    .build();

DataStream<String> stream = env.addSource(source);
stream.print();

env.execute("Emulator Source Test");
```

### Sink with Emulator

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create test data
DataStream<String> inputStream = env.fromElements(
    "Test message 1",
    "Test message 2",
    "Test message 3"
);

// Create sink with emulator configuration
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withTopicName("test-topic")
    .withHostAndPortForEmulator("localhost:8085")
    .build();

inputStream.addSink(sink);

env.execute("Emulator Sink Test");
```

### Complete Test Pipeline

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

// Configure emulator source
PubSubSubscriberFactoryForEmulator sourceFactory =
    new PubSubSubscriberFactoryForEmulator(
        "localhost:8085", "test-project", "input-subscription",
        3, Duration.ofSeconds(10), 50
    );

PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withSubscriptionName("input-subscription")
    .withPubSubSubscriberFactory(sourceFactory)
    .build();

// Configure emulator sink
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withTopicName("output-topic")
    .withHostAndPortForEmulator("localhost:8085")
    .build();

// Create processing pipeline: read, transform, write back to the emulator
env.addSource(source)
    .map(msg -> "Processed: " + msg.toUpperCase())
    .addSink(sink);

env.execute("Emulator Test Pipeline");
```

## Testing Best Practices

### Test Environment Setup

1. **Isolated Emulator**: Start a fresh emulator instance for each test
2. **Clean State**: Clear topics and subscriptions between tests
3. **Port Management**: Use different ports for parallel test execution
4. **Resource Cleanup**: Shut down the emulator properly after tests

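For the port management point, one common approach is to let the operating system pick an unused port per test run. The sketch below uses only the JDK. `FreePort` is a hypothetical helper name, and there is a small race window between releasing the port and the emulator binding to it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

/** Hypothetical helper: asks the OS for a currently unused TCP port. */
final class FreePort {
    private FreePort() {}

    /** Binds to port 0 so the OS assigns a free port, then releases it and returns the number. */
    static int find() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException("Could not allocate a free port", e);
        }
    }

    public static void main(String[] args) {
        // Prints an emulator start command for a port chosen at run time
        int port = find();
        System.out.println("gcloud beta emulators pubsub start --host-port=localhost:" + port);
    }
}
```

Each parallel test worker would call `FreePort.find()` once and reuse the value for both the emulator's `--host-port` flag and the connector's `hostAndPort` setting.
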
### Integration Testing

```java
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

public class PubSubConnectorIntegrationTest {
    private static final String EMULATOR_HOST = "localhost:8085";
    private static final String TEST_PROJECT = "test-project";

    @BeforeEach
    void setupEmulator() throws Exception {
        // Start emulator programmatically
        // Create test topics and subscriptions
    }

    @AfterEach
    void cleanupEmulator() throws Exception {
        // Stop emulator
        // Clean up resources
    }

    @Test
    void testSourceSinkIntegration() throws Exception {
        // Create Flink job with emulator configuration
        // Publish test messages
        // Verify message consumption and processing
    }
}
```

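The "start emulator programmatically" and "stop emulator" steps could be sketched with `ProcessBuilder`. This assumes `gcloud` is on the `PATH`, the emulator component is installed, and Java 9+ is available for `Process.descendants()`. `EmulatorProcess` is a hypothetical helper name, not a connector class.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: runs the emulator as a child process for the duration of a test. */
final class EmulatorProcess implements AutoCloseable {
    private final Process process;

    private EmulatorProcess(Process process) {
        this.process = process;
    }

    /** Builds the same command line used in the Installation section. */
    static List<String> command(String hostAndPort) {
        return Arrays.asList(
            "gcloud", "beta", "emulators", "pubsub", "start", "--host-port=" + hostAndPort);
    }

    /** Launches the emulator; its output is forwarded to the test's console. */
    static EmulatorProcess start(String hostAndPort) throws IOException {
        Process started = new ProcessBuilder(command(hostAndPort))
            .redirectErrorStream(true)
            .redirectOutput(ProcessBuilder.Redirect.INHERIT)
            .start();
        return new EmulatorProcess(started);
    }

    /** Stops the launcher and any child processes it spawned (for example the emulator JVM). */
    @Override
    public void close() {
        process.descendants().forEach(ProcessHandle::destroy);
        process.destroy();
    }
}
```

In a JUnit 5 test, `EmulatorProcess.start(EMULATOR_HOST)` would go in the `@BeforeEach` method and `close()` in `@AfterEach`. Since the emulator needs a moment to begin listening, the setup should wait for the port before creating topics.
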
### Docker Testing

```dockerfile
# Dockerfile for emulator testing
FROM google/cloud-sdk:alpine

# The emulator runs on the JVM; the Alpine image may not ship a Java runtime.
# The package name below is an assumption and may differ between Alpine releases.
RUN apk add --no-cache openjdk11-jre

# Install Pub/Sub emulator (plus the beta command group used to start it)
RUN gcloud components install beta pubsub-emulator --quiet

# Expose emulator port
EXPOSE 8085

# Start emulator, listening on all interfaces so the port mapping works
CMD ["gcloud", "beta", "emulators", "pubsub", "start", "--host-port=0.0.0.0:8085"]
```

```yaml
# docker-compose.yml for test environment
version: '3.8'
services:
  pubsub-emulator:
    build: .
    ports:
      - "8085:8085"
    environment:
      - PUBSUB_PROJECT_ID=test-project
```

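A container that reports as started may not yet accept connections, so tests usually wait for the mapped port before submitting a Flink job. The following is a minimal sketch of such a readiness check using only the JDK. `PortWaiter` is a hypothetical helper, and the 500 ms connect timeout and 200 ms retry delay are arbitrary choices.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

/** Hypothetical helper: polls a TCP port until it accepts connections or a timeout elapses. */
final class PortWaiter {
    private PortWaiter() {}

    /** Returns true as soon as a TCP connection succeeds, false if the timeout expires first. */
    static boolean awaitOpen(String host, int port, Duration timeout) {
        long start = System.nanoTime();
        while (System.nanoTime() - start < timeout.toNanos()) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 500);
                return true;
            } catch (IOException notListeningYet) {
                try {
                    Thread.sleep(200); // brief pause before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

A test might call `PortWaiter.awaitOpen("localhost", 8085, Duration.ofSeconds(30))` after `docker compose up -d` and fail fast if it returns `false`. A successful TCP connection only shows that the port is open, not that topics or subscriptions exist.
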
## Emulator Limitations

### Feature Differences

- **Authentication**: No real authentication is required
- **IAM**: Access control is not enforced
- **Monitoring**: Limited metrics and monitoring
- **Persistence**: Messages are not persisted across emulator restarts
- **Performance**: Performance characteristics differ from production

### Configuration Differences

- **Network**: Plain-text (unencrypted) connections instead of TLS
- **Credentials**: Dummy credentials instead of service account keys
- **Endpoints**: Local endpoints instead of Google Cloud endpoints
- **Retries**: Simplified retry behavior

### Best Practices

1. **Production Parity**: Keep emulator configuration as close to production as possible
2. **Error Scenarios**: Test failure scenarios that the emulator may not simulate
3. **Performance Testing**: Use actual Pub/Sub for performance benchmarks
4. **Security Testing**: Verify that authentication works with real credentials
5. **End-to-End Testing**: Include tests against actual Google Cloud Pub/Sub