# Concurrent Collection Infrastructure

Base classes and atomic field updaters for building lock-free concurrent data structures. This infrastructure provides the foundation for Scala's concurrent collections, implementing the atomic operations and node base types they are built from.

## Core Imports

```scala
import scala.collection.concurrent.{BasicNode, MainNode, INodeBase, CNodeBase}
import java.util.concurrent.atomic.{AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater}
import scala.scalanative.unsafe.Ptr
```

## Capabilities

### Basic Node Infrastructure

Abstract base class for all concurrent collection nodes.

```scala { .api }
abstract class BasicNode {
  def string(lev: Int): String
}
```

### Main Node Management

Base class for main nodes in concurrent collections with atomic previous node management.

```scala { .api }
abstract class MainNode[K <: AnyRef, V <: AnyRef] extends BasicNode {
  @volatile var prev: MainNode[K, V]

  def cachedSize(ct: Object): Int
  def knownSize(): Int
  def CAS_PREV(oldval: MainNode[K, V], nval: MainNode[K, V]): Boolean
  def WRITE_PREV(nval: MainNode[K, V]): Unit
  @deprecated def READ_PREV(): MainNode[K, V]
}

object MainNode {
  val updater: AtomicReferenceFieldUpdater[MainNode[_, _], MainNode[_, _]]
}
```

### INode Base Implementation

Base class for I-nodes (indirection nodes) in concurrent trie-based collections.

```scala { .api }
abstract class INodeBase[K <: AnyRef, V <: AnyRef](generation: Gen) extends BasicNode {
  @volatile var mainnode: MainNode[K, V]
  final var gen: Gen

  def prev(): BasicNode
}

object INodeBase {
  val updater: AtomicReferenceFieldUpdater[INodeBase[_, _], MainNode[_, _]]
  val RESTART: Object
  val NO_SUCH_ELEMENT_SENTINEL: Object
}
```
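
The indirection idea can be sketched in portable Scala: an I-node holds one mutable reference to an immutable main node, and an update replaces that reference with a single compare-and-set. The sketch below is illustrative only. `INodeSketch`, `Main`, `INode`, `tryAdd`, `add`, and the `Restart` sentinel are hypothetical names, and `AtomicReference` stands in for the field updater; it is not the library's implementation.

```scala
import java.util.concurrent.atomic.AtomicReference

object INodeSketch {
  // Hypothetical sentinel, similar in spirit to INodeBase.RESTART.
  val Restart: AnyRef = new Object

  // Immutable stand-in for a main node: never mutated after construction.
  final case class Main(values: List[Int])

  // Stand-in for an I-node: one mutable reference to an immutable main node.
  final class INode(initial: Main) {
    private val main = new AtomicReference[Main](initial)

    def read: Main = main.get()

    // One attempt: build a new main node and try to swing the reference to it.
    // Returns Restart if another thread replaced the main node in the meantime.
    def tryAdd(v: Int): AnyRef = {
      val old = main.get()
      val updated = Main(v :: old.values)
      if (main.compareAndSet(old, updated)) updated else Restart
    }

    // Caller-side loop: retry for as long as the attempt signals Restart.
    def add(v: Int): Main = {
      var res = tryAdd(v)
      while (res eq Restart) res = tryAdd(v)
      res.asInstanceOf[Main]
    }
  }
}
```

The sentinel is compared with `eq`, so it can never be confused with a legitimate result value.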

### CNode Base Implementation

Base class for C-nodes (the branching Ctrie nodes) with an atomically updated cached size.

```scala { .api }
abstract class CNodeBase[K <: AnyRef, V <: AnyRef] extends MainNode[K, V] {
  @volatile var csize: Int
  final val updater: AtomicIntegerFieldUpdater[CNodeBase[_, _]]

  def CAS_SIZE(oldval: Int, nval: Int): Boolean
  def WRITE_SIZE(nval: Int): Unit
  def READ_SIZE: Int
}
```
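
A common use of a CAS-guarded size field is to cache a size that is expensive to compute. The following is a minimal sketch under stated assumptions: it uses `AtomicInteger` instead of `CNodeBase`, treats `-1` as "not computed yet" (an assumption of this sketch), and the names `CachedSizeSketch`, `Node`, `computeSize`, and `computations` are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CachedSizeSketch {
  // Hypothetical immutable node whose size is treated as expensive to compute.
  final class Node(elems: Vector[String]) {
    // -1 means "not computed yet" (an assumption of this sketch).
    private val csize = new AtomicInteger(-1)
    // Counts how often the expensive path runs; for illustration only.
    val computations = new AtomicInteger(0)

    private def computeSize(): Int = {
      computations.incrementAndGet()
      elems.size
    }

    def cachedSize(): Int = {
      val current = csize.get()
      if (current != -1) current
      else {
        val sz = computeSize()
        // Publish only if nobody else has; losing the race is harmless because
        // every thread computes the same value for an immutable node.
        csize.compareAndSet(-1, sz)
        csize.get()
      }
    }
  }
}
```

Several threads may compute the size at the same time, but only one CAS installs it and all of them return the same value.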

### Generation Management

Generation marker for concurrent collections.

```scala { .api }
final class Gen // private[concurrent]
```
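
Because `Gen` has no members, a marker like this is only useful through object identity. A hedged sketch of how such a marker could be checked, assuming nodes are compared against the current generation by reference (`GenSketch`, its local `Gen`, `Node`, and `isCurrent` are hypothetical stand-ins, since the real `Gen` is package-private):

```scala
object GenSketch {
  // Hypothetical stand-in for the generation marker: no fields, identity is all that matters.
  final class Gen

  // Hypothetical node that remembers the generation it was created in.
  final class Node(val gen: Gen)

  // A node belongs to the current generation only if it holds the very same Gen instance.
  def isCurrent(node: Node, current: Gen): Boolean = node.gen eq current
}
```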

### Atomic Field Updaters

Atomic field updaters backed by Scala Native intrinsics. Each updater takes a `selector` that returns a pointer to the target field of an object and performs lock-free operations on that field.

```scala { .api }
class IntrinsicAtomicReferenceFieldUpdater[T <: AnyRef, V <: AnyRef](
  selector: T => Ptr[V]
) extends AtomicReferenceFieldUpdater[T, V] {
  def compareAndSet(obj: T, expect: V, update: V): Boolean
  def weakCompareAndSet(obj: T, expect: V, update: V): Boolean
  def set(obj: T, newValue: V): Unit
  def lazySet(obj: T, newValue: V): Unit
  def get(obj: T): V
}

class IntrinsicAtomicIntegerFieldUpdater[T <: AnyRef](
  selector: T => Ptr[Int]
) extends AtomicIntegerFieldUpdater[T] {
  def compareAndSet(obj: T, expect: Int, update: Int): Boolean
  def weakCompareAndSet(obj: T, expect: Int, update: Int): Boolean
  def set(obj: T, newValue: Int): Unit
  def lazySet(obj: T, newValue: Int): Unit
  def get(obj: T): Int
}
```
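
The selector-based design means one updater instance can serve every object of type `T`: the selector locates the field, and the updater applies the atomic operation to it. A minimal portable sketch of that shape, assuming `AtomicInteger` plays the role of `Ptr[Int]`; `SelectorIntUpdater`, `Counter`, and `counterUpdater` are hypothetical names and not part of the library:

```scala
import java.util.concurrent.atomic.AtomicInteger

object SelectorUpdaterSketch {
  // Hypothetical analogue of a selector-based updater.
  // AtomicInteger stands in for the Ptr[Int] returned by the real selector.
  final class SelectorIntUpdater[T <: AnyRef](selector: T => AtomicInteger) {
    def compareAndSet(obj: T, expect: Int, update: Int): Boolean =
      selector(obj).compareAndSet(expect, update)
    def set(obj: T, newValue: Int): Unit = selector(obj).set(newValue)
    def lazySet(obj: T, newValue: Int): Unit = selector(obj).lazySet(newValue)
    def get(obj: T): Int = selector(obj).get()
  }

  // Hypothetical class with one atomically updated cell.
  final class Counter {
    val cell = new AtomicInteger(0)
  }

  // One updater serves every Counter; the selector picks the cell per object.
  val counterUpdater = new SelectorIntUpdater[Counter](_.cell)
}
```

Sharing a single updater avoids allocating a separate atomic wrapper per operation, which is the usual motivation for field updaters.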

## Usage Examples

```scala
import scala.collection.concurrent.{BasicNode, MainNode, INodeBase, CNodeBase}

// Extending BasicNode for custom node types
class MyNode extends BasicNode {
  def string(lev: Int): String = s"MyNode(level=$lev)"
}

// Using atomic operations on MainNode
class MyMainNode[K <: AnyRef, V <: AnyRef] extends MainNode[K, V] {
  // BasicNode.string is abstract, so concrete subclasses must implement it
  def string(lev: Int): String = s"MyMainNode(level=$lev)"
  def cachedSize(ct: Object): Int = 0
  def knownSize(): Int = 0

  // Atomic compare-and-swap: succeeds only if prev still equals `expected`
  def updatePrevious(expected: MainNode[K, V], newNode: MainNode[K, V]): Boolean = {
    CAS_PREV(expected, newNode)
  }
}

// Using CNodeBase for size management
class MyCNode[K <: AnyRef, V <: AnyRef] extends CNodeBase[K, V] {
  def string(lev: Int): String = s"MyCNode(level=$lev)"
  def cachedSize(ct: Object): Int = READ_SIZE
  def knownSize(): Int = READ_SIZE

  // Single CAS attempt: returns false if another thread changed the size first
  def incrementSize(): Boolean = {
    val currentSize = READ_SIZE
    CAS_SIZE(currentSize, currentSize + 1)
  }
}
```
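
A single compare-and-swap, as in `incrementSize`, can fail under contention, so callers usually wrap it in a retry loop. The sketch below shows that loop with `AtomicInteger` as a portable stand-in for the size field; `CasRetrySketch`, `increment`, and `run` are hypothetical names introduced for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

object CasRetrySketch {
  // Retry until our compare-and-set wins; returns the value we installed.
  @tailrec
  def increment(size: AtomicInteger): Int = {
    val current = size.get()
    if (size.compareAndSet(current, current + 1)) current + 1
    else increment(size)
  }

  // Starts `threads` workers that each call increment `perThread` times,
  // waits for all of them, and returns the final counter value.
  def run(threads: Int, perThread: Int): Int = {
    val size = new AtomicInteger(0)
    val workers = (1 to threads).map { _ =>
      new Thread(() => (1 to perThread).foreach(_ => increment(size)))
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    size.get()
  }
}
```

Because every failed attempt is retried, no increment is lost: `CasRetrySketch.run(4, 1000)` returns `4000`.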

## Thread Safety

All operations in this infrastructure are designed for concurrent access:

- `@volatile` fields make a write by one thread visible to later reads by other threads
- Atomic field updaters provide lock-free compare-and-swap operations on those fields
- Memory ordering is preserved by routing every read and write of a shared field through the volatile field or its updater
- The infrastructure supports building concurrent data structures without explicit locking
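
To see these properties from the user's side, the standard library's `scala.collection.concurrent.TrieMap` can be exercised from several threads without any locks. This is a usage sketch only; `TrieMapSketch` and `run` are hypothetical names, and the thread and key counts are arbitrary.

```scala
import scala.collection.concurrent.TrieMap

object TrieMapSketch {
  // Returns (size of the snapshot taken before the writes, final size of the map).
  def run(): (Int, Int) = {
    val map = TrieMap.empty[String, Int]
    val workers = (0 until 4).map { t =>
      new Thread(() => (0 until 100).foreach { i =>
        // putIfAbsent is atomic: only the first writer for a key succeeds.
        map.putIfAbsent(s"key-$i", t)
      })
    }
    // Snapshot taken before the workers start; it is isolated from their writes.
    val before = map.readOnlySnapshot()
    workers.foreach(_.start())
    workers.foreach(_.join())
    (before.size, map.size)
  }
}
```

All four workers race on the same 100 keys, so the map ends with exactly 100 entries while the earlier snapshot stays empty.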