# Input Sources

Interfaces and implementations for reading data from external systems. Sources provide the entry point for data into Bytewax dataflows, with support for various input patterns including polling, streaming, and batch processing.

## Capabilities

### Source Base Classes

Abstract base classes that define the source interface for different input patterns.

```python { .api }
class Source[X]: ...

class StatefulSourcePartition[X, S]:
    def next_batch(self) -> Iterable[X]: ...
    def next_awake(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...
    def close(self) -> None: ...

class StatelessSourcePartition[X]:
    def next_batch(self) -> Iterable[X]: ...
    def next_awake(self) -> Optional[datetime]: ...
    def close(self) -> None: ...
```

**StatefulSourcePartition Methods:**

- `next_batch()`: Return the next batch of items (an empty iterable if none are available)
- `next_awake()`: Return the time at which `next_batch` should next be called (`None` to be called again immediately)
- `snapshot()`: Return state for recovery (must be pickle-able)
- `close()`: Clean up resources when the dataflow completes

**StatelessSourcePartition Methods:**

- `next_batch()`: Return the next batch of items
- `next_awake()`: Return the time at which `next_batch` should next be called
- `close()`: Clean up resources
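
To make the lifecycle concrete, here is a minimal sketch of a stateful partition that emits integers and resumes from its snapshot. The class name, batch size, and the `bytewax.inputs` import path are illustrative assumptions rather than part of the API above.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from bytewax.inputs import StatefulSourcePartition  # assumed import path

class CountingPartition(StatefulSourcePartition):
    """Illustrative partition: emits increasing integers, resumable via snapshot."""

    def __init__(self, resume_state: Optional[int]):
        # Resume from the snapshotted counter, or start at 0 on the first build
        self.next_value = resume_state or 0

    def next_batch(self) -> List[int]:
        # Emit a small, bounded batch each time the runtime polls us
        batch = [self.next_value + i for i in range(10)]
        self.next_value += 10
        return batch

    def next_awake(self) -> Optional[datetime]:
        # Ask to be polled again in one second rather than immediately
        return datetime.now(timezone.utc) + timedelta(seconds=1)

    def snapshot(self) -> int:
        # The counter alone is enough to resume without re-emitting values
        return self.next_value

    def close(self) -> None:
        pass  # No external resources to release
```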

### Source Implementations

Concrete source types for different partitioning strategies.

```python { .api }
class FixedPartitionedSource[X, S](Source[X]):
    def list_parts(self) -> List[str]: ...
    def build_part(self, step_id: str, for_part: str, resume_state: Optional[S]) -> StatefulSourcePartition[X, S]: ...

class DynamicSource[X](Source[X]):
    def build(self, step_id: str, worker_index: int, worker_count: int) -> StatelessSourcePartition[X]: ...

class SimplePollingSource[X](DynamicSource[X]):
    def __init__(self, getter: Callable[[], Iterable[X]], interval: timedelta = timedelta(seconds=1)): ...
```

**FixedPartitionedSource Methods:**

- `list_parts()`: Return the list of partition identifiers
- `build_part()`: Create a partition handler for a specific partition

**DynamicSource Methods:**

- `build()`: Create a source partition for a specific worker

**SimplePollingSource Parameters:**

- `getter` (Callable): Function that returns an iterable of items
- `interval` (timedelta): Polling interval

**Usage Examples:**

```python
from datetime import timedelta

# Simple polling source
def fetch_data():
    # Poll external API or database
    return api_client.get_latest_events()

polling_source = SimplePollingSource(fetch_data, interval=timedelta(seconds=5))

# Custom stateful source
class DatabaseSource(FixedPartitionedSource):
    def __init__(self, connection_string, table_name):
        self.connection_string = connection_string
        self.table_name = table_name

    def list_parts(self):
        # Return partition identifiers (e.g., database shards)
        return ["shard_0", "shard_1", "shard_2"]

    def build_part(self, step_id, for_part, resume_state):
        return DatabasePartition(self.connection_string, self.table_name, for_part, resume_state)
```
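
The example above delegates to a `DatabasePartition` that is not shown. A minimal sketch of what such a partition could look like, assuming a hypothetical `db_connect()` client and an `id` column used as the resume cursor:

```python
class DatabasePartition(StatefulSourcePartition):
    def __init__(self, connection_string, table_name, shard, resume_state):
        self.conn = db_connect(connection_string)  # hypothetical DB client
        self.table_name = table_name
        self.shard = shard
        self.last_id = resume_state or 0  # resume from the snapshotted cursor

    def next_batch(self):
        # Read the next rows for this shard past the last seen id (assumed schema)
        rows = self.conn.query(
            f"SELECT id, payload FROM {self.table_name}"
            " WHERE shard = %s AND id > %s ORDER BY id LIMIT 100",
            (self.shard, self.last_id),
        )
        if rows:
            self.last_id = rows[-1][0]
        return [payload for _id, payload in rows]

    def snapshot(self):
        # The last id read is enough to resume without re-emitting rows
        return self.last_id

    def close(self):
        self.conn.close()
```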

### Helper Functions

Utility functions for working with sources and batching data.

```python { .api }
def batch(it: Iterable[X], size: int) -> Iterator[List[X]]: ...

def batch_async(ait: AsyncIterable[X], size: int) -> AsyncIterator[List[X]]: ...

def batch_getter(getter: Callable[[], Iterable[X]], size: int) -> Callable[[], List[X]]: ...

def batch_getter_ex(getter: Callable[[], Iterable[X]], size: int) -> Callable[[], List[X]]: ...
```

**batch Parameters:**

- `it` (Iterable[X]): Input iterable to batch
- `size` (int): Maximum batch size

**batch_getter Parameters:**

- `getter` (Callable): Function returning an iterable
- `size` (int): Maximum batch size

**Usage Examples:**

```python
# Batch an iterable
for chunk in batch([1, 2, 3, 4, 5], 2):
    print(chunk)  # [1, 2], [3, 4], [5]

# Batch an async iterable
async for chunk in batch_async(async_generator(), 10):
    process_batch(chunk)

# Batch a getter function
batched_getter = batch_getter(lambda: fetch_items(), 100)
```
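
A common use for `batch` is inside a partition's `next_batch`, chunking an otherwise long or unbounded iterator so each call returns a bounded list. A minimal, illustrative sketch (the class and iterator arguments are assumptions):

```python
class IteratorPartition(StatelessSourcePartition):
    def __init__(self, it, batch_size=100):
        # batch() yields bounded lists drawn from the underlying iterator
        self._chunks = batch(it, batch_size)

    def next_batch(self):
        # next() raises StopIteration when the iterator is exhausted,
        # ending this partition (as in the file-reading pattern below)
        return next(self._chunks)
```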

### Exception Handling

Exceptions that can be raised by sources for testing and error handling.

```python { .api }
class AbortExecution(RuntimeError):
    """Raise this from next_batch to abort for testing purposes."""
    ...
```

**Usage Example:**

```python
class TestSource(StatelessSourcePartition):
    def __init__(self, data, fail_after=None):
        self.data = list(data)
        self.index = 0
        self.fail_after = fail_after

    def next_batch(self):
        # Abort once the configured number of items has been emitted
        if self.fail_after is not None and self.index >= self.fail_after:
            raise AbortExecution("Test failure")

        if self.index < len(self.data):
            item = self.data[self.index]
            self.index += 1
            return [item]
        return []
```
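
Note that `TestSource` is a partition, not a `Source`, so it needs a small wrapper before a dataflow can use it as input. A minimal, illustrative sketch using the `DynamicSource` API above:

```python
class TestDynamicSource(DynamicSource):
    """Illustrative wrapper: gives each worker its own TestSource partition."""

    def __init__(self, data, fail_after=None):
        self.data = data
        self.fail_after = fail_after

    def build(self, step_id, worker_index, worker_count):
        return TestSource(self.data, fail_after=self.fail_after)
```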


### Source Implementation Patterns

**Polling Pattern:**

```python
from datetime import datetime, timedelta, timezone

import requests

class APIPollingSource(DynamicSource):
    def __init__(self, api_endpoint, poll_interval=timedelta(seconds=10)):
        self.api_endpoint = api_endpoint
        self.poll_interval = poll_interval

    def build(self, step_id, worker_index, worker_count):
        return APIPartition(self.api_endpoint, self.poll_interval)

class APIPartition(StatelessSourcePartition):
    def __init__(self, endpoint, interval):
        self.endpoint = endpoint
        self.interval = interval
        self.last_fetch = None

    def next_batch(self):
        # Fetch data from the API and record when we did so
        self.last_fetch = datetime.now(timezone.utc)
        response = requests.get(self.endpoint)
        if response.ok:
            return response.json().get('items', [])
        return []

    def next_awake(self):
        # Wait one polling interval after the last fetch; None means poll again immediately
        if self.last_fetch:
            return self.last_fetch + self.interval
        return None
```
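
A source built this way is attached to a dataflow through the input operator. A minimal sketch of the wiring, assuming the standard `bytewax.dataflow.Dataflow` and `bytewax.operators` entry points:

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow

flow = Dataflow("api_polling")
events = op.input("input", flow, APIPollingSource("https://example.com/events"))
op.inspect("inspect", events)
```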

**File Reading Pattern:**

```python
class FileSource(FixedPartitionedSource):
    def __init__(self, file_paths):
        self.file_paths = file_paths

    def list_parts(self):
        return [str(i) for i in range(len(self.file_paths))]

    def build_part(self, step_id, for_part, resume_state):
        file_index = int(for_part)
        file_path = self.file_paths[file_index]
        return FilePartition(file_path, resume_state)

class FilePartition(StatefulSourcePartition):
    def __init__(self, file_path, resume_state):
        self.file_path = file_path
        self.position = resume_state or 0
        self.file_handle = None

    def next_batch(self):
        if not self.file_handle:
            self.file_handle = open(self.file_path, 'r')
            self.file_handle.seek(self.position)

        lines = []
        for _ in range(100):  # Read up to 100 lines per batch
            line = self.file_handle.readline()
            if not line:  # End of file
                break
            lines.append(line.strip())
        self.position = self.file_handle.tell()

        if not lines:
            raise StopIteration  # Signal that this partition is complete
        return lines

    def snapshot(self):
        return self.position

    def close(self):
        if self.file_handle:
            self.file_handle.close()
```

**Queue-based Pattern:**

```python
import queue
from datetime import datetime, timedelta, timezone
from threading import Thread

class QueueSource(DynamicSource):
    def __init__(self, queue_size=1000):
        self.queue = queue.Queue(maxsize=queue_size)
        self.producer_thread = None

    def build(self, step_id, worker_index, worker_count):
        if not self.producer_thread:
            self.producer_thread = Thread(target=self._producer, daemon=True)
            self.producer_thread.start()
        return QueuePartition(self.queue)

    def _producer(self):
        # Background thread that feeds the queue
        while True:
            data = external_data_source.fetch()
            try:
                self.queue.put(data, timeout=1)
            except queue.Full:
                pass  # Drop data if the queue is full

class QueuePartition(StatelessSourcePartition):
    def __init__(self, queue):
        self.queue = queue
        self.last_batch_empty = False

    def next_batch(self):
        items = []
        try:
            # Get up to 10 items without blocking
            for _ in range(10):
                item = self.queue.get_nowait()
                items.append(item)
        except queue.Empty:
            pass
        self.last_batch_empty = not items
        return items

    def next_awake(self):
        # Check the queue again in 100 ms if the last batch was empty,
        # otherwise ask to be polled again immediately
        if self.last_batch_empty:
            return datetime.now(timezone.utc) + timedelta(milliseconds=100)
        return None
```
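
This pattern trades completeness for bounded memory: the producer drops items when the queue is full, and the partition backs off for 100 ms when the queue is empty instead of spinning. If dropping data is unacceptable, the producer can call `put()` without a timeout and accept back-pressure on the external feed instead.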