CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-texttospeech

Google Cloud Texttospeech API client library for converting text to speech with multiple voices and audio formats

Pending
Overview
Eval results
Files

async-clients.mddocs/

Async Clients

Overview

The Google Cloud Text-to-Speech API provides full async/await support through asynchronous client classes. These clients enable non-blocking operations, making them ideal for applications that need to handle multiple synthesis requests concurrently or integrate with async frameworks like FastAPI, aiohttp, or asyncio-based applications.

Async Client Classes

TextToSpeechAsyncClient

import asyncio
from google.cloud import texttospeech

# Initialize async client
async_client = texttospeech.TextToSpeechAsyncClient()


# Basic async synthesis
async def basic_async_synthesis():
    """Synthesize a fixed English greeting as MP3 and return the audio bytes.

    Uses the module-level ``async_client`` for the call.
    """
    greeting = texttospeech.SynthesisInput(text="Hello from async synthesis!")
    voice_params = texttospeech.VoiceSelectionParams(language_code="en-US")
    mp3_config = texttospeech.AudioConfig(
        audio_encoding=texttospeech.AudioEncoding.MP3
    )

    result = await async_client.synthesize_speech(
        request=texttospeech.SynthesizeSpeechRequest(
            input=greeting,
            voice=voice_params,
            audio_config=mp3_config,
        )
    )
    return result.audio_content


# Run async function
# NOTE(review): the client above is created before asyncio.run() starts its
# event loop — confirm this is supported by the installed grpc/asyncio versions.
audio_data = asyncio.run(basic_async_synthesis())

TextToSpeechLongAudioSynthesizeAsyncClient

from google.cloud.texttospeech_v1.services import text_to_speech_long_audio_synthesize

# Initialize async long audio client
async_long_client = text_to_speech_long_audio_synthesize.TextToSpeechLongAudioSynthesizeAsyncClient()


# Async long audio synthesis
async def async_long_audio_synthesis():
    """Submit a long-audio synthesis request and return the resulting operation.

    The request writes MP3 output to the placeholder GCS URI below. The
    operation object is returned as-is; this coroutine does not wait for the
    job to finish.
    """
    long_text = "Very long text content..." * 100
    request_fields = dict(
        parent="projects/your-project-id/locations/us-central1",
        input=texttospeech.SynthesisInput(text=long_text),
        voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
        audio_config=texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        ),
        output_gcs_uri="gs://your-bucket/async-long-audio.mp3",
    )
    long_request = texttospeech.SynthesizeLongAudioRequest(**request_fields)

    return await async_long_client.synthesize_long_audio(request=long_request)

# Usage
# operation = asyncio.run(async_long_audio_synthesis())

Core Async Operations

Async Speech Synthesis

import asyncio
from google.cloud import texttospeech

class AsyncTextToSpeech:
    """Async Text-to-Speech wrapper class.

    Owns a single ``TextToSpeechAsyncClient``; call :meth:`close` when done.
    """

    def __init__(self):
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def _synthesize(self, synthesis_input, audio_config,
                          language_code: str, voice_name) -> bytes:
        """Send one synthesis request and return the audio bytes.

        Shared by :meth:`synthesize_text` and :meth:`synthesize_ssml`, which
        differ only in the input type and the audio configuration.
        """
        request = texttospeech.SynthesizeSpeechRequest(
            input=synthesis_input,
            voice=texttospeech.VoiceSelectionParams(
                language_code=language_code,
                name=voice_name
            ),
            audio_config=audio_config
        )
        response = await self.client.synthesize_speech(request=request)
        return response.audio_content

    async def synthesize_text(self, text: str, language_code: str = "en-US",
                             voice_name: str = None) -> bytes:
        """Synthesize plain text to MP3 audio asynchronously.

        Args:
            text: Plain text to speak.
            language_code: Language code passed to the voice selection.
            voice_name: Optional voice name; ``None`` is passed through as-is.

        Returns:
            The ``audio_content`` bytes of the response.
        """
        return await self._synthesize(
            texttospeech.SynthesisInput(text=text),
            texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.MP3
            ),
            language_code,
            voice_name,
        )

    async def synthesize_ssml(self, ssml: str, language_code: str = "en-US",
                             voice_name: str = None) -> bytes:
        """Synthesize SSML to LINEAR16 audio (24000 Hz) asynchronously.

        Args:
            ssml: SSML markup to speak.
            language_code: Language code passed to the voice selection.
            voice_name: Optional voice name; ``None`` is passed through as-is.

        Returns:
            The ``audio_content`` bytes of the response.
        """
        return await self._synthesize(
            texttospeech.SynthesisInput(ssml=ssml),
            texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                sample_rate_hertz=24000
            ),
            language_code,
            voice_name,
        )

    async def close(self):
        """Close the async client."""
        await self.client.close()

