0
点赞
收藏
分享

微信扫一扫

Velox的序列化

简介

Velox作为计算引擎,可以被Presto、Spark嵌入使用,Velox内部在Operator数据传递中使用的数据结构是列式结构RowVector,而Presto和Spark在计算节点之间也有对应的数据结构SerializedPage和UnsafeRow。在Presto、Spark使用Velox计算引擎的过程中需要处理两种数据格式的转换。

对于这种情况Velox提供了VectorSerde接口,同时提供了注册方法:registerVectorSerde,在不同的系统对接时,实现了相应的VectorSerde接口并注册,就可以将外部系统与Velox的RowVector实现互转。

接下来本文主要介绍VectorSerde接口的两种实现,PrestoVectorSerde和UnsafeRowVectorSerde,分别用来实现RowVector与Presto的SerializedPage、RowVector与Spark UnsafeRow的转换,同时SerailzedPage和UnsafeRow分别是列模型和行模型,两种实现也具有一定的代表性。

接下来会先介绍下Velox在序列化过程中内存管理相关的类StreamArena,ByteStream等基础实现,然后介绍PrestoVectorSerde和UnsafeRowVectorSerde的实现。

内存管理相关类

StreamArena概览

首先直接来看下内存相关的概念:

  • page是velox中定义的内存页,大小为由AllocationTraits::kPageSize(4K)定义,也是StreamArena申请的最小单位。
  • PageRun是指多个连续的page,PageRun提供data()方法,指向PageRun的首地址。
  • Allocation存储了PageRun的数组,Allocation由MemoryPool::allocateNonContiguous方法得到,该可以指定申请的页的数量。
  • MemoryPool是velox的内存管理接口,以树状的形式管理各个层次的内存申请和释放,在query、task、node、operator层次可以进行分别管理,本文暂不赘述。
  • Allocation的析构函数会直接释放Allocation中申请的内存。
  • StreamArena是元素为Allocation的vector,暴露两个内存申请的接口:
  • newRange:指定需要的字节数,背后通过memoryPool申请到相关的大小。需要指出的是newRange申请到内存都是page的倍数,但不会超过一个PageRun的大小。
  • newTinyRange:相对newRange来说,没有通过memoryPool申请,而是直接在StreamArena中通过类型为vector<string>的成员变量来申请,这类内存往往比较小,不需要动辄4K的页的大小来申请。
  • newRange和newTinyRange申请到的内存都以ByteRange来执行,ByteRange主要包括指针buffer和长度size,分别指向申请到的内存的开始位置和大小。

Allocation & PageRun实现

PageRun结构比较简单,构造函数中指定了头地址。

Allocation包含了PageRun的vector。

class PageRun {
   public:
...
    PageRun(void* address, MachinePageCount numPages) {
      auto word = reinterpret_cast<uint64_t>(address); // NOLINT
      data_ =
          word | (static_cast<uint64_t>(numPages) << kPointerSignificantBits);
    }
    template <typename T = uint8_t>
    T* data() const {
      return reinterpret_cast<T*>(data_ & kPointerMask); // NOLINT
    }
//...
  private:
    uint64_t data_;
}
class Allocation {
  MemoryPool* pool_{nullptr};
  std::vector<PageRun> runs_;
}

StreamArena实现

  • StreamArena通过allocations_保存申请到的内存Allocation,通过currentRun_和currentPage_用来指向需要当前用到的Allocation的PageRun和Page的索引:
  • 在newRange申请的过程中,会检查当前PageRun的索引是否超过了当前Allocation的PageRun总数,如果超过了需要申请新的Allocation。
  • 在当前PageRun中查找合适数量的Page,然后赋值给range的buffer和size。
  • tinyRanges_是string的vector:
  • 在newTinyRange时,直接vector在中申请新的string,设置特定大小,然后赋值给range的buffer和size。

