0
点赞
收藏
分享

微信扫一扫

【程序】DTWNet 中 Util.py

笙烛 2022-01-16 阅读 38
import numpy as np
import torch
import torch.utils.data
from torch import nn
from torch.autograd import Variable
import sklearn
from sklearn.metrics import roc_auc_score
import time


## auc and accuracy
def auc(y_true, y_score):
  """Return the ROC AUC computed from column 1 of ``y_score``.

  ``y_true`` and the selected score column are handed unchanged to
  ``sklearn.metrics.roc_auc_score``.
  """
  column_one_scores = y_score[:, 1]
  return roc_auc_score(y_true, column_one_scores)

def accuracy(output, target, topk=(1,)):
    """Compute precision@k in percent and return the value for ``topk[0]``.

    Args:
        output: 2-D score tensor; the top ``max(topk)`` entries along
            dimension 1 are taken as the predictions for each row.
        target: 1-D tensor with one label per row of ``output``.
        topk: Tuple of k values. Precision is computed for every k, but only
            the result for the first entry is returned.

    Returns:
        A tensor of shape ``(1,)`` holding the percentage of rows whose
        target appears among the top ``topk[0]`` predictions.
    """
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            # reshape (not view): ``correct`` derives from a transposed tensor
            # and may be non-contiguous, in which case view(-1) raises.
            correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res[0]

## loss functions
def loss_function(labels, predictions):
  """Return the mean-squared error of ``predictions`` against ``labels``."""
  criterion = nn.MSELoss()
  mse = criterion(predictions, labels)
  return mse

def loss_function_multi(labels, predictions, **kwargs):
  """Sum per-filter MSE losses.

  ``predictions`` is viewed as ``(predictions.shape[1], -1)`` and row ``i`` is
  compared with ``labels[i]`` for ``i`` in ``range(num_filter)``, where
  ``num_filter`` is read from ``kwargs['params']['num_filter']``.
  """
  filter_count = kwargs.get('params')['num_filter']
  per_filter = predictions.view(predictions.shape[1], -1)
  criterion = nn.MSELoss()
  # sum() starts from the int 0, matching a manual accumulator.
  return sum(criterion(per_filter[idx], labels[idx]) for idx in range(filter_count))

def loss_function_crossentropy(labels, predictions, **kwargs):
  """Return the cross-entropy of ``predictions`` against ``labels``.

  Extra keyword arguments are accepted and ignored.
  """
  criterion = nn.CrossEntropyLoss()
  cross_entropy = criterion(predictions, labels)
  return cross_entropy

def loss_function_crossentropy_diverse(labels, predictions, model=None, **kwargs):
  """Cross-entropy loss plus a penalty on the overlap of the first two kernels.

  The penalty is ``|dot(k0, k1)|`` where ``k0`` and ``k1`` are
  ``model.dtw[0].filters[0].kernel`` and ``model.dtw[0].filters[1].kernel``.
  The two terms are mixed as ``(1 - alpha) * ce + alpha * penalty`` with
  ``alpha = 0.1``.

  Args:
    labels: Target class indices passed to ``nn.CrossEntropyLoss``.
    predictions: Class scores passed to ``nn.CrossEntropyLoss``.
    model: Model whose kernels are penalised. If omitted, a module-level
      ``model`` is used when one exists (the lookup this function
      previously relied on).
    **kwargs: Ignored; accepted so the function can be called with the
      ``model=`` / ``param_dict=`` keywords used by the training loops.

  Raises:
    ValueError: If no model is supplied and no module-level ``model`` exists.
  """
  if model is None:
    # Backward compatibility with callers that injected a global ``model``.
    model = globals().get('model')
  if model is None:
    raise ValueError('loss_function_crossentropy_diverse requires a model')
  l = nn.CrossEntropyLoss()
  loss_ce = l(predictions, labels)
  filters = model.dtw[0].filters
  loss_di = torch.abs(torch.dot(filters[0].kernel, filters[1].kernel))
  alpha = 1e-1
  return (1-alpha)*loss_ce + alpha*loss_di