# Usage example
async def demo_async_synthesis():
    """Run one text and one SSML synthesis, printing the size of each result.

    The wrapper's client is closed even if a synthesis call raises.
    """
    tts = AsyncTextToSpeech()
    markup = '<speak>This is <emphasis level="strong">emphasized</emphasis> text.</speak>'

    try:
        # Plain-text request first
        text_audio = await tts.synthesize_text("Hello async world!")
        print(f"Generated {len(text_audio)} bytes of audio")

        # Then the SSML request
        ssml_audio = await tts.synthesize_ssml(markup)
        print(f"Generated {len(ssml_audio)} bytes from SSML")
    finally:
        await tts.close()

# Run the demo
# asyncio.run(demo_async_synthesis())

Async Voice Listing

import asyncio
from google.cloud import texttospeech

async def list_voices_async(language_filter: str = None):
    """Return the available voices as a list of plain dictionaries.

    Args:
        language_filter: When truthy, sent as the ``language_code`` of a
            ``ListVoicesRequest``; otherwise the call is made with no request.

    Returns:
        One dict per voice with the keys ``name``, ``language_codes``,
        ``ssml_gender`` (the enum member's name) and
        ``natural_sample_rate_hertz``.
    """
    client = texttospeech.TextToSpeechAsyncClient()

    try:
        call_kwargs = {}
        if language_filter:
            call_kwargs['request'] = texttospeech.ListVoicesRequest(
                language_code=language_filter
            )
        listing = await client.list_voices(**call_kwargs)

        return [
            {
                'name': entry.name,
                'language_codes': entry.language_codes,
                'ssml_gender': entry.ssml_gender.name,
                'natural_sample_rate_hertz': entry.natural_sample_rate_hertz
            }
            for entry in listing.voices
        ]
    finally:
        # The client is created per call, so it is always closed here.
        await client.close()

async def find_best_voice_async(language_code: str, gender: str = None):
    """Pick a preferred voice for a language, optionally restricted by gender.

    Voices come from ``list_voices_async``. The first voice whose name contains
    ``Neural2`` wins, then ``Wavenet``, then ``Standard``; if none match, the
    first remaining voice is returned, or ``None`` when there are no voices.
    """
    candidates = await list_voices_async(language_code)

    if gender:
        wanted = gender.upper()
        candidates = [c for c in candidates if c['ssml_gender'] == wanted]

    # Tiers are checked in order of preference; matching is by substring.
    for tier in ('Neural2', 'Wavenet', 'Standard'):
        hit = next((c for c in candidates if tier in c['name']), None)
        if hit is not None:
            return hit

    if not candidates:
        return None
    return candidates[0]

# Usage
async def voice_discovery_demo():
    """Print how many en-US voices exist and the preferred female voice, if any."""
    english = await list_voices_async("en-US")
    print(f"Found {len(english)} English voices")

    top_female = await find_best_voice_async("en-US", "female")
    if not top_female:
        return
    print(f"Best female voice: {top_female['name']}")

# asyncio.run(voice_discovery_demo())

Concurrent Operations

Batch Processing with asyncio

import asyncio
from typing import List, Dict
from google.cloud import texttospeech

class AsyncBatchProcessor:
    """Run many synthesis requests concurrently through one shared client.

    At most ``max_concurrent`` requests are in flight at once, enforced with
    an ``asyncio.Semaphore``.
    """

    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.client = texttospeech.TextToSpeechAsyncClient()
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def synthesize_single(self, text: str, voice_config: dict,
                               audio_config: dict) -> Dict:
        """Synthesize one text and report the outcome as a dictionary.

        ``voice_config`` and ``audio_config`` are expanded as keyword
        arguments for ``VoiceSelectionParams`` and ``AudioConfig``. Any
        ``Exception`` is captured and returned as
        ``{'success': False, 'error': ..., 'text': ...}`` instead of raised.
        The ``text`` entry is the input cut to 50 characters plus ``"..."``
        when it is longer than that.
        """

        def preview() -> str:
            # Evaluated lazily so it runs at the same point as before.
            if len(text) > 50:
                return text[:50] + "..."
            return text

        async with self.semaphore:  # caps the number of in-flight requests
            try:
                reply = await self.client.synthesize_speech(
                    request=texttospeech.SynthesizeSpeechRequest(
                        input=texttospeech.SynthesisInput(text=text),
                        voice=texttospeech.VoiceSelectionParams(**voice_config),
                        audio_config=texttospeech.AudioConfig(**audio_config)
                    )
                )
                return {
                    'success': True,
                    'audio_content': reply.audio_content,
                    'text': preview()
                }
            except Exception as exc:
                return {
                    'success': False,
                    'error': str(exc),
                    'text': preview()
                }

    async def process_batch(self, text_list: List[str],
                           voice_config: dict = None,
                           audio_config: dict = None) -> List[Dict]:
        """Synthesize every text in ``text_list`` with one shared configuration.

        Falsy ``voice_config`` / ``audio_config`` fall back to en-US and MP3.
        Results are returned in input order.
        """
        voice_settings = voice_config or {'language_code': 'en-US'}
        audio_settings = audio_config or {
            'audio_encoding': texttospeech.AudioEncoding.MP3
        }

        pending = [
            self.synthesize_single(item, voice_settings, audio_settings)
            for item in text_list
        ]
        return await asyncio.gather(*pending, return_exceptions=True)

    async def process_with_different_voices(self, text_voice_pairs: List[tuple]) -> List[Dict]:
        """Synthesize ``(text, voice_config, audio_config)`` triples concurrently."""
        pending = [
            self.synthesize_single(item, voice_cfg, audio_cfg)
            for item, voice_cfg, audio_cfg in text_voice_pairs
        ]
        return await asyncio.gather(*pending, return_exceptions=True)

    async def close(self):
        """Close the async client."""
        await self.client.close()

