The stix2 library produces and consumes STIX 2 JSON content for cyber threat intelligence.
Semantic equivalence and similarity algorithms for STIX objects, graphs, and patterns. These functions implement the STIX Semantic Equivalence Committee Note specifications, enabling intelligent comparison of STIX content that goes beyond simple string matching to understand semantic relationships.
Functions to determine semantic equivalence and calculate similarity scores between STIX objects using configurable weights and property comparisons.
def object_equivalence(obj1, obj2, prop_scores={}, threshold=70, ds1=None, ds2=None, ignore_spec_version=False, versioning_checks=False, max_depth=1, **weight_dict):
    """
    Determine if two STIX objects are semantically equivalent.

    Computes a similarity score for the pair and compares it against a
    threshold to give a boolean equivalence verdict.

    Parameters:
    - obj1: First STIX object instance
    - obj2: Second STIX object instance
    - prop_scores (dict): Dictionary to hold individual property scores and weights
      (NOTE(review): mutable default mirrors the published stix2 signature; pass a
      fresh dict per call if you need to inspect the per-property scores)
    - threshold (int): Minimum similarity score (0-100) for equivalence (default: 70)
    - ds1/ds2: Optional DataStore instances for pulling related objects
    - ignore_spec_version (bool): Ignore spec version differences (default: False)
    - versioning_checks (bool): Test multiple object revisions (default: False)
    - max_depth (int): Maximum recursion depth for de-referencing (default: 1)
    - **weight_dict: Custom weights to override default similarity checks

    Returns:
    bool: True if similarity score >= threshold, False otherwise
    """
def object_similarity(obj1, obj2, prop_scores={}, ds1=None, ds2=None, ignore_spec_version=False, versioning_checks=False, max_depth=1, **weight_dict):
    """
    Calculate similarity score between two STIX objects.

    Parameters:
    - obj1: First STIX object instance
    - obj2: Second STIX object instance
    - prop_scores (dict): Dictionary to hold individual property scores and weights
      (NOTE(review): mutable default mirrors the published stix2 signature; pass a
      fresh dict per call if you need to inspect the per-property scores)
    - ds1/ds2: Optional DataStore instances for pulling related objects
    - ignore_spec_version (bool): Ignore spec version differences (default: False)
    - versioning_checks (bool): Test multiple object revisions (default: False)
    - max_depth (int): Maximum recursion depth for de-referencing (default: 1)
    - **weight_dict: Custom weights to override default similarity checks

    Returns:
    float: Similarity score between 0.0 and 100.0
    """

# Usage examples:
# Example: comparing individual STIX objects for equivalence and similarity.
from stix2.equivalence.object import object_equivalence, object_similarity
from stix2 import Indicator, Malware, MemoryStore

# Create similar indicators
indicator1 = Indicator(
    name="Malicious File Hash",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:hashes.MD5 = 'abc123def456ghi789']"
)
indicator2 = Indicator(
    name="File Hash Indicator",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:hashes.MD5 = 'abc123def456ghi789']"
)
# Different indicator
indicator3 = Indicator(
    name="IP Address Indicator",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[ipv4-addr:value = '192.168.1.100']"
)

# Test equivalence with default threshold (70)
print(f"Indicator1 == Indicator2: {object_equivalence(indicator1, indicator2)}")  # True
print(f"Indicator1 == Indicator3: {object_equivalence(indicator1, indicator3)}")  # False

# Calculate exact similarity scores
score1_2 = object_similarity(indicator1, indicator2)
score1_3 = object_similarity(indicator1, indicator3)
print(f"Similarity indicator1 vs indicator2: {score1_2:.2f}")  # High score (90+)
print(f"Similarity indicator1 vs indicator3: {score1_3:.2f}")  # Low score (10-30)

# Test with custom threshold
high_threshold_equivalent = object_equivalence(indicator1, indicator2, threshold=95)
print(f"High threshold equivalence: {high_threshold_equivalent}")

