or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

error-handling.mdindex.mdparallel-processing.mdracing.mdresource-management.mdsynchronization-flow.md

parallel-processing.mddocs/

0

# Parallel Processing

1

2

Arrow FX Coroutines provides high-level parallel processing capabilities that enable concurrent execution while maintaining structured concurrency principles. All parallel operations respect cancellation, provide proper exception handling, and can be configured for optimal performance.

3

4

## Parallel Mapping

5

6

### Basic Parallel Map

7

8

```kotlin { .api }

9

suspend fun <A, B> Iterable<A>.parMap(transform: suspend CoroutineScope.(A) -> B): List<B>

10

suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, transform: suspend CoroutineScope.(A) -> B): List<B>

11

```

12

13

Execute a transformation function on each element of a collection in parallel.

14

15

```kotlin

16

val urls = listOf("url1", "url2", "url3")

17

val responses = urls.parMap { url ->

18

httpClient.get(url)

19

}

20

```

21

22

### Parallel Map with Concurrency Control

23

24

```kotlin { .api }

25

suspend fun <A, B> Iterable<A>.parMap(concurrency: Int, f: suspend CoroutineScope.(A) -> B): List<B>

26

suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, concurrency: Int, f: suspend CoroutineScope.(A) -> B): List<B>

27

```

28

29

Control the maximum number of concurrent operations to prevent resource exhaustion.

30

31

```kotlin

32

val largeDataset = (1..1000).toList()

33

val processed = largeDataset.parMap(concurrency = 10) { item ->

34

expensiveOperation(item)

35

}

36

```

37

38

### Parallel Map with Null Filtering

39

40

```kotlin { .api }

41

suspend fun <A, B> Iterable<A>.parMapNotNull(transform: suspend CoroutineScope.(A) -> B?): List<B>

42

suspend fun <A, B> Iterable<A>.parMapNotNull(context: CoroutineContext = EmptyCoroutineContext, transform: suspend CoroutineScope.(A) -> B?): List<B>

43

suspend fun <A, B> Iterable<A>.parMapNotNull(concurrency: Int, transform: suspend CoroutineScope.(A) -> B?): List<B>

44

suspend fun <A, B> Iterable<A>.parMapNotNull(context: CoroutineContext = EmptyCoroutineContext, concurrency: Int, transform: suspend CoroutineScope.(A) -> B?): List<B>

45

```

46

47

Map in parallel and filter out null results.

48

49

```kotlin

50

val userIds = listOf(1, 2, 3, 4, 5)

51

val validUsers = userIds.parMapNotNull { id ->

52

userService.findById(id) // Returns null for non-existent users

53

}

54

```

55

56

## Parallel Zipping

57

58

### Two-Way Parallel Zip

59

60

```kotlin { .api }

61

suspend fun <A, B, C> parZip(fa: suspend CoroutineScope.() -> A, fb: suspend CoroutineScope.() -> B, f: suspend CoroutineScope.(A, B) -> C): C

62

suspend fun <A, B, C> parZip(context: CoroutineContext, fa: suspend CoroutineScope.() -> A, fb: suspend CoroutineScope.() -> B, f: suspend CoroutineScope.(A, B) -> C): C

63

```

64

65

Execute two operations in parallel and combine their results.

66

67

```kotlin

68

val result = parZip(

69

{ fetchUserProfile(userId) },

70

{ fetchUserPreferences(userId) }

71

) { profile, preferences ->

72

UserData(profile, preferences)

73

}

74

```

75

76

### Multi-Way Parallel Zip (3-9 arity)

77

78

```kotlin { .api }

79

suspend fun <A, B, C, D> parZip(

80

fa: suspend CoroutineScope.() -> A,

81

fb: suspend CoroutineScope.() -> B,

82

fc: suspend CoroutineScope.() -> C,

83

f: suspend CoroutineScope.(A, B, C) -> D

84

): D

85

86

// Similar signatures available for 4, 5, 6, 7, 8, and 9 parameters

87

```

88

89

Execute multiple operations in parallel and combine all results.

90

91

```kotlin

92

val dashboard = parZip(

93

{ fetchUserStats() },

94

{ fetchRecentActivity() },

95

{ fetchNotifications() },

96

{ fetchSystemStatus() }

97

) { stats, activity, notifications, status ->

98

DashboardData(stats, activity, notifications, status)

99

}

100

```

101

102

## Error-Accumulating Parallel Operations

103

104

### Parallel Map with Error Accumulation

105

106

```kotlin { .api }

107

suspend fun <A, B> Iterable<A>.parMapOrAccumulate(f: suspend CoroutineScope.(A) -> B): Either<NonEmptyList<Error>, List<B>>

108

suspend fun <A, B> Iterable<A>.parMapOrAccumulate(context: CoroutineContext, f: suspend CoroutineScope.(A) -> B): Either<NonEmptyList<Error>, List<B>>

109

suspend fun <A, B> Iterable<A>.parMapOrAccumulate(concurrency: Int, f: suspend CoroutineScope.(A) -> B): Either<NonEmptyList<Error>, List<B>>

110

suspend fun <A, B> Iterable<A>.parMapOrAccumulate(context: CoroutineContext, concurrency: Int, f: suspend CoroutineScope.(A) -> B): Either<NonEmptyList<Error>, List<B>>

111

```

