or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-clients.mdconfiguration-types.mdindex.mdlong-audio-synthesis.mdspeech-synthesis.mdstreaming-synthesis.mdvoice-management.md

async-clients.mddocs/

# Async Clients

## Overview

The Google Cloud Text-to-Speech API provides full async/await support through asynchronous client classes. These clients enable non-blocking operations, making them ideal for applications that need to handle multiple synthesis requests concurrently or integrate with async frameworks like FastAPI, aiohttp, or asyncio-based applications.

## Async Client Classes

### TextToSpeechAsyncClient

```api { .api }

11

import asyncio
from google.cloud import texttospeech

# Initialize async client (one shared instance, reused across requests)
async_client = texttospeech.TextToSpeechAsyncClient()

# Basic async synthesis
async def basic_async_synthesis():
    """Synthesize one phrase asynchronously and return the MP3 bytes."""
    request = texttospeech.SynthesizeSpeechRequest(
        input=texttospeech.SynthesisInput(text="Hello from async synthesis!"),
        voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
        audio_config=texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        )
    )

    response = await async_client.synthesize_speech(request=request)
    return response.audio_content

# Run async function
audio_data = asyncio.run(basic_async_synthesis())

32

```

### TextToSpeechLongAudioSynthesizeAsyncClient

```api { .api }

37

from google.cloud import texttospeech
from google.cloud.texttospeech_v1.services import text_to_speech_long_audio_synthesize

# Initialize async long audio client
async_long_client = text_to_speech_long_audio_synthesize.TextToSpeechLongAudioSynthesizeAsyncClient()

# Async long audio synthesis
async def async_long_audio_synthesis():
    """Start a long-audio synthesis operation and return the operation handle.

    The result is written to the GCS URI given in ``output_gcs_uri``; the
    returned object is a long-running-operation handle, not audio bytes.

    Fix: the original snippet referenced ``texttospeech.*`` message types but
    only imported the services module — ``from google.cloud import
    texttospeech`` is now imported above.
    """
    request = texttospeech.SynthesizeLongAudioRequest(
        parent="projects/your-project-id/locations/us-central1",
        input=texttospeech.SynthesisInput(text="Very long text content..." * 100),
        voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
        audio_config=texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3
        ),
        output_gcs_uri="gs://your-bucket/async-long-audio.mp3"
    )

    operation = await async_long_client.synthesize_long_audio(request=request)
    return operation

# Usage
# operation = asyncio.run(async_long_audio_synthesis())

59

```

## Core Async Operations

### Async Speech Synthesis

```api { .api }

66

import asyncio
from google.cloud import texttospeech

class AsyncTextToSpeech:
    """Thin async wrapper around TextToSpeechAsyncClient.

    Holds one shared async client; callers must ``await close()`` when done.
    """

    def __init__(self):
        # Single client reused for every synthesis call on this instance.
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def synthesize_text(self, text: str, language_code: str = "en-US",
                              voice_name: str = None) -> bytes:
        """Synthesize plain text and return MP3 audio bytes."""
        request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=text),
            voice=texttospeech.VoiceSelectionParams(
                language_code=language_code,
                name=voice_name,
            ),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.MP3
            ),
        )
        response = await self.client.synthesize_speech(request=request)
        return response.audio_content

    async def synthesize_ssml(self, ssml: str, language_code: str = "en-US",
                              voice_name: str = None) -> bytes:
        """Synthesize SSML markup and return 24 kHz LINEAR16 audio bytes."""
        request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(ssml=ssml),
            voice=texttospeech.VoiceSelectionParams(
                language_code=language_code,
                name=voice_name,
            ),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                sample_rate_hertz=24000,
            ),
        )
        response = await self.client.synthesize_speech(request=request)
        return response.audio_content

    async def close(self):
        """Release the underlying client channel."""
        await self.client.close()

123

124

# Usage example
async def demo_async_synthesis():
    """Run both text and SSML synthesis through the wrapper, then clean up."""
    tts = AsyncTextToSpeech()

    try:
        # Synthesize text
        audio1 = await tts.synthesize_text("Hello async world!")
        print(f"Generated {len(audio1)} bytes of audio")

        # Synthesize SSML
        ssml = '<speak>This is <emphasis level="strong">emphasized</emphasis> text.</speak>'
        audio2 = await tts.synthesize_ssml(ssml)
        print(f"Generated {len(audio2)} bytes from SSML")

    finally:
        # Always close the client, even if synthesis fails.
        await tts.close()

# Run the demo
# asyncio.run(demo_async_synthesis())

143

```

