Open Sound Control server and client implementations in pure Python for networked music and multimedia applications
—
TCP client implementations for reliable OSC communication with support for OSC 1.0 and 1.1 protocols, SLIP encoding, connection management, synchronous and asynchronous patterns, and automatic reconnection handling.
High-level TCP client with message building helpers and connection management.
class SimpleTCPClient:
    """OSC client over TCP that builds messages from an address and arguments."""

    def __init__(
        self,
        address: str,
        port: int,
        family: socket.AddressFamily = socket.AF_INET,
        mode: str = "1.1",
    ):
        """Initialize the TCP client.

        Args:
            address: IP address or hostname of the target.
            port: TCP port number of the target.
            family: Socket address family (AF_INET or AF_INET6).
            mode: OSC protocol mode, either "1.0" or "1.1".
        """

    def send_message(self, address: str, value: ArgValue = ""):
        """Build an OSC message and send it.

        Args:
            address: OSC address pattern string.
            value: A single argument, or a list of arguments.
        """

    def get_messages(self, timeout: int = 30) -> Generator[OscMessage, None, None]:
        """Receive data and parse it into OSC messages.

        Args:
            timeout: Receive timeout, in seconds.

        Yields:
            OscMessage objects parsed from the received data.

        Raises:
            socket.timeout: No data was received within the timeout.
        """

    def close(self):
        """Close the TCP connection."""


# Low-level TCP client for sending pre-built messages with context manager support.
class TCPClient:
    """Basic OSC client over TCP; usable as a context manager."""

    def __init__(
        self,
        address: str,
        port: int,
        family: socket.AddressFamily = socket.AF_INET,
        mode: str = "1.1",
    ):
        """Initialize the TCP client.

        Args:
            address: IP address or hostname of the target.
            port: TCP port number of the target.
            family: Socket address family (AF_INET or AF_INET6).
            mode: OSC protocol mode, either "1.0" or "1.1".
        """

    def send(self, content: Union[OscMessage, OscBundle]):
        """Send a pre-built OSC message or bundle.

        Args:
            content: The OscMessage or OscBundle to transmit.

        Raises:
            OSError: Transmission failed.
        """

    def receive(self, timeout: int = 30) -> Generator[Union[OscMessage, OscBundle], None, None]:
        """Receive data and parse it into OSC messages and bundles.

        Args:
            timeout: Receive timeout, in seconds.

        Yields:
            OscMessage or OscBundle objects parsed from the received data.

        Raises:
            socket.timeout: No data was received within the timeout.
        """

    def close(self):
        """Close the TCP connection."""

    def __enter__(self):
        """Enter the context; the client itself is the managed object."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context, performing automatic cleanup."""


# Async TCP client for non-blocking OSC communication with asyncio integration.
class AsyncTCPClient:
    """Asynchronous OSC client over TCP; usable as an async context manager."""

    def __init__(
        self,
        address: str,
        port: int,
        family: socket.AddressFamily = socket.AF_INET,
        mode: str = "1.1",
    ):
        """Initialize the async TCP client.

        Args:
            address: IP address or hostname of the target.
            port: TCP port number of the target.
            family: Socket address family (AF_INET or AF_INET6).
            mode: OSC protocol mode, either "1.0" or "1.1".
        """

    async def send(self, content: Union[OscMessage, OscBundle]):
        """Send a pre-built OSC message or bundle asynchronously.

        Args:
            content: The OscMessage or OscBundle to transmit.

        Raises:
            OSError: Transmission failed.
        """

    async def receive(self, timeout: int = 30) -> AsyncGenerator[Union[OscMessage, OscBundle], None]:
        """Receive data asynchronously and parse it into OSC messages and bundles.

        Args:
            timeout: Receive timeout, in seconds.

        Yields:
            OscMessage or OscBundle objects parsed from the received data.

        Raises:
            asyncio.TimeoutError: No data was received within the timeout.
        """

    async def close(self):
        """Close the TCP connection asynchronously."""

    async def __aenter__(self):
        """Enter the async context; the client itself is the managed object."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context, performing automatic cleanup."""


# Async TCP client with message building helpers for high-level async communication.
class AsyncSimpleTCPClient:
    """Asynchronous OSC client over TCP that builds messages for the caller."""

    def __init__(
        self,
        address: str,
        port: int,
        family: socket.AddressFamily = socket.AF_INET,
        mode: str = "1.1",
    ):
        """Initialize the async simple TCP client.

        Args:
            address: IP address or hostname of the target.
            port: TCP port number of the target.
            family: Socket address family (AF_INET or AF_INET6).
            mode: OSC protocol mode, either "1.0" or "1.1".
        """

    async def send_message(self, address: str, value: ArgValue = ""):
        """Build an OSC message and send it asynchronously.

        Args:
            address: OSC address pattern string.
            value: A single argument, or a list of arguments.
        """

    async def get_messages(self, timeout: int = 30) -> AsyncGenerator[OscMessage, None]:
        """Receive data asynchronously and parse it into OSC messages.

        Args:
            timeout: Receive timeout, in seconds.

        Yields:
            OscMessage objects parsed from the received data.

        Raises:
            asyncio.TimeoutError: No data was received within the timeout.
        """

    async def close(self):
        """Close the TCP connection asynchronously."""


