A set of utilities for processing MediaWiki XML dump data efficiently with streaming and distributed processing capabilities.
—
Parallel processing functionality for handling multiple XML dump files simultaneously using multiprocessing to overcome Python's GIL limitations. This enables efficient processing of large collections of MediaWiki dump files across multiple CPU cores.
The map function provides a distributed processing strategy that automatically manages multiprocessing threads and result aggregation.
def map(process, paths, threads=None):
"""
Implements a distributed strategy for processing XML files using multiprocessing.
This function constructs a set of multiprocessing threads (spread over multiple cores)
and uses an internal queue to aggregate outputs. The process function takes two arguments:
a Dump object and the path the dump was loaded from. Anything that this function yields
will be yielded in turn from the map function.
Parameters:
- process: Function that takes (Dump, path) and yields results. The function should
accept a mwxml.Dump object and a file path string, then yield any results
to be aggregated.
- paths: Iterable of file paths (str) or file objects to dump files to process
- threads: Number of individual processing threads to spawn (int, optional).
If not specified, defaults to CPU count.
Yields: Results from the process function across all files
"""Basic Usage Example:
import mwxml
# Define a processing function
def extract_page_info(dump, path):
"""Extract basic page information from a dump."""
print(f"Processing {path}")
for page in dump:
yield {
'file': path,
'page_id': page.id,
'namespace': page.namespace,
'title': page.title,
'revision_count': sum(1 for _ in page)
}
# Process multiple dump files in parallel
files = ["dump1.xml", "dump2.xml", "dump3.xml"]
for page_info in mwxml.map(extract_page_info, files):
print(f"Page {page_info['page_id']}: {page_info['title']} "
f"({page_info['revision_count']} revisions)")Advanced Usage Example:
import mwxml
from collections import defaultdict
def analyze_user_activity(dump, path):
"""Analyze user editing patterns in a dump."""
user_stats = defaultdict(lambda: {'edits': 0, 'pages': set()})
for page in dump.pages:
for revision in page:
if revision.user and revision.user.text:
username = revision.user.text
user_stats[username]['edits'] += 1
user_stats[username]['pages'].add(page.title)
# Yield results for each user
for username, stats in user_stats.items():
yield {
'file': path,
'username': username,
'total_edits': stats['edits'],
'unique_pages': len(stats['pages']),
'pages_edited': list(stats['pages'])
}
# Process with custom thread count
dump_files = ["enwiki-latest-pages-articles1.xml",
"enwiki-latest-pages-articles2.xml",
"enwiki-latest-pages-articles3.xml"]
# Use 4 threads for processing
results = []
for user_data in mwxml.map(analyze_user_activity, dump_files, threads=4):
results.append(user_data)
# Aggregate results across all files
user_totals = defaultdict(lambda: {'edits': 0, 'pages': set()})
for result in results:
username = result['username']
user_totals[username]['edits'] += result['total_edits']
user_totals[username]['pages'].update(result['pages_edited'])
# Print top contributors
sorted_users = sorted(user_totals.items(),
key=lambda x: x[1]['edits'],
reverse=True)
print("Top contributors across all files:")
for username, stats in sorted_users[:10]:
print(f"{username}: {stats['edits']} edits on {len(stats['pages'])} pages")Log Processing Example:
import mwxml
from collections import defaultdict
from datetime import datetime
def extract_admin_actions(dump, path):
"""Extract administrative actions from log items."""
for log_item in dump.log_items:
if log_item.type in ['block', 'delete', 'protect', 'move']:
yield {
'file': path,
'log_id': log_item.id,
'timestamp': log_item.timestamp,
'type': log_item.type,
'action': log_item.action,
'user': log_item.user.text if log_item.user else None,
'target_page': log_item.page.title if log_item.page else None,
'comment': log_item.comment,
'params': log_item.params
}
# Process log dump files
log_files = ["enwiki-latest-pages-logging.xml"]
admin_actions = []
for action in mwxml.map(extract_admin_actions, log_files):
admin_actions.append(action)
# Analyze administrative activity
action_counts = defaultdict(int)
for action in admin_actions:
action_counts[f"{action['type']}/{action['action']}"] += 1
print("Administrative action summary:")
for action_type, count in sorted(action_counts.items(), key=lambda x: x[1], reverse=True):
print(f"{action_type}: {count}")Performance Considerations:
The distributed processing approach is most effective when there are multiple large dump files to process in parallel, the per-file work is CPU-bound, and results can be aggregated incrementally as they are yielded.
Memory Management:
def memory_efficient_processor(dump, path):
"""Example of memory-efficient processing."""
# Process pages one at a time, yielding results immediately
for page in dump.pages:
page_data = {
'title': page.title,
'id': page.id,
'revision_ids': []
}
# Process revisions without storing all in memory
for revision in page:
page_data['revision_ids'].append(revision.id)
# Yield immediately if processing large pages
if len(page_data['revision_ids']) > 1000:
yield page_data
page_data = {
'title': page.title,
'id': page.id,
'revision_ids': []
}
# Yield final batch
if page_data['revision_ids']:
yield page_data
# Use with memory-efficient processing
for result in mwxml.map(memory_efficient_processor, large_files, threads=2):
# Process results as they arrive
print(f"Processed page {result['title']} with {len(result['revision_ids'])} revisions")Install with Tessl CLI
npx tessl i tessl/pypi-mwxml