0
# RedisTimeSeries
1
2
RedisTimeSeries provides time series data structure support for Redis, enabling efficient storage, querying, and analysis of time-stamped data. It supports downsampling, aggregations, and real-time analytics for monitoring and IoT applications.
3
4
## Capabilities
5
6
### Time Series Creation and Management
7
8
Create and configure time series with retention policies and labels.
9
10
```python { .api }
11
def ts_create(
12
self,
13
key: str,
14
retention_msecs: Optional[int] = None,
15
uncompressed: bool = False,
16
labels: Optional[Dict[str, str]] = None,
17
duplicate_policy: Optional[str] = None
18
) -> str: ...
19
20
def ts_del(
21
self,
22
key: str,
23
from_timestamp: int,
24
to_timestamp: int
25
) -> int: ...
26
27
def ts_alter(
28
self,
29
key: str,
30
retention_msecs: Optional[int] = None,
31
labels: Optional[Dict[str, str]] = None,
32
duplicate_policy: Optional[str] = None
33
) -> str: ...
34
35
def ts_info(self, key: str) -> Dict[str, Any]: ...
36
```
37
38
### Data Ingestion
39
40
Add time series data points with timestamps and values.
41
42
```python { .api }
43
def ts_add(
44
self,
45
key: str,
46
timestamp: Union[int, str],
47
value: float,
48
retention_msecs: Optional[int] = None,
49
uncompressed: bool = False,
50
labels: Optional[Dict[str, str]] = None,
51
duplicate_policy: Optional[str] = None
52
) -> int: ...
53
54
def ts_madd(
55
self,
56
*args: Tuple[str, Union[int, str], float]
57
) -> List[int]: ...
58
59
def ts_incrby(
60
self,
61
key: str,
62
value: float,
63
timestamp: Optional[Union[int, str]] = None,
64
retention_msecs: Optional[int] = None,
65
uncompressed: bool = False,
66
labels: Optional[Dict[str, str]] = None
67
) -> int: ...
68
69
def ts_decrby(
70
self,
71
key: str,
72
value: float,
73
timestamp: Optional[Union[int, str]] = None,
74
retention_msecs: Optional[int] = None,
75
uncompressed: bool = False,
76
labels: Optional[Dict[str, str]] = None
77
) -> int: ...
78
```
79
80
### Data Retrieval and Querying
81
82
Query time series data with range queries and aggregations.
83
84
```python { .api }
85
def ts_get(self, key: str) -> Optional[Tuple[int, float]]: ...
86
87
def ts_mget(
88
self,
89
filters: List[str],
90
with_labels: bool = False
91
) -> List[Dict[str, Any]]: ...
92
93
def ts_range(
94
self,
95
key: str,
96
from_timestamp: Union[int, str],
97
to_timestamp: Union[int, str],
98
count: Optional[int] = None,
99
aggregation_type: Optional[str] = None,
100
bucket_size_msec: Optional[int] = None,
101
with_labels: bool = False,
102
filter_by_ts: Optional[List[int]] = None,
103
filter_by_min_value: Optional[float] = None,
104
filter_by_max_value: Optional[float] = None
105
) -> List[Tuple[int, float]]: ...
106
107
def ts_revrange(
108
self,
109
key: str,
110
from_timestamp: Union[int, str],
111
to_timestamp: Union[int, str],
112
count: Optional[int] = None,
113
aggregation_type: Optional[str] = None,
114
bucket_size_msec: Optional[int] = None,
115
with_labels: bool = False,
116
filter_by_ts: Optional[List[int]] = None,
117
filter_by_min_value: Optional[float] = None,
118
filter_by_max_value: Optional[float] = None
119
) -> List[Tuple[int, float]]: ...
120
121
def ts_mrange(
122
self,
123
from_timestamp: Union[int, str],
124
to_timestamp: Union[int, str],
125
filters: List[str],
126
count: Optional[int] = None,
127
aggregation_type: Optional[str] = None,
128
bucket_size_msec: Optional[int] = None,
129
with_labels: bool = False,
130
filter_by_ts: Optional[List[int]] = None,
131
filter_by_min_value: Optional[float] = None,
132
filter_by_max_value: Optional[float] = None,
133
groupby: Optional[str] = None,
134
reduce: Optional[str] = None
135
) -> List[Dict[str, Any]]: ...
136
137
def ts_mrevrange(
138
self,
139
from_timestamp: Union[int, str],
140
to_timestamp: Union[int, str],
141
filters: List[str],
142
count: Optional[int] = None,
143
aggregation_type: Optional[str] = None,
144
bucket_size_msec: Optional[int] = None,
145
with_labels: bool = False,
146
filter_by_ts: Optional[List[int]] = None,
147
filter_by_min_value: Optional[float] = None,
148
filter_by_max_value: Optional[float] = None,
149
groupby: Optional[str] = None,
150
reduce: Optional[str] = None
151
) -> List[Dict[str, Any]]: ...
152
```
153
154
### Rules and Downsampling
155
156
Create compaction rules for automatic downsampling and aggregation.
157
158
```python { .api }
159
def ts_createrule(
160
self,
161
source_key: str,
162
dest_key: str,
163
aggregation_type: str,
164
bucket_size_msec: int
165
) -> str: ...
166
167
def ts_deleterule(
168
self,
169
source_key: str,
170
dest_key: str
171
) -> str: ...
172
```
173
174
### Query and Metadata Operations
175
176
Query time series metadata and perform administrative operations.
177
178
```python { .api }
179
def ts_queryindex(self, filters: List[str]) -> List[str]: ...
180
```
181
182
## Usage Examples
183
184
### Basic Time Series Operations
185
186
```python
187
import redis
188
import time
189
from datetime import datetime, timedelta
190
191
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
192
193
# Create time series for temperature monitoring
194
def create_temperature_series():
195
# Create time series with retention and labels
196
result = r.ts().create(
197
"temperature:sensor1",
198
retention_msecs=86400000, # 24 hours retention
199
labels={
200
"sensor_id": "sensor1",
201
"location": "server_room",
202
"type": "temperature"
203
}
204
)
205
print(f"Created temperature series: {result}")
206
207
# Create series for humidity
208
r.ts().create(
209
"humidity:sensor1",
210
retention_msecs=86400000,
211
labels={
212
"sensor_id": "sensor1",
213
"location": "server_room",
214
"type": "humidity"
215
}
216
)
217
print("Created humidity series")
218
219
create_temperature_series()
220
221
# Add data points
222
def add_sensor_data():
223
current_time = int(time.time() * 1000) # Current timestamp in milliseconds
224
225
# Add individual data points
226
temp_timestamp = r.ts().add("temperature:sensor1", current_time, 23.5)
227
humidity_timestamp = r.ts().add("humidity:sensor1", current_time, 65.2)
228
229
print(f"Added temperature reading at {temp_timestamp}")
230
print(f"Added humidity reading at {humidity_timestamp}")
231
232
# Add multiple data points at once
233
sensor_data = [
234
("temperature:sensor1", current_time + 1000, 23.7),
235
("humidity:sensor1", current_time + 1000, 64.8),
236
("temperature:sensor1", current_time + 2000, 24.1),
237
("humidity:sensor1", current_time + 2000, 66.1)
238
]
239
240
timestamps = r.ts().madd(*sensor_data)
241
print(f"Batch added data points: {timestamps}")
242
243
add_sensor_data()
244
```
245
246
### Time Series Queries and Aggregations
247
248
```python
249
import redis
250
import time
from datetime import datetime
251
252
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
253
254
def query_sensor_data():
255
# Get latest values
256
latest_temp = r.ts().get("temperature:sensor1")
257
latest_humidity = r.ts().get("humidity:sensor1")
258
259
if latest_temp:
260
timestamp, value = latest_temp
261
dt = datetime.fromtimestamp(timestamp / 1000)
262
print(f"Latest temperature: {value}°C at {dt}")
263
264
if latest_humidity:
265
timestamp, value = latest_humidity
266
dt = datetime.fromtimestamp(timestamp / 1000)
267
print(f"Latest humidity: {value}% at {dt}")
268
269
# Query data range (last hour)
270
end_time = int(time.time() * 1000)
271
start_time = end_time - (60 * 60 * 1000) # 1 hour ago
272
273
temp_data = r.ts().range(
274
"temperature:sensor1",
275
start_time,
276
end_time,
277
count=100
278
)
279
280
print(f"Temperature readings in last hour: {len(temp_data)} points")
281
for timestamp, value in temp_data[-5:]: # Last 5 readings
282
dt = datetime.fromtimestamp(timestamp / 1000)
283
print(f" {dt}: {value}°C")
284
285
query_sensor_data()
286
```
287
288
### Advanced Aggregation Queries
289
290
```python
291
import redis
292
import time
from datetime import datetime
293
294
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
295
296
def advanced_analytics():
297
end_time = int(time.time() * 1000)
298
start_time = end_time - (24 * 60 * 60 * 1000) # 24 hours ago
299
300
# Get average temperature per hour
301
hourly_avg = r.ts().range(
302
"temperature:sensor1",
303
start_time,
304
end_time,
305
aggregation_type="AVG",
306
bucket_size_msec=3600000 # 1 hour buckets
307
)
308
309
print("Hourly average temperatures:")
310
for timestamp, avg_temp in hourly_avg:
311
dt = datetime.fromtimestamp(timestamp / 1000)
312
print(f" {dt.strftime('%Y-%m-%d %H:00')}: {avg_temp:.2f}°C")
313
314
# Get min/max temperature in last 24 hours
315
min_temp = r.ts().range(
316
"temperature:sensor1",
317
start_time,
318
end_time,
319
aggregation_type="MIN",
320
bucket_size_msec=24 * 3600000 # 24 hour bucket
321
)
322
323
max_temp = r.ts().range(
324
"temperature:sensor1",
325
start_time,
326
end_time,
327
aggregation_type="MAX",
328
bucket_size_msec=24 * 3600000
329
)
330
331
if min_temp and max_temp:
332
print(f"24h Min temperature: {min_temp[0][1]:.2f}°C")
333
print(f"24h Max temperature: {max_temp[0][1]:.2f}°C")
334
335
advanced_analytics()
336
```
337
338
### Multi-Series Queries
339
340
```python
341
import redis
342
import time
from datetime import datetime
343
344
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
345
346
def multi_sensor_queries():
347
# Get latest values from all sensors
348
latest_readings = r.ts().mget(
349
filters=["location=server_room"],
350
with_labels=True
351
)
352
353
print("Latest readings from all server room sensors:")
354
for reading in latest_readings:
355
key = reading.get('key', 'unknown')
356
labels = reading.get('labels', {})
357
value_timestamp = reading.get('value', [None, None])
358
359
if value_timestamp[0] is not None:
360
timestamp, value = value_timestamp
361
dt = datetime.fromtimestamp(timestamp / 1000)
362
sensor_type = labels.get('type', 'unknown')
363
print(f" {sensor_type}: {value} at {dt}")
364
365
# Query range data from multiple series
366
end_time = int(time.time() * 1000)
367
start_time = end_time - (3600000) # 1 hour ago
368
369
multi_range_data = r.ts().mrange(
370
start_time,
371
end_time,
372
filters=["location=server_room"],
373
aggregation_type="AVG",
374
bucket_size_msec=600000, # 10-minute buckets
375
with_labels=True
376
)
377
378
print("\nAverage readings per 10 minutes (last hour):")
379
for series in multi_range_data:
380
key = series.get('key', 'unknown')
381
labels = series.get('labels', {})
382
values = series.get('values', [])
383
sensor_type = labels.get('type', 'unknown')
384
385
print(f"\n{sensor_type} ({key}):")
386
for timestamp, value in values[-3:]: # Last 3 readings
387
dt = datetime.fromtimestamp(timestamp / 1000)
388
print(f" {dt.strftime('%H:%M')}: {value:.2f}")
389
390
multi_sensor_queries()
391
```
392
393
### Automatic Downsampling with Rules
394
395
```python
396
import redis
import time
from datetime import datetime
397
398
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
399
400
def setup_downsampling():
401
# Create destination series for downsampled data
402
r.ts().create(
403
"temperature:sensor1:hourly",
404
retention_msecs=30 * 24 * 3600000, # 30 days retention
405
labels={
406
"sensor_id": "sensor1",
407
"location": "server_room",
408
"type": "temperature",
409
"resolution": "hourly"
410
}
411
)
412
413
r.ts().create(
414
"temperature:sensor1:daily",
415
retention_msecs=365 * 24 * 3600000, # 1 year retention
416
labels={
417
"sensor_id": "sensor1",
418
"location": "server_room",
419
"type": "temperature",
420
"resolution": "daily"
421
}
422
)
423
424
# Create downsampling rules
425
# Raw data -> Hourly averages
426
r.ts().createrule(
427
"temperature:sensor1",
428
"temperature:sensor1:hourly",
429
"AVG",
430
3600000 # 1 hour
431
)
432
433
# Hourly data -> Daily averages
434
r.ts().createrule(
435
"temperature:sensor1:hourly",
436
"temperature:sensor1:daily",
437
"AVG",
438
24 * 3600000 # 24 hours
439
)
440
441
print("Created downsampling rules")
442
443
# Verify rules were created
444
info = r.ts().info("temperature:sensor1")
445
print(f"Rules for sensor1: {info.get('rules', [])}")
446
447
setup_downsampling()
448
449
def query_downsampled_data():
450
# Query different resolution data
451
end_time = int(time.time() * 1000)
452
453
# Last 7 days of daily averages
454
start_time = end_time - (7 * 24 * 3600000)
455
daily_data = r.ts().range("temperature:sensor1:daily", start_time, end_time)
456
457
print("Daily temperature averages (last 7 days):")
458
for timestamp, avg_temp in daily_data:
459
dt = datetime.fromtimestamp(timestamp / 1000)
460
print(f" {dt.strftime('%Y-%m-%d')}: {avg_temp:.2f}°C")
461
462
# Last 24 hours of hourly averages
463
start_time = end_time - (24 * 3600000)
464
hourly_data = r.ts().range("temperature:sensor1:hourly", start_time, end_time)
465
466
print("\nHourly temperature averages (last 24 hours):")
467
for timestamp, avg_temp in hourly_data[-6:]: # Last 6 hours
468
dt = datetime.fromtimestamp(timestamp / 1000)
469
print(f" {dt.strftime('%H:00')}: {avg_temp:.2f}°C")
470
471
# Wait a moment for rules to process, then query
472
time.sleep(1)
473
query_downsampled_data()
474
```
475
476
### Real-time Monitoring and Alerting
477
478
```python
479
import redis
480
import time
from datetime import datetime
481
import threading
482
483
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
484
485
class TemperatureMonitor:
486
def __init__(self, redis_client):
487
self.r = redis_client
488
self.running = False
489
self.alert_threshold_high = 25.0
490
self.alert_threshold_low = 20.0
491
492
def add_reading(self, temperature):
493
"""Add a temperature reading and check for alerts"""
494
timestamp = self.r.ts().add("temperature:sensor1", "*", temperature)
495
496
# Check for alerts
497
if temperature > self.alert_threshold_high:
498
self.trigger_alert("HIGH", temperature, timestamp)
499
elif temperature < self.alert_threshold_low:
500
self.trigger_alert("LOW", temperature, timestamp)
501
502
return timestamp
503
504
def trigger_alert(self, alert_type, temperature, timestamp):
505
"""Trigger temperature alert"""
506
dt = datetime.fromtimestamp(timestamp / 1000)
507
print(f"🚨 ALERT: {alert_type} temperature {temperature}°C at {dt}")
508
509
# Store alert in separate time series
510
alert_series = "alerts:temperature:sensor1"
511
if not self.series_exists(alert_series):
512
self.r.ts().create(
513
alert_series,
514
labels={
515
"sensor_id": "sensor1",
516
"type": "alert",
517
"metric": "temperature"
518
}
519
)
520
521
# Store alert with severity as value
522
severity = 2 if alert_type == "HIGH" else 1
523
self.r.ts().add(alert_series, timestamp, severity)
524
525
def series_exists(self, key):
526
"""Check if time series exists"""
527
try:
528
self.r.ts().info(key)
529
return True
530
except Exception:
531
return False
532
533
def get_recent_stats(self, minutes=60):
534
"""Get statistics for recent data"""
535
end_time = int(time.time() * 1000)
536
start_time = end_time - (minutes * 60 * 1000)
537
538
# Get recent data
539
data = self.r.ts().range("temperature:sensor1", start_time, end_time)
540
541
if not data:
542
return None
543
544
temperatures = [value for _, value in data]
545
546
return {
547
"count": len(temperatures),
548
"min": min(temperatures),
549
"max": max(temperatures),
550
"avg": sum(temperatures) / len(temperatures),
551
"latest": temperatures[-1]
552
}
553
554
# Usage example
555
monitor = TemperatureMonitor(r)
556
557
# Simulate temperature readings with some alerts
558
test_temperatures = [21.5, 22.1, 23.8, 24.5, 25.8, 26.2, 24.1, 22.9, 19.5, 18.2, 21.0]
559
560
print("Simulating temperature readings...")
561
for temp in test_temperatures:
562
monitor.add_reading(temp)
563
time.sleep(0.1)
564
565
# Get statistics
566
stats = monitor.get_recent_stats(minutes=5)
567
if stats:
568
print(f"\nRecent temperature statistics:")
569
print(f" Count: {stats['count']} readings")
570
print(f" Min: {stats['min']:.1f}°C")
571
print(f" Max: {stats['max']:.1f}°C")
572
print(f" Average: {stats['avg']:.1f}°C")
573
print(f" Latest: {stats['latest']:.1f}°C")
574
```
575
576
### Time Series Data Export and Analysis
577
578
```python
579
import redis
import time
580
import csv
581
import json
582
from datetime import datetime
583
584
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
585
586
def export_timeseries_data():
587
"""Export time series data to different formats"""
588
589
# Get data for export
590
end_time = int(time.time() * 1000)
591
start_time = end_time - (24 * 3600000) # Last 24 hours
592
593
temp_data = r.ts().range("temperature:sensor1", start_time, end_time)
594
humidity_data = r.ts().range("humidity:sensor1", start_time, end_time)
595
596
# Export to CSV
597
with open('/tmp/sensor_data.csv', 'w', newline='') as csvfile:
598
writer = csv.writer(csvfile)
599
writer.writerow(['timestamp', 'datetime', 'temperature', 'humidity'])
600
601
# Combine data by timestamp
602
temp_dict = {ts: temp for ts, temp in temp_data}
603
humidity_dict = {ts: humidity for ts, humidity in humidity_data}
604
605
all_timestamps = sorted(set(temp_dict.keys()) | set(humidity_dict.keys()))
606
607
for timestamp in all_timestamps:
608
dt = datetime.fromtimestamp(timestamp / 1000)
609
temp = temp_dict.get(timestamp, '')
610
humidity = humidity_dict.get(timestamp, '')
611
writer.writerow([timestamp, dt.isoformat(), temp, humidity])
612
613
print("Exported data to /tmp/sensor_data.csv")
614
615
# Export to JSON
616
export_data = {
617
"metadata": {
618
"export_time": datetime.now().isoformat(),
619
"start_time": datetime.fromtimestamp(start_time / 1000).isoformat(),
620
"end_time": datetime.fromtimestamp(end_time / 1000).isoformat(),
621
"sensor_id": "sensor1",
622
"location": "server_room"
623
},
624
"data": {
625
"temperature": [
626
{"timestamp": ts, "value": temp} for ts, temp in temp_data
627
],
628
"humidity": [
629
{"timestamp": ts, "value": humidity} for ts, humidity in humidity_data
630
]
631
}
632
}
633
634
with open('/tmp/sensor_data.json', 'w') as jsonfile:
635
json.dump(export_data, jsonfile, indent=2)
636
637
print("Exported data to /tmp/sensor_data.json")
638
639
def analyze_sensor_patterns():
640
"""Analyze patterns in sensor data"""
641
end_time = int(time.time() * 1000)
642
start_time = end_time - (7 * 24 * 3600000) # Last 7 days
643
644
# Get hourly averages for pattern analysis
645
hourly_temps = r.ts().range(
646
"temperature:sensor1",
647
start_time,
648
end_time,
649
aggregation_type="AVG",
650
bucket_size_msec=3600000 # 1 hour
651
)
652
653
if not hourly_temps:
654
print("No data available for analysis")
655
return
656
657
# Analyze by hour of day
658
hourly_patterns = {}
659
for timestamp, temp in hourly_temps:
660
dt = datetime.fromtimestamp(timestamp / 1000)
661
hour = dt.hour
662
663
if hour not in hourly_patterns:
664
hourly_patterns[hour] = []
665
hourly_patterns[hour].append(temp)
666
667
print("Temperature patterns by hour of day:")
668
for hour in sorted(hourly_patterns.keys()):
669
temps = hourly_patterns[hour]
670
avg_temp = sum(temps) / len(temps)
671
min_temp = min(temps)
672
max_temp = max(temps)
673
print(f" {hour:02d}:00 - Avg: {avg_temp:.1f}°C, Range: {min_temp:.1f}-{max_temp:.1f}°C")
674
675
export_timeseries_data()
676
analyze_sensor_patterns()
677
```