or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

modules

redis-bloom.md, redis-json.md, redis-search.md, redis-timeseries.md
async-support.md, cluster-support.md, connection-management.md, core-client.md, distributed-locking.md, error-handling.md, high-availability.md, index.md, pipelines-transactions.md, pubsub-messaging.md

redis-timeseries.mddocs/modules/

0

# RedisTimeSeries

1

2

RedisTimeSeries provides time series data structure support for Redis, enabling efficient storage, querying, and analysis of time-stamped data. It supports downsampling, aggregations, and real-time analytics for monitoring and IoT applications.

3

4

## Capabilities

5

6

### Time Series Creation and Management

7

8

Create and configure time series with retention policies and labels.

9

10

```python { .api }

11

def ts_create(
    self,
    key: str,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None,
    duplicate_policy: Optional[str] = None
) -> str:
    """Signature stub for creating a time series at ``key``.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.CREATE command -- confirm against the client implementation.
    """
    ...

19

20

def ts_del(
    self,
    key: str,
    from_timestamp: int,
    to_timestamp: int
) -> int:
    """Signature stub for deleting samples of ``key`` between two timestamps.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.DEL command -- confirm against the client implementation.
    """
    ...

26

27

def ts_alter(
    self,
    key: str,
    retention_msecs: Optional[int] = None,
    labels: Optional[Dict[str, str]] = None,
    duplicate_policy: Optional[str] = None
) -> str:
    """Signature stub for changing the settings of an existing series.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.ALTER command -- confirm against the client implementation.
    """
    ...

34

35

# Signature stub (body elided): takes a series key and is annotated as
# returning a dict. Presumably corresponds to TS.INFO -- confirm against
# the client implementation.
def ts_info(self, key: str) -> Dict[str, Any]: ...

36

```

37

38

### Data Ingestion

39

40

Add time series data points with timestamps and values.

41

42

```python { .api }

43

def ts_add(
    self,
    key: str,
    timestamp: Union[int, str],
    value: float,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None,
    duplicate_policy: Optional[str] = None
) -> int:
    """Signature stub for appending one sample to the series at ``key``.

    ``timestamp`` is annotated as int or str. The body is elided in this
    reference listing. Presumably corresponds to the TS.ADD command --
    confirm against the client implementation.
    """
    ...

53

54

def ts_madd(
    self,
    *args: Tuple[str, Union[int, str], float]
) -> List[int]:
    """Signature stub for appending several samples in one call.

    Each positional argument is annotated as a ``(key, timestamp, value)``
    tuple. The body is elided in this reference listing. Presumably
    corresponds to the TS.MADD command -- confirm against the client.
    """
    ...

58

59

def ts_incrby(
    self,
    key: str,
    value: float,
    timestamp: Optional[Union[int, str]] = None,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None
) -> int:
    """Signature stub for incrementing the series at ``key`` by ``value``.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.INCRBY command -- confirm against the client implementation.
    """
    ...

68

69

def ts_decrby(
    self,
    key: str,
    value: float,
    timestamp: Optional[Union[int, str]] = None,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None
) -> int:
    """Signature stub for decrementing the series at ``key`` by ``value``.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.DECRBY command -- confirm against the client implementation.
    """
    ...

78

```

79

80

### Data Retrieval and Querying

81

82

Query time series data with range queries and aggregations.

83

84

