Produce and consume STIX 2 JSON content for cyber threat intelligence.
Flexible data store backends for persisting and querying STIX objects, including in-memory storage, file system storage, and TAXII server integration, with comprehensive filtering and search capabilities.
In-memory storage for STIX objects with fast access and querying capabilities.
class MemoryStore:
"""
In-memory data store for STIX objects.
Constructor Parameters:
- stix_data (list): Initial STIX objects to store
"""
def add(self, stix_data):
"""
Add STIX objects to the store.
Parameters:
- stix_data: STIX object(s) to add
"""
def get(self, stix_id, version=None):
"""
Retrieve STIX object by ID.
Parameters:
- stix_id (str): STIX object identifier
- version (str): Specific version timestamp
Returns:
STIX object or None if not found
"""
def all_versions(self, stix_id):
"""
Retrieve all versions of a STIX object.
Parameters:
- stix_id (str): STIX object identifier
Returns:
List of STIX objects (all versions)
"""
def query(self, query=None):
"""
Query STIX objects using filters.
Parameters:
- query (list): List of Filter objects
Returns:
List of matching STIX objects
"""
class MemorySource:
"""
In-memory data source for reading STIX objects.
Constructor Parameters:
- stix_data (list): STIX objects to serve
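- _store (bool): Internal flag indicating the source belongs to a MemoryStore and shares its object storage; not intended to be supplied by users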
"""
class MemorySink:
"""
In-memory data sink for storing STIX objects.
Constructor Parameters:
- stix_data (list): Initial STIX objects
- _store (bool): Internal flag indicating the sink belongs to a MemoryStore and shares its object storage; not intended to be supplied by users
"""

Usage examples:
from stix2 import MemoryStore, Indicator, Malware, Filter
# Create memory store
memory_store = MemoryStore()
# Add objects
indicator = Indicator(
name="Malicious IP",
indicator_types=["malicious-activity"],
pattern_type="stix",
pattern="[ipv4-addr:value = '192.168.1.100']"
)
malware = Malware(
name="Poison Ivy",
malware_types=["remote-access-trojan"]
)
memory_store.add([indicator, malware])
# Retrieve by ID
retrieved = memory_store.get(indicator.id)
print(retrieved.name) # "Malicious IP"
# Query with filters
indicators = memory_store.query([
Filter('type', '=', 'indicator')
])
malware_objects = memory_store.query([
Filter('type', '=', 'malware'),
Filter('name', '=', 'Poison Ivy')
])
# Initialize with data
initial_data = [indicator, malware]
store_with_data = MemoryStore(initial_data)

File system-based storage for persistent STIX object storage.
class FileSystemStore:
"""
File system data store for STIX objects.
Constructor Parameters:
- stix_dir (str): Directory path for STIX files
- allow_custom (bool): Allow custom STIX objects
- bundlify (bool): Store objects in STIX bundles
"""
def add(self, stix_data):
"""
Add STIX objects to file system.
Parameters:
- stix_data: STIX object(s) to store
"""
def get(self, stix_id, version=None):
"""
Retrieve STIX object from file system.
Parameters:
- stix_id (str): STIX object identifier
- version (str): Specific version timestamp
Returns:
STIX object or None if not found
"""
def all_versions(self, stix_id):
"""
Retrieve all versions of STIX object from file system.
Parameters:
- stix_id (str): STIX object identifier
Returns:
List of STIX objects (all versions)
"""
def query(self, query=None):
"""
Query STIX objects from file system.
Parameters:
- query (list): List of Filter objects
Returns:
List of matching STIX objects
"""
class FileSystemSource:
"""
File system data source for reading STIX objects.
Constructor Parameters:
- stix_dir (str): Directory path containing STIX files
- allow_custom (bool): Allow custom STIX objects
"""
class FileSystemSink:
"""
File system data sink for storing STIX objects.
Constructor Parameters:
- stix_dir (str): Directory path for storing STIX files
- allow_custom (bool): Allow custom STIX objects
- bundlify (bool): Store objects in STIX bundles
"""

Usage examples:
from stix2 import FileSystemStore, Indicator, Filter
import tempfile
import os
# Create temporary directory for demo
stix_dir = tempfile.mkdtemp()
# Create file system store
fs_store = FileSystemStore(stix_dir)
# Add objects (stored as JSON files)
indicator = Indicator(
name="File Hash Indicator",
indicator_types=["malicious-activity"],
pattern_type="stix",
pattern="[file:hashes.MD5 = 'abc123']"
)
fs_store.add(indicator)
# Retrieve from file system
retrieved = fs_store.get(indicator.id)
print(retrieved.name) # "File Hash Indicator"
# Query file system
indicators = fs_store.query([
Filter('type', '=', 'indicator')
])
# List files created
print(os.listdir(stix_dir))
# Output: ['indicator']  (one subdirectory per STIX object type; the object's JSON is stored inside it)
# Store in bundles
bundled_store = FileSystemStore(stix_dir, bundlify=True)
bundled_store.add([indicator])

Integration with TAXII (Trusted Automated eXchange of Intelligence Information) servers.
class TAXIICollectionStore:
"""
TAXII collection data store for STIX objects.
Constructor Parameters:
- collection: TAXII collection object
- allow_custom (bool): Allow custom STIX objects
"""
def add(self, stix_data):
"""
Add STIX objects to TAXII collection.
Parameters:
- stix_data: STIX object(s) to publish
"""
def get(self, stix_id, version=None):
"""
Retrieve STIX object from TAXII collection.
Parameters:
- stix_id (str): STIX object identifier
- version (str): Specific version timestamp
Returns:
STIX object or None if not found
"""
def all_versions(self, stix_id):
"""
Retrieve all versions from TAXII collection.
Parameters:
- stix_id (str): STIX object identifier
Returns:
List of STIX objects (all versions)
"""
def query(self, query=None):
"""
Query STIX objects from TAXII collection.
Parameters:
- query (list): List of Filter objects
Returns:
List of matching STIX objects
"""
class TAXIICollectionSource:
"""
TAXII collection data source for reading STIX objects.
Constructor Parameters:
- collection: TAXII collection object
- allow_custom (bool): Allow custom STIX objects
"""
class TAXIICollectionSink:
"""
TAXII collection data sink for publishing STIX objects.
Constructor Parameters:
- collection: TAXII collection object
- allow_custom (bool): Allow custom STIX objects
"""

Usage examples:
from stix2 import TAXIICollectionStore, Indicator, Filter
from taxii2client.v20 import Collection
# Connect to TAXII server (requires taxii2-client)
collection = Collection(
"https://taxii-server.com/api/v20/collections/threat-intel/",
user="username",
password="password"
)
# Create TAXII store
taxii_store = TAXIICollectionStore(collection)
# Add objects (publishes to TAXII server)
indicator = Indicator(
name="Network Indicator",
indicator_types=["malicious-activity"],
pattern_type="stix",
pattern="[ipv4-addr:value = '10.0.0.1']"
)
taxii_store.add(indicator)
# Query TAXII collection
results = taxii_store.query([
Filter('type', '=', 'indicator'),
Filter('created', '>', '2021-01-01T00:00:00.000Z')
])

Combine multiple data sources for federated querying.
class CompositeDataSource:
"""
Composite data source combining multiple sources.
Constructor Parameters:
- data_sources (list): List of data source objects
"""
def add_data_source(self, data_source):
"""
Add data source to composite.
Parameters:
- data_source: DataSource object to add
"""
def add_data_sources(self, data_sources):
"""
Add multiple data sources to composite.
Parameters:
- data_sources (list): List of DataSource objects
"""
def remove_data_source(self, data_source_id):
"""
Remove data source from composite.
Parameters:
- data_source_id (str): ID of data source to remove
"""
def get(self, stix_id, version=None):
"""
Retrieve STIX object from any source.
Parameters:
- stix_id (str): STIX object identifier
- version (str): Specific version timestamp
Returns:
STIX object or None if not found
"""
def all_versions(self, stix_id):
"""
Retrieve all versions from all sources.
Parameters:
- stix_id (str): STIX object identifier
Returns:
List of STIX objects (all versions)
"""
def query(self, query=None):
"""
Query STIX objects across all sources.
Parameters:
- query (list): List of Filter objects
Returns:
List of matching STIX objects from all sources
"""

Usage examples:
from stix2 import CompositeDataSource, MemorySource, FileSystemSource, Filter
# Create individual sources
memory_source = MemorySource([indicator1, malware1])
fs_source = FileSystemSource("/path/to/stix/data")
# Create composite source
composite = CompositeDataSource()
composite.add_data_sources([memory_source, fs_source])
# Query across all sources
all_indicators = composite.query([
Filter('type', '=', 'indicator')
])
# Get object from any source
obj = composite.get("indicator--12345678-1234-1234-1234-123456789012")

Powerful filtering system for querying STIX objects.
class Filter:
"""
Filter for querying STIX objects.
Constructor Parameters:
- property (str): STIX object property to filter on
- op (str): Comparison operator ("=", "!=", ">", "<", ">=", "<=", "in", "contains")
- value: Value to compare against
"""
def __init__(self, property, op, value):
"""
Create filter for STIX object queries.
Parameters:
- property (str): Object property name (supports dot notation)
- op (str): Comparison operator
- value: Comparison value
"""

Usage examples:
from stix2 import Filter, MemoryStore
# Basic filters
type_filter = Filter('type', '=', 'indicator')
name_filter = Filter('name', 'contains', 'malicious')
date_filter = Filter('created', '>', '2021-01-01T00:00:00.000Z')
# Filters on top-level properties (substring match on a string, match against a list-valued property)
hash_filter = Filter('pattern', 'contains', 'file:hashes.MD5')
tlp_filter = Filter('object_marking_refs', '=', 'marking-definition--f88d31f6-486f-44da-b317-01333bde0b82')
# Nested property filters (dot notation)
extensions_filter = Filter('extensions.ntfs-ext.alternate_data_streams', '!=', None)
# Query with multiple filters (AND logic)
store = MemoryStore()
results = store.query([
Filter('type', '=', 'malware'),
Filter('malware_types', 'contains', 'trojan'),
Filter('created', '>', '2020-01-01T00:00:00.000Z')
])
# Supported operators
equality = Filter('name', '=', 'Zeus')
inequality = Filter('confidence', '!=', 0)
greater_than = Filter('created', '>', '2021-01-01T00:00:00.000Z')
less_than = Filter('modified', '<', '2022-01-01T00:00:00.000Z')
greater_equal = Filter('confidence', '>=', 50)
less_equal = Filter('confidence', '<=', 90)
contains = Filter('name', 'contains', 'APT')
in_list = Filter('malware_types', 'in', ['trojan', 'backdoor'])

Configurable environment for object creation and parsing.
class Environment:
"""
Environment for STIX object creation and management.
Constructor Parameters:
- factory (ObjectFactory): Object factory for creating objects
- store (DataStore): Data store for object persistence
- allow_custom (bool): Allow custom STIX objects
- version (str): Default STIX specification version
"""
def add_filters(self, *args):
"""Add filters to the environment."""
def add_filter(self, filter):
"""Add single filter to the environment."""
def parse(self, stix_data, version=None):
"""Parse STIX data using environment settings."""
def create(self, *args, **kwargs):
"""Create STIX object using environment factory."""
def get(self, stix_id, version=None):
"""Get STIX object from environment store."""
def all_versions(self, stix_id):
"""Get all versions from environment store."""
def query(self, query=None):
"""Query environment store."""
def add(self, stix_data):
"""Add objects to environment store."""
class ObjectFactory:
"""
Factory for creating STIX objects with consistent settings.
Constructor Parameters:
- created_by_ref (str): Default creator identity reference
- created (timestamp): Default creation timestamp
- external_references (list): Default external references
- object_marking_refs (list): Default object markings
- list_append (bool): Append to lists vs replace
"""
def create(self, cls, **kwargs):
"""
Create STIX object with factory defaults.
Parameters:
- cls: STIX object class
- **kwargs: Object-specific properties
Returns:
STIX object instance
"""

Usage examples:
from stix2 import Environment, ObjectFactory, MemoryStore, Identity, Indicator
# Create identity for authorship
identity = Identity(
name="ACME Corp Security Team",
identity_class="organization"
)
# Create object factory with defaults
factory = ObjectFactory(
created_by_ref=identity.id,
object_marking_refs=["marking-definition--f88d31f6-486f-44da-b317-01333bde0b82"] # TLP:AMBER
)
# Create environment with store and factory
env = Environment(
factory=factory,
store=MemoryStore()
)
# Create objects using environment (inherits defaults)
indicator = env.create(Indicator,
name="Suspicious Domain",
indicator_types=["malicious-activity"],
pattern_type="stix",
pattern="[domain-name:value = 'evil.com']"
)
# Object automatically has created_by_ref and markings
print(indicator.created_by_ref) # identity--<uuid>
print(indicator.object_marking_refs) # ["marking-definition--f88d31f6..."]
# Store and retrieve using environment
env.add(indicator)
retrieved = env.get(indicator.id)

Install with Tessl CLI
npx tessl i tessl/pypi-stix2