tessl/maven-org-apache-flink--flink-storm-examples_2.10

Apache Flink Storm compatibility examples demonstrating Storm topology integration with embedded mode and full topology execution patterns.

Describes: mavenpkg:maven/org.apache.flink/flink-storm-examples_2.10@1.3.x

To install, run:

```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-storm-examples_2.10@1.3.0
```

# Apache Flink Storm Examples

A comprehensive collection of examples demonstrating Apache Flink's Storm compatibility layer. This library enables developers to run Apache Storm topologies on Flink clusters through three main integration approaches: embedded mode (using individual Storm components within Flink streaming programs), full topology mode (running complete Storm topologies), and hybrid integration patterns.

## Package Information

- **Package Name**: flink-storm-examples_2.10
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add the dependency to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-storm-examples_2.10</artifactId>
  <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
// Core Storm interfaces
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Utility classes for Storm-Flink integration
import org.apache.flink.storm.util.*;

// Wrapper classes for Storm components
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.storm.wrappers.BoltWrapper;

// Storm API integration
import org.apache.flink.storm.api.*;

// Example classes (choose based on use case)
import org.apache.flink.storm.wordcount.SpoutSourceWordCount;
import org.apache.flink.storm.wordcount.BoltTokenizerWordCount;
import org.apache.flink.storm.wordcount.WordCountTopology;

// Flink streaming API
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.java.tuple.Tuple2;
```

## Basic Usage

```java
import org.apache.flink.storm.wordcount.SpoutSourceWordCount;
import org.apache.flink.storm.wordcount.BoltTokenizerWordCount;
import org.apache.flink.storm.wordcount.WordCountLocal;

public class FlinkStormExample {
    public static void main(String[] args) throws Exception {
        // Option 1: use a Storm spout as the data source in a Flink streaming job
        SpoutSourceWordCount.main(new String[]{"input.txt", "output"});

        // Option 2: use a Storm bolt for processing within a Flink streaming job
        BoltTokenizerWordCount.main(new String[]{"input.txt", "output"});

        // Option 3: run a complete Storm topology on Flink
        WordCountLocal.main(new String[]{"input.txt", "output"});
    }
}
```

## Architecture

The library is organized around three key integration patterns:

- **Embedded Mode**: Integrate individual Storm components (spouts/bolts) within Flink streaming programs using wrapper classes
- **Full Topology Mode**: Execute complete Storm topologies on Flink using topology builders and execution environments
- **Utility Framework**: Base classes and formatters for creating custom Storm-Flink integrations

Key components include:

- **Base Classes**: `AbstractLineSpout` and `AbstractBoltSink` provide foundations for custom components
- **Data Sources**: File-based and in-memory spouts for various input scenarios
- **Output Sinks**: Configurable output formatting with console and file writing capabilities
- **Storm Operators**: Ready-to-use spouts and bolts for common data processing tasks
- **Topology Builders**: Utilities for constructing complete Storm topologies for Flink execution

## Capabilities

### Utility Classes and Base Components

Core utility classes providing the foundation for Storm-Flink integration, including abstract base classes, data sources, output sinks, and formatting utilities.

```java { .api }
// Base classes
public abstract class AbstractLineSpout implements IRichSpout {
    public static final String ATTRIBUTE_LINE = "line";
    public abstract void nextTuple();
}

public abstract class AbstractBoltSink implements IRichBolt {
    public AbstractBoltSink(OutputFormatter formatter);
    protected abstract void writeExternal(String line);
}

// Data sources
public class InMemorySpout<T> extends AbstractLineSpout;
public class FileSpout extends AbstractLineSpout;

// Output formatters
public interface OutputFormatter extends Serializable {
    String format(Tuple input);
}
```

[Utility Classes](./utility-classes.md)
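As a minimal sketch of extending these base classes: the hypothetical `GreetingSpout` below (not part of the library) replays a fixed set of lines, assuming — as the bundled `InMemorySpout` does — that `AbstractLineSpout` keeps the `SpoutOutputCollector` from `open()` in a protected `collector` field and declares the single output attribute `"line"`.

```java
import org.apache.storm.tuple.Values;
import org.apache.flink.storm.util.AbstractLineSpout;

// Hypothetical example class, not part of the library.
public class GreetingSpout extends AbstractLineSpout {
    private final String[] lines = {"hello world", "hello flink"};
    private int index = 0;

    @Override
    public void nextTuple() {
        // Emit one line per call until the data is exhausted; the base
        // class has already declared the "line" output attribute.
        if (index < lines.length) {
            collector.emit(new Values(lines[index++]));
        }
    }
}
```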

### Word Count Examples

Comprehensive word count examples demonstrating various Storm-Flink integration patterns, including spout-based sources, bolt-based processing, and complete topology execution.

```java { .api }
// Main example classes
public class SpoutSourceWordCount {
    public static void main(String[] args) throws Exception;
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>>;
}

public class BoltTokenizerWordCount {
    public static void main(String[] args) throws Exception;
}

public class WordCountTopology {
    public static TopologyBuilder buildTopology();
    public static TopologyBuilder buildTopology(boolean indexOrName);
}
```

[Word Count Examples](./wordcount-examples.md)

### Storm Operators

Ready-to-use Storm operators, including spouts for data ingestion and bolts for data processing, with support for both index-based and field-name-based tuple access patterns.

```java { .api }
// Spouts
public class WordCountFileSpout extends FileSpout {
    public WordCountFileSpout(String path);
}

public class WordCountInMemorySpout extends FiniteInMemorySpout {
    public WordCountInMemorySpout();
}

// Bolts
public class BoltTokenizer implements IRichBolt {
    public static final String ATTRIBUTE_WORD = "word";
    public static final String ATTRIBUTE_COUNT = "count";
    public void execute(Tuple input);
}

public class BoltCounter implements IRichBolt {
    public void execute(Tuple input);
}
```

[Storm Operators](./storm-operators.md)

173

174

### Topology Builders and Remote Execution

175

176

Core topology construction utilities and remote cluster execution patterns for deploying Storm topologies on Flink clusters.

177

178

```java { .api }

179

// Topology builders

180

public class WordCountTopology {

181

public static final String spoutId = "source";

182

public static final String tokenierzerId = "tokenizer";

183

public static final String counterId = "counter";

184

public static final String sinkId = "sink";

185

186

public static TopologyBuilder buildTopology();

187

public static TopologyBuilder buildTopology(boolean indexOrName);

188

}

189

190

public class ExclamationTopology {

191

public static final String spoutId = "source";

192

public static final String firstBoltId = "exclamation1";

193

public static final String secondBoltId = "exclamation2";

194

public static final String sinkId = "sink";

195

196

public static TopologyBuilder buildTopology();

197

}

198

199

// Remote execution

200

public class WordCountRemoteByClient {

201

public static final String topologyId = "Storm WordCount";

202

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, NotAliveException;

203

}

204

205

public class WordCountRemoteBySubmitter {

206

public static final String topologyId = "Storm WordCount";

207

public static void main(String[] args) throws Exception;

208

}

209

```

### Additional Examples

Extended examples demonstrating advanced Storm-Flink integration patterns, including stream splitting, joins, exclamation processing, and printing results in real time.

```java { .api }
// Stream processing examples
public class ExclamationLocal {
    public static final String topologyId = "Streaming Exclamation";
    public static void main(String[] args) throws Exception;
}

public class SpoutSplitExample {
    public static void main(String[] args) throws Exception;
}

public class SingleJoinExample {
    public static void main(String[] args) throws Exception;
}
```

[Additional Examples](./additional-examples.md)

## Deployment

The library includes three pre-built JAR files for cluster deployment:

1. **WordCount-SpoutSource.jar** - Spout-based word count example
2. **WordCount-BoltTokenizer.jar** - Bolt-based word count example
3. **WordCount-StormTopology.jar** - Complete Storm topology example

**Usage**: `bin/flink run <jar-file> [input-path] [output-path]`

## Integration Patterns

### Embedded Mode

Use Storm components within Flink streaming programs:

- Wrap spouts with `SpoutWrapper<T>` for data sources
- Wrap bolts with `BoltWrapper<IN, OUT>` for data processing
- Combine with native Flink operations in streaming pipelines
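The steps above can be sketched as follows, following the pattern of `SpoutSourceWordCount` and `BoltTokenizerWordCount` (the class name and input path are placeholders):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.wordcount.operators.BoltTokenizer;
import org.apache.flink.storm.wordcount.operators.WordCountFileSpout;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmbeddedModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A Storm spout wrapped as a Flink source (emits one String per line)
        DataStream<String> lines = env.addSource(
                new SpoutWrapper<String>(new WordCountFileSpout("/path/to/input.txt")),
                TypeExtractor.getForClass(String.class));

        // A Storm bolt wrapped as a Flink operator in the same pipeline
        DataStream<Tuple2<String, Integer>> tokens = lines.transform(
                "tokenizer",
                TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
                new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer()));

        // From here on, native Flink operations can be chained freely
        tokens.print();
        env.execute("Embedded Storm components");
    }
}
```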

### Full Topology Mode

Execute complete Storm topologies on Flink:

- Use `TopologyBuilder` to construct Storm topologies
- Submit via `FlinkLocalCluster` for local testing
- Deploy to remote clusters using `FlinkClient` or Storm submitter patterns
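A local-execution sketch following the pattern of `WordCountLocal` (the topology name and sleep duration are arbitrary choices, not library defaults):

```java
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.wordcount.WordCountTopology;
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class FullTopologySketch {
    public static void main(String[] args) throws Exception {
        // Build an ordinary Storm topology
        TopologyBuilder builder = WordCountTopology.buildTopology();

        // Execute it on an in-process Flink cluster instead of Storm
        FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("WordCount", new Config(),
                FlinkTopology.createTopology(builder));

        Utils.sleep(10_000); // let the streaming job run for a while
        cluster.shutdown();
    }
}
```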

### Data Access Patterns

- **Index-based**: Access tuple fields by position (e.g., `tuple.getValue(0)`)
- **Name-based**: Access tuple fields by name (e.g., `tuple.getValueByField("word")`)
- **POJO-based**: Use Plain Old Java Objects with field-name mapping
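The first two styles can be sketched with a hypothetical helper class (not part of the library); the field name `"word"` follows `BoltTokenizer.ATTRIBUTE_WORD`:

```java
import org.apache.storm.tuple.Tuple;

// Hypothetical helper showing the two tuple access styles a bolt's
// execute(Tuple) method can use on the same field.
public class AccessPatterns {
    public static String byIndex(Tuple tuple) {
        return tuple.getString(0);              // index-based access
    }

    public static String byName(Tuple tuple) {
        return tuple.getStringByField("word");  // name-based access
    }
}
```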