Google Cloud Text-to-Speech API client library for converting text to speech with multiple voices and audio formats
—
Streaming synthesis enables real-time, bidirectional audio generation where text can be sent incrementally and audio is received as it's generated. This is ideal for interactive applications like chatbots, live assistants, and real-time communication systems where low latency is crucial.
from google.cloud import texttospeech

# Initialize client for streaming synthesis.
client = texttospeech.TextToSpeechClient()

# Streaming configuration: voice selection plus streaming audio output format.
config = texttospeech.StreamingSynthesizeConfig(
    voice=texttospeech.VoiceSelectionParams(
        language_code="en-US",
        name="en-US-Neural2-A"
    ),
    audio_config=texttospeech.StreamingAudioConfig(
        audio_encoding=texttospeech.AudioEncoding.LINEAR16,
        sample_rate_hertz=22050
    )
)


def create_streaming_requests():
    """Yield the config request first, then incremental text inputs."""
    # The first request on a stream must carry streaming_config only.
    yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)
    # Subsequent requests carry the text to synthesize.
    yield texttospeech.StreamingSynthesizeRequest(
        input=texttospeech.StreamingSynthesisInput(text="Hello, ")
    )
    yield texttospeech.StreamingSynthesizeRequest(
        input=texttospeech.StreamingSynthesisInput(text="this is streaming synthesis.")
    )


# Perform streaming synthesis and handle audio chunks as they arrive.
streaming_responses = client.streaming_synthesize(create_streaming_requests())
for response in streaming_responses:
    if response.audio_content:
        print(f"Received audio chunk: {len(response.audio_content)} bytes")
        # Process or play audio chunk immediately.

from google.cloud.texttospeech import (
    StreamingSynthesizeRequest,
    StreamingSynthesizeConfig,
    StreamingSynthesisInput,
    StreamingAudioConfig
)
def streaming_ssml_synthesis():
    """Stream SSML content with markup and return the concatenated audio bytes.

    NOTE(review): StreamingSynthesisInput.markup is documented for HD voices
    specifically; confirm the selected voice accepts these SSML fragments.
    """
    client = texttospeech.TextToSpeechClient()

    # Configure for SSML streaming
    config = StreamingSynthesizeConfig(
        voice=texttospeech.VoiceSelectionParams(
            language_code="en-US",
            name="en-US-Wavenet-D"
        ),
        audio_config=StreamingAudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,
            sample_rate_hertz=24000
        )
    )

    def request_generator():
        # Configuration request must come first on the stream.
        yield StreamingSynthesizeRequest(streaming_config=config)
        # SSML input chunks
        ssml_parts = [
            '<speak><prosody rate="slow">Hello there!</prosody>',
            '<break time="1s"/>',
            '<prosody pitch="+5st">This is exciting!</prosody>',
            '</speak>'
        ]
        for ssml_part in ssml_parts:
            yield StreamingSynthesizeRequest(
                input=StreamingSynthesisInput(markup=ssml_part)
            )

    # Stream and collect audio
    responses = client.streaming_synthesize(request_generator())
    audio_chunks = []
    for response in responses:
        if response.audio_content:
            audio_chunks.append(response.audio_content)
    return b''.join(audio_chunks)


# Usage
streaming_audio = streaming_ssml_synthesis()

from google.cloud.texttospeech import (
    StreamingSynthesizeConfig,
    VoiceSelectionParams,
    StreamingAudioConfig,
    AudioEncoding
)
# Complete streaming configuration
streaming_config = StreamingSynthesizeConfig(
    voice=VoiceSelectionParams(
        language_code="en-US",
        name="en-US-Neural2-C",
        ssml_gender=texttospeech.SsmlVoiceGender.FEMALE
    ),
    audio_config=StreamingAudioConfig(
        audio_encoding=AudioEncoding.LINEAR16,
        sample_rate_hertz=22050,
        speaking_rate=1.1,  # Optional: speech rate
        pitch=2.0,  # Optional: pitch adjustment
        volume_gain_db=1.5  # Optional: volume gain
    )
)

# Streaming config with advanced voice options
streaming_config = StreamingSynthesizeConfig(
    voice=VoiceSelectionParams(
        language_code="en-US",
        name="en-US-Neural2-A",
        advanced_voice_options=texttospeech.AdvancedVoiceOptions(
            low_latency_journey_synthesis=True  # Enable low latency
        )
    ),
    audio_config=StreamingAudioConfig(
        audio_encoding=AudioEncoding.LINEAR16,
        sample_rate_hertz=16000  # Lower rate for reduced latency
    )
)


