# Asynchronous Inference

High-performance asynchronous inference engine for concurrent request handling, streaming responses, and integration with async frameworks. Enables scalable serving of LLMs with efficient resource utilization and non-blocking operations.

## Capabilities

### Async LLM Engine

Core asynchronous inference engine supporting concurrent request processing, streaming generation, and distributed execution with Ray.

```python { .api }
class AsyncLLMEngine:
    @classmethod
    def from_engine_args(cls, engine_args: AsyncEngineArgs) -> "AsyncLLMEngine":
        """
        Create AsyncLLMEngine from configuration arguments.

        Parameters:
        - engine_args: Configuration for engine initialization

        Returns:
        AsyncLLMEngine instance ready for inference
        """

    async def generate(
        self,
        prompt: Optional[str],
        sampling_params: SamplingParams,
        request_id: str,
        prompt_token_ids: Optional[List[int]] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> AsyncIterator[RequestOutput]:
        """
        Generate text asynchronously with streaming output.

        Parameters:
        - prompt: Input text prompt
        - sampling_params: Generation parameters
        - request_id: Unique identifier for request tracking
        - prompt_token_ids: Pre-tokenized input (optional)
        - lora_request: LoRA adapter configuration

        Yields:
        RequestOutput objects as generation progresses
        """

    async def encode(
        self,
        prompt: Optional[str],
        pooling_params: PoolingParams,
        request_id: str,
        prompt_token_ids: Optional[List[int]] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> AsyncIterator[EmbeddingRequestOutput]:
        """
        Generate embeddings asynchronously.

        Parameters:
        - prompt: Input text for embedding
        - pooling_params: Embedding pooling configuration
        - request_id: Unique request identifier
        - prompt_token_ids: Pre-tokenized input (optional)
        - lora_request: LoRA adapter configuration

        Yields:
        EmbeddingRequestOutput with vector representations
        """

    async def abort(self, request_id: str) -> None:
        """
        Cancel a running request.

        Parameters:
        - request_id: ID of request to abort
        """

    async def get_model_config(self) -> ModelConfig:
        """Get the model configuration."""

    async def get_num_unfinished_requests(self) -> int:
        """Get count of active requests."""

    async def check_health(self) -> None:
        """
        Health check for the engine.

        Raises:
        RuntimeError: If engine is unhealthy
        """

    async def add_request(
        self,
        request_id: str,
        prompt: Optional[str],
        params: Union[SamplingParams, PoolingParams],
        arrival_time: Optional[float] = None,
        prompt_token_ids: Optional[List[int]] = None,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Dict[str, str]] = None,
        prompt_adapter_request: Optional["PromptAdapterRequest"] = None,
        priority: int = 0
    ) -> AsyncIterator[RequestOutput]:
        """
        Add request and return async generator for results.

        Parameters:
        - request_id: Unique identifier for the request
        - prompt: Input text prompt
        - params: Sampling or pooling parameters
        - arrival_time: Request arrival timestamp
        - prompt_token_ids: Pre-tokenized input
        - lora_request: LoRA adapter configuration
        - trace_headers: Tracing headers
        - prompt_adapter_request: Prompt adapter configuration
        - priority: Request priority level

        Yields:
        RequestOutput objects as processing progresses
        """

    async def get_tokenizer(self, lora_request: Optional[LoRARequest] = None) -> "PreTrainedTokenizerBase":
        """
        Get tokenizer asynchronously with optional LoRA support.

        Parameters:
        - lora_request: Optional LoRA adapter configuration

        Returns:
        Tokenizer instance configured for this model
        """

    async def get_input_preprocessor(self) -> "InputPreprocessor":
        """
        Get input preprocessor for this engine.

        Returns:
        Input preprocessor instance
        """

    async def start_background_loop(self) -> None:
        """Start background loop for processing requests."""

    async def shutdown_background_loop(self) -> None:
        """Shutdown background processing loop."""

    async def get_vllm_config(self) -> "VllmConfig":
        """Get complete vLLM configuration."""

    async def get_parallel_config(self) -> "ParallelConfig":
        """Get parallel execution configuration."""

    async def get_decoding_config(self) -> "DecodingConfig":
        """Get decoding configuration."""

    async def get_scheduler_config(self) -> "SchedulerConfig":
        """Get request scheduler configuration."""

    async def get_lora_config(self) -> Optional["LoRAConfig"]:
        """Get LoRA adapter configuration."""

    async def do_log_stats(
        self,
        scheduler_outputs: Optional[Any] = None,
        model_output: Optional[Any] = None
    ) -> None:
        """
        Log performance statistics.

        Parameters:
        - scheduler_outputs: Optional scheduler output data
        - model_output: Optional model output data
        """

    async def is_tracing_enabled(self) -> bool:
        """Check if request tracing is enabled."""

    async def add_logger(self, logger_name: str, logger: Any) -> None:
        """
        Add custom logger to the engine.

        Parameters:
        - logger_name: Name for the logger
        - logger: Logger instance to add
        """

    async def remove_logger(self, logger_name: str) -> None:
        """
        Remove logger from the engine.

        Parameters:
        - logger_name: Name of logger to remove
        """

    async def start_profile(self) -> None:
        """Start performance profiling for async engine."""

    async def stop_profile(self) -> None:
        """Stop performance profiling and save results."""

    async def reset_mm_cache(self) -> None:
        """Reset multimodal processing cache."""

    async def reset_prefix_cache(self, device: Optional[Union[str, int]] = None) -> None:
        """
        Reset prefix cache for memory optimization.

        Parameters:
        - device: Optional device specification for cache reset
        """

    async def sleep(self, level: int = 1) -> None:
        """
        Put async engine to sleep to free resources.

        Parameters:
        - level: Sleep level (1=light, 2=deep)
        """

    async def wake_up(self, tags: Optional[List[str]] = None) -> None:
        """
        Wake up sleeping async engine.

        Parameters:
        - tags: Optional tags for selective wake-up
        """

    async def is_sleeping(self) -> bool:
        """Check if engine is currently sleeping."""

    async def add_lora(self, lora_request: LoRARequest) -> None:
        """
        Add LoRA adapter to engine.

        Parameters:
        - lora_request: LoRA adapter configuration to add
        """

    async def collective_rpc(
        self,
        method: str,
        timeout: Optional[float] = None,
        args: Tuple[Any, ...] = (),
        kwargs: Optional[Dict[str, Any]] = None
    ) -> List[Any]:
        """
        Execute RPC calls on all workers asynchronously.

        Parameters:
        - method: Method name to call on workers
        - timeout: Optional timeout for RPC calls
        - args: Positional arguments for method
        - kwargs: Keyword arguments for method

        Returns:
        List of results from all workers
        """

    @property
    def is_running(self) -> bool:
        """Check if background loop is running."""

    @property
    def is_stopped(self) -> bool:
        """Check if engine is stopped."""

    @property
    def errored(self) -> bool:
        """Check if engine has errored."""

    @property
    def dead_error(self) -> Optional[BaseException]:
        """Get dead error exception if any."""
```
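
The same engine also serves embedding requests and exposes health and introspection helpers. The sketch below exercises `encode`, `check_health`, and `get_num_unfinished_requests` from the listing above; it assumes an embedding-capable model (the name here is a placeholder) and that `EmbeddingRequestOutput` exposes its vector as `outputs.embedding`, which may vary by vLLM version.

```python
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, PoolingParams

async def embed_and_inspect():
    # Placeholder embedding-capable model; substitute whatever model you actually serve.
    engine_args = AsyncEngineArgs(model="intfloat/e5-small-v2")
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    # Raises RuntimeError if the engine is unhealthy, otherwise returns None.
    await engine.check_health()

    # encode() streams EmbeddingRequestOutput objects; the last one carries the final vector.
    final_output = None
    async for output in engine.encode(
        "vLLM makes async inference easy",
        PoolingParams(),
        request_id="embed-001",
    ):
        final_output = output

    # Attribute shape (outputs.embedding) is assumed from vLLM's EmbeddingRequestOutput.
    print(f"Embedding length: {len(final_output.outputs.embedding)}")
    print(f"Unfinished requests: {await engine.get_num_unfinished_requests()}")

asyncio.run(embed_and_inspect())
```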

### Streaming Generation

Support for real-time streaming of generated tokens as they are produced, enabling responsive user interfaces and immediate feedback.

```python { .api }
async def stream_generate(
    engine: AsyncLLMEngine,
    prompt: str,
    sampling_params: SamplingParams,
    request_id: str
) -> AsyncIterator[str]:
    """
    Stream generated text tokens in real-time.

    Parameters:
    - engine: AsyncLLMEngine instance
    - prompt: Input prompt
    - sampling_params: Generation configuration
    - request_id: Unique request identifier

    Yields:
    Individual text tokens as they are generated
    """
```
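
`stream_generate` is a convenience wrapper rather than a method on the engine. One possible implementation, assuming the cumulative-text behaviour of `RequestOutput` used in the usage examples below, diffs successive outputs and yields only the newly generated suffix:

```python
from typing import AsyncIterator
from vllm import AsyncLLMEngine, SamplingParams

async def stream_generate(
    engine: AsyncLLMEngine,
    prompt: str,
    sampling_params: SamplingParams,
    request_id: str
) -> AsyncIterator[str]:
    # engine.generate() yields RequestOutput objects whose .text is cumulative,
    # so track how much has already been emitted and yield only the new suffix.
    emitted = 0
    async for output in engine.generate(prompt, sampling_params, request_id):
        text = output.outputs[0].text
        if len(text) > emitted:
            yield text[emitted:]
            emitted = len(text)
```

A caller can then forward each chunk directly to a client, e.g. `async for chunk in stream_generate(engine, prompt, params, "req-1"): ...`.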

### Concurrent Request Processing

Handle multiple requests simultaneously with intelligent batching and resource management.

```python { .api }
class RequestManager:
    async def add_request(
        self,
        request_id: str,
        prompt: str,
        sampling_params: SamplingParams,
        prompt_token_ids: Optional[List[int]] = None,
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None
    ) -> None:
        """Add new request to processing queue."""

    async def abort_request(self, request_id: str) -> None:
        """Cancel pending or running request."""

    async def get_request_status(self, request_id: str) -> RequestStatus:
        """Check status of specific request."""
```
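
`RequestManager` above is an interface description. A minimal in-process sketch of how such a manager could sit on top of `AsyncLLMEngine`, using the `RequestStatus` enum from the Types section for bookkeeping (assumed to be in scope), might look like the following; it omits the tokenization and LoRA parameters for brevity and is illustrative rather than the library's implementation.

```python
import asyncio
from typing import Dict
from vllm import AsyncLLMEngine, SamplingParams

class SimpleRequestManager:
    """Illustrative sketch; RequestStatus is the enum declared in the Types section."""

    def __init__(self, engine: AsyncLLMEngine):
        self.engine = engine
        self._status: Dict[str, "RequestStatus"] = {}

    async def add_request(
        self,
        request_id: str,
        prompt: str,
        sampling_params: SamplingParams,
    ) -> None:
        # Queue the request and return immediately; results are consumed in the background.
        self._status[request_id] = RequestStatus.WAITING
        asyncio.create_task(self._drive(request_id, prompt, sampling_params))

    async def _drive(self, request_id: str, prompt: str, sampling_params: SamplingParams) -> None:
        self._status[request_id] = RequestStatus.RUNNING
        try:
            async for _ in self.engine.generate(prompt, sampling_params, request_id):
                pass  # outputs could be stored or forwarded here
            self._status[request_id] = RequestStatus.COMPLETED
        except asyncio.CancelledError:
            self._status[request_id] = RequestStatus.CANCELLED
            raise
        except Exception:
            self._status[request_id] = RequestStatus.FAILED

    async def abort_request(self, request_id: str) -> None:
        await self.engine.abort(request_id)
        self._status[request_id] = RequestStatus.CANCELLED

    async def get_request_status(self, request_id: str) -> "RequestStatus":
        return self._status[request_id]
```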

## Usage Examples

### Basic Async Generation

```python
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def main():
    # Initialize async engine
    engine_args = AsyncEngineArgs(
        model="microsoft/DialoGPT-medium",
        tensor_parallel_size=1
    )
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    # Generate text asynchronously
    sampling_params = SamplingParams(temperature=0.8, max_tokens=100)
    request_id = "req-001"

    results = []
    async for output in engine.generate("Hello world", sampling_params, request_id):
        results.append(output)
        print(f"Generated: {output.outputs[0].text}")

    return results

# Run the async function
asyncio.run(main())
```

### Streaming Generation

```python
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def stream_text():
    engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=150
    )

    request_id = "stream-001"
    prompt = "Tell me a story about"

    print("Streaming generation:")
    printed = 0
    async for output in engine.generate(prompt, sampling_params, request_id):
        if output.outputs:
            # output.outputs[0].text is cumulative; print only the new text (delta)
            text = output.outputs[0].text
            print(text[printed:], end="", flush=True)
            printed = len(text)

    print("\nGeneration complete!")

asyncio.run(stream_text())
```

### Concurrent Multiple Requests

```python
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def process_request(engine, prompt, request_id):
    sampling_params = SamplingParams(temperature=0.7, max_tokens=100)

    results = []
    async for output in engine.generate(prompt, sampling_params, request_id):
        results.append(output)

    return request_id, results[-1].outputs[0].text

async def concurrent_generation():
    engine_args = AsyncEngineArgs(
        model="microsoft/DialoGPT-medium",
        max_num_seqs=10  # Support up to 10 concurrent requests
    )
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    # Process multiple requests concurrently
    prompts = [
        "The future of AI is",
        "Once upon a time",
        "In a galaxy far away",
        "The secret to happiness"
    ]

    # Create concurrent tasks
    tasks = [
        process_request(engine, prompt, f"req-{i}")
        for i, prompt in enumerate(prompts)
    ]

    # Wait for all requests to complete
    results = await asyncio.gather(*tasks)

    for request_id, generated_text in results:
        print(f"{request_id}: {generated_text}")

asyncio.run(concurrent_generation())
```

### Request Management and Cancellation

```python
import asyncio
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

async def request_with_timeout():
    engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=500  # Long generation
    )

    request_id = "timeout-test"

    async def generate_text():
        # Consume the stream inside a coroutine so it can be wrapped in wait_for()
        results = []
        async for output in engine.generate(
            "Write a very long story",
            sampling_params,
            request_id
        ):
            print(f"Generated so far: {len(output.outputs[0].text)} chars")
            results.append(output)
        return results

    try:
        # Set a timeout for generation
        results = await asyncio.wait_for(generate_text(), timeout=5.0)

    except asyncio.TimeoutError:
        print("Generation timed out, cancelling request...")
        await engine.abort(request_id)
        print("Request cancelled successfully")

asyncio.run(request_with_timeout())
```

469

470

### Integration with Web Framework

471

472

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
import json
import uuid

app = FastAPI()

# Initialize engine at startup
engine_args = AsyncEngineArgs(model="microsoft/DialoGPT-medium")
engine = AsyncLLMEngine.from_engine_args(engine_args)

@app.post("/generate")
async def generate_text(prompt: str, max_tokens: int = 100):
    request_id = str(uuid.uuid4())
    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=max_tokens
    )

    async def generate_stream():
        async for output in engine.generate(prompt, sampling_params, request_id):
            yield f"data: {json.dumps({'text': output.outputs[0].text})}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/plain",
        headers={"Cache-Control": "no-cache"}
    )

@app.delete("/cancel/{request_id}")
async def cancel_request(request_id: str):
    await engine.abort(request_id)
    return {"message": "Request cancelled"}
```

508

509

## Types

510

511

```python { .api }
class RequestStatus(str, Enum):
    WAITING = "waiting"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"

class AsyncEngineArgs(EngineArgs):
    # Inherits all EngineArgs fields plus:
    worker_use_ray: bool = False
    engine_use_ray: bool = False
    disable_log_requests: bool = False
    max_log_len: Optional[int] = None

class StreamingRequest:
    request_id: str
    prompt: str
    sampling_params: SamplingParams
    arrival_time: float
    lora_request: Optional[LoRARequest]
```
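
For illustration, the async-specific fields above can be passed when constructing the engine; exact field availability varies between vLLM versions, so treat this as a sketch.

```python
from vllm import AsyncEngineArgs, AsyncLLMEngine

# Async-specific knobs layered on top of the usual EngineArgs fields (see listing above).
engine_args = AsyncEngineArgs(
    model="microsoft/DialoGPT-medium",
    disable_log_requests=True,  # quieter logs under high request volume
    max_log_len=256,            # truncate logged prompts to 256 characters
)
engine = AsyncLLMEngine.from_engine_args(engine_args)
```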