CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-python-osc

Open Sound Control server and client implementations in pure Python for networked music and multimedia applications

Pending
Overview
Eval results
Files

docs/data-types.md

Data Types and Parsing

Low-level OSC data type encoding/decoding, NTP timestamp handling, and SLIP protocol implementation. These modules provide the foundation for OSC message formatting and for framing OSC packets on TCP streams.

Capabilities

OSC Type Conversion

Functions for converting between Python types and OSC binary format with full support for all standard OSC data types.

# String handling
def write_string(val: str) -> bytes:
    """Encode a Python string as an OSC string.

    Parameters:
    - val: Python string to encode

    Returns:
    The string as UTF-8 bytes, null-terminated and padded so that the
    total length is a multiple of 4 bytes

    Raises:
    BuildError: If the string cannot be encoded
    """

def get_string(dgram: bytes, start_index: int) -> Tuple[str, int]:
    """Decode an OSC string from a datagram.

    Parameters:
    - dgram: Datagram bytes containing the OSC string
    - start_index: Offset in dgram at which the string starts

    Returns:
    Tuple of (parsed_string, next_index); next_index is the offset to pass
    as start_index when reading the element that follows

    Raises:
    ParseError: If the string cannot be parsed
    """

# Integer handling
def write_int(val: int) -> bytes:
    """Encode a Python int as an OSC int32.

    Parameters:
    - val: Integer value; must fit in 32 bits

    Returns:
    4-byte big-endian integer representation

    Raises:
    BuildError: If the value is outside the int32 range
    """

def get_int(dgram: bytes, start_index: int) -> Tuple[int, int]:
    """Decode an OSC int32 from a datagram.

    Parameters:
    - dgram: Datagram bytes containing the OSC int32
    - start_index: Offset in dgram at which the integer starts

    Returns:
    Tuple of (parsed_int, next_index); next_index is the offset to pass
    as start_index when reading the element that follows

    Raises:
    ParseError: If the integer cannot be parsed
    """

def write_int64(val: int) -> bytes:
    """Encode a Python int as an OSC int64.

    Parameters:
    - val: Integer value; must fit in 64 bits

    Returns:
    8-byte big-endian integer representation
    """

def get_int64(dgram: bytes, start_index: int) -> Tuple[int, int]:
    """Decode an OSC int64 from a datagram.

    Parameters:
    - dgram: Datagram bytes containing the OSC int64
    - start_index: Offset in dgram at which the integer starts

    Returns:
    Tuple of (parsed_int64, next_index); next_index is the offset to pass
    as start_index when reading the element that follows
    """

def get_uint64(dgram: bytes, start_index: int) -> Tuple[int, int]:
    """Decode an unsigned 64-bit integer from a datagram.

    Parameters:
    - dgram: Datagram bytes containing the OSC uint64
    - start_index: Offset in dgram at which the integer starts

    Returns:
    Tuple of (parsed_uint64, next_index); next_index is the offset to pass
    as start_index when reading the element that follows
    """

# Float handling
def write_float(val: float) -> bytes:
    """Encode a Python float as an OSC float32.

    Parameters:
    - val: Float value to store in 32-bit form

    Returns:
    4-byte IEEE 754 single-precision representation
    """

def get_float(dgram: bytes, start_index: int) -> Tuple[float, int]:
    """Decode an OSC float32 from a datagram.

    Parameters:
    - dgram: Datagram bytes containing the OSC float32
    - start_index: Offset in dgram at which the float starts

    Returns:
    Tuple of (parsed_float, next_index); next_index is the offset to pass
    as start_index when reading the element that follows
    """

def write_double(val: float) -> bytes:
    """Encode a Python float as an OSC double (float64).

    Parameters:
    - val: Float value to store in 64-bit form

    Returns:
    8-byte IEEE 754 double-precision representation
    """

def get_double(dgram: bytes, start_index: int) -> Tuple[float, int]:
    """Decode an OSC double (float64) from a datagram.

    Parameters:
    - dgram: Datagram bytes containing the OSC double
    - start_index: Offset in dgram at which the double starts

    Returns:
    Tuple of (parsed_double, next_index); next_index is the offset to pass
    as start_index when reading the element that follows
    """

# Binary data handling
def write_blob(val: bytes) -> bytes:
    """Encode Python bytes as an OSC blob.

    Parameters:
    - val: Binary data to encode

    Returns:
    OSC blob: a 4-byte size prefix followed by the padded data
    """

