about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Igor Munkin <imunkin@ydb.tech> 2024-08-29 15:07:38 +0500
committer GitHub <noreply@github.com> 2024-08-29 15:07:38 +0500
commit 47066e988649275ba87b47bc47b7ef4fcdea560b (patch)
tree fb60e74a4c7178994820d5e25ea6f6d25ad7a0f3
parent 6d41d83f92ba5ba9a045f2edd2cf41aae7187272 (diff)
download ydb-47066e988649275ba87b47bc47b7ef4fcdea560b.tar.gz
Introduce TBlockJoinState (#8368)
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp 340
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_block_impl.cpp 10
2 files changed, 149 insertions, 201 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
index f97b90bf73..bcaf1ca751 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -21,10 +21,154 @@ size_t CalcMaxBlockLength(const TVector<TType*>& items) {
}));
}
+template <bool RightRequired>
+class TBlockJoinState : public TBlockState {
+public:
+ TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
+ const TVector<TType*>& inputItems,
+ const TVector<TType*> outputItems,
+ NUdf::TUnboxedValue**const fields)
+ : TBlockState(memInfo, outputItems.size())
+ , InputWidth_(inputItems.size() - 1)
+ , OutputWidth_(outputItems.size() - 1)
+ , Inputs_(inputItems.size())
+ , InputsDescr_(ToValueDescr(inputItems))
+ {
+ const auto& pgBuilder = ctx.Builder->GetPgBuilder();
+ MaxLength_ = CalcMaxBlockLength(outputItems);
+ for (size_t i = 0; i < inputItems.size(); i++) {
+ fields[i] = &Inputs_[i];
+ const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
+ Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
+ Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
+ }
+ // The last output column (i.e. block length) doesn't require a block builder.
+ for (size_t i = 0; i < OutputWidth_; i++) {
+ const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
+ Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
+ }
+ MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
+ }
+
+ void CopyRow() {
+ // Copy items from the "left" flow.
+ for (size_t i = 0; i < InputWidth_; i++) {
+ AddItem(GetItem(i), i);
+ }
+ OutputRows_++;
+ }
+
+ void MakeRow(const NUdf::TUnboxedValuePod& value) {
+ // Copy items from the "left" flow.
+ for (size_t i = 0; i < InputWidth_; i++) {
+ AddItem(GetItem(i), i);
+ }
+ // Convert and append items from the "right" dict.
+ if constexpr (RightRequired) {
+ for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
+ AddValue(value.GetElement(j), i);
+ }
+ } else {
+ if (value) {
+ for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
+ AddValue(value.GetElement(j), i);
+ }
+ } else {
+ for (size_t i = InputWidth_; i < OutputWidth_; i++) {
+ AddValue(value, i);
+ }
+ }
+ }
+ OutputRows_++;
+ }
+
+ void MakeBlocks(const THolderFactory& holderFactory) {
+ Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_)));
+ OutputRows_ = 0;
+ BuilderAllocatedSize_ = 0;
+
+ for (size_t i = 0; i < Builders_.size(); i++) {
+ Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_));
+ }
+ FillArrays();
+ }
+
+ TBlockItem GetItem(size_t idx) const {
+ const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
+ ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
+ if (datum.is_scalar()) {
+ return Readers_[idx]->GetScalarItem(*datum.scalar());
+ }
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ return Readers_[idx]->GetItem(*datum.array(), Current_);
+ }
+
+ NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
+ return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
+ }
+
+ void Reset() {
+ Next_ = 0;
+ InputRows_ = GetBlockCount(Inputs_.back());
+ }
+
+ void Finish() {
+ IsFinished_ = true;
+ }
+
+ bool NextRow() {
+ if (Next_ >= InputRows_) {
+ return false;
+ }
+ Current_ = Next_++;
+ return true;
+ }
+
+ bool IsNotFull() const {
+ return OutputRows_ < MaxLength_
+ && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
+ }
+
+ bool IsEmpty() const {
+ return OutputRows_ == 0;
+ }
+
+ bool IsFinished() const {
+ return IsFinished_;
+ }
+
+private:
+ void AddItem(const TBlockItem& item, size_t idx) {
+ Builders_[idx]->Add(item);
+ }
+
+ void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
+ Builders_[idx]->Add(value);
+ }
+
+ size_t Current_ = 0;
+ size_t Next_ = 0;
+ bool IsFinished_ = false;
+ size_t MaxLength_;
+ size_t BuilderAllocatedSize_ = 0;
+ size_t MaxBuilderAllocatedSize_ = 0;
+ static const size_t MaxAllocatedFactor_ = 4;
+ size_t InputRows_ = 0;
+ size_t OutputRows_ = 0;
+ size_t InputWidth_;
+ size_t OutputWidth_;
+ TUnboxedValueVector Inputs_;
+ const std::vector<arrow::ValueDescr> InputsDescr_;
+ TVector<std::unique_ptr<IBlockReader>> Readers_;
+ TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
+ TVector<std::unique_ptr<IArrayBuilder>> Builders_;
+};
+
template <bool WithoutRight, bool RightRequired>
class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>
{
using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>;
+using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
@@ -36,7 +180,7 @@ public:
, LeftKeyColumns_(std::move(leftKeyColumns))
, Flow_(flow)
, Dict_(dict)
- , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(ResultJoinItems_.size()))
+ , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -79,7 +223,7 @@ public:
if (s.IsEmpty()) {
return EFetchResult::Finish;
}
- s.MakeBlocks();
+ s.MakeBlocks(ctx.HolderFactory);
const auto sliceSize = s.Slice();
for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
@@ -98,198 +242,6 @@ private:
}
}
- class TState : public TComputationValue<TState> {
- using TBase = TComputationValue<TState>;
- size_t Current_ = 0;
- size_t Next_ = 0;
- bool IsFinished_ = false;
- size_t MaxLength_;
- size_t BuilderAllocatedSize_ = 0;
- size_t MaxBuilderAllocatedSize_ = 0;
- static const size_t MaxAllocatedFactor_ = 4;
- size_t InputRows_ = 0;
- size_t OutputRows_ = 0;
- size_t InputWidth_;
- size_t OutputWidth_;
- TUnboxedValueVector Inputs_;
- const std::vector<arrow::ValueDescr> InputsDescr_;
- TVector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques;
- TVector<std::shared_ptr<arrow::ArrayData>> Arrays;
- TVector<std::unique_ptr<IBlockReader>> Readers_;
- TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
- TVector<std::unique_ptr<IArrayBuilder>> Builders_;
-
- public:
- TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
- const TVector<TType*>& inputItems, const TVector<TType*> outputItems,
- NUdf::TUnboxedValue**const fields)
- : TBase(memInfo)
- , InputWidth_(inputItems.size() - 1)
- , OutputWidth_(outputItems.size() - 1)
- , Inputs_(inputItems.size())
- , InputsDescr_(ToValueDescr(inputItems))
- , Deques(OutputWidth_)
- , Arrays(OutputWidth_)
- {
- const auto& pgBuilder = ctx.Builder->GetPgBuilder();
- MaxLength_ = CalcMaxBlockLength(outputItems);
- for (size_t i = 0; i < inputItems.size(); i++) {
- fields[i] = &Inputs_[i];
- const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
- Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
- Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
- }
- // The last output column (i.e. block length) doesn't require a block builder.
- for (size_t i = 0; i < OutputWidth_; i++) {
- const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
- Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
- }
- MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
- }
-
- void Reset() {
- Next_ = 0;
- InputRows_ = GetBlockCount(Inputs_.back());
- }
-
- void Finish() {
- IsFinished_ = true;
- }
-
- bool NextRow() {
- if (Next_ >= InputRows_) {
- return false;
- }
- Current_ = Next_++;
- return true;
- }
-
- bool IsNotFull() {
- return OutputRows_ < MaxLength_
- && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
- }
-
- bool IsEmpty() {
- return OutputRows_ == 0;
- }
-
- bool IsFinished() {
- return IsFinished_;
- }
-
- TBlockItem GetItem(size_t idx) const {
- const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
- ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
- if (datum.is_scalar()) {
- return Readers_[idx]->GetScalarItem(*datum.scalar());
- }
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- return Readers_[idx]->GetItem(*datum.array(), Current_);
- }
-
- NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
- return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
- }
-
- void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
- Builders_[idx]->Add(value);
- }
-
- void AddItem(const TBlockItem& item, size_t idx) {
- Builders_[idx]->Add(item);
- }
-
- void CopyRow() {
- // Copy items from the "left" flow.
- for (size_t i = 0; i < InputWidth_; i++) {
- AddItem(GetItem(i), i);
- }
- OutputRows_++;
- }
-
- void MakeRow(const NUdf::TUnboxedValuePod& value) {
- // Copy items from the "left" flow.
- for (size_t i = 0; i < InputWidth_; i++) {
- AddItem(GetItem(i), i);
- }
- // Convert and append items from the "right" dict.
- if (value) {
- for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
- AddValue(value.GetElement(j), i);
- }
- } else {
- for (size_t i = InputWidth_; i < OutputWidth_; i++) {
- AddValue(value, i);
- }
- }
- OutputRows_++;
- }
-
- void CopyArray(size_t idx, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) {
- const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
- Y_ENSURE(datum.is_array());
- Builders_[idx]->AddMany(*datum.array(), popCount, sparseBitmap, bitmapSize);
- }
-
- void MakeBlocks() {
- if (OutputRows_ == 0) {
- return;
- }
- BuilderAllocatedSize_ = 0;
-
- for (size_t i = 0; i < Builders_.size(); i++) {
- const auto& datum = Builders_[i]->Build(IsFinished_);
- Deques[i].clear();
- MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
- ForEachArrayData(datum, [this, i](const auto& arrayData) {
- Deques[i].push_back(arrayData);
- });
- }
- }
-
- ui64 Slice() {
- auto sliceSize = OutputRows_;
- for (size_t i = 0; i < Deques.size(); i++) {
- const auto& arrays = Deques[i];
- if (arrays.empty()) {
- continue;
- }
- Y_ABORT_UNLESS(ui64(arrays.front()->length) <= OutputRows_);
- sliceSize = std::min<ui64>(sliceSize, arrays.front()->length);
- }
-
- for (size_t i = 0; i < Arrays.size(); i++) {
- auto& arrays = Deques[i];
- if (arrays.empty()) {
- continue;
- }
- if (auto& head = arrays.front(); ui64(head->length) == sliceSize) {
- Arrays[i] = std::move(head);
- arrays.pop_front();
- } else {
- Arrays[i] = Chop(head, sliceSize);
- }
- }
-
- OutputRows_ -= sliceSize;
- return sliceSize;
- }
-
- NUdf::TUnboxedValuePod Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const {
- MKQL_ENSURE(idx <= OutputWidth_, "Deques index overflow");
- // Return the slice length as the last column value (i.e. block length).
- if (idx == OutputWidth_) {
- return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
- }
- if (auto array = Arrays[idx]) {
- return holderFactory.CreateArrowBlock(std::move(array));
- } else {
- return NUdf::TUnboxedValuePod();
- }
- }
-
- };
-
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
}
diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
index a46bc9db1c..d5afd7d299 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
@@ -357,13 +357,9 @@ void TBlockState::FillArrays() {
return;
}
MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
- if (datum.is_array()) {
- Deques[i].push_back(datum.array());
- } else {
- for (auto& chunk : datum.chunks()) {
- Deques[i].push_back(chunk->data());
- }
- }
+ ForEachArrayData(datum, [this, i](const auto& arrayData) {
+ Deques[i].push_back(arrayData);
+ });
}
}
}