简介
Velox作为计算引擎,可以被Presto、Spark嵌入使用,Velox内部在Operator数据传递中使用的数据结构是列式结构RowVector,而Presto和Spark在计算节点之间也有对应的数据结构SerializedPage和UnsafeRow。在Presto、Spark使用Velox计算引擎的过程中需要处理两种数据格式的转换。
对于这种情况Velox提供了VectorSerde接口,同时提供了注册方法:registerVectorSerde,在不同的系统对接时,实现了相应的VectorSerde接口并注册,就可以将外部系统与Velox的RowVector实现互转。
接下来本文主要介绍VectorSerde接口的两种实现,PrestoVectorSerde和UnsafeRowVectorSerde,分别用来实现RowVector与Presto的SerializedPage、RowVector与Spark UnsafeRow的转换,同时SerailzedPage和UnsafeRow分别是列模型和行模型,两种实现也具有一定的代表性。
接下来会先介绍下Velox在序列化过程中内存管理相关的类StreamArena,ByteStream等基础实现,然后介绍PrestoVectorSerde和UnsafeRowVectorSerde的实现。
内存管理相关类
StreamArena概览
首先直接来看下内存相关的概念:
- page是velox中定义的内存页,大小为由AllocationTraits::kPageSize(4K)定义,也是StreamArena申请的最小单位。
- PageRun是指多个连续的page,PageRun提供data()方法,指向PageRun的首地址。
- Allocation存储了PageRun的数组,Allocation由MemoryPool::allocateNonContiguous方法得到,该可以指定申请的页的数量。
- MemoryPool是velox的内存管理接口,以树状的形式管理各个层次的内存申请和释放,在query、task、node、operator层次可以进行分别管理,本文暂不赘述。
- Allocation的析构函数会直接释放Allocation中申请的内存。
- StreamArena是元素为Allocation的vector,暴露两个内存申请的接口:
- newRange:指定需要的字节数,背后通过memoryPool申请到相关的大小。需要指出的是newRange申请到内存都是page的倍数,但不会超过一个PageRun的大小。
- newTinyRange:相对newRange来说,没有通过memoryPool申请,而是直接在StreamArena中通过类型为vector<string>的成员变量来申请,这类内存往往比较小,不需要动辄4K的页的大小来申请。
- newRange和newTinyRange申请到的内存都以ByteRange来执行,ByteRange主要包括指针buffer和长度size,分别指向申请到的内存的开始位置和大小。
Allocation & PageRun实现
PageRun结构比较简单,构造函数中指定了头地址。
Allocation包含了PageRun的vector。
class PageRun {
public:
...
PageRun(void* address, MachinePageCount numPages) {
auto word = reinterpret_cast<uint64_t>(address); // NOLINT
data_ =
word | (static_cast<uint64_t>(numPages) << kPointerSignificantBits);
}
template <typename T = uint8_t>
T* data() const {
return reinterpret_cast<T*>(data_ & kPointerMask); // NOLINT
}
//...
private:
uint64_t data_;
}
class Allocation {
MemoryPool* pool_{nullptr};
std::vector<PageRun> runs_;
}
StreamArena实现
- StreamArena通过allocations_保存申请到的内存Allocation,通过currentRun_和currentPage_用来指向需要当前用到的Allocation的PageRun和Page的索引:
- 在newRange申请的过程中,会检查当前PageRun的索引是否超过了当前Allocation的PageRun总数,如果超过了需要申请新的Allocation。
- 在当前PageRun中查找合适数量的Page,然后赋值给range的buffer和size。
- tinyRanges_是string的vector:
- 在newTinyRange时,直接vector在中申请新的string,设置特定大小,然后赋值给range的buffer和size。
class StreamArena {
//...
private:
// All allocations.
std::vector<std::unique_ptr<memory::Allocation>> allocations_;
// The allocation from which pages are given out. Moved to 'allocations_' when used up.
memory::Allocation allocation_;
int32_t currentRun_ = 0;
int32_t currentPage_ = 0;
memory::MachinePageCount allocationQuantum_ = 2;
std::vector<std::string> tinyRanges_;
}
void StreamArena::newRange(int32_t bytes, ByteRange* range) {
VELOX_CHECK_GT(bytes, 0);
memory::MachinePageCount numPages =
bits::roundUp(bytes, memory::AllocationTraits::kPageSize) /
memory::AllocationTraits::kPageSize;
int32_t numRuns = allocation_.numRuns();
if (currentRun_ >= numRuns) {
if (numRuns) {
allocations_.push_back(
std::make_unique<memory::Allocation>(std::move(allocation_)));
}
pool_->allocateNonContiguous(
std::max(allocationQuantum_, numPages), allocation_);
currentRun_ = 0;
currentPage_ = 0;
size_ += allocation_.byteSize();
}
auto run = allocation_.runAt(currentRun_);
int32_t available = run.numPages() - currentPage_;
range->buffer =
run.data() + memory::AllocationTraits::kPageSize * currentPage_;
range->size = std::min<int32_t>(numPages, available) *
memory::AllocationTraits::kPageSize;
range->position = 0;
currentPage_ += std::min<int32_t>(available, numPages);
if (currentPage_ == run.numPages()) {
++currentRun_;
currentPage_ = 0;
}
}
void StreamArena::newTinyRange(int32_t bytes, ByteRange* range) {
tinyRanges_.emplace_back();
tinyRanges_.back().resize(bytes);
range->position = 0;
range->buffer = reinterpret_cast<uint8_t*>(tinyRanges_.back().data());
range->size = bytes;
}
ByteStream概览
上面介绍了StreamArena可以用来申请内存并保存在ByteRange中,在序列化/反序列化的过程中主要用到了ByteStream这个类,ByteStream用来直接对特定类型的数据进行读写,内部仍然存储ByteRange,主要方法如下:
- append系列泛型方法提供了对不同类型的数据写入,数据最终被写入到ByteRange中。
- read系列泛型方法提供了对特定类型的读取,读取的源都是ByteRange。
序列化接口
VeloxSerde
VectorSerde是用来做RowVector的序列化和反序列化的接口,从VectorSerde的结构来看:
- 序列化:没有直接提供serialize方法,而是使用VectorSerializer来做Vector的序列化,提供了createSerializer方法来创建VectorSerializer。在创建VectorSerializer时:
- 会指定RowVector的类型type,包含每一列的名称和类型。
- 指定需要序列化的行数
- 指定用到的StreamArena,在序列化的过程中,中间结果需要存储在StreamArena中。
- 反序列化:使用deserialize方法来做Vector的反序列化,从ByteStream中反序列化到RowVector中。
class VectorSerde {
public:
//...
virtual std::unique_ptr<VectorSerializer> createSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
const Options* options = nullptr) = 0;
virtual void deserialize(
ByteStream* source,
velox::memory::MemoryPool* pool,
RowTypePtr type,
RowVectorPtr* result,
const Options* options = nullptr) = 0;
};
VectorSerializer
VectorSerializer没有直接提供serialize方法,而是提供了append方法进行序列化,在使用VectorSerializer完成序列化主要包含两步,首先是append数据,然后flush结果到OutputStream中。
- append方法可以将一部分数据行添加到serializer的“内部的存储”中。
- flush方法将“内部的存储”写到OutputStream中。
class VectorSerializer {
public:
virtual ~VectorSerializer() = default;
/// Serialize a subset of rows in a vector.
virtual void append(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges) = 0;
/// Serialize all rows in a vector.
void append(const RowVectorPtr& vector);
/// Write serialized data to 'stream'.
virtual void flush(OutputStream* stream) = 0;
};
在外部系统数据结构与Velox的RowVector相互转换的过程中需要同时实现VectorSerde和VectorSerializer两个接口,例如:
- 针对Presto SerializedPage的PrestoVectorSerde和PrestoVectorSerializer。
- 针对Spark UnsafeRow的UnsafeRowVectorSerde和UnsafeRowVectorSerializer。
接下来就两种实现分别进行解析。
PrestoVectorSerde
在探究其实现的过程中,也需要关注内存的申请和拷贝情况。
SerializedPage
在介绍序列化和反序列化之前,先简单介绍下SerializedPage的结构,参考SerializedPage Wire Format。
总体结构
格式包括Header、列数、列;其中行数保存在Header中。
列结构
列结构中也包含列Header、Null Flags、实际值。
- 列header:表示列的类型
- Null Flags:使用一个字节表示是否有nullFlag,使用每一个bit标识是否某一行是否为null。
- 列内容,列内容根据列的类型可以分为如下几种类型:
- 定长列:主要针对BYTE、INT、SHORT、LONG、INT128等基础类型编码。与Velox的定长列FlatVector不同的是,FlatVector对于null的行也会占用空间,两者各有优劣。
- 变长列:主要针对VARCHAR等类型,每一行的值不固定,相对于定长列,需要多一个offsets存储,指定每一行数据的开始位置。
- 复合列:包括ARRAY、MAP、ROW三种类型,复合类型的存储包括基础类型的列。
- 定长列示例:
- 变长列示例:
序列化实现
PrestoVectorSerializer
PrestoVectorSerializer的实现比较清晰:
- 在构造时会根据RowVector的列来创建对应列的VectorStream,VectorStream是用来存储每一列的数据序列化的结果。
- 在append时会创建按列进行序列化,主要工作在serializeColumn中实现。
这里留几个问题,接下来一一进行研究。
- VectorStream是如何存储数据的?存储的是RowVector中数据的指针还是拷贝内存?
- serializeColumn对于RowVector每一列有没有特化处理,比如某一列是ConstantVector、DictionaryVector等怎么处理?
class PrestoVectorSerializer : public VectorSerializer {
public:
PrestoVectorSerializer(
std::shared_ptr<const RowType> rowType,
int32_t numRows,
StreamArena* streamArena,
bool useLosslessTimestamp) {
auto types = rowType->children();
auto numTypes = types.size();
streams_.resize(numTypes);
for (int i = 0; i < numTypes; i++) {
streams_[i] = std::make_unique<VectorStream>(
types[i], streamArena, numRows, useLosslessTimestamp);
}
}
void append(
const RowVectorPtr& vector,
const folly::Range<const IndexRange*>& ranges) override {
auto newRows = rangesTotalSize(ranges);
if (newRows > 0) {
numRows_ += newRows;
for (int32_t i = 0; i < vector->childrenSize(); ++i) {
serializeColumn(vector->childAt(i).get(), ranges, streams_[i].get());
}
}
}
void flush(OutputStream* out) override {
flushInternal(numRows_, false /*rle*/, out);
}
//...
private:
...
int32_t numRows_{0};
std::vector<std::unique_ptr<VectorStream>> streams_;
};
} // namespace
VectorStream
VectorStream是序列化SerializedPage做准备,其成员存储了SerializedPage结构中所需的Column Header、Null Flags、长度等;同时由于SeralizedPage也支持ROW、ARRAY、MAP类型的encoding,与此对应,VectorStream对于这三种类型使用了嵌套的VectorStream来表示。
初始化内存
- children_:children表示ROW、ARRAY、MAP类型的子VectorStream,对应SerializedPage的三种类型。
- header_:用来存储每种数据类型的头信息。
- nulls_:用来存储null的值。
- lengths_:用来存储长度。
- values_:用来存储实际的值。
class VectorStream {
//...
private:
const TypePtr type_;
//...
ByteRange header_;
ByteStream nulls_;
ByteStream lengths_;
ByteStream values_;
std::vector<std::unique_ptr<VectorStream>> children_;
};
在VectorStream构造时,会对上述成员进行初始化,调用ByteStream::startWrite方法进行内存申请,根据RowVector中每列的数据类型,进行内存预申请。
下面将重要的字段初始化代码列出:代码中的lengths_对应SerializedPage中的offsets
- 对于ROW、ARRAY、MAP类型除了lengths_初始化,也需要进行childrens_初始化,因为是嵌套结构
- 对于VARCHAR、VARBINARY类型,进行了lengths_和values_的初始化;
class VectorStream {
public:
VectorStream(
const TypePtr type,
StreamArena* streamArena,
int32_t initialNumRows,
bool useLosslessTimestamp)
: type_(type),
useLosslessTimestamp_(useLosslessTimestamp),
nulls_(streamArena, true, true),
lengths_(streamArena),
values_(streamArena) {
//...
if (initialNumRows > 0) {
switch (type_->kind()) {
case TypeKind::ROW:
if (isTimestampWithTimeZoneType(type_)) {
values_.startWrite(initialNumRows * 4);
break;
}
[[fallthrough]];
case TypeKind::ARRAY:
case TypeKind::MAP:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
children_.resize(type_->size());
for (int32_t i = 0; i < type_->size(); ++i) {
children_[i] = std::make_unique<VectorStream>(
type_->childAt(i),
streamArena,
initialNumRows,
useLosslessTimestamp);
}
break;
case TypeKind::VARCHAR:
case TypeKind::VARBINARY:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
values_.startWrite(initialNumRows * 10);
break;
default:;
values_.startWrite(initialNumRows * 4);
break;
}
}
}
private:
const TypePtr type_;
//...
ByteRange header_;
ByteStream nulls_;
ByteStream lengths_;
ByteStream values_;
std::vector<std::unique_ptr<VectorStream>> children_;
};
序列化
序列化的Append方法,最终会对每一列调用serializeColumn方法,该方法实现中针对每一种列类型进行专门处理,例如对于Flat类型调用了serializeFlatVector。
针对每一种序列化的主要流程是根据提出数据的格式,写入VectorStream的lengths_、values_中,对于复合类型,进行递归调用。
需要注意的是,数据是会被写到StreamArena的内存时,使用了内存拷贝,即使是String类型。也就说在StreamArena中存储的完整的拷贝后的RowVector的数据。
void serializeColumn(
const BaseVector* vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream) {
switch (vector->encoding()) {
case VectorEncoding::Simple::FLAT:
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
serializeFlatVector, vector->typeKind(), vector, ranges, stream);
break;
case VectorEncoding::Simple::CONSTANT:
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
serializeConstantVector, vector->typeKind(), vector, ranges, stream);
break;
case VectorEncoding::Simple::BIASED:
switch (vector->typeKind()) {
case TypeKind::SMALLINT:
serializeBiasVector<int16_t>(vector, ranges, stream);
break;
case TypeKind::INTEGER:
serializeBiasVector<int32_t>(vector, ranges, stream);
break;
case TypeKind::BIGINT:
serializeBiasVector<int64_t>(vector, ranges, stream);
break;
default:
throw std::invalid_argument("Invalid biased vector type");
}
break;
case VectorEncoding::Simple::ROW:
serializeRowVector(vector, ranges, stream);
break;
case VectorEncoding::Simple::ARRAY:
serializeArrayVector(vector, ranges, stream);
break;
case VectorEncoding::Simple::MAP:
serializeMapVector(vector, ranges, stream);
break;
case VectorEncoding::Simple::LAZY:
serializeColumn(vector->loadedVector(), ranges, stream);
break;
default:
serializeWrapped(vector, ranges, stream);
}
}
- flush方法
serializeColumn是将数据写入StreamArena的内存中,flush方法可以真正的将数据严格按照SerializedPage的格式写入OutputStream。其中ROW、ARRAY、MAP类型调用了递归调用了children的flush方法。
void flush(OutputStream* out) {
out->write(reinterpret_cast<char*>(header_.buffer), header_.size);
switch (type_->kind()) {
case TypeKind::ROW:
//...
case TypeKind::ARRAY:
//...
case TypeKind::MAP:
//...
case TypeKind::VARCHAR:
case TypeKind::VARBINARY:
//...
default:
//...
}
}
在PrestoVectorSerializer的flush方法中,会按照列的个数,逐个调用对应的VectorStream的flush方法。
反序列化实现
反序列化是指从SerializePage二进制格式转化成RowVector;
- 从函数签名可以看出,需要传入输出类型RowType,说明在反序列化之前已经知道了RowVector有哪些列。
- 反序列化的源头是ByteStream,有一组ByteRange组成。
void deserialize(
ByteStream* source,
velox::memory::MemoryPool* pool,
std::shared_ptr<const RowType> type,
std::shared_ptr<RowVector>* result) override;
反序列化的核心方法是readColumns,读取每一列的数据到result的children中。
auto children = &(*result)->children();
auto childTypes = type->as<TypeKind::ROW>().children();
readColumns(source, pool, childTypes, children);
接下来看下readColumns的核心实现:
- 在readColumns中首先定义了针对不同数据类型的读取函数:对于简单类型,都使用了read函数,对于复杂类型使用了单独的读取方法。
- 然后按照每一列的类型,逐个调用其对应类型的读取方法。
void readColumns(
ByteStream* source,
velox::memory::MemoryPool* pool,
const std::vector<TypePtr>& types,
std::vector<VectorPtr>* result) {
static std::unordered_map<
TypeKind,
std::function<void(
ByteStream * source,
std::shared_ptr<const Type> type,
velox::memory::MemoryPool * pool,
VectorPtr * result)>>
readers = {
{TypeKind::BOOLEAN, &read<bool>},
{TypeKind::TINYINT, &read<int8_t>},
{TypeKind::SMALLINT, &read<int16_t>},
{TypeKind::INTEGER, &read<int32_t>},
{TypeKind::BIGINT, &read<int64_t>},
{TypeKind::REAL, &read<float>},
{TypeKind::DOUBLE, &read<double>},
{TypeKind::TIMESTAMP, &read<Timestamp>},
{TypeKind::DATE, &read<Date>},
{TypeKind::VARCHAR, &read<StringView>},
{TypeKind::VARBINARY, &read<StringView>},
{TypeKind::ARRAY, &readArrayVector},
{TypeKind::MAP, &readMapVector},
{TypeKind::ROW, &readRowVector},
{TypeKind::UNKNOWN, &read<UnknownValue>}};
for (int32_t i = 0; i < types.size(); ++i) {
auto it = readers.find(types[i]->kind());
//...
it->second(source, types[i], pool, &(*result)[i]);
}
}
接下来分别以简单类型和复杂类型来举例说明:
- 简单类型:首先读取size,然后根据size大小创建特定大小的FlatVector,最后对flatVector进行赋值。
template <typename T>
void read(
ByteStream* source,
std::shared_ptr<const Type> type,
velox::memory::MemoryPool* pool,
VectorPtr* result) {
int32_t size = source->read<int32_t>();
if (*result && result->unique()) {
(*result)->resize(size);
} else {
*result = BaseVector::create(type, size, pool);
}
auto flatResult = (*result)->asFlatVector<T>();
auto nullCount = readNulls(source, size, flatResult);
BufferPtr values = flatResult->mutableValues(size);
readValues<T>(source, size, flatResult->nulls(), nullCount, values);
}
- 复杂类型:以MapVector为例,首先会将mapVector的key和value提取出来作为children,通过递归调用readColumns将值写入到children中,然后将key和value的Vector设置到MapVector中。其他复杂类型同理,也是将children先进行序列化,然后children赋值到复杂类型中。
void readMapVector(
ByteStream* source,
std::shared_ptr<const Type> type,
velox::memory::MemoryPool* pool,
VectorPtr* result) {
MapVector* mapVector =
(*result && result->unique()) ? (*result)->as<MapVector>() : nullptr;
std::vector<TypePtr> childTypes = {type->childAt(0), type->childAt(1)};
std::vector<VectorPtr> children(2);
if (mapVector) {
children[0] = mapVector->mapKeys();
children[1] = mapVector->mapValues();
}
readColumns(source, pool, childTypes, &children);
//...
mapVector->setKeysAndValues(children[0], children[1]);
//...
}
同样,在反序列化的过程中,也会将数据从ByteStream中数据复制到Vector中,同样发生了内存复制。