# Input Sources

Interfaces and implementations for reading data from external systems. Sources provide the entry point for data into Bytewax dataflows, with support for various input patterns including polling, streaming, and batch processing.

## Capabilities

### Source Base Classes

Abstract base classes that define the source interface for different input patterns.
```python { .api }
class Source[X]: ...

class StatefulSourcePartition[X, S]:
    def next_batch(self) -> Iterable[X]: ...
    def next_awake(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...
    def close(self) -> None: ...

class StatelessSourcePartition[X]:
    def next_batch(self) -> Iterable[X]: ...
    def next_awake(self) -> Optional[datetime]: ...
    def close(self) -> None: ...
```
**StatefulSourcePartition Methods:**
- `next_batch()`: Return the next batch of items (empty if none are available)
- `next_awake()`: Return when `next_batch` should next be called (`None` for immediately)
- `snapshot()`: Return state for recovery (must be pickle-able)
- `close()`: Clean up resources when the dataflow completes

**StatelessSourcePartition Methods:**
- `next_batch()`: Return the next batch of items
- `next_awake()`: Return when `next_batch` should next be called
- `close()`: Clean up resources
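To make the stateful contract concrete, here is a minimal sketch of a partition that emits integers and snapshots its cursor so it can resume after recovery. It is written as a plain class so it runs standalone; in a real dataflow it would subclass `StatefulSourcePartition`, and the names are illustrative.

```python
# Minimal sketch of the StatefulSourcePartition contract (illustrative names).
# In a dataflow this would subclass StatefulSourcePartition[int, int].
class CounterPartition:
    def __init__(self, limit, resume_state=None):
        self.limit = limit
        self.index = resume_state or 0  # Resume from the snapshotted cursor

    def next_batch(self):
        # Return the next item, or an empty batch once exhausted
        if self.index >= self.limit:
            return []
        item = self.index
        self.index += 1
        return [item]

    def next_awake(self):
        return None  # None means "call next_batch again immediately"

    def snapshot(self):
        return self.index  # The cursor is the only state; it is pickle-able

    def close(self):
        pass  # No external resources to release
```

On recovery, the value returned by `snapshot()` is passed back as `resume_state`, so the partition continues exactly where it left off.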
### Source Implementations

Concrete source types for different partitioning strategies.

```python { .api }
class FixedPartitionedSource[X, S](Source[X]):
    def list_parts(self) -> List[str]: ...
    def build_part(self, step_id: str, for_part: str, resume_state: Optional[S]) -> StatefulSourcePartition[X, S]: ...

class DynamicSource[X](Source[X]):
    def build(self, step_id: str, worker_index: int, worker_count: int) -> StatelessSourcePartition[X]: ...

class SimplePollingSource[X](DynamicSource[X]):
    def __init__(self, getter: Callable[[], Iterable[X]], interval: timedelta = timedelta(seconds=1)): ...
```
**FixedPartitionedSource Methods:**
- `list_parts()`: Return the list of partition identifiers
- `build_part()`: Create a partition handler for a specific partition

**DynamicSource Methods:**
- `build()`: Create a source partition for a specific worker

**SimplePollingSource Parameters:**
- `getter` (Callable): Function that returns an iterable of items
- `interval` (timedelta): Polling interval
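To show how a runtime might drive these methods, here is a toy scheduler loop over a `FixedPartitionedSource`-style object. This is a hypothetical sketch, not Bytewax internals (the real scheduler interleaves partitions and takes snapshots); it assumes the convention that an exhausted partition raises `StopIteration` from `next_batch`, and all names are illustrative.

```python
# Toy source and driver loop illustrating the FixedPartitionedSource contract.
class ListPartition:
    def __init__(self, items, resume_state):
        self.items = items
        self.index = resume_state or 0

    def next_batch(self):
        if self.index >= len(self.items):
            raise StopIteration  # Partition exhausted
        item = self.items[self.index]
        self.index += 1
        return [item]

    def snapshot(self):
        return self.index

    def close(self):
        pass

class InMemorySource:
    def __init__(self, parts):
        self.parts = parts  # {partition_name: [items]}

    def list_parts(self):
        return sorted(self.parts)

    def build_part(self, step_id, for_part, resume_state):
        return ListPartition(self.parts[for_part], resume_state)

def drive(source, step_id="input", resume_state=None):
    # Simplified scheduler: drain each partition to completion, in order.
    resume_state = resume_state or {}
    out = []
    for name in source.list_parts():
        part = source.build_part(step_id, name, resume_state.get(name))
        try:
            while True:
                out.extend(part.next_batch())
        except StopIteration:
            part.close()
    return out
```

Passing a per-partition `resume_state` mapping shows how recovery skips items that were already emitted.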
**Usage Examples:**
```python
from datetime import timedelta

# Simple polling source
def fetch_data():
    # Poll external API or database
    return api_client.get_latest_events()

polling_source = SimplePollingSource(fetch_data, interval=timedelta(seconds=5))

# Custom stateful source
class DatabaseSource(FixedPartitionedSource):
    def __init__(self, connection_string, table_name):
        self.connection_string = connection_string
        self.table_name = table_name

    def list_parts(self):
        # Return partition identifiers (e.g., database shards)
        return ["shard_0", "shard_1", "shard_2"]

    def build_part(self, step_id, for_part, resume_state):
        return DatabasePartition(self.connection_string, self.table_name, for_part, resume_state)
```
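The example above returns a `DatabasePartition` that is not defined. Here is one hypothetical way to fill it in, using `sqlite3` as a stand-in for a real per-shard database connection (the class and column handling are illustrative): it pages through rows by `rowid` and snapshots the last `rowid` seen so the partition can resume.

```python
import sqlite3

# Hypothetical DatabasePartition for the DatabaseSource example above.
class DatabasePartition:
    def __init__(self, connection_string, table_name, for_part, resume_state):
        self.conn = sqlite3.connect(connection_string)
        self.table_name = table_name
        self.last_rowid = resume_state or 0  # Resume cursor

    def next_batch(self):
        rows = self.conn.execute(
            f"SELECT rowid, * FROM {self.table_name} "
            "WHERE rowid > ? ORDER BY rowid LIMIT 100",
            (self.last_rowid,),
        ).fetchall()
        if rows:
            self.last_rowid = rows[-1][0]  # Advance past the rows we emitted
        return [row[1:] for row in rows]  # Drop the rowid column

    def next_awake(self):
        return None  # Poll again immediately

    def snapshot(self):
        return self.last_rowid

    def close(self):
        self.conn.close()
```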
### Helper Functions

Utility functions for working with sources and batching data.

```python { .api }
def batch(it: Iterable[X], size: int) -> Iterator[List[X]]: ...

def batch_async(ait: AsyncIterable[X], size: int) -> AsyncIterator[List[X]]: ...

def batch_getter(getter: Callable[[], Iterable[X]], size: int) -> Callable[[], List[X]]: ...

def batch_getter_ex(getter: Callable[[], Iterable[X]], size: int) -> Callable[[], List[X]]: ...
```
**batch Parameters:**
- `it` (Iterable[X]): Input iterable to batch
- `size` (int): Maximum batch size

**batch_getter Parameters:**
- `getter` (Callable): Function returning an iterable
- `size` (int): Maximum batch size
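For intuition, `batch` behaves like this `itertools`-based sketch (a hypothetical reimplementation for illustration, not the library's code):

```python
import itertools
from typing import Iterable, Iterator, List, TypeVar

X = TypeVar("X")

def batch_sketch(it: Iterable[X], size: int) -> Iterator[List[X]]:
    # Yield lists of up to `size` items until the input is exhausted;
    # the final list may be shorter.
    source = iter(it)
    while True:
        chunk = list(itertools.islice(source, size))
        if not chunk:
            return
        yield chunk
```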
**Usage Examples:**
```python
# Batch an iterable ("chunk" avoids shadowing the batch function)
for chunk in batch([1, 2, 3, 4, 5], 2):
    print(chunk)  # [1, 2], [3, 4], [5]

# Batch an async iterable
async for chunk in batch_async(async_generator(), 10):
    process_batch(chunk)

# Batch a getter function
batched_getter = batch_getter(lambda: fetch_items(), 100)
```
### Exception Handling

Exceptions that can be raised by sources for testing and error handling.

```python { .api }
class AbortExecution(RuntimeError):
    """Raise this from next_batch to abort execution for testing purposes."""
    ...
```
**Usage Example:**
```python
class TestSource(StatelessSourcePartition):
    def __init__(self, data, fail_after=None):
        self.data = list(data)
        self.index = 0
        self.fail_after = fail_after

    def next_batch(self):
        # Explicit None check so fail_after=0 also triggers the abort
        if self.fail_after is not None and self.index >= self.fail_after:
            raise AbortExecution("Test failure")

        if self.index < len(self.data):
            item = self.data[self.index]
            self.index += 1
            return [item]
        return []
```
### Source Implementation Patterns

**Polling Pattern:**
```python
from datetime import datetime, timedelta

import requests

class APIPollingSource(DynamicSource):
    def __init__(self, api_endpoint, poll_interval=timedelta(seconds=10)):
        self.api_endpoint = api_endpoint
        self.poll_interval = poll_interval

    def build(self, step_id, worker_index, worker_count):
        return APIPartition(self.api_endpoint, self.poll_interval)

class APIPartition(StatelessSourcePartition):
    def __init__(self, endpoint, interval):
        self.endpoint = endpoint
        self.interval = interval
        self.last_fetch = None

    def next_batch(self):
        # Fetch data from the API and record when we did so,
        # so next_awake can enforce the polling interval
        self.last_fetch = datetime.now()
        response = requests.get(self.endpoint)
        if response.ok:
            return response.json().get('items', [])
        return []

    def next_awake(self):
        # Wait one interval after the last fetch; None means fetch immediately
        if self.last_fetch:
            return self.last_fetch + self.interval
        return None
```
**File Reading Pattern:**
```python
class FileSource(FixedPartitionedSource):
    def __init__(self, file_paths):
        self.file_paths = file_paths

    def list_parts(self):
        return [str(i) for i in range(len(self.file_paths))]

    def build_part(self, step_id, for_part, resume_state):
        file_index = int(for_part)
        file_path = self.file_paths[file_index]
        return FilePartition(file_path, resume_state)

class FilePartition(StatefulSourcePartition):
    def __init__(self, file_path, resume_state):
        self.file_path = file_path
        self.position = resume_state or 0
        self.file_handle = None

    def next_batch(self):
        if not self.file_handle:
            self.file_handle = open(self.file_path, 'r')
            self.file_handle.seek(self.position)

        lines = []
        for _ in range(100):  # Read up to 100 lines per batch
            line = self.file_handle.readline()
            if not line:  # End of file
                break
            lines.append(line.strip())
        self.position = self.file_handle.tell()

        if not lines:
            # Signal exhaustion only once nothing remains, so lines buffered
            # at end of file are not discarded
            raise StopIteration
        return lines

    def snapshot(self):
        return self.position

    def close(self):
        if self.file_handle:
            self.file_handle.close()
```
**Queue-based Pattern:**
```python
import queue
from datetime import datetime, timedelta
from threading import Thread

class QueueSource(DynamicSource):
    def __init__(self, queue_size=1000):
        self.queue = queue.Queue(maxsize=queue_size)
        self.producer_thread = None

    def build(self, step_id, worker_index, worker_count):
        if not self.producer_thread:
            # Daemon thread so it doesn't block process shutdown
            self.producer_thread = Thread(target=self._producer, daemon=True)
            self.producer_thread.start()
        return QueuePartition(self.queue)

    def _producer(self):
        # Background thread that feeds the queue
        while True:
            data = external_data_source.fetch()
            try:
                self.queue.put(data, timeout=1)
            except queue.Full:
                pass  # Drop data if the queue is full

class QueuePartition(StatelessSourcePartition):
    def __init__(self, queue):
        self.queue = queue
        self.last_batch_empty = False

    def next_batch(self):
        items = []
        try:
            # Get up to 10 items without blocking
            for _ in range(10):
                items.append(self.queue.get_nowait())
        except queue.Empty:
            pass
        # Remember emptiness so next_awake can back off
        self.last_batch_empty = not items
        return items

    def next_awake(self):
        # Check the queue again in 100 ms if the last batch was empty;
        # otherwise poll again immediately
        if self.last_batch_empty:
            return datetime.now() + timedelta(milliseconds=100)
        return None
```