0
点赞
收藏
分享

微信扫一扫

『迷你教程』识别人类活动的一维卷积神经网络模型,附完整代码

文章目录

人类活动识别是将专用线束或智能手机记录的加速度计数据序列分类为已知明确定义的运动的问题。
在这里插入图片描述

该问题的经典方法涉及基于固定大小的窗口和训练机器学习模型(例如决策树的集合)从时间序列数据中手工制作特征。困难在于,这种特征工程需要该领域的深厚专业知识。

循环神经网络和一维卷积神经网络或 CNN等深度学习方法已被证明可以在很少或没有数据特征工程的情况下,在具有挑战性的活动识别任务上提供最先进的结果,而不是使用特征学习原始数据。

在本文中,您将了解如何针对人体活动识别问题开发用于时间序列分类的一维卷积神经网络。
在这里插入图片描述

使用智能手机数据集进行活动识别

人体活动识别,或简称 HAR,是使用传感器根据一个人的运动轨迹来预测一个人正在做什么的问题。

标准的人类活动识别数据集是 2012 年推出的“使用智能手机的活动识别数据集”。

它由 Davide Anguita 等人准备并提供。来自意大利热那亚大学,并在他们 2013 年的论文“使用智能手机进行人类活动识别的公共领域数据集”中进行了完整描述。该数据集在其 2012 年题为“使用多类硬件友好支持向量机对智能手机进行人类活动识别”的论文中使用机器学习算法建模。

UCI 机器学习存储库 免费下载

这些数据是从 30 名年龄在 19 至 48 岁之间的受试者身上收集的,他们在佩戴记录运动数据的腰部智能手机的情况下执行六项标准活动中的一项。记录每个受试者执行活动的视频,并从这些视频中手动标记运动数据。

记录的运动数据是来自智能手机(特别是三星 Galaxy S II)的 x、y 和 z 加速度计数据(线性加速度)和陀螺仪数据(角速度)。以 50 Hz(即每秒 50 个数据点)记录观察结果。每个受试者执行两次活动序列,一次将设备放在他们的左侧,一次将设备放在他们的右侧。

原始数据不可用。相反,提供了数据集的预处理版本。预处理步骤包括:

  • 使用噪声滤波器预处理加速度计和陀螺仪。
  • 将数据拆分为 2.56 秒(128 个数据点)的固定窗口,重叠率为 50%。
  • 将加速度计数据拆分为重力(总)和身体运动分量。

对窗口数据应用了特征工程,并提供了具有这些工程特征的数据副本。

从每个窗口中提取了一些人类活动识别领域常用的时间和频率特征。结果是一个 561 个元素的特征向量。

根据受试者的数据,将数据集分为训练 (70%) 和测试 (30%) 集,例如 21 个受试者用于训练,9 个受试者用于测试。

使用旨在用于智能手机的支持向量机(例如定点算法)的实验结果在测试数据集上产生了 89% 的预测准确率,实现了与未修改的 SVM 实现类似的结果。

该数据集是免费提供的,可以从 UCI 机器学习存储库下载。

数据以单个 zip 文件的形式提供,大小约为 58 兆字节。此下载的直接链接如下:

UCI HAR 数据集.zip

下载数据集并将所有文件解压缩到当前工作目录中名为“HARDataset”的新目录中。

开发一维卷积神经网络

在本节中,我们将为人类活动识别数据集开发一个一维卷积神经网络模型 (1D CNN)。

卷积神经网络模型是为图像分类问题开发的,其中模型在称为特征学习的过程中学习二维输入的内部表示。

同样的过程可以用于一维数据序列,例如用于人类活动识别的加速度和陀螺仪数据。该模型学习从观察序列中提取特征以及如何将内部特征映射到不同的活动类型。

使用 CNN 进行序列分类的好处是它们可以直接从原始时间序列数据中学习,反过来不需要领域专业知识来手动设计输入特征。该模型可以学习时间序列数据的内部表示,并在理想情况下实现与适合具有工程特征的数据集版本的模型相当的性能。

1.加载数据

第一步是将原始数据集加载到内存中。

原始数据中主要有三种信号类型:总加速度、体加速度和体陀螺。每个都有三个数据轴。这意味着每个时间步长共有九个变量。

此外,每个数据系列都被划分为 2.65 秒数据或 128 个时间步长的重叠窗口。这些数据窗口对应于上一节中的工程特征(行)窗口。

这意味着一行数据具有 (128 * 9) 或 1,152 个元素。这比上一节中 561 个元素向量的大小略小一倍,很可能存在一些冗余数据。

信号存储在train和test子目录下的/ Inertial Signals /目录中。每个信号的每个轴都存储在一个单独的文件中,这意味着每个训练和测试数据集都有 9 个要加载的输入文件和一个要加载的输出文件。考虑到一致的目录结构和文件命名约定,我们可以将这些文件批量加载到组中。

输入数据采用 CSV 格式,其中列由空格分隔。这些文件中的每一个都可以作为 NumPy 数组加载。下面的load_file()函数根据文件的文件路径加载数据集,并将加载的数据作为 NumPy 数组返回。

# 加载数据
def load_file(filepath):
	dataframe = read_csv(filepath, header=None, delim_whitespace=True)
	return dataframe.values

然后我们可以将给定组(训练或测试)的所有数据加载到单个三维 NumPy 数组中,其中数组的维度是 [样本、时间步长、特征]。

为了更清楚地说明这一点,有 128 个时间步长和 9 个特征,其中样本数是任何给定原始信号数据文件中的行数。

下面的load_group()函数实现了这种行为。所述dstack()NumPy的函数允许我们堆叠每个加载的3D阵列成一个单一的3D阵列,其中变量是在第三维(特征)分离。

# 加载三维的数组 [samples, timesteps, features]
def load_group(filenames, prefix=''):
	loaded = list()
	for name in filenames:
		data = load_file(prefix + name)
		loaded.append(data)
	# 三维堆栈组
	loaded = dstack(loaded)
	return loaded

我们可以使用此函数加载给定组的所有输入信号数据,例如训练或测试。

下面的load_dataset_group()函数使用 train 和 test 目录之间的一致命名约定加载单个组的所有输入信号数据和输出数据。

