aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Munkin <imunkin@ydb.tech>2024-09-06 16:57:46 +0300
committerGitHub <noreply@github.com>2024-09-06 16:57:46 +0300
commite10ef55a9580e0773e309f9c33a367da06dd2ef8 (patch)
tree605c0441b639785bd5d5351e3db5d0a9b2c20265
parent8c72783b41de368b2b8098d93da5bf1fadd3c45e (diff)
downloadydb-e10ef55a9580e0773e309f9c33a367da06dd2ef8.tar.gz
Fix MakeBlocks usage in BlockMapJoinCore computation node (#8686)
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp37
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp69
-rw-r--r--ydb/library/yql/minikql/computation/mkql_block_impl.cpp1
3 files changed, 89 insertions, 18 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
index b26d8bd097d..84c50b3c405 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -126,6 +126,10 @@ public:
return true;
}
+ bool HasBlocks() {
+ return Count > 0;
+ }
+
bool IsNotFull() const {
return OutputRows_ < MaxLength_
&& BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
@@ -190,7 +194,7 @@ public:
auto** fields = ctx.WideFields.data() + WideFieldsIndex_;
const auto dict = Dict_->GetValue(ctx);
- do {
+ while (!blockState.HasBlocks()) {
while (blockState.IsNotFull() && blockState.NextRow()) {
const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_);
if constexpr (WithoutRight) {
@@ -205,7 +209,7 @@ public:
blockState.MakeRow(dict.Lookup(key));
}
}
- if (!blockState.IsFinished()) {
+ if (blockState.IsNotFull() && !blockState.IsFinished()) {
switch (Flow_->FetchValues(ctx, fields)) {
case EFetchResult::Yield:
return EFetchResult::Yield;
@@ -216,16 +220,15 @@ public:
blockState.Finish();
break;
}
+ // Leave the loop, if no values left in the flow.
+ Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
}
- // Leave the outer loop, if no values left in the flow.
- Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
- break;
- } while (true);
-
- if (blockState.IsEmpty()) {
- return EFetchResult::Finish;
+ if (blockState.IsEmpty()) {
+ return EFetchResult::Finish;
+ }
+ blockState.MakeBlocks(ctx.HolderFactory);
}
- blockState.MakeBlocks(ctx.HolderFactory);
+
const auto sliceSize = blockState.Slice();
for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
@@ -294,7 +297,7 @@ public:
auto** fields = ctx.WideFields.data() + WideFieldsIndex_;
const auto dict = Dict_->GetValue(ctx);
- do {
+ while (!blockState.HasBlocks()) {
if (iterState) {
NUdf::TUnboxedValue lookupItem;
// Process the remaining items from the iterator.
@@ -329,15 +332,13 @@ public:
}
// Leave the loop, if no values left in the flow.
Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
- break;
}
- break;
- } while(true);
-
- if (blockState.IsEmpty()) {
- return EFetchResult::Finish;
+ if (blockState.IsEmpty()) {
+ return EFetchResult::Finish;
+ }
+ blockState.MakeBlocks(ctx.HolderFactory);
}
- blockState.MakeBlocks(ctx.HolderFactory);
+
const auto sliceSize = blockState.Slice();
for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
index bbb457f151e..47112789960 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
@@ -470,5 +470,74 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinBasicTest) {
} // Y_UNIT_TEST_SUITE
+Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMoreTest) {
+
+ constexpr size_t testSize = 1 << 14;
+ constexpr size_t valueSize = 3;
+ static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
+ static const TString hugeString(128, '1');
+
+ const TVector<TKSV> MakeFillTKSV(const TVector<ui64>& keyInit,
+ const ui64 subkeyMultiplier, const TVector<TString>& valuePayload
+ ) {
+ TVector<TKSV> testKSV;
+ for (size_t i = 0; i < keyInit.size(); i++) {
+ testKSV.push_back(std::make_tuple(keyInit[i],
+ keyInit[i] * subkeyMultiplier,
+ valuePayload[i]));
+ }
+ return testKSV;
+ }
+
+ Y_UNIT_TEST(TestInnerOn1) {
+ TVector<ui64> keyInit(testSize);
+ std::fill(keyInit.begin(), keyInit.end(), 1);
+ const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
+ TKSWMap rightMap = {{1, hugeString}};
+ TestBlockJoinWithRightOnUint64(EJoinKind::Inner, leftFlow, rightMap);
+ }
+
+ Y_UNIT_TEST(TestInnerMultiOn1) {
+ TVector<ui64> keyInit(testSize);
+ std::fill(keyInit.begin(), keyInit.end(), 1);
+ const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
+ TKSWMultiMap rightMultiMap = {{1, {"1", hugeString}}};
+ TestBlockMultiJoinWithRightOnUint64(EJoinKind::Inner, leftFlow, rightMultiMap);
+ }
+
+ Y_UNIT_TEST(TestLeftOn1) {
+ TVector<ui64> keyInit(testSize);
+ std::fill(keyInit.begin(), keyInit.end(), 1);
+ const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
+ TKSWMap rightMap = {{1, hugeString}};
+ TestBlockJoinWithRightOnUint64(EJoinKind::Left, leftFlow, rightMap);;
+ }
+
+ Y_UNIT_TEST(TestLeftMultiOn1) {
+ TVector<ui64> keyInit(testSize);
+ std::fill(keyInit.begin(), keyInit.end(), 1);
+ const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
+ TKSWMultiMap rightMultiMap = {{1, {"1", hugeString}}};
+ TestBlockMultiJoinWithRightOnUint64(EJoinKind::Left, leftFlow, rightMultiMap);
+ }
+
+ Y_UNIT_TEST(TestLeftSemiOn1) {
+ TVector<ui64> keyInit(testSize);
+ std::fill(keyInit.begin(), keyInit.end(), 1);
+ const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
+ const TKSVSet rightSet({1});
+ TestBlockJoinWithoutRightOnUint64(EJoinKind::LeftSemi, leftFlow, rightSet);
+ }
+
+ Y_UNIT_TEST(TestLeftOnlyOn1) {
+ TVector<ui64> keyInit(testSize);
+ std::fill(keyInit.begin(), keyInit.end(), 1);
+ const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
+ const TKSVSet rightSet({1});
+ TestBlockJoinWithoutRightOnUint64(EJoinKind::LeftOnly, leftFlow, rightSet);
+ }
+
+} // Y_UNIT_TEST_SUITE
+
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
index d5afd7d2993..2bb6634023a 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
@@ -343,6 +343,7 @@ void TBlockState::ClearValues() {
}
void TBlockState::FillArrays() {
+ MKQL_ENSURE(Count == 0, "All existing arrays have to be processed");
auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum();
MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value;