# Usage example
async def batch_processing_demo():
    """Demonstrate batch processing with async.

    Synthesizes five sample sentences concurrently, prints timing and
    success/failure counts, writes each successful result to
    ``batch_output_<i>.mp3`` and returns the raw result list. The processor's
    client is closed even if processing raises.
    """

    processor = AsyncBatchProcessor(max_concurrent=5)

    try:
        # Batch of texts to process
        texts = [
            "This is the first text to synthesize.",
            "Here's the second piece of content.",
            "And this is the third text sample.",
            "Fourth text for our batch processing demo.",
            "Finally, the last text in our batch."
        ]

        print("Processing batch of texts...")
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine; its monotonic clock is used for timing.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        # Process all texts concurrently
        results = await processor.process_batch(texts)

        processing_time = loop.time() - start_time

        # Analyze results. gather(return_exceptions=True) may hand back
        # exception objects as well as dicts, so everything that is not a
        # successful dict is counted as a failure.
        successful = [r for r in results if isinstance(r, dict) and r.get('success')]
        failed = [r for r in results if not (isinstance(r, dict) and r.get('success'))]

        print(f"Batch processing completed in {processing_time:.2f} seconds")
        print(f"Successful: {len(successful)}")
        print(f"Failed: {len(failed)}")

        # Save successful results
        for i, result in enumerate(successful):
            filename = f"batch_output_{i}.mp3"
            with open(filename, "wb") as f:
                f.write(result['audio_content'])
            print(f"Saved: {filename}")

        return results

    finally:
        await processor.close()

# Run batch processing
# results = asyncio.run(batch_processing_demo())

Multi-Voice Processing

import asyncio
from google.cloud import texttospeech

async def create_multi_voice_conversation():
    """Synthesize a four-line, two-speaker dialogue and save it as one MP3.

    Each line is synthesized concurrently; the audio of the successful parts
    is joined in script order, written to ``conversation.mp3`` and returned as
    a list of byte strings. Parts that fail are left out.
    """
    processor = AsyncBatchProcessor(max_concurrent=3)

    # NOTE(review): the original example labels the agent voice "female" and
    # the customer voice "male" — confirm the genders via list_voices.
    agent_voice = 'en-US-Neural2-A'
    customer_voice = 'en-US-Neural2-C'
    script = [
        (agent_voice, "Hello, welcome to our customer service."),
        (customer_voice, "Hi there, I have a question about my account."),
        (agent_voice, "I'd be happy to help you with that. Can you provide your account number?"),
        (customer_voice, "Sure, my account number is 12345."),
    ]

    try:
        # One (text, voice_config, audio_config) triple per spoken line.
        conversation_parts = [
            (
                line,
                {'language_code': 'en-US', 'name': voice},
                {'audio_encoding': texttospeech.AudioEncoding.MP3},
            )
            for voice, line in script
        ]

        print("Creating multi-voice conversation...")
        outcomes = await processor.process_with_different_voices(conversation_parts)

        conversation_audio = []
        for position, outcome in enumerate(outcomes, start=1):
            if not (isinstance(outcome, dict) and outcome.get('success')):
                continue
            clip = outcome['audio_content']
            conversation_audio.append(clip)
            print(f"Part {position}: Generated {len(clip)} bytes")

        if conversation_audio:
            complete_audio = b''.join(conversation_audio)
            with open("conversation.mp3", "wb") as out_file:
                out_file.write(complete_audio)
            print(f"Saved complete conversation: {len(complete_audio)} bytes")

        return conversation_audio
    finally:
        await processor.close()

# asyncio.run(create_multi_voice_conversation())

Async Streaming Operations

Async Streaming Synthesis

import asyncio
from google.cloud import texttospeech
from typing import AsyncGenerator