144

145

### Async Voice Listing

146

147

```api { .api }

148

import asyncio
from google.cloud import texttospeech

async def list_voices_async(language_filter: str = None):
    """List available voices asynchronously.

    When ``language_filter`` is given, only voices for that language code are
    requested. Returns a list of plain dicts describing each voice.
    """
    async_client = texttospeech.TextToSpeechAsyncClient()

    try:
        if language_filter:
            request = texttospeech.ListVoicesRequest(language_code=language_filter)
            response = await async_client.list_voices(request=request)
        else:
            response = await async_client.list_voices()

        # Flatten the proto response into JSON-friendly dicts.
        return [
            {
                'name': voice.name,
                'language_codes': voice.language_codes,
                'ssml_gender': voice.ssml_gender.name,
                'natural_sample_rate_hertz': voice.natural_sample_rate_hertz,
            }
            for voice in response.voices
        ]

    finally:
        # The client is closed even when listing raises.
        await async_client.close()

176

177

async def find_best_voice_async(language_code: str, gender: str = None):
    """Find the best voice for a language and (optionally) a gender.

    Preference order is Neural2, then Wavenet, then Standard; falls back to
    the first matching voice, or ``None`` when nothing matches.
    """
    voices = await list_voices_async(language_code)

    # Narrow by gender if one was requested (case-insensitive).
    if gender:
        wanted = gender.upper()
        voices = [v for v in voices if v['ssml_gender'] == wanted]

    # Prefer Neural2 > Wavenet > Standard
    for tier in ('Neural2', 'Wavenet', 'Standard'):
        for candidate in voices:
            if tier in candidate['name']:
                return candidate

    return voices[0] if voices else None

194

195

# Usage
async def voice_discovery_demo():
    """Demo: list en-US voices, then pick the best female one."""
    # List all English voices
    en_voices = await list_voices_async("en-US")
    print(f"Found {len(en_voices)} English voices")

    # Find best female voice
    best_female = await find_best_voice_async("en-US", "female")
    if best_female:
        print(f"Best female voice: {best_female['name']}")

# asyncio.run(voice_discovery_demo())

207

```

208

209

## Concurrent Operations

210

211

### Batch Processing with asyncio

212

213