def loss_function_crossentropy_shape(labels, predictions, model, param_dict=None, alpha=0.1): # need to fix later
  """Cross-entropy loss plus a penalty on the end points of one kernel.

  The penalty is the sum of the squared first and last entries of
  ``model.dtwlayers[0].filters[0].kernel``. The result is
  ``(1 - alpha) * ce + alpha * penalty``. ``param_dict`` is accepted but
  not used.
  """
  criterion = nn.CrossEntropyLoss()
  cross_entropy = criterion(predictions, labels)
  first_kernel = model.dtwlayers[0].filters[0].kernel
  head_sq = first_kernel[0]**2
  tail_sq = first_kernel[-1]**2
  return (1-alpha)*cross_entropy + alpha*(head_sq+tail_sq)

def loss_function_mse(labels, predictions, **kwargs):
  """Return the mean-squared error of ``predictions`` against ``labels``.

  Extra keyword arguments are accepted and ignored.
  """
  criterion = nn.MSELoss()
  mse = criterion(predictions, labels)
  return mse

def loss_function_mse_flat(labels, signalreduce, **kwargs):
  """Return the MSE between ``signalreduce`` and an all-zero target.

  ``labels`` and any extra keyword arguments are ignored. The zero target is
  created with the same shape, dtype and device as ``signalreduce`` so the
  comparison does not mix dtypes or devices.
  """
  zero_sig = torch.zeros(signalreduce.shape, dtype=signalreduce.dtype,
                         device=signalreduce.device)
  return loss_function_mse(zero_sig, signalreduce)

def loss_function_hybrid(labels, signalreduce, predictions, alpha=10, **kwargs):
  """Combine the flat-signal MSE term with the classification loss.

  Returns ``alpha * loss_function_mse_flat(labels, signalreduce)`` plus
  ``loss_function_crossentropy(labels, predictions)``. Extra keyword
  arguments are ignored.
  """
  flat_term = loss_function_mse_flat(labels, signalreduce)
  class_term = loss_function_crossentropy(labels, predictions)
  weighted_flat = alpha*flat_term
  return weighted_flat + class_term


## dataset read util
def read_ucr(filename):
    """Load a comma-separated file whose first column is the label.

    Labels are shifted down by one. The remaining columns are reshaped to
    ``(N, 1, L)``.

    Args:
        filename: Path of the comma-delimited text file.

    Returns:
        Tuple ``(X, Y, N, L)`` where ``X`` has shape ``(N, 1, L)``, ``Y`` is
        the array of shifted labels, ``N`` is the number of rows and ``L``
        the number of value columns.
    """
    # ndmin=2 keeps a single-row file two-dimensional so column slicing works.
    data = np.loadtxt(filename, delimiter=',', ndmin=2)
    Y = np.asarray(data[:, 0] - 1)
    X = data[:, 1:]
    X = np.reshape(X, (X.shape[0], 1, -1))  # shape is (N, channel, L)
    return X, Y, X.shape[0], X.shape[2]

def read_ucr_downsample(filename, sample_rate):
    """Load a comma-separated file and optionally subsample the value columns.

    The first column is returned as the labels without modification. When
    ``sample_rate`` is greater than zero every ``sample_rate``-th value
    column is kept; otherwise all columns are kept.

    Args:
        filename: Path of the comma-delimited text file.
        sample_rate: Column stride; values <= 0 disable downsampling.

    Returns:
        Tuple ``(X, Y, N, L)`` where ``X`` has shape ``(N, 1, L)``.
    """
    # ndmin=2 keeps a single-row file two-dimensional so column slicing works.
    data = np.loadtxt(filename, delimiter=',', ndmin=2)
    Y = np.asarray(data[:, 0])
    X = data[:, 1:]
    if sample_rate > 0:
        X = X[:, ::sample_rate]
    X = np.reshape(X, (X.shape[0], 1, -1))  # shape is (N, channel, L)
    return X, Y, X.shape[0], X.shape[2]

