diff options
author | Igor Munkin <imunkin@ydb.tech> | 2024-08-29 15:07:38 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-29 15:07:38 +0500 |
commit | 47066e988649275ba87b47bc47b7ef4fcdea560b (patch) | |
tree | fb60e74a4c7178994820d5e25ea6f6d25ad7a0f3 | |
parent | 6d41d83f92ba5ba9a045f2edd2cf41aae7187272 (diff) | |
download | ydb-47066e988649275ba87b47bc47b7ef4fcdea560b.tar.gz |
Introduce TBlockJoinState (#8368)
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp | 340 | ||||
-rw-r--r-- | ydb/library/yql/minikql/computation/mkql_block_impl.cpp | 10 |
2 files changed, 149 insertions, 201 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp index f97b90bf73..bcaf1ca751 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp @@ -21,10 +21,154 @@ size_t CalcMaxBlockLength(const TVector<TType*>& items) { })); } +template <bool RightRequired> +class TBlockJoinState : public TBlockState { +public: + TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, + const TVector<TType*>& inputItems, + const TVector<TType*> outputItems, + NUdf::TUnboxedValue**const fields) + : TBlockState(memInfo, outputItems.size()) + , InputWidth_(inputItems.size() - 1) + , OutputWidth_(outputItems.size() - 1) + , Inputs_(inputItems.size()) + , InputsDescr_(ToValueDescr(inputItems)) + { + const auto& pgBuilder = ctx.Builder->GetPgBuilder(); + MaxLength_ = CalcMaxBlockLength(outputItems); + for (size_t i = 0; i < inputItems.size(); i++) { + fields[i] = &Inputs_[i]; + const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType(); + Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); + Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder)); + } + // The last output column (i.e. block length) doesn't require a block builder. + for (size_t i = 0; i < OutputWidth_; i++) { + const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType(); + Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_)); + } + MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_; + } + + void CopyRow() { + // Copy items from the "left" flow. + for (size_t i = 0; i < InputWidth_; i++) { + AddItem(GetItem(i), i); + } + OutputRows_++; + } + + void MakeRow(const NUdf::TUnboxedValuePod& value) { + // Copy items from the "left" flow. + for (size_t i = 0; i < InputWidth_; i++) { + AddItem(GetItem(i), i); + } + // Convert and append items from the "right" dict. + if constexpr (RightRequired) { + for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) { + AddValue(value.GetElement(j), i); + } + } else { + if (value) { + for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) { + AddValue(value.GetElement(j), i); + } + } else { + for (size_t i = InputWidth_; i < OutputWidth_; i++) { + AddValue(value, i); + } + } + } + OutputRows_++; + } + + void MakeBlocks(const THolderFactory& holderFactory) { + Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_))); + OutputRows_ = 0; + BuilderAllocatedSize_ = 0; + + for (size_t i = 0; i < Builders_.size(); i++) { + Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_)); + } + FillArrays(); + } + + TBlockItem GetItem(size_t idx) const { + const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum(); + ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr()); + if (datum.is_scalar()) { + return Readers_[idx]->GetScalarItem(*datum.scalar()); + } + MKQL_ENSURE(datum.is_array(), "Expecting array"); + return Readers_[idx]->GetItem(*datum.array(), Current_); + } + + NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const { + return Converters_[idx]->MakeValue(GetItem(idx), holderFactory); + } + + void Reset() { + Next_ = 0; + InputRows_ = GetBlockCount(Inputs_.back()); + } + + void Finish() { + IsFinished_ = true; + } + + bool NextRow() { + if (Next_ >= InputRows_) { + return false; + } + Current_ = Next_++; + return true; + } + + bool IsNotFull() const { + return OutputRows_ < MaxLength_ + && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_; + } + + bool IsEmpty() const { + return OutputRows_ == 0; + } + + bool IsFinished() const { + return IsFinished_; + } + +private: + void AddItem(const TBlockItem& item, size_t idx) { + Builders_[idx]->Add(item); + } + + void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) { + Builders_[idx]->Add(value); + } + + size_t Current_ = 0; + size_t Next_ = 0; + bool IsFinished_ = false; + size_t MaxLength_; + size_t BuilderAllocatedSize_ = 0; + size_t MaxBuilderAllocatedSize_ = 0; + static const size_t MaxAllocatedFactor_ = 4; + size_t InputRows_ = 0; + size_t OutputRows_ = 0; + size_t InputWidth_; + size_t OutputWidth_; + TUnboxedValueVector Inputs_; + const std::vector<arrow::ValueDescr> InputsDescr_; + TVector<std::unique_ptr<IBlockReader>> Readers_; + TVector<std::unique_ptr<IBlockItemConverter>> Converters_; + TVector<std::unique_ptr<IArrayBuilder>> Builders_; +}; + template <bool WithoutRight, bool RightRequired> class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>> { using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>; +using TState = TBlockJoinState<RightRequired>; public: TBlockWideMapJoinWrapper(TComputationMutables& mutables, const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems, @@ -36,7 +180,7 @@ public: , LeftKeyColumns_(std::move(leftKeyColumns)) , Flow_(flow) , Dict_(dict) - , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(ResultJoinItems_.size())) + , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size())) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -79,7 +223,7 @@ public: if (s.IsEmpty()) { return EFetchResult::Finish; } - s.MakeBlocks(); + s.MakeBlocks(ctx.HolderFactory); const auto sliceSize = s.Slice(); for (size_t i = 0; i < ResultJoinItems_.size(); i++) { @@ -98,198 +242,6 @@ private: } } - class TState : public TComputationValue<TState> { - using TBase = TComputationValue<TState>; - size_t Current_ = 0; - size_t Next_ = 0; - bool IsFinished_ = false; - size_t MaxLength_; - size_t BuilderAllocatedSize_ = 0; - size_t MaxBuilderAllocatedSize_ = 0; - static const size_t MaxAllocatedFactor_ = 4; - size_t InputRows_ = 0; - size_t OutputRows_ = 0; - size_t InputWidth_; - size_t OutputWidth_; - TUnboxedValueVector Inputs_; - const std::vector<arrow::ValueDescr> InputsDescr_; - TVector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques; - TVector<std::shared_ptr<arrow::ArrayData>> Arrays; - TVector<std::unique_ptr<IBlockReader>> Readers_; - TVector<std::unique_ptr<IBlockItemConverter>> Converters_; - TVector<std::unique_ptr<IArrayBuilder>> Builders_; - - public: - TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, - const TVector<TType*>& inputItems, const TVector<TType*> outputItems, - NUdf::TUnboxedValue**const fields) - : TBase(memInfo) - , InputWidth_(inputItems.size() - 1) - , OutputWidth_(outputItems.size() - 1) - , Inputs_(inputItems.size()) - , InputsDescr_(ToValueDescr(inputItems)) - , Deques(OutputWidth_) - , Arrays(OutputWidth_) - { - const auto& pgBuilder = ctx.Builder->GetPgBuilder(); - MaxLength_ = CalcMaxBlockLength(outputItems); - for (size_t i = 0; i < inputItems.size(); i++) { - fields[i] = &Inputs_[i]; - const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType(); - Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); - Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder)); - } - // The last output column (i.e. block length) doesn't require a block builder. - for (size_t i = 0; i < OutputWidth_; i++) { - const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType(); - Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_)); - } - MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_; - } - - void Reset() { - Next_ = 0; - InputRows_ = GetBlockCount(Inputs_.back()); - } - - void Finish() { - IsFinished_ = true; - } - - bool NextRow() { - if (Next_ >= InputRows_) { - return false; - } - Current_ = Next_++; - return true; - } - - bool IsNotFull() { - return OutputRows_ < MaxLength_ - && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_; - } - - bool IsEmpty() { - return OutputRows_ == 0; - } - - bool IsFinished() { - return IsFinished_; - } - - TBlockItem GetItem(size_t idx) const { - const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum(); - ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr()); - if (datum.is_scalar()) { - return Readers_[idx]->GetScalarItem(*datum.scalar()); - } - MKQL_ENSURE(datum.is_array(), "Expecting array"); - return Readers_[idx]->GetItem(*datum.array(), Current_); - } - - NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const { - return Converters_[idx]->MakeValue(GetItem(idx), holderFactory); - } - - void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) { - Builders_[idx]->Add(value); - } - - void AddItem(const TBlockItem& item, size_t idx) { - Builders_[idx]->Add(item); - } - - void CopyRow() { - // Copy items from the "left" flow. - for (size_t i = 0; i < InputWidth_; i++) { - AddItem(GetItem(i), i); - } - OutputRows_++; - } - - void MakeRow(const NUdf::TUnboxedValuePod& value) { - // Copy items from the "left" flow. - for (size_t i = 0; i < InputWidth_; i++) { - AddItem(GetItem(i), i); - } - // Convert and append items from the "right" dict. - if (value) { - for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) { - AddValue(value.GetElement(j), i); - } - } else { - for (size_t i = InputWidth_; i < OutputWidth_; i++) { - AddValue(value, i); - } - } - OutputRows_++; - } - - void CopyArray(size_t idx, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) { - const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum(); - Y_ENSURE(datum.is_array()); - Builders_[idx]->AddMany(*datum.array(), popCount, sparseBitmap, bitmapSize); - } - - void MakeBlocks() { - if (OutputRows_ == 0) { - return; - } - BuilderAllocatedSize_ = 0; - - for (size_t i = 0; i < Builders_.size(); i++) { - const auto& datum = Builders_[i]->Build(IsFinished_); - Deques[i].clear(); - MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)"); - ForEachArrayData(datum, [this, i](const auto& arrayData) { - Deques[i].push_back(arrayData); - }); - } - } - - ui64 Slice() { - auto sliceSize = OutputRows_; - for (size_t i = 0; i < Deques.size(); i++) { - const auto& arrays = Deques[i]; - if (arrays.empty()) { - continue; - } - Y_ABORT_UNLESS(ui64(arrays.front()->length) <= OutputRows_); - sliceSize = std::min<ui64>(sliceSize, arrays.front()->length); - } - - for (size_t i = 0; i < Arrays.size(); i++) { - auto& arrays = Deques[i]; - if (arrays.empty()) { - continue; - } - if (auto& head = arrays.front(); ui64(head->length) == sliceSize) { - Arrays[i] = std::move(head); - arrays.pop_front(); - } else { - Arrays[i] = Chop(head, sliceSize); - } - } - - OutputRows_ -= sliceSize; - return sliceSize; - } - - NUdf::TUnboxedValuePod Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const { - MKQL_ENSURE(idx <= OutputWidth_, "Deques index overflow"); - // Return the slice length as the last column value (i.e. block length). - if (idx == OutputWidth_) { - return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize))); - } - if (auto array = Arrays[idx]) { - return holderFactory.CreateArrowBlock(std::move(array)); - } else { - return NUdf::TUnboxedValuePod(); - } - } - - }; - void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_); } diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp index a46bc9db1c..d5afd7d299 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp @@ -357,13 +357,9 @@ void TBlockState::FillArrays() { return; } MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)"); - if (datum.is_array()) { - Deques[i].push_back(datum.array()); - } else { - for (auto& chunk : datum.chunks()) { - Deques[i].push_back(chunk->data()); - } - } + ForEachArrayData(datum, [this, i](const auto& arrayData) { + Deques[i].push_back(arrayData); + }); } } } |