```api { .api }

214

import asyncio
from typing import List, Dict
from google.cloud import texttospeech

class AsyncBatchProcessor:
    """Process multiple TTS requests concurrently with a concurrency cap."""

    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.client = texttospeech.TextToSpeechAsyncClient()
        # Semaphore caps the number of in-flight API calls.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def synthesize_single(self, text: str, voice_config: dict,
                                audio_config: dict) -> Dict:
        """Synthesize one text with rate limiting.

        Returns a result dict ({'success': ..., ...}); API errors are
        captured in the dict rather than raised.
        """
        async with self.semaphore:  # Limit concurrent requests
            preview = text[:50] + "..." if len(text) > 50 else text
            try:
                request = texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=text),
                    voice=texttospeech.VoiceSelectionParams(**voice_config),
                    audio_config=texttospeech.AudioConfig(**audio_config),
                )
                response = await self.client.synthesize_speech(request=request)
                return {
                    'success': True,
                    'audio_content': response.audio_content,
                    'text': preview,
                }
            except Exception as e:
                return {
                    'success': False,
                    'error': str(e),
                    'text': preview,
                }

    async def process_batch(self, text_list: List[str],
                            voice_config: dict = None,
                            audio_config: dict = None) -> List[Dict]:
        """Synthesize every text concurrently with shared voice settings."""
        # Fall back to en-US / MP3 when no configuration is supplied.
        default_voice = voice_config or {'language_code': 'en-US'}
        default_audio = audio_config or {
            'audio_encoding': texttospeech.AudioEncoding.MP3
        }

        tasks = [
            self.synthesize_single(text, default_voice, default_audio)
            for text in text_list
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def process_with_different_voices(self, text_voice_pairs: List[tuple]) -> List[Dict]:
        """Synthesize (text, voice_config, audio_config) tuples concurrently."""
        tasks = [
            self.synthesize_single(text, voice_config, audio_config)
            for text, voice_config, audio_config in text_voice_pairs
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def close(self):
        """Close the async client."""
        await self.client.close()

289

290

# Usage example
async def batch_processing_demo():
    """Demonstrate batch processing with async.

    Fix: the per-file status line printed a garbled placeholder instead of
    the output filename; it now prints the filename that was just written.
    """
    processor = AsyncBatchProcessor(max_concurrent=5)

    try:
        # Batch of texts to process
        texts = [
            "This is the first text to synthesize.",
            "Here's the second piece of content.",
            "And this is the third text sample.",
            "Fourth text for our batch processing demo.",
            "Finally, the last text in our batch."
        ]

        print("Processing batch of texts...")
        start_time = asyncio.get_event_loop().time()

        # Process all texts concurrently
        results = await processor.process_batch(texts)

        end_time = asyncio.get_event_loop().time()
        processing_time = end_time - start_time

        # Analyze results (synthesize_single returns dicts, never raises)
        successful = [r for r in results if isinstance(r, dict) and r.get('success')]
        failed = [r for r in results if isinstance(r, dict) and not r.get('success')]

        print(f"Batch processing completed in {processing_time:.2f} seconds")
        print(f"Successful: {len(successful)}")
        print(f"Failed: {len(failed)}")

        # Save successful results
        for i, result in enumerate(successful):
            filename = f"batch_output_{i}.mp3"
            with open(filename, "wb") as f:
                f.write(result['audio_content'])
            print(f"Saved: {filename}")

        return results

    finally:
        await processor.close()

# Run batch processing
# results = asyncio.run(batch_processing_demo())

337

```

338

339

### Multi-Voice Processing

340

341

```api { .api }

342

import asyncio
from google.cloud import texttospeech

async def create_multi_voice_conversation():
    """Create conversation with different voices asynchronously."""
    processor = AsyncBatchProcessor(max_concurrent=3)

    try:
        # Two speakers share voice/audio settings; configs are read-only.
        mp3 = {'audio_encoding': texttospeech.AudioEncoding.MP3}
        agent = {'language_code': 'en-US', 'name': 'en-US-Neural2-A'}     # Female agent
        customer = {'language_code': 'en-US', 'name': 'en-US-Neural2-C'}  # Male customer

        conversation_parts = [
            ("Hello, welcome to our customer service.", agent, mp3),
            ("Hi there, I have a question about my account.", customer, mp3),
            ("I'd be happy to help you with that. Can you provide your account number?", agent, mp3),
            ("Sure, my account number is 12345.", customer, mp3),
        ]

        print("Creating multi-voice conversation...")
        results = await processor.process_with_different_voices(conversation_parts)

        # Stitch successful parts back together in conversation order.
        conversation_audio = []
        for i, result in enumerate(results):
            if isinstance(result, dict) and result.get('success'):
                conversation_audio.append(result['audio_content'])
                print(f"Part {i+1}: Generated {len(result['audio_content'])} bytes")

        # Save complete conversation
        if conversation_audio:
            complete_audio = b''.join(conversation_audio)
            with open("conversation.mp3", "wb") as f:
                f.write(complete_audio)
            print(f"Saved complete conversation: {len(complete_audio)} bytes")

        return conversation_audio

    finally:
        await processor.close()

# asyncio.run(create_multi_voice_conversation())

398

```

399

400

## Async Streaming Operations

401

402

### Async Streaming Synthesis

403

404

