0
点赞
收藏
分享

微信扫一扫

从零实现深度学习框架——优化反向传播相关代码


引言

本着“凡我不能创造的,我就不能理解”的思想,​​本系列​​文章会基于纯Python以及NumPy从零创建自己的深度学习框架,该框架类似PyTorch能实现自动求导。

要深入理解深度学习,从零开始创建的经验非常重要,从自己可以理解的角度出发,尽量不适用外部完备的框架前提下,实现我们想要的模型。​​本系列​​​文章的宗旨就是通过这样的过程,让大家切实掌握深度学习底层实现,而不是仅做一个调包侠。
本系列文章首发于微信公众号:JavaNLP

在​​前面的文章​​中,我们实现了反向传播的模式。并实现了加法和乘法的计算图。但是这种实现方式有一些弊端,本文就来优化实现反向传播模式的代码,同时修复加法和乘法计算图实现的问题。

优化反向传播代码

在上篇文章中,我们将​​_Function​​​与​​Tensor​​​放到了同一个文件中,这不符合单一职责模式。同时在实现​​Tensor​​的加法和乘法时,我们需要手动添加很多代码,这也不优雅。

def __add__(self, other):
ctx = Add(self, ensure_tensor(other))
return ctx.apply(ctx, self, ensure_tensor(other))

def __mul__(self, other):
ctx = Mul(self, ensure_tensor(other))
return ctx.apply(ctx, self, ensure_tensor(other))

首先,我们把与​​_Function​​​相关的代码移动到新文件​​ops.py​​中:

from typing import Any
import numpy as np

from core.tensor import Tensor

'''
ops.py保存所有运算操作相关的类
'''


class _Function:
def __init__(self, *tensors: "Tensor") -> None:
# 该操作所依赖的所有输入
self.depends_on = [t for t in tensors]
# 保存需要在backward()中使用的Tensor或其他对象(如Shape)
self.saved_tensors = []

def __new__(cls, *args, **kwargs):
'''__new__是静态方法,当该类被实例化时调用'''
# 把以下方法转换为静态方法,我们可以通过类名直接调用
cls.forward = staticmethod(cls.forward)
cls.backward = staticmethod(cls.backward)
cls.apply = staticmethod(cls.apply) # 新增

return super().__new__(cls)

def save_for_backward(ctx, *x: Any) -> None:
ctx.saved_tensors.extend(x)

def forward(ctx, *args: Any, **kwargs: Any) -> np.ndarray:
'''前向传播,进行真正运算的地方'''
raise NotImplementedError("You must implement the forward function for custom Function.")

def backward(ctx, grad: Any) -> Any:
'''实现反向传播,计算梯度'''
raise NotImplementedError("You must implement the backward method for your custom Function "
"to use it with backward mode AD.")

def apply(fxn, *xs: "Tensor", **kwargs) -> "Tensor":
'''与PyTorch一样,我们也不直接调用forward,而是调用此方法'''
# 先调用构造函数,传入运算依赖的Tensor
ctx = fxn(*xs) # 调用到了_Function的__init__方法
# [t.data for t in xs]遍历Tensor中的data(np.ndarray)值,参与实际计算的都是NumPy的数组。
ret = Tensor(ctx.forward(ctx, *[t.data for t in xs], **kwargs),
requires_grad=any([t.requires_grad for t in xs]))

if ret.requires_grad:
ret._ctx = ctx

return ret


class Add(_Function):

def forward(ctx, x: np.ndarray, y: np.ndarray) -> np.ndarray:
'''
实现 z = x + y ,我们这里的x和y都是Numpy数组,因此可能发生广播,
在实现反向传播是需要注意
'''
# 我们只要保存输入各自的形状即可
ctx.save_for_backward(x.shape, y.shape)
# 进行真正的运算
return x + y

def backward(ctx, grad: Any) -> Any:
# 输入有两个,都是需要计算梯度的,因此输出也是两个
return grad, grad


class Mul(_Function):

def forward(ctx, x: np.ndarray, y: np.ndarray) -> np.ndarray:
'''
实现 z = x * y
'''
# 乘法需要保存输入x和y,用于反向传播
ctx.save_for_backward(x, y)
return x * y

def backward(ctx, grad: Any) -> Any:
x, y = ctx.saved_tensors
# 分别返回∂L/∂x 和 ∂L/∂y
return grad * y, grad *

同时修改​​apply​​方法为:

def apply(fxn, *xs: "Tensor", **kwargs) -> "Tensor":
'''与PyTorch一样,我们也不直接调用forward,而是调用此方法'''
# 先调用构造函数,传入运算依赖的Tensor
ctx = fxn(*xs) # 调用到了_Function的__init__方法
# [t.data for t in xs]遍历Tensor中的data(np.ndarray)值,参与实际计算的都是NumPy的数组。
ret = Tensor(ctx.forward(ctx, *[t.data for t in xs], **kwargs),
requires_grad=any([t.requires_grad for t in xs]))

