Python wrapper module around the OpenSSL library providing cryptographic functionality and TLS/SSL capabilities
—
Complete SSL/TLS connection handling with support for modern protocols, session management, and advanced features. Provides high-level SSL/TLS client and server capabilities with extensive configuration options.
SSL contexts define the parameters and behavior for SSL/TLS connections, including protocol versions, certificates, verification settings, and cryptographic options.
class Context:
def __init__(self, method: int):
"""
Create SSL context with specific method.
Parameters:
- method: SSL method constant (TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD, etc.)
"""
def set_min_proto_version(self, version: int):
"""Set minimum protocol version (TLS1_VERSION, TLS1_2_VERSION, etc.)"""
def set_max_proto_version(self, version: int):
"""Set maximum protocol version"""
def load_verify_locations(self, cafile, capath=None):
"""Load CA certificates from file or directory"""
def set_default_verify_paths(self):
"""Use system default CA certificate locations"""
def use_certificate_file(self, certfile, filetype=FILETYPE_PEM):
"""Load certificate from file"""
def use_privatekey_file(self, keyfile, filetype=FILETYPE_PEM):
"""Load private key from file"""
def check_privatekey(self):
"""Verify private key matches certificate"""
def set_verify(self, mode: int, callback=None):
"""
Set peer certificate verification mode.
Parameters:
- mode: VERIFY_NONE, VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT
- callback: Optional verification callback function
"""
def set_cipher_list(self, cipher_list):
"""Set allowed cipher suites"""
def set_options(self, options: int):
"""Set SSL option flags (OP_NO_SSLv3, OP_SINGLE_DH_USE, etc.)"""
def set_alpn_protos(self, protos):
"""Set ALPN protocols for client"""
def set_alpn_select_callback(self, callback):
"""Set ALPN selection callback for server"""
def use_certificate(self, cert: X509):
"""Use X509 certificate object"""
def use_certificate_chain_file(self, certfile):
"""Load certificate chain from file"""
def add_extra_chain_cert(self, certobj: X509):
"""Add certificate to chain"""
def use_privatekey(self, pkey: PKey):
"""Use PKey private key object"""
def set_passwd_cb(self, callback, userdata=None):
"""Set private key password callback"""
def load_client_ca(self, cafile):
"""Load client CA certificate from file"""
def set_client_ca_list(self, certificate_authorities):
"""Set list of client CA certificates"""
def add_client_ca(self, certificate_authority: X509):
"""Add client CA certificate"""
def set_session_id(self, buf: bytes):
"""Set SSL session ID context"""
def set_session_cache_mode(self, mode: int) -> int:
"""Set session cache mode, returns previous mode"""
def get_session_cache_mode(self) -> int:
"""Get current session cache mode"""
def set_verify_depth(self, depth: int):
"""Set maximum certificate chain depth"""
def get_verify_mode(self) -> int:
"""Get current verification mode"""
def get_verify_depth(self) -> int:
"""Get maximum verification depth"""
def load_tmp_dh(self, dhfile):
"""Load Diffie-Hellman parameters from file"""
def set_tmp_ecdh(self, curve):
"""Set elliptic curve for ECDH"""
def set_timeout(self, timeout: int):
"""Set session timeout in seconds"""
def get_timeout(self) -> int:
"""Get session timeout"""
def set_info_callback(self, callback):
"""Set info callback for debugging"""
def set_keylog_callback(self, callback):
"""Set key logging callback (for debugging)"""
def get_app_data(self):
"""Get application data associated with context"""
def set_app_data(self, data):
"""Set application data for context"""
def get_cert_store(self):
"""Get certificate store (X509Store)"""
def set_options(self, options: int) -> int:
"""Set SSL options, returns new options value"""
def set_mode(self, mode: int) -> int:
"""Set SSL mode, returns new mode value"""
def set_tlsext_servername_callback(self, callback):
"""Set SNI (Server Name Indication) callback"""
def set_tlsext_use_srtp(self, profiles):
"""Set SRTP profiles for DTLS"""
def set_ocsp_server_callback(self, callback, data=None):
"""Set OCSP stapling callback for server"""
def set_ocsp_client_callback(self, callback, data=None):
"""Set OCSP callback for client"""
def set_cookie_generate_callback(self, callback):
"""Set DTLS cookie generation callback"""
def set_cookie_verify_callback(self, callback):
"""Set DTLS cookie verification callback"""SSL connections wrap sockets with SSL/TLS encryption, providing secure data transmission with certificate validation and session management.
class Connection:
def __init__(self, context: Context, socket=None):
"""
Create SSL connection with context and optional socket.
Parameters:
- context: SSL context defining connection parameters
- socket: Optional existing socket to wrap
"""
def connect(self, addr):
"""Connect to remote address (client mode)"""
def accept(self) -> tuple:
"""Accept incoming connection (server mode), returns (conn, addr)"""
def do_handshake(self):
"""Perform SSL handshake"""
def send(self, buf, flags=0) -> int:
"""
Send data over SSL connection.
Parameters:
- buf: Data to send (bytes)
- flags: Optional socket flags
Returns:
Number of bytes sent
"""
def sendall(self, buf, flags=0):
"""Send all data over SSL connection"""
def recv(self, bufsiz, flags=None) -> bytes:
"""
Receive data from SSL connection.
Parameters:
- bufsiz: Maximum bytes to receive
- flags: Optional socket flags
Returns:
Received data as bytes
"""
def recv_into(self, buffer, nbytes=None, flags=None) -> int:
"""Receive data into existing buffer"""
def pending(self) -> int:
"""Get number of bytes pending in SSL buffer"""
def shutdown(self) -> bool:
"""Send SSL shutdown notification"""
def get_peer_certificate(self, *, as_cryptography=False):
"""
Get peer's certificate.
Parameters:
- as_cryptography: Return cryptography.x509.Certificate if True
Returns:
X509 certificate or None
"""
def get_peer_cert_chain(self, *, as_cryptography=False) -> list:
"""Get complete peer certificate chain"""
def get_cipher_name(self) -> str:
"""Get current cipher suite name"""
def get_protocol_version_name(self) -> str:
"""Get protocol version string (e.g., 'TLSv1.3')"""
def get_protocol_version(self) -> int:
"""Get protocol version as integer constant"""
def get_context(self) -> Context:
"""Get SSL context"""
def set_context(self, context: Context):
"""Change SSL context"""
def get_servername(self) -> bytes:
"""Get SNI server name"""
def set_verify(self, mode: int, callback=None):
"""Set verification mode for this connection"""
def get_verify_mode(self) -> int:
"""Get verification mode"""
def use_certificate(self, cert: X509):
"""Use certificate for this connection"""
def use_privatekey(self, pkey: PKey):
"""Use private key for this connection"""
def set_ciphertext_mtu(self, mtu: int):
"""Set DTLS ciphertext MTU"""
def get_cleartext_mtu(self) -> int:
"""Get DTLS cleartext MTU"""
def set_tlsext_host_name(self, name: bytes):
"""Set SNI hostname for client"""
def bio_read(self, bufsiz: int) -> bytes:
"""Read from internal BIO buffer"""
def bio_write(self, buf: bytes) -> int:
"""Write to internal BIO buffer"""
def renegotiate(self) -> bool:
"""Request SSL renegotiation"""
def renegotiate_pending(self) -> bool:
"""Check if renegotiation is pending"""
def total_renegotiations(self) -> int:
"""Get total number of renegotiations"""
def connect_ex(self, addr) -> int:
"""Non-blocking connect, returns error code"""
def DTLSv1_listen(self):
"""DTLS server listen for cookie exchange"""
def DTLSv1_get_timeout(self) -> int:
"""Get DTLS timeout value"""
def DTLSv1_handle_timeout(self) -> bool:
"""Handle DTLS timeout"""
def bio_shutdown(self):
"""Shutdown internal BIO"""
def get_cipher_list(self) -> list:
"""Get list of available ciphers"""
def get_client_ca_list(self) -> list:
"""Get list of client CA names"""
def get_app_data(self):
"""Get application data"""
def set_app_data(self, data):
"""Set application data"""
def get_shutdown(self) -> int:
"""Get shutdown state"""
def set_shutdown(self, state: int):
"""Set shutdown state"""
def get_state_string(self) -> bytes:
"""Get SSL state as string"""
def server_random(self) -> bytes:
"""Get server random bytes"""
def client_random(self) -> bytes:
"""Get client random bytes"""
def master_key(self) -> bytes:
"""Get master key (for debugging)"""
def export_keying_material(self, label: bytes, olen: int, context: bytes = None) -> bytes:
"""Export keying material for external use"""
def sock_shutdown(self, *args, **kwargs):
"""Call underlying socket shutdown"""
def get_certificate(self, *, as_cryptography=False):
"""Get local certificate"""
def get_peer_cert_chain(self, *, as_cryptography=False) -> list:
"""Get complete peer certificate chain"""
def get_verified_chain(self, *, as_cryptography=False) -> list:
"""Get verified certificate chain"""
def want_read(self) -> bool:
"""Check if SSL wants to read"""
def want_write(self) -> bool:
"""Check if SSL wants to write"""
def set_accept_state(self):
"""Set connection to server mode"""
def set_connect_state(self):
"""Set connection to client mode"""
def get_session(self):
"""Get SSL session object"""
def set_session(self, session):
"""Set SSL session for resumption"""
def get_finished(self) -> bytes:
"""Get local finished message"""
def get_peer_finished(self) -> bytes:
"""Get peer finished message"""
def get_cipher_bits(self) -> int:
"""Get cipher strength in bits"""
def get_cipher_version(self) -> str:
"""Get cipher protocol version"""
def set_alpn_protos(self, protos):
"""Set ALPN protocols for this connection"""
def get_alpn_proto_negotiated(self) -> bytes:
"""Get negotiated ALPN protocol"""
def get_selected_srtp_profile(self) -> bytes:
"""Get selected SRTP profile"""
def request_ocsp(self):
"""Request OCSP stapling"""SSL session objects for session resumption and caching.
class Session:
    """
    Opaque SSL session object for session resumption.

    Sessions allow clients to resume previous SSL connections
    without performing a full handshake.
    """