```api { .api }

405

import asyncio
from google.cloud import texttospeech
from typing import AsyncGenerator

class AsyncStreamingSynthesis:
    """Async streaming text-to-speech synthesis."""

    def __init__(self):
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def stream_synthesis(self, text_chunks: list) -> AsyncGenerator[bytes, None]:
        """Stream synthesis of multiple text chunks, yielding audio as it arrives."""

        # Configure streaming: fixed en-US Neural2 voice, 22.05 kHz LINEAR16.
        config = texttospeech.StreamingSynthesizeConfig(
            voice=texttospeech.VoiceSelectionParams(
                language_code="en-US",
                name="en-US-Neural2-A"
            ),
            audio_config=texttospeech.StreamingAudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                sample_rate_hertz=22050
            )
        )

        async def request_generator():
            # Configuration request — must be the FIRST message on the stream.
            yield texttospeech.StreamingSynthesizeRequest(streaming_config=config)

            # Input requests, one per text chunk.
            for chunk in text_chunks:
                yield texttospeech.StreamingSynthesizeRequest(
                    input=texttospeech.StreamingSynthesisInput(text=chunk)
                )

        # Stream synthesis; responses arrive incrementally.
        response_stream = await self.client.streaming_synthesize(request_generator())

        async for response in response_stream:
            # Skip keep-alive/empty responses.
            if response.audio_content:
                yield response.audio_content

    async def process_streaming_text(self, long_text: str, chunk_size: int = 100):
        """Process long text with streaming synthesis.

        Splits on word boundaries into roughly ``chunk_size``-character
        chunks, streams them, and returns the concatenated audio bytes.
        """

        # Break text into chunks without splitting words.
        words = long_text.split()
        text_chunks = []
        current_chunk = []
        current_length = 0

        for word in words:
            current_chunk.append(word)
            current_length += len(word) + 1  # +1 for space

            if current_length >= chunk_size:
                text_chunks.append(' '.join(current_chunk))
                current_chunk = []
                current_length = 0

        # Flush the trailing partial chunk, if any.
        if current_chunk:
            text_chunks.append(' '.join(current_chunk))

        # Stream synthesis and collect the audio pieces.
        audio_chunks = []
        async for audio_chunk in self.stream_synthesis(text_chunks):
            audio_chunks.append(audio_chunk)
            print(f"Received streaming audio chunk: {len(audio_chunk)} bytes")

        return b''.join(audio_chunks)

    async def close(self):
        """Close the async client."""
        await self.client.close()

479

480

# Usage example
async def streaming_demo():
    """Demonstrate async streaming synthesis."""
    streamer = AsyncStreamingSynthesis()

    try:
        # Repeat the paragraph to get a usefully long input.
        long_text = """
        This is a long piece of text that will be processed using async streaming
        synthesis. The text will be broken into smaller chunks and each chunk will
        be sent to the synthesis service as part of a streaming request. This allows
        for more efficient processing of long content and enables real-time audio
        generation as the text is being processed.
        """ * 3

        print("Starting async streaming synthesis...")

        audio_data = await streamer.process_streaming_text(long_text, chunk_size=80)

        print(f"Streaming synthesis complete: {len(audio_data)} bytes generated")

        # Save result
        # NOTE(review): the stream yields LINEAR16 frames; confirm whether a
        # WAV header must be prepended before players accept this .wav file.
        with open("async_streaming_output.wav", "wb") as f:
            f.write(audio_data)

        return audio_data

    finally:
        await streamer.close()

# asyncio.run(streaming_demo())

511

```

512

513

## Integration with Web Frameworks

514

515

### FastAPI Integration

516

517

