# Apache Flink Contrib - WikiEdits Connector

Apache Flink Contrib provides experimental streaming connectors and modules in a staging area for community evaluation. Its primary component is the WikiEdits connector, which streams Wikipedia edit events by monitoring Wikimedia's IRC channels and enables real-time analytics on Wikipedia activity.

## Package Information

- **Package Name**: org.apache.flink:flink-connector-wikiedits (a module of flink-contrib)
- **Package Type**: Maven
- **Language**: Java
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-wikiedits</artifactId>
    <version>1.20.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

public class WikipediaEditsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a WikiEdits source with default settings (irc.wikimedia.org:6667, channel #en.wikipedia)
        WikipediaEditsSource source = new WikipediaEditsSource();

        // Add the source to the streaming environment
        DataStream<WikipediaEditEvent> edits = env.addSource(source);

        // Drop bot edits and print a one-line description of each remaining edit
        edits.filter(event -> !event.isBotEdit())
                .map(event -> "User " + event.getUser() + " edited " + event.getTitle())
                .print();

        env.execute("Wikipedia Edits Stream");
    }
}
```

## Architecture

The WikiEdits connector is built around these key components:

- **WikipediaEditsSource**: The main source function; it connects to the IRC server and emits edit events
- **WikipediaEditEvent**: Immutable data structure representing a single Wikipedia edit
- **IRC Integration**: Uses the schwering IRC library (`org.schwering:irclib`) to connect to Wikimedia's IRC channels
- **Event Parsing**: Regex-based parsing of raw IRC messages into structured edit events
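
To make the regex-based parsing step concrete, here is a simplified, hypothetical parser (`IrcEditLineParser`) for one line of the Wikimedia IRC edit feed. It assumes that, once mIRC color and formatting codes are stripped, an edit line has the shape `[[Title]] FLAGS DIFF_URL * USER * (+N) SUMMARY`; that shape and the regular expression are assumptions for illustration, not the connector's actual pattern. Like `WikipediaEditEvent.fromRawEvent`, it returns `null` for lines it cannot parse.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, simplified parser for one Wikimedia IRC edit line (not the connector's code). */
final class IrcEditLineParser {

    // mIRC formatting: a color code (byte 0x03 plus optional "fg" or "fg,bg" digits)
    // or one of the bold, reset, reverse, and underline control bytes
    private static final Pattern FORMATTING =
            Pattern.compile("\u0003(\\d{1,2}(,\\d{1,2})?)?|[\u0002\u000F\u0016\u001F]");

    // Assumed plain-text shape: [[Title]] FLAGS DIFF_URL * USER * (+N) SUMMARY
    private static final Pattern EDIT = Pattern.compile(
            "\\[\\[(.+?)\\]\\]\\s(\\S*)\\s(\\S+)\\s\\*\\s(.+?)\\s\\*\\s\\(([+-]?\\d+)\\)\\s?(.*)");

    /** Fields extracted from a single edit line; flags are left as the raw flag string. */
    static final class ParsedEdit {
        final String title;
        final String flags;
        final String diffUrl;
        final String user;
        final int byteDiff;
        final String summary;

        ParsedEdit(String title, String flags, String diffUrl, String user, int byteDiff, String summary) {
            this.title = title;
            this.flags = flags;
            this.diffUrl = diffUrl;
            this.user = user;
            this.byteDiff = byteDiff;
            this.summary = summary;
        }
    }

    /** Returns the parsed edit, or null if the line does not look like an edit. */
    static ParsedEdit parse(String rawLine) {
        String plain = FORMATTING.matcher(rawLine).replaceAll("");
        Matcher m = EDIT.matcher(plain);
        if (!m.matches()) {
            return null; // malformed or non-edit message: ignored by the caller
        }
        return new ParsedEdit(m.group(1), m.group(2), m.group(3), m.group(4),
                Integer.parseInt(m.group(5)), m.group(6)); // parseInt accepts a leading '+'
    }
}
```

An empty flag field (two spaces after `]]`) is accepted, which matches edits that carry no minor, new, unpatrolled, or bot marker.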

**Important Note**: This connector is deprecated because it uses the legacy `SourceFunction` API. For production use, migrate to the new `Source` API.

## Capabilities

### Wikipedia Edit Streaming Source

Connects to a Wikimedia IRC channel and streams real-time edit events. By default it monitors `#en.wikipedia`, which carries edits to the English Wikipedia.

```java { .api }
/**
 * Source function for reading Wikipedia edit events from IRC channel #en.wikipedia
 * @deprecated Uses legacy SourceFunction API, migrate to new Source API
 */
@Deprecated
public class WikipediaEditsSource extends RichSourceFunction<WikipediaEditEvent> {

    /** Default IRC server hostname */
    public static final String DEFAULT_HOST = "irc.wikimedia.org";

    /** Default IRC server port */
    public static final int DEFAULT_PORT = 6667;

    /** Default IRC channel to monitor */
    public static final String DEFAULT_CHANNEL = "#en.wikipedia";

    /**
     * Creates a source with default IRC settings (irc.wikimedia.org:6667, #en.wikipedia)
     */
    public WikipediaEditsSource();

    /**
     * Creates a source with custom IRC settings
     * @param host IRC server hostname
     * @param port IRC server port
     * @param channel IRC channel to monitor (messages not matching the expected format are ignored)
     */
    public WikipediaEditsSource(String host, int port, String channel);

    /**
     * Main execution method that runs the source function
     * @param ctx Source context for collecting events
     * @throws Exception if IRC connection or parsing fails
     */
    @Override
    public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception;

    /**
     * Cancels the source function, stopping event collection
     */
    @Override
    public void cancel();
}
```
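
The `run`/`cancel` contract above follows the usual legacy `SourceFunction` pattern: `run` keeps emitting until another thread calls `cancel`. The hypothetical class `PollingSourceSketch` imitates that pattern in plain Java so it runs without Flink. It assumes the source drains an internal buffer filled by the IRC listener; the `Consumer` stands in for `SourceContext.collect`, and the 100 ms poll timeout is an illustrative value, not necessarily the connector's own setting.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical plain-Java stand-in for a legacy SourceFunction (not the connector's code).
 * run() drains a buffer until cancel() is called from another thread.
 */
final class PollingSourceSketch<T> {

    private final BlockingQueue<T> buffer;   // stands in for the IRC listener's event queue
    private volatile boolean running = true; // volatile: written by cancel(), read by run()

    PollingSourceSketch(BlockingQueue<T> buffer) {
        this.buffer = buffer;
    }

    /** Emits buffered items through collect (standing in for SourceContext.collect) until cancelled. */
    void run(Consumer<T> collect) throws InterruptedException {
        while (running) {
            // Poll with a timeout so the loop re-checks the running flag even when no events arrive
            T item = buffer.poll(100, TimeUnit.MILLISECONDS);
            if (item != null) {
                collect.accept(item);
            }
        }
    }

    /** Asks the run loop to exit; it stops within one poll timeout. */
    void cancel() {
        running = false;
    }
}
```

Polling with a timeout, rather than blocking on `take()`, is what lets `cancel()` take effect even when the channel is quiet.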

### Wikipedia Edit Event Data Structure

Immutable data structure containing all metadata about a Wikipedia edit event.

```java { .api }
/**
 * Represents a single Wikipedia edit event with all associated metadata
 */
public class WikipediaEditEvent {

    /**
     * Creates a new Wikipedia edit event
     * @param timestamp Event timestamp (when received at source)
     * @param channel IRC channel name
     * @param title Wikipedia article title
     * @param diffUrl URL to the edit diff page
     * @param user Username of the editor
     * @param byteDiff Size change in bytes (positive for additions, negative for deletions)
     * @param summary Edit summary/comment provided by the editor
     * @param isMinor Whether this is marked as a minor edit
     * @param isNew Whether this edit created a new page
     * @param isUnpatrolled Whether this edit is unpatrolled
     * @param isBotEdit Whether this edit was made by a bot
     * @param isSpecial Whether this is a special page edit
     * @param isTalk Whether this is a talk page edit
     */
    public WikipediaEditEvent(
            long timestamp,
            String channel,
            String title,
            String diffUrl,
            String user,
            int byteDiff,
            String summary,
            boolean isMinor,
            boolean isNew,
            boolean isUnpatrolled,
            boolean isBotEdit,
            boolean isSpecial,
            boolean isTalk
    );

    /**
     * Returns the timestamp when this event was received at the source
     * @return Event timestamp in milliseconds since epoch
     */
    public long getTimestamp();

    /**
     * Returns the IRC channel this event originated from
     * @return IRC channel name (e.g., "#en.wikipedia")
     */
    public String getChannel();

    /**
     * Returns the title of the Wikipedia article that was edited
     * @return Article title
     */
    public String getTitle();

    /**
     * Returns the URL to view the edit diff
     * @return Diff URL
     */
    public String getDiffUrl();

    /**
     * Returns the username of the editor
     * @return Editor username
     */
    public String getUser();

    /**
     * Returns the size change in bytes
     * @return Byte difference (positive for additions, negative for deletions)
     */
    public int getByteDiff();

    /**
     * Returns the edit summary/comment
     * @return Edit summary text
     */
    public String getSummary();

    /**
     * Returns whether this is a minor edit
     * @return true if marked as minor edit
     */
    public boolean isMinor();

    /**
     * Returns whether this edit created a new page
     * @return true if this is a page creation
     */
    public boolean isNew();

    /**
     * Returns whether this edit is unpatrolled
     * @return true if unpatrolled
     */
    public boolean isUnpatrolled();

    /**
     * Returns whether this edit was made by a bot
     * @return true if bot edit
     */
    public boolean isBotEdit();

    /**
     * Returns whether this is a special page edit
     * @return true if special page (title starts with "Special:")
     */
    public boolean isSpecial();

    /**
     * Returns whether this is a talk page edit
     * @return true if talk page (title starts with "Talk:")
     */
    public boolean isTalk();

    /**
     * Creates a WikipediaEditEvent from a raw IRC message
     * @param timestamp Event timestamp
     * @param channel IRC channel name
     * @param rawEvent Raw IRC message text
     * @return Parsed WikipediaEditEvent or null if parsing failed
     */
    public static WikipediaEditEvent fromRawEvent(long timestamp, String channel, String rawEvent);

    /**
     * Returns a string representation of the event
     * @return String representation including all fields
     */
    @Override
    public String toString();
}
```
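
How the boolean fields relate to the raw feed can be sketched with a trimmed-down, hypothetical `EditEventSketch` class (not the connector's `WikipediaEditEvent`). It assumes the IRC feed marks edits with single-character flags (`M` minor, `N` new page, `!` unpatrolled, `B` bot) and derives `isSpecial`/`isTalk` from the `Special:` and `Talk:` title prefixes documented above; the `requireNonNull` checks mirror the null validation described under Error Handling.

```java
import java.util.Objects;

/** Hypothetical, trimmed-down analogue of WikipediaEditEvent (not the connector's class). */
final class EditEventSketch {

    private final String title;
    private final String user;
    private final int byteDiff;
    private final boolean minor;
    private final boolean newPage;
    private final boolean unpatrolled;
    private final boolean botEdit;
    private final boolean special;
    private final boolean talk;

    // Every field is final and assigned once, so instances are immutable
    private EditEventSketch(String title, String user, int byteDiff, boolean minor, boolean newPage,
                            boolean unpatrolled, boolean botEdit, boolean special, boolean talk) {
        this.title = Objects.requireNonNull(title, "title");
        this.user = Objects.requireNonNull(user, "user");
        this.byteDiff = byteDiff;
        this.minor = minor;
        this.newPage = newPage;
        this.unpatrolled = unpatrolled;
        this.botEdit = botEdit;
        this.special = special;
        this.talk = talk;
    }

    /**
     * Builds an event from an assumed feed flag string (M = minor, N = new page,
     * ! = unpatrolled, B = bot) and derives the page-type flags from the title prefix.
     */
    static EditEventSketch of(String title, String user, int byteDiff, String flags) {
        Objects.requireNonNull(title, "title");
        Objects.requireNonNull(flags, "flags");
        return new EditEventSketch(title, user, byteDiff,
                flags.contains("M"), flags.contains("N"), flags.contains("!"), flags.contains("B"),
                title.startsWith("Special:"), title.startsWith("Talk:"));
    }

    String getTitle() { return title; }
    String getUser() { return user; }
    int getByteDiff() { return byteDiff; }
    boolean isMinor() { return minor; }
    boolean isNew() { return newPage; }
    boolean isUnpatrolled() { return unpatrolled; }
    boolean isBotEdit() { return botEdit; }
    boolean isSpecial() { return special; }
    boolean isTalk() { return talk; }
}
```

For example, `EditEventSketch.of("Talk:Apache Flink", "Alice", -12, "MB")` yields a minor bot edit on a talk page, while passing a `null` user fails fast with a `NullPointerException`.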

## Usage Examples

### Basic Edit Stream Processing

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
WikipediaEditsSource source = new WikipediaEditsSource();

env.addSource(source)
        .filter(event -> event.getByteDiff() > 100) // Keep edits that add more than 100 bytes
        .map(event -> String.format("%s: %+d bytes to '%s'",
                event.getUser(),
                event.getByteDiff(),
                event.getTitle()))
        .print();

env.execute("Large Edit Monitor");
```

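### Bot vs Human Edit Analysis

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new WikipediaEditsSource())
        .map(event -> Tuple2.of(event.isBotEdit() ? "BOT" : "HUMAN", event.getByteDiff()))
        // Lambdas lose generic type information, so declare the Tuple2 type explicitly
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(t -> t.f0) // Group by bot/human
        .sum(1)           // Running sum of byte differences per group
        .print();

env.execute("Bot vs Human Edits");
```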
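
Because the stream is unbounded, `keyBy(...).sum(1)` does not produce one final total per group; it emits an updated running total every time an event arrives. The following plain-Java sketch (the class `RollingSumSketch` is hypothetical and runs without Flink) reproduces that output sequence, using the same `(key,total)` format that `Tuple2.toString()` prints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of the rolling output of keyBy(...).sum(...) on (group, byteDiff) pairs. */
final class RollingSumSketch {

    /** Returns one "(key,total)" string per input, where total is the running sum for that key so far. */
    static List<String> rollingSums(List<? extends Map.Entry<String, Integer>> inputs) {
        Map<String, Integer> totals = new HashMap<>(); // per-key state, like Flink's keyed state
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> in : inputs) {
            // Add this event's value to the key's running total and emit the updated total
            int total = totals.merge(in.getKey(), in.getValue(), Integer::sum);
            out.add("(" + in.getKey() + "," + total + ")");
        }
        return out;
    }
}
```

With inputs `(HUMAN,42)`, `(BOT,-5)`, `(HUMAN,10)` the sketch emits `(HUMAN,42)`, `(BOT,-5)`, `(HUMAN,52)`. In the Flink job, `print()` may additionally prefix each line with the subtask index when parallelism is greater than one.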

### Custom IRC Channel Monitoring

```java
// Monitor a different IRC channel, e.g. for testing or for another language edition of Wikipedia
WikipediaEditsSource customSource = new WikipediaEditsSource(
        WikipediaEditsSource.DEFAULT_HOST, // "irc.wikimedia.org"
        WikipediaEditsSource.DEFAULT_PORT, // 6667
        "#fr.wikipedia"                    // French Wikipedia
);

env.addSource(customSource)
        .filter(event -> !event.isMinor()) // Exclude minor edits
        .print();

env.execute("French Wikipedia Edits");
```

## Error Handling

The connector handles the following error conditions:

- **IRC Connection Failures**: Connection issues are handled with automatic reconnection attempts
- **Malformed Messages**: IRC messages that don't match the expected Wikipedia edit format are silently ignored
- **Null Values**: The `WikipediaEditEvent` constructor requires non-null `channel`, `title`, `diffUrl`, `user`, and `summary` values and throws a `NullPointerException` otherwise
- **Queue Overflow**: The internal event queue holds at most 128 events; events that arrive while it is full are dropped and logged at debug level
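
The queue-overflow behavior can be sketched with `ArrayBlockingQueue.offer`, which returns `false` instead of blocking when the queue is full. `BoundedEditBuffer` is a hypothetical name, and the connector's own buffer and log message may differ; the design choice illustrated is that a slow consumer costs dropped events rather than stalling the IRC listener thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

/** Hypothetical bounded hand-off buffer between an IRC listener thread and the source's run loop. */
final class BoundedEditBuffer<T> {

    private static final Logger LOG = Logger.getLogger(BoundedEditBuffer.class.getName());

    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedEditBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking insert: returns false and drops the event when the buffer is full. */
    boolean offer(T event) {
        boolean accepted = queue.offer(event);
        if (!accepted) {
            dropped.incrementAndGet();
            // java.util.logging FINE is roughly equivalent to a debug-level log
            LOG.fine("Dropping event because the buffer is full");
        }
        return accepted;
    }

    /** Non-blocking removal for the consumer side; returns null when empty. */
    T poll() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

With a capacity of 128, offering 130 events without consuming any accepts the first 128 and drops the last two; once the consumer polls an event, there is room again.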

## Dependencies

This connector requires the following runtime dependencies:

- **Apache Flink Streaming**: `org.apache.flink:flink-streaming-java`
- **IRC Library**: `org.schwering:irclib:1.10` for IRC protocol support

## Migration Notes

This connector is deprecated because it uses the legacy `SourceFunction` API. For new applications, consider:

1. Implementing a custom source with the new `org.apache.flink.api.connector.source.Source` API
2. Using alternative ingestion paths such as Kafka or other message queues
3. Building a custom IRC-to-Kafka bridge for a more robust streaming architecture