CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-stix2

Produce and consume STIX 2 JSON content for cyber threat intelligence

Overview
Eval results
Files

docs/data-storage.md

Data Storage and Retrieval

Flexible data store backends for persisting and querying STIX objects including in-memory storage, file system storage, and TAXII server integration with comprehensive filtering and search capabilities.

Capabilities

Memory Data Store

In-memory storage for STIX objects with fast access and querying capabilities.

class MemoryStore:
    """
    In-memory data store for STIX objects.

    Combines the add (sink) and get/query (source) operations on one
    object, as shown by the methods below.

    Constructor Parameters:
    - stix_data (list): Initial STIX objects to store. Optional: the store
      can also be created empty and filled later with add().
    """

    def add(self, stix_data):
        """
        Add STIX objects to the store.

        Parameters:
        - stix_data: STIX object(s) to add; either a single object or a
          list of objects
        """

    def get(self, stix_id, version=None):
        """
        Retrieve STIX object by ID.

        Parameters:
        - stix_id (str): STIX object identifier, e.g. "indicator--<uuid>"
        - version (str): Specific version timestamp
          NOTE(review): the upstream in-memory source may not accept a
          ``version`` argument at all -- confirm against the installed
          stix2 release before relying on it.

        Returns:
        STIX object or None if not found
        """

    def all_versions(self, stix_id):
        """
        Retrieve all versions of a STIX object.

        Parameters:
        - stix_id (str): STIX object identifier

        Returns:
        List of STIX objects (all versions)
        """

    def query(self, query=None):
        """
        Query STIX objects using filters.

        Parameters:
        - query (list): List of Filter objects; an object must satisfy
          every filter in the list to be returned (AND logic)

        Returns:
        List of matching STIX objects
        """

class MemorySource:
    """
    In-memory data source for reading STIX objects.

    Constructor Parameters:
    - stix_data (list): STIX objects to serve
    - _store: Internal argument; the leading underscore marks it as not
      intended for callers.
      NOTE(review): upstream stix2 documents this as a bool flag
      (default False) set when the source is created by a MemoryStore,
      not as a MemoryStore instance -- confirm.
    """

class MemorySink:
    """
    In-memory data sink for storing STIX objects.

    Constructor Parameters:
    - stix_data (list): Initial STIX objects
    - _store: Internal argument; the leading underscore marks it as not
      intended for callers.
      NOTE(review): upstream stix2 documents this as a bool flag
      (default False) set when the sink is created by a MemoryStore,
      not as a MemoryStore instance -- confirm.
    """

Usage examples:

from stix2 import MemoryStore, Indicator, Malware, Filter

# Create memory store
memory_store = MemoryStore()

# Add objects
indicator = Indicator(
    name="Malicious IP",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[ipv4-addr:value = '192.168.1.100']",
)

# `is_family` is a required property of STIX 2.1 Malware objects; leaving it
# out makes the constructor raise a missing-properties error.
malware = Malware(
    name="Poison Ivy",
    malware_types=["remote-access-trojan"],
    is_family=True,
)

# add() accepts a single object or a list of objects
memory_store.add([indicator, malware])

# Retrieve by ID
retrieved = memory_store.get(indicator.id)
print(retrieved.name)  # "Malicious IP"

# Query with filters
indicators = memory_store.query([
    Filter('type', '=', 'indicator'),
])

# Multiple filters are combined with AND logic
malware_objects = memory_store.query([
    Filter('type', '=', 'malware'),
    Filter('name', '=', 'Poison Ivy'),
])

# Initialize with data
initial_data = [indicator, malware]
store_with_data = MemoryStore(initial_data)

File System Data Store

File system-based storage for persistent STIX object storage.

