import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
%load_ext tensorboard
import numpy as np
path='./datasets/mnist.npz'
with np.load(path) as data:
train_examples = data['x_train']
train_labels = data['y_train']
test_examples = data['x_test']
test_labels = data['y_test']
train_dataset1 = tf.data.Dataset.from_tensor_slices((train_examples, train_labels))
test_dataset = tf.data.Dataset.from_tensor_slices((test_examples, test_labels))
for i,j in train_dataset1.take(1):
print(i.shape)
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
# 关于“REPLICA”这个单词,它在这里指的是一个可以独立执行计算任务的设备或进程。
# 在分布式计算中,多个副本(replicas)协同工作以加速计算过程。每个副本都可以处理数
# 据的一个子集,并在处理完成后与其他副本同步结果。通过增加副本的数量,
# 可以并行处理更多的数据,从而加快训练或推理的速度。
BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
# 定义一个函数,将图像像素值从 [0, 255] 范围归一化到 [0, 1] 范围(特征缩放)
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
#训练数据要做刷新,打乱顺序,测试数据只是进行特征缩放,形成批次
train_dataset = train_dataset1.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset =test_dataset.map(scale).batch(BATCH_SIZE)
for i,j in eval_dataset.take(1):
print(i.shape)
import keras
# 在 Strategy.scope 的上下文中,使用 Keras API 创建和编译模型
with strategy.scope():
model = tf.keras.Sequential([
tf.keras.layers.Input(shape=(28,28,1)),#(28,28,1)
tf.keras.layers.Conv2D(32, 3, activation='relu'),#卷积层(26,26,32)
tf.keras.layers.MaxPooling2D(),#最大池化(13,13,32)
tf.keras.layers.Flatten(),# 扁平化(13*13*32),把上面32层排序成行(列)向量
tf.keras.layers.Dense(64, activation='relu'),# 线性层(64),每个单元接收(13*13*32个w_i+b)
tf.keras.layers.Dense(10)# 输出层(10)
])
#损失函数:经过对标签one_hot处理的多元交叉熵,优化器:Adam,指标:acc准确率
model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['acc'])
model.summary()
import os
checkpoint_dir = './checkpoints/keras_fbs/training_checkpoints'
if not os.path.exists(checkpoint_dir):
os.makedirs(checkpoint_dir)
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}.weights.h5")
# 定义调节学习率的函数
def decay(epoch):
if epoch < 3:
return 1e-3
elif epoch >= 3 and epoch < 7:
return 1e-4
else:
return 1e-5
# 定义在每个轮次后打印学习率的回调函数
class PrintLR(tf.keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
print('\n第{}个轮次的学习率是{}'.format(epoch + 1, model.optimizer.learning_rate.numpy()))
#把所有回调组成一个回调列表
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir='./logs/keras_fbs'),#tensorboard回调
tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,#检查点回调
save_weights_only=True),
tf.keras.callbacks.LearningRateScheduler(decay),#学习率回调
PrintLR()
]
EPOCHS = 12
model.fit(train_dataset, epochs=EPOCHS, callbacks=callbacks)
checkpoints=os.listdir(checkpoint_dir)
# 根据序号对检查点进行排序
sorted_checkpoints = sorted(checkpoints, key=lambda x: int(x.split('.')[0].split('_')[1]))
# 获取最新的检查点文件
latest_checkpoint = sorted_checkpoints[-1]
# 加载最新检查点的权重
model.load_weights(checkpoint_dir+'/'+latest_checkpoint)
eval_loss, eval_acc = model.evaluate(eval_dataset)
print('Eval loss: {}, Eval accuracy: {}'.format(eval_loss, eval_acc))
%tensorboard --logdir=logs/keras_fbs
os.listdir('./logs/keras_fbs/')
#使用 Model.save 将模型保存到一个 .keras 压缩归档中。保存后,您可以使用或不使用
# Strategy.scope 加载模型
path = 'my_model.keras'
model.save(path)
unreplicated_model = tf.keras.models.load_model(path)
eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)
print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
在你的代码中,即使你调用了 compile 方法,只要模型之前已经被编译过并且权重被正确加载,
# 它通常不会影响预测或评估的结果。然而,如果你在加载模型后改变了损失函数、
# 优化器或评估指标,那么重新编译是必要的,以便这些更改能够生效。
with strategy.scope():
replicated_model = tf.keras.models.load_model(path)
eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
print ('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))