# Test with property scores to see individual contributions
prop_scores = {}
similarity = object_similarity(indicator1, indicator2, prop_scores=prop_scores)
print(f"Property scores breakdown: {prop_scores}")

# Custom weights for specific properties
custom_weights = {
    "indicator": {
        "pattern": (90, "exact_match"),  # Pattern matching is 90% of score
        "name": (5, "partial_string_based"),  # Name is only 5%
        "indicator_types": (5, "exact_match")  # Types are 5%
    }
}
custom_score = object_similarity(indicator1, indicator2, **custom_weights)
print(f"Custom weighted similarity: {custom_score:.2f}")

# Functions to compare entire STIX graphs (collections of related objects) using
# DataStore instances, enabling comparison of complex threat intelligence datasets.
def graph_equivalence(ds1, ds2, prop_scores={}, threshold=70, ignore_spec_version=False, versioning_checks=False, max_depth=1, **weight_dict):
    """
    Determine if two STIX graphs are semantically equivalent.

    Computes a graph similarity score and compares it against a threshold to
    give a boolean equivalence verdict.

    Parameters:
    - ds1: First DataStore instance representing a STIX graph
    - ds2: Second DataStore instance representing a STIX graph
    - prop_scores (dict): Dictionary to hold individual property scores and weights
      (NOTE(review): mutable default mirrors the published stix2 signature; pass a
      fresh dict per call if you need to inspect the per-property scores)
    - threshold (int): Minimum similarity score (0-100) for equivalence (default: 70)
    - ignore_spec_version (bool): Ignore spec version differences (default: False)
    - versioning_checks (bool): Test multiple object revisions (default: False)
    - max_depth (int): Maximum recursion depth for de-referencing (default: 1)
    - **weight_dict: Custom weights to override default similarity checks

    Returns:
    bool: True if graph similarity >= threshold, False otherwise
    """
def graph_similarity(ds1, ds2, prop_scores={}, ignore_spec_version=False, versioning_checks=False, max_depth=1, **weight_dict):
    """
    Calculate similarity score between two STIX graphs.

    Parameters:
    - ds1: First DataStore instance representing a STIX graph
    - ds2: Second DataStore instance representing a STIX graph
    - prop_scores (dict): Dictionary to hold individual property scores and weights
      (NOTE(review): mutable default mirrors the published stix2 signature; pass a
      fresh dict per call if you need to inspect the per-property scores)
    - ignore_spec_version (bool): Ignore spec version differences (default: False)
    - versioning_checks (bool): Test multiple object revisions (default: False)
    - max_depth (int): Maximum recursion depth for de-referencing (default: 1)
    - **weight_dict: Custom weights to override default similarity checks

    Returns:
    float: Similarity score between 0.0 and 100.0
    """

# Usage examples:
# Example: comparing whole STIX graphs held in DataStores.
from stix2.equivalence.graph import graph_equivalence, graph_similarity
from stix2 import MemoryStore, Indicator, Malware, Relationship

# Create first threat intelligence graph
malware1 = Malware(
    name="Zeus Banking Trojan",
    malware_types=["trojan"]
)
indicator1 = Indicator(
    name="Zeus Hash",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:hashes.MD5 = 'abc123']"
)
relationship1 = Relationship(
    relationship_type="indicates",
    source_ref=indicator1.id,
    target_ref=malware1.id
)
# Store in first DataStore
ds1 = MemoryStore()
ds1.add([malware1, indicator1, relationship1])

# Create similar second graph
malware2 = Malware(
    name="Zeus Trojan Variant",
    malware_types=["trojan"]
)
indicator2 = Indicator(
    name="Zeus File Hash",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:hashes.MD5 = 'abc123']"
)
relationship2 = Relationship(
    relationship_type="indicates",
    source_ref=indicator2.id,
    target_ref=malware2.id
)
# Store in second DataStore
ds2 = MemoryStore()
ds2.add([malware2, indicator2, relationship2])

# Compare graphs
graph_equiv = graph_equivalence(ds1, ds2)
graph_sim = graph_similarity(ds1, ds2)
print(f"Graphs are equivalent: {graph_equiv}")
print(f"Graph similarity score: {graph_sim:.2f}")