class FileSystemStore:
    """
    File system data store for STIX objects.

    Objects are persisted as JSON files below ``stix_dir``.

    Constructor Parameters:
    - stix_dir (str): Directory path for STIX files
    - allow_custom (bool): Allow custom STIX objects
    - bundlify (bool): Store objects in STIX bundles (each written file
      wraps its object in a bundle instead of holding the bare object)
    """

    def add(self, stix_data):
        """
        Add STIX objects to file system.

        Parameters:
        - stix_data: STIX object(s) to store; either a single object or a
          list of objects
        """

    def get(self, stix_id, version=None):
        """
        Retrieve STIX object from file system.

        Parameters:
        - stix_id (str): STIX object identifier
        - version (str): Specific version timestamp
          NOTE(review): upstream stix2 describes this argument as the
          STIX specification version forced on the parser, not an object
          version timestamp -- confirm.

        Returns:
        STIX object or None if not found
        """

    def all_versions(self, stix_id):
        """
        Retrieve all versions of STIX object from file system.

        Parameters:
        - stix_id (str): STIX object identifier

        Returns:
        List of STIX objects (all versions)
        """

    def query(self, query=None):
        """
        Query STIX objects from file system.

        Parameters:
        - query (list): List of Filter objects; an object must satisfy
          every filter in the list to be returned (AND logic)

        Returns:
        List of matching STIX objects
        """

class FileSystemSource:
    """
    File system data source for reading STIX objects.

    This is the read side only; use FileSystemSink (or FileSystemStore) to
    write objects.

    Constructor Parameters:
    - stix_dir (str): Directory path containing STIX files; passed as the
      first positional argument, e.g. FileSystemSource("/path/to/stix/data")
    - allow_custom (bool): Allow custom STIX objects
    """

class FileSystemSink:
    """
    File system data sink for storing STIX objects.

    This is the write side only; use FileSystemSource (or FileSystemStore)
    to read objects back.

    Constructor Parameters:
    - stix_dir (str): Directory path for storing STIX files
    - allow_custom (bool): Allow custom STIX objects
    - bundlify (bool): Store objects in STIX bundles (each written file
      wraps its object in a bundle instead of holding the bare object)
    """

Usage examples:

import os
import tempfile

from stix2 import FileSystemStore, Indicator, Filter

indicator = Indicator(
    name="File Hash Indicator",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[file:hashes.MD5 = 'abc123']",
)

# Use self-cleaning temporary directories for the demo. Two directories are
# needed: the file system sink refuses to overwrite an existing file, so the
# same object version cannot be added twice to the same directory.
with tempfile.TemporaryDirectory() as stix_dir, \
        tempfile.TemporaryDirectory() as bundle_dir:

    # Create file system store
    fs_store = FileSystemStore(stix_dir)

    # Add objects (stored as JSON files)
    fs_store.add(indicator)

    # Retrieve from file system
    retrieved = fs_store.get(indicator.id)
    print(retrieved.name)  # "File Hash Indicator"

    # Query file system
    indicators = fs_store.query([
        Filter('type', '=', 'indicator'),
    ])

    # List files created. os.listdir() would only show the top-level
    # per-type directory (['indicator']), so walk the tree instead.
    for root, _dirs, files in os.walk(stix_dir):
        for filename in files:
            print(os.path.relpath(os.path.join(root, filename), stix_dir))
    # Example output (exact layout depends on the stix2 version):
    #   indicator/indicator--<uuid>/<modified-timestamp>.json

    # Store in bundles (separate directory, see note above)
    bundled_store = FileSystemStore(bundle_dir, bundlify=True)
    bundled_store.add([indicator])

TAXII Data Store

Integration with TAXII (Trusted Automated eXchange of Intelligence Information) servers.