# Async TCP client with dispatcher for automatic response handling.
class AsyncDispatchTCPClient:
    """Asynchronous OSC client over TCP that routes responses through a dispatcher."""

    dispatcher: Dispatcher  # Class-level dispatcher instance

    def __init__(
        self,
        address: str,
        port: int,
        family: socket.AddressFamily = socket.AF_INET,
        mode: str = "1.1",
    ):
        """Initialize the async dispatcher TCP client.

        Args:
            address: IP address or hostname of the target.
            port: TCP port number of the target.
            family: Socket address family (AF_INET or AF_INET6).
            mode: OSC protocol mode, either "1.0" or "1.1".
        """

    async def send_message(self, address: str, value: ArgValue = ""):
        """Build an OSC message and send it asynchronously.

        Args:
            address: OSC address pattern string.
            value: A single argument, or a list of arguments.
        """

    async def handle_messages(self, timeout: int = 30):
        """Process received messages with the class dispatcher asynchronously.

        Incoming messages are passed to the class dispatcher, which calls the
        handlers mapped to matching addresses.

        Args:
            timeout: Receive timeout, in seconds.
        """

    async def close(self):
        """Close the TCP connection asynchronously."""


MODE_1_0: str = "1.0"  # OSC 1.0 protocol (size-prefixed messages)
MODE_1_1: str = "1.1"  # OSC 1.1 protocol (SLIP-encoded messages)

from pythonosc import tcp_client
# Client targeting 127.0.0.1:8000
client = tcp_client.SimpleTCPClient("127.0.0.1", 8000)

# (address, value) pairs, sent in this order
outgoing = (
    ("/synth/note", [60, 127]),
    ("/effect/reverb", 0.3),
    ("/transport/play", True),
)

try:
    for osc_address, payload in outgoing:
        client.send_message(osc_address, payload)

    # Print each response as it is received
    for reply in client.get_messages(timeout=5):
        print(f"Response: {reply.address} -> {reply.params}")
finally:
    # Always release the connection, even if sending or receiving fails
    client.close()

from pythonosc import tcp_client
from pythonosc.osc_message_builder import OscMessageBuilder

# The with-block takes care of cleanup when it exits
with tcp_client.TCPClient("192.168.1.100", 9000) as client:
    # Assemble the message by hand, then send it
    volume_builder = OscMessageBuilder("/mixer/channel/1/volume")
    volume_builder.add_arg(0.8)
    client.send(volume_builder.build())

    # Print whatever comes back
    for reply in client.receive(timeout=10):
        if not hasattr(reply, 'address'):  # OscBundle
            print(f"Bundle with {reply.num_contents} items")
            continue
        # OscMessage
        print(f"Message: {reply.address} {reply.params}")

from pythonosc import tcp_client
# Each client is closed in a `finally` so a failed send cannot leak a connection.

# OSC 1.0 mode (size-prefixed messages)
client_10 = tcp_client.SimpleTCPClient("127.0.0.1", 8000, mode="1.0")
try:
    client_10.send_message("/osc10/test", "OSC 1.0 message")

    # OSC 1.1 mode (SLIP-encoded messages) - default
    client_11 = tcp_client.SimpleTCPClient("127.0.0.1", 8001, mode="1.1")
    try:
        client_11.send_message("/osc11/test", "OSC 1.1 message")
    finally:
        client_11.close()
finally:
    client_10.close()

import asyncio
from pythonosc import tcp_client


async def async_communication():
    """Send one message and print the replies, closing the client afterwards."""
    # send_message()/get_messages() belong to AsyncSimpleTCPClient;
    # AsyncTCPClient only exposes send()/receive() for pre-built packets.
    # NOTE(review): confirm the connection is usable without `async with`.
    client = tcp_client.AsyncSimpleTCPClient("127.0.0.1", 8000)
    try:
        # Send message asynchronously
        await client.send_message("/async/test", ["hello", "world"])

        # Receive messages asynchronously
        async for message in client.get_messages(timeout=5):
            print(f"Async response: {message.address} -> {message.params}")
    finally:
        await client.close()


# Run async function
asyncio.run(async_communication())

import asyncio
from pythonosc import tcp_client
from pythonosc.osc_message_builder import OscMessageBuilder


def build_message(address, *args):
    """Return an OSC message for ``address`` carrying ``args`` in order."""
    builder = OscMessageBuilder(address)
    for arg in args:
        builder.add_arg(arg)
    return builder.build()