if ret.requires_grad:
ret._ctx = ctx

return

将该方法改为静态方法,同时增加了​​ctx = fxn(*xs)​​​这一句,在该方法实例化​​Function​​对象,传入该运算所依赖的输入。

为了避免我们手动添加​​__add__​​​、​​__mul_​​​这些实现。我们利用​​inspect​​类去自动注册相应的魔法方法。

def register(name, fxn):
print(f"register {name} : {fxn}")

def dispatch(*xs, **kwargs):
# 把所有的输入都转换为Tensor
xs = [ensure_tensor(x) for x in xs]
# 调用apply方法
return fxn.apply(fxn, *xs, **kwargs)

# 为Tensor添加属性,名为name,值为dispatch函数引用
setattr(Tensor, name, dispatch)

# 这几个方法都有__xx__, __ixx__, __rxx__ 魔法方法
if name in ["add", "sub", "mul", "matmul"]:
setattr(Tensor, f"__{name}__", dispatch)
setattr(
Tensor, f"__i{name}__", lambda self, x: self.assign(dispatch(self, x))
) # __i*__ 代表原地操作
setattr(
Tensor, f"__r{name}__", lambda self, x: dispatch(x, self)
) # __r*__ 代表 other在操作符前, self在操作符后


def _register_ops(namespace):
for name, cls in inspect.getmembers(namespace, inspect.isclass):
if name[0] != "_" and name != 'Tensor':
# 注册所有_Function的子类
register(name.lower(), cls)


try:
_register_ops(importlib.import_module("core.ops"))
except ImportError as e:
print(e)

此时当我们初始化​​Tensor​​的时候,它会打印:

register add : <class 'core.ops.Add'>
register mul : <class 'core.ops.Mul'>

比如对于​​add​​​,这段代码会把​​__add__​​​、​​__iadd__​​​、​​__radd__​​​和​​add​​​绑定到其内部的​​dispatch​​方法。

该方法主要做了两件事,第一,统一把所有的输入转换为​​Tensor​​​;第二,调用​​apply​​静态方法。

优化完了之后,我们得试一下还能正常使用么。

但是,这次博主不想写一个​​main​​方法了,而是写一些测试用例。并且,以后所有的代码提交都走PR,利用github的action机制,只有测试通过的PR,才能合入主分支。

编写测试用例

用一种比较简单的方法,就是创建以​​test​​​开头的文件,同时里面的函数也是以​​test​​开头,idea会自动识别为测试用例,如下图所示:

从零实现深度学习框架——优化反向传播相关代码_github

我们分别测试标量的加法、同shape向量的加法以及广播情况下向量的加法。

from core.tensor import Tensor
import numpy as np


def test_simple_add():
x = Tensor(1, requires_grad=True)
y = 2
z = x + y
z.backward()
assert x.grad.data == 1.0


def test_array_add():
x = Tensor([1, 2, 3], requires_grad=True)
y = Tensor([4, 5, 6], requires_grad=True)

z = x + y
assert z.data.tolist() == [5., 7., 9.]

# 如果
z.backward([1, 1, 1])

assert x.grad.data.tolist() == [1, 1, 1]
assert y.grad.data.tolist() == [1, 1, 1]

x += 1
assert x.grad is None
assert x.data.tolist() == [2, 3, 4]


def test_broadcast_add():
"""
测试当发生广播时,我们的代码还能表现正常吗。
对于 z = x + y
如果x.shape == y.shape,那么就像上面的例子一样,没什么问题。
如果x.shape == (2,3) y.shape == (3,) 那么,根据广播,先会在y左边插入一个维度1,变成 -> y.shape == (1,3)
接着,在第0个维度上进行复制,使得新的维度 y.shape == (2,3)
这样的话,对x求梯度时,梯度要和x的shape保持一致;对y求梯度时,也要和y的shape保持一致。
"""
x = Tensor(np.random.randn(2, 3), requires_grad=True) # (2,3)
y = Tensor(np.random.randn(3), requires_grad=True) # (3,)

z = x + y # (2,3)

z.backward(Tensor(np.ones_like(x.data))) # grad.shape == z.shape

assert x.grad.data.tolist() == np.ones_like(x.data).tolist()
assert y.grad.data.tolist == [2, 2]

分别执行每一个测试用例,第一个没有问题:

test_add.py::test_simple_add PASSED                                      [100%]

第二个测试方法报错了:

>           grads = t._ctx.backward(t._ctx, t.grad.data)
E AttributeError: 'list' object has no attribute 'data'

../../core/tensor.py:177: AttributeError