class UCRDataset(torch.utils.data.Dataset): ## need to fix later
  """Dataset backed by a file read with ``read_ucr``.

  Samples are stored as a double tensor and labels as a tensor built
  directly from the label array.
  """

  def __init__(self, file_path):
    """Read ``file_path`` and keep the data, labels, row count and length."""
    samples, targets, n_rows, series_len = read_ucr(file_path)
    self.count = n_rows
    self.input_len = series_len
    self.data = torch.from_numpy(samples).double()
    self.label = torch.from_numpy(targets)

  def __getitem__(self, index):
    """Return the ``(sample, label)`` pair stored at ``index``."""
    sample = self.data[index]
    target = self.label[index]
    return (sample, target)

  def __len__(self):
    """Return the number of rows read from the file."""
    return self.count

  def __feature_len__(self):
    """Return the per-sample series length reported by ``read_ucr``."""
    return self.input_len

def convert_Y(Y):
  """Map labels to consecutive class indices ``0..k-1``.

  Indices follow the sorted order of the distinct labels, so the mapping is
  deterministic and identical for any two label arrays that contain the same
  set of values (e.g. a train and a test split).

  Args:
    Y: 1-D sequence of hashable, mutually comparable labels.

  Returns:
    A copy of ``Y`` (same dtype as ``np.copy(Y)``) with each label replaced
    by its class index.
  """
  # Sorted order instead of raw set iteration order, and a dict instead of
  # list.index so each lookup is O(1).
  index_of = {label: idx for idx, label in enumerate(sorted(set(Y)))}
  y_converted = np.copy(Y)
  for i, label in enumerate(Y):
    y_converted[i] = index_of[label]
  return y_converted

## data loader and processor
def proc_data(X, Y, batch_size):
  """Wrap numpy arrays in a shuffling ``DataLoader``.

  ``X`` becomes a double tensor; ``Y`` is passed through ``convert_Y`` and
  stored as a ``LongTensor``.
  """
  features = torch.from_numpy(X).double()
  class_ids = convert_Y(Y)
  targets = torch.from_numpy(class_ids).type(torch.LongTensor)
  dataset = torch.utils.data.TensorDataset(features, targets)
  return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

def load_train(dir, dataset_name, batch_size, downsample_rate):
  """Load ``<dir><name>/<name>_TRAIN`` and build its training loader.

  Returns ``(loader, X, Y, feature_len, n_classes)`` where ``X`` and ``Y``
  are the arrays returned by ``read_ucr_downsample`` and ``n_classes`` is the
  number of distinct values in ``Y``.
  """
  train_path = dir+dataset_name+'/'+dataset_name+'_TRAIN'
  X, Y, _, series_len = read_ucr_downsample(train_path, downsample_rate)
  loader = proc_data(X, Y, batch_size)
  class_count = len(set(Y))
  return loader, X, Y, series_len, class_count

def load_test(dir, dataset_name, downsample_rate):
  """Load ``<dir><name>/<name>_TEST`` and build a single-batch loader.

  The loader's batch size is the number of test rows, so the whole test set
  is delivered in one batch. Returns ``(loader, X, Y)`` with ``X`` and ``Y``
  as returned by ``read_ucr_downsample``.
  """
  test_path = dir+dataset_name+'/'+dataset_name+'_TEST'
  X, Y, row_count, _ = read_ucr_downsample(test_path, downsample_rate)
  # The full test set is used; no subset is sampled.
  loader = proc_data(X, Y, row_count)
  return loader, X, Y