# Create dissimilar third graph for comparison
malware3 = Malware(
    name="Emotet Banking Malware",
    malware_types=["trojan"]
)
indicator3 = Indicator(
    name="Emotet Hash",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:hashes.SHA256 = 'xyz789']"
)
ds3 = MemoryStore()
ds3.add([malware3, indicator3])

# Compare different graphs
diff_graph_sim = graph_similarity(ds1, ds3)
print(f"Different graphs similarity: {diff_graph_sim:.2f}")

# Graph comparison with custom weights
graph_weights = {
    "malware": {
        "name": (50, "partial_string_based"),
        "malware_types": (50, "exact_match")
    },
    "indicator": {
        "pattern": (80, "custom_pattern_based"),
        "name": (20, "partial_string_based")
    }
}
weighted_similarity = graph_similarity(ds1, ds2, **graph_weights)
print(f"Weighted graph similarity: {weighted_similarity:.2f}")

# Specialized functions for comparing STIX indicator patterns, enabling semantic
# matching of detection rules and observables.
def equivalent_patterns(pattern1, pattern2, stix_version="2.1"):
    """
    Determine if two STIX patterns are semantically equivalent.

    Goes beyond string comparison: patterns that differ in operand order,
    redundancy, or other syntactic detail can still compare as equivalent.

    Parameters:
    - pattern1 (str): First STIX pattern string
    - pattern2 (str): Second STIX pattern string
    - stix_version (str): STIX version for parsing ("2.0", "2.1", etc.)

    Returns:
    bool: True if patterns are semantically equivalent, False otherwise
    """
def find_equivalent_patterns(search_pattern, patterns, stix_version="2.1"):
    """
    Find patterns from a sequence equivalent to a given pattern.

    Parameters:
    - search_pattern (str): Search pattern string
    - patterns (iterable): Sequence of pattern strings to search
    - stix_version (str): STIX version for parsing

    Returns:
    generator: Generator yielding equivalent patterns
    """

# Usage examples:
# Example: semantic comparison of STIX indicator patterns.
from stix2.equivalence.pattern import equivalent_patterns, find_equivalent_patterns

# Test pattern equivalence
pattern1 = "[file:hashes.MD5 = 'abc123'] AND [file:size = 1024]"
pattern2 = "[file:size = 1024] AND [file:hashes.MD5 = 'abc123']"  # Same logic, different order
pattern3 = "[file:hashes.SHA1 = 'def456']"  # Different pattern
print(f"Pattern1 == Pattern2: {equivalent_patterns(pattern1, pattern2)}")  # True
print(f"Pattern1 == Pattern3: {equivalent_patterns(pattern1, pattern3)}")  # False

# Test boolean logic equivalence
logical1 = "[a:b = 1] OR [a:b = 1]"  # Redundant OR
logical2 = "[a:b = 1]"  # Simplified
logical3 = "[a:b = 1] AND [a:b = 1]"  # Redundant AND -> same as simplified
print(f"Redundant OR equivalent: {equivalent_patterns(logical1, logical2)}")  # True
print(f"Redundant AND equivalent: {equivalent_patterns(logical3, logical2)}")  # True

# Complex boolean equivalence
complex1 = "([a:b = 1] OR [a:b = 2]) AND [c:d = 3]"
complex2 = "[c:d = 3] AND ([a:b = 1] OR [a:b = 2])"  # Commutative
complex3 = "([a:b = 1] AND [c:d = 3]) OR ([a:b = 2] AND [c:d = 3])"  # Distributive
print(f"Commutative patterns: {equivalent_patterns(complex1, complex2)}")  # True
print(f"Distributive patterns: {equivalent_patterns(complex1, complex3)}")  # True

# Temporal qualifier equivalence
temporal1 = "[file:name = 'malware.exe'] REPEATS 2 TIMES WITHIN 300 SECONDS"
temporal2 = "[file:name = 'malware.exe'] REPEATS 2 TIMES WITHIN 5 MINUTES"  # Same duration
print(f"Temporal equivalence: {equivalent_patterns(temporal1, temporal2)}")  # True