def get_blob(dgram: bytes, start_index: int) -> Tuple[bytes, int]:
    """Decode an OSC blob from a datagram.

    Parameters:
    - dgram: Datagram bytes containing the OSC blob
    - start_index: Offset in dgram at which the blob (size prefix) starts

    Returns:
    Tuple of (parsed_bytes, next_index); next_index is the offset to pass
    as start_index when reading the element that follows
    """

# Special type handling
def write_rgba(val: Tuple[int, int, int, int]) -> bytes:
    """Encode an RGBA colour in OSC RGBA format.

    NOTE(review): upstream python-osc appears to pack the colour as a single
    32-bit unsigned integer rather than accepting a 4-tuple -- confirm this
    signature against the osc_types source before relying on it.

    Parameters:
    - val: (red, green, blue, alpha) tuple with values 0-255

    Returns:
    4-byte RGBA color representation
    """

def get_rgba(dgram: bytes, start_index: int) -> Tuple[Tuple[int, int, int, int], int]:
    """Decode an OSC RGBA colour from a datagram.

    NOTE(review): upstream python-osc appears to return the colour as a single
    32-bit integer rather than an (r, g, b, a) tuple -- confirm this return
    shape against the osc_types source.

    Parameters:
    - dgram: Datagram bytes containing the OSC RGBA value
    - start_index: Offset in dgram at which the colour starts

    Returns:
    Tuple of ((r, g, b, a), next_index); next_index is the offset to pass
    as start_index when reading the element that follows
    """

def write_midi(val: MidiPacket) -> bytes:
    """Encode a MIDI packet in OSC MIDI format.

    Parameters:
    - val: MIDI packet as a (port_id, status_byte, data1, data2) tuple

    Returns:
    4-byte OSC MIDI representation
    """

def get_midi(dgram: bytes, start_index: int) -> Tuple[MidiPacket, int]:
    """Decode an OSC MIDI message from a datagram.

    Parameters:
    - dgram: Datagram bytes containing the OSC MIDI message
    - start_index: Offset in dgram at which the MIDI message starts

    Returns:
    Tuple of (midi_packet, next_index); midi_packet is a
    (port_id, status_byte, data1, data2) tuple and next_index is the offset
    to pass as start_index when reading the element that follows
    """

Timestamp Handling

OSC timetag conversion functions for precise timing control with NTP timestamp format.

def write_date(system_time: Union[float, int]) -> bytes:
    """Encode a system time as an OSC timetag.

    Parameters:
    - system_time: System time in seconds since the epoch, or the
      IMMEDIATELY constant to request immediate execution

    Returns:
    8-byte NTP timestamp for use as an OSC timetag
    """

def get_date(dgram: bytes, start_index: int) -> Tuple[float, int]:
    """Decode an OSC timetag from a datagram as a system time.

    Parameters:
    - dgram: Datagram bytes containing the OSC timetag
    - start_index: Offset in dgram at which the timetag starts

    Returns:
    Tuple of (system_time, next_index); next_index is the offset to pass
    as start_index when reading the element that follows
    """

def get_timetag(dgram: bytes, start_index: int) -> Tuple[datetime.datetime, int]:
    """Decode an OSC timetag from a datagram as a datetime object.

    NOTE(review): upstream python-osc appears to return
    ((datetime, fraction), next_index) rather than (datetime, next_index) --
    confirm against the osc_types source.
    NOTE(review): the annotation uses `datetime.datetime`, which needs
    `import datetime`; this page's import block uses
    `from datetime import datetime` instead.

    Parameters:
    - dgram: Datagram bytes containing the OSC timetag
    - start_index: Offset in dgram at which the timetag starts

    Returns:
    Tuple of (datetime_object, next_index); next_index is the offset to pass
    as start_index when reading the element that follows
    """

# Special timing constant accepted by write_date() in place of a system time.
# NOTE(review): distinct from the bytes-typed IMMEDIATELY listed under
# "NTP Timestamp Functions" -- presumably they live in different modules.
IMMEDIATELY: int = 0  # Special value for immediate execution

NTP Timestamp Functions

Low-level NTP timestamp manipulation for precise timing operations.

def parse_timestamp(timestamp: int) -> Timestamp:
    """Split a 64-bit NTP timestamp into its seconds and fraction components.

    Parameters:
    - timestamp: 64-bit NTP timestamp as an integer (for example obtained
      with int.from_bytes(ntp_bytes, 'big'))

    Returns:
    Timestamp namedtuple with seconds and fraction fields
    """

def ntp_to_system_time(timestamp: bytes) -> float:
    """Convert NTP timestamp bytes to a system time.

    Parameters:
    - timestamp: 8-byte NTP timestamp

    Returns:
    System time in seconds since the Unix epoch

    Raises:
    NtpError: If the timestamp cannot be converted
    """