class StreamingAudioConfig:
    """Description of the desired output audio data for streaming.

    Parameters:
    - audio_encoding (AudioEncoding): Required. Format of audio byte stream.
      Streaming supports PCM, ALAW, MULAW and OGG_OPUS only.
    - sample_rate_hertz (int): Optional. Synthesis sample rate in hertz.
    - speaking_rate (float): Optional. Speaking rate/speed in range [0.25, 2.0].
      1.0 is normal speed, 2.0 is twice as fast, 0.5 is half speed.
    """

    def __init__(self, audio_encoding, sample_rate_hertz=None, speaking_rate=None): ...


from google.cloud.texttospeech import StreamingAudioConfig, AudioEncoding
# Basic streaming audio configuration
audio_config = StreamingAudioConfig(
    audio_encoding=AudioEncoding.LINEAR16,  # Required: audio format
    sample_rate_hertz=22050  # Optional: sample rate
)

# Advanced streaming audio configuration
audio_config = StreamingAudioConfig(
    audio_encoding=AudioEncoding.OGG_OPUS,  # Compressed format for streaming
    sample_rate_hertz=48000,
    speaking_rate=0.9  # Slightly slower speech
)

# Low-latency configuration
low_latency_config = StreamingAudioConfig(
    audio_encoding=AudioEncoding.LINEAR16,
    sample_rate_hertz=16000,  # Lower sample rate
    speaking_rate=1.0  # Normal rate
)


class StreamingSynthesisInput:
    """Input to be synthesized in streaming requests.

    This uses oneof fields - only one can be set at a time.

    Parameters:
    - text (str): Raw text to be synthesized. Recommended to use complete sentences.
    - markup (str): Markup for HD voices specifically. Cannot be used with other voices.
    - prompt (str): System instruction for controllable voice models only.
    """

    def __init__(self, text=None, markup=None, prompt=None): ...


from google.cloud.texttospeech import StreamingSynthesisInput
# Text input for streaming
text_input = StreamingSynthesisInput(
text="This is a chunk of text to be synthesized."
)
# Markup input for streaming (HD voices only)
markup_input = StreamingSynthesisInput(
markup="Markup content for HD voices specifically."
)
# Prompt input for controllable voice models
prompt_input = StreamingSynthesisInput(
prompt="System instruction for controllable voice models."
)
# Note: StreamingSynthesisInput uses oneof fields - only one can be set at a timeclass StreamingSynthesizeRequest:
"""Request message for StreamingSynthesize method.
Uses oneof fields - only one can be set at a time.
First message must contain streaming_config, subsequent messages contain input.
Parameters:
- streaming_config (StreamingSynthesizeConfig): Configuration for first request only.
- input (StreamingSynthesisInput): Input text/markup for subsequent requests.
"""
def __init__(self, streaming_config=None, input=None): ...from google.cloud.texttospeech import (
StreamingSynthesizeRequest,
StreamingSynthesizeConfig,
StreamingSynthesisInput
)
# Configuration request (first request in stream)
config_request = StreamingSynthesizeRequest(
streaming_config=StreamingSynthesizeConfig(
voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
audio_config=texttospeech.StreamingAudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16,
sample_rate_hertz=22050
)
)
)
# Input request (subsequent requests)
input_request = StreamingSynthesizeRequest(
input=StreamingSynthesisInput(text="Text to synthesize")
)
# Markup input request (for HD voices)
markup_request = StreamingSynthesizeRequest(
input=StreamingSynthesisInput(
markup='Markup content with specific formatting for HD voices'
)
)from google.cloud.texttospeech import StreamingSynthesizeResponse
# Response processing
def process_streaming_response(response: "StreamingSynthesizeResponse"):
    """Return the audio bytes of one streaming response, or None.

    The annotation is a forward reference so the function can be defined
    without StreamingSynthesizeResponse being imported first.
    """
    # Audio payload, if any, is carried in response.audio_content (bytes).
    if response.audio_content:
        audio_size = len(response.audio_content)
        print(f"Received audio chunk: {audio_size} bytes")
        return response.audio_content
    # Some transports surface an error field; report it when present.
    if hasattr(response, 'error') and response.error:
        print(f"Streaming error: {response.error}")
    return None


