Training ResNet on CIFAR-10: a PyTorch Implementation


The code ran for about an hour on Kaggle, and the final accuracy reached 90%.

Output of the per-stage shape check (the code that produces it appears near the end of the post):

Sequential output shape:        torch.Size([1, 64, 56, 56])
Sequential output shape:        torch.Size([1, 64, 56, 56])
Sequential output shape:        torch.Size([1, 128, 28, 28])
Sequential output shape:        torch.Size([1, 256, 14, 14])
Sequential output shape:        torch.Size([1, 512, 7, 7])
AdaptiveAvgPool2d output shape: torch.Size([1, 512, 1, 1])
Flatten output shape:           torch.Size([1, 512])
Linear output shape:            torch.Size([1, 10])

import copy
import time
import torch
import torchvision
from torch import nn
from torch import optim
from torch.utils.data import DataLoader
from torchvision import transforms
import matplotlib.pyplot as plt

def try_gpu(i=0):
    """Return gpu(i) if it exists, otherwise return cpu()."""
    if torch.cuda.device_count() >= i + 1:
        return torch.device(f'cuda:{i}')
    return torch.device('cpu')

batch_size = 128
path = './'
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),   # random crop, resized to 224x224
    transforms.RandomHorizontalFlip(),   # random horizontal flip
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
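As a quick sanity check, the training pipeline should map any input image to a normalized 3x224x224 float tensor. A minimal sketch (not part of the training script itself) using a blank PIL image:

from PIL import Image

# Any PIL image, regardless of its size, comes out as a 3x224x224 tensor
img = Image.new('RGB', (32, 32))
x = train_transform(img)
print(x.shape)  # torch.Size([3, 224, 224])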

# CIFAR-10 is downloaded once into `path`; torchvision handles the
# train/test split via the `train` flag.
train_set = torchvision.datasets.CIFAR10(
    path, train=True, transform=train_transform, download=True)
valid_set = torchvision.datasets.CIFAR10(
    path, train=False, transform=val_transform, download=True)

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=False)

dataloaders = {
    'train': train_loader,
    'valid': valid_loader,
}

dataset_sizes = {
    'train': len(train_set),
    'valid': len(valid_set),
}
print(dataset_sizes)

def train(model, criterion, optimizer, scheduler, device, num_epochs,
          dataloaders, dataset_sizes):
    model = model.to(device)
    print('training on', device)
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        # Train for one epoch
        s = time.time()
        model, train_epoch_acc, train_epoch_loss = train_model(
            model, criterion, optimizer, dataloaders['train'],
            dataset_sizes['train'], device)
        print('Epoch {}/{} - train Loss: {:.4f} Acc: {:.4f} Time: {:.1f}s'
              .format(epoch + 1, num_epochs, train_epoch_loss,
                      train_epoch_acc, time.time() - s))
        # Validate
        s = time.time()
        val_epoch_acc, val_epoch_loss = val_model(
            model, criterion, dataloaders['valid'],
            dataset_sizes['valid'], device)
        print('Epoch {}/{} - valid Loss: {:.4f} Acc: {:.4f} Time: {:.1f}s'
              .format(epoch + 1, num_epochs, val_epoch_loss,
                      val_epoch_acc, time.time() - s))
        # Keep a copy of the best weights seen so far
        if val_epoch_acc > best_acc:
            best_acc = val_epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())
        # Optional learning-rate schedule
        if scheduler is not None:
            scheduler.step()
        # Record the curves for plotting
        train_losses.append(train_epoch_loss)
        train_acc.append(train_epoch_acc)
        val_losses.append(val_epoch_loss)
        val_acc.append(val_epoch_acc)
        print()
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(best_acc))
    # model.load_state_dict(best_model_wts)
    return model

def train_model(model, criterion, optimizer, dataloader, dataset_size, device):
    model.train()
    running_loss = 0.0
    running_corrects = 0
    for inputs, labels in dataloader:
        optimizer.zero_grad()
        inputs = inputs.to(device)
        labels = labels.to(device)
        # Forward pass
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        # Compute the loss
        loss = criterion(outputs, labels)
        # Backpropagate and update the weights
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean, so weight it by the batch size
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels).item()

    epoch_loss = running_loss / dataset_size
    epoch_acc = running_corrects / dataset_size

    return model, epoch_acc, epoch_loss

def val_model(model, criterion, dataloader, dataset_size, device):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    with torch.no_grad():   # no gradients needed during evaluation
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            # Forward pass
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            # Compute the loss
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels).item()

    epoch_loss = running_loss / dataset_size
    epoch_acc = running_corrects / dataset_size

    return epoch_acc, epoch_loss

class ResNet(nn.Module):

    def __init__(self):
        super().__init__()

        self.b1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64), nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
        self.b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
        self.b3 = nn.Sequential(*resnet_block(64, 128, 2))
        self.b4 = nn.Sequential(*resnet_block(128, 256, 2))
        self.b5 = nn.Sequential(*resnet_block(256, 512, 2))

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fn = nn.Flatten()
        self.fc = nn.Linear(512, 10)

    def forward(self, x):
        out = self.b1(x)

        out = self.b2(out)
        out = self.b3(out)
        out = self.b4(out)
        out = self.b5(out)

        out = self.avgpool(out)
        out = self.fn(out)
        out = self.fc(out)
        return out

# Two configurations of the residual block: with use_1x1conv=False, the input
# is added to the output before the final ReLU; with use_1x1conv=True, a 1x1
# convolution first adjusts the channels and resolution of the shortcut.
class Residual(nn.Module):
    def __init__(self, input_channels, num_channels, use_1x1conv=False, strides=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            input_channels, num_channels, kernel_size=3, padding=1, stride=strides)
        self.conv2 = nn.Conv2d(
            num_channels, num_channels, kernel_size=3, padding=1)
        if use_1x1conv:
            self.conv3 = nn.Conv2d(
                input_channels, num_channels, kernel_size=1, stride=strides)
        else:
            self.conv3 = None
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        if self.conv3:
            x = self.conv3(x)
        out += x
        out = self.relu(out)
        return out
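To make the two configurations concrete, here is a small shape check (not in the original post, but consistent with the class above): without the 1x1 convolution the block preserves the input shape, and with use_1x1conv=True plus strides=2 it changes the channel count and halves the resolution.

blk = Residual(3, 3)
X = torch.randn(4, 3, 6, 6)
print(blk(X).shape)   # torch.Size([4, 3, 6, 6]) -- shape preserved

blk = Residual(3, 6, use_1x1conv=True, strides=2)
print(blk(X).shape)   # torch.Size([4, 6, 3, 3]) -- channels doubled, resolution halved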

# Build a stage of residual blocks; the first block of each stage (except the
# very first) halves the resolution and doubles the channels via the 1x1 shortcut.
def resnet_block(input_channels, num_channels, num_residuals, first_block=False):
    blk = []
    for i in range(num_residuals):
        if i == 0 and not first_block:
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=True, strides=2))
        else:
            blk.append(Residual(num_channels, num_channels))
    return blk

# Sanity-check the output shape of every stage; this produces the shapes
# listed at the top of the post.
X = torch.randn(1, 3, 224, 224)
net = ResNet()
net = nn.Sequential(net.b1, net.b2, net.b3, net.b4, net.b5,
                    net.avgpool, net.fn, net.fc)
for layer in net:
    X = layer(X)
    print(layer.__class__.__name__, 'output shape:\t', X.shape)
val_losses, val_acc = [], []
train_losses, train_acc = [], []

# First stage: 30 epochs at lr=0.01
lr, num_epochs = 0.01, 30
model = ResNet()
criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=lr)
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
# scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

model = train(model, criterion, optimizer, None,
              try_gpu(), num_epochs, dataloaders, dataset_sizes)

# Second stage: fine-tune for 10 more epochs at lr=0.001
lr, num_epochs = 0.001, 10
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
model = train(model, criterion, optimizer, None,
              try_gpu(), num_epochs, dataloaders, dataset_sizes)

# Loss curves
plt.figure()
plt.plot(range(1, len(train_losses) + 1), train_losses, 'b', label='training loss')
plt.plot(range(1, len(val_losses) + 1), val_losses, 'r', label='val loss')
plt.legend()

# Accuracy curves (on a separate figure so the legends do not overlap)
plt.figure()
plt.plot(range(1, len(train_acc) + 1), train_acc, 'b--', label='train accuracy')
plt.plot(range(1, len(val_acc) + 1), val_acc, 'r--', label='val accuracy')
plt.legend()
plt.show()
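Since train() already tracks the best-performing weights, it can be worth persisting them after training. A minimal sketch (the filename resnet_cifar10.pth is an arbitrary example, not from the original post):

# Save the trained weights to disk (filename is an arbitrary example)
torch.save(model.state_dict(), 'resnet_cifar10.pth')

# Later, restore them into a fresh model before running inference
model = ResNet()
model.load_state_dict(torch.load('resnet_cifar10.pth'))
model.eval()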