class StreamArena {
//...
 private:
  // All allocations.
  std::vector<std::unique_ptr<memory::Allocation>> allocations_;
  // The allocation from which pages are given out. Moved to 'allocations_' when used up.
  memory::Allocation allocation_;
  int32_t currentRun_ = 0;
  int32_t currentPage_ = 0;
  memory::MachinePageCount allocationQuantum_ = 2;
  std::vector<std::string> tinyRanges_;
}
void StreamArena::newRange(int32_t bytes, ByteRange* range) {
  VELOX_CHECK_GT(bytes, 0);
  memory::MachinePageCount numPages =
      bits::roundUp(bytes, memory::AllocationTraits::kPageSize) /
      memory::AllocationTraits::kPageSize;
  int32_t numRuns = allocation_.numRuns();
  if (currentRun_ >= numRuns) {
    if (numRuns) {
      allocations_.push_back(
          std::make_unique<memory::Allocation>(std::move(allocation_)));
    }
    pool_->allocateNonContiguous(
        std::max(allocationQuantum_, numPages), allocation_);
    currentRun_ = 0;
    currentPage_ = 0;
    size_ += allocation_.byteSize();
  }
  auto run = allocation_.runAt(currentRun_);
  int32_t available = run.numPages() - currentPage_;
  range->buffer =
      run.data() + memory::AllocationTraits::kPageSize * currentPage_;
  range->size = std::min<int32_t>(numPages, available) *
      memory::AllocationTraits::kPageSize;
  range->position = 0;
  currentPage_ += std::min<int32_t>(available, numPages);
  if (currentPage_ == run.numPages()) {
    ++currentRun_;
    currentPage_ = 0;
  }
}
void StreamArena::newTinyRange(int32_t bytes, ByteRange* range) {
  tinyRanges_.emplace_back();
  tinyRanges_.back().resize(bytes);
  range->position = 0;
  range->buffer = reinterpret_cast<uint8_t*>(tinyRanges_.back().data());
  range->size = bytes;
}

ByteStream概览

上面介绍了StreamArena可以用来申请内存并保存在ByteRange中,在序列化/反序列化的过程中主要用到了ByteStream这个类,ByteStream用来直接对特定类型的数据进行读写,内部仍然存储ByteRange,主要方法如下:

  • append系列泛型方法提供了对不同类型的数据写入,数据最终被写入到ByteRange中。
  • read系列泛型方法提供了对特定类型的读取,读取的源都是ByteRange。

序列化接口

VeloxSerde

VectorSerde是用来做RowVector的序列化和反序列化的接口,从VectorSerde的结构来看:

  • 序列化:没有直接提供serialize方法,而是使用VectorSerializer来做Vector的序列化,提供了createSerializer方法来创建VectorSerializer。在创建VectorSerializer时:
  • 会指定RowVector的类型type,包含每一列的名称和类型。
  • 指定需要序列化的行数
  • 指定用到的StreamArena,在序列化的过程中,中间结果需要存储在StreamArena中。
  • 反序列化:使用deserialize方法来做Vector的反序列化,从ByteStream中反序列化到RowVector中。

class VectorSerde {
 public:
//...
  virtual std::unique_ptr<VectorSerializer> createSerializer(
      RowTypePtr type,
      int32_t numRows,
      StreamArena* streamArena,
      const Options* options = nullptr) = 0;
  virtual void deserialize(
      ByteStream* source,
      velox::memory::MemoryPool* pool,
      RowTypePtr type,
      RowVectorPtr* result,
      const Options* options = nullptr) = 0;
};

VectorSerializer

VectorSerializer没有直接提供serialize方法,而是提供了append方法进行序列化,在使用VectorSerializer完成序列化主要包含两步,首先是append数据,然后flush结果到OutputStream中。

  • append方法可以将一部分数据行添加到serializer的“内部的存储”中。
  • flush方法将“内部的存储”写到OutputStream中。

class VectorSerializer {
 public:
  virtual ~VectorSerializer() = default;
  /// Serialize a subset of rows in a vector.
  virtual void append(
      const RowVectorPtr& vector,
      const folly::Range<const IndexRange*>& ranges) = 0;
  /// Serialize all rows in a vector.
  void append(const RowVectorPtr& vector);
  /// Write serialized data to 'stream'.
  virtual void flush(OutputStream* stream) = 0;
};


在外部系统数据结构与Velox的RowVector相互转换的过程中需要同时实现VectorSerde和VectorSerializer两个接口,例如:

  • 针对Presto SerializedPage的PrestoVectorSerde和PrestoVectorSerializer。
  • 针对Spark UnsafeRow的UnsafeRowVectorSerde和UnsafeRowVectorSerializer。

接下来就两种实现分别进行解析。

PrestoVectorSerde