class TAXIICollectionStore:
    """
    TAXII collection data store for STIX objects.

    Reads from and publishes to a single collection on a remote TAXII
    server.

    Constructor Parameters:
    - collection: TAXII collection object, e.g. a ``Collection`` from the
      taxii2-client package
    - allow_custom (bool): Allow custom STIX objects
    """

    def add(self, stix_data):
        """
        Add STIX objects to TAXII collection.

        Parameters:
        - stix_data: STIX object(s) to publish; either a single object or
          a list of objects
        """

    def get(self, stix_id, version=None):
        """
        Retrieve STIX object from TAXII collection.

        Parameters:
        - stix_id (str): STIX object identifier
        - version (str): Specific version timestamp
          NOTE(review): upstream stix2 describes this argument as the
          STIX specification version forced on the parser, not an object
          version timestamp -- confirm.

        Returns:
        STIX object or None if not found
        """

    def all_versions(self, stix_id):
        """
        Retrieve all versions from TAXII collection.

        Parameters:
        - stix_id (str): STIX object identifier

        Returns:
        List of STIX objects (all versions)
        """

    def query(self, query=None):
        """
        Query STIX objects from TAXII collection.

        Parameters:
        - query (list): List of Filter objects; an object must satisfy
          every filter in the list to be returned (AND logic)

        Returns:
        List of matching STIX objects
        """

class TAXIICollectionSource:
    """
    TAXII collection data source for reading STIX objects.

    This is the read side only; use TAXIICollectionSink (or
    TAXIICollectionStore) to publish objects.

    Constructor Parameters:
    - collection: TAXII collection object (presumably the same kind of
      taxii2-client ``Collection`` that TAXIICollectionStore takes)
    - allow_custom (bool): Allow custom STIX objects
    """

class TAXIICollectionSink:
    """
    TAXII collection data sink for publishing STIX objects.

    This is the write side only; use TAXIICollectionSource (or
    TAXIICollectionStore) to read objects.

    Constructor Parameters:
    - collection: TAXII collection object (presumably the same kind of
      taxii2-client ``Collection`` that TAXIICollectionStore takes)
    - allow_custom (bool): Allow custom STIX objects
    """

Usage examples:

from stix2 import TAXIICollectionStore, Indicator, Filter
# The indicator below uses STIX 2.1 properties (pattern_type), so use the
# TAXII 2.1 client; TAXII 2.0 (taxii2client.v20) is the transport for STIX 2.0.
from taxii2client.v21 import Collection

# Connect to TAXII server (requires taxii2-client).
# The URL and credentials are placeholders; substitute real values.
collection = Collection(
    "https://taxii-server.com/api/v21/collections/threat-intel/",
    user="username",
    password="password",
)

# Create TAXII store
taxii_store = TAXIICollectionStore(collection)

# Add objects (publishes to TAXII server)
indicator = Indicator(
    name="Network Indicator",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[ipv4-addr:value = '10.0.0.1']",
)

taxii_store.add(indicator)

# Query TAXII collection (filters are combined with AND logic)
results = taxii_store.query([
    Filter('type', '=', 'indicator'),
    Filter('created', '>', '2021-01-01T00:00:00.000Z'),
])

Composite Data Source

Combine multiple data sources for federated querying.

class CompositeDataSource:
    """
    Composite data source combining multiple sources.

    Federates get/all_versions/query calls across every attached data
    source.

    Constructor Parameters:
    - data_sources (list): List of data source objects
      NOTE(review): the usage example constructs the composite with no
      arguments and attaches sources via add_data_sources(); upstream
      stix2 appears to expose ``data_sources`` as an attribute rather
      than a constructor parameter -- confirm.
    """

    def add_data_source(self, data_source):
        """
        Add data source to composite.

        Parameters:
        - data_source: DataSource object to add
        """

    def add_data_sources(self, data_sources):
        """
        Add multiple data sources to composite.

        Parameters:
        - data_sources (list): List of DataSource objects
        """

    def remove_data_source(self, data_source_id):
        """
        Remove data source from composite.

        Parameters:
        - data_source_id (str): ID of data source to remove (the data
          source's own identifier, not a STIX object ID)
        """

    def get(self, stix_id, version=None):
        """
        Retrieve STIX object from any source.

        Parameters:
        - stix_id (str): STIX object identifier
        - version (str): Specific version timestamp
          NOTE(review): confirm that the upstream composite source
          accepts a ``version`` argument before relying on it.

        Returns:
        STIX object or None if not found
        """

    def all_versions(self, stix_id):
        """
        Retrieve all versions from all sources.

        Parameters:
        - stix_id (str): STIX object identifier

        Returns:
        List of STIX objects (all versions)
        """

    def query(self, query=None):
        """
        Query STIX objects across all sources.

        Parameters:
        - query (list): List of Filter objects; an object must satisfy
          every filter in the list to be returned (AND logic)

        Returns:
        List of matching STIX objects from all sources
        """

