相关前文:
面向分布式强化学习的经验回放框架——Reverb: A Framework for Experience Replay
论文题目:
Reverb: A Framework for Experience Replay
地址:
https://arxiv.org/pdf/2102.04736.pdf
框架代码地址:
https://github.com/deepmind/reverb
环境安装:
pip install dm-reverb[tensorflow]
============================================
Example 1: Overlapping Trajectories
Inserting Overlapping Trajectories
import reverb
import tensorflow as tf
OBSERVATION_SPEC = tf.TensorSpec([10, 10], tf.uint8)
ACTION_SPEC = tf.TensorSpec([2], tf.float32)
def agent_step(unused_timestep) -> tf.Tensor:
return tf.cast(tf.random.uniform(ACTION_SPEC.shape) > .5,
ACTION_SPEC.dtype)
def environment_step(unused_action) -> tf.Tensor:
return tf.cast(tf.random.uniform(OBSERVATION_SPEC.shape, maxval=256),
OBSERVATION_SPEC.dtype)
# Initialize the reverb server.
simple_server = reverb.Server(
tables=[
reverb.Table(
name='my_table',
sampler=reverb.selectors.Prioritized(priority_exponent=0.8),
remover=reverb.selectors.Fifo(),
max_size=int(1e6),
# Sets Rate Limiter to a low number for the examples.
# Read the Rate Limiters section for usage info.
rate_limiter=reverb.rate_limiters.MinSize(2),
# The signature is optional but it is good practice to set it as it
# enables data validation and easier dataset construction. Note that
# we prefix all shapes with a 3 as the trajectories we'll be writing
# consist of 3 timesteps.
signature={
'actions':
tf.TensorSpec([3, *ACTION_SPEC.shape], ACTION_SPEC.dtype),
'observations':
tf.TensorSpec([3, *OBSERVATION_SPEC.shape],
OBSERVATION_SPEC.dtype),
},
)
],
# Sets the port to None to make the server pick one automatically.
# This can be omitted as it's the default.
port=9999)
# Initializes the reverb client on the same port as the server.
client = reverb.Client(f'localhost:{simple_server.port}')
# Dynamically adds trajectories of length 3 to 'my_table' using a client writer.
with client.trajectory_writer(num_keep_alive_refs=3) as writer:
timestep = environment_step(None)
for step in range(4):
action = agent_step(timestep)
writer.append({'action': action, 'observation': timestep})
timestep = environment_step(action)
if step >= 2:
# In this example, the item consists of the 3 most recent timesteps that
# were added to the writer and has a priority of 1.5.
writer.create_item(
table='my_table',
priority=1.5,
trajectory={
'actions': writer.history['action'][-3:],
'observations': writer.history['observation'][-3:],
}
)
server端和client端可以不在同一台主机上,这个例子是server和client在同一主机上。上面例子预设server端的端口为9999。其中server端主要功能为维持经验池中数据,client端可以sample,也可以insert,上面例子中client只进行了insert操作。
server端负责数据的sample和insert操作的定义,虽然客户端调用sample操作或insert操作,但是最后的具体执行还是在server端,毕竟数据是由server端所维护的。
关于语句:
with client.trajectory_writer(num_keep_alive_refs=3) as writer:
个人的理解是,client中的数据如果需要进行insert操作,那么需要先申请一段缓存空间的,其中缓存空间的大小定义就是上面的参数num_keep_alive_refs,而writer.append操作是将数据写入到client端的缓存中,也就是num_keep_alive_refs所定义大小的缓存空间中,writer.create_item则是执行将加入到缓存空间中的数据insert到服务端的操作。这就需要保证writer.create_item的时候数据是需要保持在缓存中的,也就是说num_keep_alive_refs需要足够大,不然缓存空间中没有对应的数据而此时执行writer.create_item则是会报错的,当然我们也可以直接将num_keep_alive_refs设置为一个足够大的数,但是这样就会造成client端内存的浪费。
num_keep_alive_refs所定义大小的client端缓存空间中数据会由于writer.append操作造成旧数据移除,比如上面例子中如果设置语句:
with client.trajectory_writer(num_keep_alive_refs=2) as writer:
就会报错,但是设置语句:
with client.trajectory_writer(num_keep_alive_refs=4) as writer:
就不会报错。
Sampling Overlapping Trajectories in TensorFlow
在同一主机上执行server端代码,如下:
import reverb
import tensorflow as tf
OBSERVATION_SPEC = tf.TensorSpec([10, 10], tf.uint8)
ACTION_SPEC = tf.TensorSpec([2], tf.float32)
def agent_step(unused_timestep) -> tf.Tensor:
return tf.cast(tf.random.uniform(ACTION_SPEC.shape) > .5,
ACTION_SPEC.dtype)
def environment_step(unused_action) -> tf.Tensor:
return tf.cast(tf.random.uniform(OBSERVATION_SPEC.shape, maxval=256),
OBSERVATION_SPEC.dtype)
# Initialize the reverb server.
simple_server = reverb.Server(
tables=[
reverb.Table(
name='my_table',
sampler=reverb.selectors.Prioritized(priority_exponent=0.8),
remover=reverb.selectors.Fifo(),
max_size=int(1e6),
# Sets Rate Limiter to a low number for the examples.
# Read the Rate Limiters section for usage info.
rate_limiter=reverb.rate_limiters.MinSize(2),
# The signature is optional but it is good practice to set it as it
# enables data validation and easier dataset construction. Note that
# we prefix all shapes with a 3 as the trajectories we'll be writing
# consist of 3 timesteps.
signature={
'actions':
tf.TensorSpec([3, *ACTION_SPEC.shape], ACTION_SPEC.dtype),
'observations':
tf.TensorSpec([3, *OBSERVATION_SPEC.shape],
OBSERVATION_SPEC.dtype),
},
)
],
# Sets the port to None to make the server pick one automatically.
# This can be omitted as it's the default.
port=9999)
# Initializes the reverb client on the same port as the server.
client = reverb.Client(f'localhost:{simple_server.port}')
# Dynamically adds trajectories of length 3 to 'my_table' using a client writer.
with client.trajectory_writer(num_keep_alive_refs=3) as writer:
timestep = environment_step(None)
for step in range(4):
action = agent_step(timestep)
writer.append({'action': action, 'observation': timestep})
timestep = environment_step(action)
if step >= 2:
# In this example, the item consists of the 3 most recent timesteps that
# were added to the writer and has a priority of 1.5.
writer.create_item(
table='my_table',
priority=1.5,
trajectory={
'actions': writer.history['action'][-3:],
'observations': writer.history['observation'][-3:],
}
)
import time
time.sleep(3333333)
View Code
并同时执行客户端代码:
import reverb
# Dataset samples sequences of length 3 and streams the timesteps one by one.
# This allows streaming large sequences that do not necessarily fit in memory.
dataset = reverb.TrajectoryDataset.from_table_signature(
server_address=f'localhost:9999',
table='my_table',
max_in_flight_samples_per_worker=10)
# Batches 2 sequences together.
# Shapes of items is now [2, 3, 10, 10].
batched_dataset = dataset.batch(2)
for sample in batched_dataset.take(2):
# Results in the following format.
print(sample.info.key) # ([2], uint64)
print(sample.info.probability) # ([2], float64)
print(sample.data['observations']) # ([2, 3, 10, 10], uint8)
print(sample.data['actions']) # ([2, 3, 2], float32)
其中,dataset.batch(2)语句定义每次sample时batch_size的大小,这条语句含义为定义大小。
语句:for sample in batched_dataset.take(2):是设置返回的迭代器可以迭代的此数,也就是说可以迭代返回的batch的个数,这里我们设置可以返回的batch个数为2,那么for循环就可以循环两次。
===================================
其他相关代码见地址:
https://github.com/deepmind/reverb/blob/master/examples/demo.ipynb
https://github.com/deepmind/reverb/blob/master/examples/frame_stacking.ipynb
===================================