在探究其实现的过程中,也需要关注内存的申请和拷贝情况。

SerializedPage

在介绍序列化和反序列化之前,先简单介绍下SerializedPage的结构,参考SerializedPage Wire Format。

总体结构

格式包括Header、列数、列;其中行数保存在Header中。

列结构

列结构中也包含列Header、Null Flags、实际值。

  • 列header:表示列的类型
  • Null Flags:使用一个字节表示是否有nullFlag,使用每一个bit标识是否某一行是否为null。
  • 列内容,列内容根据列的类型可以分为如下几种类型:
  • 定长列:主要针对BYTE、INT、SHORT、LONG、INT128等基础类型编码。与Velox的定长列FlatVector不同的是,FlatVector对于null的行也会占用空间,两者各有优劣。
  • 变长列:主要针对VARCHAR等类型,每一行的值不固定,相对于定长列,需要多一个offsets存储,指定每一行数据的开始位置。
  • 复合列:包括ARRAY、MAP、ROW三种类型,复合类型的存储包括基础类型的列。
  • 定长列示例:
  • 变长列示例:

序列化实现

PrestoVectorSerializer

PrestoVectorSerializer的实现比较清晰:

  • 在构造时会根据RowVector的列来创建对应列的VectorStream,VectorStream是用来存储每一列的数据序列化的结果。
  • 在append时会创建按列进行序列化,主要工作在serializeColumn中实现。

这里留几个问题,接下来一一进行研究。

  • VectorStream是如何存储数据的?存储的是RowVector中数据的指针还是拷贝内存?
  • serializeColumn对于RowVector每一列有没有特化处理,比如某一列是ConstantVector、DictionaryVector等怎么处理?

class PrestoVectorSerializer : public VectorSerializer {
 public:
  PrestoVectorSerializer(
      std::shared_ptr<const RowType> rowType,
      int32_t numRows,
      StreamArena* streamArena,
      bool useLosslessTimestamp) {
    auto types = rowType->children();
    auto numTypes = types.size();
    streams_.resize(numTypes);
    for (int i = 0; i < numTypes; i++) {
      streams_[i] = std::make_unique<VectorStream>(
          types[i], streamArena, numRows, useLosslessTimestamp);
    }
  }
  void append(
      const RowVectorPtr& vector,
      const folly::Range<const IndexRange*>& ranges) override {
    auto newRows = rangesTotalSize(ranges);
    if (newRows > 0) {
      numRows_ += newRows;
      for (int32_t i = 0; i < vector->childrenSize(); ++i) {
        serializeColumn(vector->childAt(i).get(), ranges, streams_[i].get());
      }
    }
  }
  void flush(OutputStream* out) override {
    flushInternal(numRows_, false /*rle*/, out);
  }
//...
 private:
  ...
  int32_t numRows_{0};
  std::vector<std::unique_ptr<VectorStream>> streams_;
};
} // namespace

VectorStream

VectorStream是序列化SerializedPage做准备,其成员存储了SerializedPage结构中所需的Column Header、Null Flags、长度等;同时由于SeralizedPage也支持ROW、ARRAY、MAP类型的encoding,与此对应,VectorStream对于这三种类型使用了嵌套的VectorStream来表示。

初始化内存
  • children_:children表示ROW、ARRAY、MAP类型的子VectorStream,对应SerializedPage的三种类型。
  • header_:用来存储每种数据类型的头信息。
  • nulls_:用来存储null的值。
  • lengths_:用来存储长度。
  • values_:用来存储实际的值。

class VectorStream {
//...
private:
  const TypePtr type_;
//...
  ByteRange header_;
  ByteStream nulls_;
  ByteStream lengths_;
  ByteStream values_;
  std::vector<std::unique_ptr<VectorStream>> children_;
};

在VectorStream构造时,会对上述成员进行初始化,调用ByteStream::startWrite方法进行内存申请,根据RowVector中每列的数据类型,进行内存预申请。

下面将重要的字段初始化代码列出:代码中的lengths_对应SerializedPage中的offsets

  • 对于ROW、ARRAY、MAP类型除了lengths_初始化,也需要进行childrens_初始化,因为是嵌套结构
  • 对于VARCHAR、VARBINARY类型,进行了lengths_和values_的初始化;

