0
# Lifecycle Management
1
2
Index lifecycle management (ILM), snapshot lifecycle management (SLM), and data stream operations for automated data management. These operations provide comprehensive automation for data retention, archival, and backup strategies.
3
4
## Index Lifecycle Management (ILM)
5
6
### Policy Management
7
8
Create and manage index lifecycle policies for automated data management.
9
10
```python { .api }
11
def put_policy(
12
self,
13
name: str,
14
policy: Dict[str, Any],
15
master_timeout: Optional[str] = None,
16
timeout: Optional[str] = None,
17
**kwargs
18
) -> ObjectApiResponse:
19
"""
20
Create or update an index lifecycle policy.
21
22
Parameters:
23
- name: Policy name
24
- policy: Policy definition with phases and actions
25
- master_timeout: Timeout for master node response
26
- timeout: Request timeout
27
28
Returns:
29
ObjectApiResponse with policy creation result
30
"""
31
32
def get_policy(
33
self,
34
name: Optional[str] = None,
35
master_timeout: Optional[str] = None,
36
timeout: Optional[str] = None,
37
**kwargs
38
) -> ObjectApiResponse:
39
"""
40
Get index lifecycle policies.
41
42
Parameters:
43
- name: Policy name to retrieve, or None for all policies
44
- master_timeout: Timeout for master node response
45
- timeout: Request timeout
46
47
Returns:
48
ObjectApiResponse with policy information
49
"""
50
51
def delete_policy(
52
self,
53
name: str,
54
master_timeout: Optional[str] = None,
55
timeout: Optional[str] = None,
56
**kwargs
57
) -> ObjectApiResponse:
58
"""
59
Delete an index lifecycle policy.
60
61
Parameters:
62
- name: Policy name to delete
63
- master_timeout: Timeout for master node response
64
- timeout: Request timeout
65
66
Returns:
67
ObjectApiResponse with deletion result
68
"""
69
```
70
71
### Policy Execution and Monitoring
72
73
Monitor and control index lifecycle policy execution.
74
75
```python { .api }
76
def explain_lifecycle(
77
self,
78
index: Union[str, List[str]],
79
only_errors: Optional[bool] = None,
80
only_managed: Optional[bool] = None,
81
master_timeout: Optional[str] = None,
82
timeout: Optional[str] = None,
83
**kwargs
84
) -> ObjectApiResponse:
85
"""
86
Explain the lifecycle status of indices.
87
88
Parameters:
89
- index: Index name(s) to explain
90
- only_errors: Whether to only show indices with errors
91
- only_managed: Whether to only show managed indices
92
- master_timeout: Timeout for master node response
93
- timeout: Request timeout
94
95
Returns:
96
ObjectApiResponse with lifecycle explanation for each index
97
"""
98
99
def move_to_step(
100
self,
101
index: str,
102
current_step: Dict[str, Any],
103
next_step: Dict[str, Any],
104
master_timeout: Optional[str] = None,
105
timeout: Optional[str] = None,
106
**kwargs
107
) -> ObjectApiResponse:
108
"""
109
Move an index to a specific lifecycle step.
110
111
Parameters:
112
- index: Index name to move
113
- current_step: Current step information
114
- next_step: Target step information
115
- master_timeout: Timeout for master node response
116
- timeout: Request timeout
117
118
Returns:
119
ObjectApiResponse with move result
120
"""
121
122
def retry(
123
self,
124
index: Union[str, List[str]],
125
master_timeout: Optional[str] = None,
126
timeout: Optional[str] = None,
127
**kwargs
128
) -> ObjectApiResponse:
129
"""
130
Retry failed lifecycle actions.
131
132
Parameters:
133
- index: Index name(s) to retry
134
- master_timeout: Timeout for master node response
135
- timeout: Request timeout
136
137
Returns:
138
ObjectApiResponse with retry result
139
"""
140
141
def remove_policy(
142
self,
143
index: Union[str, List[str]],
144
master_timeout: Optional[str] = None,
145
timeout: Optional[str] = None,
146
**kwargs
147
) -> ObjectApiResponse:
148
"""
149
Remove lifecycle policy from indices.
150
151
Parameters:
152
- index: Index name(s) to remove policy from
153
- master_timeout: Timeout for master node response
154
- timeout: Request timeout
155
156
Returns:
157
ObjectApiResponse with removal result
158
"""
159
160
def start(
161
self,
162
master_timeout: Optional[str] = None,
163
timeout: Optional[str] = None,
164
**kwargs
165
) -> ObjectApiResponse:
166
"""
167
Start the index lifecycle management service.
168
169
Parameters:
170
- master_timeout: Timeout for master node response
171
- timeout: Request timeout
172
173
Returns:
174
ObjectApiResponse with start result
175
"""
176
177
def stop(
178
self,
179
master_timeout: Optional[str] = None,
180
timeout: Optional[str] = None,
181
**kwargs
182
) -> ObjectApiResponse:
183
"""
184
Stop the index lifecycle management service.
185
186
Parameters:
187
- master_timeout: Timeout for master node response
188
- timeout: Request timeout
189
190
Returns:
191
ObjectApiResponse with stop result
192
"""
193
194
def get_status(
195
self,
196
master_timeout: Optional[str] = None,
197
timeout: Optional[str] = None,
198
**kwargs
199
) -> ObjectApiResponse:
200
"""
201
Get the status of index lifecycle management.
202
203
Parameters:
204
- master_timeout: Timeout for master node response
205
- timeout: Request timeout
206
207
Returns:
208
ObjectApiResponse with ILM status
209
"""
210
```
211
212
## Snapshot Lifecycle Management (SLM)
213
214
### SLM Policy Management
215
216
Create and manage snapshot lifecycle policies for automated backups.
217
218
```python { .api }
219
def put_policy(
220
self,
221
name: str,
222
schedule: str,
223
name_pattern: str,
224
repository: str,
225
config: Optional[Dict[str, Any]] = None,
226
retention: Optional[Dict[str, Any]] = None,
227
master_timeout: Optional[str] = None,
228
timeout: Optional[str] = None,
229
**kwargs
230
) -> ObjectApiResponse:
231
"""
232
Create or update a snapshot lifecycle policy.
233
234
Parameters:
235
- name: Policy name
236
- schedule: Cron expression for the snapshot schedule, in Elasticsearch cron syntax with a leading seconds field (e.g. "0 30 1 * * ?")
237
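- name_pattern: Pattern for snapshot names; date math is supported when the pattern is wrapped in angle brackets (e.g. "<daily-snap-{now/d}>")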
238
- repository: Repository name for snapshots
239
- config: Snapshot configuration options
240
- retention: Retention policy for snapshots
241
- master_timeout: Timeout for master node response
242
- timeout: Request timeout
243
244
Returns:
245
ObjectApiResponse with policy creation result
246
"""
247
248
def get_policy(
249
self,
250
name: Optional[Union[str, List[str]]] = None,
251
master_timeout: Optional[str] = None,
252
timeout: Optional[str] = None,
253
**kwargs
254
) -> ObjectApiResponse:
255
"""
256
Get snapshot lifecycle policies.
257
258
Parameters:
259
- name: Policy name(s) to retrieve, or None for all policies
260
- master_timeout: Timeout for master node response
261
- timeout: Request timeout
262
263
Returns:
264
ObjectApiResponse with policy information
265
"""
266
267
def delete_policy(
268
self,
269
name: str,
270
master_timeout: Optional[str] = None,
271
timeout: Optional[str] = None,
272
**kwargs
273
) -> ObjectApiResponse:
274
"""
275
Delete a snapshot lifecycle policy.
276
277
Parameters:
278
- name: Policy name to delete
279
- master_timeout: Timeout for master node response
280
- timeout: Request timeout
281
282
Returns:
283
ObjectApiResponse with deletion result
284
"""
285
```
286
287
### SLM Execution and Monitoring
288
289
Monitor and control snapshot lifecycle policy execution.
290
291
```python { .api }
292
def execute_policy(
293
self,
294
name: str,
295
master_timeout: Optional[str] = None,
296
timeout: Optional[str] = None,
297
**kwargs
298
) -> ObjectApiResponse:
299
"""
300
Execute a snapshot lifecycle policy immediately.
301
302
Parameters:
303
- name: Policy name to execute
304
- master_timeout: Timeout for master node response
305
- timeout: Request timeout
306
307
Returns:
308
ObjectApiResponse with execution result including snapshot name
309
"""
310
311
def get_stats(
312
self,
313
master_timeout: Optional[str] = None,
314
timeout: Optional[str] = None,
315
**kwargs
316
) -> ObjectApiResponse:
317
"""
318
Get snapshot lifecycle management statistics.
319
320
Parameters:
321
- master_timeout: Timeout for master node response
322
- timeout: Request timeout
323
324
Returns:
325
ObjectApiResponse with cumulative SLM statistics (retention runs, snapshots taken/failed/deleted) and per-policy counters
326
"""
327
328
def get_status(
329
self,
330
master_timeout: Optional[str] = None,
331
timeout: Optional[str] = None,
332
**kwargs
333
) -> ObjectApiResponse:
334
"""
335
Get the status of snapshot lifecycle management.
336
337
Parameters:
338
- master_timeout: Timeout for master node response
339
- timeout: Request timeout
340
341
Returns:
342
ObjectApiResponse with SLM status
343
"""
344
345
def start(
346
self,
347
master_timeout: Optional[str] = None,
348
timeout: Optional[str] = None,
349
**kwargs
350
) -> ObjectApiResponse:
351
"""
352
Start the snapshot lifecycle management service.
353
354
Parameters:
355
- master_timeout: Timeout for master node response
356
- timeout: Request timeout
357
358
Returns:
359
ObjectApiResponse with start result
360
"""
361
362
def stop(
363
self,
364
master_timeout: Optional[str] = None,
365
timeout: Optional[str] = None,
366
**kwargs
367
) -> ObjectApiResponse:
368
"""
369
Stop the snapshot lifecycle management service.
370
371
Parameters:
372
- master_timeout: Timeout for master node response
373
- timeout: Request timeout
374
375
Returns:
376
ObjectApiResponse with stop result
377
"""
378
```
379
380
## Usage Examples
381
382
### Index Lifecycle Management
383
384
```python
385
from elasticsearch import Elasticsearch
386
387
client = Elasticsearch(hosts=['http://localhost:9200'])
388
389
# Create a comprehensive ILM policy for log data
390
client.ilm.put_policy(
391
name="logs_policy",
392
policy={
393
"phases": {
394
"hot": {
395
"min_age": "0ms",
396
"actions": {
397
"rollover": {
398
"max_size": "5GB",
399
"max_age": "1d",
400
"max_docs": 10000000
401
},
402
"set_priority": {
403
"priority": 100
404
}
405
}
406
},
407
"warm": {
408
"min_age": "7d",
409
"actions": {
410
"set_priority": {
411
"priority": 50
412
},
413
"allocate": {
414
"number_of_replicas": 0,
415
"require": {
416
"data_tier": "warm"
417
}
418
},
419
"forcemerge": {
420
"max_num_segments": 1
421
}
422
}
423
},
424
"cold": {
425
"min_age": "30d",
426
"actions": {
427
"set_priority": {
428
"priority": 0
429
},
430
"allocate": {
431
"number_of_replicas": 0,
432
"require": {
433
"data_tier": "cold"
434
}
435
}
436
}
437
},
438
"frozen": {
439
"min_age": "90d",
440
"actions": {
441
"searchable_snapshot": {
442
"snapshot_repository": "cold_repository"
443
}
444
}
445
},
446
"delete": {
447
"min_age": "365d",
448
"actions": {
449
"delete": {}
450
}
451
}
452
}
453
}
454
)
455
456
# Create index template with ILM policy
457
client.indices.put_index_template(
458
name="logs_template",
459
index_patterns=["logs-*"],
460
template={
461
"settings": {
462
"number_of_shards": 1,
463
"number_of_replicas": 1,
464
"index.lifecycle.name": "logs_policy",
465
"index.lifecycle.rollover_alias": "logs"
466
},
467
"mappings": {
468
"properties": {
469
"@timestamp": {"type": "date"},
470
"level": {"type": "keyword"},
471
"message": {"type": "text"},
472
"service": {"type": "keyword"}
473
}
474
}
475
}
476
)
477
478
# Create the initial index and alias
479
client.indices.create(
480
index="logs-000001",
481
settings={
482
"index.lifecycle.name": "logs_policy",
483
"index.lifecycle.rollover_alias": "logs"
484
},
485
aliases={"logs": {"is_write_index": True}}
486
)
487
488
# Monitor ILM execution
489
explain_result = client.ilm.explain_lifecycle(index="logs-*")
490
for index_name, index_info in explain_result.body['indices'].items():
491
print(f"Index: {index_name}")
492
print(f" Policy: {index_info.get('policy', 'None')}")
493
print(f" Phase: {index_info.get('phase', 'N/A')}")
494
print(f" Action: {index_info.get('action', 'N/A')}")
495
496
# phase_time_millis is a top-level field of each index entry (epoch millis at which the phase was entered)
if 'phase_time_millis' in index_info:
    print(f" Phase time: {index_info['phase_time_millis']}ms")
499
```
500
501
### Snapshot Lifecycle Management
502
503
```python
504
# First, create a snapshot repository
505
client.snapshot.create_repository(
506
repository="backup_repository",
507
settings={
508
"type": "fs",
509
"settings": {
510
"location": "/mount/backups/elasticsearch"
511
}
512
}
513
)
514
515
# Create an SLM policy for automated backups
516
client.slm.put_policy(
517
name="daily_backup_policy",
518
schedule="0 0 2 * * ?", # Daily at 2 AM (Elasticsearch cron: sec min hour day-of-month month day-of-week)
519
name_pattern="<backup-{now/d}>",
520
repository="backup_repository",
521
config={
522
"indices": ["important-*", "logs-*"],
523
"ignore_unavailable": True,
524
"include_global_state": False,
525
"metadata": {
526
"taken_by": "automated_slm",
527
"purpose": "daily_backup"
528
}
529
},
530
retention={
531
"expire_after": "30d",
532
"min_count": 5,
533
"max_count": 50
534
}
535
)
536
537
# Create a weekly backup policy with longer retention
538
client.slm.put_policy(
539
name="weekly_backup_policy",
540
schedule="0 0 3 ? * SUN", # Weekly on Sunday at 3 AM (Elasticsearch cron: sec min hour day-of-month month day-of-week)
541
name_pattern="<weekly-backup-{now/w}>",
542
repository="backup_repository",
543
config={
544
"indices": ["*"],
545
"ignore_unavailable": True,
546
"include_global_state": True,
547
"metadata": {
548
"taken_by": "automated_slm",
549
"purpose": "weekly_full_backup"
550
}
551
},
552
retention={
553
"expire_after": "180d",
554
"min_count": 10,
555
"max_count": 100
556
}
557
)
558
559
# Execute a policy immediately for testing
560
execute_result = client.slm.execute_policy(name="daily_backup_policy")
561
snapshot_name = execute_result.body['snapshot_name']
562
print(f"Snapshot created: {snapshot_name}")
563
564
# Monitor SLM statistics
565
slm_stats = client.slm.get_stats()
566
print("SLM Statistics:")
567
print(f" Retention runs: {slm_stats.body['retention_runs']}")
568
print(f" Retention failed: {slm_stats.body['retention_failed']}")
569
print(f" Retention timed out: {slm_stats.body['retention_timed_out']}")
570
print(f" Total snapshots taken: {slm_stats.body['total_snapshots_taken']}")
571
print(f" Total snapshots failed: {slm_stats.body['total_snapshots_failed']}")
572
573
# Check individual policy statistics
574
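# policy_stats is a list of per-policy objects, each identified by its 'policy' key
for policy_stats in slm_stats.body['policy_stats']:
    print(f"Policy: {policy_stats['policy']}")
    print(f" Snapshots taken: {policy_stats['snapshots_taken']}")
    print(f" Snapshots failed: {policy_stats['snapshots_failed']}")

# Last success/failure details are reported per policy by the get-policy API, not by the stats API
for policy_name, policy_info in client.slm.get_policy().body.items():
    if 'last_success' in policy_info:
        print(f"{policy_name} last success: {policy_info['last_success']['snapshot_name']}")
    if 'last_failure' in policy_info:
        print(f"{policy_name} last failure: {policy_info['last_failure']['snapshot_name']}")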
582
```
583
584
### Advanced Lifecycle Management
585
586
```python
587
# Create a complex ILM policy with multiple conditions
588
client.ilm.put_policy(
589
name="complex_logs_policy",
590
policy={
591
"phases": {
592
"hot": {
593
"min_age": "0ms",
594
"actions": {
595
"rollover": {
596
"max_size": "2GB",
597
"max_age": "6h",
598
"max_docs": 5000000
599
},
600
"set_priority": {
601
"priority": 100
602
}
603
}
604
},
605
"warm": {
606
"min_age": "12h",
607
"actions": {
608
"readonly": {},
609
"set_priority": {
610
"priority": 50
611
},
612
"allocate": {
613
"number_of_replicas": 0,
614
"require": {
615
"data_tier": "warm"
616
}
617
},
618
"forcemerge": {
619
"max_num_segments": 1
620
},
621
"shrink": {
622
"number_of_shards": 1,
623
"allow_write_after_shrink": False
624
}
625
}
626
},
627
"cold": {
628
"min_age": "3d",
629
"actions": {
630
"set_priority": {
631
"priority": 0
632
},
633
"allocate": {
634
"number_of_replicas": 0,
635
"require": {
636
"data_tier": "cold"
637
}
638
}
639
}
640
},
641
"delete": {
642
"min_age": "30d",
643
"actions": {
644
"wait_for_snapshot": {
645
"policy": "daily_backup_policy"
646
},
647
"delete": {}
648
}
649
}
650
}
651
}
652
)
653
654
# Move an index to a specific phase manually
655
client.ilm.move_to_step(
656
index="logs-000005",
657
current_step={
658
"phase": "hot",
659
"action": "rollover",
660
"name": "check-rollover-ready"
661
},
662
next_step={
663
"phase": "warm",
664
"action": "allocate",
665
"name": "allocate"
666
}
667
)
668
669
# Handle failed lifecycle actions
670
explain_result = client.ilm.explain_lifecycle(index="logs-*", only_errors=True)
671
failed_indices = []
672
673
for index_name, index_info in explain_result.body['indices'].items():
674
if index_info.get('step') == 'ERROR':  # failed indices sit in the ERROR step; step_info.type holds the exception type
675
failed_indices.append(index_name)
676
print(f"Failed index: {index_name}")
677
print(f" Error: {index_info['step_info']['reason']}")
678
679
# Retry failed actions
680
if failed_indices:
681
retry_result = client.ilm.retry(index=failed_indices)
682
print(f"Retried {len(failed_indices)} indices")
683
684
# Temporarily stop ILM for maintenance
685
client.ilm.stop()
686
print("ILM stopped")
687
688
# Perform maintenance operations...
689
690
# Resume ILM
691
client.ilm.start()
692
print("ILM resumed")
693
694
# Get ILM status
695
status = client.ilm.get_status()
696
print(f"ILM operation mode: {status.body['operation_mode']}")
697
```
698
699
### Monitoring and Troubleshooting
700
701
```python
702
# Comprehensive lifecycle monitoring function
703
def monitor_lifecycle_health():
    """
    Print a health summary for ILM and SLM and return the key indicators.

    Uses the module-level ``client`` to query ILM/SLM status, the ILM
    policies, indices whose lifecycle is in error, and SLM snapshot
    statistics.

    Returns:
    Dict with keys:
    - ilm_healthy: True when the ILM operation mode is 'RUNNING'
    - slm_healthy: True when the SLM operation mode is 'RUNNING'
    - lifecycle_errors: Number of managed indices reported with errors
    - snapshot_failures: Total number of failed snapshots reported by SLM
    """
    # Check ILM status
    ilm_status = client.ilm.get_status()
    print(f"ILM Status: {ilm_status.body['operation_mode']}")

    # Check SLM status
    slm_status = client.slm.get_status()
    print(f"SLM Status: {slm_status.body['operation_mode']}")

    # Get all ILM policies
    policies = client.ilm.get_policy()
    print(f"Active ILM policies: {len(policies.body)}")

    # Check for indices with lifecycle errors
    error_explain = client.ilm.explain_lifecycle(
        index="*",
        only_errors=True,
        only_managed=True
    )

    error_indices = error_explain.body['indices']
    error_count = len(error_indices)
    if error_count > 0:
        print(f"WARNING: {error_count} indices have lifecycle errors")
        for index_name, index_info in error_indices.items():
            error_info = index_info.get('step_info', {})
            print(f" {index_name}: {error_info.get('reason', 'Unknown error')}")
    else:
        print("No lifecycle errors detected")

    # Check SLM policy execution
    slm_stats = client.slm.get_stats()
    total_failed = slm_stats.body['total_snapshots_failed']
    if total_failed > 0:
        print(f"WARNING: {total_failed} snapshot(s) have failed")
        # policy_stats is a list of per-policy objects (each carrying its
        # policy id under 'policy'), not a mapping, so iterate it directly.
        for stats in slm_stats.body.get('policy_stats', []):
            if stats['snapshots_failed'] > 0:
                print(f" Policy '{stats['policy']}': {stats['snapshots_failed']} failures")

    return {
        'ilm_healthy': ilm_status.body['operation_mode'] == 'RUNNING',
        'slm_healthy': slm_status.body['operation_mode'] == 'RUNNING',
        'lifecycle_errors': error_count,
        'snapshot_failures': total_failed
    }
747
748
# Run monitoring
749
health_status = monitor_lifecycle_health()
750
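# The error/failure counts are healthy when they are zero, so they cannot be fed to all() directly
is_healthy = (
    health_status['ilm_healthy']
    and health_status['slm_healthy']
    and health_status['lifecycle_errors'] == 0
    and health_status['snapshot_failures'] == 0
)
print(f"Overall health: {'HEALTHY' if is_healthy else 'ISSUES DETECTED'}")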
751
```