pass
Utility functions for accessing OpenSSL version information and debugging.
# NOTE: the parameter name `type` shadows the builtin, but it is part of the
# public signature and is therefore kept as-is.
def OpenSSL_version(type: int = OPENSSL_VERSION) -> bytes:
    """
    Get OpenSSL version information.

    Parameters:
    - type: Information type (OPENSSL_VERSION, OPENSSL_CFLAGS, etc.)

    Returns:
    Requested information as bytes
    """
# Backward compatibility alias
SSLeay_version = OpenSSL_version
Comprehensive set of protocol methods, versions, options, and verification modes for SSL/TLS configuration.
# SSL Methods
TLS_METHOD: int  # Auto-negotiate TLS (recommended)
TLS_CLIENT_METHOD: int  # Client-side TLS
TLS_SERVER_METHOD: int  # Server-side TLS
DTLS_METHOD: int  # DTLS protocol
DTLS_CLIENT_METHOD: int  # Client-side DTLS
DTLS_SERVER_METHOD: int  # Server-side DTLS

# Deprecated methods (for backward compatibility)
SSLv23_METHOD: int  # Deprecated, use TLS_METHOD instead
TLSv1_METHOD: int  # Deprecated TLS 1.0 only
TLSv1_1_METHOD: int  # Deprecated TLS 1.1 only
TLSv1_2_METHOD: int  # Deprecated TLS 1.2 only