```api { .api }

518

import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel
from google.cloud import texttospeech
from typing import Optional

app = FastAPI()

# Global async client (initialized once on startup, closed on shutdown)
tts_client = None

class TTSRequest(BaseModel):
    """Request body for the synthesis endpoints."""
    # Plain text to synthesize (not SSML)
    text: str
    # BCP-47 language tag
    language_code: str = "en-US"
    # Specific voice name; None lets the service choose
    voice_name: Optional[str] = None
    # One of "MP3", "LINEAR16", "OGG_OPUS"; unknown values fall back to MP3
    audio_encoding: str = "MP3"
    speaking_rate: float = 1.0
    pitch: float = 0.0

537

538

# NOTE(review): @app.on_event is deprecated in newer FastAPI releases in
# favor of lifespan handlers — confirm the FastAPI version this targets.
@app.on_event("startup")
async def startup_event():
    """Initialize TTS client on startup."""
    # One shared client for the whole application lifetime.
    global tts_client
    tts_client = texttospeech.TextToSpeechAsyncClient()

@app.on_event("shutdown")
async def shutdown_event():
    """Close TTS client on shutdown."""
    global tts_client
    if tts_client:
        await tts_client.close()

550

551

@app.post("/synthesize")
async def synthesize_speech(request: TTSRequest):
    """Synthesize speech from text and return the audio as a download."""

    try:
        # Map string encoding to enum; unrecognized values fall back to MP3.
        encoding_map = {
            "MP3": texttospeech.AudioEncoding.MP3,
            "LINEAR16": texttospeech.AudioEncoding.LINEAR16,
            "OGG_OPUS": texttospeech.AudioEncoding.OGG_OPUS
        }

        audio_encoding = encoding_map.get(request.audio_encoding, texttospeech.AudioEncoding.MP3)

        # Create synthesis request
        synthesis_request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=request.text),
            voice=texttospeech.VoiceSelectionParams(
                language_code=request.language_code,
                name=request.voice_name
            ),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=audio_encoding,
                speaking_rate=request.speaking_rate,
                pitch=request.pitch
            )
        )

        # Synthesize speech (uses the app-wide client from startup)
        response = await tts_client.synthesize_speech(request=synthesis_request)

        # Return audio as response
        media_type = "audio/mpeg" if request.audio_encoding == "MP3" else "audio/wav"
        return Response(
            content=response.audio_content,
            media_type=media_type,
            headers={"Content-Disposition": "attachment; filename=speech.mp3"}
        )

    except Exception as e:
        # Surface any failure as a 500 with the underlying message.
        raise HTTPException(status_code=500, detail=f"Synthesis failed: {str(e)}")

592

593

@app.get("/voices")
async def list_voices(language_code: Optional[str] = None):
    """List available voices, optionally filtered by language code."""

    try:
        if language_code:
            request = texttospeech.ListVoicesRequest(language_code=language_code)
            response = await tts_client.list_voices(request=request)
        else:
            response = await tts_client.list_voices()

        # Flatten the proto voices into JSON-serializable dicts.
        voices = [
            {
                "name": voice.name,
                "language_codes": voice.language_codes,
                "ssml_gender": voice.ssml_gender.name,
                "natural_sample_rate_hertz": voice.natural_sample_rate_hertz,
            }
            for voice in response.voices
        ]

        return {"voices": voices}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list voices: {str(e)}")

617

618

@app.post("/batch-synthesize")
async def batch_synthesize(requests: list[TTSRequest]):
    """Synthesize multiple texts in parallel.

    Returns per-item status with the audio size only — the audio bytes
    themselves are not returned by this endpoint.
    """

    try:
        async def synthesize_single(req: TTSRequest):
            # Map string encoding to enum; unknown values fall back to MP3.
            encoding_map = {
                "MP3": texttospeech.AudioEncoding.MP3,
                "LINEAR16": texttospeech.AudioEncoding.LINEAR16,
                "OGG_OPUS": texttospeech.AudioEncoding.OGG_OPUS
            }

            synthesis_request = texttospeech.SynthesizeSpeechRequest(
                input=texttospeech.SynthesisInput(text=req.text),
                voice=texttospeech.VoiceSelectionParams(
                    language_code=req.language_code,
                    name=req.voice_name
                ),
                audio_config=texttospeech.AudioConfig(
                    audio_encoding=encoding_map.get(req.audio_encoding, texttospeech.AudioEncoding.MP3),
                    speaking_rate=req.speaking_rate,
                    pitch=req.pitch
                )
            )

            response = await tts_client.synthesize_speech(request=synthesis_request)
            return {
                "text": req.text[:50] + "..." if len(req.text) > 50 else req.text,
                "audio_size": len(response.audio_content),
                "success": True
            }

        # Process requests concurrently
        tasks = [synthesize_single(req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Format results: per-item exceptions become error entries, so one
        # failure does not fail the whole batch.
        formatted_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                formatted_results.append({
                    "index": i,
                    "success": False,
                    "error": str(result)
                })
            else:
                formatted_results.append({
                    "index": i,
                    **result
                })

        return {"results": formatted_results}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Batch synthesis failed: {str(e)}")

# To run: uvicorn main:app --reload

675

```

676

677

### aiohttp Integration

678

679