# 加载数据集组,训练或测试
def load_dataset_group(group, prefix=''):
	filepath = prefix + group + '/Inertial Signals/'
	# 将所有 9 个文件加载为单个数组
	filenames = list()
	# total acceleration
	filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
	# body acceleration
	filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
	# body gyroscope
	filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
	# 加载输入文件
	X = load_group(filenames, filepath)
	# 加载输出类别
	y = load_file(prefix + group + '/y_'+group+'.txt')
	return X, y

最后,我们可以加载每个训练和测试数据集。

输出数据被定义为类号的整数。我们必须对这些类整数进行热编码,以便数据适合拟合神经网络多类分类模型。我们可以通过调用to_categorical() Keras 函数来做到这一点。

下面的load_dataset()函数实现了这种行为,并返回训练和测试X和y元素,准备好拟合和评估定义的模型。

# 加载数据集,返回训练和测试 X 和 y 元素
def load_dataset(prefix=''):
	# 加载全部训练数据
	trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
	print(trainX.shape, trainy.shape)
	# 加载全部测试数据
	testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
	print(testX.shape, testy.shape)
	# 零偏移类值
	trainy = trainy - 1
	testy = testy - 1
	# 对 y 进行 one hot 编码
	trainy = to_categorical(trainy)
	testy = to_categorical(testy)
	print(trainX.shape, trainy.shape, testX.shape, testy.shape)
	return trainX, trainy, testX, testy

拟合和评估模型

现在我们已经将数据加载到内存中准备建模,我们可以定义、拟合和评估一维 CNN 模型。

我们可以定义一个名为evaluate_model()的函数,它接受训练和测试数据集,在训练数据集上拟合模型,在测试数据集上对其进行评估,并返回模型性能的估计值。

首先,我们必须使用 Keras 深度学习库定义 CNN 模型。该模型需要一个具有[样本、时间步长、特征]的三维输入。

这正是我们加载数据的方式,其中一个样本是时间序列数据的一个窗口,每个窗口有 128 个时间步,一个时间步有 9 个变量或特征。

该模型的输出将是一个六元素向量,其中包含属于六种活动类型中每一种的给定窗口的概率。

拟合模型时需要这些输入和输出维度,我们可以从提供的训练数据集中提取它们。

n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]

为简单起见,该模型被定义为 Sequential Keras 模型。

我们将模型定义为具有两个 1D CNN 层,然后是一个用于正则化的 dropout 层,然后是一个池化层。通常以两个为一组定义 CNN 层,以便为模型提供从输入数据中学习特征的良好机会。CNN 的学习速度非常快,因此 dropout 层旨在帮助减缓学习过程,并希望产生更好的最终模型。池化层将学习到的特征减少到其大小的 1/4,将它们合并为最重要的元素。

在 CNN 和池化之后,学习到的特征被展平为一个长向量,并在用于进行预测的输出层之前通过一个全连接层。理想情况下,全连接层在学习特征和输出之间提供缓冲区,目的是在进行预测之前解释学习特征。

对于这个模型,我们将使用 64 个并行特征图的标准配置和 3 的内核大小。 特征图是输入被处理或解释的次数,而内核大小是输入时间步的数量被认为是输入序列被读取或处理到特征图上。

随机梯度下降的高效Adam版本将用于优化网络,鉴于我们正在学习多类分类问题,将使用分类交叉熵损失函数。

下面列出了模型的定义。

model = Sequential()
model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(n_timesteps,n_features)))
model.add(Conv1D(filters=64, kernel_size=3, activation='relu'))
model.add(Dropout(0.5))
model.add(MaxPooling1D(pool_size=2))
model.add(Flatten())
model.add(Dense(100, activation='relu'))
model.add(Dense(n_outputs, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

该模型适用于固定数量的 epoch,在本例中为 10,将使用 32 个样本的批量大小,其中 32 个数据窗口将在模型权重更新之前暴露给模型。

模型拟合后,将在测试数据集上对其进行评估,并返回拟合模型在测试数据集上的准确度。

下面列出了完整的evaluate_model()函数。

# 拟合和评估模型
def evaluate_model(trainX, trainy, testX, testy):
	verbose, epochs, batch_size = 0, 10, 32
	n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
	model = Sequential()
	model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(n_timesteps,n_features)))
	model.add(Conv1D(filters=64, kernel_size=3, activation='relu'))
	model.add(Dropout(0.5))
	model.add(MaxPooling1D(pool_size=2))
	model.add(Flatten())
	model.add(Dense(100, activation='relu'))
	model.add(Dense(n_outputs, activation='softmax'))
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	# 拟合网络
	model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
	# 评估模型返回acc
	_, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
	return accuracy

网络结构或选择的超参数没有什么特别之处;它们只是这个问题的起点。

总结结果

我们不能从单一的评估中判断模型的技能。

这样做的原因是神经网络是随机的,这意味着在相同的数据上训练相同的模型配置时会产生不同的特定模型。

这是网络的一个特性,因为它赋予模型自适应能力,但需要对模型进行稍微复杂的评估。

我们将多次重复评估模型,然后总结模型在每次运行中的性能。例如,我们可以总共调用evaluate_model() 10 次。这将导致必须汇总的模型评估分数总体。

# 重复实验
scores = list()
for r in range(repeats):
	score = evaluate_model(trainX, trainy, testX, testy)
	score = score * 100.0
	print('>#%d: %.3f' % (r+1, score))
	scores.append(score)

我们可以通过计算和报告性能的平均值和标准差来总结分数样本。均值给出了模型在数据集上的平均准确度,而标准差给出了准确度与均值的平均方差。

下面的函数summarize_results()总结了一次运行的结果。

# 汇总准确率得分
def summarize_results(scores):
	print(scores)
	m, s = mean(scores), std(scores)
	print('Accuracy: %.3f%% (+/-%.3f)' % (m, s))

我们可以将重复评估、结果收集和结果汇总捆绑到一个名为run_experiment()的主要实验函数中,如下所示。

默认情况下,模型会在报告模型性能之前评估 10 次。

# 进行实验
def run_experiment(repeats=10):
	# 加载数据
	trainX, trainy, testX, testy = load_dataset()
	# 重复试验
	scores = list()
	for r in range(repeats):
		score = evaluate_model(trainX, trainy, testX, testy)
		score = score * 100.0
		print('>#%d: %.3f' % (r+1, score))
		scores.append(score)
	# 汇总结果
	summarize_results(scores)