# Protocol Versions
SSL3_VERSION: int  # SSL 3.0 (deprecated)
TLS1_VERSION: int  # TLS 1.0
TLS1_1_VERSION: int  # TLS 1.1
TLS1_2_VERSION: int  # TLS 1.2
TLS1_3_VERSION: int  # TLS 1.3

# OpenSSL Version Information
# The OPENSSL_* names below (other than OPENSSL_VERSION_NUMBER) are integer
# selectors passed as the `type` argument of OpenSSL_version(); the bytes
# value is what that function returns, not the constant itself.
OPENSSL_VERSION_NUMBER: int  # OpenSSL version as integer
OPENSSL_VERSION: int  # Selector: OpenSSL version string
OPENSSL_CFLAGS: int  # Selector: compilation flags
OPENSSL_PLATFORM: int  # Selector: platform information
OPENSSL_DIR: int  # Selector: OpenSSL directory
OPENSSL_BUILT_ON: int  # Selector: build timestamp

# Legacy aliases
SSLEAY_VERSION: int  # Alias for OPENSSL_VERSION
SSLEAY_CFLAGS: int  # Alias for OPENSSL_CFLAGS
SSLEAY_PLATFORM: int  # Alias for OPENSSL_PLATFORM
SSLEAY_DIR: int  # Alias for OPENSSL_DIR
SSLEAY_BUILT_ON: int  # Alias for OPENSSL_BUILT_ON

