or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-processing.md · distributed-processing.md · index.md · utilities.md

docs/distributed-processing.md

0

# Distributed Processing

1

2

Parallel processing functionality for handling multiple XML dump files simultaneously using multiprocessing to overcome Python's GIL limitations. This enables efficient processing of large collections of MediaWiki dump files across multiple CPU cores.

3

4

## Capabilities

5

6

### Parallel Dump Processing

7

8

The `map` function provides a distributed processing strategy that automatically manages multiprocessing threads and result aggregation.

9

10

```python { .api }

11

def map(process, paths, threads=None):

12

"""

13

Implements a distributed strategy for processing XML files using multiprocessing.

14

15

This function constructs a set of multiprocessing threads (spread over multiple cores)

16

and uses an internal queue to aggregate outputs. The process function takes two arguments:

17

a Dump object and the path the dump was loaded from. Anything that this function yields

18

will be yielded in turn from the map function.

19

20

Parameters:

21

- process: Function that takes (Dump, path) and yields results. The function should

22

accept a mwxml.Dump object and a file path string, then yield any results

23

to be aggregated.

24

- paths: Iterable of file paths (str) or file objects to dump files to process

25

- threads: Number of individual processing threads to spawn (int, optional).

26

If not specified, defaults to CPU count.

27

28

Yields: Results from the process function across all files

29

"""

30

```

31

32

**Basic Usage Example:**

33

34

```python

35

import mwxml

36

37

# Define a processing function

38

def extract_page_info(dump, path):

39

"""Extract basic page information from a dump."""

40

print(f"Processing {path}")

41

42

for page in dump:

43

yield {

44

'file': path,

45

'page_id': page.id,

46

'namespace': page.namespace,

47

'title': page.title,

48

'revision_count': sum(1 for _ in page)

49

}

50

51

# Process multiple dump files in parallel

52

files = ["dump1.xml", "dump2.xml", "dump3.xml"]

53

54

for page_info in mwxml.map(extract_page_info, files):

55

print(f"Page {page_info['page_id']}: {page_info['title']} "

56

f"({page_info['revision_count']} revisions)")

57

```

58

59

**Advanced Usage Example:**

60

61

```python

62

import mwxml

63

from collections import defaultdict

64

65

def analyze_user_activity(dump, path):

66

"""Analyze user editing patterns in a dump."""

67

user_stats = defaultdict(lambda: {'edits': 0, 'pages': set()})

68

69

for page in dump.pages:

70

for revision in page:

71

if revision.user and revision.user.text:

72

username = revision.user.text

73

user_stats[username]['edits'] += 1

74

user_stats[username]['pages'].add(page.title)

75

76

# Yield results for each user

77

for username, stats in user_stats.items():

78

yield {

79

'file': path,

80

'username': username,

81

'total_edits': stats['edits'],

82

'unique_pages': len(stats['pages']),

83

'pages_edited': list(stats['pages'])

84

}

85

86

# Process with custom thread count

87

dump_files = ["enwiki-latest-pages-articles1.xml",

88

"enwiki-latest-pages-articles2.xml",

89

"enwiki-latest-pages-articles3.xml"]

90

91

# Use 4 threads for processing

92

results = []

93

for user_data in mwxml.map(analyze_user_activity, dump_files, threads=4):

94

results.append(user_data)

95

96

# Aggregate results across all files

97

user_totals = defaultdict(lambda: {'edits': 0, 'pages': set()})

98

for result in results:

99

username = result['username']

100

user_totals[username]['edits'] += result['total_edits']

101

user_totals[username]['pages'].update(result['pages_edited'])

102

103

# Print top contributors

104

sorted_users = sorted(user_totals.items(),

105

key=lambda x: x[1]['edits'],

106

reverse=True)

107

108

print("Top contributors across all files:")

109

for username, stats in sorted_users[:10]:

110

print(f"{username}: {stats['edits']} edits on {len(stats['pages'])} pages")

111

```

112

113

**Log Processing Example:**

114

115

```python

116

import mwxml

117

from collections import defaultdict
from datetime import datetime

118

119

def extract_admin_actions(dump, path):

120

"""Extract administrative actions from log items."""

121

for log_item in dump.log_items:

122

if log_item.type in ['block', 'delete', 'protect', 'move']:

123

yield {

124

'file': path,

125

'log_id': log_item.id,

126

'timestamp': log_item.timestamp,

127

'type': log_item.type,

128

'action': log_item.action,

129

'user': log_item.user.text if log_item.user else None,

130

'target_page': log_item.page.title if log_item.page else None,

131

'comment': log_item.comment,

132

'params': log_item.params

133

}

134

135

# Process log dump files

136

log_files = ["enwiki-latest-pages-logging.xml"]

137

138

admin_actions = []

139

for action in mwxml.map(extract_admin_actions, log_files):

140

admin_actions.append(action)

141

142

# Analyze administrative activity

143

action_counts = defaultdict(int)

144

for action in admin_actions:

145

action_counts[f"{action['type']}/{action['action']}"] += 1

146

147

print("Administrative action summary:")

148

for action_type, count in sorted(action_counts.items(), key=lambda x: x[1], reverse=True):

149

print(f"{action_type}: {count}")

150

```

151

152

**Performance Considerations:**

153

154

The distributed processing approach is most effective when:

155

156

- Processing multiple large dump files (>100MB each)

157

- Performing CPU-intensive analysis on revision content

158

- The number of files is at least the thread count (each thread processes one file at a time, so threads beyond the file count sit idle)

159

- The processing function yields results incrementally rather than accumulating large data structures

160

161

**Memory Management:**

162

163

```python

164

def memory_efficient_processor(dump, path):

165

"""Example of memory-efficient processing."""

166

# Process pages one at a time, yielding results immediately

167

for page in dump.pages:

168

page_data = {

169

'title': page.title,

170

'id': page.id,

171

'revision_ids': []

172

}

173

174

# Process revisions without storing all in memory

175

for revision in page:

176

page_data['revision_ids'].append(revision.id)

177

178

# Yield immediately if processing large pages

179

if len(page_data['revision_ids']) > 1000:

180

yield page_data

181

page_data = {

182

'title': page.title,

183

'id': page.id,

184

'revision_ids': []

185

}

186

187

# Yield final batch

188

if page_data['revision_ids']:

189

yield page_data

190

191

# Use with memory-efficient processing

192

large_files = ["big-dump1.xml", "big-dump2.xml"]
for result in mwxml.map(memory_efficient_processor, large_files, threads=2):

193

# Process results as they arrive

194

print(f"Processed page {result['title']} with {len(result['revision_ids'])} revisions")

195

```