Open Sound Control server and client implementations in pure Python for networked music and multimedia applications
—
Complete OSC server implementations supporting UDP and TCP protocols with multiple concurrency models (blocking, threading, forking, asyncio), flexible message dispatching, and comprehensive handler management for building robust OSC services.
Core dispatcher system for routing OSC messages to handler functions based on address patterns.
class Dispatcher:
"""Maps OSC addresses to handler functions and dispatches messages."""
def __init__(self):
"""Initialize empty dispatcher."""
def map(self, address: str, handler: Callable, *args, needs_reply_address: bool = False):
"""Map OSC address pattern to handler function.
Parameters:
- address: OSC address pattern (supports wildcards like /synth/*)
- handler: Function to call when address matches
- args: Fixed arguments to pass to handler before OSC arguments
- needs_reply_address: Whether to pass client address to handler
"""
def unmap(self, address: str, handler: Callable, *args, needs_reply_address: bool = False):
"""Remove mapped handler from address pattern.
Parameters:
- address: OSC address pattern to unmap
- handler: Handler function to remove
- args: Fixed arguments that were mapped
- needs_reply_address: Must match original mapping
"""
def handlers_for_address(self, address_pattern: str) -> Generator[Handler, None, None]:
"""Get handlers matching address pattern.
Parameters:
- address_pattern: OSC address to match against
Yields:
Handler objects that match the address pattern
"""
def call_handlers_for_packet(self, data: bytes, client_address: Tuple[str, int]) -> List:
"""Invoke handlers for all messages in OSC packet.
Parameters:
- data: Raw OSC packet datagram
- client_address: Client IP address and port tuple
Returns:
List of handler return values for response messages
"""
async def async_call_handlers_for_packet(self, data: bytes, client_address: Tuple[str, int]) -> List:
"""Asynchronously invoke handlers for all messages in OSC packet.
Parameters:
- data: Raw OSC packet datagram
- client_address: Client IP address and port tuple
Returns:
List of handler return values for response messages
"""
def set_default_handler(self, handler: Callable, needs_reply_address: bool = False):
"""Set default handler for unmatched addresses.
Parameters:
- handler: Function to call for unmatched addresses
- needs_reply_address: Whether to pass client address to handler
"""
class Handler:
"""Wrapper for callback functions mapped to OSC addresses."""
def __init__(self, callback: Callable, args: Union[Any, List[Any]],
needs_reply_address: bool = False):
"""Initialize handler wrapper.
Parameters:
- callback: Function to call when invoked
- args: Fixed arguments to pass to callback
- needs_reply_address: Whether to pass client address
"""
def invoke(self, client_address: Tuple[str, int], message: OscMessage) -> Union[None, str, Tuple]:
"""Invoke the handler callback.
Parameters:
- client_address: Client IP and port
- message: OSC message that triggered invocation
Returns:
None, OSC address string, or (address, args) tuple for responses
"""UDP server implementations with multiple concurrency models for different performance requirements.
class OSCUDPServer:
"""Base UDP server class."""
def __init__(self, server_address: Tuple[str, int], dispatcher: Dispatcher,
bind_and_activate: bool = True):
"""Initialize UDP server.
Parameters:
- server_address: (IP, port) tuple to bind to
- dispatcher: Dispatcher for routing messages
- bind_and_activate: Whether to bind and activate immediately
"""
@property
def dispatcher(self) -> Dispatcher:
"""Access to message dispatcher."""
def verify_request(self, request, client_address) -> bool:
"""Validate incoming requests.
Parameters:
- request: Raw request data
- client_address: Client address tuple
Returns:
True if request should be processed
"""
class BlockingOSCUDPServer(OSCUDPServer):
"""Sequential UDP server handling one message at a time."""
def serve_forever(self):
"""Start server and handle requests sequentially."""
class ThreadingOSCUDPServer(OSCUDPServer):
"""UDP server spawning thread per message for concurrent handling."""
def serve_forever(self):
"""Start server with thread-per-message handling."""
class ForkingOSCUDPServer(OSCUDPServer):
"""UDP server spawning process per message (Unix only)."""
def serve_forever(self):
"""Start server with process-per-message handling."""
class AsyncIOOSCUDPServer:
"""Asynchronous UDP server using asyncio event loop."""
def __init__(self, server_address: Tuple[str, int], dispatcher: Dispatcher, loop: asyncio.BaseEventLoop):
"""Initialize async UDP server.
Parameters:
- server_address: (IP, port) tuple to bind to
- dispatcher: Dispatcher for routing messages
- loop: asyncio event loop to use for the server
"""
@property
def dispatcher(self) -> Dispatcher:
"""Access to message dispatcher."""
def serve(self):
"""Start server and run forever asynchronously."""
def create_serve_endpoint(self):
"""Create datagram endpoint coroutine for integration."""TCP server implementations supporting OSC 1.0 and 1.1 protocols with connection-based handling.
class OSCTCPServer:
"""Base TCP server class supporting OSC 1.0 and 1.1 protocols."""
def __init__(self, server_address: Tuple[str, int], dispatcher: Dispatcher, mode: str = "1.1"):
"""Initialize TCP server.
Parameters:
- server_address: (IP, port) tuple to bind to
- dispatcher: Dispatcher for routing messages
- mode: OSC protocol mode ("1.0" or "1.1")
"""
@property
def dispatcher(self) -> Dispatcher:
"""Access to message dispatcher."""
class BlockingOSCTCPServer(OSCTCPServer):
"""Sequential TCP server handling one connection at a time."""
def serve_forever(self):
"""Start server and handle connections sequentially."""
class ThreadingOSCTCPServer(OSCTCPServer):
"""TCP server spawning thread per connection for concurrent handling."""
def serve_forever(self):
"""Start server with thread-per-connection handling."""
class ForkingOSCTCPServer(OSCTCPServer):
"""TCP server spawning process per connection (Unix only)."""
def serve_forever(self):
"""Start server with process-per-connection handling."""
class AsyncOSCTCPServer:
"""Asynchronous TCP server with asyncio integration."""
def __init__(self, server_address: str, port: int, dispatcher: Dispatcher, mode: str = "1.1"):
"""Initialize async TCP server.
Parameters:
- server_address: IP address to bind to
- port: TCP port to bind to
- dispatcher: Dispatcher for routing messages
- mode: OSC protocol mode ("1.0" or "1.1")
"""
@property
def dispatcher(self) -> Dispatcher:
"""Access to message dispatcher."""
async def start(self):
"""Start server coroutine."""
async def stop(self):
"""Stop server and close connections."""
async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
"""Handle individual client connections.
Parameters:
- reader: asyncio stream reader for receiving data
- writer: asyncio stream writer for sending responses
"""from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server
import threading
# Define message handlers
def volume_handler(address, *args):
print(f"Volume: {args[0]}")
def filter_handler(address, *args):
print(f"Filter: {args[0]}")
def default_handler(address, *args):
print(f"Unhandled: {address} {args}")
# Set up dispatcher
dispatcher = Dispatcher()
dispatcher.map("/volume", volume_handler)
dispatcher.map("/filter", filter_handler)
dispatcher.set_default_handler(default_handler)
# Create and start server
server = osc_server.ThreadingOSCUDPServer(("127.0.0.1", 5005), dispatcher)
print(f"OSC Server running on {server.server_address}")
# Run server in background thread
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
# Server runs until program exitsfrom pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server
# Handler with fixed arguments
def parametric_handler(address, channel, *osc_args):
print(f"Channel {channel}: {address} -> {osc_args}")
# Handler needing client address for responses
def response_handler(client_address, address, *args):
client_ip, client_port = client_address
print(f"Request from {client_ip}:{client_port}: {address} {args}")
return ("/ack", f"processed_{args[0]}") # Return response
# Wildcard pattern handler
def synth_handler(address, *args):
# Handle all /synth/* addresses
param = address.split('/')[-1] # Get last part of address
print(f"Synth parameter {param}: {args}")
# Set up complex dispatcher
dispatcher = Dispatcher()
# Map with fixed arguments
dispatcher.map("/mixer/channel/1", parametric_handler, "Channel 1")
dispatcher.map("/mixer/channel/2", parametric_handler, "Channel 2")
# Map handler that needs client address
dispatcher.map("/request/*", response_handler, needs_reply_address=True)
# Map wildcard patterns
dispatcher.map("/synth/*", synth_handler)
# Create server
server = osc_server.ThreadingOSCUDPServer(("0.0.0.0", 8000), dispatcher)
server.serve_forever()from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_tcp_server
import threading
def message_handler(address, *args):
print(f"TCP: {address} -> {args}")
# Set up dispatcher
dispatcher = Dispatcher()
dispatcher.map("/*", message_handler) # Handle all messages
# OSC 1.1 server (SLIP-encoded, default)
server_11 = osc_tcp_server.ThreadingOSCTCPServer(("127.0.0.1", 9000), dispatcher, mode="1.1")
thread_11 = threading.Thread(target=server_11.serve_forever)
thread_11.daemon = True
thread_11.start()
# OSC 1.0 server (size-prefixed)
server_10 = osc_tcp_server.ThreadingOSCTCPServer(("127.0.0.1", 9001), dispatcher, mode="1.0")
thread_10 = threading.Thread(target=server_10.serve_forever)
thread_10.daemon = True
thread_10.start()
print("TCP servers running on ports 9000 (OSC 1.1) and 9001 (OSC 1.0)")import asyncio
from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server
async def async_handler(address, *args):
print(f"Async handler: {address} -> {args}")
# Can perform async operations here
await asyncio.sleep(0.1)
# Set up dispatcher with async handler
dispatcher = Dispatcher()
dispatcher.map("/async/*", async_handler)
async def run_async_server():
# Get current event loop
loop = asyncio.get_running_loop()
# Create async UDP server
server = osc_server.AsyncIOOSCUDPServer(("127.0.0.1", 6000), dispatcher, loop)
# Start server
await server.serve()
# Run async server
asyncio.run(run_async_server())import asyncio
from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_tcp_server
async def async_tcp_handler(address, *args):
print(f"Async TCP: {address} -> {args}")
return ("/response", f"processed_{len(args)}_args")
async def run_async_tcp_server():
# Set up dispatcher
dispatcher = Dispatcher()
dispatcher.map("/*", async_tcp_handler)
# Create and start async TCP server
server = osc_tcp_server.AsyncOSCTCPServer("127.0.0.1", 9002, dispatcher)
await server.start()
print("Async TCP server started on port 9002")
# Keep server running
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
await server.stop()
print("Server stopped")
asyncio.run(run_async_tcp_server())from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server, osc_tcp_server
import threading
import time
# Shared message handler
def unified_handler(address, *args):
print(f"Received: {address} -> {args}")
# Create shared dispatcher
dispatcher = Dispatcher()
dispatcher.map("/*", unified_handler)
# Start UDP server
udp_server = osc_server.ThreadingOSCUDPServer(("0.0.0.0", 5005), dispatcher)
udp_thread = threading.Thread(target=udp_server.serve_forever)
udp_thread.daemon = True
udp_thread.start()
# Start TCP servers
tcp_server_10 = osc_tcp_server.ThreadingOSCTCPServer(("0.0.0.0", 8000), dispatcher, mode="1.0")
tcp_thread_10 = threading.Thread(target=tcp_server_10.serve_forever)
tcp_thread_10.daemon = True
tcp_thread_10.start()
tcp_server_11 = osc_tcp_server.ThreadingOSCTCPServer(("0.0.0.0", 8001), dispatcher, mode="1.1")
tcp_thread_11 = threading.Thread(target=tcp_server_11.serve_forever)
tcp_thread_11.daemon = True
tcp_thread_11.start()
print("Multi-protocol server running:")
print(" UDP on port 5005")
print(" TCP OSC 1.0 on port 8000")
print(" TCP OSC 1.1 on port 8001")
# Keep main thread alive
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down servers...")from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server
def request_handler(client_address, address, *args):
"""Handler that sends responses back to client."""
client_ip, client_port = client_address
if address == "/get/status":
return ("/status", "running", 1.0)
elif address == "/get/info":
return ("/info", ["server", "v1.0", client_ip])
elif address == "/echo":
return ("/echo_response", *args)
else:
return ("/error", f"unknown_request: {address}")
# Set up dispatcher with response handler
dispatcher = Dispatcher()
dispatcher.map("/get/*", request_handler, needs_reply_address=True)
dispatcher.map("/echo", request_handler, needs_reply_address=True)
# Create server that can send responses
server = osc_server.ThreadingOSCUDPServer(("127.0.0.1", 7000), dispatcher)
print("Response server running on port 7000")
server.serve_forever()from pythonosc.dispatcher import Dispatcher
from pythonosc import osc_server
import time
import threading
class MonitoringHandler:
def __init__(self):
self.message_count = 0
self.start_time = time.time()
def handle_message(self, address, *args):
self.message_count += 1
if self.message_count % 1000 == 0:
elapsed = time.time() - self.start_time
rate = self.message_count / elapsed
print(f"Processed {self.message_count} messages ({rate:.1f} msg/sec)")
def get_stats(self, address, *args):
elapsed = time.time() - self.start_time
rate = self.message_count / elapsed if elapsed > 0 else 0
return ("/stats", self.message_count, elapsed, rate)
# Set up monitoring
monitor = MonitoringHandler()
dispatcher = Dispatcher()
dispatcher.map("/*", monitor.handle_message)
dispatcher.map("/get/stats", monitor.get_stats, needs_reply_address=True)
# High-performance server
server = osc_server.ThreadingOSCUDPServer(("0.0.0.0", 5555), dispatcher)
print("Monitoring server running on port 5555")
print("Send messages to any address, or /get/stats for statistics")
server.serve_forever()from typing import Callable, Union, List, Tuple, Generator, Any
import threading
import asyncio
from pythonosc.osc_message import OscMessage
from pythonosc.dispatcher import Dispatcher, HandlerInstall with Tessl CLI
npx tessl i tessl/pypi-python-osc