Environments#

In reinforcement learning, agents interact with environments to improve their performance through trial and error. This tutorial explores how Tianshou handles environments, from basic single-environment setups to advanced vectorized and parallel configurations.


Figure: The agent-environment interaction loop

Tianshou maintains full compatibility with the Gymnasium API (formerly OpenAI Gym), making it easy to use any Gymnasium-compatible environment.

The Bottleneck Problem#

In a standard Gymnasium environment, each interaction follows a sequential pattern:

  1. Agent selects an action

  2. Environment processes the action and returns observation and reward

  3. Repeat

This sequential process can become a significant bottleneck in deep reinforcement learning experiments, especially when:

  • The environment simulation is computationally intensive

  • Network training is fast but data collection is slow

  • You have multiple CPU cores available but aren’t using them

Tianshou addresses this bottleneck through vectorized environments, which allow parallel sampling across multiple CPU cores.

Vectorized Environments#

Vectorized environments enable you to run multiple environment instances in parallel, which can substantially accelerate data collection when individual environment steps are expensive. Let’s see this in action.

import time

import gymnasium as gym
import numpy as np

from tianshou.env import DummyVectorEnv, SubprocVectorEnv

Performance Comparison#

Let’s compare the sampling speed with different numbers of parallel environments:

num_cpus = [1, 2, 5]

for num_cpu in num_cpus:
    # Create vectorized environment with multiple processes
    env = SubprocVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(num_cpu)])
    env.reset()

    sampled_steps = 0
    time_start = time.time()

    # Sample 1000 steps
    while sampled_steps < 1000:
        act = np.random.choice(2, size=num_cpu)
        obs, rew, terminated, truncated, info = env.step(act)

        # Reset environments whose episode ended (terminated or truncated)
        done = np.logical_or(terminated, truncated)
        if done.any():
            env.reset(np.where(done)[0])

        sampled_steps += num_cpu

    time_used = time.time() - time_start
    env.close()  # Shut down the worker processes before the next iteration
    print(f"Sampled 1000 steps in {time_used:.3f}s using {num_cpu} CPU(s)")
    print(f"  → Speed: {1000 / time_used:.1f} steps/second")
Sampled 1000 steps in 0.131s using 1 CPU(s)
  → Speed: 7630.2 steps/second
Sampled 1000 steps in 0.088s using 2 CPU(s)
  → Speed: 11388.9 steps/second
Sampled 1000 steps in 0.080s using 5 CPU(s)
  → Speed: 12458.5 steps/second
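
To put the printed numbers in perspective, the throughput figures can be converted into speedup and parallel efficiency (speedup divided by the number of workers). The sketch below only reuses the values from the sample output above; your own measurements will differ.

```python
# Throughput figures copied from the sample output above (steps/second).
throughput = {1: 7630.2, 2: 11388.9, 5: 12458.5}

baseline = throughput[1]
speedup = {n: rate / baseline for n, rate in throughput.items()}
efficiency = {n: speedup[n] / n for n in throughput}

for n in throughput:
    print(f"{n} worker(s): speedup {speedup[n]:.2f}x, efficiency {efficiency[n]:.0%}")
```

For this run, two workers give roughly a 1.49x speedup (about 75% efficiency) and five workers roughly 1.63x (about 33% efficiency).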

Understanding the Results#

You might notice that the speedup isn’t perfectly linear with the number of CPUs. Several factors contribute to this:

  1. Straggler Effect: In synchronous mode, all environments must complete before the next batch begins. Slower environments hold back faster ones.

  2. Communication Overhead: Inter-process communication has costs, especially for fast environments.

  3. Environment Complexity: For simple environments like CartPole, the overhead may outweigh the benefits.

Important: SubprocVectorEnv should only be used when environment execution is slow. For simple, fast environments like CartPole, DummyVectorEnv (or even raw Gymnasium environments) can be more efficient because they avoid both the straggler effect and inter-process communication overhead.
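
The straggler effect can be illustrated with a small simulation that does not involve Tianshou at all. It assumes a made-up workload in which 90% of steps take 1 time unit and 10% take 10 units, and compares the average cost of a single step with the time a synchronous batch of five environments has to wait for its slowest member.

```python
import numpy as np

rng = np.random.default_rng(0)
num_envs, num_steps = 5, 10_000

# Assumed workload: 90% of steps take 1 time unit, 10% take 10 units.
durations = rng.choice([1.0, 10.0], size=(num_steps, num_envs), p=[0.9, 0.1])

mean_step_time = durations.mean()                # average cost of one env step
sync_batch_time = durations.max(axis=1).mean()   # a batch waits for its slowest env

# Closed form: 1 + 9 * P(at least one of the envs is slow in this batch)
expected_batch_time = 1.0 + 9.0 * (1.0 - 0.9**num_envs)

print(f"mean step time:        {mean_step_time:.2f}")
print(f"mean sync batch time:  {sync_batch_time:.2f}")
print(f"expected batch time:   {expected_batch_time:.2f}")
```

Even though a single step costs about 1.9 units on average, a synchronous batch takes about 4.7 units, because the chance that at least one of the five environments is slow is already around 41%.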

Types of Vectorized Environments#

Tianshou provides several vectorized environment implementations, each optimized for different scenarios:

1. DummyVectorEnv#

Pseudo-parallel simulation using a for-loop

  • Best for: Simple/fast environments, debugging

  • Pros: No inter-process overhead, deterministic execution

  • Cons: No actual parallelization
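
The for-loop idea can be sketched in a few lines. The classes below (CounterEnv, LoopVectorEnv) are hypothetical and written only for illustration; they are not Tianshou's implementation, but they show how stepping each environment in turn and stacking the results yields a vectorized interface.

```python
import numpy as np


class CounterEnv:
    """Hypothetical toy environment: the observation is a running counter."""

    def __init__(self, start):
        self.start = start
        self.count = start

    def reset(self):
        self.count = self.start
        return np.array([self.count], dtype=np.float32), {}

    def step(self, action):
        self.count += int(action)
        obs = np.array([self.count], dtype=np.float32)
        return obs, 1.0, False, False, {}


class LoopVectorEnv:
    """Steps each sub-environment in turn and stacks the results."""

    def __init__(self, env_fns):
        self.envs = [fn() for fn in env_fns]

    def reset(self):
        results = [env.reset() for env in self.envs]
        return np.stack([r[0] for r in results]), [r[1] for r in results]

    def step(self, actions):
        results = [env.step(a) for env, a in zip(self.envs, actions)]
        obs, rew, terminated, truncated, infos = zip(*results)
        return (
            np.stack(obs),
            np.array(rew),
            np.array(terminated),
            np.array(truncated),
            list(infos),
        )


# `i=i` binds the loop variable at definition time for each factory.
vec_env = LoopVectorEnv([lambda i=i: CounterEnv(start=i) for i in range(3)])
first_obs, _ = vec_env.reset()
next_obs, rewards, terminated, truncated, _ = vec_env.step([1, 0, 1])
print(first_obs.shape, next_obs.ravel(), rewards)
```

Because everything runs in one process, the environments execute strictly one after another, which is why this style is easy to debug but gives no parallel speedup.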

2. SubprocVectorEnv#

Multiple processes for true parallel simulation

  • Best for: Most parallel simulation scenarios

  • Pros: True parallelization, good balance

  • Cons: Inter-process communication overhead

3. ShmemVectorEnv#

Shared memory optimization of SubprocVectorEnv

  • Best for: Environments with large observations (e.g., images)

  • Pros: Reduced memory footprint, faster for large states

  • Cons: More complex implementation
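
To give an intuition for why shared memory helps with large observations, here is a sketch using Python's standard multiprocessing.shared_memory module. It illustrates the general idea only and makes no claim about how ShmemVectorEnv is implemented internally; the image-like shape (84, 84, 4) is an assumption chosen for the example. A second handle attaches to the same block by name, as a worker process would, and sees the data without it being serialized and copied.

```python
from multiprocessing import shared_memory

import numpy as np

obs_shape, obs_dtype = (84, 84, 4), np.uint8  # assumed image-like observation
nbytes = int(np.prod(obs_shape)) * np.dtype(obs_dtype).itemsize

# "Worker" side: allocate a shared block and write an observation into it.
shm = shared_memory.SharedMemory(create=True, size=nbytes)
writer = np.ndarray(obs_shape, dtype=obs_dtype, buffer=shm.buf)
writer[...] = 7

# "Main process" side: attach to the same block by name and read it in place.
peer = shared_memory.SharedMemory(name=shm.name)
reader = np.ndarray(obs_shape, dtype=obs_dtype, buffer=peer.buf)
first_pixel = int(reader[0, 0, 0])
all_equal = bool(np.array_equal(reader, writer))

# Release the array views before closing, then free the block.
del reader, writer
peer.close()
shm.close()
shm.unlink()

print(first_pixel, all_equal)
```

With pipe-based communication, each observation is pickled and copied on every step, so the benefit of sharing a buffer grows with observation size.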

4. RayVectorEnv#

Ray-based distributed simulation

  • Best for: Cluster computing with multiple machines

  • Pros: Scales to multiple machines

  • Cons: Requires Ray installation and setup

All these classes share the same API through their base class BaseVectorEnv, making it easy to switch between them.

Basic Usage#

Creating a Vectorized Environment#

# Standard Gymnasium environment
gym_env = gym.make("CartPole-v1")


# Tianshou vectorized environment
def create_cartpole_env() -> gym.Env:
    return gym.make("CartPole-v1")


# Create 5 parallel environments
vector_env = DummyVectorEnv([create_cartpole_env for _ in range(5)])

print(f"Created vectorized environment with {vector_env.env_num} environments")
Created vectorized environment with 5 environments

Environment Interaction#

The key difference from standard Gymnasium is that actions, observations, and rewards are all vectorized:

# Standard Gymnasium: reset() returns a single observation
print("Standard Gymnasium reset:")
single_obs, info = gym_env.reset()
print(f"  Shape: {single_obs.shape}")
print(f"  Value: {single_obs}")

print("\n" + "=" * 50 + "\n")

# Vectorized environment: reset() returns stacked observations
print("Vectorized environment reset:")
vector_obs, info = vector_env.reset()
print(f"  Shape: {vector_obs.shape}")
print(f"  Value:\n{vector_obs}")
Standard Gymnasium reset:
  Shape: (4,)
  Value: [ 0.02636302 -0.03295556  0.00295677 -0.00111402]

==================================================

Vectorized environment reset:
  Shape: (5, 4)
  Value:
[[-0.01547127 -0.00080151  0.02923353  0.017684  ]
 [-0.01257151 -0.0094416   0.00460688  0.0339024 ]
 [-0.03552658  0.03945942 -0.04704935  0.01015025]
 [ 0.01002491  0.01548317  0.01778141 -0.01460267]
 [ 0.04058615 -0.00712666  0.02138552  0.03704324]]

Taking Vectorized Steps#

# Take random actions in all environments
actions = np.random.choice(2, size=vector_env.env_num)
obs, rew, terminated, truncated, info = vector_env.step(actions)

print(f"Actions taken: {actions}")
print(f"Rewards received: {rew}")
print(f"Terminated flags: {terminated}")
print("Info", info)
Actions taken: [0 0 1 0 0]
Rewards received: [1. 1. 1. 1. 1.]
Terminated flags: [False False False False False]
Info [{'env_id': 0} {'env_id': 1} {'env_id': 2} {'env_id': 3} {'env_id': 4}]

Selective Environment Execution#

You can interact with specific environments using the id parameter:

# Execute only environments 0, 1, and 3
selected_actions = np.random.choice(2, size=3)
obs, rew, terminated, truncated, info = vector_env.step(selected_actions, id=[0, 1, 3])

print("Executed actions in environments [0, 1, 3]")
print(f"Received {len(rew)} results")
Executed actions in environments [0, 1, 3]
Received 3 results
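
When only a subset of environments is stepped, you often want to merge the partial results back into arrays that cover all environments. The NumPy sketch below shows one way to do this; the result values are made-up stand-ins, and it assumes results come back in the same order as the requested ids.

```python
import numpy as np

num_envs = 5
env_ids = np.array([0, 1, 3])

# Stand-ins for what a partial step might return (values are made up).
partial_rew = np.array([1.0, 1.0, 1.0])
partial_obs = np.arange(12, dtype=np.float32).reshape(3, 4)

# Full-size containers indexed by environment id.
full_rew = np.zeros(num_envs)
full_obs = np.zeros((num_envs, 4), dtype=np.float32)
stepped = np.zeros(num_envs, dtype=bool)

# Scatter the partial results into the rows of the stepped environments.
full_rew[env_ids] = partial_rew
full_obs[env_ids] = partial_obs
stepped[env_ids] = True

print(stepped)
print(full_obs)
```

The boolean mask makes it explicit which rows hold fresh data and which environments were left untouched in this step.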

Parallel Sampling: Synchronous vs Asynchronous#

Synchronous Mode (Default)#

By default, vectorized environments operate synchronously: a step completes only after all environments finish their step. This works well when all environments take roughly the same time per step.

Asynchronous Mode#

When environment step times vary significantly (e.g., 90% of steps take 1s, but 10% take 10s), asynchronous mode can help. It allows faster environments to continue without waiting for slower ones.


Figure: Comparison of synchronous and asynchronous vectorized environments (steps with the same color are processed together)

Enabling Asynchronous Mode#

Use the wait_num or timeout parameters (or both):

from functools import partial


# Create environments with varying step times
class SlowEnv(gym.Env):
    """Environment with variable step duration."""

    def __init__(self, sleep_time):
        self.sleep_time = sleep_time
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(4,))
        self.action_space = gym.spaces.Discrete(2)
        super().__init__()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        return np.random.rand(4), {}

    def step(self, action):
        time.sleep(self.sleep_time)  # Simulate slow computation
        return np.random.rand(4), 0.0, False, False, {}


# Create async vectorized environment
env_fns = [partial(SlowEnv, sleep_time=0.01 * i) for i in [1, 2, 3, 4]]
async_env = SubprocVectorEnv(env_fns, wait_num=3, timeout=0.1)

print("Asynchronous environment created")
print("  wait_num=3: Returns after 3 environments complete")
print("  timeout=0.1: Or after 0.1 seconds, whichever comes first")
Asynchronous environment created
  wait_num=3: Returns after 3 environments complete
  timeout=0.1: Or after 0.1 seconds, whichever comes first

How Async Parameters Work#

  • wait_num: Minimum number of environments to wait for. With wait_num=3, each step returns results from at least 3 environments.

  • timeout: Maximum time to wait, in seconds. It acts as a dynamic wait_num: when the timeout expires, the step returns whichever results are ready.

  • If no environment finishes within the timeout, the step blocks until at least one completes.
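
The three rules above can be captured in a small model. The function ready_env_ids below is a hypothetical helper written for this tutorial; it is a simplification of the described behavior, not Tianshou's code. Given how long each environment needs for its step, it returns which environments are included in the result and how long the call blocks.

```python
def ready_env_ids(durations, wait_num=None, timeout=None):
    """Return (ids, elapsed) for one step under the simplified rules above.

    durations[i] is how long environment i needs to finish its step.
    """
    order = sorted(range(len(durations)), key=lambda i: durations[i])
    if wait_num is None:
        wait_num = len(durations)  # synchronous: wait for every environment
    # Time at which the wait_num-th fastest environment finishes.
    t_return = durations[order[wait_num - 1]]
    if timeout is not None:
        t_return = min(t_return, timeout)
    # If nothing is ready by then, keep waiting for the fastest environment.
    t_return = max(t_return, durations[order[0]])
    ids = [i for i in range(len(durations)) if durations[i] <= t_return]
    return ids, t_return


step_times = [0.01, 0.02, 0.03, 0.04]  # same sleep times as the SlowEnv example
sync_ids, sync_time = ready_env_ids(step_times)
async_ids, async_time = ready_env_ids(step_times, wait_num=3, timeout=0.1)
short_ids, short_time = ready_env_ids(step_times, wait_num=3, timeout=0.015)
tiny_ids, tiny_time = ready_env_ids(step_times, wait_num=3, timeout=0.001)

print(sync_ids, sync_time)    # all four environments, after the slowest one
print(async_ids, async_time)  # the three fastest environments
print(short_ids, short_time)  # timeout fires before wait_num is reached
print(tiny_ids, tiny_time)    # timeout too short: wait for the first result
```

With the settings used earlier (wait_num=3, timeout=0.1), the model returns the three fastest environments after 0.03 seconds instead of waiting 0.04 seconds for all four.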

Warning: Asynchronous collectors can cause exceptions when used as test_collector in trainers. Always use synchronous mode for test collectors.

EnvPool Integration#

EnvPool is a C++-based vectorized environment library that provides significant performance improvements over Python-based solutions for many of the standard environments. Tianshou fully supports EnvPool with minimal code changes.

Why EnvPool?#

  • Performance: 10x-100x faster than standard vectorized environments for supported environments

  • Memory Efficient: Optimized memory usage through shared buffers

  • Drop-in Replacement: Nearly identical API to Tianshou’s vectorized environments

Supported Environments#

EnvPool currently supports:

  • Atari games

  • MuJoCo physics simulations

  • VizDoom 3D environments

  • Classic control environments

  • Toy text environments

Using EnvPool#

First, install EnvPool:

pip install envpool

Then use it directly with Tianshou:

import envpool

# Create EnvPool vectorized environment
envs = envpool.make_gymnasium("CartPole-v1", num_envs=10)

print(f"Created EnvPool environment with {envs.spec.config.num_envs} environments")
print("Ready to use with Tianshou collectors!")

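# Use directly with Tianshou. `algorithm` and `buffer` are assumed to be
# defined elsewhere; they are not created in this tutorial.
from tianshou.data import Collector

collector = Collector(algorithm, envs, buffer)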

EnvPool Examples#

For complete examples of using EnvPool with Tianshou:

Custom Environments and State Representations#

Tianshou works seamlessly with custom environments as long as they follow the Gymnasium API. Let’s explore how to handle different state representations.

Required Gymnasium API#

Your custom environment must implement:

from typing import Any


class MyEnv(gym.Env):
    def reset(self, seed=None, options=None) -> tuple[Any, dict]:
        """Reset environment to initial state; return (observation, info)."""
        pass

    def step(self, action) -> tuple[Any, float, bool, bool, dict]:
        """Execute one step; return (observation, reward, terminated, truncated, info)."""
        pass

    def seed(self, seed: int) -> list[int]:
        """Set random seed (Gymnasium itself seeds through reset(seed=...))."""
        pass

    def render(self) -> Any:
        """Render the environment (the mode is fixed via render_mode at construction)."""
        pass

    def close(self) -> None:
        """Clean up resources."""
        pass

    # Required spaces
    observation_space: gym.Space
    action_space: gym.Space

Important: Make sure your seed() method is implemented correctly:

def seed(self, seed):
    np.random.seed(seed)
    # Also seed other random generators used in your environment

Without proper seeding, parallel environments may produce identical outputs!
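
The effect is easy to reproduce with plain NumPy. The sketch below uses np.random.default_rng rather than the global np.random.seed shown above, and the "base seed plus environment index" scheme is just one illustrative way to derive distinct seeds.

```python
import numpy as np

num_envs, base_seed = 3, 42

# Same seed everywhere: every environment draws the same "random" numbers.
same = [np.random.default_rng(base_seed).random(2) for _ in range(num_envs)]

# One distinct seed per environment (here simply base_seed + index).
seeds = [base_seed + i for i in range(num_envs)]
distinct = [np.random.default_rng(s).random(2) for s in seeds]

identical_streams = all(np.array_equal(same[0], x) for x in same[1:])
distinct_streams = len({tuple(x) for x in distinct}) == num_envs

print("same seed      -> identical outputs:", identical_streams)
print("distinct seeds -> different outputs:", distinct_streams)
```

Environments that share a seed explore the same trajectories under the same actions, so the extra workers add no new data.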

Dictionary Observations#

Many environments return observations as dictionaries rather than simple arrays. Tianshou’s Batch class handles this elegantly.

Example with the FetchReach environment:

from tianshou.data import Batch, ReplayBuffer

# Example: Creating a mock observation similar to FetchReach
observation = {
    "observation": np.array([1.34, 0.75, 0.53, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
    "achieved_goal": np.array([1.34, 0.75, 0.53]),
    "desired_goal": np.array([1.24, 0.78, 0.63]),
}

# Store in replay buffer
buffer = ReplayBuffer(size=10)
buffer.add(Batch(obs=observation, act=0, rew=0.0, terminated=False, truncated=False))

print("Stored observation structure:")
print(buffer.obs)
Stored observation structure:
Batch(
    observation: array([[1.34, 0.75, 0.53, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
                        [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
                        [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
                        [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
                        [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
                        [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
                        [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
                        [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
                        [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
                        [0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]]),
    achieved_goal: array([[1.34, 0.75, 0.53],
                          [0.  , 0.  , 0.  ],
                          [0.  , 0.  , 0.  ],
                          [0.  , 0.  , 0.  ],
                          [0.  , 0.  , 0.  ],
                          [0.  , 0.  , 0.  ],
                          [0.  , 0.  , 0.  ],
                          [0.  , 0.  , 0.  ],
                          [0.  , 0.  , 0.  ],
                          [0.  , 0.  , 0.  ]]),
    desired_goal: array([[1.24, 0.78, 0.63],
                         [0.  , 0.  , 0.  ],
                         [0.  , 0.  , 0.  ],
                         [0.  , 0.  , 0.  ],
                         [0.  , 0.  , 0.  ],
                         [0.  , 0.  , 0.  ],
                         [0.  , 0.  , 0.  ],
                         [0.  , 0.  , 0.  ],
                         [0.  , 0.  , 0.  ],
                         [0.  , 0.  , 0.  ]]),
)
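
The zero rows in the output above are slots that have not been written yet. A minimal sketch of the underlying idea, one preallocated array per observation key, is shown below. DictRingBuffer is a hypothetical class for illustration and is far simpler than Tianshou's ReplayBuffer.

```python
import numpy as np


class DictRingBuffer:
    """Hypothetical minimal buffer: one preallocated array per observation key."""

    def __init__(self, size):
        self.size = size
        self.storage = {}
        self.index = 0

    def add(self, obs):
        for key, value in obs.items():
            value = np.asarray(value)
            if key not in self.storage:
                # Allocate every slot up front; unused slots stay zero.
                self.storage[key] = np.zeros((self.size, *value.shape), dtype=value.dtype)
            self.storage[key][self.index] = value
        # Wrap around so the oldest entry is overwritten once the buffer is full.
        self.index = (self.index + 1) % self.size


toy_buffer = DictRingBuffer(size=10)
toy_buffer.add(
    {
        "achieved_goal": np.array([1.34, 0.75, 0.53]),
        "desired_goal": np.array([1.24, 0.78, 0.63]),
    }
)
print(toy_buffer.storage["desired_goal"].shape)
print(toy_buffer.storage["desired_goal"][:2])
```

Storing each key in its own array keeps sampling cheap: a batch is just the same set of row indices applied to every key.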

Accessing Dictionary Observations#

When sampling from the buffer, you can access nested dictionary values in multiple ways:

# Sample a batch
batch, indices = buffer.sample(batch_size=1)

print("Batch keys:", list(batch.keys()))
print("\nAccessing nested observation:")

# Recommended way: access through batch first
print("batch.obs.desired_goal[0]:", batch.obs.desired_goal[0])

# Alternative ways (not recommended)
print("batch.obs[0].desired_goal:", batch.obs[0].desired_goal)
print("batch[0].obs.desired_goal:", batch[0].obs.desired_goal)
Batch keys: ['obs', 'act', 'rew', 'terminated', 'truncated', 'done', 'obs_next', 'info', 'policy']

Accessing nested observation:
batch.obs.desired_goal[0]: [1.24 0.78 0.63]
batch.obs[0].desired_goal: [1.24 0.78 0.63]
batch[0].obs.desired_goal: [1.24 0.78 0.63]

Using Dictionary Observations in Networks#

When designing networks for environments with dictionary observations:

import torch
import torch.nn as nn


class CustomNetwork(nn.Module):
    """Network that processes dictionary observations."""

    def __init__(self, obs_dim, goal_dim, hidden_dim, action_dim):
        super().__init__()

        # Separate processing for different observation components
        self.obs_encoder = nn.Linear(obs_dim, hidden_dim)
        self.goal_encoder = nn.Linear(goal_dim * 2, hidden_dim)  # achieved + desired

        # Combined processing
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, obs_batch, **kwargs):
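        # Extract components and convert them to float tensors
        # (data coming from the buffer arrives as numpy arrays)
        observation = torch.as_tensor(obs_batch.observation, dtype=torch.float32)
        achieved_goal = torch.as_tensor(obs_batch.achieved_goal, dtype=torch.float32)
        desired_goal = torch.as_tensor(obs_batch.desired_goal, dtype=torch.float32)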

        # Process each component
        obs_feat = self.obs_encoder(observation)
        goal_feat = self.goal_encoder(torch.cat([achieved_goal, desired_goal], dim=-1))

        # Combine and output
        combined = torch.cat([obs_feat, goal_feat], dim=-1)
        return self.fc(combined)


# Example usage
net = CustomNetwork(obs_dim=10, goal_dim=3, hidden_dim=64, action_dim=4)
print("Network created for dictionary observations")
print("  Input: observation (10D) + achieved_goal (3D) + desired_goal (3D)")
print("  Output: actions (4D)")
Network created for dictionary observations
  Input: observation (10D) + achieved_goal (3D) + desired_goal (3D)
  Output: actions (4D)

Custom Object States#

For more complex state representations (e.g., graphs, custom objects), Tianshou stores references in numpy arrays. However, you must ensure deep copies to avoid state aliasing:

import copy

import networkx as nx


class GraphEnv(gym.Env):
    """Example environment with graph-based states."""

    def __init__(self):
        super().__init__()
        self.graph = nx.Graph()
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(10,))  # for compatibility

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.graph = nx.erdos_renyi_graph(10, 0.3)
        # IMPORTANT: Return deep copy to avoid reference issues
        return copy.deepcopy(self.graph), {}

    def step(self, action):
        # Modify graph based on action
        if action < 4 and len(self.graph.nodes) > 0:
            nodes = list(self.graph.nodes)
            if len(nodes) >= 2:
                self.graph.add_edge(nodes[0], nodes[1])

        # IMPORTANT: Return deep copy
        return copy.deepcopy(self.graph), 0.0, False, False, {}


# Test storing graph objects
graph_buffer = ReplayBuffer(size=5)
env = GraphEnv()
obs, _ = env.reset()
graph_buffer.add(Batch(obs=obs, act=0, rew=0.0, terminated=False, truncated=False))

print("Graph objects stored in buffer:")
print(graph_buffer.obs)
Graph objects stored in buffer:
[[0 1 2 3 4 5 6 7 8 9]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]]

Important: When using custom objects as states:

  1. Always return copy.deepcopy(state) in both reset() and step()

  2. Ensure the object is numpy-compatible: np.array([your_object]) should not result in an empty array

  3. The object may be stored as a shallow copy in the buffer—deep copying prevents state aliasing
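
State aliasing is easiest to see in isolation. In the sketch below, plain Python lists stand in for a replay buffer and a dictionary stands in for the environment state; both are assumptions made for illustration. The environment mutates its state in place after each step, so every reference stored without a copy ends up pointing at the latest state.

```python
import copy

state = {"edges": [(0, 1)]}

aliased_buffer = []  # stores references to the live state
copied_buffer = []   # stores independent snapshots

for new_edge in [(1, 2), (2, 3)]:
    aliased_buffer.append(state)
    copied_buffer.append(copy.deepcopy(state))
    state["edges"].append(new_edge)  # the environment mutates its state in place

aliased_lengths = [len(s["edges"]) for s in aliased_buffer]
copied_lengths = [len(s["edges"]) for s in copied_buffer]

print("without deepcopy:", aliased_lengths)
print("with deepcopy:   ", copied_lengths)
```

Without the deep copy, both stored entries report three edges, because they are the same object; with it, the history of one and then two edges is preserved.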

Best Practices Summary#

Choosing the Right Environment Wrapper#

| Scenario | Recommended Wrapper | Why |
|---|---|---|
| Simple/fast environments | DummyVectorEnv or raw Gym | Minimal overhead |
| Most parallel scenarios | SubprocVectorEnv | Good balance of speed and simplicity |
| Large observations (images) | ShmemVectorEnv | Optimized memory usage |
| Multi-machine clusters | RayVectorEnv | Distributed computing support |
| Maximum performance | EnvPool | C++-based, 10x-100x speedup |

Performance Tips#

  1. Profile First: Measure whether environment or training is your bottleneck before optimizing

  2. Start Simple: Begin with DummyVectorEnv for debugging, then upgrade to parallel versions

  3. Use EnvPool: If your environment is supported, EnvPool offers the best performance

  4. Async for Variable Times: Use asynchronous mode only when environment step times vary significantly

  5. Proper Seeding: Always implement the seed() method correctly in custom environments
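
For the "Profile First" tip, a rough timing harness is often enough. The sketch below uses two placeholder functions, fake_env_step and fake_train_update, which are stand-ins invented for this example; replace them with your own env.step call and training update to see where the time goes.

```python
import time


def time_call(fn, repeats=20):
    """Average wall-clock seconds per call of fn()."""
    start = time.perf_counter()
    for _ in range(repeats):
        fn()
    return (time.perf_counter() - start) / repeats


def fake_env_step():
    """Placeholder for env.step(action): deliberately the heavier workload."""
    return sum(i * i for i in range(50_000))


def fake_train_update():
    """Placeholder for one gradient update: deliberately the lighter workload."""
    return sum(i * i for i in range(5_000))


env_time = time_call(fake_env_step)
train_time = time_call(fake_train_update)
env_share = env_time / (env_time + train_time)

print(f"environment step: {env_time * 1e3:.2f} ms")
print(f"training update:  {train_time * 1e3:.2f} ms")
print(f"environment share of loop time: {env_share:.0%}")
```

If the environment share is small, parallelizing environments will not help much, and effort is better spent on the training side.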

Common Pitfalls#

  • ❌ Using SubprocVectorEnv for fast environments → Use DummyVectorEnv instead

  • ❌ Forgetting to deep-copy custom states → States will be aliased in the buffer

  • ❌ Not implementing seed() properly → Parallel environments produce identical results

  • ❌ Using async collectors for testing → Causes exceptions in trainers

  • ❌ Assuming linear speedup → Account for communication overhead and straggler effects

Further Reading#