```python { .api }

85

# Signature stub (body elided): takes a series key and is annotated as
# returning either a (timestamp, value) tuple or None. Presumably
# corresponds to TS.GET -- confirm against the client implementation.
def ts_get(self, key: str) -> Optional[Tuple[int, float]]: ...

86

87

def ts_mget(
    self,
    filters: List[str],
    with_labels: bool = False
) -> List[Dict[str, Any]]:
    """Signature stub for reading several series selected by ``filters``.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.MGET command -- confirm against the client implementation.
    """
    ...

92

93

def ts_range(
    self,
    key: str,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None
) -> List[Tuple[int, float]]:
    """Signature stub for a range query over the single series at ``key``.

    Annotated as returning a list of ``(timestamp, value)`` tuples. The body
    is elided in this reference listing. Presumably corresponds to the
    TS.RANGE command -- confirm against the client implementation.
    """
    ...

106

107

def ts_revrange(
    self,
    key: str,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None
) -> List[Tuple[int, float]]:
    """Signature stub for a range query with the same parameters as ``ts_range``.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.REVRANGE command (reverse order) -- confirm against the client.
    """
    ...

120

121

def ts_mrange(
    self,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    filters: List[str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None,
    groupby: Optional[str] = None,
    reduce: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Signature stub for a range query across series selected by ``filters``.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.MRANGE command -- confirm against the client implementation.
    """
    ...

136

137

def ts_mrevrange(
    self,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    filters: List[str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None,
    groupby: Optional[str] = None,
    reduce: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Signature stub for a multi-series query with ``ts_mrange``'s parameters.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.MREVRANGE command (reverse order) -- confirm against the client.
    """
    ...

152

```

153

154

### Rules and Downsampling

155

156

Create compaction rules for automatic downsampling and aggregation.

157

158

```python { .api }

159

def ts_createrule(
    self,
    source_key: str,
    dest_key: str,
    aggregation_type: str,
    bucket_size_msec: int
) -> str:
    """Signature stub for creating a compaction rule between two series.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.CREATERULE command -- confirm against the client implementation.
    """
    ...

166

167

def ts_deleterule(
    self,
    source_key: str,
    dest_key: str
) -> str:
    """Signature stub for removing the compaction rule between two series.

    The body is elided in this reference listing. Presumably corresponds to
    the TS.DELETERULE command -- confirm against the client implementation.
    """
    ...

172

```

173

174

### Query and Metadata Operations

175

176

Query time series metadata and perform administrative operations.

177

178

```python { .api }

179

# Signature stub (body elided): takes a list of filter strings and is
# annotated as returning a list of strings. Presumably corresponds to
# TS.QUERYINDEX (matching series keys) -- confirm against the client.
def ts_queryindex(self, filters: List[str]) -> List[str]: ...

180

```

181

182

## Usage Examples

183

184

### Basic Time Series Operations

185

186

```python

187

import redis

188

import time

189

from datetime import datetime, timedelta

190

191

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

192

193

# Create time series for temperature monitoring

194

def create_temperature_series():
    """Create the temperature and humidity series for sensor1.

    Both series get the same retention and carry ``sensor_id``, ``location``
    and ``type`` labels, which the label-filter queries rely on.
    """
    retention_ms = 86400000  # 24 hours retention

    # Create time series with retention and labels
    result = r.ts().create(
        "temperature:sensor1",
        retention_msecs=retention_ms,
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "temperature",
        },
    )
    print(f"Created temperature series: {result}")

    # Create series for humidity
    r.ts().create(
        "humidity:sensor1",
        retention_msecs=retention_ms,
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "humidity",
        },
    )
    print("Created humidity series")

218

219

create_temperature_series()

220

221

# Add data points

222

def add_sensor_data():
    """Write individual and batched samples to the sensor1 series."""
    current_time = int(time.time() * 1000)  # Current timestamp in milliseconds

    # Add individual data points
    temp_timestamp = r.ts().add("temperature:sensor1", current_time, 23.5)
    humidity_timestamp = r.ts().add("humidity:sensor1", current_time, 65.2)

    print(f"Added temperature reading at {temp_timestamp}")
    print(f"Added humidity reading at {humidity_timestamp}")

    # Add multiple data points at once
    sensor_data = [
        ("temperature:sensor1", current_time + 1000, 23.7),
        ("humidity:sensor1", current_time + 1000, 64.8),
        ("temperature:sensor1", current_time + 2000, 24.1),
        ("humidity:sensor1", current_time + 2000, 66.1),
    ]

    # madd() takes ONE list of (key, timestamp, value) tuples; unpacking the
    # list with * would pass four positional arguments and raise TypeError.
    timestamps = r.ts().madd(sensor_data)
    print(f"Batch added data points: {timestamps}")

242

243

add_sensor_data()

244

```

