Python GSSAPI wrapper providing both low-level C-style and high-level Pythonic interfaces for Kerberos and other authentication mechanisms
—
GSSAPI security contexts establish authenticated communication channels between principals. The SecurityContext class provides context establishment, message protection, and secure communication capabilities.
Create and establish security contexts for both initiators and acceptors.
class SecurityContext(gssapi.raw.sec_contexts.SecurityContext):
def __new__(cls, base=None, token=None, name=None, creds=None, lifetime=None, flags=None, mech=None, channel_bindings=None, usage=None):
"""
Create a new security context.
Parameters:
- base: existing SecurityContext object to wrap
- token: bytes, imported context token
- name: Name, target name (for initiate) or None (for accept)
- creds: Credentials, credentials to use
- lifetime: int, desired context lifetime in seconds
- flags: RequirementFlag, desired context properties
- mech: OID, desired mechanism
- channel_bindings: ChannelBindings, channel binding data
- usage: str, 'initiate' or 'accept'
Returns:
SecurityContext object
"""

Usage example:
import gssapi
# Initiator context
target_name = gssapi.Name('service@hostname.example.com')
creds = gssapi.Credentials(usage='initiate')
init_ctx = gssapi.SecurityContext(
name=target_name,
creds=creds,
flags=gssapi.RequirementFlag.mutual_authentication | gssapi.RequirementFlag.integrity,
usage='initiate'
)
# Acceptor context
service_creds = gssapi.Credentials(usage='accept')
accept_ctx = gssapi.SecurityContext(
creds=service_creds,
usage='accept'
)

Perform the multi-step authentication handshake between initiator and acceptor.
def step(self, token=None):
"""
Perform one step of the security context establishment.
Parameters:
- token: bytes, input token from peer (None for first step)
Returns:
bytes or None: Output token to send to peer, or None if complete
Raises:
GSSError: If authentication fails
"""

Usage example:
import gssapi
# Complete authentication handshake
init_ctx = gssapi.SecurityContext(name=target_name, creds=init_creds, usage='initiate')
accept_ctx = gssapi.SecurityContext(creds=accept_creds, usage='accept')
# Drive the handshake until both contexts report completion.
token = None  # last token produced by the acceptor; None before the first step
while not (init_ctx.complete and accept_ctx.complete):
    # Start every round with no pending token so that a token from an
    # earlier round is never replayed to the acceptor.
    init_token = None
    # Initiator step
    if not init_ctx.complete:
        init_token = init_ctx.step(token)
        if init_token:
            # Send init_token to acceptor
            pass
    # Acceptor step
    token = None
    if init_token and not accept_ctx.complete:
        token = accept_ctx.step(init_token)
        if token:
            # Send token back to initiator
            pass
    if not init_token and not token:
        # No token moved in either direction: neither side can advance,
        # so stop instead of looping forever.
        break
if not (init_ctx.complete and accept_ctx.complete):
    raise RuntimeError("GSSAPI handshake stalled before both contexts were established")
print(f"Authentication complete. Initiator: {init_ctx.initiator_name}")

Access information about established security contexts.
@property
def complete(self):
"""True if context establishment is complete."""
@property
def initiator_name(self):
"""Name of the context initiator."""
@property
def target_name(self):
"""Name of the context target/acceptor."""
@property
def lifetime(self):
"""Remaining context lifetime in seconds."""
@property
def mech(self):
"""Mechanism used for the context."""
@property
def actual_flags(self):
"""Context flags indicating actual available services."""
@property
def locally_initiated(self):
"""True if context was locally initiated."""
@property
def delegated_creds(self):
"""Credentials delegated from initiator to acceptor (or None)."""

Protect messages with confidentiality and/or integrity services.
def wrap(self, message, encrypt):
"""
Wrap (encrypt and/or sign) a message.
Parameters:
- message: bytes, message to protect
- encrypt: bool, whether to encrypt the message
Returns:
WrapResult: message (bytes), encrypted (bool)
"""
def unwrap(self, message):
"""
Unwrap (decrypt and/or verify) a protected message.
Parameters:
- message: bytes, protected message to unwrap
Returns:
UnwrapResult: message (bytes), encrypted (bool), qop (int)
"""

