Google Cloud Text-to-Speech API client library for converting text to speech with multiple voices and audio formats
—
The Google Cloud Text-to-Speech API provides full async/await support through asynchronous client classes. These clients enable non-blocking operations, making them ideal for applications that need to handle multiple synthesis requests concurrently or integrate with async frameworks like FastAPI, aiohttp, or asyncio-based applications.
import asyncio
from google.cloud import texttospeech
# Initialize async client
async_client = texttospeech.TextToSpeechAsyncClient()
# Basic async synthesis
async def basic_async_synthesis():
    """Synthesize a short greeting and return the raw MP3 bytes."""
    # Plain-text input, default US-English voice, MP3 output.
    synthesis_request = texttospeech.SynthesizeSpeechRequest(
        input=texttospeech.SynthesisInput(text="Hello from async synthesis!"),
        voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
        audio_config=texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3,
        ),
    )
    result = await async_client.synthesize_speech(request=synthesis_request)
    return result.audio_content

# Run async function
audio_data = asyncio.run(basic_async_synthesis())

from google.cloud.texttospeech_v1.services import text_to_speech_long_audio_synthesize
# Initialize async long audio client
async_long_client = text_to_speech_long_audio_synthesize.TextToSpeechLongAudioSynthesizeAsyncClient()

# Async long audio synthesis
async def async_long_audio_synthesis():
    """Start a long-audio synthesis job; the audio is written to the GCS URI.

    Returns the long-running operation handle so the caller can await or
    poll its completion.
    """
    voice_params = texttospeech.VoiceSelectionParams(language_code="en-US")
    audio_settings = texttospeech.AudioConfig(
        audio_encoding=texttospeech.AudioEncoding.MP3,
    )
    long_request = texttospeech.SynthesizeLongAudioRequest(
        parent="projects/your-project-id/locations/us-central1",
        input=texttospeech.SynthesisInput(text="Very long text content..." * 100),
        voice=voice_params,
        audio_config=audio_settings,
        output_gcs_uri="gs://your-bucket/async-long-audio.mp3",
    )
    return await async_long_client.synthesize_long_audio(request=long_request)

# Usage
# operation = asyncio.run(async_long_audio_synthesis())

import asyncio
from google.cloud import texttospeech
class AsyncTextToSpeech:
    """Async Text-to-Speech wrapper class."""

    def __init__(self):
        # One shared async client per wrapper instance.
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def _synthesize(self, synthesis_input, language_code, voice_name,
                          audio_config) -> bytes:
        """Send one synthesis request and return the raw audio bytes."""
        request = texttospeech.SynthesizeSpeechRequest(
            input=synthesis_input,
            voice=texttospeech.VoiceSelectionParams(
                language_code=language_code,
                name=voice_name,
            ),
            audio_config=audio_config,
        )
        response = await self.client.synthesize_speech(request=request)
        return response.audio_content

    async def synthesize_text(self, text: str, language_code: str = "en-US",
                              voice_name: str = None) -> bytes:
        """Synthesize text to audio asynchronously (MP3 output)."""
        return await self._synthesize(
            texttospeech.SynthesisInput(text=text),
            language_code,
            voice_name,
            texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3),
        )

    async def synthesize_ssml(self, ssml: str, language_code: str = "en-US",
                              voice_name: str = None) -> bytes:
        """Synthesize SSML to audio asynchronously (24 kHz LINEAR16 output)."""
        return await self._synthesize(
            texttospeech.SynthesisInput(ssml=ssml),
            language_code,
            voice_name,
            texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                sample_rate_hertz=24000,
            ),
        )

    async def close(self):
        """Close the async client."""
        await self.client.close()
# Usage example
async def demo_async_synthesis():
    """Exercise both plain-text and SSML synthesis, then clean up."""
    tts = AsyncTextToSpeech()
    try:
        audio1 = await tts.synthesize_text("Hello async world!")
        print(f"Generated {len(audio1)} bytes of audio")
        ssml = '<speak>This is <emphasis level="strong">emphasized</emphasis> text.</speak>'
        audio2 = await tts.synthesize_ssml(ssml)
        print(f"Generated {len(audio2)} bytes from SSML")
    finally:
        # Always release the client, even if a synthesis call fails.
        await tts.close()

# Run the demo
# asyncio.run(demo_async_synthesis())