============================== 1 failed in 0.38s ===============================

哦,我们要确保​​backward()​​​方法传入的​​grad​​​为​​Tensor​​对象。

所以,我们修改下对应的​​backward()​​代码:

# 如果传递过来的grad为空
if grad is None:
if self.shape == ():
# 设置梯度值为1,grad本身不需要计算梯度
self._grad = Tensor(1)
else:
# 如果当前Tensor得到不是标量,那么grad必须制定
raise RuntimeError("grad must be specified for non scalar")
else:
self._grad = ensure_tensor(grad)

此时它也通过了:

test_add.py::test_array_add PASSED                                       [100%]

第三个测试方法又没通过:

# t.shape要和grad.shape保持一致
> assert t.shape == g.shape, f"grad shape must match tensor shape in {self._ctx!r}, {g.shape!r} != {t.shape!r}"
E AssertionError: grad shape must match tensor shape in <core.ops.Add object at 0x7fcda8b31c10>, (2, 3) != (3,)

说的是,梯度形状不一致的问题。我们知道,梯度的形状要和输入保持一致。

对于 ​​z = x + y​​​,如果​​x.shape == y.shape​​,那么就像上面的例子一样,没什么问题;

如果​​x.shape == (2,3) y.shape == (3,)​​​ 那么,根据广播,先会在y左边插入一个维度1,变成 -> ​​y.shape == (1,3)​​​,接着,在第0个维度上进行复制,使得新的维度 ​​y.shape == (2,3)​​​。这样的话,对​​x​​​求梯度时,梯度要和​​x​​​的​​shape​​​保持一致;对​​y​​​求梯度时,也要和​​y​​​的​​shape​​保持一致。

修复广播带来的问题

由于要保证梯度的维度和输入的维度一致,而最后得到的梯度是经过了广播操作的。所以,我们要实现广播操作的逆操作:

def unbroadcast(grad: np.ndarray, in_shape: Tuple) -> np.ndarray:
'''
广播操作的逆操作,确保grad转换成in_shape的形状
Args:
grad: 梯度
in_shape: 梯度要转换的形状
Returns:
'''
# 首先计算维度个数之差
ndims_added = grad.ndim - len(in_shape)
# 由于广播时,先从左边插入,再进行复制,所以逆操作时,也从左边开始,进行复制的逆操作(求和)
for _ in range(ndims_added):
# 在axis=0上进行求和,去掉第0个维度,如果ndims_added > 1,就需要不停的在第0个维度上面求和
grad = grad.sum(axis=0)

return

这样,假设输入的维度是从零实现深度学习框架——优化反向传播相关代码_人工智能_02,梯度的维度从零实现深度学习框架——优化反向传播相关代码_pytorch_03。那么上面的代码,首先计算出维度个数差值为从零实现深度学习框架——优化反向传播相关代码_python_04

然后​​grad.sum(axis=0)​​​,把梯度的维度从零实现深度学习框架——优化反向传播相关代码_github_05。此时刚好和输入的维度一致。我们的这个测试用例应该可以跑通。

test_add.py::test_broadcast_add PASSED                                   [100%]

我们再写一个测试方法:

def test_broadcast_add2():
x = Tensor(np.random.randn(2, 3), requires_grad=True) # (2,3)
y = Tensor(np.random.randn(1, 3), requires_grad=True) # (1,3)

z = x + y # (2,3)

z.backward(Tensor(np.ones_like(x.data))) # grad.shape == z.shape

assert x.grad.data.tolist() == np.ones_like(x.data).tolist()
assert y.grad.data.tolist() == (np.ones_like(y.data) * 2).tolist()

然后跑跑看:

>                   assert t.shape == g.shape, f"grad shape must match tensor shape in {self._ctx!r}, {g.shape!r} != {t.shape!r}"
E AssertionError: grad shape must match tensor shape in <core.ops.Add object at 0x000002489FCC85B0>, (2, 3) != (1, 3)

..\..\core\tensor.py:190: AssertionError

🥺 又没有跑通。说是​​(2, 3) != (1, 3)​​​。所以,我们不仅要比较维度个数的差值,还要看维度是否含有从零实现深度学习框架——优化反向传播相关代码_python_04

在​​理解广播和常见的乘法​​中,我们知道广播时的计算规律为:

首先让所有输入数组都向其中形状最长的数组看齐,形状中不足的部分都通过在维度左边加 1 补齐,然后比较对应维度值,需要满足:

  • 它们是相等的
  • 其他一个为1

所以我们也要考虑这种维度为从零实现深度学习框架——优化反向传播相关代码_python_04的情况。更改后的代码为:

