aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author aneporada <aneporada@ydb.tech> 2023-02-21 07:25:55 +0300
committer aneporada <aneporada@ydb.tech> 2023-02-21 07:25:55 +0300
commit fc7cc9d8fb7089819868bb4c5446fef707b2c61c (patch)
tree fabcea8ada5348839af845b78f6c293d9f831a3e
parent 95faf209707e9cb22e5109c9bf35611fe7c1d449 (diff)
download ydb-fc7cc9d8fb7089819868bb4c5446fef707b2c61c.tar.gz
Fix handling of chunked arrays in BlockTop
Add more IArrayBuilder::AddMany() overloads
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp 100
-rw-r--r-- ydb/library/yql/public/udf/arrow/block_builder.h 263
2 files changed, 324 insertions, 39 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
index 63e4b01324..d50b5a1c76 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
@@ -20,6 +20,7 @@ namespace NMiniKQL {
namespace {
class TTopBlocksWrapper : public TStatefulWideFlowBlockComputationNode<TTopBlocksWrapper> {
+ using TChunkedArrayIndex = TVector<IArrayBuilder::TArrayDataItem>;
public:
TTopBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TTupleType* tupleType, IComputationNode* count,
TVector<IComputationNode*>&& directions, TVector<ui32>&& keyIndicies, bool sort)
@@ -88,7 +89,15 @@ public:
blockIndicies->emplace_back(row);
}
- TBlockLess cmp(KeyIndicies_, s, s.Values_);
+ TVector<TChunkedArrayIndex> arrayIndicies(Columns_.size());
+ for (ui32 i = 0; i < Columns_.size(); ++i) {
+ if (Columns_[i]->GetShape() != TBlockType::EShape::Scalar) {
+ auto datum = TArrowBlock::From(s.Values_[i]).GetDatum();
+ arrayIndicies[i] = MakeChunkedArrayIndex(datum);
+ }
+ }
+
+ TBlockLess cmp(KeyIndicies_, s, arrayIndicies);
NYql::FastNthElement(blockIndicies->begin(), blockIndicies->begin() + s.Count_, blockIndicies->end(), cmp);
}
@@ -144,8 +153,6 @@ private:
TVector<NUdf::TUnboxedValue> Values_;
TVector<NUdf::TUnboxedValue*> ValuePointers_;
- TVector<NUdf::TUnboxedValue> TmpValues_;
-
TState(TMemoryUsageInfo* memInfo, const TVector<ui32>& keyIndicies, const TVector<TBlockType*>& columns)
: TComputationValue(memInfo)
{
@@ -168,8 +175,6 @@ private:
ValuePointers_[i] = &Values_[i];
}
- TmpValues_.resize(columns.size());
-
Comparators_.resize(keyIndicies.size());
for (ui32 k = 0; k < keyIndicies.size(); ++k) {
Comparators_[k] = NUdf::MakeBlockItemComparator(TTypeInfoHelper(), columns[keyIndicies[k]]->GetItemType());
@@ -202,11 +207,13 @@ private:
void CompressBuilders(bool sort, const TVector<TBlockType*>& columns, const TVector<ui32>& keyIndicies, TComputationContext& ctx) {
Y_VERIFY(ScalarsFilled_);
+ TVector<TChunkedArrayIndex> arrayIndicies(columns.size());
+ TVector<arrow::Datum> tmpDatums(columns.size());
for (ui32 i = 0; i < columns.size(); ++i) {
- if (columns[i]->GetShape() == TBlockType::EShape::Scalar) {
- TmpValues_[i] = ScalarValues_[i];
- } else {
- TmpValues_[i] = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(Builders_[i]->Build(false)));
+ if (columns[i]->GetShape() != TBlockType::EShape::Scalar) {
+ auto datum = Builders_[i]->Build(false);
+ arrayIndicies[i] = MakeChunkedArrayIndex(datum);
+ tmpDatums[i] = std::move(datum);
}
}
@@ -217,7 +224,7 @@ private:
}
ui64 blockLen = Min(BuilderLength_, Count_);
- TBlockLess cmp(keyIndicies, *this, TmpValues_);
+ TBlockLess cmp(keyIndicies, *this, arrayIndicies);
if (BuilderLength_ <= Count_) {
if (sort) {
std::sort(blockIndicies.begin(), blockIndicies.end(), cmp);
@@ -235,18 +242,11 @@ private:
continue;
}
- const auto& datum = TArrowBlock::From(TmpValues_[i]).GetDatum();
- const auto& array = *datum.array();
- for (ui64 j = 0; j < blockLen; ++j) {
- Builders_[i]->Add(LeftReaders_[i]->GetItem(array, blockIndicies[j]));
- }
+ auto& arrayIndex = arrayIndicies[i];
+ Builders_[i]->AddMany(arrayIndex.data(), arrayIndex.size(), blockIndicies.data(), blockLen);
}
BuilderLength_ = blockLen;
-
- for (ui32 i = 0; i < columns.size(); ++i) {
- TmpValues_[i] = {};
- }
}
void FillOutput(const TVector<TBlockType*>& columns, NUdf::TUnboxedValue*const* output, TComputationContext& ctx) {
@@ -272,15 +272,11 @@ private:
}
const auto& datum = TArrowBlock::From(Values_[i]).GetDatum();
- const auto& array = *datum.array();
+ auto arrayIndex = MakeChunkedArrayIndex(datum);
if (blockIndicies) {
- for (ui64 j = 0; j < Count_; ++j) {
- Builders_[i]->Add(LeftReaders_[i]->GetItem(array, (*blockIndicies)[j]));
- }
+ Builders_[i]->AddMany(arrayIndex.data(), arrayIndex.size(), blockIndicies->data(), Count_);
} else {
- for (ui64 row = 0; row < blockLen; ++row) {
- Builders_[i]->Add(LeftReaders_[i]->GetItem(array, row));
- }
+ Builders_[i]->AddMany(arrayIndex.data(), arrayIndex.size(), ui64(0), blockLen);
}
}
@@ -300,25 +296,41 @@ private:
return *static_cast<TState*>(state.AsBoxed().Get());
}
+ // Flattens an array-like datum into a list of (array data pointer, start offset) entries.
+ // A plain array yields a single entry at offset 0; otherwise one entry is produced per chunk
+ // returned by datum.chunks(), with start offsets accumulated from the chunk lengths.
+ // The entries store raw, non-owning pointers - presumably the datum has to outlive the result.
+ static TChunkedArrayIndex MakeChunkedArrayIndex(const arrow::Datum& datum) {
+     TChunkedArrayIndex index;
+     if (datum.is_array()) {
+         index.push_back({datum.array().get(), 0});
+         return index;
+     }
+
+     ui64 startOffset = 0;
+     for (const auto& chunk : datum.chunks()) {
+         const auto& chunkData = chunk->data();
+         index.push_back({chunkData.get(), startOffset});
+         startOffset += chunkData->length;
+     }
+     return index;
+ }
+
class TBlockLess {
public:
- TBlockLess(const TVector<ui32>& keyIndicies, const TState& state, const TVector<NUdf::TUnboxedValue>& values)
+ TBlockLess(const TVector<ui32>& keyIndicies, const TState& state, const TVector<TChunkedArrayIndex>& arrayIndicies)
: KeyIndicies_(keyIndicies)
+ , ArrayIndicies_(arrayIndicies)
, State_(state)
- , Values_(values)
{}
bool operator()(ui64 lhs, ui64 rhs) const {
if (KeyIndicies_.size() == 1) {
auto i = KeyIndicies_[0];
- const auto& datum = TArrowBlock::From(Values_[i]).GetDatum();
- if (datum.is_scalar()) {
+ auto& arrayIndex = ArrayIndicies_[i];
+ if (arrayIndex.empty()) {
+ // scalar
return false;
}
- const auto& array = *datum.array();
- auto leftItem = State_.LeftReaders_[i]->GetItem(array, lhs);
- auto rightItem = State_.RightReaders_[i]->GetItem(array, rhs);
+ auto leftItem = GetBlockItem(*State_.LeftReaders_[i], arrayIndex, lhs);
+ auto rightItem = GetBlockItem(*State_.RightReaders_[i], arrayIndex, rhs);
if (State_.Directions_[0]) {
return State_.Comparators_[0]->Less(leftItem, rightItem);
} else {
@@ -327,14 +339,14 @@ private:
} else {
for (ui32 k = 0; k < KeyIndicies_.size(); ++k) {
auto i = KeyIndicies_[k];
- const auto& datum = TArrowBlock::From(Values_[i]).GetDatum();
- if (datum.is_scalar()) {
+ auto& arrayIndex = ArrayIndicies_[i];
+ if (arrayIndex.empty()) {
+ // scalar
continue;
}
- const auto& array = *datum.array();
- auto leftItem = State_.LeftReaders_[i]->GetItem(array, lhs);
- auto rightItem = State_.RightReaders_[i]->GetItem(array, rhs);
+ auto leftItem = GetBlockItem(*State_.LeftReaders_[i], arrayIndex, lhs);
+ auto rightItem = GetBlockItem(*State_.RightReaders_[i], arrayIndex, rhs);
auto cmp = State_.Comparators_[k]->Compare(leftItem, rightItem);
if (cmp == 0) {
continue;
@@ -352,9 +364,19 @@ private:
}
private:
+ // Reads the item located at row `idx` of the column described by `arrayIndex`.
+ // A single-entry index is read directly; otherwise the owning entry is found with
+ // LookupArrayDataItem, which also rebases `idx` (taken by reference) to that entry.
+ static TBlockItem GetBlockItem(IBlockReader& reader, const TChunkedArrayIndex& arrayIndex, ui64 idx) {
+     Y_VERIFY_DEBUG(!arrayIndex.empty());
+     const IArrayBuilder::TArrayDataItem* entry = arrayIndex.data();
+     if (arrayIndex.size() != 1) {
+         entry = LookupArrayDataItem(arrayIndex.data(), arrayIndex.size(), idx);
+     }
+     return reader.GetItem(*entry->Data, idx);
+ }
+
const TVector<ui32>& KeyIndicies_;
+ const TVector<TChunkedArrayIndex> ArrayIndicies_;
const TState& State_;
- const TVector<NUdf::TUnboxedValue>& Values_;
};
IComputationWideFlowNode* Flow_;
diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h
index d923a3e5ef..bcbd97361f 100644
--- a/ydb/library/yql/public/udf/arrow/block_builder.h
+++ b/ydb/library/yql/public/udf/arrow/block_builder.h
@@ -3,26 +3,52 @@
#include "util.h"
#include "bit_util.h"
#include "block_io_buffer.h"
+#include "block_item.h"
+#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/public/udf/udf_type_inspection.h>
#include <arrow/datum.h>
#include <arrow/c/bridge.h>
+#include <deque>
+
namespace NYql {
namespace NUdf {
class IArrayBuilder {
public:
+ // Describes one array of a (possibly chunked) column.
+ struct TArrayDataItem {
+     // Non-owning pointer to the array data of this entry
+     const arrow::ArrayData* Data = nullptr;
+     // Value subtracted from a column-wide index to get an index inside this entry
+     // (see LookupArrayDataItem); zero-initialized so a default-constructed item is well defined
+     ui64 StartOffset = 0;
+ };
virtual ~IArrayBuilder() = default;
virtual size_t MaxLength() const = 0;
virtual void Add(NUdf::TUnboxedValuePod value) = 0;
virtual void Add(TBlockItem value) = 0;
virtual void Add(TInputBuffer& input) = 0;
virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0;
+ virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) = 0;
+ virtual void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) = 0;
virtual arrow::Datum Build(bool finish) = 0;
};
+// Finds the entry of `arrays` (ordered by StartOffset) that contains the column-wide index `idx`.
+// On return `idx` is rewritten to be relative to the returned entry (idx -= StartOffset).
+//
+// The lookup selects the LAST entry whose StartOffset is <= idx. This matters when several
+// entries share a StartOffset (a zero-length array followed by a non-empty one): picking the
+// first of them would return the empty array and make the caller read past its end.
+inline const IArrayBuilder::TArrayDataItem* LookupArrayDataItem(const IArrayBuilder::TArrayDataItem* arrays, size_t arrayCount, ui64& idx) {
+    Y_VERIFY_DEBUG(arrays);
+    Y_VERIFY_DEBUG(arrayCount > 0);
+
+    // first entry that starts strictly after idx; the wanted entry is the one right before it
+    auto next = std::upper_bound(arrays, arrays + arrayCount, idx, [](ui64 value, const IArrayBuilder::TArrayDataItem& item) {
+        return value < item.StartOffset;
+    });
+
+    // next == arrays would mean idx precedes the very first entry
+    Y_VERIFY_DEBUG(next != arrays);
+    auto it = next - 1;
+
+    Y_VERIFY_DEBUG(it->StartOffset <= idx);
+    idx -= it->StartOffset;
+    return it;
+}
+
class IScalarBuilder {
public:
virtual ~IScalarBuilder() = default;
@@ -38,6 +64,7 @@ inline std::shared_ptr<arrow::DataType> GetArrowType(const ITypeInfoHelper& type
}
class TArrayBuilderBase : public IArrayBuilder {
+ using Self = TArrayBuilderBase;
public:
using Ptr = std::unique_ptr<TArrayBuilderBase>;
@@ -86,6 +113,16 @@ public:
CurrLen++;
}
+ // Convenience overload: appends `count` consecutive elements of a single array,
+ // starting at `beginIndex`, by wrapping it into a one-entry array list.
+ inline void AddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
+     const TArrayDataItem single{ &array, 0 };
+     Self::AddMany(&single, 1, beginIndex, count);
+ }
+
+ // Convenience overload: appends the elements of a single array selected by
+ // `indexes[0..count)`, by wrapping the array into a one-entry array list.
+ inline void AddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
+     const TArrayDataItem single{ &array, 0 };
+     Self::AddMany(&single, 1, indexes, count);
+ }
+
void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) final {
Y_VERIFY(size_t(array.length) == bitmapSize);
Y_VERIFY(popCount <= bitmapSize);
@@ -98,6 +135,68 @@ public:
CurrLen += popCount;
}
+ // Appends `count` consecutive elements, starting at column-wide position `beginIndex`,
+ // from the column described by `arrays[0..arrayCount)` (entries ordered by StartOffset).
+ // The range may span several entries; each entry is forwarded to DoAddMany separately.
+ void AddMany(const TArrayDataItem* arrays, size_t arrayCount, ui64 beginIndex, size_t count) final {
+     Y_VERIFY(arrays);
+     Y_VERIFY(arrayCount > 0);
+     // same capacity guard as the index-based overload
+     Y_VERIFY(CurrLen + count <= MaxLen);
+
+     if (arrayCount == 1) {
+         Y_VERIFY(arrays->Data);
+         DoAddMany(*arrays->Data, beginIndex, count);
+     } else {
+         // after the lookup idx is the position of beginIndex inside the found entry
+         ui64 idx = beginIndex;
+         auto item = LookupArrayDataItem(arrays, arrayCount, idx);
+         Y_VERIFY(item->Data);
+         Y_VERIFY(idx <= size_t(item->Data->length));
+         // only the tail [idx, length) of the first entry can be consumed; using the full
+         // length here would read past the end of the entry when beginIndex is not at its start
+         size_t avail = size_t(item->Data->length) - idx;
+         size_t toAdd = count;
+         while (toAdd) {
+             const size_t adding = std::min(avail, toAdd);
+             DoAddMany(*item->Data, idx, adding);
+             avail -= adding;
+             toAdd -= adding;
+
+             if (!avail && toAdd) {
+                 // current entry exhausted - continue from the beginning of the next one
+                 ++item;
+                 Y_VERIFY(item < arrays + arrayCount);
+                 Y_VERIFY(item->Data);
+                 avail = size_t(item->Data->length);
+                 idx = 0;
+             }
+         }
+     }
+     CurrLen += count;
+ }
+
+ // Appends the elements selected by column-wide positions `indexes[0..count)` from the column
+ // described by `arrays[0..arrayCount)`. Consecutive positions that resolve to the same entry
+ // are collected (rebased to that entry) and passed to DoAddMany as one batch.
+ void AddMany(const TArrayDataItem* arrays, size_t arrayCount, const ui64* indexes, size_t count) final {
+     Y_VERIFY(arrays);
+     Y_VERIFY(arrayCount > 0);
+     Y_VERIFY(indexes);
+     Y_VERIFY(CurrLen + count <= MaxLen);
+
+     if (arrayCount == 1) {
+         Y_VERIFY(arrays->Data);
+         DoAddMany(*arrays->Data, indexes, count);
+     } else {
+         const IArrayBuilder::TArrayDataItem* batchOwner = nullptr;
+         TVector<ui64> batch;
+         // emits the collected batch (if any) for the current owner
+         auto flushBatch = [&]() {
+             if (!batch.empty()) {
+                 DoAddMany(*batchOwner->Data, batch.data(), batch.size());
+                 batch.clear();
+             }
+         };
+
+         for (size_t pos = 0; pos < count; ++pos) {
+             ui64 localIdx = indexes[pos];
+             const IArrayBuilder::TArrayDataItem* owner = LookupArrayDataItem(arrays, arrayCount, localIdx);
+             if (owner != batchOwner) {
+                 flushBatch();
+                 batchOwner = owner;
+             }
+             batch.push_back(localIdx);
+         }
+         flushBatch();
+     }
+     CurrLen += count;
+ }
+
arrow::Datum Build(bool finish) final {
auto tree = BuildTree(finish);
TVector<std::shared_ptr<arrow::ArrayData>> chunks;
@@ -119,6 +218,8 @@ protected:
virtual void DoAdd(TInputBuffer& input) = 0;
virtual void DoAddDefault() = 0;
virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0;
+ virtual void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) = 0;
+ virtual void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) = 0;
virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0;
private:
@@ -254,6 +355,36 @@ public:
DataBuilder->UnsafeAdvance(popCount);
}
+ // Appends `count` consecutive fixed-size values of `array`, starting at `beginIndex`.
+ // For nullable builders the null flags are appended first, then the values from buffer 1.
+ void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
+     Y_VERIFY(array.buffers.size() > 1);
+     if constexpr (Nullable) {
+         Y_VERIFY(NullBuilder->Length() == DataBuilder->Length());
+         const ui64 endIndex = beginIndex + count;
+         for (size_t row = beginIndex; row < endIndex; ++row) {
+             NullBuilder->UnsafeAppend(IsNull(array, row));
+         }
+     }
+
+     const T* src = array.GetValues<T>(1) + beginIndex;
+     for (size_t k = 0; k < count; ++k) {
+         DataBuilder->UnsafeAppend(T(src[k]));
+     }
+ }
+
+ // Appends the fixed-size values of `array` selected by `indexes[0..count)`.
+ // For nullable builders the null flags are appended first, then the values from buffer 1.
+ void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
+     Y_VERIFY(array.buffers.size() > 1);
+     if constexpr (Nullable) {
+         Y_VERIFY(NullBuilder->Length() == DataBuilder->Length());
+         for (size_t k = 0; k < count; ++k) {
+             const ui64 row = indexes[k];
+             NullBuilder->UnsafeAppend(IsNull(array, row));
+         }
+     }
+
+     const T* src = array.GetValues<T>(1);
+     for (size_t k = 0; k < count; ++k) {
+         const ui64 row = indexes[k];
+         DataBuilder->UnsafeAppend(T(src[row]));
+     }
+ }
+
TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
const size_t len = DataBuilder->Length();
std::shared_ptr<arrow::Buffer> nulls;
@@ -443,6 +574,86 @@ public:
}
}
+ // Appends `count` consecutive strings of `array`, starting at `beginIndex`.
+ // String boundaries are read from the offsets in buffer 1 and the bytes from buffer 2.
+ // Bytes of consecutive source strings are not copied one by one: the pending source range
+ // [chunkStart, chunkEnd) is extended per string and copied with a single UnsafeAppend.
+ void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
+ Y_VERIFY(array.buffers.size() > 2);
+ Y_VERIFY(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
+
+ // dataLen counts the bytes already in DataBuilder plus the pending (not yet copied) range
+ size_t dataLen = DataBuilder->Length();
+
+ const TOffset* offsets = array.GetValues<TOffset>(1);
+ const ui8* srcData = array.GetValues<ui8>(2, 0);
+ const ui8* chunkStart = srcData + offsets[beginIndex];
+ const ui8* chunkEnd = chunkStart;
+ for (size_t i = beginIndex; i < beginIndex + count; ++i) {
+ const ui8* begin = srcData + offsets[i];
+ const ui8* end = srcData + offsets[i + 1];
+ const size_t strSize = end - begin;
+
+ // bytes left before MaxBlockSizeInBytes is reached; std::max keeps the subtraction from
+ // wrapping when dataLen is already above the limit
+ size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
+ for (;;) {
+ if (strSize <= availBytes) {
+ // the string fits: record its null flag and output offset, extend the pending range
+ if constexpr (Nullable) {
+ NullBuilder->UnsafeAppend(IsNull(array, i));
+ }
+ OffsetsBuilder->UnsafeAppend(TOffset(dataLen));
+ chunkEnd = end;
+ dataLen += strSize;
+ break;
+ }
+
+ if (dataLen) {
+ // does not fit and there is accumulated data: copy the pending range, call
+ // FlushChunk(false) (presumably closes the current output chunk - confirm) and retry
+ // the same string with an empty builder. availBytes is not recomputed here, so the
+ // retry goes through the Reserve branch below.
+ DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
+ chunkStart = begin;
+ chunkEnd = end;
+ FlushChunk(false);
+ dataLen = 0;
+ } else {
+ // nothing accumulated: make room for exactly this string and retry
+ DataBuilder->Reserve(strSize);
+ availBytes = strSize;
+ }
+ }
+ }
+ // copy whatever is still pending after the last string
+ if (chunkStart != chunkEnd) {
+ DataBuilder->UnsafeAppend(chunkStart, chunkEnd - chunkStart);
+ }
+ }
+
+ // Appends the strings of `array` selected by `indexes[0..count)`.
+ // String boundaries are read from the offsets in buffer 1 and the bytes from buffer 2;
+ // each selected string is copied into DataBuilder individually.
+ void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
+ Y_VERIFY(array.buffers.size() > 2);
+ Y_VERIFY(!Nullable || NullBuilder->Length() == OffsetsBuilder->Length());
+
+ // number of bytes currently held by DataBuilder, tracked locally across appends
+ size_t dataLen = DataBuilder->Length();
+
+ const TOffset* offsets = array.GetValues<TOffset>(1);
+ const char* strData = array.GetValues<char>(2, 0);
+ for (size_t i = 0; i < count; ++i) {
+ ui64 idx = indexes[i];
+ std::string_view str(strData + offsets[idx], offsets[idx + 1] - offsets[idx]);
+
+ // bytes left before MaxBlockSizeInBytes is reached; std::max keeps the subtraction from
+ // wrapping when dataLen is already above the limit
+ size_t availBytes = std::max(dataLen, MaxBlockSizeInBytes) - dataLen;
+ for (;;) {
+ if (str.size() <= availBytes) {
+ // the string fits: record its null flag and output offset, then copy its bytes
+ if constexpr (Nullable) {
+ NullBuilder->UnsafeAppend(IsNull(array, idx));
+ }
+ OffsetsBuilder->UnsafeAppend(TOffset(dataLen));
+ DataBuilder->UnsafeAppend((const ui8*)str.data(), str.size());
+ dataLen += str.size();
+ break;
+ }
+
+ if (dataLen) {
+ // does not fit and there is accumulated data: call FlushChunk(false) (presumably closes
+ // the current output chunk - confirm) and retry the same string. availBytes is not
+ // recomputed here, so the retry goes through the Reserve branch below.
+ FlushChunk(false);
+ dataLen = 0;
+ } else {
+ // nothing accumulated: make room for exactly this string and retry
+ DataBuilder->Reserve(str.size());
+ availBytes = str.size();
+ }
+ }
+ }
+ }
+
+
TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
FlushChunk(finish);
TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
@@ -588,6 +799,36 @@ public:
}
}
+ // Appends `count` consecutive rows of `array`, starting at `beginIndex`:
+ // null flags first (nullable builders only), then the same range is forwarded
+ // to every child builder with the matching child array.
+ void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
+     Y_VERIFY(!array.buffers.empty());
+     Y_VERIFY(array.child_data.size() == Children.size());
+
+     if constexpr (Nullable) {
+         const ui64 endIndex = beginIndex + count;
+         for (ui64 row = beginIndex; row < endIndex; ++row) {
+             NullBuilder->UnsafeAppend(IsNull(array, row));
+         }
+     }
+
+     size_t childIdx = 0;
+     for (auto& child : Children) {
+         child->AddMany(*array.child_data[childIdx], beginIndex, count);
+         ++childIdx;
+     }
+ }
+
+ // Appends the rows of `array` selected by `indexes[0..count)`:
+ // null flags first (nullable builders only), then the same selection is forwarded
+ // to every child builder with the matching child array.
+ void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
+     Y_VERIFY(!array.buffers.empty());
+     Y_VERIFY(array.child_data.size() == Children.size());
+
+     if constexpr (Nullable) {
+         for (size_t k = 0; k < count; ++k) {
+             const ui64 row = indexes[k];
+             NullBuilder->UnsafeAppend(IsNull(array, row));
+         }
+     }
+
+     size_t childIdx = 0;
+     for (auto& child : Children) {
+         child->AddMany(*array.child_data[childIdx], indexes, count);
+         ++childIdx;
+     }
+ }
+
TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
@@ -687,6 +928,28 @@ public:
Inner->AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
}
+ // Appends `count` consecutive rows of `array`, starting at `beginIndex`:
+ // the null flags go to NullBuilder, the same range of the only child array goes to Inner.
+ void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
+     Y_VERIFY(!array.buffers.empty());
+     Y_VERIFY(array.child_data.size() == 1);
+
+     const ui64 endIndex = beginIndex + count;
+     for (ui64 row = beginIndex; row < endIndex; ++row) {
+         NullBuilder->UnsafeAppend(IsNull(array, row));
+     }
+
+     const arrow::ArrayData& innerData = *array.child_data[0];
+     Inner->AddMany(innerData, beginIndex, count);
+ }
+
+ // Appends the rows of `array` selected by `indexes[0..count)`:
+ // the null flags go to NullBuilder, the same selection of the only child array goes to Inner.
+ void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
+     Y_VERIFY(!array.buffers.empty());
+     Y_VERIFY(array.child_data.size() == 1);
+
+     for (size_t k = 0; k < count; ++k) {
+         const ui64 row = indexes[k];
+         NullBuilder->UnsafeAppend(IsNull(array, row));
+     }
+
+     const arrow::ArrayData& innerData = *array.child_data[0];
+     Inner->AddMany(innerData, indexes, count);
+ }
+
TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();