完整示例
现在我们有了所有的部分,我们可以将它们组合成一个工作示例。

下面提供了完整的代码清单。

# cnn 模型
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
from matplotlib import pyplot
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from keras.utils import to_categorical

# 将单个文件加载为 numpy 数组
def load_file(filepath):
	dataframe = read_csv(filepath, header=None, delim_whitespace=True)
	return dataframe.values

# 加载文件列表并作为 3d numpy 数组返回
def load_group(filenames, prefix=''):
	loaded = list()
	for name in filenames:
		data = load_file(prefix + name)
		loaded.append(data)
	# 堆栈组
	loaded = dstack(loaded)
	return loaded

# 加载数据集组,训练或测试
def load_dataset_group(group, prefix=''):
	filepath = prefix + group + '/Inertial Signals/'
	# 将所有 9 个文件加载为单个数组
	filenames = list()
	# total acceleration
	filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
	# body acceleration
	filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
	# body gyroscope
	filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
	# 加载输入数据
	X = load_group(filenames, filepath)
	# 加载类输出
	y = load_file(prefix + group + '/y_'+group+'.txt')
	return X, y

# 加载数据集,返回训练和测试 X 和 y 元素
def load_dataset(prefix=''):
	# 加载全部训练数据
	trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
	print(trainX.shape, trainy.shape)
	# 加载全部测试数据
	testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
	print(testX.shape, testy.shape)
	# 零偏移类值
	trainy = trainy - 1
	testy = testy - 1
	# 将 y 转换 one hot encode
	trainy = to_categorical(trainy)
	testy = to_categorical(testy)
	print(trainX.shape, trainy.shape, testX.shape, testy.shape)
	return trainX, trainy, testX, testy

# 拟合和评估模型
def evaluate_model(trainX, trainy, testX, testy):
	verbose, epochs, batch_size = 0, 10, 32
	n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
	model = Sequential()
	model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(n_timesteps,n_features)))
	model.add(Conv1D(filters=64, kernel_size=3, activation='relu'))
	model.add(Dropout(0.5))
	model.add(MaxPooling1D(pool_size=2))
	model.add(Flatten())
	model.add(Dense(100, activation='relu'))
	model.add(Dense(n_outputs, activation='softmax'))
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	# 拟合网络
	model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
	# 评估模型
	_, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
	return accuracy

# 汇总得分acc
def summarize_results(scores):
	print(scores)
	m, s = mean(scores), std(scores)
	print('Accuracy: %.3f%% (+/-%.3f)' % (m, s))

# 进行实验
def run_experiment(repeats=10):
	# 加载数据
	trainX, trainy, testX, testy = load_dataset()
	# 重复试验
	scores = list()
	for r in range(repeats):
		score = evaluate_model(trainX, trainy, testX, testy)
		score = score * 100.0
		print('>#%d: %.3f' % (r+1, score))
		scores.append(score)
	# 汇总结果
	summarize_results(scores)

# 执行
run_experiment()

运行该示例首先打印加载数据集的形状,然后打印训练集和测试集的形状以及输入和输出元素。这确认了样本、时间步长和变量的数量,以及类的数量。

接下来,创建和评估模型,并为每个模型打印调试消息。

注意:您的结果可能会因算法或评估程序的随机性或数值精度的差异而有所不同。考虑多次运行该示例并比较平均结果。

最后,打印分数样本,然后是平均值和标准偏差。我们可以看到模型表现良好,在原始数据集上训练的分类准确率约为 90.9%,标准差约为 1.3。

这是一个很好的结果,考虑到原始论文发表了 89% 的结果,是在具有大量特定领域特征工程的数据集上训练的,而不是原始数据集。

(7352, 128, 9) (7352, 1)
(2947, 128, 9) (2947, 1)
(7352, 128, 9) (7352, 6) (2947, 128, 9) (2947, 6)

>#1: 91.347
>#2: 91.551
>#3: 90.804
>#4: 90.058
>#5: 89.752
>#6: 90.940
>#7: 91.347
>#8: 87.547
>#9: 92.637
>#10: 91.890

[91.34713267729894, 91.55072955548015, 90.80420766881574, 90.05768578215134, 89.75229046487954, 90.93993892093654, 91.34713267729894, 87.54665761791652, 92.63657957244655, 91.89005768578215]

Accuracy: 90.787% (+/-1.341)

现在我们已经了解了如何加载数据并拟合 1D CNN 模型,我们可以研究是否可以通过一些超参数调整来进一步提升模型的技能。

调整一维卷积神经网络

调整模型以进一步提高问题的性能,我们将研究三个主要领域:

  • 数据准备
  • 过滤器数量
  • 内核大小
  • 数据准备

之前我们没有进行任何数据准备。我们按原样使用数据。

每个主要数据集(身体加速度、身体陀螺仪和总加速度)都已缩放到 -1、1 的范围内。目前尚不清楚这些数据是针对每个对象还是针对所有对象进行缩放的。

一种可能导致改进的可能变换是在拟合模型之前对观察进行标准化。

标准化是指移动每个变量的分布,使其均值为 0,标准差为 1。只有当每个变量的分布是高斯分布时才有意义。

我们可以通过绘制训练数据集中每个变量的直方图来快速检查每个变量的分布。

这方面的一个小困难是数据被分成 128 个时间步长的窗口,有 50% 的重叠。因此,为了对数据分布有一个公平的了解,我们必须首先去除重复的观察(重叠),然后去除数据的窗口。

我们可以使用 NumPy 做到这一点,首先对数组进行切片,只保留每个窗口的后半部分,然后将窗口展平为每个变量的长向量。这是快速而肮脏的,并且确实意味着我们会丢失第一个窗口的前半部分中的数据。

# 数据去重
cut = int(trainX.shape[1] / 2)
longX = trainX[:, -cut:, :]
# 数据拉平
longX = longX.reshape((longX.shape[0] * longX.shape[1], longX.shape[2]))

下面列出了加载数据、展平数据并为九个变量中的每一个绘制直方图的完整示例。

# 数据分布视图
from numpy import dstack
from pandas import read_csv
from keras.utils import to_categorical
from matplotlib import pyplot

# 将单个文件加载为 numpy 数组
def load_file(filepath):
	dataframe = read_csv(filepath, header=None, delim_whitespace=True)
	return dataframe.values