import asyncio
from google.cloud import texttospeech
async def list_voices_async(language_filter: str = None):
    """List available voices asynchronously.

    Returns a list of dicts with name, language codes, SSML gender name,
    and natural sample rate. The temporary client is always closed.
    """
    client = texttospeech.TextToSpeechAsyncClient()
    try:
        if language_filter:
            response = await client.list_voices(
                request=texttospeech.ListVoicesRequest(language_code=language_filter)
            )
        else:
            response = await client.list_voices()
        return [
            {
                'name': voice.name,
                'language_codes': voice.language_codes,
                'ssml_gender': voice.ssml_gender.name,
                'natural_sample_rate_hertz': voice.natural_sample_rate_hertz,
            }
            for voice in response.voices
        ]
    finally:
        await client.close()
async def find_best_voice_async(language_code: str, gender: str = None):
    """Find the best voice for language and gender asynchronously.

    Preference order is Neural2 > Wavenet > Standard; falls back to the
    first matching voice, or None when nothing matches.
    """
    candidates = await list_voices_async(language_code)
    if gender:
        wanted = gender.upper()
        candidates = [v for v in candidates if v['ssml_gender'] == wanted]
    for tier in ('Neural2', 'Wavenet', 'Standard'):
        for candidate in candidates:
            if tier in candidate['name']:
                return candidate
    return candidates[0] if candidates else None
# Usage
async def voice_discovery_demo():
    """Show voice listing plus best-voice selection for en-US."""
    en_voices = await list_voices_async("en-US")
    print(f"Found {len(en_voices)} English voices")
    best_female = await find_best_voice_async("en-US", "female")
    if best_female:
        print(f"Best female voice: {best_female['name']}")

# asyncio.run(voice_discovery_demo())

