diff options
author | Igor Munkin <imunkin@ydb.tech> | 2024-09-13 19:34:40 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-13 19:34:40 +0500 |
commit | 79360acb9face692b8094ebe8a49e4428de09c72 (patch) | |
tree | 29431a41895f893da3738d7dec9afa80a7c8a3b1 | |
parent | 7dd3efca63b7412638bcf314f48f00d08f53c381 (diff) | |
download | ydb-79360acb9face692b8094ebe8a49e4428de09c72.tar.gz |
Add multikey support for BlockMapJoinCore computation node (#9191)
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp | 137 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp | 338 |
2 files changed, 419 insertions, 56 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp index ce870dca54..d9e1807ed5 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp @@ -180,10 +180,10 @@ private: TVector<std::unique_ptr<IArrayBuilder>> Builders_; }; -template <bool WithoutRight, bool RightRequired> -class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>> +template <bool WithoutRight, bool RightRequired, bool IsTuple> +class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>> { -using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>; +using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>; using TState = TBlockJoinState<RightRequired>; public: TBlockWideMapJoinWrapper(TComputationMutables& mutables, @@ -198,6 +198,7 @@ public: , Flow_(flow) , Dict_(dict) , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size())) + , KeyTuple_(mutables) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -207,7 +208,7 @@ public: while (!blockState.HasBlocks()) { while (blockState.IsNotFull() && blockState.NextRow()) { - const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_); + const auto key = MakeKeysTuple(ctx, blockState); if constexpr (WithoutRight) { if (key && dict.Contains(key) == RightRequired) { blockState.CopyRow(); @@ -270,10 +271,18 @@ private: return *static_cast<TState*>(state.AsBoxed().Get()); } - NUdf::TUnboxedValue MakeKeysTuple(const TComputationContext& ctx, const TState& state, const TVector<ui32>& keyColumns) const { - // TODO: Handle complex key. + NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx, const TState& state) const { // TODO: Handle converters. - return state.GetValue(ctx.HolderFactory, keyColumns.front()); + if constexpr (!IsTuple) { + return state.GetValue(ctx.HolderFactory, LeftKeyColumns_.front()); + } + + NUdf::TUnboxedValue* items = nullptr; + const auto keys = KeyTuple_.NewArray(ctx, LeftKeyColumns_.size(), items); + for (size_t i = 0; i < LeftKeyColumns_.size(); i++) { + items[i] = state.GetValue(ctx.HolderFactory, LeftKeyColumns_[i]); + } + return keys; } const TVector<TType*> ResultJoinItems_; @@ -283,12 +292,13 @@ private: IComputationWideFlowNode* const Flow_; IComputationNode* const Dict_; ui32 WideFieldsIndex_; + const TContainerCacheOnContext KeyTuple_; }; -template<bool RightRequired> -class TBlockWideMultiMapJoinWrapper : public TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired>> +template<bool RightRequired, bool IsTuple> +class TBlockWideMultiMapJoinWrapper : public TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>> { -using TBaseComputation = TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired>>; +using TBaseComputation = TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>; using TState = TBlockJoinState<RightRequired>; public: TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables, @@ -303,6 +313,7 @@ public: , Flow_(flow) , Dict_(dict) , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size())) + , KeyTuple_(mutables) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, NUdf::TUnboxedValue& iterator, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -320,7 +331,7 @@ public: } } if (blockState.IsNotFull() && blockState.NextRow()) { - const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_); + const auto key = MakeKeysTuple(ctx, blockState); // Lookup the item in the right dict. If the lookup succeeds, // reset the iterator and proceed the execution from the // beginning of the outer loop. Otherwise, the iterState is @@ -419,10 +430,18 @@ private: return *static_cast<TIterator*>(iterator.AsBoxed().Get()); } - NUdf::TUnboxedValue MakeKeysTuple(const TComputationContext& ctx, const TState& state, const TVector<ui32>& keyColumns) const { - // TODO: Handle complex key. + NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx, const TState& state) const { // TODO: Handle converters. - return state.GetValue(ctx.HolderFactory, keyColumns.front()); + if constexpr (!IsTuple) { + return state.GetValue(ctx.HolderFactory, LeftKeyColumns_.front()); + } + + NUdf::TUnboxedValue* items = nullptr; + const auto keys = KeyTuple_.NewArray(ctx, LeftKeyColumns_.size(), items); + for (size_t i = 0; i < LeftKeyColumns_.size(); i++) { + items[i] = state.GetValue(ctx.HolderFactory, LeftKeyColumns_[i]); + } + return keys; } const TVector<TType*> ResultJoinItems_; @@ -432,6 +451,7 @@ private: IComputationWideFlowNode* const Flow_; IComputationNode* const Dict_; ui32 WideFieldsIndex_; + const TContainerCacheOnContext KeyTuple_; }; } // namespace @@ -483,8 +503,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i)); leftKeyColumns.emplace_back(item->AsValue().Get<ui32>()); } - // TODO: Handle multi keys. - Y_ENSURE(leftKeyColumns.size() == 1); + const bool isTupleKey = leftKeyColumns.size() > 1; const auto keyDropsLiteral = callable.GetInput(4); const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral); @@ -514,44 +533,54 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo const auto flow = LocateNode(ctx.NodeLocator, callable, 0); const auto dict = LocateNode(ctx.NodeLocator, callable, 1); - switch (joinKind) { - static const auto joinNames = GetEnumNames<EJoinKind>(); - case EJoinKind::Inner: - if (isMulti) { - return new TBlockWideMultiMapJoinWrapper<true>(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), - std::move(leftKeyColumns), std::move(leftIOMap), - static_cast<IComputationWideFlowNode*>(flow), dict); - } - return new TBlockWideMapJoinWrapper<false, true>(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), - std::move(leftKeyColumns), std::move(leftIOMap), - static_cast<IComputationWideFlowNode*>(flow), dict); - case EJoinKind::Left: - if (isMulti) { - return new TBlockWideMultiMapJoinWrapper<false>(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), - std::move(leftKeyColumns), std::move(leftIOMap), - static_cast<IComputationWideFlowNode*>(flow), dict); - } - return new TBlockWideMapJoinWrapper<false, false>(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), - std::move(leftKeyColumns), std::move(leftIOMap), - static_cast<IComputationWideFlowNode*>(flow), dict); - case EJoinKind::LeftSemi: - return new TBlockWideMapJoinWrapper<true, true>(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), - std::move(leftKeyColumns), std::move(leftIOMap), - static_cast<IComputationWideFlowNode*>(flow), dict); - case EJoinKind::LeftOnly: - return new TBlockWideMapJoinWrapper<true, false>(ctx.Mutables, - std::move(joinItems), std::move(leftFlowItems), - std::move(leftKeyColumns), std::move(leftIOMap), - static_cast<IComputationWideFlowNode*>(flow), dict); - default: - MKQL_ENSURE(false, "BlockMapJoinCore doesn't support %s join type" - << joinNames.at(joinKind)); - } +#define DISPATCH_JOIN(IS_TUPLE) do { \ + switch (joinKind) { \ + case EJoinKind::Inner: \ + if (isMulti) { \ + return new TBlockWideMultiMapJoinWrapper<true, IS_TUPLE>(ctx.Mutables, \ + std::move(joinItems), std::move(leftFlowItems), \ + std::move(leftKeyColumns), std::move(leftIOMap), \ + static_cast<IComputationWideFlowNode*>(flow), dict); \ + } \ + return new TBlockWideMapJoinWrapper<false, true, IS_TUPLE>(ctx.Mutables, \ + std::move(joinItems), std::move(leftFlowItems), \ + std::move(leftKeyColumns), std::move(leftIOMap), \ + static_cast<IComputationWideFlowNode*>(flow), dict); \ + case EJoinKind::Left: \ + if (isMulti) { \ + return new TBlockWideMultiMapJoinWrapper<false, IS_TUPLE>(ctx.Mutables, \ + std::move(joinItems), std::move(leftFlowItems), \ + std::move(leftKeyColumns), std::move(leftIOMap), \ + static_cast<IComputationWideFlowNode*>(flow), dict); \ + } \ + return new TBlockWideMapJoinWrapper<false, false, IS_TUPLE>(ctx.Mutables, \ + std::move(joinItems), std::move(leftFlowItems), \ + std::move(leftKeyColumns), std::move(leftIOMap), \ + static_cast<IComputationWideFlowNode*>(flow), dict); \ + case EJoinKind::LeftSemi: \ + return new TBlockWideMapJoinWrapper<true, true, IS_TUPLE>(ctx.Mutables, \ + std::move(joinItems), std::move(leftFlowItems), \ + std::move(leftKeyColumns), std::move(leftIOMap), \ + static_cast<IComputationWideFlowNode*>(flow), dict); \ + case EJoinKind::LeftOnly: \ + return new TBlockWideMapJoinWrapper<true, false, IS_TUPLE>(ctx.Mutables, \ + std::move(joinItems), std::move(leftFlowItems), \ + std::move(leftKeyColumns), std::move(leftIOMap), \ + static_cast<IComputationWideFlowNode*>(flow), dict); \ + default: \ + /* TODO: Display the human-readable join kind name. */ \ + MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #" \ + << static_cast<ui32>(joinKind)); \ + } \ +} while(0) + + if (isTupleKey) { + DISPATCH_JOIN(true); + } else { + DISPATCH_JOIN(false); + } + +#undef DISPATCH_JOIN } } // namespace NMiniKQL diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp index 13e5bc038d..0a84c9f61d 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp @@ -18,7 +18,7 @@ namespace { const TRuntimeNode MakeSet(TProgramBuilder& pgmBuilder, const TVector<const TRuntimeNode>& keys ) { - const auto keysList = keys.front(); + const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front(); return pgmBuilder.ToHashedDict(keysList, false, [&](TRuntimeNode item) { @@ -32,7 +32,7 @@ const TRuntimeNode MakeDict(TProgramBuilder& pgmBuilder, const TVector<const TRuntimeNode>& keys, const TVector<const TRuntimeNode>& payloads ) { - const auto keysList = keys.front(); + const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front(); // TODO: Process containers properly. Now just use Zip to pack // the data type in a tuple. TVector<const TRuntimeNode> wrappedPayloads; @@ -1240,5 +1240,339 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinDropKeyColumns) { } // Y_UNIT_TEST_SUITE +Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMultiKeyBasicTest) { + + constexpr size_t testSize = 1 << 14; + constexpr size_t valueSize = 3; + static const TVector<TString> threeLetterValues = GenerateValues(valueSize); + static const TSet<ui64> fibonacci = GenerateFibonacci(21); + + Y_UNIT_TEST(TestInnerOnUint64Uint64) { + TSetup<false> setup; + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + // 1. Make input for the "left" flow. + TVector<ui64> keyInit(testSize); + std::iota(keyInit.begin(), keyInit.end(), 1); + TVector<ui64> subkeyInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> valueInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), + [](const auto key) { return threeLetterValues[key]; }); + // 2. Make input for the "right" dict. + TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); + TVector<ui64> rightKey2Init; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), + [](const auto& key) { return key * 1001; }); + TVector<TString> rightPayloadInit; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit), + [](const auto& key) { return std::to_string(key); }); + // 3. Make "expected" data. + TMap<std::tuple<ui64, ui64>, TString> rightMap; + for (size_t i = 0; i < rightKey1Init.size(); i++) { + const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]); + rightMap[key] = rightPayloadInit[i]; + } + TVector<ui64> keyExpected; + TVector<ui64> subkeyExpected; + TVector<TString> valueExpected; + TVector<TString> rightExpected; + for (size_t i = 0; i < keyInit.size(); i++) { + const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); + const auto found = rightMap.find(key); + if (found != rightMap.cend()) { + keyExpected.push_back(keyInit[i]); + subkeyExpected.push_back(subkeyInit[i]); + valueExpected.push_back(valueInit[i]); + rightExpected.push_back(found->second); + } + } + // 4. Convert input and expected TVectors to List<UV>. + const auto [leftType, leftList] = ConvertVectorsToTuples(setup, + keyInit, subkeyInit, valueInit); + const auto [expectedType, expected] = ConvertVectorsToTuples(setup, + keyExpected, subkeyExpected, valueExpected, rightExpected); + // 5. Build "right" computation node. + const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); + const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); + const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); + // 6. Run tests. + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, + rightMapNode, leftType, leftList, {0, 1}); + } + + Y_UNIT_TEST(TestInnerMultiOnUint64Uint64) { + TSetup<false> setup; + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + // 1. Make input for the "left" flow. + TVector<ui64> keyInit(testSize); + std::iota(keyInit.begin(), keyInit.end(), 1); + TVector<ui64> subkeyInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> valueInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), + [](const auto key) { return threeLetterValues[key]; }); + // 2. Make input for the "right" dict. + TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); + TVector<ui64> rightKey2Init; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), + [](const auto& key) { return key * 1001; }); + TVector<TString> rightPayload1Init; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init), + [](const auto& key) { return std::to_string(key); }); + TVector<TString> rightPayload2Init; + std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init), + [](const auto& key) { return std::to_string(key); }); + // 3. Make "expected" data. + TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap; + for (size_t i = 0; i < rightKey1Init.size(); i++) { + const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]); + rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]}; + } + TVector<ui64> keyExpected; + TVector<ui64> subkeyExpected; + TVector<TString> valueExpected; + TVector<TString> rightExpected; + for (size_t i = 0; i < keyInit.size(); i++) { + const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); + const auto found = rightMultiMap.find(key); + if (found != rightMultiMap.cend()) { + for (const auto& right : found->second) { + keyExpected.push_back(keyInit[i]); + subkeyExpected.push_back(subkeyInit[i]); + valueExpected.push_back(valueInit[i]); + rightExpected.push_back(right); + } + } + } + // 4. Convert input and expected TVectors to List<UV>. + const auto [leftType, leftList] = ConvertVectorsToTuples(setup, + keyInit, subkeyInit, valueInit); + const auto [expectedType, expected] = ConvertVectorsToTuples(setup, + keyExpected, subkeyExpected, valueExpected, rightExpected); + // 5. Build "right" computation node. + const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); + const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); + const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); + // 6. Run tests. + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, + rightMultiMapNode, leftType, leftList, {0, 1}); + } + + Y_UNIT_TEST(TestLeftOnUint64Uint64) { + TSetup<false> setup; + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + // 1. Make input for the "left" flow. + TVector<ui64> keyInit(testSize); + std::iota(keyInit.begin(), keyInit.end(), 1); + TVector<ui64> subkeyInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> valueInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), + [](const auto key) { return threeLetterValues[key]; }); + // 2. Make input for the "right" dict. + TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); + TVector<ui64> rightKey2Init; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), + [](const auto& key) { return key * 1001; }); + TVector<TString> rightPayloadInit; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit), + [](const auto& key) { return std::to_string(key); }); + // 3. Make "expected" data. + TMap<std::tuple<ui64, ui64>, TString> rightMap; + for (size_t i = 0; i < rightKey1Init.size(); i++) { + const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]); + rightMap[key] = rightPayloadInit[i]; + } + TVector<ui64> keyExpected; + TVector<ui64> subkeyExpected; + TVector<TString> valueExpected; + TVector<std::optional<TString>> rightExpected; + for (size_t i = 0; i < keyInit.size(); i++) { + keyExpected.push_back(keyInit[i]); + subkeyExpected.push_back(subkeyInit[i]); + valueExpected.push_back(valueInit[i]); + const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); + const auto found = rightMap.find(key); + if (found != rightMap.cend()) { + rightExpected.push_back(found->second); + } else { + rightExpected.push_back(std::nullopt); + } + } + // 4. Convert input and expected TVectors to List<UV>. + const auto [leftType, leftList] = ConvertVectorsToTuples(setup, + keyInit, subkeyInit, valueInit); + const auto [expectedType, expected] = ConvertVectorsToTuples(setup, + keyExpected, subkeyExpected, valueExpected, rightExpected); + // 5. Build "right" computation node. + const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); + const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); + const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); + // 6. Run tests. + RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, + rightMapNode, leftType, leftList, {0, 1}); + } + + Y_UNIT_TEST(TestLeftMultiOnUint64Uint64) { + TSetup<false> setup; + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + // 1. Make input for the "left" flow. + TVector<ui64> keyInit(testSize); + std::iota(keyInit.begin(), keyInit.end(), 1); + TVector<ui64> subkeyInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> valueInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), + [](const auto key) { return threeLetterValues[key]; }); + // 2. Make input for the "right" dict. + TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); + TVector<ui64> rightKey2Init; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), + [](const auto& key) { return key * 1001; }); + TVector<TString> rightPayload1Init; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init), + [](const auto& key) { return std::to_string(key); }); + TVector<TString> rightPayload2Init; + std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init), + [](const auto& key) { return std::to_string(key); }); + // 3. Make "expected" data. + TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap; + for (size_t i = 0; i < rightKey1Init.size(); i++) { + const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]); + rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]}; + } + TVector<ui64> keyExpected; + TVector<ui64> subkeyExpected; + TVector<TString> valueExpected; + TVector<std::optional<TString>> rightExpected; + for (size_t i = 0; i < keyInit.size(); i++) { + const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); + const auto found = rightMultiMap.find(key); + if (found != rightMultiMap.cend()) { + for (const auto& right : found->second) { + keyExpected.push_back(keyInit[i]); + subkeyExpected.push_back(subkeyInit[i]); + valueExpected.push_back(valueInit[i]); + rightExpected.push_back(right); + } + } else { + keyExpected.push_back(keyInit[i]); + subkeyExpected.push_back(subkeyInit[i]); + valueExpected.push_back(valueInit[i]); + rightExpected.push_back(std::nullopt); + } + } + // 4. Convert input and expected TVectors to List<UV>. + const auto [leftType, leftList] = ConvertVectorsToTuples(setup, + keyInit, subkeyInit, valueInit); + const auto [expectedType, expected] = ConvertVectorsToTuples(setup, + keyExpected, subkeyExpected, valueExpected, rightExpected); + // 5. Build "right" computation node. + const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); + const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); + const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); + // 6. Run tests. + RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, + rightMultiMapNode, leftType, leftList, {0, 1}); + } + + Y_UNIT_TEST(TestLeftSemiOnUint64Uint64) { + TSetup<false> setup; + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + // 1. Make input for the "left" flow. + TVector<ui64> keyInit(testSize); + std::iota(keyInit.begin(), keyInit.end(), 1); + TVector<ui64> subkeyInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> valueInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), + [](const auto key) { return threeLetterValues[key]; }); + // 2. Make input for the "right" dict. + TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); + TVector<ui64> rightKey2Init; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), + [](const auto& key) { return key * 1001; }); + // 3. Make "expected" data. + TSet<std::tuple<ui64, ui64>> rightSet; + for (size_t i = 0; i < rightKey1Init.size(); i++) { + rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i])); + } + TVector<ui64> keyExpected; + TVector<ui64> subkeyExpected; + TVector<TString> valueExpected; + for (size_t i = 0; i < keyInit.size(); i++) { + const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); + if (rightSet.contains(key)) { + keyExpected.push_back(keyInit[i]); + subkeyExpected.push_back(subkeyInit[i]); + valueExpected.push_back(valueInit[i]); + } + } + // 4. Convert input and expected TVectors to List<UV>. + const auto [leftType, leftList] = ConvertVectorsToTuples(setup, + keyInit, subkeyInit, valueInit); + const auto [expectedType, expected] = ConvertVectorsToTuples(setup, + keyExpected, subkeyExpected, valueExpected); + // 5. Build "right" computation node. + const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); + const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); + // 6. Run tests. + RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected, + rightSetNode, leftType, leftList, {0, 1}); + } + + Y_UNIT_TEST(TestLeftOnlyOnUint64Uint64) { + TSetup<false> setup; + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + // 1. Make input for the "left" flow. + TVector<ui64> keyInit(testSize); + std::iota(keyInit.begin(), keyInit.end(), 1); + TVector<ui64> subkeyInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> valueInit; + std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), + [](const auto key) { return threeLetterValues[key]; }); + // 2. Make input for the "right" dict. + TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); + TVector<ui64> rightKey2Init; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), + [](const auto& key) { return key * 1001; }); + // 3. Make "expected" data. + TSet<std::tuple<ui64, ui64>> rightSet; + for (size_t i = 0; i < rightKey1Init.size(); i++) { + rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i])); + } + TVector<ui64> keyExpected; + TVector<ui64> subkeyExpected; + TVector<TString> valueExpected; + for (size_t i = 0; i < keyInit.size(); i++) { + const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); + if (!rightSet.contains(key)) { + keyExpected.push_back(keyInit[i]); + subkeyExpected.push_back(subkeyInit[i]); + valueExpected.push_back(valueInit[i]); + } + } + // 4. Convert input and expected TVectors to List<UV>. + const auto [leftType, leftList] = ConvertVectorsToTuples(setup, + keyInit, subkeyInit, valueInit); + const auto [expectedType, expected] = ConvertVectorsToTuples(setup, + keyExpected, subkeyExpected, valueExpected); + // 5. Build "right" computation node. + const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); + const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); + // 6. Run tests. + RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected, + rightSetNode, leftType, leftList, {0, 1}); + } + +} // Y_UNIT_TEST_SUITE + } // namespace NMiniKQL } // namespace NKikimr |