Usage examples:

from stix2 import (
    CompositeDataSource,
    FileSystemSource,
    Filter,
    Indicator,
    Malware,
    MemorySource,
)

# Objects served by the in-memory source
indicator1 = Indicator(
    name="Malicious IP",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[ipv4-addr:value = '192.168.1.100']",
)
malware1 = Malware(
    name="Poison Ivy",
    malware_types=["remote-access-trojan"],
    is_family=True,  # required for STIX 2.1 Malware objects
)

# Create individual sources
memory_source = MemorySource([indicator1, malware1])
# Placeholder path: point this at an existing directory of STIX files
fs_source = FileSystemSource("/path/to/stix/data")

# Create composite source
composite = CompositeDataSource()
composite.add_data_sources([memory_source, fs_source])

# Query across all sources
all_indicators = composite.query([
    Filter('type', '=', 'indicator'),
])

# Get object from any source
obj = composite.get(indicator1.id)

Data Filtering

Powerful filtering system for querying STIX objects.

class Filter:
    """
    Filter for querying STIX objects.

    A filter is a (property, operator, value) triple. When several filters
    are passed to a query() call they are combined with AND logic.

    Constructor Parameters:
    - property (str): STIX object property to filter on
    - op (str): Comparison operator ("=", "!=", ">", "<", ">=", "<=", "in", "contains")
    - value: Value to compare against
      NOTE(review): upstream stix2 validates the value type and does not
      appear to accept None -- confirm before filtering on "is set".
    """

    def __init__(self, property, op, value):
        """
        Create filter for STIX object queries.

        Parameters:
        - property (str): Object property name (supports dot notation for
          nested properties, e.g. "extensions.ntfs-ext.alternate_data_streams")
        - op (str): Comparison operator; one of the operators listed in
          the class docstring
        - value: Comparison value
        """

Usage examples:

from stix2 import Filter, MemoryStore

# Store to run the queries against (empty here; add objects with store.add())
store = MemoryStore()

# Basic filters
type_filter = Filter('type', '=', 'indicator')
name_filter = Filter('name', 'contains', 'malicious')
date_filter = Filter('created', '>', '2021-01-01T00:00:00.000Z')

# Filters on top-level properties with string/list values
hash_filter = Filter('pattern', 'contains', 'file:hashes.MD5')
tlp_filter = Filter('object_marking_refs', '=', 'marking-definition--f88d31f6-486f-44da-b317-01333bde0b82')

# Nested property filters (dot notation walks into sub-properties).
# Note: None is not an accepted filter value, so compare against a concrete
# value instead of testing "!= None".
extensions_filter = Filter('extensions.ntfs-ext.alternate_data_streams.name', '=', 'second.stream')

# Query with multiple filters (AND logic)
results = store.query([
    Filter('type', '=', 'malware'),
    Filter('malware_types', 'contains', 'trojan'),
    Filter('created', '>', '2020-01-01T00:00:00.000Z'),
])

