
tessl/pypi-bytewax

Python Stateful Stream Processing Framework

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/bytewax@0.21.x

To install, run:

npx @tessl/cli install tessl/pypi-bytewax@0.21.0

# Bytewax

A Python framework that simplifies event and stream processing by coupling the stream-processing capabilities of systems like Flink, Spark, and Kafka Streams with Python's familiar interface. Built on a Rust distributed processing engine with PyO3 bindings, it provides parallelizable stream processing through a dataflow computational model that supports stateful transformations, windowing operations, joins, and real-time aggregations.

## Package Information

- **Package Name**: bytewax
- **Language**: Python
- **Installation**: `pip install bytewax`
- **Optional Dependencies**: `pip install bytewax[kafka]` for Kafka connectors

## Core Imports

```python
# Core dataflow construction
from bytewax.dataflow import Dataflow

# Built-in operators for stream processing
import bytewax.operators as op

# Windowing operators for time-based processing
import bytewax.operators.windowing as win

# Runtime execution
import bytewax.run

# Testing utilities
import bytewax.testing

# Input and output interfaces
from bytewax.inputs import Source
from bytewax.outputs import Sink

# Built-in connectors
from bytewax.connectors.stdio import StdOutSink, StdInSource
from bytewax.connectors.files import FileSource, FileSink, CSVSource, DirSource, DirSink
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSourceMessage, KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.demo import RandomMetricSource
```

## Basic Usage

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

# Create a dataflow
flow = Dataflow("basic_example")

# Add input data
input_data = [1, 2, 3, 4, 5]
stream = op.input("input", flow, TestingSource(input_data))

# Transform the data
doubled = op.map("double", stream, lambda x: x * 2)

# Keep only multiples of 4 (the doubles of the even inputs)
evens = op.filter("evens", doubled, lambda x: x % 4 == 0)

# Output results
op.output("output", evens, StdOutSink())

# Run the dataflow
run_main(flow)
```

## Architecture

Bytewax's architecture consists of several key components that work together to provide scalable stream processing:

- **Dataflow**: The main container that defines the processing graph and topology
- **Operators**: Processing primitives that transform, filter, aggregate, and route data
- **Streams**: Typed data flows between operators that can be keyed for stateful operations
- **Sources/Sinks**: Input and output connectors for external systems
- **Runtime**: Rust-based execution engine that handles distribution, state management, and recovery
- **Recovery System**: Built-in state snapshotting and recovery mechanisms for fault tolerance

This design enables seamless scaling from single-process development to multi-worker distributed deployments across multiple hosts, with support for exactly-once processing guarantees through state recovery mechanisms.

## Capabilities


### Dataflow Construction

Core functionality for building and configuring stream processing dataflows. Includes the main Dataflow class and Stream abstractions that form the foundation of all processing topologies.

```python { .api }
class Dataflow:
    def __init__(self, name: str): ...

class Stream[X]:
    def __init__(self, id: str, scope): ...
```

[Dataflow Construction](./dataflow.md)

### Stream Processing Operators

Essential operators for transforming, filtering, and routing data through processing pipelines. These operators provide the building blocks for most stream processing use cases, including map, filter, flat_map, branch, merge, and more.

```python { .api }
def input(step_id: str, flow: Dataflow, source: Source[X]) -> Stream[X]: ...
def output(step_id: str, up: Stream[X], sink: Sink[X]) -> None: ...
def map(step_id: str, up: Stream[X], mapper: Callable[[X], Y]) -> Stream[Y]: ...
def filter(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> Stream[X]: ...
def flat_map(step_id: str, up: Stream[X], mapper: Callable[[X], Iterable[Y]]) -> Stream[Y]: ...
def branch(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> BranchOut[X, X]: ...
def merge(step_id: str, *ups: Stream[Any]) -> Stream[Any]: ...
```

[Stream Processing Operators](./operators.md)
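
For illustration, here is a minimal sketch that splits a stream with `branch` and recombines it with `merge`, reusing the `TestingSource` and `StdOutSink` connectors shown earlier. It assumes the `BranchOut` handle exposes `.trues` and `.falses` streams, as in the upstream bytewax operator docs.

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("branch_merge_example")
nums = op.input("nums", flow, TestingSource(list(range(10))))

# Split the stream on a predicate; BranchOut exposes .trues and .falses
branched = op.branch("split", nums, lambda x: x % 2 == 0)
evens = op.map("label_even", branched.trues, lambda x: ("even", x))
odds = op.map("label_odd", branched.falses, lambda x: ("odd", x))

# Recombine the two labeled streams into one
merged = op.merge("combine", evens, odds)
op.output("out", merged, StdOutSink())

run_main(flow)
```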

### Stateful Processing

Advanced operators for maintaining state across events, including reduce, fold, join, and custom stateful transformations. These operators enable complex event processing patterns like aggregations, session tracking, and multi-stream correlations.

```python { .api }
def stateful_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]]) -> KeyedStream[W]: ...
def stateful_flat_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]]) -> KeyedStream[W]: ...
def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def join(step_id: str, *sides: KeyedStream[Any], insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...
def collect(step_id: str, up: KeyedStream[V], timeout: timedelta, max_size: int) -> KeyedStream[List[V]]: ...

class StatefulLogic[V, W, S]:
    def on_item(self, value: V) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...

JoinInsertMode = Literal["first", "last", "product"]
JoinEmitMode = Literal["complete", "final", "running"]
```

[Stateful Processing](./stateful.md)
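
As a concrete example, the sketch below keeps a running count per key with `stateful_map`. It assumes a `key_on` operator in `bytewax.operators` for producing the required `KeyedStream`; treat it as illustrative rather than canonical.

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("running_count")
events = op.input("events", flow, TestingSource(["a", "b", "a", "a", "b"]))

# Stateful operators require a KeyedStream; here we key by the event itself
keyed = op.key_on("key", events, lambda x: x)

def count(state, _value):
    # state is None the first time a key is seen
    state = (state or 0) + 1
    return state, state  # (updated state, value emitted downstream)

counts = op.stateful_map("count", keyed, count)
op.output("out", counts, StdOutSink())  # emits ("a", 1), ("b", 1), ("a", 2), ...
run_main(flow)
```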

### Windowing Operations

Time-based windowing operators for processing streams in temporal buckets. Supports tumbling, sliding, and session windows with various aggregation functions for real-time analytics and temporal pattern detection.

```python { .api }
def collect_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower) -> KeyedStream[List[V]]: ...
def fold_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def reduce_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def count_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, key: Callable[[V], str]) -> KeyedStream[int]: ...
def max_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
def min_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

class SystemClock:
    def __init__(self): ...

class EventClock[V]:
    def __init__(self, ts_getter: Callable[[V], datetime], wait_for_system_duration: timedelta = timedelta(0)): ...

class TumblingWindower:
    def __init__(self, length: timedelta): ...

class SlidingWindower:
    def __init__(self, length: timedelta, offset: timedelta): ...

class SessionWindower:
    def __init__(self, gap: timedelta): ...

class WindowMetadata:
    open_time: datetime
    close_time: datetime
```

[Windowing Operations](./windowing.md)
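
A hedged sketch of session windowing built from the signatures above: it counts events per user session with `EventClock` and `SessionWindower`. The event data and gap are hypothetical, and the exact output shape of `fold_window` varies between bytewax versions, so verify against the release you use.

```python
from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("sessionized_counts")

# Hypothetical events: (user, timestamp) pairs
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
events = op.input("events", flow, TestingSource([
    ("alice", t0),
    ("alice", t0 + timedelta(seconds=5)),
    ("alice", t0 + timedelta(minutes=10)),  # gap > 1 min starts a new session
]))

keyed = op.key_on("key", events, lambda e: e[0])

# Use the event's own timestamp to assign windows
clock = win.EventClock(ts_getter=lambda e: e[1],
                       wait_for_system_duration=timedelta(seconds=0))
windower = win.SessionWindower(gap=timedelta(minutes=1))

# Fold each session window into a per-session event count
counts = win.fold_window("sessions", keyed, clock, windower,
                         builder=lambda: 0,
                         folder=lambda acc, _e: acc + 1)
op.output("out", counts, StdOutSink())
run_main(flow)
```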

### Input Sources

Interfaces and implementations for reading data from external systems. Includes abstract base classes for building custom sources and built-in sources for common data systems.

```python { .api }
class Source[X]: ...
class StatefulSourcePartition[X, S]: ...
class StatelessSourcePartition[X]: ...
class FixedPartitionedSource[X, S]: ...
class DynamicSource[X]: ...
```

[Input Sources](./sources.md)
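
To show how the abstract classes fit together, here is a minimal custom source built on `DynamicSource` and `StatelessSourcePartition`. The `build`/`next_batch`/`next_awake` hook names and signatures are assumptions based on the interfaces listed above and on upstream bytewax conventions; check them against the version you run.

```python
from datetime import datetime
from typing import List, Optional

from bytewax.inputs import DynamicSource, StatelessSourcePartition


class CountingPartition(StatelessSourcePartition):
    """Emit the integers 0..n-1 in single-item batches."""

    def __init__(self, n: int):
        self._it = iter(range(n))

    def next_batch(self) -> List[int]:
        # next() raises StopIteration when exhausted, which signals
        # to the runtime that this partition is complete
        return [next(self._it)]

    def next_awake(self) -> Optional[datetime]:
        return None  # poll again as soon as possible


class CountingSource(DynamicSource):
    """Builds one independent partition on each worker."""

    def __init__(self, n: int):
        self._n = n

    def build(self, step_id: str, worker_index: int, worker_count: int):
        return CountingPartition(self._n)
```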

### Output Sinks

Interfaces and implementations for writing data to external systems. Includes abstract base classes for building custom sinks and patterns for exactly-once output delivery.

```python { .api }
class Sink[X]: ...
class StatefulSinkPartition[X, S]: ...
class StatelessSinkPartition[X]: ...
class FixedPartitionedSink[X, S]: ...
class DynamicSink[X]: ...
```

[Output Sinks](./sinks.md)
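
A matching sketch for the output side: a `DynamicSink` whose per-worker `StatelessSinkPartition` writes batches to stdout. As with the source example, the `build` and `write_batch` hook signatures are assumptions to verify against your bytewax version.

```python
from typing import List

from bytewax.outputs import DynamicSink, StatelessSinkPartition


class PrintPartition(StatelessSinkPartition):
    """Write each batch of items to stdout, tagged with the worker index."""

    def __init__(self, worker_index: int):
        self._tag = f"worker-{worker_index}"

    def write_batch(self, items: List[str]) -> None:
        for item in items:
            print(f"[{self._tag}] {item}")


class PrintSink(DynamicSink):
    def build(self, step_id: str, worker_index: int, worker_count: int):
        return PrintPartition(worker_index)
```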

### Built-in Connectors

Pre-built connectors for common external systems including Kafka, files, stdio, and demo sources. These connectors provide production-ready integration with popular data systems.

```python { .api }
# Kafka connectors
class KafkaSource:
    def __init__(self, brokers: List[str], topics: List[str], **kwargs): ...

class KafkaSink:
    def __init__(self, brokers: List[str], topic: str, **kwargs): ...

class KafkaSourceMessage[K, V]:
    key: K
    value: V
    timestamp: datetime
    partition: int
    offset: int

class KafkaSinkMessage[K, V]:
    key: K
    value: V

# File connectors
class FileSource:
    def __init__(self, path: str, **kwargs): ...

class FileSink:
    def __init__(self, path: str, **kwargs): ...

class CSVSource:
    def __init__(self, path: str, **kwargs): ...

class DirSource:
    def __init__(self, dir_path: str, **kwargs): ...

class DirSink:
    def __init__(self, dir_path: str, **kwargs): ...

# Stdio connectors
class StdOutSink: ...
class StdInSource: ...

# Demo connectors
class RandomMetricSource:
    def __init__(self, **kwargs): ...
```

[Built-in Connectors](./connectors.md)

### Runtime and Execution

Functions and classes for executing dataflows in various environments, from single-threaded testing to distributed production clusters. Includes configuration for worker processes, recovery, and distributed coordination.

```python { .api }
def cli_main(flow: Dataflow, workers_per_process: int = 1, process_id: Optional[int] = None, addresses: Optional[List[str]] = None, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...
```

[Runtime and Execution](./runtime.md)
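
A small module sketch showing the two common entry points: `run_main` for single-process development, and the `python -m bytewax.run` CLI (which drives `cli_main`) for parallel execution. The module path in the comment is a hypothetical example.

```python
# my_flow.py — a dataflow module runnable both ways
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("runtime_example")
nums = op.input("in", flow, TestingSource(list(range(5))))
op.output("out", nums, StdOutSink())

if __name__ == "__main__":
    # Single-threaded, in-process execution (development/testing)
    run_main(flow)

# For parallel execution, point the bytewax.run CLI at the module's
# `flow` attribute instead, e.g.:
#   python -m bytewax.run my_flow:flow
```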

### Testing Utilities

Tools and utilities for testing dataflows including test sources, sinks, and execution helpers. Provides deterministic testing environments and data capture utilities for validating stream processing logic.

```python { .api }
class TestingSource[X]:
    def __init__(self, data: Iterable[X]): ...

class EOF: ...
class ABORT: ...
class PAUSE:
    def __init__(self, duration: timedelta): ...

class TestingSink[X]:
    def __init__(self): ...
    def get_output(self) -> List[X]: ...

class TimeTestingGetter:
    def __init__(self, now: datetime): ...
    def advance(self, td: timedelta) -> None: ...
    def now(self) -> datetime: ...

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None) -> None: ...
def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1) -> None: ...
def poll_next_batch(partition, timeout: timedelta) -> List[Any]: ...
```

[Testing Utilities](./testing.md)
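
A unit-test sketch using the `TestingSource`/`TestingSink` interfaces listed above: `run_main` executes the flow to completion in-process and deterministically, after which the captured output can be asserted on.

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, TestingSink, run_main


def test_doubling():
    flow = Dataflow("test_doubling")
    nums = op.input("in", flow, TestingSource([1, 2, 3]))
    doubled = op.map("double", nums, lambda x: x * 2)

    # Capture output in-memory instead of writing to an external system
    sink = TestingSink()
    op.output("out", doubled, sink)

    run_main(flow)
    assert sink.get_output() == [2, 4, 6]
```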

### State Recovery

Recovery mechanisms for fault tolerance including state snapshotting, partition management, and resume capabilities. Enables exactly-once processing guarantees in distributed environments.

```python { .api }
class RecoveryConfig:
    def __init__(self, db_dir: Path, backup_interval: Optional[timedelta] = None): ...

def init_db_dir(db_dir: Path, count: int): ...

class InconsistentPartitionsError(ValueError): ...
class MissingPartitionsError(FileNotFoundError): ...
class NoPartitionsError(FileNotFoundError): ...
```

[State Recovery](./recovery.md)
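
A sketch of enabling recovery, following the signatures above: initialize a fixed set of recovery partitions once with `init_db_dir`, then pass a `RecoveryConfig` when running the flow. The directory path and partition count are hypothetical.

```python
from datetime import timedelta
from pathlib import Path

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.recovery import RecoveryConfig, init_db_dir
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("recoverable")
nums = op.input("in", flow, TestingSource(list(range(10))))
op.output("out", nums, StdOutSink())

db_dir = Path("./recovery-db")  # hypothetical location
if not db_dir.exists():
    db_dir.mkdir()
    init_db_dir(db_dir, count=4)  # fixed number of state partitions

# Snapshot state on an interval so a restart can resume from the last epoch
config = RecoveryConfig(db_dir, backup_interval=timedelta(minutes=1))
run_main(flow, recovery_config=config)
```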

### Tracing and Monitoring

Logging, tracing, and monitoring capabilities for observing dataflow execution. Supports integration with Jaeger, OpenTelemetry, and other observability platforms for production monitoring.

```python { .api }
class TracingConfig: ...

class JaegerConfig(TracingConfig):
    def __init__(self, service_name: str, endpoint: Optional[str] = None, sampling_ratio: float = 1.0): ...

class OtlpTracingConfig(TracingConfig):
    def __init__(self, service_name: str, url: Optional[str] = None, sampling_ratio: float = 1.0): ...

def setup_tracing(tracing_config: Optional[TracingConfig] = None, log_level: Optional[str] = None): ...
```

[Tracing and Monitoring](./tracing.md)

[Tracing and Monitoring](./tracing.md)
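
A sketch of enabling OTLP tracing with the interfaces above; the collector URL and service name are hypothetical placeholders.

```python
from bytewax.tracing import setup_tracing, OtlpTracingConfig

# Keep a reference to the returned guard so tracing stays active
tracer = setup_tracing(
    tracing_config=OtlpTracingConfig(
        service_name="bytewax-example",  # hypothetical service name
        url="grpc://127.0.0.1:4317",     # hypothetical OTLP collector
    ),
    log_level="INFO",
)
```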