class VectorStream {
 public:
  VectorStream(
      const TypePtr type,
      StreamArena* streamArena,
      int32_t initialNumRows,
      bool useLosslessTimestamp)
      : type_(type),
        useLosslessTimestamp_(useLosslessTimestamp),
        nulls_(streamArena, true, true),
        lengths_(streamArena),
        values_(streamArena) {
//...
    if (initialNumRows > 0) {
      switch (type_->kind()) {
        case TypeKind::ROW:
          if (isTimestampWithTimeZoneType(type_)) {
            values_.startWrite(initialNumRows * 4);
            break;
          }
          [[fallthrough]];
        case TypeKind::ARRAY:
        case TypeKind::MAP:
          hasLengths_ = true;
          lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
          children_.resize(type_->size());
          for (int32_t i = 0; i < type_->size(); ++i) {
            children_[i] = std::make_unique<VectorStream>(
                type_->childAt(i),
                streamArena,
                initialNumRows,
                useLosslessTimestamp);
          }
          break;
        case TypeKind::VARCHAR:
        case TypeKind::VARBINARY:
          hasLengths_ = true;
          lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
          values_.startWrite(initialNumRows * 10);
          break;
        default:;
          values_.startWrite(initialNumRows * 4);
          break;
      }
    }
  }
private:
  const TypePtr type_;
//...
  ByteRange header_;
  ByteStream nulls_;
  ByteStream lengths_;
  ByteStream values_;
  std::vector<std::unique_ptr<VectorStream>> children_;
};

序列化

序列化的Append方法,最终会对每一列调用serializeColumn方法,该方法实现中针对每一种列类型进行专门处理,例如对于Flat类型调用了serializeFlatVector。

针对每一种序列化的主要流程是根据提出数据的格式,写入VectorStream的lengths_、values_中,对于复合类型,进行递归调用。

需要注意的是,数据是会被写到StreamArena的内存时,使用了内存拷贝,即使是String类型。也就说在StreamArena中存储的完整的拷贝后的RowVector的数据。

void serializeColumn(
    const BaseVector* vector,
    const folly::Range<const IndexRange*>& ranges,
    VectorStream* stream) {
  switch (vector->encoding()) {
    case VectorEncoding::Simple::FLAT:
      VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
          serializeFlatVector, vector->typeKind(), vector, ranges, stream);
      break;
    case VectorEncoding::Simple::CONSTANT:
      VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
          serializeConstantVector, vector->typeKind(), vector, ranges, stream);
      break;
    case VectorEncoding::Simple::BIASED:
      switch (vector->typeKind()) {
        case TypeKind::SMALLINT:
          serializeBiasVector<int16_t>(vector, ranges, stream);
          break;
        case TypeKind::INTEGER:
          serializeBiasVector<int32_t>(vector, ranges, stream);
          break;
        case TypeKind::BIGINT:
          serializeBiasVector<int64_t>(vector, ranges, stream);
          break;
        default:
          throw std::invalid_argument("Invalid biased vector type");
      }
      break;
    case VectorEncoding::Simple::ROW:
      serializeRowVector(vector, ranges, stream);
      break;
    case VectorEncoding::Simple::ARRAY:
      serializeArrayVector(vector, ranges, stream);
      break;
    case VectorEncoding::Simple::MAP:
      serializeMapVector(vector, ranges, stream);
      break;
    case VectorEncoding::Simple::LAZY:
      serializeColumn(vector->loadedVector(), ranges, stream);
      break;
    default:
      serializeWrapped(vector, ranges, stream);
  }
}

  • flush方法

serializeColumn是将数据写入StreamArena的内存中,flush方法可以真正的将数据严格按照SerializedPage的格式写入OutputStream。其中ROW、ARRAY、MAP类型调用了递归调用了children的flush方法。

void flush(OutputStream* out) {
    out->write(reinterpret_cast<char*>(header_.buffer), header_.size);
    switch (type_->kind()) {
      case TypeKind::ROW:
        //...
      case TypeKind::ARRAY:
        //...
      case TypeKind::MAP:
        //...
      case TypeKind::VARCHAR:
      case TypeKind::VARBINARY:
        //...
      default:
        //...
    }
  }

在PrestoVectorSerializer的flush方法中,会按照列的个数,逐个调用对应的VectorStream的flush方法。

反序列化实现

