Source code for tianshou.algorithm.modelfree.reinforce

import logging
import warnings
from collections.abc import Callable
from dataclasses import dataclass
from typing import Literal, TypeVar, cast

import gymnasium as gym
import numpy as np
import torch

from tianshou.algorithm import Algorithm
from tianshou.algorithm.algorithm_base import (
    OnPolicyAlgorithm,
    Policy,
    TrainingStats,
)
from tianshou.algorithm.optim import OptimizerFactory
from tianshou.data import (
    Batch,
    ReplayBuffer,
    SequenceSummaryStats,
    to_torch,
    to_torch_as,
)
from tianshou.data.batch import BatchProtocol
from tianshou.data.types import (
    BatchWithReturnsProtocol,
    DistBatchProtocol,
    ObsBatchProtocol,
    RolloutBatchProtocol,
)
from tianshou.utils import RunningMeanStd
from tianshou.utils.net.common import (
    AbstractContinuousActorProbabilistic,
    AbstractDiscreteActor,
    ActionReprNet,
)
from tianshou.utils.net.discrete import dist_fn_categorical_from_logits

log = logging.getLogger(__name__)


# Dimension Naming Convention
# B - Batch Size
# A - Action
# D - Dist input (usually 2, loc and scale)
# H - Dimension of hidden, can be None

TDistFnContinuous = Callable[
    [tuple[torch.Tensor, torch.Tensor]],
    torch.distributions.Distribution,
]
TDistFnDiscrete = Callable[[torch.Tensor], torch.distributions.Distribution]

TDistFnDiscrOrCont = TDistFnContinuous | TDistFnDiscrete