# 加载文件列表并作为 3d numpy 数组返回
def load_group(filenames, prefix=''):
	loaded = list()
	for name in filenames:
		data = load_file(prefix + name)
		loaded.append(data)
	# 堆栈组
	loaded = dstack(loaded)
	return loaded

# 加载数据集组,训练或测试 
def load_dataset_group(group, prefix=''):
	filepath = prefix + group + '/Inertial Signals/'
	# 将所有 9 个文件加载为单个数组
	filenames = list()
	# total acceleration
	filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
	# body acceleration
	filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
	# body gyroscope
	filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
	# 加载输入数据
	X = load_group(filenames, filepath)
	# 加载类输出
	y = load_file(prefix + group + '/y_'+group+'.txt')
	return X, y

# 加载数据集,返回训练和测试 X 和 y 元素
def load_dataset(prefix=''):
	# 加载全部训练数据
	trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
	print(trainX.shape, trainy.shape)
	# 加载全部测试数据
	testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
	print(testX.shape, testy.shape)
	# 零偏移类值
	trainy = trainy - 1
	testy = testy - 1
	# 将 y 进行 one hot encode 编码
	trainy = to_categorical(trainy)
	testy = to_categorical(testy)
	print(trainX.shape, trainy.shape, testX.shape, testy.shape)
	return trainX, trainy, testX, testy

# 绘制数据集中每个变量的直方图
def plot_variable_distributions(trainX):
	# 数据去重
	cut = int(trainX.shape[1] / 2)
	longX = trainX[:, -cut:, :]
	# 数据拉平
	longX = longX.reshape((longX.shape[0] * longX.shape[1], longX.shape[2]))
	print(longX.shape)
	pyplot.figure()
	xaxis = None
	for i in range(longX.shape[1]):
		ax = pyplot.subplot(longX.shape[1], 1, i+1, sharex=xaxis)
		ax.set_xlim(-1, 1)
		if i == 0:
			xaxis = ax
		pyplot.hist(longX[:, i], bins=100)
	pyplot.show()

# 加载数据
trainX, trainy, testX, testy = load_dataset()
# 绘制直方图
plot_variable_distributions(trainX)

运行该示例创建一个带有九个直方图的图,一个用于训练数据集中的每个变量。

在这里插入图片描述

绘图的顺序与加载数据的顺序相匹配,特别是:

  • Total Acceleration x
  • Total Acceleration y
  • Total Acceleration z
  • Body Acceleration x
  • Body Acceleration y
  • Body Acceleration z
  • Body Gyroscope x
  • Body Gyroscope y
  • Body Gyroscope z

我们可以看到,除了第一个变量(总加速度 x)之外,每个变量都具有类高斯分布。

总加速度数据的分布比身体数据更平坦,更有针对性。

我们可以探索对数据使用幂变换来使分布更加高斯分布,尽管这只是一个练习。

训练数据集中每个变量的直方图
训练数据集中每个变量的直方图

数据足够高斯,可以探索标准化变换是否有助于模型从原始观察中提取显着信号。

下面名为scale_data()的函数可用于在拟合和评估模型之前对数据进行标准化。StandardScaler scikit-learn 类将用于执行转换。它首先拟合训练数据(例如,找出每个变量的均值和标准差),然后应用于训练和测试集。

标准化是可选的,因此我们可以在受控实验中应用该过程并将结果与​​没有标准化的相同代码路径进行比较。

# 标准化数据
def scale_data(trainX, testX, standardize):
	# 数据去重
	cut = int(trainX.shape[1] / 2)
	longX = trainX[:, -cut:, :]
	# 拉平数据
	longX = longX.reshape((longX.shape[0] * longX.shape[1], longX.shape[2]))
	# 拉平数据 train and test
	flatTrainX = trainX.reshape((trainX.shape[0] * trainX.shape[1], trainX.shape[2]))
	flatTestX = testX.reshape((testX.shape[0] * testX.shape[1], testX.shape[2]))
	# 标准化数据
	if standardize:
		s = StandardScaler()
		# 拟合训练数据
		s.fit(longX)
		# 适用于训练和测试数据
		longX = s.transform(longX)
		flatTrainX = s.transform(flatTrainX)
		flatTestX = s.transform(flatTestX)
	# 重塑数据
	flatTrainX = flatTrainX.reshape((trainX.shape))
	flatTestX = flatTestX.reshape((testX.shape))
	return flatTrainX, flatTestX

我们可以更新evaluate_model()函数来获取一个参数,然后使用这个参数来决定是否进行标准化。

# 拟合和评估模型
def evaluate_model(trainX, trainy, testX, testy, param):
	verbose, epochs, batch_size = 0, 10, 32
	n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
	# 缩放数据
	trainX, testX = scale_data(trainX, testX, param)
	model = Sequential()
	model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(n_timesteps,n_features)))
	model.add(Conv1D(filters=64, kernel_size=3, activation='relu'))
	model.add(Dropout(0.5))
	model.add(MaxPooling1D(pool_size=2))
	model.add(Flatten())
	model.add(Dense(100, activation='relu'))
	model.add(Dense(n_outputs, activation='softmax'))
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	# 拟合网络
	model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
	# 评估模型
	_, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
	return accuracy

我们将更新summary_results()函数来汇总每个配置参数的结果样本,并创建一个箱线图来比较每个结果样本。

# 汇总得分
def summarize_results(scores, params):
	print(scores, params)
	# 汇总均值和标准差
	for i in range(len(scores)):
		m, s = mean(scores[i]), std(scores[i])
		print('Param=%d: %.3f%% (+/-%.3f)' % (params[i], m, s))
	# 绘制分数箱线图
	pyplot.boxplot(scores, labels=params)
	pyplot.savefig('exp_cnn_standardize.png')

这些更新将使我们能够直接比较之前的模型拟合结果和标准化后的模型拟合数据集的结果。

这也是一个通用更改,允许我们评估和比较以下部分中其他参数集的结果。

下面提供了完整的代码。

# 标准化的cnn模型
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
from matplotlib import pyplot
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from keras.utils import to_categorical

# 将单个文件加载为 numpy 数组
def load_file(filepath):
	dataframe = read_csv(filepath, header=None, delim_whitespace=True)
	return dataframe.values

