# Source Functions and Data Ingestion

This document covers the source function interfaces and implementations in Ray Streaming, including built-in source functions and patterns for creating custom data sources.

## Overview

Source functions are the entry points for data ingestion in Ray Streaming. They provide the mechanism to:

- Read data from external systems (files, databases, message queues)
- Generate synthetic data streams
- Create custom data ingestion patterns
- Handle parallel data source execution

## SourceFunction Interface

The base interface for all source functions in Ray Streaming.

### Core API
```python { .api }
from ray.streaming.function import SourceFunction, SourceContext

class SourceFunction:
    def init(self, parallel: int, index: int) -> None
    def fetch(self, ctx: SourceContext) -> None
    def close(self) -> None

    # Inherited from Function base class
    def open(self, runtime_context) -> None
    def save_checkpoint(self) -> object
    def load_checkpoint(self, checkpoint_obj) -> None
```

### SourceContext Interface

The context interface used by source functions to emit elements.

```python { .api }
class SourceContext:
    def collect(self, element) -> None
```
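Because `SourceContext` is just an object with a `collect` method, the fetch contract can be exercised without a running job by substituting a stand-in context that buffers emitted elements. The `ListSourceContext` helper below is hypothetical (not part of the Ray Streaming API) and is only a sketch of how a source function could be unit-tested:

```python
class ListSourceContext:
    """Hypothetical stand-in for SourceContext that buffers elements."""

    def __init__(self):
        self.elements = []

    def collect(self, element):
        # Mirrors SourceContext.collect: record each emitted element.
        self.elements.append(element)


class RangeSource:
    """Toy source emitting the integers 0..n-1."""

    def __init__(self, n):
        self.n = n

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index

    def fetch(self, ctx):
        for i in range(self.n):
            ctx.collect(i)

    def close(self):
        pass


buffer_ctx = ListSourceContext()
source = RangeSource(5)
source.init(parallel=1, index=0)
source.fetch(buffer_ctx)
print(buffer_ctx.elements)  # [0, 1, 2, 3, 4]
```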
## Built-in Source Functions

Ray Streaming provides several built-in source function implementations.

### CollectionSourceFunction

Creates a stream from a Python collection (list, tuple, etc.).

```python { .api }
from ray.streaming.function import CollectionSourceFunction

class CollectionSourceFunction(SourceFunction):
    def __init__(self, values)
    def init(self, parallel: int, index: int) -> None
    def fetch(self, ctx: SourceContext) -> None
```
### LocalFileSourceFunction

Reads data from a local text file line by line.

```python { .api }
from ray.streaming.function import LocalFileSourceFunction

class LocalFileSourceFunction(SourceFunction):
    def __init__(self, filename: str)
    def init(self, parallel: int, index: int) -> None
    def fetch(self, ctx: SourceContext) -> None
```
## Capabilities

### Creating Streams from Collections

Use the built-in collection source for in-memory data.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import CollectionSourceFunction

ctx = StreamingContext.Builder().build()

# Using the StreamingContext convenience method
stream1 = ctx.from_collection([1, 2, 3, 4, 5])

# Using the source function directly
collection_source = CollectionSourceFunction([10, 20, 30, 40, 50])
stream2 = ctx.source(collection_source)

# Process the streams
stream1.map(lambda x: x * 2).sink(print)
stream2.filter(lambda x: x > 25).sink(print)

ctx.submit("collection_job")
```
### Reading from Text Files

Use the built-in file source for line-by-line file processing.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import LocalFileSourceFunction

ctx = StreamingContext.Builder().build()

# Using the StreamingContext convenience method
stream1 = ctx.read_text_file("input.txt")

# Using the source function directly
file_source = LocalFileSourceFunction("data.txt")
stream2 = ctx.source(file_source)

# Process file content
stream1.flat_map(lambda line: line.split()) \
    .map(lambda word: word.lower()) \
    .sink(print)

ctx.submit("file_processing_job")
```
### Custom Source Functions

Create custom source functions by implementing the SourceFunction interface.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class NumberGeneratorSource(SourceFunction):
    def init(self, parallel, index):
        """Initialize source function with parallelism info"""
        self.parallel = parallel
        self.index = index
        self.count = 0
        self.max_count = 100

    def fetch(self, ctx):
        """Generate and emit elements"""
        while self.count < self.max_count:
            # Generate numbers based on parallel index
            number = (self.count * self.parallel) + self.index
            ctx.collect(number)
            self.count += 1
            time.sleep(0.1)  # Simulate data arrival rate

    def close(self):
        """Cleanup resources"""
        print(f"Source {self.index} closing after {self.count} elements")

# Use the custom source
ctx = StreamingContext.Builder().build()
stream = ctx.source(NumberGeneratorSource())
stream.map(lambda x: f"Generated: {x}").sink(print)
ctx.submit("number_generator_job")
```
## Advanced Source Patterns

### Stateful Source Functions

Maintain state across checkpoint cycles for fault tolerance.

```python
from ray.streaming.function import SourceFunction
import time

class StatefulCounterSource(SourceFunction):
    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        self.counter = 0
        self.max_count = 1000

    def fetch(self, ctx):
        while self.counter < self.max_count:
            ctx.collect(f"count-{self.counter}-from-{self.index}")
            self.counter += 1

            # Simulate processing delay
            if self.counter % 100 == 0:
                time.sleep(0.5)

    def save_checkpoint(self):
        """Save current state for fault tolerance"""
        return {
            'counter': self.counter,
            'parallel': self.parallel,
            'index': self.index
        }

    def load_checkpoint(self, checkpoint_obj):
        """Restore state from checkpoint"""
        if checkpoint_obj:
            self.counter = checkpoint_obj['counter']
            self.parallel = checkpoint_obj['parallel']
            self.index = checkpoint_obj['index']
```
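The save/load pair forms a round trip: everything returned by `save_checkpoint` must be enough for `load_checkpoint` to resume from where the source left off. That property can be verified in plain Python with a trimmed copy of the checkpointed fields (no Ray dependency; `CounterState` is a stand-in for illustration only):

```python
class CounterState:
    """Minimal copy of the checkpointed fields of a stateful counter source."""

    def __init__(self):
        self.counter = 0
        self.parallel = 1
        self.index = 0

    def save_checkpoint(self):
        # Persist exactly the fields needed to resume.
        return {
            'counter': self.counter,
            'parallel': self.parallel,
            'index': self.index,
        }

    def load_checkpoint(self, checkpoint_obj):
        # Restore the saved fields, ignoring an empty checkpoint.
        if checkpoint_obj:
            self.counter = checkpoint_obj['counter']
            self.parallel = checkpoint_obj['parallel']
            self.index = checkpoint_obj['index']


original = CounterState()
original.counter = 427  # pretend 427 elements were already emitted

snapshot = original.save_checkpoint()

restored = CounterState()
restored.load_checkpoint(snapshot)
print(restored.counter)  # 427
```

A restored instance resumes emitting from `count-427-...` instead of starting over, which is the point of checkpointing the counter rather than the emitted data itself.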
### External System Source

Connect to external systems like databases or message queues.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class DatabaseSource(SourceFunction):
    def __init__(self, connection_string, query):
        self.connection_string = connection_string
        self.query = query
        self.connection = None
        self.cursor = None

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index

    def open(self, runtime_context):
        """Initialize database connection"""
        # Simulated database connection
        print(f"Connecting to database: {self.connection_string}")
        self.connection = "mock_connection"
        self.cursor = "mock_cursor"

    def fetch(self, ctx):
        """Fetch data from database"""
        # Simulated database query results
        mock_results = [
            {"id": i, "name": f"record_{i}", "value": i * 10}
            for i in range(1, 101)
        ]

        for record in mock_results:
            ctx.collect(record)
            time.sleep(0.01)  # Simulate query time

    def close(self):
        """Clean up database connections"""
        if self.cursor:
            print("Closing cursor")
        if self.connection:
            print("Closing database connection")

# Usage
ctx = StreamingContext.Builder().build()
db_source = DatabaseSource("postgresql://localhost/mydb", "SELECT * FROM users")
stream = ctx.source(db_source)
stream.map(lambda record: f"User: {record['name']}, Value: {record['value']}") \
    .sink(print)
ctx.submit("database_job")
```
### Parallel Source Processing

Handle parallel execution across multiple source instances.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class ParallelRangeSource(SourceFunction):
    def __init__(self, start, end):
        self.start = start
        self.end = end

    def init(self, parallel, index):
        """Distribute range across parallel instances"""
        self.parallel = parallel
        self.index = index

        # Calculate range for this parallel instance
        total_range = self.end - self.start
        range_per_instance = total_range // parallel

        self.local_start = self.start + (index * range_per_instance)
        if index == parallel - 1:  # Last instance gets the remainder
            self.local_end = self.end
        else:
            self.local_end = self.local_start + range_per_instance

        print(f"Instance {index}/{parallel} handling range {self.local_start}-{self.local_end}")

    def fetch(self, ctx):
        """Generate numbers in assigned range"""
        for num in range(self.local_start, self.local_end):
            ctx.collect(num)
            if num % 1000 == 0:
                time.sleep(0.1)  # Periodic pause

# Usage with parallelism
ctx = StreamingContext.Builder().build()
parallel_source = ParallelRangeSource(0, 10000)
stream = ctx.source(parallel_source)
stream.set_parallelism(4) \
    .map(lambda x: x ** 2) \
    .sink(lambda x: print(f"Square: {x}"))
ctx.submit("parallel_range_job")
```
## Real-time Data Sources

### Streaming Data Simulation

Create continuous data streams that simulate real-time data sources.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time
import random
from datetime import datetime

class SensorDataSource(SourceFunction):
    def __init__(self, sensor_id, measurement_interval=1.0):
        self.sensor_id = sensor_id
        self.measurement_interval = measurement_interval

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        # Each parallel instance simulates a different sensor
        self.actual_sensor_id = f"{self.sensor_id}_{index}"

    def fetch(self, ctx):
        """Generate continuous sensor readings"""
        reading_count = 0
        while True:  # Continuous stream
            timestamp = datetime.now().isoformat()
            temperature = 20 + random.uniform(-5, 15)
            humidity = 50 + random.uniform(-20, 30)

            sensor_reading = {
                "sensor_id": self.actual_sensor_id,
                "timestamp": timestamp,
                "temperature": round(temperature, 2),
                "humidity": round(humidity, 2),
                "reading_count": reading_count
            }

            ctx.collect(sensor_reading)
            reading_count += 1
            time.sleep(self.measurement_interval)

# Process the sensor data stream
ctx = StreamingContext.Builder().build()
sensor_stream = ctx.source(SensorDataSource("temp_sensor", 0.5))
sensor_stream.set_parallelism(3) \
    .filter(lambda reading: reading["temperature"] > 25) \
    .map(lambda reading: f"HIGH TEMP: {reading['sensor_id']} - {reading['temperature']}°C") \
    .sink(print)
ctx.submit("sensor_monitoring_job")
```
## Error Handling and Resilience

### Robust Source Implementation

Handle errors gracefully in source functions.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time
import random

class ResilientSource(SourceFunction):
    def __init__(self, failure_rate=0.1):
        self.failure_rate = failure_rate
        self.retry_count = 0
        self.max_retries = 3

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        self.processed_count = 0

    def fetch(self, ctx):
        while self.processed_count < 1000:
            try:
                # Simulate potential failures
                if random.random() < self.failure_rate:
                    raise Exception(f"Simulated failure at count {self.processed_count}")

                # Normal processing
                data = f"data-{self.processed_count}-{self.index}"
                ctx.collect(data)
                self.processed_count += 1
                self.retry_count = 0  # Reset retry count on success

            except Exception as e:
                self.retry_count += 1
                if self.retry_count <= self.max_retries:
                    print(f"Source {self.index} error: {e}, retry {self.retry_count}/{self.max_retries}")
                    time.sleep(1 * self.retry_count)  # Linear backoff: wait longer on each retry
                else:
                    print(f"Source {self.index} failed permanently after {self.max_retries} retries")
                    raise e

            time.sleep(0.1)

# Usage
ctx = StreamingContext.Builder().build()
resilient_stream = ctx.source(ResilientSource(failure_rate=0.05))
resilient_stream.map(lambda x: f"Processed: {x}").sink(print)
ctx.submit("resilient_job")
```
## Best Practices

### Source Function Guidelines

1. **Initialization**: Use `init()` for parallel-aware setup, `open()` for resource initialization
2. **Resource Management**: Always clean up resources in the `close()` method
3. **Checkpointing**: Implement checkpoint methods for stateful sources
4. **Error Handling**: Handle failures gracefully with retry logic
5. **Parallelism**: Design sources to work correctly with parallel execution
6. **Performance**: Consider data generation rate and memory usage
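The guidelines above can be collapsed into a skeleton that a new source starts from. This is a sketch, not Ray Streaming code: the placeholder resource and the `_Buffer` driver stand in for a real external system and a real `SourceContext`:

```python
class SourceTemplate:
    """Skeleton source following the lifecycle guidelines above."""

    def init(self, parallel, index):
        # Guideline 1: parallel-aware setup, no resource allocation yet.
        self.parallel = parallel
        self.index = index
        self.emitted = 0

    def open(self, runtime_context):
        # Guideline 1: resource initialization (connections, file handles).
        self.resource = "placeholder-connection"

    def fetch(self, ctx):
        # Emit a small bounded batch; a real source would poll upstream.
        for _ in range(3):
            ctx.collect((self.index, self.emitted))
            self.emitted += 1

    def save_checkpoint(self):
        # Guideline 3: persist only what is needed to resume.
        return {"emitted": self.emitted}

    def load_checkpoint(self, checkpoint_obj):
        if checkpoint_obj:
            self.emitted = checkpoint_obj["emitted"]

    def close(self):
        # Guideline 2: always release resources, even on failure paths.
        self.resource = None


class _Buffer:
    """Throwaway collect() target for exercising the skeleton."""

    def __init__(self):
        self.items = []

    def collect(self, element):
        self.items.append(element)


src = SourceTemplate()
src.init(parallel=2, index=0)
src.open(None)
buf = _Buffer()
src.fetch(buf)
src.close()
print(buf.items)  # [(0, 0), (0, 1), (0, 2)]
```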
### Configuration and Tuning

```python
# Configure source parallelism and performance
ctx = StreamingContext.Builder() \
    .option("streaming.source.parallelism", "4") \
    .option("streaming.checkpoint.interval", "10000") \
    .build()

source_stream = ctx.source(MyCustomSource()) \
    .set_parallelism(4) \
    .with_config("streaming.buffer.size", "2048")
```
## See Also

- [Streaming Context Documentation](./streaming-context.md) - StreamingContext and job management
- [Data Streams Documentation](./data-streams.md) - Stream transformation operations
- [Stream Operations Documentation](./stream-operations.md) - Available stream transformations
- [Cross-Language Support Documentation](./cross-language.md) - Python/Java integration details