diff options
author | Igor Munkin <imunkin@ydb.tech> | 2024-09-06 16:57:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-06 16:57:46 +0300 |
commit | e10ef55a9580e0773e309f9c33a367da06dd2ef8 (patch) | |
tree | 605c0441b639785bd5d5351e3db5d0a9b2c20265 | |
parent | 8c72783b41de368b2b8098d93da5bf1fadd3c45e (diff) | |
download | ydb-e10ef55a9580e0773e309f9c33a367da06dd2ef8.tar.gz |
Fix MakeBlocks usage in BlockMapJoinCore computation node (#8686)
3 files changed, 89 insertions, 18 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp index b26d8bd097d..84c50b3c405 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp @@ -126,6 +126,10 @@ public: return true; } + bool HasBlocks() { + return Count > 0; + } + bool IsNotFull() const { return OutputRows_ < MaxLength_ && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_; @@ -190,7 +194,7 @@ public: auto** fields = ctx.WideFields.data() + WideFieldsIndex_; const auto dict = Dict_->GetValue(ctx); - do { + while (!blockState.HasBlocks()) { while (blockState.IsNotFull() && blockState.NextRow()) { const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_); if constexpr (WithoutRight) { @@ -205,7 +209,7 @@ public: blockState.MakeRow(dict.Lookup(key)); } } - if (!blockState.IsFinished()) { + if (blockState.IsNotFull() && !blockState.IsFinished()) { switch (Flow_->FetchValues(ctx, fields)) { case EFetchResult::Yield: return EFetchResult::Yield; @@ -216,16 +220,15 @@ public: blockState.Finish(); break; } + // Leave the loop, if no values left in the flow. + Y_DEBUG_ABORT_UNLESS(blockState.IsFinished()); } - // Leave the outer loop, if no values left in the flow. - Y_DEBUG_ABORT_UNLESS(blockState.IsFinished()); - break; - } while (true); - - if (blockState.IsEmpty()) { - return EFetchResult::Finish; + if (blockState.IsEmpty()) { + return EFetchResult::Finish; + } + blockState.MakeBlocks(ctx.HolderFactory); } - blockState.MakeBlocks(ctx.HolderFactory); + const auto sliceSize = blockState.Slice(); for (size_t i = 0; i < ResultJoinItems_.size(); i++) { @@ -294,7 +297,7 @@ public: auto** fields = ctx.WideFields.data() + WideFieldsIndex_; const auto dict = Dict_->GetValue(ctx); - do { + while (!blockState.HasBlocks()) { if (iterState) { NUdf::TUnboxedValue lookupItem; // Process the remaining items from the iterator. @@ -329,15 +332,13 @@ public: } // Leave the loop, if no values left in the flow. Y_DEBUG_ABORT_UNLESS(blockState.IsFinished()); - break; } - break; - } while(true); - - if (blockState.IsEmpty()) { - return EFetchResult::Finish; + if (blockState.IsEmpty()) { + return EFetchResult::Finish; + } + blockState.MakeBlocks(ctx.HolderFactory); } - blockState.MakeBlocks(ctx.HolderFactory); + const auto sliceSize = blockState.Slice(); for (size_t i = 0; i < ResultJoinItems_.size(); i++) { diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp index bbb457f151e..47112789960 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp @@ -470,5 +470,74 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinBasicTest) { } // Y_UNIT_TEST_SUITE +Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMoreTest) { + + constexpr size_t testSize = 1 << 14; + constexpr size_t valueSize = 3; + static const TVector<TString> threeLetterValues = GenerateValues(valueSize); + static const TString hugeString(128, '1'); + + const TVector<TKSV> MakeFillTKSV(const TVector<ui64>& keyInit, + const ui64 subkeyMultiplier, const TVector<TString>& valuePayload + ) { + TVector<TKSV> testKSV; + for (size_t i = 0; i < keyInit.size(); i++) { + testKSV.push_back(std::make_tuple(keyInit[i], + keyInit[i] * subkeyMultiplier, + valuePayload[i])); + } + return testKSV; + } + + Y_UNIT_TEST(TestInnerOn1) { + TVector<ui64> keyInit(testSize); + std::fill(keyInit.begin(), keyInit.end(), 1); + const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues); + TKSWMap rightMap = {{1, hugeString}}; + TestBlockJoinWithRightOnUint64(EJoinKind::Inner, leftFlow, rightMap); + } + + Y_UNIT_TEST(TestInnerMultiOn1) { + TVector<ui64> keyInit(testSize); + std::fill(keyInit.begin(), keyInit.end(), 1); + const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues); + TKSWMultiMap rightMultiMap = {{1, {"1", hugeString}}}; + TestBlockMultiJoinWithRightOnUint64(EJoinKind::Inner, leftFlow, rightMultiMap); + } + + Y_UNIT_TEST(TestLeftOn1) { + TVector<ui64> keyInit(testSize); + std::fill(keyInit.begin(), keyInit.end(), 1); + const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues); + TKSWMap rightMap = {{1, hugeString}}; + TestBlockJoinWithRightOnUint64(EJoinKind::Left, leftFlow, rightMap);; + } + + Y_UNIT_TEST(TestLeftMultiOn1) { + TVector<ui64> keyInit(testSize); + std::fill(keyInit.begin(), keyInit.end(), 1); + const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues); + TKSWMultiMap rightMultiMap = {{1, {"1", hugeString}}}; + TestBlockMultiJoinWithRightOnUint64(EJoinKind::Left, leftFlow, rightMultiMap); + } + + Y_UNIT_TEST(TestLeftSemiOn1) { + TVector<ui64> keyInit(testSize); + std::fill(keyInit.begin(), keyInit.end(), 1); + const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues); + const TKSVSet rightSet({1}); + TestBlockJoinWithoutRightOnUint64(EJoinKind::LeftSemi, leftFlow, rightSet); + } + + Y_UNIT_TEST(TestLeftOnlyOn1) { + TVector<ui64> keyInit(testSize); + std::fill(keyInit.begin(), keyInit.end(), 1); + const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues); + const TKSVSet rightSet({1}); + TestBlockJoinWithoutRightOnUint64(EJoinKind::LeftOnly, leftFlow, rightSet); + } + +} // Y_UNIT_TEST_SUITE + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp index d5afd7d2993..2bb6634023a 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp @@ -343,6 +343,7 @@ void TBlockState::ClearValues() { } void TBlockState::FillArrays() { + MKQL_ENSURE(Count == 0, "All existing arrays have to be processed"); auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum(); MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)"); Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value; |