245

246

### Time Series Queries and Aggregations

247

248

```python

249

import redis

250

import time

251

252

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

253

254

def query_sensor_data():
    """Print the latest sample of each sensor1 series and the last hour of temperatures."""
    # Imported here because this snippet's import header does not bring in
    # datetime; without it every fromtimestamp() call raises NameError.
    from datetime import datetime

    # Get latest values
    latest_temp = r.ts().get("temperature:sensor1")
    latest_humidity = r.ts().get("humidity:sensor1")

    if latest_temp:
        timestamp, value = latest_temp
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f"Latest temperature: {value}°C at {dt}")

    if latest_humidity:
        timestamp, value = latest_humidity
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f"Latest humidity: {value}% at {dt}")

    # Query data range (last hour)
    end_time = int(time.time() * 1000)
    start_time = end_time - (60 * 60 * 1000)  # 1 hour ago

    temp_data = r.ts().range(
        "temperature:sensor1",
        start_time,
        end_time,
        count=100,
    )

    print(f"Temperature readings in last hour: {len(temp_data)} points")
    for timestamp, value in temp_data[-5:]:  # Last 5 readings
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f" {dt}: {value}°C")

284

285

query_sensor_data()

286

```

287

288

### Advanced Aggregation Queries

289

290

```python

291

import redis

292

import time

293

294

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

295

296

def advanced_analytics():
    """Print hourly averages and the overall min/max temperature of the last 24 hours."""
    # Imported here because this snippet's import header does not bring in datetime.
    from datetime import datetime

    series_key = "temperature:sensor1"
    hour_ms = 3600000
    day_ms = 24 * hour_ms

    end_time = int(time.time() * 1000)
    start_time = end_time - day_ms  # 24 hours ago

    # Get average temperature per hour
    hourly_avg = r.ts().range(
        series_key,
        start_time,
        end_time,
        aggregation_type="AVG",
        bucket_size_msec=hour_ms,  # 1 hour buckets
    )

    print("Hourly average temperatures:")
    for timestamp, avg_temp in hourly_avg:
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f" {dt.strftime('%Y-%m-%d %H:00')}: {avg_temp:.2f}°C")

    # Get min/max temperature in last 24 hours
    min_buckets = r.ts().range(
        series_key,
        start_time,
        end_time,
        aggregation_type="MIN",
        bucket_size_msec=day_ms,  # 24 hour bucket
    )
    max_buckets = r.ts().range(
        series_key,
        start_time,
        end_time,
        aggregation_type="MAX",
        bucket_size_msec=day_ms,
    )

    if min_buckets and max_buckets:
        # A sliding 24h window can straddle two aggregation buckets (bucket
        # boundaries do not follow start_time by default), so fold every
        # returned bucket instead of reading only the first one.
        lowest = min(value for _, value in min_buckets)
        highest = max(value for _, value in max_buckets)
        print(f"24h Min temperature: {lowest:.2f}°C")
        print(f"24h Max temperature: {highest:.2f}°C")

334

335

advanced_analytics()

336

```

337

338

### Multi-Series Queries

339

340

