# Source code for tianshou.data.buffer.cached
import numpy as np
from tianshou.data import ReplayBuffer, ReplayBufferManager
from tianshou.data.types import RolloutBatchProtocol
# [docs]  (documentation-link marker, presumably left over from the HTML source listing)
class CachedReplayBuffer(ReplayBufferManager):
    """Buffer manager built from one main buffer plus several per-episode caches.

    The manager holds the given main buffer followed by ``cached_buffer_num`` newly
    created cached buffers, i.e.
    ``cached_buffer_num * ReplayBuffer(size=max_episode_length)``.

    The memory layout is: ``| main_buffer | cached_buffers[0] | cached_buffers[1] | ...
    | cached_buffers[cached_buffer_num - 1] |``.

    Incoming data is written to the cached buffers first. Once an episode is finished
    (terminated or truncated), its data is moved into the main buffer and the cached
    buffer that held it is reset.

    :param main_buffer: the main buffer whose ``.update()`` function
        behaves normally.
    :param cached_buffer_num: number of ReplayBuffer instances to create as cached
        buffers.
    :param max_episode_length: the maximum length of one episode, used as the
        maxsize of each cached buffer.

    .. seealso::

        Please refer to :class:`~tianshou.data.ReplayBuffer` for other APIs' usage.
    """

    def __init__(
        self,
        main_buffer: ReplayBuffer,
        cached_buffer_num: int,
        max_episode_length: int,
    ) -> None:
        assert cached_buffer_num > 0
        assert max_episode_length > 0
        assert isinstance(main_buffer, ReplayBuffer)
        # Every cached buffer is created with the same options as the main buffer.
        shared_options = main_buffer.options
        episode_caches = [
            ReplayBuffer(max_episode_length, **shared_options)
            for _ in range(cached_buffer_num)
        ]
        # Slot 0 is the main buffer; slots 1.. are the cached buffers.
        super().__init__(buffer_list=[main_buffer, *episode_caches])
        self.main_buffer = self.buffers[0]
        self.cached_buffers = self.buffers[1:]
        self.cached_buffer_num = cached_buffer_num

    def add(
        self,
        batch: RolloutBatchProtocol,
        buffer_ids: np.ndarray | list[int] | None = None,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Add a batch of data into CachedReplayBuffer.

        The length (first dimension) of each piece of data must equal the length of
        buffer_ids. When buffer_ids is omitted it defaults to
        [0, 1, ..., cached_buffer_num - 1].

        Return (current_index, episode_reward, episode_length, episode_start_index),
        each of shape (len(buffer_ids), ...), where (current_index[i],
        episode_reward[i], episode_length[i], episode_start_index[i]) is the episode
        result of the cached_buffer_ids[i]th cached buffer.

        For an entry whose episode finished, current_index and episode_start_index
        are replaced by the last and first index returned by the main buffer's
        ``update()``; if that call returns no indices, both are reported as -1.
        """
        # Shift ids by one so they address the cached buffers, never the main buffer
        # in slot 0; np.asarray also guarantees an ndarray for boolean indexing below.
        slot_ids = (
            np.arange(1, 1 + self.cached_buffer_num)
            if buffer_ids is None
            else np.asarray(buffer_ids) + 1
        )
        insertion_idx, ep_return, ep_len, ep_start_idx = super().add(
            batch,
            buffer_ids=slot_ids,
        )

        # Locate finished episodes and move their data from cache to main buffer.
        finished = np.logical_or(batch.terminated, batch.truncated)
        moved_last_idx, moved_first_idx = [], []
        for slot in slot_ids[finished]:
            moved = self.main_buffer.update(self.buffers[slot])
            # Explicit length check: the result is indexed like a sequence and may be
            # an array, for which plain truthiness is not a safe emptiness test.
            if len(moved) == 0:  # unsuccessful move, replace with -1
                moved = [-1]
            first, last = moved[0], moved[-1]
            moved_first_idx.append(first)
            moved_last_idx.append(last)
            self.buffers[slot].reset()
            # Keep the manager's bookkeeping in step with the buffers just modified.
            self._lengths[0] = len(self.main_buffer)
            self._lengths[slot] = 0
            self.last_index[0] = last
            self.last_index[slot] = self._offset[slot]

        insertion_idx[finished] = moved_last_idx
        ep_start_idx[finished] = moved_first_idx
        return insertion_idx, ep_return, ep_len, ep_start_idx