[docs] @dataclass(kw_only=True) class LossSequenceTrainingStats(TrainingStats): loss: SequenceSummaryStats
[docs] @dataclass(kw_only=True) class SimpleLossTrainingStats(TrainingStats): loss: float
[docs] class ProbabilisticActorPolicy(Policy): """ A policy that outputs (representations of) probability distributions from which actions can be sampled. """ def __init__( self, *, actor: AbstractContinuousActorProbabilistic | AbstractDiscreteActor | ActionReprNet, dist_fn: TDistFnDiscrOrCont, deterministic_eval: bool = False, action_space: gym.Space, observation_space: gym.Space | None = None, action_scaling: bool = True, action_bound_method: Literal["clip", "tanh"] | None = "clip", ) -> None: """ :param actor: the actor network following the rules: If `self.action_type == "discrete"`: (`s_B` -> `action_values_BA`). If `self.action_type == "continuous"`: (`s_B` -> `dist_input_BD`). :param dist_fn: the function/type which creates a distribution from the actor output, i.e. it maps the tensor(s) generated by the actor to a torch distribution. For continuous action spaces, the output is typically a pair of tensors (mean, std) and the distribution is a Gaussian distribution. For discrete action spaces, the output is typically a tensor of unnormalized log probabilities ("logits" in PyTorch terminology) or a tensor of probabilities which can serve as the parameters of a Categorical distribution. Note that if the actor uses softmax activation in its final layer, it will produce probabilities, whereas if it uses no activation, it can be considered as producing "logits". As a user, you are responsible for ensuring that the distribution is compatible with the output of the actor model and the action space. :param deterministic_eval: flag indicating whether the policy should use deterministic actions (using the mode of the action distribution) instead of stochastic ones (using random sampling) during evaluation. When enabled, the policy will always select the most probable action according to the learned distribution during evaluation phases, while still using stochastic sampling during training. This creates a clear distinction between exploration (training) and exploitation (evaluation) behaviors. Deterministic actions are generally preferred for final deployment and reproducible evaluation as they provide consistent behavior, reduce variance in performance metrics, and are more interpretable for human observers. Note that this parameter only affects behavior when the policy is not within a training step. When collecting rollouts for training, actions remain stochastic regardless of this setting to maintain proper exploration behaviour. :param action_space: the environment's action space. :param observation_space: the environment's observation space. :param action_scaling: flag indicating whether, for continuous action spaces, actions should be scaled from the standard neural network output range [-1, 1] to the environment's action space range [action_space.low, action_space.high]. This applies to continuous action spaces only (gym.spaces.Box) and has no effect for discrete spaces. When enabled, policy outputs are expected to be in the normalized range [-1, 1] (after bounding), and are then linearly transformed to the actual required range. This improves neural network training stability, allows the same algorithm to work across environments with different action ranges, and standardizes exploration strategies. Should be disabled if the actor model already produces outputs in the correct range. :param action_bound_method: the method used for bounding actions in continuous action spaces to the range [-1, 1] before scaling them to the environment's action space (provided that `action_scaling` is enabled). This applies to continuous action spaces only (`gym.spaces.Box`) and should be set to None for discrete spaces. When set to "clip", actions exceeding the [-1, 1] range are simply clipped to this range. When set to "tanh", a hyperbolic tangent function is applied, which smoothly constrains outputs to [-1, 1] while preserving gradients. The choice of bounding method affects both training dynamics and exploration behavior. Clipping provides hard boundaries but may create plateau regions in the gradient landscape, while tanh provides smoother transitions but can compress sensitivity near the boundaries. Should be set to None if the actor model inherently produces bounded outputs. Typically used together with `action_scaling=True`. """ super().__init__( action_space=action_space, observation_space=observation_space, action_scaling=action_scaling, action_bound_method=action_bound_method, ) if action_scaling: try: max_action = float(actor.max_action) if np.isclose(max_action, 1.0): warnings.warn( "action_scaling and action_bound_method are only intended " "to deal with unbounded model action space, but found actor model " f"bound action space with max_action={actor.max_action}. " "Consider using unbounded=True option of the actor model, " "or set action_scaling to False and action_bound_method to None.", ) except BaseException: pass self.actor = actor self.dist_fn = dist_fn self._eps = 1e-8 self.deterministic_eval = deterministic_eval
[docs] def forward( self, batch: ObsBatchProtocol, state: dict | BatchProtocol | np.ndarray | None = None, ) -> DistBatchProtocol: """Compute action over the given batch data by applying the actor. Will sample from the dist_fn, if appropriate. Returns a new object representing the processed batch data (contrary to other methods that modify the input batch inplace). """ action_dist_input_BD, hidden_BH = self.actor(batch.obs, state=state, info=batch.info) # in the case that self.action_type == "discrete", the dist should always be Categorical, and D=A # therefore action_dist_input_BD is equivalent to logits_BA # If discrete, dist_fn will typically map loc, scale to a distribution (usually a Gaussian) # the action_dist_input_BD in that case is a tuple of loc_B, scale_B and needs to be unpacked dist = self.dist_fn(action_dist_input_BD) act_B = ( dist.mode if self.deterministic_eval and not self.is_within_training_step else dist.sample() ) # act is of dimension BA in continuous case and of dimension B in discrete result = Batch(logits=action_dist_input_BD, act=act_B, state=hidden_BH, dist=dist) return cast(DistBatchProtocol, result)
[docs] class DiscreteActorPolicy(ProbabilisticActorPolicy): def __init__( self, *, actor: AbstractDiscreteActor | ActionReprNet, dist_fn: TDistFnDiscrete = dist_fn_categorical_from_logits, deterministic_eval: bool = False, action_space: gym.Space, observation_space: gym.Space | None = None, ) -> None: """ :param actor: the actor network following the rules: (`s_B` -> `dist_input_BD`). :param dist_fn: the function/type which creates a distribution from the actor output, i.e. it maps the tensor(s) generated by the actor to a torch distribution. For discrete action spaces, the output is typically a tensor of unnormalized log probabilities ("logits" in PyTorch terminology) or a tensor of probabilities which serve as the parameters of a Categorical distribution. Note that if the actor uses softmax activation in its final layer, it will produce probabilities, whereas if it uses no activation, it can be considered as producing "logits". As a user, you are responsible for ensuring that the distribution is compatible with the output of the actor model and the action space. :param deterministic_eval: flag indicating whether the policy should use deterministic actions (using the mode of the action distribution) instead of stochastic ones (using random sampling) during evaluation. When enabled, the policy will always select the most probable action according to the learned distribution during evaluation phases, while still using stochastic sampling during training. This creates a clear distinction between exploration (training) and exploitation (evaluation) behaviors. Deterministic actions are generally preferred for final deployment and reproducible evaluation as they provide consistent behavior, reduce variance in performance metrics, and are more interpretable for human observers. Note that this parameter only affects behavior when the policy is not within a training step. When collecting rollouts for training, actions remain stochastic regardless of this setting to maintain proper exploration behaviour. :param action_space: the environment's (discrete) action space. :param observation_space: the environment's observation space. """ if not isinstance(action_space, gym.spaces.Discrete): raise ValueError(f"Action space must be an instance of Discrete; got {action_space}") super().__init__( actor=actor, dist_fn=dist_fn, deterministic_eval=deterministic_eval, action_space=action_space, observation_space=observation_space, action_scaling=False, action_bound_method=None, )
TActorPolicy = TypeVar("TActorPolicy", bound=ProbabilisticActorPolicy)
[docs] class DiscountedReturnComputation: def __init__( self, gamma: float = 0.99, return_standardization: bool = False, ): """ :param gamma: the discount factor in [0, 1] for future rewards. This determines how much future rewards are valued compared to immediate ones. Lower values (closer to 0) make the agent focus on immediate rewards, creating "myopic" behavior. Higher values (closer to 1) make the agent value long-term rewards more, potentially improving performance in tasks where delayed rewards are important but increasing training variance by incorporating more environmental stochasticity. Typically set between 0.9 and 0.99 for most reinforcement learning tasks :param return_standardization: whether to standardize episode returns by subtracting the running mean and dividing by the running standard deviation. Note that this is known to be detrimental to performance in many cases! """ assert 0.0 <= gamma <= 1.0, "discount factor gamma should be in [0, 1]" self.gamma = gamma self.return_standardization = return_standardization self.ret_rms = RunningMeanStd() self.eps = 1e-8
[docs] def add_discounted_returns( self, batch: RolloutBatchProtocol, buffer: ReplayBuffer, indices: np.ndarray ) -> BatchWithReturnsProtocol: r"""Compute the discounted returns (Monte Carlo estimates) for each transition. They are added to the batch under the field `returns`. Note: this function will modify the input batch! .. math:: G_t = \sum_{i=t}^T \gamma^{i-t}r_i where :math:`T` is the terminal time step, :math:`\gamma` is the discount factor, :math:`\gamma \in [0, 1]`. :param batch: a data batch which contains several episodes of data in sequential order. Mind that the end of each finished episode of batch should be marked by done flag, unfinished (or collecting) episodes will be recognized by buffer.unfinished_index(). :param buffer: the corresponding replay buffer. :param indices: tell batch's location in buffer, batch is equal to buffer[indices]. """ v_s_ = np.full(indices.shape, self.ret_rms.mean) # gae_lambda = 1.0 means we use Monte Carlo estimate unnormalized_returns, _ = Algorithm.compute_episodic_return( batch, buffer, indices, v_s_=v_s_, gamma=self.gamma, gae_lambda=1.0, ) if self.return_standardization: batch.returns = (unnormalized_returns - self.ret_rms.mean) / np.sqrt( self.ret_rms.var + self.eps, ) self.ret_rms.update(unnormalized_returns) else: batch.returns = unnormalized_returns return cast(BatchWithReturnsProtocol, batch)
[docs] class Reinforce(OnPolicyAlgorithm[ProbabilisticActorPolicy]): """Implementation of the REINFORCE (a.k.a. vanilla policy gradient) algorithm.""" def __init__( self, *, policy: ProbabilisticActorPolicy, gamma: float = 0.99, return_standardization: bool = False, optim: OptimizerFactory, ) -> None: """ :param policy: the policy :param optim: the optimizer factory for the policy's model. :param gamma: the discount factor in [0, 1] for future rewards. This determines how much future rewards are valued compared to immediate ones. Lower values (closer to 0) make the agent focus on immediate rewards, creating "myopic" behavior. Higher values (closer to 1) make the agent value long-term rewards more, potentially improving performance in tasks where delayed rewards are important but increasing training variance by incorporating more environmental stochasticity. Typically set between 0.9 and 0.99 for most reinforcement learning tasks :param return_standardization: if True, will scale/standardize returns by subtracting the running mean and dividing by the running standard deviation. Can be detrimental to performance! """ super().__init__( policy=policy, ) self.discounted_return_computation = DiscountedReturnComputation( gamma=gamma, return_standardization=return_standardization, ) self.optim = self._create_optimizer(self.policy, optim) def _preprocess_batch( self, batch: RolloutBatchProtocol, buffer: ReplayBuffer, indices: np.ndarray, ) -> BatchWithReturnsProtocol: return self.discounted_return_computation.add_discounted_returns( batch, buffer, indices, ) # Needs BatchWithReturnsProtocol, which violates the substitution principle. But not a problem since it's a private method and # the remainder of the class was adjusted to provide the correct batch def _update_with_batch( # type: ignore[override] self, batch: BatchWithReturnsProtocol, batch_size: int | None, repeat: int, ) -> LossSequenceTrainingStats: losses = [] split_batch_size = batch_size or -1 for _ in range(repeat): for minibatch in batch.split(split_batch_size, merge_last=True): result = self.policy(minibatch) dist = result.dist act = to_torch_as(minibatch.act, result.act) ret = to_torch(minibatch.returns, torch.float, result.act.device) log_prob = dist.log_prob(act).reshape(len(ret), -1).transpose(0, 1) loss = -(log_prob * ret).mean() self.optim.step(loss) losses.append(loss.item()) return LossSequenceTrainingStats(loss=SequenceSummaryStats.from_sequence(losses))