0
# Data Storage and Retrieval
1
2
Flexible data store backends for persisting and querying STIX objects, including in-memory storage, file system storage, and TAXII server integration, with comprehensive filtering and search capabilities.
3
4
## Capabilities
5
6
### Memory Data Store
7
8
In-memory storage for STIX objects with fast access and querying capabilities.
9
10
```python { .api }
11
class MemoryStore:
12
"""
13
In-memory data store for STIX objects.
14
15
Constructor Parameters:
16
- stix_data (list): Initial STIX objects to store
17
"""
18
19
def add(self, stix_data):
20
"""
21
Add STIX objects to the store.
22
23
Parameters:
24
- stix_data: STIX object(s) to add
25
"""
26
27
def get(self, stix_id, version=None):
28
"""
29
Retrieve STIX object by ID.
30
31
Parameters:
32
- stix_id (str): STIX object identifier
33
- version (str): Specific version timestamp
34
35
Returns:
36
STIX object or None if not found
37
"""
38
39
def all_versions(self, stix_id):
40
"""
41
Retrieve all versions of a STIX object.
42
43
Parameters:
44
- stix_id (str): STIX object identifier
45
46
Returns:
47
List of STIX objects (all versions)
48
"""
49
50
def query(self, query=None):
51
"""
52
Query STIX objects using filters.
53
54
Parameters:
55
- query (list): List of Filter objects
56
57
Returns:
58
List of matching STIX objects
59
"""
60
61
class MemorySource:
62
"""
63
In-memory data source for reading STIX objects.
64
65
Constructor Parameters:
66
- stix_data (list): STIX objects to serve
67
- _store (bool): Internal flag indicating the source is part of a MemoryStore and shares its data; not intended to be supplied by users
68
"""
69
70
class MemorySink:
71
"""
72
In-memory data sink for storing STIX objects.
73
74
Constructor Parameters:
75
- stix_data (list): Initial STIX objects
76
- _store (bool): Internal flag indicating the sink is part of a MemoryStore and shares its data; not intended to be supplied by users
77
"""
78
```
79
80
Usage examples:
81
82
```python
83
from stix2 import MemoryStore, Indicator, Malware, Filter
84
85
# Create memory store
86
memory_store = MemoryStore()
87
88
# Add objects
89
indicator = Indicator(
90
name="Malicious IP",
91
indicator_types=["malicious-activity"],
92
pattern_type="stix",
93
pattern="[ipv4-addr:value = '192.168.1.100']"
94
)
95
96
malware = Malware(
97
name="Poison Ivy",
98
malware_types=["remote-access-trojan"]
99
)
100
101
memory_store.add([indicator, malware])
102
103
# Retrieve by ID
104
retrieved = memory_store.get(indicator.id)
105
print(retrieved.name) # "Malicious IP"
106
107
# Query with filters
108
indicators = memory_store.query([
109
Filter('type', '=', 'indicator')
110
])
111
112
malware_objects = memory_store.query([
113
Filter('type', '=', 'malware'),
114
Filter('name', '=', 'Poison Ivy')
115
])
116
117
# Initialize with data
118
initial_data = [indicator, malware]
119
store_with_data = MemoryStore(initial_data)
120
```
121
122
### File System Data Store
123
124
File system-based storage for persistent STIX object storage.
125
126
```python { .api }
127
class FileSystemStore:
128
"""
129
File system data store for STIX objects.
130
131
Constructor Parameters:
132
- stix_dir (str): Directory path for STIX files
133
- allow_custom (bool): Allow custom STIX objects
134
- bundlify (bool): Store objects in STIX bundles
135
"""
136
137
def add(self, stix_data):
138
"""
139
Add STIX objects to file system.
140
141
Parameters:
142
- stix_data: STIX object(s) to store
143
"""
144
145
def get(self, stix_id, version=None):
146
"""
147
Retrieve STIX object from file system.
148
149
Parameters:
150
- stix_id (str): STIX object identifier
151
- version (str): Specific version timestamp
152
153
Returns:
154
STIX object or None if not found
155
"""
156
157
def all_versions(self, stix_id):
158
"""
159
Retrieve all versions of STIX object from file system.
160
161
Parameters:
162
- stix_id (str): STIX object identifier
163
164
Returns:
165
List of STIX objects (all versions)
166
"""
167
168
def query(self, query=None):
169
"""
170
Query STIX objects from file system.
171
172
Parameters:
173
- query (list): List of Filter objects
174
175
Returns:
176
List of matching STIX objects
177
"""
178
179
class FileSystemSource:
180
"""
181
File system data source for reading STIX objects.
182
183
Constructor Parameters:
184
- stix_dir (str): Directory path containing STIX files
185
- allow_custom (bool): Allow custom STIX objects
186
"""
187
188
class FileSystemSink:
189
"""
190
File system data sink for storing STIX objects.
191
192
Constructor Parameters:
193
- stix_dir (str): Directory path for storing STIX files
194
- allow_custom (bool): Allow custom STIX objects
195
- bundlify (bool): Store objects in STIX bundles
196
"""
197
```
198
199
Usage examples:
200
201
```python
202
from stix2 import FileSystemStore, Indicator, Filter
203
import tempfile
204
import os
205
206
# Create temporary directory for demo
207
stix_dir = tempfile.mkdtemp()
208
209
# Create file system store
210
fs_store = FileSystemStore(stix_dir)
211
212
# Add objects (stored as JSON files)
213
indicator = Indicator(
214
name="File Hash Indicator",
215
indicator_types=["malicious-activity"],
216
pattern_type="stix",
217
pattern="[file:hashes.MD5 = 'abc123']"
218
)
219
220
fs_store.add(indicator)
221
222
# Retrieve from file system
223
retrieved = fs_store.get(indicator.id)
224
print(retrieved.name) # "File Hash Indicator"
225
226
# Query file system
227
indicators = fs_store.query([
228
Filter('type', '=', 'indicator')
229
])
230
231
# List files created
232
print(os.listdir(stix_dir))
233
# Output: ['indicator']  (os.listdir shows only top-level entries; objects are stored under a per-type subdirectory)
234
235
# Store in bundles
236
bundled_store = FileSystemStore(stix_dir, bundlify=True)
237
bundled_store.add([indicator])
238
```
239
240
### TAXII Data Store
241
242
Integration with TAXII (Trusted Automated eXchange of Intelligence Information) servers.
243
244
```python { .api }
245
class TAXIICollectionStore:
246
"""
247
TAXII collection data store for STIX objects.
248
249
Constructor Parameters:
250
- collection: TAXII collection object
251
- allow_custom (bool): Allow custom STIX objects
252
"""
253
254
def add(self, stix_data):
255
"""
256
Add STIX objects to TAXII collection.
257
258
Parameters:
259
- stix_data: STIX object(s) to publish
260
"""
261
262
def get(self, stix_id, version=None):
263
"""
264
Retrieve STIX object from TAXII collection.
265
266
Parameters:
267
- stix_id (str): STIX object identifier
268
- version (str): Specific version timestamp
269
270
Returns:
271
STIX object or None if not found
272
"""
273
274
def all_versions(self, stix_id):
275
"""
276
Retrieve all versions from TAXII collection.
277
278
Parameters:
279
- stix_id (str): STIX object identifier
280
281
Returns:
282
List of STIX objects (all versions)
283
"""
284
285
def query(self, query=None):
286
"""
287
Query STIX objects from TAXII collection.
288
289
Parameters:
290
- query (list): List of Filter objects
291
292
Returns:
293
List of matching STIX objects
294
"""
295
296
class TAXIICollectionSource:
297
"""
298
TAXII collection data source for reading STIX objects.
299
300
Constructor Parameters:
301
- collection: TAXII collection object
302
- allow_custom (bool): Allow custom STIX objects
303
"""
304
305
class TAXIICollectionSink:
306
"""
307
TAXII collection data sink for publishing STIX objects.
308
309
Constructor Parameters:
310
- collection: TAXII collection object
311
- allow_custom (bool): Allow custom STIX objects
312
"""
313
```
314
315
Usage examples:
316
317
```python
318
from stix2 import TAXIICollectionStore, Indicator, Filter
319
from taxii2client.v20 import Collection
320
321
# Connect to TAXII server (requires taxii2-client)
322
collection = Collection(
323
"https://taxii-server.com/api/v20/collections/threat-intel/",
324
user="username",
325
password="password"
326
)
327
328
# Create TAXII store
329
taxii_store = TAXIICollectionStore(collection)
330
331
# Add objects (publishes to TAXII server)
332
indicator = Indicator(
333
name="Network Indicator",
334
indicator_types=["malicious-activity"],
335
pattern_type="stix",
336
pattern="[ipv4-addr:value = '10.0.0.1']"
337
)
338
339
taxii_store.add(indicator)
340
341
# Query TAXII collection
342
results = taxii_store.query([
343
Filter('type', '=', 'indicator'),
344
Filter('created', '>', '2021-01-01T00:00:00.000Z')
345
])
346
```
347
348
### Composite Data Source
349
350
Combine multiple data sources for federated querying.
351
352
```python { .api }
353
class CompositeDataSource:
354
"""
355
Composite data source combining multiple sources.
356
357
Constructor Parameters:
358
- data_sources (list): List of data source objects
359
"""
360
361
def add_data_source(self, data_source):
362
"""
363
Add data source to composite.
364
365
Parameters:
366
- data_source: DataSource object to add
367
"""
368
369
def add_data_sources(self, data_sources):
370
"""
371
Add multiple data sources to composite.
372
373
Parameters:
374
- data_sources (list): List of DataSource objects
375
"""
376
377
def remove_data_source(self, data_source_id):
378
"""
379
Remove data source from composite.
380
381
Parameters:
382
- data_source_id (str): ID of data source to remove
383
"""
384
385
def get(self, stix_id, version=None):
386
"""
387
Retrieve STIX object from any source.
388
389
Parameters:
390
- stix_id (str): STIX object identifier
391
- version (str): Specific version timestamp
392
393
Returns:
394
STIX object or None if not found
395
"""
396
397
def all_versions(self, stix_id):
398
"""
399
Retrieve all versions from all sources.
400
401
Parameters:
402
- stix_id (str): STIX object identifier
403
404
Returns:
405
List of STIX objects (all versions)
406
"""
407
408
def query(self, query=None):
409
"""
410
Query STIX objects across all sources.
411
412
Parameters:
413
- query (list): List of Filter objects
414
415
Returns:
416
List of matching STIX objects from all sources
417
"""
418
```
419
420
Usage examples:
421
422
```python
423
from stix2 import CompositeDataSource, MemorySource, FileSystemSource, Filter
424
425
# Create individual sources
426
memory_source = MemorySource([indicator1, malware1])
427
fs_source = FileSystemSource("/path/to/stix/data")
428
429
# Create composite source
430
composite = CompositeDataSource()
431
composite.add_data_sources([memory_source, fs_source])
432
433
# Query across all sources
434
all_indicators = composite.query([
435
Filter('type', '=', 'indicator')
436
])
437
438
# Get object from any source
439
obj = composite.get("indicator--12345678-1234-1234-1234-123456789012")
440
```
441
442
### Data Filtering
443
444
Powerful filtering system for querying STIX objects.
445
446
```python { .api }
447
class Filter:
448
"""
449
Filter for querying STIX objects.
450
451
Constructor Parameters:
452
- property (str): STIX object property to filter on
453
- op (str): Comparison operator ("=", "!=", ">", "<", ">=", "<=", "in", "contains")
454
- value: Value to compare against
455
"""
456
457
def __init__(self, property, op, value):
458
"""
459
Create filter for STIX object queries.
460
461
Parameters:
462
- property (str): Object property name (supports dot notation)
463
- op (str): Comparison operator
464
- value: Comparison value
465
"""
466
```
467
468
Usage examples:
469
470
```python
471
from stix2 import Filter, MemoryStore
472
473
# Basic filters
474
type_filter = Filter('type', '=', 'indicator')
475
name_filter = Filter('name', 'contains', 'malicious')
476
date_filter = Filter('created', '>', '2021-01-01T00:00:00.000Z')
477
478
# Substring and exact-match filters on top-level properties
479
hash_filter = Filter('pattern', 'contains', 'file:hashes.MD5')
480
tlp_filter = Filter('object_marking_refs', '=', 'marking-definition--f88d31f6-486f-44da-b317-01333bde0b82')
481
482
# Nested property filters
483
extensions_filter = Filter('extensions.ntfs-ext.alternate_data_streams', '!=', None)
484
485
# Query with multiple filters (AND logic)
486
results = store.query([
487
Filter('type', '=', 'malware'),
488
Filter('malware_types', 'contains', 'trojan'),
489
Filter('created', '>', '2020-01-01T00:00:00.000Z')
490
])
491
492
# Supported operators
493
equality = Filter('name', '=', 'Zeus')
494
inequality = Filter('confidence', '!=', 0)
495
greater_than = Filter('created', '>', '2021-01-01T00:00:00.000Z')
496
less_than = Filter('modified', '<', '2022-01-01T00:00:00.000Z')
497
greater_equal = Filter('confidence', '>=', 50)
498
less_equal = Filter('confidence', '<=', 90)
499
contains = Filter('name', 'contains', 'APT')
500
in_list = Filter('malware_types', 'in', ['trojan', 'backdoor'])
501
```
502
503
## Environment and Object Factory
504
505
Configurable environment for object creation and parsing.
506
507
```python { .api }
508
class Environment:
509
"""
510
Environment for STIX object creation and management.
511
512
Constructor Parameters:
513
- factory (ObjectFactory): Object factory for creating objects
514
- store (DataStore): Data store for object persistence
515
- allow_custom (bool): Allow custom STIX objects
516
- version (str): Default STIX specification version
517
"""
518
519
def add_filters(self, *args):
520
"""Add filters to the environment."""
521
522
def add_filter(self, filter):
523
"""Add single filter to the environment."""
524
525
def parse(self, stix_data, version=None):
526
"""Parse STIX data using environment settings."""
527
528
def create(self, *args, **kwargs):
529
"""Create STIX object using environment factory."""
530
531
def get(self, stix_id, version=None):
532
"""Get STIX object from environment store."""
533
534
def all_versions(self, stix_id):
535
"""Get all versions from environment store."""
536
537
def query(self, query=None):
538
"""Query environment store."""
539
540
def add(self, stix_data):
541
"""Add objects to environment store."""
542
543
class ObjectFactory:
544
"""
545
Factory for creating STIX objects with consistent settings.
546
547
Constructor Parameters:
548
- created_by_ref (str): Default creator identity reference
549
- created (timestamp): Default creation timestamp
550
- external_references (list): Default external references
551
- object_marking_refs (list): Default object markings
552
- list_append (bool): Append to lists vs replace
553
"""
554
555
def create(self, cls, **kwargs):
556
"""
557
Create STIX object with factory defaults.
558
559
Parameters:
560
- cls: STIX object class
561
- **kwargs: Object-specific properties
562
563
Returns:
564
STIX object instance
565
"""
566
```
567
568
Usage examples:
569
570
```python
571
from stix2 import Environment, ObjectFactory, MemoryStore, Identity, Indicator
572
573
# Create identity for authorship
574
identity = Identity(
575
name="ACME Corp Security Team",
576
identity_class="organization"
577
)
578
579
# Create object factory with defaults
580
factory = ObjectFactory(
581
created_by_ref=identity.id,
582
object_marking_refs=["marking-definition--f88d31f6-486f-44da-b317-01333bde0b82"] # TLP:AMBER
583
)
584
585
# Create environment with store and factory
586
env = Environment(
587
factory=factory,
588
store=MemoryStore()
589
)
590
591
# Create objects using environment (inherits defaults)
592
indicator = env.create(Indicator,
593
name="Suspicious Domain",
594
indicator_types=["malicious-activity"],
595
pattern_type="stix",
596
pattern="[domain-name:value = 'evil.com']"
597
)
598
599
# Object automatically has created_by_ref and markings
600
print(indicator.created_by_ref) # identity--<uuid>
601
print(indicator.object_marking_refs) # ["marking-definition--f88d31f6..."]
602
603
# Store and retrieve using environment
604
env.add(indicator)
605
retrieved = env.get(indicator.id)
606
```