about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Igor Munkin <imunkin@ydb.tech> 2024-09-13 19:34:40 +0500
committer GitHub <noreply@github.com> 2024-09-13 19:34:40 +0500
commit 79360acb9face692b8094ebe8a49e4428de09c72 (patch)
tree 29431a41895f893da3738d7dec9afa80a7c8a3b1
parent 7dd3efca63b7412638bcf314f48f00d08f53c381 (diff)
download ydb-79360acb9face692b8094ebe8a49e4428de09c72.tar.gz
Add multikey support for BlockMapJoinCore computation node (#9191)
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp 137
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp 338
2 files changed, 419 insertions, 56 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
index ce870dca54..d9e1807ed5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -180,10 +180,10 @@ private:
TVector<std::unique_ptr<IArrayBuilder>> Builders_;
};
-template <bool WithoutRight, bool RightRequired>
-class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>
+template <bool WithoutRight, bool RightRequired, bool IsTuple>
+class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>
{
-using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>;
+using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>;
using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
@@ -198,6 +198,7 @@ public:
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
+ , KeyTuple_(mutables)
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -207,7 +208,7 @@ public:
while (!blockState.HasBlocks()) {
while (blockState.IsNotFull() && blockState.NextRow()) {
- const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_);
+ const auto key = MakeKeysTuple(ctx, blockState);
if constexpr (WithoutRight) {
if (key && dict.Contains(key) == RightRequired) {
blockState.CopyRow();
@@ -270,10 +271,18 @@ private:
return *static_cast<TState*>(state.AsBoxed().Get());
}
- NUdf::TUnboxedValue MakeKeysTuple(const TComputationContext& ctx, const TState& state, const TVector<ui32>& keyColumns) const {
- // TODO: Handle complex key.
+ NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx, const TState& state) const {
// TODO: Handle converters.
- return state.GetValue(ctx.HolderFactory, keyColumns.front());
+ if constexpr (!IsTuple) {
+ return state.GetValue(ctx.HolderFactory, LeftKeyColumns_.front());
+ }
+
+ NUdf::TUnboxedValue* items = nullptr;
+ const auto keys = KeyTuple_.NewArray(ctx, LeftKeyColumns_.size(), items);
+ for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
+ items[i] = state.GetValue(ctx.HolderFactory, LeftKeyColumns_[i]);
+ }
+ return keys;
}
const TVector<TType*> ResultJoinItems_;
@@ -283,12 +292,13 @@ private:
IComputationWideFlowNode* const Flow_;
IComputationNode* const Dict_;
ui32 WideFieldsIndex_;
+ const TContainerCacheOnContext KeyTuple_;
};
-template<bool RightRequired>
-class TBlockWideMultiMapJoinWrapper : public TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired>>
+template<bool RightRequired, bool IsTuple>
+class TBlockWideMultiMapJoinWrapper : public TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>
{
-using TBaseComputation = TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired>>;
+using TBaseComputation = TPairStateWideFlowComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>;
using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables,
@@ -303,6 +313,7 @@ public:
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
+ , KeyTuple_(mutables)
{}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, NUdf::TUnboxedValue& iterator, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
@@ -320,7 +331,7 @@ public:
}
}
if (blockState.IsNotFull() && blockState.NextRow()) {
- const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_);
+ const auto key = MakeKeysTuple(ctx, blockState);
// Lookup the item in the right dict. If the lookup succeeds,
// reset the iterator and proceed the execution from the
// beginning of the outer loop. Otherwise, the iterState is
@@ -419,10 +430,18 @@ private:
return *static_cast<TIterator*>(iterator.AsBoxed().Get());
}
- NUdf::TUnboxedValue MakeKeysTuple(const TComputationContext& ctx, const TState& state, const TVector<ui32>& keyColumns) const {
- // TODO: Handle complex key.
+ NUdf::TUnboxedValue MakeKeysTuple(TComputationContext& ctx, const TState& state) const {
// TODO: Handle converters.
- return state.GetValue(ctx.HolderFactory, keyColumns.front());
+ if constexpr (!IsTuple) {
+ return state.GetValue(ctx.HolderFactory, LeftKeyColumns_.front());
+ }
+
+ NUdf::TUnboxedValue* items = nullptr;
+ const auto keys = KeyTuple_.NewArray(ctx, LeftKeyColumns_.size(), items);
+ for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
+ items[i] = state.GetValue(ctx.HolderFactory, LeftKeyColumns_[i]);
+ }
+ return keys;
}
const TVector<TType*> ResultJoinItems_;
@@ -432,6 +451,7 @@ private:
IComputationWideFlowNode* const Flow_;
IComputationNode* const Dict_;
ui32 WideFieldsIndex_;
+ const TContainerCacheOnContext KeyTuple_;
};
} // namespace
@@ -483,8 +503,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i));
leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
}
- // TODO: Handle multi keys.
- Y_ENSURE(leftKeyColumns.size() == 1);
+ const bool isTupleKey = leftKeyColumns.size() > 1;
const auto keyDropsLiteral = callable.GetInput(4);
const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral);
@@ -514,44 +533,54 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
const auto dict = LocateNode(ctx.NodeLocator, callable, 1);
- switch (joinKind) {
- static const auto joinNames = GetEnumNames<EJoinKind>();
- case EJoinKind::Inner:
- if (isMulti) {
- return new TBlockWideMultiMapJoinWrapper<true>(ctx.Mutables,
- std::move(joinItems), std::move(leftFlowItems),
- std::move(leftKeyColumns), std::move(leftIOMap),
- static_cast<IComputationWideFlowNode*>(flow), dict);
- }
- return new TBlockWideMapJoinWrapper<false, true>(ctx.Mutables,
- std::move(joinItems), std::move(leftFlowItems),
- std::move(leftKeyColumns), std::move(leftIOMap),
- static_cast<IComputationWideFlowNode*>(flow), dict);
- case EJoinKind::Left:
- if (isMulti) {
- return new TBlockWideMultiMapJoinWrapper<false>(ctx.Mutables,
- std::move(joinItems), std::move(leftFlowItems),
- std::move(leftKeyColumns), std::move(leftIOMap),
- static_cast<IComputationWideFlowNode*>(flow), dict);
- }
- return new TBlockWideMapJoinWrapper<false, false>(ctx.Mutables,
- std::move(joinItems), std::move(leftFlowItems),
- std::move(leftKeyColumns), std::move(leftIOMap),
- static_cast<IComputationWideFlowNode*>(flow), dict);
- case EJoinKind::LeftSemi:
- return new TBlockWideMapJoinWrapper<true, true>(ctx.Mutables,
- std::move(joinItems), std::move(leftFlowItems),
- std::move(leftKeyColumns), std::move(leftIOMap),
- static_cast<IComputationWideFlowNode*>(flow), dict);
- case EJoinKind::LeftOnly:
- return new TBlockWideMapJoinWrapper<true, false>(ctx.Mutables,
- std::move(joinItems), std::move(leftFlowItems),
- std::move(leftKeyColumns), std::move(leftIOMap),
- static_cast<IComputationWideFlowNode*>(flow), dict);
- default:
- MKQL_ENSURE(false, "BlockMapJoinCore doesn't support %s join type"
- << joinNames.at(joinKind));
- }
+#define DISPATCH_JOIN(IS_TUPLE) do { \
+ switch (joinKind) { \
+ case EJoinKind::Inner: \
+ if (isMulti) { \
+ return new TBlockWideMultiMapJoinWrapper<true, IS_TUPLE>(ctx.Mutables, \
+ std::move(joinItems), std::move(leftFlowItems), \
+ std::move(leftKeyColumns), std::move(leftIOMap), \
+ static_cast<IComputationWideFlowNode*>(flow), dict); \
+ } \
+ return new TBlockWideMapJoinWrapper<false, true, IS_TUPLE>(ctx.Mutables, \
+ std::move(joinItems), std::move(leftFlowItems), \
+ std::move(leftKeyColumns), std::move(leftIOMap), \
+ static_cast<IComputationWideFlowNode*>(flow), dict); \
+ case EJoinKind::Left: \
+ if (isMulti) { \
+ return new TBlockWideMultiMapJoinWrapper<false, IS_TUPLE>(ctx.Mutables, \
+ std::move(joinItems), std::move(leftFlowItems), \
+ std::move(leftKeyColumns), std::move(leftIOMap), \
+ static_cast<IComputationWideFlowNode*>(flow), dict); \
+ } \
+ return new TBlockWideMapJoinWrapper<false, false, IS_TUPLE>(ctx.Mutables, \
+ std::move(joinItems), std::move(leftFlowItems), \
+ std::move(leftKeyColumns), std::move(leftIOMap), \
+ static_cast<IComputationWideFlowNode*>(flow), dict); \
+ case EJoinKind::LeftSemi: \
+ return new TBlockWideMapJoinWrapper<true, true, IS_TUPLE>(ctx.Mutables, \
+ std::move(joinItems), std::move(leftFlowItems), \
+ std::move(leftKeyColumns), std::move(leftIOMap), \
+ static_cast<IComputationWideFlowNode*>(flow), dict); \
+ case EJoinKind::LeftOnly: \
+ return new TBlockWideMapJoinWrapper<true, false, IS_TUPLE>(ctx.Mutables, \
+ std::move(joinItems), std::move(leftFlowItems), \
+ std::move(leftKeyColumns), std::move(leftIOMap), \
+ static_cast<IComputationWideFlowNode*>(flow), dict); \
+ default: \
+ /* TODO: Display the human-readable join kind name. */ \
+ MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #" \
+ << static_cast<ui32>(joinKind)); \
+ } \
+} while(0)
+
+ if (isTupleKey) {
+ DISPATCH_JOIN(true);
+ } else {
+ DISPATCH_JOIN(false);
+ }
+
+#undef DISPATCH_JOIN
}
} // namespace NMiniKQL
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
index 13e5bc038d..0a84c9f61d 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
@@ -18,7 +18,7 @@ namespace {
const TRuntimeNode MakeSet(TProgramBuilder& pgmBuilder,
const TVector<const TRuntimeNode>& keys
) {
- const auto keysList = keys.front();
+ const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front();
return pgmBuilder.ToHashedDict(keysList, false,
[&](TRuntimeNode item) {
@@ -32,7 +32,7 @@ const TRuntimeNode MakeDict(TProgramBuilder& pgmBuilder,
const TVector<const TRuntimeNode>& keys,
const TVector<const TRuntimeNode>& payloads
) {
- const auto keysList = keys.front();
+ const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front();
// TODO: Process containers properly. Now just use Zip to pack
// the data type in a tuple.
TVector<const TRuntimeNode> wrappedPayloads;
@@ -1240,5 +1240,339 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinDropKeyColumns) {
} // Y_UNIT_TEST_SUITE
+Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMultiKeyBasicTest) {
+
+ constexpr size_t testSize = 1 << 14;
+ constexpr size_t valueSize = 3;
+ static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
+ static const TSet<ui64> fibonacci = GenerateFibonacci(21);
+
+ Y_UNIT_TEST(TestInnerOnUint64Uint64) {
+ TSetup<false> setup;
+ TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ // 1. Make input for the "left" flow.
+ TVector<ui64> keyInit(testSize);
+ std::iota(keyInit.begin(), keyInit.end(), 1);
+ TVector<ui64> subkeyInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> valueInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+ // 2. Make input for the "right" dict.
+ TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> rightKey2Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
+ [](const auto& key) { return key * 1001; });
+ TVector<TString> rightPayloadInit;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit),
+ [](const auto& key) { return std::to_string(key); });
+ // 3. Make "expected" data.
+ TMap<std::tuple<ui64, ui64>, TString> rightMap;
+ for (size_t i = 0; i < rightKey1Init.size(); i++) {
+ const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
+ rightMap[key] = rightPayloadInit[i];
+ }
+ TVector<ui64> keyExpected;
+ TVector<ui64> subkeyExpected;
+ TVector<TString> valueExpected;
+ TVector<TString> rightExpected;
+ for (size_t i = 0; i < keyInit.size(); i++) {
+ const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
+ const auto found = rightMap.find(key);
+ if (found != rightMap.cend()) {
+ keyExpected.push_back(keyInit[i]);
+ subkeyExpected.push_back(subkeyInit[i]);
+ valueExpected.push_back(valueInit[i]);
+ rightExpected.push_back(found->second);
+ }
+ }
+ // 4. Convert input and expected TVectors to List<UV>.
+ const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ keyInit, subkeyInit, valueInit);
+ const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ keyExpected, subkeyExpected, valueExpected, rightExpected);
+ // 5. Build "right" computation node.
+ const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
+ const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
+ const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
+ // 6. Run tests.
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ rightMapNode, leftType, leftList, {0, 1});
+ }
+
+ Y_UNIT_TEST(TestInnerMultiOnUint64Uint64) {
+ TSetup<false> setup;
+ TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ // 1. Make input for the "left" flow.
+ TVector<ui64> keyInit(testSize);
+ std::iota(keyInit.begin(), keyInit.end(), 1);
+ TVector<ui64> subkeyInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> valueInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+ // 2. Make input for the "right" dict.
+ TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> rightKey2Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
+ [](const auto& key) { return key * 1001; });
+ TVector<TString> rightPayload1Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init),
+ [](const auto& key) { return std::to_string(key); });
+ TVector<TString> rightPayload2Init;
+ std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init),
+ [](const auto& key) { return std::to_string(key); });
+ // 3. Make "expected" data.
+ TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap;
+ for (size_t i = 0; i < rightKey1Init.size(); i++) {
+ const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
+ rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]};
+ }
+ TVector<ui64> keyExpected;
+ TVector<ui64> subkeyExpected;
+ TVector<TString> valueExpected;
+ TVector<TString> rightExpected;
+ for (size_t i = 0; i < keyInit.size(); i++) {
+ const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
+ const auto found = rightMultiMap.find(key);
+ if (found != rightMultiMap.cend()) {
+ for (const auto& right : found->second) {
+ keyExpected.push_back(keyInit[i]);
+ subkeyExpected.push_back(subkeyInit[i]);
+ valueExpected.push_back(valueInit[i]);
+ rightExpected.push_back(right);
+ }
+ }
+ }
+ // 4. Convert input and expected TVectors to List<UV>.
+ const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ keyInit, subkeyInit, valueInit);
+ const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ keyExpected, subkeyExpected, valueExpected, rightExpected);
+ // 5. Build "right" computation node.
+ const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
+ const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
+ const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
+ // 6. Run tests.
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ rightMultiMapNode, leftType, leftList, {0, 1});
+ }
+
+ Y_UNIT_TEST(TestLeftOnUint64Uint64) {
+ TSetup<false> setup;
+ TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ // 1. Make input for the "left" flow.
+ TVector<ui64> keyInit(testSize);
+ std::iota(keyInit.begin(), keyInit.end(), 1);
+ TVector<ui64> subkeyInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> valueInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+ // 2. Make input for the "right" dict.
+ TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> rightKey2Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
+ [](const auto& key) { return key * 1001; });
+ TVector<TString> rightPayloadInit;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit),
+ [](const auto& key) { return std::to_string(key); });
+ // 3. Make "expected" data.
+ TMap<std::tuple<ui64, ui64>, TString> rightMap;
+ for (size_t i = 0; i < rightKey1Init.size(); i++) {
+ const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
+ rightMap[key] = rightPayloadInit[i];
+ }
+ TVector<ui64> keyExpected;
+ TVector<ui64> subkeyExpected;
+ TVector<TString> valueExpected;
+ TVector<std::optional<TString>> rightExpected;
+ for (size_t i = 0; i < keyInit.size(); i++) {
+ keyExpected.push_back(keyInit[i]);
+ subkeyExpected.push_back(subkeyInit[i]);
+ valueExpected.push_back(valueInit[i]);
+ const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
+ const auto found = rightMap.find(key);
+ if (found != rightMap.cend()) {
+ rightExpected.push_back(found->second);
+ } else {
+ rightExpected.push_back(std::nullopt);
+ }
+ }
+ // 4. Convert input and expected TVectors to List<UV>.
+ const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ keyInit, subkeyInit, valueInit);
+ const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ keyExpected, subkeyExpected, valueExpected, rightExpected);
+ // 5. Build "right" computation node.
+ const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
+ const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
+ const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
+ // 6. Run tests.
+ RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
+ rightMapNode, leftType, leftList, {0, 1});
+ }
+
+ Y_UNIT_TEST(TestLeftMultiOnUint64Uint64) {
+ TSetup<false> setup;
+ TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ // 1. Make input for the "left" flow.
+ TVector<ui64> keyInit(testSize);
+ std::iota(keyInit.begin(), keyInit.end(), 1);
+ TVector<ui64> subkeyInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> valueInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+ // 2. Make input for the "right" dict.
+ TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> rightKey2Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
+ [](const auto& key) { return key * 1001; });
+ TVector<TString> rightPayload1Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init),
+ [](const auto& key) { return std::to_string(key); });
+ TVector<TString> rightPayload2Init;
+ std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init),
+ [](const auto& key) { return std::to_string(key); });
+ // 3. Make "expected" data.
+ TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap;
+ for (size_t i = 0; i < rightKey1Init.size(); i++) {
+ const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
+ rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]};
+ }
+ TVector<ui64> keyExpected;
+ TVector<ui64> subkeyExpected;
+ TVector<TString> valueExpected;
+ TVector<std::optional<TString>> rightExpected;
+ for (size_t i = 0; i < keyInit.size(); i++) {
+ const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
+ const auto found = rightMultiMap.find(key);
+ if (found != rightMultiMap.cend()) {
+ for (const auto& right : found->second) {
+ keyExpected.push_back(keyInit[i]);
+ subkeyExpected.push_back(subkeyInit[i]);
+ valueExpected.push_back(valueInit[i]);
+ rightExpected.push_back(right);
+ }
+ } else {
+ keyExpected.push_back(keyInit[i]);
+ subkeyExpected.push_back(subkeyInit[i]);
+ valueExpected.push_back(valueInit[i]);
+ rightExpected.push_back(std::nullopt);
+ }
+ }
+ // 4. Convert input and expected TVectors to List<UV>.
+ const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ keyInit, subkeyInit, valueInit);
+ const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ keyExpected, subkeyExpected, valueExpected, rightExpected);
+ // 5. Build "right" computation node.
+ const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
+ const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
+ const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
+ // 6. Run tests.
+ RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
+ rightMultiMapNode, leftType, leftList, {0, 1});
+ }
+
+ Y_UNIT_TEST(TestLeftSemiOnUint64Uint64) {
+ TSetup<false> setup;
+ TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ // 1. Make input for the "left" flow.
+ TVector<ui64> keyInit(testSize);
+ std::iota(keyInit.begin(), keyInit.end(), 1);
+ TVector<ui64> subkeyInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> valueInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+ // 2. Make input for the "right" dict.
+ TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> rightKey2Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
+ [](const auto& key) { return key * 1001; });
+ // 3. Make "expected" data.
+ TSet<std::tuple<ui64, ui64>> rightSet;
+ for (size_t i = 0; i < rightKey1Init.size(); i++) {
+ rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i]));
+ }
+ TVector<ui64> keyExpected;
+ TVector<ui64> subkeyExpected;
+ TVector<TString> valueExpected;
+ for (size_t i = 0; i < keyInit.size(); i++) {
+ const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
+ if (rightSet.contains(key)) {
+ keyExpected.push_back(keyInit[i]);
+ subkeyExpected.push_back(subkeyInit[i]);
+ valueExpected.push_back(valueInit[i]);
+ }
+ }
+ // 4. Convert input and expected TVectors to List<UV>.
+ const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ keyInit, subkeyInit, valueInit);
+ const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ keyExpected, subkeyExpected, valueExpected);
+ // 5. Build "right" computation node.
+ const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
+ const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
+ // 6. Run tests.
+ RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
+ rightSetNode, leftType, leftList, {0, 1});
+ }
+
+ Y_UNIT_TEST(TestLeftOnlyOnUint64Uint64) {
+ TSetup<false> setup;
+ TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ // 1. Make input for the "left" flow.
+ TVector<ui64> keyInit(testSize);
+ std::iota(keyInit.begin(), keyInit.end(), 1);
+ TVector<ui64> subkeyInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> valueInit;
+ std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+ // 2. Make input for the "right" dict.
+ TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> rightKey2Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
+ [](const auto& key) { return key * 1001; });
+ // 3. Make "expected" data.
+ TSet<std::tuple<ui64, ui64>> rightSet;
+ for (size_t i = 0; i < rightKey1Init.size(); i++) {
+ rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i]));
+ }
+ TVector<ui64> keyExpected;
+ TVector<ui64> subkeyExpected;
+ TVector<TString> valueExpected;
+ for (size_t i = 0; i < keyInit.size(); i++) {
+ const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
+ if (!rightSet.contains(key)) {
+ keyExpected.push_back(keyInit[i]);
+ subkeyExpected.push_back(subkeyInit[i]);
+ valueExpected.push_back(valueInit[i]);
+ }
+ }
+ // 4. Convert input and expected TVectors to List<UV>.
+ const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ keyInit, subkeyInit, valueInit);
+ const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ keyExpected, subkeyExpected, valueExpected);
+ // 5. Build "right" computation node.
+ const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
+ const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
+ // 6. Run tests.
+ RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
+ rightSetNode, leftType, leftList, {0, 1});
+ }
+
+} // Y_UNIT_TEST_SUITE
+
} // namespace NMiniKQL
} // namespace NKikimr