Usage example:
import gssapi
# After context establishment
ctx = gssapi.SecurityContext(name=target_name, creds=creds, usage='initiate')
# ... complete handshake ...
# Protect messages
plaintext = b"Secret message"
wrapped = ctx.wrap(plaintext, encrypt=True)
print(f"Encrypted: {wrapped.encrypted}")
# Send wrapped.message to peer...
# Unprotect received messages
unwrapped = ctx.unwrap(wrapped.message)
print(f"Original message: {unwrapped.message}")
print(f"Was encrypted: {unwrapped.encrypted}")

Simplified encryption/decryption methods that ensure encryption is used.
def encrypt(self, message):
"""
Encrypt a message, ensuring encryption is used.
Parameters:
- message: bytes, message to encrypt
Returns:
bytes: Encrypted message
Raises:
EncryptionNotUsed: If encryption could not be applied
"""
def decrypt(self, message):
"""
Decrypt a message, ensuring encryption was used.
Parameters:
- message: bytes, encrypted message to decrypt
Returns:
bytes: Decrypted message
Raises:
EncryptionNotUsed: If encryption was expected but not used
"""
def get_wrap_size_limit(self, desired_output_size, encrypted=True):
"""
Calculate maximum input message size for a given output size.
Parameters:
- desired_output_size: int, maximum output size in bytes
- encrypted: bool, account for encryption overhead
Returns:
int: Maximum input message size
"""
def process_token(self, token):
"""
Process an output token asynchronously (deprecated).
Parameters:
- token: bytes, token to process
"""

Create and verify Message Integrity Codes (signatures) without encryption.
def get_signature(self, message):
"""
Create a Message Integrity Code (MIC) for a message.
Parameters:
- message: bytes, message to sign
Returns:
bytes: MIC token
"""
def verify_signature(self, message, mic):
"""
Verify a Message Integrity Code for a message.
Parameters:
- message: bytes, original message
- mic: bytes, MIC token to verify
Returns:
int: Quality of protection used
Raises:
GSSError: If verification fails
"""

Usage example:
import gssapi
# Create signature
message = b"Message to sign"
mic = ctx.get_signature(message)
# Send message and mic separately...
# Verify signature
try:
qop = ctx.verify_signature(message, mic)
print("Signature verified successfully")
except gssapi.exceptions.GSSError:
print("Signature verification failed")

Query detailed information about established security contexts.
def inquire(self, initiator_name=True, target_name=True, lifetime=True, mech=True,
flags=True, locally_init=True, complete=True):
"""
Inquire about context information.
Parameters:
- initiator_name: bool, include initiator name
- target_name: bool, include target name
- lifetime: bool, include lifetime
- mech: bool, include mechanism
- flags: bool, include context flags
- locally_init: bool, include local initiation status
- complete: bool, include completion status
Returns:
InquireContextResult: requested information fields
"""

Export contexts for transport or storage and import them later.
def export(self):
"""
Export the security context to a token.
Returns:
bytes: Exported context token
"""
@classmethod
def from_token(cls, token):
"""
Import a security context from a token.
Parameters:
- token: bytes, exported context token
Returns:
SecurityContext: Imported context
"""

Use channel bindings to bind the authentication to an underlying secure channel.
from gssapi.raw.chan_bindings import ChannelBindings
# Create channel bindings
bindings = ChannelBindings(
initiator_address_type=gssapi.AddressType.ip,
initiator_address=b'192.168.1.100',
acceptor_address_type=gssapi.AddressType.ip,
acceptor_address=b'192.168.1.200',
application_data=b'TLS_session_data'
)
# Use in context creation
ctx = gssapi.SecurityContext(
name=target_name,
creds=creds,
channel_bindings=bindings,
usage='initiate'
)

Perform message protection on scatter-gather data structures (requires DCE extension).
def wrap_iov(self, iov, encrypt=None):
"""
Wrap data using IOV (scatter-gather) interface.
Parameters:
- iov: list of IOV objects
- encrypt: bool, whether to encrypt
Returns:
bool: Whether message was encrypted
Requires DCE extension.
"""
def unwrap_iov(self, iov):
"""
Unwrap data using IOV (scatter-gather) interface.
Parameters:
- iov: list of IOV objects
Returns:
IOVUnwrapResult: encrypted (bool), qop (int)
Requires DCE extension.
"""

