# Output Sinks
Interfaces and implementations for writing data to external systems. Sinks provide the exit point for processed data from Bytewax dataflows, with support for exactly-once processing guarantees through state management and recovery.
## Capabilities
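
Before the individual interfaces, a quick orientation: a sink terminates a stream via the `output` operator. A minimal sketch, assuming the current `bytewax.operators` API and the built-in `StdOutSink` connector (the module and step names are illustrative):

```python
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("sink_demo")
nums = op.input("in", flow, TestingSource([1, 2, 3]))
strs = op.map("fmt", nums, str)  # StdOutSink expects string items
op.output("out", strs, StdOutSink())  # the sink is the dataflow's exit point
# Run with: python -m bytewax.run <module>:flow
```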
### Sink Base Classes
Abstract base classes that define the sink interface for different output patterns.
```python { .api }
class Sink[X]: ...

class StatefulSinkPartition[X, S]:
    def write_batch(self, values: List[X]) -> None: ...
    def snapshot(self) -> S: ...
    def close(self) -> None: ...

class StatelessSinkPartition[X]:
    def write_batch(self, values: List[X]) -> None: ...
    def close(self) -> None: ...
```
**StatefulSinkPartition Methods:**
- `write_batch(values)`: Write a batch of values to the sink
- `snapshot()`: Return state for recovery (must be pickle-able and immutable)
- `close()`: Clean up resources when the dataflow completes

**StatelessSinkPartition Methods:**
- `write_batch(values)`: Write a batch of values to the sink
- `close()`: Clean up resources
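To make the partition lifecycle concrete, here is a minimal stateful partition: the runtime constructs it with any recovered state, calls `write_batch` as items arrive, calls `snapshot` at checkpoints, and calls `close` at shutdown. A sketch only; the `CountingPartition` name and stdout target are illustrative:

```python
class CountingPartition(StatefulSinkPartition):
    """Writes items to stdout; the running count is the recoverable state."""

    def __init__(self, resume_state=None):
        # resume_state is whatever snapshot() returned before a restart
        self.count = resume_state or 0

    def write_batch(self, values):
        for value in values:
            self.count += 1
            print(f"item {self.count}: {value}")

    def snapshot(self):
        # Must be pickle-able and immutable; an int qualifies
        return self.count

    def close(self):
        pass  # nothing to release for stdout
```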
### Sink Implementations
Concrete sink types for different partitioning and delivery strategies.
```python { .api }
class FixedPartitionedSink[X, S](Sink[Tuple[str, X]]):
    def list_parts(self) -> List[str]: ...
    def part_fn(self, item_key: str) -> int: ...
    def build_part(self, step_id: str, for_part: str, resume_state: Optional[S]) -> StatefulSinkPartition[X, S]: ...

class DynamicSink[X](Sink[X]):
    def build(self, step_id: str, worker_index: int, worker_count: int) -> StatelessSinkPartition[X]: ...
```
**FixedPartitionedSink Methods:**
- `list_parts()`: Return the list of partition identifiers available to this worker
- `part_fn(item_key)`: Map an item key to a partition index for routing
- `build_part()`: Create the partition handler for a specific partition

**DynamicSink Methods:**
- `build()`: Create a sink partition for a specific worker
**Usage Examples:**
```python
import zlib

# Custom database sink with partitioning
class DatabaseSink(FixedPartitionedSink):
    def __init__(self, connection_string, table_name):
        self.connection_string = connection_string
        self.table_name = table_name

    def list_parts(self):
        # Return partitions this worker can write to
        return ["partition_0", "partition_1"]

    def part_fn(self, item_key):
        # Route items by a deterministic key hash; the built-in hash()
        # is salted per process, so it cannot be used for routing
        return zlib.crc32(item_key.encode()) % 2

    def build_part(self, step_id, for_part, resume_state):
        return DatabasePartition(
            self.connection_string,
            self.table_name,
            for_part,
            resume_state,
        )

# Simple dynamic sink: one file per worker
class FileSink(DynamicSink):
    def __init__(self, file_path):
        self.file_path = file_path

    def build(self, step_id, worker_index, worker_count):
        worker_file = f"{self.file_path}.worker_{worker_index}"
        return FilePartition(worker_file)
```
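
These custom sinks plug into a dataflow like any built-in connector. A hedged sketch, assuming the `bytewax.operators` API; the connection string and file path are placeholders. Note that a `FixedPartitionedSink` consumes `(key, value)` 2-tuples, so the stream is keyed first:

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("custom_sinks")
events = op.input("in", flow, TestingSource([{"user": "a"}, {"user": "b"}]))

# FixedPartitionedSink routes (key, value) pairs via part_fn(key)
keyed = op.key_on("by_user", events, lambda e: e["user"])
op.output("db_out", keyed, DatabaseSink("dbname=events", "events"))

# DynamicSink consumes items directly, one partition per worker
op.output("file_out", events, FileSink("/tmp/events.jsonl"))
```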
### Sink Implementation Patterns
**File Writing with State:**
```python
import json
import os

class FilePartition(StatefulSinkPartition):
    def __init__(self, file_path, resume_state=None):
        self.file_path = file_path
        self.bytes_written = resume_state or 0
        self.file_handle = None
        self._open_file()

    def _open_file(self):
        # Open the file for appending and check the resume position
        self.file_handle = open(self.file_path, 'a')
        if self.bytes_written > 0:
            # Verify the on-disk size matches the recovered state
            current_size = os.path.getsize(self.file_path)
            if current_size != self.bytes_written:
                raise ValueError(f"File size mismatch: expected {self.bytes_written}, got {current_size}")

    def write_batch(self, values):
        for value in values:
            line = json.dumps(value) + '\n'
            bytes_to_write = len(line.encode('utf-8'))
            self.file_handle.write(line)
            self.bytes_written += bytes_to_write

        self.file_handle.flush()
        os.fsync(self.file_handle.fileno())  # Ensure durability

    def snapshot(self):
        return self.bytes_written

    def close(self):
        if self.file_handle:
            self.file_handle.close()
```
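
Two design notes on this pattern: the `os.fsync` call makes each batch durable before `write_batch` returns, at the cost of a disk sync per batch, so larger batches amortize it. And the size check in `_open_file` fails fast on divergence; a more forgiving variant truncates the file back to `bytes_written`, discarding writes made after the last snapshot so that replayed batches do not duplicate lines.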
**Database Writing with Transactions:**
```python
import json

import psycopg2

class DatabasePartition(StatefulSinkPartition):
    def __init__(self, connection_string, table_name, partition_id, resume_state=None):
        self.connection_string = connection_string
        self.table_name = table_name
        self.partition_id = partition_id
        self.last_offset = resume_state or 0
        self.connection = None
        self._connect()

    def _connect(self):
        self.connection = psycopg2.connect(self.connection_string)
        self.connection.autocommit = False

    def write_batch(self, values):
        cursor = self.connection.cursor()
        try:
            for i, value in enumerate(values):
                current_offset = self.last_offset + i + 1

                # Use the offset as an idempotency key ("offset" is a
                # reserved word in SQL, so it must be quoted)
                cursor.execute("""
                    INSERT INTO {} (partition_id, "offset", data)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (partition_id, "offset") DO NOTHING
                """.format(self.table_name),
                (self.partition_id, current_offset, json.dumps(value)))

            self.connection.commit()
            self.last_offset += len(values)

        except Exception:
            self.connection.rollback()
            raise
        finally:
            cursor.close()

    def snapshot(self):
        return self.last_offset

    def close(self):
        if self.connection:
            self.connection.close()
```
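
For the `ON CONFLICT` clause to apply, the table needs a unique constraint covering `(partition_id, "offset")`. Note also that the table name is interpolated with `str.format` rather than passed as a query parameter, so it must come from trusted configuration, never from user input.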
**HTTP API Sink with Retry:**
```python
import time

import requests

class APIPartition(StatelessSinkPartition):
    def __init__(self, endpoint, api_key, max_retries=3):
        self.endpoint = endpoint
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.max_retries = max_retries

    def write_batch(self, values):
        payload = {"events": values}

        for attempt in range(self.max_retries):
            try:
                response = requests.post(
                    self.endpoint,
                    json=payload,
                    headers=self.headers,
                    timeout=30,
                )

                if response.ok:
                    return  # Success (any 2xx status)
                elif response.status_code == 429:  # Rate limited
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                else:
                    response.raise_for_status()

            except requests.RequestException:
                if attempt == self.max_retries - 1:
                    raise
                time.sleep(2 ** attempt)

        # All attempts were rate limited: fail loudly rather than
        # silently dropping the batch
        raise RuntimeError(f"Gave up after {self.max_retries} rate-limited attempts")

    def close(self):
        # No resources to clean up
        pass
```
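
Because `StatelessSinkPartition` has no `snapshot`, a batch sent just before a crash may be sent again after recovery: this sink is at-least-once. The receiving API should deduplicate on its side, for example by an event ID carried in the payload.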
**Batch Accumulation Pattern:**
```python
from datetime import datetime, timedelta

class BatchingSink(StatefulSinkPartition):
    def __init__(self, target_sink, batch_size=1000,
                 flush_interval=timedelta(seconds=30), resume_state=None):
        self.target_sink = target_sink
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        if resume_state is not None:
            # Restore values that were buffered but not yet flushed
            self.buffer = list(resume_state["buffer"])
            self.last_flush = datetime.fromisoformat(resume_state["last_flush"])
        else:
            self.buffer = []
            self.last_flush = datetime.now()

    def write_batch(self, values):
        self.buffer.extend(values)

        # Flush if the buffer is full or enough time has passed
        now = datetime.now()
        should_flush = (
            len(self.buffer) >= self.batch_size or
            now - self.last_flush >= self.flush_interval
        )

        if should_flush and self.buffer:
            self.target_sink.write_batch(self.buffer)
            self.buffer.clear()
            self.last_flush = now

    def snapshot(self):
        # Include buffered data in state so it survives recovery
        return {
            "buffer": self.buffer.copy(),
            "last_flush": self.last_flush.isoformat(),
            "target_state": self.target_sink.snapshot() if hasattr(self.target_sink, 'snapshot') else None,
        }

    def close(self):
        # Flush any remaining buffer on close
        if self.buffer:
            self.target_sink.write_batch(self.buffer)
        self.target_sink.close()
```
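
One caveat: the time-based flush is only evaluated inside `write_batch`, so an idle stream can hold buffered items until the next batch arrives or `close` runs. Restoring `target_state` into the wrapped sink is out of scope here; it would happen wherever that sink is rebuilt on resume.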
**Multi-destination Sink:**
```python
from typing import List

class FanOutSink(StatelessSinkPartition):
    def __init__(self, sinks: List[StatelessSinkPartition]):
        self.sinks = sinks

    def write_batch(self, values):
        # Write to all sinks, collecting any errors
        errors = []
        for i, sink in enumerate(self.sinks):
            try:
                sink.write_batch(values)
            except Exception as e:
                errors.append(f"Sink {i}: {e}")

        if errors:
            raise RuntimeError(f"Failed to write to some sinks: {'; '.join(errors)}")

    def close(self):
        for sink in self.sinks:
            try:
                sink.close()
            except Exception:
                pass  # Best-effort cleanup
```
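
Note the failure semantics: if one destination fails, the others have already received the batch, and a retry re-sends to all of them. Each destination should therefore tolerate duplicates, or this pattern should be layered over idempotent sinks like the database example above.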
**Exactly-Once Guarantees:**

For exactly-once processing, sinks should:

1. **Use idempotent operations** when possible (e.g., upserts instead of inserts)
2. **Maintain offset/sequence numbers** in state for deduplication
3. **Use transactions** to ensure atomic writes with state updates
4. **Implement proper error handling** with appropriate retry logic
```python
class ExactlyOnceSink(StatefulSinkPartition):
    def __init__(self, target_system, resume_state=None):
        self.target_system = target_system
        # Track processed items, restoring any recovered state
        self.processed_offsets = set(resume_state or [])

    def write_batch(self, values):
        # Filter out already-processed items using state
        new_values = []
        for offset, value in values:  # Assumes each value carries its offset
            if offset not in self.processed_offsets:
                new_values.append(value)
                self.processed_offsets.add(offset)

        if new_values:
            self.target_system.write_batch(new_values)

    def snapshot(self):
        # Return a pickle-able copy of the tracked offsets
        return list(self.processed_offsets)
```
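
As written, `processed_offsets` grows without bound. A production implementation would usually keep a compact high-water mark instead, as `DatabasePartition` does with `last_offset`, or retain only a bounded window of recent offsets.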