# 加载文件列表并作为 3d numpy 数组返回
def load_group(filenames, prefix=''):
	loaded = list()
	for name in filenames:
		data = load_file(prefix + name)
		loaded.append(data)
	# 堆栈组
	loaded = dstack(loaded)
	return loaded

# 加载数据集组,训练或测试
def load_dataset_group(group, prefix=''):
	filepath = prefix + group + '/Inertial Signals/'
	# 将所有 9 个文件加载为单个数组
	filenames = list()
	# total acceleration
	filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
	# body acceleration
	filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
	# body gyroscope
	filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
	# 加载输入数据 
	X = load_group(filenames, filepath)
	# 加载类输出
	y = load_file(prefix + group + '/y_'+group+'.txt')
	return X, y

# 加载数据集,返回训练和测试 X 和 y 元素
def load_dataset(prefix=''):
	# 加载全部训练数据
	trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
	print(trainX.shape, trainy.shape)
	# 加载全部测试数据
	testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
	print(testX.shape, testy.shape)
	# 零偏移类值
	trainy = trainy - 1
	testy = testy - 1
	# 将 y 进行 one hot encode
	trainy = to_categorical(trainy)
	testy = to_categorical(testy)
	print(trainX.shape, trainy.shape, testX.shape, testy.shape)
	return trainX, trainy, testX, testy

# 标准化数据
def scale_data(trainX, testX, standardize):
	# 去重数据
	cut = int(trainX.shape[1] / 2)
	longX = trainX[:, -cut:, :]
	# 数据拉平
	longX = longX.reshape((longX.shape[0] * longX.shape[1], longX.shape[2]))
	# 拉平数据的训练和测试
	flatTrainX = trainX.reshape((trainX.shape[0] * trainX.shape[1], trainX.shape[2]))
	flatTestX = testX.reshape((testX.shape[0] * testX.shape[1], testX.shape[2]))
	# 标准化数据
	if standardize:
		s = StandardScaler()
		# 拟合训练数据
		s.fit(longX)
		# 应用于训练和测试数据
		longX = s.transform(longX)
		flatTrainX = s.transform(flatTrainX)
		flatTestX = s.transform(flatTestX)
	# 重塑
	flatTrainX = flatTrainX.reshape((trainX.shape))
	flatTestX = flatTestX.reshape((testX.shape))
	return flatTrainX, flatTestX

# 拟合和测试模型
def evaluate_model(trainX, trainy, testX, testy, param):
	verbose, epochs, batch_size = 0, 10, 32
	n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
	# 缩放数据
	trainX, testX = scale_data(trainX, testX, param)
	model = Sequential()
	model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(n_timesteps,n_features)))
	model.add(Conv1D(filters=64, kernel_size=3, activation='relu'))
	model.add(Dropout(0.5))
	model.add(MaxPooling1D(pool_size=2))
	model.add(Flatten())
	model.add(Dense(100, activation='relu'))
	model.add(Dense(n_outputs, activation='softmax'))
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	# 拟合网络
	model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
	# 测试模型
	_, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
	return accuracy

# 汇总得分
def summarize_results(scores, params):
	print(scores, params)
	# 汇总均值和标准差
	for i in range(len(scores)):
		m, s = mean(scores[i]), std(scores[i])
		print('Param=%s: %.3f%% (+/-%.3f)' % (params[i], m, s))
	# 箱线图
	pyplot.boxplot(scores, labels=params)
	pyplot.savefig('exp_cnn_standardize.png')

# 运行一个测试
def run_experiment(params, repeats=10):
	# 加载数据
	trainX, trainy, testX, testy = load_dataset()
	# 测试每个参数
	all_scores = list()
	for p in params:
		# 重复测试
		scores = list()
		for r in range(repeats):
			score = evaluate_model(trainX, trainy, testX, testy, p)
			score = score * 100.0
			print('>p=%s #%d: %.3f' % (p, r+1, score))
			scores.append(score)
		all_scores.append(scores)
	# 汇总结果
	summarize_results(all_scores, params)

# 运行代码
n_params = [False, True]
run_experiment(n_params)

运行该示例可能需要几分钟时间,具体取决于您的硬件。

为每个评估模型打印性能。在运行结束时,将总结每个测试配置的性能,显示平均值和标准偏差。

注意:您的结果可能会因算法或评估程序的随机性或数值精度的差异而有所不同。考虑多次运行该示例并比较平均结果。

我们可以看到,在建模之前对数据集进行标准化确实看起来确实会导致性能从大约 90.4% 的准确度(接近我们在上一节中看到的)提高到大约 91.5% 的准确度。

(7352, 128, 9) (7352, 1)
(2947, 128, 9) (2947, 1)
(7352, 128, 9) (7352, 6) (2947, 128, 9) (2947, 6)

>p=False #1: 91.483
>p=False #2: 91.245
>p=False #3: 90.838
>p=False #4: 89.243
>p=False #5: 90.193
>p=False #6: 90.465
>p=False #7: 90.397
>p=False #8: 90.567
>p=False #9: 88.938
>p=False #10: 91.144
>p=True #1: 92.908
>p=True #2: 90.940
>p=True #3: 92.297
>p=True #4: 91.822
>p=True #5: 92.094
>p=True #6: 91.313
>p=True #7: 91.653
>p=True #8: 89.141
>p=True #9: 91.110
>p=True #10: 91.890

[[91.48286392941975, 91.24533423820834, 90.83814048184594, 89.24329826942655, 90.19341703427214, 90.46487953851374, 90.39701391245333, 90.56667797760434, 88.93790295215473, 91.14353579911774], [92.90804207668816, 90.93993892093654, 92.29725144214456, 91.82219205972176, 92.09365456396336, 91.31319986426874, 91.65252799457076, 89.14149983033593, 91.10960298608755, 91.89005768578215]] [False, True]

Param=False: 90.451% (+/-0.785)
Param=True: 91.517% (+/-0.965)

还创建了结果的箱线图。这允许以非参数方式比较两个结果样本,显示每个样本的中位数和中间 50%。

我们可以看到标准化后的结果分布与没有标准化的结果分布有很大的不同。这很可能是真正的效果。
在这里插入图片描述

过滤器数量

现在我们有了一个实验框架,我们可以探索模型的其他不同超参数。