def system_time_to_ntp(seconds: float) -> bytes:
    """Convert a system time to NTP timestamp bytes.

    Parameters:
    - seconds: System time in seconds since the Unix epoch
      (for example the value of time.time())

    Returns:
    8-byte NTP timestamp representation
    """

def ntp_time_to_system_epoch(seconds: float) -> float:
    """Rebase a time from the NTP epoch to the system (Unix) epoch.

    Parameters:
    - seconds: Seconds since the NTP epoch (1900-01-01)

    Returns:
    The same instant as seconds since the Unix epoch (1970-01-01)
    """

def system_time_to_ntp_epoch(seconds: float) -> float:
    """Rebase a time from the system (Unix) epoch to the NTP epoch.

    Parameters:
    - seconds: Seconds since the Unix epoch (1970-01-01)

    Returns:
    The same instant as seconds since the NTP epoch (1900-01-01)
    """

class Timestamp(NamedTuple):
    """NTP timestamp split into its seconds and fraction components.

    Declared as a named tuple so it can be constructed with its two fields,
    unpacked like a tuple, and read by attribute (``ts.seconds``,
    ``ts.fraction``).
    """
    seconds: int    # Integer seconds component
    fraction: int   # Fractional seconds component (32-bit)

# NTP constants
# NOTE(review): bytes-typed, unlike the integer IMMEDIATELY = 0 listed under
# "Timestamp Handling"; the concrete value is not shown on this page.
IMMEDIATELY: bytes  # Special NTP timestamp for immediate execution

SLIP Protocol

Serial Line Internet Protocol (SLIP) implementation for framing OSC packets sent over TCP streams (OSC 1.1).

def encode(msg: bytes) -> bytes:
    """Wrap message bytes in a SLIP packet.

    Parameters:
    - msg: Message bytes to encode

    Returns:
    SLIP-encoded packet, framed and with special bytes escaped; slip.decode()
    recovers the original message from it
    """

def decode(packet: bytes) -> bytes:
    """Recover the original message from a SLIP packet.

    Parameters:
    - packet: SLIP-encoded packet bytes

    Returns:
    Original (unescaped, unframed) message bytes

    Raises:
    ProtocolError: If the packet contains invalid SLIP sequences
    """

def is_valid(packet: bytes) -> bool:
    """Report whether a packet conforms to the SLIP specification.

    Parameters:
    - packet: Packet bytes to validate

    Returns:
    True if the packet is valid SLIP, False otherwise
    """

# SLIP protocol constants (single-byte markers unless noted)
END: bytes = b"\xc0"        # Frame delimiter
ESC: bytes = b"\xdb"        # Escape character; introduces a two-byte escape sequence
ESC_END: bytes = b"\xdc"    # Escaped END (second byte of the sequence standing for END)
ESC_ESC: bytes = b"\xdd"    # Escaped ESC (second byte of the sequence standing for ESC)
END_END: bytes = b"\xc0\xc0"  # Double END sequence (two consecutive delimiters)

Usage Examples

Manual Message Construction

from pythonosc.parsing import osc_types

# Build message datagram manually: encode the address pattern, the type tag
# string and each argument separately.
address_bytes = osc_types.write_string("/synth/freq")
type_tag_bytes = osc_types.write_string(",f")  # One float argument
freq_bytes = osc_types.write_float(440.0)

# Combine into complete message (address, then type tags, then arguments)
message_dgram = address_bytes + type_tag_bytes + freq_bytes
print(f"Message size: {len(message_dgram)} bytes")

Parsing Custom Data Types

from pythonosc.parsing import osc_types

def parse_custom_message(dgram):
    """Parse an OSC message datagram into its address and arguments.

    Parameters:
    - dgram: Datagram bytes laid out as address string, type tag string,
      then the arguments in type-tag order

    Returns:
    Tuple of (address, args), where args lists the decoded argument values
    in order. Type tags this function does not decode contribute no entry.

    Parsing stops at the first unknown type tag that may carry a payload:
    its width is unknown, so every later argument would otherwise be decoded
    from the wrong offset.
    """
    # Decoder for each supported type tag; every decoder returns
    # (value, next_index).
    decoders = {
        'i': osc_types.get_int,
        'f': osc_types.get_float,
        's': osc_types.get_string,
        'b': osc_types.get_blob,
        'd': osc_types.get_double,
        'h': osc_types.get_int64,
        'm': osc_types.get_midi,
        'r': osc_types.get_rgba,
    }
    # Tags that occupy no bytes in the argument data (OSC 1.0: T, F, N, I and
    # the array delimiters), so skipping them keeps `index` aligned.
    no_payload_tags = frozenset("TFNI[]")

    index = 0

    # Parse address
    address, index = osc_types.get_string(dgram, index)
    print(f"Address: {address}")

    # Parse type tag
    type_tag, index = osc_types.get_string(dgram, index)
    print(f"Type tag: {type_tag}")

    # Parse arguments based on type tag
    args = []
    for arg_type in type_tag[1:]:  # Skip comma
        decoder = decoders.get(arg_type)
        if decoder is None:
            print(f"Unknown type: {arg_type}")
            if arg_type in no_payload_tags:
                continue
            # Unknown payload width: the remaining arguments cannot be located.
            break
        value, index = decoder(dgram, index)
        args.append(value)

    return address, args

