# Python TAXII 2.X client library for sharing cyber threat intelligence via the STIX protocol.
# Pagination utilities for handling large result sets across TAXII versions, with automatic
# page traversal. The as_pages function provides a consistent interface for paginated requests
# regardless of TAXII version.
# TAXII 2.1 uses limit/next parameters for pagination with server-driven continuation tokens.
def as_pages(func, per_request=0, *args, **kwargs):
    """
    Generator for TAXII 2.1 endpoints supporting pagination.

    TAXII 2.1 pagination is server-driven: each response envelope may carry
    a 'more' flag and an opaque 'next' token that is echoed back to the
    server on the following request.

    Parameters:
    - func (callable): Collection method supporting pagination
      (get_objects, get_manifest)
    - per_request (int): Number of items to request per page (0 for server default)
    - *args: Positional arguments to pass to the function
    - **kwargs: Keyword arguments to pass to the function (filters, etc.)

    Yields:
    dict: Response envelope for each page containing objects and metadata

    Note:
    - Automatically handles 'next' tokens from server responses
    - Stops when the server indicates no more pages are available
      ('more' is false or missing from the envelope)
    """
    # First request carries no continuation token.
    envelope = func(*args, per_request=per_request, **kwargs)
    yield envelope
    # Keep requesting while the server reports more pages, echoing back the
    # continuation token it handed us on the previous response.
    while envelope and envelope.get("more", False):
        envelope = func(*args, per_request=per_request,
                        next=envelope.get("next"), **kwargs)
        yield envelope

# TAXII 2.0 uses start/per_request parameters with HTTP Range headers for pagination.
def as_pages(func, start=0, per_request=0, *args, **kwargs):
    """
    Generator for TAXII 2.0 endpoints supporting pagination.

    TAXII 2.0 pagination is client-driven via item offsets (sent to the
    server as HTTP Range headers by the underlying collection method).

    Parameters:
    - func (callable): Collection method supporting pagination
      (get_objects, get_manifest)
    - start (int): Starting index for pagination (default: 0)
    - per_request (int): Number of items to request per page (0 for server default)
    - *args: Positional arguments to pass to the function
    - **kwargs: Keyword arguments to pass to the function (filters, etc.)

    Yields:
    dict: Response bundle for each page containing objects and metadata

    Note:
    - Automatically advances the start offset by the number of objects
      actually received on each page
    - Stops when a page comes back empty, or with fewer objects than were
      requested (the server has run out of items)
    """
    while True:
        bundle = func(*args, start=start, per_request=per_request, **kwargs)
        yield bundle
        received = len(bundle.get("objects", [])) if bundle else 0
        # An empty or short page means the server has no further items.
        if received == 0 or (per_request and received < per_request):
            return
        start += received
from taxii2client import Collection, as_pages

collection = Collection("https://taxii-server.example.com/taxii2/api1/collections/indicators/")

# Paginate through all objects with default page size
total_objects = 0
for page in as_pages(collection.get_objects):
    objects = page.get('objects', [])
    total_objects += len(objects)
    print(f"Page contains {len(objects)} objects (total so far: {total_objects})")
    # Process objects in this page
    for obj in objects:
        print(f"  {obj.get('type')}: {obj.get('id')}")
print(f"Total objects retrieved: {total_objects}")

# Request 50 objects per page
for page_num, page in enumerate(as_pages(collection.get_objects, per_request=50), 1):
    objects = page.get('objects', [])
    print(f"Page {page_num}: {len(objects)} objects")
    # Check if this is the last page
    if not page.get('more', False):  # TAXII 2.1
        print("This is the last page")
        break

# Request 100 objects per page with filter
for page in as_pages(collection.get_objects, per_request=100, type="indicator"):
    indicators = page.get('objects', [])
    print(f"Retrieved {len(indicators)} indicators")

# Paginate through object manifests instead of full objects
total_manifests = 0
for page in as_pages(collection.get_manifest, per_request=200):
    manifests = page.get('objects', [])  # Manifests are in 'objects' array
    total_manifests += len(manifests)
    print(f"Manifest page: {len(manifests)} objects")
    for manifest in manifests:
        obj_id = manifest.get('id')
        versions = manifest.get('versions', [])
        print(f"  {obj_id}: {len(versions)} versions")
print(f"Total objects in collection: {total_manifests}")

from datetime import datetime, timezone
# Paginate with date filter
recent_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
for page in as_pages(collection.get_objects, per_request=100, added_after=recent_date):
    objects = page.get('objects', [])
    print(f"Recent objects page: {len(objects)}")

# Paginate with type filter
for page in as_pages(collection.get_objects, per_request=50, type=["indicator", "malware"]):
    objects = page.get('objects', [])
    indicators = [obj for obj in objects if obj.get('type') == 'indicator']
    malware = [obj for obj in objects if obj.get('type') == 'malware']
    print(f"Page: {len(indicators)} indicators, {len(malware)} malware")

# Paginate with multiple filters
filters = {
    'type': 'indicator',
    'added_after': recent_date
}
for page in as_pages(collection.get_objects, per_request=75, **filters):
    indicators = page.get('objects', [])
    print(f"Recent indicators: {len(indicators)}")

from taxii2client.v20 import Collection, as_pages
# For TAXII 2.0, as_pages uses start/per_request parameters
collection = Collection("https://taxii2-server.example.com/api1/collections/indicators/")

# Start from beginning with custom page size
for page in as_pages(collection.get_objects, start=0, per_request=100):
    objects = page.get('objects', [])
    print(f"TAXII 2.0 page: {len(objects)} objects")