CNN 的一个重要超参数是过滤器映射的数量。我们可以试验一系列不同的值,从比我们开发的第一个模型中使用的 64 少到多很多。

具体来说,我们将尝试以下数量的特征图:

n_params = [8, 16, 32, 64, 128, 256]

我们可以使用上一节中的相同代码并更新evaluate_model()函数以使用提供的参数作为Conv1D 层中的过滤器数量。我们还可以更新summary_results ()函数以将箱线图保存为exp_cnn_filters.png。

下面列出了完整的代码示例。

# 带有过滤器的 cnn 模型
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
from matplotlib import pyplot
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from keras.utils import to_categorical

# 将单个文件加载为 numpy 数组
def load_file(filepath):
	dataframe = read_csv(filepath, header=None, delim_whitespace=True)
	return dataframe.values

# 加载文件列表并作为 3d numpy 数组返回
def load_group(filenames, prefix=''):
	loaded = list()
	for name in filenames:
		data = load_file(prefix + name)
		loaded.append(data)
	# 堆栈组
	loaded = dstack(loaded)
	return loaded

# 加载数据集组,训练或测试
def load_dataset_group(group, prefix=''):
	filepath = prefix + group + '/Inertial Signals/'
	# 将所有 9 个文件加载为单个数组
	filenames = list()
	# total acceleration
	filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
	# body acceleration
	filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
	# body gyroscope
	filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
	# 加载输入数据
	X = load_group(filenames, filepath)
	# 加载类输出
	y = load_file(prefix + group + '/y_'+group+'.txt')
	return X, y

# 加载数据集,返回训练和测试 X 和 y 元素
def load_dataset(prefix=''):
	# 加载全部训练数据
	trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
	print(trainX.shape, trainy.shape)
	# 加载全部测试数据
	testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
	print(testX.shape, testy.shape)
	# 零偏移类值
	trainy = trainy - 1
	testy = testy - 1
	# 将 y 转换 one hot encode
	trainy = to_categorical(trainy)
	testy = to_categorical(testy)
	print(trainX.shape, trainy.shape, testX.shape, testy.shape)
	return trainX, trainy, testX, testy

# 拟合和测试模型
def evaluate_model(trainX, trainy, testX, testy, n_filters):
	verbose, epochs, batch_size = 0, 10, 32
	n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
	model = Sequential()
	model.add(Conv1D(filters=n_filters, kernel_size=3, activation='relu', input_shape=(n_timesteps,n_features)))
	model.add(Conv1D(filters=n_filters, kernel_size=3, activation='relu'))
	model.add(Dropout(0.5))
	model.add(MaxPooling1D(pool_size=2))
	model.add(Flatten())
	model.add(Dense(100, activation='relu'))
	model.add(Dense(n_outputs, activation='softmax'))
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	# 拟合网络
	model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
	# 评估模型
	_, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
	return accuracy

# 汇总得分
def summarize_results(scores, params):
	print(scores, params)
	# 汇总均值和标准差
	for i in range(len(scores)):
		m, s = mean(scores[i]), std(scores[i])
		print('Param=%d: %.3f%% (+/-%.3f)' % (params[i], m, s))
	# 得分箱线图
	pyplot.boxplot(scores, labels=params)
	pyplot.savefig('exp_cnn_filters.png')

# 进行测试
def run_experiment(params, repeats=10):
	# 加载数据
	trainX, trainy, testX, testy = load_dataset()
	# 测试每一个参数
	all_scores = list()
	for p in params:
		# 重复测试
		scores = list()
		for r in range(repeats):
			score = evaluate_model(trainX, trainy, testX, testy, p)
			score = score * 100.0
			print('>p=%d #%d: %.3f' % (p, r+1, score))
			scores.append(score)
		all_scores.append(scores)
	# 汇总结果
	summarize_results(all_scores, params)

# 进行测试
n_params = [8, 16, 32, 64, 128, 256]
run_experiment(n_params)

运行示例会对每个指定数量的过滤器重复实验。

在运行结束时,会显示每个过滤器数量的结果摘要。

注意:您的结果可能会因算法或评估程序的随机性或数值精度的差异而有所不同。考虑多次运行该示例并比较平均结果。

我们可能会看到随着过滤器映射数量的增加,平均性能增加的趋势。方差保持相当稳定,也许 128 个特征图可能是网络的一个很好的配置。

...
Param=8: 89.148% (+/-0.790)
Param=16: 90.383% (+/-0.613)
Param=32: 90.356% (+/-1.039)
Param=64: 90.098% (+/-0.615)
Param=128: 91.032% (+/-0.702)
Param=256: 90.706% (+/-0.997)

还创建了结果的箱线图,允许比较每个过滤器数量的结果分布。

从图中,我们可以看到随着特征图数量的增加,中值分类精度(框上的橙色线)呈上升趋势。我们确实看到 64 个特征图(我们实验中的默认值或基线)有所下降,这令人惊讶,并且可能在 32、128 和 256 个过滤器图上的准确度处于平稳状态。也许 32 会是一个更稳定的配置。
在这里插入图片描述

内核大小

核的大小是 1D CNN 需要调整的另一个重要超参数。

内核大小控制在输入序列的每个“读取”中考虑的时间步数,然后将其投影到特征图上(通过卷积过程)。

较大的内核大小意味着对数据的读取不那么严格,但可能会导致输入的更普遍的快照。

除了默认的三个时间步长之外,我们还可以使用相同的实验设置并测试一组不同的内核大小。完整的值列表如下:

n_params = [2, 3, 5, 7, 11]

完整的代码清单如下:

# cnn model vary kernel size
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
from matplotlib import pyplot
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from keras.utils import to_categorical

# 将单个文件加载为 numpy 数组
def load_file(filepath):
	dataframe = read_csv(filepath, header=None, delim_whitespace=True)
	return dataframe.values

# 加载文件列表并作为 3d numpy 数组返回
def load_group(filenames, prefix=''):
	loaded = list()
	for name in filenames:
		data = load_file(prefix + name)
		loaded.append(data)
	# 堆栈组
	loaded = dstack(loaded)
	return loaded