```api { .api }

680

import asyncio
import json
from aiohttp import web, ClientSession
from google.cloud import texttospeech

class TTSService:
    """Text-to-Speech service for aiohttp application."""

    def __init__(self):
        # Client is created lazily in initialize() so construction stays sync.
        self.client = None

    async def initialize(self):
        """Initialize the TTS client."""
        self.client = texttospeech.TextToSpeechAsyncClient()

    async def cleanup(self):
        """Cleanup the TTS client."""
        if self.client:
            await self.client.close()

    async def synthesize(self, text: str, language_code: str = "en-US",
                         voice_name: str = None) -> bytes:
        """Synthesize text to speech; returns MP3 bytes.

        Requires ``initialize()`` to have been called first.
        """

        request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=text),
            voice=texttospeech.VoiceSelectionParams(
                language_code=language_code,
                name=voice_name
            ),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.MP3
            )
        )

        response = await self.client.synthesize_speech(request=request)
        return response.audio_content

717

718

# Global TTS service
tts_service = TTSService()

async def synthesize_handler(request):
    """Handle synthesis requests: JSON body in, MP3 attachment out."""

    try:
        data = await request.json()
        text = data.get('text')
        language_code = data.get('language_code', 'en-US')
        voice_name = data.get('voice_name')

        # Reject requests with missing/empty text before hitting the API.
        if not text:
            return web.json_response({'error': 'Text is required'}, status=400)

        audio_data = await tts_service.synthesize(text, language_code, voice_name)

        return web.Response(
            body=audio_data,
            content_type='audio/mpeg',
            headers={'Content-Disposition': 'attachment; filename="speech.mp3"'}
        )

    except Exception as e:
        # Broad catch keeps the endpoint responsive; error text goes to caller.
        return web.json_response({'error': str(e)}, status=500)

743

744

async def health_handler(request):
    """Health check endpoint."""
    return web.json_response({'status': 'healthy'})

async def init_app():
    """Initialize the aiohttp application."""

    app = web.Application()

    # Add routes
    app.router.add_post('/synthesize', synthesize_handler)
    app.router.add_get('/health', health_handler)

    # Initialize TTS service before serving any requests.
    await tts_service.initialize()

    # Setup cleanup so the TTS client closes with the app.
    async def cleanup_handler(app):
        await tts_service.cleanup()

    app.on_cleanup.append(cleanup_handler)

    return app

# To run: python -c "import asyncio; from main import init_app; app = asyncio.run(init_app()); web.run_app(app, port=8080)"

769

```

770

771

## Error Handling in Async Operations

772

773

### Async Error Handling Patterns

774

775