import asyncio
from typing import List, Dict
from google.cloud import texttospeech
class AsyncBatchProcessor:
    """Process multiple TTS requests concurrently."""

    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.client = texttospeech.TextToSpeechAsyncClient()
        # Caps how many synthesis calls are in flight at once.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def synthesize_single(self, text: str, voice_config: dict,
                                audio_config: dict) -> Dict:
        """Synthesize a single text with rate limiting.

        Never raises: failures are reported as {'success': False, ...}.
        """
        preview = text[:50] + "..." if len(text) > 50 else text
        async with self.semaphore:  # Limit concurrent requests
            try:
                request = texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=text),
                    voice=texttospeech.VoiceSelectionParams(**voice_config),
                    audio_config=texttospeech.AudioConfig(**audio_config),
                )
                response = await self.client.synthesize_speech(request=request)
            except Exception as exc:
                return {'success': False, 'error': str(exc), 'text': preview}
            return {
                'success': True,
                'audio_content': response.audio_content,
                'text': preview,
            }

    async def process_batch(self, text_list: List[str],
                            voice_config: dict = None,
                            audio_config: dict = None) -> List[Dict]:
        """Process multiple texts concurrently with shared voice settings."""
        voice = voice_config or {'language_code': 'en-US'}
        audio = audio_config or {'audio_encoding': texttospeech.AudioEncoding.MP3}
        tasks = [self.synthesize_single(text, voice, audio) for text in text_list]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def process_with_different_voices(self, text_voice_pairs: List[tuple]) -> List[Dict]:
        """Process (text, voice_config, audio_config) tuples concurrently."""
        tasks = [
            self.synthesize_single(text, voice, audio)
            for text, voice, audio in text_voice_pairs
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def close(self):
        """Close the async client."""
        await self.client.close()
# Usage example
async def batch_processing_demo():
    """Demonstrate batch processing with async.

    Synthesizes a fixed batch of texts concurrently, reports timing and
    success counts, and writes each successful result to batch_output_N.mp3.
    """
    processor = AsyncBatchProcessor(max_concurrent=5)
    try:
        # Batch of texts to process
        texts = [
            "This is the first text to synthesize.",
            "Here's the second piece of content.",
            "And this is the third text sample.",
            "Fourth text for our batch processing demo.",
            "Finally, the last text in our batch."
        ]
        print("Processing batch of texts...")
        start_time = asyncio.get_event_loop().time()
        # Process all texts concurrently
        results = await processor.process_batch(texts)
        end_time = asyncio.get_event_loop().time()
        processing_time = end_time - start_time
        # Analyze results
        successful = [r for r in results if isinstance(r, dict) and r.get('success')]
        failed = [r for r in results if isinstance(r, dict) and not r.get('success')]
        print(f"Batch processing completed in {processing_time:.2f} seconds")
        print(f"Successful: {len(successful)}")
        print(f"Failed: {len(failed)}")
        # Save successful results
        for i, result in enumerate(successful):
            filename = f"batch_output_{i}.mp3"
            with open(filename, "wb") as f:
                f.write(result['audio_content'])
            # BUG FIX: this previously printed a literal placeholder instead
            # of the saved file's name.
            print(f"Saved: {filename}")
        return results
    finally:
        await processor.close()

# Run batch processing
# results = asyncio.run(batch_processing_demo())

import asyncio
from google.cloud import texttospeech
async def create_multi_voice_conversation():
    """Create conversation with different voices asynchronously."""
    processor = AsyncBatchProcessor(max_concurrent=3)
    try:
        # Two speakers: a female agent voice and a male customer voice.
        agent_voice = {'language_code': 'en-US', 'name': 'en-US-Neural2-A'}
        customer_voice = {'language_code': 'en-US', 'name': 'en-US-Neural2-C'}
        mp3_audio = {'audio_encoding': texttospeech.AudioEncoding.MP3}
        conversation_parts = [
            ("Hello, welcome to our customer service.", agent_voice, mp3_audio),
            ("Hi there, I have a question about my account.", customer_voice, mp3_audio),
            ("I'd be happy to help you with that. Can you provide your account number?", agent_voice, mp3_audio),
            ("Sure, my account number is 12345.", customer_voice, mp3_audio),
        ]
        print("Creating multi-voice conversation...")
        results = await processor.process_with_different_voices(conversation_parts)
        # Stitch successful parts back together in the original order.
        conversation_audio = []
        for i, result in enumerate(results):
            if isinstance(result, dict) and result.get('success'):
                conversation_audio.append(result['audio_content'])
                print(f"Part {i+1}: Generated {len(result['audio_content'])} bytes")
        if conversation_audio:
            complete_audio = b''.join(conversation_audio)
            with open("conversation.mp3", "wb") as f:
                f.write(complete_audio)
            print(f"Saved complete conversation: {len(complete_audio)} bytes")
        return conversation_audio
    finally:
        await processor.close()

# asyncio.run(create_multi_voice_conversation())

import asyncio
from google.cloud import texttospeech
from typing import AsyncGenerator
class AsyncStreamingSynthesis:
    """Async streaming text-to-speech synthesis."""

    def __init__(self):
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def stream_synthesis(self, text_chunks: list) -> AsyncGenerator[bytes, None]:
        """Stream synthesis of multiple text chunks, yielding audio as it arrives."""
        streaming_config = texttospeech.StreamingSynthesizeConfig(
            voice=texttospeech.VoiceSelectionParams(
                language_code="en-US",
                name="en-US-Neural2-A",
            ),
            audio_config=texttospeech.StreamingAudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                sample_rate_hertz=22050,
            ),
        )

        async def request_stream():
            # The first request carries only the configuration; every
            # following request carries one chunk of input text.
            yield texttospeech.StreamingSynthesizeRequest(streaming_config=streaming_config)
            for piece in text_chunks:
                yield texttospeech.StreamingSynthesizeRequest(
                    input=texttospeech.StreamingSynthesisInput(text=piece)
                )

        responses = await self.client.streaming_synthesize(request_stream())
        async for item in responses:
            if item.audio_content:
                yield item.audio_content

    async def process_streaming_text(self, long_text: str, chunk_size: int = 100):
        """Process long text with streaming synthesis; returns the joined audio."""
        # Greedily pack whole words into chunks of roughly chunk_size characters.
        text_chunks = []
        pending = []
        pending_len = 0
        for word in long_text.split():
            pending.append(word)
            pending_len += len(word) + 1  # +1 accounts for the joining space
            if pending_len >= chunk_size:
                text_chunks.append(' '.join(pending))
                pending = []
                pending_len = 0
        if pending:
            text_chunks.append(' '.join(pending))
        # Collect the streamed audio pieces and concatenate them.
        audio_parts = []
        async for audio_part in self.stream_synthesis(text_chunks):
            audio_parts.append(audio_part)
            print(f"Received streaming audio chunk: {len(audio_part)} bytes")
        return b''.join(audio_parts)

    async def close(self):
        """Close the async client."""
        await self.client.close()