def load_boosting_mse(dir, dataset_name, batch_size, downsample_rate):
  """Build train/test loaders whose targets are all-zero signals.

  Each sample from ``<dir><name>/<name>_TRAIN`` and ``..._TEST`` is paired
  with a zero tensor of shape ``(N, -1)`` derived from the data tensor. The
  test loader uses the whole test set as one batch. Both loaders shuffle.

  Returns:
    ``(train_loader, test_loader, X, Y, feature_len)`` where ``X`` is the
    test data as a double tensor, ``Y`` the test labels as read from file,
    and ``feature_len`` the test series length.
  """
  X, Y, n_sample, feature_len = read_ucr_downsample(dir+dataset_name+'/'+dataset_name+'_TRAIN', downsample_rate)
  X = torch.from_numpy(X).double()
  data = torch.utils.data.TensorDataset(X, torch.zeros_like(X).view(X.shape[0], -1))
  train_loader = torch.utils.data.DataLoader(data, batch_size=batch_size, shuffle=True)
  X, Y, n_test_sample, feature_len = read_ucr_downsample(dir+dataset_name+'/'+dataset_name+'_TEST', downsample_rate)
  X = torch.from_numpy(X).double()
  data = torch.utils.data.TensorDataset(X, torch.zeros_like(X).view(X.shape[0], -1))
  test_loader = torch.utils.data.DataLoader(data, batch_size=n_test_sample, shuffle=True)
  return train_loader, test_loader, X, Y, feature_len
 

## train and test
def train(epoch, batch_size, train_loader, model, optimizer, loss_func, **kwargs):
    """Run one training epoch over ``train_loader``.

    Args:
        epoch: Epoch index; printed in the summary and used as the x value
            of the optional loss plot.
        batch_size: Batch size used to scale the accumulated loss in the
            epoch summary.
        train_loader: Loader yielding ``(data, label)`` batches; must expose
            ``.dataset`` and support ``len()``.
        model: Module called as ``model(data)``.
        optimizer: Optimizer stepped once per batch.
        loss_func: Called as
            ``loss_func(label, prediction, model=model, param_dict=param_dict)``.
            Labels are cast to float when this is ``loss_function_mse`` and
            to long otherwise.
        **kwargs: Optional ``scheduler`` (stepped once at the start of the
            epoch), ``param_dict`` (forwarded to ``loss_func``) and ``vis``
            (object with a ``line`` method used for plotting).
    """
    model.train()
    train_loss = 0
    ft = 0  # accumulated forward time
    bt = 0  # accumulated backward time
    # NOTE(review): the scheduler is stepped before any optimizer.step() of
    # this epoch; current PyTorch docs recommend stepping it afterwards.
    # Left in place so existing learning-rate schedules are unchanged.
    if 'scheduler' in kwargs: kwargs.get('scheduler').step()
    param_dict = kwargs.get('param_dict')
    vis = kwargs.get('vis')
    is_mse = loss_func == loss_function_mse
    n_samples = len(train_loader.dataset)
    # Index of the final batch. The previous float comparison against
    # n_samples/batch_size-1 never matched unless n_samples was an exact
    # multiple of batch_size, silently skipping the epoch summary.
    last_batch_idx = len(train_loader) - 1
    for batch_idx, (data, label) in enumerate(train_loader):
        data = data.float()
        if is_mse: label = label.type(torch.FloatTensor)
        else: label = label.type(torch.LongTensor)
        optimizer.zero_grad()
        ts = time.time()
        prediction = model(data)
        ft += time.time() - ts
        loss = loss_func(label, prediction, model=model, param_dict=param_dict)
        ts = time.time()
        # NOTE(review): retain_graph=True is kept from the original code;
        # confirm whether the model actually needs the graph retained.
        loss.backward(retain_graph=True)
        bt += time.time() - ts
        optimizer.step()
        train_loss += loss.item()
        if batch_idx == last_batch_idx:
          avg_loss = train_loss*batch_size / n_samples
          print('====> Epoch: {} Average loss: {:.4f}'.format(epoch, avg_loss))
          if vis is not None:
            vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([avg_loss]), win='loss_train', update='append' if epoch > 0 else None)
            if is_mse: vis.line(Y=prediction[0], win='prediction')
    print('forward and backward time: ', ft, bt)