# 加载数据集组,训练或测试
def load_dataset_group(group, prefix=''):
	filepath = prefix + group + '/Inertial Signals/'
	# 将所有 9 个文件加载为单个数组
	filenames = list()
	# total acceleration
	filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
	# body acceleration
	filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
	# body gyroscope
	filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
	# 加载输入数据
	X = load_group(filenames, filepath)
	# 加载类输出
	y = load_file(prefix + group + '/y_'+group+'.txt')
	return X, y

# 加载数据集,返回训练和测试 X 和 y 元素
def load_dataset(prefix=''):
	# 加载全部训练数据
	trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
	print(trainX.shape, trainy.shape)
	# 加载全部测试数据
	testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
	print(testX.shape, testy.shape)
	# 零偏移类值
	trainy = trainy - 1
	testy = testy - 1
	# 将 y 转换 one hot encode
	trainy = to_categorical(trainy)
	testy = to_categorical(testy)
	print(trainX.shape, trainy.shape, testX.shape, testy.shape)
	return trainX, trainy, testX, testy

# 拟合和测试模型
def evaluate_model(trainX, trainy, testX, testy, n_kernel):
	verbose, epochs, batch_size = 0, 15, 32
	n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
	model = Sequential()
	model.add(Conv1D(filters=64, kernel_size=n_kernel, activation='relu', input_shape=(n_timesteps,n_features)))
	model.add(Conv1D(filters=64, kernel_size=n_kernel, activation='relu'))
	model.add(Dropout(0.5))
	model.add(MaxPooling1D(pool_size=2))
	model.add(Flatten())
	model.add(Dense(100, activation='relu'))
	model.add(Dense(n_outputs, activation='softmax'))
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	# 拟合网络
	model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
	# 评估模型
	_, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
	return accuracy

# 汇总得分
def summarize_results(scores, params):
	print(scores, params)
	# 汇总均值和标准差
	for i in range(len(scores)):
		m, s = mean(scores[i]), std(scores[i])
		print('Param=%d: %.3f%% (+/-%.3f)' % (params[i], m, s))
	# 得分箱线图
	pyplot.boxplot(scores, labels=params)
	pyplot.savefig('exp_cnn_kernel.png')

# 进行测试
def run_experiment(params, repeats=10):
	# 加载数据
	trainX, trainy, testX, testy = load_dataset()
	# 测试每一个参数
	all_scores = list()
	for p in params:
		# 重复测试
		scores = list()
		for r in range(repeats):
			score = evaluate_model(trainX, trainy, testX, testy, p)
			score = score * 100.0
			print('>p=%d #%d: %.3f' % (p, r+1, score))
			scores.append(score)
		all_scores.append(scores)
	# 汇总结果
	summarize_results(all_scores, params)

# 进行测试
n_params = [2, 3, 5, 7, 11]
run_experiment(n_params)

运行示例依次测试每个内核大小。

注意:您的结果可能会因算法或评估程序的随机性或数值精度的差异而有所不同。考虑多次运行该示例并比较平均结果。

结果在运行结束时汇总。我们可以看到模型性能随着内核大小的增加而普遍提高。

结果表明,内核大小为 5 可能不错,平均技能约为 91.8%,但也许大小为 7 或 11 的内核大小也可能与较小的标准偏差一样好。

...
Param=2: 90.176% (+/-0.724)
Param=3: 90.275% (+/-1.277)
Param=5: 91.853% (+/-1.249)
Param=7: 91.347% (+/-0.852)
Param=11: 91.456% (+/-0.743)

还创建了结果的箱线图。

结果表明,较大的内核大小似乎确实会带来更好的准确性,并且内核大小可能为 7,从而在良好的性能和低方差之间取得了良好的平衡。

在这里插入图片描述

这只是调整模型的开始,尽管我们已经关注了可能更重要的元素。探索上述一些发现的组合以查看性能是否可以进一步提升可能会很有趣。

将重复次数从 10 次增加到 30 次或更多以查看是否会产生更稳定的结果也可能很有趣。

多头卷积神经网络

CNN 的另一种流行方法是使用多头模型,其中模型的每个头使用不同大小的内核读取输入时间步长。

例如,三头模型可能具有 3、5、11 三个不同的内核大小,从而允许模型以三种不同的分辨率读取和解释序列数据。然后将来自所有三个头的解释连接到模型中,并在做出预测之前由全连接层进行解释。

我们可以使用 Keras 函数式 API 实现多头 1D CNN。有关此 API 的简要介绍,请参阅帖子:

如何使用 Keras Functional API 进行深度学习
下面列出了evaluate_model()函数的更新版本,它创建了一个三层 CNN 模型。

我们可以看到模型的每个头部都是相同的结构,尽管内核大小不同。然后在进行预测之前被解释之前,三个头进入单个合并层。