# Usage example
async def streaming_demo():
    """Demonstrate async streaming synthesis."""
    streamer = AsyncStreamingSynthesis()
    try:
        long_text = """
        This is a long piece of text that will be processed using async streaming
        synthesis. The text will be broken into smaller chunks and each chunk will
        be sent to the synthesis service as part of a streaming request. This allows
        for more efficient processing of long content and enables real-time audio
        generation as the text is being processed.
        """ * 3
        print("Starting async streaming synthesis...")
        audio_data = await streamer.process_streaming_text(long_text, chunk_size=80)
        print(f"Streaming synthesis complete: {len(audio_data)} bytes generated")
        # Save result
        with open("async_streaming_output.wav", "wb") as f:
            f.write(audio_data)
        return audio_data
    finally:
        await streamer.close()

# asyncio.run(streaming_demo())

import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel
from google.cloud import texttospeech
from typing import Optional
app = FastAPI()

# Global async client (created on startup, closed on shutdown).
tts_client = None


class TTSRequest(BaseModel):
    """Request payload for the synthesis endpoints."""
    text: str
    language_code: str = "en-US"
    voice_name: Optional[str] = None
    audio_encoding: str = "MP3"  # "MP3", "LINEAR16", or "OGG_OPUS"
    speaking_rate: float = 1.0
    pitch: float = 0.0
@app.on_event("startup")
async def startup_event():
    """Create the shared async TTS client when the server starts."""
    global tts_client
    tts_client = texttospeech.TextToSpeechAsyncClient()


@app.on_event("shutdown")
async def shutdown_event():
    """Close the shared async TTS client when the server stops."""
    global tts_client
    if tts_client:
        await tts_client.close()
