tessl/pypi-mwxml

A set of utilities for processing MediaWiki XML dump data efficiently with streaming and distributed processing capabilities.

Distributed Processing

Parallel processing functionality for handling multiple XML dump files simultaneously using multiprocessing to overcome Python's GIL limitations. This enables efficient processing of large collections of MediaWiki dump files across multiple CPU cores.

Capabilities

Parallel Dump Processing

The map function provides a distributed processing strategy that automatically manages a pool of worker processes and aggregates their results.

def map(process, paths, threads=None):
    """
    Implements a distributed strategy for processing XML files using multiprocessing.
    
    This function constructs a set of worker processes (spread over multiple cores) 
    and uses an internal queue to aggregate outputs. The process function takes two arguments:
    a Dump object and the path the dump was loaded from. Anything that this function yields
    will be yielded in turn from the map function.
    
    Parameters:
    - process: Function that takes (Dump, path) and yields results. The function should 
              accept a mwxml.Dump object and a file path string, then yield any results
              to be aggregated.
    - paths: Iterable of file paths (str) or file objects to dump files to process
    - threads: Number of individual processing threads to spawn (int, optional).
              If not specified, defaults to CPU count.
    
    Yields: Results from the process function across all files
    """

Basic Usage Example:

import mwxml

# Define a processing function
def extract_page_info(dump, path):
    """Extract basic page information from a dump."""
    print(f"Processing {path}")
    
    for page in dump:
        yield {
            'file': path,
            'page_id': page.id,
            'namespace': page.namespace,
            'title': page.title,
            'revision_count': sum(1 for _ in page)
        }

# Process multiple dump files in parallel
files = ["dump1.xml", "dump2.xml", "dump3.xml"]

for page_info in mwxml.map(extract_page_info, files):
    print(f"Page {page_info['page_id']}: {page_info['title']} "
          f"({page_info['revision_count']} revisions)")

Advanced Usage Example:

import mwxml
from collections import defaultdict

def analyze_user_activity(dump, path):
    """Analyze user editing patterns in a dump."""
    user_stats = defaultdict(lambda: {'edits': 0, 'pages': set()})
    
    for page in dump.pages:
        for revision in page:
            if revision.user and revision.user.text:
                username = revision.user.text
                user_stats[username]['edits'] += 1
                user_stats[username]['pages'].add(page.title)
    
    # Yield results for each user
    for username, stats in user_stats.items():
        yield {
            'file': path,
            'username': username,
            'total_edits': stats['edits'],
            'unique_pages': len(stats['pages']),
            'pages_edited': list(stats['pages'])
        }

# Process with custom thread count
dump_files = ["enwiki-latest-pages-articles1.xml", 
              "enwiki-latest-pages-articles2.xml",
              "enwiki-latest-pages-articles3.xml"]

# Use 4 threads for processing
results = []
for user_data in mwxml.map(analyze_user_activity, dump_files, threads=4):
    results.append(user_data)
    
# Aggregate results across all files
user_totals = defaultdict(lambda: {'edits': 0, 'pages': set()})
for result in results:
    username = result['username']
    user_totals[username]['edits'] += result['total_edits']
    user_totals[username]['pages'].update(result['pages_edited'])

# Print top contributors
sorted_users = sorted(user_totals.items(), 
                     key=lambda x: x[1]['edits'], 
                     reverse=True)

print("Top contributors across all files:")
for username, stats in sorted_users[:10]:
    print(f"{username}: {stats['edits']} edits on {len(stats['pages'])} pages")

Log Processing Example:

import mwxml
from collections import defaultdict

def extract_admin_actions(dump, path):
    """Extract administrative actions from log items."""
    for log_item in dump.log_items:
        if log_item.type in ['block', 'delete', 'protect', 'move']:
            yield {
                'file': path,
                'log_id': log_item.id,
                'timestamp': log_item.timestamp,
                'type': log_item.type,
                'action': log_item.action,
                'user': log_item.user.text if log_item.user else None,
                'target_page': log_item.page.title if log_item.page else None,
                'comment': log_item.comment,
                'params': log_item.params
            }

# Process log dump files
log_files = ["enwiki-latest-pages-logging.xml"]

admin_actions = []
for action in mwxml.map(extract_admin_actions, log_files):
    admin_actions.append(action)

# Analyze administrative activity
action_counts = defaultdict(int)
for action in admin_actions:
    action_counts[f"{action['type']}/{action['action']}"] += 1

print("Administrative action summary:")
for action_type, count in sorted(action_counts.items(), key=lambda x: x[1], reverse=True):
    print(f"{action_type}: {count}")

Performance Considerations:

The distributed processing approach is most effective when:

  • Processing multiple large dump files (>100MB each)
  • Performing CPU-intensive analysis on revision content
  • There are at least as many files as processing threads (with fewer files than threads, effective parallelism is capped at the number of files)
  • The processing function yields results incrementally rather than accumulating large data structures
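
As a rule of thumb for sizing the pool, the worker count can be capped by both the file count and the CPU count. choose_threads below is a hypothetical helper, not part of mwxml:

```python
import os

def choose_threads(n_files, max_threads=None):
    """Cap the worker count at both the number of files and the CPU count.

    Hypothetical helper (not part of mwxml): each worker handles one file
    at a time, so workers beyond the file count would sit idle.
    """
    cpu = os.cpu_count() or 1
    cap = max_threads if max_threads is not None else cpu
    return max(1, min(n_files, cap))
```

The result can then be passed as the threads argument, e.g. mwxml.map(process, files, threads=choose_threads(len(files))).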

Memory Management:

def memory_efficient_processor(dump, path):
    """Example of memory-efficient processing."""
    # Process pages one at a time, yielding results immediately
    for page in dump.pages:
        page_data = {
            'title': page.title,
            'id': page.id,
            'revision_ids': []
        }
        
        # Process revisions without storing all in memory
        for revision in page:
            page_data['revision_ids'].append(revision.id)
            
            # Yield immediately if processing large pages
            if len(page_data['revision_ids']) > 1000:
                yield page_data
                page_data = {
                    'title': page.title,
                    'id': page.id,  
                    'revision_ids': []
                }
        
        # Yield final batch
        if page_data['revision_ids']:
            yield page_data

# Use with memory-efficient processing; large_files is an iterable of dump file paths
for result in mwxml.map(memory_efficient_processor, large_files, threads=2):
    # Process results as they arrive
    print(f"Processed page {result['title']} with {len(result['revision_ids'])} revisions")

Install with Tessl CLI

npx tessl i tessl/pypi-mwxml