# Example response handling
def handle_streaming_responses(response_iterator):
    """Collect all audio chunks from a streaming response sequence.

    Empty chunks are skipped; returns the concatenated audio bytes.
    """
    audio_chunks = []
    total_chunks = 0
    total_bytes = 0
    for response in response_iterator:
        audio_chunk = process_streaming_response(response)
        if audio_chunk:
            audio_chunks.append(audio_chunk)
            total_chunks += 1
            total_bytes += len(audio_chunk)
    print(f"Streaming complete: {total_chunks} chunks, {total_bytes} bytes total")
    return b''.join(audio_chunks)


import threading
import queue
import time
from google.cloud import texttospeech


class RealTimeTextToSpeech:
    """Real-time text-to-speech streaming processor.

    Caller threads push text via add_text(); a background thread streams it
    to the API and publishes audio chunks that get_audio() pops. None on the
    audio queue marks end-of-stream.
    """

    def __init__(self, language_code="en-US", voice_name=None):
        self.client = texttospeech.TextToSpeechClient()
        self.text_queue = queue.Queue()   # Pending text chunks (None = stop sentinel)
        self.audio_queue = queue.Queue()  # Synthesized audio chunks (None = end marker)
        self.is_running = False
        # Low-latency streaming configuration (16 kHz LINEAR16).
        self.config = texttospeech.StreamingSynthesizeConfig(
            voice=texttospeech.VoiceSelectionParams(
                language_code=language_code,
                name=voice_name or "en-US-Neural2-A",
                advanced_voice_options=texttospeech.AdvancedVoiceOptions(
                    low_latency_journey_synthesis=True
                )
            ),
            audio_config=texttospeech.StreamingAudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                sample_rate_hertz=16000  # Lower rate for real-time
            )
        )

    def start_streaming(self):
        """Start the background streaming synthesis thread."""
        self.is_running = True
        self.streaming_thread = threading.Thread(target=self._stream_worker)
        self.streaming_thread.start()

    def stop_streaming(self):
        """Stop streaming synthesis and wait for the worker to finish."""
        self.is_running = False
        self.text_queue.put(None)  # Sentinel to end stream
        if hasattr(self, 'streaming_thread'):
            self.streaming_thread.join()

    def add_text(self, text: str):
        """Queue text for synthesis (ignored when not running)."""
        if self.is_running:
            self.text_queue.put(text)

    def get_audio(self, timeout: float = 1.0):
        """Pop one synthesized audio chunk.

        NOTE(review): returns None both on timeout and for the end-of-stream
        sentinel; callers cannot distinguish the two cases — confirm intended.
        """
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def _stream_worker(self):
        """Background streaming worker."""

        def request_generator():
            # The first request on the stream must carry the configuration.
            yield texttospeech.StreamingSynthesizeRequest(
                streaming_config=self.config
            )
            # Forward queued text until stopped or the sentinel arrives.
            while self.is_running:
                try:
                    text = self.text_queue.get(timeout=1.0)
                except queue.Empty:
                    continue
                if text is None:  # Sentinel to end
                    break
                yield texttospeech.StreamingSynthesizeRequest(
                    input=texttospeech.StreamingSynthesisInput(text=text)
                )

        try:
            # Start streaming and publish audio chunks as they arrive.
            for response in self.client.streaming_synthesize(request_generator()):
                if response.audio_content and self.is_running:
                    self.audio_queue.put(response.audio_content)
        except Exception as e:
            print(f"Streaming error: {e}")
        finally:
            self.audio_queue.put(None)  # Signal end of audio


# Usage example
tts_stream = RealTimeTextToSpeech()
tts_stream.start_streaming()

# Add text for synthesis
tts_stream.add_text("Hello, this is real-time synthesis.")
tts_stream.add_text("Each text chunk is processed immediately.")
tts_stream.add_text("Great for interactive applications!")

# Collect audio chunks until the end marker (or a 1-second lull).
audio_chunks = []
while True:
    audio_chunk = tts_stream.get_audio()
    if audio_chunk is None:
        break
    audio_chunks.append(audio_chunk)
    print(f"Got audio chunk: {len(audio_chunk)} bytes")

tts_stream.stop_streaming()

