# Testing Utilities

Tools and utilities for testing dataflows, including test sources, sinks, and execution helpers. These provide deterministic testing environments and data-capture utilities for validating stream-processing logic.

## Capabilities

### Test Execution Functions

Functions for running dataflows in test environments with deterministic behavior.

```python { .api }
def run_main(
    flow: Dataflow,
    epoch_interval: Optional[timedelta] = None,
    recovery_config: Optional[RecoveryConfig] = None,
): ...

def cluster_main(
    flow: Dataflow,
    addresses: List[str],
    proc_id: int,
    epoch_interval: Optional[timedelta] = None,
    recovery_config: Optional[RecoveryConfig] = None,
    worker_count_per_proc: int = 1,
): ...
```

**Parameters match the corresponding runtime functions, but these entry points are designed for testing:**

- Single-threaded execution for predictable ordering
- Synchronous operation, so assertions can run as soon as the call returns
- No background processes or networking complexity

**Usage Examples:**

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSink, TestingSource, run_main

def test_simple_dataflow():
    flow = Dataflow("test_flow")

    # Build the test dataflow
    data = op.input("input", flow, TestingSource([1, 2, 3, 4, 5]))
    doubled = op.map("double", data, lambda x: x * 2)

    # Capture output
    captured = []
    op.output("output", doubled, TestingSink(captured))

    # Run and verify
    run_main(flow)
    assert captured == [2, 4, 6, 8, 10]
```

### Test Sources

Sources that provide deterministic, controllable data for testing.

```python { .api }
class TestingSource:
    def __init__(self, data: Iterable[X]): ...
```

**Parameters:**

- `data` (Iterable[X]): Static data to emit, in order

**Usage Examples:**

```python
from bytewax.testing import TestingSource

# Simple list data
simple_source = TestingSource([1, 2, 3, 4, 5])

# Complex objects
events = [
    {"user": "alice", "action": "login", "timestamp": "2023-01-01T10:00:00Z"},
    {"user": "bob", "action": "purchase", "timestamp": "2023-01-01T10:05:00Z"},
    {"user": "alice", "action": "logout", "timestamp": "2023-01-01T10:30:00Z"},
]
event_source = TestingSource(events)

# Generator data
def generate_data():
    for i in range(100):
        yield {"id": i, "value": i * i}

generator_source = TestingSource(generate_data())
```
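
One caveat with generator-backed sources: a generator can only be consumed once, so construct a fresh generator (and a fresh `TestingSource`) for each run rather than reusing one across tests. The underlying Python behavior:

```python
def generate_data():
    for i in range(3):
        yield {"id": i, "value": i * i}

gen = generate_data()
first_pass = list(gen)   # consumes the generator
second_pass = list(gen)  # already exhausted, yields nothing

assert len(first_pass) == 3
assert second_pass == []
```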

### Test Sinks

Sinks that capture output data for test assertions.

```python { .api }
class TestingSink:
    def __init__(self, capture_list: Optional[List] = None): ...
    def captured(self) -> List: ...
```

**Parameters:**

- `capture_list` (Optional[List]): Optional external list to append captured items to

**Usage Examples:**

```python
from bytewax.testing import TestingSink

# Capture to an external list
results = []
sink = TestingSink(results)

# Use in a dataflow
op.output("test_output", processed_stream, sink)

# Run and assert
run_main(flow)
assert len(results) == 10
assert all(isinstance(item, dict) for item in results)

# Capture to an internal list
sink = TestingSink()
op.output("test_output", processed_stream, sink)
run_main(flow)

captured_data = sink.captured()
assert captured_data[0]["processed"] is True
```

### Time Testing Utilities

Tools for controlling time in tests, enabling deterministic testing of time-based operations.

```python { .api }
class TimeTestingGetter:
    def __init__(self, now: datetime): ...
    def advance(self, td: timedelta) -> None: ...
    def get(self) -> datetime: ...
```

**Methods:**

- `advance(td)`: Move the mock clock forward by the given duration
- `get()`: Get the current mock time

**Usage Examples:**

```python
from datetime import datetime, timedelta

import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSink, TestingSource, TimeTestingGetter, run_main

def test_mock_time():
    # Create a mock clock starting at a known point
    mock_time = TimeTestingGetter(datetime(2023, 1, 1, 12, 0, 0))

    # Advance it deterministically instead of waiting on the wall clock
    mock_time.advance(timedelta(seconds=30))
    assert mock_time.get() == datetime(2023, 1, 1, 12, 0, 30)

def test_windowing_with_event_time():
    flow = Dataflow("time_test")

    # Create events with timestamps
    events = [
        {"value": 1, "time": datetime(2023, 1, 1, 12, 0, 0)},
        {"value": 2, "time": datetime(2023, 1, 1, 12, 0, 30)},
        {"value": 3, "time": datetime(2023, 1, 1, 12, 1, 0)},
    ]

    stream = op.input("events", flow, TestingSource(events))
    keyed = op.key_on("key", stream, lambda x: "all")

    # Drive windowing from event timestamps for deterministic results
    windowed = win.collect_window(
        "windows",
        keyed,
        win.EventClock(lambda e: e["time"]),
        win.TumblingWindower(timedelta(minutes=1)),
    )

    results = []
    op.output("output", windowed, TestingSink(results))

    run_main(flow)

    # Verify window contents
    assert len(results) == 2        # Two windows
    assert len(results[0][1]) == 2  # First window has 2 events
    assert len(results[1][1]) == 1  # Second window has 1 event
```

### Source Testing Utilities

Utilities for testing custom sources and controlling their behavior.

```python { .api }
def poll_next_batch(source_part: StatefulSourcePartition, timeout: timedelta = timedelta(seconds=1)): ...

def ffwd_iter(iterator: Iterator[X], skip_to: int) -> Iterator[X]: ...
```

**poll_next_batch Parameters:**

- `source_part` (StatefulSourcePartition): Source partition to poll
- `timeout` (timedelta): Maximum time to wait for data

**ffwd_iter Parameters:**

- `iterator` (Iterator[X]): Iterator to fast-forward
- `skip_to` (int): Number of items to skip

**Usage Examples:**

```python
from datetime import timedelta

from bytewax.testing import ffwd_iter, poll_next_batch

def test_custom_source():
    # Test a custom source partition
    partition = MyCustomSourcePartition(test_data)

    # Poll for the first batch
    batch1 = poll_next_batch(partition, timeout=timedelta(seconds=5))
    assert len(batch1) > 0

    # Advance an iterator to a specific position
    iterator = iter(range(1000))
    advanced = ffwd_iter(iterator, skip_to=500)

    # The first remaining item should be 500
    assert next(advanced) == 500
```
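
`ffwd_iter` is handy in resume logic for custom sources: after recovery, skip the items a partition already emitted. Conceptually it behaves like discarding the first `skip_to` items; a plain-Python sketch of the same idea using the standard `itertools` "consume" recipe (the helper name `skip_items` is ours, not part of bytewax):

```python
from itertools import islice
from typing import Iterator, TypeVar

X = TypeVar("X")

def skip_items(iterator: Iterator[X], skip_to: int) -> Iterator[X]:
    # Consume and discard the first `skip_to` items, then hand back the iterator.
    next(islice(iterator, skip_to, skip_to), None)
    return iterator

advanced = skip_items(iter(range(1000)), 500)
assert next(advanced) == 500
```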

### Testing Patterns

**Unit Testing Dataflow Components:**

```python
import unittest

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSink, TestingSource, run_main

class TestDataProcessing(unittest.TestCase):

    def test_data_transformation(self):
        """Test basic data transformation logic."""
        flow = Dataflow("transform_test")

        # Input data
        input_data = [
            {"value": 10, "type": "A"},
            {"value": 20, "type": "B"},
            {"value": 30, "type": "A"},
        ]

        # Build dataflow
        stream = op.input("input", flow, TestingSource(input_data))

        # Filter only type A
        type_a = op.filter("filter_a", stream, lambda x: x["type"] == "A")

        # Double the values
        doubled = op.map("double", type_a, lambda x: {**x, "value": x["value"] * 2})

        # Capture results
        results = []
        op.output("output", doubled, TestingSink(results))

        # Execute and verify
        run_main(flow)

        expected = [
            {"value": 20, "type": "A"},
            {"value": 60, "type": "A"},
        ]
        self.assertEqual(results, expected)

    def test_stateful_processing(self):
        """Test that stateful operators maintain state correctly."""
        flow = Dataflow("stateful_test")

        # Input events
        events = [
            ("user1", 5),
            ("user2", 3),
            ("user1", 7),
            ("user2", 2),
            ("user1", 1),
        ]

        stream = op.input("input", flow, TestingSource(events))

        # Running sum per user
        def running_sum(state, value):
            new_state = (state or 0) + value
            return new_state, new_state

        sums = op.stateful_map("running_sum", stream, running_sum)

        results = []
        op.output("output", sums, TestingSink(results))

        run_main(flow)

        expected = [
            ("user1", 5),   # 5
            ("user2", 3),   # 3
            ("user1", 12),  # 5 + 7
            ("user2", 5),   # 3 + 2
            ("user1", 13),  # 12 + 1
        ]
        self.assertEqual(results, expected)

if __name__ == "__main__":
    unittest.main()
```

**Integration Testing with Multiple Sources:**

```python
def test_multi_source_integration():
    """Test a dataflow with multiple input sources."""
    flow = Dataflow("multi_source_test")

    # User events
    user_events = [
        {"user_id": "u1", "event": "login", "timestamp": 1000},
        {"user_id": "u2", "event": "login", "timestamp": 1001},
        {"user_id": "u1", "event": "purchase", "timestamp": 1002},
    ]

    # User profiles
    user_profiles = [
        {"user_id": "u1", "name": "Alice", "tier": "premium"},
        {"user_id": "u2", "name": "Bob", "tier": "basic"},
    ]

    # Set up streams
    events = op.input("events", flow, TestingSource(user_events))
    profiles = op.input("profiles", flow, TestingSource(user_profiles))

    # Key both streams by user ID
    keyed_events = op.key_on("key_events", events, lambda e: e["user_id"])
    keyed_profiles = op.key_on("key_profiles", profiles, lambda p: p["user_id"])

    # Join the streams
    joined = op.join("join_user_data", keyed_events, keyed_profiles)

    # Enrich each event with profile data
    def enrich_event(joined_data):
        event, profile = joined_data
        return {
            **event,
            "user_name": profile["name"],
            "user_tier": profile["tier"],
        }

    enriched = op.map("enrich", joined, lambda x: enrich_event(x[1]))

    results = []
    op.output("output", enriched, TestingSink(results))

    run_main(flow)

    # Verify the enriched events
    assert len(results) >= 2  # At least the login events should be enriched
    assert all("user_name" in event for event in results)
    assert all("user_tier" in event for event in results)
```

**Error Handling Tests:**

```python
def test_error_handling():
    """Test that the dataflow handles malformed input gracefully."""
    flow = Dataflow("error_test")

    # Data with some invalid items
    mixed_data = [
        {"value": 10},
        {"invalid": "data"},  # Missing "value" key
        {"value": 20},
        None,  # Invalid item
        {"value": 30},
    ]

    stream = op.input("input", flow, TestingSource(mixed_data))

    # Safe processing with error handling
    def safe_process(item):
        if item is None or "value" not in item:
            return None  # Filter out invalid items
        return item["value"] * 2

    processed = op.filter_map("safe_process", stream, safe_process)

    results = []
    op.output("output", processed, TestingSink(results))

    run_main(flow)

    # Only the valid items should come through, processed
    expected = [20, 40, 60]  # 10*2, 20*2, 30*2
    assert results == expected
```
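
Mapper functions like `safe_process` above are plain Python callables, so the cheapest error-handling tests skip the dataflow entirely and call the function directly; the full dataflow test then only needs to confirm the wiring:

```python
def safe_process(item):
    # Same logic as in the dataflow: filter_map drops items that map to None
    if item is None or "value" not in item:
        return None
    return item["value"] * 2

assert safe_process({"value": 10}) == 20
assert safe_process({"invalid": "data"}) is None
assert safe_process(None) is None
```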

**Performance Testing:**

```python
import time

def test_throughput():
    """Test dataflow throughput with a large dataset."""
    flow = Dataflow("throughput_test")

    # Generate a large dataset
    large_dataset = list(range(100_000))

    stream = op.input("input", flow, TestingSource(large_dataset))
    processed = op.map("process", stream, lambda x: x * 2)

    results = []
    op.output("output", processed, TestingSink(results))

    # Measure execution time with a monotonic clock
    start_time = time.perf_counter()
    run_main(flow)
    end_time = time.perf_counter()

    # Verify results and performance
    assert len(results) == 100_000
    assert results[0] == 0
    assert results[-1] == 199_998

    duration = end_time - start_time
    throughput = len(results) / duration

    print(f"Processed {len(results)} items in {duration:.2f}s ({throughput:.0f} items/sec)")

    # Assert a minimum throughput (adjust based on requirements)
    assert throughput > 10_000  # At least 10k items/sec
```