def test(epoch, test_loader, model, loss_func, **kwargs):
  """Evaluate ``model`` on ``test_loader``; runs only when ``epoch % 10 == 0``.

  Prints the summed loss, a placeholder AUC of 0 (the AUC computation is
  disabled) and an accuracy figure. For ``loss_function_mse`` the
  "accuracy" is the summed loss divided by the dataset size; otherwise it is
  the fraction of samples whose arg-max prediction equals the label.

  Args:
    epoch: Epoch index; also used as the x value of the optional plots.
    test_loader: Loader yielding ``(data, label)`` batches; must expose
      ``.dataset``.
    model: Module called as ``model(data)``.
    loss_func: Called as ``loss_func(label, prediction, model=model)``.
    **kwargs: Optional ``vis`` object with a ``line`` method for plotting.
  """
  if epoch % 10 != 0:
    return
  model.eval()
  is_mse = loss_func == loss_function_mse
  test_loss = 0
  test_auc = 0  # never updated: AUC computation is disabled
  test_acc = 0
  with torch.no_grad():
    for data, label in test_loader:
      data = data.float()
      if is_mse: label = label.type(torch.FloatTensor)
      else: label = label.type(torch.LongTensor)
      prediction = model(data)
      loss = loss_func(label, prediction, model=model)
      batch_loss = loss.item()
      test_loss += batch_loss
      if is_mse:
        test_acc += batch_loss
      else:
        _, pred_for_acc = torch.max(prediction, 1)
        # .item() keeps the running count a plain Python number.
        test_acc += torch.sum(pred_for_acc == label).item()
  acc = float(test_acc)/len(test_loader.dataset)
  print('test loss: ', test_loss)
  print('test auc: ' , test_auc)
  print('test acc: ' , acc)

  vis = kwargs.get('vis')
  if vis is not None:
    update = 'append' if epoch > 0 else None
    vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([test_loss]), opts=dict(title='test loss',), win='loss_test', update=update)
    vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([test_auc]), opts=dict(title='test auc',), win='auc_test', update=update)
    vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([acc]), opts=dict(title='test accuracy',), win='acc_test', update=update)

def train_hybrid(epoch, batch_size, train_loader, model, optimizer, loss_func=None, **kwargs):
    """Run one training epoch using ``loss_function_hybrid``.

    The model is called as ``model(data)`` and must return
    ``(signalreduce, prediction)``.

    Args:
        epoch: Epoch index; printed in the summary and used as the x value
            of the optional loss plot.
        batch_size: Batch size used to scale the accumulated loss in the
            epoch summary.
        train_loader: Loader yielding ``(data, label)`` batches; must expose
            ``.dataset`` and support ``len()``.
        model: Module returning ``(signalreduce, prediction)``.
        optimizer: Optimizer stepped once per batch.
        loss_func: Unused; kept for signature parity with ``train``.
        **kwargs: Optional ``scheduler`` (stepped once at the start of the
            epoch) and ``vis`` (object with a ``line`` method).
    """
    model.train()
    train_loss = 0
    ft = 0  # accumulated forward time
    bt = 0  # accumulated backward time
    # NOTE(review): the scheduler is stepped before any optimizer.step() of
    # this epoch; left in place so existing schedules are unchanged.
    if 'scheduler' in kwargs: kwargs.get('scheduler').step()
    vis = kwargs.get('vis')
    n_samples = len(train_loader.dataset)
    # Index of the final batch. The previous float comparison against
    # n_samples/batch_size-1 never matched unless n_samples was an exact
    # multiple of batch_size, silently skipping the epoch summary.
    last_batch_idx = len(train_loader) - 1
    for batch_idx, (data, label) in enumerate(train_loader):
        data = data.float()
        label = label.type(torch.LongTensor)
        optimizer.zero_grad()
        ts = time.time()
        signalreduce, prediction = model(data)
        ft += time.time() - ts
        loss = loss_function_hybrid(label, signalreduce, prediction)
        ts = time.time()
        # NOTE(review): retain_graph=True kept from the original code.
        loss.backward(retain_graph=True)
        bt += time.time() - ts
        optimizer.step()
        train_loss += loss.item()
        if batch_idx == last_batch_idx:
          avg_loss = train_loss*batch_size / n_samples
          print('====> Epoch: {} Average loss: {:.4f}'.format(epoch, avg_loss))
          if vis is not None:
            vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([avg_loss]), win='loss_train', update='append' if epoch > 0 else None)
    print('forward and backward time: ', ft, bt)