```python

341

import redis

342

import time

343

344

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

345

346

def multi_sensor_queries():
    """Print the latest value and 10-minute averages for every server-room series."""
    # Imported here because this snippet's import header does not bring in datetime.
    from datetime import datetime

    # Get latest values from all sensors
    latest_readings = r.ts().mget(
        filters=["location=server_room"],
        with_labels=True,
    )

    print("Latest readings from all server room sensors:")
    # redis-py (default RESP2 parsing) returns one single-entry dict per
    # series: {key: [labels, timestamp, value]} -- there are no literal
    # 'key' / 'labels' / 'value' fields to look up.
    for reading in latest_readings:
        for key, (labels, timestamp, value) in reading.items():
            if timestamp is None:  # series exists but holds no samples
                continue
            dt = datetime.fromtimestamp(timestamp / 1000)
            sensor_type = labels.get('type', 'unknown')
            print(f" {sensor_type}: {value} at {dt}")

    # Query range data from multiple series
    end_time = int(time.time() * 1000)
    start_time = end_time - 3600000  # 1 hour ago

    multi_range_data = r.ts().mrange(
        start_time,
        end_time,
        filters=["location=server_room"],
        aggregation_type="AVG",
        bucket_size_msec=600000,  # 10-minute buckets
        with_labels=True,
    )

    print("\nAverage readings per 10 minutes (last hour):")
    # Same shape as above, with the samples as a list:
    # {key: [labels, [(timestamp, value), ...]]}
    for series in multi_range_data:
        for key, (labels, values) in series.items():
            sensor_type = labels.get('type', 'unknown')

            print(f"\n{sensor_type} ({key}):")
            for timestamp, value in values[-3:]:  # Last 3 readings
                dt = datetime.fromtimestamp(timestamp / 1000)
                print(f" {dt.strftime('%H:%M')}: {value:.2f}")

389

390

multi_sensor_queries()

391

```

392

393

### Automatic Downsampling with Rules

394

395

```python

396

import redis

397

398

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

399

400

def setup_downsampling():
    """Create hourly and daily destination series and the rules that fill them."""
    source_key = "temperature:sensor1"
    hourly_key = "temperature:sensor1:hourly"
    daily_key = "temperature:sensor1:daily"
    hour_ms = 3600000

    # Create destination series for downsampled data
    r.ts().create(
        hourly_key,
        retention_msecs=30 * 24 * hour_ms,  # 30 days retention
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "temperature",
            "resolution": "hourly",
        },
    )

    r.ts().create(
        daily_key,
        retention_msecs=365 * 24 * hour_ms,  # 1 year retention
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "temperature",
            "resolution": "daily",
        },
    )

    # Create downsampling rules
    # Raw data -> Hourly averages
    r.ts().createrule(source_key, hourly_key, "AVG", hour_ms)  # 1 hour

    # Raw data -> Daily averages
    # Both rules read from the raw series: a series that is already the
    # destination of a rule is expected to be rejected as the source of
    # another one (verify against the TS.CREATERULE reference), and
    # averaging raw samples gives a true daily mean rather than a mean of
    # hourly means.
    r.ts().createrule(source_key, daily_key, "AVG", 24 * hour_ms)  # 24 hours

    print("Created downsampling rules")

    # Verify rules were created
    # info() returns a TSInfo object; its fields are attributes, and its
    # get() helper does not accept a default value.
    info = r.ts().info(source_key)
    print(f"Rules for sensor1: {info.rules or []}")

446

447

setup_downsampling()

448

449

def query_downsampled_data():
    """Print the daily averages of the last week and the most recent hourly averages."""
    # Imported here because this snippet only imports redis at the top;
    # without these, time.time() and datetime.fromtimestamp() raise NameError.
    import time
    from datetime import datetime

    hour_ms = 3600000

    # Query different resolution data
    end_time = int(time.time() * 1000)

    # Last 7 days of daily averages
    start_time = end_time - (7 * 24 * hour_ms)
    daily_data = r.ts().range("temperature:sensor1:daily", start_time, end_time)

    print("Daily temperature averages (last 7 days):")
    for timestamp, avg_temp in daily_data:
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f" {dt.strftime('%Y-%m-%d')}: {avg_temp:.2f}°C")

    # Last 24 hours of hourly averages
    start_time = end_time - (24 * hour_ms)
    hourly_data = r.ts().range("temperature:sensor1:hourly", start_time, end_time)

    print("\nHourly temperature averages (last 24 hours):")
    for timestamp, avg_temp in hourly_data[-6:]:  # Last 6 hours
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f" {dt.strftime('%H:00')}: {avg_temp:.2f}°C")

470

471

# Wait a moment for rules to process, then query
import time  # this snippet's header imports only redis

time.sleep(1)
query_downsampled_data()

474

```

