Produce and consume STIX 2 JSON content for cyber threat intelligence
Version management system for creating new versions of STIX objects, tracking changes over time, and handling object revocation with proper timestamp and identifier management.
Core functions for creating new versions and revoking STIX objects.
def new_version(stix_obj, **kwargs):
    """
    Create a new version of an existing STIX object.

    The new version keeps the identity of the original: the usage examples
    in this document show ``id`` and ``created`` unchanged across versions,
    while ``modified`` strictly increases.

    Parameters:
    - stix_obj: Existing STIX object to create new version of
    - **kwargs: Properties to update in the new version. ``id``, ``type``
      and ``created`` cannot be changed (see Raises).

    Returns:
    New STIX object with updated properties and incremented modified timestamp

    Raises:
    UnmodifiablePropertyError: If attempting to modify immutable properties
    TypeNotVersionableError: If object type doesn't support versioning
    ObjectNotVersionableError: If object lacks required versioning properties
    RevokeError: If attempting to version a revoked object
    """
def revoke(stix_obj):
"""
Create a revoked version of a STIX object.
Parameters:
- stix_obj: STIX object to revoke
Returns:
New STIX object marked as revoked
Raises:
RevokeError: If object is already revoked
TypeNotVersionableError: If object type doesn't support revocation
"""

Create new versions of STIX objects with updated properties.
Usage examples:
from stix2 import new_version, Indicator, Malware
# Create initial version
indicator_v1 = Indicator(
name="Suspicious Domain",
indicator_types=["malicious-activity"],
pattern_type="stix",
pattern="[domain-name:value = 'suspicious.com']",
confidence=50
)
print(f"V1 ID: {indicator_v1.id}")
print(f"V1 Created: {indicator_v1.created}")
print(f"V1 Modified: {indicator_v1.modified}")
print(f"V1 Confidence: {indicator_v1.confidence}")
# Create new version with updated confidence
indicator_v2 = new_version(indicator_v1, confidence=85)
print(f"V2 ID: {indicator_v2.id}") # Same ID
print(f"V2 Created: {indicator_v2.created}") # Same creation time
print(f"V2 Modified: {indicator_v2.modified}") # New modification time
print(f"V2 Confidence: {indicator_v2.confidence}") # Updated confidence
# Create another version with additional changes
indicator_v3 = new_version(indicator_v2,
confidence=95,
description="Confirmed malicious domain used in phishing campaign",
labels=["phishing", "credential-theft"]
)
# Versioning preserves object identity
assert indicator_v1.id == indicator_v2.id == indicator_v3.id
assert indicator_v1.created == indicator_v2.created == indicator_v3.created
assert indicator_v1.modified < indicator_v2.modified < indicator_v3.modified

Certain properties cannot be changed when creating new versions.
from stix2 import new_version, UnmodifiablePropertyError
try:
# Attempting to change immutable properties raises error
invalid_version = new_version(indicator_v1,
id="indicator--new-id-12345678-1234-1234-1234-123456789012", # Cannot change ID
type="malware", # Cannot change type
created="2022-01-01T00:00:00.000Z" # Cannot change creation time
)
except UnmodifiablePropertyError as e:
print(f"Cannot modify: {e.unchangable_properties}")
# Valid versioning - only modifiable properties
valid_version = new_version(indicator_v1,
name="Updated Suspicious Domain", # Can change name
confidence=75, # Can change confidence
description="Additional context added", # Can add description
external_references=[ # Can add external references
{
"source_name": "internal-analysis",
"description": "Internal security team analysis"
}
]
)

Mark objects as revoked to indicate they should no longer be used.
from stix2 import revoke, RevokeError
# Create malware object
malware = Malware(
name="Old Malware Name",
malware_types=["trojan"]
)
# Revoke the object
revoked_malware = revoke(malware)
print(f"Original revoked: {malware.revoked}") # False (or not present)
print(f"Revoked version: {revoked_malware.revoked}") # True
print(f"Same ID: {malware.id == revoked_malware.id}") # True
print(f"Updated modified: {revoked_malware.modified > malware.modified}") # True
# Cannot revoke already-revoked objects
try:
double_revoked = revoke(revoked_malware)
except RevokeError as e:
print(f"Revocation error: {e}")
# Cannot create new versions of revoked objects
try:
new_version_of_revoked = new_version(revoked_malware, name="Updated Name")
except RevokeError as e:
print(f"Versioning error: {e}")

Track and manage multiple versions of objects over time.
from stix2 import MemoryStore, Filter, ThreatActor

# Create store for version tracking
store = MemoryStore()