112

113

Map in parallel and accumulate all errors instead of failing fast.

114

115

```kotlin

116

val validationResults = users.parMapOrAccumulate { user ->

117

validateUser(user) // Can raise validation errors

118

}

119

120

when (validationResults) {

121

is Either.Left -> println("Validation errors: ${validationResults.value}")

122

is Either.Right -> println("All users valid: ${validationResults.value}")

123

}

124

```

125

126

### Parallel Zip with Error Accumulation

127

128

```kotlin { .api }

129

suspend fun <A, B, C> Raise<E>.parZipOrAccumulate(

130

fa: suspend CoroutineScope.() -> A,

131

fb: suspend CoroutineScope.() -> B,

132

f: suspend CoroutineScope.(A, B) -> C

133

): C

134

135

suspend fun <A, B, C> Raise<NonEmptyList<E>>.parZipOrAccumulate(

136

fa: suspend CoroutineScope.() -> A,

137

fb: suspend CoroutineScope.() -> B,

138

f: suspend CoroutineScope.(A, B) -> C

139

): C

140

```

141

142

Execute operations in parallel within a Raise context, accumulating errors.

143

144

```kotlin

145

either<ValidationError, UserProfile> {

146

parZipOrAccumulate(

147

{ validateName(userData.name) },

148

{ validateEmail(userData.email) },

149

{ validateAge(userData.age) }

150

) { name, email, age ->

151

UserProfile(name, email, age)

152

}

153

}

154

```

155

156

## Advanced Parallel Processing

157

158

### ScopedRaiseAccumulate Type

159

160

```kotlin { .api }

161

interface ScopedRaiseAccumulate<Error> : CoroutineScope, RaiseAccumulate<Error>

162

```

163

164

Intersection type that combines `CoroutineScope` and `RaiseAccumulate` for advanced error handling scenarios.

165

166

### Custom Context Execution

167

168

```kotlin

169

val customDispatcher = Dispatchers.IO.limitedParallelism(5)

170

171

val results = items.parMap(context = customDispatcher) { item ->

172

performIOOperation(item)

173

}

174

```

175

176

### Mixing Parallel and Sequential Operations

177

178

```kotlin

179

val processedData = inputData

180

.chunked(100) // Process in batches

181

.parMap(concurrency = 3) { batch ->

182

// Each batch processed in parallel

183

batch.map { item ->

184

// Items within batch processed sequentially

185

processItem(item)

186

}

187

}

188

.flatten()

189

```

190

191

## Performance Considerations

192

193

### Concurrency Control

194

195

Always consider using concurrency limits for operations that consume significant resources:

196

197

```kotlin

198

// Good: Limited concurrency prevents resource exhaustion

199

val results = urls.parMap(concurrency = 10) { url ->

200

httpClient.get(url)

201

}

202

203

// Potentially problematic: Unlimited concurrency

204

val results = urls.parMap { url ->

205

httpClient.get(url)

206

}

207

```

208

209

### Context Switching

210

211

Use appropriate coroutine contexts for different types of work:

212

213

```kotlin

214

// CPU-intensive work

215

val cpuResults = data.parMap(context = Dispatchers.CPU) { item ->

216

performCpuIntensiveWork(item)

217

}

218

219

// IO work

220

val ioResults = urls.parMap(context = Dispatchers.IO) { url ->

221

fetchFromNetwork(url)

222

}

223

```

224

225

### Memory Considerations

226

227

For large datasets, consider processing in chunks:

228

229

```kotlin

230

val hugeLists = largeDataset

231

.chunked(1000)

232

.parMap(concurrency = 4) { chunk ->

233

chunk.parMap { item ->

234

processItem(item)

235

}

236

}

237

.flatten()

238

```

239

240

## Integration Examples

241

242

### With Resource Management

243

244

```kotlin

245

resourceScope {

246

val httpClient = httpClientResource.bind()

247

val database = databaseResource.bind()

248

249

val enrichedData = rawData.parMap(concurrency = 5) { item ->

250

val apiData = httpClient.fetch(item.url)

251

val dbData = database.query(item.id)

252

enrichData(item, apiData, dbData)

253

}

254

255

database.saveBatch(enrichedData)

256

}

257

```

258

259

### With Error Handling

260

261

```kotlin

262

val processedResults = either<ProcessingError, List<ProcessedItem>> {

263

rawItems.parMapOrAccumulate { item ->

264

when {

265

item.isValid() -> processValidItem(item)

266

else -> raise(ValidationError("Invalid item: ${item.id}"))

267

}

268

}.bind()

269

}

270

```