class AsyncStreamingSynthesis:
    """Async streaming text-to-speech synthesis.

    Owns a single ``TextToSpeechAsyncClient``; call :meth:`close` when done.
    """

    def __init__(self):
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def stream_synthesis(self, text_chunks: list) -> AsyncGenerator[bytes, None]:
        """Stream synthesis of multiple text chunks.

        Sends one configuration request followed by one input request per
        entry in ``text_chunks``, and yields every non-empty ``audio_content``
        received on the response stream.
        """

        # Configure streaming.
        # NOTE(review): voice name and encoding are kept from the original
        # example — confirm both are accepted by the streaming API.
        config = texttospeech.StreamingSynthesizeConfig(
            voice=texttospeech.VoiceSelectionParams(
                language_code="en-US",
                name="en-US-Neural2-A"
            ),
            # The streaming config's audio field is `streaming_audio_config`
            # (not `audio_config`, which belongs to SynthesizeSpeechRequest).
            streaming_audio_config=texttospeech.StreamingAudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                sample_rate_hertz=22050
            )
        )

        async def request_generator():
            # The first request carries only the configuration
            yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)

            # Every following request carries one chunk of text
            for chunk in text_chunks:
                yield texttospeech.StreamingSynthesizeRequest(
                    input=texttospeech.StreamingSynthesisInput(text=chunk)
                )

        # Stream synthesis
        response_stream = await self.client.streaming_synthesize(request_generator())

        async for response in response_stream:
            if response.audio_content:
                yield response.audio_content

    @staticmethod
    def _split_into_chunks(long_text: str, chunk_size: int) -> list:
        """Group whitespace-separated words into chunks of about ``chunk_size`` characters.

        A chunk is closed as soon as its running length (word lengths plus one
        per word for the joining space) reaches ``chunk_size``, so chunks may
        exceed the size by part of one word.
        """
        text_chunks = []
        current_chunk = []
        current_length = 0

        for word in long_text.split():
            current_chunk.append(word)
            current_length += len(word) + 1  # +1 for space

            if current_length >= chunk_size:
                text_chunks.append(' '.join(current_chunk))
                current_chunk = []
                current_length = 0

        if current_chunk:
            text_chunks.append(' '.join(current_chunk))

        return text_chunks

    async def process_streaming_text(self, long_text: str, chunk_size: int = 100):
        """Process long text with streaming synthesis.

        Splits ``long_text`` into word-aligned chunks, streams them through
        :meth:`stream_synthesis`, prints the size of each audio chunk received
        and returns all audio joined into one ``bytes`` object.
        """
        text_chunks = self._split_into_chunks(long_text, chunk_size)

        # Stream synthesis
        audio_chunks = []
        async for audio_chunk in self.stream_synthesis(text_chunks):
            audio_chunks.append(audio_chunk)
            print(f"Received streaming audio chunk: {len(audio_chunk)} bytes")

        return b''.join(audio_chunks)

    async def close(self):
        """Close the async client."""
        await self.client.close()

# Usage example
async def streaming_demo():
    """Stream-synthesize a repeated sample paragraph and save the audio.

    The joined audio bytes are written to ``async_streaming_output.wav`` and
    returned. The streamer's client is closed even if synthesis raises.
    """
    streamer = AsyncStreamingSynthesis()

    try:
        long_text = """
        This is a long piece of text that will be processed using async streaming 
        synthesis. The text will be broken into smaller chunks and each chunk will 
        be sent to the synthesis service as part of a streaming request. This allows 
        for more efficient processing of long content and enables real-time audio 
        generation as the text is being processed.
        """ * 3

        print("Starting async streaming synthesis...")
        audio_data = await streamer.process_streaming_text(long_text, chunk_size=80)
        print(f"Streaming synthesis complete: {len(audio_data)} bytes generated")

        # Persist the result before handing it back to the caller
        with open("async_streaming_output.wav", "wb") as out_file:
            out_file.write(audio_data)
        return audio_data
    finally:
        await streamer.close()

# asyncio.run(streaming_demo())

Integration with Web Frameworks

FastAPI Integration

import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel
from google.cloud import texttospeech
from typing import Optional

app = FastAPI()

# Global async client (initialized once)
# Stays None until the startup handler assigns a TextToSpeechAsyncClient.
tts_client = None

# Request body shared by the /synthesize and /batch-synthesize endpoints.
class TTSRequest(BaseModel):
    # Plain text to synthesize (required)
    text: str
    # Language code passed to VoiceSelectionParams
    language_code: str = "en-US"
    # Optional voice name; None is passed through to VoiceSelectionParams
    voice_name: Optional[str] = None
    # Encoding name; the endpoints map it to an AudioEncoding enum value and
    # fall back to MP3 for names they do not know
    audio_encoding: str = "MP3"
    # Forwarded unchanged to AudioConfig.speaking_rate
    speaking_rate: float = 1.0
    # Forwarded unchanged to AudioConfig.pitch
    pitch: float = 0.0

@app.on_event("startup")
async def startup_event():
    """Create the shared async TTS client when the application starts."""
    global tts_client
    tts_client = texttospeech.TextToSpeechAsyncClient()


@app.on_event("shutdown")
async def shutdown_event():
    """Close the shared TTS client on shutdown, if one was created."""
    # Only reads the module-level name, so no `global` declaration is needed.
    if not tts_client:
        return
    await tts_client.close()

