参考书目《联邦学习实战》 杨强
在阅读本书的过程中,我尝试根据书中的代码,自己实现横向联邦学习中的图像分类任务,这里是我对代码和逻辑的理解还有出现的问题,希望对大家的学习有所帮助。
下面的表格是一些实验基本信息:
配置信息 | 解释 |
---|---|
数据集 | Cifar10(其将样本划分后给每个客户端作为本地数据) |
全局迭代次数 | 服务器和客户端的通信次数 |
本地模型迭代次数 | 每一次客户端训练的轮数,各个客户端可以相同,也可以不同 |
一些其它基础的模型配置信息在json文件中给出:
{
"model_name" : "resnet18",
"no_models" : 10,
"type" : "cifar",
"global_epochs" : 20,
"local_epochs": 3,
"k" : 6,
"batch_size" : 32,
"lr" : 0.001,
"momentum" : 0.0001,
"lambda" : 0.1
}
获取训练数据集函数dataset.py:
import torchvision.datasets as dataset
import torchvision.transforms as transform
def get_dataset(dir, name):
if name == 'mnist':
# 获取训练集和测试集
train_dataset = dataset.MNIST(dir, train=True, download=True, transform=transform.ToTensor()) # 设置下载数据集并转
# 换为torch识别的tensor数据类型
eval_dataset = dataset.MNIST(dir, train=False, transform=transform.ToTensor()) # 测试集
elif name == 'cifar':
transform_train = transform.Compose([ # 数据增强操作,训练集的预处理
transform.RandomCrop(32, padding=4), # 随机剪裁,大小为32*32,添加4个像素的填充内容
transform.RandomHorizontalFlip(), # 随机垂直方向的翻转
transform.ToTensor(),
transform.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)) # 归一化操作,数值是抽样得到的,无需考虑太
# 多,分别是均值和标准差
])
transform_test = transform.Compose([ # 对测试集进行预处理
transform.ToTensor(),
transform.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])
train_dataset = dataset.CIFAR10(dir, train=True, download=True, transform=transform_train) # 获得训练集
eval_dataset = dataset.CIFAR10(dir, train=False, transform=transform_test) # 获得测试集
return train_dataset, eval_dataset
这是一个简单的测试,采用本地模拟的方式进行客户端和服务器的交互,现在定义一个服务端类Server,其中的聚合函数采用的是FedAvg算法更新全局模型,公式如下:
G
t
+
1
=
G
t
+
λ
∑
i
=
1
m
(
L
i
t
+
1
−
G
i
t
)
G_{t+1}=G_t+\lambda\sum_{i=1}^m(L_i^{t+1}-G_i^t)
Gt+1=Gt+λi=1∑m(Lit+1−Git)
其中
G
t
G_t
Gt表示第t轮聚合后的全局模型,
L
i
t
+
1
L_i^{t+1}
Lit+1表示第i个客户端在第t+1轮时本地更新后的模型,
G
t
+
1
G_{t+1}
Gt+1表示第t+1轮聚合后的全局模型:
import torch
import torch.utils.data
from torchvision import models
class Server(object):
def __init__(self, conf, eval_dataset): # 构造函数
self.conf = conf
self.global_model = models.get_model(self.conf["model_name"]) # 从配置文件获得模型名称并创建服务器模型
self.eval_loader = torch.utils.data.DataLoader(eval_dataset, batch_size=self.conf["batch_size"],
shuffle=True) # 创建测试集加载器用于测试最终的聚合模型
def model_aggregrate(self, weight_accumulator): # 聚合函数,更新全局模型用
# 其中weight_accumulator存放客户端上传参数的变化值, 即更新前全局模型和本地更新后模型的参数变化L_t+1-G_t
for name, data in self.global_model.state_dict().items(): # 获得全局模型的变量名和参数值
update_per_layer = weight_accumulator[name] * self.conf["lambda"] # 乘以系数λ
if data.type() != update_per_layer.type(): # 如果数据类型不符
data.add_(update_per_layer.to(torch.int64)) # 进行数据转换后再累加
else:
data.add_(update_per_layer) # 直接进行累加
def model_eval(self): # 训练结束后,对全局模型进行评估的函数
self.global_model.eval() # 标记进入测试模式,模型参数不发生变化
total_loss = 0.0 # 记录损失和,计算平均损失用
correct = 0 # 记录正确数目
dataset_size = 0 # 测试数据总数
for batch_id, batch in enumerate(self.eval_loader): # 对测试数据进行编号和按batch提取数据
data, target = batch # 解包数据和标签
dataset_size += data.size()[0] # 获得当前batch的数据量,进行累加
if torch.cuda.is_available():
data, target = data.cuda(), target.cuda() # 如果pytorch支持GPU,则使用cuda计算
output = self.global_model(data) # 获得预测结果
total_loss += torch.nn.functional.cross_entropy(output, target, reduction='sum').item() # 把计算的损失进行累加
pred = output.data.max(1)[1] # 取预测值最大的类索引
correct += pred.eq(target.data.view_as(pred)).cpu().sum().item() # 获得该batch中预测正确的数目
acc = 100.0 * (float(correct) / float(dataset_size)) # 得到准确率的百分值
total_l = total_loss / dataset_size # 计算平均损失值
return acc, total_l
客户端进行本地的训练,注意数据集采用的是整个数据集中的一部分,最后需要计算出本地模型与之前全局模型的差值用于传输给服务器更新:
import torch
class Client(object):
def __init__(self, conf, model, train_dataset, id=1):
self.conf = conf # 配置信息
self.local_model = model # 本地模型
self.client_id = id # 客户端id
self.train_dataset = train_dataset # 训练集
all_range = list(range(len(self.train_dataset))) # 获得整个未分割训练数据集的下标
data_len = int(len(self.train_dataset) / self.conf['no_models']) # 计算本地数据集长度
indices = all_range[id * data_len : (id + 1) * data_len] # 切分该客户端对应的数据部分
self.train_loader = torch.utils.data.DataLoader(self.train_dataset, batch_size=conf['batch_size'],
sampler=torch.utils.data.sampler.SubsetRandomSampler(indices))
# 训练数据加载器
def local_train(self, model):
for name, param in model.state_dict().items(): # 获得全局模型参数
self.local_model.state_dict().items()[name].copy_(param.clone()) # 复制全局参数到本地
# 定义优化器
optimizer = torch.optim.SGD(self.local_model.parameters(), lr=self.conf['lr'], momentum=self.conf['momentum'])
self.local_model.train() # 标记为训练模式,参数可以改变
for e in range(self.conf['local_epochs']): # 本地轮数
for batch_id, bach in enumerate(self.train_loader): # 按batch加载训练数据
data, target = bach # 获得本batch数据和标签
if torch.cuda.is_available(): # 如果GPU可用
data, target = data.cuda(), target.cuda() # 放在GPU计算
optimizer.zero_grad() # 优化器置零
output = self.local_model(data) # 获得预测结果
loss = torch.nn.functional.cross_entropy(output, target) # 获得预测损失
loss.backward() # 进行反向传播
optimizer.step()
print('本地模型{}完成第{}轮训练'.format(self.client_id, e)) # 打印目前训练进度
diff = dict() # 计算参数差异的容器
for name, data in self.local_model.state_dict().items(): # 计算差异
diff[name] = (data - model.state_dict()[name])
return diff
最后是主函数,其中我将准确率信息和loss记录在excel中用于绘图:
import json
import random
import time
import torch
import dataset
from server import Server
from client import Client
import pandas as pd
import matplotlib.pyplot as plt
if __name__ == '__main__':
# 存储容器,用于绘制图像
accs = [] # 存放准确率
# 载入配置文件
with open('config.json', 'r') as f:
conf = json.load(f)
train_datasets, eval_datasets = dataset.get_dataset('./data/', conf['type']) # 获取训练数据和测试数据
server = Server(conf, eval_datasets) # 创建服务器
clients = [] # 客户端列表
for c in range(conf['no_models']): # 创建客户端
clients.append(Client(conf, server.global_model, train_datasets, c))
for e in range(conf['global_epochs']): # 进行全局轮数
candidates = random.sample(clients, conf['k']) # 随机选取k个客户端
weight_accumulator = {} # 创建计算好的参数字典,其值为本地模型计算的变化量之和
for name, params in server.global_model.state_dict().items():
weight_accumulator[name] = torch.zeros_like(params) # 初始化上面的参数字典,大小和全局模型相同
for c in candidates: # 逐一获取本地模型更新的变化量并进行累加
diff = c.local_train(server.global_model) # 进行本地训练并计算差值字典
for name, params in server.global_model.state_dict().items():
weight_accumulator[name].add_(diff[name])
server.model_aggregrate(weight_accumulator) # 模型聚合
acc, loss = server.model_eval() # 进行全局模型测试
print('全局模型:第{}轮完成!准确率:{:.2f} loss: {:.2f}'.format(e, acc, loss))
# 将准确率信息存储在txt文件中用于绘图
df = pd.DataFrame([acc, loss]) # 计入表格
df.to_excel("data_{}.xlsx".format(int(time.time()))) # 存入文件并加上时间戳