Perform Authenticated Encryption with Associated Data operations (requires DCE AEAD extension).
def wrap_aead(self, message, associated_data=None):
"""
Wrap message using AEAD.
Parameters:
- message: bytes, message to encrypt
- associated_data: bytes, additional authenticated data
Returns:
bytes: Wrapped message
Requires DCE AEAD extension.
"""
def unwrap_aead(self, message, associated_data=None):
"""
Unwrap AEAD-protected message.
Parameters:
- message: bytes, wrapped message
- associated_data: bytes, additional authenticated data
Returns:
bytes: Unwrapped message
Requires DCE AEAD extension.
"""

from gssapi.raw.types import RequirementFlag
class RequirementFlag(IntEnumFlagSet):
delegate_to_peer: int # Delegate credentials to peer
mutual_authentication: int # Require mutual authentication
replay_detection: int # Enable replay detection
out_of_sequence_detection: int # Enable sequence detection
confidentiality: int # Enable message encryption
integrity: int # Enable message integrity
anonymity: int # Enable anonymous authentication
protection_ready: int # Per-message protection available
transferable: int # Context ready for data transfer
ok_as_delegate: int # Delegation policy flag
channel_bound: int # Channel bound flag

Usage example:
import gssapi
# Request specific context features
flags = (gssapi.RequirementFlag.mutual_authentication |
gssapi.RequirementFlag.integrity |
gssapi.RequirementFlag.confidentiality)
ctx = gssapi.SecurityContext(
name=target_name,
creds=creds,
flags=flags,
usage='initiate'
)
# Check what was actually negotiated
if ctx.flags & gssapi.RequirementFlag.confidentiality:
print("Confidentiality is available")Direct access to underlying C-style GSSAPI context functions.
# From gssapi.raw.sec_contexts module
def init_sec_context(creds, context=None, target_name=None, mech=None, req_flags=None, time_req=None, channel_bindings=None, input_token=None):
"""Initialize security context (initiator side)."""
def accept_sec_context(creds, context=None, input_token=None, channel_bindings=None):
"""Accept security context (acceptor side)."""
def inquire_context(context, initiator_name=True, target_name=True, lifetime=True, mech=True, flags=True, locally_init=True, complete=True):
"""Inquire about context information."""
def delete_sec_context(context):
"""Delete security context."""
def export_sec_context(context):
"""Export security context."""
def import_sec_context(token):
"""Import security context."""

import gssapi
def authenticate_and_communicate():
    """Establish an initiator/acceptor context pair and exchange one message.

    Both contexts are created in the same process for demonstration: the
    handshake tokens are passed directly between them instead of being
    sent over a network.

    Raises:
        RuntimeError: If the token exchange stalls before both contexts
            report completion.
    """
    # Setup
    target_name = gssapi.Name('service@hostname.example.com')
    init_creds = gssapi.Credentials(usage='initiate')
    accept_creds = gssapi.Credentials(usage='accept')

    # Create contexts
    init_ctx = gssapi.SecurityContext(
        name=target_name,
        creds=init_creds,
        flags=gssapi.RequirementFlag.mutual_authentication | gssapi.RequirementFlag.integrity,
        usage='initiate'
    )
    accept_ctx = gssapi.SecurityContext(
        creds=accept_creds,
        usage='accept'
    )

    # Authentication handshake
    token = None  # last token produced by the acceptor; None before the first step
    while not (init_ctx.complete and accept_ctx.complete):
        # Start every round with no pending token so that a token from an
        # earlier round is never replayed to the acceptor.
        init_token = None
        if not init_ctx.complete:
            init_token = init_ctx.step(token)
            if init_token:
                # Send to acceptor
                pass
        token = None
        if init_token and not accept_ctx.complete:
            token = accept_ctx.step(init_token)
            if token:
                # Send back to initiator
                pass
        if not init_token and not token:
            # No token moved in either direction: neither side can
            # advance, so stop instead of looping forever.
            break
    if not (init_ctx.complete and accept_ctx.complete):
        raise RuntimeError("GSSAPI handshake stalled before both contexts were established")

    # Secure communication
    message = b"Hello, secure world!"
    # Initiator protects message
    wrapped = init_ctx.wrap(message, encrypt=True)
    # Acceptor unprotects message
    unwrapped = accept_ctx.unwrap(wrapped.message)
    print(f"Secure message: {unwrapped.message}")
    print(f"Was encrypted: {unwrapped.encrypted}")
if __name__ == "__main__":
    authenticate_and_communicate()

Install with Tessl CLI
npx tessl i tessl/pypi-gssapi