# SSL Options (OP_*)
OP_ALL: int  # Enable all bug workarounds
OP_NO_SSLv2: int  # Disable SSL 2.0
OP_NO_SSLv3: int  # Disable SSL 3.0
OP_NO_TLSv1: int  # Disable TLS 1.0
OP_NO_TLSv1_1: int  # Disable TLS 1.1
OP_NO_TLSv1_2: int  # Disable TLS 1.2
OP_NO_TLSv1_3: int  # Disable TLS 1.3
OP_SINGLE_DH_USE: int  # Generate new DH key per handshake
OP_SINGLE_ECDH_USE: int  # Generate new ECDH key per handshake
OP_NO_COMPRESSION: int  # Disable compression
OP_NO_TICKET: int  # Disable session tickets
OP_CIPHER_SERVER_PREFERENCE: int  # Use server cipher preference
OP_COOKIE_EXCHANGE: int  # Enable cookie exchange for DTLS
OP_NO_QUERY_MTU: int  # Don't query MTU for DTLS

# Verification Modes
VERIFY_NONE: int  # No peer verification
VERIFY_PEER: int  # Verify peer certificate
VERIFY_FAIL_IF_NO_PEER_CERT: int  # Fail if no peer cert
VERIFY_CLIENT_ONCE: int  # Only verify client once

# Session Cache Modes
SESS_CACHE_OFF: int  # Disable session cache
SESS_CACHE_CLIENT: int  # Client-side caching
SESS_CACHE_SERVER: int  # Server-side caching
SESS_CACHE_BOTH: int  # Both client and server caching
SESS_CACHE_NO_AUTO_CLEAR: int  # Don't auto-clear cache
SESS_CACHE_NO_INTERNAL: int  # Don't use internal cache
SESS_CACHE_NO_INTERNAL_LOOKUP: int  # Don't lookup in internal cache
SESS_CACHE_NO_INTERNAL_STORE: int  # Don't store in internal cache

# SSL States
SSL_ST_CONNECT: int  # Connect state
SSL_ST_ACCEPT: int  # Accept state
SSL_ST_MASK: int  # State mask

# Callback Constants
SSL_CB_LOOP: int  # Handshake loop callback
SSL_CB_EXIT: int  # Handshake exit callback
SSL_CB_READ: int  # Read callback
SSL_CB_WRITE: int  # Write callback
SSL_CB_ALERT: int  # Alert callback
SSL_CB_HANDSHAKE_START: int  # Handshake start callback
SSL_CB_HANDSHAKE_DONE: int  # Handshake done callback

# Shutdown States
SENT_SHUTDOWN: int  # Shutdown notification sent
RECEIVED_SHUTDOWN: int  # Shutdown notification received