# Add all versions of an object
threat_actor_v1 = ThreatActor(
    name="APT Example",
    threat_actor_types=["nation-state"],
    first_seen="2020-01-01T00:00:00.000Z",
)
threat_actor_v2 = new_version(
    threat_actor_v1,
    aliases=["APT-EX", "Example Group"],
    sophistication="advanced",
)
threat_actor_v3 = new_version(
    threat_actor_v2,
    last_seen="2021-12-31T23:59:59.000Z",
    goals=["espionage", "data-theft"],
)

# Store all versions
store.add([threat_actor_v1, threat_actor_v2, threat_actor_v3])

# Get latest version (most recent modified timestamp)
latest_version = store.get(threat_actor_v1.id)  # Returns most recent
print(f"Latest version modified: {latest_version.modified}")

# Get all versions of an object
all_versions = store.all_versions(threat_actor_v1.id)
print(f"Total versions: {len(all_versions)}")
for version in all_versions:
    print(f"Version modified: {version.modified}")
    # STIX objects are mappings, so len() counts their properties without
    # reaching into the private ``_inner`` dict.
    print(f"Properties: {len(version)}")

# Get specific version by timestamp.
# MemoryStore.get() accepts no ``version`` keyword, so pick the wanted
# version out of all_versions() by its ``modified`` timestamp instead.
# ``None`` is produced when no stored version carries that timestamp.
specific_version = next(
    (v for v in all_versions if v.modified == threat_actor_v2.modified),
    None,
)
print(f"Specific version: {specific_version.modified}")

Compare different versions to track changes.
def compare_versions(obj_v1, obj_v2):
    """Report which properties differ between two versions of one object.

    Both arguments must expose ``id`` and an ``_inner`` mapping of property
    names. Property values are read with ``getattr`` and default to ``None``
    when a version does not have the property.

    Returns:
        dict mapping each differing property name to
        ``{'old': <value in obj_v1>, 'new': <value in obj_v2>}``.

    Raises:
        ValueError: If the two objects do not share the same ``id``.
    """
    if obj_v1.id != obj_v2.id:
        raise ValueError("Objects must have the same ID")

    # Union of the property names present in either version.
    names = set(obj_v1._inner.keys()) | set(obj_v2._inner.keys())

    # Lazily pair up (name, old value, new value) for every property.
    triples = (
        (name, getattr(obj_v1, name, None), getattr(obj_v2, name, None))
        for name in names
    )
    return {
        name: {'old': before, 'new': after}
        for name, before, after in triples
        if before != after
    }
# Compare versions
changes = compare_versions(threat_actor_v1, threat_actor_v3)
for prop, change in changes.items():
print(f"{prop}: {change['old']} -> {change['new']}")
# Output:
# modified: 2021-04-23T10:30:00.000Z -> 2021-04-23T11:45:00.000Z
# aliases: None -> ['APT-EX', 'Example Group']
# sophistication: None -> advanced
# last_seen: None -> 2021-12-31T23:59:59.000Z
# goals: None -> ['espionage', 'data-theft']

Manage the complete lifecycle of versioned objects.
from stix2 import Bundle
class VersionManager:
    """Utility class for managing object versions.

    Wraps a data store and routes every create / update / revoke through
    it. The store must provide ``add(obj)``, ``get(obj_id)`` and
    ``all_versions(obj_id)`` -- the only store calls this class makes.
    """

    def __init__(self, store):
        self.store = store

    def _get_existing(self, obj_id):
        """Return the store's current object for ``obj_id`` or raise ValueError."""
        current = self.store.get(obj_id)
        if not current:
            raise ValueError(f"Object {obj_id} not found")
        return current

    def create_object(self, cls, **kwargs):
        """Create new object and store it.

        ``cls`` is called with ``**kwargs``; the resulting object is added
        to the store and returned.
        """
        obj = cls(**kwargs)
        self.store.add(obj)
        return obj

    def update_object(self, obj_id, **updates):
        """Create new version of object with updates.

        Raises:
            ValueError: If the store has no object with ``obj_id``.
            RevokeError: If the current version is revoked.
        """
        current = self._get_existing(obj_id)
        if getattr(current, 'revoked', False):
            raise RevokeError("Cannot update revoked object")
        new_obj = new_version(current, **updates)
        self.store.add(new_obj)
        return new_obj

    def revoke_object(self, obj_id, reason=None):
        """Revoke object and optionally add reason.

        When ``reason`` is given it is recorded as an extra external
        reference (``source_name`` "revocation-reason") on the revoked
        version; references the object already carries are preserved.

        Raises:
            ValueError: If the store has no object with ``obj_id``.
            RevokeError: If the object is already revoked.
        """
        current = self._get_existing(obj_id)
        if reason:
            # The reason has to be attached BEFORE revoking: new_version()
            # refuses to version a revoked object, so doing it afterwards
            # would always raise RevokeError.
            references = list(getattr(current, 'external_references', None) or [])
            references.append({
                "source_name": "revocation-reason",
                "description": reason
            })
            current = new_version(current, external_references=references)
        revoked_obj = revoke(current)
        # Only the final, revoked version is stored.
        self.store.add(revoked_obj)
        return revoked_obj

    def get_version_history(self, obj_id):
        """Get chronological version history (oldest ``modified`` first).

        Returns an empty list when the store knows no versions.
        """
        versions = self.store.all_versions(obj_id) or []
        return sorted(versions, key=lambda x: x.modified)

    def is_current_version(self, obj):
        """Check if object is the most current version.

        Returns False when the store holds no object with ``obj.id``.
        """
        latest = self.store.get(obj.id)
        if latest is None:
            return False
        return obj.modified == latest.modified