# Find equivalent patterns in a collection
search_pattern = "[ipv4-addr:value = '192.168.1.1']"
pattern_database = [
    "[ipv4-addr:value = '192.168.1.1']",  # Exact match
    "[ipv4-addr:value = '192.168.1.2']",  # Different IP
    "[ipv4-addr:value = '192.168.1.1'] AND [ipv4-addr:value = '192.168.1.1']",  # Redundant
    "[network-traffic:src_ref.value = '192.168.1.1']",  # Different structure
    "[domain-name:value = 'example.com']"  # Completely different
]
equivalent_found = list(find_equivalent_patterns(search_pattern, pattern_database))
print(f"Equivalent patterns found: {len(equivalent_found)}")
for pattern in equivalent_found:
    print(f" - {pattern}")

# Performance comparison: bulk pattern matching
def find_equivalents_naive(search, patterns):
    """Naive approach using repeated calls."""
    return [p for p in patterns if equivalent_patterns(search, p)]

def find_equivalents_optimized(search, patterns):
    """Optimized approach using find_equivalent_patterns."""
    return list(find_equivalent_patterns(search, patterns))

# Large pattern database simulation
large_pattern_db = [
    f"[file:hashes.MD5 = '{i:032d}']" for i in range(1000)
] + [
    "[file:hashes.MD5 = '00000000000000000000000000000001']",  # Match
    "[file:hashes.MD5 = '00000000000000000000000000000001'] AND [file:hashes.MD5 = '00000000000000000000000000000001']"  # Redundant match
]
search_in_large = "[file:hashes.MD5 = '00000000000000000000000000000001']"
# The optimized version is more efficient for large datasets
matches_optimized = find_equivalents_optimized(search_in_large, large_pattern_db)
print(f"Matches found in large database: {len(matches_optimized)}")

# Version-specific pattern testing
stix_20_pattern = "[file:hashes.MD5 = 'abc123']"  # STIX 2.0 compatible
stix_21_pattern = "[file:hashes.MD5 = 'abc123']"  # Same in STIX 2.1
equiv_20 = equivalent_patterns(stix_20_pattern, stix_21_pattern, stix_version="2.0")
equiv_21 = equivalent_patterns(stix_20_pattern, stix_21_pattern, stix_version="2.1")
print(f"Cross-version equivalence (2.0): {equiv_20}")
print(f"Cross-version equivalence (2.1): {equiv_21}")

# Configuration options and advanced usage patterns for fine-tuning equivalence
# calculations.
# Example: advanced configuration — comparator functions, custom weights,
# versioning checks, reference resolution, and timestamp tolerance.
# NOTE(review): this section reuses Indicator from the earlier examples'
# imports — confirm it is in scope before running standalone.
from stix2.equivalence.object import exact_match, partial_string_based, partial_timestamp_based
from stix2.equivalence.object import custom_pattern_based, partial_external_reference_based
from stix2 import Environment
import datetime

# Create environment with equivalence capabilities
env = Environment()

# Advanced weight configuration for different object types
advanced_weights = {
    "indicator": {
        "pattern": (85, custom_pattern_based),  # Custom pattern comparison
        "name": (10, partial_string_based),  # Fuzzy string matching
        "indicator_types": (5, exact_match)  # Exact list matching
    },
    "malware": {
        "name": (60, partial_string_based),
        "malware_types": (30, exact_match),
        "is_family": (10, exact_match)
    },
    "threat-actor": {
        "name": (40, partial_string_based),
        "threat_actor_types": (30, exact_match),
        "aliases": (20, "partial_list_based"),  # Partial list overlap
        "first_seen": (10, partial_timestamp_based)
    }
}

# Use environment methods for equivalence
indicator_a = Indicator(
    name="Suspicious File",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:hashes.MD5 = 'abc123']"
)
indicator_b = Indicator(
    name="Malicious File Hash",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:hashes.MD5 = 'abc123']"
)
# Environment-based similarity calculation
env_similarity = env.object_similarity(indicator_a, indicator_b, **advanced_weights)
print(f"Environment similarity: {env_similarity:.2f}")

