Cross-platform cryptographic library providing TLS sockets, asymmetric/symmetric encryption, and key operations using OS-native crypto libraries
—
Modern TLS socket implementation providing secure network communications with certificate verification using OS trust stores, SNI support, session reuse, and strong cipher suites. All TLS operations use OS-native implementations.
from oscrypto.tls import TLSSocket, TLSSession
from oscrypto.errors import TLSConnectionError, TLSDisconnectError, TLSVerificationError

Secure socket wrapper that provides TLS encryption over TCP connections with automatic certificate verification.
class TLSSocket:
    """
    API outline of a TLS connection to a server.

    Offers read/write/shutdown/close operations plus read-only properties
    describing the negotiated connection and the server's certificates.
    """

    def __init__(self, address: str, port: int, timeout: int = 10, session: TLSSession = None):
        """
        Create a TLS socket connection.

        Parameters:
        - address: str - Server domain name or IP address for connection
        - port: int - Server port number
        - timeout: int - Connection timeout in seconds (defaults to 10)
        - session: TLSSession - Optional session for reuse (defaults to None)

        Raises:
        TLSConnectionError if connection fails
        TLSVerificationError if certificate verification fails
        """

    def read(self, max_length: int) -> bytes:
        """
        Read data from the TLS connection.

        Parameters:
        - max_length: int - Maximum number of bytes to read

        Returns:
        Data received from the connection

        Raises:
        TLSDisconnectError if connection is closed
        """

    def write(self, data: bytes) -> None:
        """
        Write data to the TLS connection.

        Parameters:
        - data: bytes - Data to send

        Raises:
        TLSDisconnectError if connection is closed
        """

    def select_write(self, timeout: float = None) -> bool:
        """
        Check if socket is ready for writing.

        Parameters:
        - timeout: float - Timeout in seconds (None for blocking)

        Returns:
        True if socket is ready for writing
        NOTE(review): presumably False when the timeout elapses first - confirm
        """

    def shutdown(self) -> None:
        """
        Perform TLS close notify and shutdown the connection gracefully.
        """

    def close(self) -> None:
        """
        Close the TLS connection and underlying socket.
        """

    @property
    def cipher_suite(self) -> str:
        """Get the negotiated cipher suite name."""

    @property
    def protocol(self) -> str:
        """Get the negotiated TLS protocol version."""

    @property
    def compression(self) -> bool:
        """Get whether TLS compression is enabled."""

    @property
    def session_id(self) -> bytes:
        """
        Get the TLS session ID for session reuse.

        NOTE(review): upstream oscrypto documentation appears to describe this
        value as a status string ("new"/"reused") or None rather than raw
        bytes - confirm the return type against the installed version.
        """

    @property
    def session_ticket(self) -> bytes:
        """
        Get the TLS session ticket for session reuse.

        NOTE(review): same return-type caveat as session_id - confirm.
        """

    @property
    def hostname(self) -> str:
        """Get the server hostname."""

    @property
    def port(self) -> int:
        """Get the server port."""

    @property
    def certificate(self) -> Certificate:
        """Get the server's certificate."""

    @property
    def intermediates(self) -> List[Certificate]:
        """Get intermediate certificates from the server."""

Session manager for TLS connection reuse and configuration.
class TLSSession:
    """
    API outline of a TLS session object.

    Carries connection configuration (protocol, validation mode, extra trust
    roots) and is passed to TLSSocket via its `session` parameter for reuse.
    """

    def __init__(self, protocol: str = None, manual_validation: bool = False,
                 extra_trust_roots: List[Certificate] = None):
        """
        Create a TLS session for connection reuse and configuration.

        Parameters:
        - protocol: str - TLS protocol version ('TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3');
          defaults to None
          NOTE(review): confirm the accepted protocol names against the
          installed oscrypto version
        - manual_validation: bool - Disable automatic certificate validation
          (defaults to False, i.e. validation stays enabled)
        - extra_trust_roots: List[Certificate] - Additional CA certificates to trust
          (defaults to None)
        """

    @property
    def protocol(self) -> str:
        """Get the configured TLS protocol version."""

    @property
    def manual_validation(self) -> bool:
        """Get whether manual validation is enabled."""

    @property
    def extra_trust_roots(self) -> List[Certificate]:
        """Get the list of extra trust root certificates."""

