目录
1 加载预训练模型的分词器
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained('../data/model/opus-mt-en-ro/', use_fast=True)
print(tokenizer)
#假设文本,查看分词器的输出结果
text = [['hello, everyone today is a good day', 'It is late, please go home']]
tokenizer.batch_encode_plus(text)
2 加载本地数据集
from datasets import load_dataset
dataset = load_dataset('../data/datasets/wmt16-ro-en/')
dataset
#数据采样,数据量太多的, 需要随机抽取一些
dataset['train'] = dataset['train'].shuffle(1).select(range(20000))
dataset['validation'] = dataset['validation'].shuffle(1).select(range(200))
dataset['test'] = dataset['test'].shuffle(1).select(range(200))
3 数据预处理
#查看训练数据的第一条数据
dataset['train'][0]
def preprocess_function(data, tokenizer):
"""定义数据预处理的函数"""
#分别获取'en'与'ro'对应的文本句子
en = [ex['en'] for ex in data['translation']]
ro = [ex['ro'] for ex in data['translation']]
#对‘en’文本进行编码分词
data = tokenizer.batch_encode_plus(en, max_length=128, truncation=True)
#对'ro'文本进行编码分词,并将结果的'input_ids'作为labels
with tokenizer.as_target_tokenizer():
data['labels'] = tokenizer.batch_encode_plus(
ro, max_length=128, truncation=True)['input_ids']
return data
#用map函数将定义的预处理函数加载进来
dataset = dataset.map(preprocess_function,
batched=True,
batch_size=1000,
num_proc=1,
remove_columns=['translation'],
fn_kwargs={'tokenizer' : tokenizer})
#查看训练数据的第一条数据
print(dataset['train'][0])
#数据批次处理函数:将数据一批批进行输出
def collate_fn(data):
# 求最长的label
max_length=max([len(i['labels']) for i in data])
for i in data:
#获取每一句需要补充的pad数量,赋值为100,
pads = [-100] * (max_length - len(i['labels']))
#每一句都加上需要补的pad
i['labels'] = i['labels'] + pads
#会自动将数据集中的所有类型的数据都按照最大序列长度进行补全pad
data = tokenizer.pad(
encoded_inputs=data,
padding=True,
max_length=None,
pad_to_multiple_of=None, #数据位数补齐到指定的倍数上(否)
return_tensors='pt'
)
#序列数据也有编码器的输入数据 decoder_input_ids
#字典添加数据的方式
data['decoder_input_ids'] = torch.full_like(data['labels'],
tokenizer.get_vocab()['pad'],
dtype=torch.long)
#第一个token是cls,不需要传入校验预测值,就从索引为1的开始
data['decoder_input_ids'][:, 1:] = data['labels'][:, :-1]
data['decoder_input_ids'][data['decoder_input_ids'] == -100] = tokenizer.get_vocab()['<pad>']
return data
tokenizer.get_vocab()['pad'], tokenizer.get_vocab()['<pad>']
4 创建数据加载器
import torch
loader = torch.utils.data.DataLoader(dataset=dataset['train'],
batch_size=8,
collate_fn=collate_fn,
shuffle=True,
drop_last=True)
for data in loader:
break
data
for k, v in data.items():
print(k, v.shape)
5 定义下游任务的模型
#翻译任务是标准的seq2seq的任务, LM是使用的模型的Linear Model那一层, MarianModel是专门用于翻译的模型
from transformers import AutoModelForSeq2SeqLM, MarianModel
class Model(torch.nn.Module):
def __init__(self):
super().__init__()
#加载的预训练模型与加载的预训练分词器保持一致
self.pretrained = MarianModel.from_pretrained('../data/model/opus-mt-en-ro/')
#创建一个变量 最后一层有vocab_size个类别概率输出,对应的也会有vocab_size个bias参数
self.register_buffer('final_logits_bias', torch.zeros(1, tokenizer.vocab_size))
#输出层:全连接层
self.fc = torch.nn.Linear(512, tokenizer.vocab_size, bias=False)
#加载预训练权重参数
parameters = AutoModelForSeq2SeqLM.from_pretrained('../data/model/opus-mt-en-ro/')
self.fc.load_state_dict(parameters.lm_head.state_dict())
#创建损失函数 ,创建在训练代码里面也可以!!!
self.criterion = torch.nn.CrossEntropyLoss()
def forward(self, input_ids, attention_mask, labels, decoder_input_ids):
logits = self.pretrained(input_ids=input_ids,
attention_mask= attention_mask,
decoder_input_ids=decoder_input_ids)
#获取最后一层的hidden_state
logits = logits.last_hidden_state
#输入全连接层中(输出层), 加上一个自己创建的偏差self.final_logits_bias
logits = self.fc(logits) + self.final_logits_bias
#计算损失
#flatten()在这里相当于reshape, 将logits三维变二维, labels二维变一维
loss = self.criterion(logits.flatten(end_dim=1), labels.flatten())
return {'loss': loss, 'logits': logits}
model = Model() #创建模型时,会重新加载预训练模型,会占用C盘内存
#查看一下模型的参数量
print(sum(p.numel() for p in model.parameters()))
#试跑一下
outs = model(**data)
outs['loss'], outs['logits'].shape
6 测试代码
data['input_ids'][0]
def test(model):
#创建测试时输入的数据加载器
loader_test = torch.utils.data.DataLoader(dataset=dataset['test'],
batch_size=8,
collate_fn=collate_fn,
shuffle=True,
drop_last=True)
predictions = [] #用于存放预测的结果
references = [] #用于存放真实的结果
for i, data in enumerate(loader_test):
#只需要下游任务的前向传播,不需要反向传播
with torch.no_grad():
#outs是下游任务模型的输出结果,是一个元祖,包含loss和logits
outs = model(**data)
#预测结果 outs['logits']是一个三维数组 (batch_size , 序列长度lens, 类别数量vocab_size)
pred = tokenizer.batch_decode(outs['logits'].argmax(dim=2))
#真实值
label = tokenizer.batch_decode(data['decoder_input_ids'])
#将预测值与真实值分别放于列表中
predictions.append(pred)
references.append(label)
if i % 2 == 0:
print(i)
input_ids = tokenizer.decode(data['input_ids'][0])
print('input_ids=', input_ids)
print('pred=', pred[0])
print('label=', label[0])
if i == 10:
break
#for遍历组成一个列表
references = [[j] for j in references]
test(model)
7 训练代码
from transformers import AdamW
from transformers.optimization import get_scheduler
#设置设备
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
device
#训练代码
def train(model):
#设置优化器与学习率衰减计划
optimizer = AdamW(model.parameters(), lr=2e-5)
scheduler = get_scheduler(name='linear',
num_warmup_steps=0, #没有预热缓冲区,从一开始学习率就开始梯度下降
num_training_steps=len(loader),
optimizer=optimizer)
#模型发到设备上, 调到训练模式
model.to(device)
model.train()
for i,data in enumerate(loader):
#不一个个变量接收数据,并将变量发送到设备上的 简便写法
for k in data.keys():
#将data传到设备上
data[k] = data[k].to(device)
#将data里的数据传到下游任务模型中,获取输出结果outs(一个字典)
outs = model(**data)
#从输出结果中获取损失
loss = outs['loss']
#进行反向传播数据的必须是标量(就是一个数值)
#反向传播
loss.backward()
#为了梯度下降的稳定性,进行梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
#梯度更新
optimizer.step()
scheduler.step()
#梯度清零
optimizer.zero_grad()
model.zero_grad()
if i % 50 == 0:
#聚合类别概率
outs = outs['logits'].argmax(dim=2)
#准确的数量
correct = (data['decoder_input_ids'] == outs).sum().item()
#数据总量
total = data['decoder_input_ids'].shape[1] * 8 #batch_size=8
#计算准确率
accuracy = correct / total
predictions = []
references = []
for j in range(8):
pred = tokenizer.decode(outs[j])
label = tokenizer.decode(data['decoder_input_ids'][j])
predictions.append(pred)
references.append(label)
#取出lr,方式基本是固定的
lr = optimizer.state_dict()['param_groups'][0]['lr']
print(i, loss.item(), accuracy, lr)
train(model)
#翻译问题看准确率没有意义,需要看损失是否在减小,还有预测的最终结果是否正确
8.保存与加载训练好的模型
#保存训练好的模型
torch.save(model, '../data/model/翻译.model')
#加载保存的模型
model_2 = torch.load('../data/model/翻译.model', map_location='cpu')
test(model_2)