def unbroadcast(grad: np.ndarray, in_shape: Tuple) -> np.ndarray:
'''
广播操作的逆操作,确保grad转换成in_shape的形状
Args:
grad: 梯度
in_shape: 梯度要转换的形状
Returns:
'''
# 首先计算维度个数之差
ndims_added = grad.ndim - len(in_shape)
# 由于广播时,先从左边插入,再进行复制,所以逆操作时,也从左边开始,进行复制的逆操作(求和)
for _ in range(ndims_added):
# 在axis=0上进行求和,去掉第0个维度,如果ndims_added > 1,就需要不停的在第0个维度上面求和
grad = grad.sum(axis=0)

# 处理 (2,3) + (1,3) => (2,3) grad的情况
# 看in_shape中有没有维度=1的情况
for i, dim in enumerate(in_shape):
if dim == 1:
# 那么需要在该axis上求和,并且保持维度 这里(2,3) => (1,3) grad 就和输入维度保持一致了
grad = grad.sum(axis=i, keepdims=True)

return

我们在跑一下测试用例:

test_add.py::test_broadcast_add2 PASSED                                  [100%]

至此,加法运算反向传播广播带来的问题解决了。

上文说过,我们需要先写一些测试用例。然后代码提交都走PR,利用github的action机制,只有测试通过的PR,才能合入主分支。

我们先来通过github的action机制,在代码提交时自动跑所有的测试用例。

利用Github来进行自动测试

本文不会过多讨论Github action实现细节,感兴趣的可以查询Github官方文档。

首先在项目根目录下创建目录​​.github/workflows​​,然后添加以下文件。

从零实现深度学习框架——优化反向传播相关代码_github_08

​test.yaml​​:

name: Unit Test Pipeline

# 当在这些分支上提交时,执行这个workflow
on:
push:
branches:
- '!master' # 排除master,在其他分支提交代码时,需要进行测试
- '*'

# 一个workflow由一个或多个job组成
jobs:
# 此workflow包含一个job,叫作test
test:
# 会在github提供的ubuntu系统上运行测试代码
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8, 3.9] # 同时在这两个版本的python上测试
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# 首先下载代码
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }} # 指定python版本
- name: Install dependencies
run: |
python3 -m pip install --upgrade pip
pip install pytest
pip install -r requirements.txt
- name: Run unit tests # 跑测试
run: |
python3 -m pytest

​requirements.txt​​:

numpy==1.20.1

然后我们就创建一个分支,并且提交本文的相关改动,看能否触发以及通过Github的测试。

从零实现深度学习框架——优化反向传播相关代码_python_09

编写乘法测试用例

import numpy as np

from core.tensor import Tensor


def test_simple_mul():
'''
测试简单的乘法
'''
x = Tensor(1, requires_grad=True)
y = 2
z = x * y
z.backward()
assert x.grad.data == 2.0


def test_array_mul():
'''
测试两个同shape的向量乘法
'''
x = Tensor([1, 2, 3], requires_grad=True)
y = Tensor([4, 5, 6], requires_grad=True)

z = x * y

# 对应元素相乘
assert z.data.tolist() == [4, 10, 18]

z.backward(Tensor([1, 1, 1]))

assert x.grad._data.tolist() == y.data.tolist()
assert y.grad._data.tolist() == x.data.tolist()

x *= 0.1
assert x.grad is None

# assert [0.10000000149011612, 0.20000000298023224, 0.30000001192092896] == [0.1, 0.2, 0.3]
# assert x.data.tolist() == [0.1, 0.2, 0.3]
# 需要用近似相等来判断
np.testing.assert_array_almost_equal(x.data, [0.1, 0.2, 0.3])


def test_broadcast_mul():
x = Tensor([[1, 2, 3], [4, 5, 6]], requires_grad=True) # (2, 3)
y = Tensor([7, 8, 9], requires_grad=True) # (3, )

z = x * y # (2,3) * (3,) => (2,3) * (2,3) -> (2,3)

assert z.data.tolist() == [[7, 16, 27], [28, 40, 54]]

z.backward(Tensor([[1, 1, 1, ], [1, 1, 1]]))

assert x.grad.data.tolist() == [[7, 8, 9], [7, 8, 9]]
assert y.grad.data.tolist() == [5, 7, 9]

E                   AssertionError: grad shape must match tensor shape in <core.ops.Mul object at 0x00000202B9F73DF0>, (2, 3) != (3,)

类似加法,对于乘法我们也要处理广播导致的梯度维度问题。然后再进行测试:

test_mul.py::test_broadcast_mul PASSED                                   [100%]

总结

本文我们优化了​​Tensor​​反向传播的代码,同时修复了加法和乘法在发生广播时遇到的一些问题。由于我们的代码托管在Github上,所以可以利用GitHub的一些机制进行自动测试。

下篇文章就可以根据计算图实现其他运算的代码了。


举报

相关推荐

0 条评论