
介绍
在本系列之前,我们研究了创建自己的自定义Native Container并添加对功能的支持,例如在Job完成时解除分配以及添加并行Job支持的多种方法。在这一部分中,我们将研究另一种使用 [NativeSetThreadIndex] 添加对并行写入的支持的方法。
本文不会使用以前文章中的代码,而是实现一个全新的Native Container。因此,我们假设你已经了解了如何做这件事。如果没有,你可以回去阅读本系列的前几篇文章。
1) NativeSummedFloat3 Setup
我们要实现的容器叫做NativeSummedFloat3。这个容器持有一个float3,但允许多个线程并行地添加到其中。例如,在计算一大组实体的平均位置时,这可能很有用。
在下面的代码中,我们为我们的自定义容器做了所有的基本设置。但值得注意的是我们分配的内存量。我们将为每个工作线程分配一个缓存行。这使我们能够使我们的容器线程安全。通过让每个线程写到它自己的那部分内存,即缓存行,永远不会有多个线程写到同一个内存。它还允许更好的缓存访问,从而使性能得到优化。缺点是,我们将分配大量的内存(从Job开始,总共有8Kb)。我们不能给每个工作线程分配少于一个缓存行的内存,因为CPU在访问数据时总是会加载一个缓存行。
using System;
using System.Runtime.InteropServices;
using Unity.Burst;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Jobs;
using Unity.Jobs.LowLevel.Unsafe;
using Unity.Mathematics;
[NativeContainer]
[NativeContainerSupportsDeallocateOnJobCompletion]
[StructLayout(LayoutKind.Sequential)]
public unsafe struct NativeSummedFloat3 : IDisposable
{
[NativeDisableUnsafePtrRestriction] internal void* m_Buffer;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
internal AtomicSafetyHandle m_Safety;
[NativeSetClassTypeToNullOnSchedule] internal DisposeSentinel m_DisposeSentinel;
#endif
internal Allocator m_AllocatorLabel;
public NativeSummedFloat3(Allocator allocator)
{
// Safety checks
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (allocator <= Allocator.None)
throw new ArgumentException("Allocator must be Temp, TempJob or Persistent", nameof(allocator));
// 在处理通用容器和缓存行时,你可能还想进行其他检查。
/*
if (!UnsafeUtility.IsBlittable<T>())
throw new ArgumentException(string.Format("{0} used in NativeValue<{0}> must be blittable", typeof(T)));
if (UnsafeUtility.SizeOf<T>() > JobsUtility.CacheLineSize)
throw new ArgumentException(string.Format("{0} used in NativeValue<{0}> had a size of {1} which is greater than the maximum size of {2}", typeof(T), UnsafeUtility.SizeOf<T>(), JobsUtility.CacheLineSize));
*/
DisposeSentinel.Create(out m_Safety, out m_DisposeSentinel, 0, allocator);
#endif
// 为每个工作线程分配一个缓存行。
m_Buffer = UnsafeUtility.Malloc(JobsUtility.CacheLineSize * JobsUtility.MaxJobThreadCount, JobsUtility.CacheLineSize, allocator);
m_AllocatorLabel = allocator;
Value = float3.zero;
}
//允许将 NativeSummedFloat3 转换为 float3。
public static implicit operator float3(NativeSummedFloat3 value) { return value.Value; }
/*
* ... Next Code ...
*/
让我们把所有枯燥的代码拿出来,立即加入我们的NativeSummedFloat3结构的末端部分。同样,关于这段代码如何工作的更多信息可以在本系列的前几部分找到。
/*
* ... Previous Code ...
*/
[WriteAccessRequired]
public void Dispose()
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (!UnsafeUtility.IsValidAllocator(m_AllocatorLabel))
throw new InvalidOperationException("The NativeSummedFloat3 can not be Disposed because it was not allocated with a valid allocator.");
DisposeSentinel.Dispose(ref m_Safety, ref m_DisposeSentinel);
#endif
// 释放分配的内存并重置我们的变量。
UnsafeUtility.Free(m_Buffer, m_AllocatorLabel);
m_Buffer = null;
}
public unsafe JobHandle Dispose(JobHandle inputDeps)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (!UnsafeUtility.IsValidAllocator(m_AllocatorLabel))
throw new InvalidOperationException("The NativeSummedFloat3 can not be Disposed because it was not allocated with a valid allocator.");
// 需要在主线程上清除 DisposeSentinel。
DisposeSentinel.Clear(ref m_DisposeSentinel);
#endif
NativeSummedFloat3DisposeJob disposeJob = new NativeSummedFloat3DisposeJob()
{
Data = new NativeSummedFloat3Dispose() { m_Buffer = m_Buffer, m_AllocatorLabel = m_AllocatorLabel }
};
JobHandle result = disposeJob.Schedule(inputDeps);
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.Release(m_Safety);
#endif
m_Buffer = null;
return result;
}
}
[NativeContainer]
internal unsafe struct NativeSummedFloat3Dispose
{
[NativeDisableUnsafePtrRestriction] internal void* m_Buffer;
internal Allocator m_AllocatorLabel;
public void Dispose() { UnsafeUtility.Free(m_Buffer, m_AllocatorLabel); }
}
[BurstCompile]
internal struct NativeSummedFloat3DisposeJob : IJob
{
internal NativeSummedFloat3Dispose Data;
public void Execute() { Data.Dispose(); }
}
2) 单线程 Getter 和 Setter
让我们实现一个getter和setter,它们只能从一个线程中访问。这里的getter很有意思。我们在每一个缓存行上循环,并将这些值相加以得到最终的结果。我们使用ReadArrayElementWithStride,因为我们的数组元素是一个缓存行的大小,但我们只对存储在开头的float3感兴趣。
setter首先将所有的缓存行重置为0,然后再添加值。我们接下来将看看这些方法。
/*
* ... Other Code ...
*/
public float3 Value
{
get
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
#endif
// 对存储在每个工作线程缓存行上的值求和。
float3 result = UnsafeUtility.ReadArrayElement<float3>(m_Buffer, 0);
for (int i = 1; i < JobsUtility.MaxJobThreadCount; i++)
result += UnsafeUtility.ReadArrayElementWithStride<float3>(m_Buffer, i, JobsUtility.CacheLineSize);
return result;
}
[WriteAccessRequired]
set
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
#endif
Reset();
AddValue(value);
}
}
/*
* ... Next Code ...
*/
3) 单线程方法
AddValue和Reset方法访问缓存行的方式与我们的getter相似。我们还不需要担心多个写入器,所以我们可以使用WriteArrayElement,只需写入第一个缓存行。但是对于Reset,我们需要再次使用WriteArrayElementWithStride,因为我们的元素是一个缓存行的大小。
/*
* ... Previous Code ...
*/
[WriteAccessRequired]
public void AddValue(float3 value)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
#endif
// 为总和添加一个值。我们正在从单个线程写入,因此我们将写入第一个缓存行。
float3 current = UnsafeUtility.ReadArrayElement<float3>(m_Buffer, 0);
current += value;
UnsafeUtility.WriteArrayElement(m_Buffer, 0, current);
}
[WriteAccessRequired]
public void Reset()
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
#endif
// 将每个工作线程缓存行重置为 float3.zero。
for (int i = 0; i < JobsUtility.MaxJobThreadCount; i++)
UnsafeUtility.WriteArrayElementWithStride(m_Buffer, i, JobsUtility.CacheLineSize, float3.zero);
}
/*
* ... Next Code ...
*/
4) 具有线程索引的并行writer
现在是有趣的部分,平行写入。我们在NativeSummedFloat3结构中添加代码,用于创建一个并行写入对象,这在之前的文章中已经解释过了。但需要注意的是[NativeSetThreadIndex]和m_ThreadIndex变量。注意,这里的命名很重要! 这个变量将在作业调度时收到线程索引。我们用这个变量作为缓存行的索引来读和写。
/*
* ... Previous Code ...
*/
[NativeContainerIsAtomicWriteOnly]
[NativeContainer]
unsafe public struct ParallelWriter
{
[NativeDisableUnsafePtrRestriction] internal void* m_Buffer;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
internal AtomicSafetyHandle m_Safety;
#endif
[NativeSetThreadIndex]
internal int m_ThreadIndex;
[WriteAccessRequired]
public void AddValue(float3 value)
{
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
#endif
// 为总和添加一个值。我们正在并行写入,因此我们将写入分配给该线程的缓存行。
float3 current = UnsafeUtility.ReadArrayElementWithStride<float3>(m_Buffer, m_ThreadIndex, JobsUtility.CacheLineSize);
current += value;
UnsafeUtility.WriteArrayElementWithStride(m_Buffer, m_ThreadIndex, JobsUtility.CacheLineSize, current);
}
}
public ParallelWriter AsParallelWriter()
{
ParallelWriter writer;
#if ENABLE_UNITY_COLLECTIONS_CHECKS
AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
writer.m_Safety = m_Safety;
AtomicSafetyHandle.UseSecondaryVersion(ref writer.m_Safety);
#endif
writer.m_Buffer = m_Buffer;
writer.m_ThreadIndex = 0; // 线程索引将由Job 计划稍后设置。
return writer;
}
/*
* ... More Code ...
*/
使用方法
这就是我们要做的一切 有了这个,我们就创建了一个自定义的Native container,通过利用线程索引,可以实现并行写入。下面的代码显示了我们如何使用它来计算场景中所有具有LocalToWorld组件的实体的平均位置。
using Unity.Collections;
using Unity.Entities;
using Unity.Jobs;
using Unity.Mathematics;
using Unity.Transforms;
public class NativeSummedFloat3System : SystemBase
{
private EntityQuery localToWorldQuery;
protected override void OnUpdate()
{
NativeSummedFloat3 avgPosition = new NativeSummedFloat3(Allocator.TempJob);
NativeSummedFloat3.ParallelWriter avgPositionParallelWriter = avgPosition.AsParallelWriter();
// 将实体的所有位置与 LocalToWorld 组件相加。
JobHandle jobHandle = Entities.WithName("AvgPositionJob")
.WithStoreEntityQueryInField(ref localToWorldQuery)
.ForEach((in LocalToWorld localToWorld) =>
{
avgPositionParallelWriter.AddValue(localToWorld.Position);
}).ScheduleParallel(default);
jobHandle.Complete();
// 我们存储查询,以便我们可以计算有多少实体具有 LocalToWorld 组件。
int entityCount = localToWorldQuery.CalculateEntityCount();
UnityEngine.Debug.Log(avgPosition.Value / entityCount);
avgPosition.Dispose();
}
}
总结
在这篇文章中,我们写了一个新的自定义Native Container,通过给每个线程分配它自己的内存/缓存行,使用线程索引进行并行写入。这使得我们可以创建一个可以并行写入的float3。但我们也可以使这个容器通用化,以允许对任何值进行并行操作(只要它们比缓存行小)。这个容器的通用版本可以在这里找到,还有一个如何使用它的例子。