# Versioning checks - compare across object versions
from stix2 import new_version
indicator_v1 = Indicator(
    name="Base Indicator",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:name = 'malware.exe']"
)
# Create new version with additional confidence
indicator_v2 = new_version(indicator_v1, confidence=85)
# Compare with versioning checks enabled
version_similarity = env.object_similarity(
    indicator_v1,
    indicator_v2,
    versioning_checks=True
)
print(f"Version similarity: {version_similarity:.2f}")

# Reference checking with DataStores
from stix2 import MemoryStore, Malware, Relationship
# Create objects with references
malware_a = Malware(name="TrojanA", malware_types=["trojan"])
malware_b = Malware(name="TrojanB", malware_types=["trojan"])
relationship_a = Relationship(
    relationship_type="indicates",
    source_ref=indicator_a.id,
    target_ref=malware_a.id
)
relationship_b = Relationship(
    relationship_type="indicates",
    source_ref=indicator_b.id,
    target_ref=malware_b.id
)
# Create datastores with the objects
ds_a = MemoryStore([indicator_a, malware_a, relationship_a])
ds_b = MemoryStore([indicator_b, malware_b, relationship_b])
# Compare relationships with reference resolution
ref_similarity = env.object_similarity(
    relationship_a,
    relationship_b,
    ds1=ds_a,
    ds2=ds_b,
    max_depth=2  # Follow references 2 levels deep
)
print(f"Reference-resolved similarity: {ref_similarity:.2f}")

# Property score analysis
prop_scores = {}
detailed_similarity = env.object_similarity(
    indicator_a,
    indicator_b,
    prop_scores=prop_scores,
    **advanced_weights
)
print(f"Detailed property scores:")
for prop, score in prop_scores.items():
    print(f" {prop}: {score}")

# Timestamp-based comparison with tolerance
from datetime import datetime, timedelta
# Create objects with similar timestamps
now = datetime.now()
timestamp_a = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
timestamp_b = (now + timedelta(seconds=30)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
indicator_time_a = Indicator(
    name="Time Test A",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:name = 'test.exe']",
    valid_from=timestamp_a
)
indicator_time_b = Indicator(
    name="Time Test B",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:name = 'test.exe']",
    valid_from=timestamp_b
)
# Custom timestamp tolerance (60 seconds)
timestamp_weights = {
    "indicator": {
        "pattern": (70, custom_pattern_based),
        "name": (20, partial_string_based),
        "valid_from": (10, lambda t1, t2: partial_timestamp_based(t1, t2, 60))
    }
}
timestamp_similarity = env.object_similarity(
    indicator_time_a,
    indicator_time_b,
    **timestamp_weights
)
print(f"Timestamp tolerance similarity: {timestamp_similarity:.2f}")

# The equivalence functions are integrated into the STIX Environment class,
# providing a consistent interface for semantic analysis across STIX workflows.
# Example: equivalence helpers exposed on the Environment class.
from stix2 import Environment

# Create environment for equivalence operations
env = Environment()

# Environment provides direct access to equivalence functions.
# NOTE(review): obj1/obj2 and datastore1/datastore2 are placeholders — supply
# real STIX objects and DataStore instances before running this snippet.
objects_equivalent = env.object_equivalence(obj1, obj2, threshold=80)
similarity_score = env.object_similarity(obj1, obj2)
# Environment-based graph comparison
graph_equivalent = env.graph_equivalence(datastore1, datastore2)
graph_score = env.graph_similarity(datastore1, datastore2)
print(f"Objects equivalent: {objects_equivalent}")
print(f"Similarity score: {similarity_score:.2f}")
print(f"Graphs equivalent: {graph_equivalent}")
print(f"Graph similarity: {graph_score:.2f}")

# Install with Tessl CLI
Install with the Tessl CLI: npx tessl i tessl/pypi-stix2