# Distributed Processing

Parallel processing functionality for handling multiple XML dump files simultaneously, using multiprocessing to overcome Python's GIL limitations. This enables efficient processing of large collections of MediaWiki dump files across multiple CPU cores.
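
To make the model concrete, the behavior `mwxml.map` provides can be sketched as a plain sequential loop (an illustration only: `load_dump` is a hypothetical stand-in for opening and parsing a dump file, and the real function spreads this work across worker processes, merging results through a queue):

```python
def map_sequential(process, paths, load_dump):
    """Sequential equivalent of the distributed map, for illustration.

    `process` is a generator function taking (dump, path); everything
    it yields is re-yielded here, just as mwxml.map yields results
    collected from its workers.
    """
    for path in paths:
        dump = load_dump(path)  # hypothetical stand-in for XML parsing
        yield from process(dump, path)

# Tiny demonstration with fake "dumps" (lists of page ids):
fake_dumps = {"a.xml": [1, 2], "b.xml": [3]}

def count_pages(dump, path):
    yield (path, len(dump))

results = list(map_sequential(count_pages, fake_dumps, fake_dumps.get))
print(results)  # [('a.xml', 2), ('b.xml', 1)]
```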

## Capabilities

### Parallel Dump Processing

The `map` function provides a distributed processing strategy that automatically manages a pool of worker processes and aggregates their results.

```python { .api }
def map(process, paths, threads=None):
    """
    Implements a distributed strategy for processing XML files using multiprocessing.

    This function constructs a set of worker processes (spread over multiple cores)
    and uses an internal queue to aggregate outputs. The process function takes two
    arguments: a Dump object and the path the dump was loaded from. Anything that
    this function yields will be yielded in turn from the map function.

    Parameters:
    - process: Function that takes (Dump, path) and yields results. The function
      should accept a mwxml.Dump object and a file path string, then yield any
      results to be aggregated.
    - paths: Iterable of file paths (str) or file objects to dump files to process
    - threads: Number of worker processes to spawn (int, optional).
      If not specified, defaults to CPU count.

    Yields: Results from the process function across all files
    """
```

**Basic Usage Example:**

```python
import mwxml

# Define a processing function
def extract_page_info(dump, path):
    """Extract basic page information from a dump."""
    print(f"Processing {path}")

    for page in dump:
        yield {
            'file': path,
            'page_id': page.id,
            'namespace': page.namespace,
            'title': page.title,
            'revision_count': sum(1 for _ in page)
        }

# Process multiple dump files in parallel
files = ["dump1.xml", "dump2.xml", "dump3.xml"]

for page_info in mwxml.map(extract_page_info, files):
    print(f"Page {page_info['page_id']}: {page_info['title']} "
          f"({page_info['revision_count']} revisions)")
```

**Advanced Usage Example:**

```python
import mwxml
from collections import defaultdict

def analyze_user_activity(dump, path):
    """Analyze user editing patterns in a dump."""
    user_stats = defaultdict(lambda: {'edits': 0, 'pages': set()})

    for page in dump.pages:
        for revision in page:
            if revision.user and revision.user.text:
                username = revision.user.text
                user_stats[username]['edits'] += 1
                user_stats[username]['pages'].add(page.title)

    # Yield results for each user
    for username, stats in user_stats.items():
        yield {
            'file': path,
            'username': username,
            'total_edits': stats['edits'],
            'unique_pages': len(stats['pages']),
            'pages_edited': list(stats['pages'])
        }

# Process with a custom worker count
dump_files = ["enwiki-latest-pages-articles1.xml",
              "enwiki-latest-pages-articles2.xml",
              "enwiki-latest-pages-articles3.xml"]

# Use 4 workers for processing
results = []
for user_data in mwxml.map(analyze_user_activity, dump_files, threads=4):
    results.append(user_data)

# Aggregate results across all files
user_totals = defaultdict(lambda: {'edits': 0, 'pages': set()})
for result in results:
    username = result['username']
    user_totals[username]['edits'] += result['total_edits']
    user_totals[username]['pages'].update(result['pages_edited'])

# Print top contributors
sorted_users = sorted(user_totals.items(),
                      key=lambda x: x[1]['edits'],
                      reverse=True)

print("Top contributors across all files:")
for username, stats in sorted_users[:10]:
    print(f"{username}: {stats['edits']} edits on {len(stats['pages'])} pages")
```

**Log Processing Example:**

```python
import mwxml
from collections import defaultdict

def extract_admin_actions(dump, path):
    """Extract administrative actions from log items."""
    for log_item in dump.log_items:
        if log_item.type in ['block', 'delete', 'protect', 'move']:
            yield {
                'file': path,
                'log_id': log_item.id,
                'timestamp': log_item.timestamp,
                'type': log_item.type,
                'action': log_item.action,
                'user': log_item.user.text if log_item.user else None,
                'target_page': log_item.page.title if log_item.page else None,
                'comment': log_item.comment,
                'params': log_item.params
            }

# Process log dump files
log_files = ["enwiki-latest-pages-logging.xml"]

admin_actions = []
for action in mwxml.map(extract_admin_actions, log_files):
    admin_actions.append(action)

# Analyze administrative activity
action_counts = defaultdict(int)
for action in admin_actions:
    action_counts[f"{action['type']}/{action['action']}"] += 1

print("Administrative action summary:")
for action_type, count in sorted(action_counts.items(), key=lambda x: x[1], reverse=True):
    print(f"{action_type}: {count}")
```

**Performance Considerations:**

The distributed processing approach is most effective when:

- Processing multiple large dump files (>100MB each)
- Performing CPU-intensive analysis on revision content
- There are at least as many files as workers; because each file is handled by a single worker, fewer files than workers limits effective parallelism to the number of files
- The processing function yields results incrementally rather than accumulating large data structures
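
The third point above can be made concrete: because each file is handled by a single worker, parallelism is capped by the number of files, so a reasonable worker count is the smaller of the file count and the CPU count (a sketch; `effective_threads` is a hypothetical helper, while `mwxml.map` itself simply defaults `threads` to the CPU count):

```python
import os

def effective_threads(paths, requested=None):
    """Hypothetical helper: pick a worker count for mwxml.map, never
    spawning more workers than there are files to hand out."""
    limit = requested or os.cpu_count() or 1
    return max(1, min(limit, len(paths)))

files = ["dump1.xml", "dump2.xml", "dump3.xml"]
print(effective_threads(files, requested=8))  # 3: only three files to process
print(effective_threads(files, requested=2))  # 2: the caller's cap wins
```

The result can be passed straight through, e.g. `mwxml.map(process, files, threads=effective_threads(files))`.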

**Memory Management:**

```python
def memory_efficient_processor(dump, path):
    """Example of memory-efficient processing."""
    # Process pages one at a time, yielding results immediately
    for page in dump.pages:
        page_data = {
            'title': page.title,
            'id': page.id,
            'revision_ids': []
        }

        # Process revisions without storing all in memory
        for revision in page:
            page_data['revision_ids'].append(revision.id)

            # Yield immediately if processing large pages
            if len(page_data['revision_ids']) > 1000:
                yield page_data
                page_data = {
                    'title': page.title,
                    'id': page.id,
                    'revision_ids': []
                }

        # Yield final batch
        if page_data['revision_ids']:
            yield page_data

# Use with memory-efficient processing
large_files = ["large_dump1.xml", "large_dump2.xml"]  # example paths
for result in mwxml.map(memory_efficient_processor, large_files, threads=2):
    # Process results as they arrive
    print(f"Processed page {result['title']} with {len(result['revision_ids'])} revisions")
```