反序列化是指从SerializePage二进制格式转化成RowVector;

  • 从函数签名可以看出,需要传入输出类型RowType,说明在反序列化之前已经知道了RowVector有哪些列。
  • 反序列化的源头是ByteStream,有一组ByteRange组成。

void deserialize(
      ByteStream* source,
      velox::memory::MemoryPool* pool,
      std::shared_ptr<const RowType> type,
      std::shared_ptr<RowVector>* result) override;

反序列化的核心方法是readColumns,读取每一列的数据到result的children中。

auto children = &(*result)->children();
  auto childTypes = type->as<TypeKind::ROW>().children();
  readColumns(source, pool, childTypes, children);

接下来看下readColumns的核心实现:

  • 在readColumns中首先定义了针对不同数据类型的读取函数:对于简单类型,都使用了read函数,对于复杂类型使用了单独的读取方法。
  • 然后按照每一列的类型,逐个调用其对应类型的读取方法。

void readColumns(
    ByteStream* source,
    velox::memory::MemoryPool* pool,
    const std::vector<TypePtr>& types,
    std::vector<VectorPtr>* result) {
  static std::unordered_map<
      TypeKind,
      std::function<void(
          ByteStream * source,
          std::shared_ptr<const Type> type,
          velox::memory::MemoryPool * pool,
          VectorPtr * result)>>
      readers = {
          {TypeKind::BOOLEAN, &read<bool>},
          {TypeKind::TINYINT, &read<int8_t>},
          {TypeKind::SMALLINT, &read<int16_t>},
          {TypeKind::INTEGER, &read<int32_t>},
          {TypeKind::BIGINT, &read<int64_t>},
          {TypeKind::REAL, &read<float>},
          {TypeKind::DOUBLE, &read<double>},
          {TypeKind::TIMESTAMP, &read<Timestamp>},
          {TypeKind::DATE, &read<Date>},
          {TypeKind::VARCHAR, &read<StringView>},
          {TypeKind::VARBINARY, &read<StringView>},
          {TypeKind::ARRAY, &readArrayVector},
          {TypeKind::MAP, &readMapVector},
          {TypeKind::ROW, &readRowVector},
          {TypeKind::UNKNOWN, &read<UnknownValue>}};
  for (int32_t i = 0; i < types.size(); ++i) {
    auto it = readers.find(types[i]->kind());
//...
    it->second(source, types[i], pool, &(*result)[i]);
  }
}

接下来分别以简单类型和复杂类型来举例说明:

  • 简单类型:首先读取size,然后根据size大小创建特定大小的FlatVector,最后对flatVector进行赋值。

template <typename T>
void read(
    ByteStream* source,
    std::shared_ptr<const Type> type,
    velox::memory::MemoryPool* pool,
    VectorPtr* result) {
  int32_t size = source->read<int32_t>();
  if (*result && result->unique()) {
    (*result)->resize(size);
  } else {
    *result = BaseVector::create(type, size, pool);
  }
  auto flatResult = (*result)->asFlatVector<T>();
  auto nullCount = readNulls(source, size, flatResult);
  BufferPtr values = flatResult->mutableValues(size);
  readValues<T>(source, size, flatResult->nulls(), nullCount, values);
}

  • 复杂类型:以MapVector为例,首先会将mapVector的key和value提取出来作为children,通过递归调用readColumns将值写入到children中,然后将key和value的Vector设置到MapVector中。其他复杂类型同理,也是将children先进行序列化,然后children赋值到复杂类型中。

void readMapVector(
    ByteStream* source,
    std::shared_ptr<const Type> type,
    velox::memory::MemoryPool* pool,
    VectorPtr* result) {
  MapVector* mapVector =
      (*result && result->unique()) ? (*result)->as<MapVector>() : nullptr;
  std::vector<TypePtr> childTypes = {type->childAt(0), type->childAt(1)};
  std::vector<VectorPtr> children(2);
  if (mapVector) {
    children[0] = mapVector->mapKeys();
    children[1] = mapVector->mapValues();
  }
  readColumns(source, pool, childTypes, &children);
  //...
  mapVector->setKeysAndValues(children[0], children[1]);
  //...
}

同样,在反序列化的过程中,也会将数据从ByteStream中数据复制到Vector中,同样发生了内存复制。

举报

相关推荐

0 条评论