def test_hybrid(epoch, test_loader, model, loss_func=None, **kwargs):
  """Evaluate a hybrid model; runs only when ``epoch % 10 == 0``.

  The model is called as ``model(data)`` and must return a pair whose second
  element is the class prediction. Prints the summed cross-entropy loss, a
  placeholder AUC of 0 (the AUC computation is disabled) and the fraction of
  samples whose arg-max prediction equals the label.

  Args:
    epoch: Epoch index; also used as the x value of the optional plots.
    test_loader: Loader yielding ``(data, label)`` batches; must expose
      ``.dataset``.
    model: Module returning ``(_, prediction)``.
    loss_func: Unused; kept for signature parity with ``test``.
    **kwargs: Optional ``vis`` object with a ``line`` method for plotting.
  """
  if epoch % 10 != 0:
    return
  model.eval()
  test_loss = 0
  test_auc = 0  # never updated: AUC computation is disabled
  test_acc = 0
  with torch.no_grad():
    for data, label in test_loader:
      data = data.float()
      label = label.type(torch.LongTensor)
      _, prediction = model(data)
      loss = loss_function_crossentropy(label, prediction)
      test_loss += loss.item()
      _, pred_for_acc = torch.max(prediction, 1)
      # .item() keeps the running count a plain Python number.
      test_acc += torch.sum(pred_for_acc == label).item()
  acc = float(test_acc)/len(test_loader.dataset)
  print('test loss: ', test_loss)
  print('test auc: ' , test_auc)
  print('test acc: ' , acc)

  vis = kwargs.get('vis')
  if vis is not None:
    update = 'append' if epoch > 0 else None
    vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([test_loss]), opts=dict(title='test loss',), win='loss_test', update=update)
    vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([test_auc]), opts=dict(title='test auc',), win='auc_test', update=update)
    vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([acc]), opts=dict(title='test accuracy',), win='acc_test', update=update)

def train_hybrid_switch(epoch, batch_size, train_loader, model, optimizer, loss_func=None, **kwargs):
    """Run one training epoch, alternating the objective by epoch parity.

    On even epochs the model is called as ``model(data, 'dtwnet')`` and
    trained with cross-entropy. On odd epochs it is called as
    ``model(data, 'boosting')`` and trained with the flat-signal MSE loss.

    Args:
        epoch: Epoch index; selects the objective and is used in the summary
            and the optional loss plot.
        batch_size: Batch size used to scale the accumulated loss in the
            epoch summary.
        train_loader: Loader yielding ``(data, label)`` batches; must expose
            ``.dataset`` and support ``len()``.
        model: Module called as ``model(data, mode)``.
        optimizer: Optimizer stepped once per batch.
        loss_func: Unused; kept for signature parity with ``train``.
        **kwargs: Optional ``scheduler`` (stepped once at the start of the
            epoch) and ``vis`` (object with a ``line`` method).
    """
    model.train()
    train_loss = 0
    ft = 0  # accumulated forward (model + loss) time
    bt = 0  # accumulated backward time
    # NOTE(review): the scheduler is stepped before any optimizer.step() of
    # this epoch; left in place so existing schedules are unchanged.
    if 'scheduler' in kwargs: kwargs.get('scheduler').step()
    vis = kwargs.get('vis')
    n_samples = len(train_loader.dataset)
    # Index of the final batch. The previous float comparison against
    # n_samples/batch_size-1 never matched unless n_samples was an exact
    # multiple of batch_size, silently skipping the epoch summary.
    last_batch_idx = len(train_loader) - 1
    # The original chose the interval with `if epoch < 200`, but both
    # branches assigned 2, so it is a constant.
    interval = 2
    use_dtwnet = epoch % interval == 0
    for batch_idx, (data, label) in enumerate(train_loader):
        data = data.float()
        label = label.type(torch.LongTensor)
        optimizer.zero_grad()
        ts = time.time()
        if use_dtwnet:
          prediction = model(data, 'dtwnet')
          loss = loss_function_crossentropy(label, prediction)
        else:
          signalreduce = model(data, 'boosting')
          loss = loss_function_mse_flat(label, signalreduce)
        ft += time.time() - ts
        ts = time.time()
        # NOTE(review): retain_graph=True kept from the original code.
        loss.backward(retain_graph=True)
        bt += time.time() - ts
        optimizer.step()
        train_loss += loss.item()
        if batch_idx == last_batch_idx:
          avg_loss = train_loss*batch_size / n_samples
          print('====> Epoch: {} Average loss: {:.4f}'.format(epoch, avg_loss))
          if vis is not None:
            vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([avg_loss]), win='loss_train', update='append' if epoch > 0 else None)
    print('forward and backward time: ', ft, bt)

