about summary refs log tree commit diff stats
path: root/yql/essentials
diff options
context:
space:
mode:
author Maxim Yurchuk <maxim-yurchuk@ydb.tech> 2024-12-12 15:00:43 +0000
committer GitHub <noreply@github.com> 2024-12-12 15:00:43 +0000
commit 42701242eaf5be980cb935631586d0e90b82641c (patch)
tree 6dbf5fcd37d3c16591e196c4a69d166e3ab3a398 /yql/essentials
parent 7f5a9f394dbd9ac290cabbb7977538656b3a541e (diff)
parent f7c04b5876af3d16849ab5e3079c0eabbd4e3a00 (diff)
download ydb-42701242eaf5be980cb935631586d0e90b82641c.tar.gz
Merge pull request #12554 from vitalyisaev2/YQ-3839.with_rightlib.3
Import from Arcadia + YDB FQ: turning gateways_config.proto into a file without external dependencies
Diffstat (limited to 'yql/essentials')
-rw-r--r-- yql/essentials/ast/ya.make 2
-rw-r--r-- yql/essentials/ast/yql_expr.h 6
-rw-r--r-- yql/essentials/ast/yql_type_string.cpp 22
-rw-r--r-- yql/essentials/ast/yql_type_string_ut.cpp 4
-rw-r--r-- yql/essentials/core/common_opt/yql_co_simple1.cpp 16
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp 976
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_rh_hash.h 51
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp 2518
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp 331
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h 113
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp 84
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/ya.make.inc 1
-rw-r--r-- yql/essentials/minikql/mkql_program_builder.cpp 61
-rw-r--r-- yql/essentials/minikql/mkql_program_builder.h 11
-rw-r--r-- yql/essentials/minikql/mkql_runtime_version.h 2
-rw-r--r-- yql/essentials/providers/common/proto/gateways_config.proto 129
-rw-r--r-- yql/essentials/providers/common/proto/ya.make 1
-rw-r--r-- yql/essentials/public/udf/udf_types.h 5
-rw-r--r-- yql/essentials/public/udf/udf_version.h 2
-rw-r--r-- yql/essentials/sql/pg/pg_sql.cpp 2
-rw-r--r-- yql/essentials/sql/v1/builtin.cpp 16
-rw-r--r-- yql/essentials/sql/v1/format/sql_format.cpp 6
-rw-r--r-- yql/essentials/sql/v1/format/sql_format_ut.h 9
-rw-r--r-- yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.cfg 2
-rw-r--r-- yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.sql 2
-rw-r--r-- yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.yql 8
-rw-r--r-- yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.cfg 2
-rw-r--r-- yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.sql 2
-rw-r--r-- yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.yql 4
-rw-r--r-- yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.cfg 2
-rw-r--r-- yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.sql 2
-rw-r--r-- yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.yql 6
-rw-r--r-- yql/essentials/tests/sql/sql2yql/canondata/result.json 36
-rw-r--r-- yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_aggregate-compact_distinct_/formatted.sql 36
-rw-r--r-- yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_aggregate-no_compact_distinct_/formatted.sql 35
-rw-r--r-- yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_builtins_opt_/formatted.sql 2
-rw-r--r-- yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_tuple_builtins_/formatted.sql 2
-rw-r--r-- yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql 23
-rw-r--r-- yql/essentials/tests/sql/suites/aggregate/compact_distinct.cfg 3
-rw-r--r-- yql/essentials/tests/sql/suites/aggregate/compact_distinct.sql 14
-rw-r--r-- yql/essentials/tests/sql/suites/aggregate/no_compact_distinct.cfg 3
-rw-r--r-- yql/essentials/tests/sql/suites/aggregate/no_compact_distinct.sql 13
-rw-r--r-- yql/essentials/tests/sql/suites/expr/variant_builtins_opt.sql 2
-rw-r--r-- yql/essentials/tests/sql/suites/expr/variant_tuple_builtins.sql 2
-rw-r--r-- yql/essentials/tests/sql/suites/join/cbo_4tables.cfg 2
-rw-r--r-- yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg 3
-rw-r--r-- yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql 15
-rw-r--r-- yql/essentials/tests/sql/suites/produce/sorted2.txt 10
-rw-r--r-- yql/essentials/tests/sql/suites/produce/sorted2.txt.attr 11
49 files changed, 2499 insertions, 2111 deletions
diff --git a/yql/essentials/ast/ya.make b/yql/essentials/ast/ya.make
index 04b39457a0..58c21a1d7f 100644
--- a/yql/essentials/ast/ya.make
+++ b/yql/essentials/ast/ya.make
@@ -41,6 +41,8 @@ PEERDIR(
yql/essentials/parser/pg_catalog
)
+YQL_LAST_ABI_VERSION()
+
END()
RECURSE_FOR_TESTS(
diff --git a/yql/essentials/ast/yql_expr.h b/yql/essentials/ast/yql_expr.h
index 8238204552..a903c0022b 100644
--- a/yql/essentials/ast/yql_expr.h
+++ b/yql/essentials/ast/yql_expr.h
@@ -1095,12 +1095,6 @@ public:
}
};
-struct TArgumentFlags {
- enum {
- AutoMap = 0x01,
- };
-};
-
class TCallableExprType : public TTypeAnnotationNode {
public:
static constexpr ETypeAnnotationKind KindValue = ETypeAnnotationKind::Callable;
diff --git a/yql/essentials/ast/yql_type_string.cpp b/yql/essentials/ast/yql_type_string.cpp
index b9b6570cf0..8d38a751c8 100644
--- a/yql/essentials/ast/yql_type_string.cpp
+++ b/yql/essentials/ast/yql_type_string.cpp
@@ -3,8 +3,10 @@
#include "yql_ast_escaping.h"
#include <yql/essentials/parser/pg_catalog/catalog.h>
+#include <yql/essentials/public/udf/udf_types.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
+
#include <util/string/cast.h>
#include <util/generic/map.h>
#include <util/generic/utility.h>
@@ -559,7 +561,9 @@ private:
for (;;) {
if (Token == TOKEN_IDENTIFIER) {
if (Identifier == TStringBuf("AutoMap")) {
- argFlags |= TArgumentFlags::AutoMap;
+ argFlags |= NUdf::ICallablePayload::TArgumentFlags::AutoMap;
+ } else if (Identifier == TStringBuf("NoYield")) {
+ argFlags |= NUdf::ICallablePayload::TArgumentFlags::NoYield;
} else {
AddError(TString("Unknown flag name: ") + Identifier);
return false;
@@ -1329,8 +1333,22 @@ private:
argInfo.Type->Accept(*this);
if (argInfo.Flags) {
Out_ << TStringBuf("{Flags:");
- if (argInfo.Flags & TArgumentFlags::AutoMap) {
+ bool start = true;
+ if (argInfo.Flags & NUdf::ICallablePayload::TArgumentFlags::AutoMap) {
+ if (!start) {
+ Out_ << '|';
+ }
+
Out_ << TStringBuf("AutoMap");
+ start = false;
+ }
+ if (argInfo.Flags & NUdf::ICallablePayload::TArgumentFlags::NoYield) {
+ if (!start) {
+ Out_ << '|';
+ }
+
+ Out_ << TStringBuf("NoYield");
+ start = false;
}
Out_ << '}';
}
diff --git a/yql/essentials/ast/yql_type_string_ut.cpp b/yql/essentials/ast/yql_type_string_ut.cpp
index 9b6db5f11e..676f6c2397 100644
--- a/yql/essentials/ast/yql_type_string_ut.cpp
+++ b/yql/essentials/ast/yql_type_string_ut.cpp
@@ -203,6 +203,10 @@ Y_UNIT_TEST_SUITE(TTypeString)
"(CallableType '() '((DataType 'Double)) "
"'((DataType 'Int32) 'x '1)"
")");
+ TestOk("(x:Int32{Flags: AutoMap | NoYield})->Double",
+ "(CallableType '() '((DataType 'Double)) "
+ "'((DataType 'Int32) 'x '3)"
+ ")");
}
Y_UNIT_TEST(ParseCallableWithPayload) {
diff --git a/yql/essentials/core/common_opt/yql_co_simple1.cpp b/yql/essentials/core/common_opt/yql_co_simple1.cpp
index 06107c3bfd..4f0ddab741 100644
--- a/yql/essentials/core/common_opt/yql_co_simple1.cpp
+++ b/yql/essentials/core/common_opt/yql_co_simple1.cpp
@@ -5465,12 +5465,18 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
if (node->ChildrenSize() % 2 == 1) { // No default value
bool allJust = true;
+ bool allSingleAsList = true;
TNodeSet uniqLambdas;
for (ui32 index = 1; index < node->ChildrenSize(); index += 2) {
- uniqLambdas.insert(node->Child(index + 1));
- if (!TCoJust::Match(node->Child(index + 1)->Child(1))) {
+ const TExprNode* visitLambda = node->Child(index + 1);
+ const TExprNode* body = visitLambda->Child(1);
+ uniqLambdas.insert(visitLambda);
+ if (!TCoJust::Match(body)) {
allJust = false;
}
+ if (!TCoAsList::Match(body) || body->ChildrenSize() != 1) {
+ allSingleAsList = false;
+ }
}
if (uniqLambdas.size() == 1 && node->ChildrenSize() > 3) {
@@ -5486,10 +5492,10 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
.Build();
}
- if (allJust) {
- YQL_CLOG(DEBUG, Core) << node->Content() << " - extract Just";
+ if (allJust || allSingleAsList) {
+ YQL_CLOG(DEBUG, Core) << node->Content() << " - extract " << (allJust ? "Just" : "AsList");
return ctx.Builder(node->Pos())
- .Callable("Just")
+ .Callable(allJust ? "Just" : "AsList")
.Callable(0, "Visit")
.Add(0, node->HeadPtr())
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
index e5b53bee5c..230509757b 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -1,9 +1,10 @@
-#include "mkql_map_join.h"
+#include "mkql_block_map_join.h"
#include <yql/essentials/minikql/computation/mkql_block_builder.h>
#include <yql/essentials/minikql/computation/mkql_block_impl.h>
#include <yql/essentials/minikql/computation/mkql_block_reader.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
+#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/mkql_program_builder.h>
@@ -23,6 +24,19 @@ size_t CalcMaxBlockLength(const TVector<TType*>& items) {
}));
}
+ui64 CalculateTupleHash(const std::vector<ui64>& hashes) {
+ ui64 hash = 0;
+ for (size_t i = 0; i < hashes.size(); i++) {
+ if (!hashes[i]) {
+ return 0;
+ }
+
+ hash = CombineHashes(hash, hashes[i]);
+ }
+
+ return hash;
+}
+
template <bool RightRequired>
class TBlockJoinState : public TBlockState {
public:
@@ -39,10 +53,12 @@ public:
{
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
MaxLength_ = CalcMaxBlockLength(outputItems);
+ TBlockTypeHelper helper;
for (size_t i = 0; i < inputItems.size(); i++) {
- const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
+ TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
+ Hashers_.push_back(helper.MakeHasher(blockItemType));
}
// The last output column (i.e. block length) doesn't require a block builder.
for (size_t i = 0; i < OutputWidth_; i++) {
@@ -91,6 +107,27 @@ public:
OutputRows_++;
}
+ void MakeRow(const std::vector<NYql::NUdf::TBlockItem>& rightColumns) {
+ size_t builderIndex = 0;
+
+ for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
+ AddItem(GetItem(LeftIOMap_[i]), builderIndex);
+ }
+
+ if (!rightColumns.empty()) {
+ Y_ENSURE(LeftIOMap_.size() + rightColumns.size() == OutputWidth_);
+ for (size_t i = 0; i < rightColumns.size(); i++) {
+ AddItem(rightColumns[i], builderIndex++);
+ }
+ } else {
+ while (builderIndex < OutputWidth_) {
+ AddItem(TBlockItem(), builderIndex++);
+ }
+ }
+
+ OutputRows_++;
+ }
+
void MakeBlocks(const THolderFactory& holderFactory) {
Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_)));
OutputRows_ = 0;
@@ -102,14 +139,21 @@ public:
FillArrays();
}
- TBlockItem GetItem(size_t idx) const {
+ TBlockItem GetItem(size_t idx, size_t offset = 0) const {
+ Y_ENSURE(Current_ + offset < InputRows_);
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
if (datum.is_scalar()) {
return Readers_[idx]->GetScalarItem(*datum.scalar());
}
MKQL_ENSURE(datum.is_array(), "Expecting array");
- return Readers_[idx]->GetItem(*datum.array(), Current_);
+ return Readers_[idx]->GetItem(*datum.array(), Current_ + offset);
+ }
+
+ std::pair<TBlockItem, ui64> GetItemWithHash(size_t idx, size_t offset) const {
+ auto item = GetItem(idx, offset);
+ ui64 hash = Hashers_[idx]->Hash(item);
+ return std::make_pair(item, hash);
}
NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
@@ -117,7 +161,7 @@ public:
}
void Reset() {
- Next_ = 0;
+ Current_ = 0;
InputRows_ = GetBlockCount(Inputs_.back());
}
@@ -125,12 +169,8 @@ public:
IsFinished_ = true;
}
- bool NextRow() {
- if (Next_ >= InputRows_) {
- return false;
- }
- Current_ = Next_++;
- return true;
+ void NextRow() {
+ Current_++;
}
bool HasBlocks() {
@@ -150,6 +190,11 @@ public:
return IsFinished_;
}
+ size_t RemainingRowsCount() const {
+ Y_ENSURE(InputRows_ >= Current_);
+ return InputRows_ - Current_;
+ }
+
NUdf::TUnboxedValue* GetRawInputFields() {
return Inputs_.data();
}
@@ -174,7 +219,6 @@ private:
}
size_t Current_ = 0;
- size_t Next_ = 0;
bool IsFinished_ = false;
size_t MaxLength_;
size_t BuilderAllocatedSize_ = 0;
@@ -190,307 +234,649 @@ private:
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
TVector<std::unique_ptr<IArrayBuilder>> Builders_;
+ TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_;
};
-template <bool WithoutRight, bool RightRequired, bool IsTuple>
-class TBlockWideMapJoinWrapper : public TMutableComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>
-{
-using TBaseComputation = TMutableComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>;
-using TState = TBlockJoinState<RightRequired>;
-public:
- TBlockWideMapJoinWrapper(TComputationMutables& mutables,
- const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftStreamItems,
- const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
- IComputationNode* stream, IComputationNode* dict)
- : TBaseComputation(mutables, EValueRepresentation::Boxed)
- , ResultJoinItems_(std::move(resultJoinItems))
- , LeftStreamItems_(std::move(leftStreamItems))
- , LeftKeyColumns_(std::move(leftKeyColumns))
- , LeftIOMap_(std::move(leftIOMap))
- , Stream_(stream)
- , Dict_(dict)
- , KeyTupleCache_(mutables)
- {}
+class TBlockIndex : public TComputationValue<TBlockIndex> {
+ struct TIndexEntry {
+ ui32 BlockOffset;
+ ui32 ItemOffset;
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- NUdf::TUnboxedValue* items = nullptr;
- const auto keys = KeyTupleCache_.NewArray(ctx, LeftKeyColumns_.size(), items);
- const auto state = ctx.HolderFactory.Create<TState>(ctx, LeftStreamItems_,
- LeftIOMap_, ResultJoinItems_);
- return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
- std::move(state),
- std::move(Stream_->GetValue(ctx)),
- std::move(Dict_->GetValue(ctx)),
- LeftKeyColumns_,
- std::move(keys), items);
- }
+ TIndexEntry() = default;
+ TIndexEntry(ui32 blockOffset, ui32 itemOffset)
+ : BlockOffset(blockOffset)
+ , ItemOffset(itemOffset)
+ {}
+ };
-private:
- class TStreamValue : public TComputationValue<TStreamValue> {
- using TBase = TComputationValue<TStreamValue>;
+ struct TIndexNode {
+ TIndexEntry Entry;
+ TIndexNode* Next;
+
+ TIndexNode() = delete;
+ TIndexNode(TIndexEntry entry, TIndexNode* next = nullptr)
+ : Entry(entry)
+ , Next(next)
+ {}
+ };
+
+ class TIndexMapValue {
public:
- TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory,
- NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream,
- NUdf::TUnboxedValue&& dict, const TVector<ui32>& leftKeyColumns,
- NUdf::TUnboxedValue&& keyValue, NUdf::TUnboxedValue* keyItems)
- : TBase(memInfo)
- , BlockState_(blockState)
- , Stream_(stream)
- , Dict_(dict)
- , KeyValue_(keyValue)
- , KeyItems_(keyItems)
- , LeftKeyColumns_(leftKeyColumns)
- , HolderFactory_(holderFactory)
+ TIndexMapValue()
+ : Raw(0)
+ {}
+
+ TIndexMapValue(TIndexEntry entry) {
+ TIndexEntryUnion un;
+ un.Entry = entry;
+
+ Y_ENSURE(((un.Raw << 1) >> 1) == un.Raw);
+ Raw = (un.Raw << 1) | 1;
+ }
+
+ TIndexMapValue(TIndexNode* entryList)
+ : EntryList(entryList)
{}
+ bool IsInplace() const {
+ return Raw & 1;
+ }
+
+ TIndexNode* GetList() const {
+ Y_ENSURE(!IsInplace());
+ return EntryList;
+ }
+
+ TIndexEntry GetEntry() const {
+ Y_ENSURE(IsInplace());
+
+ TIndexEntryUnion un;
+ un.Raw = Raw >> 1;
+ return un.Entry;
+ }
+
private:
- NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
- auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get());
- auto* inputFields = blockState.GetRawInputFields();
- const size_t inputWidth = blockState.GetInputWidth();
- const size_t outputWidth = blockState.GetOutputWidth();
+ union TIndexEntryUnion {
+ TIndexEntry Entry;
+ ui64 Raw;
+ };
+
+ union {
+ TIndexNode* EntryList;
+ ui64 Raw;
+ };
+ };
- MKQL_ENSURE(width == outputWidth,
- "The given width doesn't equal to the result type size");
+ static_assert(sizeof(TIndexMapValue) == 8);
- while (!blockState.HasBlocks()) {
- while (blockState.IsNotFull() && blockState.NextRow()) {
- const auto key = MakeKeysTuple(blockState);
- if constexpr (WithoutRight) {
- if ((key && Dict_.Contains(key)) == RightRequired) {
- blockState.CopyRow();
- }
- } else if (NUdf::TUnboxedValue lookup; key && (lookup = Dict_.Lookup(key))) {
- blockState.MakeRow(lookup);
- } else if constexpr (!RightRequired) {
- blockState.MakeRow(NUdf::TUnboxedValue());
- }
+ using TBase = TComputationValue<TBlockIndex>;
+ using TIndexMap = TRobinHoodHashFixedMap<
+ ui64,
+ TIndexMapValue,
+ std::equal_to<ui64>,
+ std::hash<ui64>,
+ TMKQLAllocator<char>
+ >;
+
+public:
+ class TIterator {
+ enum class EIteratorType {
+ EMPTY,
+ INPLACE,
+ LIST
+ };
+
+ public:
+ TIterator() = default;
+
+ TIterator(TBlockIndex* blockIndex)
+ : Type_(EIteratorType::EMPTY)
+ , BlockIndex_(blockIndex)
+ {}
+
+ TIterator(TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ : Type_(EIteratorType::INPLACE)
+ , BlockIndex_(blockIndex)
+ , Entry_(entry)
+ , EntryConsumed_(false)
+ , ItemsToLookup_(std::move(itemsToLookup))
+ {}
+
+ TIterator(TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ : Type_(EIteratorType::LIST)
+ , BlockIndex_(blockIndex)
+ , Node_(node)
+ , ItemsToLookup_(std::move(itemsToLookup))
+ {}
+
+ TIterator(const TIterator&) = delete;
+ TIterator& operator=(const TIterator&) = delete;
+
+ TIterator(TIterator&& other) {
+ *this = std::move(other);
+ }
+
+ TIterator& operator=(TIterator&& other) {
+ if (this != &other) {
+ Type_ = other.Type_;
+ BlockIndex_ = other.BlockIndex_;
+ ItemsToLookup_ = std::move(other.ItemsToLookup_);
+
+ switch (Type_) {
+ case EIteratorType::EMPTY:
+ break;
+
+ case EIteratorType::INPLACE:
+ Entry_ = other.Entry_;
+ EntryConsumed_ = other.EntryConsumed_;
+ break;
+
+ case EIteratorType::LIST:
+ Node_ = other.Node_;
+ break;
}
- if (blockState.IsNotFull() && !blockState.IsFinished()) {
- switch (Stream_.WideFetch(inputFields, inputWidth)) {
- case NUdf::EFetchStatus::Yield:
- return NUdf::EFetchStatus::Yield;
- case NUdf::EFetchStatus::Ok:
- blockState.Reset();
- continue;
- case NUdf::EFetchStatus::Finish:
- blockState.Finish();
- break;
+
+ other.BlockIndex_ = nullptr;
+ }
+ return *this;
+ }
+
+ TMaybe<TIndexEntry> Next() {
+ Y_ENSURE(IsValid());
+
+ switch (Type_) {
+ case EIteratorType::EMPTY:
+ return Nothing();
+
+ case EIteratorType::INPLACE:
+ if (EntryConsumed_) {
+ return Nothing();
+ }
+
+ EntryConsumed_ = true;
+ return CheckEntry(Entry_) ? TMaybe<TIndexEntry>(Entry_) : Nothing();
+
+ case EIteratorType::LIST:
+ for (; Node_ != nullptr; Node_ = Node_->Next) {
+ if (CheckEntry(Node_->Entry)) {
+ auto entry = Node_->Entry;
+ Node_ = Node_->Next;
+ return entry;
}
- // Leave the loop, if no values left in the stream.
- Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
}
- if (blockState.IsEmpty()) {
- return NUdf::EFetchStatus::Finish;
+
+ return Nothing();
+ }
+ }
+
+ bool IsValid() const {
+ return BlockIndex_;
+ }
+
+ bool IsEmpty() const {
+ Y_ENSURE(IsValid());
+
+ switch (Type_) {
+ case EIteratorType::EMPTY:
+ return true;
+ case EIteratorType::INPLACE:
+ return EntryConsumed_;
+ case EIteratorType::LIST:
+ return Node_ == nullptr;
+ }
+ }
+
+ void Reset() {
+ *this = TIterator();
+ }
+
+ private:
+ bool CheckEntry(const TIndexEntry& entry) {
+ for (size_t i = 0; i < BlockIndex_->KeyColumns_.size(); i++) {
+ auto indexItem = BlockIndex_->GetItem(entry, BlockIndex_->KeyColumns_[i]);
+ if (BlockIndex_->Comparators_[BlockIndex_->KeyColumns_[i]]->Equals(indexItem, ItemsToLookup_[i])) {
+ return true;
}
- blockState.MakeBlocks(HolderFactory_);
}
- const auto sliceSize = blockState.Slice();
+ return false;
+ }
- for (size_t i = 0; i < outputWidth; i++) {
- output[i] = blockState.Get(sliceSize, HolderFactory_, i);
+ private:
+ EIteratorType Type_;
+ TBlockIndex* BlockIndex_ = nullptr;
+
+ union {
+ TIndexNode* Node_;
+ struct {
+ TIndexEntry Entry_;
+ bool EntryConsumed_;
+ };
+ };
+
+ std::vector<NYql::NUdf::TBlockItem> ItemsToLookup_;
+ };
+
+public:
+ TBlockIndex(
+ TMemoryUsageInfo* memInfo,
+ const TVector<TType*>& itemTypes,
+ const TVector<ui32>& keyColumns,
+ NUdf::TUnboxedValue stream
+ )
+ : TBase(memInfo)
+ , InputsDescr_(ToValueDescr(itemTypes))
+ , KeyColumns_(keyColumns)
+ , Stream_(stream)
+ , Inputs_(itemTypes.size())
+ {
+ TBlockTypeHelper helper;
+ for (size_t i = 0; i < itemTypes.size(); i++) {
+ TType* blockItemType = AS_TYPE(TBlockType, itemTypes[i])->GetItemType();
+ Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
+ Hashers_.push_back(helper.MakeHasher(blockItemType));
+ Comparators_.push_back(helper.MakeComparator(blockItemType));
+ }
+ }
+
+ NUdf::EFetchStatus FetchStream() {
+ switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) {
+ case NUdf::EFetchStatus::Yield:
+ return NUdf::EFetchStatus::Yield;
+ case NUdf::EFetchStatus::Finish:
+ return NUdf::EFetchStatus::Finish;
+ case NUdf::EFetchStatus::Ok:
+ break;
+ }
+
+ std::vector<arrow::Datum> block;
+ for (size_t i = 0; i < Inputs_.size() - 1; i++) {
+ auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
+ ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
+ block.push_back(std::move(datum));
+ }
+
+ auto blockSize = GetBlockCount(Inputs_[Inputs_.size() - 1]);
+
+ std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
+ std::array<TIndexEntry, PrefetchBatchSize> insertBatchEntries;
+ ui32 insertBatchLen = 0;
+
+ auto processInsertBatch = [&]() {
+ Index_.BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) {
+ auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter));
+ if (isNew) {
+ // Store single entry inplace
+ *value = TIndexMapValue(insertBatchEntries[i]);
+ Index_.CheckGrow();
+ } else {
+ // Store as list
+ if (value->IsInplace()) {
+ *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
+ }
+
+ *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList()));
+ }
+ });
+ };
+
+ Y_ENSURE(Data_.size() <= std::numeric_limits<ui32>::max());
+ Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
+ for (size_t i = 0; i < blockSize; i++) {
+ ui64 keyHash = CalculateKeyHash(block, i);
+ if (!keyHash) {
+ continue;
}
- return NUdf::EFetchStatus::Ok;
+ insertBatchEntries[insertBatchLen] = TIndexEntry(Data_.size(), i);
+ insertBatch[insertBatchLen].ConstructKey(keyHash);
+ insertBatchLen++;
+
+ if (insertBatchLen == PrefetchBatchSize) {
+ processInsertBatch();
+ insertBatchLen = 0;
+ }
+ }
+
+ if (insertBatchLen > 0) {
+ processInsertBatch();
}
- NUdf::TUnboxedValue MakeKeysTuple(const TState& blockState) const {
- // TODO: Handle converters.
- if constexpr (!IsTuple) {
- return blockState.GetValue(HolderFactory_, LeftKeyColumns_.front());
+ Data_.push_back(std::move(block));
+ return NUdf::EFetchStatus::Ok;
+ }
+
+ template<typename TGetKey>
+ void BatchLookup(size_t batchSize, std::array<TBlockIndex::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
+ Y_ENSURE(batchSize <= PrefetchBatchSize);
+
+ std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch;
+ std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> itemsBatch;
+
+ for (size_t i = 0; i < batchSize; i++) {
+ const auto& [items, keyHash] = getKey(i);
+ lookupBatch[i].ConstructKey(keyHash);
+ itemsBatch[i] = items;
+ }
+
+ Index_.BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) {
+ if (!iter) {
+ // Empty iterator
+ iterators[i] = TIterator(this);
+ return;
}
- Y_ABORT_IF(KeyItems_ == nullptr);
- for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
- KeyItems_[i] = blockState.GetValue(HolderFactory_, LeftKeyColumns_[i]);
+ auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter));
+ if (value->IsInplace()) {
+ iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
+ } else {
+ iterators[i] = TIterator(this, value->GetList(), std::move(itemsBatch[i]));
}
- return KeyValue_;
+ });
+ }
+
+ TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) {
+ Y_ENSURE(columnIdx < Inputs_.size() - 1);
+
+ auto& datum = Data_[entry.BlockOffset][columnIdx];
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ return Readers_[columnIdx]->GetItem(*datum.array(), entry.ItemOffset);
+ }
+
+ void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) {
+ Y_ENSURE(row.size() == ioMap.size());
+ for (size_t i = 0; i < row.size(); i++) {
+ row[i] = GetItem(entry, ioMap[i]);
}
+ }
- NUdf::TUnboxedValue BlockState_;
- NUdf::TUnboxedValue Stream_;
- NUdf::TUnboxedValue Dict_;
- NUdf::TUnboxedValue KeyValue_;
- NUdf::TUnboxedValue* KeyItems_;
+private:
+ ui64 CalculateKeyHash(const std::vector<arrow::Datum>& block, size_t offset) const {
+ ui64 keyHash = 0;
+ for (ui32 keyColumn : KeyColumns_) {
+ auto& datum = block[keyColumn];
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+
+ auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset);
+ if (!item) {
+ return 0;
+ }
- const TVector<ui32>& LeftKeyColumns_;
- const THolderFactory& HolderFactory_;
- };
+ keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item));
+ }
+ return keyHash;
+ }
- void RegisterDependencies() const final {
- this->DependsOn(Stream_);
- this->DependsOn(Dict_);
+ TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) {
+ return &IndexNodes_.emplace_back(entry, currentHead);
}
- const TVector<TType*> ResultJoinItems_;
- const TVector<TType*> LeftStreamItems_;
- const TVector<ui32> LeftKeyColumns_;
- const TVector<ui32> LeftIOMap_;
- IComputationNode* const Stream_;
- IComputationNode* const Dict_;
- const TContainerCacheOnContext KeyTupleCache_;
+private:
+ const std::vector<arrow::ValueDescr> InputsDescr_;
+ const TVector<ui32>& KeyColumns_;
+
+ TVector<std::unique_ptr<IBlockReader>> Readers_;
+ TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
+ TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
+
+ std::vector<std::vector<arrow::Datum>> Data_;
+
+ TIndexMap Index_;
+ std::deque<TIndexNode> IndexNodes_;
+
+ NUdf::TUnboxedValue Stream_;
+ TUnboxedValueVector Inputs_;
};
-template<bool RightRequired, bool IsTuple>
-class TBlockWideMultiMapJoinWrapper : public TMutableComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>
+template <bool WithoutRight, bool RightRequired, bool RightAny>
+class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>
{
-using TBaseComputation = TMutableComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>;
-using TState = TBlockJoinState<RightRequired>;
+using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>;
+using TJoinState = TBlockJoinState<RightRequired>;
+using TIndexState = TBlockIndex;
public:
- TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables,
- const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftStreamItems,
- const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
- IComputationNode* stream, IComputationNode* dict)
+ TBlockMapJoinCoreWraper(
+ TComputationMutables& mutables,
+ const TVector<TType*>&& resultItemTypes,
+ const TVector<TType*>&& leftItemTypes,
+ const TVector<ui32>&& leftKeyColumns,
+ const TVector<ui32>&& leftIOMap,
+ const TVector<TType*>&& rightItemTypes,
+ const TVector<ui32>&& rightKeyColumns,
+ const TVector<ui32>&& rightIOMap,
+ IComputationNode* leftStream,
+ IComputationNode* rightStream
+ )
: TBaseComputation(mutables, EValueRepresentation::Boxed)
- , ResultJoinItems_(std::move(resultJoinItems))
- , LeftStreamItems_(std::move(leftStreamItems))
+ , ResultItemTypes_(std::move(resultItemTypes))
+ , LeftItemTypes_(std::move(leftItemTypes))
, LeftKeyColumns_(std::move(leftKeyColumns))
, LeftIOMap_(std::move(leftIOMap))
- , Stream_(stream)
- , Dict_(dict)
+ , RightItemTypes_(std::move(rightItemTypes))
+ , RightKeyColumns_(std::move(rightKeyColumns))
+ , RightIOMap_(std::move(rightIOMap))
+ , LeftStream_(std::move(leftStream))
+ , RightStream_(std::move(rightStream))
, KeyTupleCache_(mutables)
{}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- NUdf::TUnboxedValue* items = nullptr;
- const auto keys = KeyTupleCache_.NewArray(ctx, LeftKeyColumns_.size(), items);
- const auto state = ctx.HolderFactory.Create<TState>(ctx, LeftStreamItems_,
- LeftIOMap_, ResultJoinItems_);
+ const auto joinState = ctx.HolderFactory.Create<TJoinState>(
+ ctx,
+ LeftItemTypes_,
+ LeftIOMap_,
+ ResultItemTypes_
+ );
+ const auto indexState = ctx.HolderFactory.Create<TIndexState>(
+ RightItemTypes_,
+ RightKeyColumns_,
+ std::move(RightStream_->GetValue(ctx))
+ );
+
return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
- std::move(state),
- std::move(Stream_->GetValue(ctx)),
- std::move(Dict_->GetValue(ctx)),
+ std::move(joinState),
+ std::move(indexState),
+ std::move(LeftStream_->GetValue(ctx)),
+ LeftItemTypes_,
LeftKeyColumns_,
- std::move(keys), items);
+ std::move(RightStream_->GetValue(ctx)),
+ RightItemTypes_,
+ RightKeyColumns_,
+ RightIOMap_
+ );
}
private:
class TStreamValue : public TComputationValue<TStreamValue> {
using TBase = TComputationValue<TStreamValue>;
public:
- TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory,
- NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream,
- NUdf::TUnboxedValue&& dict, const TVector<ui32>& leftKeyColumns,
- NUdf::TUnboxedValue&& keyValue, NUdf::TUnboxedValue* keyItems)
+ TStreamValue( // Captures the boxed join/index states, both input stream values and references to the column metadata vectors.
+ TMemoryUsageInfo* memInfo,
+ const THolderFactory& holderFactory,
+ NUdf::TUnboxedValue&& joinState,
+ NUdf::TUnboxedValue&& indexState,
+ NUdf::TUnboxedValue&& leftStream,
+ const TVector<TType*>& leftTypes,
+ const TVector<ui32>& leftKeyColumns,
+ NUdf::TUnboxedValue&& rightStream,
+ const TVector<TType*>& rightTypes,
+ const TVector<ui32>& rightKeyColumns,
+ const TVector<ui32>& rightIOMap
+ )
: TBase(memInfo)
- , BlockState_(blockState)
- , Stream_(stream)
- , Dict_(dict)
- , KeyValue_(keyValue)
- , KeyItems_(keyItems)
- , List_(NUdf::TUnboxedValue::Invalid())
- , Iterator_(NUdf::TUnboxedValue::Invalid())
- , Current_(NUdf::TUnboxedValue::Invalid())
+ , JoinState_(joinState) // NOTE(review): the TUnboxedValue&& parameters are named lvalues here, so these initializers copy rather than move
+ , IndexState_(indexState)
+ , LeftStream_(leftStream)
+ , LeftItemTypes_(leftTypes) // the vector members are declared as const references, so they alias the caller's vectors
, LeftKeyColumns_(leftKeyColumns)
+ , RightStream_(rightStream)
+ , RightItemTypes_(rightTypes)
+ , RightKeyColumns_(rightKeyColumns)
+ , RightIOMap_(rightIOMap)
, HolderFactory_(holderFactory)
{}
private:
NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { // Produces the next output block: drains the right stream into the index once, then joins left blocks against it.
- auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get());
- auto* inputFields = blockState.GetRawInputFields();
- const size_t inputWidth = blockState.GetInputWidth();
- const size_t outputWidth = blockState.GetOutputWidth();
+ auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
+ auto& indexState = *static_cast<TIndexState*>(IndexState_.AsBoxed().Get());
+
+ if (!RightStreamConsumed_) { // one-time phase: keep calling FetchStream() until it reports Finish
+ auto fetchStatus = NUdf::EFetchStatus::Ok;
+ while (fetchStatus != NUdf::EFetchStatus::Finish) {
+ fetchStatus = indexState.FetchStream();
+ if (fetchStatus == NUdf::EFetchStatus::Yield) {
+ return NUdf::EFetchStatus::Yield; // propagate Yield; RightStreamConsumed_ stays false, so the next call resumes fetching
+ }
+ }
+
+ RightStreamConsumed_ = true;
+ }
+
+ auto* inputFields = joinState.GetRawInputFields();
+ const size_t inputWidth = joinState.GetInputWidth();
+ const size_t outputWidth = joinState.GetOutputWidth();
MKQL_ENSURE(width == outputWidth,
"The given width doesn't equal to the result type size");
- while (!blockState.HasBlocks()) {
- if (!Iterator_.IsInvalid()) {
- // Process the remaining items from the iterator.
- while (blockState.IsNotFull() && Iterator_.Next(Current_)) {
- blockState.MakeRow(Current_);
- }
- }
- if (blockState.IsNotFull() && blockState.NextRow()) {
- const auto key = MakeKeysTuple(blockState);
- // Lookup the item in the right dict. If the lookup succeeds,
- // reset the iterator and proceed the execution from the
- // beginning of the outer loop. Otherwise, the iterator is
- // already invalidated (i.e. finished), so the execution will
- // process the next tuple from the left stream.
- if (key && (List_ = Dict_.Lookup(key))) {
- Iterator_ = List_.GetListIterator();
+ std::vector<NYql::NUdf::TBlockItem> leftKeyColumns(LeftKeyColumns_.size()); // per-call scratch buffers used by the lookups below
+ std::vector<ui64> leftKeyColumnHashes(LeftKeyColumns_.size());
+ std::vector<NYql::NUdf::TBlockItem> rightRow(RightIOMap_.size());
+
+ while (!joinState.HasBlocks()) {
+ while (joinState.IsNotFull() && LookupBatchCurrent_ < LookupBatchSize_) { // consume the iterators of the current lookup batch while the output has room
+ auto& iter = LookupBatchIterators_[LookupBatchCurrent_];
+ if constexpr (WithoutRight) {
+ if (bool(iter.IsEmpty()) != RightRequired) { // RightRequired: copy the left row when a match exists; otherwise copy it when no match exists
+ joinState.CopyRow();
+ }
+
+ joinState.NextRow();
+ LookupBatchCurrent_++;
+ continue;
} else if constexpr (!RightRequired) {
- blockState.MakeRow(NUdf::TUnboxedValue());
+ if (iter.IsEmpty()) { // no match and the right side is not required: emit the row with an empty right part
+ joinState.MakeRow(std::vector<NYql::NUdf::TBlockItem>());
+ joinState.NextRow();
+ LookupBatchCurrent_++;
+ continue;
+ }
}
+
+ while (joinState.IsNotFull() && !iter.IsEmpty()) { // emit one output row per matching right row
+ auto key = iter.Next();
+ indexState.GetRow(*key, RightIOMap_, rightRow);
+ joinState.MakeRow(rightRow);
+
+ if constexpr (RightAny) {
+ break; // RightAny: only the first match is used
+ }
+ }
+
+ if (RightAny || iter.IsEmpty()) { // advance to the next left row once its matches are exhausted (or right away for RightAny)
+ joinState.NextRow();
+ LookupBatchCurrent_++;
+ }
+ }
+
+ if (joinState.IsNotFull() && joinState.RemainingRowsCount() > 0) { // current batch is exhausted: look up the next batch, at most PrefetchBatchSize rows
+ LookupBatchSize_ = std::min(PrefetchBatchSize, static_cast<ui32>(joinState.RemainingRowsCount()));
+ indexState.BatchLookup(LookupBatchSize_, LookupBatchIterators_, [&](size_t i) { // the callback builds the key items and the combined hash for offset i
+ MakeLeftKeys(leftKeyColumns, leftKeyColumnHashes, i);
+ ui64 keyHash = CalculateTupleHash(leftKeyColumnHashes);
+ return std::make_pair(std::ref(leftKeyColumns), keyHash);
+ });
+
+ LookupBatchCurrent_ = 0;
continue;
}
- if (blockState.IsNotFull() && !blockState.IsFinished()) {
- switch (Stream_.WideFetch(inputFields, inputWidth)) {
+
+ if (joinState.IsNotFull() && !joinState.IsFinished()) { // no rows remain in the current left block: fetch the next one
+ switch (LeftStream_.WideFetch(inputFields, inputWidth)) {
case NUdf::EFetchStatus::Yield:
return NUdf::EFetchStatus::Yield;
case NUdf::EFetchStatus::Ok:
- blockState.Reset();
+ joinState.Reset();
continue;
case NUdf::EFetchStatus::Finish:
- blockState.Finish();
+ joinState.Finish();
break;
}
// Leave the loop, if no values left in the stream.
- Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
+ Y_DEBUG_ABORT_UNLESS(joinState.IsFinished());
}
- if (blockState.IsEmpty()) {
+ if (joinState.IsEmpty()) { // nothing is buffered for output: report Finish
return NUdf::EFetchStatus::Finish;
}
- blockState.MakeBlocks(HolderFactory_);
+ joinState.MakeBlocks(HolderFactory_);
}
- const auto sliceSize = blockState.Slice();
+ const auto sliceSize = joinState.Slice();
for (size_t i = 0; i < outputWidth; i++) {
- output[i] = blockState.Get(sliceSize, HolderFactory_, i);
+ output[i] = joinState.Get(sliceSize, HolderFactory_, i);
}
return NUdf::EFetchStatus::Ok;
}
- NUdf::TUnboxedValue MakeKeysTuple(const TState& state) const {
- // TODO: Handle converters.
- if constexpr (!IsTuple) {
- return state.GetValue(HolderFactory_, LeftKeyColumns_.front());
- }
+ void MakeLeftKeys(std::vector<NYql::NUdf::TBlockItem>& items, std::vector<ui64>& hashes, size_t offset) const { // Fills items/hashes with the value and hash of every left key column at the given offset.
+ auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
- Y_ABORT_IF(KeyItems_ == nullptr);
+ Y_ENSURE(items.size() == LeftKeyColumns_.size()); // both output buffers must be pre-sized to the number of key columns
+ Y_ENSURE(hashes.size() == LeftKeyColumns_.size());
for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
- KeyItems_[i] = state.GetValue(HolderFactory_, LeftKeyColumns_[i]);
+ std::tie(items[i], hashes[i]) = joinState.GetItemWithHash(LeftKeyColumns_[i], offset);
}
- return KeyValue_;
}
- NUdf::TUnboxedValue BlockState_;
- NUdf::TUnboxedValue Stream_;
- NUdf::TUnboxedValue Dict_;
- NUdf::TUnboxedValue KeyValue_;
- NUdf::TUnboxedValue* KeyItems_;
-
- NUdf::TUnboxedValue List_;
- NUdf::TUnboxedValue Iterator_;
- NUdf::TUnboxedValue Current_;
+ NUdf::TUnboxedValue JoinState_;
+ NUdf::TUnboxedValue IndexState_;
+ NUdf::TUnboxedValue LeftStream_;
+ const TVector<TType*>& LeftItemTypes_;
const TVector<ui32>& LeftKeyColumns_;
+
+ NUdf::TUnboxedValue RightStream_;
+ const TVector<TType*>& RightItemTypes_;
+ const TVector<ui32>& RightKeyColumns_;
+ const TVector<ui32>& RightIOMap_;
+ bool RightStreamConsumed_ = false;
+
+ std::array<TBlockIndex::TIterator, PrefetchBatchSize> LookupBatchIterators_;
+ ui32 LookupBatchCurrent_ = 0;
+ ui32 LookupBatchSize_ = 0;
+
const THolderFactory& HolderFactory_;
};
void RegisterDependencies() const final { // Declares both input stream nodes as dependencies of this node.
- this->DependsOn(Stream_);
- this->DependsOn(Dict_);
+ this->DependsOn(LeftStream_);
+ this->DependsOn(RightStream_);
}
- const TVector<TType*> ResultJoinItems_;
- const TVector<TType*> LeftStreamItems_;
+private:
+ const TVector<TType*> ResultItemTypes_;
+
+ const TVector<TType*> LeftItemTypes_;
const TVector<ui32> LeftKeyColumns_;
const TVector<ui32> LeftIOMap_;
- IComputationNode* const Stream_;
- IComputationNode* const Dict_;
+
+ const TVector<TType*> RightItemTypes_;
+ const TVector<ui32> RightKeyColumns_;
+ const TVector<ui32> RightIOMap_;
+
+ IComputationNode* const LeftStream_;
+ IComputationNode* const RightStream_;
+
const TContainerCacheOnContext KeyTupleCache_;
};
} // namespace
IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) { // Factory for BlockMapJoinCore: validates the callable inputs and instantiates the wrapper matching the join kind.
- MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
+ MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); // inputs: 0/1 streams, 2 join kind, 3/4 left key columns/drops, 5/6 right key columns/drops, 7 rightAny flag
const auto joinType = callable.GetType()->GetReturnType();
MKQL_ENSURE(joinType->IsStream(), "Expected WideStream as a resulting stream");
@@ -510,16 +896,14 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column");
const TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend());
- const auto rightDictNode = callable.GetInput(1);
- MKQL_ENSURE(rightDictNode.GetStaticType()->IsDict(),
- "Expected Dict as a right join part");
- const auto rightDictType = AS_TYPE(TDictType, rightDictNode)->GetPayloadType();
- const auto isMulti = rightDictType->IsList();
- const auto rightDictItemType = isMulti
- ? AS_TYPE(TListType, rightDictType)->GetItemType()
- : rightDictType;
- MKQL_ENSURE(rightDictItemType->IsVoid() || rightDictItemType->IsTuple(),
- "Expected Void or Tuple as a right dict item type");
+ const auto rightType = callable.GetInput(1).GetStaticType(); // the right input is now validated as a stream of Multi items
+ MKQL_ENSURE(rightType->IsStream(), "Expected WideStream as a right stream");
+ const auto rightStreamType = AS_TYPE(TStreamType, rightType);
+ MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(),
+ "Expected Multi as a right stream item type");
+ const auto rightStreamComponents = GetWideComponents(rightStreamType);
+ MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column");
+ const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend());
const auto joinKindNode = callable.GetInput(2);
const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
@@ -527,34 +911,61 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly);
- const auto keyColumnsLiteral = callable.GetInput(3);
- const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral);
+ const auto leftKeyColumnsLiteral = callable.GetInput(3);
+ const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral);
TVector<ui32> leftKeyColumns;
- leftKeyColumns.reserve(keyColumnsTuple->GetValuesCount());
- for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) {
- const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i));
+ leftKeyColumns.reserve(leftKeyColumnsTuple->GetValuesCount());
+ for (ui32 i = 0; i < leftKeyColumnsTuple->GetValuesCount(); i++) {
+ const auto item = AS_VALUE(TDataLiteral, leftKeyColumnsTuple->GetValue(i));
leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
}
- const bool isTupleKey = leftKeyColumns.size() > 1;
+ const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
- const auto keyDropsLiteral = callable.GetInput(4);
- const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral);
+ const auto leftKeyDropsLiteral = callable.GetInput(4);
+ const auto leftKeyDropsTuple = AS_VALUE(TTupleLiteral, leftKeyDropsLiteral);
THashSet<ui32> leftKeyDrops;
- leftKeyDrops.reserve(keyDropsTuple->GetValuesCount());
- for (ui32 i = 0; i < keyDropsTuple->GetValuesCount(); i++) {
- const auto item = AS_VALUE(TDataLiteral, keyDropsTuple->GetValue(i));
+ leftKeyDrops.reserve(leftKeyDropsTuple->GetValuesCount());
+ for (ui32 i = 0; i < leftKeyDropsTuple->GetValuesCount(); i++) {
+ const auto item = AS_VALUE(TDataLiteral, leftKeyDropsTuple->GetValue(i));
leftKeyDrops.emplace(item->AsValue().Get<ui32>());
}
- const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
for (const auto& drop : leftKeyDrops) {
MKQL_ENSURE(leftKeySet.contains(drop),
"Only key columns has to be specified in drop column set");
+ }
+ const auto rightKeyColumnsLiteral = callable.GetInput(5); // the right key columns and drops are parsed the same way as the left ones
+ const auto rightKeyColumnsTuple = AS_VALUE(TTupleLiteral, rightKeyColumnsLiteral);
+ TVector<ui32> rightKeyColumns;
+ rightKeyColumns.reserve(rightKeyColumnsTuple->GetValuesCount());
+ for (ui32 i = 0; i < rightKeyColumnsTuple->GetValuesCount(); i++) {
+ const auto item = AS_VALUE(TDataLiteral, rightKeyColumnsTuple->GetValue(i));
+ rightKeyColumns.emplace_back(item->AsValue().Get<ui32>());
+ }
+ const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend());
+
+ const auto rightKeyDropsLiteral = callable.GetInput(6);
+ const auto rightKeyDropsTuple = AS_VALUE(TTupleLiteral, rightKeyDropsLiteral);
+ THashSet<ui32> rightKeyDrops;
+ rightKeyDrops.reserve(rightKeyDropsTuple->GetValuesCount());
+ for (ui32 i = 0; i < rightKeyDropsTuple->GetValuesCount(); i++) {
+ const auto item = AS_VALUE(TDataLiteral, rightKeyDropsTuple->GetValue(i));
+ rightKeyDrops.emplace(item->AsValue().Get<ui32>());
}
- TVector<ui32> leftIOMap;
+ for (const auto& drop : rightKeyDrops) {
+ MKQL_ENSURE(rightKeySet.contains(drop),
+ "Only key columns has to be specified in drop column set");
+ }
+
+ MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch"); // both sides must name the same number of key columns
+
+ const auto rightAnyNode = callable.GetInput(7);
+ const auto rightAny = AS_VALUE(TDataLiteral, rightAnyNode)->AsValue().Get<bool>();
+
// XXX: Mind the last wide item, containing block length.
+ TVector<ui32> leftIOMap;
for (size_t i = 0; i < leftStreamItems.size() - 1; i++) {
if (leftKeyDrops.contains(i)) {
continue;
@@ -562,51 +973,70 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
leftIOMap.push_back(i);
}
- const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
- const auto dict = LocateNode(ctx.NodeLocator, callable, 1);
-
-#define DISPATCH_JOIN(IS_TUPLE) do { \
- switch (joinKind) { \
- case EJoinKind::Inner: \
- if (isMulti) { \
- return new TBlockWideMultiMapJoinWrapper<true, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- } \
- return new TBlockWideMapJoinWrapper<false, true, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- case EJoinKind::Left: \
- if (isMulti) { \
- return new TBlockWideMultiMapJoinWrapper<false, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- } \
- return new TBlockWideMapJoinWrapper<false, false, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- case EJoinKind::LeftSemi: \
- return new TBlockWideMapJoinWrapper<true, true, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- case EJoinKind::LeftOnly: \
- return new TBlockWideMapJoinWrapper<true, false, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- default: \
- /* TODO: Display the human-readable join kind name. */ \
- MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #" \
- << static_cast<ui32>(joinKind)); \
- } \
-} while(0)
-
- if (isTupleKey) {
- DISPATCH_JOIN(true);
+ // XXX: Mind the last wide item, containing block length.
+ TVector<ui32> rightIOMap;
+ if (joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left) { // only Inner and Left joins map right columns into the output
+ for (size_t i = 0; i < rightStreamItems.size() - 1; i++) {
+ if (rightKeyDrops.contains(i)) {
+ continue;
+ }
+ rightIOMap.push_back(i);
+ }
} else {
- DISPATCH_JOIN(false);
+ MKQL_ENSURE(rightKeyDrops.empty(), "Right key drops are not allowed for semi/only join");
+ }
+
+ const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0);
+ const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1);
+ // NOTE(review): leftStreamItems and rightStreamItems are const locals, so std::move on them in the macro below cannot actually move - confirm the copies are acceptable.
+#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY) \
+ return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY>( \
+ ctx.Mutables, \
+ std::move(joinItems), \
+ std::move(leftStreamItems), \
+ std::move(leftKeyColumns), \
+ std::move(leftIOMap), \
+ std::move(rightStreamItems), \
+ std::move(rightKeyColumns), \
+ std::move(rightIOMap), \
+ leftStream, \
+ rightStream \
+ )
+
+ switch (joinKind) { // every JOIN_WRAPPER expansion is a return statement, so the cases below do not fall through
+ case EJoinKind::Inner:
+ if (rightAny) {
+ JOIN_WRAPPER(false, true, true);
+ } else {
+ JOIN_WRAPPER(false, true, false);
+ }
+ case EJoinKind::Left:
+ if (rightAny) {
+ JOIN_WRAPPER(false, false, true);
+ } else {
+ JOIN_WRAPPER(false, false, false);
+ }
+ case EJoinKind::LeftSemi:
+ MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join");
+ if (rightAny) {
+ JOIN_WRAPPER(true, true, true);
+ } else {
+ JOIN_WRAPPER(true, true, false);
+ }
+ case EJoinKind::LeftOnly:
+ MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join");
+ if (rightAny) {
+ JOIN_WRAPPER(true, false, true);
+ } else {
+ JOIN_WRAPPER(true, false, false);
+ }
+ default:
+ /* TODO: Display the human-readable join kind name. */
+ MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #"
+ << static_cast<ui32>(joinKind));
}
-#undef DISPATCH_JOIN
+#undef JOIN_WRAPPER
}
} // namespace NMiniKQL
diff --git a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h
index 1909b2e714..f5c40a6944 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h
@@ -31,7 +31,7 @@ struct TRobinHoodBatchRequestItem {
void ConstructKey(const TKey& key) {
new (KeyStorage) TKey(key);
}
-
+
// intermediate data
ui64 Hash;
char* InitialIterator;
@@ -114,6 +114,14 @@ public:
}
}
+ // Single-key lookup: hashes the key, derives the initial probe position and delegates to LookupImpl. Returns the iterator, or nullptr if the key is not present.
+ Y_FORCE_INLINE char* Lookup(TKey key) {
+ auto hash = HashLocal(key);
+ auto ptr = MakeIterator(hash, Data, CapacityShift); // initial position derived from the hash
+ auto ret = LookupImpl(key, hash, Data, DataEnd, ptr);
+ return ret;
+ }
+
template <typename TSink>
Y_NO_INLINE void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) {
while (2 * (Size + batchRequest.size()) >= Capacity) {
@@ -136,6 +144,22 @@ public:
}
}
+ template <typename TSink> // Looks up every key of batchRequest and reports each result through sink(index, iterator); the iterator is nullptr for absent keys.
+ Y_NO_INLINE void BatchLookup(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) {
+ for (size_t i = 0; i < batchRequest.size(); ++i) { // pass 1: compute the hash and initial position of every request and issue a prefetch for it
+ auto& r = batchRequest[i];
+ r.Hash = HashLocal(r.GetKey());
+ r.InitialIterator = MakeIterator(r.Hash, Data, CapacityShift);
+ NYql::PrefetchForRead(r.InitialIterator);
+ }
+
+ for (size_t i = 0; i < batchRequest.size(); ++i) { // pass 2: probe the table for every request, in request order
+ auto& r = batchRequest[i];
+ auto iter = LookupImpl(r.GetKey(), r.Hash, Data, DataEnd, r.InitialIterator);
+ sink(i, iter);
+ }
+ }
+
ui64 GetCapacity() const {
return Capacity;
}
@@ -215,7 +239,7 @@ private:
};
Y_FORCE_INLINE char* MakeIterator(const ui64 hash, char* data, ui64 capacityShift) {
- // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
+ // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
ui64 bucket = ((SelfHash ^ hash) * 11400714819323198485llu) >> capacityShift;
char* ptr = data + AsDeriv().GetCellSize() * bucket;
return ptr;
@@ -283,6 +307,29 @@ private:
}
}
+ Y_FORCE_INLINE char* LookupImpl(TKey key, const ui64 hash, char* data, char* dataEnd, char* ptr) { // Probes from ptr for key; returns the matching cell or nullptr when the key is absent.
+ i32 currDistance = 0; // number of cells advanced from the initial position so far
+ for (;;) {
+ auto& pslPtr = GetPSL(ptr);
+ if (pslPtr.Distance < 0 || currDistance > pslPtr.Distance) { // NOTE(review): a negative Distance presumably marks an empty cell, and a resident closer to its home than our probe distance means the key cannot be stored further on (Robin Hood ordering) - confirm
+ return nullptr;
+ }
+
+ if constexpr (CacheHash) { // with cached hashes, compare the hashes first and the keys only on a hash match
+ if (pslPtr.Hash == hash && EqualLocal(GetKey(ptr), key)) {
+ return ptr;
+ }
+ } else {
+ if (EqualLocal(GetKey(ptr), key)) {
+ return ptr;
+ }
+ }
+
+ ++currDistance;
+ AdvancePointer(ptr, data, dataEnd); // presumably wraps around to data at dataEnd - TODO confirm
+ }
+ }
+
Y_NO_INLINE void Grow() {
ui64 growFactor;
if (Capacity < 100'000) {
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
index 1a7546eeed..f1cb1ea1aa 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
@@ -1,1985 +1,995 @@
+#include "mkql_block_map_join_ut_utils.h"
#include "mkql_computation_node_ut.h"
-#include <arrow/array/builder_binary.h>
-#include <arrow/array/builder_primitive.h>
-#include <arrow/compute/kernel.h>
-#include <yql/essentials/minikql/computation/mkql_block_builder.h>
-#include <yql/essentials/minikql/computation/mkql_block_impl.h>
-#include <yql/essentials/minikql/computation/mkql_block_reader.h>
-#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
-#include <yql/essentials/public/udf/arrow/udf_arrow_helpers.h>
namespace NKikimr {
namespace NMiniKQL {
namespace {
-const TRuntimeNode MakeSet(TProgramBuilder& pgmBuilder,
- const TVector<const TRuntimeNode>& keys
-) {
- const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front();
-
- return pgmBuilder.ToHashedDict(keysList, false,
- [&](TRuntimeNode item) {
- return item;
- }, [&](TRuntimeNode) {
- return pgmBuilder.NewVoid();
- });
-}
-
-const TRuntimeNode MakeDict(TProgramBuilder& pgmBuilder,
- const TVector<const TRuntimeNode>& keys,
- const TVector<const TRuntimeNode>& payloads
-) {
- const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front();
- // TODO: Process containers properly. Now just use Zip to pack
- // the data type in a tuple.
- TVector<const TRuntimeNode> wrappedPayloads;
- std::transform(payloads.cbegin(), payloads.cend(), std::back_inserter(wrappedPayloads),
- [&](const auto payload) {
- return pgmBuilder.Zip({payload});
- });
- TVector<const TRuntimeNode> pairsChunks;
- std::transform(wrappedPayloads.cbegin(), wrappedPayloads.cend(), std::back_inserter(pairsChunks),
- [&](const auto payload) {
- return pgmBuilder.Zip({keysList, payload});
- });
- const auto pairsList = pgmBuilder.Extend(pairsChunks);
-
- return pgmBuilder.ToHashedDict(pairsList, payloads.size() > 1,
- [&](TRuntimeNode item) {
- return pgmBuilder.Nth(item, 0);
- }, [&](TRuntimeNode item) {
- return pgmBuilder.Nth(item, 1);
- });
-}
-
-// XXX: Copy-pasted from program builder sources. Adjusted on demand.
-const std::vector<TType*> ValidateBlockStreamType(const TType* streamType) {
- const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType));
- Y_ENSURE(wideComponents.size() > 0, "Expected at least one column");
- std::vector<TType*> items;
- items.reserve(wideComponents.size());
- // XXX: Declare these variables outside the loop body to use for the last
- // item (i.e. block length column) in the assertions below.
- bool isScalar;
- TType* itemType;
- for (const auto& wideComponent : wideComponents) {
- auto blockType = AS_TYPE(TBlockType, wideComponent);
- isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
- itemType = blockType->GetItemType();
- items.push_back(blockType);
- }
-
- Y_ENSURE(isScalar, "Last column should be scalar");
- Y_ENSURE(AS_TYPE(TDataType, itemType)->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected Uint64");
- return items;
-}
-
-bool IsOptionalOrNull(const TType* type) {
- return type->IsOptional() || type->IsNull() || type->IsPg();
-}
-
-const TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind,
- const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
- TRuntimeNode& leftArg, TType* leftTuple, const TRuntimeNode& dictNode
-) {
- // 1. Make left argument node.
- const auto tupleType = AS_TYPE(TTupleType, leftTuple);
- const auto listTupleType = pgmBuilder.NewListType(leftTuple);
- leftArg = pgmBuilder.Arg(listTupleType);
-
- // 2. Make left wide stream node.
- const auto leftWideStream = pgmBuilder.FromFlow(pgmBuilder.ExpandMap(pgmBuilder.ToFlow(leftArg),
+// List<Tuple<...>> -> Stream<Multi<...>>
+TRuntimeNode ToWideStream(TProgramBuilder& pgmBuilder, TRuntimeNode list) { // Expands each tuple of the list into a wide item with one column per tuple element.
+ auto wideFlow = pgmBuilder.ExpandMap(pgmBuilder.ToFlow(list),
[&](TRuntimeNode tupleNode) -> TRuntimeNode::TList {
+ TTupleType* tupleType = AS_TYPE(TTupleType, tupleNode.GetStaticType()); // the tuple type is read from the node itself
TRuntimeNode::TList wide;
wide.reserve(tupleType->GetElementsCount());
for (size_t i = 0; i < tupleType->GetElementsCount(); i++) {
wide.emplace_back(pgmBuilder.Nth(tupleNode, i));
}
return wide;
- }));
-
- // 3. Calculate the resulting join type.
- const auto leftStreamItems = ValidateBlockStreamType(leftWideStream.GetStaticType());
- const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend());
- TVector<TType*> returnJoinItems;
- for (size_t i = 0; i < leftStreamItems.size(); i++) {
- if (leftKeyDropsSet.contains(i)) {
- continue;
- }
- returnJoinItems.push_back(leftStreamItems[i]);
- }
-
- const auto payloadType = AS_TYPE(TDictType, dictNode.GetStaticType())->GetPayloadType();
- const auto payloadItemType = payloadType->IsList()
- ? AS_TYPE(TListType, payloadType)->GetItemType()
- : payloadType;
- if (joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left) {
- // XXX: This is the contract ensured by the expression compiler and
- // optimizers to ease the processing of the dict payload in wide context.
- Y_ENSURE(payloadItemType->IsTuple(), "Dict payload has to be a Tuple");
- const auto payloadItems = AS_TYPE(TTupleType, payloadItemType)->GetElements();
- TVector<TType*> dictBlockItems;
- dictBlockItems.reserve(payloadItems.size());
- for (const auto& payloadItem : payloadItems) {
- MKQL_ENSURE(!payloadItem->IsBlock(), "Dict payload item has to be non-block");
- const auto itemType = joinKind == EJoinKind::Inner ? payloadItem
- : IsOptionalOrNull(payloadItem) ? payloadItem
- : pgmBuilder.NewOptionalType(payloadItem);
- dictBlockItems.emplace_back(pgmBuilder.NewBlockType(itemType, TBlockType::EShape::Many));
}
- // Block length column has to be the last column in wide block stream item,
- // so all contents of the dict payload should be appended to the resulting
- // wide type before the block size column.
- const auto blockLenPos = std::prev(returnJoinItems.end());
- returnJoinItems.insert(blockLenPos, dictBlockItems.cbegin(), dictBlockItems.cend());
- } else {
- // XXX: This is the contract ensured by the expression compiler and
- // optimizers for join types that don't require the right (i.e. dict) part.
- Y_ENSURE(payloadItemType->IsVoid(), "Dict payload has to be Void");
- }
- TType* returnJoinType = pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(returnJoinItems));
+ );
- // 4. Build BlockMapJoinCore node.
- const auto joinNode = pgmBuilder.BlockMapJoinCore(leftWideStream, dictNode, joinKind,
- leftKeyColumns, leftKeyDrops,
- returnJoinType);
-
- // 5. Build the root node with list of tuples.
- const auto joinItems = GetWideComponents(AS_TYPE(TStreamType, joinNode.GetStaticType()));
- const auto resultType = AS_TYPE(TTupleType, pgmBuilder.NewTupleType(joinItems));
+ return pgmBuilder.FromFlow(wideFlow); // wrap the wide flow into a stream
+}
- const auto rootNode = pgmBuilder.Collect(pgmBuilder.FromFlow(pgmBuilder.NarrowMap(pgmBuilder.ToFlow(joinNode),
+// Stream<Multi<...>> -> List<Tuple<...>>
+TRuntimeNode FromWideStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) { // Narrows every wide item into a tuple and collects the result.
+ return pgmBuilder.Collect(pgmBuilder.NarrowMap(pgmBuilder.ToFlow(stream),
[&](TRuntimeNode::TList items) -> TRuntimeNode {
TVector<TRuntimeNode> tupleElements;
- tupleElements.reserve(resultType->GetElementsCount());
- for (size_t i = 0; i < resultType->GetElementsCount(); i++) {
+ tupleElements.reserve(items.size()); // sized from the wide item itself; no precomputed result type is needed
+ for (size_t i = 0; i < items.size(); i++) {
tupleElements.emplace_back(items[i]);
}
return pgmBuilder.NewTuple(tupleElements);
- })));
-
- return rootNode;
+ })
+ );
}
-NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
- const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
+TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind,
+ TRuntimeNode leftList, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
+ TRuntimeNode rightList, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny
) {
- const auto maxLength = CalcBlockLen(std::accumulate(types.cbegin(), types.cend(), 0ULL,
- [](size_t max, const TType* type) {
- return std::max(max, CalcMaxBlockItemSize(type));
- }));
- TVector<std::unique_ptr<IArrayBuilder>> builders;
- std::transform(types.cbegin(), types.cend(), std::back_inserter(builders),
- [&](const auto& type) {
- return MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool,
- maxLength, &ctx.Builder->GetPgBuilder());
- });
-
- const auto& holderFactory = ctx.HolderFactory;
- const size_t width = types.size();
- const size_t total = values.GetListLength();
- NUdf::TUnboxedValue iterator = values.GetListIterator();
- NUdf::TUnboxedValue current;
- size_t converted = 0;
- TDefaultListRepresentation listValues;
- while (converted < total) {
- for (size_t i = 0; i < blockSize && iterator.Next(current); i++, converted++) {
- for (size_t j = 0; j < builders.size(); j++) {
- const NUdf::TUnboxedValuePod& item = current.GetElement(j);
- builders[j]->Add(item);
- }
- }
- NUdf::TUnboxedValue* items = nullptr;
- const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items);
- for (size_t i = 0; i < width; i++) {
- items[i] = holderFactory.CreateArrowBlock(builders[i]->Build(converted >= total));
- }
- items[width] = MakeBlockCount(holderFactory, blockSize);
- listValues = listValues.Append(std::move(tuple));
- }
- return holderFactory.CreateDirectListHolder(std::move(listValues));
-}
+ const auto leftStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList));
+ const auto rightStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, rightList));
-NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
- const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
-) {
- TVector<std::unique_ptr<IBlockReader>> readers;
- TVector<std::unique_ptr<IBlockItemConverter>> converters;
- for (const auto& type : types) {
- const auto blockItemType = AS_TYPE(TBlockType, type)->GetItemType();
- readers.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
- converters.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType,
- ctx.Builder->GetPgBuilder()));
+ const auto leftStreamItems = ValidateBlockStreamType(leftStream.GetStaticType());
+ const auto rightStreamItems = ValidateBlockStreamType(rightStream.GetStaticType());
+
+ TVector<TType*> joinReturnItems;
+
+ const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend());
+ for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { // Excluding block size
+ if (leftKeyDropsSet.contains(i)) {
+ continue;
+ }
+ joinReturnItems.push_back(pgmBuilder.NewBlockType(leftStreamItems[i], TBlockType::EShape::Many));
}
- const auto& holderFactory = ctx.HolderFactory;
- const size_t width = types.size() - 1;
- TDefaultListRepresentation listValues;
- NUdf::TUnboxedValue iterator = values.GetListIterator();
- NUdf::TUnboxedValue current;
- while (iterator.Next(current)) {
- const auto blockLengthValue = current.GetElement(width);
- const auto blockLengthDatum = TArrowBlock::From(blockLengthValue).GetDatum();
- Y_ENSURE(blockLengthDatum.is_scalar());
- const auto blockLength = blockLengthDatum.scalar_as<arrow::UInt64Scalar>().value;
- for (size_t i = 0; i < blockLength; i++) {
- NUdf::TUnboxedValue* items = nullptr;
- const auto tuple = holderFactory.CreateDirectArrayHolder(width, items);
- for (size_t j = 0; j < width; j++) {
- const auto arrayValue = current.GetElement(j);
- const auto arrayDatum = TArrowBlock::From(arrayValue).GetDatum();
- UNIT_ASSERT(arrayDatum.is_array());
- const auto blockItem = readers[j]->GetItem(*arrayDatum.array(), i);
- items[j] = converters[j]->MakeValue(blockItem, holderFactory);
+ if (joinKind != EJoinKind::LeftSemi && joinKind != EJoinKind::LeftOnly) {
+ const THashSet<ui32> rightKeyDropsSet(rightKeyDrops.cbegin(), rightKeyDrops.cend());
+ for (size_t i = 0; i < rightStreamItems.size() - 1; i++) { // Excluding block size
+ if (rightKeyDropsSet.contains(i)) {
+ continue;
}
- listValues = listValues.Append(std::move(tuple));
+
+ joinReturnItems.push_back(pgmBuilder.NewBlockType(
+ joinKind == EJoinKind::Inner ? rightStreamItems[i]
+ : IsOptionalOrNull(rightStreamItems[i]) ? rightStreamItems[i]
+ : pgmBuilder.NewOptionalType(rightStreamItems[i]),
+ TBlockType::EShape::Many
+ ));
}
}
- return holderFactory.CreateDirectListHolder(std::move(listValues));
-}
-NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup,
- const TType* leftType, const NUdf::TUnboxedValue& leftListValue,
- const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
- const TRuntimeNode& rightNode, EJoinKind joinKind, size_t blockSize
-) {
- TProgramBuilder& pb = *setup.PgmBuilder;
-
- // 1. Prepare block type for the input produced by the left node.
- Y_ENSURE(leftType->IsList(), "Left node has to be list");
- const auto leftListType = AS_TYPE(TListType, leftType)->GetItemType();
- Y_ENSURE(leftListType->IsTuple(), "List item has to be tuple");
- const auto leftItems = AS_TYPE(TTupleType, leftListType)->GetElements();
- const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
- const auto blockLenType = pb.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
- TVector<TType*> leftBlockItems;
- std::transform(leftItems.cbegin(), leftItems.cend(), std::back_inserter(leftBlockItems),
- [&](const auto& itemType) {
- return pb.NewBlockType(itemType, TBlockType::EShape::Many);
- });
- // XXX: Mind the last block length column.
- leftBlockItems.push_back(blockLenType);
- const auto leftBlockType = pb.NewTupleType(leftBlockItems);
-
- // 2. Build AST with BlockMapJoinCore.
- TRuntimeNode leftArg;
- const auto joinNode = BuildBlockJoin(pb, joinKind, leftKeyColumns, leftKeyDrops,
- leftArg, leftBlockType, rightNode);
-
- // 3. Prepare non-block type for the result of the join node.
- const auto joinBlockType = joinNode.GetStaticType();
- Y_ENSURE(joinBlockType->IsList(), "Join result has to be list");
- const auto joinListType = AS_TYPE(TListType, joinBlockType)->GetItemType();
- Y_ENSURE(joinListType->IsTuple(), "List item has to be tuple");
- const auto joinBlockItems = AS_TYPE(TTupleType, joinListType)->GetElements();
- TVector<TType*> joinItems;
- // XXX: Mind the last block length column.
- std::transform(joinBlockItems.cbegin(), std::prev(joinBlockItems.cend()), std::back_inserter(joinItems),
- [](const auto& blockItemType) {
- const auto& blockType = AS_TYPE(TBlockType, blockItemType);
- Y_ENSURE(blockType->GetShape() == TBlockType::EShape::Many);
- return blockType->GetItemType();
- });
-
- // 4. Build computation graph with BlockMapJoinCore node as a root.
- // Pass the values from the "left node" as the input for the
- // BlockMapJoinCore graph.
- const auto graph = setup.BuildGraph(joinNode, {leftArg.GetNode()});
- const auto& leftBlocks = graph->GetEntryPoint(0, true);
- auto& ctx = graph->GetContext();
- leftBlocks->SetValue(ctx, ToBlocks(ctx, blockSize, leftItems, leftListValue));
- const auto joinValues = FromBlocks(ctx, joinBlockItems, graph->GetValue());
- return joinValues;
-}
+ joinReturnItems.push_back(pgmBuilder.NewBlockType(pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar));
-TVector<NUdf::TUnboxedValue> ConvertListToVector(const NUdf::TUnboxedValue& list) {
- NUdf::TUnboxedValue current;
- NUdf::TUnboxedValue iterator = list.GetListIterator();
- TVector<NUdf::TUnboxedValue> items;
- while (iterator.Next(current)) {
- items.push_back(current);
- }
- return items;
-}
+ TType* joinReturnType = pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(joinReturnItems));
+ auto joinNode = pgmBuilder.BlockMapJoinCore(
+ leftStream,
+ rightStream,
+ joinKind,
+ leftKeyColumns,
+ leftKeyDrops,
+ rightKeyColumns,
+ rightKeyDrops,
+ rightAny,
+ joinReturnType
+ );
-void CompareResults(const TType* type, const NUdf::TUnboxedValue& expected,
- const NUdf::TUnboxedValue& got
-) {
- const auto itemType = AS_TYPE(TListType, type)->GetItemType();
- const NUdf::ICompare::TPtr compare = MakeCompareImpl(itemType);
- const NUdf::IEquate::TPtr equate = MakeEquateImpl(itemType);
- // XXX: Stub both keyTypes and isTuple arguments, since
- // ICompare/IEquate are used.
- TKeyTypes keyTypesStub;
- bool isTupleStub = false;
- const TValueLess valueLess(keyTypesStub, isTupleStub, compare.Get());
- const TValueEqual valueEqual(keyTypesStub, isTupleStub, equate.Get());
-
- auto expectedItems = ConvertListToVector(expected);
- auto gotItems = ConvertListToVector(got);
- UNIT_ASSERT_VALUES_EQUAL(expectedItems.size(), gotItems.size());
- Sort(expectedItems, valueLess);
- Sort(gotItems, valueLess);
- for (size_t i = 0; i < expectedItems.size(); i++) {
- UNIT_ASSERT(valueEqual(gotItems[i], expectedItems[i]));
- }
+ return FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode));
}
-void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind,
- const TType* expectedType, const NUdf::TUnboxedValue& expected,
- const TRuntimeNode& rightNode, const TType* leftType,
- const NUdf::TUnboxedValue& leftListValue, const TVector<ui32>& leftKeyColumns,
- const TVector<ui32>& leftKeyDrops = {}
+// Builds and evaluates a single block map join for one block size.
+//
+// leftType/rightType must be list-of-tuple types (checked with Y_ENSURE);
+// leftListValue/rightListValue are the matching list values. Both inputs are
+// declared as graph arguments, run through ToBlocks with the given blockSize,
+// and supplied via the graph entry points. The join output is passed through
+// FromBlocks and returned.
+NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup,
+ TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
+ TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny,
+ EJoinKind joinKind, size_t blockSize
) {
- const size_t testSize = leftListValue.GetListLength();
- for (size_t blockSize = 8; blockSize <= testSize; blockSize <<= 1) {
- const auto got = DoTestBlockJoin(setup, leftType, leftListValue,
- leftKeyColumns, leftKeyDrops,
- rightNode, joinKind, blockSize);
- CompareResults(expectedType, expected, got);
- }
-}
+ TProgramBuilder& pb = *setup.PgmBuilder;
-//
-// Auxiliary routines to build list nodes from the given vectors.
-//
-
-struct TTypeMapperBase {
- TProgramBuilder& Pb;
- TType* ItemType;
- auto GetType() { return ItemType; }
-};
-
-template <typename Type>
-struct TTypeMapper: TTypeMapperBase {
- TTypeMapper(TProgramBuilder& pb): TTypeMapperBase {pb, pb.NewDataType(NUdf::TDataType<Type>::Id) } {}
- auto GetValue(const Type& value) {
- return Pb.NewDataLiteral<Type>(value);
- }
-};
+ // Left input: must be a list of tuples; derive the tuple type used for its blocks.
+ Y_ENSURE(leftType->IsList(), "Left node has to be list");
+ const auto leftItemType = AS_TYPE(TListType, leftType)->GetItemType();
+ Y_ENSURE(leftItemType->IsTuple(), "List item has to be tuple");
+ TType* leftBlockType = MakeBlockTupleType(pb, leftItemType);
-template <>
-struct TTypeMapper<TString>: TTypeMapperBase {
- TTypeMapper(TProgramBuilder& pb): TTypeMapperBase {pb, pb.NewDataType(NUdf::EDataSlot::String)} {}
- auto GetValue(const TString& value) {
- return Pb.NewDataLiteral<NUdf::EDataSlot::String>(value);
- }
-};
-
-template <typename TNested>
-class TTypeMapper<std::optional<TNested>>: TTypeMapper<TNested> {
- using TBase = TTypeMapper<TNested>;
-public:
- TTypeMapper(TProgramBuilder& pb): TBase(pb) {}
- auto GetType() { return TBase::Pb.NewOptionalType(TBase::GetType()); }
- auto GetValue(const std::optional<TNested>& value) {
- if (value == std::nullopt) {
- return TBase::Pb.NewEmptyOptional(GetType());
- } else {
- return TBase::Pb.NewOptional(TBase::GetValue(*value));
- }
- }
-};
+ // Right input: same validation and block tuple type derivation as for the left side.
+ Y_ENSURE(rightType->IsList(), "Right node has to be list");
+ const auto rightItemType = AS_TYPE(TListType, rightType)->GetItemType();
+ Y_ENSURE(rightItemType->IsTuple(), "Right item has to be tuple");
+ TType* rightBlockType = MakeBlockTupleType(pb, rightItemType);
-template<typename Type>
-const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb,
- const TVector<Type>& vector
-) {
- TTypeMapper<Type> mapper(pb);
+ // Both inputs are graph arguments, so their values are supplied after the graph is built.
+ TRuntimeNode leftList = pb.Arg(pb.NewListType(leftBlockType));
+ TRuntimeNode rightList = pb.Arg(pb.NewListType(rightBlockType));
+ const auto joinNode = BuildBlockJoin(pb, joinKind, leftList, leftKeyColumns, leftKeyDrops, rightList, rightKeyColumns, rightKeyDrops, rightAny);
- TRuntimeNode::TList listItems;
- std::transform(vector.cbegin(), vector.cend(), std::back_inserter(listItems),
- [&](const auto value) {
- return mapper.GetValue(value);
- });
+ // Check the static type of the join result: it must again be a list of tuples.
+ const auto joinType = joinNode.GetStaticType();
+ Y_ENSURE(joinType->IsList(), "Join result has to be list");
+ const auto joinItemType = AS_TYPE(TListType, joinType)->GetItemType();
+ Y_ENSURE(joinItemType->IsTuple(), "List item has to be tuple");
- return {pb.NewList(mapper.GetType(), listItems)};
-}
+ // The argument nodes are listed as {left, right}; presumably this order is what
+ // makes entry point 0 the left input and entry point 1 the right input below.
+ const auto graph = setup.BuildGraph(joinNode, {leftList.GetNode(), rightList.GetNode()});
-template<typename Type, typename... Tail>
-const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb,
- const TVector<Type>& vector, Tail... vectors
-) {
- const auto frontList = BuildListNodes(pb, vector);
- const auto tailLists = BuildListNodes(pb, std::forward<Tail>(vectors)...);
- TVector<const TRuntimeNode> lists;
- lists.reserve(tailLists.size() + 1);
- lists.push_back(frontList.front());;
- for (const auto& list : tailLists) {
- lists.push_back(list);
- }
- return lists;
+ // Feed both inputs through ToBlocks, evaluate the graph, and pass its value through FromBlocks.
+ auto& ctx = graph->GetContext();
+ graph->GetEntryPoint(0, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue)));
+ graph->GetEntryPoint(1, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue)));
+ return FromBlocks(ctx, AS_TYPE(TTupleType, joinItemType)->GetElements(), graph->GetValue());
}
-template<typename... TVectors>
-const std::pair<TType*, NUdf::TUnboxedValue> ConvertVectorsToTuples(
- TSetup<false>& setup, TVectors... vectors
+// Runs the block join once per power-of-two block size, from 1 up to the
+// length of the left list, and checks every result against `expected` with
+// CompareResults. Key columns, key drops and rightAny are forwarded unchanged
+// to DoTestBlockJoin.
+void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind,
+ TType* expectedType, const NUdf::TUnboxedValue& expected,
+ TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns,
+ TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns,
+ const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {}, bool rightAny = false
) {
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto lists = BuildListNodes(pb, std::forward<TVectors>(vectors)...);
- const auto tuplesNode = pb.Zip(lists);
- const auto tuplesNodeType = tuplesNode.GetStaticType();
- const auto tuples = setup.BuildGraph(tuplesNode)->GetValue();
- return std::make_pair(tuplesNodeType, tuples);
-}
-
-TVector<TString> GenerateValues(size_t level) {
- constexpr size_t alphaSize = 'Z' - 'A' + 1;
- if (level == 1) {
- TVector<TString> alphabet(alphaSize);
- std::iota(alphabet.begin(), alphabet.end(), 'A');
- return alphabet;
- }
- const auto subValues = GenerateValues(level - 1);
- TVector<TString> values;
- values.reserve(alphaSize * subValues.size());
- for (char ch = 'A'; ch <= 'Z'; ch++) {
- for (const auto& tail : subValues) {
- values.emplace_back(ch + tail);
- }
- }
- return values;
-}
-
-TSet<ui64> GenerateFibonacci(size_t count) {
- TSet<ui64> fibSet;
- ui64 a = 0, b = 1;
- fibSet.insert(a);
- while (count--) {
- a = std::exchange(b, a + b);
- fibSet.insert(b);
+ const size_t testSize = leftListValue.GetListLength();
+ for (size_t blockSize = 1; blockSize <= testSize; blockSize <<= 1) {
+ // The inputs are reused on every iteration, so hand DoTestBlockJoin a fresh
+ // copy each time instead of std::move-ing the same object repeatedly; a
+ // callee that really consumed its argument would otherwise leave later
+ // block sizes running on a moved-from list.
+ const auto got = DoTestBlockJoin(setup,
+ leftType, NUdf::TUnboxedValue(leftListValue), leftKeyColumns, leftKeyDrops,
+ rightType, NUdf::TUnboxedValue(rightListValue), rightKeyColumns, rightKeyDrops, rightAny,
+ joinKind, blockSize
+ );
+ CompareResults(expectedType, expected, got);
}
- return fibSet;
}
} // namespace
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinBasicTest) {
-
+Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestBasic) {
constexpr size_t testSize = 1 << 14;
constexpr size_t valueSize = 3;
static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TSet<ui64> fibonacci = GenerateFibonacci(21);
+ static const TSet<ui64> fibonacci = GenerateFibonacci(testSize);
+ static const TString hugeString(128, '1');
+
+ // Inner join of a left list (key, subkey, value) with a right list
+ // (key, value) on column 0 of both sides. The expected rows are computed
+ // independently with a TMap lookup and compared for every block size.
+ Y_UNIT_TEST(TestInnerJoin) {
+ TSetup<false> setup(GetNodeFactory());
- Y_UNIT_TEST(TestInnerOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
[](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
[](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
+
+ // 2. Make input for the "right" stream.
const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
+
// 3. Make "expected" data.
TMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMap.find(keyInit[i]);
+ rightMap[rightKeyInit[i]] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& found = rightMap.find(leftKeyInit[i]);
if (found != rightMap.cend()) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestInnerMultiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
- [](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
- [](const auto key) { return std::to_string(key * 1001); });
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+ // 4. Convert input and expected TVectors to (type, value) pairs.
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
+ // 5. Run tests: join on column 0 of both sides, keep every left column
+ // (leftKeyDrops = {}) and drop the right key column (rightKeyDrops = {0}),
+ // which matches the four expected columns built above.
RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
- [](const auto key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<ui64, TString> rightMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto& found = rightMap.find(keyInit[i]);
- if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
- } else {
- rightExpected.push_back(std::nullopt);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
- }
+ // Inner join where every right key occurs twice with different values, so
+ // each matching left row must be emitted once per right row. The expected
+ // rows are computed independently with a TMultiMap and compared for every
+ // block size.
+ Y_UNIT_TEST(TestInnerJoinMulti) {
+ TSetup<false> setup(GetNodeFactory());
- Y_UNIT_TEST(TestLeftMultiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
[](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
[](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
+
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
+
+ // Add rows with the same keys. The unique keys are snapshotted first:
+ // appending to a vector through back_inserter while reading it via its own
+ // iterators is undefined behavior as soon as push_back reallocates.
+ // Transforming only the snapshot also keeps keys and values the same length.
+ const TVector<ui64> uniqueRightKeys(rightKeyInit);
+ rightKeyInit.insert(rightKeyInit.end(), uniqueRightKeys.cbegin(), uniqueRightKeys.cend());
+ std::transform(uniqueRightKeys.cbegin(), uniqueRightKeys.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key * 1001); });
+
// 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TMultiMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
+ rightMap.insert({rightKeyInit[i], rightValueInit[i]});
}
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- } else {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]);
+ for (auto it = begin; it != end; it++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(it->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestLeftSemiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (rightSet.contains(keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
+ // 4. Convert input and expected TVectors to (type, value) pairs.
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
+ // 5. Run tests: join on column 0 of both sides and drop the right key column.
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnlyOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestInnerJoinRightAny) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
[](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
[](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!rightSet.contains(keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
- }
-} // Y_UNIT_TEST_SUITE
-
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinNullKeysTest) {
-
- constexpr size_t testSize = 1 << 14;
- constexpr size_t valueSize = 3;
- static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TSet<ui64> fibonacci = GenerateFibonacci(21);
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
+ [](const auto key) { return std::to_string(key); });
- Y_UNIT_TEST(TestInnerOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
+ // Add rows with the same keys
+ std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit));
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
+
// 3. Make "expected" data.
TMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!keyInit[i]) {
- continue;
- }
- const auto& found = rightMap.find(*keyInit[i]);
+ rightMap[rightKeyInit[i]] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ auto found = rightMap.find(leftKeyInit[i]);
if (found != rightMap.cend()) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestInnerMultiOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
- [](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
- [](const auto key) { return std::to_string(key * 1001); });
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!keyInit[i]) {
- continue;
- }
- const auto& found = rightMultiMap.find(*keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}, true
+ );
}
- Y_UNIT_TEST(TestLeftOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
+
// 3. Make "expected" data.
TMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto& found = keyInit[i] ? rightMap.find(*keyInit[i]) : rightMap.cend();
+ rightMap[rightKeyInit[i]] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<std::optional<TString>> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ const auto& found = rightMap.find(leftKeyInit[i]);
if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
+ expectedRightValue.push_back(found->second);
} else {
- rightExpected.push_back(std::nullopt);
+ expectedRightValue.push_back(std::nullopt);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftMultiOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftJoinMulti) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
+
+ // Add rows with the same keys
+ std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit));
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key * 1001); });
+
// 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TMultiMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = keyInit[i] ? rightMultiMap.find(*keyInit[i]) : rightMultiMap.cend();
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
+ rightMap.insert({rightKeyInit[i], rightValueInit[i]});
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<std::optional<TString>> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]);
+ if (begin != end) {
+ for (auto it = begin; it != end; it++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(it->second);
}
} else {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(std::nullopt);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftSemiOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftSemiJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ // Add rows with the same keys
+ std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit));
+
// 3. Make "expected" data.
TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (keyInit[i] && rightSet.contains(*keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (rightSet.contains(leftKeyInit[i])) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue);
+
RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnlyOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftOnlyJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ // Add rows with the same keys
+ std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit));
+
// 3. Make "expected" data.
TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!(keyInit[i] && rightSet.contains(*keyInit[i]))) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (!rightSet.contains(leftKeyInit[i])) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue);
+
RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
-} // Y_UNIT_TEST_SUITE
+ Y_UNIT_TEST(TestKeyTuple) {
+ TSetup<false> setup(GetNodeFactory());
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMoreTest) {
+ // 1. Make input for the "left" stream.
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
- constexpr size_t testSize = 1 << 14;
- constexpr size_t valueSize = 3;
- static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TString hugeString(128, '1');
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> rightKey2Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
+ [](const auto& key) { return key * 1001; });
+ TVector<TString> rightValueInit;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightValueInit),
+ [](const auto& key) { return std::to_string(key); });
- Y_UNIT_TEST(TestInnerOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- TVector<TString> rightPayloadInit({hugeString});
// 3. Make "expected" data.
- TMap<ui64, TString> rightMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMap.find(keyInit[i]);
+ TMap<std::tuple<ui64, ui64>, TString> rightMap;
+ for (size_t i = 0; i < rightKey1Init.size(); i++) {
+ const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
+ rightMap[key] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto key = std::make_tuple(leftKeyInit[i], leftSubkeyInit[i]);
+ const auto found = rightMap.find(key);
if (found != rightMap.cend()) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestInnerMultiOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- TVector<TString> rightPayload1Init({"1"});
- TVector<TString> rightPayload2Init({hugeString});
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKey1Init, rightValueInit, rightKey2Init);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0, 1},
+ rightType, std::move(rightList), {0, 2},
+ {}, {0, 2}
+ );
}
- Y_UNIT_TEST(TestLeftOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestInnerJoinOutputSlicing) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
+ TVector<ui64> leftKeyInit(testSize);
+ std::fill(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
+ // Huge string is used to make fewer rows fit into one block
const TVector<ui64> rightKeyInit({1});
- TVector<TString> rightPayloadInit({hugeString});
+ TVector<TString> rightValueInit({hugeString});
+
// 3. Make "expected" data.
TMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto& found = rightMap.find(keyInit[i]);
+ rightMap[rightKeyInit[i]] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& found = rightMap.find(leftKeyInit[i]);
if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
- } else {
- rightExpected.push_back(std::nullopt);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftMultiOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestInnerJoinHugeIterator) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- TVector<TString> rightPayload1Init({"1"});
- TVector<TString> rightPayload2Init({hugeString});
+ TVector<ui64> leftKeyInit({1});
+ TVector<ui64> leftSubkeyInit({1001});
+ TVector<TString> leftValueInit({threeLetterValues[1]});
+
+ // 2. Make input for the "right" stream.
+ // Many right rows (1 << 16) share the single left key, so one left row matches all of them
+ TVector<ui64> rightKeyInit(1 << 16);
+ std::fill(rightKeyInit.begin(), rightKeyInit.end(), 1);
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
+ [](const auto& key) { return std::to_string(key); });
+
// 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TMultiMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
+ rightMap.insert({rightKeyInit[i], rightValueInit[i]});
}
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- } else {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]);
+ for (auto it = begin; it != end; it++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(it->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestLeftSemiOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (rightSet.contains(keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
- }
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
- Y_UNIT_TEST(TestLeftOnlyOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!rightSet.contains(keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
} // Y_UNIT_TEST_SUITE
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinDropKeyColumns) {
-
+Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestOptional) {
constexpr size_t testSize = 1 << 14;
constexpr size_t valueSize = 3;
static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TSet<ui64> fibonacci = GenerateFibonacci(21);
+ static const TSet<ui64> fibonacci = GenerateFibonacci(testSize);
+
+ Y_UNIT_TEST(TestInnerJoin) {
+ TSetup<false> setup(GetNodeFactory());
- Y_UNIT_TEST(TestInnerOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
- [](const auto key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<ui64, TString> rightMap;
+ TVector<std::optional<ui64>> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return *key * 1001; });
+ TVector<std::optional<TString>> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<std::optional<TString>> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
+ [](const auto key) { return std::to_string(*key); });
+
+ // 3. Add some NULLs
+ leftKeyInit[0] = leftKeyInit[2] = std::nullopt;
+ rightKeyInit[2] = rightKeyInit[3] = std::nullopt;
+
+ leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt;
+ rightValueInit[2] = rightValueInit[12] = rightValueInit[42] = std::nullopt;
+
+ // 4. Make "expected" data.
+ TMap<ui64, std::optional<TString>> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
+ if (rightKeyInit[i].has_value()) {
+ rightMap[*rightKeyInit[i]] = rightValueInit[i];
+ }
}
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMap.find(keyInit[i]);
+ TVector<std::optional<ui64>> expectedLeftKey;
+ TVector<ui64> expectedSubkey;
+ TVector<std::optional<TString>> expectedValue;
+ TVector<std::optional<ui64>> expectedRightKey;
+ TVector<std::optional<TString>> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (!leftKeyInit[i]) {
+ continue;
+ }
+ const auto& found = rightMap.find(*leftKeyInit[i]);
if (found != rightMap.cend()) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ expectedLeftKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightKey.push_back(found->first);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedLeftKey, expectedSubkey, expectedValue, expectedRightKey, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0}, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestInnerMultiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
- [](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
- [](const auto key) { return std::to_string(key * 1001); });
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TVector<std::optional<ui64>> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return *key * 1001; });
+ TVector<std::optional<TString>> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<std::optional<TString>> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
+ [](const auto key) { return std::to_string(*key); });
+
+ // 3. Add some NULLs
+ leftKeyInit[0] = leftKeyInit[2] = std::nullopt;
+ rightKeyInit[2] = rightKeyInit[3] = std::nullopt;
+
+ leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt;
+ rightValueInit[2] = rightValueInit[12] = rightValueInit[42] = std::nullopt;
+
+ // 4. Make "expected" data.
+ TMap<ui64, std::optional<TString>> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
+ if (rightKeyInit[i].has_value()) {
+ rightMap[*rightKeyInit[i]] = rightValueInit[i];
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0}, {0});
- }
+ TVector<std::optional<ui64>> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<std::optional<TString>> expectedValue;
+ TVector<std::optional<ui64>> expectedRightKey;
+ TVector<std::optional<TString>> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
- Y_UNIT_TEST(TestLeftOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
- [](const auto key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<ui64, TString> rightMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto& found = rightMap.find(keyInit[i]);
+ const auto& found = rightMap.find(*leftKeyInit[i]);
if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
+ expectedRightKey.push_back(found->first);
+ expectedRightValue.push_back(found->second);
} else {
- rightExpected.push_back(std::nullopt);
+ expectedRightKey.push_back(std::nullopt);
+ expectedRightValue.push_back(std::nullopt);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightKey, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0}, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestLeftMultiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftSemiJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
- [](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
- [](const auto key) { return std::to_string(key * 1001); });
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TVector<std::optional<ui64>> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return *key * 1001; });
+ TVector<std::optional<TString>> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+
+ // 3. Add some NULLs
+ leftKeyInit[0] = leftKeyInit[2] = std::nullopt;
+ rightKeyInit[2] = rightKeyInit[3] = std::nullopt;
+ leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt;
+
+ // 4. Make "expected" data.
+ TSet<ui64> rightSet;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- } else {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
+ if (rightKeyInit[i].has_value()) {
+ rightSet.insert(*rightKeyInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0}, {0});
- }
-
- Y_UNIT_TEST(TestLeftSemiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (rightSet.contains(keyInit[i])) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
+ TVector<std::optional<ui64>> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<std::optional<TString>> expectedValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (!leftKeyInit[i]) {
+ continue;
+ }
+ if (rightSet.contains(*leftKeyInit[i])) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue);
+
RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0}, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnlyOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!rightSet.contains(keyInit[i])) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0}, {0});
- }
+ Y_UNIT_TEST(TestLeftOnlyJoin) {
+ TSetup<false> setup(GetNodeFactory());
-} // Y_UNIT_TEST_SUITE
+ // 1. Make input for the "left" stream.
+ TVector<std::optional<ui64>> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return *key * 1001; });
+ TVector<std::optional<TString>> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; });
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMultiKeyBasicTest) {
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- constexpr size_t testSize = 1 << 14;
- constexpr size_t valueSize = 3;
- static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TSet<ui64> fibonacci = GenerateFibonacci(21);
+ // 3. Add some NULLs
+ leftKeyInit[0] = leftKeyInit[2] = std::nullopt;
+ rightKeyInit[2] = rightKeyInit[3] = std::nullopt;
+ leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt;
- Y_UNIT_TEST(TestInnerOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- TVector<TString> rightPayloadInit;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit),
- [](const auto& key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<std::tuple<ui64, ui64>, TString> rightMap;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
- rightMap[key] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- const auto found = rightMap.find(key);
- if (found != rightMap.cend()) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ // 4. Make "expected" data.
+ TSet<ui64> rightSet;
+ for (size_t i = 0; i < rightKeyInit.size(); i++) {
+ if (rightKeyInit[i].has_value()) {
+ rightSet.insert(*rightKeyInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0, 1});
- }
-
- Y_UNIT_TEST(TestInnerMultiOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- TVector<TString> rightPayload1Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init),
- [](const auto& key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init),
- [](const auto& key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
- rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- const auto found = rightMultiMap.find(key);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
+ TVector<std::optional<ui64>> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<std::optional<TString>> expectedValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (!leftKeyInit[i] || !rightSet.contains(*leftKeyInit[i])) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0, 1});
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue);
+
+ RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestKeyTuple) { // Inner join of two streams on a two-column Optional<ui64> key with NULLs on both sides.
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- TVector<TString> rightPayloadInit;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit),
- [](const auto& key) { return std::to_string(key); });
- // 3. Make "expected" data.
+ TVector<std::optional<ui64>> leftKey1Init(testSize);
+ std::iota(leftKey1Init.begin(), leftKey1Init.end(), 1);
+ TVector<std::optional<ui64>> leftKey2Init(testSize);
+ std::iota(leftKey2Init.begin(), leftKey2Init.end(), 1);
+ TVector<TString> leftValueInit;
+ std::transform(leftKey1Init.cbegin(), leftKey1Init.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; }); // runs before step 3 injects NULLs, so *key is always engaged here
+
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<std::optional<ui64>> rightKey2Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<TString> rightValueInit;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightValueInit),
+ [](const auto key) { return std::to_string(*key); }); // also evaluated before the NULLs are injected below
+
+ // 3. Add some NULLs
+ leftKey1Init[0] = leftKey1Init[1] = std::nullopt;
+ leftKey2Init[1] = std::nullopt;
+ rightKey1Init[1] = rightKey1Init[2] = std::nullopt;
+ rightKey2Init[2] = std::nullopt;
+
+ // 4. Make "expected" data.
TMap<std::tuple<ui64, ui64>, TString> rightMap;
for (size_t i = 0; i < rightKey1Init.size(); i++) {
- const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
- rightMap[key] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- const auto found = rightMap.find(key);
+ if (rightKey1Init[i].has_value() && rightKey2Init[i].has_value()) { // right rows with a NULL in either key column never enter the lookup map
+ const auto key = std::make_tuple(*rightKey1Init[i], *rightKey2Init[i]);
+ rightMap[key] = rightValueInit[i];
+ }
+ }
+ TVector<std::optional<ui64>> expectedLeftKey1;
+ TVector<std::optional<ui64>> expectedLeftKey2;
+ TVector<TString> expectedValue;
+ TVector<std::optional<ui64>> expectedRightKey1;
+ TVector<std::optional<ui64>> expectedRightKey2;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKey1Init.size(); i++) {
+ if (!leftKey1Init[i] || !leftKey2Init[i]) { // left rows with a NULL key component are not expected in the Inner join output
+ continue;
+ }
+ const auto key = std::make_tuple(*leftKey1Init[i], *leftKey2Init[i]);
+ const auto& found = rightMap.find(key);
if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
- } else {
- rightExpected.push_back(std::nullopt);
+ expectedLeftKey1.push_back(leftKey1Init[i]);
+ expectedLeftKey2.push_back(leftKey2Init[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightKey1.push_back(std::get<0>(found->first));
+ expectedRightKey2.push_back(std::get<1>(found->first));
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0, 1});
- }
- Y_UNIT_TEST(TestLeftMultiOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- TVector<TString> rightPayload1Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init),
- [](const auto& key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init),
- [](const auto& key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
- rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- const auto found = rightMultiMap.find(key);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- } else {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0, 1});
- }
-
- Y_UNIT_TEST(TestLeftSemiOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- // 3. Make "expected" data.
- TSet<std::tuple<ui64, ui64>> rightSet;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i]));
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- if (rightSet.contains(key)) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0, 1});
- }
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKey1Init, leftKey2Init, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKey1Init, rightKey2Init, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedLeftKey1, expectedLeftKey2, expectedValue, expectedRightKey1, expectedRightKey2, expectedRightValue);
- Y_UNIT_TEST(TestLeftOnlyOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- // 3. Make "expected" data.
- TSet<std::tuple<ui64, ui64>> rightSet;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i]));
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- if (!rightSet.contains(key)) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0, 1});
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, // both sides are joined on their columns {0, 1}
+ leftType, std::move(leftList), {0, 1},
+ rightType, std::move(rightList), {0, 1}
+ );
}
} // Y_UNIT_TEST_SUITE
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp
new file mode 100644
index 0000000000..2a65da92ed
--- /dev/null
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp
@@ -0,0 +1,331 @@
+#include "mkql_block_map_join_ut_utils.h"
+
+#include <yql/essentials/minikql/computation/mkql_block_impl.h>
+#include <yql/essentials/minikql/computation/mkql_block_reader.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
+#include <yql/essentials/minikql/mkql_node_cast.h>
+#include <yql/essentials/public/udf/arrow/args_dechunker.h>
+#include <yql/essentials/public/udf/arrow/block_builder.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+// Computation node that wraps a wide stream and throttles it: the stream value
+// it produces answers Yield on two out of every three WideFetch calls and only
+// forwards the remaining calls to the wrapped stream.
+class TWideStreamThrottlerWrapper: public TMutableComputationNode<TWideStreamThrottlerWrapper> {
+    using TBaseComputation = TMutableComputationNode<TWideStreamThrottlerWrapper>;
+public:
+    // Stream value handed out by DoCalculate(); owns the wrapped stream value.
+    class TStreamValue : public TComputationValue<TStreamValue> {
+    public:
+        using TBase = TComputationValue<TStreamValue>;
+
+        TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue origStream)
+            : TBase(memInfo)
+            , OrigStream_(std::move(origStream))
+        {
+        }
+
+    private:
+        // Fetches one wide row of `width` items into `output`.
+        // Calls number 0, 3, 6, ... are forwarded to the wrapped stream; every
+        // other call returns Yield without touching it.
+        NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
+            if (Counter_++ % 3) {
+                return NUdf::EFetchStatus::Yield;
+            }
+
+            // Fetch into a scratch row so that `output` is written only on Ok.
+            TUnboxedValueVector items(width);
+            const auto status = OrigStream_.WideFetch(items.data(), width);
+            if (status == NUdf::EFetchStatus::Ok) {
+                for (size_t i = 0; i < width; i++) {
+                    output[i] = std::move(items[i]);
+                }
+            }
+            // The wrapped stream's status is passed through as-is. Returning it
+            // here (instead of from inside a switch) guarantees that every
+            // path returns a value.
+            return status;
+        }
+
+    private:
+        NUdf::TUnboxedValue OrigStream_;
+        // Number of WideFetch calls seen so far.
+        size_t Counter_ = 0;
+    };
+
+    TWideStreamThrottlerWrapper(TComputationMutables& mutables, IComputationNode* origStream)
+        : TBaseComputation(mutables)
+        , OrigStream_(origStream)
+    {
+    }
+
+    // Evaluates the wrapped node and wraps its value into a throttling stream.
+    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+        return ctx.HolderFactory.Create<TStreamValue>(OrigStream_->GetValue(ctx));
+    }
+
+private:
+    void RegisterDependencies() const final {
+        DependsOn(OrigStream_);
+    }
+
+private:
+    IComputationNode* OrigStream_;
+};
+
+// Computation node that wraps a wide stream and hides its Yield statuses: the
+// stream value it produces keeps re-fetching from the wrapped stream until the
+// latter reports Ok or Finish.
+class TWideStreamDethrottlerWrapper: public TMutableComputationNode<TWideStreamDethrottlerWrapper> {
+    using TBaseComputation = TMutableComputationNode<TWideStreamDethrottlerWrapper>;
+public:
+    // Stream value handed out by DoCalculate(); owns the wrapped stream value.
+    class TStreamValue : public TComputationValue<TStreamValue> {
+    public:
+        using TBase = TComputationValue<TStreamValue>;
+
+        TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue origStream)
+            : TBase(memInfo)
+            , OrigStream_(std::move(origStream))
+        {
+        }
+
+    private:
+        // Fetches one wide row of `width` items into `output`, polling the
+        // wrapped stream in a loop for as long as it does not answer Ok/Finish.
+        NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
+            TUnboxedValueVector row(width);
+            NUdf::EFetchStatus status;
+            do {
+                status = OrigStream_.WideFetch(row.data(), width);
+            } while (status != NUdf::EFetchStatus::Ok && status != NUdf::EFetchStatus::Finish);
+
+            if (status == NUdf::EFetchStatus::Ok) {
+                for (size_t column = 0; column < width; column++) {
+                    output[column] = std::move(row[column]);
+                }
+            }
+            return status;
+        }
+
+    private:
+        NUdf::TUnboxedValue OrigStream_;
+    };
+
+    TWideStreamDethrottlerWrapper(TComputationMutables& mutables, IComputationNode* origStream)
+        : TBaseComputation(mutables)
+        , OrigStream_(origStream)
+    {
+    }
+
+    // Evaluates the wrapped node and wraps its value into a dethrottling stream.
+    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+        return ctx.HolderFactory.Create<TStreamValue>(OrigStream_->GetValue(ctx));
+    }
+
+private:
+    void RegisterDependencies() const final {
+        DependsOn(OrigStream_);
+    }
+
+private:
+    IComputationNode* OrigStream_;
+};
+
+// Factory hook: builds the throttling node around the callable's only input.
+IComputationNode* WrapWideStreamThrottler(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+    MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg");
+    // Resolve the input first, then allocate the wrapper around it.
+    const auto wrappedNode = LocateNode(ctx.NodeLocator, callable, 0);
+    auto* const throttler = new TWideStreamThrottlerWrapper(ctx.Mutables, wrappedNode);
+    return throttler;
+}
+
+// Factory hook: builds the dethrottling node around the callable's only input.
+IComputationNode* WrapWideStreamDethrottler(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+    MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg");
+    // Resolve the input first, then allocate the wrapper around it.
+    const auto wrappedNode = LocateNode(ctx.NodeLocator, callable, 0);
+    auto* const dethrottler = new TWideStreamDethrottlerWrapper(ctx.Mutables, wrappedNode);
+    return dethrottler;
+}
+
+}
+
+// Builds the block counterpart of `tupleType`: every element type becomes a
+// Many-shaped block of that type, and one extra Scalar-shaped ui64 block (the
+// block length column) is appended as the last element.
+TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) {
+    const auto elements = AS_TYPE(TTupleType, tupleType)->GetElements();
+    const auto lengthItemType = pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id);
+    const auto lengthColumnType = pgmBuilder.NewBlockType(lengthItemType, TBlockType::EShape::Scalar);
+
+    TVector<TType*> columns;
+    for (const auto& element : elements) {
+        columns.push_back(pgmBuilder.NewBlockType(element, TBlockType::EShape::Many));
+    }
+    // XXX: Mind the last block length column.
+    columns.push_back(lengthColumnType);
+
+    return pgmBuilder.NewTupleType(columns);
+}
+
+// Converts a list of tuples (`values`) into a list of block tuples.
+//
+// `types` are the item types of the tuple columns; one array builder is made
+// per column. Items are consumed from the list in batches of at most
+// `blockSize` rows, each batch is built into arrow datums and split by
+// NUdf::TArgsDechunker into chunks. Every chunk becomes one output tuple of
+// `types.size() + 1` elements: one arrow block per column followed by the
+// block count produced by MakeBlockCount().
+//
+// Throws (Y_ENSURE) when `blockSize` is zero while the list is non-empty.
+NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
+    const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
+) {
+    // Builders are sized by the largest per-item size among the columns.
+    const auto maxLength = CalcBlockLen(std::accumulate(types.cbegin(), types.cend(), 0ULL,
+        [](size_t max, const TType* type) {
+            return std::max(max, CalcMaxBlockItemSize(type));
+        }));
+    TVector<std::unique_ptr<NUdf::IArrayBuilder>> builders;
+    std::transform(types.cbegin(), types.cend(), std::back_inserter(builders),
+        [&](const auto& type) {
+            return MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool,
+                                    maxLength, &ctx.Builder->GetPgBuilder());
+        });
+
+    const auto& holderFactory = ctx.HolderFactory;
+    const size_t width = types.size();
+    const size_t total = values.GetListLength();
+    // With a zero batch size the loop below would never consume an item and
+    // would spin forever on a non-empty list.
+    Y_ENSURE(total == 0 || blockSize > 0, "blockSize must be positive for a non-empty list");
+
+    NUdf::TUnboxedValue iterator = values.GetListIterator();
+    NUdf::TUnboxedValue current;
+    size_t converted = 0;
+    TDefaultListRepresentation listValues;
+    while (converted < total) {
+        // Feed up to blockSize rows into the per-column builders.
+        for (size_t i = 0; i < blockSize && iterator.Next(current); i++, converted++) {
+            for (size_t j = 0; j < builders.size(); j++) {
+                const NUdf::TUnboxedValuePod& item = current.GetElement(j);
+                builders[j]->Add(item);
+            }
+        }
+        std::vector<arrow::Datum> batch;
+        batch.reserve(width);
+        for (size_t i = 0; i < width; i++) {
+            // The flag is true only for the last batch (presumably the
+            // builder's "finish" marker - TODO confirm against IArrayBuilder).
+            batch.emplace_back(builders[i]->Build(converted >= total));
+        }
+
+        NUdf::TArgsDechunker dechunker(std::move(batch));
+        std::vector<arrow::Datum> chunk;
+        ui64 chunkLen = 0;
+        while (dechunker.Next(chunk, chunkLen)) {
+            NUdf::TUnboxedValue* items = nullptr;
+            const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items);
+            for (size_t i = 0; i < width; i++) {
+                items[i] = holderFactory.CreateArrowBlock(std::move(chunk[i]));
+            }
+            // The trailing element carries the length of this chunk.
+            items[width] = MakeBlockCount(holderFactory, chunkLen);
+
+            listValues = listValues.Append(std::move(tuple));
+        }
+    }
+    return holderFactory.CreateDirectListHolder(std::move(listValues));
+}
+
+// Converts a list of block tuples (`values`) back into a list of plain tuples.
+//
+// `types` are the block types of the tuple elements, the last one being the
+// block length column. For every input tuple the scalar UInt64 stored in its
+// last element gives the number of rows; each row is rebuilt by reading item
+// `i` of every column block and converting it to an unboxed value.
+//
+// Throws (Y_ENSURE) when `types` is empty or a block length is not a scalar.
+NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
+    const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
+) {
+    // The block length column must be present; without this check the width
+    // computed below would wrap around.
+    Y_ENSURE(types.size() > 0, "Block tuple must contain the block length column");
+
+    TVector<std::unique_ptr<IBlockReader>> readers;
+    TVector<std::unique_ptr<IBlockItemConverter>> converters;
+    for (const auto& type : types) {
+        const auto blockItemType = AS_TYPE(TBlockType, type)->GetItemType();
+        readers.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
+        converters.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType,
+                                                    ctx.Builder->GetPgBuilder()));
+    }
+
+    const auto& holderFactory = ctx.HolderFactory;
+    // Number of data columns, i.e. everything but the trailing length column.
+    const size_t width = types.size() - 1;
+    TDefaultListRepresentation listValues;
+    NUdf::TUnboxedValue iterator = values.GetListIterator();
+    NUdf::TUnboxedValue current;
+    while (iterator.Next(current)) {
+        const auto blockLengthValue = current.GetElement(width);
+        const auto blockLengthDatum = TArrowBlock::From(blockLengthValue).GetDatum();
+        Y_ENSURE(blockLengthDatum.is_scalar());
+        const auto blockLength = blockLengthDatum.scalar_as<arrow::UInt64Scalar>().value;
+        for (size_t i = 0; i < blockLength; i++) {
+            NUdf::TUnboxedValue* items = nullptr;
+            const auto tuple = holderFactory.CreateDirectArrayHolder(width, items);
+            for (size_t j = 0; j < width; j++) {
+                const auto arrayValue = current.GetElement(j);
+                const auto arrayDatum = TArrowBlock::From(arrayValue).GetDatum();
+                UNIT_ASSERT(arrayDatum.is_array());
+                const auto blockItem = readers[j]->GetItem(*arrayDatum.array(), i);
+                items[j] = converters[j]->MakeValue(blockItem, holderFactory);
+            }
+            listValues = listValues.Append(std::move(tuple));
+        }
+    }
+    return holderFactory.CreateDirectListHolder(std::move(listValues));
+}
+
+// Returns a node factory that knows the two helper callables
+// ("WideStreamThrottler" and "WideStreamDethrottler") and delegates every
+// other callable to the builtin factory.
+TComputationNodeFactory GetNodeFactory() {
+    return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+        const auto& name = callable.GetType()->GetName();
+        if (name == "WideStreamThrottler") {
+            return WrapWideStreamThrottler(callable, ctx);
+        }
+        if (name == "WideStreamDethrottler") {
+            return WrapWideStreamDethrottler(callable, ctx);
+        }
+        return GetBuiltinFactory()(callable, ctx);
+    };
+}
+
+// Wraps `stream` into a "WideStreamThrottler" callable of the same static type.
+TRuntimeNode ThrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) {
+    const auto& env = pgmBuilder.GetTypeEnvironment();
+    TCallableBuilder builder(env, "WideStreamThrottler", stream.GetStaticType());
+    builder.Add(stream);
+    return TRuntimeNode(builder.Build(), false);
+}
+
+// Wraps `stream` into a "WideStreamDethrottler" callable of the same static type.
+TRuntimeNode DethrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) {
+    const auto& env = pgmBuilder.GetTypeEnvironment();
+    TCallableBuilder builder(env, "WideStreamDethrottler", stream.GetStaticType());
+    builder.Add(stream);
+    return TRuntimeNode(builder.Build(), false);
+}
+
+// Copies every item of `list`, in iteration order, into a TVector.
+TVector<NUdf::TUnboxedValue> ConvertListToVector(const NUdf::TUnboxedValue& list) {
+    TVector<NUdf::TUnboxedValue> result;
+    NUdf::TUnboxedValue cursor = list.GetListIterator();
+    for (NUdf::TUnboxedValue item; cursor.Next(item);) {
+        result.push_back(item);
+    }
+    return result;
+}
+
+// Asserts that the lists `expected` and `got` hold the same items regardless
+// of their order. `type` is the list type; its item type supplies the ordering
+// and equality used for the comparison. Both lists are copied and sorted
+// before the element-wise check.
+void CompareResults(const TType* type, const NUdf::TUnboxedValue& expected,
+    const NUdf::TUnboxedValue& got
+) {
+    const auto rowType = AS_TYPE(TListType, type)->GetItemType();
+    const NUdf::ICompare::TPtr rowCompare = MakeCompareImpl(rowType);
+    const NUdf::IEquate::TPtr rowEquate = MakeEquateImpl(rowType);
+
+    // XXX: Stub both keyTypes and isTuple arguments, since
+    // ICompare/IEquate are used.
+    TKeyTypes unusedKeyTypes;
+    bool unusedIsTuple = false;
+    const TValueLess valueLess(unusedKeyTypes, unusedIsTuple, rowCompare.Get());
+    const TValueEqual valueEqual(unusedKeyTypes, unusedIsTuple, rowEquate.Get());
+
+    auto expectedItems = ConvertListToVector(expected);
+    auto gotItems = ConvertListToVector(got);
+    UNIT_ASSERT_VALUES_EQUAL(expectedItems.size(), gotItems.size());
+
+    Sort(expectedItems, valueLess);
+    Sort(gotItems, valueLess);
+    for (size_t i = 0; i < expectedItems.size(); i++) {
+        UNIT_ASSERT(valueEqual(gotItems[i], expectedItems[i]));
+    }
+}
+
+// Returns all strings of exactly `level` characters over the alphabet 'A'..'Z',
+// ordered with the first character varying slowest.
+// For level == 0 the result is the single empty string.
+TVector<TString> GenerateValues(size_t level) {
+    constexpr size_t alphaSize = 'Z' - 'A' + 1;
+    if (level == 0) {
+        // Without this case `level - 1` below would wrap around and the
+        // recursion would never reach the base case.
+        return TVector<TString>(1);
+    }
+    if (level == 1) {
+        TVector<TString> alphabet(alphaSize);
+        std::iota(alphabet.begin(), alphabet.end(), 'A');
+        return alphabet;
+    }
+    // Prefix every shorter value with each letter in turn.
+    const auto subValues = GenerateValues(level - 1);
+    TVector<TString> values;
+    values.reserve(alphaSize * subValues.size());
+    for (char ch = 'A'; ch <= 'Z'; ch++) {
+        for (const auto& tail : subValues) {
+            values.emplace_back(ch + tail);
+        }
+    }
+    return values;
+}
+
+// Returns the set holding 0 and the Fibonacci numbers produced by `count`
+// steps of the recurrence started from (0, 1), i.e. 1, 2, 3, 5, ...
+TSet<ui64> GenerateFibonacci(size_t count) {
+    TSet<ui64> result;
+    ui64 previous = 0;
+    ui64 current = 1;
+    result.insert(previous);
+    for (size_t step = 0; step < count; ++step) {
+        const ui64 next = previous + current;
+        previous = current;
+        current = next;
+        result.insert(current);
+    }
+    return result;
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h
new file mode 100644
index 0000000000..2c2f32b312
--- /dev/null
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h
@@ -0,0 +1,113 @@
+#pragma once
+
+#include "mkql_computation_node_ut.h"
+
+#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
+
+#include <algorithm>
+#include <iterator>
+#include <optional>
+#include <utility>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+// True when `type` is an optional, null or pg type.
+inline bool IsOptionalOrNull(const TType* type) {
+    if (type->IsOptional()) {
+        return true;
+    }
+    if (type->IsNull()) {
+        return true;
+    }
+    return type->IsPg();
+}
+
+TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType);
+
+NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
+ const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
+NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
+ const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
+
+TComputationNodeFactory GetNodeFactory();
+TRuntimeNode ThrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream);
+TRuntimeNode DethrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream);
+
+TVector<NUdf::TUnboxedValue> ConvertListToVector(const NUdf::TUnboxedValue& list);
+void CompareResults(const TType* type, const NUdf::TUnboxedValue& expected, const NUdf::TUnboxedValue& got);
+
+TVector<TString> GenerateValues(size_t level);
+TSet<ui64> GenerateFibonacci(size_t count);
+
+//
+// Auxiliary routines to build list nodes from the given vectors.
+//
+
+// Common state of the type mappers: the program builder used to create
+// literals and the runtime type that corresponds to the mapped C++ type.
+struct TTypeMapperBase {
+    TProgramBuilder& Pb;
+    TType* ItemType;
+
+    // Runtime type of a single mapped value.
+    auto GetType() {
+        return ItemType;
+    }
+};
+
+// Maps a C++ type with an NUdf::TDataType<Type> id to the matching data type
+// and builds data literals from values of that type.
+template <typename Type>
+struct TTypeMapper: TTypeMapperBase {
+    TTypeMapper(TProgramBuilder& pb)
+        : TTypeMapperBase{pb, pb.NewDataType(NUdf::TDataType<Type>::Id)}
+    {
+    }
+
+    // Literal node holding `value`.
+    auto GetValue(const Type& value) {
+        return Pb.NewDataLiteral<Type>(value);
+    }
+};
+
+// TString is mapped to the String data slot.
+template <>
+struct TTypeMapper<TString>: TTypeMapperBase {
+    TTypeMapper(TProgramBuilder& pb)
+        : TTypeMapperBase{pb, pb.NewDataType(NUdf::EDataSlot::String)}
+    {
+    }
+
+    // String literal node holding `value`.
+    auto GetValue(const TString& value) {
+        return Pb.NewDataLiteral<NUdf::EDataSlot::String>(value);
+    }
+};
+
+// std::optional<TNested> is mapped to an optional over the nested mapper's
+// type: an empty std::optional becomes an empty optional node, an engaged one
+// wraps the nested literal.
+template <typename TNested>
+class TTypeMapper<std::optional<TNested>>: TTypeMapper<TNested> {
+    using TBase = TTypeMapper<TNested>;
+public:
+    TTypeMapper(TProgramBuilder& pb)
+        : TBase(pb)
+    {
+    }
+
+    // Optional type over the nested item type.
+    auto GetType() {
+        return TBase::Pb.NewOptionalType(TBase::GetType());
+    }
+
+    // Optional node for `value`.
+    auto GetValue(const std::optional<TNested>& value) {
+        if (!value.has_value()) {
+            return TBase::Pb.NewEmptyOptional(GetType());
+        }
+        return TBase::Pb.NewOptional(TBase::GetValue(*value));
+    }
+};
+
+// Builds a single list node whose items are the literals of `vector`, in
+// order, and returns it as a one-element vector of nodes.
+template<typename Type>
+const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb,
+    const TVector<Type>& vector
+) {
+    TTypeMapper<Type> mapper(pb);
+
+    TRuntimeNode::TList literals;
+    for (const auto& element : vector) {
+        literals.push_back(mapper.GetValue(element));
+    }
+
+    return {pb.NewList(mapper.GetType(), literals)};
+}
+
+// Builds one list node per given vector and returns the nodes in argument
+// order. The vectors are taken by const reference: they are only read, and
+// taking the pack by value copied every remaining vector again at each
+// recursion level.
+template<typename Type, typename... Tail>
+const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb,
+    const TVector<Type>& vector, const Tail&... vectors
+) {
+    const auto frontList = BuildListNodes(pb, vector);
+    const auto tailLists = BuildListNodes(pb, vectors...);
+    TVector<const TRuntimeNode> lists;
+    lists.reserve(tailLists.size() + 1);
+    lists.push_back(frontList.front());
+    for (const auto& list : tailLists) {
+        lists.push_back(list);
+    }
+    return lists;
+}
+
+// Zips the given vectors into a list of tuples (one tuple per index) and
+// evaluates it. Returns the static type of the zipped list node together with
+// the computed list value.
+// The vectors are taken by const reference: they are only read, so there is no
+// reason to copy each of them on every call.
+template<typename... TVectors>
+const std::pair<TType*, NUdf::TUnboxedValue> ConvertVectorsToTuples(
+    TSetup<false>& setup, const TVectors&... vectors
+) {
+    TProgramBuilder& pb = *setup.PgmBuilder;
+    const auto lists = BuildListNodes(pb, vectors...);
+    const auto tuplesNode = pb.Zip(lists);
+    const auto tuplesNodeType = tuplesNode.GetStaticType();
+    const auto tuples = setup.BuildGraph(tuplesNode)->GetValue();
+    return std::make_pair(tuplesNodeType, tuples);
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
index c5f3c53ed4..bf68e92ea4 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
@@ -375,6 +375,90 @@ Y_UNIT_TEST_SUITE(TMiniKQLRobinHoodHashTest) {
Cerr << "maxDistance2: " << maxDistance2 << "\n";
UNIT_ASSERT(maxDistance2 < 20);
}
+
+    // Fills the Robin Hood map and a reference std::unordered_map with the same
+    // per-key sums, then checks that Lookup() finds every key with the same
+    // payload as the reference map.
+    Y_UNIT_TEST(Lookup) {
+        TRobinHoodHashMap<i32> rh(sizeof(i64));
+        std::unordered_map<i32, i64> h;
+        for (ui64 i = 0; i < 10000; ++i) {
+            auto k = i % 1000;
+            auto [it, inserted] = h.emplace(k, 0);
+            bool isNew;
+            auto iter = rh.Insert(k, isNew);
+            UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k);
+            UNIT_ASSERT_VALUES_EQUAL(isNew, inserted);
+            it->second += i;
+            if (isNew) {
+                *(i64*)rh.GetMutablePayload(iter) = i;
+                rh.CheckGrow();
+            } else {
+                *(i64*)rh.GetMutablePayload(iter) += i;
+            }
+
+            UNIT_ASSERT_VALUES_EQUAL(h.size(), rh.GetSize());
+        }
+
+        for (ui64 i = 0; i < 1000; ++i) {
+            auto iter = rh.Lookup(i);
+            auto hit = h.find(i);
+            // Fail the test cleanly instead of dereferencing/erasing end().
+            UNIT_ASSERT(hit != h.end());
+            UNIT_ASSERT_VALUES_EQUAL(*(i64*)rh.GetPayload(iter), hit->second);
+            h.erase(hit);
+        }
+
+        // Every reference key must have been visited exactly once.
+        UNIT_ASSERT(h.empty());
+    }
+
+    // Same scenario as the Lookup test, but the keys are looked up through
+    // BatchLookup() in batches of at most PrefetchBatchSize requests.
+    Y_UNIT_TEST(BatchLookup) {
+        using THashTable = TRobinHoodHashMap<i32>;
+        THashTable rh(sizeof(i64));
+        std::unordered_map<i32, i64> h;
+        for (ui64 i = 0; i < 10000; ++i) {
+            auto k = i % 1000;
+            auto [it, inserted] = h.emplace(k, 0);
+            bool isNew;
+            auto iter = rh.Insert(k, isNew);
+            UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k);
+            UNIT_ASSERT_VALUES_EQUAL(isNew, inserted);
+            it->second += i;
+            if (isNew) {
+                *(i64*)rh.GetMutablePayload(iter) = i;
+                rh.CheckGrow();
+            } else {
+                *(i64*)rh.GetMutablePayload(iter) += i;
+            }
+
+            UNIT_ASSERT_VALUES_EQUAL(h.size(), rh.GetSize());
+        }
+
+        std::array<TRobinHoodBatchRequestItem<i32>, PrefetchBatchSize> batch;
+        // batchI[n] remembers which key was put into request slot n.
+        std::array<ui64, PrefetchBatchSize> batchI;
+        ui32 batchLen = 0;
+
+        auto processBatch = [&]() {
+            rh.BatchLookup({batch.data(), batchLen}, [&](size_t i, THashTable::iterator iter) {
+                auto key = batchI[i];
+                auto hit = h.find(key);
+                // A missing key (e.g. a slot reported twice) must fail the
+                // test instead of dereferencing/erasing end().
+                UNIT_ASSERT(hit != h.end());
+                UNIT_ASSERT_VALUES_EQUAL(*(i64*)rh.GetPayload(iter), hit->second);
+                h.erase(hit);
+            });
+        };
+
+        for (ui64 i = 0; i < 1000; ++i) {
+            // Flush the batch once it is full, then start a new one.
+            if (batchLen == batch.size()) {
+                processBatch();
+                batchLen = 0;
+            }
+
+            batch[batchLen].ConstructKey(i);
+            batchI[batchLen] = i;
+            ++batchLen;
+        }
+
+        // Flush the last, possibly partial, batch.
+        if (batchLen > 0) {
+            processBatch();
+        }
+
+        // Every reference key must have been visited exactly once.
+        UNIT_ASSERT(h.empty());
+    }
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/ya.make.inc b/yql/essentials/minikql/comp_nodes/ut/ya.make.inc
index fc600ad3c4..46a1ad7f11 100644
--- a/yql/essentials/minikql/comp_nodes/ut/ya.make.inc
+++ b/yql/essentials/minikql/comp_nodes/ut/ya.make.inc
@@ -24,6 +24,7 @@ SET(ORIG_SOURCES
mkql_block_compress_ut.cpp
mkql_block_exists_ut.cpp
mkql_block_skiptake_ut.cpp
+ mkql_block_map_join_ut_utils.cpp
mkql_block_map_join_ut.cpp
mkql_block_top_sort_ut.cpp
mkql_blocks_ut.cpp
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 0a4cd30828..691a642814 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -257,16 +257,6 @@ static std::vector<TType*> ValidateBlockItems(const TArrayRef<TType* const>& wid
return items;
}
-std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap = true) {
- const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType));
- return ValidateBlockItems(wideComponents, unwrap);
-}
-
-std::vector<TType*> ValidateBlockFlowType(const TType* flowType, bool unwrap = true) {
- const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flowType));
- return ValidateBlockItems(wideComponents, unwrap);
-}
-
} // namespace
std::string_view ScriptTypeAsStr(EScriptType type) {
@@ -331,6 +321,16 @@ void EnsureDataOrOptionalOfData(TRuntimeNode node) {
->GetItemType()->IsData(), "Expected data or optional of data");
}
+std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap) {
+    // View the incoming type as a stream type, take its wide components and
+    // run the block-item validation on them; `unwrap` is forwarded unchanged.
+    const auto typedStream = AS_TYPE(TStreamType, streamType);
+    return ValidateBlockItems(GetWideComponents(typedStream), unwrap);
+}
+
+std::vector<TType*> ValidateBlockFlowType(const TType* flowType, bool unwrap) {
+    // View the incoming type as a flow type, take its wide components and
+    // run the block-item validation on them; `unwrap` is forwarded unchanged.
+    const auto typedFlow = AS_TYPE(TFlowType, flowType);
+    return ValidateBlockItems(GetWideComponents(typedFlow), unwrap);
+}
+
TProgramBuilder::TProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry, bool voidWithEffects)
: TTypeBuilder(env)
, FunctionRegistry(functionRegistry)
@@ -5827,7 +5827,7 @@ TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode stream, std::optio
return FromFlow(BuildBlockCombineHashed(__func__, ToFlow(stream), filterColumn, keys, aggs, flowReturnType));
} else {
return BuildBlockCombineHashed(__func__, stream, filterColumn, keys, aggs, returnType);
- }
+ }
}
TRuntimeNode TProgramBuilder::BuildBlockMergeFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys,
@@ -5968,22 +5968,22 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode stream, TRuntimeNode dict,
- EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns,
- const TArrayRef<const ui32>& leftKeyDrops, TType* returnType
+TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind,
+ const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops,
+ const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType
) {
- if constexpr (RuntimeVersion < 51U) {
+ if constexpr (RuntimeVersion < 53U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
MKQL_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly,
"Unsupported join kind");
MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified");
- const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
- for (const auto& drop : leftKeyDrops) {
- MKQL_ENSURE(leftKeySet.contains(drop),
- "Only key columns has to be specified in drop column set");
- }
+ MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch");
+
+ ValidateBlockStreamType(leftStream.GetStaticType());
+ ValidateBlockStreamType(rightStream.GetStaticType());
+ ValidateBlockStreamType(returnType);
TRuntimeNode::TList leftKeyColumnsNodes;
leftKeyColumnsNodes.reserve(leftKeyColumns.size());
@@ -5999,12 +5999,29 @@ TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode stream, TRuntimeNode
return NewDataLiteral(idx);
});
+ TRuntimeNode::TList rightKeyColumnsNodes;
+ rightKeyColumnsNodes.reserve(rightKeyColumns.size());
+ std::transform(rightKeyColumns.cbegin(), rightKeyColumns.cend(),
+ std::back_inserter(rightKeyColumnsNodes), [this](const ui32 idx) {
+ return NewDataLiteral(idx);
+ });
+
+ // Wrap every right-side key-drop column index into a data literal node.
+ TRuntimeNode::TList rightKeyDropsNodes;
+ // Reserve for the range that is actually transformed below (rightKeyDrops, not leftKeyDrops).
+ rightKeyDropsNodes.reserve(rightKeyDrops.size());
+ std::transform(rightKeyDrops.cbegin(), rightKeyDrops.cend(),
+ std::back_inserter(rightKeyDropsNodes), [this](const ui32 idx) {
+ return NewDataLiteral(idx);
+ });
+
TCallableBuilder callableBuilder(Env, __func__, returnType);
- callableBuilder.Add(stream);
- callableBuilder.Add(dict);
+ callableBuilder.Add(leftStream);
+ callableBuilder.Add(rightStream);
callableBuilder.Add(NewDataLiteral((ui32)joinKind));
callableBuilder.Add(NewTuple(leftKeyColumnsNodes));
callableBuilder.Add(NewTuple(leftKeyDropsNodes));
+ callableBuilder.Add(NewTuple(rightKeyColumnsNodes));
+ callableBuilder.Add(NewTuple(rightKeyDropsNodes));
+ callableBuilder.Add(NewDataLiteral((bool)rightAny));
return TRuntimeNode(callableBuilder.Build(), false);
}
diff --git a/yql/essentials/minikql/mkql_program_builder.h b/yql/essentials/minikql/mkql_program_builder.h
index 7e139cd1a9..762d3a013e 100644
--- a/yql/essentials/minikql/mkql_program_builder.h
+++ b/yql/essentials/minikql/mkql_program_builder.h
@@ -133,6 +133,9 @@ struct TAggInfo {
std::vector<ui32> ArgsColumns;
};
+std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap = true);
+std::vector<TType*> ValidateBlockFlowType(const TType* flowType, bool unwrap = true);
+
class TProgramBuilder : public TTypeBuilder {
public:
TProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry, bool voidWithEffects = false);
@@ -255,9 +258,9 @@ public:
TRuntimeNode BlockFromPg(TRuntimeNode input, TType* returnType);
TRuntimeNode BlockPgResolvedCall(const std::string_view& name, ui32 id,
const TArrayRef<const TRuntimeNode>& args, TType* returnType);
- TRuntimeNode BlockMapJoinCore(TRuntimeNode flow, TRuntimeNode dict,
- EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns,
- const TArrayRef<const ui32>& leftKeyDrops, TType* returnType);
+ TRuntimeNode BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind,
+ const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops,
+ const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType);
//-- logical functions
TRuntimeNode BlockNot(TRuntimeNode data);
@@ -754,7 +757,7 @@ protected:
TRuntimeNode BuildWideSkipTakeBlocks(const std::string_view& callableName, TRuntimeNode flow, TRuntimeNode count);
TRuntimeNode BuildBlockLogical(const std::string_view& callableName, TRuntimeNode first, TRuntimeNode second);
TRuntimeNode BuildExtend(const std::string_view& callableName, const TArrayRef<const TRuntimeNode>& lists);
-
+
TRuntimeNode BuildBlockDecimalBinary(const std::string_view& callableName, TRuntimeNode first, TRuntimeNode second);
private:
diff --git a/yql/essentials/minikql/mkql_runtime_version.h b/yql/essentials/minikql/mkql_runtime_version.h
index bfd26216ab..b4fbc1951e 100644
--- a/yql/essentials/minikql/mkql_runtime_version.h
+++ b/yql/essentials/minikql/mkql_runtime_version.h
@@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 52U
+#define MKQL_RUNTIME_VERSION 53U
#endif
// History:
diff --git a/yql/essentials/providers/common/proto/gateways_config.proto b/yql/essentials/providers/common/proto/gateways_config.proto
index 875ccaa22f..977ab2e7b9 100644
--- a/yql/essentials/providers/common/proto/gateways_config.proto
+++ b/yql/essentials/providers/common/proto/gateways_config.proto
@@ -2,8 +2,6 @@ package NYql;
option java_package = "ru.yandex.yql.proto";
import "yql/essentials/protos/clickhouse.proto";
-import "ydb/library/yql/providers/generic/connector/api/common/data_source.proto";
-import "ydb/library/yql/providers/generic/connector/api/common/endpoint.proto";
/////////////////////////////// common ///////////////////////////////
@@ -552,29 +550,140 @@ message TDbToolConfig {
/////////// Generic gateway for the external data sources ////////////
+// TGenericEndpoint represents the network address of a generic data source instance
+message TGenericEndpoint {
+ optional string host = 1;
+ optional uint32 port = 2;
+}
+
+
+// TGenericCredentials represents various ways of user authentication in the data source instance
+message TGenericCredentials {
+ message TBasic {
+ optional string username = 1;
+ optional string password = 2;
+ }
+
+ message TToken {
+ optional string type = 1;
+ optional string value = 2;
+ }
+
+ oneof payload {
+ TBasic basic = 1;
+ TToken token = 2;
+ }
+}
+
+// EGenericDataSourceKind enumerates the external data sources
+// supported by the federated query system
+enum EGenericDataSourceKind {
+ DATA_SOURCE_KIND_UNSPECIFIED = 0;
+ CLICKHOUSE = 1;
+ POSTGRESQL = 2;
+ S3 = 3;
+ YDB = 4;
+ MYSQL = 5;
+ MS_SQL_SERVER = 6;
+ GREENPLUM = 7;
+ ORACLE = 8;
+ LOGGING = 9;
+}
+
+// EGenericProtocol generalizes various kinds of network protocols supported by different databases.
+enum EGenericProtocol {
+ PROTOCOL_UNSPECIFIED = 0;
+ NATIVE = 1; // CLICKHOUSE, POSTGRESQL
+ HTTP = 2; // CLICKHOUSE, S3
+}
+
+// TPostgreSQLDataSourceOptions represents settings specific to PostgreSQL
+message TPostgreSQLDataSourceOptions {
+ // PostgreSQL schema
+ optional string schema = 1;
+}
+
+// TClickhouseDataSourceOptions represents settings specific to Clickhouse
+message TClickhouseDataSourceOptions {
+}
+
+// TS3DataSourceOptions represents settings specific to S3 (Simple Storage Service)
+message TS3DataSourceOptions {
+ // the region where data is stored
+ optional string region = 1;
+ // the bucket the object belongs to
+ optional string bucket = 2;
+}
+
+// TGreenplumDataSourceOptions represents settings specific to Greenplum
+message TGreenplumDataSourceOptions {
+ // Greenplum schema
+ optional string schema = 1;
+}
+
+// TOracleDataSourceOptions represents settings specific to Oracle
+message TOracleDataSourceOptions {
+ // Oracle service_name - alias to SID of oracle INSTANCE, or SID, or PDB.
+ // More about connection options in Oracle docs:
+ // https://docs.oracle.com/en/database/other-databases/essbase/21/essoa/connection-string-formats.html
+ optional string service_name = 1;
+}
+
+// TLoggingDataSourceOptions represents settings specific to Logging
+message TLoggingDataSourceOptions {
+ optional string folder_id = 1;
+}
+
+// TGenericDataSourceInstance helps to identify the instance of a data source to redirect request to.
+message TGenericDataSourceInstance {
+ // Data source kind
+ optional EGenericDataSourceKind kind = 1;
+ // Network address
+ optional TGenericEndpoint endpoint = 2;
+ // Database name
+ optional string database = 3;
+ // Credentials to access database
+ optional TGenericCredentials credentials = 4;
+ // If true, Connector server will use secure connections to access remote data sources.
+ // Certificates will be obtained from the standard system paths.
+ optional bool use_tls = 5;
+ // Specifies the network protocol that should be used
+ // for the connection between Connector and the remote data source
+ optional EGenericProtocol protocol = 6;
+ // Options specific to various data sources
+ oneof options {
+ TPostgreSQLDataSourceOptions pg_options = 7;
+ TClickhouseDataSourceOptions ch_options = 8;
+ TS3DataSourceOptions s3_options = 9;
+ TGreenplumDataSourceOptions gp_options = 10;
+ TOracleDataSourceOptions oracle_options = 11;
+ TLoggingDataSourceOptions logging_options = 12;
+ }
+}
+
message TGenericClusterConfig {
// Cluster name
optional string Name = 1;
// Data source kind
- optional NYql.NConnector.NApi.EDataSourceKind Kind = 8;
+ optional EGenericDataSourceKind Kind = 8;
// Location represents the network address of a data source instance we want to connect to
oneof Location {
// Endpoint must be used for on-premise deployments.
- NYql.NConnector.NApi.TEndpoint Endpoint = 9;
+ TGenericEndpoint Endpoint = 9;
// DatabaseId must be used when the data source is deployed in cloud.
// Data source FQDN and port will be resolved by MDB service.
string DatabaseId = 4;
}
// Credentials used to access data source instance
- optional NYql.NConnector.NApi.TCredentials Credentials = 10;
+ optional TGenericCredentials Credentials = 10;
// Credentials used to access managed databases APIs.
// When working with external data source instances deployed in clouds,
- // one should either set (ServiceAccountId, ServiceAccountIdSignature) pair
- // that will be resolved into IAM Token via Token Accessor,
+ // one should either set (ServiceAccountId, ServiceAccountIdSignature) pair
+ // that will be resolved into IAM Token via Token Accessor,
// or provide IAM Token directly.
optional string ServiceAccountId = 6;
optional string ServiceAccountIdSignature = 7;
@@ -588,7 +697,7 @@ message TGenericClusterConfig {
optional string DatabaseName = 13;
// Transport protocol used to establish a network connection with database
- optional NYql.NConnector.NApi.EProtocol Protocol = 14;
+ optional EGenericProtocol Protocol = 14;
// Data source options specific to various data sources
map<string, string> DataSourceOptions = 15;
@@ -601,7 +710,7 @@ message TGenericConnectorConfig {
// 1. Set address statically via `Endpoint.Host` and `Endpoint.Port`;
// 2. Ask YDB to set `Endpoint.Port` to the value of expression `./ydbd --ic-port + OffsetFromIcPort`,
// while Connector's hostname will still be taken from `Endpoint.Host` (with 'localhost' as a default value).
- optional NYql.NConnector.NApi.TEndpoint Endpoint = 3;
+ optional TGenericEndpoint Endpoint = 3;
optional uint32 OffsetFromIcPort = 6;
// If true, Connector GRPC Client will use TLS encryption.
@@ -638,7 +747,7 @@ message TGenericGatewayConfig {
message TDbResolverConfig {
// Ydb / Yds MVP endpoint.
- // Expected format:
+ // Expected format:
// [http|https]://host:port/ydbc/cloud-prod/
optional string YdbMvpEndpoint = 2;
}
diff --git a/yql/essentials/providers/common/proto/ya.make b/yql/essentials/providers/common/proto/ya.make
index 6ce6a6c99c..eefd4c6b72 100644
--- a/yql/essentials/providers/common/proto/ya.make
+++ b/yql/essentials/providers/common/proto/ya.make
@@ -7,7 +7,6 @@ SRCS(
PEERDIR(
yql/essentials/protos
- ydb/library/yql/providers/generic/connector/api/common
)
IF (NOT PY_PROTOS_FOR)
diff --git a/yql/essentials/public/udf/udf_types.h b/yql/essentials/public/udf/udf_types.h
index 57f2f4e7e9..c5610016dc 100644
--- a/yql/essentials/public/udf/udf_types.h
+++ b/yql/essentials/public/udf/udf_types.h
@@ -131,6 +131,9 @@ public:
struct TArgumentFlags {
enum {
AutoMap = 0x01,
+#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 41)
+ NoYield = 0x02,
+#endif
};
};
@@ -321,7 +324,7 @@ public:
public:
// returns nullptr if type isn't supported
virtual IArrowType::TPtr MakeArrowType(const TType* type) const = 0;
- // The given ArrowSchema struct is released, even if this function fails.
+ // The given ArrowSchema struct is released, even if this function fails.
virtual IArrowType::TPtr ImportArrowType(ArrowSchema* schema) const = 0;
};
#endif
diff --git a/yql/essentials/public/udf/udf_version.h b/yql/essentials/public/udf/udf_version.h
index 8045789bfc..747d15cf44 100644
--- a/yql/essentials/public/udf/udf_version.h
+++ b/yql/essentials/public/udf/udf_version.h
@@ -7,7 +7,7 @@ namespace NYql {
namespace NUdf {
#define CURRENT_UDF_ABI_VERSION_MAJOR 2
-#define CURRENT_UDF_ABI_VERSION_MINOR 40
+#define CURRENT_UDF_ABI_VERSION_MINOR 41
#define CURRENT_UDF_ABI_VERSION_PATCH 0
#ifdef USE_CURRENT_UDF_ABI_VERSION
diff --git a/yql/essentials/sql/pg/pg_sql.cpp b/yql/essentials/sql/pg/pg_sql.cpp
index 94f90ff39e..84d3c39454 100644
--- a/yql/essentials/sql/pg/pg_sql.cpp
+++ b/yql/essentials/sql/pg/pg_sql.cpp
@@ -6355,7 +6355,7 @@ public:
}
if (!root) {
- Cerr << "Can't parse SQL for function: " << proc.Name << ", " << results[0].Issues.ToString();
+ //Cerr << "Can't parse SQL for function: " << proc.Name << ", " << results[0].Issues.ToString();
return;
}
diff --git a/yql/essentials/sql/v1/builtin.cpp b/yql/essentials/sql/v1/builtin.cpp
index a4bec63066..b7d29303cd 100644
--- a/yql/essentials/sql/v1/builtin.cpp
+++ b/yql/essentials/sql/v1/builtin.cpp
@@ -3731,16 +3731,20 @@ TNodePtr BuildBuiltinFunc(TContext& ctx, TPosition pos, TString name, const TVec
dflt = positional.GetTupleElement(positional.GetTupleSize() - 1);
}
} else {
+ size_t minArgs = withDefault ? 2 : 1;
+ if (args.size() < minArgs) {
+ return new TInvalidBuiltin(pos, TStringBuilder() << name
+ << " requires at least " << minArgs << " positional arguments");
+ }
variant = args[0];
- size_t defaultSuffix = withDefault ? 1 : 0;
- labels.reserve(args.size() - 1 - defaultSuffix);
- handlers.reserve(args.size() - 1 - defaultSuffix);
- for (size_t idx = 0; idx + 1 < args.size() - defaultSuffix; idx++) {
+ labels.reserve(args.size() - minArgs);
+ handlers.reserve(args.size() - minArgs);
+ for (size_t idx = 0; idx < args.size() - minArgs; idx++) {
labels.push_back(BuildQuotedAtom(pos, ToString(idx)));
- handlers.push_back(args[idx + 1]);
+ handlers.push_back(args[minArgs + idx]);
}
if (withDefault) {
- dflt = args.back();
+ dflt = args[1];
}
}
TVector<TNodePtr> resultArgs;
diff --git a/yql/essentials/sql/v1/format/sql_format.cpp b/yql/essentials/sql/v1/format/sql_format.cpp
index c029f6e3ad..8c4b04760f 100644
--- a/yql/essentials/sql/v1/format/sql_format.cpp
+++ b/yql/essentials/sql/v1/format/sql_format.cpp
@@ -62,7 +62,7 @@ void SkipForValidate(
while (
in != query.end() && in->Name == "SEMICOLON" &&
(out == formattedQuery.end() || out->Name != "SEMICOLON") &&
- in != query.begin() && IsIn({"SEMICOLON", "LBRACE_CURLY", "AS"}, SkipWSOrCommentBackward(in - 1, query.begin())->Name)
+ in != query.begin() && IsIn({"SEMICOLON", "LBRACE_CURLY", "AS", "BEGIN"}, SkipWSOrCommentBackward(in - 1, query.begin())->Name)
) {
in = SkipWS(++in, query.end());
}
@@ -562,6 +562,10 @@ private:
WriteComments(true);
}
+ if (AfterComment && Comments[LastComment - 1].Content.StartsWith("--")) {
+ return;
+ }
+
if (OutColumn) {
Out('\n');
}
diff --git a/yql/essentials/sql/v1/format/sql_format_ut.h b/yql/essentials/sql/v1/format/sql_format_ut.h
index 691cc8f8b7..57bcef4bff 100644
--- a/yql/essentials/sql/v1/format/sql_format_ut.h
+++ b/yql/essentials/sql/v1/format/sql_format_ut.h
@@ -593,6 +593,9 @@ Y_UNIT_TEST(If) {
{"evaluate if 1=1 do begin select 1; end do else do begin select 2; end do",
"EVALUATE IF 1 == 1 DO BEGIN\n\tSELECT\n\t\t1\n\t;\nEND DO "
"ELSE DO BEGIN\n\tSELECT\n\t\t2\n\t;\nEND DO;\n"},
+ {"evaluate if 1=1 do begin; select 1 end do else do begin select 2;; select 3 end do",
+ "EVALUATE IF 1 == 1 DO BEGIN\n\tSELECT\n\t\t1\n\t;\nEND DO ELSE DO BEGIN\n\t"
+ "SELECT\n\t\t2\n\t;\n\n\tSELECT\n\t\t3\n\t;\nEND DO;\n"}
};
TSetup setup;
@@ -611,6 +614,8 @@ Y_UNIT_TEST(For) {
"EVALUATE FOR $x IN [] DO BEGIN\n\tSELECT\n\t\t$x\n\t;\nEND DO ELSE DO BEGIN\n\tSELECT\n\t\t2\n\t;\nEND DO;\n"},
{"evaluate parallel for $x in [] do $a($x)",
"EVALUATE PARALLEL FOR $x IN [] DO\n\t$a($x)\n;\n"},
+ {"evaluate for $x in [] do begin; select $x;; select $y end do",
+ "EVALUATE FOR $x IN [] DO BEGIN\n\tSELECT\n\t\t$x\n\t;\n\n\tSELECT\n\t\t$y\n\t;\nEND DO;\n"},
};
TSetup setup;
@@ -867,8 +872,10 @@ Y_UNIT_TEST(Select) {
"SELECT\n\t1\nLIMIT 10;\n"},
{"select 1 limit 10 offset 5",
"SELECT\n\t1\nLIMIT 10 OFFSET 5;\n"},
- { "select 1 union all select 2",
+ {"select 1 union all select 2",
"SELECT\n\t1\nUNION ALL\nSELECT\n\t2\n;\n" },
+ {"select * from $user where key == 1 -- comment",
+ "SELECT\n\t*\nFROM\n\t$user\nWHERE\n\tkey == 1 -- comment\n;\n"},
};
TSetup setup;
diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.cfg b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.cfg
index 62b2a5fe18..d575d2a0c9 100644
--- a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.cfg
+++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.cfg
@@ -2,4 +2,4 @@ in Input input.txt
out Output output.txt
res result.txt
udf python3_udf
-providers dummy
+providers yt
diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.sql b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.sql
index 0f5cd0b0c6..71ad92d19d 100644
--- a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.sql
+++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.sql
@@ -8,7 +8,7 @@ GROUP BY Length(key) as skey);
$udfScript = @@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@;
diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.yql b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.yql
index 3df99f0f47..dc7c70cea6 100644
--- a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.yql
+++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterLReduce.yql
@@ -28,7 +28,7 @@
(let inputType (CallableArgumentType (TypeOf (ScriptUdf 'Python3 '"f" (CallableType '() '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"skey" (DataType 'Uint32))))) '((DataType 'Int32))) (String '@@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@))) '0))
@@ -36,7 +36,7 @@ def f(input,x):
(return (Apply (ScriptUdf 'Python3 '"f" (CallableType '() '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"skey" (DataType 'Uint32))))) '((DataType 'Int32))) (String '@@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@)) inputRowsList (Int32 '"1")))
@@ -49,7 +49,7 @@ def f(input,x):
(let inputType (CallableArgumentType (TypeOf (ScriptUdf 'Python3 '"f" (CallableType '() '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((DataType 'Int32))) (String '@@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@))) '0))
@@ -57,7 +57,7 @@ def f(input,x):
(return (Apply (ScriptUdf 'Python3 '"f" (CallableType '() '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((DataType 'Int32))) (String '@@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@)) inputRowsList (Int32 '"2")))
diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.cfg b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.cfg
index 62b2a5fe18..d575d2a0c9 100644
--- a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.cfg
+++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.cfg
@@ -2,4 +2,4 @@ in Input input.txt
out Output output.txt
res result.txt
udf python3_udf
-providers dummy
+providers yt
diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.sql b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.sql
index f0bd1ff3ba..e15ee6feeb 100644
--- a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.sql
+++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.sql
@@ -8,7 +8,7 @@ GROUP BY Length(key) as skey);
$udfScript = @@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@;
diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.yql b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.yql
index 3e8637fc54..2143f06bf2 100644
--- a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.yql
+++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseLMapAfterReduce.yql
@@ -26,7 +26,7 @@
(let inputType (CallableArgumentType (TypeOf (ScriptUdf 'Python3 '"f" (CallableType '() '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"skey" (DataType 'Uint32))))) '((DataType 'Int32))) (String '@@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@))) '0))
@@ -34,7 +34,7 @@ def f(input,x):
(return (Apply (ScriptUdf 'Python3 '"f" (CallableType '() '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"skey" (DataType 'Uint32))))) '((DataType 'Int32))) (String '@@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@)) inputRowsList (Int32 '"1")))
diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.cfg b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.cfg
index 62b2a5fe18..d575d2a0c9 100644
--- a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.cfg
+++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.cfg
@@ -2,4 +2,4 @@ in Input input.txt
out Output output.txt
res result.txt
udf python3_udf
-providers dummy
+providers yt
diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.sql b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.sql
index c87931cafd..d3690a17e4 100644
--- a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.sql
+++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.sql
@@ -8,7 +8,7 @@ GROUP BY Length(key) as skey);
$udfScript = @@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
diff --git a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.yql b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.yql
index 5656d60fc3..5495292e2e 100644
--- a/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.yql
+++ b/yql/essentials/tests/s-expressions/suites/Optimizers/FuseMapAfterLReduce.yql
@@ -28,7 +28,7 @@
(let inputType (CallableArgumentType (TypeOf (ScriptUdf 'Python3 '"f" (CallableType '() '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"skey" (DataType 'Uint32))))) '((DataType 'Int32))) (String '@@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@ -39,7 +39,7 @@ def f2(a,b,c):
(return (Apply (ScriptUdf 'Python3 '"f" (CallableType '() '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"pass" (DataType 'Int32)) '('"skey" (DataType 'Uint32))))) '((StreamType (StructType '('"cnt" (DataType 'Uint64)) '('"skey" (DataType 'Uint32))))) '((DataType 'Int32))) (String '@@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
@@ -55,7 +55,7 @@ def f2(a,b,c):
(let res (Apply (ScriptUdf 'Python3 '"f2" (CallableType '() '((StructType '('"a" (DataType 'Uint32)) '('"b" (DataType 'Uint64)) '('"c" (DataType 'Int32)))) '((DataType 'Uint32) 'skey) '((DataType 'Uint64) 'cnt) '((DataType 'Int32) 'pass)) (String '@@
def f(input,x):
for i in list(input):
- d = i.__dict__
+ d = {name: getattr(i, name) for name in i.__class__.__match_args__}
d["pass"] = x
yield d
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json
index ebec884c2a..47f93caece 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/result.json
+++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json
@@ -1833,6 +1833,13 @@
"uri": "https://{canondata_backend}/1937429/434276f26b2857be3c5ad3fdbbf877d2bf775ac5/resource.tar.gz#test_sql2yql.test_aggregate-avg_with_having_/sql.yql"
}
],
+ "test_sql2yql.test[aggregate-compact_distinct]": [
+ {
+ "checksum": "c6aa750c244ef573293be11d07397a54",
+ "size": 3863,
+ "uri": "https://{canondata_backend}/1773845/f26b5f394704b0b9c6108eac91165c8420756fb1/resource.tar.gz#test_sql2yql.test_aggregate-compact_distinct_/sql.yql"
+ }
+ ],
"test_sql2yql.test[aggregate-compare_by]": [
{
"checksum": "a9eacab25486aef4e5000b928097e2f7",
@@ -2610,6 +2617,13 @@
"uri": "https://{canondata_backend}/1937429/434276f26b2857be3c5ad3fdbbf877d2bf775ac5/resource.tar.gz#test_sql2yql.test_aggregate-native_desc_group_compact_by_/sql.yql"
}
],
+ "test_sql2yql.test[aggregate-no_compact_distinct]": [
+ {
+ "checksum": "7a533d8080711b15f39de0c29270692f",
+ "size": 3771,
+ "uri": "https://{canondata_backend}/1899731/184b8df51a2ed58cdacc5419a09c9170dbd7ef88/resource.tar.gz#test_sql2yql.test_aggregate-no_compact_distinct_/sql.yql"
+ }
+ ],
"test_sql2yql.test[aggregate-null_type]": [
{
"checksum": "33f4556d059ae6520e3e360b3a780337",
@@ -16253,6 +16267,13 @@
"uri": "https://{canondata_backend}/1936273/4a1b39013e1bae40e722cff8ccef8829784964e2/resource.tar.gz#test_sql2yql.test_produce-reduce_with_python_row_repack_/sql.yql"
}
],
+ "test_sql2yql.test[produce-reduce_with_trivial_remaps]": [
+ {
+ "checksum": "b094793320668f635c5e837a71e84b50",
+ "size": 2276,
+ "uri": "https://{canondata_backend}/1942100/fc877ab79aa3673db82f29099d27c26f000fde66/resource.tar.gz#test_sql2yql.test_produce-reduce_with_trivial_remaps_/sql.yql"
+ }
+ ],
"test_sql2yql.test[produce-yql-10297]": [
{
"checksum": "0efb35a2c6333d72f2edddd180c70a00",
@@ -21252,6 +21273,11 @@
"uri": "file://test_sql_format.test_aggregate-avg_with_having_/formatted.sql"
}
],
+ "test_sql_format.test[aggregate-compact_distinct]": [
+ {
+ "uri": "file://test_sql_format.test_aggregate-compact_distinct_/formatted.sql"
+ }
+ ],
"test_sql_format.test[aggregate-compare_by]": [
{
"uri": "file://test_sql_format.test_aggregate-compare_by_/formatted.sql"
@@ -21807,6 +21833,11 @@
"uri": "file://test_sql_format.test_aggregate-native_desc_group_compact_by_/formatted.sql"
}
],
+ "test_sql_format.test[aggregate-no_compact_distinct]": [
+ {
+ "uri": "file://test_sql_format.test_aggregate-no_compact_distinct_/formatted.sql"
+ }
+ ],
"test_sql_format.test[aggregate-null_type]": [
{
"uri": "file://test_sql_format.test_aggregate-null_type_/formatted.sql"
@@ -29302,6 +29333,11 @@
"uri": "file://test_sql_format.test_produce-reduce_with_python_row_repack_/formatted.sql"
}
],
+ "test_sql_format.test[produce-reduce_with_trivial_remaps]": [
+ {
+ "uri": "file://test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql"
+ }
+ ],
"test_sql_format.test[produce-yql-10297]": [
{
"uri": "file://test_sql_format.test_produce-yql-10297_/formatted.sql"
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_aggregate-compact_distinct_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_aggregate-compact_distinct_/formatted.sql
new file mode 100644
index 0000000000..c59d082159
--- /dev/null
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_aggregate-compact_distinct_/formatted.sql
@@ -0,0 +1,36 @@
+USE plato;
+
+PRAGMA AnsiOptionalAs;
+PRAGMA yt.CompactForDistinct;
+
+$x = (
+ SELECT
+ key,
+ AVG(DISTINCT CAST(subkey AS float)) s
+ FROM
+ InputB
+ GROUP BY
+ key
+);
+
+$y = (
+ SELECT
+ key,
+ SUM(CAST(subkey AS float)) s
+ FROM
+ InputC
+ GROUP BY
+ key
+);
+
+SELECT
+ x.key,
+ x.s AS s1,
+ y.s AS s2
+FROM
+ $x x
+FULL OUTER JOIN
+ $y y
+ON
+ x.key == y.key
+;
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_aggregate-no_compact_distinct_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_aggregate-no_compact_distinct_/formatted.sql
new file mode 100644
index 0000000000..96db5d308f
--- /dev/null
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_aggregate-no_compact_distinct_/formatted.sql
@@ -0,0 +1,35 @@
+USE plato;
+
+PRAGMA AnsiOptionalAs;
+
+$x = (
+ SELECT
+ key,
+ AVG(DISTINCT CAST(subkey AS float)) s
+ FROM
+ InputB
+ GROUP BY
+ key
+);
+
+$y = (
+ SELECT
+ key,
+ SUM(CAST(subkey AS float)) s
+ FROM
+ InputC
+ GROUP BY
+ key
+);
+
+SELECT
+ x.key,
+ x.s AS s1,
+ y.s AS s2
+FROM
+ $x x
+FULL OUTER JOIN
+ $y y
+ON
+ x.key == y.key
+;
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_builtins_opt_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_builtins_opt_/formatted.sql
index 3023dcf5c2..e4aa4de0a1 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_builtins_opt_/formatted.sql
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_builtins_opt_/formatted.sql
@@ -25,6 +25,6 @@ $var_2 = VARIANT ("8", "1", $vartype_t);
SELECT
Visit(Just($var_1), $handle_a, $handle_b),
Visit(Just($var_2), $handle_a, $handle_b),
- VisitOrDefault(Just($var_2), $handle_a, Just(777u)),
+ VisitOrDefault(Just($var_2), Just(777u), $handle_a),
VariantItem(Just($var_b))
;
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_tuple_builtins_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_tuple_builtins_/formatted.sql
index b1e21cda10..bdca3d25be 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_tuple_builtins_/formatted.sql
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_expr-variant_tuple_builtins_/formatted.sql
@@ -25,7 +25,7 @@ SELECT
;
$visitor_def = ($var) -> {
- RETURN VisitOrDefault($var, $handle_num, $handle_flag, 999);
+ RETURN VisitOrDefault($var, 999, $handle_num, $handle_flag);
};
SELECT
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql
new file mode 100644
index 0000000000..a29b85cedb
--- /dev/null
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_produce-reduce_with_trivial_remaps_/formatted.sql
@@ -0,0 +1,23 @@
+USE plato;
+
+PRAGMA warning("disable", "4510");
+
+$udf = ($_key, $stream) -> {
+ $init = ($item) -> (AsStruct(1u AS cnt, $item AS row));
+ $switch = ($_item, $_state) -> (FALSE);
+ $update = ($item, $state) -> (
+ AsStruct(
+ $state.cnt + 1u AS cnt,
+ if(($item.value > $state.row.value) ?? FALSE, $item, $state.row) AS row
+ )
+ );
+ $state = YQL::Collect(YQL::Condense1($stream, $init, $switch, $update));
+ RETURN $state;
+};
+
+REDUCE CONCAT(Input1, Input2)
+PRESORT
+ subkey
+ON
+ key
+USING $udf(TableRow());
diff --git a/yql/essentials/tests/sql/suites/aggregate/compact_distinct.cfg b/yql/essentials/tests/sql/suites/aggregate/compact_distinct.cfg
new file mode 100644
index 0000000000..711689a5cb
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/aggregate/compact_distinct.cfg
@@ -0,0 +1,3 @@
+providers yt
+in InputB input2.txt
+in InputC input3.txt
diff --git a/yql/essentials/tests/sql/suites/aggregate/compact_distinct.sql b/yql/essentials/tests/sql/suites/aggregate/compact_distinct.sql
new file mode 100644
index 0000000000..4176ad60aa
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/aggregate/compact_distinct.sql
@@ -0,0 +1,14 @@
+USE plato;
+
+pragma AnsiOptionalAs;
+pragma yt.CompactForDistinct;
+
+$x = (
+ SELECT key, AVG(DISTINCT Cast(subkey as float)) s FROM InputB GROUP BY key
+);
+
+$y = (
+ SELECT key, SUM(Cast(subkey as float)) s FROM InputC GROUP BY key
+);
+
+SELECT x.key, x.s AS s1, y.s AS s2 FROM $x x FULL OUTER JOIN $y y ON x.key = y.key;
diff --git a/yql/essentials/tests/sql/suites/aggregate/no_compact_distinct.cfg b/yql/essentials/tests/sql/suites/aggregate/no_compact_distinct.cfg
new file mode 100644
index 0000000000..711689a5cb
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/aggregate/no_compact_distinct.cfg
@@ -0,0 +1,3 @@
+providers yt
+in InputB input2.txt
+in InputC input3.txt
diff --git a/yql/essentials/tests/sql/suites/aggregate/no_compact_distinct.sql b/yql/essentials/tests/sql/suites/aggregate/no_compact_distinct.sql
new file mode 100644
index 0000000000..757c9d55cc
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/aggregate/no_compact_distinct.sql
@@ -0,0 +1,13 @@
+USE plato;
+
+pragma AnsiOptionalAs;
+
+$x = (
+ SELECT key, AVG(DISTINCT Cast(subkey as float)) s FROM InputB GROUP BY key
+);
+
+$y = (
+ SELECT key, SUM(Cast(subkey as float)) s FROM InputC GROUP BY key
+);
+
+SELECT x.key, x.s AS s1, y.s AS s2 FROM $x x FULL OUTER JOIN $y y ON x.key = y.key;
diff --git a/yql/essentials/tests/sql/suites/expr/variant_builtins_opt.sql b/yql/essentials/tests/sql/suites/expr/variant_builtins_opt.sql
index 443802fe56..3d28d3f09a 100644
--- a/yql/essentials/tests/sql/suites/expr/variant_builtins_opt.sql
+++ b/yql/essentials/tests/sql/suites/expr/variant_builtins_opt.sql
@@ -21,6 +21,6 @@ $var_2 = Variant("8", "1", $vartype_t);
SELECT
Visit(Just($var_1), $handle_a, $handle_b),
Visit(Just($var_2), $handle_a, $handle_b),
- VisitOrDefault(Just($var_2), $handle_a, Just(777u)),
+ VisitOrDefault(Just($var_2), Just(777u), $handle_a),
VariantItem(Just($var_b))
;
diff --git a/yql/essentials/tests/sql/suites/expr/variant_tuple_builtins.sql b/yql/essentials/tests/sql/suites/expr/variant_tuple_builtins.sql
index 35d1775708..075f061974 100644
--- a/yql/essentials/tests/sql/suites/expr/variant_tuple_builtins.sql
+++ b/yql/essentials/tests/sql/suites/expr/variant_tuple_builtins.sql
@@ -12,7 +12,7 @@ SELECT
$visitor(NULL)
;
-$visitor_def = ($var) -> { return VisitOrDefault($var, $handle_num, $handle_flag, 999); };
+$visitor_def = ($var) -> { return VisitOrDefault($var, 999, $handle_num, $handle_flag); };
SELECT
$visitor_def(Variant(5, "0", $vartype)),
$visitor_def(Just(Variant(True, "1", $vartype))),
diff --git a/yql/essentials/tests/sql/suites/join/cbo_4tables.cfg b/yql/essentials/tests/sql/suites/join/cbo_4tables.cfg
index a3f4d2aec3..a5101acc8c 100644
--- a/yql/essentials/tests/sql/suites/join/cbo_4tables.cfg
+++ b/yql/essentials/tests/sql/suites/join/cbo_4tables.cfg
@@ -1,4 +1,4 @@
-providers dummy
+providers yt
in InputA cbo_4tables_a.txt
in InputB cbo_4tables_b.txt
in InputC cbo_4tables_c.txt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg
new file mode 100644
index 0000000000..1516335c81
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.cfg
@@ -0,0 +1,3 @@
+in Input1 sorted1.txt
+in Input2 sorted2.txt
+providers yt
diff --git a/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql
new file mode 100644
index 0000000000..65e70e8715
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/reduce_with_trivial_remaps.sql
@@ -0,0 +1,15 @@
+use plato;
+pragma warning("disable", "4510");
+
+$udf = ($_key, $stream) -> {
+ $init = ($item) -> (AsStruct(1u as cnt, $item as row));
+ $switch = ($_item, $_state) -> (false);
+ $update = ($item, $state) -> (AsStruct($state.cnt + 1u as cnt,
+ if(($item.value > $state.row.value) ?? false, $item, $state.row) as row));
+ $state = YQL::Collect(YQL::Condense1($stream, $init, $switch, $update));
+ return $state;
+};
+
+REDUCE CONCAT(Input1,Input2)
+presort subkey
+ON key USING $udf(TableRow());
diff --git a/yql/essentials/tests/sql/suites/produce/sorted2.txt b/yql/essentials/tests/sql/suites/produce/sorted2.txt
new file mode 100644
index 0000000000..b214aab0d9
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/sorted2.txt
@@ -0,0 +1,10 @@
+{"key"="023";"subkey"="3";"value"="aaa"};
+{"key"="037";"subkey"="5";"value"="ddd"};
+{"key"="075";"subkey"="1";"value"="abc"};
+{"key"="150";"subkey"="1";"value"="aaa"};
+{"key"="150";"subkey"="3";"value"="iii"};
+{"key"="150";"subkey"="8";"value"="zzz"};
+{"key"="200";"subkey"="7";"value"="qqq"};
+{"key"="527";"subkey"="4";"value"="bbb"};
+{"key"="761";"subkey"="6";"value"="ccc"};
+{"key"="911";"subkey"="2";"value"="kkk"};
diff --git a/yql/essentials/tests/sql/suites/produce/sorted2.txt.attr b/yql/essentials/tests/sql/suites/produce/sorted2.txt.attr
new file mode 100644
index 0000000000..5d3821a576
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/produce/sorted2.txt.attr
@@ -0,0 +1,11 @@
+{"_yql_row_spec"={
+ "Type"=["StructType";[
+ ["key";["DataType";"String"]];
+ ["subkey";["DataType";"String"]];
+ ["value";["DataType";"Utf8"]]
+ ]];
+ "SortDirections"=[1;1;1;];
+ "SortedBy"=["key";"subkey";"value";];
+ "SortedByTypes"=[["DataType";"String";];["DataType";"String";];["DataType";"Utf8";];];
+ "SortMembers"=["key";"subkey";"value";];
+}}