目录
1 加载预训练模型对应的分词器
from transformers import AutoTokenizer
#use_fast=True 表示使用RUST语言写的分词器,速度比python写的快
tokenizer = AutoTokenizer.from_pretrained('../data/model/distilbert-base-uncased/', use_fast=True)
tokenizer
#编码试算
tokenizer.batch_encode_plus(['hello, everyone, today is a good day',
'how are you, fine thank you, and you?'])
#编码返回的是'input_ids' 和 'attention_mask'
2 加载数据集
from datasets import load_dataset
dataset = load_dataset('../data/datasets/cola/', trust_remote_code=True)
dataset
dataset['train'][0]
3 数据预处理
def f(examples, tokenizer):
"""只对传输数据集的句子文本'text'进行编码分词"""
return tokenizer.batch_encode_plus(examples['text'], truncation=True)
dataset = dataset.map(f,
batched=True,
batch_size=1000, #一批有1000个数据
#num_proc=1 更快 , 数据量不多的时候, 创建进程也是需要时间开销
num_proc=1, #8个进程同时处理,cpu是8核
remove_columns=['text'], #原数据集中的['text']不要了,转化成['input_ids']
fn_kwargs={'tokenizer': tokenizer})
print(dataset['train'][0])
4 构建数据加载器DataLoader
#一批数据传输时,每句话的长度必须相同, 否则无法参与矩阵运算
import torch
#DataCollatorWithPadding 读取数据时,自动补全padding,使句子长度相同
from transformers.data.data_collator import DataCollatorWithPadding
loader = torch.utils.data.DataLoader(
dataset=dataset['train'],
batch_size=8,
#实例化一个匿名的collate_fn ,使数据一批批传输,并自动补全padding,使句子长度相同
collate_fn=DataCollatorWithPadding(tokenizer),
shuffle=True,
drop_last=True)
for data in loader:
break #for循环赋值, 不输出
#data包含'input_ids'和 'attention_mask' 两部分
data
len(loader)
5 定义下游任务模型
from transformers import AutoModelForSequenceClassification, DistilBertModel
#查看模型参数与层结构
model_pretrained_parameters = AutoModelForSequenceClassification.from_pretrained('../data/model/distilbert-base-uncased/', num_labels=2)
model_pretrained_parameters
class Model(torch.nn.Module):
def __init__(self):
super().__init__() #继承父类的方法
self.model_pretrained = DistilBertModel.from_pretrained('../data/model/distilbert-base-uncased/')
#全连接层
#Bert模型输出的数据的最后一维度是768,这里输入的第0维度也要是768
self.fc = torch.nn.Sequential(torch.nn.Linear(768, 768),
torch.nn.ReLU(),
torch.nn.Dropout(p=0.2),
torch.nn.Linear(768, 2)) #二分类问题,情感分析(积极1/消极0)
#加载预训练参数的模型
model_pretrained_parameters = AutoModelForSequenceClassification.from_pretrained('../data/model/distilbert-base-uncased/',
num_labels=2) #labels的类别数量
#让全连接层加载预训练的参数
self.fc[0].load_state_dict(model_pretrained_parameters.pre_classifier.state_dict())
self.fc[3].load_state_dict(model_pretrained_parameters.classifier.state_dict())
#损失函数
self.criterion = torch.nn.CrossEntropyLoss()
def forward(self, input_ids, attention_mask, labels=None):
#将输入数据传入预训练模型,得到一个输出结果
#logits是三维的
logits = self.model_pretrained(input_ids=input_ids, attention_mask=attention_mask)
# :使logits变成二维数据
logits = logits.last_hidden_state[:, 0] #0就是cls的输出结果,因为cls的位置是固定的(每句话的第一个单词就是),其他位置具有不确定性能拿到数据
#将logits传入输出层
logits = self.fc(logits)
#计算损失
loss = None #先将loss设为空
if labels is not None: #若传入了labels数据,不为空了
#计算损失
loss = self.criterion(logits, labels)
return {'loss': loss, 'logits': logits}
model = Model()
#查看模型参数量
print(sum(i.numel() for i in model.parameters()))
#试跑一下下游任务模型
#向模型中传入参数
out = model(**data) #out是一个字典,包含输出的loss和logits
print(out['loss'], out['logits'], out['logits'].shape)
#out['logits'].shape=torch.Size([8, 2]), 8是一批有8个数据, 2是两个类别的概率(哪个值更大,就归哪个类别)
#查看测试数据集的labels是否正常有效(没有-1)
dataset['test'][0]
6 测试代码
def test(model):
model.eval() #测试预测时,调到评估模式
#构建数据加载器
loader_test = torch.utils.data.DataLoader(
dataset=dataset['test'],
batch_size=16, #测试预测是在cpu上进行的,batch_size的值可以大一些,为16
#DataCollatorWithPadding(tokenizer)实例化collate_fn,不然会报错
collate_fn=DataCollatorWithPadding(tokenizer), #成批输送数据时,自动补全pad,使句子长度一致
shuffle=True,
drop_last=True)
outs = [] #存放计算的最大类别概率
labels = [] #存放真实值
for i, data in enumerate(loader_test):
#进行下游任务模型计算预测时,不进行求导梯度下降
with torch.no_grad():
#out是一个字典,包含loss和logits,
out = model(**data)
#out['logits']是一个二维数组,shape=(batch_szie, 类别数量)
outs.append(out['logits'].argmax(dim=1))
labels.append(data['labels'])
if i % 10 ==0: #每隔10次
print(i)
if i == 50:
break #到50,停止
#将outs和labels分别拼接起来
outs = torch.cat(outs)
labels = torch.cat(labels)
#计算准确度
accuracy = (outs == labels).sum().item() / len(labels)
print('accuracy:', accuracy)
test(model)
7 训练代码
from transformers import AdamW #AdamW梯度下降的优化算法
from transformers.optimization import get_scheduler #学习率的衰减计算
#设置设备、
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
device
#训练代码
def train():
#模型训练时,需要梯度下降、学习更新模型参数,以达到最好的预测效果
#定义优化器
optimizer = AdamW(model.parameters(),betas=(0.9, 0.999), eps=1e-8, lr=2e-5) #betas/eps/lr都是默认值
#学习率衰减计划
scheduler = get_scheduler(name='linear',
num_warmup_steps=0, #无预热缓冲区,从一开始就衰减
num_training_steps=len(loader),
optimizer=optimizer)
#将模型发送到设备上
model.to(device)
model.train() #模型训练模式
for i,data in enumerate(loader):
#接收需要输入的数据
input_ids, attention_mask, labels = data['input_ids'], data['attention_mask'], data['labels']
#将数据传到设备上
input_ids, attention_mask, labels = input_ids.to(device), attention_mask.to(device), labels.to(device)
#将这些数据传到设备上的模型,获取输出值out(一个字典,包含loss和logits(类别概率))
out = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
#从out中获取loss
loss = out['loss'] #字典key索引
#用损失函数进行反向传播
loss.backward()
#为了梯度下降的稳定性,使用梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) #公式中的c=1.0
#梯度更新
optimizer.step()
scheduler.step() #学习率衰减计划更新
#梯度清零
optimizer.zero_grad()
model.zero_grad()
if i% 50 == 0:
lr = optimizer.state_dict()['param_groups'][0]['lr']
#计算预测类别概率的最大值
out = out['logits'].argmax(dim=1)
#计算准确率
accuracy = (labels==out).sum().item() / 8 #batch_size=8
print(i, loss.item(), lr, accuracy)
print()
train()
#训练完模型,再次测试
test(model.to('cpu')) #因为测试的数据都在cpu上,需要把在gpu上训练的模型发到cpu上