def test_hybrid_switch(epoch, test_loader, model, loss_func=None, **kwargs):
  """Evaluate the 'dtwnet' path of a switch model; runs when ``epoch % 10 == 0``.

  The model is called as ``model(data, 'dtwnet')``. Prints the summed
  cross-entropy loss, a placeholder AUC of 0 (the AUC computation is
  disabled) and the fraction of samples whose arg-max prediction equals the
  label.

  Args:
    epoch: Epoch index; also used as the x value of the optional plots.
    test_loader: Loader yielding ``(data, label)`` batches; must expose
      ``.dataset``.
    model: Module called as ``model(data, 'dtwnet')``.
    loss_func: Unused; kept for signature parity with ``test``.
    **kwargs: Optional ``vis`` object with a ``line`` method for plotting.
  """
  if epoch % 10 != 0:
    return
  model.eval()
  test_loss = 0
  test_auc = 0  # never updated: AUC computation is disabled
  test_acc = 0
  with torch.no_grad():
    for data, label in test_loader:
      data = data.float()
      label = label.type(torch.LongTensor)
      prediction = model(data, 'dtwnet')
      loss = loss_function_crossentropy(label, prediction)
      test_loss += loss.item()
      _, pred_for_acc = torch.max(prediction, 1)
      # .item() keeps the running count a plain Python number.
      test_acc += torch.sum(pred_for_acc == label).item()
  acc = float(test_acc)/len(test_loader.dataset)
  print('test loss: ', test_loss)
  print('test auc: ' , test_auc)
  print('test acc: ' , acc)

  vis = kwargs.get('vis')
  if vis is not None:
    update = 'append' if epoch > 0 else None
    vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([test_loss]), opts=dict(title='test loss',), win='loss_test', update=update)
    vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([test_auc]), opts=dict(title='test auc',), win='auc_test', update=update)
    vis.line(X=torch.FloatTensor([epoch]), Y=torch.FloatTensor([acc]), opts=dict(title='test accuracy',), win='acc_test', update=update)


## test code
# def test_code():
#   model = Model.CONVDTW(10, 10, 10)
#   data = Variable(torch.randn(10))
#   labels = Variable(torch.randn(1))
# 
#   # model = CONVDTW_LAYER(5, 10, 100, 10)
#   # data = Variable(torch.randn(1, 100))
#   # labels = Variable(torch.randn(5, 91))
# 
#   optimizer = optim.SGD(model.parameters(), lr=1e-4)
#   for i in range(1000):
#     optimizer.zero_grad()
#     predictions = model(data)
#     #loss = loss_function_multi(5, labels, predictions)
#     loss = loss_function(labels, predictions)
#     loss.backward(retain_graph=True)
#     optimizer.step()
#     print(loss)
#     for name, param in model.named_parameters():
#       if param.requires_grad:
#         print(name, param.data)
#     print(data)


举报

相关推荐

0 条评论