or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

emulator-testing.mdindex.mdpubsub-sink.mdpubsub-source.md

emulator-testing.mddocs/

0

# Emulator Testing

1

2

The Apache Flink GCP Pub/Sub connector provides built-in support for the Google Cloud Pub/Sub emulator, enabling local development and testing without requiring actual Google Cloud infrastructure.

3

4

## Capabilities

5

6

### Emulator Credentials

7

8

Special credentials implementation for emulator scenarios that bypasses actual authentication.

9

10

```java { .api }

11

/**

12

* Placeholder credentials for emulator testing scenarios

13

* Extends OAuth2Credentials but provides dummy authentication

14

*/

15

public final class EmulatorCredentials extends OAuth2Credentials {

16

/**

17

* Get singleton instance of emulator credentials

18

* @return EmulatorCredentials instance

19

*/

20

public static EmulatorCredentials getInstance();

21

22

/**

23

* Returns dummy access token for emulator authentication

24

* @return AccessToken with dummy value and far-future expiration

25

* @throws IOException Never thrown in emulator implementation

26

*/

27

@Override

28

public AccessToken refreshAccessToken() throws IOException;

29

}

30

```

31

32

### Emulator Credentials Provider

33

34

CredentialsProvider implementation that supplies EmulatorCredentials for Google Cloud client libraries.

35

36

```java { .api }

37

/**

38

* CredentialsProvider for emulator scenarios

39

* Implements Google Cloud's CredentialsProvider interface

40

*/

41

public final class EmulatorCredentialsProvider implements CredentialsProvider {

42

/**

43

* Create new EmulatorCredentialsProvider instance

44

* @return New EmulatorCredentialsProvider

45

*/

46

public static EmulatorCredentialsProvider create();

47

48

/**

49

* Get emulator credentials instance

50

* @return EmulatorCredentials for emulator authentication

51

*/

52

@Override

53

public Credentials getCredentials();

54

}

55

```

56

57

### Emulator Subscriber Factory

58

59

Specialized subscriber factory for connecting to the Pub/Sub emulator with plain-text communication.

60

61

```java { .api }

62

/**

63

* PubSubSubscriberFactory for connecting to Pub/Sub emulator

64

* Configures plain-text communication without SSL/TLS

65

*/

66

public class PubSubSubscriberFactoryForEmulator implements PubSubSubscriberFactory {

67

/**

68

* Create emulator subscriber factory

69

* @param hostAndPort Emulator host and port (e.g., "localhost:8085")

70

* @param project GCP project name (can be any value for emulator)

71

* @param subscription Subscription name

72

* @param retries Number of retries for failed requests

73

* @param timeout Timeout for pull requests

74

* @param maxMessagesPerPull Maximum messages per pull request

75

*/

76

public PubSubSubscriberFactoryForEmulator(

77

String hostAndPort,

78

String project,

79

String subscription,

80

int retries,

81

Duration timeout,

82

int maxMessagesPerPull

83

);

84

85

/**

86

* Create subscriber configured for emulator connection

87

* @param credentials Ignored for emulator (uses plain-text)

88

* @return PubSubSubscriber configured for emulator

89

*/

90

@Override

91

public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;

92

}

93

```

94

95

## Setting Up Pub/Sub Emulator

96

97

### Installation

98

99

Install the Google Cloud SDK and Pub/Sub emulator:

100

101

```bash

102

# Install Google Cloud SDK

103

curl https://sdk.cloud.google.com | bash

104

105

# Install Pub/Sub emulator component

106

gcloud components install pubsub-emulator

107

108

# Start emulator on localhost:8085

109

gcloud beta emulators pubsub start --host-port=localhost:8085

110

```

111

112

### Environment Setup

113

114

Set environment variables to point to emulator:

115

116

```bash

117

export PUBSUB_EMULATOR_HOST=localhost:8085

118

export PUBSUB_PROJECT_ID=test-project

119

```

120

121

### Create Topics and Subscriptions

122

123

```bash

124

# Create topic

125

gcloud pubsub topics create test-topic --project=test-project

126

127

# Create subscription

128

gcloud pubsub subscriptions create test-subscription \

129

--topic=test-topic \

130

--project=test-project

131

```

132

133

## Usage Examples

134

135

### Source with Emulator

136

137

```java

138

import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

139

import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;

140

import org.apache.flink.api.common.serialization.SimpleStringSchema;

141

import java.time.Duration;

142

143

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

144

env.enableCheckpointing(10000);

145

146

// Create emulator subscriber factory

147

PubSubSubscriberFactoryForEmulator emulatorFactory =

148

new PubSubSubscriberFactoryForEmulator(

149

"localhost:8085", // emulator host:port

150

"test-project", // project (any value for emulator)

151

"test-subscription", // subscription name

152

3, // retries

153

Duration.ofSeconds(15), // timeout

154

100 // max messages per pull

155

);

156

157

// Create source with emulator factory

158

PubSubSource<String> source = PubSubSource.newBuilder()

159

.withDeserializationSchema(new SimpleStringSchema())

160

.withProjectName("test-project")

161

.withSubscriptionName("test-subscription")

162

.withPubSubSubscriberFactory(emulatorFactory)

163

.build();

164

165

DataStream<String> stream = env.addSource(source);

166

stream.print();

167

168

env.execute("Emulator Source Test");

169

```

