# Machine Learning

Machine learning job management, anomaly detection, datafeed control, and model operations for Elasticsearch's ML capabilities. These operations cover the full lifecycle: creating and running anomaly detection jobs, feeding them data, retrieving results, and managing model snapshots and data frame analytics.

## Capabilities

### Job Management

Create and manage machine learning jobs for anomaly detection.

```python { .api }
def put_job(
    self,
    job_id: str,
    analysis_config: Dict[str, Any],
    data_description: Dict[str, Any],
    description: Optional[str] = None,
    results_index_name: Optional[str] = None,
    groups: Optional[List[str]] = None,
    model_snapshot_retention_days: Optional[int] = None,
    daily_model_snapshot_retention_after_days: Optional[int] = None,
    analysis_limits: Optional[Dict[str, Any]] = None,
    background_persist_interval: Optional[str] = None,
    custom_settings: Optional[Dict[str, Any]] = None,
    model_plot_config: Optional[Dict[str, Any]] = None,
    renormalization_window_days: Optional[int] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create a machine learning job.

    Parameters:
    - job_id: Unique identifier for the job
    - analysis_config: Analysis configuration including detectors
    - data_description: Description of the input data format
    - description: Human-readable job description
    - results_index_name: Index for storing results
    - groups: Job groups for organization
    - model_snapshot_retention_days: Retention period for model snapshots
    - daily_model_snapshot_retention_after_days: Snapshot age after which only one snapshot per day is retained
    - analysis_limits: Memory and processing limits
    - background_persist_interval: Interval at which the model is persisted in the background
    - custom_settings: Custom job settings
    - model_plot_config: Model plot configuration
    - renormalization_window_days: Window (in days) over which anomaly scores are renormalized

    Returns:
    ObjectApiResponse with job creation result
    """

def get_jobs(
    self,
    job_id: Optional[str] = None,
    allow_no_match: Optional[bool] = None,
    exclude_generated: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get machine learning job information.

    Parameters:
    - job_id: Job ID or wildcard pattern to retrieve
    - allow_no_match: Whether a wildcard that matches no jobs succeeds instead of raising an error
    - exclude_generated: Whether to exclude generated configurations

    Returns:
    ObjectApiResponse with job information
    """

def delete_job(
    self,
    job_id: str,
    force: Optional[bool] = None,
    wait_for_completion: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a machine learning job.

    Parameters:
    - job_id: Job ID to delete
    - force: Whether to force deletion of an opened job
    - wait_for_completion: Whether to wait for deletion to complete

    Returns:
    ObjectApiResponse with deletion result
    """

def open_job(
    self,
    job_id: str,
    ignore_downtime: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Open a machine learning job.

    Parameters:
    - job_id: Job ID to open
    - ignore_downtime: Whether to ignore the period of downtime since the job was last closed
    - timeout: Timeout for opening the job

    Returns:
    ObjectApiResponse with job opening result
    """

def close_job(
    self,
    job_id: str,
    allow_no_match: Optional[bool] = None,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Close a machine learning job.

    Parameters:
    - job_id: Job ID to close
    - allow_no_match: Whether a wildcard that matches no jobs succeeds instead of raising an error
    - force: Whether to force close a running job
    - timeout: Timeout for closing the job

    Returns:
    ObjectApiResponse with job closing result
    """

def update_job(
    self,
    job_id: str,
    description: Optional[str] = None,
    analysis_limits: Optional[Dict[str, Any]] = None,
    background_persist_interval: Optional[str] = None,
    custom_settings: Optional[Dict[str, Any]] = None,
    model_plot_config: Optional[Dict[str, Any]] = None,
    model_snapshot_retention_days: Optional[int] = None,
    daily_model_snapshot_retention_after_days: Optional[int] = None,
    groups: Optional[List[str]] = None,
    detectors: Optional[List[Dict[str, Any]]] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Update a machine learning job.

    Parameters:
    - job_id: Job ID to update
    - description: Updated job description
    - analysis_limits: Updated analysis limits
    - background_persist_interval: Updated persist interval
    - custom_settings: Updated custom settings
    - model_plot_config: Updated model plot configuration
    - model_snapshot_retention_days: Updated snapshot retention
    - daily_model_snapshot_retention_after_days: Updated daily retention threshold
    - groups: Updated job groups
    - detectors: Updated detector configurations

    Returns:
    ObjectApiResponse with job update result
    """
```
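
Taken together, these calls form the basic job lifecycle: create, open, analyze, close, delete. A minimal sketch, assuming a local cluster and an illustrative `web-logs` index with `response_time` and `@timestamp` fields (neither is part of the API above):

```python
from elasticsearch import Elasticsearch

client = Elasticsearch(hosts=['http://localhost:9200'])

# Create a job that baselines mean response time in 30-minute buckets
client.ml.put_job(
    job_id="response_time_baseline",
    analysis_config={
        "bucket_span": "30m",
        "detectors": [{"function": "mean", "field_name": "response_time"}]
    },
    data_description={"time_field": "@timestamp"}
)

# Open the job so it can accept data (normally supplied by a datafeed, see below)
client.ml.open_job(job_id="response_time_baseline")

# ... analyze data ...

# Close the job to persist the model, then delete it when no longer needed
client.ml.close_job(job_id="response_time_baseline")
client.ml.delete_job(job_id="response_time_baseline", wait_for_completion=True)
```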

### Datafeed Management

Manage datafeeds that supply data to machine learning jobs.

```python { .api }
def put_datafeed(
    self,
    datafeed_id: str,
    job_id: str,
    indices: List[str],
    aggregations: Optional[Dict[str, Any]] = None,
    chunking_config: Optional[Dict[str, Any]] = None,
    frequency: Optional[str] = None,
    query: Optional[Dict[str, Any]] = None,
    query_delay: Optional[str] = None,
    runtime_mappings: Optional[Dict[str, Any]] = None,
    script_fields: Optional[Dict[str, Any]] = None,
    scroll_size: Optional[int] = None,
    delayed_data_check_config: Optional[Dict[str, Any]] = None,
    max_empty_searches: Optional[int] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create a datafeed for a machine learning job.

    Parameters:
    - datafeed_id: Unique identifier for the datafeed
    - job_id: Associated job ID
    - indices: List of indices to read from
    - aggregations: Aggregations that pre-summarize the data before analysis
    - chunking_config: Data chunking configuration
    - frequency: Interval at which scheduled searches run while the datafeed is in real-time mode
    - query: Query that filters the source data
    - query_delay: Delay between current time and the latest searched data, to allow for ingest lag
    - runtime_mappings: Runtime field mappings
    - script_fields: Script-based field definitions
    - scroll_size: Scroll size for data retrieval
    - delayed_data_check_config: Configuration for delayed data checks
    - max_empty_searches: Maximum consecutive empty searches before a real-time datafeed stops itself

    Returns:
    ObjectApiResponse with datafeed creation result
    """

def get_datafeeds(
    self,
    datafeed_id: Optional[str] = None,
    allow_no_match: Optional[bool] = None,
    exclude_generated: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get datafeed information.

    Parameters:
    - datafeed_id: Datafeed ID or wildcard pattern to retrieve
    - allow_no_match: Whether a wildcard that matches no datafeeds succeeds instead of raising an error
    - exclude_generated: Whether to exclude generated configurations

    Returns:
    ObjectApiResponse with datafeed information
    """

def start_datafeed(
    self,
    datafeed_id: str,
    start: Optional[str] = None,
    end: Optional[str] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Start a datafeed.

    Parameters:
    - datafeed_id: Datafeed ID to start
    - start: Start time for data processing
    - end: End time for data processing; omit to run in real time
    - timeout: Timeout for starting the datafeed

    Returns:
    ObjectApiResponse with datafeed start result
    """

def stop_datafeed(
    self,
    datafeed_id: str,
    allow_no_match: Optional[bool] = None,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Stop a datafeed.

    Parameters:
    - datafeed_id: Datafeed ID to stop
    - allow_no_match: Whether a wildcard that matches no datafeeds succeeds instead of raising an error
    - force: Whether to force stop
    - timeout: Timeout for stopping the datafeed

    Returns:
    ObjectApiResponse with datafeed stop result
    """

def delete_datafeed(
    self,
    datafeed_id: str,
    force: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a datafeed.

    Parameters:
    - datafeed_id: Datafeed ID to delete
    - force: Whether to force deletion

    Returns:
    ObjectApiResponse with deletion result
    """
```
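
Datafeeds can also pre-aggregate documents via the `aggregations` parameter instead of scrolling raw hits, which reduces the volume of data sent to the job. A hedged sketch, reusing the illustrative `response_time_baseline` job above; Elasticsearch expects aggregated datafeeds to use a `date_histogram` whose interval divides the job's `bucket_span` evenly, plus a `max` aggregation on the time field:

```python
# Sketch of an aggregated datafeed (index and field names are assumptions)
client.ml.put_datafeed(
    datafeed_id="response_time_baseline_feed",
    job_id="response_time_baseline",
    indices=["web-logs-*"],
    aggregations={
        "buckets": {
            "date_histogram": {"field": "@timestamp", "fixed_interval": "10m"},
            "aggregations": {
                # A max on the time field is required so each summary
                # bucket carries a usable timestamp
                "@timestamp": {"max": {"field": "@timestamp"}},
                "response_time": {"avg": {"field": "response_time"}}
            }
        }
    },
    frequency="10m",
    query_delay="60s"
)

# No end parameter: run in real time until stopped
client.ml.start_datafeed(datafeed_id="response_time_baseline_feed")
```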

### Model Management

Manage model snapshots for machine learning jobs.

```python { .api }
def get_model_snapshots(
    self,
    job_id: str,
    snapshot_id: Optional[str] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get model snapshots for a job.

    Parameters:
    - job_id: Job ID to get snapshots for
    - snapshot_id: Specific snapshot ID
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for snapshot range
    - end: End time for snapshot range
    - sort: Sort field for results
    - desc: Whether to sort in descending order

    Returns:
    ObjectApiResponse with model snapshots
    """

def update_model_snapshot(
    self,
    job_id: str,
    snapshot_id: str,
    description: Optional[str] = None,
    retain: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Update a model snapshot.

    Parameters:
    - job_id: Job ID containing the snapshot
    - snapshot_id: Snapshot ID to update
    - description: Updated description
    - retain: Whether to keep the snapshot beyond the configured retention period

    Returns:
    ObjectApiResponse with update result
    """

def delete_model_snapshot(
    self,
    job_id: str,
    snapshot_id: str,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a model snapshot.

    Parameters:
    - job_id: Job ID containing the snapshot
    - snapshot_id: Snapshot ID to delete

    Returns:
    ObjectApiResponse with deletion result
    """

def revert_model_snapshot(
    self,
    job_id: str,
    snapshot_id: str,
    delete_intervening_results: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Revert to a previous model snapshot.

    Parameters:
    - job_id: Job ID to revert
    - snapshot_id: Snapshot ID to revert to
    - delete_intervening_results: Whether to delete results and model state recorded after the snapshot

    Returns:
    ObjectApiResponse with revert result
    """
```
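
A common use of these calls is recovering from bad input data: close the job, revert to the last snapshot taken before the incident, and discard the results written after it. A sketch under that assumption (the job ID and cutoff time are illustrative):

```python
# Find the newest snapshot from before the incident
snapshots = client.ml.get_model_snapshots(
    job_id="response_time_baseline",
    end="2024-06-01T00:00:00Z",  # illustrative incident time
    sort="timestamp",
    desc=True,
    size=1
)
snapshot_id = snapshots.body['model_snapshots'][0]['snapshot_id']

# Pin the snapshot so retention settings never remove it
client.ml.update_model_snapshot(
    job_id="response_time_baseline",
    snapshot_id=snapshot_id,
    retain=True
)

# Reverting requires a closed job; delete_intervening_results drops
# the results recorded after the snapshot was taken
client.ml.close_job(job_id="response_time_baseline")
client.ml.revert_model_snapshot(
    job_id="response_time_baseline",
    snapshot_id=snapshot_id,
    delete_intervening_results=True
)
client.ml.open_job(job_id="response_time_baseline")
```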

### Anomaly Detection Results

Retrieve and analyze anomaly detection results.

```python { .api }
def get_buckets(
    self,
    job_id: str,
    timestamp: Optional[str] = None,
    anomaly_score: Optional[float] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    exclude_interim: Optional[bool] = None,
    expand: Optional[bool] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get anomaly detection buckets for a job.

    Parameters:
    - job_id: Job ID to get buckets for
    - timestamp: Specific bucket timestamp to retrieve
    - anomaly_score: Minimum anomaly score threshold
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for bucket range
    - end: End time for bucket range
    - exclude_interim: Whether to exclude interim results
    - expand: Whether to include anomaly records in each bucket
    - sort: Sort field for results
    - desc: Whether to sort in descending order

    Returns:
    ObjectApiResponse with anomaly buckets
    """

def get_records(
    self,
    job_id: str,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    record_score: Optional[float] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    exclude_interim: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get anomaly records for a job.

    Parameters:
    - job_id: Job ID to get records for
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for record range
    - end: End time for record range
    - record_score: Minimum record score threshold
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    - exclude_interim: Whether to exclude interim results

    Returns:
    ObjectApiResponse with anomaly records
    """

def get_influencers(
    self,
    job_id: str,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    influencer_score: Optional[float] = None,
    sort: Optional[str] = None,
    desc: Optional[bool] = None,
    exclude_interim: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get influencers for a job.

    Parameters:
    - job_id: Job ID to get influencers for
    - from_: Starting offset for results
    - size: Number of results to return
    - start: Start time for influencer range
    - end: End time for influencer range
    - influencer_score: Minimum influencer score threshold
    - sort: Sort field for results
    - desc: Whether to sort in descending order
    - exclude_interim: Whether to exclude interim results

    Returns:
    ObjectApiResponse with influencers
    """

def get_categories(
    self,
    job_id: str,
    category_id: Optional[str] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    partition_field_value: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get categories for a job.

    Parameters:
    - job_id: Job ID to get categories for
    - category_id: Specific category ID
    - from_: Starting offset for results
    - size: Number of results to return
    - partition_field_value: Partition field value filter

    Returns:
    ObjectApiResponse with categories
    """
```
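
Results are tiered: buckets summarize each time span, records hold the individual anomalies, and influencers rank the entities that contributed to them. A short sketch of the influencer side, assuming a job configured with `user_id` as an influencer (as in the usage examples below):

```python
# Rank the entities that contributed most to recent anomalies
influencers = client.ml.get_influencers(
    job_id="transaction_anomalies",
    influencer_score=60,  # only strong contributors
    sort="influencer_score",
    desc=True,
    size=10
)

for inf in influencers.body['influencers']:
    print(f"{inf['influencer_field_name']}={inf['influencer_field_value']}: "
          f"score {inf['influencer_score']:.1f}")
```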

### Data Frame Analytics

Manage data frame analytics jobs for outlier detection, regression, and classification.

```python { .api }
def put_data_frame_analytics(
    self,
    id: str,
    source: Dict[str, Any],
    dest: Dict[str, Any],
    analysis: Dict[str, Any],
    description: Optional[str] = None,
    model_memory_limit: Optional[str] = None,
    max_num_threads: Optional[int] = None,
    analyzed_fields: Optional[Dict[str, Any]] = None,
    allow_lazy_start: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Create a data frame analytics job.

    Parameters:
    - id: Unique identifier for the analytics job
    - source: Source configuration including index and query
    - dest: Destination configuration for results
    - analysis: Analysis configuration (classification, regression, or outlier_detection)
    - description: Human-readable job description
    - model_memory_limit: Memory limit for the analysis
    - max_num_threads: Maximum number of threads
    - analyzed_fields: Fields to include in or exclude from the analysis
    - allow_lazy_start: Whether the job may wait for sufficient ML node capacity instead of failing to start

    Returns:
    ObjectApiResponse with job creation result
    """

def get_data_frame_analytics(
    self,
    id: Optional[str] = None,
    allow_no_match: Optional[bool] = None,
    from_: Optional[int] = None,
    size: Optional[int] = None,
    exclude_generated: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get data frame analytics job information.

    Parameters:
    - id: Job ID or wildcard pattern to retrieve
    - allow_no_match: Whether a wildcard that matches no jobs succeeds instead of raising an error
    - from_: Starting offset for results
    - size: Number of results to return
    - exclude_generated: Whether to exclude generated configurations

    Returns:
    ObjectApiResponse with analytics job information
    """

def start_data_frame_analytics(
    self,
    id: str,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Start a data frame analytics job.

    Parameters:
    - id: Job ID to start
    - timeout: Timeout for starting the job

    Returns:
    ObjectApiResponse with start result
    """

def stop_data_frame_analytics(
    self,
    id: str,
    allow_no_match: Optional[bool] = None,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Stop a data frame analytics job.

    Parameters:
    - id: Job ID to stop
    - allow_no_match: Whether a wildcard that matches no jobs succeeds instead of raising an error
    - force: Whether to force stop
    - timeout: Timeout for stopping the job

    Returns:
    ObjectApiResponse with stop result
    """

def delete_data_frame_analytics(
    self,
    id: str,
    force: Optional[bool] = None,
    timeout: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Delete a data frame analytics job.

    Parameters:
    - id: Job ID to delete
    - force: Whether to force deletion
    - timeout: Timeout for deletion

    Returns:
    ObjectApiResponse with deletion result
    """
```
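
Besides the classification and outlier detection shown in the usage examples below, the `analysis` parameter also accepts `regression` for numeric targets. A minimal sketch, with an illustrative `shipments` index and `delivery_hours` field:

```python
# Regression: predict a numeric dependent variable
client.ml.put_data_frame_analytics(
    id="delivery_time_regression",
    source={"index": ["shipments"]},
    dest={"index": "delivery_time_predictions"},
    analysis={
        "regression": {
            "dependent_variable": "delivery_hours",
            "training_percent": 75
        }
    },
    model_memory_limit="100mb"
)
client.ml.start_data_frame_analytics(id="delivery_time_regression")
```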

## Usage Examples

### Anomaly Detection Job

```python
import time

from elasticsearch import Elasticsearch

client = Elasticsearch(hosts=['http://localhost:9200'])

# Create an anomaly detection job
client.ml.put_job(
    job_id="transaction_anomalies",
    description="Detect anomalies in transaction amounts",
    analysis_config={
        "bucket_span": "15m",
        "detectors": [
            {
                "function": "mean",
                "field_name": "amount",
                "by_field_name": "user_id"
            },
            {
                "function": "high_count",
                "by_field_name": "merchant_category"
            }
        ],
        "influencers": ["user_id", "merchant_category"]
    },
    data_description={
        "time_field": "@timestamp",
        "time_format": "epoch_ms"
    },
    analysis_limits={
        "model_memory_limit": "100mb"
    },
    model_plot_config={
        "enabled": True,
        "terms": "user_id"
    }
)

# Create a datafeed for the job
client.ml.put_datafeed(
    datafeed_id="transaction_anomalies_feed",
    job_id="transaction_anomalies",
    indices=["transactions-*"],
    query={
        "bool": {
            "must": [
                {"range": {"amount": {"gt": 0}}},
                {"term": {"status": "completed"}}
            ]
        }
    },
    frequency="5m",
    query_delay="30s",
    scroll_size=1000
)

# Open the job and start the datafeed
client.ml.open_job(job_id="transaction_anomalies")
client.ml.start_datafeed(
    datafeed_id="transaction_anomalies_feed",
    start="2024-01-01T00:00:00Z"
)

# Wait for analysis, then get results
time.sleep(60)  # Allow some processing time

# Get anomaly buckets
buckets = client.ml.get_buckets(
    job_id="transaction_anomalies",
    anomaly_score=75,  # Only high-scoring anomalies
    size=10,
    sort="anomaly_score",
    desc=True
)

for bucket in buckets.body['buckets']:
    print(f"Anomaly at {bucket['timestamp']}, score: {bucket['anomaly_score']}")

# Get detailed anomaly records
records = client.ml.get_records(
    job_id="transaction_anomalies",
    record_score=50,
    size=20
)

for record in records.body['records']:
    # field_name is absent for count-style detectors, so use .get()
    print(f"Record anomaly: {record['function']} on {record.get('field_name', 'N/A')}")
    print(f"  Typical: {record.get('typical', 'N/A')}, Actual: {record.get('actual', 'N/A')}")
    print(f"  User: {record.get('by_field_value', 'N/A')}")
```

### Data Frame Analytics for Classification

```python
# Create a classification job to predict customer churn
client.ml.put_data_frame_analytics(
    id="customer_churn_prediction",
    description="Predict customer churn based on usage patterns",
    source={
        "index": ["customer_data"],
        "query": {
            "bool": {
                "must": [
                    {"range": {"account_age_days": {"gte": 30}}},
                    {"exists": {"field": "churned"}}
                ]
            }
        }
    },
    dest={
        "index": "customer_churn_results",
        "results_field": "ml_results"
    },
    analysis={
        "classification": {
            "dependent_variable": "churned",
            "training_percent": 80,
            "num_top_feature_importance_values": 5,
            "prediction_field_name": "churn_prediction"
        }
    },
    analyzed_fields={
        "includes": [
            "monthly_spend", "support_tickets", "login_frequency",
            "feature_usage_score", "account_age_days", "churned"
        ]
    },
    model_memory_limit="200mb"
)

# Start the analytics job
client.ml.start_data_frame_analytics(id="customer_churn_prediction")

# Monitor job progress
job_stats = client.ml.get_data_frame_analytics_stats(id="customer_churn_prediction")
progress = job_stats.body['data_frame_analytics'][0]['progress']
print(f"Progress: {progress}")

# Once complete, examine results; the probability of the predicted class
# is written to <results_field>.prediction_probability
results = client.search(
    index="customer_churn_results",
    query={"match_all": {}},
    size=10,
    sort=[{"ml_results.prediction_probability": "desc"}]
)

for hit in results.body['hits']['hits']:
    customer = hit['_source']
    ml_results = customer['ml_results']
    print(f"Customer ID: {customer.get('customer_id', 'N/A')}")
    print(f"  Churn prediction: {ml_results['churn_prediction']}")
    print(f"  Probability: {ml_results['prediction_probability']:.3f}")
    print(f"  Top features: {ml_results.get('feature_importance', [])[:3]}")
```

### Outlier Detection

```python
# Create an outlier detection job for fraud detection
client.ml.put_data_frame_analytics(
    id="fraud_outlier_detection",
    description="Detect fraudulent transactions using outlier detection",
    source={
        "index": ["transactions"],
        "query": {
            "range": {
                "@timestamp": {
                    "gte": "now-7d"
                }
            }
        }
    },
    dest={
        "index": "fraud_outliers",
        "results_field": "outlier_score"
    },
    analysis={
        "outlier_detection": {
            "n_neighbors": 20,
            "method": "lof",  # Local Outlier Factor
            "feature_influence_threshold": 0.1,
            "outlier_fraction": 0.05
        }
    },
    analyzed_fields={
        "includes": [
            "amount", "transaction_hour", "merchant_category_code",
            "days_since_last_transaction", "amount_deviation_from_avg"
        ]
    },
    model_memory_limit="150mb"
)

# Start outlier detection
client.ml.start_data_frame_analytics(id="fraud_outlier_detection")

# Query for high outlier scores (potential fraud)
outliers = client.search(
    index="fraud_outliers",
    query={
        "range": {
            "outlier_score.outlier_score": {"gte": 0.7}
        }
    },
    sort=[{"outlier_score.outlier_score": "desc"}],
    size=50
)

for hit in outliers.body['hits']['hits']:
    transaction = hit['_source']
    score = transaction['outlier_score']['outlier_score']
    print(f"Transaction ID: {transaction.get('transaction_id', 'N/A')}")
    print(f"  Outlier score: {score:.3f}")
    print(f"  Amount: ${transaction.get('amount', 'N/A')}")
    print(f"  Merchant: {transaction.get('merchant_name', 'N/A')}")
```

### Model Management and Monitoring

```python
# Get job statistics and health
job_stats = client.ml.get_job_stats(job_id="transaction_anomalies")
job_info = job_stats.body['jobs'][0]

print(f"Job state: {job_info['state']}")
print(f"Data counts: {job_info['data_counts']}")
print(f"Model size stats: {job_info['model_size_stats']}")
print(f"Processed records: {job_info['data_counts']['processed_record_count']}")

# Get model snapshots
snapshots = client.ml.get_model_snapshots(
    job_id="transaction_anomalies",
    size=5,
    sort="timestamp",
    desc=True
)

latest_snapshot = snapshots.body['model_snapshots'][0]
print(f"Latest snapshot: {latest_snapshot['snapshot_id']}")
print(f"Total by-field count: {latest_snapshot['model_size_stats']['total_by_field_count']}")

# Update job configuration
client.ml.update_job(
    job_id="transaction_anomalies",
    description="Updated: Detect anomalies in transaction amounts with enhanced settings",
    analysis_limits={
        "model_memory_limit": "150mb"  # Increase memory limit
    },
    model_plot_config={
        "enabled": True,
        "terms": "user_id,merchant_category"  # Add more terms
    }
)

# Stop the datafeed and close the job when done
client.ml.stop_datafeed(datafeed_id="transaction_anomalies_feed")
client.ml.close_job(job_id="transaction_anomalies")
```