# Start from specific offset
for page in as_pages(collection.get_objects, start=500, per_request=50):
    objects = page.get('objects', [])
    print(f"Starting from offset 500: {len(objects)} objects")

import time
from datetime import datetime

# Process very large collection with progress tracking
start_time = datetime.now()
total_processed = 0
page_count = 0
try:
    for page in as_pages(collection.get_objects, per_request=1000):
        page_count += 1
        objects = page.get('objects', [])
        # Process objects in batch
        for obj in objects:
            # Your processing logic here
            process_stix_object(obj)
        total_processed += len(objects)
        elapsed = (datetime.now() - start_time).total_seconds()
        rate = total_processed / elapsed if elapsed > 0 else 0
        print(f"Page {page_count}: Processed {len(objects)} objects")
        print(f"  Total: {total_processed} objects in {elapsed:.1f}s ({rate:.1f} obj/s)")
        # Optional: Add delay to avoid overwhelming the server
        time.sleep(0.1)
except KeyboardInterrupt:
    print(f"\nInterrupted after processing {total_processed} objects")
except Exception as e:
    print(f"Error during pagination: {e}")
print(f"Final: Processed {total_processed} objects across {page_count} pages")

# Process large datasets without storing everything in memory
def process_collection_efficiently(collection, batch_size=500):
    """Process all objects in a collection without loading everything into memory.

    Streams pages via as_pages so only one page of objects is held at a time.

    Parameters:
    - collection: TAXII Collection whose get_objects method supports pagination
    - batch_size (int): Number of objects to request per page

    Returns:
    tuple: (processed_count, error_count)
    """
    processed_count = 0
    error_count = 0
    for page in as_pages(collection.get_objects, per_request=batch_size):
        objects = page.get('objects', [])
        for obj in objects:
            try:
                # Process individual object; count only non-falsy results.
                result = analyze_stix_object(obj)
                if result:
                    processed_count += 1
            except Exception as e:
                print(f"Error processing {obj.get('id', 'unknown')}: {e}")
                error_count += 1
        # Clear page from memory before fetching the next one
        del objects
        # Periodic status update (skip the noisy report while the count is
        # still zero; the check runs once per page, so it only fires when a
        # page boundary lands exactly on a multiple of 5000)
        if processed_count and processed_count % 5000 == 0:
            print(f"Processed: {processed_count}, Errors: {error_count}")
    return processed_count, error_count

# Use the efficient processor
success_count, error_count = process_collection_efficiently(collection, batch_size=1000)
print(f"Processing complete: {success_count} successful, {error_count} errors")

from taxii2client.exceptions import TAXIIServiceException
def robust_pagination(collection, page_size=100):
    """Paginate through a collection with error handling and retry logic.

    Parameters:
    - collection: TAXII Collection whose get_objects method supports pagination
    - page_size (int): Number of objects to request per page

    Returns:
    tuple: (total_objects, page_count)
    """
    page_count = 0
    total_objects = 0
    retry_count = 0
    max_retries = 3
    while True:
        try:
            # The server error surfaces while advancing the as_pages generator,
            # so the handler must wrap the iteration itself -- a raised
            # generator cannot be resumed, so a retry restarts pagination from
            # the beginning (pages seen before the error are counted again).
            # NOTE(review): confirm re-counting is acceptable for your use case.
            for page in as_pages(collection.get_objects, per_request=page_size):
                objects = page.get('objects', [])
                page_count += 1
                total_objects += len(objects)
                print(f"Page {page_count}: {len(objects)} objects")
                # Reset retry count on each successfully received page
                retry_count = 0
            break  # pagination finished normally
        except TAXIIServiceException as e:
            retry_count += 1
            print(f"TAXII error on page {page_count + 1}: {e}")
            if retry_count >= max_retries:
                print(f"Max retries ({max_retries}) exceeded, stopping")
                break
            print(f"Retrying page {page_count + 1} (attempt {retry_count + 1})")
            time.sleep(2 ** retry_count)  # Exponential backoff
        except Exception as e:
            print(f"Unexpected error during pagination: {e}")
            break
    return total_objects, page_count

total, pages = robust_pagination(collection, page_size=500)
print(f"Retrieved {total} objects across {pages} pages")

# Adapt page size based on server behavior
def adaptive_pagination(collection, initial_page_size=100):
    """Track page sizes and adjust the target size based on server responses.

    Parameters:
    - collection: TAXII Collection whose get_objects method supports pagination
    - initial_page_size (int): Page size requested when pagination starts

    Returns:
    int: Total number of objects retrieved

    NOTE(review): as_pages() captures per_request when the generator is
    created, so updating page_size mid-iteration only changes the numbers
    reported here -- it does NOT change the size requested from the server
    on subsequent pages. Confirm whether a per-page re-request is intended.
    """
    page_size = initial_page_size
    total_objects = 0
    for page_num, page in enumerate(as_pages(collection.get_objects, per_request=page_size), 1):
        objects = page.get('objects', [])
        actual_size = len(objects)
        total_objects += actual_size
        print(f"Page {page_num}: requested {page_size}, got {actual_size}")
        # Adjust page size based on server response
        if actual_size < page_size * 0.5 and page_size > 50:
            # Server returned much less than requested, reduce page size
            page_size = max(50, page_size // 2)
            print(f"  Reducing page size to {page_size}")
        elif actual_size == page_size and page_size < 1000:
            # Server returned exactly what we asked for, try larger pages
            page_size = min(1000, int(page_size * 1.5))
            print(f"  Increasing page size to {page_size}")
    return total_objects

total = adaptive_pagination(collection)
print(f"Total objects retrieved with adaptive pagination: {total}")

# Install with Tessl CLI
# npx tessl i tessl/pypi-taxii2-client