475

476

### Real-time Monitoring and Alerting

477

478

```python

479

import redis

480

import time

481

import threading

482

483

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

484

485

class TemperatureMonitor:
    """Record temperature readings for sensor1 and raise threshold alerts.

    Every call goes through the client passed to the constructor, so the
    monitor can be pointed at any connection.
    """

    def __init__(self, redis_client):
        """Store the client and the default alert thresholds (in °C)."""
        self.r = redis_client
        self.running = False
        self.alert_threshold_high = 25.0
        self.alert_threshold_low = 20.0

    def add_reading(self, temperature):
        """Add a temperature reading and check for alerts.

        Returns the value returned by the client's ``add`` call, which is
        used as the sample timestamp for any alert.
        """
        # Use the injected client, not the module-level ``r``; "*" is passed
        # so the timestamp is assigned on the server side.
        timestamp = self.r.ts().add("temperature:sensor1", "*", temperature)

        # Check for alerts
        if temperature > self.alert_threshold_high:
            self.trigger_alert("HIGH", temperature, timestamp)
        elif temperature < self.alert_threshold_low:
            self.trigger_alert("LOW", temperature, timestamp)

        return timestamp

    def trigger_alert(self, alert_type, temperature, timestamp):
        """Trigger temperature alert and record it in the alert series."""
        # Imported here because this snippet's import header does not bring
        # in datetime.
        from datetime import datetime

        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f"🚨 ALERT: {alert_type} temperature {temperature}°C at {dt}")

        # Store alert in separate time series
        alert_series = "alerts:temperature:sensor1"
        if not self.series_exists(alert_series):
            self.r.ts().create(
                alert_series,
                labels={
                    "sensor_id": "sensor1",
                    "type": "alert",
                    "metric": "temperature",
                },
            )

        # Store alert with severity as value
        severity = 2 if alert_type == "HIGH" else 1
        self.r.ts().add(alert_series, timestamp, severity)

    def series_exists(self, key):
        """Check if time series exists."""
        try:
            self.r.ts().info(key)
        except redis.ResponseError:
            # Only a server-side error reply is read as "no such series";
            # connection failures and interrupts propagate instead of being
            # reported as a missing key.
            return False
        return True

    def get_recent_stats(self, minutes=60):
        """Get statistics for recent data.

        Returns a dict with ``count``, ``min``, ``max``, ``avg`` and
        ``latest``, or None when the window holds no samples.
        """
        end_time = int(time.time() * 1000)
        start_time = end_time - (minutes * 60 * 1000)

        # Get recent data
        data = self.r.ts().range("temperature:sensor1", start_time, end_time)
        if not data:
            return None

        temperatures = [value for _, value in data]
        return {
            "count": len(temperatures),
            "min": min(temperatures),
            "max": max(temperatures),
            "avg": sum(temperatures) / len(temperatures),
            "latest": temperatures[-1],
        }

553

554

# Usage example
monitor = TemperatureMonitor(r)

# Simulate temperature readings with some alerts
test_temperatures = [21.5, 22.1, 23.8, 24.5, 25.8, 26.2, 24.1, 22.9, 19.5, 18.2, 21.0]

print("Simulating temperature readings...")
for temp in test_temperatures:
    monitor.add_reading(temp)
    time.sleep(0.1)  # short pause between simulated readings

# Get statistics
stats = monitor.get_recent_stats(minutes=5)
if stats:
    print("\nRecent temperature statistics:")
    print(f" Count: {stats['count']} readings")
    print(f" Min: {stats['min']:.1f}°C")
    print(f" Max: {stats['max']:.1f}°C")
    print(f" Average: {stats['avg']:.1f}°C")
    print(f" Latest: {stats['latest']:.1f}°C")

574

```