# Combine all audio
complete_audio = b''.join(audio_chunks)
with open("realtime_output.wav", "wb") as f:
    f.write(complete_audio)

import asyncio
from google.cloud import texttospeech
class ConversationSynthesizer:
"""Interactive conversation streaming synthesis."""
def __init__(self):
self.client = texttospeech.TextToSpeechClient()
def synthesize_conversation(self, conversation_parts: list, output_file: str):
"""Synthesize conversation with different voices for different speakers."""
# Voice configurations for different speakers
speaker_configs = {
"speaker1": texttospeech.StreamingSynthesizeConfig(
voice=texttospeech.VoiceSelectionParams(
language_code="en-US",
name="en-US-Neural2-A" # Female voice
),
audio_config=texttospeech.StreamingAudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16,
sample_rate_hertz=22050
)
),
"speaker2": texttospeech.StreamingSynthesizeConfig(
voice=texttospeech.VoiceSelectionParams(
language_code="en-US",
name="en-US-Neural2-C" # Male voice
),
audio_config=texttospeech.StreamingAudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16,
sample_rate_hertz=22050
)
)
}
all_audio_chunks = []
# Process each speaker separately for voice consistency
for speaker_id, config in speaker_configs.items():
speaker_parts = [part for part in conversation_parts
if part.get('speaker') == speaker_id]
if not speaker_parts:
continue
def request_generator():
# Configuration
yield texttospeech.StreamingSynthesizeRequest(
streaming_config=config
)
# Speaker's dialogue parts
for part in speaker_parts:
yield texttospeech.StreamingSynthesizeRequest(
input=texttospeech.StreamingSynthesisInput(
text=part['text']
)
)
# Collect audio for this speaker
responses = self.client.streaming_synthesize(request_generator())
speaker_audio = []
for response in responses:
if response.audio_content:
speaker_audio.append(response.audio_content)
# Store with timing information
for i, part in enumerate(speaker_parts):
part['audio_data'] = b''.join(speaker_audio) if i == 0 else b''
# Reconstruct conversation in original order
final_audio = []
for part in conversation_parts:
if 'audio_data' in part and part['audio_data']:
final_audio.append(part['audio_data'])
# Save complete conversation
with open(output_file, "wb") as f:
f.write(b''.join(final_audio))
return output_file
# Usage example
conversation = [
{"speaker": "speaker1", "text": "Hello! How are you today?"},
{"speaker": "speaker2", "text": "I'm doing great, thanks for asking!"},
{"speaker": "speaker1", "text": "That's wonderful to hear. What are your plans?"},
{"speaker": "speaker2", "text": "I'm planning to work on some exciting projects."}
]
synthesizer = ConversationSynthesizer()
output_file = synthesizer.synthesize_conversation(conversation, "conversation.wav")
print(f"Conversation saved to {output_file}")def stream_long_text(text: str, chunk_size: int = 100):
"""Stream long text by breaking it into manageable chunks."""
import re
client = texttospeech.TextToSpeechClient()
# Configure streaming for long content
config = texttospeech.StreamingSynthesizeConfig(
voice=texttospeech.VoiceSelectionParams(
language_code="en-US",
name="en-US-Wavenet-A"
),
audio_config=texttospeech.StreamingAudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16,
sample_rate_hertz=22050
)
)
# Smart text chunking (respect sentence boundaries)
def smart_chunk_text(text: str, max_size: int):
"""Break text into chunks at sentence boundaries when possible."""
sentences = re.split(r'(?<=[.!?])\s+', text)
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk + sentence) <= max_size:
current_chunk += sentence + " "
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + " "
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
# Create text chunks
text_chunks = smart_chunk_text(text, chunk_size)
def request_generator():
# Configuration request
yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)
# Send text chunks
for i, chunk in enumerate(text_chunks):
print(f"Streaming chunk {i+1}/{len(text_chunks)}: {len(chunk)} chars")
yield texttospeech.StreamingSynthesizeRequest(
input=texttospeech.StreamingSynthesisInput(text=chunk)
)
# Stream and collect results
responses = client.streaming_synthesize(request_generator())
audio_chunks = []
chunk_count = 0
for response in responses:
if response.audio_content:
chunk_count += 1
audio_chunks.append(response.audio_content)
print(f"Received audio chunk {chunk_count}: {len(response.audio_content)} bytes")
return b''.join(audio_chunks)
# Usage with long text
long_text = """
This is a very long piece of text that demonstrates streaming synthesis
with automatic chunking. The system will break this text into smaller
pieces and stream them to the Text-to-Speech API. This approach is useful
for processing long documents, articles, or books where you want to start
receiving audio output before the entire text is processed. The streaming
approach also helps manage memory usage and provides better user experience
for real-time applications.
"""
audio_data = stream_long_text(long_text, chunk_size=80)
with open("streamed_long_text.wav", "wb") as f:
f.write(audio_data)def create_low_latency_stream_config():
"""Create optimized configuration for minimal latency."""
return texttospeech.StreamingSynthesizeConfig(
voice=texttospeech.VoiceSelectionParams(
language_code="en-US",
name="en-US-Standard-A", # Standard voices have lower latency
advanced_voice_options=texttospeech.AdvancedVoiceOptions(
low_latency_journey_synthesis=True
)
),
audio_config=texttospeech.StreamingAudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16, # Uncompressed
sample_rate_hertz=16000, # Lower sample rate
speaking_rate=1.1 # Slightly faster speech
)
)
def optimized_streaming_synthesis(text_parts: list):
"""Optimized streaming for real-time applications."""
client = texttospeech.TextToSpeechClient()
config = create_low_latency_stream_config()
def fast_request_generator():
yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)
for text in text_parts:
# Send smaller chunks for faster processing
if len(text) > 50:
# Break into smaller pieces
words = text.split()
chunk_size = 10 # words per chunk
for i in range(0, len(words), chunk_size):
chunk = " ".join(words[i:i + chunk_size])
yield texttospeech.StreamingSynthesizeRequest(
input=texttospeech.StreamingSynthesisInput(text=chunk)
)
else:
yield texttospeech.StreamingSynthesizeRequest(
input=texttospeech.StreamingSynthesisInput(text=text)
)
# Process with timing
import time
start_time = time.time()
responses = client.streaming_synthesize(fast_request_generator())
first_response_time = None
audio_chunks = []
for response in responses:
if response.audio_content:
if first_response_time is None:
first_response_time = time.time()
print(f"First audio received in: {first_response_time - start_time:.2f}s")
audio_chunks.append(response.audio_content)
total_time = time.time() - start_time
print(f"Total streaming time: {total_time:.2f}s")
return b''.join(audio_chunks)from google.api_core import exceptions
import logging
def robust_streaming_synthesis(text_parts: list, max_retries: int = 3):
"""Streaming synthesis with comprehensive error handling."""
client = texttospeech.TextToSpeechClient()
config = texttospeech.StreamingSynthesizeConfig(
voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
audio_config=texttospeech.StreamingAudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16,
sample_rate_hertz=22050
)
)
for attempt in range(max_retries):
try:
def request_generator():
yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)
for text in text_parts:
yield texttospeech.StreamingSynthesizeRequest(
input=texttospeech.StreamingSynthesisInput(text=text)
)
# Attempt streaming
responses = client.streaming_synthesize(request_generator())
audio_chunks = []
for response in responses:
if response.audio_content:
audio_chunks.append(response.audio_content)
return b''.join(audio_chunks)
except exceptions.DeadlineExceeded as e:
logging.warning(f"Streaming timeout (attempt {attempt + 1}): {e}")
if attempt == max_retries - 1:
raise
except exceptions.ResourceExhausted as e:
logging.warning(f"Rate limit exceeded (attempt {attempt + 1}): {e}")
if attempt == max_retries - 1:
raise
# Wait before retry
import time
time.sleep(2 ** attempt) # Exponential backoff
except exceptions.ServiceUnavailable as e:
logging.warning(f"Service unavailable (attempt {attempt + 1}): {e}")
if attempt == max_retries - 1:
raise
import time
time.sleep(1)
except Exception as e:
logging.error(f"Unexpected streaming error: {e}")
raise
raise RuntimeError(f"Streaming failed after {max_retries} attempts")
# Usage with error handling
try:
text_parts = [
"This is the first part of the streaming text.",
"Here's the second part with more content.",
"And finally, this is the conclusion."
]
audio_result = robust_streaming_synthesis(text_parts)
print(f"Successfully generated {len(audio_result)} bytes of audio")
except Exception as e:
print(f"Streaming synthesis failed: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-texttospeech