# 拟合和测试模型
def evaluate_model(trainX, trainy, testX, testy):
	verbose, epochs, batch_size = 0, 10, 32
	n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
 	# 层 1
	inputs1 = Input(shape=(n_timesteps,n_features))
	conv1 = Conv1D(filters=64, kernel_size=3, activation='relu')(inputs1)
	drop1 = Dropout(0.5)(conv1)
	pool1 = MaxPooling1D(pool_size=2)(drop1)
	flat1 = Flatten()(pool1)
	# 层 2
	inputs2 = Input(shape=(n_timesteps,n_features))
	conv2 = Conv1D(filters=64, kernel_size=5, activation='relu')(inputs2)
	drop2 = Dropout(0.5)(conv2)
	pool2 = MaxPooling1D(pool_size=2)(drop2)
	flat2 = Flatten()(pool2)
	# 层 3
	inputs3 = Input(shape=(n_timesteps,n_features))
	conv3 = Conv1D(filters=64, kernel_size=11, activation='relu')(inputs3)
	drop3 = Dropout(0.5)(conv3)
	pool3 = MaxPooling1D(pool_size=2)(drop3)
	flat3 = Flatten()(pool3)
	# 拼接
	merged = concatenate([flat1, flat2, flat3])
	# 激活函数
	dense1 = Dense(100, activation='relu')(merged)
	outputs = Dense(n_outputs, activation='softmax')(dense1)
	model = Model(inputs=[inputs1, inputs2, inputs3], outputs=outputs)
	# 模型保存到图片
	plot_model(model, show_shapes=True, to_file='multichannel.png')
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	# 拟合网络
	model.fit([trainX,trainX,trainX], trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
	# 评估模型
	_, accuracy = model.evaluate([testX,testX,testX], testy, batch_size=batch_size, verbose=0)
	return accuracy

创建模型时,会创建网络架构图;下面提供,它清楚地说明了构建的模型如何组合在一起。

在这里插入图片描述

模型的其他方面可能会因头而异,例如过滤器的数量甚至数据本身的准备。

下面列出了多头 1D CNN 的完整代码示例。

# multi-headed cnn model
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
from matplotlib import pyplot
from keras.utils import to_categorical
from keras.utils.vis_utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from keras.layers.merge import concatenate

# 将单个文件加载为 numpy 数组
def load_file(filepath):
	dataframe = read_csv(filepath, header=None, delim_whitespace=True)
	return dataframe.values

# 加载文件列表并作为 3d numpy 数组返回
def load_group(filenames, prefix=''):
	loaded = list()
	for name in filenames:
		data = load_file(prefix + name)
		loaded.append(data)
	# 堆栈组
	loaded = dstack(loaded)
	return loaded

# 加载数据集组,训练或测试
def load_dataset_group(group, prefix=''):
	filepath = prefix + group + '/Inertial Signals/'
	# 将所有 9 个文件加载为单个数组
	filenames = list()
	# total acceleration
	filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
	# body acceleration
	filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
	# body gyroscope
	filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
	# 加载输入数据
	X = load_group(filenames, filepath)
	# 加载类输出
	y = load_file(prefix + group + '/y_'+group+'.txt')
	return X, y

# 加载数据集,返回训练和测试 X 和 y 元素
def load_dataset(prefix=''):
	# 加载全部训练数据
	trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
	print(trainX.shape, trainy.shape)
	# 加载全部测试数据
	testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
	print(testX.shape, testy.shape)
	# 零偏移类值
	trainy = trainy - 1
	testy = testy - 1
	# 将 y 转换 one hot encode
	trainy = to_categorical(trainy)
	testy = to_categorical(testy)
	print(trainX.shape, trainy.shape, testX.shape, testy.shape)
	return trainX, trainy, testX, testy

# 拟合和测试模型
def evaluate_model(trainX, trainy, testX, testy):
	verbose, epochs, batch_size = 0, 10, 32
	n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
 	# 层 1
	inputs1 = Input(shape=(n_timesteps,n_features))
	conv1 = Conv1D(filters=64, kernel_size=3, activation='relu')(inputs1)
	drop1 = Dropout(0.5)(conv1)
	pool1 = MaxPooling1D(pool_size=2)(drop1)
	flat1 = Flatten()(pool1)
	# 层 2
	inputs2 = Input(shape=(n_timesteps,n_features))
	conv2 = Conv1D(filters=64, kernel_size=5, activation='relu')(inputs2)
	drop2 = Dropout(0.5)(conv2)
	pool2 = MaxPooling1D(pool_size=2)(drop2)
	flat2 = Flatten()(pool2)
	# 层 3
	inputs3 = Input(shape=(n_timesteps,n_features))
	conv3 = Conv1D(filters=64, kernel_size=11, activation='relu')(inputs3)
	drop3 = Dropout(0.5)(conv3)
	pool3 = MaxPooling1D(pool_size=2)(drop3)
	flat3 = Flatten()(pool3)
	# 拼接
	merged = concatenate([flat1, flat2, flat3])
	# 激活函数
	dense1 = Dense(100, activation='relu')(merged)
	outputs = Dense(n_outputs, activation='softmax')(dense1)
	model = Model(inputs=[inputs1, inputs2, inputs3], outputs=outputs)
	# 模型保存到图片
	plot_model(model, show_shapes=True, to_file='multichannel.png')
	model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
	# 拟合网络
	model.fit([trainX,trainX,trainX], trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
	# 评估模型
	_, accuracy = model.evaluate([testX,testX,testX], testy, batch_size=batch_size, verbose=0)
	return accuracy

# 汇总得分
def summarize_results(scores):
	print(scores)
	m, s = mean(scores), std(scores)
	print('Accuracy: %.3f%% (+/-%.3f)' % (m, s))

# 进行测试
def run_experiment(repeats=10):
	# 加载数据
	trainX, trainy, testX, testy = load_dataset()
	# 重复测试
	scores = list()
	for r in range(repeats):
		score = evaluate_model(trainX, trainy, testX, testy)
		score = score * 100.0
		print('>#%d: %.3f' % (r+1, score))
		scores.append(score)
	# 汇总结果
	summarize_results(scores)

# 进行测试
run_experiment()

运行该示例会在每次重复实验时打印模型的性能,然后将估计分数总结为平均值和标准差,就像我们在第一种情况下使用简单的 1D CNN 所做的那样。

注意:您的结果可能会因算法或评估程序的随机性或数值精度的差异而有所不同。考虑多次运行该示例并比较平均结果。

我们可以看到,该模型的平均性能约为 91.6% 的分类准确率,标准差约为 0.8。

这个例子可以用作探索各种其他模型的基础,这些模型在输入头上改变不同的模型超参数甚至不同的数据准备方案。

考虑到该模型中资源的相对增加三倍,将这个结果与单头 CNN 进行比较并不是一个苹果对苹果的比较。也许一个苹果对苹果的比较将是一个模型,该模型具有相同的架构,并且模型的每个输入头具有相同数量的过滤器。

>#1: 91.788
>#2: 92.942
>#3: 91.551
>#4: 91.415
>#5: 90.974
>#6: 91.992
>#7: 92.162
>#8: 89.888
>#9: 92.671
>#10: 91.415

[91.78825924669155, 92.94197488971835, 91.55072955548015, 91.41499830335935, 90.97387173396675, 91.99185612487275, 92.16152019002375, 89.88802171700034, 92.67051238547675, 91.41499830335935]

Accuracy: 91.680% (+/-0.823)

一些拓展的想法

  • 日期准备,探索其他数据准备方案,例如数据标准化和标准化后的标准化。
  • 网络架构,探索其他网络架构,例如更深的 CNN 架构和更深的全连接层,用于解释 CNN 输入特征。
  • 诊断,使用简单的学习曲线诊断来解释模型如何在 epoch 中学习,以及更多的正则化、不同的学习率或不同的批次大小或 epoch 数是否会导致模型性能更好或更稳定。
举报

相关推荐

0 条评论