Streaming Synthesis

Overview

Streaming synthesis enables real-time, bidirectional audio generation: text is sent incrementally and audio is returned as it is generated. This is ideal for interactive applications such as chatbots, live assistants, and real-time communication systems where low latency is crucial. Note that the live Google Cloud API restricts streaming synthesis to specific voice families (for example, Journey and Chirp 3: HD voices); the voice names used in the examples below are illustrative.

Core Streaming Operations

Basic Streaming Setup

from google.cloud import texttospeech

# Initialize client for streaming
client = texttospeech.TextToSpeechClient()

# Configure streaming synthesis
config = texttospeech.StreamingSynthesizeConfig(
    voice=texttospeech.VoiceSelectionParams(
        language_code="en-US",
        name="en-US-Neural2-A"
    ),
    streaming_audio_config=texttospeech.StreamingAudioConfig(
        audio_encoding=texttospeech.AudioEncoding.LINEAR16,
        sample_rate_hertz=22050
    )
)

# Create streaming request iterator
def create_streaming_requests():
    # First request with configuration
    yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)
    
    # Input requests
    yield texttospeech.StreamingSynthesizeRequest(
        input=texttospeech.StreamingSynthesisInput(text="Hello, ")
    )
    yield texttospeech.StreamingSynthesizeRequest(
        input=texttospeech.StreamingSynthesisInput(text="this is streaming synthesis.")
    )

# Perform streaming synthesis
streaming_responses = client.streaming_synthesize(create_streaming_requests())

# Process responses
for response in streaming_responses:
    if response.audio_content:
        # Handle audio chunks as they arrive
        print(f"Received audio chunk: {len(response.audio_content)} bytes")
        # Process or play audio chunk immediately
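
For live playback, chunks can be written straight to an audio output device as they arrive. This sketch assumes the third-party pyaudio package is installed and that the stream was configured for 16-bit mono PCM at 22050 Hz, matching the LINEAR16 config above:

import pyaudio
from google.cloud import texttospeech

# Assumes `config` and `create_streaming_requests` from the example above
player = pyaudio.PyAudio()
stream = player.open(
    format=pyaudio.paInt16,  # 16-bit PCM, matching LINEAR16
    channels=1,              # Synthesized speech is mono
    rate=22050,              # Must match sample_rate_hertz in the config
    output=True
)

client = texttospeech.TextToSpeechClient()
for response in client.streaming_synthesize(create_streaming_requests()):
    if response.audio_content:
        stream.write(response.audio_content)  # Blocks until the chunk plays

stream.stop_stream()
stream.close()
player.terminate()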

Streaming with Markup

Streaming requests do not accept SSML documents; the markup field of StreamingSynthesisInput is a lightweight annotation format for HD voices only (see the class reference below). The example is a sketch: the voice name and the [pause] tag are assumptions based on the public HD-voice documentation.

from google.cloud import texttospeech
from google.cloud.texttospeech import (
    StreamingSynthesizeRequest,
    StreamingSynthesizeConfig,
    StreamingSynthesisInput,
    StreamingAudioConfig
)

def streaming_markup_synthesis():
    """Stream markup content to an HD voice."""
    client = texttospeech.TextToSpeechClient()
    
    # Configure an HD voice; markup input cannot be used with other voices
    config = StreamingSynthesizeConfig(
        voice=texttospeech.VoiceSelectionParams(
            language_code="en-US",
            name="en-US-Chirp3-HD-Charon"  # Assumed example HD voice name
        ),
        streaming_audio_config=StreamingAudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,
            sample_rate_hertz=24000
        )
    )
    
    def request_generator():
        # Configuration request must come first
        yield StreamingSynthesizeRequest(streaming_config=config)
        
        # Markup input chunks ([pause] is an HD-voice markup tag)
        markup_parts = [
            'Hello there! [pause]',
            'This is exciting!'
        ]
        
        for markup_part in markup_parts:
            yield StreamingSynthesizeRequest(
                input=StreamingSynthesisInput(markup=markup_part)
            )
    
    # Stream and collect audio
    responses = client.streaming_synthesize(request_generator())
    
    audio_chunks = []
    for response in responses:
        if response.audio_content:
            audio_chunks.append(response.audio_content)
    
    return b''.join(audio_chunks)

# Usage
streaming_audio = streaming_markup_synthesis()
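
Since markup input only works with HD voices, it helps to discover which voices a project can use. This sketch relies on the standard list_voices call; filtering on an "HD" substring in the name is a heuristic assumption, not a documented contract:

from google.cloud import texttospeech

client = texttospeech.TextToSpeechClient()

# List voices for a language and keep likely HD voices by name
response = client.list_voices(language_code="en-US")
hd_voices = [v for v in response.voices if "HD" in v.name]

for voice in hd_voices:
    print(voice.name, voice.natural_sample_rate_hertz)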

Configuration Classes

StreamingSynthesizeConfig

from google.cloud.texttospeech import (
    StreamingSynthesizeConfig,
    VoiceSelectionParams,
    StreamingAudioConfig,
    AudioEncoding,
    SsmlVoiceGender,
    AdvancedVoiceOptions
)

# Complete streaming configuration
streaming_config = StreamingSynthesizeConfig(
    voice=VoiceSelectionParams(
        language_code="en-US",
        name="en-US-Neural2-C",
        ssml_gender=SsmlVoiceGender.FEMALE
    ),
    streaming_audio_config=StreamingAudioConfig(
        audio_encoding=AudioEncoding.LINEAR16,
        sample_rate_hertz=22050,
        speaking_rate=1.1               # Optional: speech rate
    )
)

# Streaming config with advanced voice options
streaming_config = StreamingSynthesizeConfig(
    voice=VoiceSelectionParams(
        language_code="en-US",
        name="en-US-Neural2-A",
        advanced_voice_options=AdvancedVoiceOptions(
            low_latency_journey_synthesis=True  # Enable low latency
        )
    ),
    streaming_audio_config=StreamingAudioConfig(
        audio_encoding=AudioEncoding.LINEAR16,
        sample_rate_hertz=16000  # Lower rate for reduced latency
    )
)

StreamingAudioConfig

class StreamingAudioConfig:
    """Description of the desired output audio data for streaming.
    
    Parameters:
    - audio_encoding (AudioEncoding): Required. Format of audio byte stream.
        Streaming supports PCM (LINEAR16), ALAW, MULAW, and OGG_OPUS only.
    - sample_rate_hertz (int): Optional. Synthesis sample rate in hertz.
    - speaking_rate (float): Optional. Speaking rate/speed in range [0.25, 2.0].
        1.0 is normal speed, 2.0 is twice as fast, 0.5 is half speed.
    """
    def __init__(self, audio_encoding, sample_rate_hertz=None, speaking_rate=None): ...

from google.cloud.texttospeech import StreamingAudioConfig, AudioEncoding

# Basic streaming audio configuration
audio_config = StreamingAudioConfig(
    audio_encoding=AudioEncoding.LINEAR16,     # Required: audio format
    sample_rate_hertz=22050                    # Optional: sample rate
)

# Advanced streaming audio configuration
audio_config = StreamingAudioConfig(
    audio_encoding=AudioEncoding.OGG_OPUS,     # Compressed format for streaming
    sample_rate_hertz=48000,
    speaking_rate=0.9                          # Slightly slower speech
)

# Low-latency configuration
low_latency_config = StreamingAudioConfig(
    audio_encoding=AudioEncoding.LINEAR16,
    sample_rate_hertz=16000,                   # Lower sample rate
    speaking_rate=1.0                          # Normal rate
)
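
Sample rate directly drives bandwidth and buffer sizing. For 16-bit mono LINEAR16, one second of audio is sample_rate_hertz * 2 bytes; a quick calculation (assuming mono, 16-bit output) helps size playback buffers:

# Bytes per second of 16-bit mono PCM at common sample rates
BYTES_PER_SAMPLE = 2  # LINEAR16 uses 16-bit samples

for rate in (16000, 22050, 48000):
    bytes_per_second = rate * BYTES_PER_SAMPLE
    print(f"{rate} Hz -> {bytes_per_second} bytes/s "
          f"({bytes_per_second / 1024:.1f} KiB/s)")

# 16000 Hz -> 32000 bytes/s: lowest payload, phone-call quality
# 48000 Hz -> 96000 bytes/s: higher fidelity at triple the bandwidth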

StreamingSynthesisInput

class StreamingSynthesisInput:
    """Input to be synthesized in streaming requests.
    
    This uses oneof fields - only one can be set at a time.
    
    Parameters:
    - text (str): Raw text to be synthesized. Recommended to use complete sentences.
    - markup (str): Markup for HD voices specifically. Cannot be used with other voices.
    - prompt (str): System instruction for controllable voice models only.
    """
    def __init__(self, text=None, markup=None, prompt=None): ...

from google.cloud.texttospeech import StreamingSynthesisInput

# Text input for streaming
text_input = StreamingSynthesisInput(
    text="This is a chunk of text to be synthesized."
)

# Markup input for streaming (HD voices only)
markup_input = StreamingSynthesisInput(
    markup="Markup content for HD voices specifically."
)

# Prompt input for controllable voice models
prompt_input = StreamingSynthesisInput(
    prompt="System instruction for controllable voice models."
)

# Note: StreamingSynthesisInput uses oneof fields - only one can be set at a time
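
Because these are protobuf oneof fields, assigning one field clears any previously set sibling. A small sketch of that behavior (standard proto-plus oneof semantics):

from google.cloud.texttospeech import StreamingSynthesisInput

inp = StreamingSynthesisInput(text="First as text")
print(inp.text)    # "First as text"

# Assigning markup clears the text field (oneof semantics)
inp.markup = "Now as markup"
print(inp.text)    # "" - cleared
print(inp.markup)  # "Now as markup"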

Request and Response Types

StreamingSynthesizeRequest

class StreamingSynthesizeRequest:
    """Request message for StreamingSynthesize method.
    
    Uses oneof fields - only one can be set at a time.
    First message must contain streaming_config, subsequent messages contain input.
    
    Parameters:
    - streaming_config (StreamingSynthesizeConfig): Configuration for first request only.
    - input (StreamingSynthesisInput): Input text/markup for subsequent requests.
    """
    def __init__(self, streaming_config=None, input=None): ...

from google.cloud import texttospeech
from google.cloud.texttospeech import (
    StreamingSynthesizeRequest,
    StreamingSynthesizeConfig,
    StreamingSynthesisInput
)

# Configuration request (first request in stream)
config_request = StreamingSynthesizeRequest(
    streaming_config=StreamingSynthesizeConfig(
        voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
        streaming_audio_config=texttospeech.StreamingAudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,
            sample_rate_hertz=22050
        )
    )
)

# Input request (subsequent requests)
input_request = StreamingSynthesizeRequest(
    input=StreamingSynthesisInput(text="Text to synthesize")
)

# Markup input request (for HD voices)
markup_request = StreamingSynthesizeRequest(
    input=StreamingSynthesisInput(
        markup='Markup content with specific formatting for HD voices'
    )
)

StreamingSynthesizeResponse

from google.cloud.texttospeech import StreamingSynthesizeResponse

# Response processing
def process_streaming_response(response: StreamingSynthesizeResponse):
    """Process an individual streaming response."""
    
    # StreamingSynthesizeResponse carries the synthesized audio chunk;
    # errors are raised as exceptions while iterating the response
    # stream rather than delivered as a field on the response message
    if response.audio_content:
        audio_size = len(response.audio_content)
        print(f"Received audio chunk: {audio_size} bytes")
        return response.audio_content
    
    return None

# Example response handling
def handle_streaming_responses(response_iterator):
    """Handle complete streaming response sequence."""
    audio_chunks = []
    total_chunks = 0
    total_bytes = 0
    
    for response in response_iterator:
        audio_chunk = process_streaming_response(response)
        if audio_chunk:
            audio_chunks.append(audio_chunk)
            total_chunks += 1
            total_bytes += len(audio_chunk)
    
    print(f"Streaming complete: {total_chunks} chunks, {total_bytes} bytes total")
    return b''.join(audio_chunks)
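
Streamed LINEAR16 chunks arrive as raw PCM without a container header, so writing them directly to a .wav file produces a file most players reject. A helper like the following (a sketch assuming 16-bit mono output) wraps the joined chunks in a proper WAV container using the standard library:

import wave

def save_pcm_as_wav(pcm_data: bytes, path: str, sample_rate: int = 22050):
    """Wrap raw 16-bit mono PCM in a WAV container."""
    with wave.open(path, "wb") as wav_file:
        wav_file.setnchannels(1)           # Mono (assumed)
        wav_file.setsampwidth(2)           # 16-bit samples (LINEAR16)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm_data)

# Usage with the handler above:
# audio = handle_streaming_responses(responses)
# save_pcm_as_wav(audio, "output.wav", sample_rate=22050)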

Practical Streaming Examples

Real-Time Text Processing

import threading
import queue
from google.cloud import texttospeech

class RealTimeTextToSpeech:
    """Real-time text-to-speech streaming processor."""
    
    def __init__(self, language_code="en-US", voice_name=None):
        self.client = texttospeech.TextToSpeechClient()
        self.text_queue = queue.Queue()
        self.audio_queue = queue.Queue()
        self.is_running = False
        
        # Configure streaming
        self.config = texttospeech.StreamingSynthesizeConfig(
            voice=texttospeech.VoiceSelectionParams(
                language_code=language_code,
                name=voice_name or "en-US-Neural2-A",
                advanced_voice_options=texttospeech.AdvancedVoiceOptions(
                    low_latency_journey_synthesis=True
                )
            ),
            streaming_audio_config=texttospeech.StreamingAudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                sample_rate_hertz=16000  # Lower rate for real-time
            )
        )
    
    def start_streaming(self):
        """Start the streaming synthesis thread."""
        self.is_running = True
        self.streaming_thread = threading.Thread(target=self._stream_worker)
        self.streaming_thread.start()
    
    def stop_streaming(self):
        """Stop streaming synthesis."""
        self.is_running = False
        self.text_queue.put(None)  # Sentinel to end stream
        if hasattr(self, 'streaming_thread'):
            self.streaming_thread.join()
    
    def add_text(self, text: str):
        """Add text to synthesis queue."""
        if self.is_running:
            self.text_queue.put(text)
    
    def get_audio(self, timeout: float = 1.0):
        """Get synthesized audio chunk."""
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None
    
    def _stream_worker(self):
        """Background streaming worker."""
        def request_generator():
            # Send configuration first
            yield texttospeech.StreamingSynthesizeRequest(
                streaming_config=self.config
            )
            
            # Send text inputs as they arrive
            while self.is_running:
                try:
                    text = self.text_queue.get(timeout=1.0)
                    if text is None:  # Sentinel to end
                        break
                    
                    yield texttospeech.StreamingSynthesizeRequest(
                        input=texttospeech.StreamingSynthesisInput(text=text)
                    )
                except queue.Empty:
                    continue
        
        try:
            # Start streaming
            responses = self.client.streaming_synthesize(request_generator())
            
            # Process responses
            for response in responses:
                if response.audio_content and self.is_running:
                    self.audio_queue.put(response.audio_content)
        
        except Exception as e:
            print(f"Streaming error: {e}")
        finally:
            self.audio_queue.put(None)  # Signal end of audio

# Usage example
tts_stream = RealTimeTextToSpeech()
tts_stream.start_streaming()

# Add text for synthesis
tts_stream.add_text("Hello, this is real-time synthesis.")
tts_stream.add_text("Each text chunk is processed immediately.")
tts_stream.add_text("Great for interactive applications!")

# Collect audio chunks; the worker signals end-of-audio with None,
# and a queue timeout also returns None to end collection
audio_chunks = []
while True:
    audio_chunk = tts_stream.get_audio(timeout=5.0)
    if audio_chunk is None:
        break
    audio_chunks.append(audio_chunk)
    print(f"Got audio chunk: {len(audio_chunk)} bytes")

tts_stream.stop_streaming()

# Combine all audio; streamed LINEAR16 chunks are raw PCM, so wrap them
# in a WAV container with the save_pcm_as_wav helper defined earlier
complete_audio = b''.join(audio_chunks)
save_pcm_as_wav(complete_audio, "realtime_output.wav", sample_rate=16000)

Interactive Conversation Streaming

from google.cloud import texttospeech

class ConversationSynthesizer:
    """Interactive conversation streaming synthesis."""
    
    def __init__(self):
        self.client = texttospeech.TextToSpeechClient()
    
    def synthesize_conversation(self, conversation_parts: list, output_file: str):
        """Synthesize conversation with different voices for different speakers."""
        
        # Voice configurations for different speakers
        speaker_configs = {
            "speaker1": texttospeech.StreamingSynthesizeConfig(
                voice=texttospeech.VoiceSelectionParams(
                    language_code="en-US",
                    name="en-US-Neural2-A"  # First speaker's voice
                ),
                streaming_audio_config=texttospeech.StreamingAudioConfig(
                    audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                    sample_rate_hertz=22050
                )
            ),
            "speaker2": texttospeech.StreamingSynthesizeConfig(
                voice=texttospeech.VoiceSelectionParams(
                    language_code="en-US",
                    name="en-US-Neural2-C"  # Second speaker's voice
                ),
                streaming_audio_config=texttospeech.StreamingAudioConfig(
                    audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                    sample_rate_hertz=22050
                )
            )
        }
        
        # Stream each part as its own request sequence so the returned
        # audio maps back to that part and conversation order is preserved
        final_audio = []
        
        for part in conversation_parts:
            config = speaker_configs.get(part.get('speaker'))
            if config is None:
                continue
            
            def request_generator(cfg=config, text=part['text']):
                # Configuration request, then this part's dialogue
                yield texttospeech.StreamingSynthesizeRequest(
                    streaming_config=cfg
                )
                yield texttospeech.StreamingSynthesizeRequest(
                    input=texttospeech.StreamingSynthesisInput(text=text)
                )
            
            # Collect audio for this part
            responses = self.client.streaming_synthesize(request_generator())
            for response in responses:
                if response.audio_content:
                    final_audio.append(response.audio_content)
        
        # Save the complete conversation (raw PCM; use save_pcm_as_wav
        # from earlier for a playable WAV file)
        with open(output_file, "wb") as f:
            f.write(b''.join(final_audio))
        
        return output_file

# Usage example
conversation = [
    {"speaker": "speaker1", "text": "Hello! How are you today?"},
    {"speaker": "speaker2", "text": "I'm doing great, thanks for asking!"},
    {"speaker": "speaker1", "text": "That's wonderful to hear. What are your plans?"},
    {"speaker": "speaker2", "text": "I'm planning to work on some exciting projects."}
]

synthesizer = ConversationSynthesizer()
output_file = synthesizer.synthesize_conversation(conversation, "conversation.wav")
print(f"Conversation saved to {output_file}")

Chunked Text Streaming

def stream_long_text(text: str, chunk_size: int = 100):
    """Stream long text by breaking it into manageable chunks."""
    import re
    
    client = texttospeech.TextToSpeechClient()
    
    # Configure streaming for long content
    config = texttospeech.StreamingSynthesizeConfig(
        voice=texttospeech.VoiceSelectionParams(
            language_code="en-US",
            name="en-US-Wavenet-A"
        ),
        streaming_audio_config=texttospeech.StreamingAudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,
            sample_rate_hertz=22050
        )
    )
    
    # Smart text chunking (respect sentence boundaries)
    def smart_chunk_text(text: str, max_size: int):
        """Break text into chunks at sentence boundaries when possible."""
        sentences = re.split(r'(?<=[.!?])\s+', text)
        chunks = []
        current_chunk = ""
        
        for sentence in sentences:
            if len(current_chunk + sentence) <= max_size:
                current_chunk += sentence + " "
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence + " "
        
        if current_chunk:
            chunks.append(current_chunk.strip())
        
        return chunks
    
    # Create text chunks
    text_chunks = smart_chunk_text(text, chunk_size)
    
    def request_generator():
        # Configuration request
        yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)
        
        # Send text chunks
        for i, chunk in enumerate(text_chunks):
            print(f"Streaming chunk {i+1}/{len(text_chunks)}: {len(chunk)} chars")
            yield texttospeech.StreamingSynthesizeRequest(
                input=texttospeech.StreamingSynthesisInput(text=chunk)
            )
    
    # Stream and collect results
    responses = client.streaming_synthesize(request_generator())
    
    audio_chunks = []
    chunk_count = 0
    
    for response in responses:
        if response.audio_content:
            chunk_count += 1
            audio_chunks.append(response.audio_content)
            print(f"Received audio chunk {chunk_count}: {len(response.audio_content)} bytes")
    
    return b''.join(audio_chunks)

# Usage with long text
long_text = """
This is a very long piece of text that demonstrates streaming synthesis 
with automatic chunking. The system will break this text into smaller 
pieces and stream them to the Text-to-Speech API. This approach is useful 
for processing long documents, articles, or books where you want to start 
receiving audio output before the entire text is processed. The streaming 
approach also helps manage memory usage and provides better user experience 
for real-time applications.
"""

audio_data = stream_long_text(long_text, chunk_size=80)
# Raw PCM output; wrap with the save_pcm_as_wav helper defined earlier
save_pcm_as_wav(audio_data, "streamed_long_text.wav", sample_rate=22050)

Performance Optimization

Low-Latency Streaming

def create_low_latency_stream_config():
    """Create optimized configuration for minimal latency."""
    
    return texttospeech.StreamingSynthesizeConfig(
        voice=texttospeech.VoiceSelectionParams(
            language_code="en-US",
            name="en-US-Standard-A",  # Example voice; latency varies by voice family
            advanced_voice_options=texttospeech.AdvancedVoiceOptions(
                low_latency_journey_synthesis=True  # Applies to Journey voices
            )
        ),
        streaming_audio_config=texttospeech.StreamingAudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,  # Uncompressed
            sample_rate_hertz=16000,  # Lower sample rate reduces payload size
            speaking_rate=1.1         # Slightly faster speech
        )
    )

def optimized_streaming_synthesis(text_parts: list):
    """Optimized streaming for real-time applications."""
    client = texttospeech.TextToSpeechClient()
    
    config = create_low_latency_stream_config()
    
    def fast_request_generator():
        yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)
        
        for text in text_parts:
            # Send smaller chunks for faster processing
            if len(text) > 50:
                # Break into smaller pieces
                words = text.split()
                chunk_size = 10  # words per chunk
                for i in range(0, len(words), chunk_size):
                    chunk = " ".join(words[i:i + chunk_size])
                    yield texttospeech.StreamingSynthesizeRequest(
                        input=texttospeech.StreamingSynthesisInput(text=chunk)
                    )
            else:
                yield texttospeech.StreamingSynthesizeRequest(
                    input=texttospeech.StreamingSynthesisInput(text=text)
                )
    
    # Process with timing
    import time
    start_time = time.time()
    
    responses = client.streaming_synthesize(fast_request_generator())
    first_response_time = None
    audio_chunks = []
    
    for response in responses:
        if response.audio_content:
            if first_response_time is None:
                first_response_time = time.time()
                print(f"First audio received in: {first_response_time - start_time:.2f}s")
            
            audio_chunks.append(response.audio_content)
    
    total_time = time.time() - start_time
    print(f"Total streaming time: {total_time:.2f}s")
    
    return b''.join(audio_chunks)

Error Handling for Streaming

from google.api_core import exceptions
from google.cloud import texttospeech
import logging
import time

def robust_streaming_synthesis(text_parts: list, max_retries: int = 3):
    """Streaming synthesis with comprehensive error handling."""
    
    client = texttospeech.TextToSpeechClient()
    
    config = texttospeech.StreamingSynthesizeConfig(
        voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
        streaming_audio_config=texttospeech.StreamingAudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,
            sample_rate_hertz=22050
        )
    )
    
    for attempt in range(max_retries):
        try:
            def request_generator():
                yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)
                
                for text in text_parts:
                    yield texttospeech.StreamingSynthesizeRequest(
                        input=texttospeech.StreamingSynthesisInput(text=text)
                    )
            
            # Attempt streaming
            responses = client.streaming_synthesize(request_generator())
            
            audio_chunks = []
            for response in responses:
                if response.audio_content:
                    audio_chunks.append(response.audio_content)
            
            return b''.join(audio_chunks)
            
        except exceptions.DeadlineExceeded as e:
            logging.warning(f"Streaming timeout (attempt {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                raise
        
        except exceptions.ResourceExhausted as e:
            logging.warning(f"Rate limit exceeded (attempt {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                raise
            # Wait before retry with exponential backoff
            time.sleep(2 ** attempt)
        
        except exceptions.ServiceUnavailable as e:
            logging.warning(f"Service unavailable (attempt {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(1)
        
        except Exception as e:
            logging.error(f"Unexpected streaming error: {e}")
            raise
    
    raise RuntimeError(f"Streaming failed after {max_retries} attempts")

# Usage with error handling
try:
    text_parts = [
        "This is the first part of the streaming text.",
        "Here's the second part with more content.",
        "And finally, this is the conclusion."
    ]
    
    audio_result = robust_streaming_synthesis(text_parts)
    print(f"Successfully generated {len(audio_result)} bytes of audio")
    
except Exception as e:
    print(f"Streaming synthesis failed: {e}")
