0
# Async Clients
1
2
## Overview
3
4
The Google Cloud Text-to-Speech Python client library provides full async/await support through asynchronous client classes. These clients perform non-blocking calls, which makes them a good fit for applications that need to handle multiple synthesis requests concurrently or that are built on async frameworks such as FastAPI, aiohttp, or plain asyncio.
5
6
## Async Client Classes
7
8
### TextToSpeechAsyncClient
9
10
```api { .api }
11
import asyncio
12
from google.cloud import texttospeech
13
14
# Basic async synthesis
async def basic_async_synthesis():
    """Synthesize a short greeting to MP3 and return the raw audio bytes.

    The client is created inside the coroutine so that its gRPC channel is
    bound to the event loop started by ``asyncio.run()``. ``async with``
    closes the transport once the request has completed.
    """
    request = texttospeech.SynthesizeSpeechRequest(
        input=texttospeech.SynthesisInput(text="Hello from async synthesis!"),
        voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
        audio_config=texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        ),
    )

    async with texttospeech.TextToSpeechAsyncClient() as async_client:
        response = await async_client.synthesize_speech(request=request)
    return response.audio_content


# Run async function
audio_data = asyncio.run(basic_async_synthesis())
32
```
33
34
### TextToSpeechLongAudioSynthesizeAsyncClient
35
36
```api { .api }
37
import asyncio

from google.cloud import texttospeech
from google.cloud.texttospeech_v1.services import text_to_speech_long_audio_synthesize


# Async long audio synthesis
async def async_long_audio_synthesis():
    """Run a long-audio synthesis job to completion and return its operation.

    The job writes its output to the Cloud Storage URI in the request, so the
    audio itself is not part of the return value.

    Returns:
        The long-running operation object, already finished when returned.
    """
    request = texttospeech.SynthesizeLongAudioRequest(
        parent="projects/your-project-id/locations/us-central1",
        input=texttospeech.SynthesisInput(text="Very long text content..." * 100),
        voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
        # NOTE(review): long audio synthesis is documented as LINEAR16-only,
        # hence the WAV output object below -- confirm against current docs.
        audio_config=texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16
        ),
        output_gcs_uri="gs://your-bucket/async-long-audio.wav",
    )

    client_class = (
        text_to_speech_long_audio_synthesize.TextToSpeechLongAudioSynthesizeAsyncClient
    )
    # Create the client inside the running event loop and close it when done.
    async with client_class() as async_long_client:
        operation = await async_long_client.synthesize_long_audio(request=request)
        # Wait here: once asyncio.run() returns, the loop and the client are
        # gone and the operation could no longer be polled.
        await operation.result()
    return operation


# Usage
# operation = asyncio.run(async_long_audio_synthesis())
59
```
60
61
## Core Async Operations
62
63
### Async Speech Synthesis
64
65
```api { .api }
66
import asyncio
67
from google.cloud import texttospeech
68
69
class AsyncTextToSpeech:
    """Small convenience wrapper around ``TextToSpeechAsyncClient``."""

    def __init__(self):
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def _synthesize(self, synthesis_input, language_code, voice_name,
                          audio_config) -> bytes:
        """Send one synthesis request and return the audio bytes."""
        request = texttospeech.SynthesizeSpeechRequest(
            input=synthesis_input,
            voice=texttospeech.VoiceSelectionParams(
                language_code=language_code,
                name=voice_name,
            ),
            audio_config=audio_config,
        )
        result = await self.client.synthesize_speech(request=request)
        return result.audio_content

    async def synthesize_text(self, text: str, language_code: str = "en-US",
                              voice_name: str = None) -> bytes:
        """Convert plain text to MP3 audio.

        Args:
            text: Plain text to speak.
            language_code: Language tag for voice selection.
            voice_name: Optional specific voice name.

        Returns:
            The encoded audio bytes from the response.
        """
        mp3 = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        )
        return await self._synthesize(
            texttospeech.SynthesisInput(text=text), language_code, voice_name, mp3
        )

    async def synthesize_ssml(self, ssml: str, language_code: str = "en-US",
                              voice_name: str = None) -> bytes:
        """Convert SSML markup to LINEAR16 audio at 24 kHz.

        Args:
            ssml: SSML document to speak.
            language_code: Language tag for voice selection.
            voice_name: Optional specific voice name.

        Returns:
            The encoded audio bytes from the response.
        """
        linear16 = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,
            sample_rate_hertz=24000,
        )
        return await self._synthesize(
            texttospeech.SynthesisInput(ssml=ssml), language_code, voice_name, linear16
        )

    async def close(self):
        """Release the underlying client."""
        await self.client.close()
123
124
# Usage example
125
async def demo_async_synthesis():
    """Synthesize one plain-text and one SSML sample, printing their sizes."""
    ssml = '<speak>This is <emphasis level="strong">emphasized</emphasis> text.</speak>'
    tts = AsyncTextToSpeech()

    try:
        # Plain text first
        text_audio = await tts.synthesize_text("Hello async world!")
        print(f"Generated {len(text_audio)} bytes of audio")

        # Then the SSML sample
        ssml_audio = await tts.synthesize_ssml(ssml)
        print(f"Generated {len(ssml_audio)} bytes from SSML")
    finally:
        # Always release the client, even if a request failed.
        await tts.close()
140
141
# Run the demo
142
# asyncio.run(demo_async_synthesis())
143
```
144
145
### Async Voice Listing
146
147
```api { .api }
148
import asyncio
149
from google.cloud import texttospeech
150
151
async def list_voices_async(language_filter: str = None):
    """List available voices asynchronously.

    Args:
        language_filter: Optional language tag (e.g. ``"en-US"``). When
            omitted, no language filter is sent.

    Returns:
        A list of plain dicts with the keys ``name``, ``language_codes``,
        ``ssml_gender`` (the enum member name) and
        ``natural_sample_rate_hertz``.
    """
    async_client = texttospeech.TextToSpeechAsyncClient()

    try:
        if language_filter:
            request = texttospeech.ListVoicesRequest(language_code=language_filter)
            response = await async_client.list_voices(request=request)
        else:
            response = await async_client.list_voices()

        return [
            {
                'name': voice.name,
                # Copy the repeated proto field into a real list so the
                # result is plain data (JSON-serializable, safe to keep
                # after the response object is gone).
                'language_codes': list(voice.language_codes),
                'ssml_gender': voice.ssml_gender.name,
                'natural_sample_rate_hertz': voice.natural_sample_rate_hertz,
            }
            for voice in response.voices
        ]
    finally:
        await async_client.close()
176
177
async def find_best_voice_async(language_code: str, gender: str = None):
    """Pick a preferred voice for a language, optionally restricted by gender.

    Voices whose name contains ``Neural2`` win over ``Wavenet``, which win
    over ``Standard``. If none of those match, the first remaining voice is
    returned, or ``None`` when there are no candidates.
    """
    candidates = await list_voices_async(language_code)

    # Narrow to the requested gender (compared against the upper-cased name)
    if gender:
        wanted = gender.upper()
        candidates = [c for c in candidates if c['ssml_gender'] == wanted]

    # Walk the tiers from most to least preferred
    for tier in ('Neural2', 'Wavenet', 'Standard'):
        match = next((c for c in candidates if tier in c['name']), None)
        if match is not None:
            return match

    if not candidates:
        return None
    return candidates[0]
194
195
# Usage
196
async def voice_discovery_demo():
    """Print how many en-US voices exist and the preferred female one."""
    # Every voice offered for en-US
    english = await list_voices_async("en-US")
    print(f"Found {len(english)} English voices")

    # Highest-tier female voice, if any
    pick = await find_best_voice_async("en-US", "female")
    if not pick:
        return
    print(f"Best female voice: {pick['name']}")
205
206
# asyncio.run(voice_discovery_demo())
207
```
208
209
## Concurrent Operations
210
211
### Batch Processing with asyncio
212
213
```api { .api }
214
import asyncio
215
from typing import List, Dict
216
from google.cloud import texttospeech
217
218
class AsyncBatchProcessor:
    """Process multiple TTS requests concurrently over one shared client.

    A semaphore caps how many requests are in flight at once. Every result is
    a dict with ``success`` and ``text`` (a preview of the input) plus either
    ``audio_content`` or ``error``.
    """

    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.client = texttospeech.TextToSpeechAsyncClient()
        self.semaphore = asyncio.Semaphore(max_concurrent)

    @staticmethod
    def _preview(text: str) -> str:
        """Return at most the first 50 characters of *text*, with an ellipsis."""
        return text[:50] + "..." if len(text) > 50 else text

    async def synthesize_single(self, text: str, voice_config: dict,
                                audio_config: dict) -> Dict:
        """Synthesize a single text while holding a semaphore slot.

        Args:
            text: Plain text to synthesize.
            voice_config: Keyword arguments for ``VoiceSelectionParams``.
            audio_config: Keyword arguments for ``AudioConfig``.

        Returns:
            A result dict; failures are reported in the dict, not raised.
        """
        async with self.semaphore:  # Limit concurrent requests
            try:
                request = texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=text),
                    voice=texttospeech.VoiceSelectionParams(**voice_config),
                    audio_config=texttospeech.AudioConfig(**audio_config),
                )
                response = await self.client.synthesize_speech(request=request)
            except Exception as e:
                return {
                    'success': False,
                    'error': str(e),
                    'text': self._preview(text),
                }
            return {
                'success': True,
                'audio_content': response.audio_content,
                'text': self._preview(text),
            }

    async def _run_all(self, jobs: List[tuple]) -> List[Dict]:
        """Run ``(text, voice_config, audio_config)`` jobs, in input order.

        ``gather(return_exceptions=True)`` can still hand back exception
        objects (for example a cancellation); these are converted to failure
        dicts so callers always receive the annotated ``List[Dict]``.
        """
        outcomes = await asyncio.gather(
            *(self.synthesize_single(*job) for job in jobs),
            return_exceptions=True,
        )
        results = []
        for job, outcome in zip(jobs, outcomes):
            if isinstance(outcome, BaseException):
                outcome = {
                    'success': False,
                    'error': str(outcome),
                    'text': self._preview(job[0]),
                }
            results.append(outcome)
        return results

    async def process_batch(self, text_list: List[str],
                            voice_config: dict = None,
                            audio_config: dict = None) -> List[Dict]:
        """Synthesize every text in *text_list* with one shared configuration.

        Args:
            text_list: Texts to synthesize.
            voice_config: Voice keyword arguments; defaults to en-US.
            audio_config: Audio keyword arguments; defaults to MP3.

        Returns:
            One result dict per input text, in input order.
        """
        default_voice = voice_config or {'language_code': 'en-US'}
        default_audio = audio_config or {
            'audio_encoding': texttospeech.AudioEncoding.MP3
        }
        return await self._run_all(
            [(text, default_voice, default_audio) for text in text_list]
        )

    async def process_with_different_voices(self, text_voice_pairs: List[tuple]) -> List[Dict]:
        """Synthesize texts that each carry their own configuration.

        Args:
            text_voice_pairs: Iterable of ``(text, voice_config,
                audio_config)`` triples.

        Returns:
            One result dict per triple, in input order.
        """
        return await self._run_all(
            [(text, voice, audio) for text, voice, audio in text_voice_pairs]
        )

    async def close(self):
        """Close the async client."""
        await self.client.close()
289
290
# Usage example
291
async def batch_processing_demo():
    """Synthesize five sample sentences concurrently and save each as MP3.

    Returns:
        The list of per-text results from ``process_batch``.
    """
    processor = AsyncBatchProcessor(max_concurrent=5)

    try:
        # Batch of texts to process
        texts = [
            "This is the first text to synthesize.",
            "Here's the second piece of content.",
            "And this is the third text sample.",
            "Fourth text for our batch processing demo.",
            "Finally, the last text in our batch."
        ]

        print("Processing batch of texts...")
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine; its clock is monotonic.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        # Process all texts concurrently
        results = await processor.process_batch(texts)

        processing_time = loop.time() - start_time

        # Analyze results. Anything that is not a successful dict counts as
        # a failure, including a raw exception object.
        successful = [r for r in results if isinstance(r, dict) and r.get('success')]
        failed = [r for r in results if not (isinstance(r, dict) and r.get('success'))]

        print(f"Batch processing completed in {processing_time:.2f} seconds")
        print(f"Successful: {len(successful)}")
        print(f"Failed: {len(failed)}")

        # Save successful results
        for i, result in enumerate(successful):
            filename = f"batch_output_{i}.mp3"
            with open(filename, "wb") as f:
                f.write(result['audio_content'])
            print(f"Saved: {filename}")

        return results

    finally:
        await processor.close()
334
335
# Run batch processing
336
# results = asyncio.run(batch_processing_demo())
337
```
338
339
### Multi-Voice Processing
340
341
```api { .api }
342
import asyncio
343
from google.cloud import texttospeech
344
345
async def create_multi_voice_conversation():
    """Create a two-speaker conversation with different voices.

    Uses ``AsyncBatchProcessor`` from the batch-processing example. All lines
    are synthesized concurrently, then the successful parts are joined in
    script order and written to ``conversation.mp3``.

    Returns:
        The list of audio byte strings for the parts that succeeded.
    """
    # en-US-Neural2-C is a female voice and en-US-Neural2-A a male voice.
    agent_voice = {'language_code': 'en-US', 'name': 'en-US-Neural2-C'}  # Female agent
    customer_voice = {'language_code': 'en-US', 'name': 'en-US-Neural2-A'}  # Male customer
    mp3_audio = {'audio_encoding': texttospeech.AudioEncoding.MP3}

    processor = AsyncBatchProcessor(max_concurrent=3)

    try:
        # Conversation parts with different voices
        conversation_parts = [
            ("Hello, welcome to our customer service.", agent_voice, mp3_audio),
            ("Hi there, I have a question about my account.", customer_voice, mp3_audio),
            (
                "I'd be happy to help you with that. Can you provide your account number?",
                agent_voice,
                mp3_audio,
            ),
            ("Sure, my account number is 12345.", customer_voice, mp3_audio),
        ]

        print("Creating multi-voice conversation...")
        results = await processor.process_with_different_voices(conversation_parts)

        # Combine successful results in order
        conversation_audio = []
        for i, result in enumerate(results):
            if isinstance(result, dict) and result.get('success'):
                conversation_audio.append(result['audio_content'])
                print(f"Part {i+1}: Generated {len(result['audio_content'])} bytes")

        # Save complete conversation
        if conversation_audio:
            complete_audio = b''.join(conversation_audio)
            with open("conversation.mp3", "wb") as f:
                f.write(complete_audio)
            print(f"Saved complete conversation: {len(complete_audio)} bytes")

        return conversation_audio

    finally:
        await processor.close()
396
397
# asyncio.run(create_multi_voice_conversation())
398
```
399
400
## Async Streaming Operations
401
402
### Async Streaming Synthesis
403
404
```api { .api }
405
import asyncio
406
from google.cloud import texttospeech
407
from typing import AsyncGenerator
408
409
class AsyncStreamingSynthesis:
    """Async streaming text-to-speech synthesis.

    NOTE(review): streaming synthesis is documented as available only for
    streaming-capable voices (Chirp 3: HD at the time of writing) and only
    for headerless encodings such as PCM -- confirm against current docs.
    The defaults below follow that; pass other values to override them.
    """

    def __init__(self, voice_name: str = "en-US-Chirp3-HD-Charon",
                 language_code: str = "en-US",
                 sample_rate_hertz: int = 24000):
        self.client = texttospeech.TextToSpeechAsyncClient()
        self.voice_name = voice_name
        self.language_code = language_code
        self.sample_rate_hertz = sample_rate_hertz

    async def stream_synthesis(self, text_chunks: list) -> AsyncGenerator[bytes, None]:
        """Stream synthesis of multiple text chunks.

        Args:
            text_chunks: Text pieces, sent as consecutive input requests.

        Yields:
            Each non-empty ``audio_content`` from the response stream, in
            arrival order. With the PCM encoding configured here the bytes
            carry no container header.
        """
        config = texttospeech.StreamingSynthesizeConfig(
            voice=texttospeech.VoiceSelectionParams(
                language_code=self.language_code,
                name=self.voice_name,
            ),
            # The streaming config field is ``streaming_audio_config``;
            # ``audio_config`` is not a field of this message.
            streaming_audio_config=texttospeech.StreamingAudioConfig(
                audio_encoding=texttospeech.AudioEncoding.PCM,
                sample_rate_hertz=self.sample_rate_hertz,
            ),
        )

        async def request_generator():
            # The first request carries only the configuration...
            yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)
            # ...and every later request carries one piece of input text.
            for chunk in text_chunks:
                yield texttospeech.StreamingSynthesizeRequest(
                    input=texttospeech.StreamingSynthesisInput(text=chunk)
                )

        response_stream = await self.client.streaming_synthesize(
            requests=request_generator()
        )

        async for response in response_stream:
            if response.audio_content:
                yield response.audio_content

    @staticmethod
    def _chunk_words(long_text: str, chunk_size: int) -> list:
        """Group whitespace-separated words into chunks of about *chunk_size*.

        A chunk is closed as soon as its running length (each word plus one
        separator) reaches *chunk_size*, so chunks may slightly exceed it and
        words are never split.
        """
        chunks = []
        current = []
        current_length = 0
        for word in long_text.split():
            current.append(word)
            current_length += len(word) + 1  # +1 for space
            if current_length >= chunk_size:
                chunks.append(' '.join(current))
                current = []
                current_length = 0
        if current:
            chunks.append(' '.join(current))
        return chunks

    async def process_streaming_text(self, long_text: str, chunk_size: int = 100):
        """Chunk *long_text*, stream it through synthesis, and join the audio.

        Args:
            long_text: Text to synthesize.
            chunk_size: Approximate number of characters per streamed chunk.

        Returns:
            All received audio chunks concatenated into one ``bytes`` object.
        """
        text_chunks = self._chunk_words(long_text, chunk_size)

        audio_chunks = []
        async for audio_chunk in self.stream_synthesis(text_chunks):
            audio_chunks.append(audio_chunk)
            print(f"Received streaming audio chunk: {len(audio_chunk)} bytes")

        return b''.join(audio_chunks)

    async def close(self):
        """Close the async client."""
        await self.client.close()
479
480
# Usage example
481
async def streaming_demo():
    """Demonstrate async streaming synthesis and save the result as WAV.

    Returns:
        The audio bytes exactly as returned by ``process_streaming_text``.
    """
    import wave

    streamer = AsyncStreamingSynthesis()

    try:
        long_text = """
        This is a long piece of text that will be processed using async streaming
        synthesis. The text will be broken into smaller chunks and each chunk will
        be sent to the synthesis service as part of a streaming request. This allows
        for more efficient processing of long content and enables real-time audio
        generation as the text is being processed.
        """ * 3

        print("Starting async streaming synthesis...")

        audio_data = await streamer.process_streaming_text(long_text, chunk_size=80)

        print(f"Streaming synthesis complete: {len(audio_data)} bytes generated")

        # Save result
        if audio_data[:4] == b"RIFF":
            # The data already starts with a WAV container header.
            with open("async_streaming_output.wav", "wb") as f:
                f.write(audio_data)
        else:
            # Headerless samples need a WAV header to be playable.
            # NOTE(review): assumes 16-bit mono PCM at the streamer's sample
            # rate (22050 Hz if the streamer does not expose one) -- confirm.
            sample_rate = getattr(streamer, "sample_rate_hertz", 22050)
            with wave.open("async_streaming_output.wav", "wb") as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)

        return audio_data

    finally:
        await streamer.close()
509
510
# asyncio.run(streaming_demo())
511
```
512
513
## Integration with Web Frameworks
514
515
### FastAPI Integration
516
517
```api { .api }
518
import asyncio
519
from fastapi import FastAPI, HTTPException
520
from fastapi.responses import Response
521
from pydantic import BaseModel
522
from google.cloud import texttospeech
523
from typing import Optional
524
525
app = FastAPI()
526
527
# Global async client (initialized once)
528
tts_client = None
529
530
class TTSRequest(BaseModel):
    """Request body shared by the synthesis endpoints."""

    # Plain text to synthesize (required).
    text: str
    # Language tag passed to the voice selection parameters.
    language_code: str = "en-US"
    # Optional specific voice name; None leaves the choice to the service.
    voice_name: Optional[str] = None
    # Encoding name; the endpoints look it up in their own encoding map.
    audio_encoding: str = "MP3"
    # Forwarded unchanged to AudioConfig.speaking_rate.
    speaking_rate: float = 1.0
    # Forwarded unchanged to AudioConfig.pitch.
    pitch: float = 0.0
537
538
@app.on_event("startup")
async def startup_event():
    """Create the shared async TTS client when the application starts."""
    # Rebinds the module-level name that the route handlers read.
    global tts_client
    tts_client = texttospeech.TextToSpeechAsyncClient()
543
544
@app.on_event("shutdown")
async def shutdown_event():
    """Close the shared TTS client, if one was created, on shutdown."""
    global tts_client
    # Guard against shutdown running before the client was ever assigned.
    if tts_client:
        await tts_client.close()
550
551
@app.post("/synthesize")
async def synthesize_speech(request: TTSRequest):
    """Synthesize speech from text and return it as a downloadable file.

    The response's media type and file name extension follow the encoding
    that was actually used. An unrecognized ``audio_encoding`` falls back to
    MP3, and the response is then labelled as MP3 as well.

    Raises:
        HTTPException: 500 if the synthesis call fails.
    """
    # encoding name -> (API enum, HTTP media type, file extension)
    formats = {
        "MP3": (texttospeech.AudioEncoding.MP3, "audio/mpeg", "mp3"),
        "LINEAR16": (texttospeech.AudioEncoding.LINEAR16, "audio/wav", "wav"),
        "OGG_OPUS": (texttospeech.AudioEncoding.OGG_OPUS, "audio/ogg", "ogg"),
    }
    audio_encoding, media_type, extension = formats.get(
        request.audio_encoding, formats["MP3"]
    )

    try:
        # Create synthesis request
        synthesis_request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=request.text),
            voice=texttospeech.VoiceSelectionParams(
                language_code=request.language_code,
                name=request.voice_name,
            ),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=audio_encoding,
                speaking_rate=request.speaking_rate,
                pitch=request.pitch,
            ),
        )

        # Synthesize speech
        response = await tts_client.synthesize_speech(request=synthesis_request)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Synthesis failed: {str(e)}") from e

    # Return audio as response
    return Response(
        content=response.audio_content,
        media_type=media_type,
        headers={"Content-Disposition": f"attachment; filename=speech.{extension}"},
    )
592
593
@app.get("/voices")
async def list_voices(language_code: Optional[str] = None):
    """List available voices, optionally filtered by language.

    Returns:
        ``{"voices": [...]}`` where each entry is a plain dict with ``name``,
        ``language_codes``, ``ssml_gender`` and ``natural_sample_rate_hertz``.

    Raises:
        HTTPException: 500 if the listing call fails.
    """
    try:
        if language_code:
            request = texttospeech.ListVoicesRequest(language_code=language_code)
            response = await tts_client.list_voices(request=request)
        else:
            response = await tts_client.list_voices()

        voices = [
            {
                "name": voice.name,
                # Copy the repeated proto field into a real list so the JSON
                # encoder sees a plain sequence of strings.
                "language_codes": list(voice.language_codes),
                "ssml_gender": voice.ssml_gender.name,
                "natural_sample_rate_hertz": voice.natural_sample_rate_hertz,
            }
            for voice in response.voices
        ]
        return {"voices": voices}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list voices: {str(e)}") from e
617
618
@app.post("/batch-synthesize")
async def batch_synthesize(requests: list[TTSRequest]):
    """Synthesize several texts at once and report the size of each result.

    The audio itself is not returned. Each entry carries its position, a
    text preview and either the audio size or the error message.
    """
    try:
        encodings = {
            "MP3": texttospeech.AudioEncoding.MP3,
            "LINEAR16": texttospeech.AudioEncoding.LINEAR16,
            "OGG_OPUS": texttospeech.AudioEncoding.OGG_OPUS
        }

        async def synthesize_single(req: TTSRequest):
            # Unknown encoding names fall back to MP3.
            chosen = encodings.get(req.audio_encoding, texttospeech.AudioEncoding.MP3)
            voice = texttospeech.VoiceSelectionParams(
                language_code=req.language_code,
                name=req.voice_name
            )
            audio = texttospeech.AudioConfig(
                audio_encoding=chosen,
                speaking_rate=req.speaking_rate,
                pitch=req.pitch
            )
            synthesis_request = texttospeech.SynthesizeSpeechRequest(
                input=texttospeech.SynthesisInput(text=req.text),
                voice=voice,
                audio_config=audio
            )

            reply = await tts_client.synthesize_speech(request=synthesis_request)
            preview = req.text[:50] + "..." if len(req.text) > 50 else req.text
            return {
                "text": preview,
                "audio_size": len(reply.audio_content),
                "success": True
            }

        # Fan out; exceptions come back as values instead of propagating
        outcomes = await asyncio.gather(
            *[synthesize_single(item) for item in requests],
            return_exceptions=True
        )

        # Tag every outcome with its position in the request list
        formatted_results = [
            {"index": position, "success": False, "error": str(outcome)}
            if isinstance(outcome, Exception)
            else {"index": position, **outcome}
            for position, outcome in enumerate(outcomes)
        ]

        return {"results": formatted_results}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch synthesis failed: {str(e)}")
673
674
# To run: uvicorn main:app --reload
675
```
676
677
### aiohttp Integration
678
679
```api { .api }
680
import asyncio
681
import json
682
from aiohttp import web, ClientSession
683
from google.cloud import texttospeech
684
685
class TTSService:
    """Owns the Text-to-Speech async client used by the aiohttp handlers."""

    def __init__(self):
        # Created later by initialize().
        self.client = None

    async def initialize(self):
        """Create the async client."""
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def cleanup(self):
        """Close the client if one was created."""
        if not self.client:
            return
        await self.client.close()

    async def synthesize(self, text: str, language_code: str = "en-US",
                         voice_name: str = None) -> bytes:
        """Turn *text* into MP3 audio bytes.

        Args:
            text: Plain text to speak.
            language_code: Language tag for voice selection.
            voice_name: Optional specific voice name.
        """
        voice = texttospeech.VoiceSelectionParams(
            language_code=language_code,
            name=voice_name
        )
        mp3 = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        )
        request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=text),
            voice=voice,
            audio_config=mp3
        )

        reply = await self.client.synthesize_speech(request=request)
        return reply.audio_content
717
718
# Global TTS service
719
tts_service = TTSService()
720
721
async def synthesize_handler(request):
    """Handle synthesis requests.

    Expects a JSON object with ``text`` and optional ``language_code`` and
    ``voice_name``. Client mistakes (malformed JSON, a non-object body, or
    missing text) are answered with 400; synthesis failures with 500.

    Returns:
        The MP3 audio as an attachment, or a JSON error response.
    """
    try:
        data = await request.json()
    except json.JSONDecodeError:
        return web.json_response({'error': 'Request body must be valid JSON'}, status=400)

    if not isinstance(data, dict):
        return web.json_response({'error': 'Request body must be a JSON object'}, status=400)

    text = data.get('text')
    language_code = data.get('language_code', 'en-US')
    voice_name = data.get('voice_name')

    if not text:
        return web.json_response({'error': 'Text is required'}, status=400)

    try:
        audio_data = await tts_service.synthesize(text, language_code, voice_name)
    except Exception as e:
        return web.json_response({'error': str(e)}, status=500)

    return web.Response(
        body=audio_data,
        content_type='audio/mpeg',
        headers={'Content-Disposition': 'attachment; filename="speech.mp3"'}
    )
743
744
async def health_handler(request):
    """Health check endpoint: always answers with ``{"status": "healthy"}``."""
    # The incoming request is not inspected.
    return web.json_response({'status': 'healthy'})
747
748
async def init_app():
    """Build the aiohttp application with its routes and TTS lifecycle hooks.

    Returns:
        The configured ``web.Application``. The TTS service is already
        initialized when this returns.
    """
    async def cleanup_handler(app):
        # Runs during application cleanup to release the TTS client.
        await tts_service.cleanup()

    application = web.Application()

    # Routes
    router = application.router
    router.add_post('/synthesize', synthesize_handler)
    router.add_get('/health', health_handler)

    # Create the TTS client before serving
    await tts_service.initialize()

    # Release it again on cleanup
    application.on_cleanup.append(cleanup_handler)

    return application
767
768
# To run: python -c "from aiohttp import web; from main import init_app; web.run_app(init_app(), port=8080)"
769
```
770
771
## Error Handling in Async Operations
772
773
### Async Error Handling Patterns
774
775
```api { .api }
776
import asyncio
777
import logging
778
from google.api_core import exceptions
779
from google.cloud import texttospeech
780
781
class AsyncTTSWithErrorHandling:
    """Async TTS wrapper that retries failed requests and reports outcomes as dicts.

    Failures are reported in the returned dict (``success`` is ``False`` and
    ``error`` holds a message) instead of being raised.
    """

    def __init__(self, max_retries: int = 3):
        self.client = texttospeech.TextToSpeechAsyncClient()
        self.max_retries = max_retries

    @staticmethod
    def _preview(text: str) -> str:
        """Return at most the first 50 characters of *text*, with an ellipsis."""
        return text[:50] + "..." if len(text) > 50 else text

    async def synthesize_with_retry(self, text: str, **kwargs) -> dict:
        """Synthesize *text* to MP3, retrying failed attempts.

        Every retry is preceded by a pause: exponential backoff after a
        rate-limit error, one second otherwise. ``InvalidArgument`` is never
        retried.

        Args:
            text: Plain text to synthesize.
            **kwargs: Optional ``language_code`` (default ``'en-US'``) and
                ``voice_name``.

        Returns:
            A dict with ``success`` and ``attempts``, plus ``audio_content``
            on success or ``error`` on failure.
        """
        for attempt in range(self.max_retries):
            try:
                request = texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=text),
                    voice=texttospeech.VoiceSelectionParams(
                        language_code=kwargs.get('language_code', 'en-US'),
                        name=kwargs.get('voice_name'),
                    ),
                    audio_config=texttospeech.AudioConfig(
                        audio_encoding=texttospeech.AudioEncoding.MP3
                    ),
                )
                response = await self.client.synthesize_speech(request=request)
                return {
                    'success': True,
                    'audio_content': response.audio_content,
                    'attempts': attempt + 1,
                }

            except exceptions.InvalidArgument as e:
                # Non-retryable error
                logging.error("Invalid argument: %s", e)
                return {'success': False, 'error': f'Invalid argument: {e}', 'attempts': attempt + 1}

            # Each retryable branch records (log level, log label, final error
            # text, delay before the next attempt, exception text).
            except exceptions.ResourceExhausted as e:
                failure = (logging.WARNING, "Rate limit hit", 'Rate limit exceeded',
                           2 ** attempt, str(e))
            except exceptions.ServiceUnavailable as e:
                failure = (logging.WARNING, "Service unavailable", 'Service unavailable',
                           1, str(e))
            except exceptions.DeadlineExceeded as e:
                failure = (logging.WARNING, "Request timeout", 'Request timeout',
                           1, str(e))
            except Exception as e:
                failure = (logging.ERROR, "Unexpected error", f'Unexpected error: {e}',
                           1, str(e))

            level, label, error_text, delay, detail = failure
            logging.log(level, "%s (attempt %d): %s", label, attempt + 1, detail)
            if attempt == self.max_retries - 1:
                return {'success': False, 'error': error_text, 'attempts': attempt + 1}
            # Pause before retrying so a struggling service is not hammered.
            await asyncio.sleep(delay)

        # Only reached when max_retries is zero or negative.
        return {'success': False, 'error': 'Max retries exceeded', 'attempts': self.max_retries}

    async def safe_batch_synthesis(self, text_list: list, **kwargs) -> list:
        """Synthesize every text concurrently; one failure never affects the rest.

        Args:
            text_list: Texts to synthesize.
            **kwargs: Forwarded to ``synthesize_with_retry``.

        Returns:
            One result dict per input text, in input order, each including a
            ``text`` preview of the input.
        """
        async def safe_synthesize_single(text: str) -> dict:
            try:
                result = await self.synthesize_with_retry(text, **kwargs)
            except Exception as e:
                return {
                    'success': False,
                    'error': f'Failed to process: {e}',
                    'text': self._preview(text),
                }
            result['text'] = self._preview(text)
            return result

        results = await asyncio.gather(
            *(safe_synthesize_single(text) for text in text_list),
            return_exceptions=True,
        )

        # Convert anything gather handed back as an exception into a result dict.
        return [
            {
                'success': False,
                'error': f'Task failed: {result}',
                'text': self._preview(text),
            }
            if isinstance(result, Exception)
            else result
            for text, result in zip(text_list, results)
        ]

    async def close(self):
        """Close the async client."""
        await self.client.close()
880
881
# Usage example
882
async def error_handling_demo():
    """Run a mixed batch of good and problematic inputs and print a report.

    Returns:
        The list of per-text result dicts.
    """
    samples = [
        "This is a normal text that should work fine.",
        "",  # Empty text (should cause InvalidArgument)
        "This is another normal text.",
        "A" * 10000,  # Very long text (might cause issues)
        "Final test text."
    ]

    tts = AsyncTTSWithErrorHandling(max_retries=3)

    try:
        print("Testing batch synthesis with error handling...")
        results = await tts.safe_batch_synthesis(samples, language_code="en-US")

        # Tally the outcomes
        ok_count = sum(1 for entry in results if entry['success'])
        bad_count = sum(1 for entry in results if not entry['success'])
        print(f"Results: {ok_count} successful, {bad_count} failed")

        # One line per input, plus the error for failures
        for entry in results:
            if entry['success']:
                print(f"{'✅'} {entry['text']}")
            else:
                print(f"{'❌'} {entry['text']}")
                print(f" Error: {entry['error']}")

        return results

    finally:
        await tts.close()
916
917
# asyncio.run(error_handling_demo())
918
```
919
920
## Performance Optimization for Async Operations
921
922
### Connection Pooling and Client Reuse
923
924
```api { .api }
925
import asyncio
926
from contextlib import asynccontextmanager
927
from google.cloud import texttospeech
928
929
class OptimizedAsyncTTS:
    """Async TTS wrapper that lazily creates one client and reuses it.

    Nothing loop-bound is created in ``__init__``, so an instance can safely
    be built at import time, before any event loop exists.
    """

    def __init__(self):
        self._client = None

    async def get_client(self):
        """Return the shared client, creating it on first use.

        No lock is needed: there is no ``await`` between the check and the
        assignment, so coroutines on the same event loop cannot interleave
        here. This is not safe to call from several threads.
        """
        if self._client is None:
            self._client = texttospeech.TextToSpeechAsyncClient()
        return self._client

    @asynccontextmanager
    async def client_context(self):
        """Yield the shared client; it is deliberately left open on exit."""
        client = await self.get_client()
        try:
            yield client
        finally:
            # Client cleanup is handled in close() method
            pass

    async def synthesize_optimized(self, text: str, **config) -> bytes:
        """Synthesize *text* to MP3 using the shared client.

        Args:
            text: Plain text to synthesize.
            **config: Optional ``language_code`` (default ``'en-US'``) and
                ``voice_name``.

        Returns:
            The encoded audio bytes from the response.
        """
        async with self.client_context() as client:
            request = texttospeech.SynthesizeSpeechRequest(
                input=texttospeech.SynthesisInput(text=text),
                voice=texttospeech.VoiceSelectionParams(
                    language_code=config.get('language_code', 'en-US'),
                    name=config.get('voice_name'),
                ),
                audio_config=texttospeech.AudioConfig(
                    audio_encoding=texttospeech.AudioEncoding.MP3
                ),
            )

            response = await client.synthesize_speech(request=request)
            return response.audio_content

    async def close(self):
        """Close the client if one exists; a later call creates a fresh one."""
        if self._client:
            await self._client.close()
            self._client = None
977
978
# Global optimized TTS instance
979
optimized_tts = OptimizedAsyncTTS()
980
981
async def performance_benchmark():
    """Compare sequential and concurrent synthesis of 20 short texts.

    Prints both durations and the resulting speedup, then closes the shared
    ``optimized_tts`` client even if a request failed.
    """
    import time

    test_texts = [f"This is test text number {i}" for i in range(20)]

    try:
        # Sequential processing. perf_counter() is monotonic and
        # high-resolution, unlike the wall clock.
        start_time = time.perf_counter()
        for text in test_texts:
            await optimized_tts.synthesize_optimized(text)
        sequential_time = time.perf_counter() - start_time

        # Concurrent processing
        start_time = time.perf_counter()
        tasks = [optimized_tts.synthesize_optimized(text) for text in test_texts]
        await asyncio.gather(*tasks)
        concurrent_time = time.perf_counter() - start_time

        print(f"Sequential processing: {sequential_time:.2f} seconds")
        print(f"Concurrent processing: {concurrent_time:.2f} seconds")
        if concurrent_time > 0:  # avoid dividing by zero on a degenerate run
            print(f"Speedup: {sequential_time / concurrent_time:.2f}x")
    finally:
        await optimized_tts.close()
1006
1007
# asyncio.run(performance_benchmark())
1008
```
1009
1010
### Memory-Efficient Async Processing
1011
1012
```api { .api }
1013
import asyncio
1014
from typing import AsyncIterator
1015
from google.cloud import texttospeech
1016
1017
async def _synthesize_batch(client, texts):
    """Synthesize *texts* concurrently and return the responses in input order."""
    calls = [
        client.synthesize_speech(
            request=texttospeech.SynthesizeSpeechRequest(
                input=texttospeech.SynthesisInput(text=text_item),
                voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
                audio_config=texttospeech.AudioConfig(
                    audio_encoding=texttospeech.AudioEncoding.MP3
                ),
            )
        )
        for text_item in texts
    ]
    return await asyncio.gather(*calls)


async def memory_efficient_processing(text_iterator: AsyncIterator[str],
                                      batch_size: int = 5) -> AsyncIterator[bytes]:
    """Process texts in batches so only one batch of audio is held at a time.

    Args:
        text_iterator: Async source of texts to synthesize.
        batch_size: Number of texts synthesized concurrently per batch.

    Yields:
        MP3 audio bytes, one item per input text, in input order. A batch's
        results are yielded only after the whole batch has finished.
    """
    client = texttospeech.TextToSpeechAsyncClient()

    try:
        batch = []

        async for text in text_iterator:
            batch.append(text)

            if len(batch) >= batch_size:
                for response in await _synthesize_batch(client, batch):
                    yield response.audio_content
                # Start a fresh batch
                batch = []

        # Process remaining items that did not fill a whole batch
        if batch:
            for response in await _synthesize_batch(client, batch):
                yield response.audio_content

    finally:
        await client.close()
1071
1072
# Example usage
1073
async def text_generator(count: int = 50):
    """Generate numbered sample texts for processing.

    Args:
        count: How many texts to produce (50 by default).

    Yields:
        One sentence per index, starting at 0.
    """
    for i in range(count):
        yield f"This is text number {i} for memory-efficient processing."
1077
1078
async def process_with_memory_efficiency():
    """Consume the batched synthesis stream and print running and final totals."""
    sizes = []

    stream = memory_efficient_processing(text_generator(), batch_size=3)
    async for audio_data in stream:
        sizes.append(len(audio_data))
        print(f"Processed audio {len(sizes)}: {sizes[-1]} bytes")

    print(f"Total: {len(sizes)} audio files, {sum(sizes)} bytes")
1090
1091
# asyncio.run(process_with_memory_efficiency())
1092
```