# Apache Spark Sketch

Apache Spark Sketch provides probabilistic data structures optimized for large-scale data processing within the Apache Spark ecosystem. It implements Bloom filters for space-efficient approximate membership testing with configurable false positive rates, and Count-Min sketches for approximate frequency counting in sub-linear space.

## Package Information

- **Package Name**: spark-sketch_2.12
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sketch_2.12</artifactId>
    <version>3.0.1</version>
</dependency>
```

## Core Imports

```java
import org.apache.spark.util.sketch.BloomFilter;
import org.apache.spark.util.sketch.CountMinSketch;
import org.apache.spark.util.sketch.IncompatibleMergeException;
```

## Basic Usage

### Bloom Filter Example

```java
import org.apache.spark.util.sketch.BloomFilter;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Create a Bloom filter expecting 1000 items with 5% false positive rate
BloomFilter bloomFilter = BloomFilter.create(1000, 0.05);

// Add items to the filter
bloomFilter.put("user123");
bloomFilter.putLong(12345L);
bloomFilter.putBinary("data".getBytes(StandardCharsets.UTF_8));

// Test membership
boolean mightContain = bloomFilter.mightContain("user123"); // always true for added items
boolean unseen = bloomFilter.mightContain("user999"); // usually false; true only on a false positive

// Serialize and deserialize (writeTo and readFrom throw IOException)
ByteArrayOutputStream out = new ByteArrayOutputStream();
bloomFilter.writeTo(out);
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
BloomFilter loaded = BloomFilter.readFrom(in);
```

### Count-Min Sketch Example

```java
import org.apache.spark.util.sketch.CountMinSketch;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;

// Create a sketch with 1% relative error, 90% confidence, and seed 42
CountMinSketch sketch = CountMinSketch.create(0.01, 0.9, 42);

// Add items with counts
sketch.add("user123"); // increment by 1
sketch.addLong(12345L, 5); // increment by 5
sketch.addString("event", 3); // increment by 3

// Estimate counts (estimates can exceed the true count but never fall below it)
long estimate = sketch.estimateCount("user123"); // at least 1
long longEstimate = sketch.estimateCount(12345L); // at least 5

// Serialize and deserialize (toByteArray and readFrom throw IOException)
byte[] serialized = sketch.toByteArray();
CountMinSketch loaded = CountMinSketch.readFrom(serialized);
```

## Architecture

Apache Spark Sketch is built around several key components:

- **Abstract Base Classes**: `BloomFilter` and `CountMinSketch` define the public API while hiding implementation details
- **Concrete Implementations**: `BloomFilterImpl` and `CountMinSketchImpl` provide the actual probabilistic data structure logic
- **Hash Functions**: Optimized Murmur3 hash implementation for consistent, high-performance hashing
- **Memory Management**: Efficient bit manipulation and memory-safe operations for large-scale data processing
- **Serialization**: Binary format support for persistence and distributed computing scenarios

## Capabilities

### Bloom Filter Operations

Approximate membership testing with configurable false positive rates. Ideal for duplicate detection and pre-filtering operations.

```java { .api }
// Factory methods
public static BloomFilter create(long expectedNumItems);
public static BloomFilter create(long expectedNumItems, double fpp);
public static BloomFilter create(long expectedNumItems, long numBits);
public static BloomFilter readFrom(InputStream in) throws IOException;

// Configuration and introspection
public abstract double expectedFpp();
public abstract long bitSize();

// Adding items (returns true if bits changed)
public abstract boolean put(Object item);
public abstract boolean putString(String item);
public abstract boolean putLong(long item);
public abstract boolean putBinary(byte[] item);

// Membership testing
public abstract boolean mightContain(Object item);
public abstract boolean mightContainString(String item);
public abstract boolean mightContainLong(long item);
public abstract boolean mightContainBinary(byte[] item);

// Filter operations
public abstract boolean isCompatible(BloomFilter other);
public abstract BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException;

// Serialization
public abstract void writeTo(OutputStream out) throws IOException;
```
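
How `expectedNumItems` and `fpp` translate into a bit size can be sketched with the textbook Bloom filter sizing formulas. The class and method names below are hypothetical and this is not the library's code; it only illustrates why a lower false positive rate demands more bits.

```java
// Hypothetical helper, not part of spark-sketch: textbook Bloom filter sizing.
final class BloomSizing {
    // m = -n * ln(p) / (ln 2)^2: bits needed for n items at false positive rate p
    static long optimalNumOfBits(long n, double p) {
        return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    // k = (m / n) * ln 2, rounded, with at least one hash function
    static int optimalNumOfHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        long n = 1000;
        long m = optimalNumOfBits(n, 0.05);
        int k = optimalNumOfHashFunctions(n, m);
        System.out.println("bits=" + m + ", hashes=" + k);
    }
}
```

Under these formulas, tightening the rate from 5% to 1% for the same item count increases the bit count by roughly half, which is the trade-off the `fpp` parameter controls.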

### Count-Min Sketch Operations

Frequency estimation using probabilistic counting with error bounds.

```java { .api }
// Factory methods
public static CountMinSketch create(int depth, int width, int seed);
public static CountMinSketch create(double eps, double confidence, int seed);
public static CountMinSketch readFrom(InputStream in) throws IOException;
public static CountMinSketch readFrom(byte[] bytes) throws IOException;

// Configuration and introspection
public abstract double relativeError();
public abstract double confidence();
public abstract int depth();
public abstract int width();
public abstract long totalCount();

// Adding items
public abstract void add(Object item);
public abstract void add(Object item, long count);
public abstract void addLong(long item);
public abstract void addLong(long item, long count);
public abstract void addString(String item);
public abstract void addString(String item, long count);
public abstract void addBinary(byte[] item);
public abstract void addBinary(byte[] item, long count);

// Frequency estimation
public abstract long estimateCount(Object item);

// Sketch operations
public abstract CountMinSketch mergeInPlace(CountMinSketch other) throws IncompatibleMergeException;

// Serialization
public abstract void writeTo(OutputStream out) throws IOException;
public abstract byte[] toByteArray() throws IOException;
```
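
The two `create` overloads describe the same table in different terms: `(depth, width)` gives the dimensions directly, while `(eps, confidence)` gives the accuracy target. The sketch below shows one common parameterization that maps the second form onto the first. The helper class is hypothetical, and the exact formulas the library uses are an assumption here.

```java
// Hypothetical helper, not part of spark-sketch.
final class CountMinSizing {
    // Columns per row: a wider table spreads collisions out, shrinking the error term eps.
    static int width(double eps) {
        return (int) Math.ceil(2 / eps);
    }

    // Rows (hash functions): under this parameterization, confidence = 1 - 2^(-depth).
    static int depth(double confidence) {
        return (int) Math.ceil(-Math.log1p(-confidence) / Math.log(2));
    }

    public static void main(String[] args) {
        System.out.println("width=" + width(0.01) + ", depth=" + depth(0.9));
    }
}
```

The point of the sketch is the direction of each dependency: width grows as `eps` shrinks, and depth grows only logarithmically as the confidence approaches 1.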

### Error Handling

Exception thrown when attempting to merge incompatible data structures.

```java { .api }
public class IncompatibleMergeException extends Exception {
    public IncompatibleMergeException(String message);
}
```

## Types

### Bloom Filter Types

```java { .api }
public abstract class BloomFilter {
    public enum Version {
        V1(1);

        int getVersionNumber() { return versionNumber; }
    }

    static final double DEFAULT_FPP = 0.03; // package-private constant
}
```

### Count-Min Sketch Types

```java { .api }
public abstract class CountMinSketch {
    public enum Version {
        V1(1);

        int getVersionNumber() { return versionNumber; }
    }
}
```

## Supported Data Types

Both Bloom filters and Count-Min sketches support the following data types:

- **Byte**: 8-bit signed integer values
- **Short**: 16-bit signed integer values
- **Integer**: 32-bit signed integer values
- **Long**: 64-bit signed integer values
- **String**: UTF-8 encoded text strings
- **byte[]**: Arbitrary binary data arrays
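
One way an `Object`-accepting method such as `put(Object)` or `add(Object)` can cover all of these types is to dispatch on the runtime type and widen every integral value to `long`. The sketch below illustrates that idea only. The class `ItemDispatch` and its labels are hypothetical, and whether the library routes items exactly this way is an assumption.

```java
// Hypothetical illustration of type-based routing; not the library's code.
final class ItemDispatch {
    // Returns a label naming the typed path an item would take.
    static String route(Object item) {
        if (item instanceof String) {
            return "string";
        }
        if (item instanceof byte[]) {
            return "binary";
        }
        if (item instanceof Byte || item instanceof Short
                || item instanceof Integer || item instanceof Long) {
            // All integral types collapse to the same 64-bit value.
            return "long:" + ((Number) item).longValue();
        }
        throw new IllegalArgumentException(
            "Unsupported data type " + item.getClass().getName());
    }
}
```

Under this assumption, `(byte) 5`, `5`, and `5L` would all be treated as the same item, while a `Double` would be rejected.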
213
214
## Usage Examples
215
216
### Advanced Bloom Filter Usage
217
218
```java
219
// Create filters with specific parameters
220
BloomFilter filter1 = BloomFilter.create(10000, 0.01); // 1% FPP
221
BloomFilter filter2 = BloomFilter.create(10000, 20000); // specific bit size
222
223
// Check compatibility and merge
224
if (filter1.isCompatible(filter2)) {
225
filter1.mergeInPlace(filter2); // combines both filters
226
}
227
228
// Get filter statistics
229
double fpp = filter1.expectedFpp();
230
long bits = filter1.bitSize();
231
```

### Advanced Count-Min Sketch Usage

```java
// Create sketch with explicit dimensions
CountMinSketch sketch1 = CountMinSketch.create(4, 2048, 123); // depth=4, width=2048

// Get sketch configuration
double error = sketch1.relativeError(); // eps parameter
double confidence = sketch1.confidence(); // 1 - delta, where delta is the failure probability
int d = sketch1.depth(); // number of hash functions (rows)
int w = sketch1.width(); // table width (columns per row)
long total = sketch1.totalCount(); // sum of all counts added

// Merge compatible sketches
CountMinSketch sketch2 = CountMinSketch.create(4, 2048, 123); // same params
sketch1.mergeInPlace(sketch2);
```

### Error Handling

```java
try {
    BloomFilter filter1 = BloomFilter.create(1000);
    BloomFilter filter2 = BloomFilter.create(2000); // different size
    filter1.mergeInPlace(filter2); // throws exception
} catch (IncompatibleMergeException e) {
    System.err.println("Cannot merge filters: " + e.getMessage());
}
```
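
Why differently sized filters cannot be merged becomes clearer when a filter is reduced to its bit array: merging is a bitwise OR, which is only meaningful when both arrays index bits the same way. The following is a minimal sketch under that simplification. `MergeSketch`, `Bits`, and `IncompatibleSizes` are hypothetical stand-ins, and a real filter would presumably also need matching hash functions.

```java
import java.util.BitSet;

// Hypothetical stand-in types; not the library's implementation.
final class MergeSketch {
    // Plays the role of IncompatibleMergeException in this self-contained sketch.
    static final class IncompatibleSizes extends Exception {
        IncompatibleSizes(String message) { super(message); }
    }

    // A filter reduced to its essentials: a declared bit size plus the bits.
    static final class Bits {
        final int size;
        final BitSet set;
        Bits(int size) { this.size = size; this.set = new BitSet(size); }
    }

    // OR the other filter's bits into the target, refusing mismatched sizes.
    static Bits mergeInPlace(Bits target, Bits other) throws IncompatibleSizes {
        if (target.size != other.size) {
            throw new IncompatibleSizes(
                "Cannot merge bit arrays of size " + target.size + " and " + other.size);
        }
        target.set.or(other.set);
        return target;
    }
}
```

After a successful merge the target answers membership queries as if every item from both inputs had been added to it.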

## Performance Characteristics

### Bloom Filter

- **Space Complexity**: O(m) where m is the number of bits
- **Time Complexity**: O(k) per operation, where k is the number of hash functions
- **False Positive Rate**: Configurable at creation, typically 1-5% (`DEFAULT_FPP` is 3%)
- **False Negative Rate**: Always 0%
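
These properties follow directly from the structure, which a minimal sketch makes visible. `TinyBloom` below is a hypothetical toy, not the library's implementation: it uses `String.hashCode()` with a simple double-hashing scheme instead of Murmur3, purely to show the O(k) loop over an m-bit array and why an added item can never be reported absent.

```java
import java.util.BitSet;

// Hypothetical toy Bloom filter for illustration only.
final class TinyBloom {
    private final BitSet bits;
    private final int numBits;   // m
    private final int numHashes; // k

    TinyBloom(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // Derive the i-th bit index from two base hashes (double hashing).
    private int index(String item, int i) {
        int h1 = item.hashCode();
        int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;
        return Math.floorMod(h1 + i * h2, numBits);
    }

    // Sets k bits; returns true if any bit changed.
    boolean put(String item) {
        boolean changed = false;
        for (int i = 1; i <= numHashes; i++) {
            int idx = index(item, i);
            if (!bits.get(idx)) {
                bits.set(idx);
                changed = true;
            }
        }
        return changed;
    }

    // Any unset bit proves absence; all set bits only suggest presence.
    boolean mightContain(String item) {
        for (int i = 1; i <= numHashes; i++) {
            if (!bits.get(index(item, i))) {
                return false;
            }
        }
        return true;
    }
}
```

Bits are only ever set, never cleared, so every bit an item set during `put` is still set at query time. False positives arise when other items happen to have set all k of an absent item's bits.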

### Count-Min Sketch

- **Space Complexity**: O(ε⁻¹ log δ⁻¹) where ε is the relative error and δ is the failure probability (1 - confidence)
- **Time Complexity**: O(log δ⁻¹) per operation
- **Error Bounds**: With probability at least 1-δ, estimate ≤ true_frequency + ε × total_count
- **Never Underestimates**: Estimates are always ≥ true frequency
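
The one-sided error can be seen in a minimal sketch of the data structure. `TinyCountMin` below is a hypothetical toy, not the library's implementation, and its per-row hash is a simple stand-in chosen for brevity. Each row holds one counter per column, an item increments one counter in every row, and the estimate is the minimum of those counters.

```java
// Hypothetical toy Count-Min sketch for illustration only.
final class TinyCountMin {
    private final long[][] table; // depth rows by width columns
    private final int depth;
    private final int width;
    private long total;

    TinyCountMin(int depth, int width) {
        if (depth < 1 || width < 1) {
            throw new IllegalArgumentException("depth and width must be positive");
        }
        this.table = new long[depth][width];
        this.depth = depth;
        this.width = width;
    }

    // Stand-in for a family of per-row hash functions.
    private int column(String item, int row) {
        int h = item.hashCode() * 31 + row * 0x9E3779B9;
        h ^= (h >>> 16);
        return Math.floorMod(h, width);
    }

    void add(String item, long count) {
        for (int r = 0; r < depth; r++) {
            table[r][column(item, r)] += count;
        }
        total += count;
    }

    // Each counter holds the item's true count plus any colliding counts,
    // so the minimum across rows is the least-inflated view.
    long estimateCount(String item) {
        long min = Long.MAX_VALUE;
        for (int r = 0; r < depth; r++) {
            min = Math.min(min, table[r][column(item, r)]);
        }
        return min;
    }

    long totalCount() {
        return total;
    }
}
```

With non-negative counts, collisions can only add to a counter, which is why the estimate never drops below the true frequency and why the overshoot is bounded in terms of the total count.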

## Thread Safety

Both data structures are safe for concurrent reads (`mightContain`, `estimateCount`) once no thread is writing to them. Write operations (`put`, `add`, `mergeInPlace`) are not thread-safe and require external synchronization in concurrent environments.
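
One way to provide that external synchronization is a read-write lock around the shared instance, so that readers proceed in parallel while writers get exclusive access. The wrapper below is a minimal sketch. `Guarded` is a hypothetical class, not part of the library, and it is generic so that it does not depend on any particular sketch type.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical wrapper that serializes writes and allows concurrent reads.
final class Guarded<T> {
    private final T target;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    Guarded(T target) {
        this.target = target;
    }

    // Runs a query under the shared read lock.
    <R> R read(Function<T, R> query) {
        lock.readLock().lock();
        try {
            return query.apply(target);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Runs a mutation under the exclusive write lock.
    void write(Consumer<T> mutation) {
        lock.writeLock().lock();
        try {
            mutation.accept(target);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

With a filter this might look like `shared.write(f -> f.put("user123"))` followed by `shared.read(f -> f.mightContain("user123"))`, where `shared` is a `Guarded<BloomFilter>`. Because `mergeInPlace` throws a checked exception, calling it through `write` would need a try/catch inside the lambda.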