```api { .api }

776

import asyncio
import logging
from google.api_core import exceptions
from google.cloud import texttospeech

class AsyncTTSWithErrorHandling:
    """Async TTS with comprehensive error handling."""

    def __init__(self, max_retries: int = 3):
        self.client = texttospeech.TextToSpeechAsyncClient()
        # Max attempts per request for transient (retryable) failures.
        self.max_retries = max_retries

    async def synthesize_with_retry(self, text: str, **kwargs) -> dict:
        """Synthesize with automatic retry on transient errors.

        Returns a dict with 'success', 'attempts', and either
        'audio_content' or 'error'; API failures are reported in the
        dict rather than raised.
        """

        for attempt in range(self.max_retries):
            try:
                request = texttospeech.SynthesizeSpeechRequest(
                    input=texttospeech.SynthesisInput(text=text),
                    voice=texttospeech.VoiceSelectionParams(
                        language_code=kwargs.get('language_code', 'en-US'),
                        name=kwargs.get('voice_name')
                    ),
                    audio_config=texttospeech.AudioConfig(
                        audio_encoding=texttospeech.AudioEncoding.MP3
                    )
                )

                response = await self.client.synthesize_speech(request=request)

                return {
                    'success': True,
                    'audio_content': response.audio_content,
                    'attempts': attempt + 1
                }

            except exceptions.ResourceExhausted as e:
                logging.warning(f"Rate limit hit (attempt {attempt + 1}): {e}")
                if attempt == self.max_retries - 1:
                    return {'success': False, 'error': 'Rate limit exceeded', 'attempts': attempt + 1}

                # Exponential backoff
                await asyncio.sleep(2 ** attempt)

            except exceptions.ServiceUnavailable as e:
                logging.warning(f"Service unavailable (attempt {attempt + 1}): {e}")
                if attempt == self.max_retries - 1:
                    return {'success': False, 'error': 'Service unavailable', 'attempts': attempt + 1}

                # Fixed short delay before retrying.
                await asyncio.sleep(1)

            except exceptions.DeadlineExceeded as e:
                # No backoff here — the timeout itself already consumed time.
                logging.warning(f"Request timeout (attempt {attempt + 1}): {e}")
                if attempt == self.max_retries - 1:
                    return {'success': False, 'error': 'Request timeout', 'attempts': attempt + 1}

            except exceptions.InvalidArgument as e:
                # Non-retryable error: the request itself is malformed.
                logging.error(f"Invalid argument: {e}")
                return {'success': False, 'error': f'Invalid argument: {e}', 'attempts': attempt + 1}

            except Exception as e:
                logging.error(f"Unexpected error (attempt {attempt + 1}): {e}")
                if attempt == self.max_retries - 1:
                    return {'success': False, 'error': f'Unexpected error: {e}', 'attempts': attempt + 1}

        # Defensive fallback; normally one of the branches above returns first.
        return {'success': False, 'error': 'Max retries exceeded', 'attempts': self.max_retries}

    async def safe_batch_synthesis(self, text_list: list, **kwargs) -> list:
        """Safely process multiple texts with individual error handling."""

        async def safe_synthesize_single(text: str) -> dict:
            try:
                result = await self.synthesize_with_retry(text, **kwargs)
                result['text'] = text[:50] + "..." if len(text) > 50 else text
                return result
            except Exception as e:
                return {
                    'success': False,
                    'error': f'Failed to process: {e}',
                    'text': text[:50] + "..." if len(text) > 50 else text
                }

        # Process all texts concurrently with individual error handling
        tasks = [safe_synthesize_single(text) for text in text_list]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Handle any gather-level exceptions
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    'success': False,
                    'error': f'Task failed: {result}',
                    'text': text_list[i][:50] + "..." if len(text_list[i]) > 50 else text_list[i]
                })
            else:
                processed_results.append(result)

        return processed_results

    async def close(self):
        """Close the async client."""
        await self.client.close()

880

881

# Usage example
async def error_handling_demo():
    """Demonstrate error handling in async operations."""

    tts = AsyncTTSWithErrorHandling(max_retries=3)

    try:
        # Test with various scenarios, including inputs expected to fail.
        test_texts = [
            "This is a normal text that should work fine.",
            "",  # Empty text (should cause InvalidArgument)
            "This is another normal text.",
            "A" * 10000,  # Very long text (might cause issues)
            "Final test text."
        ]

        print("Testing batch synthesis with error handling...")
        results = await tts.safe_batch_synthesis(test_texts, language_code="en-US")

        # Analyze results
        successful = [r for r in results if r['success']]
        failed = [r for r in results if not r['success']]

        print(f"Results: {len(successful)} successful, {len(failed)} failed")

        for result in results:
            status = "✅" if result['success'] else "❌"
            print(f"{status} {result['text']}")
            if not result['success']:
                print(f" Error: {result['error']}")

        return results

    finally:
        await tts.close()

# asyncio.run(error_handling_demo())

918

```

919

920

## Performance Optimization for Async Operations

921

922

### Connection Pooling and Client Reuse

923

924

