# Output Sinks

Interfaces and implementations for writing data to external systems. Sinks provide the exit point for processed data from Bytewax dataflows, with support for exactly-once processing guarantees through state management and recovery.
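
A sink is attached to a dataflow through an output step. As a point of reference, a minimal sketch using the operator-based Bytewax API (assuming `bytewax.operators.output` and the built-in `StdOutSink`; the flow and step names are illustrative):

```python
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("sink_example")
stream = op.input("inp", flow, TestingSource([1, 2, 3]))

# The sink is the exit point: every item reaching this step is written out.
op.output("out", stream, StdOutSink())
```

The custom sinks described below plug into `op.output` the same way.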

## Capabilities

### Sink Base Classes

Abstract base classes that define the sink interface for different output patterns.

```python { .api }
class Sink[X]: ...

class StatefulSinkPartition[X, S]:
    def write_batch(self, values: List[X]) -> None: ...
    def snapshot(self) -> S: ...
    def close(self) -> None: ...

class StatelessSinkPartition[X]:
    def write_batch(self, values: List[X]) -> None: ...
    def close(self) -> None: ...
```

**StatefulSinkPartition Methods:**

- `write_batch(values)`: Write a batch of values to the sink
- `snapshot()`: Return state for recovery (must be pickle-able and immutable)
- `close()`: Clean up resources when the dataflow completes

**StatelessSinkPartition Methods:**

- `write_batch(values)`: Write a batch of values to the sink
- `close()`: Clean up resources

### Sink Implementations

Concrete sink types for different partitioning and delivery strategies.

```python { .api }
class FixedPartitionedSink[X, S](Sink[Tuple[str, X]]):
    def list_parts(self) -> List[str]: ...
    def part_fn(self, item_key: str) -> int: ...
    def build_part(self, step_id: str, for_part: str, resume_state: Optional[S]) -> StatefulSinkPartition[X, S]: ...

class DynamicSink[X](Sink[X]):
    def build(self, step_id: str, worker_index: int, worker_count: int) -> StatelessSinkPartition[X]: ...
```

**FixedPartitionedSink Methods:**

- `list_parts()`: Return list of partition identifiers available to this worker
- `part_fn(item_key)`: Map item key to partition index for routing
- `build_part()`: Create partition handler for specific partition

**DynamicSink Methods:**

- `build()`: Create sink partition for specific worker

**Usage Examples:**

```python
import zlib

# Custom database sink with partitioning
class DatabaseSink(FixedPartitionedSink):
    def __init__(self, connection_string, table_name):
        self.connection_string = connection_string
        self.table_name = table_name

    def list_parts(self):
        # Return partitions this worker can write to
        return ["partition_0", "partition_1"]

    def part_fn(self, item_key):
        # Route items by a stable hash of the key (the built-in hash() is
        # randomized per process, so it is not safe for routing)
        return zlib.crc32(item_key.encode("utf-8")) % 2

    def build_part(self, step_id, for_part, resume_state):
        return DatabasePartition(
            self.connection_string,
            self.table_name,
            for_part,
            resume_state
        )

# Simple dynamic sink
class FileSink(DynamicSink):
    def __init__(self, file_path):
        self.file_path = file_path

    def build(self, step_id, worker_index, worker_count):
        worker_file = f"{self.file_path}.worker_{worker_index}"
        return FilePartition(worker_file)
```
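
A `FixedPartitionedSink` consumes `(key, value)` 2-tuples (note the `Sink[Tuple[str, X]]` signature above), so the stream must be keyed before the output step, while a `DynamicSink` takes items as-is. A sketch of the wiring, assuming the operator-based API (`op.key_on`, `op.output`); `events` and the connection details are placeholders:

```python
import bytewax.operators as op

# Key each record so part_fn() can route it to a partition.
keyed = op.key_on("key_by_user", events, lambda e: e["user_id"])
op.output("db_out", keyed, DatabaseSink("postgresql://localhost/db", "events"))

# DynamicSink writes items as-is, one partition per worker.
op.output("file_out", events, FileSink("/tmp/output.jsonl"))
```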

### Sink Implementation Patterns

**File Writing with State:**

```python
import json
import os

class FilePartition(StatefulSinkPartition):
    def __init__(self, file_path, resume_state=None):
        self.file_path = file_path
        self.bytes_written = resume_state or 0
        self.file_handle = None
        self._open_file()

    def _open_file(self):
        # Open the file in append mode and verify its size matches the
        # byte count recovered from the snapshot
        self.file_handle = open(self.file_path, 'a')
        if self.bytes_written > 0:
            current_size = os.path.getsize(self.file_path)
            if current_size != self.bytes_written:
                raise ValueError(f"File size mismatch: expected {self.bytes_written}, got {current_size}")

    def write_batch(self, values):
        for value in values:
            line = json.dumps(value) + '\n'
            bytes_to_write = len(line.encode('utf-8'))
            self.file_handle.write(line)
            self.bytes_written += bytes_to_write

        self.file_handle.flush()
        os.fsync(self.file_handle.fileno())  # Ensure durability

    def snapshot(self):
        return self.bytes_written

    def close(self):
        if self.file_handle:
            self.file_handle.close()
```
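
The `resume_state` passed to `FilePartition` comes from a `FixedPartitionedSink.build_part` call during recovery. A minimal sketch of such a sink (the `PartitionedFileSink` name, directory layout, and two-part list are illustrative):

```python
import zlib

class PartitionedFileSink(FixedPartitionedSink):
    def __init__(self, directory, parts=("part_0", "part_1")):
        self.directory = directory
        self.parts = list(parts)

    def list_parts(self):
        return self.parts

    def part_fn(self, item_key):
        # Stable hash so a given key always routes to the same file
        return zlib.crc32(item_key.encode("utf-8")) % len(self.parts)

    def build_part(self, step_id, for_part, resume_state):
        # resume_state is the bytes_written value from FilePartition.snapshot()
        return FilePartition(f"{self.directory}/{for_part}.jsonl", resume_state)
```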

**Database Writing with Transactions:**

```python
import json
import psycopg2

class DatabasePartition(StatefulSinkPartition):
    def __init__(self, connection_string, table_name, partition_id, resume_state=None):
        self.connection_string = connection_string
        self.table_name = table_name
        self.partition_id = partition_id
        self.last_offset = resume_state or 0
        self.connection = None
        self._connect()

    def _connect(self):
        self.connection = psycopg2.connect(self.connection_string)
        self.connection.autocommit = False

    def write_batch(self, values):
        cursor = self.connection.cursor()
        try:
            for i, value in enumerate(values):
                current_offset = self.last_offset + i + 1

                # Use the offset as an idempotency key ("offset" is quoted
                # because it is a reserved word in SQL)
                cursor.execute("""
                    INSERT INTO {} (partition_id, "offset", data)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (partition_id, "offset") DO NOTHING
                """.format(self.table_name),
                    (self.partition_id, current_offset, json.dumps(value)))

            self.connection.commit()
            self.last_offset += len(values)

        except Exception:
            self.connection.rollback()
            raise

        finally:
            cursor.close()

    def snapshot(self):
        return self.last_offset

    def close(self):
        if self.connection:
            self.connection.close()
```
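
The `ON CONFLICT` clause only deduplicates if the table has a unique constraint over `(partition_id, "offset")`. An illustrative one-time setup, with a placeholder `events` table name and connection string:

```python
import psycopg2

DDL = """
CREATE TABLE IF NOT EXISTS events (
    partition_id TEXT NOT NULL,
    "offset" BIGINT NOT NULL,
    data JSONB,
    PRIMARY KEY (partition_id, "offset")
)
"""

conn = psycopg2.connect("postgresql://localhost/db")
with conn, conn.cursor() as cur:
    cur.execute(DDL)
conn.close()
```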

**HTTP API Sink with Retry:**

```python
import time
import requests
from typing import List

class APIPartition(StatelessSinkPartition):
    def __init__(self, endpoint, api_key, max_retries=3):
        self.endpoint = endpoint
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.max_retries = max_retries

    def write_batch(self, values):
        payload = {"events": values}

        for attempt in range(self.max_retries):
            try:
                response = requests.post(
                    self.endpoint,
                    json=payload,
                    headers=self.headers,
                    timeout=30
                )

                if response.status_code == 200:
                    return  # Success
                elif response.status_code == 429:  # Rate limited
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                else:
                    response.raise_for_status()

            except requests.RequestException as e:
                if attempt == self.max_retries - 1:
                    raise e
                time.sleep(2 ** attempt)

        # All retries were rate limited; fail loudly rather than drop the batch
        raise RuntimeError(f"Giving up on batch after {self.max_retries} attempts")

    def close(self):
        # No resources to clean up
        pass
```
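
`APIPartition` still needs a `DynamicSink` that builds it, one partition per worker. A minimal sketch (the `APISink` class name is illustrative):

```python
class APISink(DynamicSink):
    def __init__(self, endpoint, api_key):
        self.endpoint = endpoint
        self.api_key = api_key

    def build(self, step_id, worker_index, worker_count):
        # Every worker posts to the same endpoint through its own partition
        return APIPartition(self.endpoint, self.api_key)
```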

**Batch Accumulation Pattern:**

```python
from datetime import datetime, timedelta

class BatchingSink(StatefulSinkPartition):
    def __init__(self, target_sink, batch_size=1000, flush_interval=timedelta(seconds=30)):
        self.target_sink = target_sink
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.last_flush = datetime.now()

    def write_batch(self, values):
        self.buffer.extend(values)

        # Flush if the buffer is full or enough time has passed
        now = datetime.now()
        should_flush = (
            len(self.buffer) >= self.batch_size or
            now - self.last_flush >= self.flush_interval
        )

        if should_flush and self.buffer:
            self.target_sink.write_batch(self.buffer)
            self.buffer.clear()
            self.last_flush = now

    def snapshot(self):
        # Include buffered (not-yet-flushed) data in the state
        return {
            "buffer": self.buffer.copy(),
            "last_flush": self.last_flush.isoformat(),
            "target_state": self.target_sink.snapshot() if hasattr(self.target_sink, 'snapshot') else None
        }

    def close(self):
        # Flush remaining buffer on close
        if self.buffer:
            self.target_sink.write_batch(self.buffer)
        self.target_sink.close()
```
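
The snapshot above captures the unflushed buffer, but the example does not show restoring it. One way to close the loop, sketched as an illustrative subclass that accepts the snapshot dict as `resume_state`:

```python
from datetime import datetime, timedelta

class ResumableBatchingSink(BatchingSink):
    def __init__(self, target_sink, batch_size=1000,
                 flush_interval=timedelta(seconds=30), resume_state=None):
        super().__init__(target_sink, batch_size, flush_interval)
        if resume_state:
            # Re-hydrate the unflushed buffer and flush clock from the snapshot
            self.buffer = list(resume_state["buffer"])
            self.last_flush = datetime.fromisoformat(resume_state["last_flush"])
```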

**Multi-destination Sink:**

```python
from typing import List

class FanOutSink(StatelessSinkPartition):
    def __init__(self, sinks: List[StatelessSinkPartition]):
        self.sinks = sinks

    def write_batch(self, values):
        # Write to all sinks, collecting any errors
        errors = []
        for i, sink in enumerate(self.sinks):
            try:
                sink.write_batch(values)
            except Exception as e:
                errors.append(f"Sink {i}: {e}")

        if errors:
            raise RuntimeError(f"Failed to write to some sinks: {'; '.join(errors)}")

    def close(self):
        for sink in self.sinks:
            try:
                sink.close()
            except Exception:
                pass  # Best-effort cleanup
```

**Exactly-Once Guarantees:**

For exactly-once processing, sinks should:

1. **Use idempotent operations** when possible (e.g., upserts instead of inserts)
2. **Maintain offset/sequence numbers** in state for deduplication
3. **Use transactions** to ensure atomic writes with state updates
4. **Implement proper error handling** with appropriate retry logic

```python
class ExactlyOnceSink(StatefulSinkPartition):
    def __init__(self, target_system, resume_state=None):
        self.target_system = target_system
        # Track processed items, restored from the snapshot on recovery
        self.processed_offsets = set(resume_state or [])

    def write_batch(self, values):
        # Filter out already processed items using state
        new_values = []
        for offset, value in values:  # Assume values include offset
            if offset not in self.processed_offsets:
                new_values.append(value)
                self.processed_offsets.add(offset)

        if new_values:
            self.target_system.write_batch(new_values)

    def snapshot(self):
        return list(self.processed_offsets)
```
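
Note that the tracked offset set grows without bound. When offsets are monotonically increasing within a partition, a high-water mark keeps the recovery state constant-size; a sketch of that variant (illustrative, assumes in-order offsets):

```python
class WatermarkExactlyOnceSink(StatefulSinkPartition):
    def __init__(self, target_system, resume_state=None):
        self.target_system = target_system
        # Highest offset already written; -1 means nothing written yet
        self.max_offset = resume_state if resume_state is not None else -1

    def write_batch(self, values):
        new_values = [value for offset, value in values if offset > self.max_offset]
        if new_values:
            self.target_system.write_batch(new_values)
        if values:
            self.max_offset = max(offset for offset, _ in values)

    def snapshot(self):
        return self.max_offset
```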