@app.post("/synthesize")
async def synthesize_speech(request: TTSRequest):
    """Synthesize speech from text.

    Returns the raw audio bytes with a media type and download filename
    that match the requested encoding.
    """
    try:
        # Map string encoding to enum; unknown values fall back to MP3.
        encoding_map = {
            "MP3": texttospeech.AudioEncoding.MP3,
            "LINEAR16": texttospeech.AudioEncoding.LINEAR16,
            "OGG_OPUS": texttospeech.AudioEncoding.OGG_OPUS
        }
        audio_encoding = encoding_map.get(request.audio_encoding, texttospeech.AudioEncoding.MP3)
        # Create synthesis request
        synthesis_request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=request.text),
            voice=texttospeech.VoiceSelectionParams(
                language_code=request.language_code,
                name=request.voice_name
            ),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=audio_encoding,
                speaking_rate=request.speaking_rate,
                pitch=request.pitch
            )
        )
        # Synthesize speech
        response = await tts_client.synthesize_speech(request=synthesis_request)
        # BUG FIX: OGG_OPUS was previously served as audio/wav, and the
        # download was always named speech.mp3 regardless of encoding.
        response_formats = {
            "MP3": ("audio/mpeg", "speech.mp3"),
            "LINEAR16": ("audio/wav", "speech.wav"),
            "OGG_OPUS": ("audio/ogg", "speech.ogg"),
        }
        media_type, filename = response_formats.get(
            request.audio_encoding, ("audio/mpeg", "speech.mp3")
        )
        return Response(
            content=response.audio_content,
            media_type=media_type,
            headers={"Content-Disposition": f"attachment; filename={filename}"}
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Synthesis failed: {str(e)}")
@app.get("/voices")
async def list_voices(language_code: Optional[str] = None):
    """List available voices, optionally filtered by language code."""
    try:
        if language_code:
            response = await tts_client.list_voices(
                request=texttospeech.ListVoicesRequest(language_code=language_code)
            )
        else:
            response = await tts_client.list_voices()
        return {
            "voices": [
                {
                    "name": v.name,
                    "language_codes": v.language_codes,
                    "ssml_gender": v.ssml_gender.name,
                    "natural_sample_rate_hertz": v.natural_sample_rate_hertz,
                }
                for v in response.voices
            ]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list voices: {str(e)}")
@app.post("/batch-synthesize")
async def batch_synthesize(requests: list[TTSRequest]):
    """Synthesize multiple texts in parallel; per-item failures are reported."""
    try:
        # Shared, loop-invariant lookup table (unknown encodings → MP3).
        encoding_map = {
            "MP3": texttospeech.AudioEncoding.MP3,
            "LINEAR16": texttospeech.AudioEncoding.LINEAR16,
            "OGG_OPUS": texttospeech.AudioEncoding.OGG_OPUS
        }

        async def synthesize_single(req: TTSRequest):
            # One synthesis call; exceptions propagate to gather() below.
            synthesis_request = texttospeech.SynthesizeSpeechRequest(
                input=texttospeech.SynthesisInput(text=req.text),
                voice=texttospeech.VoiceSelectionParams(
                    language_code=req.language_code,
                    name=req.voice_name
                ),
                audio_config=texttospeech.AudioConfig(
                    audio_encoding=encoding_map.get(req.audio_encoding, texttospeech.AudioEncoding.MP3),
                    speaking_rate=req.speaking_rate,
                    pitch=req.pitch
                )
            )
            response = await tts_client.synthesize_speech(request=synthesis_request)
            return {
                "text": req.text[:50] + "..." if len(req.text) > 50 else req.text,
                "audio_size": len(response.audio_content),
                "success": True
            }

        # Process requests concurrently; exceptions become result entries.
        results = await asyncio.gather(
            *(synthesize_single(req) for req in requests),
            return_exceptions=True,
        )
        formatted_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                formatted_results.append({"index": i, "success": False, "error": str(result)})
            else:
                formatted_results.append({"index": i, **result})
        return {"results": formatted_results}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch synthesis failed: {str(e)}")

# To run: uvicorn main:app --reload

import asyncio
import json
from aiohttp import web, ClientSession
from google.cloud import texttospeech
class TTSService:
    """Text-to-Speech service for aiohttp application."""

    def __init__(self):
        # Created lazily in initialize() so construction stays synchronous.
        self.client = None

    async def initialize(self):
        """Initialize the TTS client."""
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def cleanup(self):
        """Cleanup the TTS client."""
        if self.client:
            await self.client.close()

    async def synthesize(self, text: str, language_code: str = "en-US",
                         voice_name: str = None) -> bytes:
        """Synthesize text to speech and return the MP3 bytes."""
        voice = texttospeech.VoiceSelectionParams(
            language_code=language_code,
            name=voice_name,
        )
        audio = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3,
        )
        response = await self.client.synthesize_speech(
            request=texttospeech.SynthesizeSpeechRequest(
                input=texttospeech.SynthesisInput(text=text),
                voice=voice,
                audio_config=audio,
            )
        )
        return response.audio_content
# Global TTS service
tts_service = TTSService()


async def synthesize_handler(request):
    """Handle synthesis requests: JSON in, MP3 attachment out."""
    try:
        payload = await request.json()
        text = payload.get('text')
        if not text:
            return web.json_response({'error': 'Text is required'}, status=400)
        audio_data = await tts_service.synthesize(
            text,
            payload.get('language_code', 'en-US'),
            payload.get('voice_name'),
        )
        return web.Response(
            body=audio_data,
            content_type='audio/mpeg',
            headers={'Content-Disposition': 'attachment; filename="speech.mp3"'}
        )
    except Exception as e:
        return web.json_response({'error': str(e)}, status=500)
async def health_handler(request):
    """Liveness probe: always reports healthy."""
    # No dependencies are checked; this only proves the event loop responds.
    return web.json_response({'status': 'healthy'})
async def init_app():
    """Initialize the aiohttp application with routes and TTS lifecycle."""
    app = web.Application()
    app.router.add_post('/synthesize', synthesize_handler)
    app.router.add_get('/health', health_handler)
    # Bring up the shared TTS client before the server starts serving.
    await tts_service.initialize()

    async def cleanup_handler(app):
        await tts_service.cleanup()

    # Ensure the client is closed when the application shuts down.
    app.on_cleanup.append(cleanup_handler)
    return app

# To run: python -c "import asyncio; from main import init_app; app = asyncio.run(init_app()); web.run_app(app, port=8080)"

import asyncio
import logging
from google.api_core import exceptions
from google.cloud import texttospeech
class AsyncTTSWithErrorHandling:
    """Async TTS with comprehensive error handling."""

    def __init__(self, max_retries: int = 3):
        self.client = texttospeech.TextToSpeechAsyncClient()
        self.max_retries = max_retries

    async def synthesize_with_retry(self, text: str, **kwargs) -> dict:
        """Synthesize with automatic retry on transient errors.

        kwargs: language_code (default 'en-US') and voice_name.
        Returns a dict with 'success', 'attempts', and either
        'audio_content' or 'error'.
        """
        for attempt in range(self.max_retries):
            try:
                request = texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=text),
                    voice=texttospeech.VoiceSelectionParams(
                        language_code=kwargs.get('language_code', 'en-US'),
                        name=kwargs.get('voice_name')
                    ),
                    audio_config=texttospeech.AudioConfig(
                        audio_encoding=texttospeech.AudioEncoding.MP3
                    )
                )
                response = await self.client.synthesize_speech(request=request)
                return {
                    'success': True,
                    'audio_content': response.audio_content,
                    'attempts': attempt + 1
                }
            except exceptions.ResourceExhausted as e:
                logging.warning(f"Rate limit hit (attempt {attempt + 1}): {e}")
                if attempt == self.max_retries - 1:
                    return {'success': False, 'error': 'Rate limit exceeded', 'attempts': attempt + 1}
                # Exponential backoff
                await asyncio.sleep(2 ** attempt)
            except exceptions.ServiceUnavailable as e:
                logging.warning(f"Service unavailable (attempt {attempt + 1}): {e}")
                if attempt == self.max_retries - 1:
                    return {'success': False, 'error': 'Service unavailable', 'attempts': attempt + 1}
                await asyncio.sleep(1)
            except exceptions.DeadlineExceeded as e:
                logging.warning(f"Request timeout (attempt {attempt + 1}): {e}")
                if attempt == self.max_retries - 1:
                    return {'success': False, 'error': 'Request timeout', 'attempts': attempt + 1}
                # BUG FIX: previously retried immediately after a timeout;
                # pause briefly like the other transient-error branches.
                await asyncio.sleep(1)
            except exceptions.InvalidArgument as e:
                # Non-retryable error
                logging.error(f"Invalid argument: {e}")
                return {'success': False, 'error': f'Invalid argument: {e}', 'attempts': attempt + 1}
            except Exception as e:
                logging.error(f"Unexpected error (attempt {attempt + 1}): {e}")
                if attempt == self.max_retries - 1:
                    return {'success': False, 'error': f'Unexpected error: {e}', 'attempts': attempt + 1}
        return {'success': False, 'error': 'Max retries exceeded', 'attempts': self.max_retries}

    async def safe_batch_synthesis(self, text_list: list, **kwargs) -> list:
        """Safely process multiple texts with individual error handling."""
        async def safe_synthesize_single(text: str) -> dict:
            try:
                result = await self.synthesize_with_retry(text, **kwargs)
                result['text'] = text[:50] + "..." if len(text) > 50 else text
                return result
            except Exception as e:
                return {
                    'success': False,
                    'error': f'Failed to process: {e}',
                    'text': text[:50] + "..." if len(text) > 50 else text
                }

        # Process all texts concurrently with individual error handling
        tasks = [safe_synthesize_single(text) for text in text_list]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Handle any gather-level exceptions
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    'success': False,
                    'error': f'Task failed: {result}',
                    'text': text_list[i][:50] + "..." if len(text_list[i]) > 50 else text_list[i]
                })
            else:
                processed_results.append(result)
        return processed_results

    async def close(self):
        """Close the async client."""
        await self.client.close()