# Usage example
store = MemoryStore()
manager = VersionManager(store)
# Create initial object
indicator = manager.create_object(Indicator,
name="Initial Indicator",
indicator_types=["malicious-activity"],
pattern_type="stix",
pattern="[ip-addr:value = '192.168.1.1']",
confidence=30
)
# Update object multiple times
indicator_v2 = manager.update_object(indicator.id, confidence=60)
indicator_v3 = manager.update_object(indicator.id,
confidence=85,
description="Confirmed C2 server"
)
# Get version history
history = manager.get_version_history(indicator.id)
print(f"Version history: {len(history)} versions")
for i, version in enumerate(history, 1):
print(f"V{i}: {version.modified} - Confidence: {version.confidence}")
# Revoke object
revoked = manager.revoke_object(indicator.id,
reason="Indicator found to be false positive")
# Check current status
is_current = manager.is_current_version(indicator_v3)
print(f"V3 is current: {is_current}") # False, revoked version is current

Handle versioning when objects are referenced in relationships.
from stix2 import Relationship, ThreatActor, Malware
# Create related objects
actor = ThreatActor(
name="Initial Actor Name",
threat_actor_types=["nation-state"]
)
malware = Malware(
name="Banking Trojan",
malware_types=["trojan"]
)
# Create relationship
relationship = Relationship(
relationship_type="uses",
source_ref=actor.id,
target_ref=malware.id
)
# Store objects
store.add([actor, malware, relationship])
# Update referenced object
actor_v2 = new_version(actor,
name="Updated Actor Name",
aliases=["Actor Alias"]
)
store.add(actor_v2)
# Relationship still references same ID
retrieved_relationship = store.get(relationship.id)
referenced_actor = store.get(retrieved_relationship.source_ref)
print(f"Relationship references: {referenced_actor.name}") # "Updated Actor Name"
print(f"Actor version: {referenced_actor.modified}") # Latest version
# Create new relationship version if needed
relationship_v2 = new_version(relationship,
description="Updated relationship description"
)
store.add(relationship_v2)

Version entire bundles of related objects.
from stix2 import Bundle

# Create bundle with multiple objects
bundle_v1 = Bundle(
    indicator_v1,
    malware,
    relationship,
)

# Bundles are not versionable: a Bundle has no created/modified properties,
# so new_version(bundle_v1, ...) raises instead of returning a new bundle.
# To ship updated content, build a fresh Bundle from the latest object
# versions. Each Bundle gets its own id.
bundle_v2 = Bundle(
    indicator_v3,
    malware,
    relationship_v2,
)
print(f"Bundle V1: {len(bundle_v1.objects)} objects")
print(f"Bundle V2: {len(bundle_v2.objects)} objects")
print(f"Same bundle ID: {bundle_v1.id == bundle_v2.id}")

Handle versioning errors appropriately.
from stix2.exceptions import (
TypeNotVersionableError,
ObjectNotVersionableError,
UnmodifiablePropertyError,
RevokeError
)
def safe_version_update(obj, **updates):
    """Try to create a new version of ``obj``, reporting failures via print.

    Every versioning error is caught and described on stdout. When only
    some of the requested properties are unmodifiable, those are dropped
    and the update is retried once with the remaining properties.

    Returns:
        The new version on success, otherwise ``None``.
    """
    try:
        outcome = new_version(obj, **updates)
    except TypeNotVersionableError:
        print(f"Object type {obj.type} does not support versioning")
        outcome = None
    except ObjectNotVersionableError:
        print(f"Object lacks required properties for versioning")
        outcome = None
    except UnmodifiablePropertyError as err:
        print(f"Cannot modify properties: {err.unchangable_properties}")
        # Drop the immutable properties and retry with whatever is left.
        retry_updates = {
            key: value
            for key, value in updates.items()
            if key not in err.unchangable_properties
        }
        outcome = new_version(obj, **retry_updates) if retry_updates else None
    except RevokeError:
        print("Cannot version revoked object")
        outcome = None
    return outcome
# Safe versioning attempt
updated_obj = safe_version_update(some_object,
name="New Name",
id="invalid-id-change", # This will be filtered out
confidence=75
)

Install with Tessl CLI
npx tessl i tessl/pypi-stix2