@app.post("/synthesize")
async def synthesize_speech(request: TTSRequest):
    """Synthesize speech from text and return it as a downloadable audio file.

    The response media type and the download filename extension follow the
    encoding that is actually used. An unknown ``audio_encoding`` name falls
    back to MP3 and is labelled as MP3.

    Raises:
        HTTPException: 500 with the underlying error text when synthesis fails.
    """

    try:
        # Encoding name -> (API enum value, HTTP media type, file extension)
        formats = {
            "MP3": (texttospeech.AudioEncoding.MP3, "audio/mpeg", "mp3"),
            "LINEAR16": (texttospeech.AudioEncoding.LINEAR16, "audio/wav", "wav"),
            "OGG_OPUS": (texttospeech.AudioEncoding.OGG_OPUS, "audio/ogg", "ogg"),
        }

        # Resolve all three values together so the headers always describe
        # the audio that is really returned, including on the MP3 fallback.
        audio_encoding, media_type, extension = formats.get(
            request.audio_encoding, formats["MP3"]
        )

        # Create synthesis request
        synthesis_request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=request.text),
            voice=texttospeech.VoiceSelectionParams(
                language_code=request.language_code,
                name=request.voice_name
            ),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=audio_encoding,
                speaking_rate=request.speaking_rate,
                pitch=request.pitch
            )
        )

        # Synthesize speech
        response = await tts_client.synthesize_speech(request=synthesis_request)

        # Return audio as response
        return Response(
            content=response.audio_content,
            media_type=media_type,
            headers={"Content-Disposition": f"attachment; filename=speech.{extension}"}
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Synthesis failed: {str(e)}") from e

@app.get("/voices")
async def list_voices(language_code: Optional[str] = None):
    """List available voices, optionally filtered by language code.

    Returns:
        ``{"voices": [...]}`` where each entry has ``name``,
        ``language_codes``, ``ssml_gender`` (enum member name) and
        ``natural_sample_rate_hertz``.

    Raises:
        HTTPException: 500 with the underlying error text when the call fails.
    """

    try:
        if language_code:
            request = texttospeech.ListVoicesRequest(language_code=language_code)
            response = await tts_client.list_voices(request=request)
        else:
            response = await tts_client.list_voices()

        voices = [
            {
                "name": voice.name,
                # Copy the repeated field into a plain list so the payload
                # holds only JSON-serializable built-in types.
                "language_codes": list(voice.language_codes),
                "ssml_gender": voice.ssml_gender.name,
                "natural_sample_rate_hertz": voice.natural_sample_rate_hertz
            }
            for voice in response.voices
        ]

        return {"voices": voices}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list voices: {str(e)}") from e

@app.post("/batch-synthesize")
async def batch_synthesize(requests: list[TTSRequest]):
    """Synthesize several texts concurrently and report per-item outcomes.

    Each item of the returned ``results`` list carries its input ``index``.
    Successful items add a text preview (50 characters plus ``"..."`` when
    longer) and the audio size; failed items carry the error text. Failures
    outside the per-item handling become a 500 ``HTTPException``.
    """

    try:
        async def synthesize_single(req: TTSRequest):
            known_encodings = {
                "MP3": texttospeech.AudioEncoding.MP3,
                "LINEAR16": texttospeech.AudioEncoding.LINEAR16,
                "OGG_OPUS": texttospeech.AudioEncoding.OGG_OPUS
            }
            # Unknown encoding names fall back to MP3
            chosen = known_encodings.get(req.audio_encoding, texttospeech.AudioEncoding.MP3)

            reply = await tts_client.synthesize_speech(
                request=texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=req.text),
                    voice=texttospeech.VoiceSelectionParams(
                        language_code=req.language_code,
                        name=req.voice_name
                    ),
                    audio_config=texttospeech.AudioConfig(
                        audio_encoding=chosen,
                        speaking_rate=req.speaking_rate,
                        pitch=req.pitch
                    )
                )
            )

            if len(req.text) > 50:
                shown = req.text[:50] + "..."
            else:
                shown = req.text
            return {
                "text": shown,
                "audio_size": len(reply.audio_content),
                "success": True
            }

        # return_exceptions=True keeps one failing item from cancelling the rest
        outcomes = await asyncio.gather(
            *[synthesize_single(item) for item in requests],
            return_exceptions=True
        )

        formatted_results = [
            {"index": position, "success": False, "error": str(outcome)}
            if isinstance(outcome, Exception)
            else {"index": position, **outcome}
            for position, outcome in enumerate(outcomes)
        ]

        return {"results": formatted_results}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch synthesis failed: {str(e)}")

# To run: uvicorn main:app --reload

aiohttp Integration

import asyncio
import json
from aiohttp import web, ClientSession
from google.cloud import texttospeech

class TTSService:
    """Text-to-Speech service for aiohttp application.

    The client is created by :meth:`initialize` and released by
    :meth:`cleanup`; :meth:`synthesize` may only be used in between.
    """

    def __init__(self):
        self.client = None

    async def initialize(self):
        """Initialize the TTS client."""
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def cleanup(self):
        """Close the TTS client, if any, and forget it.

        The reference is cleared before closing so a closed client is never
        reused and a second call is a no-op.
        """
        if self.client:
            client, self.client = self.client, None
            await client.close()

    async def synthesize(self, text: str, language_code: str = "en-US",
                        voice_name: str = None) -> bytes:
        """Synthesize text to MP3 speech and return the audio bytes.

        Raises:
            RuntimeError: If called before :meth:`initialize` or after
                :meth:`cleanup`.
        """
        # Fail with a clear message instead of an AttributeError on None.
        if self.client is None:
            raise RuntimeError(
                "TTSService is not initialized; await initialize() first"
            )

        request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=text),
            voice=texttospeech.VoiceSelectionParams(
                language_code=language_code,
                name=voice_name
            ),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.MP3
            )
        )

        response = await self.client.synthesize_speech(request=request)
        return response.audio_content

