tessl/maven-org-apache-flink--flink-contrib

Apache Flink contributor module providing the WikiEdits streaming connector for processing Wikipedia edit events from IRC

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-contrib@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-contrib@1.20.0


Apache Flink Contrib - WikiEdits Connector

Apache Flink Contrib is a staging area for experimental streaming connectors and modules that are offered for community evaluation. Its primary component is the WikiEdits connector, which streams Wikipedia edit events by monitoring a Wikimedia IRC channel and so enables real-time analytics on Wikipedia activity.

Package Information

  • Package Name: org.apache.flink:flink-contrib (the connector itself is published as the flink-connector-wikiedits artifact shown below)
  • Package Type: Maven
  • Language: Java
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-wikiedits</artifactId>
      <version>1.20.0</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

Basic Usage

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create WikiEdits source with default settings (connects to #en.wikipedia on IRC)
WikipediaEditsSource source = new WikipediaEditsSource();

// Add source to streaming environment
DataStream<WikipediaEditEvent> edits = env.addSource(source);

// Process the edit events
edits.filter(event -> !event.isBotEdit())
     .map(event -> "User " + event.getUser() + " edited " + event.getTitle())
     .print();

env.execute("Wikipedia Edits Stream");

Architecture

The WikiEdits connector is built around these key components:

  • WikipediaEditsSource: Main source function that connects to IRC and produces edit events
  • WikipediaEditEvent: Immutable data structure representing a single Wikipedia edit
  • IRC Integration: Uses the schwering IRC library to connect to Wikimedia's IRC channels
  • Event Parsing: Regex-based parsing of IRC messages into structured edit events
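
To make the event-parsing step concrete, here is a minimal plain-Java sketch of regex-based parsing. It is not the connector's actual parser: the class name EditLineParser, the pattern, and the assumed line layout ([[title]] flags url * user * (+/-bytes) summary, with IRC colour codes already stripped and flag letters M for minor and B for bot) are illustrative assumptions.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of regex-based edit parsing; not the connector's own code. */
final class EditLineParser {

    /** Illustrative subset of the fields carried by an edit line. */
    static final class ParsedEdit {
        final String title;
        final String flags;
        final String diffUrl;
        final String user;
        final int byteDiff;
        final String summary;

        ParsedEdit(String title, String flags, String diffUrl,
                   String user, int byteDiff, String summary) {
            this.title = title;
            this.flags = flags;
            this.diffUrl = diffUrl;
            this.user = user;
            this.byteDiff = byteDiff;
            this.summary = summary;
        }

        // Assumed flag letters: M = minor edit, B = bot edit.
        boolean isMinor() { return flags.indexOf('M') >= 0; }
        boolean isBotEdit() { return flags.indexOf('B') >= 0; }
    }

    // Assumed layout: [[title]] flags url * user * (+/-bytes) summary
    private static final Pattern EDIT_LINE = Pattern.compile(
            "^\\[\\[(.+?)\\]\\]\\s+(\\S*)\\s+(\\S+)\\s+\\*\\s+(.+?)\\s+\\*\\s+"
                    + "\\(([+-]?\\d{1,9})\\)\\s*(.*)$");

    /** Returns the parsed edit, or null when the line does not match the layout. */
    static ParsedEdit parse(String line) {
        Matcher m = EDIT_LINE.matcher(line);
        if (!m.matches()) {
            return null; // unrecognised lines are ignored rather than treated as errors
        }
        return new ParsedEdit(m.group(1), m.group(2), m.group(3), m.group(4),
                Integer.parseInt(m.group(5)), m.group(6));
    }

    public static void main(String[] args) {
        ParsedEdit edit = parse("[[Apache Flink]] MB "
                + "https://example.org/w/index.php?diff=2&oldid=1 * ExampleBot * (-15) fixed typo");
        System.out.println(edit.user + " changed '" + edit.title + "' by " + edit.byteDiff + " bytes");
        System.out.println(parse("not an edit line"));
    }
}
```

Returning null for a non-matching line mirrors the documented behaviour of fromRawEvent, which lets the caller skip unrelated channel traffic without exception handling.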

Important Note: This connector is deprecated as it uses the legacy SourceFunction API. It's recommended to migrate to the new Source API for production use.

Capabilities

Wikipedia Edit Streaming Source

Connects to a Wikimedia IRC channel (by default #en.wikipedia, the English Wikipedia) and streams edit events in real time.

/**
 * Source function for reading Wikipedia edit events from IRC channel #en.wikipedia
 * @deprecated Uses legacy SourceFunction API, migrate to new Source API
 */
@Deprecated
public class WikipediaEditsSource extends RichSourceFunction<WikipediaEditEvent> {
    
    /** Default IRC server hostname */
    public static final String DEFAULT_HOST = "irc.wikimedia.org";
    
    /** Default IRC server port */
    public static final int DEFAULT_PORT = 6667;
    
    /** Default IRC channel to monitor */
    public static final String DEFAULT_CHANNEL = "#en.wikipedia";
    
    /**
     * Creates a source with default IRC settings (irc.wikimedia.org:6667, #en.wikipedia)
     */
    public WikipediaEditsSource();
    
    /**
     * Creates a source with custom IRC settings
     * @param host IRC server hostname
     * @param port IRC server port
     * @param channel IRC channel to monitor (messages not matching expected format will be ignored)
     */
    public WikipediaEditsSource(String host, int port, String channel);
    
    /**
     * Main execution method that runs the source function
     * @param ctx Source context for collecting events
     * @throws Exception if IRC connection or parsing fails
     */
    @Override
    public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception;
    
    /**
     * Cancels the source function, stopping event collection
     */
    @Override
    public void cancel();
}

Wikipedia Edit Event Data Structure

Immutable data structure containing all metadata about a Wikipedia edit event.

/**
 * Represents a single Wikipedia edit event with all associated metadata
 */
public class WikipediaEditEvent {
    
    /**
     * Creates a new Wikipedia edit event
     * @param timestamp Event timestamp (when received at source)
     * @param channel IRC channel name
     * @param title Wikipedia article title
     * @param diffUrl URL to the edit diff page
     * @param user Username of the editor
     * @param byteDiff Size change in bytes (positive for additions, negative for deletions)
     * @param summary Edit summary/comment provided by the editor
     * @param isMinor Whether this is marked as a minor edit
     * @param isNew Whether this edit created a new page
     * @param isUnpatrolled Whether this edit is unpatrolled
     * @param isBotEdit Whether this edit was made by a bot
     * @param isSpecial Whether this is a special page edit
     * @param isTalk Whether this is a talk page edit
     */
    public WikipediaEditEvent(
        long timestamp,
        String channel,
        String title,
        String diffUrl,
        String user,
        int byteDiff,
        String summary,
        boolean isMinor,
        boolean isNew,
        boolean isUnpatrolled,
        boolean isBotEdit,
        boolean isSpecial,
        boolean isTalk
    );
    
    /**
     * Returns the timestamp when this event was received at the source
     * @return Event timestamp in milliseconds since epoch
     */
    public long getTimestamp();
    
    /**
     * Returns the IRC channel this event originated from
     * @return IRC channel name (e.g., "#en.wikipedia")
     */
    public String getChannel();
    
    /**
     * Returns the title of the Wikipedia article that was edited
     * @return Article title
     */
    public String getTitle();
    
    /**
     * Returns the URL to view the edit diff
     * @return Diff URL
     */
    public String getDiffUrl();
    
    /**
     * Returns the username of the editor
     * @return Editor username
     */
    public String getUser();
    
    /**
     * Returns the size change in bytes
     * @return Byte difference (positive for additions, negative for deletions)
     */
    public int getByteDiff();
    
    /**
     * Returns the edit summary/comment
     * @return Edit summary text
     */
    public String getSummary();
    
    /**
     * Returns whether this is a minor edit
     * @return true if marked as minor edit
     */
    public boolean isMinor();
    
    /**
     * Returns whether this edit created a new page
     * @return true if this is a page creation
     */
    public boolean isNew();
    
    /**
     * Returns whether this edit is unpatrolled
     * @return true if unpatrolled
     */
    public boolean isUnpatrolled();
    
    /**
     * Returns whether this edit was made by a bot
     * @return true if bot edit
     */
    public boolean isBotEdit();
    
    /**
     * Returns whether this is a special page edit
     * @return true if special page (title starts with "Special:")
     */
    public boolean isSpecial();
    
    /**
     * Returns whether this is a talk page edit
     * @return true if talk page (title starts with "Talk:")
     */
    public boolean isTalk();
    
    /**
     * Creates a WikipediaEditEvent from a raw IRC message
     * @param timestamp Event timestamp
     * @param channel IRC channel name
     * @param rawEvent Raw IRC message text
     * @return Parsed WikipediaEditEvent or null if parsing failed
     */
    public static WikipediaEditEvent fromRawEvent(long timestamp, String channel, String rawEvent);
    
    /**
     * Returns a string representation of the event
     * @return String representation including all fields
     */
    @Override
    public String toString();
}

Usage Examples

Basic Edit Stream Processing

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
WikipediaEditsSource source = new WikipediaEditsSource();

env.addSource(source)
   .filter(event -> event.getByteDiff() > 100) // Large additions only; large deletions have a negative byteDiff
   .map(event -> String.format("%s: %+d bytes to '%s'", 
                               event.getUser(), 
                               event.getByteDiff(), 
                               event.getTitle()))
   .print();

env.execute("Large Edit Monitor");

Bot vs Human Edit Analysis

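// Additional imports needed by this snippet:
// import org.apache.flink.api.common.typeinfo.Types;
// import org.apache.flink.api.java.tuple.Tuple2;

env.addSource(new WikipediaEditsSource())
   .map(event -> Tuple2.of(
       event.isBotEdit() ? "BOT" : "HUMAN",
       event.getByteDiff()))
   // Lambdas lose the Tuple2 type parameters to erasure, so declare them explicitly
   .returns(Types.TUPLE(Types.STRING, Types.INT))
   .keyBy(t -> t.f0) // Group by bot/human (index-based keyBy(0) is deprecated)
   .sum(1)           // Running sum of byte differences per group
   .print();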
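
On an unbounded stream, the keyed sum in the example above prints an updated running total for every incoming event rather than one final figure per group. The plain-Java sketch below imitates that behaviour outside Flink; the class RunningSumSketch and its sample values are hypothetical, and in a real job the interleaving of output between keys depends on arrival order and parallelism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a keyed rolling sum; it does not use Flink. */
final class RunningSumSketch {

    /** Emits one "KEY=total" entry per input element, like a rolling aggregate. */
    static List<String> runningSums(String[] keys, int[] byteDiffs) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            // Add the new value to the total for this key and emit the updated total.
            int total = totals.merge(keys[i], byteDiffs[i], Integer::sum);
            emitted.add(keys[i] + "=" + total);
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] keys = {"HUMAN", "BOT", "HUMAN", "BOT"};
        int[] diffs = {120, -30, -20, 5};
        // prints [HUMAN=120, BOT=-30, HUMAN=100, BOT=-25]
        System.out.println(runningSums(keys, diffs));
    }
}
```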

Custom IRC Channel Monitoring

// Monitor a different IRC channel (for testing or other wikis)
WikipediaEditsSource customSource = new WikipediaEditsSource(
    "irc.wikimedia.org", 
    6667, 
    "#fr.wikipedia"  // French Wikipedia
);

env.addSource(customSource)
   .filter(event -> !event.isMinor()) // Exclude minor edits
   .print();

Error Handling

The WikipediaEditsSource handles several error conditions:

  • IRC Connection Failures: Connection issues are handled with automatic reconnection attempts
  • Malformed Messages: IRC messages that don't match the expected Wikipedia edit format are silently ignored
  • Null Values: Constructor validates that required fields (channel, title, diffUrl, user, summary) are not null, throwing NullPointerException if they are
  • Queue Overflow: Internal event queue has bounded capacity (128 events); overflow events are dropped with debug logging
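
The queue-overflow behaviour can be pictured with a small plain-Java sketch: a bounded buffer whose producer never blocks and simply drops events once the buffer is full. The class BoundedEventBuffer and its methods are hypothetical and only illustrate the drop-on-overflow idea; they are not the connector's internals.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical bounded buffer that drops events instead of blocking the producer. */
final class BoundedEventBuffer {
    private final BlockingQueue<String> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedEventBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Producer side: returns false (and counts a drop) when the buffer is full. */
    boolean publish(String event) {
        boolean accepted = queue.offer(event); // offer never blocks
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    /** Consumer side: waits up to timeoutMillis and returns null if nothing arrived. */
    String next(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedEventBuffer buffer = new BoundedEventBuffer(128);
        for (int i = 0; i < 130; i++) {
            buffer.publish("edit-" + i);
        }
        System.out.println("dropped: " + buffer.droppedCount()); // prints "dropped: 2"
        System.out.println("first: " + buffer.next(100));        // prints "first: edit-0"
    }
}
```

Dropping on overflow keeps the thread that receives IRC messages responsive at the cost of losing events under backpressure, which is one reason the stream should not be treated as a complete record of edits.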

Dependencies

This connector requires the following runtime dependencies:

  • Apache Flink Streaming: org.apache.flink:flink-streaming-java
  • IRC Library: org.schwering:irclib:1.10 for IRC protocol support

Migration Notes

This connector is deprecated because it uses the legacy SourceFunction API. For new applications, consider:

  1. Implementing a custom source using the new org.apache.flink.api.connector.source.Source API
  2. Using alternative data ingestion methods like Kafka or other message queues
  3. Building a custom IRC-to-Kafka bridge for more robust streaming architecture