# Supported operators
equality = Filter('name', '=', 'Zeus')
inequality = Filter('confidence', '!=', 0)
greater_than = Filter('created', '>', '2021-01-01T00:00:00.000Z')
less_than = Filter('modified', '<', '2022-01-01T00:00:00.000Z')
greater_equal = Filter('confidence', '>=', 50)
less_equal = Filter('confidence', '<=', 90)
contains = Filter('name', 'contains', 'APT')
in_list = Filter('malware_types', 'in', ['trojan', 'backdoor'])

Environment and Object Factory

Configurable environment for object creation and parsing.

class Environment:
    """
    Environment for STIX object creation and management.

    Bundles an object factory (creation defaults) with a data store so
    that objects can be created, stored and queried through one object.

    Constructor Parameters:
    - factory (ObjectFactory): Object factory for creating objects
    - store (DataStore): Data store for object persistence
    - allow_custom (bool): Allow custom STIX objects
    - version (str): Default STIX specification version
      NOTE(review): the upstream stix2 signature appears to be
      Environment(factory, store, source, sink); ``allow_custom`` and
      ``version`` could not be confirmed as constructor parameters --
      verify against the installed release.
    """

    def add_filters(self, *args):
        """Add filters to the environment."""

    def add_filter(self, filter):
        """Add single filter to the environment."""

    def parse(self, stix_data, version=None):
        """Parse STIX data using environment settings."""

    def create(self, *args, **kwargs):
        """Create STIX object using environment factory.

        Called as ``env.create(ObjectClass, **properties)``; defaults set
        on the factory are applied to the new object.
        """

    def get(self, stix_id, version=None):
        """Get STIX object from environment store."""

    def all_versions(self, stix_id):
        """Get all versions from environment store."""

    def query(self, query=None):
        """Query environment store."""

    def add(self, stix_data):
        """Add objects to environment store."""

class ObjectFactory:
    """
    Factory for creating STIX objects with consistent settings.

    Every constructor parameter is a default that is applied to objects
    produced by create().

    Constructor Parameters:
    - created_by_ref (str): Default creator identity reference
      (an identity ID such as ``identity.id``)
    - created (timestamp): Default creation timestamp
    - external_references (list): Default external references
    - object_marking_refs (list): Default object markings
      (marking-definition IDs)
    - list_append (bool): Append to lists vs replace; presumably controls
      whether list-valued properties passed to create() extend the
      factory defaults or override them -- TODO confirm
    """

    def create(self, cls, **kwargs):
        """
        Create STIX object with factory defaults.

        Parameters:
        - cls: STIX object class (e.g. Indicator)
        - **kwargs: Object-specific properties

        Returns:
        STIX object instance
        """

Usage examples:

from stix2 import Environment, Identity, Indicator, MemoryStore, ObjectFactory

# Create identity for authorship
identity = Identity(
    name="ACME Corp Security Team",
    identity_class="organization",
)

# Create object factory with defaults
factory = ObjectFactory(
    created_by_ref=identity.id,
    # This is the well-known STIX marking-definition ID for TLP:AMBER
    # (TLP:WHITE is marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9)
    object_marking_refs=["marking-definition--f88d31f6-486f-44da-b317-01333bde0b82"],
)

# Create environment with store and factory
env = Environment(
    factory=factory,
    store=MemoryStore(),
)

# Create objects using environment (inherits defaults)
indicator = env.create(
    Indicator,
    name="Suspicious Domain",
    indicator_types=["malicious-activity"],
    pattern_type="stix",
    pattern="[domain-name:value = 'evil.com']",
)

# Object automatically has created_by_ref and markings
print(indicator.created_by_ref)  # identity--<uuid>
print(indicator.object_marking_refs)  # ["marking-definition--f88d31f6..."]

# Store and retrieve using environment
env.add(indicator)
retrieved = env.get(indicator.id)

Install with Tessl CLI

npx tessl i tessl/pypi-stix2

docs

data-storage.md

equivalence.md

index.md

markings.md

object-creation.md

pattern-matching.md

relationships.md

stix-domain-objects.md

stix-observables.md

utilities.md

versioning.md

tile.json