0
# Vectorized Environments
1
2
Environment vectorization and wrappers for parallel training, normalization, monitoring, and other common preprocessing tasks. These components enable efficient training across multiple environment instances and provide essential functionality for production RL systems.
3
4
## Capabilities
5
6
### Vectorized Environment Base Classes
7
8
Foundation classes for creating vectorized environments that enable parallel execution and consistent interfaces across different parallelization strategies.
9
10
```python { .api }
11
class VecEnv:
12
"""
13
Abstract base class for vectorized environments.
14
15
Args:
16
num_envs: Number of environments
17
observation_space: Single environment observation space
18
action_space: Single environment action space
19
"""
20
def __init__(
21
self,
22
num_envs: int,
23
observation_space: gym.spaces.Space,
24
action_space: gym.spaces.Space,
25
): ...
26
27
def reset(self) -> VecEnvObs:
28
"""
29
Reset all environments.
30
31
Returns:
32
Observations from all environments
33
"""
34
35
def step_async(self, actions: np.ndarray) -> None:
36
"""
37
Tell environments to start stepping with given actions.
38
39
Args:
40
actions: Actions for each environment
41
"""
42
43
def step_wait(self) -> VecEnvStepReturn:
44
"""
45
Wait for environments to finish stepping.
46
47
Returns:
48
Tuple of (observations, rewards, dones, infos)
49
"""
50
51
def step(self, actions: np.ndarray) -> VecEnvStepReturn:
52
"""
53
Step all environments synchronously.
54
55
Args:
56
actions: Actions for each environment
57
58
Returns:
59
Tuple of (observations, rewards, dones, infos)
60
"""
61
62
def close(self) -> None:
63
"""Close all environments."""
64
65
def get_attr(self, attr_name: str, indices: VecEnvIndices = None) -> List[Any]:
66
"""
67
Get attribute from environments.
68
69
Args:
70
attr_name: Name of attribute to get
71
indices: Environment indices (None for all)
72
73
Returns:
74
List of attribute values
75
"""
76
77
def set_attr(
78
self, attr_name: str, value: Any, indices: VecEnvIndices = None
79
) -> None:
80
"""
81
Set attribute in environments.
82
83
Args:
84
attr_name: Name of attribute to set
85
value: Value to set
86
indices: Environment indices (None for all)
87
"""
88
89
def env_method(
90
self,
91
method_name: str,
92
*method_args,
93
indices: VecEnvIndices = None,
94
**method_kwargs,
95
) -> List[Any]:
96
"""
97
Call method on environments.
98
99
Args:
100
method_name: Name of method to call
101
*method_args: Positional arguments for method
102
indices: Environment indices (None for all)
103
**method_kwargs: Keyword arguments for method
104
105
Returns:
106
List of method return values
107
"""
108
109
def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]:
110
"""
111
Set random seed for environments.
112
113
Args:
114
seed: Random seed
115
116
Returns:
117
List of seeds used by each environment
118
"""
119
120
def render(self, mode: str = "human") -> Optional[np.ndarray]:
121
"""
122
Render environments.
123
124
Args:
125
mode: Rendering mode
126
127
Returns:
128
Rendered images if mode is 'rgb_array'
129
"""
130
```
131
132
### Sequential Vectorized Environment
133
134
Simple vectorized environment that runs environments sequentially in the same process, suitable for lightweight environments and debugging.
135
136
```python { .api }
137
class DummyVecEnv(VecEnv):
138
"""
139
Sequential vectorized environment.
140
141
Args:
142
env_fns: List of functions that create environments
143
"""
144
def __init__(self, env_fns: List[Callable[[], gym.Env]]): ...
145
146
def reset(self) -> VecEnvObs:
147
"""Reset all environments sequentially."""
148
149
def step_async(self, actions: np.ndarray) -> None:
150
"""Store actions for stepping."""
151
152
def step_wait(self) -> VecEnvStepReturn:
153
"""Step all environments sequentially."""
154
155
def close(self) -> None:
156
"""Close all environments."""
157
158
def render(self, mode: str = "human") -> Optional[np.ndarray]:
159
"""Render first environment."""
160
```
161
162
### Multiprocessing Vectorized Environment
163
164
Vectorized environment that runs environments in separate processes for true parallelization, ideal for computationally expensive environments.
165
166
```python { .api }
167
class SubprocVecEnv(VecEnv):
168
"""
169
Multiprocessing vectorized environment.
170
171
Args:
172
env_fns: List of functions that create environments
173
start_method: Multiprocessing start method ('spawn', 'fork', 'forkserver')
174
"""
175
def __init__(
176
self,
177
env_fns: List[Callable[[], gym.Env]],
178
start_method: Optional[str] = None,
179
): ...
180
181
def reset(self) -> VecEnvObs:
182
"""Reset all environments in parallel."""
183
184
def step_async(self, actions: np.ndarray) -> None:
185
"""Send actions to worker processes."""
186
187
def step_wait(self) -> VecEnvStepReturn:
188
"""Collect results from worker processes."""
189
190
def close(self) -> None:
191
"""Close all worker processes."""
192
193
def render(self, mode: str = "human") -> Optional[np.ndarray]:
194
"""Render first environment."""
195
```
196
197
### Vectorized Environment Wrappers
198
199
Base class and common wrappers for adding functionality to vectorized environments while maintaining the vectorized interface.
200
201
```python { .api }
202
class VecEnvWrapper(VecEnv):
203
"""
204
Base class for vectorized environment wrappers.
205
206
Args:
207
venv: Vectorized environment to wrap
208
"""
209
def __init__(self, venv: VecEnv): ...
210
211
def reset(self) -> VecEnvObs:
212
"""Reset wrapped environment."""
213
214
def step_async(self, actions: np.ndarray) -> None:
215
"""Forward step_async to wrapped environment."""
216
217
def step_wait(self) -> VecEnvStepReturn:
218
"""Forward step_wait to wrapped environment."""
219
220
def close(self) -> None:
221
"""Close wrapped environment."""
222
223
class VecNormalize(VecEnvWrapper):
224
"""
225
Normalize observations and rewards using running statistics.
226
227
Args:
228
venv: Vectorized environment to wrap
229
training: Whether in training mode (updates statistics)
230
norm_obs: Whether to normalize observations
231
norm_reward: Whether to normalize rewards
232
clip_obs: Observation clipping range
233
clip_reward: Reward clipping range
234
gamma: Discount factor for reward normalization
235
epsilon: Small constant for numerical stability
236
norm_obs_keys: Observation keys to normalize (for dict obs)
237
"""
238
def __init__(
239
self,
240
venv: VecEnv,
241
training: bool = True,
242
norm_obs: bool = True,
243
norm_reward: bool = True,
244
clip_obs: float = 10.0,
245
clip_reward: float = 10.0,
246
gamma: float = 0.99,
247
epsilon: float = 1e-8,
248
norm_obs_keys: Optional[List[str]] = None,
249
): ...
250
251
def normalize_obs(self, obs: VecEnvObs) -> VecEnvObs:
252
"""
253
Normalize observations using running statistics.
254
255
Args:
256
obs: Observations to normalize
257
258
Returns:
259
Normalized observations
260
"""
261
262
def normalize_reward(self, reward: np.ndarray) -> np.ndarray:
263
"""
264
Normalize rewards using running statistics.
265
266
Args:
267
reward: Rewards to normalize
268
269
Returns:
270
Normalized rewards
271
"""
272
273
def get_original_obs(self) -> Optional[VecEnvObs]:
274
"""Get unnormalized observations."""
275
276
def get_original_reward(self) -> Optional[np.ndarray]:
277
"""Get unnormalized rewards."""
278
279
def reset(self) -> VecEnvObs:
280
"""Reset and normalize observations."""
281
282
def step_wait(self) -> VecEnvStepReturn:
283
"""Step and normalize observations/rewards."""
284
285
class VecFrameStack(VecEnvWrapper):
286
"""
287
Stack the most recent n_stack frames so each observation carries temporal information (a common alternative to a recurrent policy).
288
289
Args:
290
venv: Vectorized environment to wrap
291
n_stack: Number of frames to stack
292
channels_order: Channel order ('last' or 'first')
293
"""
294
def __init__(
295
self,
296
venv: VecEnv,
297
n_stack: int,
298
channels_order: str = "last",
299
): ...
300
301
def reset(self) -> VecEnvObs:
302
"""Reset and initialize frame stack."""
303
304
def step_wait(self) -> VecEnvStepReturn:
305
"""Step and update frame stack."""
306
307
class VecTransposeImage(VecEnvWrapper):
308
"""
309
Transpose image observations from (H, W, C) to (C, H, W).
310
311
Args:
312
venv: Vectorized environment to wrap
313
skip: Skip transposition (for debugging)
314
"""
315
def __init__(self, venv: VecEnv, skip: bool = False): ...
316
317
class VecMonitor(VecEnvWrapper):
318
"""
319
Monitor wrapper for vectorized environments.
320
321
Args:
322
venv: Vectorized environment to wrap
323
filename: Path to log file (None for no logging)
324
info_keywords: Info dict keys to log
325
"""
326
def __init__(
327
self,
328
venv: VecEnv,
329
filename: Optional[str] = None,
330
info_keywords: Tuple[str, ...] = (),
331
): ...
332
333
class VecCheckNan(VecEnvWrapper):
334
"""
335
Check for NaN values in observations, rewards, and actions.
336
337
Args:
338
venv: Vectorized environment to wrap
339
raise_exception: Whether to raise exception on NaN detection
340
warn_once: Whether to warn only once per NaN type
341
"""
342
def __init__(
343
self,
344
venv: VecEnv,
345
raise_exception: bool = False,
346
warn_once: bool = True,
347
): ...
348
349
class VecExtractDictObs(VecEnvWrapper):
350
"""
351
Extract specific key from dictionary observations.
352
353
Args:
354
venv: Vectorized environment to wrap
355
key: Dictionary key to extract
356
"""
357
def __init__(self, venv: VecEnv, key: str): ...
358
359
class VecVideoRecorder(VecEnvWrapper):
360
"""
361
Record videos from vectorized environments.
362
363
Args:
364
venv: Vectorized environment to wrap
365
video_folder: Directory to save videos
366
record_video_trigger: Function determining when to record
367
video_length: Length of recorded videos
368
name_prefix: Prefix for video filenames
369
"""
370
def __init__(
371
self,
372
venv: VecEnv,
373
video_folder: str,
374
record_video_trigger: Callable[[int], bool],
375
video_length: int = 200,
376
name_prefix: str = "rl-video",
377
): ...
378
```
379
380
### Environment Utilities
381
382
Additional utilities for environment management, monitoring, and validation that complement the vectorized environment system.
383
384
```python { .api }
385
class Monitor(gym.Wrapper):
386
"""
387
Environment wrapper for logging episode statistics.
388
389
Args:
390
env: Environment to wrap
391
filename: Path to log file (None for no logging)
392
allow_early_resets: Allow resetting before episode completion
393
reset_keywords: Keywords to log from reset info
394
info_keywords: Keywords to log from step info
395
override_existing: Whether to override existing log file
396
"""
397
def __init__(
398
self,
399
env: gym.Env,
400
filename: Optional[str] = None,
401
allow_early_resets: bool = True,
402
reset_keywords: Tuple[str, ...] = (),
403
info_keywords: Tuple[str, ...] = (),
404
override_existing: bool = True,
405
): ...
406
407
def reset(self, **kwargs) -> Tuple[np.ndarray, Dict[str, Any]]:
408
"""Reset environment and log episode statistics."""
409
410
def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
411
"""Step environment and log statistics."""
412
413
def make_vec_env(
414
env_id: Union[str, Callable[[], gym.Env]],
415
n_envs: int = 1,
416
seed: Optional[int] = None,
417
start_index: int = 0,
418
monitor_dir: Optional[str] = None,
419
wrapper_class: Optional[Callable[[gym.Env], gym.Env]] = None,
420
env_kwargs: Optional[Dict[str, Any]] = None,
421
vec_env_cls: Type[VecEnv] = DummyVecEnv,
422
vec_env_kwargs: Optional[Dict[str, Any]] = None,
423
monitor_kwargs: Optional[Dict[str, Any]] = None,
424
wrapper_kwargs: Optional[Dict[str, Any]] = None,
425
) -> VecEnv:
426
"""
427
Create vectorized environment with optional monitoring and wrappers.
428
429
Args:
430
env_id: Environment ID or environment creation function
431
n_envs: Number of environments
432
seed: Random seed for environments
433
start_index: Starting index for environment seeds
434
monitor_dir: Directory for Monitor logs
435
wrapper_class: Environment wrapper class
436
env_kwargs: Arguments for environment creation
437
vec_env_cls: Vectorized environment class
438
vec_env_kwargs: Arguments for vectorized environment
439
monitor_kwargs: Arguments for Monitor wrapper
440
wrapper_kwargs: Arguments for environment wrapper
441
442
Returns:
443
Vectorized environment
444
"""
445
446
def check_env(
447
env: gym.Env,
448
warn: bool = True,
449
skip_render_check: bool = True,
450
) -> None:
451
"""
452
Check environment compliance with Gym interface.
453
454
Args:
455
env: Environment to check
456
warn: Whether to show warnings
457
skip_render_check: Skip render method checking
458
"""
459
```
460
461
## Usage Examples
462
463
### Basic Vectorized Environment Setup
464
465
```python
466
import gymnasium as gym
467
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
468
469
# Sequential vectorization (single process)
470
env_fns = [lambda: gym.make("CartPole-v1") for _ in range(4)]
471
vec_env = DummyVecEnv(env_fns)
472
473
# Parallel vectorization (multiprocessing)
474
vec_env = SubprocVecEnv(env_fns)
475
476
# Use with algorithm
477
from stable_baselines3 import PPO
478
model = PPO("MlpPolicy", vec_env, verbose=1)
479
```
480
481
### Environment Normalization
482
483
```python
484
from stable_baselines3.common.vec_env import VecNormalize
485
486
# Create and wrap environment
487
vec_env = DummyVecEnv([lambda: gym.make("Pendulum-v1") for _ in range(4)])
488
vec_env = VecNormalize(
489
vec_env,
490
norm_obs=True,
491
norm_reward=True,
492
clip_obs=10.0,
493
clip_reward=10.0,
494
)
495
496
# Train with normalization
497
model = PPO("MlpPolicy", vec_env, verbose=1)
498
model.learn(total_timesteps=10000)
499
500
# Save normalization statistics
501
vec_env.save("vecnormalize.pkl")
502
503
# Load for evaluation
504
vec_env = VecNormalize.load("vecnormalize.pkl", vec_env.venv)  # wrap the raw env, not the existing VecNormalize (avoids normalizing twice)
505
vec_env.training = False # Disable updates during evaluation
506
```
507
508
### Frame Stacking for Atari
509
510
```python
511
from stable_baselines3.common.vec_env import VecFrameStack, VecTransposeImage
512
513
# Create Atari environment with frame stacking
514
env_fns = [lambda: gym.make("BreakoutNoFrameskip-v4") for _ in range(4)]
515
vec_env = DummyVecEnv(env_fns)
516
517
# Transpose images for CNN (H,W,C) -> (C,H,W)
518
vec_env = VecTransposeImage(vec_env)
519
520
# Stack 4 frames
521
vec_env = VecFrameStack(vec_env, n_stack=4)
522
523
model = PPO("CnnPolicy", vec_env, verbose=1)
524
```
525
526
### Environment Monitoring
527
528
```python
529
from stable_baselines3.common.vec_env import VecMonitor
530
from stable_baselines3.common.monitor import Monitor
531
532
# Single environment monitoring
533
env = Monitor(gym.make("CartPole-v1"), "training.log")
534
535
# Vectorized environment monitoring
536
vec_env = DummyVecEnv([lambda: gym.make("CartPole-v1") for _ in range(4)])
537
vec_env = VecMonitor(vec_env, "vec_training.log")
538
539
# Load monitoring results
540
from stable_baselines3.common.monitor import load_results
541
import pandas as pd
542
543
results = load_results(".")  # takes a directory; reads every *.monitor.csv file found there
544
print(f"Mean reward: {results['r'].mean():.2f}")
545
```
546
547
### Custom Environment Creation
548
549
```python
550
from stable_baselines3.common.env_util import make_vec_env
551
552
# Create multiple environments with monitoring
553
vec_env = make_vec_env(
554
"CartPole-v1",
555
n_envs=4,
556
seed=42,
557
monitor_dir="logs/",
558
vec_env_cls=SubprocVecEnv,
559
)
560
561
# Custom environment function
562
def make_custom_env():
563
env = gym.make("CartPole-v1")
564
# Add custom preprocessing here
565
return env
566
567
vec_env = make_vec_env(
568
make_custom_env,
569
n_envs=4,
570
vec_env_cls=DummyVecEnv,
571
)
572
```
573
574
### Utility Functions
575
576
Environment utility functions for wrapper management and synchronization:
577
578
```python { .api }
579
def unwrap_vec_wrapper(env: VecEnv, vec_wrapper_class: type[VecEnvWrapper]) -> Optional[VecEnvWrapper]:
580
"""
581
Retrieve a VecEnvWrapper object by recursively searching.
582
583
Args:
584
env: The VecEnv that is going to be unwrapped
585
vec_wrapper_class: The desired VecEnvWrapper class
586
587
Returns:
588
The VecEnvWrapper object if found, None otherwise
589
"""
590
591
def unwrap_vec_normalize(env: VecEnv) -> Optional[VecNormalize]:
592
"""
593
Retrieve a VecNormalize object by recursively searching.
594
595
Args:
596
env: The VecEnv that is going to be unwrapped
597
598
Returns:
599
The VecNormalize object if found, None otherwise
600
"""
601
602
def is_vecenv_wrapped(env: VecEnv, vec_wrapper_class: type[VecEnvWrapper]) -> bool:
603
"""
604
Check if an environment is already wrapped in a given VecEnvWrapper.
605
606
Args:
607
env: The VecEnv that is going to be checked
608
vec_wrapper_class: The desired VecEnvWrapper class
609
610
Returns:
611
True if wrapped with the desired wrapper, False otherwise
612
"""
613
614
def sync_envs_normalization(env: VecEnv, eval_env: VecEnv) -> None:
615
"""
616
Synchronize the normalization statistics of train and eval environments
617
when both are wrapped in VecNormalize.
618
619
Args:
620
env: Training environment
621
eval_env: Environment used for evaluation
622
"""
623
```
624
625
## Types
626
627
```python { .api }
628
from typing import Union, Optional, Type, Callable, Dict, Any, List, Tuple, Sequence
629
import numpy as np
630
import gymnasium as gym
631
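from stable_baselines3.common.vec_env import VecEnv, VecEnvWrapper, DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.vec_env.base_vec_env import VecEnvIndices, VecEnvObs, VecEnvStepReturn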
632
from stable_baselines3.common.monitor import Monitor
633
from stable_baselines3.common.type_aliases import GymEnv
634
```