# Special Objects
NO_OVERLAPPING_PROTOCOLS: object # Sentinel for ALPN callback
Specialized exceptions for SSL/TLS operations and non-blocking I/O handling.
class Error(Exception):
    """Base SSL exception"""


class WantReadError(Error):
    """SSL needs to read more data (non-blocking)"""


class WantWriteError(Error):
    """SSL needs to write data (non-blocking)"""


class WantX509LookupError(Error):
    """SSL needs X509 lookup (certificate callback)"""


class ZeroReturnError(Error):
    """Clean SSL connection shutdown"""
class SysCallError(Error):
    """System call error during SSL operation"""


from OpenSSL import SSL
import socket

# Basic TLS client: connect, handshake, issue one HTTP request, read a reply.

# Create SSL context
context = SSL.Context(SSL.TLS_CLIENT_METHOD)
context.set_default_verify_paths()
# NOTE(review): VERIFY_PEER is set without any host-name matching step in this
# example -- confirm that host-name verification is handled where required.
context.set_verify(SSL.VERIFY_PEER, None)

# Create connection
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connection = SSL.Connection(context, sock)
try:
    # Announce the target host via SNI so the server can pick the matching
    # certificate; this must be set before the handshake.
    connection.set_tlsext_host_name(b'www.example.com')

    # Connect and perform handshake
    connection.connect(('www.example.com', 443))
    connection.do_handshake()

    # Verify peer certificate
    peer_cert = connection.get_peer_certificate()
    if peer_cert:
        print(f"Connected to: {peer_cert.get_subject().CN}")

    # Send data (sendall keeps writing until the whole request is sent,
    # whereas send may write only part of the buffer)
    connection.sendall(b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
    response = connection.recv(4096)
finally:
    # Release the socket even when the connect, handshake or I/O fails.
    connection.close()

from OpenSSL import SSL
import socket

# Basic TLS server: accept plain TCP connections and wrap each one in TLS.

# Create server context
context = SSL.Context(SSL.TLS_SERVER_METHOD)
context.use_certificate_file('server.crt')
context.use_privatekey_file('server.key')
context.check_privatekey()

# Create server socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(('localhost', 8443))
server_sock.listen(5)

while True:
    # Accept connection
    client_sock, addr = server_sock.accept()
    connection = SSL.Connection(context, client_sock)
    try:
        # The TCP connection was accepted on the plain socket, so the TLS
        # layer has not been told which role to play; put it in server mode
        # before starting the handshake.
        connection.set_accept_state()
        connection.do_handshake()
        data = connection.recv(1024)
        # sendall keeps writing until the whole response has been sent.
        connection.sendall(b'HTTP/1.1 200 OK\r\n\r\nHello SSL!')
    finally:
        connection.close()

from OpenSSL import SSL
# Hardened server-side context configuration.
context = SSL.Context(SSL.TLS_SERVER_METHOD)

# Set protocol version range
context.set_min_proto_version(SSL.TLS1_2_VERSION)
context.set_max_proto_version(SSL.TLS1_3_VERSION)

# Configure cipher suites
context.set_cipher_list(b'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM')

# Set SSL options
context.set_options(SSL.OP_NO_SSLv3 | SSL.OP_SINGLE_DH_USE)

# Configure ALPN for HTTP/2
# This is a server context, so ALPN is configured with a selection callback
# (set_alpn_protos is the client-side call that advertises protocols).
SUPPORTED_ALPN_PROTOS = (b'h2', b'http/1.1')


def select_alpn_proto(connection, offered_protos):
    """Pick the first server-preferred protocol that the client offered."""
    for proto in SUPPORTED_ALPN_PROTOS:
        if proto in offered_protos:
            return proto
    # Nothing in common: continue the handshake without ALPN.
    return SSL.NO_OVERLAPPING_PROTOCOLS


context.set_alpn_select_callback(select_alpn_proto)

# Set verification
context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, None)
context.load_verify_locations('ca-bundle.crt')
Install with Tessl CLI
npx tessl i tessl/pypi-pyopenssl