```api { .api }

925

import asyncio
from contextlib import asynccontextmanager
from google.cloud import texttospeech

class OptimizedAsyncTTS:
    """Optimized async TTS with connection pooling."""

    def __init__(self):
        # Lazily-created shared client; creation guarded by an asyncio lock.
        self._client = None
        self._client_lock = asyncio.Lock()

    async def get_client(self):
        """Get or create TTS client with thread-safe initialization."""
        if self._client is None:
            async with self._client_lock:
                if self._client is None:  # Double-check pattern
                    self._client = texttospeech.TextToSpeechAsyncClient()
        return self._client

    @asynccontextmanager
    async def client_context(self):
        """Context manager for client lifecycle."""
        client = await self.get_client()
        try:
            yield client
        finally:
            # Client cleanup is handled in close() method
            pass

    async def synthesize_optimized(self, text: str, **config) -> bytes:
        """Optimized synthesis with client reuse.

        Recognized config keys: 'language_code' (default 'en-US') and
        'voice_name'.
        """

        async with self.client_context() as client:
            request = texttospeech.SynthesizeSpeechRequest(
                input=texttospeech.SynthesisInput(text=text),
                voice=texttospeech.VoiceSelectionParams(
                    language_code=config.get('language_code', 'en-US'),
                    name=config.get('voice_name')
                ),
                audio_config=texttospeech.AudioConfig(
                    audio_encoding=texttospeech.AudioEncoding.MP3
                )
            )

            response = await client.synthesize_speech(request=request)
            return response.audio_content

    async def close(self):
        """Clean up client resources."""
        if self._client:
            await self._client.close()
            self._client = None

977

978

# Global optimized TTS instance
optimized_tts = OptimizedAsyncTTS()

async def performance_benchmark():
    """Benchmark async TTS performance: sequential vs concurrent."""
    import time

    test_texts = [f"This is test text number {i}" for i in range(20)]

    # Sequential processing: one request at a time.
    start_time = time.time()
    sequential_results = []
    for text in test_texts:
        audio = await optimized_tts.synthesize_optimized(text)
        sequential_results.append(len(audio))
    sequential_time = time.time() - start_time

    # Concurrent processing: all requests in flight at once.
    start_time = time.time()
    tasks = [optimized_tts.synthesize_optimized(text) for text in test_texts]
    concurrent_results = await asyncio.gather(*tasks)
    concurrent_time = time.time() - start_time

    print(f"Sequential processing: {sequential_time:.2f} seconds")
    print(f"Concurrent processing: {concurrent_time:.2f} seconds")
    print(f"Speedup: {sequential_time / concurrent_time:.2f}x")

    await optimized_tts.close()

# asyncio.run(performance_benchmark())

1008

```

1009

1010

### Memory-Efficient Async Processing

1011

1012

```api { .api }

1013

import asyncio
from typing import AsyncIterator
from google.cloud import texttospeech

async def _synthesize_batch(client, batch):
    """Synthesize one batch of texts concurrently; returns the API responses."""
    tasks = []
    for text_item in batch:
        request = texttospeech.SynthesizeSpeechRequest(
            input=texttospeech.SynthesisInput(text=text_item),
            voice=texttospeech.VoiceSelectionParams(language_code="en-US"),
            audio_config=texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.MP3
            )
        )
        tasks.append(client.synthesize_speech(request=request))
    return await asyncio.gather(*tasks)

async def memory_efficient_processing(text_iterator: AsyncIterator[str],
                                      batch_size: int = 5) -> AsyncIterator[bytes]:
    """Process texts in batches to manage memory usage.

    Yields audio bytes batch by batch so only one batch of responses is
    held in memory at a time.

    Improvement: the per-batch synthesis code was duplicated verbatim for
    the final partial batch; it is now factored into ``_synthesize_batch``.
    """

    client = texttospeech.TextToSpeechAsyncClient()

    try:
        batch = []

        async for text in text_iterator:
            batch.append(text)

            if len(batch) >= batch_size:
                # Process a full batch and yield its audio in input order.
                for response in await _synthesize_batch(client, batch):
                    yield response.audio_content
                batch = []

        # Process remaining items
        if batch:
            for response in await _synthesize_batch(client, batch):
                yield response.audio_content

    finally:
        await client.close()

1071

1072

# Example usage
async def text_generator():
    """Generate texts for processing."""
    for i in range(50):
        yield f"This is text number {i} for memory-efficient processing."

async def process_with_memory_efficiency():
    """Demonstrate memory-efficient processing."""

    audio_count = 0
    total_bytes = 0

    # Consume the async generator; only one batch is in memory at a time.
    async for audio_data in memory_efficient_processing(text_generator(), batch_size=3):
        audio_count += 1
        total_bytes += len(audio_data)
        print(f"Processed audio {audio_count}: {len(audio_data)} bytes")

    print(f"Total: {audio_count} audio files, {total_bytes} bytes")

# asyncio.run(process_with_memory_efficiency())

1092

```