# Global TTS service
tts_service = TTSService()

async def synthesize_handler(request):
    """Handle synthesis requests.

    Expects a JSON object with ``text`` and optional ``language_code`` and
    ``voice_name``. Responds with MP3 audio on success, 400 for a malformed
    or incomplete body, and 500 with the error text when synthesis fails.
    """

    # Client errors are reported as 400 rather than a generic 500.
    try:
        data = await request.json()
    except ValueError:
        # JSON and Unicode decoding errors are both ValueError subclasses
        return web.json_response({'error': 'Request body must be valid JSON'}, status=400)

    if not isinstance(data, dict):
        return web.json_response({'error': 'Request body must be a JSON object'}, status=400)

    try:
        text = data.get('text')
        language_code = data.get('language_code', 'en-US')
        voice_name = data.get('voice_name')

        if not text:
            return web.json_response({'error': 'Text is required'}, status=400)

        audio_data = await tts_service.synthesize(text, language_code, voice_name)

        return web.Response(
            body=audio_data,
            content_type='audio/mpeg',
            headers={'Content-Disposition': 'attachment; filename="speech.mp3"'}
        )

    except Exception as e:
        return web.json_response({'error': str(e)}, status=500)

async def health_handler(request):
    """Return a fixed JSON liveness payload; the request is not inspected."""
    payload = {'status': 'healthy'}
    return web.json_response(payload)

async def init_app():
    """Build the aiohttp application with its routes and TTS lifecycle hooks.

    Registers ``POST /synthesize`` and ``GET /health``, initializes the global
    ``tts_service`` and schedules its cleanup for application shutdown.
    """
    application = web.Application()

    routes = application.router
    routes.add_post('/synthesize', synthesize_handler)
    routes.add_get('/health', health_handler)

    # The service must be ready before the first request is served
    await tts_service.initialize()

    async def cleanup_handler(app):
        # aiohttp passes the application to cleanup callbacks; unused here
        await tts_service.cleanup()

    application.on_cleanup.append(cleanup_handler)
    return application

# To run: python -c "from aiohttp import web; from main import init_app; web.run_app(init_app(), port=8080)"

Error Handling in Async Operations

Async Error Handling Patterns

import asyncio
import logging
from google.api_core import exceptions
from google.cloud import texttospeech

class AsyncTTSWithErrorHandling:
    """Async TTS wrapper that retries some API errors and reports outcomes as dicts."""

    def __init__(self, max_retries: int = 3):
        self.client = texttospeech.TextToSpeechAsyncClient()
        self.max_retries = max_retries

    @staticmethod
    def _preview(text: str) -> str:
        """Return ``text`` cut to 50 characters plus ``"..."`` when it is longer."""
        if len(text) > 50:
            return text[:50] + "..."
        return text

    @staticmethod
    def _failure(message: str, attempts: int) -> dict:
        """Build the dictionary returned for an unsuccessful synthesis."""
        return {'success': False, 'error': message, 'attempts': attempts}

    @staticmethod
    def _build_request(text: str, options: dict):
        """Create the MP3 synthesis request for ``text`` from the keyword options."""
        return texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=text),
            voice=texttospeech.VoiceSelectionParams(
                language_code=options.get('language_code', 'en-US'),
                name=options.get('voice_name')
            ),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.MP3
            )
        )

    async def synthesize_with_retry(self, text: str, **kwargs) -> dict:
        """Synthesize ``text``, retrying up to ``max_retries`` attempts.

        Recognized keyword options are ``language_code`` and ``voice_name``.
        ``ResourceExhausted`` waits ``2 ** attempt`` seconds before the next
        try and ``ServiceUnavailable`` waits one second. ``DeadlineExceeded``
        and unexpected errors are retried without waiting. ``InvalidArgument``
        is returned as a failure immediately.

        Returns:
            A dict with ``success`` and ``attempts``, plus ``audio_content``
            on success or ``error`` on failure.
        """
        for attempt_no in range(1, self.max_retries + 1):
            is_last = attempt_no == self.max_retries
            try:
                reply = await self.client.synthesize_speech(
                    request=self._build_request(text, kwargs)
                )
                return {
                    'success': True,
                    'audio_content': reply.audio_content,
                    'attempts': attempt_no
                }

            except exceptions.ResourceExhausted as exc:
                logging.warning(f"Rate limit hit (attempt {attempt_no}): {exc}")
                if is_last:
                    return self._failure('Rate limit exceeded', attempt_no)
                # Exponential backoff: 1s, 2s, 4s, ...
                await asyncio.sleep(2 ** (attempt_no - 1))

            except exceptions.ServiceUnavailable as exc:
                logging.warning(f"Service unavailable (attempt {attempt_no}): {exc}")
                if is_last:
                    return self._failure('Service unavailable', attempt_no)
                await asyncio.sleep(1)

            except exceptions.DeadlineExceeded as exc:
                logging.warning(f"Request timeout (attempt {attempt_no}): {exc}")
                if is_last:
                    return self._failure('Request timeout', attempt_no)

            except exceptions.InvalidArgument as exc:
                # Retrying cannot fix a bad request, so give up right away
                logging.error(f"Invalid argument: {exc}")
                return self._failure(f'Invalid argument: {exc}', attempt_no)

            except Exception as exc:
                logging.error(f"Unexpected error (attempt {attempt_no}): {exc}")
                if is_last:
                    return self._failure(f'Unexpected error: {exc}', attempt_no)

        return self._failure('Max retries exceeded', self.max_retries)

    async def safe_batch_synthesis(self, text_list: list, **kwargs) -> list:
        """Synthesize every text concurrently, never raising for a single item.

        Each result dict gains a ``text`` preview of its input. Anything that
        escapes per-item handling is converted to a failure dict as well.
        """

        async def safe_synthesize_single(text: str) -> dict:
            try:
                outcome = await self.synthesize_with_retry(text, **kwargs)
                outcome['text'] = self._preview(text)
                return outcome
            except Exception as exc:
                return {
                    'success': False,
                    'error': f'Failed to process: {exc}',
                    'text': self._preview(text)
                }

        outcomes = await asyncio.gather(
            *(safe_synthesize_single(item) for item in text_list),
            return_exceptions=True
        )

        # Exception objects handed back by gather become failure dicts too
        return [
            {
                'success': False,
                'error': f'Task failed: {outcome}',
                'text': self._preview(text_list[position])
            }
            if isinstance(outcome, Exception)
            else outcome
            for position, outcome in enumerate(outcomes)
        ]

    async def close(self):
        """Close the async client."""
        await self.client.close()

