Apache Flink contributor module providing the WikiEdits streaming connector for processing Wikipedia edit events from IRC
npx @tessl/cli install tessl/maven-org-apache-flink--flink-contrib@1.20.00
# Apache Flink Contrib - WikiEdits Connector

Apache Flink Contrib is a staging area for experimental streaming connectors and modules under community evaluation. Its primary component is the WikiEdits connector, which streams Wikipedia edit events by monitoring a Wikimedia IRC channel, enabling real-time analytics on Wikipedia activity.

## Package Information

- **Package Name**: org.apache.flink:flink-contrib
- **Package Type**: Maven
- **Language**: Java
- **Installation**:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-wikiedits</artifactId>
    <version>1.20.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create WikiEdits source with default settings (connects to #en.wikipedia on IRC)
WikipediaEditsSource source = new WikipediaEditsSource();

// Add source to streaming environment
DataStream<WikipediaEditEvent> edits = env.addSource(source);

// Process the edit events: drop bot edits, then print one line per human edit
edits.filter(event -> !event.isBotEdit())
    .map(event -> "User " + event.getUser() + " edited " + event.getTitle())
    .print();

env.execute("Wikipedia Edits Stream");
```

## Architecture

The WikiEdits connector is built around these key components:

- **WikipediaEditsSource**: Main source function that connects to IRC and produces edit events
- **WikipediaEditEvent**: Immutable data structure representing a single Wikipedia edit
- **IRC Integration**: Uses the schwering IRC library to connect to Wikimedia's IRC channels
- **Event Parsing**: Regex-based parsing of IRC messages into structured edit events

**Important Note**: This connector is deprecated as it uses the legacy SourceFunction API. It's recommended to migrate to the new Source API for production use.
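
To make the regex-based parsing step more concrete, here is a minimal sketch in plain Java with no Flink dependency. The message layout, the pattern, and the `EditLineParser` and `ParsedEdit` names are assumptions made for illustration; this is not the connector's actual pattern, and real IRC lines may carry formatting control codes that the sketch ignores.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative parser for a simplified, hypothetical edit-message layout. */
final class EditLineParser {

    // Assumed layout: [[Title]] FLAGS URL * user * (+diff) summary
    private static final Pattern EDIT_LINE = Pattern.compile(
            "\\[\\[(.+?)\\]\\] ([!NMB]*) (\\S+) \\* (.+?) \\* \\(([+-]?\\d+)\\) ?(.*)");

    /** Plain holder for the parsed fields (hypothetical, not the connector's event class). */
    static final class ParsedEdit {
        final String title;
        final String diffUrl;
        final String user;
        final int byteDiff;
        final String summary;
        final boolean minor;
        final boolean isNew;
        final boolean bot;
        final boolean talk;

        ParsedEdit(String title, String flags, String diffUrl,
                   String user, int byteDiff, String summary) {
            this.title = title;
            this.diffUrl = diffUrl;
            this.user = user;
            this.byteDiff = byteDiff;
            this.summary = summary;
            // Assumed flag letters: M = minor, N = new page, B = bot
            this.minor = flags.contains("M");
            this.isNew = flags.contains("N");
            this.bot = flags.contains("B");
            // Talk pages are recognised by their title prefix
            this.talk = title.startsWith("Talk:");
        }
    }

    /** Returns the parsed edit, or null when the line does not match the assumed layout. */
    static ParsedEdit parse(String line) {
        Matcher m = EDIT_LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        return new ParsedEdit(m.group(1), m.group(2), m.group(3), m.group(4),
                Integer.parseInt(m.group(5)), m.group(6));
    }

    public static void main(String[] args) {
        ParsedEdit edit = parse(
                "[[Talk:Apache Flink]] MB https://example.org/diff * ExampleBot * (+42) fixed a typo");
        System.out.println(edit.user + " changed '" + edit.title + "' by " + edit.byteDiff + " bytes");
    }
}
```

Returning `null` for lines that do not match keeps the caller in control of whether unparseable messages are skipped or logged.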

## Capabilities

### Wikipedia Edit Streaming Source

Connects to Wikipedia's IRC channel to stream real-time edit events from the English Wikipedia.

```java { .api }
/**
 * Source function for reading Wikipedia edit events from IRC channel #en.wikipedia
 * @deprecated Uses legacy SourceFunction API, migrate to new Source API
 */
@Deprecated
public class WikipediaEditsSource extends RichSourceFunction<WikipediaEditEvent> {

    /** Default IRC server hostname */
    public static final String DEFAULT_HOST = "irc.wikimedia.org";

    /** Default IRC server port */
    public static final int DEFAULT_PORT = 6667;

    /** Default IRC channel to monitor */
    public static final String DEFAULT_CHANNEL = "#en.wikipedia";

    /**
     * Creates a source with default IRC settings (irc.wikimedia.org:6667, #en.wikipedia)
     */
    public WikipediaEditsSource();

    /**
     * Creates a source with custom IRC settings
     * @param host IRC server hostname
     * @param port IRC server port
     * @param channel IRC channel to monitor (messages not matching expected format will be ignored)
     */
    public WikipediaEditsSource(String host, int port, String channel);

    /**
     * Main execution method that runs the source function
     * @param ctx Source context for collecting events
     * @throws Exception if IRC connection or parsing fails
     */
    @Override
    public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception;

    /**
     * Cancels the source function, stopping event collection
     */
    @Override
    public void cancel();
}
```
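
The `run` and `cancel` pair above implies a cooperative contract: `run` keeps emitting until `cancel` asks it to stop. The following plain-Java sketch illustrates that pattern under stated assumptions. `PollingLoop`, its in-memory queue, and the 100 ms poll timeout are hypothetical and are not taken from the connector's implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical emit loop showing cooperative cancellation via a volatile flag. */
final class PollingLoop {

    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();

    // volatile so that a cancel() issued from another thread is visible to run()
    private volatile boolean running = true;

    /** Hands a message to the loop (stands in for the IRC listener). */
    void offer(String message) {
        incoming.offer(message);
    }

    /** Forwards queued messages to 'out' until cancel() is called. */
    void run(Consumer<String> out) {
        try {
            while (running) {
                // Poll with a timeout so the flag is re-checked even when no messages arrive
                String next = incoming.poll(100, TimeUnit.MILLISECONDS);
                if (next != null) {
                    out.accept(next);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status and leave the loop
            Thread.currentThread().interrupt();
        }
    }

    /** Requests that run() return after its current iteration. */
    void cancel() {
        running = false;
    }
}
```

Polling with a timeout rather than blocking indefinitely is the design choice that lets the loop notice cancellation on a quiet channel.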

### Wikipedia Edit Event Data Structure

Immutable data structure containing all metadata about a Wikipedia edit event.

```java { .api }
/**
 * Represents a single Wikipedia edit event with all associated metadata
 */
public class WikipediaEditEvent {

    /**
     * Creates a new Wikipedia edit event
     * @param timestamp Event timestamp (when received at source)
     * @param channel IRC channel name
     * @param title Wikipedia article title
     * @param diffUrl URL to the edit diff page
     * @param user Username of the editor
     * @param byteDiff Size change in bytes (positive for additions, negative for deletions)
     * @param summary Edit summary/comment provided by the editor
     * @param isMinor Whether this is marked as a minor edit
     * @param isNew Whether this edit created a new page
     * @param isUnpatrolled Whether this edit is unpatrolled
     * @param isBotEdit Whether this edit was made by a bot
     * @param isSpecial Whether this is a special page edit
     * @param isTalk Whether this is a talk page edit
     */
    public WikipediaEditEvent(
        long timestamp,
        String channel,
        String title,
        String diffUrl,
        String user,
        int byteDiff,
        String summary,
        boolean isMinor,
        boolean isNew,
        boolean isUnpatrolled,
        boolean isBotEdit,
        boolean isSpecial,
        boolean isTalk
    );

    /**
     * Returns the timestamp when this event was received at the source
     * @return Event timestamp in milliseconds since epoch
     */
    public long getTimestamp();

    /**
     * Returns the IRC channel this event originated from
     * @return IRC channel name (e.g., "#en.wikipedia")
     */
    public String getChannel();

    /**
     * Returns the title of the Wikipedia article that was edited
     * @return Article title
     */
    public String getTitle();

    /**
     * Returns the URL to view the edit diff
     * @return Diff URL
     */
    public String getDiffUrl();

    /**
     * Returns the username of the editor
     * @return Editor username
     */
    public String getUser();

    /**
     * Returns the size change in bytes
     * @return Byte difference (positive for additions, negative for deletions)
     */
    public int getByteDiff();

    /**
     * Returns the edit summary/comment
     * @return Edit summary text
     */
    public String getSummary();

    /**
     * Returns whether this is a minor edit
     * @return true if marked as minor edit
     */
    public boolean isMinor();

    /**
     * Returns whether this edit created a new page
     * @return true if this is a page creation
     */
    public boolean isNew();

    /**
     * Returns whether this edit is unpatrolled
     * @return true if unpatrolled
     */
    public boolean isUnpatrolled();

    /**
     * Returns whether this edit was made by a bot
     * @return true if bot edit
     */
    public boolean isBotEdit();

    /**
     * Returns whether this is a special page edit
     * @return true if special page (title starts with "Special:")
     */
    public boolean isSpecial();

    /**
     * Returns whether this is a talk page edit
     * @return true if talk page (title starts with "Talk:")
     */
    public boolean isTalk();

    /**
     * Creates a WikipediaEditEvent from a raw IRC message
     * @param timestamp Event timestamp
     * @param channel IRC channel name
     * @param rawEvent Raw IRC message text
     * @return Parsed WikipediaEditEvent or null if parsing failed
     */
    public static WikipediaEditEvent fromRawEvent(long timestamp, String channel, String rawEvent);

    /**
     * Returns a string representation of the event
     * @return String representation including all fields
     */
    @Override
    public String toString();
}
```
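
As a rough illustration of what "immutable" with validated fields can look like, the sketch below uses a hypothetical `EditRecord` class with only three of the fields listed above. It is a simplification under stated assumptions: the real event class takes `isTalk` and `isSpecial` as constructor arguments, whereas this sketch derives them from the title prefix to keep the example short.

```java
import java.util.Objects;

/** Hypothetical, trimmed-down immutable event holder (not the connector's class). */
final class EditRecord {

    // All fields are final and there are no setters, so instances cannot change after construction
    private final String title;
    private final String user;
    private final int byteDiff;

    EditRecord(String title, String user, int byteDiff) {
        // Reject missing reference fields up front with a NullPointerException
        this.title = Objects.requireNonNull(title, "title");
        this.user = Objects.requireNonNull(user, "user");
        this.byteDiff = byteDiff;
    }

    String getTitle() {
        return title;
    }

    String getUser() {
        return user;
    }

    /** Positive for additions, negative for deletions. */
    int getByteDiff() {
        return byteDiff;
    }

    /** Assumption for this sketch: derived from the title prefix instead of being stored. */
    boolean isTalk() {
        return title.startsWith("Talk:");
    }

    /** Assumption for this sketch: derived from the title prefix instead of being stored. */
    boolean isSpecial() {
        return title.startsWith("Special:");
    }
}
```

Failing fast in the constructor means downstream operators never have to null-check the string fields of an event they receive.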

## Usage Examples

### Basic Edit Stream Processing

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
WikipediaEditsSource source = new WikipediaEditsSource();

env.addSource(source)
    .filter(event -> event.getByteDiff() > 100)  // Keep only edits that add more than 100 bytes
    .map(event -> String.format("%s: %+d bytes to '%s'",
        event.getUser(),
        event.getByteDiff(),
        event.getTitle()))
    .print();

env.execute("Large Edit Monitor");
```
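
### Bot vs Human Edit Analysis

```java
// Requires org.apache.flink.api.common.typeinfo.Types and org.apache.flink.api.java.tuple.Tuple2
env.addSource(new WikipediaEditsSource())
    .map(event -> new Tuple2<>(
        event.isBotEdit() ? "BOT" : "HUMAN",
        event.getByteDiff()))
    // Lambdas lose generic type information to erasure, so declare the tuple type explicitly
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(tuple -> tuple.f0)  // Group by bot/human (index-based keyBy(0) is deprecated)
    .sum(1)  // Sum byte differences
    .print();
```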
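
A keyed `sum` is a rolling aggregate: each incoming record produces an updated total for its key, rather than a single final result. The plain-Java sketch below models only those semantics. `RollingSum` is a hypothetical helper and says nothing about how Flink implements or stores keyed state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a per-key running sum, emitted once per input record. */
final class RollingSum {

    private final Map<String, Integer> totals = new HashMap<>();

    /** Adds one record and returns the updated total for its key. */
    int add(String key, int byteDiff) {
        // merge() stores byteDiff for a new key, or the sum of old and new for an existing key
        return totals.merge(key, byteDiff, Integer::sum);
    }

    public static void main(String[] args) {
        RollingSum sums = new RollingSum();
        // Every call yields an output, just as every stream record yields an updated total
        System.out.println("HUMAN -> " + sums.add("HUMAN", 120));
        System.out.println("BOT -> " + sums.add("BOT", -30));
        System.out.println("HUMAN -> " + sums.add("HUMAN", -20));
    }
}
```

Because deletions carry negative byte differences, the running total for a key can go down as well as up.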

### Custom IRC Channel Monitoring

```java
// Monitor a different IRC channel (for testing or other wikis)
WikipediaEditsSource customSource = new WikipediaEditsSource(
    "irc.wikimedia.org",
    6667,
    "#fr.wikipedia"  // French Wikipedia
);

env.addSource(customSource)
    .filter(event -> !event.isMinor())  // Exclude minor edits
    .print();
```

## Error Handling

The WikipediaEditsSource handles several error conditions:

- **IRC Connection Failures**: Connection issues are handled with automatic reconnection attempts
- **Malformed Messages**: IRC messages that don't match the expected Wikipedia edit format are silently ignored
- **Null Values**: The `WikipediaEditEvent` constructor validates that required fields (channel, title, diffUrl, user, summary) are not null and throws a NullPointerException if any of them is
- **Queue Overflow**: The internal event queue has a bounded capacity (128 events); events that arrive while it is full are dropped with debug logging
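
The queue overflow behaviour described above can be sketched with a bounded queue and a non-blocking `offer`. This is a minimal illustration, assuming a hypothetical `DroppingBuffer` class and a simple drop counter in place of debug logging; only the capacity of 128 comes from the description above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical bounded buffer that drops new events when full instead of blocking. */
final class DroppingBuffer {

    private final BlockingQueue<String> queue;
    private int dropped;

    DroppingBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns true if the event was stored, false if it was dropped because the buffer is full. */
    boolean accept(String event) {
        // offer() never blocks: it returns false immediately when the queue is at capacity
        boolean stored = queue.offer(event);
        if (!stored) {
            dropped++; // a real implementation might log here instead of counting
        }
        return stored;
    }

    int size() {
        return queue.size();
    }

    int dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        DroppingBuffer buffer = new DroppingBuffer(128);
        for (int i = 0; i < 130; i++) {
            buffer.accept("event-" + i);
        }
        System.out.println("stored=" + buffer.size() + ", dropped=" + buffer.dropped());
    }
}
```

With this approach the newest events are the ones lost under pressure, which keeps the producer from stalling but means a slow consumer sees gaps in the stream.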

## Dependencies

This connector requires the following runtime dependencies:

- **Apache Flink Streaming**: `org.apache.flink:flink-streaming-java`
- **IRC Library**: `org.schwering:irclib:1.10` for IRC protocol support

## Migration Notes

This connector is deprecated because it uses the legacy `SourceFunction` API. For new applications, consider:

1. Implementing a custom source using the new `org.apache.flink.api.connector.source.Source` API
2. Using alternative data ingestion methods like Kafka or other message queues
3. Building a custom IRC-to-Kafka bridge for more robust streaming architecture