# Usage example
async def error_handling_demo():
    """Demonstrate error handling in async operations."""
    tts = AsyncTTSWithErrorHandling(max_retries=3)
    try:
        # Mix of valid, empty, and oversized inputs to exercise error paths.
        test_texts = [
            "This is a normal text that should work fine.",
            "",  # Empty text (should cause InvalidArgument)
            "This is another normal text.",
            "A" * 10000,  # Very long text (might cause issues)
            "Final test text."
        ]
        print("Testing batch synthesis with error handling...")
        results = await tts.safe_batch_synthesis(test_texts, language_code="en-US")
        successful = [r for r in results if r['success']]
        failed = [r for r in results if not r['success']]
        print(f"Results: {len(successful)} successful, {len(failed)} failed")
        for result in results:
            status = "✅" if result['success'] else "❌"
            print(f"{status} {result['text']}")
            if not result['success']:
                print(f" Error: {result['error']}")
        return results
    finally:
        await tts.close()

# asyncio.run(error_handling_demo())

import asyncio
from contextlib import asynccontextmanager
from google.cloud import texttospeech
class OptimizedAsyncTTS:
    """Optimized async TTS with a lazily created, shared client."""

    def __init__(self):
        self._client = None
        self._client_lock = asyncio.Lock()

    async def get_client(self):
        """Get or create the TTS client with lock-protected initialization."""
        if self._client is None:
            async with self._client_lock:
                # Re-check under the lock: another coroutine may have
                # created the client while we were waiting.
                if self._client is None:
                    self._client = texttospeech.TextToSpeechAsyncClient()
        return self._client

    @asynccontextmanager
    async def client_context(self):
        """Context manager yielding the shared client.

        Nothing is released per use; cleanup happens in close().
        """
        yield await self.get_client()

    async def synthesize_optimized(self, text: str, **config) -> bytes:
        """Synthesize text (MP3) reusing the shared client.

        config: language_code (default 'en-US') and voice_name.
        """
        async with self.client_context() as client:
            response = await client.synthesize_speech(
                request=texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=text),
                    voice=texttospeech.VoiceSelectionParams(
                        language_code=config.get('language_code', 'en-US'),
                        name=config.get('voice_name'),
                    ),
                    audio_config=texttospeech.AudioConfig(
                        audio_encoding=texttospeech.AudioEncoding.MP3
                    ),
                )
            )
            return response.audio_content

    async def close(self):
        """Clean up client resources."""
        if self._client:
            await self._client.close()
            self._client = None
