# Built-in Connectors

Pre-built connectors for common external systems, including Kafka, files, stdio, and demo sources. These connectors provide production-ready integration with popular data systems and serve as examples for building custom connectors.

## Capabilities

### Kafka Connectors

High-performance Kafka integration with built-in serialization/deserialization support.
```python { .api }
# Kafka operators (from bytewax.connectors.kafka.operators)
def input(step_id: str, flow: Dataflow, brokers: List[str], topics: List[str], starting_offset: str = "stored", consumer_configs: Optional[Dict[str, Any]] = None, batch_size: int = 1000) -> KafkaSourceOut: ...

def output(step_id: str, up: Stream[KafkaSinkMessage], brokers: List[str], topic: str, producer_configs: Optional[Dict[str, Any]] = None) -> None: ...

# Stream bundle returned by input; see `oks`/`errs` in the usage example below
class KafkaSourceOut:
    oks: Stream[KafkaSourceMessage]
    errs: Stream[KafkaError]

# Message types
class KafkaSourceMessage:
    key: Optional[bytes]
    value: bytes
    topic: str
    partition: int
    offset: int
    timestamp: Optional[datetime]

class KafkaSinkMessage:
    def __init__(self, key: Optional[bytes], value: bytes): ...
    key: Optional[bytes]
    value: bytes
```
**Kafka Input Parameters:**

- `step_id` (str): Unique identifier
- `flow` (Dataflow): Target dataflow
- `brokers` (List[str]): Kafka broker addresses
- `topics` (List[str]): Topics to consume from
- `starting_offset` (str): Where to start consuming ("earliest", "latest", "stored")
- `consumer_configs` (Dict): Additional Kafka consumer configuration
- `batch_size` (int): Maximum messages per batch

**Kafka Output Parameters:**

- `step_id` (str): Unique identifier
- `up` (Stream[KafkaSinkMessage]): Stream of messages to send
- `brokers` (List[str]): Kafka broker addresses
- `topic` (str): Topic to produce to
- `producer_configs` (Dict): Additional Kafka producer configuration
**Usage Examples:**

```python
import json

import bytewax.operators as op
from bytewax.connectors.kafka import KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow

flow = Dataflow("kafka_example")

# Kafka input with deserialization
kafka_stream = kop.input(
    "kafka_input",
    flow,
    brokers=["localhost:9092"],
    topics=["events", "metrics"],
    starting_offset="latest",
)

# Access the successful messages and errors separately
messages = kafka_stream.oks
errors = kafka_stream.errs

# Process messages; filter_map drops the None returned on decode failure
def deserialize_json(msg):
    try:
        return json.loads(msg.value.decode("utf-8"))
    except json.JSONDecodeError:
        return None

events = op.filter_map("deserialize", messages, deserialize_json)

# ... further processing steps would produce `processed_events` ...
processed_events = events

# Kafka output with serialization
def create_kafka_message(event):
    user_id = event.get("user_id")
    key = user_id.encode("utf-8") if user_id else None
    value = json.dumps(event).encode("utf-8")
    return KafkaSinkMessage(key, value)

kafka_messages = op.map("serialize", processed_events, create_kafka_message)

kop.output(
    "kafka_output",
    kafka_messages,
    brokers=["localhost:9092"],
    topic="processed_events",
)
```
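The `consumer_configs` and `producer_configs` dicts are passed through to the underlying Kafka client, so standard librdkafka-style option keys apply. A minimal sketch, assuming the pass-through behavior described above; the specific option values are placeholders to adapt for your cluster:

```python
# Sketch: tuning the consumer and producer via config pass-through.
# "group.id", "security.protocol", and "compression.type" are standard
# librdkafka configuration keys.
tuned_stream = kop.input(
    "kafka_input_tuned",
    flow,
    brokers=["localhost:9092"],
    topics=["events"],
    consumer_configs={
        "group.id": "my-consumer-group",
        "security.protocol": "PLAINTEXT",
    },
)

kop.output(
    "kafka_output_tuned",
    kafka_messages,
    brokers=["localhost:9092"],
    topic="processed_events",
    producer_configs={"compression.type": "gzip"},
)
```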
### Kafka Serialization/Deserialization

Built-in serializers and deserializers for common data formats.

```python { .api }
# From bytewax.connectors.kafka.serde
class BytesDeserializer: ...
class JsonDeserializer: ...
class AvroDeserializer:
    def __init__(self, schema_registry_url: str, schema_id: Optional[int] = None): ...

class BytesSerializer: ...
class JsonSerializer: ...
class AvroSerializer:
    def __init__(self, schema_registry_url: str, schema: str): ...
```
**Usage Example:**

```python
import bytewax.operators as op
from bytewax.connectors.kafka.serde import JsonDeserializer, JsonSerializer

# Using a deserializer on the raw message value
json_deserializer = JsonDeserializer()
deserialized = op.map("deserialize", kafka_messages, lambda msg: json_deserializer(msg.value))

# Using a serializer on outgoing events
json_serializer = JsonSerializer()
serialized = op.map("serialize", events, lambda event: json_serializer(event))
```
### File Connectors

File system integration for reading from and writing to files with various formats.

```python { .api }
class FileSource:
    def __init__(self, paths: List[str], batch_size: int = 1000, encoding: str = "utf-8"): ...

class FileSink:
    def __init__(self, path: str, encoding: str = "utf-8", append: bool = False): ...

class DirSource:
    def __init__(self, paths: List[str], pattern: str = "*", recursive: bool = False): ...

class CSVSource:
    def __init__(self, paths: List[str], **csv_kwargs): ...
```
**FileSource Parameters:**

- `paths` (List[str]): File paths to read from
- `batch_size` (int): Lines per batch
- `encoding` (str): File encoding

**FileSink Parameters:**

- `path` (str): Output file path
- `encoding` (str): File encoding
- `append` (bool): Append to existing file
**Usage Examples:**

```python
import bytewax.operators as op
from bytewax.connectors.files import CSVSource, FileSink, FileSource

# Read from multiple files
file_stream = op.input("files", flow, FileSource([
    "/data/logs/app.log",
    "/data/logs/error.log",
], batch_size=500))

# Write to file
op.output("file_output", processed_data, FileSink("/output/results.jsonl"))

# Read CSV files; extra keyword arguments are forwarded to the CSV reader
csv_stream = op.input("csv", flow, CSVSource([
    "/data/sales.csv",
    "/data/inventory.csv",
], delimiter=","))
```
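`DirSource` is declared in the API block above but not demonstrated. A minimal sketch, assuming the glob-style `pattern` and `recursive` parameters behave as documented:

```python
from bytewax.connectors.files import DirSource

# Read every .log file under /data/logs, including subdirectories
log_stream = op.input("dir", flow, DirSource(
    ["/data/logs"],
    pattern="*.log",
    recursive=True,
))
```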
### Standard I/O Connectors

Simple connectors for stdin/stdout, useful for command-line tools and debugging.

```python { .api }
class StdInSource: ...

class StdOutSink: ...
```

**Usage Example:**

```python
import bytewax.operators as op
from bytewax.connectors.stdio import StdInSource, StdOutSink
from bytewax.dataflow import Dataflow

# Pipeline example: process stdin to stdout
flow = Dataflow("stdin_processor")

# Read from stdin
lines = op.input("stdin", flow, StdInSource())

processed = op.map("process", lines, lambda line: line.upper().strip())

# Write to stdout (useful for debugging)
op.output("stdout", processed, StdOutSink())
```
### Demo and Testing Connectors

Connectors for generating demo data and testing scenarios.

```python { .api }
class DemoSource:
    def __init__(self, data: Iterable[Any], interval: timedelta = timedelta(seconds=1)): ...

class RandomMetricSource:
    def __init__(self, metric_names: List[str], interval: timedelta = timedelta(seconds=1), min_value: float = 0.0, max_value: float = 100.0): ...
```

**DemoSource Parameters:**

- `data` (Iterable): Data to emit cyclically
- `interval` (timedelta): Time between emissions

**RandomMetricSource Parameters:**

- `metric_names` (List[str]): Names of metrics to generate
- `interval` (timedelta): Generation interval
- `min_value`, `max_value` (float): Value range
**Usage Examples:**

```python
from datetime import timedelta

import bytewax.operators as op
from bytewax.connectors.demo import DemoSource, RandomMetricSource

# Cycle through demo data
demo_data = [
    {"user": "alice", "action": "login"},
    {"user": "bob", "action": "purchase"},
    {"user": "alice", "action": "logout"},
]

demo_stream = op.input("demo", flow, DemoSource(demo_data, interval=timedelta(seconds=2)))

# Generate random metrics
metrics_stream = op.input("metrics", flow, RandomMetricSource(
    metric_names=["cpu_usage", "memory_usage", "disk_io"],
    interval=timedelta(milliseconds=500),
    min_value=0.0,
    max_value=100.0,
))
```
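For deterministic tests, the helpers in `bytewax.testing` are often more convenient than interval-driven sources, since they run a flow to completion over a fixed input. A minimal sketch using `TestingSource`, `TestingSink`, and `run_main`:

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSink, TestingSource, run_main

def test_uppercase():
    flow = Dataflow("test")
    inp = op.input("inp", flow, TestingSource(["a", "b"]))
    upper = op.map("upper", inp, str.upper)
    out = []
    op.output("out", upper, TestingSink(out))

    # Runs the whole flow on the current thread until the input is exhausted
    run_main(flow)
    assert out == ["A", "B"]
```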
### Custom Connector Patterns

**HTTP API Connector:**

```python
from datetime import datetime, timedelta, timezone

import requests
from bytewax.inputs import DynamicSource, StatelessSourcePartition

class HTTPSource(DynamicSource):
    def __init__(self, url, headers=None, poll_interval=timedelta(seconds=10)):
        self.url = url
        self.headers = headers or {}
        self.poll_interval = poll_interval

    def build(self, step_id, worker_index, worker_count):
        return HTTPPartition(self.url, self.headers, self.poll_interval)

class HTTPPartition(StatelessSourcePartition):
    def __init__(self, url, headers, poll_interval):
        self.url = url
        self.headers = headers
        self.poll_interval = poll_interval
        self.last_poll = None

    def next_batch(self):
        # Record the poll time so next_awake can schedule the next request
        self.last_poll = datetime.now(timezone.utc)
        response = requests.get(self.url, headers=self.headers)
        if response.ok:
            data = response.json()
            return data.get("items", [])
        return []

    def next_awake(self):
        # Wake one poll interval after the last request; poll
        # immediately on the first call
        if self.last_poll:
            return self.last_poll + self.poll_interval
        return datetime.now(timezone.utc)
```
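Wiring it into a flow looks like any other source; the URL and token here are placeholders:

```python
http_stream = op.input("http", flow, HTTPSource(
    "https://api.example.com/items",
    headers={"Authorization": "Bearer <token>"},
    poll_interval=timedelta(seconds=30),
))
```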
**WebSocket Connector:**

```python
import json
import threading
from datetime import datetime, timedelta, timezone
from queue import Empty, Queue

import websocket  # websocket-client package
from bytewax.inputs import DynamicSource, StatelessSourcePartition

class WebSocketSource(DynamicSource):
    def __init__(self, url):
        self.url = url
        self.message_queue = Queue()
        self.ws = None

    def build(self, step_id, worker_index, worker_count):
        if not self.ws:
            self.ws = websocket.WebSocketApp(
                self.url,
                on_message=self._on_message,
                on_error=self._on_error,
            )
            # Run the WebSocket client in a background thread so
            # next_batch never blocks on the network
            ws_thread = threading.Thread(target=self.ws.run_forever)
            ws_thread.daemon = True
            ws_thread.start()

        return WebSocketPartition(self.message_queue)

    def _on_message(self, ws, message):
        try:
            data = json.loads(message)
            self.message_queue.put(data)
        except json.JSONDecodeError:
            pass

    def _on_error(self, ws, error):
        print(f"WebSocket error: {error}")

class WebSocketPartition(StatelessSourcePartition):
    def __init__(self, message_queue):
        self.message_queue = message_queue

    def next_batch(self):
        messages = []
        try:
            # Drain up to 100 messages without blocking
            for _ in range(100):
                messages.append(self.message_queue.get_nowait())
        except Empty:
            pass
        return messages

    def next_awake(self):
        # Check again in 100ms if no messages
        return datetime.now(timezone.utc) + timedelta(milliseconds=100)
```
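The queue decouples the client's callback thread from the worker loop that calls `next_batch`. Usage mirrors the HTTP example; the endpoint is a placeholder:

```python
ws_stream = op.input("ws", flow, WebSocketSource("wss://stream.example.com/feed"))
```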
**Database Streaming Connector:**

```python
import psycopg2
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition

class DatabaseStreamSource(FixedPartitionedSource):
    def __init__(self, connection_string, query, partition_column="id"):
        self.connection_string = connection_string
        self.query = query
        self.partition_column = partition_column

    def list_parts(self):
        # Could partition by date, hash, etc.
        return ["partition_0", "partition_1", "partition_2", "partition_3"]

    def build_part(self, step_id, for_part, resume_state):
        partition_id = int(for_part.split("_")[1])
        return DatabasePartition(
            self.connection_string,
            self.query,
            self.partition_column,
            partition_id,
            4,  # total partitions
            resume_state,
        )

class DatabasePartition(StatefulSourcePartition):
    def __init__(self, connection_string, query, partition_column, partition_id, total_partitions, resume_state):
        self.connection_string = connection_string
        self.base_query = query
        self.partition_column = partition_column
        self.partition_id = partition_id
        self.total_partitions = total_partitions
        self.last_id = resume_state or 0
        self.connection = None

    def next_batch(self):
        if not self.connection:
            self.connection = psycopg2.connect(self.connection_string)

        # Add partitioning and resume logic to the query; assumes the base
        # query has no WHERE clause of its own.  `%%` is a literal `%`
        # (modulo) in a parameterized psycopg2 query.
        query = f"""
            {self.base_query}
            WHERE {self.partition_column} > %s
              AND {self.partition_column} %% %s = %s
            ORDER BY {self.partition_column}
            LIMIT 1000
        """

        cursor = self.connection.cursor()
        cursor.execute(query, (self.last_id, self.total_partitions, self.partition_id))

        rows = cursor.fetchall()
        if rows:
            # Track the highest ID seen; assumes the first column is the ID
            self.last_id = max(row[0] for row in rows)
            return [dict(zip([desc[0] for desc in cursor.description], row)) for row in rows]

        return []

    def snapshot(self):
        # The returned value is checkpointed and handed back as resume_state
        return self.last_id

    def close(self):
        if self.connection:
            self.connection.close()
```
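A sketch of wiring it up; the connection string and query are placeholders. Because each partition's `snapshot` returns `last_id`, the source resumes from the checkpointed ID after a restart when recovery is enabled:

```python
db_stream = op.input("db", flow, DatabaseStreamSource(
    "postgresql://user:pass@localhost/appdb",  # placeholder connection string
    "SELECT id, payload FROM events",          # placeholder query, no WHERE clause
    partition_column="id",
))
```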