Python TAXII 2.X client library for sharing cyber threat intelligence via STIX protocol
Status tracking for asynchronous operations including polling, completion checking, and result analysis. Status objects track the progress and results of TAXII operations like adding objects to collections.
Connect to a TAXII status endpoint to monitor asynchronous operations.
class Status:
def __init__(self, url, conn=None, user=None, password=None, verify=True,
proxies=None, status_info=None, auth=None, cert=None):
"""
Create a TAXII status endpoint connection.
Parameters:
- url (str): URL of TAXII status endpoint
- conn (_HTTPConnection, optional): Reuse existing connection
- user (str, optional): Username for HTTP basic authentication
- password (str, optional): Password for HTTP basic authentication
- verify (bool): Validate SSL certificates (default: True)
- proxies (dict, optional): HTTP/HTTPS proxy settings
- status_info (dict, optional): Pre-loaded status information
- auth (requests.auth.AuthBase, optional): Custom authentication object
- cert (str or tuple, optional): SSL client certificate path or (cert, key) tuple
"""Access detailed status information about asynchronous operations.
@property
def id(self) -> str:
"""Status operation identifier (required)."""
@property
def status(self) -> str:
"""Current status: 'pending', 'complete' (required)."""
@property
def request_timestamp(self) -> str:
"""ISO 8601 timestamp when request was initiated (optional)."""
@property
def total_count(self) -> int:
"""Total number of objects in the operation (required)."""
@property
def success_count(self) -> int:
"""Number of objects successfully processed (required)."""
@property
def failure_count(self) -> int:
"""Number of objects that failed processing (required)."""
@property
def pending_count(self) -> int:
"""Number of objects still being processed (required)."""
@property
def successes(self) -> list:
"""List of successfully processed object identifiers (optional)."""
@property
def failures(self) -> list:
"""List of failed object details with error information (optional)."""
@property
def pendings(self) -> list:
"""List of pending object identifiers (optional)."""
@property
def custom_properties(self) -> dict:
"""Custom status properties not defined in TAXII spec."""
@property
def _raw(self) -> dict:
"""Raw status response (parsed JSON)."""Monitor and update status information for asynchronous operations.
def refresh(self, accept=None) -> None:
"""
Update status information from the server.
Parameters:
- accept (str, optional): Media type for Accept header
"""
def wait_until_final(self, poll_interval=1, timeout=60) -> None:
"""
Poll the status endpoint until operation completes or times out.
Parameters:
- poll_interval (int): Seconds between polling requests (default: 1)
- timeout (int): Maximum seconds to wait (default: 60, <= 0 for no limit)
"""
def __bool__(self) -> bool:
"""
Check if operation completed successfully.
Returns:
bool: True if status is 'complete', False otherwise
"""
def close(self) -> None:
"""Close the status connection."""
def __enter__(self):
"""Context manager entry."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""from taxii2client import Collection
# Add objects to collection (returns Status object)
collection = Collection("https://taxii-server.example.com/taxii2/api1/collections/indicators/")
stix_envelope = {
"objects": [
{
"type": "indicator",
"id": "indicator--12345678-1234-5678-9012-123456789012",
"created": "2023-01-01T00:00:00.000Z",
"modified": "2023-01-01T00:00:00.000Z",
"pattern": "[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
"labels": ["malicious-activity"]
}
]
}
# Start async operation (don't wait for completion)
status = collection.add_objects(stix_envelope, wait_for_completion=False)
print(f"Operation started:")
print(f" Status ID: {status.id}")
print(f" Initial status: {status.status}")
print(f" Total objects: {status.total_count}")import time
# Poll status manually
while status.status != "complete":
print(f"Status: {status.status}")
print(f" Success: {status.success_count}/{status.total_count}")
print(f" Failures: {status.failure_count}")
print(f" Pending: {status.pending_count}")
time.sleep(2) # Wait 2 seconds
status.refresh() # Update status from server
print("Operation completed!")# Use built-in polling with custom timeout
status = collection.add_objects(stix_envelope, wait_for_completion=False)
print(f"Starting operation: {status.id}")
print(f"Initial status: {status.status}")
# Wait up to 5 minutes, polling every 5 seconds
status.wait_until_final(poll_interval=5, timeout=300)
if status.status == "complete":
print("Operation completed successfully!")
else:
print(f"Operation timed out with status: {status.status}")# Analyze operation results
print(f"Operation Summary:")
print(f" Status: {status.status}")
print(f" Started: {status.request_timestamp}")
print(f" Total: {status.total_count}")
print(f" Successful: {status.success_count}")
print(f" Failed: {status.failure_count}")
print(f" Pending: {status.pending_count}")
# Check individual successes
if status.successes:
print(f"\nSuccessful objects ({len(status.successes)}):")
for success in status.successes:
if isinstance(success, dict):
print(f" {success.get('id', 'Unknown ID')}")
else:
print(f" {success}")
# Check individual failures
if status.failures:
print(f"\nFailed objects ({len(status.failures)}):")
for failure in status.failures:
if isinstance(failure, dict):
obj_id = failure.get('id', 'Unknown ID')
message = failure.get('message', 'No error message')
print(f" {obj_id}: {message}")
else:
print(f" {failure}")
# Check pending objects
if status.pendings:
print(f"\nPending objects ({len(status.pendings)}):")
for pending in status.pendings:
if isinstance(pending, dict):
print(f" {pending.get('id', 'Unknown ID')}")
else:
print(f" {pending}")# Use status as boolean (True if complete, False otherwise)
if status:
print("Operation completed successfully")
else:
print(f"Operation not complete: {status.status}")
# Check specific completion state
if status.status == "pending":
print("Still processing...")
elif status.failure_count > 0:
print(f"Some objects failed: {status.failure_count}/{status.total_count}")from taxii2client import ApiRoot, Status
# Get status directly from API root
api_root = ApiRoot("https://taxii-server.example.com/taxii2/api1/")
status_id = "12345678-abcd-efgh-ijkl-123456789012"
# Retrieve existing status
status = api_root.get_status(status_id)
print(f"Retrieved status: {status.status}")
# Or create Status object directly
status_url = f"https://taxii-server.example.com/taxii2/api1/status/{status_id}/"
status = Status(status_url, user="username", password="password")
print(f"Direct status: {status.status}")from taxii2client.exceptions import ValidationError, TAXIIServiceException
try:
# Refresh status information
status.refresh()
# Check for validation errors in status data
if status.total_count != (status.success_count + status.failure_count + status.pending_count):
print("Warning: Status counts don't add up correctly")
except ValidationError as e:
print(f"Status validation error: {e}")
except TAXIIServiceException as e:
print(f"TAXII service error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")import time
from datetime import datetime, timedelta
# Start long-running operation
status = collection.add_objects(large_stix_envelope, wait_for_completion=False)
start_time = datetime.now()
print(f"Started operation at {start_time}")
print(f"Status ID: {status.id}")
# Monitor with progress updates
last_update = datetime.now()
while status.status != "complete":
current_time = datetime.now()
elapsed = current_time - start_time
# Refresh status
status.refresh()
# Show progress every 30 seconds
if current_time - last_update > timedelta(seconds=30):
progress = (status.success_count + status.failure_count) / status.total_count * 100
print(f"Progress: {progress:.1f}% ({elapsed.total_seconds():.0f}s elapsed)")
print(f" Processed: {status.success_count + status.failure_count}/{status.total_count}")
print(f" Success rate: {status.success_count/(status.success_count + status.failure_count)*100:.1f}%")
last_update = current_time
time.sleep(5) # Check every 5 seconds
total_time = datetime.now() - start_time
print(f"Operation completed in {total_time.total_seconds():.0f} seconds")# Direct status connection with automatic cleanup
status_url = "https://taxii-server.example.com/taxii2/api1/status/12345/"
with Status(status_url, user="user", password="pass") as status:
print(f"Status: {status.status}")
print(f"Success: {status.success_count}/{status.total_count}")
# Connection automatically closed when exiting contextInstall with Tessl CLI
npx tessl i tessl/pypi-taxii2-client