# Global optimized TTS instance
optimized_tts = OptimizedAsyncTTS()


async def performance_benchmark():
    """Benchmark async TTS: sequential awaits versus concurrent gather."""
    import time
    test_texts = [f"This is test text number {i}" for i in range(20)]
    # Sequential: await each request one after another.
    seq_start = time.time()
    sequential_results = []
    for text in test_texts:
        audio = await optimized_tts.synthesize_optimized(text)
        sequential_results.append(len(audio))
    sequential_time = time.time() - seq_start
    # Concurrent: fire all requests at once and gather the results.
    conc_start = time.time()
    concurrent_results = await asyncio.gather(
        *(optimized_tts.synthesize_optimized(text) for text in test_texts)
    )
    concurrent_time = time.time() - conc_start
    print(f"Sequential processing: {sequential_time:.2f} seconds")
    print(f"Concurrent processing: {concurrent_time:.2f} seconds")
    print(f"Speedup: {sequential_time / concurrent_time:.2f}x")
    await optimized_tts.close()

# asyncio.run(performance_benchmark())

import asyncio
from typing import AsyncIterator
from google.cloud import texttospeech
async def memory_efficient_processing(text_iterator: AsyncIterator[str],
                                      batch_size: int = 5) -> AsyncIterator[bytes]:
    """Process texts in batches to manage memory usage.

    Reads texts from text_iterator, synthesizes them batch_size at a time,
    and yields each result's audio bytes. The client is closed in all cases.

    IMPROVEMENT: the batch-synthesis code was duplicated verbatim for the
    main loop and the final partial batch; it is now a single local helper.
    """
    client = texttospeech.TextToSpeechAsyncClient()

    async def _synthesize_batch(batch):
        # Fire all requests in the batch concurrently and await them together.
        tasks = [
            client.synthesize_speech(
                request=texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=text_item),
                    voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
                    audio_config=texttospeech.AudioConfig(
                        audio_encoding=texttospeech.AudioEncoding.MP3
                    ),
                )
            )
            for text_item in batch
        ]
        return await asyncio.gather(*tasks)

    try:
        batch = []
        async for text in text_iterator:
            batch.append(text)
            if len(batch) >= batch_size:
                for response in await _synthesize_batch(batch):
                    yield response.audio_content
                batch = []
        # Flush any texts left over after the iterator is exhausted.
        if batch:
            for response in await _synthesize_batch(batch):
                yield response.audio_content
    finally:
        await client.close()
# Example usage
async def text_generator():
    """Yield 50 numbered sample texts for processing."""
    for index in range(50):
        yield f"This is text number {index} for memory-efficient processing."
async def process_with_memory_efficiency():
    """Demonstrate memory-efficient processing with running totals."""
    audio_count = 0
    total_bytes = 0
    # Consume audio lazily, batch by batch, so memory stays bounded.
    async for audio_data in memory_efficient_processing(text_generator(), batch_size=3):
        audio_count += 1
        total_bytes += len(audio_data)
        print(f"Processed audio {audio_count}: {len(audio_data)} bytes")
    print(f"Total: {audio_count} audio files, {total_bytes} bytes")
# asyncio.run(process_with_memory_efficiency())

Install with the Tessl CLI:
npx tessl i tessl/pypi-google-cloud-texttospeech