170

171

### Sink with Emulator

172

173

```java

174

import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

175

import org.apache.flink.api.common.serialization.SimpleStringSchema;

176

177

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

178

179

// Create test data

180

DataStream<String> inputStream = env.fromElements(

181

"Test message 1",

182

"Test message 2",

183

"Test message 3"

184

);

185

186

// Create sink with emulator configuration

187

PubSubSink<String> sink = PubSubSink.newBuilder()

188

.withSerializationSchema(new SimpleStringSchema())

189

.withProjectName("test-project")

190

.withTopicName("test-topic")

191

.withHostAndPortForEmulator("localhost:8085")

192

.build();

193

194

inputStream.addSink(sink);

195

196

env.execute("Emulator Sink Test");

197

```

198

199

### Complete Test Pipeline

200

201

```java

202

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

203

import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

204

import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

205

import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;

206

import java.time.Duration;

207

208

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

209

env.enableCheckpointing(5000);

210

211

// Configure emulator source

212

PubSubSubscriberFactoryForEmulator sourceFactory =

213

new PubSubSubscriberFactoryForEmulator(

214

"localhost:8085", "test-project", "input-subscription",

215

3, Duration.ofSeconds(10), 50

216

);

217

218

PubSubSource<String> source = PubSubSource.newBuilder()

219

.withDeserializationSchema(new SimpleStringSchema())

220

.withProjectName("test-project")

221

.withSubscriptionName("input-subscription")

222

.withPubSubSubscriberFactory(sourceFactory)

223

.build();

224

225

// Configure emulator sink

226

PubSubSink<String> sink = PubSubSink.newBuilder()

227

.withSerializationSchema(new SimpleStringSchema())

228

.withProjectName("test-project")

229

.withTopicName("output-topic")

230

.withHostAndPortForEmulator("localhost:8085")

231

.build();

232

233

// Create processing pipeline

234

env.addSource(source)

235

.map(msg -> "Processed: " + msg.toUpperCase())

236

.addSink(sink);

237

238

env.execute("Emulator Test Pipeline");

239

```

240

241

## Testing Best Practices

242

243

### Test Environment Setup

244

245

1. **Isolated Emulator**: Start fresh emulator instance for each test

246

2. **Clean State**: Clear topics and subscriptions between tests

247

3. **Port Management**: Use different ports for parallel test execution

248

4. **Resource Cleanup**: Properly shutdown emulator after tests

249

250

### Integration Testing

251

252

```java

253

import org.junit.jupiter.api.BeforeEach;

254

import org.junit.jupiter.api.AfterEach;

255

import org.junit.jupiter.api.Test;

256

257

public class PubSubConnectorIntegrationTest {

258

private static final String EMULATOR_HOST = "localhost:8085";

259

private static final String TEST_PROJECT = "test-project";

260

261

@BeforeEach

262

void setupEmulator() throws Exception {

263

// Start emulator programmatically

264

// Create test topics and subscriptions

265

}

266

267

@AfterEach

268

void cleanupEmulator() throws Exception {

269

// Stop emulator

270

// Clean up resources

271

}

272

273

@Test

274

void testSourceSinkIntegration() throws Exception {

275

// Create Flink job with emulator configuration

276

// Publish test messages

277

// Verify message consumption and processing

278

}

279

}

280

```

281

282

### Docker Testing

283

284

```dockerfile

285

# Dockerfile for emulator testing

286

FROM google/cloud-sdk:alpine

287

288

# Install Pub/Sub emulator

289

RUN gcloud components install pubsub-emulator

290

291

# Expose emulator port

292

EXPOSE 8085

293

294

# Start emulator

295

CMD ["gcloud", "beta", "emulators", "pubsub", "start", "--host-port=0.0.0.0:8085"]

296

```

297

298

```yaml

299

# docker-compose.yml for test environment

300

version: '3.8'

301

services:

302

pubsub-emulator:

303

build: .

304

ports:

305

- "8085:8085"

306

environment:

307

- PUBSUB_PROJECT_ID=test-project

308

```

309

310

## Emulator Limitations

311

312

### Feature Differences

313

314

- **Authentication**: No actual authentication required

315

- **IAM**: Access control not enforced

316

- **Monitoring**: Limited metrics and monitoring

317

- **Persistence**: Messages not persisted across emulator restarts

318

- **Performance**: Different performance characteristics than production

319

320

### Configuration Differences

321

322

- **Network**: Plain-text HTTP instead of HTTPS

323

- **Credentials**: Dummy credentials instead of service account keys

324

- **Endpoints**: Local endpoints instead of Google Cloud endpoints

325

- **Retries**: Simplified retry behavior

326

327

### Best Practices

328

329

1. **Production Parity**: Keep emulator configuration as close to production as possible

330

2. **Error Scenarios**: Test failure scenarios that emulator may not simulate

331

3. **Performance Testing**: Use actual Pub/Sub for performance benchmarks

332

4. **Security Testing**: Verify authentication works with real credentials

333

5. **End-to-End Testing**: Include tests against actual Google Cloud Pub/Sub