from oscrypto import tls
# Create TLS connection to HTTPS server
conn = tls.TLSSocket('www.example.com', 443)
try:
    # Send HTTP request. "Connection: close" asks the server to end the
    # stream after the response, which is what terminates the read loop.
    request = b'GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: close\r\n\r\n'
    conn.write(request)

    # Read response, collecting chunks and joining once at the end instead of
    # re-copying the accumulated bytes on every iteration.
    chunks = []
    while True:
        try:
            data = conn.read(1024)
        except tls.TLSDisconnectError:
            break
        if not data:
            break
        chunks.append(data)
    response = b''.join(chunks)

    # Look up the certificate while the connection is still open.
    # NOTE(review): oscrypto is expected to raise when the certificate is
    # requested from a closed socket - confirm for the installed version.
    subject = conn.certificate.subject
finally:
    # Always release the connection, even if the request or read fails.
    conn.close()

print(f"Response length: {len(response)} bytes")
print(f"Server certificate: {subject}")

from oscrypto import tls
# One session object shared by both connections so the second can reuse it
shared_session = tls.TLSSession()

# First connection
first_request = b'GET /endpoint1 HTTP/1.1\r\nHost: api.example.com\r\n\r\n'
first_conn = tls.TLSSocket('api.example.com', 443, session=shared_session)
first_conn.write(first_request)
response1 = first_conn.read(1024)
first_conn.close()

# Second connection reuses the session
second_request = b'GET /endpoint2 HTTP/1.1\r\nHost: api.example.com\r\n\r\n'
second_conn = tls.TLSSocket('api.example.com', 443, session=shared_session)
second_conn.write(second_request)
response2 = second_conn.read(1024)
second_conn.close()

# Report what the first connection negotiated.
# NOTE(review): .hex() assumes session_id is a bytes value, as the API
# signature in this document states - confirm against the installed oscrypto.
print(f"Session ID: {first_conn.session_id.hex()}")
print(f"Cipher suite: {first_conn.cipher_suite}")
print(f"Protocol: {first_conn.protocol}")

from oscrypto import tls, asymmetric
# Read the custom CA certificate from disk, then parse it
with open('custom_ca.pem', 'rb') as ca_file:
    ca_bytes = ca_file.read()
custom_ca = asymmetric.load_certificate(ca_bytes)

# Session that trusts the custom CA in addition to the default trust roots
trusting_session = tls.TLSSession(extra_trust_roots=[custom_ca])

# Connect to the internal server; its chain may now validate via the custom CA
conn = tls.TLSSocket('internal.company.com', 443, session=trusting_session)
request = b'GET / HTTP/1.1\r\nHost: internal.company.com\r\n\r\n'
conn.write(request)
response = conn.read(1024)
conn.close()

from oscrypto import tls, errors
# Session configured so that certificate validation is left to the caller
manual_session = tls.TLSSession(manual_validation=True)

try:
    conn = tls.TLSSocket('untrusted.example.com', 443, session=manual_session)

    # Inspect the certificate by hand since automatic validation is disabled
    server_cert = conn.certificate
    print(f"Certificate subject: {server_cert.subject}")
    print(f"Certificate issuer: {server_cert.issuer}")
    print(f"Certificate valid from: {server_cert.not_valid_before}")
    print(f"Certificate valid until: {server_cert.not_valid_after}")

    # Carry on with the exchange (validation bypassed)
    request = b'GET / HTTP/1.1\r\nHost: untrusted.example.com\r\n\r\n'
    conn.write(request)
    response = conn.read(1024)
    conn.close()
except errors.TLSVerificationError as verification_error:
    print(f"Certificate verification failed: {verification_error}")
    print(f"Server certificate: {verification_error.certificate.subject}")

from oscrypto import tls, errors
import socket
try:
tls_socket = tls.TLSSocket('example.com', 443, timeout=5)
tls_socket.write(b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n')
while True:
try:
data = tls_socket.read(1024)
if not data:
break
print(f"Received {len(data)} bytes")
except errors.TLSGracefulDisconnectError:
# Server closed connection gracefully
print("Server closed connection")
break
except errors.TLSDisconnectError:
# Unexpected disconnection
print("Connection lost unexpectedly")
break
except errors.TLSConnectionError as e:
print(f"Connection failed: {e}")
except errors.TLSVerificationError as e:
print(f"Certificate verification failed: {e}")
print(f"Certificate subject: {e.certificate.subject}")
except socket.timeout:
print("Connection timed out")
finally:
if 'tls_socket' in locals():
tls_socket.close()Install with Tessl CLI
npx tessl i tessl/pypi-oscrypto