async def async_context_example():
    """Send three messages inside an async context and print up to three replies."""
    async with tcp_client.AsyncTCPClient("127.0.0.1", 8000) as client:
        # AsyncTCPClient sends pre-built packets only (it has no
        # send_message helper), so each message is built before sending.
        await client.send(build_message("/async/start", True))
        await client.send(build_message("/async/data", 1, 2, 3, 4, 5))
        await client.send(build_message("/async/end", False))

        # Process responses
        message_count = 0
        async for message in client.receive(timeout=10):
            print(f"Message {message_count}: {message.address}")
            message_count += 1
            if message_count >= 3:  # Stop after 3 messages
                break


asyncio.run(async_context_example())

import asyncio
from pythonosc import tcp_client
from pythonosc.dispatcher import Dispatcher


# Coroutine handlers for the mapped response addresses
async def handle_async_ack(address, *args):
    print(f"Async ACK: {address} {args}")


async def handle_async_data(address, *args):
    print(f"Async data: {address} {args}")


async def async_dispatcher_example():
    # Install a fresh dispatcher on the class, then register the handlers on it
    response_dispatcher = Dispatcher()
    tcp_client.AsyncDispatchTCPClient.dispatcher = response_dispatcher
    response_dispatcher.map("/ack", handle_async_ack)
    response_dispatcher.map("/data", handle_async_data)

    # Client that routes responses through the class dispatcher
    client = tcp_client.AsyncDispatchTCPClient("127.0.0.1", 8000)
    try:
        # Issue the request
        await client.send_message("/request/sensor_data", "temperature")

        # Let the dispatcher call the mapped handlers for incoming messages
        await client.handle_messages(timeout=5)
    finally:
        await client.close()


asyncio.run(async_dispatcher_example())

from pythonosc import tcp_client
import socket

from pythonosc.osc_message_builder import OscMessageBuilder

# IPv6 TCP client
with tcp_client.TCPClient("::1", 8000, family=socket.AF_INET6) as client:
    # TCPClient sends pre-built packets only (it has no send_message helper),
    # so the message is built explicitly before sending.
    builder = OscMessageBuilder("/ipv6/test")
    builder.add_arg("Hello IPv6 TCP")
    client.send(builder.build())

    for message in client.receive(timeout=5):
        print(f"IPv6 response: {message.address}")

from pythonosc import tcp_client
import time
import socket


def reliable_tcp_communication():
    """Send a test message, retrying with exponential backoff on failure.

    Makes at most ``max_retries`` attempts. Each attempt creates a fresh
    client, sends the attempt index, and returns as soon as one response
    has been printed. An attempt that raises a socket error, or that ends
    without any response, counts as failed.
    """
    max_retries = 3

    # A bounded for-loop guarantees termination: every pass consumes one
    # attempt, including a pass where get_messages() yields nothing and
    # raises nothing.
    for retry_count in range(max_retries):
        client = None
        try:
            client = tcp_client.SimpleTCPClient("127.0.0.1", 8000)

            # Send message
            client.send_message("/reliable/test", retry_count)

            # Try to receive response
            for message in client.get_messages(timeout=5):
                print(f"Success: {message.address} {message.params}")
                return  # Success, exit function

            print(f"Attempt {retry_count + 1} failed: no response received")
        except (socket.error, ConnectionRefusedError, socket.timeout) as e:
            print(f"Attempt {retry_count + 1} failed: {e}")
        finally:
            # Single cleanup point for the success and failure paths alike
            if client is not None:
                client.close()

        attempts_made = retry_count + 1
        if attempts_made < max_retries:
            time.sleep(2 ** attempts_made)  # Exponential backoff

    print("All retry attempts failed")


reliable_tcp_communication()

from pythonosc import tcp_client
from pythonosc.osc_bundle_builder import OscBundleBuilder, IMMEDIATELY
# OscMessageBuilder is used below, so it must be imported too
from pythonosc.osc_message_builder import OscMessageBuilder

# Send multiple messages efficiently
with tcp_client.TCPClient("127.0.0.1", 8000) as client:
    # Create bundle for batch sending
    bundle_builder = OscBundleBuilder(IMMEDIATELY)

    # Add multiple messages
    for i in range(10):
        msg_builder = OscMessageBuilder(f"/batch/message/{i}")
        msg_builder.add_arg(i * 10)
        msg_builder.add_arg(f"Message {i}")
        bundle_builder.add_content(msg_builder.build())

    # Send entire batch as single bundle
    batch_bundle = bundle_builder.build()
    client.send(batch_bundle)
    print("Sent batch of 10 messages in single bundle")

from typing import Union, Generator, AsyncGenerator
import socket
import asyncio
from pythonosc.osc_message import OscMessage
from pythonosc.osc_bundle import OscBundle
from pythonosc.osc_message_builder import ArgValue
from pythonosc.dispatcher import Dispatcher

Install with Tessl CLI:

npx tessl i tessl/pypi-python-osc