# Source Functions and Data Ingestion

This document covers the source function interfaces and implementations in Ray Streaming, including built-in source functions and patterns for creating custom data sources.

## Overview

Source functions are the entry points for data ingestion in Ray Streaming (a minimal end-to-end sketch follows this list). They provide the mechanism to:

- Read data from external systems (files, databases, message queues)
- Generate synthetic data streams
- Create custom data ingestion patterns
- Handle parallel data source execution
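
For orientation, here is a minimal end-to-end sketch built only from APIs documented on this page (`from_collection`, `map`, `sink`, `submit`); the job name is arbitrary.

```python
# Minimal sketch: a collection source feeding a map and a print sink.
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()

ctx.from_collection(["a", "b", "c"]) \
    .map(lambda x: x.upper()) \
    .sink(print)

ctx.submit("overview_sketch_job")  # job name is illustrative
```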

## SourceFunction Interface

The base interface for all source functions in Ray Streaming.

### Core API

```python { .api }
from ray.streaming.function import SourceFunction, SourceContext

class SourceFunction:
    def init(self, parallel: int, index: int) -> None
    def fetch(self, ctx: SourceContext) -> None
    def close(self) -> None

    # Inherited from Function base class
    def open(self, runtime_context) -> None
    def save_checkpoint(self) -> object
    def load_checkpoint(self, checkpoint_obj) -> None
```

### SourceContext Interface

The context interface used by source functions to emit elements.

```python { .api }
class SourceContext:
    def collect(self, element) -> None
```

## Built-in Source Functions

Ray Streaming provides several built-in source function implementations.

### CollectionSourceFunction

Creates a stream from a Python collection (list, tuple, etc.).

```python { .api }
from ray.streaming.function import CollectionSourceFunction

class CollectionSourceFunction(SourceFunction):
    def __init__(self, values)
    def init(self, parallel: int, index: int) -> None
    def fetch(self, ctx: SourceContext) -> None
```

### LocalFileSourceFunction

Reads data from a local text file line by line.

```python { .api }
from ray.streaming.function import LocalFileSourceFunction

class LocalFileSourceFunction(SourceFunction):
    def __init__(self, filename: str)
    def init(self, parallel: int, index: int) -> None
    def fetch(self, ctx: SourceContext) -> None
```

## Capabilities

### Creating Streams from Collections

Use the built-in collection source for in-memory data.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import CollectionSourceFunction

ctx = StreamingContext.Builder().build()

# Using StreamingContext convenience method
stream1 = ctx.from_collection([1, 2, 3, 4, 5])

# Using the source function directly
collection_source = CollectionSourceFunction([10, 20, 30, 40, 50])
stream2 = ctx.source(collection_source)

# Process the streams
stream1.map(lambda x: x * 2).sink(print)
stream2.filter(lambda x: x > 25).sink(print)

ctx.submit("collection_job")
```

### Reading from Text Files

Use the built-in file source for line-by-line file processing.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import LocalFileSourceFunction

ctx = StreamingContext.Builder().build()

# Using StreamingContext convenience method
stream1 = ctx.read_text_file("input.txt")

# Using the source function directly
file_source = LocalFileSourceFunction("data.txt")
stream2 = ctx.source(file_source)

# Process file content
stream1.flat_map(lambda line: line.split()) \
    .map(lambda word: word.lower()) \
    .sink(print)

ctx.submit("file_processing_job")
```

### Custom Source Functions

Create custom source functions by implementing the SourceFunction interface.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class NumberGeneratorSource(SourceFunction):
    def init(self, parallel, index):
        """Initialize the source with parallelism info"""
        self.parallel = parallel
        self.index = index
        self.count = 0
        self.max_count = 100

    def fetch(self, ctx):
        """Generate and emit elements"""
        while self.count < self.max_count:
            # Generate numbers based on the parallel index
            number = (self.count * self.parallel) + self.index
            ctx.collect(number)
            self.count += 1
            time.sleep(0.1)  # Simulate data arrival rate

    def close(self):
        """Clean up resources"""
        print(f"Source {self.index} closing after {self.count} elements")

# Use the custom source
ctx = StreamingContext.Builder().build()
stream = ctx.source(NumberGeneratorSource())
stream.map(lambda x: f"Generated: {x}").sink(print)
ctx.submit("number_generator_job")
```

## Advanced Source Patterns

### Stateful Source Functions

Maintain state across checkpoint cycles for fault tolerance.

```python
from ray.streaming.function import SourceFunction
import time

class StatefulCounterSource(SourceFunction):
    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        self.counter = 0
        self.max_count = 1000

    def fetch(self, ctx):
        while self.counter < self.max_count:
            ctx.collect(f"count-{self.counter}-from-{self.index}")
            self.counter += 1

            # Simulate processing delay
            if self.counter % 100 == 0:
                time.sleep(0.5)

    def save_checkpoint(self):
        """Save current state for fault tolerance"""
        return {
            'counter': self.counter,
            'parallel': self.parallel,
            'index': self.index
        }

    def load_checkpoint(self, checkpoint_obj):
        """Restore state from a checkpoint"""
        if checkpoint_obj:
            self.counter = checkpoint_obj['counter']
            self.parallel = checkpoint_obj['parallel']
            self.index = checkpoint_obj['index']
```
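
Submitting a stateful source works like any other source. The usage sketch below assumes the same context-building pattern as the other examples on this page; the job name is illustrative.

```python
# Usage sketch: wire the stateful source into a job
from ray.streaming import StreamingContext

ctx = StreamingContext.Builder().build()
ctx.source(StatefulCounterSource()) \
    .sink(print)
ctx.submit("stateful_counter_job")  # job name is illustrative
```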

### External System Source

Connect to external systems like databases or message queues.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class DatabaseSource(SourceFunction):
    def __init__(self, connection_string, query):
        self.connection_string = connection_string
        self.query = query
        self.connection = None
        self.cursor = None

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index

    def open(self, runtime_context):
        """Initialize the database connection"""
        # Simulated database connection
        print(f"Connecting to database: {self.connection_string}")
        self.connection = "mock_connection"
        self.cursor = "mock_cursor"

    def fetch(self, ctx):
        """Fetch data from the database"""
        # Simulated database query results
        mock_results = [
            {"id": i, "name": f"record_{i}", "value": i * 10}
            for i in range(1, 101)
        ]

        for record in mock_results:
            ctx.collect(record)
            time.sleep(0.01)  # Simulate query time

    def close(self):
        """Clean up database connections"""
        if self.cursor:
            print("Closing cursor")
        if self.connection:
            print("Closing database connection")

# Usage
ctx = StreamingContext.Builder().build()
db_source = DatabaseSource("postgresql://localhost/mydb", "SELECT * FROM users")
stream = ctx.source(db_source)
stream.map(lambda record: f"User: {record['name']}, Value: {record['value']}") \
    .sink(print)
ctx.submit("database_job")
```

### Parallel Source Processing

Handle parallel execution across multiple source instances.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time

class ParallelRangeSource(SourceFunction):
    def __init__(self, start, end):
        self.start = start
        self.end = end

    def init(self, parallel, index):
        """Distribute the range across parallel instances"""
        self.parallel = parallel
        self.index = index

        # Calculate the sub-range for this parallel instance
        total_range = self.end - self.start
        range_per_instance = total_range // parallel

        self.local_start = self.start + (index * range_per_instance)
        if index == parallel - 1:  # Last instance gets the remainder
            self.local_end = self.end
        else:
            self.local_end = self.local_start + range_per_instance

        print(f"Instance {index}/{parallel} handling range {self.local_start}-{self.local_end}")

    def fetch(self, ctx):
        """Generate numbers in the assigned range"""
        for num in range(self.local_start, self.local_end):
            ctx.collect(num)
            if num % 1000 == 0:
                time.sleep(0.1)  # Periodic pause

# Usage with parallelism
ctx = StreamingContext.Builder().build()
parallel_source = ParallelRangeSource(0, 10000)
stream = ctx.source(parallel_source)
stream.set_parallelism(4) \
    .map(lambda x: x ** 2) \
    .sink(lambda x: print(f"Square: {x}"))
ctx.submit("parallel_range_job")
```

## Real-time Data Sources

### Streaming Data Simulation

Create continuous data streams that simulate real-time data sources.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time
import random
from datetime import datetime

class SensorDataSource(SourceFunction):
    def __init__(self, sensor_id, measurement_interval=1.0):
        self.sensor_id = sensor_id
        self.measurement_interval = measurement_interval

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        # Each parallel instance simulates a different sensor
        self.actual_sensor_id = f"{self.sensor_id}_{index}"

    def fetch(self, ctx):
        """Generate continuous sensor readings"""
        reading_count = 0
        while True:  # Continuous stream
            timestamp = datetime.now().isoformat()
            temperature = 20 + random.uniform(-5, 15)
            humidity = 50 + random.uniform(-20, 30)

            sensor_reading = {
                "sensor_id": self.actual_sensor_id,
                "timestamp": timestamp,
                "temperature": round(temperature, 2),
                "humidity": round(humidity, 2),
                "reading_count": reading_count
            }

            ctx.collect(sensor_reading)
            reading_count += 1
            time.sleep(self.measurement_interval)

# Process the sensor data stream
ctx = StreamingContext.Builder().build()
sensor_stream = ctx.source(SensorDataSource("temp_sensor", 0.5))
sensor_stream.set_parallelism(3) \
    .filter(lambda reading: reading["temperature"] > 25) \
    .map(lambda reading: f"HIGH TEMP: {reading['sensor_id']} - {reading['temperature']}°C") \
    .sink(print)
ctx.submit("sensor_monitoring_job")
```

## Error Handling and Resilience

### Robust Source Implementation

Handle errors gracefully in source functions.

```python
from ray.streaming import StreamingContext
from ray.streaming.function import SourceFunction
import time
import random

class ResilientSource(SourceFunction):
    def __init__(self, failure_rate=0.1):
        self.failure_rate = failure_rate
        self.retry_count = 0
        self.max_retries = 3

    def init(self, parallel, index):
        self.parallel = parallel
        self.index = index
        self.processed_count = 0

    def fetch(self, ctx):
        while self.processed_count < 1000:
            try:
                # Simulate potential failures
                if random.random() < self.failure_rate:
                    raise Exception(f"Simulated failure at count {self.processed_count}")

                # Normal processing
                data = f"data-{self.processed_count}-{self.index}"
                ctx.collect(data)
                self.processed_count += 1
                self.retry_count = 0  # Reset retry count on success

            except Exception as e:
                self.retry_count += 1
                if self.retry_count <= self.max_retries:
                    print(f"Source {self.index} error: {e}, retry {self.retry_count}/{self.max_retries}")
                    time.sleep(1 * self.retry_count)  # Linear backoff between retries
                else:
                    print(f"Source {self.index} failed permanently after {self.max_retries} retries")
                    raise

            time.sleep(0.1)

# Usage
ctx = StreamingContext.Builder().build()
resilient_stream = ctx.source(ResilientSource(failure_rate=0.05))
resilient_stream.map(lambda x: f"Processed: {x}").sink(print)
ctx.submit("resilient_job")
```

## Best Practices

### Source Function Guidelines

1. **Initialization**: Use `init()` for parallel-aware setup and `open()` for resource initialization
2. **Resource Management**: Always clean up resources in the `close()` method
3. **Checkpointing**: Implement the checkpoint methods for stateful sources
4. **Error Handling**: Handle failures gracefully with retry logic
5. **Parallelism**: Design sources to work correctly under parallel execution
6. **Performance**: Consider data generation rate and memory usage (see the skeleton sketch below)
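
As a consolidated reference, here is a skeleton that maps these guidelines onto the `SourceFunction` interface documented above. It is a sketch, not a prescribed template: the class name, the bounded loop in `fetch`, and the placeholder resource are illustrative assumptions.

```python
# Sketch: a skeleton source following the guidelines above.
# Interface methods match the SourceFunction API documented earlier;
# the method bodies are illustrative.
from ray.streaming.function import SourceFunction

class GuidelineSource(SourceFunction):
    def init(self, parallel, index):
        # 1. Parallel-aware setup: remember which slice of work is ours
        self.parallel = parallel
        self.index = index
        self.position = 0          # 3. State that will be checkpointed

    def open(self, runtime_context):
        # 1. Resource initialization (connections, file handles, ...)
        self.resource = object()   # placeholder for a real resource

    def fetch(self, ctx):
        # 5./6. Emit only this instance's share, at a controlled rate
        while self.position < 100:
            ctx.collect(f"item-{self.position}-from-{self.index}")
            self.position += 1

    def save_checkpoint(self):
        # 3. Persist the minimal state needed to resume
        return {"position": self.position}

    def load_checkpoint(self, checkpoint_obj):
        # 3. Restore state after a failure
        if checkpoint_obj:
            self.position = checkpoint_obj["position"]

    def close(self):
        # 2. Always release resources
        self.resource = None
```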

### Configuration and Tuning

```python
from ray.streaming import StreamingContext

# Configure source parallelism and performance
ctx = StreamingContext.Builder() \
    .option("streaming.source.parallelism", "4") \
    .option("streaming.checkpoint.interval", "10000") \
    .build()

# MyCustomSource stands in for any user-defined SourceFunction
source_stream = ctx.source(MyCustomSource()) \
    .set_parallelism(4) \
    .with_config("streaming.buffer.size", "2048")
```

## See Also

- [Streaming Context Documentation](./streaming-context.md) - StreamingContext and job management
- [Data Streams Documentation](./data-streams.md) - Stream transformation operations
- [Stream Operations Documentation](./stream-operations.md) - Available stream transformations
- [Cross-Language Support Documentation](./cross-language.md) - Python/Java integration details