575

576

### Time Series Data Export and Analysis

577

578

```python

579

import redis

580

import csv

581

import json

582

from datetime import datetime

583

584

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

585

586

def export_timeseries_data():
    """Export time series data to different formats.

    Writes the last 24 hours of sensor1 temperature and humidity samples to
    /tmp/sensor_data.csv (one row per timestamp) and /tmp/sensor_data.json.
    """
    # Imported here because this snippet's import header does not bring in
    # time; without it time.time() raises NameError.
    import time

    # Get data for export
    end_time = int(time.time() * 1000)
    start_time = end_time - (24 * 3600000)  # Last 24 hours

    temp_data = r.ts().range("temperature:sensor1", start_time, end_time)
    humidity_data = r.ts().range("humidity:sensor1", start_time, end_time)

    # Export to CSV
    with open('/tmp/sensor_data.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['timestamp', 'datetime', 'temperature', 'humidity'])

        # Combine data by timestamp
        temp_dict = dict(temp_data)
        humidity_dict = dict(humidity_data)

        # A timestamp present in only one series gets an empty cell for the other
        all_timestamps = sorted(set(temp_dict) | set(humidity_dict))

        for timestamp in all_timestamps:
            dt = datetime.fromtimestamp(timestamp / 1000)
            temp = temp_dict.get(timestamp, '')
            humidity = humidity_dict.get(timestamp, '')
            writer.writerow([timestamp, dt.isoformat(), temp, humidity])

    print("Exported data to /tmp/sensor_data.csv")

    # Export to JSON
    export_data = {
        "metadata": {
            "export_time": datetime.now().isoformat(),
            "start_time": datetime.fromtimestamp(start_time / 1000).isoformat(),
            "end_time": datetime.fromtimestamp(end_time / 1000).isoformat(),
            "sensor_id": "sensor1",
            "location": "server_room",
        },
        "data": {
            "temperature": [
                {"timestamp": ts, "value": temp} for ts, temp in temp_data
            ],
            "humidity": [
                {"timestamp": ts, "value": humidity} for ts, humidity in humidity_data
            ],
        },
    }

    with open('/tmp/sensor_data.json', 'w') as jsonfile:
        json.dump(export_data, jsonfile, indent=2)

    print("Exported data to /tmp/sensor_data.json")

638

639

def analyze_sensor_patterns():
    """Analyze patterns in sensor data.

    Groups the last 7 days of hourly temperature averages by hour of day and
    prints the average and range for each hour.
    """
    # Imported here because this snippet's import header does not bring in
    # time; without it time.time() raises NameError.
    import time

    end_time = int(time.time() * 1000)
    start_time = end_time - (7 * 24 * 3600000)  # Last 7 days

    # Get hourly averages for pattern analysis
    hourly_temps = r.ts().range(
        "temperature:sensor1",
        start_time,
        end_time,
        aggregation_type="AVG",
        bucket_size_msec=3600000,  # 1 hour
    )

    if not hourly_temps:
        print("No data available for analysis")
        return

    # Analyze by hour of day
    hourly_patterns = {}
    for timestamp, temp in hourly_temps:
        hour = datetime.fromtimestamp(timestamp / 1000).hour
        hourly_patterns.setdefault(hour, []).append(temp)

    print("Temperature patterns by hour of day:")
    for hour in sorted(hourly_patterns):
        temps = hourly_patterns[hour]
        avg_temp = sum(temps) / len(temps)
        min_temp = min(temps)
        max_temp = max(temps)
        print(f" {hour:02d}:00 - Avg: {avg_temp:.1f}°C, Range: {min_temp:.1f}-{max_temp:.1f}°C")

674

675

export_timeseries_data()

676

analyze_sensor_patterns()

677

```