# Usage example
async def error_handling_demo():
    """Run a mixed batch of inputs through the error-handling wrapper.

    Prints a success/failure summary followed by one status line per input,
    with the error text for failures, and returns the list of result dicts.
    """
    tts = AsyncTTSWithErrorHandling(max_retries=3)

    # Mix of ordinary inputs and inputs expected to be troublesome
    test_texts = [
        "This is a normal text that should work fine.",
        "",  # Empty text (should cause InvalidArgument)
        "This is another normal text.",
        "A" * 10000,  # Very long text (might cause issues)
        "Final test text."
    ]

    try:
        print("Testing batch synthesis with error handling...")
        results = await tts.safe_batch_synthesis(test_texts, language_code="en-US")

        ok_count = sum(1 for entry in results if entry['success'])
        bad_count = sum(1 for entry in results if not entry['success'])
        print(f"Results: {ok_count} successful, {bad_count} failed")

        for entry in results:
            if entry['success']:
                print(f"✅ {entry['text']}")
            else:
                print(f"❌ {entry['text']}")
                print(f"   Error: {entry['error']}")

        return results
    finally:
        await tts.close()

# asyncio.run(error_handling_demo())

Performance Optimization for Async Operations

Connection Pooling and Client Reuse

import asyncio
from contextlib import asynccontextmanager
from google.cloud import texttospeech

class OptimizedAsyncTTS:
    """Async TTS helper that lazily creates one client and reuses it.

    A single ``TextToSpeechAsyncClient`` is created on first use and shared by
    every later call until :meth:`close` is awaited, after which the next call
    creates a fresh client.
    """

    def __init__(self):
        self._client = None
        # asyncio.Lock coordinates coroutines on one event loop; it does not
        # provide protection between OS threads.
        self._client_lock = asyncio.Lock()

    async def get_client(self):
        """Return the shared client, creating it on first use.

        Creation is guarded by ``self._client_lock`` and the ``None`` check is
        repeated after the lock is acquired, so concurrent callers end up with
        the same client instance.
        """
        if self._client is None:
            async with self._client_lock:
                if self._client is None:  # Re-check once the lock is held
                    self._client = texttospeech.TextToSpeechAsyncClient()
        return self._client

    @asynccontextmanager
    async def client_context(self):
        """Yield the shared client for the duration of an ``async with`` block.

        The client is deliberately left open on exit so it can be reused;
        releasing it is the job of :meth:`close`.
        """
        yield await self.get_client()

    async def synthesize_optimized(self, text: str, **config) -> bytes:
        """Synthesize ``text`` with the shared client and return the audio.

        Args:
            text: Text to synthesize.
            **config: Optional settings. Recognized keys are ``language_code``
                (default ``'en-US'``), ``voice_name`` (default ``None``) and
                ``audio_encoding`` (default ``AudioEncoding.MP3``).

        Returns:
            The ``audio_content`` of the synthesis response.
        """
        async with self.client_context() as client:
            request = texttospeech.SynthesizeSpeechRequest(
                input=texttospeech.SynthesisInput(text=text),
                voice=texttospeech.VoiceSelectionParams(
                    language_code=config.get('language_code', 'en-US'),
                    name=config.get('voice_name')
                ),
                audio_config=texttospeech.AudioConfig(
                    audio_encoding=config.get(
                        'audio_encoding', texttospeech.AudioEncoding.MP3
                    )
                )
            )

            response = await client.synthesize_speech(request=request)
            return response.audio_content

    async def close(self):
        """Close the shared client, if one exists, and forget it.

        The reference is cleared *before* the close is awaited. That way a
        concurrent ``close()`` cannot close the same client twice, a
        concurrent ``get_client()`` never receives a client that is being
        shut down, and a failing close does not leave a dead client cached.
        """
        client, self._client = self._client, None
        if client is not None:
            await client.close()

