
# Windowing

Time-based windowing operations for temporal data aggregation in Faust applications. Provides tumbling, hopping, and sliding window implementations for stream analytics with configurable time boundaries, expiration policies, and efficient state management.

## Capabilities

### Window Base Class

Abstract base class for all window implementations. Defines the common interface and behavior for time-based data partitioning and aggregation operations.

```python { .api }
class Window:
    def __init__(self, *, expires: float = None, **kwargs):
        """
        Base window implementation.

        Args:
            expires: Window expiration time in seconds
        """

    def ranges(self, timestamp: float) -> list:
        """
        Get window ranges for a given timestamp.

        Args:
            timestamp: Event timestamp in seconds

        Returns:
            List of (start, end) tuples for applicable windows
        """

    def stale(self, timestamp: float, latest_timestamp: float) -> bool:
        """
        Check if window is stale and should be expired.

        Args:
            timestamp: Window timestamp
            latest_timestamp: Latest observed timestamp

        Returns:
            True if window should be expired
        """

    def current(self, timestamp: float) -> tuple:
        """
        Get current window range for timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for current window
        """

    @property
    def expires(self) -> float:
        """Window expiration time in seconds."""

    @property
    def ident(self) -> str:
        """Unique identifier for this window type."""
```
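
The `stale` check can be pictured as a comparison against the latest observed timestamp. The following is a standalone sketch, not the library's implementation: the helper name `is_stale` and the exact rule (a window is stale once it is older than `latest_timestamp - expires`, and never stale when `expires` is `None`) are assumptions made for illustration.

```python
from typing import Optional


def is_stale(timestamp: float, latest_timestamp: float,
             expires: Optional[float]) -> bool:
    """Hypothetical staleness rule: older than (latest - expires)."""
    if expires is None:
        # Assumption: a window without an expiration never goes stale.
        return False
    return timestamp < latest_timestamp - expires


# A window stamped at t=100 with expires=60 goes stale once events pass t=160.
print(is_stale(100.0, 150.0, 60.0))  # False
print(is_stale(100.0, 161.0, 60.0))  # True
```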

### Tumbling Windows

Fixed-size, non-overlapping windows that partition time into discrete intervals. Each event belongs to exactly one window, making them ideal for aggregations like counts, sums, and averages over regular time periods.

```python { .api }
class TumblingWindow(Window):
    def __init__(self, size: float, *, expires: float = None):
        """
        Create tumbling window with fixed size.

        Args:
            size: Window size in seconds
            expires: Window expiration time (defaults to size * 2)

        Example:
            # 5-minute tumbling windows
            window = TumblingWindow(300)  # 300 seconds = 5 minutes
        """

    def ranges(self, timestamp: float) -> list:
        """
        Get single window range for timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            List containing single (start, end) tuple
        """

    def current(self, timestamp: float) -> tuple:
        """
        Get current window boundaries.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for window containing timestamp
        """

    @property
    def size(self) -> float:
        """Window size in seconds."""

    @property
    def ident(self) -> str:
        """Window identifier: 'tumbling_{size}'."""
```
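
The bucket arithmetic behind a tumbling window is small enough to sketch by hand. The helper below is a standalone illustration with a hypothetical name (`tumbling_range`); it assumes half-open `[start, end)` intervals aligned to multiples of `size`, which may differ from the boundaries the library actually returns.

```python
from typing import Tuple


def tumbling_range(timestamp: float, size: float) -> Tuple[float, float]:
    """Return the (start, end) of the fixed bucket containing timestamp."""
    start = (timestamp // size) * size  # round down to a multiple of size
    return start, start + size          # assumption: half-open [start, end)


print(tumbling_range(1000.0, 300.0))  # (900.0, 1200.0)
print(tumbling_range(1199.0, 300.0))  # (900.0, 1200.0)
print(tumbling_range(1200.0, 300.0))  # (1200.0, 1500.0)
```

Every timestamp maps to exactly one bucket, which is why a tumbling `ranges` call yields a single tuple.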

### Hopping Windows

Fixed-size, overlapping windows that advance by a step smaller than the window size. Events can belong to multiple windows, enabling sliding aggregations and overlapping time-based analysis.

```python { .api }
class HoppingWindow(Window):
    def __init__(self, size: float, step: float, *, expires: float = None):
        """
        Create hopping window with size and step.

        Args:
            size: Window size in seconds
            step: Step size (advance interval) in seconds
            expires: Window expiration time (defaults to size * 2)

        Example:
            # 10-minute windows advancing every 5 minutes
            window = HoppingWindow(size=600, step=300)
        """

    def ranges(self, timestamp: float) -> list:
        """
        Get multiple overlapping window ranges.

        Args:
            timestamp: Event timestamp

        Returns:
            List of (start, end) tuples for overlapping windows
        """

    def current(self, timestamp: float) -> tuple:
        """
        Get most recent window containing timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for latest applicable window
        """

    @property
    def size(self) -> float:
        """Window size in seconds."""

    @property
    def step(self) -> float:
        """Step size in seconds."""

    @property
    def ident(self) -> str:
        """Window identifier: 'hopping_{size}_{step}'."""
```
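
To see why one event lands in several hopping windows, it helps to enumerate them. This is a minimal standalone sketch, not the library's code: `hopping_ranges` is a hypothetical helper, and it assumes window starts are aligned to multiples of `step` with half-open `[start, start + size)` intervals.

```python
from typing import List, Tuple


def hopping_ranges(timestamp: float, size: float,
                   step: float) -> List[Tuple[float, float]]:
    """List every [start, start + size) window that contains timestamp."""
    # Latest window start at or before the timestamp, aligned to step.
    start = (timestamp // step) * step
    ranges = []
    # Walk backwards one step at a time while the window still covers it.
    while start > timestamp - size:
        ranges.append((start, start + size))
        start -= step
    return sorted(ranges)


# 10-minute windows advancing every 5 minutes: each event is in two windows.
print(hopping_ranges(700.0, 600.0, 300.0))
# [(300.0, 900.0), (600.0, 1200.0)]
```

With `size=600` and `step=300`, each timestamp falls into `size / step = 2` windows under these assumptions.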

### Sliding Windows

Windows positioned relative to each event timestamp, extending `before` seconds back and `after` seconds forward. Useful for time-range queries and event correlation within flexible time boundaries.

```python { .api }
class SlidingWindow(Window):
    def __init__(self, before: float, after: float, *, expires: float = None):
        """
        Create sliding window with before/after ranges.

        Args:
            before: Time range before event timestamp (seconds)
            after: Time range after event timestamp (seconds)
            expires: Window expiration time (defaults to before + after + 60)

        Example:
            # 5 minutes before, 2 minutes after each event
            window = SlidingWindow(before=300, after=120)
        """

    def ranges(self, timestamp: float) -> list:
        """
        Get window range around timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            List containing single (start, end) tuple
        """

    def current(self, timestamp: float) -> tuple:
        """
        Get window boundaries around timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple: (timestamp - before, timestamp + after)
        """

    @property
    def before(self) -> float:
        """Time range before event in seconds."""

    @property
    def after(self) -> float:
        """Time range after event in seconds."""

    @property
    def total_size(self) -> float:
        """Total window size (before + after)."""

    @property
    def ident(self) -> str:
        """Window identifier: 'sliding_{before}_{after}'."""
```
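
The sliding range follows directly from the `(timestamp - before, timestamp + after)` formula in the docstring above. The snippet below is a standalone sketch with a hypothetical helper name (`sliding_range`), shown only to make the arithmetic concrete.

```python
from typing import Tuple


def sliding_range(timestamp: float, before: float,
                  after: float) -> Tuple[float, float]:
    """Return (timestamp - before, timestamp + after)."""
    return timestamp - before, timestamp + after


# 5 minutes before and 2 minutes after an event at t=1000.
start, end = sliding_range(1000.0, before=300.0, after=120.0)
print(start, end)   # 700.0 1120.0
print(end - start)  # 420.0, i.e. before + after
```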

### Windowed Tables

Integration between windows and tables for time-based stateful aggregations. Windowed tables automatically partition data by time windows and manage window lifecycle.

```python { .api }
from typing import Any

class WindowedTable:
    def __init__(
        self,
        table: Table,
        window: Window,
        *,
        key_index_size: int = None
    ):
        """
        Create windowed table wrapper.

        Args:
            table: Base table for storage
            window: Window specification
            key_index_size: Size of key index for cleanup
        """

    def __getitem__(self, key_and_timestamp: tuple) -> Any:
        """
        Get value for key at specific timestamp.

        Args:
            key_and_timestamp: (key, timestamp) tuple

        Returns:
            Value in applicable window
        """

    def __setitem__(self, key_and_timestamp: tuple, value: Any) -> None:
        """
        Set value for key at specific timestamp.

        Args:
            key_and_timestamp: (key, timestamp) tuple
            value: Value to store
        """

    def get_window(self, key: Any, window_range: tuple) -> Any:
        """
        Get value for specific window range.

        Args:
            key: Table key
            window_range: (start, end) window tuple

        Returns:
            Value in specified window
        """

    def set_window(self, key: Any, window_range: tuple, value: Any) -> None:
        """
        Set value for specific window range.

        Args:
            key: Table key
            window_range: (start, end) window tuple
            value: Value to store
        """

    def expire_windows(self, latest_timestamp: float) -> int:
        """
        Expire stale windows based on latest timestamp.

        Args:
            latest_timestamp: Latest observed timestamp

        Returns:
            Number of windows expired
        """

    def windows_for_key(self, key: Any) -> list:
        """
        Get all active windows for a key.

        Args:
            key: Table key

        Returns:
            List of (window_range, value) tuples
        """

    @property
    def window(self) -> Window:
        """Window specification."""

    @property
    def table(self) -> Table:
        """Underlying table."""
```
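
The idea of partitioning values by `(key, window_range)` and expiring old windows can be sketched with a plain dictionary. The class below is a hypothetical in-memory stand-in (`DictWindowedStore` is not part of any library); it assumes tumbling buckets, integer counters, and an expiry rule of "window end older than `latest_timestamp - expires`".

```python
from collections import defaultdict
from typing import Any, Dict, Tuple

WindowKey = Tuple[Any, Tuple[float, float]]


class DictWindowedStore:
    """Hypothetical in-memory stand-in for a windowed counter table."""

    def __init__(self, size: float, expires: float) -> None:
        self.size = size
        self.expires = expires
        self.data: Dict[WindowKey, int] = defaultdict(int)

    def _range(self, timestamp: float) -> Tuple[float, float]:
        # Assumption: tumbling buckets aligned to multiples of size.
        start = (timestamp // self.size) * self.size
        return start, start + self.size

    def add(self, key: Any, timestamp: float, amount: int = 1) -> None:
        self.data[(key, self._range(timestamp))] += amount

    def get(self, key: Any, timestamp: float) -> int:
        # dict.get avoids creating an entry for windows never written to.
        return self.data.get((key, self._range(timestamp)), 0)

    def expire_windows(self, latest_timestamp: float) -> int:
        # Assumed rule: drop windows whose end is older than the cutoff.
        cutoff = latest_timestamp - self.expires
        stale = [k for k in self.data if k[1][1] < cutoff]
        for k in stale:
            del self.data[k]
        return len(stale)


store = DictWindowedStore(size=300.0, expires=600.0)
store.add('clicks', 1000.0)
store.add('clicks', 1100.0)
store.add('clicks', 1300.0)
print(store.get('clicks', 1000.0))   # 2
print(store.get('clicks', 1300.0))   # 1
print(store.expire_windows(1900.0))  # 1
```

A real windowed table also has to persist state and handle changelog recovery; the sketch only shows the keying and expiry bookkeeping.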

### Window Operations

Utility functions and operations for working with windowed data, including aggregation helpers and window management utilities.

```python { .api }
from typing import Callable

def current_window() -> tuple:
    """
    Get current window range from stream context.

    Returns:
        (start, end) tuple for current window

    Raises:
        RuntimeError: If called outside windowed stream context
    """

def windowed_count(table: Table, window: Window) -> Callable:
    """
    Create windowed counting aggregator.

    Args:
        table: Table for storing counts
        window: Window specification

    Returns:
        Function that increments count for windowed keys
    """

def windowed_sum(table: Table, window: Window) -> Callable:
    """
    Create windowed sum aggregator.

    Args:
        table: Table for storing sums
        window: Window specification

    Returns:
        Function that adds values to windowed sums
    """

def windowed_average(
    sum_table: Table,
    count_table: Table,
    window: Window
) -> Callable:
    """
    Create windowed average aggregator.

    Args:
        sum_table: Table for storing sums
        count_table: Table for storing counts
        window: Window specification

    Returns:
        Function that maintains windowed averages
    """

class WindowRange:
    def __init__(self, start: float, end: float):
        """
        Window range representation.

        Args:
            start: Window start timestamp
            end: Window end timestamp
        """

    def contains(self, timestamp: float) -> bool:
        """Check if timestamp falls within window range."""

    def overlaps(self, other: 'WindowRange') -> bool:
        """Check if this window overlaps with another."""

    @property
    def duration(self) -> float:
        """Window duration in seconds."""

    @property
    def midpoint(self) -> float:
        """Window midpoint timestamp."""
```
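
The range helpers above reduce to a few comparisons. The dataclass below is a standalone sketch under stated assumptions: the name `WindowRangeSketch` is hypothetical, the interval is treated as half-open (start inclusive, end exclusive), and two ranges that merely touch at a boundary are not counted as overlapping.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowRangeSketch:
    start: float
    end: float

    def contains(self, timestamp: float) -> bool:
        # Assumption: start is inclusive, end is exclusive.
        return self.start <= timestamp < self.end

    def overlaps(self, other: "WindowRangeSketch") -> bool:
        # Two half-open intervals overlap when each starts before the other ends.
        return self.start < other.end and other.start < self.end

    @property
    def duration(self) -> float:
        return self.end - self.start

    @property
    def midpoint(self) -> float:
        return self.start + self.duration / 2


a = WindowRangeSketch(0.0, 600.0)
b = WindowRangeSketch(300.0, 900.0)
c = WindowRangeSketch(600.0, 1200.0)
print(a.contains(599.0), a.contains(600.0))  # True False
print(a.overlaps(b), a.overlaps(c))          # True False
print(a.duration, a.midpoint)                # 600.0 300.0
```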

## Usage Examples

### Tumbling Window Aggregation

```python
import time

import faust
from faust import TumblingWindow

app = faust.App('windowing-app', broker='kafka://localhost:9092')

# 5-minute tumbling windows for counting events
event_counts = app.Table(
    'event-counts',
    default=int,
    window=TumblingWindow(300)  # 5 minutes
)

events_topic = app.topic('events', value_type=dict)

@app.agent(events_topic)
async def count_events(events):
    async for event in events:
        # Count events per category in 5-minute windows
        category = event['category']
        event_counts[category] += 1

@app.timer(interval=60.0)
async def print_window_stats():
    # Print counts for current windows
    current_time = time.time()
    # Start of the 5-minute window that contains the current time
    window_start = (current_time // 300) * 300

    for category, count in event_counts.items():
        print(f"Category {category}: {count} events in window {window_start}")
```

### Hopping Window Analytics

```python
from faust import HoppingWindow

# 10-minute windows advancing every 5 minutes
sliding_averages = app.Table(
    'sliding-averages',
    default=lambda: {'sum': 0, 'count': 0},
    window=HoppingWindow(size=600, step=300)
)

metrics_topic = app.topic('metrics', value_type=dict)

@app.agent(metrics_topic)
async def compute_sliding_average(metrics):
    async for metric in metrics:
        metric_name = metric['name']
        value = metric['value']

        # Update sliding average for overlapping windows
        stats = sliding_averages[metric_name]
        stats['sum'] += value
        stats['count'] += 1
        sliding_averages[metric_name] = stats

@app.timer(interval=300.0)  # Every 5 minutes (step size)
async def report_sliding_averages():
    for metric_name, stats in sliding_averages.items():
        if stats['count'] > 0:
            avg = stats['sum'] / stats['count']
            print(f"Sliding average for {metric_name}: {avg}")
```

### Session Windows with Sliding Window

```python
from faust import SlidingWindow

# Session tracking with 30-minute timeout
user_sessions = app.Table(
    'user-sessions',
    default=lambda: {'start_time': None, 'last_activity': None, 'events': []},
    window=SlidingWindow(before=1800, after=0)  # 30 minutes before
)

activity_topic = app.topic('user-activity', value_type=dict)

@app.agent(activity_topic)
async def track_sessions(activities):
    async for activity in activities:
        user_id = activity['user_id']
        timestamp = activity['timestamp']

        # Get session data
        session = user_sessions[user_id]

        # Check if this continues existing session or starts new one
        if (session['last_activity'] is None or
                timestamp - session['last_activity'] > 1800):  # 30 min timeout
            # New session
            session['start_time'] = timestamp
            session['events'] = []

        # Update session
        session['last_activity'] = timestamp
        session['events'].append(activity)
        user_sessions[user_id] = session
```

### Custom Window Implementation

```python
from datetime import datetime

import faust
import pytz

class BusinessHoursWindow(faust.Window):
    """Custom window that only includes business hours."""

    def __init__(self, *, start_hour=9, end_hour=17, timezone='UTC'):
        super().__init__()
        self.start_hour = start_hour
        self.end_hour = end_hour
        self.timezone = timezone

    def ranges(self, timestamp):
        # Interpret the event timestamp in the configured timezone
        tz = pytz.timezone(self.timezone)
        dt = datetime.fromtimestamp(timestamp, tz)

        # Check if timestamp is within business hours
        if self.start_hour <= dt.hour < self.end_hour:
            # Return daily business hours window
            day_start = dt.replace(
                hour=self.start_hour, minute=0, second=0, microsecond=0
            )
            day_end = dt.replace(
                hour=self.end_hour, minute=0, second=0, microsecond=0
            )
            return [(day_start.timestamp(), day_end.timestamp())]
        else:
            return []  # Outside business hours

# Use custom window
business_metrics = app.Table(
    'business-metrics',
    default=int,
    window=BusinessHoursWindow(start_hour=9, end_hour=17, timezone='US/Eastern')
)
```

### Window Expiration and Cleanup

```python
import time

from faust import TumblingWindow

# Configure window expiration
hourly_stats = app.Table(
    'hourly-stats',
    default=int,
    window=TumblingWindow(
        size=3600,    # 1 hour windows
        expires=7200  # Keep windows for 2 hours
    )
)

@app.timer(interval=300.0)  # Every 5 minutes
async def cleanup_expired_windows():
    """Clean up expired windows to manage memory."""
    current_time = time.time()

    # Force window expiration check
    if hasattr(hourly_stats, 'expire_windows'):
        expired_count = hourly_stats.expire_windows(current_time)
        if expired_count > 0:
            print(f"Expired {expired_count} old windows")
```

### Multi-Window Analysis

```python
from faust import TumblingWindow, HoppingWindow

# Multiple window sizes for different analysis
minute_counts = app.Table('minute-counts', default=int,
                          window=TumblingWindow(60))
hour_counts = app.Table('hour-counts', default=int,
                        window=TumblingWindow(3600))
sliding_counts = app.Table('sliding-counts', default=int,
                           window=HoppingWindow(size=600, step=60))

@app.agent()
async def multi_window_analysis(events):
    async for event in events:
        event_type = event['type']

        # Update all window types simultaneously
        minute_counts[event_type] += 1
        hour_counts[event_type] += 1
        sliding_counts[event_type] += 1

@app.timer(interval=60.0)
async def report_multi_window_stats():
    print("=== Multi-Window Analysis ===")

    for event_type in set(minute_counts.keys()) | set(hour_counts.keys()):
        minute_count = minute_counts.get(event_type, 0)
        hour_count = hour_counts.get(event_type, 0)
        sliding_count = sliding_counts.get(event_type, 0)

        print(f"{event_type}:")
        print(f" Last minute: {minute_count}")
        print(f" Last hour: {hour_count}")
        print(f" Sliding 10min: {sliding_count}")
```

## Type Interfaces

```python { .api }
from typing import Protocol, List, Tuple, Optional

class WindowT(Protocol):
    """Type interface for Window."""

    expires: Optional[float]
    ident: str

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]: ...
    def stale(self, timestamp: float, latest_timestamp: float) -> bool: ...
    def current(self, timestamp: float) -> Tuple[float, float]: ...

class TumblingWindowT(WindowT, Protocol):
    """Type interface for TumblingWindow."""

    size: float

class HoppingWindowT(WindowT, Protocol):
    """Type interface for HoppingWindow."""

    size: float
    step: float

class SlidingWindowT(WindowT, Protocol):
    """Type interface for SlidingWindow."""

    before: float
    after: float
    total_size: float
```