# Test with sample datagram: address "/test" followed by an int, a float and
# a string argument, matching the ",ifs" type tag.
sample_dgram = (osc_types.write_string("/test") + 
                osc_types.write_string(",ifs") +
                osc_types.write_int(42) +
                osc_types.write_float(3.14) +
                osc_types.write_string("hello"))

# Round-trip the datagram through the parser defined above.
address, args = parse_custom_message(sample_dgram)
print(f"Parsed: {address} -> {args}")

Precise Timing with NTP

from pythonosc.parsing import ntp, osc_types
import time

# Current time as NTP timestamp (8 bytes, printed as hex)
current_time = time.time()
ntp_timestamp = ntp.system_time_to_ntp(current_time)
print(f"NTP timestamp: {ntp_timestamp.hex()}")

# Schedule for 2 seconds in the future
future_time = current_time + 2.0
future_ntp = ntp.system_time_to_ntp(future_time)

# Create timetag for bundle (write_date takes the system time, not NTP bytes)
timetag_bytes = osc_types.write_date(future_time)
print(f"Future timetag: {timetag_bytes.hex()}")

# Parse timestamp components: parse_timestamp expects the 64-bit value as an
# int, so convert the big-endian bytes first.
timestamp_int = int.from_bytes(future_ntp, 'big')
parsed = ntp.parse_timestamp(timestamp_int)
print(f"Seconds: {parsed.seconds}, Fraction: {parsed.fraction}")

SLIP Encoding for TCP

from pythonosc import slip
from pythonosc.parsing import osc_types

# Create OSC message: address, type tag (one string argument), argument
message_data = (osc_types.write_string("/tcp/test") +
                osc_types.write_string(",s") +
                osc_types.write_string("TCP message"))

# Encode for TCP transmission (OSC 1.1)
slip_packet = slip.encode(message_data)
print(f"SLIP packet: {slip_packet.hex()}")

# Validate packet before decoding
is_valid = slip.is_valid(slip_packet)
print(f"Valid SLIP packet: {is_valid}")

# Decode received packet; decode() raises ProtocolError on invalid SLIP
# sequences, so handle that explicitly.
try:
    decoded_message = slip.decode(slip_packet)
    print(f"Decoded message length: {len(decoded_message)}")
    print(f"Original matches decoded: {message_data == decoded_message}")
except slip.ProtocolError as e:
    print(f"SLIP decode error: {e}")

Working with MIDI Data

from pythonosc.parsing import osc_types

# Create MIDI note on message as (port_id, status_byte, data1, data2)
midi_packet = (0, 0x90, 60, 127)  # Port 0, Note On (status 0x90), Middle C, Velocity 127
midi_bytes = osc_types.write_midi(midi_packet)

# Create OSC message with MIDI data
message_dgram = (osc_types.write_string("/midi/note") +
                 osc_types.write_string(",m") +
                 midi_bytes)

# Parse MIDI data back: the argument starts right after the encoded address
# and type tag, so their combined length is the start offset.
index = len(osc_types.write_string("/midi/note") + osc_types.write_string(",m"))
parsed_midi, _ = osc_types.get_midi(message_dgram, index)

print(f"Original MIDI: {midi_packet}")
print(f"Parsed MIDI: {parsed_midi}")
print(f"Port: {parsed_midi[0]}, Status: 0x{parsed_midi[1]:02x}, Data1: {parsed_midi[2]}, Data2: {parsed_midi[3]}")

Color Data Handling

from pythonosc.parsing import osc_types

# Create RGBA color (red with 50% alpha)
# NOTE(review): upstream python-osc appears to expect the colour as a single
# 32-bit integer (e.g. 0xFF000080) rather than a tuple -- confirm before use.
color = (255, 0, 0, 128)
color_bytes = osc_types.write_rgba(color)

