0
# STIX Equivalence and Similarity
1
2
Semantic equivalence and similarity algorithms for STIX objects, graphs, and patterns. These functions implement the STIX Semantic Equivalence Committee Note specifications, enabling intelligent comparison of STIX content that goes beyond simple string matching to understand semantic relationships.
3
4
## Capabilities
5
6
### Object Equivalence and Similarity
7
8
Functions to determine semantic equivalence and calculate similarity scores between STIX objects using configurable weights and property comparisons.
9
10
```python { .api }
11
def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, ds1=None, ds2=None, ignore_spec_version=False, versioning_checks=False, max_depth=1, **weight_dict):
12
"""
13
Determine if two STIX objects are semantically equivalent.
14
15
Parameters:
16
- obj1: First STIX object instance
17
- obj2: Second STIX object instance
18
- prop_scores (dict): Dictionary to hold individual property scores and weights
19
- threshold (int): Minimum similarity score (0-100) for equivalence (default: 70)
20
- ds1/ds2: Optional DataStore instances for pulling related objects
21
- ignore_spec_version (bool): Ignore spec version differences (default: False)
22
- versioning_checks (bool): Test multiple object revisions (default: False)
23
- max_depth (int): Maximum recursion depth for de-referencing (default: 1)
24
- **weight_dict: Custom weights to override default similarity checks
25
26
Returns:
27
bool: True if similarity score >= threshold, False otherwise
28
"""
29
30
def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None, ignore_spec_version=False, versioning_checks=False, max_depth=1, **weight_dict):
31
"""
32
Calculate similarity score between two STIX objects.
33
34
Parameters:
35
- obj1: First STIX object instance
36
- obj2: Second STIX object instance
37
- prop_scores (dict): Dictionary to hold individual property scores and weights
38
- ds1/ds2: Optional DataStore instances for pulling related objects
39
- ignore_spec_version (bool): Ignore spec version differences (default: False)
40
- versioning_checks (bool): Test multiple object revisions (default: False)
41
- max_depth (int): Maximum recursion depth for de-referencing (default: 1)
42
- **weight_dict: Custom weights to override default similarity checks
43
44
Returns:
45
float: Similarity score between 0.0 and 100.0
46
"""
47
```
48
49
Usage examples:
50
51
```python
52
from stix2.equivalence.object import object_equivalence, object_similarity
53
from stix2 import Indicator, Malware, MemoryStore
54
55
# Create similar indicators
56
indicator1 = Indicator(
57
name="Malicious File Hash",
58
indicator_types=["malicious-activity"],
59
pattern_type="stix",
60
pattern="[file:hashes.MD5 = 'abc123def456ghi789']"
61
)
62
63
indicator2 = Indicator(
64
name="File Hash Indicator",
65
indicator_types=["malicious-activity"],
66
pattern_type="stix",
67
pattern="[file:hashes.MD5 = 'abc123def456ghi789']"
68
)
69
70
# Different indicator
71
indicator3 = Indicator(
72
name="IP Address Indicator",
73
indicator_types=["malicious-activity"],
74
pattern_type="stix",
75
pattern="[ipv4-addr:value = '192.168.1.100']"
76
)
77
78
# Test equivalence with default threshold (70)
79
print(f"Indicator1 == Indicator2: {object_equivalence(indicator1, indicator2)}") # True
80
print(f"Indicator1 == Indicator3: {object_equivalence(indicator1, indicator3)}") # False
81
82
# Calculate exact similarity scores
83
score1_2 = object_similarity(indicator1, indicator2)
84
score1_3 = object_similarity(indicator1, indicator3)
85
86
print(f"Similarity indicator1 vs indicator2: {score1_2:.2f}") # High score (90+)
87
print(f"Similarity indicator1 vs indicator3: {score1_3:.2f}") # Low score (10-30)
88
89
# Test with custom threshold
90
high_threshold_equivalent = object_equivalence(indicator1, indicator2, threshold=95)
91
print(f"High threshold equivalence: {high_threshold_equivalent}")
92
93
# Test with property scores to see individual contributions
94
prop_scores = {}
95
similarity = object_similarity(indicator1, indicator2, prop_scores=prop_scores)
96
print(f"Property scores breakdown: {prop_scores}")
97
98
# Custom weights for specific properties
99
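# Comparison functions must be passed as callables, not as strings
from stix2.equivalence.object import exact_match, partial_string_based
custom_weights = {
100
"indicator": {
101
"pattern": (90, exact_match), # Pattern matching is 90% of score
102
"name": (5, partial_string_based), # Name is only 5%
103
"indicator_types": (5, exact_match) # Types are 5%
104
}
105
}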
106
107
custom_score = object_similarity(indicator1, indicator2, **custom_weights)
108
print(f"Custom weighted similarity: {custom_score:.2f}")
109
```
110
111
### Graph Equivalence and Similarity
112
113
Functions to compare entire STIX graphs (collections of related objects) using DataStore instances, enabling comparison of complex threat intelligence datasets.
114
115
```python { .api }
116
def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, ignore_spec_version=False, versioning_checks=False, max_depth=1, **weight_dict):
117
"""
118
Determine if two STIX graphs are semantically equivalent.
119
120
Parameters:
121
- ds1: First DataStore instance representing a STIX graph
122
- ds2: Second DataStore instance representing a STIX graph
123
- prop_scores (dict): Dictionary to hold individual property scores and weights
124
- threshold (int): Minimum similarity score (0-100) for equivalence (default: 70)
125
- ignore_spec_version (bool): Ignore spec version differences (default: False)
126
- versioning_checks (bool): Test multiple object revisions (default: False)
127
- max_depth (int): Maximum recursion depth for de-referencing (default: 1)
128
- **weight_dict: Custom weights to override default similarity checks
129
130
Returns:
131
bool: True if graph similarity >= threshold, False otherwise
132
"""
133
134
def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False, versioning_checks=False, max_depth=1, **weight_dict):
135
"""
136
Calculate similarity score between two STIX graphs.
137
138
Parameters:
139
- ds1: First DataStore instance representing a STIX graph
140
- ds2: Second DataStore instance representing a STIX graph
141
- prop_scores (dict): Dictionary to hold individual property scores and weights
142
- ignore_spec_version (bool): Ignore spec version differences (default: False)
143
- versioning_checks (bool): Test multiple object revisions (default: False)
144
- max_depth (int): Maximum recursion depth for de-referencing (default: 1)
145
- **weight_dict: Custom weights to override default similarity checks
146
147
Returns:
148
float: Similarity score between 0.0 and 100.0
149
"""
150
```
151
152
Usage examples:
153
154
```python
155
from stix2.equivalence.graph import graph_equivalence, graph_similarity
156
from stix2 import MemoryStore, Indicator, Malware, Relationship
157
158
# Create first threat intelligence graph
159
malware1 = Malware(
160
name="Zeus Banking Trojan",
161
malware_types=["trojan"]
162
)
163
164
indicator1 = Indicator(
165
name="Zeus Hash",
166
indicator_types=["malicious-activity"],
167
pattern_type="stix",
168
pattern="[file:hashes.MD5 = 'abc123']"
169
)
170
171
relationship1 = Relationship(
172
relationship_type="indicates",
173
source_ref=indicator1.id,
174
target_ref=malware1.id
175
)
176
177
# Store in first DataStore
178
ds1 = MemoryStore()
179
ds1.add([malware1, indicator1, relationship1])
180
181
# Create similar second graph
182
malware2 = Malware(
183
name="Zeus Trojan Variant",
184
malware_types=["trojan"]
185
)
186
187
indicator2 = Indicator(
188
name="Zeus File Hash",
189
indicator_types=["malicious-activity"],
190
pattern_type="stix",
191
pattern="[file:hashes.MD5 = 'abc123']"
192
)
193
194
relationship2 = Relationship(
195
relationship_type="indicates",
196
source_ref=indicator2.id,
197
target_ref=malware2.id
198
)
199
200
# Store in second DataStore
201
ds2 = MemoryStore()
202
ds2.add([malware2, indicator2, relationship2])
203
204
# Compare graphs
205
graph_equiv = graph_equivalence(ds1, ds2)
206
graph_sim = graph_similarity(ds1, ds2)
207
208
print(f"Graphs are equivalent: {graph_equiv}")
209
print(f"Graph similarity score: {graph_sim:.2f}")
210
211
# Create dissimilar third graph for comparison
212
malware3 = Malware(
213
name="Emotet Banking Malware",
214
malware_types=["trojan"]
215
)
216
217
indicator3 = Indicator(
218
name="Emotet Hash",
219
indicator_types=["malicious-activity"],
220
pattern_type="stix",
221
pattern="[file:hashes.SHA256 = 'xyz789']"
222
)
223
224
ds3 = MemoryStore()
225
ds3.add([malware3, indicator3])
226
227
# Compare different graphs
228
diff_graph_sim = graph_similarity(ds1, ds3)
229
print(f"Different graphs similarity: {diff_graph_sim:.2f}")
230
231
# Graph comparison with custom weights
232
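# Comparison functions must be passed as callables, not as strings
from stix2.equivalence.object import custom_pattern_based, exact_match, partial_string_based
graph_weights = {
233
"malware": {
234
"name": (50, partial_string_based),
235
"malware_types": (50, exact_match)
236
},
237
"indicator": {
238
"pattern": (80, custom_pattern_based),
239
"name": (20, partial_string_based)
240
}
241
}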
242
243
weighted_similarity = graph_similarity(ds1, ds2, **graph_weights)
244
print(f"Weighted graph similarity: {weighted_similarity:.2f}")
245
```
246
247
### Pattern Equivalence and Matching
248
249
Specialized functions for comparing STIX indicator patterns, enabling semantic matching of detection rules and observables.
250
251
```python { .api }
252
def equivalent_patterns(pattern1, pattern2, stix_version="2.1"):
253
"""
254
Determine if two STIX patterns are semantically equivalent.
255
256
Parameters:
257
- pattern1 (str): First STIX pattern string
258
- pattern2 (str): Second STIX pattern string
259
- stix_version (str): STIX version for parsing ("2.0", "2.1", etc.)
260
261
Returns:
262
bool: True if patterns are semantically equivalent, False otherwise
263
"""
264
265
def find_equivalent_patterns(search_pattern, patterns, stix_version="2.1"):
266
"""
267
Find patterns from a sequence equivalent to a given pattern.
268
269
Parameters:
270
- search_pattern (str): Search pattern string
271
- patterns (iterable): Sequence of pattern strings to search
272
- stix_version (str): STIX version for parsing
273
274
Returns:
275
generator: Generator yielding equivalent patterns
276
"""
277
```
278
279
Usage examples:
280
281
```python
282
from stix2.equivalence.pattern import equivalent_patterns, find_equivalent_patterns
283
284
# Test pattern equivalence
285
pattern1 = "[file:hashes.MD5 = 'abc123'] AND [file:size = 1024]"
286
pattern2 = "[file:size = 1024] AND [file:hashes.MD5 = 'abc123']" # Same logic, different order
287
pattern3 = "[file:hashes.SHA1 = 'def456']" # Different pattern
288
289
print(f"Pattern1 == Pattern2: {equivalent_patterns(pattern1, pattern2)}") # True
290
print(f"Pattern1 == Pattern3: {equivalent_patterns(pattern1, pattern3)}") # False
291
292
# Test boolean logic equivalence
293
logical1 = "[a:b = 1] OR [a:b = 1]" # Redundant OR
294
logical2 = "[a:b = 1]" # Simplified
295
logical3 = "[a:b = 1 AND a:b = 1]" # Redundant AND inside one observation -> same as simplified (AND between separate observations is not de-duplicated)
296
297
print(f"Redundant OR equivalent: {equivalent_patterns(logical1, logical2)}") # True
298
print(f"Redundant AND equivalent: {equivalent_patterns(logical3, logical2)}") # True
299
300
# Complex boolean equivalence
301
complex1 = "([a:b = 1] OR [a:b = 2]) AND [c:d = 3]"
302
complex2 = "[c:d = 3] AND ([a:b = 1] OR [a:b = 2])" # Commutative
303
complex3 = "([a:b = 1] AND [c:d = 3]) OR ([a:b = 2] AND [c:d = 3])" # Distributive
304
305
print(f"Commutative patterns: {equivalent_patterns(complex1, complex2)}") # True
306
print(f"Distributive patterns: {equivalent_patterns(complex1, complex3)}") # True
307
308
# Temporal qualifier equivalence
309
temporal1 = "[file:name = 'malware.exe'] REPEATS 2 TIMES WITHIN 300 SECONDS"
310
temporal2 = "[file:name = 'malware.exe' OR file:name = 'malware.exe'] REPEATS 2 TIMES WITHIN 300 SECONDS" # Redundant comparison, same qualifiers (WITHIN only accepts SECONDS)
311
312
print(f"Temporal equivalence: {equivalent_patterns(temporal1, temporal2)}") # True
313
314
# Find equivalent patterns in a collection
315
search_pattern = "[ipv4-addr:value = '192.168.1.1']"
316
pattern_database = [
317
"[ipv4-addr:value = '192.168.1.1']", # Exact match
318
"[ipv4-addr:value = '192.168.1.2']", # Different IP
319
"[ipv4-addr:value = '192.168.1.1'] OR [ipv4-addr:value = '192.168.1.1']", # Redundant OR (equivalent)
320
"[network-traffic:src_ref.value = '192.168.1.1']", # Different structure
321
"[domain-name:value = 'example.com']" # Completely different
322
]
323
324
equivalent_found = list(find_equivalent_patterns(search_pattern, pattern_database))
325
print(f"Equivalent patterns found: {len(equivalent_found)}")
326
for pattern in equivalent_found:
327
print(f" - {pattern}")
328
329
# Performance comparison: bulk pattern matching
330
def find_equivalents_naive(search, patterns):
331
"""Naive approach using repeated calls."""
332
return [p for p in patterns if equivalent_patterns(search, p)]
333
334
def find_equivalents_optimized(search, patterns):
335
"""Optimized approach using find_equivalent_patterns."""
336
return list(find_equivalent_patterns(search, patterns))
337
338
# Large pattern database simulation
339
large_pattern_db = [
340
f"[file:hashes.MD5 = '{i:032d}']" for i in range(1000)
341
] + [
342
"[file:hashes.MD5 = '00000000000000000000000000000001']", # Match
343
"[file:hashes.MD5 = '00000000000000000000000000000001'] OR [file:hashes.MD5 = '00000000000000000000000000000001']" # Redundant match
344
]
345
346
search_in_large = "[file:hashes.MD5 = '00000000000000000000000000000001']"
347
348
# The optimized version is more efficient for large datasets
349
matches_optimized = find_equivalents_optimized(search_in_large, large_pattern_db)
350
print(f"Matches found in large database: {len(matches_optimized)}")
351
352
# Version-specific pattern testing
353
stix_20_pattern = "[file:hashes.MD5 = 'abc123']" # STIX 2.0 compatible
354
stix_21_pattern = "[file:hashes.MD5 = 'abc123']" # Same in STIX 2.1
355
356
equiv_20 = equivalent_patterns(stix_20_pattern, stix_21_pattern, stix_version="2.0")
357
equiv_21 = equivalent_patterns(stix_20_pattern, stix_21_pattern, stix_version="2.1")
358
359
print(f"Cross-version equivalence (2.0): {equiv_20}")
360
print(f"Cross-version equivalence (2.1): {equiv_21}")
361
```
362
363
### Advanced Equivalence Configuration
364
365
Configuration options and advanced usage patterns for fine-tuning equivalence calculations.
366
367
```python
368
from stix2.equivalence.object import exact_match, partial_string_based, partial_timestamp_based
369
from stix2.equivalence.object import custom_pattern_based, partial_external_reference_based
370
from stix2 import Environment, Indicator
371
import datetime
372
373
# Create environment with equivalence capabilities
374
env = Environment()
375
376
# Advanced weight configuration for different object types
377
from stix2.equivalence.object import partial_list_based
advanced_weights = {
378
"indicator": {
379
"pattern": (85, custom_pattern_based), # Custom pattern comparison
380
"name": (10, partial_string_based), # Fuzzy string matching
381
"indicator_types": (5, exact_match) # Exact list matching
382
},
383
"malware": {
384
"name": (60, partial_string_based),
385
"malware_types": (30, exact_match),
386
"is_family": (10, exact_match)
387
},
388
"threat-actor": {
389
"name": (40, partial_string_based),
390
"threat_actor_types": (30, exact_match),
391
"aliases": (20, partial_list_based), # Partial list overlap
392
"first_seen": (10, partial_timestamp_based),
"tdelta": 1 # Tolerance in days; looked up when partial_timestamp_based is used
393
}
394
}
395
396
# Use environment methods for equivalence
397
indicator_a = Indicator(
398
name="Suspicious File",
399
indicator_types=["malicious-activity"],
400
pattern_type="stix",
401
pattern="[file:hashes.MD5 = 'abc123']"
402
)
403
404
indicator_b = Indicator(
405
name="Malicious File Hash",
406
indicator_types=["malicious-activity"],
407
pattern_type="stix",
408
pattern="[file:hashes.MD5 = 'abc123']"
409
)
410
411
# Environment-based similarity calculation
412
env_similarity = env.object_similarity(indicator_a, indicator_b, **advanced_weights)
413
print(f"Environment similarity: {env_similarity:.2f}")
414
415
# Versioning checks - compare across object versions
416
from stix2 import new_version
417
418
indicator_v1 = Indicator(
419
name="Base Indicator",
420
indicator_types=["malicious-activity"],
421
pattern_type="stix",
422
pattern="[file:name = 'malware.exe']"
423
)
424
425
# Create new version with additional confidence
426
indicator_v2 = new_version(indicator_v1, confidence=85)
427
428
# Compare with versioning checks enabled
429
version_similarity = env.object_similarity(
430
indicator_v1,
431
indicator_v2,
432
versioning_checks=True
433
)
434
print(f"Version similarity: {version_similarity:.2f}")
435
436
# Reference checking with DataStores
437
from stix2 import MemoryStore, Malware, Relationship
438
439
# Create objects with references
440
malware_a = Malware(name="TrojanA", malware_types=["trojan"])
441
malware_b = Malware(name="TrojanB", malware_types=["trojan"])
442
443
relationship_a = Relationship(
444
relationship_type="indicates",
445
source_ref=indicator_a.id,
446
target_ref=malware_a.id
447
)
448
449
relationship_b = Relationship(
450
relationship_type="indicates",
451
source_ref=indicator_b.id,
452
target_ref=malware_b.id
453
)
454
455
# Create datastores with the objects
456
ds_a = MemoryStore([indicator_a, malware_a, relationship_a])
457
ds_b = MemoryStore([indicator_b, malware_b, relationship_b])
458
459
# Compare relationships with reference resolution
460
ref_similarity = env.object_similarity(
461
relationship_a,
462
relationship_b,
463
ds1=ds_a,
464
ds2=ds_b,
465
max_depth=2 # Follow references 2 levels deep
466
)
467
print(f"Reference-resolved similarity: {ref_similarity:.2f}")
468
469
# Property score analysis
470
prop_scores = {}
471
detailed_similarity = env.object_similarity(
472
indicator_a,
473
indicator_b,
474
prop_scores=prop_scores,
475
**advanced_weights
476
)
477
478
print(f"Detailed property scores:")
479
for prop, score in prop_scores.items():
480
print(f" {prop}: {score}")
481
482
# Timestamp-based comparison with tolerance
483
from datetime import datetime, timedelta, timezone
484
485
# Create objects with similar timestamps
486
now = datetime.now(timezone.utc) # UTC, so the literal "Z" suffix used below is accurate
487
timestamp_a = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
488
timestamp_b = (now + timedelta(seconds=30)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
489
490
indicator_time_a = Indicator(
491
name="Time Test A",
492
indicator_types=["malicious-activity"],
493
pattern_type="stix",
494
pattern="[file:name = 'test.exe']",
495
valid_from=timestamp_a
496
)
497
498
indicator_time_b = Indicator(
499
name="Time Test B",
500
indicator_types=["malicious-activity"],
501
pattern_type="stix",
502
pattern="[file:name = 'test.exe']",
503
valid_from=timestamp_b
504
)
505
506
# Custom timestamp tolerance (60 seconds; the tdelta argument is expressed in days)
507
timestamp_weights = {
508
"indicator": {
509
"pattern": (70, custom_pattern_based),
510
"name": (20, partial_string_based),
511
"valid_from": (10, lambda t1, t2: partial_timestamp_based(t1, t2, 60 / 86400))
512
}
513
}
514
515
timestamp_similarity = env.object_similarity(
516
indicator_time_a,
517
indicator_time_b,
518
**timestamp_weights
519
)
520
print(f"Timestamp tolerance similarity: {timestamp_similarity:.2f}")
521
```
522
523
### Integration with STIX Environment
524
525
The equivalence functions are integrated into the STIX Environment class, providing a consistent interface for semantic analysis across STIX workflows.
526
527
```python
528
from stix2 import Environment
529
530
# Create environment for equivalence operations
531
env = Environment()
532
533
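# Environment provides direct access to equivalence functions
# (obj1/obj2 are STIX objects and datastore1/datastore2 are DataStore instances created beforehand)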
534
objects_equivalent = env.object_equivalence(obj1, obj2, threshold=80)
535
similarity_score = env.object_similarity(obj1, obj2)
536
537
# Environment-based graph comparison
538
graph_equivalent = env.graph_equivalence(datastore1, datastore2)
539
graph_score = env.graph_similarity(datastore1, datastore2)
540
541
print(f"Objects equivalent: {objects_equivalent}")
542
print(f"Similarity score: {similarity_score:.2f}")
543
print(f"Graphs equivalent: {graph_equivalent}")
544
print(f"Graph similarity: {graph_score:.2f}")
545
```