# Global optimized TTS instance: a single module-level object that
# performance_benchmark() below uses for every synthesis call.
optimized_tts = OptimizedAsyncTTS()

async def performance_benchmark():
    """Compare sequential and concurrent synthesis of 20 short texts.

    Runs the same 20 requests through the shared ``optimized_tts`` instance
    twice (one at a time, then all at once via ``asyncio.gather``) and prints
    both durations plus the resulting speedup. The shared client is closed
    afterwards even if a synthesis call raises.
    """
    import time

    test_texts = [f"This is test text number {i}" for i in range(20)]

    try:
        # perf_counter() is monotonic and high-resolution, so the measured
        # intervals are not affected by wall-clock adjustments.

        # Sequential processing
        start_time = time.perf_counter()
        for text in test_texts:
            await optimized_tts.synthesize_optimized(text)
        sequential_time = time.perf_counter() - start_time

        # Concurrent processing
        start_time = time.perf_counter()
        tasks = [optimized_tts.synthesize_optimized(text) for text in test_texts]
        await asyncio.gather(*tasks)
        concurrent_time = time.perf_counter() - start_time

        print(f"Sequential processing: {sequential_time:.2f} seconds")
        print(f"Concurrent processing: {concurrent_time:.2f} seconds")
        print(f"Speedup: {sequential_time / concurrent_time:.2f}x")
    finally:
        # Always release the shared client, including on failure.
        await optimized_tts.close()

# asyncio.run(performance_benchmark())

Memory-Efficient Async Processing

import asyncio
from typing import AsyncIterator
from google.cloud import texttospeech

async def memory_efficient_processing(text_iterator: AsyncIterator[str], 
                                     batch_size: int = 5) -> AsyncIterator[bytes]:
    """Synthesize a stream of texts in fixed-size batches.

    Texts are pulled from ``text_iterator`` and buffered until ``batch_size``
    of them are available; the batch is then synthesized concurrently and its
    audio is yielded before more input is read, so at most one batch of
    requests and responses is held at a time. A final, smaller batch is
    processed once the iterator is exhausted.

    Args:
        text_iterator: Async iterator producing the texts to synthesize.
        batch_size: Number of texts synthesized concurrently per batch.

    Yields:
        The ``audio_content`` of each response, in input order.
    """

    client = texttospeech.TextToSpeechAsyncClient()

    async def synthesize_batch(texts):
        """Synthesize ``texts`` concurrently; return responses in input order."""
        pending = [
            client.synthesize_speech(
                request=texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=text_item),
                    voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
                    audio_config=texttospeech.AudioConfig(
                        audio_encoding=texttospeech.AudioEncoding.MP3
                    )
                )
            )
            for text_item in texts
        ]
        # gather() waits for the whole batch and returns results in the same
        # order as the awaitables it was given.
        return await asyncio.gather(*pending)

    try:
        batch = []

        async for text in text_iterator:
            batch.append(text)

            if len(batch) >= batch_size:
                for response in await synthesize_batch(batch):
                    yield response.audio_content

                # Start a new batch so the processed texts can be released.
                batch = []

        # Process remaining items that did not fill a complete batch.
        if batch:
            for response in await synthesize_batch(batch):
                yield response.audio_content

    finally:
        await client.close()

# Example usage
async def text_generator():
    """Asynchronously yield 50 numbered sample sentences (numbers 0-49)."""
    total = 50
    number = 0
    while number < total:
        yield f"This is text number {number} for memory-efficient processing."
        number += 1

async def process_with_memory_efficiency():
    """Stream the sample texts through batched synthesis and print sizes.

    Consumes ``memory_efficient_processing`` with a batch size of 3, printing
    the size of every audio payload as it arrives and a final total.
    """
    produced = 0
    byte_total = 0

    audio_stream = memory_efficient_processing(text_generator(), batch_size=3)
    async for payload in audio_stream:
        size = len(payload)
        produced += 1
        byte_total += size
        print(f"Processed audio {produced}: {size} bytes")

    print(f"Total: {produced} audio files, {byte_total} bytes")

# asyncio.run(process_with_memory_efficiency())

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-texttospeech

docs

async-clients.md

configuration-types.md

index.md

long-audio-synthesis.md

speech-synthesis.md

streaming-synthesis.md

voice-management.md

tile.json