# Built-in Connectors

Pre-built connectors for common external systems including Kafka, files, stdio, and demo sources. These connectors provide production-ready integration with popular data systems and serve as examples for building custom connectors.

## Capabilities

### Kafka Connectors

High-performance Kafka integration with built-in serialization/deserialization support.

```python { .api }
# Kafka operators (from bytewax.connectors.kafka.operators)
def input(step_id: str, flow: Dataflow, brokers: List[str], topics: List[str], starting_offset: str = "stored", consumer_configs: Optional[Dict[str, Any]] = None, batch_size: int = 1000) -> KafkaSourceOut: ...

def output(step_id: str, up: Stream[KafkaSinkMessage], brokers: List[str], topic: str, producer_configs: Optional[Dict[str, Any]] = None) -> None: ...

# Result of `input`: successes and failures exposed as separate streams
class KafkaSourceOut:
    oks: Stream[KafkaSourceMessage]
    errs: Stream[KafkaError]

# Message types
class KafkaSourceMessage:
    key: Optional[bytes]
    value: bytes
    topic: str
    partition: int
    offset: int
    timestamp: Optional[datetime]

class KafkaSinkMessage:
    def __init__(self, key: Optional[bytes], value: bytes): ...
    key: Optional[bytes]
    value: bytes
```

**Kafka Input Parameters:**
- `step_id` (str): Unique identifier
- `flow` (Dataflow): Target dataflow
- `brokers` (List[str]): Kafka broker addresses
- `topics` (List[str]): Topics to consume from
- `starting_offset` (str): Where to start consuming ("earliest", "latest", "stored")
- `consumer_configs` (Dict): Additional Kafka consumer configuration
- `batch_size` (int): Maximum messages per batch

**Kafka Output Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[KafkaSinkMessage]): Stream of messages to send
- `brokers` (List[str]): Kafka broker addresses
- `topic` (str): Topic to produce to
- `producer_configs` (Dict): Additional Kafka producer configuration

**Usage Examples:**
```python
from bytewax.dataflow import Dataflow
from bytewax.connectors.kafka import KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
import bytewax.operators as op
import json

# Dataflow instance used by the snippets below
flow = Dataflow("kafka_example")

# Kafka input with deserialization
kafka_stream = kop.input(
    "kafka_input",
    flow,
    brokers=["localhost:9092"],
    topics=["events", "metrics"],
    starting_offset="latest"
)

# Access the successful messages and errors separately
messages = kafka_stream.oks
errors = kafka_stream.errs

# Process messages
def deserialize_json(msg):
    try:
        return json.loads(msg.value.decode('utf-8'))
    except json.JSONDecodeError:
        return None

events = op.filter_map("deserialize", messages, deserialize_json)
processed_events = events  # stand-in for any further processing steps

# Kafka output with serialization
def create_kafka_message(event):
    key = event.get("user_id", "").encode('utf-8') if event.get("user_id") else None
    value = json.dumps(event).encode('utf-8')
    return KafkaSinkMessage(key, value)

kafka_messages = op.map("serialize", processed_events, create_kafka_message)

kop.output(
    "kafka_output",
    kafka_messages,
    brokers=["localhost:9092"],
    topic="processed_events"
)
```
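
The `errs` stream is easy to forget; one option is to log failures or route them to a dead-letter topic for later replay. A minimal sketch, reusing the `errors` stream from the example above and assuming an illustrative `dead_letter` topic (the exact shape of items on the error stream depends on the connector):

```python
# Log failed messages during development
op.inspect("log_errors", errors)

# Route failures to a dead-letter topic. repr() is used as a
# lowest-common-denominator payload for the failed item.
def to_dead_letter(err):
    return KafkaSinkMessage(None, repr(err).encode("utf-8"))

dead_letters = op.map("to_dead_letter", errors, to_dead_letter)
kop.output("dead_letters", dead_letters, brokers=["localhost:9092"], topic="dead_letter")
```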

### Kafka Serialization/Deserialization

Built-in serializers and deserializers for common data formats.

```python { .api }
# From bytewax.connectors.kafka.serde
class BytesDeserializer: ...
class JsonDeserializer: ...
class AvroDeserializer:
    def __init__(self, schema_registry_url: str, schema_id: Optional[int] = None): ...

class BytesSerializer: ...
class JsonSerializer: ...
class AvroSerializer:
    def __init__(self, schema_registry_url: str, schema: str): ...
```

**Usage Example:**
```python
from bytewax.connectors.kafka.serde import JsonDeserializer, JsonSerializer

# Using deserializer
json_deserializer = JsonDeserializer()
deserialized = op.map("deserialize", kafka_messages, lambda msg: json_deserializer(msg.value))

# Using serializer
json_serializer = JsonSerializer()
serialized = op.map("serialize", events, lambda event: json_serializer(event))
```
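
For Avro, the constructors declared above take a schema registry URL. A minimal sketch of using them the same way, assuming a registry at `http://localhost:8081` and an illustrative record schema:

```python
from bytewax.connectors.kafka.serde import AvroDeserializer, AvroSerializer

SCHEMA_REGISTRY_URL = "http://localhost:8081"  # placeholder registry
EVENT_SCHEMA = """
{"type": "record", "name": "Event",
 "fields": [{"name": "user_id", "type": "string"}]}
"""  # illustrative schema

avro_deserializer = AvroDeserializer(SCHEMA_REGISTRY_URL)
avro_serializer = AvroSerializer(SCHEMA_REGISTRY_URL, EVENT_SCHEMA)

decoded = op.map("avro_decode", messages, lambda msg: avro_deserializer(msg.value))
encoded = op.map("avro_encode", decoded, lambda event: avro_serializer(event))
```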

### File Connectors

File system integration for reading from and writing to files with various formats.

```python { .api }
class FileSource:
    def __init__(self, paths: List[str], batch_size: int = 1000, encoding: str = "utf-8"): ...

class FileSink:
    def __init__(self, path: str, encoding: str = "utf-8", append: bool = False): ...

class DirSource:
    def __init__(self, paths: List[str], pattern: str = "*", recursive: bool = False): ...

class CSVSource:
    def __init__(self, paths: List[str], **csv_kwargs): ...
```

**FileSource Parameters:**
- `paths` (List[str]): File paths to read from
- `batch_size` (int): Lines per batch
- `encoding` (str): File encoding

**FileSink Parameters:**
- `path` (str): Output file path
- `encoding` (str): File encoding
- `append` (bool): Append to existing file

**Usage Examples:**
```python
from bytewax.connectors.files import FileSource, FileSink, CSVSource

# Read from multiple files
file_stream = op.input("files", flow, FileSource([
    "/data/logs/app.log",
    "/data/logs/error.log"
], batch_size=500))

# Write to file
op.output("file_output", processed_data, FileSink("/output/results.jsonl"))

# Read CSV files
csv_stream = op.input("csv", flow, CSVSource([
    "/data/sales.csv",
    "/data/inventory.csv"
], delimiter=',', skip_header=True))
```
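
`DirSource`, declared in the API block above but not shown yet, reads whole directories instead of explicit file lists. A minimal sketch following that constructor, with a placeholder `/data/logs` directory:

```python
from bytewax.connectors.files import DirSource

# Read every .log file under /data/logs, including subdirectories
log_stream = op.input("log_dir", flow, DirSource(
    ["/data/logs"],
    pattern="*.log",
    recursive=True
))
```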

### Standard I/O Connectors

Simple connectors for stdin/stdout, useful for command-line tools and debugging.

```python { .api }
class StdInSource: ...

class StdOutSink: ...
```

**Usage Examples:**
```python
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdInSource, StdOutSink

# Read from stdin
stdin_stream = op.input("stdin", flow, StdInSource())

# Write to stdout (useful for debugging)
op.output("stdout", results, StdOutSink())

# Pipeline example: process stdin to stdout
flow = Dataflow("stdin_processor")
lines = op.input("stdin", flow, StdInSource())
processed = op.map("process", lines, lambda line: line.upper().strip())
op.output("stdout", processed, StdOutSink())
```
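
When unit-testing a pipeline like the one above, an in-memory source is usually more convenient than real stdin. A minimal sketch, assuming the testing helpers from `bytewax.testing` covered in testing.md:

```python
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink
import bytewax.operators as op

# Same pipeline shape as above, fed from a fixed list instead of stdin
flow = Dataflow("stdin_processor_test")
lines = op.input("inp", flow, TestingSource(["  hello ", " world  "]))
processed = op.map("process", lines, lambda line: line.upper().strip())
op.output("stdout", processed, StdOutSink())

run_main(flow)  # prints HELLO and WORLD
```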

### Demo and Testing Connectors

Connectors for generating demo data and testing scenarios.

```python { .api }
class DemoSource:
    def __init__(self, data: Iterable[Any], interval: timedelta = timedelta(seconds=1)): ...

class RandomMetricSource:
    def __init__(self, metric_names: List[str], interval: timedelta = timedelta(seconds=1), min_value: float = 0.0, max_value: float = 100.0): ...
```

**DemoSource Parameters:**
- `data` (Iterable): Data to emit cyclically
- `interval` (timedelta): Time between emissions

**RandomMetricSource Parameters:**
- `metric_names` (List[str]): Names of metrics to generate
- `interval` (timedelta): Generation interval
- `min_value`, `max_value` (float): Value range

**Usage Examples:**
```python
from bytewax.connectors.demo import DemoSource, RandomMetricSource
from datetime import timedelta

# Cycle through demo data
demo_data = [
    {"user": "alice", "action": "login"},
    {"user": "bob", "action": "purchase"},
    {"user": "alice", "action": "logout"}
]

demo_stream = op.input("demo", flow, DemoSource(demo_data, interval=timedelta(seconds=2)))

# Generate random metrics
metrics_stream = op.input("metrics", flow, RandomMetricSource(
    metric_names=["cpu_usage", "memory_usage", "disk_io"],
    interval=timedelta(milliseconds=500),
    min_value=0.0,
    max_value=100.0
))
```
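
Demo data usually feeds stateful or windowed steps downstream, which expect keyed `(key, value)` items. A minimal sketch of keying the streams above; the tuple shape assumed for `metrics_stream` items is illustrative, not specified by the API block:

```python
# Key demo events by user so per-user state can be kept downstream
keyed_events = op.key_on("key_by_user", demo_stream, lambda event: event["user"])

# Key metrics by name, assuming each item is a (metric_name, value) tuple
keyed_metrics = op.key_on("key_by_metric", metrics_stream, lambda item: item[0])
```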

### Custom Connector Patterns

Patterns for building custom connectors on top of the source base classes (see sources.md).

**HTTP API Connector:**
```python
from datetime import datetime, timedelta, timezone

import requests
from bytewax.inputs import DynamicSource, StatelessSourcePartition


class HTTPSource(DynamicSource):
    def __init__(self, url, headers=None, poll_interval=timedelta(seconds=10)):
        self.url = url
        self.headers = headers or {}
        self.poll_interval = poll_interval

    def build(self, step_id, worker_index, worker_count):
        return HTTPPartition(self.url, self.headers, self.poll_interval)


class HTTPPartition(StatelessSourcePartition):
    def __init__(self, url, headers, poll_interval):
        self.url = url
        self.headers = headers
        self.poll_interval = poll_interval
        self.last_poll = None

    def next_batch(self):
        # Record the poll time so next_awake can schedule the next request
        self.last_poll = datetime.now(timezone.utc)
        response = requests.get(self.url, headers=self.headers)
        if response.ok:
            data = response.json()
            return data.get('items', [])
        return []

    def next_awake(self):
        # Awake times must be timezone-aware; poll again after the interval
        if self.last_poll:
            return self.last_poll + self.poll_interval
        return datetime.now(timezone.utc)
```
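
Using the `HTTPSource` defined above is the same as using a built-in connector; a short sketch with placeholder URL, token, and polling interval:

```python
from datetime import timedelta

from bytewax.dataflow import Dataflow
import bytewax.operators as op

flow = Dataflow("http_poller")
api_items = op.input("api", flow, HTTPSource(
    "https://api.example.com/items",           # placeholder endpoint
    headers={"Authorization": "Bearer TOKEN"},  # placeholder credentials
    poll_interval=timedelta(seconds=30),
))
op.inspect("debug_items", api_items)
```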

**WebSocket Connector:**
```python
import json
import threading
from datetime import datetime, timedelta, timezone
from queue import Queue, Empty

import websocket
from bytewax.inputs import DynamicSource, StatelessSourcePartition


class WebSocketSource(DynamicSource):
    def __init__(self, url):
        self.url = url
        self.message_queue = Queue()
        self.ws = None

    def build(self, step_id, worker_index, worker_count):
        if not self.ws:
            self.ws = websocket.WebSocketApp(
                self.url,
                on_message=self._on_message,
                on_error=self._on_error
            )
            # Start WebSocket in a background thread
            ws_thread = threading.Thread(target=self.ws.run_forever)
            ws_thread.daemon = True
            ws_thread.start()

        return WebSocketPartition(self.message_queue)

    def _on_message(self, ws, message):
        try:
            data = json.loads(message)
            self.message_queue.put(data)
        except json.JSONDecodeError:
            pass

    def _on_error(self, ws, error):
        print(f"WebSocket error: {error}")


class WebSocketPartition(StatelessSourcePartition):
    def __init__(self, message_queue):
        self.message_queue = message_queue

    def next_batch(self):
        messages = []
        try:
            # Get up to 100 messages without blocking
            for _ in range(100):
                message = self.message_queue.get_nowait()
                messages.append(message)
        except Empty:
            pass
        return messages

    def next_awake(self):
        # Check again in 100ms if no messages (awake times must be timezone-aware)
        return datetime.now(timezone.utc) + timedelta(milliseconds=100)
```
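
As with the HTTP example, the `WebSocketSource` above plugs straight into `op.input`; a short sketch with a placeholder feed URL:

```python
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
import bytewax.operators as op

flow = Dataflow("ws_consumer")
ws_events = op.input("ws", flow, WebSocketSource("wss://stream.example.com/feed"))
op.output("stdout", ws_events, StdOutSink())
```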

**Database Streaming Connector:**
```python
import psycopg2
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition


class DatabaseStreamSource(FixedPartitionedSource):
    def __init__(self, connection_string, query, partition_column="id"):
        self.connection_string = connection_string
        self.query = query
        self.partition_column = partition_column

    def list_parts(self):
        # Could partition by date, hash, etc.
        return ["partition_0", "partition_1", "partition_2", "partition_3"]

    def build_part(self, step_id, for_part, resume_state):
        partition_id = int(for_part.split("_")[1])
        return DatabasePartition(
            self.connection_string,
            self.query,
            self.partition_column,
            partition_id,
            4,  # total partitions
            resume_state
        )


class DatabasePartition(StatefulSourcePartition):
    def __init__(self, connection_string, query, partition_column, partition_id, total_partitions, resume_state):
        self.connection_string = connection_string
        self.base_query = query
        self.partition_column = partition_column
        self.partition_id = partition_id
        self.total_partitions = total_partitions
        self.last_id = resume_state or 0
        self.connection = None

    def next_batch(self):
        if not self.connection:
            self.connection = psycopg2.connect(self.connection_string)

        # Add partitioning and resume logic to the base query
        query = f"""
            {self.base_query}
            WHERE {self.partition_column} > %s
              AND {self.partition_column} %% %s = %s
            ORDER BY {self.partition_column}
            LIMIT 1000
        """

        cursor = self.connection.cursor()
        cursor.execute(query, (self.last_id, self.total_partitions, self.partition_id))

        rows = cursor.fetchall()
        if rows:
            # Update last seen ID (assumes the first column is the ID)
            self.last_id = max(row[0] for row in rows)
            return [dict(zip([desc[0] for desc in cursor.description], row)) for row in rows]

        return []

    def snapshot(self):
        return self.last_id

    def close(self):
        if self.connection:
            self.connection.close()
```
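
Because each `DatabasePartition` snapshots `last_id`, the source resumes from the last processed row when recovery is enabled (see recovery.md). A short sketch of wiring it up, with a placeholder connection string and query:

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op

flow = Dataflow("db_stream")
rows = op.input("db", flow, DatabaseStreamSource(
    "postgresql://user:password@localhost:5432/appdb",  # placeholder DSN
    "SELECT id, payload FROM events",                   # base query without WHERE/ORDER BY
    partition_column="id",
))
op.inspect("debug_rows", rows)
```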