# Create message with color
message_dgram = (osc_types.write_string("/light/color") +
                 osc_types.write_string(",r") +
                 color_bytes)

# Parse color back: the argument starts right after the encoded address and
# type tag, so their combined length is the start offset.
index = len(osc_types.write_string("/light/color") + osc_types.write_string(",r"))
parsed_color, _ = osc_types.get_rgba(message_dgram, index)

print(f"Original color: RGBA{color}")
print(f"Parsed color: RGBA{parsed_color}")

Binary Data (Blobs)

from pythonosc.parsing import osc_types
import struct

# Create binary data (e.g., audio samples): ten big-endian float32 values
audio_samples = struct.pack('>10f', *[0.1 * i for i in range(10)])
blob_bytes = osc_types.write_blob(audio_samples)

# Create message with blob
message_dgram = (osc_types.write_string("/audio/samples") +
                 osc_types.write_string(",b") +
                 blob_bytes)

# Parse blob back: the argument starts right after the encoded address and
# type tag, so their combined length is the start offset.
index = len(osc_types.write_string("/audio/samples") + osc_types.write_string(",b"))
parsed_blob, _ = osc_types.get_blob(message_dgram, index)

# Unpack audio samples with the same format used to pack them
parsed_samples = struct.unpack('>10f', parsed_blob)
print(f"Blob size: {len(parsed_blob)} bytes")
print(f"Audio samples: {parsed_samples}")

Performance Optimization

from pythonosc.parsing import osc_types
import itertools

# Pre-compute common type tags for performance.
# Keys are type patterns without the leading comma; values are the encoded
# type tag strings (with the comma) ready to be placed in a datagram.
common_types = {
    'f': osc_types.write_string(",f"),
    'i': osc_types.write_string(",i"),
    's': osc_types.write_string(",s"),
    'ff': osc_types.write_string(",ff"),
    'ifs': osc_types.write_string(",ifs"),
}

def fast_build_message(address, args):
    """Build an OSC message datagram from float, int and str arguments.

    Parameters:
    - address: OSC address pattern, e.g. "/fast/test"
    - args: Iterable of argument values; each must be a float, int or str.
      It is consumed in a single pass, so a generator is accepted.

    Returns:
    Datagram bytes: encoded address, type tag string, then encoded arguments

    Raises:
    TypeError: If an argument is not a float, int or str. Such arguments
        used to add an 'x' to the type tag without any payload, which
        produced a malformed message.
    """
    address_bytes = osc_types.write_string(address)

    # Collect the type tag and the encoded payload of each argument together
    # so the type tag string and the argument data can never disagree.
    tags = []
    encoded_args = []
    for arg in args:
        if isinstance(arg, float):
            tags.append('f')
            encoded_args.append(osc_types.write_float(arg))
        elif isinstance(arg, int):
            tags.append('i')
            encoded_args.append(osc_types.write_int(arg))
        elif isinstance(arg, str):
            tags.append('s')
            encoded_args.append(osc_types.write_string(arg))
        else:
            raise TypeError(
                f"Unsupported OSC argument type: {type(arg).__name__}")
    type_pattern = ''.join(tags)

    # Use pre-computed type tag if available
    type_tag_bytes = common_types.get(type_pattern)
    if type_tag_bytes is None:
        type_tag_bytes = osc_types.write_string(',' + type_pattern)

    # join() avoids re-copying the accumulated bytes for every argument
    return address_bytes + type_tag_bytes + b''.join(encoded_args)

# Test optimized building: float, int and str arguments give the type
# pattern "fis", which is not pre-computed, so the tag is encoded on demand.
fast_msg = fast_build_message("/fast/test", [440.0, 127, "hello"])
print(f"Fast message size: {len(fast_msg)} bytes")

Types and Exceptions

from datetime import datetime
from typing import NamedTuple, Tuple, Union

# MIDI packet as accepted by write_midi() and returned by get_midi().
MidiPacket = Tuple[int, int, int, int]  # (port_id, status_byte, data1, data2)

class ParseError(Exception):
    """Raised when OSC data cannot be parsed from a datagram (e.g. by get_string or get_int)."""

class BuildError(Exception):
    """Raised when a value cannot be encoded as OSC data (e.g. by write_string or write_int)."""

class ProtocolError(ValueError):
    """Raised when a SLIP packet contains invalid sequences; a ValueError subclass."""

class NtpError(Exception):
    """Raised when an NTP timestamp cannot be converted (e.g. by ntp_to_system_time)."""

Install with Tessl CLI

npx tessl i tessl/pypi-python-osc

docs

data-types.md

index.md

message-handling.md

servers.md

tcp-networking.md

udp-networking.md

tile.json