diff options
author | aneporada <aneporada@ydb.tech> | 2023-02-21 07:25:55 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-02-21 07:25:55 +0300 |
commit | fc7cc9d8fb7089819868bb4c5446fef707b2c61c (patch) | |
tree | fabcea8ada5348839af845b78f6c293d9f831a3e | |
parent | 95faf209707e9cb22e5109c9bf35611fe7c1d449 (diff) | |
download | ydb-fc7cc9d8fb7089819868bb4c5446fef707b2c61c.tar.gz |
Fix handling of chunked arrays in BlockTop
Add more IArrayBuilder::AddMany() overloads
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp | 100 | ||||
-rw-r--r-- | ydb/library/yql/public/udf/arrow/block_builder.h | 263 |
2 files changed, 324 insertions, 39 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp index 63e4b01324..d50b5a1c76 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp @@ -20,6 +20,7 @@ namespace NMiniKQL { namespace { class TTopBlocksWrapper : public TStatefulWideFlowBlockComputationNode<TTopBlocksWrapper> { + using TChunkedArrayIndex = TVector<IArrayBuilder::TArrayDataItem>; public: TTopBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TTupleType* tupleType, IComputationNode* count, TVector<IComputationNode*>&& directions, TVector<ui32>&& keyIndicies, bool sort) @@ -88,7 +89,15 @@ public: blockIndicies->emplace_back(row); } - TBlockLess cmp(KeyIndicies_, s, s.Values_); + TVector<TChunkedArrayIndex> arrayIndicies(Columns_.size()); + for (ui32 i = 0; i < Columns_.size(); ++i) { + if (Columns_[i]->GetShape() != TBlockType::EShape::Scalar) { + auto datum = TArrowBlock::From(s.Values_[i]).GetDatum(); + arrayIndicies[i] = MakeChunkedArrayIndex(datum); + } + } + + TBlockLess cmp(KeyIndicies_, s, arrayIndicies); NYql::FastNthElement(blockIndicies->begin(), blockIndicies->begin() + s.Count_, blockIndicies->end(), cmp); } @@ -144,8 +153,6 @@ private: TVector<NUdf::TUnboxedValue> Values_; TVector<NUdf::TUnboxedValue*> ValuePointers_; - TVector<NUdf::TUnboxedValue> TmpValues_; - TState(TMemoryUsageInfo* memInfo, const TVector<ui32>& keyIndicies, const TVector<TBlockType*>& columns) : TComputationValue(memInfo) { @@ -168,8 +175,6 @@ private: ValuePointers_[i] = &Values_[i]; } - TmpValues_.resize(columns.size()); - Comparators_.resize(keyIndicies.size()); for (ui32 k = 0; k < keyIndicies.size(); ++k) { Comparators_[k] = NUdf::MakeBlockItemComparator(TTypeInfoHelper(), columns[keyIndicies[k]]->GetItemType()); @@ -202,11 +207,13 @@ private: void CompressBuilders(bool sort, const TVector<TBlockType*>& columns, const TVector<ui32>& keyIndicies, TComputationContext& ctx) { Y_VERIFY(ScalarsFilled_); + TVector<TChunkedArrayIndex> arrayIndicies(columns.size()); + TVector<arrow::Datum> tmpDatums(columns.size()); for (ui32 i = 0; i < columns.size(); ++i) { - if (columns[i]->GetShape() == TBlockType::EShape::Scalar) { - TmpValues_[i] = ScalarValues_[i]; - } else { - TmpValues_[i] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(Builders_[i]->Build(false))); + if (columns[i]->GetShape() != TBlockType::EShape::Scalar) { + auto datum = Builders_[i]->Build(false); + arrayIndicies[i] = MakeChunkedArrayIndex(datum); + tmpDatums[i] = std::move(datum); } } @@ -217,7 +224,7 @@ private: } ui64 blockLen = Min(BuilderLength_, Count_); - TBlockLess cmp(keyIndicies, *this, TmpValues_); + TBlockLess cmp(keyIndicies, *this, arrayIndicies); if (BuilderLength_ <= Count_) { if (sort) { std::sort(blockIndicies.begin(), blockIndicies.end(), cmp); @@ -235,18 +242,11 @@ private: continue; } - const auto& datum = TArrowBlock::From(TmpValues_[i]).GetDatum(); - const auto& array = *datum.array(); - for (ui64 j = 0; j < blockLen; ++j) { - Builders_[i]->Add(LeftReaders_[i]->GetItem(array, blockIndicies[j])); - } + auto& arrayIndex = arrayIndicies[i]; + Builders_[i]->AddMany(arrayIndex.data(), arrayIndex.size(), blockIndicies.data(), blockLen); } BuilderLength_ = blockLen; - - for (ui32 i = 0; i < columns.size(); ++i) { - TmpValues_[i] = {}; - } } void FillOutput(const TVector<TBlockType*>& columns, NUdf::TUnboxedValue*const* output, TComputationContext& ctx) { @@ -272,15 +272,11 @@ private: } const auto& datum = TArrowBlock::From(Values_[i]).GetDatum(); - const auto& array = *datum.array(); + auto arrayIndex = MakeChunkedArrayIndex(datum); if (blockIndicies) { - for (ui64 j = 0; j < Count_; ++j) { - Builders_[i]->Add(LeftReaders_[i]->GetItem(array, (*blockIndicies)[j])); - } + Builders_[i]->AddMany(arrayIndex.data(), arrayIndex.size(), blockIndicies->data(), Count_); } else { - for (ui64 row = 0; row < blockLen; ++row) { - Builders_[i]->Add(LeftReaders_[i]->GetItem(array, row)); - } + Builders_[i]->AddMany(arrayIndex.data(), arrayIndex.size(), ui64(0), blockLen); } } @@ -300,25 +296,41 @@ private: return *static_cast<TState*>(state.AsBoxed().Get()); } + static TChunkedArrayIndex MakeChunkedArrayIndex(const arrow::Datum& datum) { + TChunkedArrayIndex result; + if (datum.is_array()) { + result.push_back({datum.array().get(), 0}); + } else { + auto chunks = datum.chunks(); + ui64 offset = 0; + for (auto& chunk : chunks) { + auto arrayData = chunk->data(); + result.push_back({arrayData.get(), offset}); + offset += arrayData->length; + } + } + return result; + } + class TBlockLess { public: - TBlockLess(const TVector<ui32>& keyIndicies, const TState& state, const TVector<NUdf::TUnboxedValue>& values) + TBlockLess(const TVector<ui32>& keyIndicies, const TState& state, const TVector<TChunkedArrayIndex>& arrayIndicies) : KeyIndicies_(keyIndicies) + , ArrayIndicies_(arrayIndicies) , State_(state) - , Values_(values) {} bool operator()(ui64 lhs, ui64 rhs) const { if (KeyIndicies_.size() == 1) { auto i = KeyIndicies_[0]; - const auto& datum = TArrowBlock::From(Values_[i]).GetDatum(); - if (datum.is_scalar()) { + auto& arrayIndex = ArrayIndicies_[i]; + if (arrayIndex.empty()) { + // scalar return false; } - const auto& array = *datum.array(); - auto leftItem = State_.LeftReaders_[i]->GetItem(array, lhs); - auto rightItem = State_.RightReaders_[i]->GetItem(array, rhs); + auto leftItem = GetBlockItem(*State_.LeftReaders_[i], arrayIndex, lhs); + auto rightItem = GetBlockItem(*State_.RightReaders_[i], arrayIndex, rhs); if (State_.Directions_[0]) { return State_.Comparators_[0]->Less(leftItem, rightItem); } else { @@ -327,14 +339,14 @@ private: } else { for (ui32 k = 0; k < KeyIndicies_.size(); ++k) { auto i = KeyIndicies_[k]; - const auto& datum = TArrowBlock::From(Values_[i]).GetDatum(); - if (datum.is_scalar()) { + auto& arrayIndex = ArrayIndicies_[i]; + if (arrayIndex.empty()) { + // scalar continue; } - const auto& array = *datum.array(); - auto leftItem = State_.LeftReaders_[i]->GetItem(array, lhs); - auto rightItem = State_.RightReaders_[i]->GetItem(array, rhs); + auto leftItem = GetBlockItem(*State_.LeftReaders_[i], arrayIndex, lhs); + auto rightItem = GetBlockItem(*State_.RightReaders_[i], arrayIndex, rhs); auto cmp = State_.Comparators_[k]->Compare(leftItem, rightItem); if (cmp == 0) { continue; @@ -352,9 +364,19 @@ private: } private: + static TBlockItem GetBlockItem(IBlockReader& reader, const TChunkedArrayIndex& arrayIndex, ui64 idx) { + Y_VERIFY_DEBUG(!arrayIndex.empty()); + if (arrayIndex.size() == 1) { + return reader.GetItem(*arrayIndex.front().Data, idx); + } + + auto it = LookupArrayDataItem(arrayIndex.data(), arrayIndex.size(), idx); + return reader.GetItem(*it->Data, idx); + } + const TVector<ui32>& KeyIndicies_; + const TVector<TChunkedArrayIndex> ArrayIndicies_; const TState& State_; - const TVector<NUdf::TUnboxedValue>& Values_; }; IComputationWideFlowNode* Flow_; diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h index d923a3e5ef..bcbd97361f 100644 --- a/ydb/library/yql/public/udf/arrow/block_builder.h +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -3,26 +3,52 @@ #include "util.h" #include "bit_util.h" #include "block_io_buffer.h" +#include "block_item.h" +#include <ydb/library/yql/public/udf/udf_value.h> #include <ydb/library/yql/public/udf/udf_type_inspection.h> #include <arrow/datum.h> #include <arrow/c/bridge.h> +#include <deque> + namespace NYql { namespace NUdf { class IArrayBuilder { public: + struct TArrayDataItem { + const arrow::ArrayData* Data = nullptr; + ui64 StartOffset; + }; virtual ~IArrayBuilder() = default; virtual size_t MaxLength() const = 0; virtual void Add(NUdf::TUnboxedValuePod value) = 0; virtual void Add(TBlockItem value) = 0; virtual void Add(TInputBuffer& input) = 0; virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0; + virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) = 0; + virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) = 0; virtual arrow::Datum Build(bool finish) = 0; }; +inline const IArrayBuilder::TArrayDataItem* LookupArrayDataItem(const IArrayBuilder::TArrayDataItem* arrays, size_t arrayCount, ui64& idx) { + IArrayBuilder::TArrayDataItem lookup{ nullptr, idx }; + + auto it = std::lower_bound(arrays, arrays + arrayCount, lookup, [](const auto& left, const auto& right) { + return left.StartOffset < right.StartOffset; + }); + + if (it == arrays + arrayCount || it->StartOffset > idx) { + --it; + } + + Y_VERIFY_DEBUG(it->StartOffset <= idx); + idx -= it->StartOffset; + return it; +} + class IScalarBuilder { public: virtual ~IScalarBuilder() = default; @@ -38,6 +64,7 @@ inline std::shared_ptr<arrow::DataType> GetArrowType(const ITypeInfoHelper& type } class TArrayBuilderBase : public IArrayBuilder { + using Self = TArrayBuilderBase; public: using Ptr = std::unique_ptr<TArrayBuilderBase>; @@ -86,6 +113,16 @@ public: CurrLen++; } + inline void AddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) { + TArrayDataItem item = { &array, 0 }; + Self::AddMany(&item, 1, beginIndex, count); + } + + inline void AddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) { + TArrayDataItem item = { &array, 0 }; + Self::AddMany(&item, 1, indexes, count); + } + void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) final { Y_VERIFY(size_t(array.length) == bitmapSize); Y_VERIFY(popCount <= bitmapSize); @@ -98,6 +135,68 @@ public: CurrLen += popCount; } + void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) final { + Y_VERIFY(arrays); + Y_VERIFY(arrayCount > 0); + if (arrayCount == 1) { + Y_VERIFY(arrays->Data); + DoAddMany(*arrays->Data, beginIndex, count); + } else { + ui64 idx = beginIndex; + auto item = LookupArrayDataItem(arrays, arrayCount, idx); + size_t avail = item->Data->length; + size_t toAdd = count; + Y_VERIFY(idx <= avail); + while (toAdd) { + size_t adding = std::min(avail, toAdd); + DoAddMany(*item->Data, idx, adding); + avail -= adding; + toAdd -= adding; + + if (!avail && toAdd) { + ++item; + Y_VERIFY(item < arrays + arrayCount); + avail = item->Data->length; + idx = 0; + } + } + } + CurrLen += count; + } + + void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) final { + Y_VERIFY(arrays); + Y_VERIFY(arrayCount > 0); + Y_VERIFY(indexes); + Y_VERIFY(CurrLen + count <= MaxLen); + + if (arrayCount == 1) { + Y_VERIFY(arrays->Data); + DoAddMany(*arrays->Data, indexes, count); + } else { + const IArrayBuilder::TArrayDataItem* currData = nullptr; + TVector<ui64> currDataIndexes; + for (size_t i = 0; i < count; ++i) { + ui64 idx = indexes[i]; + const IArrayBuilder::TArrayDataItem* data = LookupArrayDataItem(arrays, arrayCount, idx); + if (!currData) { + currData = data; + } + + if (data != currData) { + DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size()); + currDataIndexes.clear(); + currData = data; + } + currDataIndexes.push_back(idx); + } + if (!currDataIndexes.empty()) { + DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size()); + } + } + CurrLen += count; + } + arrow::Datum Build(bool finish) final { auto tree = BuildTree(finish); TVector<std::shared_ptr<arrow::ArrayData>> chunks; @@ -119,6 +218,8 @@ protected: virtual void DoAdd(TInputBuffer& input) = 0; virtual void DoAddDefault() = 0; virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0; + virtual void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) = 0; + virtual void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) = 0; virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0; private: @@ -254,6 +355,36 @@ public: DataBuilder->UnsafeAdvance(popCount); } + void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final { + Y_VERIFY(array.buffers.size() > 1); + if constexpr (Nullable) { + Y_VERIFY(NullBuilder->Length() == DataBuilder->Length()); + for (size_t i = beginIndex; i < beginIndex + count; ++i) { + NullBuilder->UnsafeAppend(IsNull(array, i)); + } + } + + const T* values = array.GetValues<T>(1); + for (size_t i = beginIndex; i < beginIndex + count; ++i) { + DataBuilder->UnsafeAppend(T(values[i])); + } + } + + void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final { + Y_VERIFY(array.buffers.size() > 1); + if constexpr (Nullable) { + Y_VERIFY(NullBuilder->Length() == DataBuilder->Length()); + for (size_t i = 0; i < count; ++i) { + NullBuilder->UnsafeAppend(IsNull(array, indexes[i])); + } + } + + const T* values = array.GetValues<T>(1); + for (size_t i = 0; i < count; ++i) { + DataBuilder->UnsafeAppend(T(values[indexes[i]])); + } + } + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { const size_t len = DataBuilder->Length(); std::shared_ptr<arrow::Buffer> nulls; @@ -443,6 +574,86 @@ public: } } + void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final { + Y_VERIFY(array.buffers.size() > 2); + Y_VERIFY(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length()); + + size_t dataLen = DataBuilder->Length(); + + const TOffset* offsets = array.GetValues<TOffset>(1); + const ui8* srcData = array.GetValues<ui8>(2, 0); + const ui8* chunkStart = srcData + offsets[beginIndex]; + const ui8* chunkEnd = chunkStart; + for (size_t i = beginIndex; i < beginIndex + count; ++i) { + const ui8* begin = srcData + offsets[i]; + const ui8* end = srcData + offsets[i + 1]; + const size_t strSize = end - begin; + + size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen; + for (;;) { + if (strSize <= availBytes) { + if constexpr (Nullable) { + NullBuilder->UnsafeAppend(IsNull(array, i)); + } + OffsetsBuilder->UnsafeAppend(TOffset(dataLen)); + chunkEnd = end; + dataLen += strSize; + break; + } + + if (dataLen) { + DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart); + chunkStart = begin; + chunkEnd = end; + FlushChunk(false); + dataLen = 0; + } else { + DataBuilder->Reserve(strSize); + availBytes = strSize; + } + } + } + if (chunkStart != chunkEnd) { + DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart); + } + } + + void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final { + Y_VERIFY(array.buffers.size() > 2); + Y_VERIFY(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length()); + + size_t dataLen = DataBuilder->Length(); + + const TOffset* offsets = array.GetValues<TOffset>(1); + const char* strData = array.GetValues<char>(2, 0); + for (size_t i = 0; i < count; ++i) { + ui64 idx = indexes[i]; + std::string_view str(strData + offsets[idx], offsets[idx + 1] - offsets[idx]); + + size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen; + for (;;) { + if (str.size() <= availBytes) { + if constexpr (Nullable) { + NullBuilder->UnsafeAppend(IsNull(array, idx)); + } + OffsetsBuilder->UnsafeAppend(TOffset(dataLen)); + DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size()); + dataLen += str.size(); + break; + } + + if (dataLen) { + FlushChunk(false); + dataLen = 0; + } else { + DataBuilder->Reserve(str.size()); + availBytes = str.size(); + } + } + } + } + + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { FlushChunk(finish); TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); @@ -588,6 +799,36 @@ public: } } + void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final { + Y_VERIFY(!array.buffers.empty()); + Y_VERIFY(array.child_data.size() == Children.size()); + + if constexpr (Nullable) { + for (ui64 i = beginIndex; i < beginIndex + count; ++i) { + NullBuilder->UnsafeAppend(IsNull(array, i)); + } + } + + for (size_t i = 0; i < Children.size(); ++i) { + Children[i]->AddMany(*array.child_data[i], beginIndex, count); + } + } + + void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final { + Y_VERIFY(!array.buffers.empty()); + Y_VERIFY(array.child_data.size() == Children.size()); + + if constexpr (Nullable) { + for (size_t i = 0; i < count; ++i) { + NullBuilder->UnsafeAppend(IsNull(array, indexes[i])); + } + } + + for (size_t i = 0; i < Children.size(); ++i) { + Children[i]->AddMany(*array.child_data[i], indexes, count); + } + } + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); @@ -687,6 +928,28 @@ public: Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length); } + void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final { + Y_VERIFY(!array.buffers.empty()); + Y_VERIFY(array.child_data.size() == 1); + + for (ui64 i = beginIndex; i < beginIndex + count; ++i) { + NullBuilder->UnsafeAppend(IsNull(array, i)); + } + + Inner->AddMany(*array.child_data[0], beginIndex, count); + } + + void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final { + Y_VERIFY(!array.buffers.empty()); + Y_VERIFY(array.child_data.size() == 1); + + for (size_t i = 0; i < count; ++i) { + NullBuilder->UnsafeAppend(IsNull(array, indexes[i])); + } + + Inner->AddMany(*array.child_data[0], indexes, count); + } + TBlockArrayTree::Ptr DoBuildTree(bool finish) final { TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); |